Mixing Async and Blocking Code
Call Blocking Code from Async Code
- Async code should never spend a long time without reaching an .await.
- Don't carelessly mix async code with synchronous, blocking calls such as std::thread::sleep(Duration::from_secs(N));.
- If you have to block the thread, whether for an expensive CPU-bound computation or a call to a synchronous I/O API, use the tokio::task::spawn_blocking function, use rayon, or spawn a dedicated thread.
See the "Async: What is blocking?" blog post.
Use spawn_blocking
Use tokio::task::spawn_blocking to run a small portion of synchronous code.
/// This example demonstrates how to call a blocking function from an
/// asynchronous context using `tokio::task::spawn_blocking`.
///
/// In asynchronous programming, blocking operations can halt the progress of
/// the entire asynchronous runtime. To avoid this,
/// `tokio::task::spawn_blocking` allows you to offload blocking tasks to a
/// separate thread pool.
#[tokio::main]
async fn main() {
    // This is running on the Tokio runtime. We should avoid blocking here.
    // `spawn_blocking` moves the closure to a separate thread pool where
    // blocking is acceptable.
    let blocking_task = tokio::task::spawn_blocking(|| {
        println!("Inside spawn_blocking");
    });

    // Await the completion of the blocking task.
    blocking_task.await.unwrap();
}
Use the rayon Crate
use rayon::prelude::*;

/// Computes the sum of a vector of numbers in parallel using Rayon.
///
/// This function spawns a Rayon task to perform the sum and uses a `oneshot`
/// channel to return the result to the Tokio runtime.
async fn parallel_sum(nums: Vec<i32>) -> i32 {
    let (tx, rx) = tokio::sync::oneshot::channel();

    // Spawn a task on rayon.
    rayon::spawn(move || {
        // Perform an expensive computation on this thread...

        // ...or compute the sum on multiple rayon threads.
        let sum = nums.par_iter().sum();

        // Send the result back to Tokio.
        let _ = tx.send(sum);
    });

    // Wait for the rayon task.
    rx.await.expect("Panic in rayon::spawn")
}

#[tokio::main]
async fn main() {
    let nums = vec![1; 1024 * 1024];
    println!("{}", parallel_sum(nums).await);
}
Spawn a Dedicated Thread
If a blocking operation keeps running forever, you should run it on a dedicated thread.
async fn parallel_sum(nums: Vec<i32>) -> i32 {
    let (tx, rx) = tokio::sync::oneshot::channel();

    // Spawn a task on a dedicated thread.
    std::thread::spawn(move || {
        // Perform an expensive computation on this thread...
        let sum = nums.into_iter().sum();

        // Send the result back to the main async task.
        // The underscore is used to ignore the result of the send operation.
        // If the receiver is dropped before the sender sends a value, the send
        // operation will return an error.
        let _ = tx.send(sum);
    });

    // Wait for the result from the dedicated thread. The receiver returns
    // an error if the thread panicked and dropped `tx` without sending.
    rx.await.expect("Panic in std::thread::spawn")
}

#[tokio::main]
async fn main() {
    let nums = vec![1; 1024 * 1024];
    println!("{}", parallel_sum(nums).await);
}
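The example above runs a one-off computation, whereas the case described in the text is an operation that never finishes on its own. A minimal sketch of that shape follows, using only the standard library so that it runs without extra dependencies. The names Job, spawn_worker, and request_sum are hypothetical. The reply travels over a blocking std::sync::mpsc channel only because the caller here is a synchronous main; async code would instead await a reply channel such as the tokio::sync::oneshot used above.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical request type: numbers to sum, plus a channel for the reply.
#[derive(Debug)]
struct Job {
    nums: Vec<i64>,
    reply: mpsc::Sender<i64>,
}

/// Spawns one long-lived thread that serves jobs until every sender is dropped.
fn spawn_worker() -> (mpsc::Sender<Job>, thread::JoinHandle<()>) {
    let (tx, rx) = mpsc::channel::<Job>();
    let handle = thread::spawn(move || {
        // `recv` blocks this dedicated thread only.
        // The loop ends once all `Sender<Job>` handles have been dropped.
        while let Ok(job) = rx.recv() {
            let sum: i64 = job.nums.iter().sum();
            // Ignore the error if the requester stopped waiting.
            let _ = job.reply.send(sum);
        }
    });
    (tx, handle)
}

/// Sends one job and waits for its reply.
/// Assumption: the caller is synchronous, so a blocking `recv` is acceptable.
fn request_sum(worker: &mpsc::Sender<Job>, nums: Vec<i64>) -> i64 {
    let (reply, response) = mpsc::channel();
    worker
        .send(Job { nums, reply })
        .expect("worker thread has stopped");
    response.recv().expect("worker dropped the reply channel")
}

fn main() {
    let (worker, handle) = spawn_worker();
    println!("{}", request_sum(&worker, vec![1, 2, 3])); // prints 6
    println!("{}", request_sum(&worker, vec![10, 20])); // prints 30

    // Dropping the last sender closes the channel and lets the thread exit.
    drop(worker);
    handle.join().expect("worker thread panicked");
}
```

Keeping the loop on its own thread means it never occupies a slot in a shared blocking pool for the lifetime of the program.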
Call Async Code from Blocking Code
In other cases, it may be easier to structure the application as largely synchronous, with smaller or logically distinct asynchronous portions. For instance, a GUI application might want to run the GUI code on the main thread and run a Tokio runtime next to it on another thread.
Use the futures Executor
futures_executor includes a minimal executor. The futures_executor::block_on function is useful if you want to run an async function synchronously in a codebase that is mostly synchronous.
async fn do_something() {
    println!("hello, world!");
}

fn main() {
    let future = do_something();
    // Futures are lazy - nothing is happening
    // until driven to completion by .await, block_on...

    // `block_on` blocks the current thread until the provided future has
    // run to completion. Other executors provide more complex
    // behavior, like scheduling multiple futures onto the same
    // thread. See `Tokio`.
    futures::executor::block_on(future);
    // `future` is run and "hello, world!" is printed
}
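To make concrete what "blocks the current thread until the provided future has run to completion" means, here is a minimal sketch of a block_on function written with the standard library alone. It is not the futures crate's implementation; the names ThreadWaker, block_on, and add are illustrative assumptions. The idea is to poll the future, park the thread while it is pending, and let the waker unpark the thread when progress is possible.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Hypothetical waker that unparks the thread blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Drives `future` to completion on the current thread.
fn block_on<F: Future>(future: F) -> F::Output {
    // Pin the future on the stack so that it can be polled.
    let mut future = pin!(future);

    // Build a waker that wakes this thread when the future can make progress.
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);

    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            // Sleep until `wake` is called. A spurious wakeup is harmless:
            // the loop simply polls again.
            Poll::Pending => thread::park(),
        }
    }
}

/// Illustrative async function used to exercise `block_on`.
async fn add(a: i32, b: i32) -> i32 {
    a + b
}

fn main() {
    let result = block_on(add(2, 3));
    println!("{result}"); // prints 5
}
```

Because the calling thread is parked while the future is pending, a function like this belongs in synchronous code, not inside a task that is already running on an async runtime.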
Use the Tokio Runtime Directly
fn main() {
    // Create a new multi-threaded runtime. The runtime is used to
    // execute asynchronous tasks.
    let runtime = tokio::runtime::Builder::new_multi_thread()
        // Set the number of worker threads. In this case, we only
        // need one.
        .worker_threads(1)
        .enable_all()
        .build()
        .unwrap();

    let mut handles = Vec::with_capacity(10);
    for i in 0..10 {
        handles.push(runtime.spawn(my_bg_task(i)));
    }

    // Do something time-consuming while the async background tasks
    // execute.
    std::thread::sleep(std::time::Duration::from_millis(750));
    println!("Finished time-consuming task.");

    // Wait for all of them to complete.
    for handle in handles {
        // The `spawn` method returns a `JoinHandle`. A `JoinHandle` is
        // a future, so we can wait for it using `block_on`.
        runtime.block_on(handle).unwrap();
    }
}

// Example async code to execute
async fn my_bg_task(i: u64) {
    // By subtracting, the tasks with larger values of i sleep for a
    // shorter duration.
    let millis = 1000 - 50 * i;
    println!("Task {} sleeping for {} ms.", i, millis);

    tokio::time::sleep(tokio::time::Duration::from_millis(millis)).await;

    println!("Task {} stopping.", i);
}
Related Topics
- Concurrency.