Multithreading with the crossbeam Crate
Recipe | Crates | Categories |
---|---|---|
Spawn a Short-lived Thread | ||
Create a Parallel Pipeline | ||
Pass Data Between Two Threads | ||
Spawn a Short-lived Thread
This example uses the `crossbeam` crate, which provides data structures and functions for concurrent and parallel programming. `crossbeam::thread::Scope::spawn` spawns a new scoped thread that is guaranteed to terminate before `crossbeam::scope` returns, which means the thread can reference data from the calling function.
This example splits the array in half and performs the work in separate threads.
```rust
/// Finds the maximum value in an array using `crossbeam` for concurrency.
fn main() {
    let arr = &[1, 25, -4, 10];
    // Find the maximum value.
    let max = find_max(arr);
    assert_eq!(max, Some(25));
    println!("The maximum is {:?}", max);
}

fn find_max(arr: &[i32]) -> Option<i32> {
    const THRESHOLD: usize = 2;

    // If the array is small enough, find the maximum sequentially.
    if arr.len() <= THRESHOLD {
        return arr.iter().cloned().max();
    }

    // Split the array into two halves.
    let mid = arr.len() / 2;
    let (left, right) = arr.split_at(mid);

    // Use crossbeam to spawn two threads to find the maximum in each half.
    crossbeam::scope(|s| {
        // Spawn a thread to find the maximum in the left half.
        let thread_l = s.spawn(|_| find_max(left));
        // Spawn a thread to find the maximum in the right half.
        let thread_r = s.spawn(|_| find_max(right));

        // Wait for the threads to finish and get the maximum from each half.
        // The `?` operator propagates errors if any of the threads return
        // `None`.
        let max_l = thread_l.join().unwrap()?;
        let max_r = thread_r.join().unwrap()?;

        Some(max_l.max(max_r))
    })
    .unwrap()
}
```
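The key property of a scoped thread, borrowing data from the caller's stack frame, can also be sketched without any external crate. The following is a minimal illustration that swaps in the standard library's `std::thread::scope` (stable since Rust 1.63) in place of `crossbeam::scope`; it is not the crossbeam API, and the helper name `parallel_sum` is hypothetical.

```rust
use std::thread;

/// Sums a slice by splitting it in half and summing each half on its own
/// scoped thread. Hypothetical helper, for illustration only.
fn parallel_sum(data: &[i64]) -> i64 {
    let (left, right) = data.split_at(data.len() / 2);

    // `thread::scope` joins every spawned thread before it returns, so the
    // closures may borrow `left` and `right` from this stack frame.
    thread::scope(|s| {
        let handle_l = s.spawn(|| left.iter().sum::<i64>());
        let handle_r = s.spawn(|| right.iter().sum::<i64>());
        handle_l.join().unwrap() + handle_r.join().unwrap()
    })
}

fn main() {
    let values: [i64; 4] = [1, 25, -4, 10];
    let total = parallel_sum(&values);
    println!("The sum is {total}");
}
```

Unlike `crossbeam::scope`, the standard-library version passes no argument to the spawned closure and returns the closure's value directly rather than a `Result`.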
Create a Parallel Pipeline
This example uses the `crossbeam` and `crossbeam-channel` crates to create a parallel pipeline, similar to the one described in the ZeroMQ guide. There is a data source and a data sink, with data being processed by two worker threads in parallel on its way from the source to the sink.
We create bounded channels with a capacity of one using `crossbeam_channel::bounded`. The producer must run on its own thread because it produces messages faster than the workers consume them (each worker sleeps for half a second before it starts receiving). As a result, the producer blocks on the call to `crossbeam_channel::Sender::send` until one of the workers takes the pending message out of the channel. Also note that each message is consumed by whichever worker receives it first, so it is delivered to a single worker rather than to both.
Reading from the channels via the iterator returned by `crossbeam_channel::Receiver::iter` blocks, either until a new message arrives or until the channel is closed. Because `crossbeam::scope` waits for every spawned thread to finish, and the senders would otherwise stay alive for the whole scope, we must close the channels manually by calling `drop` (`std::mem::drop`) on the senders. Without this, the worker and sink for-loops would block forever. You can think of the calls to `drop` as signaling that no more messages will be sent.
```rust
use std::thread;
use std::time::Duration;

use crossbeam_channel::bounded;

/// Crossbeam channel example with one producer, multiple workers, and a
/// sink.
fn main() {
    // Create two bounded channels with a capacity of 1.
    // snd1/rcv1: Used for communication between the producer and workers.
    // snd2/rcv2: Used for communication between the workers and the sink.
    let (snd1, rcv1) = bounded(1);
    let (snd2, rcv2) = bounded(1);
    let n_msgs = 4;
    let n_workers = 2;

    // Create a new scope for spawning threads.
    crossbeam::scope(|s| {
        // Producer thread:
        s.spawn(|_| {
            for i in 0..n_msgs {
                snd1.send(i).unwrap();
                println!("Source sent {}", i);
            }
            // Close the channel - this is necessary to exit
            // the for-loop in the worker.
            drop(snd1);
        });

        // Parallel processing by multiple worker threads:
        for _ in 0..n_workers {
            // Clone the sender for the sink and the receiver from the source.
            let (sendr, recvr) = (snd2.clone(), rcv1.clone());
            // Spawn worker threads.
            // Each worker receives messages from rcv1, processes them,
            // and sends the results to snd2.
            s.spawn(move |_| {
                thread::sleep(Duration::from_millis(500));
                // Receive until channel closes.
                for msg in recvr.iter() {
                    println!(
                        "Worker {:?} received {}.",
                        thread::current().id(),
                        msg
                    );
                    sendr.send(msg * 2).unwrap();
                }
            });
        }
        // Close the snd2 channel.
        // This is necessary to signal to the sink that no more messages will be
        // sent.
        drop(snd2);

        // Sink: receives and processes messages from the workers.
        for msg in rcv2.iter() {
            println!("Sink received {}", msg);
        }
    })
    .unwrap();
}
```
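The two mechanisms this recipe relies on, a `send` that blocks while the single buffer slot is full and a receiving iterator that ends once every sender has been dropped, can be seen in isolation in the sketch below. It assumes the standard library's `std::sync::mpsc::sync_channel` as a stand-in for `crossbeam_channel::bounded`, so it is not the crossbeam API; standard-library receivers cannot be cloned, which is why there is only one consumer here. The helper name `drain_bounded` is hypothetical.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Sends `n` integers through a channel of capacity 1 and returns what the
/// consumer collected. Hypothetical helper, for illustration only.
fn drain_bounded(n: u32) -> Vec<u32> {
    let (tx, rx) = sync_channel::<u32>(1);

    let producer = thread::spawn(move || {
        for i in 0..n {
            // Blocks whenever the single buffer slot is still occupied.
            tx.send(i).unwrap();
        }
        // `tx` goes out of scope here, which closes the channel.
    });

    // The iterator yields messages until every sender is gone, then ends.
    let received: Vec<u32> = rx.iter().collect();
    producer.join().unwrap();
    received
}

fn main() {
    let received = drain_bounded(4);
    println!("Received {:?}", received);
}
```

If the sender were kept alive instead of being moved into the producer thread, `rx.iter()` would never finish, which is the same hang the explicit `drop` calls in the recipe prevent.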
Pass Data Between Two Threads
This example demonstrates the use of `crossbeam_channel` in a single-producer, single-consumer (SPSC) setting. We build on the Spawn a Short-lived Thread example by using `crossbeam::scope` and `crossbeam::thread::Scope::spawn` to manage the producer thread. Data is exchanged between the two threads using a `crossbeam_channel::unbounded` channel, meaning there is no limit to the number of storable messages. The producer thread sleeps for 100 milliseconds between messages.
```rust
use std::thread;
use std::time;

use crossbeam_channel::unbounded;

/// Example of using crossbeam's unbounded channel for single-producer,
/// single-consumer (SPSC) communication.
fn main() {
    let (snd, rcv) = unbounded();
    let n_msgs = 5;
    crossbeam::scope(|s| {
        s.spawn(|_| {
            for i in 0..n_msgs {
                snd.send(i).unwrap();
                thread::sleep(time::Duration::from_millis(100));
            }
        });
    })
    .unwrap();
    for _ in 0..n_msgs {
        let msg = rcv.recv().unwrap();
        println!("Received {}", msg);
    }
}
```
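In the recipe above, the receiving loop only starts after `crossbeam::scope` has returned, that is, after the producer has already sent every message. This works because an unbounded channel buffers messages without ever blocking the sender. A minimal sketch of that behavior follows, assuming the standard library's `std::sync::mpsc::channel` as a stand-in for `crossbeam_channel::unbounded`; the helper name `buffer_then_receive` is hypothetical.

```rust
use std::sync::mpsc::channel;

/// Sends `n` integers before receiving any of them, all on one thread.
/// Hypothetical helper, for illustration only.
fn buffer_then_receive(n: u32) -> Vec<u32> {
    let (tx, rx) = channel::<u32>();

    // With an unbounded channel, `send` never waits for a receiver, so
    // every message is simply queued.
    for i in 0..n {
        tx.send(i).unwrap();
    }
    // Close the channel so the receiving iterator terminates.
    drop(tx);

    // Messages come out in the order they were sent.
    rx.iter().collect()
}

fn main() {
    let received = buffer_then_receive(5);
    println!("Received {:?}", received);
}
```

With a bounded channel of capacity one, the same send-everything-first ordering would block on the second `send`, since nothing is receiving yet.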