Message passing and channels
| Recipe | Crates | Categories |
|---|---|---|
| Multiple producers, single consumer | | |
| crossbeam-channel | | |
| flume | | |
One increasingly popular approach to ensuring safe concurrency is message passing, where threads communicate by sending each other messages containing data. The Rust standard library provides channels for message passing that are safe to use in concurrent contexts.
Message passing in async programming is covered on a separate page: async channels.
Multiple producers, single consumer
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Starts a detached thread that sends each of `words` through `sender`,
/// pausing for one second after every send.
fn spawn_producer(sender: mpsc::Sender<String>, words: Vec<String>) {
    thread::spawn(move || {
        for word in words {
            sender.send(word).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });
}

/// Runs two producer threads against a single consumer (the main thread)
/// and prints every message as it arrives.
fn main() {
    let (tx, rx) = mpsc::channel();

    // Each producer owns its own handle to the same channel.
    spawn_producer(tx.clone(), vec![String::from("hi"), String::from("hi again")]);
    spawn_producer(tx, vec![String::from("more"), String::from("messages")]);

    // Iterating the receiver blocks for each message; the loop ends once
    // both producers have finished and dropped their senders.
    for msg in rx {
        println!("{msg}");
    }
}
crossbeam-channel
The crossbeam-channel crate provides multi-producer, multi-consumer channels for message passing. It is the fastest channel implementation available and implements a Go-like `select` feature.
use std::thread; use crossbeam_channel::RecvError; use crossbeam_channel::TryRecvError; use crossbeam_channel::unbounded; fn main() { // Create a channel of unbounded capacity. let (s1, r1) = unbounded(); // Alternatively, create a channel that can hold at most n messages at // a time. let (s1, r1) = bounded(5); // Senders and receivers can be cloned to use them to multiple // threads. cloning only creates a new handle to the same sending // or receiving side. It does not create a separate stream of // messages in any way let s2 = s1.clone(); // Send a message into the channel. // Note that the cloned sender is moved into the thread. thread::spawn(move || s2.send("Hi!").unwrap()); // Blocks until receiving the message from the channel. assert_eq!(r1.recv(), Ok("Hi!")); // Try receiving a message without blocking. // The channel is now empty assert_eq!(r1.try_recv(), Err(TryRecvError::Empty)); s1.send("0").unwrap(); // Receive all remaining messages currently in the channel // (non-blocking). let v: Vec<_> = r1.try_iter().collect(); println!("{:?}", v); // When all senders or all receivers associated with a channel get // dropped, the channel becomes disconnected. s1.send("1").unwrap(); drop(s1); // No more messages can be sent... // ERROR s1.send("2").unwrap(); // .. but any remaining messages can still be received. println!("{:?}", r1.iter().collect::<Vec<_>>()); // Note that the call to `collect` would block if the channel were not // disconnected. // There are no more messages in the channel. assert!(r1.is_empty()); // After disconnection, calling `r1.recv()` does not block // Instead, `Err(RecvError)` is returned immediately. assert_eq!(r1.recv(), Err(RecvError)); }
Example using specialized channels for tickers and timeouts:
use std::time::Duration; use std::time::Instant; use crossbeam_channel::after; use crossbeam_channel::select; use crossbeam_channel::tick; fn main() { let start = Instant::now(); // Channel that delivers messages periodically. let ticker = tick(Duration::from_millis(50)); // Channel that delivers a single message // after a certain duration of time. let timeout = after(Duration::from_secs(1)); loop { // `select` wait until any one of the channels becomes ready and // execute it. select! { recv(ticker) -> _ => println!("elapsed: {:?}", start.elapsed()), recv(timeout) -> _ => break, // or use: default(Duration::from_millis(1000)) => break, } } }
flume
The flume crate provides multiple-producer, multiple-consumer (MPMC) channels. It is similar to the `std::sync::mpsc` module, but with additional features and improved performance. It is smaller and simpler than crossbeam-channel and almost as fast.
use std::thread; use std::time::Duration; use flume::Receiver; use flume::RecvTimeoutError; use flume::Sender; use flume::bounded; use flume::unbounded; // Features: // - Unbounded, bounded and rendezvous queues // - Drop-in replacement for `std::sync::mpsc` // - Additional features like MPMC support and send timeouts/deadlines // - `Sender` and `Receiver` both implement `Send + Sync + Clone` // - Asynchronous support, including mix and match with sync code // - `select`-like interface // - No `unsafe` code fn main() { // Create a bounded channel: let (tx, rx): (Sender<i32>, Receiver<i32>) = bounded(2); let tx2 = tx.clone(); // Spawn a producer thread let producer_handle = thread::spawn(move || { for i in 1..6 { // If there is no space left for new messages, calls to // `Sender::send` will block // (unblocking once a receiver has made space). tx2.send(i).expect("All receivers have been dropped"); println!("Produced: {}", i); // Send a value into the channel, returning an error if all // receivers have been dropped or the timeout has expired. match tx2.send_timeout(i + 10, Duration::from_millis(5)) { Ok(_) => println!("Produced: {}", i + 10), Err(e) => eprintln!("{}", e), } } }); let rx2 = rx.clone(); // Spawn a consumer thread let consumer_handle = thread::spawn(move || { // Receive values from the channel in a loop. loop { match rx2.recv_timeout(Duration::from_millis(10)) { Ok(value) => println!("Consumed: {}", value), e @ Err(RecvTimeoutError::Timeout) => { eprintln!("The timeout has expired: {:?}.", e) } Err(RecvTimeoutError::Disconnected) => { eprintln!( "All senders are dropped and there are no values left in the channel." ); break; } } thread::sleep(Duration::from_millis(10)); } }); // Wait for producer thread to finish match producer_handle.join() { Ok(_) => println!("Producer thread finished."), Err(e) => eprintln!("Producer thread error: {:?}", e), } // Drop the original sender as well to signal to the consumer // that no more values will be sent. 
drop(tx); // All senders for this channel have been dropped. assert!(rx.is_disconnected()); match consumer_handle.join() { Ok(_) => println!("Consumer thread finished."), Err(e) => eprintln!("Consumer thread error: {:?}", e), } select(); } // Requires the "select" feature to be enabled. fn select() { let (tx0, rx0) = unbounded(); let (tx1, rx1) = unbounded(); std::thread::spawn(move || { tx0.send(true).unwrap(); tx1.send(42).unwrap(); }); // `Selector` allows a thread to wait // upon the result of more than one operation at once. flume::Selector::new() .recv(&rx0, |b| println!("Received {:?}", b)) .recv(&rx1, |n| println!("Received {:?}", n)) .wait(); }