Channels for use in async code
| Recipe | Crates | Categories |
|---|---|---|
| OneShot | | |
| Multiple Producer, Single Consumer | | |

The most common form of synchronization in an async program is message passing. Two tasks operate independently and send messages to each other to synchronize. Doing so has the advantage of avoiding shared state. Message passing is implemented using async channels.
Tokio's sync module provides channels that work well with async code.
OneShot
tokio::sync::oneshot sends a single value from a single producer to a single consumer. This channel is typically used to hand the result of a computation to the task waiting for it.
```rust
use tokio::sync::oneshot;

async fn some_computation(input: u32) -> String {
    format!("the result of computation is {}", input)
}

async fn one_shot() {
    let (tx, rx) = oneshot::channel();

    tokio::spawn(async move {
        let res = some_computation(0).await;
        tx.send(res).unwrap();
        // Alternatively, return the value via the `JoinHandle` returned
        // by `spawn`
    });

    // Do other work while the computation is happening in the background

    // Wait for the computation result
    let res = rx.await.unwrap();
    println!("{}", res);
}

#[tokio::main]
async fn main() {
    one_shot().await;
}
```
Another example:
```rust
use std::time::Duration;

use tokio::sync::oneshot;

async fn download_file() -> Result<String, std::io::Error> {
    // Simulate downloading a file
    let filename = "data.txt";
    tokio::time::sleep(Duration::from_secs(2)).await;
    println!("Downloaded file: {}", filename);
    Ok(filename.to_owned())
}

async fn process_file(filename: String) {
    // Simulate processing the downloaded file
    println!("Processing file: {}", filename);
    tokio::time::sleep(Duration::from_secs(1)).await;
    println!("Finished processing file.");
}

async fn async_main() -> Result<(), Box<dyn std::error::Error>> {
    let (sender, receiver) = oneshot::channel();

    // Spawn the download task
    tokio::spawn(async move {
        let filename = download_file().await?;
        sender.send(filename).expect("Failed to send filename");
        Ok::<(), std::io::Error>(())
    });

    // Wait for the downloaded filename from the receiver
    let filename = receiver.await?;

    // Spawn the processing task with the filename
    let processing = tokio::spawn(async move {
        process_file(filename).await;
    });

    // Wait for the processing task; otherwise the runtime may shut down
    // before the task has finished.
    processing.await?;

    Ok(())
}

fn main() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async { async_main().await }).unwrap();
}
```
Send messages from multiple producers to a single consumer
```rust
use tokio::sync::mpsc;

async fn some_computation(input: u32) -> String {
    format!("the result of computation is {}", input)
}

pub async fn multi_producer_single_receiver() {
    let (tx, mut rx) = mpsc::channel(100);

    tokio::spawn(async move {
        for i in 1..=10 {
            let res = some_computation(i).await;
            tx.send(res).await.unwrap();
        }
    });

    while let Some(res) = rx.recv().await {
        println!("{}", res);
    }
}

#[tokio::main]
async fn main() {
    multi_producer_single_receiver().await;
}
```
Send messages from multiple producers to one of multiple consumers
async-channel offers two kinds of async multi-producer, multi-consumer channel, in which each message is received by only one of the consumers:
- Bounded channel with limited capacity,
- Unbounded channel with unlimited capacity.
The Sender and Receiver sides are cloneable and can be shared among multiple threads.
When all Senders or all Receivers are dropped, the channel becomes closed. When a channel is closed, no more messages can be sent, but remaining messages can still be received. The channel can also be closed manually by calling Sender::close() or Receiver::close().
```rust
use async_channel::Receiver;
use async_channel::Sender;
use async_channel::TryRecvError;
use async_channel::bounded;
use rand::Rng;
use tokio::task;
use tokio::time;
use tokio::time::Duration;

async fn producer(id: usize, tx: Sender<String>) {
    for i in 0..5 {
        let msg = format!("Producer {}: Message {}", id, i);
        // Sends messages to the channel.
        // It creates messages in a loop, sends them to the channel.
        // If the channel is full, this method awaits until there is space
        // for a message.
        if let Err(err) = tx.send(msg).await {
            // The channel is closed.
            eprintln!("Failed to send message: {}", err);
            break;
        }
        // Simulate work
        let sleep_duration = rand::thread_rng().gen_range(10..50);
        time::sleep(Duration::from_millis(sleep_duration)).await;
    }
}

async fn consumer(id: usize, rx: Receiver<String>) {
    // Receives a message from the channel.
    // If the channel is empty, awaits until there is a message.
    // If the channel is closed, receives a message or returns an error if
    // there are no more messages.
    while let Ok(msg) = rx.recv().await {
        println!("Consumer {}: Received {}", id, msg);
        // Simulate processing
        let sleep_duration = rand::thread_rng().gen_range(30..100);
        time::sleep(Duration::from_millis(sleep_duration)).await;
    }
    assert_eq!(rx.try_recv(), Err(TryRecvError::Closed));
}

#[tokio::main]
async fn main() {
    // Create a bounded channel with a capacity of 2.
    // You may also use an unbounded queue.
    let (tx, rx) = bounded(2);
    assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));

    // Create 3 producer tasks
    let mut producer_tasks = vec![];
    for i in 0..3 {
        let tx = tx.clone();
        producer_tasks.push(task::spawn(producer(i, tx)));
    }
    assert_eq!(tx.sender_count(), 4);

    // Create 2 consumer tasks
    let mut consumer_tasks = vec![];
    for i in 0..2 {
        let rx = rx.clone();
        consumer_tasks.push(task::spawn(consumer(i, rx)));
    }
    assert_eq!(rx.receiver_count(), 3);

    for task in producer_tasks {
        let _ = task.await;
    }

    println!(
        "The current number of messages in the channel is {}",
        tx.len()
    );

    // Close the channel to signal consumers that no more messages will be
    // sent
    drop(tx); // or tx.close();
    assert!(rx.is_closed());

    for task in consumer_tasks {
        let _ = task.await;
    }

    // The channel is empty.
    assert!(rx.is_empty());
}
```
Broadcast messages from multiple producers to multiple consumers
postage is a feature-rich, portable async channel library, with different options than Tokio. postage::broadcast provides a lossless MPMC channel in which every receiver is guaranteed to receive each message.
```rust
use postage::broadcast;
use postage::prelude::Stream;
use postage::sink::Sink;
use tokio::task;
use tokio::time::Duration;

async fn broadcaster(id: usize, mut tx: broadcast::Sender<String>) {
    for i in 0..2 {
        let msg = format!("Broadcaster {}'s message {}", id, i);
        if let Err(err) = tx.send(msg.clone()).await {
            // `send` returns Err(SendError(value))
            // if the sink rejected the message.
            eprintln!("Failed to send message: {}", err);
            break;
        }
        println!("Sent: {}", msg);
        // Simulate work
        tokio::time::sleep(Duration::from_millis(10)).await;
    }
}

async fn receiver(name: &'static str, mut rx: broadcast::Receiver<String>) {
    while let Some(msg) = rx.recv().await {
        println!("{} received {}", name, msg);
    }
}

#[tokio::main]
async fn main() {
    // The broadcast channel provides reliable broadcast delivery between
    // multiple senders and multiple receivers. The channel has a fixed
    // capacity, and senders are suspended if the buffer is filled.
    let (tx, rx) = broadcast::channel(10);

    let mut broadcaster_tasks = vec![];
    for i in 0..2 {
        let tx = tx.clone();
        broadcaster_tasks.push(task::spawn(broadcaster(i, tx)));
    }

    // Let's create a couple of receivers:
    let rx2 = rx.clone();
    task::spawn(receiver("A", rx));
    task::spawn(receiver("B", rx2));

    tokio::time::sleep(Duration::from_millis(50)).await;

    // We may also subscribe to the channel.
    // The receiver will observe all messages sent after the call to
    // `subscribe`. Messages currently in the buffer will not be received.
    let rx3 = tx.subscribe();
    task::spawn(receiver("C", rx3));

    let mut tx2 = tx.clone();
    tx2.send("Last message".into()).await.ok();

    // Wait for all the receivers to print
    tokio::time::sleep(Duration::from_millis(25)).await;
}
```
kanal
Fast sync and async channel:
```rust
fn main() {
    todo!();
}
```