Mixing Async and Blocking Code

Calling blocking code from async code

tokio tokio-crates.io tokio-github tokio-lib.rs cat-asynchronous cat-network-programming

  • Async code should never spend a long time without reaching an .await.
  • Don't carelessly mix async code with synchronous, blocking calls such as std::thread::sleep(Duration::from_secs(N)). While a thread is blocked, the executor cannot run any other task on it.
  • If you must block the thread, because of an expensive CPU-bound computation or a call to a synchronous I/O API, use the tokio::task::spawn_blocking⮳ function, use rayon⮳, or spawn a dedicated thread.

See the "Async: What is blocking?" blog post⮳.

Tokio spawn_blocking

tokio tokio-crates.io tokio-github tokio-lib.rs cat-asynchronous cat-network-programming

Use tokio::task::spawn_blocking⮳ to run a small portion of synchronous code.

#[tokio::main]
async fn main() {
    // This is running on Tokio. We may not block here.

    let blocking_task = tokio::task::spawn_blocking(|| {
        // This is running on a thread where blocking is fine.
        println!("Inside spawn_blocking");
    });

    blocking_task.await.unwrap();
}

Using the rayon crate

rayon rayon-crates.io rayon-github rayon-lib.rs cat-asynchronous cat-concurrency

use rayon::prelude::*;

async fn parallel_sum(nums: Vec<i32>) -> i32 {
    let (tx, rx) = tokio::sync::oneshot::channel();

    // Spawn a task on rayon.
    rayon::spawn(move || {
        // Perform an expensive computation on this thread...

        // ...or compute the sum on multiple rayon threads.
        let sum = nums.par_iter().sum();

        // Send the result back to Tokio.
        let _ = tx.send(sum);
    });

    // Wait for the rayon task.
    rx.await.expect("Panic in rayon::spawn")
}

#[tokio::main]
async fn main() {
    let nums = vec![1; 1024 * 1024];
    println!("{}", parallel_sum(nums).await);
}

Spawn a dedicated thread

tokio tokio-crates.io tokio-github tokio-lib.rs cat-asynchronous cat-concurrency

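If a blocking operation keeps running forever (for example, a loop that services requests for the lifetime of the program), run it on a dedicated thread rather than permanently occupying a thread of the spawn_blocking pool.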

async fn parallel_sum(nums: Vec<i32>) -> i32 {
    let (tx, rx) = tokio::sync::oneshot::channel();

    // Spawn a task on a dedicated thread.
    std::thread::spawn(move || {
        // Perform an expensive computation on this thread...
        let sum = nums.into_iter().sum();

        // Send the result back to Tokio.
        let _ = tx.send(sum);
    });

    // Wait for the dedicated thread to send its result.
    rx.await.expect("Panic in dedicated thread")
}

#[tokio::main]
async fn main() {
    let nums = vec![1; 1024 * 1024];
    println!("{}", parallel_sum(nums).await);
}

Calling async code from blocking code

tokio tokio-crates.io tokio-github tokio-lib.rs cat-asynchronous cat-network-programming

See Bridging with sync code in the Tokio documentation.

In other cases, it may be easier to structure the application as largely synchronous, with smaller or logically distinct asynchronous portions. For instance, a GUI application might want to run the GUI code on the main thread and run a Tokio runtime next to it on another thread.

Futures executor

futures_executor futures_executor-crates.io futures_executor-github futures_executor-lib.rs cat-asynchronous

futures_executor⮳ includes a minimal executor. The futures_executor::block_on⮳ function is useful if you want to run an async function synchronously in a codebase that is mostly synchronous. The futures crate re-exports it as futures::executor::block_on.

async fn do_something() {
    println!("hello, world!");
}

fn main() {
    let future = do_something();
    // Futures are lazy: nothing happens until the future is
    // driven to completion by `.await`, `block_on`, or similar.

    // `block_on` blocks the current thread until the provided future has
    // run to completion. Other executors provide more complex
    // behavior, like scheduling multiple futures onto the same
    // thread. See `Tokio`.
    futures::executor::block_on(future);
    // `future` is run and "hello, world!" is printed
}
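To make the idea of "blocking the current thread until the future completes" concrete, here is a minimal sketch of a block_on written with the standard library only. It is an illustration of the technique, not the futures_executor implementation; ThreadWaker and add are hypothetical names.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Waker that unparks the thread blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Illustrative `block_on`: polls the future on the current thread and
/// parks the thread whenever the future is not ready yet.
fn block_on<F: Future>(future: F) -> F::Output {
    // Pin the future to the stack so that it can be polled.
    let mut future = pin!(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);

    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return output,
            // Sleep until the waker unparks this thread, then poll again.
            Poll::Pending => thread::park(),
        }
    }
}

/// Hypothetical async function used to exercise the executor.
async fn add(a: u32, b: u32) -> u32 {
    a + b
}

fn main() {
    let sum = block_on(add(2, 3));
    println!("{sum}");
}
```

Production executors add task queues, timers, and I/O drivers on top of this loop, which is why futures that depend on Tokio's timers or sockets still need a Tokio runtime.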

Using the Tokio runtime directly

tokio tokio-crates.io tokio-github tokio-lib.rs cat-asynchronous cat-network-programming

fn main() {
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(1)
        .enable_all()
        .build()
        .unwrap();

    let mut handles = Vec::with_capacity(10);
    for i in 0..10 {
        handles.push(runtime.spawn(my_bg_task(i)));
    }

    // Do something time-consuming while the async background tasks
    // execute.
    std::thread::sleep(std::time::Duration::from_millis(750));
    println!("Finished time-consuming task.");

    // Wait for all of them to complete.
    for handle in handles {
        // The `spawn` method returns a `JoinHandle`. A `JoinHandle` is
        // a future, so we can wait for it using `block_on`.
        runtime.block_on(handle).unwrap();
    }
}

// Example async code to execute
async fn my_bg_task(i: u64) {
    // By subtracting, the tasks with larger values of i sleep for a
    // shorter duration.
    let millis = 1000 - 50 * i;
    println!("Task {} sleeping for {} ms.", i, millis);

    tokio::time::sleep(tokio::time::Duration::from_millis(millis)).await;

    println!("Task {} stopping.", i);
}