Threads

Spawn a short-lived thread

crossbeam cat-concurrency

The example uses the crossbeam⮳ crate, which provides data structures and functions for concurrent and parallel programming. crossbeam::thread::Scope::spawn⮳ spawns a new scoped thread that is guaranteed to terminate before crossbeam::scope⮳ returns, which means you can safely reference data owned by the calling function.
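
For instance, a scoped thread can borrow a local variable directly, something a detached std::thread::spawn thread cannot do because it requires 'static data. The snippet below is a minimal sketch and not part of the recipe; the vector and greeting are illustrative.

fn main() {
    let people = vec!["Alice".to_string(), "Bob".to_string()];
    crossbeam::scope(|s| {
        s.spawn(|_| {
            // `people` is borrowed from the calling function's stack
            for name in &people {
                println!("Hello, {}!", name);
            }
        });
    })
    .unwrap(); // every scoped thread has been joined by this point
}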

This example splits the array in half and performs the work in separate threads.

fn main() {
    let arr = &[1, 25, -4, 10];
    let max = find_max(arr);
    assert_eq!(max, Some(25));
}

fn find_max(arr: &[i32]) -> Option<i32> {
    const THRESHOLD: usize = 2;

    if arr.len() <= THRESHOLD {
        return arr.iter().cloned().max();
    }

    let mid = arr.len() / 2;
    let (left, right) = arr.split_at(mid);

    crossbeam::scope(|s| {
        let thread_l = s.spawn(|_| find_max(left));
        let thread_r = s.spawn(|_| find_max(right));

        let max_l = thread_l.join().unwrap()?;
        let max_r = thread_r.join().unwrap()?;

        Some(max_l.max(max_r))
    })
    .unwrap()
}

Create a parallel pipeline

crossbeam cat-concurrency

This example uses the crossbeam⮳ and crossbeam-channel⮳ crates to create a parallel pipeline, similar to that described in the ZeroMQ guide⮳. There is a data source and a data sink, with data being processed by two worker threads in parallel on its way from the source to the sink.

We use bounded channels with a capacity of one, created with crossbeam_channel::bounded⮳. The producer must run on its own thread because it produces messages faster than the workers can consume them (each worker sleeps for half a second before it starts receiving). The producer therefore fills the capacity-one channel and then blocks on crossbeam_channel::Sender::send⮳ until a worker takes the data out of the channel. Also note that the data in the channel is consumed by whichever worker receives it first, so each message is delivered to a single worker rather than to both workers.

Reading from the channels via the crossbeam_channel::Receiver::iter⮳ method will block, either waiting for new messages or until the channel is closed. Because a channel only closes once all of its senders have been dropped, we must close the channels manually via std::ops::Drop⮳ to prevent the entire program from blocking in the worker and sink for-loops. You can think of the calls to std::ops::Drop⮳ as signaling that no more messages will be sent.

use std::thread;
use std::time::Duration;

use crossbeam_channel::bounded;

fn main() {
    let (snd1, rcv1) = bounded(1);
    let (snd2, rcv2) = bounded(1);
    let n_msgs = 4;
    let n_workers = 2;

    crossbeam::scope(|s| {
        // Producer thread
        s.spawn(|_| {
            for i in 0..n_msgs {
                snd1.send(i).unwrap();
                println!("Source sent {}", i);
            }
            // Close the channel - this is necessary to exit
            // the for-loop in the worker
            drop(snd1);
        });

        // Parallel processing by 2 threads
        for _ in 0..n_workers {
            // Send to sink, receive from source
            let (sendr, recvr) = (snd2.clone(), rcv1.clone());
            // Spawn workers in separate threads
            s.spawn(move |_| {
                thread::sleep(Duration::from_millis(500));
                // Receive until channel closes
                for msg in recvr.iter() {
                    println!(
                        "Worker {:?} received {}.",
                        thread::current().id(),
                        msg
                    );
                    sendr.send(msg * 2).unwrap();
                }
            });
        }
        // Close the channel, otherwise sink will never
        // exit the for-loop
        drop(snd2);

        // Sink
        for msg in rcv2.iter() {
            println!("Sink received {}", msg);
        }
    })
    .unwrap();
}
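
The close-and-drain mechanic above can also be shown in isolation. This is a minimal sketch, not part of the recipe; the values and thread layout are illustrative.

use std::thread;

use crossbeam_channel::bounded;

fn main() {
    let (snd, rcv) = bounded(1);
    thread::spawn(move || {
        for i in 0..3 {
            // Blocks while the capacity-one channel is full
            snd.send(i).unwrap();
        }
        // `snd` is dropped here, which closes the channel
    });
    // Ends once the channel is closed and drained
    for msg in rcv.iter() {
        println!("Got {}", msg);
    }
}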

Pass data between two threads

crossbeam cat-concurrency

This example demonstrates the use of crossbeam_channel⮳ in a single-producer, single-consumer (SPSC) setting. We build on the crossbeam spawn⮳ example by using crossbeam::scope⮳ and crossbeam::thread::Scope::spawn⮳ to manage the producer thread. Data is exchanged between the two threads over a crossbeam_channel::unbounded⮳ channel, meaning there is no limit to the number of storable messages. The producer thread sleeps for 100 milliseconds between messages.

use std::thread;
use std::time;

use crossbeam_channel::unbounded;

fn main() {
    let (snd, rcv) = unbounded();
    let n_msgs = 5;
    crossbeam::scope(|s| {
        s.spawn(|_| {
            for i in 0..n_msgs {
                snd.send(i).unwrap();
                thread::sleep(time::Duration::from_millis(100));
            }
        });
    })
    .unwrap();
    for _ in 0..n_msgs {
        let msg = rcv.recv().unwrap();
        println!("Received {}", msg);
    }
}
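
Because the channel above is unbounded, the producer's send calls never block; messages simply queue up until the consumer reads them. A minimal sketch of that behavior on its own (the message count is illustrative):

use crossbeam_channel::unbounded;

fn main() {
    let (snd, rcv) = unbounded();
    for i in 0..3 {
        // Queued immediately: an unbounded channel has no capacity limit
        snd.send(i).unwrap();
    }
    drop(snd); // close the channel so the iterator below terminates
    for msg in rcv.iter() {
        println!("Received {}", msg);
    }
}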

Maintain global mutable state

lazy_static cat-rust-patterns cat-concurrency

Declare global state using lazy static⮳. lazy static⮳ creates a globally available static ref which requires a std::sync::Mutex⮳ to allow mutation (also see std::sync::RwLock⮳, sketched after the example below). The std::sync::Mutex⮳ wrapper ensures the state cannot be simultaneously accessed by multiple threads, preventing race conditions. A std::sync::MutexGuard⮳ must be acquired to read or mutate the value stored in a std::sync::Mutex⮳.

use std::sync::Mutex;

use anyhow::anyhow;
use anyhow::Result;
use lazy_static::lazy_static;

lazy_static! {
    static ref FRUIT: Mutex<Vec<String>> = Mutex::new(Vec::new());
}

fn insert(fruit: &str) -> Result<()> {
    let mut db = FRUIT
        .lock()
        .map_err(|_| anyhow!("Failed to acquire MutexGuard"))?;
    db.push(fruit.to_string());
    Ok(())
}

fn main() -> Result<()> {
    insert("apple")?;
    insert("orange")?;
    insert("peach")?;
    {
        let db = FRUIT
            .lock()
            .map_err(|_| anyhow!("Failed to acquire MutexGuard"))?;

        db.iter().enumerate().for_each(|(i, item)| {
            println!("{}: {}", i, item)
        });
    }
    insert("grape")?;
    Ok(())
}
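
As a minimal sketch of the std::sync::RwLock⮳ alternative mentioned above (the FRUIT_RW name is made up for illustration), the same global can sit behind a read-write lock so that multiple readers may hold the lock concurrently:

use std::sync::RwLock;

use lazy_static::lazy_static;

lazy_static! {
    static ref FRUIT_RW: RwLock<Vec<String>> = RwLock::new(Vec::new());
}

fn main() {
    // A write guard gives exclusive access for mutation.
    FRUIT_RW.write().unwrap().push("apple".to_string());
    // Read guards may be held by several threads at once.
    for item in FRUIT_RW.read().unwrap().iter() {
        println!("{}", item);
    }
}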

Calculate SHA256 sum of iso files concurrently

threadpool num_cpus walkdir ring cat-concurrency cat-filesystem

This example calculates the SHA256 digest of every file with an iso extension in a given directory (here, /home/user/Downloads). A thread pool spawns as many worker threads as there are cores in the system, found with num_cpus::get⮳. walkdir::WalkDir::new⮳ iterates over the directory, and threadpool::ThreadPool::execute⮳ performs the work of reading each file and computing its SHA256 hash.

use std::fs::File;
use std::io::BufReader;
use std::io::Error;
use std::io::Read;
use std::path::Path;
use std::sync::mpsc::channel;

use ring::digest::Context;
use ring::digest::Digest;
use ring::digest::SHA256;
use threadpool::ThreadPool;
use walkdir::WalkDir;

// Verify the iso extension
fn is_iso(entry: &Path) -> bool {
    matches!(entry.extension(), Some(e) if e.to_string_lossy().to_lowercase() == "iso")
}

fn compute_digest<P: AsRef<Path>>(filepath: P) -> Result<(Digest, P), Error> {
    let mut buf_reader = BufReader::new(File::open(&filepath)?);
    let mut context = Context::new(&SHA256);
    let mut buffer = [0; 1024];

    loop {
        let count = buf_reader.read(&mut buffer)?;
        if count == 0 {
            break;
        }
        context.update(&buffer[..count]);
    }

    Ok((context.finish(), filepath))
}

fn main() -> Result<(), Error> {
    let pool = ThreadPool::new(num_cpus::get());

    let (tx, rx) = channel();

    for entry in WalkDir::new("/home/user/Downloads")
        .follow_links(true)
        .into_iter()
        .filter_map(|e| e.ok())
        .filter(|e| !e.path().is_dir() && is_iso(e.path()))
    {
        let path = entry.path().to_owned();
        let tx = tx.clone();
        pool.execute(move || {
            let digest = compute_digest(path);
            tx.send(digest).expect("Could not send data!");
        });
    }

    // Drop the original sender; `rx.iter()` ends once it and all
    // of its clones inside the pool jobs have been dropped.
    drop(tx);
    for t in rx.iter() {
        let (sha, path) = t?;
        println!("{:?} {:?}", sha, path);
    }
    Ok(())
}
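
Stripped of the file handling, the dispatch-and-collect pattern above reduces to a thread pool feeding a channel. This is a minimal sketch with an illustrative squaring workload, not part of the recipe:

use std::sync::mpsc::channel;

use threadpool::ThreadPool;

fn main() {
    let pool = ThreadPool::new(num_cpus::get());
    let (tx, rx) = channel();
    for i in 0..8 {
        let tx = tx.clone();
        // Each job runs on one of the pool's worker threads
        pool.execute(move || tx.send(i * i).expect("Could not send data!"));
    }
    drop(tx); // close the channel so the receiving loop terminates
    for result in rx.iter() {
        println!("{}", result);
    }
}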

Draw fractal dispatching work to a thread pool

threadpool num num_cpus image cat-concurrency cat-science cat-rendering

This example generates an image by drawing a fractal from the Julia set⮳, using a thread pool to distribute the computation across cores.

julia-set

Allocate memory for an output image of the given width and height with image::ImageBuffer::new⮳. The wavelength_to_rgb function converts intensity values to RGB pixel values. Create a threadpool::ThreadPool⮳ with a thread count equal to the number of cores, found with num_cpus::get⮳. threadpool::ThreadPool::execute⮳ receives each image row as a separate job; within a job, every pixel of the row is computed and sent over the channel.

A std::sync::mpsc::channel⮳ carries the computed pixels back to the main thread, where std::sync::mpsc::Receiver::recv⮳ retrieves them. image::ImageBuffer::put_pixel⮳ uses the data to set the pixel color. image::ImageBuffer::save⮳ writes the image to temp/output.png.

use std::sync::mpsc::channel;

use anyhow::Result;
use image::ImageBuffer;
use image::Rgb;
use num::complex::Complex;
use threadpool::ThreadPool;

// Function converting intensity values to RGB
fn wavelength_to_rgb(wavelength: u32) -> Rgb<u8> {
    let wave = wavelength as f32;
    let (r, g, b) = match wavelength {
        380..=439 => ((440. - wave) / (440. - 380.), 0.0, 1.0),
        440..=489 => (0.0, (wave - 440.) / (490. - 440.), 1.0),
        490..=509 => (0.0, 1.0, (510. - wave) / (510. - 490.)),
        510..=579 => ((wave - 510.) / (580. - 510.), 1.0, 0.0),
        580..=644 => (1.0, (645. - wave) / (645. - 580.), 0.0),
        645..=780 => (1.0, 0.0, 0.0),
        _ => (0.0, 0.0, 0.0),
    };
    let factor = match wavelength {
        380..=419 => 0.3 + 0.7 * (wave - 380.) / (420. - 380.),
        701..=780 => 0.3 + 0.7 * (780. - wave) / (780. - 700.),
        _ => 1.0,
    };
    let (r, g, b) = (
        normalize(r, factor),
        normalize(g, factor),
        normalize(b, factor),
    );
    Rgb([r, g, b])
}
// Maps Julia set distance estimation to intensity values
fn julia(
    c: Complex<f32>,
    x: u32,
    y: u32,
    width: u32,
    height: u32,
    max_iter: u32,
) -> u32 {
    let width = width as f32;
    let height = height as f32;
    let mut z = Complex {
        // scale and translate the point to image coordinates
        re: 3.0 * (x as f32 - 0.5 * width) / width,
        im: 2.0 * (y as f32 - 0.5 * height) / height,
    };
    let mut i = 0;
    for t in 0..max_iter {
        if z.norm() >= 2.0 {
            break;
        }
        z = z * z + c;
        i = t;
    }
    i
}
// Normalizes color intensity values within RGB range
fn normalize(color: f32, factor: f32) -> u8 {
    ((color * factor).powf(0.8) * 255.) as u8
}

fn main() -> Result<()> {
    let (width, height) = (1920, 1080);
    let mut img = ImageBuffer::new(width, height);
    let iterations = 300;

    // The constant `c` in the Julia iteration z = z * z + c
    let c = Complex::new(-0.8, 0.156);

    let pool = ThreadPool::new(num_cpus::get());
    let (tx, rx) = channel();

    for y in 0..height {
        let tx = tx.clone();
        pool.execute(move || {
            for x in 0..width {
                let i = julia(c, x, y, width, height, iterations);
                let pixel = wavelength_to_rgb(380 + i * 400 / iterations);
                tx.send((x, y, pixel)).expect("Could not send data!");
            }
        });
    }

    // Collect every computed pixel and write it into the image buffer
    for _ in 0..(width * height) {
        let (x, y, pixel) = rx.recv()?;
        img.put_pixel(x, y, pixel);
    }
    img.save("temp/output.png")?;
    Ok(())
}