Multithreading

Spawn, join

std cat-concurrency

use std::thread;
use std::time::Duration;

fn main() {
    let thread_one = thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    let thread_two = thread::spawn(|| { /* ... */ });
    // More stufff

    // Wait for both threads to complete.
    thread_one.join().expect("thread one panicked");
    thread_two.join().expect("thread two panicked");
}

When the main thread of a Rust program completes, all spawned threads are shut down, whether or not they have finished running.

Scoped threads

std cat-concurrency

Scoped threads

use std::error::Error;
use std::path::Path;
use std::sync::mpsc;
use std::thread;

// Our error type needs to be `Send` to be used in a channel
fn read_contents<T: AsRef<Path>>(
    file: T,
) -> Result<String, Box<dyn Error + Send>> {
    Ok(file.as_ref().to_string_lossy().into_owned())
}

fn main() {
    // To share state between threads, consider using a channel
    let (tx, rx) = mpsc::channel();

    thread::scope(|scope| {
        // Creates a “fork-join” scope
        let tx2 = tx.clone();
        scope.spawn(move || {
            println!("hello from the first scoped thread");
            let contents = read_contents("foo.txt");
            tx.send(contents).unwrap();
        });
        scope.spawn(move || {
            println!("hello from the second scoped thread");
            let contents = read_contents("bar.txt");
            tx2.send(contents).unwrap();
        });
    });
    // No join.
    // Spawned threads get joined automatically once the scope ends!

    // Receive messages from the channel
    println!("hello from the main thread");

    for received in rx {
        println!("Got: {:?}", received);
    }
}

Rayon - parallel processing

rayon rayon-github cat-concurrency

Parallel iteration

Convert .iter() or iter_mut() or into_iter() into par_iter() or par_iter_mut() or into_par_iter() to execute in parallel.

use rayon::prelude::*;

fn sum_of_squares(input: &[i32]) -> i32 {
    input.par_iter().map(|i| i * i).sum()
}

fn increment_all(input: &mut [i32]) {
    input.par_iter_mut().for_each(|p| *p += 1);
}

fn main() {
    let mut v = [1, 2, 3];
    increment_all(&mut v[..]);
    println!("{}", sum_of_squares(&v[..]));
}

Parallel sorting

cat-concurrency

use rayon::prelude::*;

fn main() {
    let mut v = [-5, 4, 1, -3, 2];
    v.par_sort();
    println!("{:#?}", v);
}

Custom parallel tasks

cat-concurrency

Rayon implements rayon::join⮳, rayon::join⮳, rayon::spawn⮳ that may run on the global or a custom Rayon threadpool⮳.

fn main() {
    // Build the threadpool
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(8)
        .build()
        .unwrap();
    // `install` executes the closure within the threadpool. Any attempts
    // to use join, scope, or parallel iterators will then operate
    // within that threadpool.
    let n = pool.install(|| fib(20));
    println!("{}", n);
}

fn fib(n: usize) -> usize {
    if n == 0 || n == 1 {
        return n;
    }
    // Conceptually, calling join() is similar to spawning two threads,
    // one executing each of the two closures.
    let (a, b) = rayon::join(|| fib(n - 1), || fib(n - 2)); // runs inside of `pool`
    a + b
}

See also

threadpool