Message passing and channels

One increasingly popular approach to ensuring safe concurrency is message passing, where threads communicate by sending each other messages containing data. The Rust standard library provides channels for message passing that are safe to use in concurrent contexts.

Message passing in async⮳ programming is covered in a separate page: async channels.

Multiple producers, single consumer

std cat-concurrency

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();
    let tx2 = tx.clone();
    thread::spawn(move || {
        let vals = vec![String::from("hi"), String::from("hi again")];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    thread::spawn(move || {
        let vals = vec![String::from("more"), String::from("messages")];

        for val in vals {
            tx2.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    while let Ok(msg) = rx.recv() {
        println!("{msg}");
    }
}

crossbeam-channel

crossbeam-channel-website crossbeam-channel crossbeam-channel-crates.io crossbeam-channel-github crossbeam-channel-lib.rs cat-algorithms cat-concurrency cat-data-structures

Multi-producer multi-consumer channels for message passing. The absolute fastest channel implementation available. Implements Go-like 'select' feature.

use std::thread;

use crossbeam_channel::RecvError;
use crossbeam_channel::TryRecvError;
use crossbeam_channel::unbounded;

fn main() {
    // Create a channel of unbounded capacity.
    let (s1, r1) = unbounded();

    // Alternatively, create a channel that can hold at most n messages at
    // a time. let (s1, r1) = bounded(5);

    // Senders and receivers can be cloned to use them to multiple
    // threads. cloning only creates a new handle to the same sending
    // or receiving side. It does not create a separate stream of
    // messages in any way
    let s2 = s1.clone();

    // Send a message into the channel.
    // Note that the cloned sender is moved into the thread.
    thread::spawn(move || s2.send("Hi!").unwrap());

    // Blocks until receiving the message from the channel.
    assert_eq!(r1.recv(), Ok("Hi!"));

    // Try receiving a message without blocking.
    // The channel is now empty
    assert_eq!(r1.try_recv(), Err(TryRecvError::Empty));

    s1.send("0").unwrap();
    // Receive all remaining messages currently in the channel
    // (non-blocking).
    let v: Vec<_> = r1.try_iter().collect();
    println!("{:?}", v);

    // When all senders or all receivers associated with a channel get
    // dropped, the channel becomes disconnected.
    s1.send("1").unwrap();
    drop(s1);

    // No more messages can be sent...
    // ERROR s1.send("2").unwrap();

    // .. but any remaining messages can still be received.
    println!("{:?}", r1.iter().collect::<Vec<_>>());
    // Note that the call to `collect` would block if the channel were not
    // disconnected.

    // There are no more messages in the channel.
    assert!(r1.is_empty());

    // After disconnection, calling `r1.recv()` does not block
    // Instead, `Err(RecvError)` is returned immediately.
    assert_eq!(r1.recv(), Err(RecvError));
}

Example using specialized channels for tickers and timeout

use std::time::Duration;
use std::time::Instant;

use crossbeam_channel::after;
use crossbeam_channel::select;
use crossbeam_channel::tick;

fn main() {
    let start = Instant::now();
    // Channel that delivers messages periodically.
    let ticker = tick(Duration::from_millis(50));
    // Channel that delivers a single message
    // after a certain duration of time.
    let timeout = after(Duration::from_secs(1));

    loop {
        // `select` wait until any one of the channels becomes ready and
        // execute it.
        select! {
            recv(ticker) -> _ => println!("elapsed: {:?}", start.elapsed()),
            recv(timeout) -> _ => break,
            // or use: default(Duration::from_millis(1000)) => break,
        }
    }
}

flume

flume flume-crates.io flume-github flume-lib.rs

Smaller and simpler than crossbeam-channel and almost as fast.

fn main() {
    todo!();
}

tokio

tokio-website tokio tokio-crates.io tokio-github tokio-lib.rs cat-asynchronous cat-network-programming

Tokio's sync module provides channels for using in async code.

fn main() {
    todo!();
}

See also

Message passing (rust book)