Parallel Tasks

rayon

rayon rayon-crates.io rayon-github rayon-lib.rs cat-concurrency

Simple work-stealing parallelism for Rust using rayon.

rayon makes it easy to write parallel code. It provides data parallelism through parallel iterators and task parallelism through join, scope, and spawn, so sequential code can often be converted to a parallel version with minimal changes. rayon manages the thread pool and distributes the workload by work stealing, which simplifies parallel programming and can improve performance on multi-core processors.

Iterate in Parallel

Replace calls to iter, iter_mut, or into_iter with par_iter, par_iter_mut, or into_par_iter, respectively, to execute the iterator chain in parallel.

use rayon::prelude::*;

fn sum_of_squares(input: &[i32]) -> i32 {
    input.par_iter().map(|i| i * i).sum()
}

fn increment_all(input: &mut [i32]) {
    input.par_iter_mut().for_each(|p| *p += 1);
}

fn main() {
    let mut v = [1, 2, 3];
    increment_all(&mut v[..]);
    println!("{}", sum_of_squares(&v[..]));
}

Sort in Parallel

rayon adds parallel sorting methods to mutable slices. par_sort is a stable sort; par_sort_unstable does not preserve the order of equal elements but is usually faster.

use rayon::prelude::*;

fn main() {
    let mut v = [-5, 4, 1, -3, 2];
    v.par_sort();
    println!("{:#?}", v);
}

Implement Custom Parallel Tasks

rayon provides rayon::join, rayon::scope, and rayon::spawn, which may run on the global thread pool or on a custom rayon thread pool.

fn main() {
    // Build the threadpool
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(8)
        .build()
        .unwrap();
    // `install` executes the closure within the threadpool. Any attempts
    // to use join, scope, or parallel iterators will then operate
    // within that threadpool.
    let n = pool.install(|| fib(20));
    println!("{}", n);
}

fn fib(n: usize) -> usize {
    if n == 0 || n == 1 {
        return n;
    }
    // Conceptually, calling join() is similar to spawning two threads,
    // one executing each of the two closures.
    let (a, b) = rayon::join(|| fib(n - 1), || fib(n - 2)); // runs inside of `pool`
    a + b
}

Mutate the Elements of an Array in Parallel

The example uses the rayon crate, a data parallelism library for Rust. rayon provides the rayon::iter::IntoParallelRefMutIterator::par_iter_mut method for any parallel iterable data type. It returns an iterator-like chain that potentially executes in parallel.

use rayon::prelude::*;

fn main() {
    let mut arr = [0, 7, 9, 11];
    arr.par_iter_mut().for_each(|p| *p -= 1);
    println!("{:?}", arr);
}

Test in Parallel if Any or All Elements of a Collection Match a Given Predicate

This example demonstrates the rayon::iter::ParallelIterator::any and rayon::iter::ParallelIterator::all methods, which are parallelized counterparts to std::iter::Iterator::any and std::iter::Iterator::all. rayon::iter::ParallelIterator::any checks in parallel whether any element of the iterator matches the predicate, and returns as soon as one is found. rayon::iter::ParallelIterator::all checks in parallel whether all elements of the iterator match the predicate, and returns as soon as a non-matching element is found.

use rayon::prelude::*;

fn main() {
    let mut vec = vec![2, 4, 6, 8];

    assert!(!vec.par_iter().any(|n| (*n % 2) != 0));
    assert!(vec.par_iter().all(|n| (*n % 2) == 0));
    assert!(!vec.par_iter().any(|n| *n > 8));
    assert!(vec.par_iter().all(|n| *n <= 8));

    vec.push(9);

    assert!(vec.par_iter().any(|n| (*n % 2) != 0));
    assert!(!vec.par_iter().all(|n| (*n % 2) == 0));
    assert!(vec.par_iter().any(|n| *n > 8));
    assert!(!vec.par_iter().all(|n| *n <= 8));

    println!("{:?}", vec);
}

Search Items Using a Given Predicate in Parallel

This example uses rayon::iter::IntoParallelRefIterator::par_iter and rayon::iter::ParallelIterator::find_any to search a vector in parallel for an element satisfying the predicate in the given closure.

If several elements satisfy the predicate passed to rayon::iter::ParallelIterator::find_any, rayon returns whichever match it finds first, which is not necessarily the first match in the vector.

Also note that the argument to the closure is a reference to a reference (&&x). See the discussion on std::iter::Iterator::find for additional details.

use rayon::prelude::*;

fn main() {
    let v = vec![6, 2, 1, 9, 3, 8, 11];

    let f1 = v.par_iter().find_any(|&&x| x == 9);
    let f2 = v.par_iter().find_any(|&&x| x % 2 == 0 && x > 6);
    let f3 = v.par_iter().find_any(|&&x| x > 8);

    assert_eq!(f1, Some(&9));
    assert_eq!(f2, Some(&8));
    assert!(f3 > Some(&8));
    println!("{:?}", v);
}

Sort a Vector in Parallel

This example sorts a vector of Strings in parallel.

It allocates a vector of empty Strings, then par_iter_mut().for_each fills them with random values in parallel. Although multiple options exist to sort a slice, rayon::slice::ParallelSliceMut::par_sort_unstable is usually faster than the stable sort algorithms.

use rand::Rng;
use rand::distr::Alphanumeric;
use rand::rng;
use rayon::prelude::*;

fn main() {
    let mut vec = vec![String::new(); 100];

    vec.par_iter_mut().for_each(|p| {
        let mut rng = rng();
        *p = (0..5).map(|_| rng.sample(Alphanumeric) as char).collect();
    });
    vec.par_sort_unstable();
    println!("{:?}", vec);
}

Map-reduce in Parallel

This example uses rayon::iter::ParallelIterator::filter, rayon::iter::ParallelIterator::map, and rayon::iter::ParallelIterator::reduce to calculate the average age of Person objects whose age is over 30.

rayon::iter::ParallelIterator::filter returns the elements of a collection that satisfy the given predicate. rayon::iter::ParallelIterator::map performs an operation on every element, creating a new iterator, and rayon::iter::ParallelIterator::reduce combines elements pairwise, starting from the value produced by its identity closure. The example also shows rayon::iter::ParallelIterator::sum, which gives the same result as the reduce operation here.

use rayon::prelude::*;

struct Person {
    age: u32,
}

fn main() {
    let v: Vec<Person> = vec![
        Person { age: 23 },
        Person { age: 19 },
        Person { age: 42 },
        Person { age: 17 },
        Person { age: 17 },
        Person { age: 31 },
        Person { age: 30 },
    ];

    let num_over_30 = v.par_iter().filter(|&x| x.age > 30).count() as f32;
    let sum_over_30 = v
        .par_iter()
        .map(|x| x.age)
        .filter(|&x| x > 30)
        .reduce(|| 0, |x, y| x + y);

    let alt_sum_30: u32 = v.par_iter().map(|x| x.age).filter(|&x| x > 30).sum();

    let avg_over_30 = sum_over_30 as f32 / num_over_30;
    let alt_avg_over_30 = alt_sum_30 as f32 / num_over_30;

    assert!((avg_over_30 - alt_avg_over_30).abs() < f32::EPSILON);
    println!("The average age of people older than 30 is {}", avg_over_30);
}

Generate JPEG Thumbnails in Parallel

rayon glob image cat-concurrency cat-filesystem

This example generates thumbnails for all .jpg files in the current directory, then saves them in a new folder called thumbnails.

glob::glob_with finds the .jpg files in the current directory. rayon then resizes the images in parallel, using rayon::iter::IntoParallelRefIterator::par_iter to call image::DynamicImage::resize on each file.

use std::fs::create_dir_all;
use std::path::Path;

use anyhow::Context;
use anyhow::Result;
use glob::MatchOptions;
use glob::glob_with;
use image::imageops::FilterType;
use rayon::prelude::*;

fn main() -> Result<()> {
    let options: MatchOptions = Default::default();
    let files: Vec<_> = glob_with("*.jpg", options)?
        .filter_map(|x| x.ok())
        .collect();

    if files.is_empty() {
        println!("No .jpg files found in current directory");
        return Ok(());
    }

    let thumb_dir = "thumbnails";
    create_dir_all(thumb_dir)?;

    println!("Saving {} thumbnails into '{}'...", files.len(), thumb_dir);

    let image_failures: Vec<_> = files
        .par_iter()
        .map(|path| {
            make_thumbnail(path, thumb_dir, 300)
                .with_context(|| path.display().to_string())
        })
        .filter_map(|x| x.err())
        .collect();

    image_failures.iter().for_each(|x| println!("{}", x));

    println!(
        "{} thumbnails saved successfully",
        files.len() - image_failures.len()
    );
    Ok(())
}

fn make_thumbnail<PA, PB>(
    original: PA,
    thumb_dir: PB,
    longest_edge: u32,
) -> Result<()>
where
    PA: AsRef<Path>,
    PB: AsRef<Path>,
{
    let img = image::open(original.as_ref())?;
    let file_path = thumb_dir.as_ref().join(original);

    Ok(img
        .resize(longest_edge, longest_edge, FilterType::Nearest)
        .save(file_path)?)
}