Mixing Async and Blocking Code

Call Blocking Code from Async Code

tokio tokio-crates.io tokio-github tokio-lib.rs cat-asynchronous cat-network-programming

  • Async code should never spend a long time without reaching an .await .await .
  • Don't carelessly mix async code and synchronous, blocking calls like std::thread::sleep(Duration::from_secs(N));.
  • If you have to block the thread because of expensive CPU-bound computation, call to a synchronous IO API, use the tokio::task::spawn_blocking⮳ function, use rayon⮳, or spawn a dedicated thread.

See Async: What is blocking? blog post⮳.

Use spawn_blocking

tokio tokio-crates.io tokio-github tokio-lib.rs cat-asynchronous cat-network-programming

Use tokio::task::spawn_blocking⮳ to run a small portion of synchronous code.

/// This example demonstrates how to call a blocking function from an
/// asynchronous context using `tokio::task::spawn_blocking`.
///
/// In asynchronous programming, blocking operations can halt the progress of
/// the entire asynchronous runtime. To avoid this,
/// `tokio::task::spawn_blocking` allows you to offload blocking tasks to a
/// separate thread pool.
#[tokio::main]
async fn main() {
    // This is running on the Tokio runtime. We should avoid blocking here.

    // `spawn_blocking` moves the closure to a separate thread pool where
    // blocking is acceptable.
    let blocking_task = tokio::task::spawn_blocking(|| {
        println!("Inside spawn_blocking");
    });

    // Await the completion of the blocking task.
    blocking_task.await.unwrap();
}

Use the rayon Crate

rayon rayon-crates.io rayon-github rayon-lib.rs cat-asynchronous cat-concurrency

rayon

use rayon::prelude::*;

/// Computes the sum of a vector of numbers in parallel using Rayon.
///
/// This function spawns a Rayon task to perform the sum and uses a `oneshot`
/// channel to return the result to the Tokio runtime.
async fn parallel_sum(nums: Vec<i32>) -> i32 {
    let (tx, rx) = tokio::sync::oneshot::channel();

    // Spawn a task on rayon.
    rayon::spawn(move || {
        // Perform an expensive computation on this thread...

        // ...or compute the sum on multiple rayon threads.
        let sum = nums.par_iter().sum();

        // Send the result back to Tokio.
        let _ = tx.send(sum);
    });

    // Wait for the rayon task.
    rx.await.expect("Panic in rayon::spawn")
}

#[tokio::main]
async fn main() {
    let nums = vec![1; 1024 * 1024];
    println!("{}", parallel_sum(nums).await);
}

Spawn a Dedicated Thread

rayon rayon-crates.io rayon-github rayon-lib.rs cat-asynchronous cat-concurrency

rayon

If a blocking operation keeps running forever, you should run it on a dedicated thread.

async fn parallel_sum(nums: Vec<i32>) -> i32 {
    let (tx, rx) = tokio::sync::oneshot::channel();

    // Spawn a task on a dedicate thread.
    std::thread::spawn(move || {
        // Perform an expensive computation on this thread...
        let sum = nums.into_iter().sum();

        // Send the result back to the main async task.
        // The underscore is used to ignore the result of the send operation.
        // If the receiver is dropped before the sender sends a value, the send
        // operation will return an error.
        let _ = tx.send(sum);
    });

    // Wait for the result from the dedicated thread.
    rx.await.expect("Panic in rayon::spawn")
}

#[tokio::main]
async fn main() {
    let nums = vec![1; 1024 * 1024];
    println!("{}", parallel_sum(nums).await);
}

Call Async Code from Blocking Code

Bridging with sync codetokio tokio-crates.io tokio-github tokio-lib.rs cat-asynchronous cat-network-programming

In other cases, it may be easier to structure the application as largely synchronous, with smaller or logically distinct asynchronous portions. For instance, a GUI application might want to run the GUI code on the main thread and run a Tokio runtime next to it on another thread.

Use the futures Executor

futures_executor futures_executor-crates.io futures_executor-github futures_executor-lib.rs cat-asynchronous

futures_executor⮳ includes a minimal executor. The futures_executor::block_on⮳ function is useful if you want to run an async function synchronously in codebase that is mostly synchronous.

async fn do_something() {
    println!("hello, world!");
}

fn main() {
    let future = do_something();
    // Futures are lazy - nothing is happening
    // until driven to completion by .await, block_on...

    // `block_on` blocks the current thread until the provided future has
    // run to completion. Other executors provide more complex
    // behavior, like scheduling multiple futures onto the same
    // thread. See `Tokio`.
    futures::executor::block_on(future);
    // `future` is run and "hello, world!" is printed
}

Use the Tokio Runtime Directly

tokio tokio-crates.io tokio-github tokio-lib.rs cat-asynchronous cat-network-programming

fn main() {
    // Create a new multi-threaded runtime. The runtime is used to
    // execute asynchronous tasks.
    let runtime = tokio::runtime::Builder::new_multi_thread()
        // Set the number of worker threads. In this case, we only
        // need one.
        .worker_threads(1)
        .enable_all()
        .build()
        .unwrap();

    let mut handles = Vec::with_capacity(10);
    for i in 0..10 {
        handles.push(runtime.spawn(my_bg_task(i)));
    }

    // Do something time-consuming while the async background tasks
    // execute.
    std::thread::sleep(std::time::Duration::from_millis(750));
    println!("Finished time-consuming task.");

    // Wait for all of them to complete.
    for handle in handles {
        // The `spawn` method returns a `JoinHandle`. A `JoinHandle` is
        // a future, so we can wait for it using `block_on`.
        runtime.block_on(handle).unwrap();
    }
}

// Example async code to execute
async fn my_bg_task(i: u64) {
    // By subtracting, the tasks with larger values of i sleep for a
    // shorter duration.
    let millis = 1000 - 50 * i;
    println!("Task {} sleeping for {} ms.", i, millis);

    tokio::time::sleep(tokio::time::Duration::from_millis(millis)).await;

    println!("Task {} stopping.", i);
}

Related Topics

  • Concurrency.