Tokio

RecipeCratesCategories
Basicstokiocat-asynchronous
Jointokiocat-asynchronous
Spawningtokiocat-asynchronous
IOtokiocat-asynchronous
Graceful shutdowntokio_graceful_shutdowncat-asynchronous

Basics

tokio tokio-crates.io tokio-github tokio-lib.rs cat-asynchronous cat-network-programming

Tokio is an asynchronous runtime for the Rust programming language. It provides the building blocks needed for writing networking applications. Tokio provides a few major components:

  • Multiple variations of the runtime for executing asynchronous code. Everything from a multi-threaded, work-stealing runtime to a light-weight, single-threaded runtime.

  • An asynchronous version of the standard library.

  • A large ecosystem of libraries.

  • creating and running a runtime, spawning tasks, working with I/O and timers, and handling errors.

Join

By running all async expressions on the current task, the expressions are able to run concurrently but not in parallel. This means all expressions are run on the same thread and if one branch blocks the thread, all other expressions will be unable to continue. If parallelism is required, spawn each async expression using tokio::spawn and pass the join handle to join!.

Spawning

IO

  • read and write data asynchronously with Tokio, using streams, codecs, and futures. It also shows how to handle errors and timeouts.

Current thread runtime

equivalent to

fn main() {
    tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap()
        .block_on(async {
            println!("Hello world");
        })
}

LocalSet

In some cases, it is necessary to run one or more futures that do not implement Send and thus are unsafe to send between threads. In these cases, a local task set may be used to schedule one or more !Send futures to run together on the same thread.

use std::rc::Rc;

use tokio::task;
use tokio::time;

#[tokio::main]
async fn main() {
    // Data that is not thread-safe:
    let nonsend_data = Rc::new("world");

    // A set of tasks which are executed on the same thread:
    let local = task::LocalSet::new();

    let nonsend_data2 = nonsend_data.clone();
    local.spawn_local(async move {
        // ...
        println!("hello {}", nonsend_data2)
    });

    local.spawn_local(async move {
        time::sleep(time::Duration::from_millis(100)).await;
        println!("goodbye {}", nonsend_data)
    });

    // ...

    local.await;
}

Graceful shutdown

tokio_graceful_shutdown cat-asynchronous

Example from c-tokio_graceful_shutdownc-tokio_graceful_shutdown⮳:

use tokio::time::sleep;
use tokio::time::Duration;
use tokio_graceful_shutdown::SubsystemBuilder;
use tokio_graceful_shutdown::SubsystemHandle;
use tokio_graceful_shutdown::Toplevel;
// use tracing::Level;

async fn countdown() {
    for i in (1..=5).rev() {
        tracing::info!("Shutting down in: {}", i);
        sleep(Duration::from_millis(1000)).await;
    }
}

async fn countdown_subsystem(
    subsys: SubsystemHandle,
) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>> {
    tokio::select! {
        _ = subsys.on_shutdown_requested() => {
            tracing::info!("Countdown cancelled.");
        },
        _ = countdown() => {
            subsys.request_shutdown();
        }
    };
    Ok(())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Init logging
    tracing_subscriber::fmt()
        // .with_max_level(Level::TRACE)
        .init();

    // Setup and execute subsystem tree
    Toplevel::new(|s| async move {
        s.start(SubsystemBuilder::new("Countdown", countdown_subsystem));
    })
    // Signals the Toplevel object to listen for SIGINT/SIGTERM/Ctrl+C
    .catch_signals()
    // Collects all the return values of the subsystems, determines the global error state
    .handle_shutdown_requests(Duration::from_millis(1000))
    .await
    .map_err(|e| e.into())
}