Channels for use in async code

The most common form of synchronization in an async program is message passing. Two tasks operate independently and send messages to each other to synchronize. Doing so has the advantage of avoiding shared state. Message passing is implemented using async channels.

Tokio's sync module provides channels that work well with async code.

OneShot


tokio::sync::oneshot sends a single value from a single producer to a single consumer. This channel is typically used to send the result of a computation to a task that is waiting for it.

use tokio::sync::oneshot;

async fn some_computation(input: u32) -> String {
    format!("the result of computation is {}", input)
}

async fn one_shot() {
    let (tx, rx) = oneshot::channel();

    tokio::spawn(async move {
        let res = some_computation(0).await;
        tx.send(res).unwrap();
        // Alternatively, return the value via the `JoinHandle` returned
        // by `spawn`
    });

    // Do other work while the computation is happening in the background

    // Wait for the computation result
    let res = rx.await.unwrap();
    println!("{}", res);
}

#[tokio::main]
async fn main() {
    one_shot().await;
}

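The following example uses a oneshot channel to hand the name of a downloaded file from the download task to the code that processes it: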

use std::time::Duration;

use tokio::sync::oneshot;

async fn download_file() -> Result<String, std::io::Error> {
    // Simulate downloading a file
    let filename = "data.txt";
    tokio::time::sleep(Duration::from_secs(2)).await;
    println!("Downloaded file: {}", filename);
    Ok(filename.to_owned())
}

async fn process_file(filename: String) {
    // Simulate processing the downloaded file
    println!("Processing file: {}", filename);
    tokio::time::sleep(Duration::from_secs(1)).await;
    println!("Finished processing file.");
}

async fn async_main() -> Result<(), Box<dyn std::error::Error>> {
    let (sender, receiver) = oneshot::channel();

    // Spawn the download task
    tokio::spawn(async move {
        let filename = download_file().await?;
        sender.send(filename).expect("Failed to send filename");
        Ok::<(), std::io::Error>(())
    });

    // Wait for the downloaded filename from the receiver
    let filename = receiver.await?;

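    // Spawn the processing task with the filename and wait for it to
    // finish; otherwise the runtime may shut down while it is still running.
    tokio::spawn(async move {
        process_file(filename).await;
    })
    .await?;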

    Ok(())
}

fn main() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async_main()).unwrap();
}

Multiple Producer, Single Consumer


use tokio::sync::mpsc;

async fn some_computation(input: u32) -> String {
    format!("the result of computation is {}", input)
}

pub async fn multi_producer_single_receiver() {
    // Bounded channel: `send` waits when 100 messages are already queued
    let (tx, mut rx) = mpsc::channel(100);

    tokio::spawn(async move {
        for i in 1..=10 {
            let res = some_computation(i).await;
            tx.send(res).await.unwrap();
        }
    });

    // `recv` returns `None` once every sender has been dropped,
    // which ends the loop
    while let Some(res) = rx.recv().await {
        println!("{}", res);
    }
}

#[tokio::main]
async fn main() {
    multi_producer_single_receiver().await;
}

See also

async-channel

postage

Fast sync and async channel:

kanal