Parallel Tasks

rayon

rayon rayon-crates.io rayon-github rayon-lib.rs cat-concurrency

Simple work-stealing parallelism for Rust using rayon.

rayon makes it easy to write parallel code. It provides data parallelism through parallel iterators and task parallelism through join and scoped tasks, so sequential code can be converted to a parallel version with minimal changes. Rayon manages the thread pool and distributes the work, which simplifies parallel programming and can improve performance on multi-core processors.

Iterate in Parallel

Replace calls to iter, iter_mut, or into_iter with par_iter, par_iter_mut, or into_par_iter, respectively, to run the same iterator chain in parallel.

use rayon::prelude::*;

/// Calculates the sum of the squares of the elements in the input slice.
///
/// This function uses Rayon's parallel iterator to efficiently compute the sum
/// of squares across multiple threads.
fn sum_of_squares(input: &[i32]) -> i32 {
    input.par_iter().map(|i| i * i).sum()
}

fn increment_all(input: &mut [i32]) {
    input.par_iter_mut().for_each(|p| *p += 1);
}

fn main() {
    let mut v = [1, 2, 3];
    increment_all(&mut v[..]);
    println!("{}", sum_of_squares(&v[..]));
}

Sort in Parallel

rayon simplifies parallel sorting in Rust by adding methods such as par_sort to mutable slices, so arrays and vectors can be sorted across multiple threads.

use rayon::prelude::*;

/// Sorts the array in parallel using Rayon's `par_sort` method.
fn main() {
    let mut v = [-5, 4, 1, -3, 2];
    v.par_sort();
    println!("{:#?}", v);
}

Implement Custom Parallel Tasks

rayon provides rayon::join, rayon::scope, and rayon::spawn, which may run on the global or a custom Rayon thread pool.

fn main() {
    // Build the threadpool
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(8)
        .build()
        .unwrap();
    // `install` executes the closure within the threadpool. Any attempts
    // to use join, scope, or parallel iterators will then operate
    // within that threadpool.
    let n = pool.install(|| fib(20));
    println!("{}", n);
}

/// Calculates the nth Fibonacci number.
fn fib(n: usize) -> usize {
    if n == 0 || n == 1 {
        return n;
    }
    // Conceptually, calling join() is similar to spawning two threads,
    // one executing each of the two closures.
    let (a, b) = rayon::join(|| fib(n - 1), || fib(n - 2)); // runs inside of `pool`
    a + b
}

Mutate the Elements of an Array in Parallel

The example uses the rayon crate, a data parallelism library for Rust. rayon provides the rayon::iter::IntoParallelRefMutIterator::par_iter_mut method for any data type that can be iterated in parallel by mutable reference. It starts an iterator-like chain that potentially executes in parallel.

use rayon::prelude::*;

fn main() {
    let mut arr = [0, 7, 9, 11];

    // `par_iter_mut()` creates a parallel iterator over mutable references to
    // the elements of the array. `for_each()` applies a closure to each
    // element in parallel. In this case, the closure decrements each
    // element by 1. The order of execution is not guaranteed, but all
    // elements will be processed.
    arr.par_iter_mut().for_each(|p| *p -= 1);
    println!("{:?}", arr);
}

Test in Parallel if Any or All Elements of a Collection Match a Given Predicate

This example demonstrates using the rayon::iter::ParallelIterator::any and rayon::iter::ParallelIterator::all methods, which are parallelized counterparts to std::iter::Iterator::any and std::iter::Iterator::all. rayon::iter::ParallelIterator::any checks in parallel whether any element of the iterator matches the predicate, and returns as soon as one is found. rayon::iter::ParallelIterator::all checks in parallel whether all elements of the iterator match the predicate, and returns as soon as a non-matching element is found.

use rayon::prelude::*;

/// This example demonstrates the use of `any` and `all` methods on a parallel
/// iterator.
fn main() {
    let mut vec = vec![2, 4, 6, 8];

    // Check if any element is odd (should be false) and if all elements are
    // even (should be true). Also check if any element is greater than 8
    // (should be false) and if all elements are less than or equal to 8 (should
    // be true).
    assert!(!vec.par_iter().any(|n| (*n % 2) != 0));
    assert!(vec.par_iter().all(|n| (*n % 2) == 0));
    assert!(!vec.par_iter().any(|n| *n > 8));
    assert!(vec.par_iter().all(|n| *n <= 8));

    vec.push(9);

    assert!(vec.par_iter().any(|n| (*n % 2) != 0));
    assert!(!vec.par_iter().all(|n| (*n % 2) == 0));
    assert!(vec.par_iter().any(|n| *n > 8));
    assert!(!vec.par_iter().all(|n| *n <= 8));

    println!("{:?}", vec);
}

Search Items Using a Given Predicate in Parallel

This example uses rayon::iter::ParallelIterator::find_any and rayon::iter::IntoParallelRefIterator::par_iter to search a vector in parallel for an element satisfying the predicate in the given closure.

If multiple elements satisfy the predicate defined in the closure argument of rayon::iter::ParallelIterator::find_any, rayon returns the first one found, not necessarily the first one in the vector.

Also note that the argument to the closure is a reference to a reference (&&x). See the discussion on std::iter::Iterator::find for additional details.

use rayon::prelude::*;

/// Demonstrates parallel searching using Rayon's `par_iter().find_any()`.
fn main() {
    let v = vec![6, 2, 1, 9, 3, 8, 11];

    // Search for specific elements in parallel.
    let f1 = v.par_iter().find_any(|&&x| x == 9);
    let f2 = v.par_iter().find_any(|&&x| x % 2 == 0 && x > 6);
    let f3 = v.par_iter().find_any(|&&x| x > 8);

    assert_eq!(f1, Some(&9));
    assert_eq!(f2, Some(&8));
    assert!(f3 > Some(&8));
    println!("{:?}", v);
}

Sort a Vector in Parallel

This example sorts a vector of Strings in parallel.

It allocates a vector of empty Strings, then par_iter_mut().for_each populates them with random values in parallel. Although multiple options exist to sort an enumerable data type, rayon::slice::ParallelSliceMut::par_sort_unstable is usually faster than stable sort algorithms.

use rand::Rng;
use rand::distr::Alphanumeric;
use rand::rng;
use rayon::prelude::*;

/// This example demonstrates parallel sorting of a vector of strings using
/// Rayon.
fn main() {
    let mut vec = vec![String::new(); 100];

    // Generate random strings in parallel and populate the vector.
    vec.par_iter_mut().for_each(|p| {
        // Get a handle to the thread-local random number generator.
        let mut rng = rng();
        *p = (0..5).map(|_| rng.sample(Alphanumeric) as char).collect();
    });
    vec.par_sort_unstable();
    println!("{:?}", vec);
}

Map-reduce in Parallel

This example uses rayon::iter::ParallelIterator::filter, rayon::iter::ParallelIterator::map, and rayon::iter::ParallelIterator::reduce to calculate the average age of Person objects whose age is over 30.

rayon::iter::ParallelIterator::filter returns elements from a collection that satisfy the given predicate. rayon::iter::ParallelIterator::map performs an operation on every element, producing a new parallel iterator, and rayon::iter::ParallelIterator::reduce repeatedly combines pairs of values, starting from an identity value, until a single result remains. The example also shows rayon::iter::ParallelIterator::sum, which gives the same result as the reduce operation here.

use rayon::prelude::*;

struct Person {
    age: u32,
}

fn main() {
    // Create a vector of `Person` instances.
    let v: Vec<Person> = vec![
        Person { age: 23 },
        Person { age: 19 },
        Person { age: 42 },
        Person { age: 17 },
        Person { age: 17 },
        Person { age: 31 },
        Person { age: 30 },
    ];
    // Calculate the number of people over 30.
    let num_over_30 = v.par_iter().filter(|&x| x.age > 30).count() as f32;

    // Calculate the sum of ages of people over 30.
    let sum_over_30 = v
        .par_iter()
        .map(|x| x.age)
        .filter(|&x| x > 30)
        .reduce(|| 0, |x, y| x + y);

    // Alternative method:
    let alt_sum_30: u32 = v.par_iter().map(|x| x.age).filter(|&x| x > 30).sum();

    // Calculate the average age of people over 30.
    let avg_over_30 = sum_over_30 as f32 / num_over_30;
    let alt_avg_over_30 = alt_sum_30 as f32 / num_over_30;

    assert!((avg_over_30 - alt_avg_over_30).abs() < f32::EPSILON);
    println!("The average age of people older than 30 is {}", avg_over_30);
}

Generate JPEG Thumbnails in Parallel

rayon rayon-crates.io rayon-github rayon-lib.rs glob image cat-concurrency cat-filesystem

This example generates thumbnails for all jpg files in the current directory, then saves them in a new folder called thumbnails.

glob::glob_with finds jpeg files in the current directory. rayon resizes the images in parallel using rayon::iter::IntoParallelRefIterator::par_iter, calling image::DynamicImage::resize on each one.

use std::fs::File;
use std::fs::create_dir_all;
use std::io::Write;
use std::path::Path;

use anyhow::Context;
use anyhow::Result;
use glob::MatchOptions;
use glob::glob_with;
use image::imageops::FilterType;
use rayon::prelude::*;

fn main() -> Result<()> {
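    // Create placeholder files under `temp/test_images` for the test function.
    // Note that the glob below only matches `*.jpg` in the current directory.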
    create_test_files()?;

    let options: MatchOptions = Default::default();
    let files: Vec<_> = glob_with("*.jpg", options)?
        .filter_map(|x| x.ok())
        .collect();

    if files.is_empty() {
        println!("No .jpg files found in current directory");
        return Ok(());
    }

    let thumb_dir = "thumbnails";
    create_dir_all(thumb_dir)?;

    println!("Saving {} thumbnails into '{}'...", files.len(), thumb_dir);

    let image_failures: Vec<_> = files
        .par_iter()
        .map(|path| {
            make_thumbnail(path, thumb_dir, 300).with_context(|| {
                format!("Failed to create thumbnail for: {}", path.display())
            })
        })
        .filter_map(|x| x.err())
        .collect();

    image_failures.iter().for_each(|x| println!("{}", x));

    println!(
        "{} thumbnails saved successfully",
        files.len() - image_failures.len()
    );
    Ok(())
}

fn make_thumbnail<PA, PB>(
    original: PA,
    thumb_dir: PB,
    longest_edge: u32,
) -> Result<()>
where
    PA: AsRef<Path>,
    PB: AsRef<Path>,
{
    let img = image::open(original.as_ref())?;
    let file_path = thumb_dir
        .as_ref()
        .join(original.as_ref().file_name().unwrap());

    Ok(img
        .resize(longest_edge, longest_edge, FilterType::Nearest)
        .save(file_path)?)
}

fn create_test_files() -> Result<()> {
    // Create a directory for test files.
    let test_dir = Path::new("temp/test_images");
    if !test_dir.exists() {
        std::fs::create_dir_all(test_dir)?;
    }

    // Create a few dummy .jpg files.
    for i in 0..3 {
        let file_path = test_dir.join(format!("test_{}.jpg", i));
        let mut file = File::create(&file_path)?;
        // Write some dummy data to the file (not a real image).
        file.write_all(b"dummy image data")?;
    }
    Ok(())
}

#[test]
fn test() -> anyhow::Result<()> {
    main()?;
    Ok(())
}