Channels for Use in Async Code

The most common form of synchronization in an async program is message passing. Two tasks operate independently and send messages to each other to synchronize. Doing so has the advantage of avoiding shared state. Message passing is implemented using async channels.

Tokio's Async Channels


Tokio's sync module provides channels that work well with async code.

OneShot

tokio::sync::oneshot sends a single value from a single producer to a single consumer. This channel is usually used to send the result of a computation to a task that is waiting for it.

use tokio::sync::oneshot;

/// Simulate some computation that takes a `u32` and returns a `String`.
async fn some_computation(input: u32) -> String {
    format!("The result of computation is {}", input)
}

/// Demonstrates the use of a `oneshot` channel for single-value communication.
async fn one_shot() {
    // Create a `oneshot` channel.
    // `tx` is the sending end, and `rx` is the receiving end.
    let (tx, rx) = oneshot::channel();

    // Spawn a new asynchronous task.
    tokio::spawn(async move {
        // Perform some computation.
        let res = some_computation(0).await;
        // Send the result through the channel.
        // `unwrap()` is used here for simplicity, but in a real application,
        // you should handle the potential error (e.g., if the receiver is
        // dropped).
        tx.send(res).unwrap();
        // Alternatively, return the value via the `JoinHandle` returned
        // by `spawn`.
    });

    // Do other work while the computation is happening in the background.
    // ...

    // Wait for the computation result.
    // `.await` suspends this task, without blocking the thread, until a value
    // is received or the sender is dropped.
    // `unwrap()` is used here for simplicity, but in a real application,
    // you should handle the potential error (e.g., if the sender is dropped).
    let res = rx.await.unwrap();
    println!("{}", res);
}

#[tokio::main]
async fn main() {
    one_shot().await;
}

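The next example uses a oneshot channel to pass the name of a downloaded file from the download task to the code that processes it: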

//! This example demonstrates the use of `tokio::sync::oneshot` channels to
//! communicate between asynchronous tasks. It simulates downloading a file
//! and then processing it in separate tasks.

use std::time::Duration;

use tokio::sync::oneshot;

async fn download_file() -> Result<String, std::io::Error> {
    // Simulate downloading a file.
    let filename = "data.txt";
    tokio::time::sleep(Duration::from_secs(2)).await;
    println!("Downloaded file: {}", filename);
    Ok(filename.to_owned())
}

async fn process_file(filename: String) {
    // Simulate processing the downloaded file.
    println!("Processing file: {}", filename);
    tokio::time::sleep(Duration::from_secs(1)).await;
    println!("Finished processing file.");
}

async fn async_main() -> Result<(), Box<dyn std::error::Error>> {
    let (sender, receiver) = oneshot::channel();

    // Spawn the download task.
    tokio::spawn(async move {
        let filename = download_file().await?;
        sender.send(filename).expect("Failed to send filename");
        Ok::<(), std::io::Error>(())
    });

    // Wait for the downloaded filename from the receiver.
    let filename = receiver.await?;

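    // Spawn the processing task with the filename and wait for it to finish.
    // Without awaiting the handle, the runtime could shut down before the
    // task completes.
    let handle = tokio::spawn(async move {
        process_file(filename).await;
    });
    handle.await?;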

    Ok(())
}

fn main() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async_main()).unwrap();
}

Send Messages from Multiple Producers to a Single Consumer

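tokio::sync::mpsc provides a multi-producer, single-consumer channel: the Sender can be cloned and handed to several tasks, while a single Receiver collects all the messages.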

use tokio::sync::mpsc;

/// Simulates some computation that takes a `u32` and returns a `String`.
async fn some_computation(input: u32) -> String {
    format!("The result of computation is {}", input)
}

/// Demonstrates a multi-producer, single-consumer (MPSC) channel.
/// Multiple producers send data to a single receiver.
pub async fn multi_producer_single_receiver() {
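    let (tx, mut rx) = mpsc::channel(100);

    // Spawn two producers; each one owns its own clone of the sender.
    for id in 0..2 {
        let tx = tx.clone();
        tokio::spawn(async move {
            for i in 1..=5 {
                let res = some_computation(id * 10 + i).await;
                tx.send(res).await.unwrap();
            }
        });
    }
    // Drop the original sender so that `recv` returns `None` once all
    // producers have finished.
    drop(tx);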

    while let Some(res) = rx.recv().await {
        println!("{}", res);
    }
}

#[tokio::main]
async fn main() {
    multi_producer_single_receiver().await;
}

Send Messages from Multiple Producers to One of Multiple Consumers


async-channel offers two kinds of async multi-producer, multi-consumer channels, in which each message is received by only one of the existing consumers:

  • Bounded channel with limited capacity,
  • Unbounded channel with unlimited capacity.

The Sender and Receiver sides are cloneable and can be shared among multiple threads.

When all Senders or all Receivers are dropped, the channel becomes closed. When a channel is closed, no more messages can be sent, but remaining messages can still be received. The channel can also be closed manually by calling Sender::close() or Receiver::close().

use async_channel::Receiver;
use async_channel::Sender;
use async_channel::TryRecvError;
use async_channel::bounded;
use rand::Rng;
use tokio::task;
use tokio::time;
use tokio::time::Duration;

/// `producer` function:
///
/// This function simulates a producer that sends messages to a channel.
///
/// # Arguments
/// * `id` - The ID of the producer.
async fn producer(id: usize, tx: Sender<String>) {
    for i in 0..5 {
        let msg = format!("Producer {}: Message {}", id, i);
        // Send the message to the channel.
        // If the channel is full, `send` waits until there is space for the
        // message.
        if let Err(err) = tx.send(msg).await {
            // The channel is closed.
            eprintln!("Failed to send message: {}", err);
            break;
        }
        // Simulate work
        let sleep_duration = rand::rng().random_range(10..50);
        time::sleep(Duration::from_millis(sleep_duration)).await;
    }
}

/// `consumer` function:
///
/// This function simulates a consumer that receives messages from a channel.
///
/// # Arguments
///
/// * `id` - The ID of the consumer.
async fn consumer(id: usize, rx: Receiver<String>) {
    // Receives a message from the channel.
    // If the channel is empty, awaits until there is a message.
    // If the channel is closed, receives a message or returns an error if there
    // are no more messages.
    while let Ok(msg) = rx.recv().await {
        println!("Consumer {}: Received {}", id, msg);
        // Simulate processing
        let sleep_duration = rand::rng().random_range(30..100);
        time::sleep(Duration::from_millis(sleep_duration)).await;
    }
    assert_eq!(rx.try_recv(), Err(TryRecvError::Closed));
}

#[tokio::main]
async fn main() {
    let (tx, rx) = bounded(2); // Create a bounded channel with a capacity of 2
    // You may also use an unbounded channel (`async_channel::unbounded`).
    assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));

    // Create 3 producer tasks
    let mut producer_tasks = vec![];
    for i in 0..3 {
        let tx = tx.clone();
        producer_tasks.push(task::spawn(producer(i, tx)));
    }
    assert_eq!(tx.sender_count(), 4);

    // Create 2 consumer tasks
    let mut consumer_tasks = vec![];
    for i in 0..2 {
        let rx = rx.clone();
        consumer_tasks.push(task::spawn(consumer(i, rx)));
    }
    assert_eq!(rx.receiver_count(), 3);

    for task in producer_tasks {
        let _ = task.await;
    }
    println!(
        "The current number of messages in the channel is {}",
        tx.len()
    );

    // Close the channel to signal consumers that no more messages will be sent
    drop(tx);
    // or tx.close();
    assert!(rx.is_closed());

    for task in consumer_tasks {
        let _ = task.await;
    }
    // The channel is empty.
    assert!(rx.is_empty());
}

Broadcast Messages from Multiple Producers to Multiple Consumers


postage is a feature-rich, portable async channel library that offers a different set of options than Tokio. postage::broadcast provides a lossless MPMC channel in which every receiver is guaranteed to receive each message.

use postage::broadcast;
use postage::prelude::Stream;
use postage::sink::Sink;
use tokio::task;
use tokio::time::Duration;

/// This function simulates a broadcaster that sends messages to a broadcast
/// channel. The broadcaster sends two messages, each tagged with its
/// identifier, and sleeps for a short duration after each one.
async fn broadcaster(id: usize, mut tx: broadcast::Sender<String>) {
    for i in 0..2 {
        let msg = format!("Broadcaster {}'s message {}", id, i);
        if let Err(err) = tx.send(msg.clone()).await {
            // `send` returns Err(SendError(value))
            // if the sink rejected the message.
            eprintln!("Failed to send message: {}", err);
            break;
        }
        println!("Sent: {}", msg);
        // Simulate work
        tokio::time::sleep(Duration::from_millis(10)).await;
    }
}

/// This function simulates a receiver that receives messages from a broadcast
/// channel. The receiver continuously receives messages and prints them until
/// the channel is closed.
async fn receiver(name: &'static str, mut rx: broadcast::Receiver<String>) {
    while let Some(msg) = rx.recv().await {
        println!("{} received: {}", name, msg);
    }
}

#[tokio::main]
async fn main() {
    // Create a broadcast channel with a capacity of 10 messages.
    // The broadcast channel provides reliable broadcast delivery between
    // multiple senders and multiple receivers. The channel has a fixed
    // capacity, and senders are suspended if the buffer is filled.
    let (tx, rx) = broadcast::channel(10);

    let mut broadcaster_tasks = vec![];
    for i in 0..2 {
        let tx = tx.clone();
        broadcaster_tasks.push(task::spawn(broadcaster(i, tx)));
    }

    // Create a couple of receivers:
    let rx2 = rx.clone();
    task::spawn(receiver("A", rx));
    task::spawn(receiver("B", rx2));

    // Let the broadcasters and receivers run for a short duration.
    tokio::time::sleep(Duration::from_millis(50)).await;

    // Subscribe to the channel.
    // The receiver will observe all messages sent _after the call to
    // subscribe_. Messages currently in the buffer will not be received.
    let rx3 = tx.subscribe();
    task::spawn(receiver("C", rx3));
    // Send a last message to the channel.
    let mut tx2 = tx.clone();
    tx2.send("Last message".into()).await.ok();

    // Wait for all the receivers to print
    tokio::time::sleep(Duration::from_millis(25)).await;
}

kanal


kanal offers fast sync and async channels:

use kanal::AsyncReceiver;
use kanal::AsyncSender;
use tokio::task;

/// The `kanal` crate is used for inter-thread communication, similar to Rust's
/// standard `std::sync::mpsc` module but with additional features and
/// capabilities. It provides channels for sending messages between threads.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Create an async bounded channel with a capacity of 0.
    // With no buffer, each `send` waits until a receiver is ready to take
    // the value.
    let (tx, rx): (AsyncSender<i32>, AsyncReceiver<i32>) =
        kanal::bounded_async(0);

    // Spawn two producer tasks.
    // `tx.clone()` is used to create a new sender
    // that can be moved into the task.
    let producer1 = task::spawn(producer(tx.clone(), 1));
    let producer2 = task::spawn(producer(tx, 2));

    // Spawn a consumer task that will receive messages from the channel.
    let consumer = task::spawn(consumer(rx));

    // Wait for all tasks to finish.
    producer1.await??;
    producer2.await??;
    consumer.await??;

    Ok(())
}

/// The producer function sends 5 messages to the channel.
async fn producer(tx: AsyncSender<i32>, id: i32) -> anyhow::Result<()> {
    for i in 0..5 {
        tx.send(i).await?;
        println!("Producer {} sent: {}", id, i);
    }
    Ok(())
}

/// The consumer function receives messages from the channel.
/// It continues to receive messages until the channel is closed.
async fn consumer(rx: AsyncReceiver<i32>) -> anyhow::Result<()> {
    while let Ok(value) = rx.recv().await {
        println!("Consumer received: {}", value);
    }
    Ok(())
}

Related Topics

  • Concurrency.
  • Concurrent Data Structures.