Channels for use in async code
Recipe | Crates | Categories |
---|---|---|
OneShot | ||
Multiple Producer, Single Consumer |
TODO
The most common form of synchronization in an async program is message passing. Two tasks operate independently and send messages to each other to synchronize. Doing so has the advantage of avoiding shared state. Message passing is implemented using async channels.
Tokio's sync
⮳ module provides channels that work well with async code.
OneShot
tokio::sync::oneshot
⮳ sends a single value from a single producer to a single consumer. This channel is usually used to send the result of a computation to a waiter.
use tokio::sync::oneshot; async fn some_computation(input: u32) -> String { format!("the result of computation is {}", input) } async fn one_shot() { let (tx, rx) = oneshot::channel(); tokio::spawn(async move { let res = some_computation(0).await; tx.send(res).unwrap(); // Alternatively, return the value via the joinhandle returned // by `spawn` }); // Do other work while the computation is happening in the background // Wait for the computation result let res = rx.await.unwrap(); println!("{}", res); } #[tokio::main] async fn main() { one_shot().await; }
Another example:
use std::time::Duration; use tokio::sync::oneshot; async fn download_file() -> Result<String, std::io::Error> { // Simulate downloading a file let filename = "data.txt"; tokio::time::sleep(Duration::from_secs(2)).await; println!("Downloaded file: {}", filename); Ok(filename.to_owned()) } async fn process_file(filename: String) { // Simulate processing the downloaded file println!("Processing file: {}", filename); tokio::time::sleep(Duration::from_secs(1)).await; println!("Finished processing file."); } async fn async_main() -> Result<(), Box<dyn std::error::Error>> { let (sender, receiver) = oneshot::channel(); // Spawn the download task tokio::spawn(async move { let filename = download_file().await?; sender.send(filename).expect("Failed to send filename"); Ok::<(), std::io::Error>(()) }); // Wait for the downloaded filename from the receiver let filename = receiver.await?; // Spawn the processing task with the filename tokio::spawn(async move { process_file(filename).await; }); Ok(()) } fn main() { let rt = tokio::runtime::Runtime::new().unwrap(); rt.block_on(async { async_main().await }).unwrap(); }
Multiple Producer, Single Consumer
use tokio::sync::mpsc; async fn some_computation(input: u32) -> String { format!("the result of computation is {}", input) } pub async fn multi_producer_single_receiver() { let (tx, mut rx) = mpsc::channel(100); tokio::spawn(async move { for i in 1..=10 { let res = some_computation(i).await; tx.send(res).await.unwrap(); } }); while let Some(res) = rx.recv().await { println!("{}", res); } } #[tokio::main] async fn main() { multi_producer_single_receiver().await; }
See also
Fast sync and async channel:
TODO: review