Message passing and channels

One increasingly popular approach to ensuring safe concurrency is message passing, where threads communicate by sending each other messages containing data. The Rust standard library provides channels for message passing that are safe to use in concurrent contexts.

Message passing in async⮳ programming is covered in a separate page: async channels.

Multiple producers, single consumer

std cat-concurrency

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();
    let tx2 = tx.clone();
    thread::spawn(move || {
        let vals = vec![String::from("hi"), String::from("hi again")];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    thread::spawn(move || {
        let vals = vec![String::from("more"), String::from("messages")];

        for val in vals {
            tx2.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    while let Ok(msg) = rx.recv() {
        println!("{msg}");
    }
}

crossbeam-channel

crossbeam-channel-website crossbeam-channel crossbeam-channel-crates.io crossbeam-channel-github crossbeam-channel-lib.rs cat-algorithms cat-concurrency cat-data-structures

Multi-producer multi-consumer channels for message passing. The absolute fastest channel implementation available. Implements Go-like 'select' feature.

use std::thread;

use crossbeam_channel::RecvError;
use crossbeam_channel::TryRecvError;
use crossbeam_channel::unbounded;

fn main() {
    // Create a channel of unbounded capacity.
    let (s1, r1) = unbounded();

    // Alternatively, create a channel that can hold at most n messages at
    // a time. let (s1, r1) = bounded(5);

    // Senders and receivers can be cloned to use them to multiple
    // threads. cloning only creates a new handle to the same sending
    // or receiving side. It does not create a separate stream of
    // messages in any way
    let s2 = s1.clone();

    // Send a message into the channel.
    // Note that the cloned sender is moved into the thread.
    thread::spawn(move || s2.send("Hi!").unwrap());

    // Blocks until receiving the message from the channel.
    assert_eq!(r1.recv(), Ok("Hi!"));

    // Try receiving a message without blocking.
    // The channel is now empty
    assert_eq!(r1.try_recv(), Err(TryRecvError::Empty));

    s1.send("0").unwrap();
    // Receive all remaining messages currently in the channel
    // (non-blocking).
    let v: Vec<_> = r1.try_iter().collect();
    println!("{:?}", v);

    // When all senders or all receivers associated with a channel get
    // dropped, the channel becomes disconnected.
    s1.send("1").unwrap();
    drop(s1);

    // No more messages can be sent...
    // ERROR s1.send("2").unwrap();

    // .. but any remaining messages can still be received.
    println!("{:?}", r1.iter().collect::<Vec<_>>());
    // Note that the call to `collect` would block if the channel were not
    // disconnected.

    // There are no more messages in the channel.
    assert!(r1.is_empty());

    // After disconnection, calling `r1.recv()` does not block
    // Instead, `Err(RecvError)` is returned immediately.
    assert_eq!(r1.recv(), Err(RecvError));
}

Example using specialized channels for tickers and timeout

use std::time::Duration;
use std::time::Instant;

use crossbeam_channel::after;
use crossbeam_channel::select;
use crossbeam_channel::tick;

fn main() {
    let start = Instant::now();
    // Channel that delivers messages periodically.
    let ticker = tick(Duration::from_millis(50));
    // Channel that delivers a single message
    // after a certain duration of time.
    let timeout = after(Duration::from_secs(1));

    loop {
        // `select` wait until any one of the channels becomes ready and
        // execute it.
        select! {
            recv(ticker) -> _ => println!("elapsed: {:?}", start.elapsed()),
            recv(timeout) -> _ => break,
            // or use: default(Duration::from_millis(1000)) => break,
        }
    }
}

flume

flume flume-crates.io flume-github flume-lib.rs

The flume crate is a library that provides multiple-producer, multiple-consumer (MPMC) channels. It is similar to the std::sync::mpsc module, but with additional features and improved performance. It is smaller and simpler than crossbeam-channel and almost as fast.

use std::thread;
use std::time::Duration;

use flume::Receiver;
use flume::RecvTimeoutError;
use flume::Sender;
use flume::bounded;
use flume::unbounded;

// Features:
// - Unbounded, bounded and rendezvous queues
// - Drop-in replacement for `std::sync::mpsc`
// - Additional features like MPMC support and send timeouts/deadlines
// - `Sender` and `Receiver` both implement `Send + Sync + Clone`
// - Asynchronous support, including mix and match with sync code
// - `select`-like interface
// - No `unsafe` code

fn main() {
    // Create a bounded channel:
    let (tx, rx): (Sender<i32>, Receiver<i32>) = bounded(2);

    let tx2 = tx.clone();
    // Spawn a producer thread
    let producer_handle = thread::spawn(move || {
        for i in 1..6 {
            // If there is no space left for new messages, calls to
            // `Sender::send` will block
            // (unblocking once a receiver has made space).
            tx2.send(i).expect("All receivers have been dropped");
            println!("Produced: {}", i);
            // Send a value into the channel, returning an error if all
            // receivers have been dropped or the timeout has expired.
            match tx2.send_timeout(i + 10, Duration::from_millis(5)) {
                Ok(_) => println!("Produced: {}", i + 10),
                Err(e) => eprintln!("{}", e),
            }
        }
    });

    let rx2 = rx.clone();
    // Spawn a consumer thread
    let consumer_handle = thread::spawn(move || {
        // Receive values from the channel in a loop.
        loop {
            match rx2.recv_timeout(Duration::from_millis(10)) {
                Ok(value) => println!("Consumed: {}", value),
                e @ Err(RecvTimeoutError::Timeout) => {
                    eprintln!("The timeout has expired: {:?}.", e)
                }
                Err(RecvTimeoutError::Disconnected) => {
                    eprintln!(
                        "All senders are dropped and there are no values left in the channel."
                    );
                    break;
                }
            }
            thread::sleep(Duration::from_millis(10));
        }
    });

    // Wait for producer thread to finish
    match producer_handle.join() {
        Ok(_) => println!("Producer thread finished."),
        Err(e) => eprintln!("Producer thread error: {:?}", e),
    }
    // Drop the original sender as well to signal to the consumer
    // that no more values will be sent.
    drop(tx);
    // All senders for this channel have been dropped.
    assert!(rx.is_disconnected());

    match consumer_handle.join() {
        Ok(_) => println!("Consumer thread finished."),
        Err(e) => eprintln!("Consumer thread error: {:?}", e),
    }

    select();
}

// Requires the "select" feature to be enabled.
fn select() {
    let (tx0, rx0) = unbounded();
    let (tx1, rx1) = unbounded();

    std::thread::spawn(move || {
        tx0.send(true).unwrap();
        tx1.send(42).unwrap();
    });

    // `Selector` allows a thread to wait
    // upon the result of more than one operation at once.
    flume::Selector::new()
        .recv(&rx0, |b| println!("Received {:?}", b))
        .recv(&rx1, |n| println!("Received {:?}", n))
        .wait();
}

See also

Message passing (rust book)