Futures Crate

RecipeCratesCategories
Selecting Futuresfuturescat-asynchronous
Joining Futuresfuturescat-asynchronous
Map, then, either, flattenfuturescat-asynchronous
futures-util[![futures-util][c-futures-util-badge]][c-futures-util]cat-asynchronous

futures-website futures futures-crates.io futures-github futures-lib.rs cat-asynchronous

The futures⮳ crate provides a number of core abstractions for writing asynchronous code.

In most cases, you will use this crate directly only when writing async code intended to work for multiple runtimes. Otherwise, use the utilities provided by the ecosystem of your choice - Tokio for example.

Selecting Futures

futures-website futures futures-crates.io futures-github futures-lib.rs cat-asynchronous

futures::future::Select⮳ polls multiple futures and streams simultaneously, executing the branch for the future that finishes first. If multiple futures are ready, one will be pseudo-randomly selected at runtime.


use futures::{
    future::FutureExt, // for `.fuse()`
    pin_mut,
    select,
};

async fn task_one() {
    // ...
}
async fn task_two() {
    // ...
}

async fn race_tasks() {
    let t1 = task_one().fuse();
    let t2 = task_two().fuse();

    pin_mut!(t1, t2);

    select! {
        () = t1 => println!("task one completed first"),
        () = t2 => println!("task two completed first"),
    }
}

#[tokio::main]
async fn main() {
    race_tasks().await;
}

Joining Futures

futures-website futures futures-crates.io futures-github futures-lib.rs cat-asynchronous

use futures::join;

async fn foo(i: u32) -> u32 {
    println!("{i}");
    i
}

#[tokio::main]
async fn main() {
    // The `join!` macro polls multiple futures simultaneously, returning
    // a tuple of all results once complete.
    assert_eq!(join!(foo(1), foo(2)), (1, 2));
    // `join!` is variadic, so you can pass any number of futures

    // `join_all` create a future which represents a collection of the
    // outputs of the futures given.
    let futures = vec![foo(1), foo(2), foo(3)];
    assert_eq!(futures::future::join_all(futures).await, [1, 2, 3]);
}

Map, then, either, flatten

futures-website futures futures-crates.io futures-github futures-lib.rs cat-asynchronous

The futures⮳ crate provides an extension trait that provides a variety of convenient adapters.

use anyhow::Result;
use futures::future::FutureExt;

#[tokio::main]
async fn main() -> Result<()> {
    let future_of_1 = async { 1 };

    // Map this future's output to a (possibly) different type, returning
    // a new future of the resulting type.
    let new_future = future_of_1.map(|x| x + 3);

    // Chain on a computation for when a future finished, passing the
    // result of the future to the provided closure f.
    let future_of_7 = new_future.then(|x| async move { x + 3 });
    let seven = future_of_7.await;
    println!("{}", seven);
    assert_eq!(seven, 7);

    // Conditional `Either` future
    let x = 6;
    let future = if x > 10 {
        async { true }.left_future()
    } else {
        async { false }.right_future()
    };
    let not_true: bool = future.await;
    assert!(!not_true);

    // Flatten nested futures
    let nested_future = async { async { 1 } };
    let future = nested_future.flatten();
    let flat = future.await;
    println!("{flat}");
    assert_eq!(flat, 1);
    Ok(())
}

futures-util

future-utils future-utils-crates.io future-utils-github future-utils-lib.rs cat-asynchronous cat-network-programming

Common utilities and extension traits for the futures-rs library. Extensions to Rust's Future and Stream traits. Combinators and utilities for working with Futures, Streams, Sinks, and the AsyncRead and AsyncWrite traits.

use std::time::Duration;

use futures::future;
use futures_util::future::FutureExt;
use futures_util::future::TryFutureExt;
use futures_util::pin_mut;
use tokio::time;

// This example showcases various utilities for working with asynchronous code.

// Add these dependencies to your Cargo.toml:
// [dependencies]
// futures = "0.3"
// futures-util = "0.3"
// tokio = { version = "1", features = ["full"] }

// Simulated async function that returns a `Result`:
async fn fetch_data(id: u32) -> Result<String, String> {
    // Simulate network delay
    time::sleep(Duration::from_millis(100)).await;
    if id % 3 == 0 {
        Err(format!("Error fetching data for id {}", id))
    } else {
        Ok(format!("Data for id {}", id))
    }
}

#[tokio::main]
async fn main() {
    println!("\n===== Combining Futures =====");
    // `select` for racing futures,
    // `join_all` for waiting on multiple futures,
    // `try_join` for handling errors in combined futures.

    // `ready` creates a future that is immediately ready with a value.
    let future1 =
        time::sleep(Duration::from_millis(100)).then(|_| future::ready(1));
    let future2 =
        time::sleep(Duration::from_millis(50)).then(|_| future::ready(2));

    // `select` requires `Future` + `Unpin` bounds.
    pin_mut!(future1);
    pin_mut!(future2);

    // Select between multiple futures:
    match future::select(future1, future2).await {
        future::Either::Left((val, _)) => {
            println!("Future 1 completed first with: {}", val)
        }
        future::Either::Right((val, _)) => {
            println!("Future 2 completed first with: {}", val)
        }
    }

    // Joining futures:
    let futures = vec![fetch_data(1), fetch_data(2)];
    // The future returned by `join_all` will drive execution for all of its
    // underlying futures, collecting the results into a destination Vec<T>
    // in the same order as they were provided.
    let results: Vec<Result<String, String>> = future::join_all(futures).await;
    println!("Join results: {:?}", results);

    // `try_join`:
    // If successful, the returned future will finish with a tuple of both
    // results. If unsuccessful, it will complete with the first error
    // encountered.
    let results = future::try_join(fetch_data(1), fetch_data(4)).await;
    println!("try_join results: {:?}", results);

    println!("===== Future Extensions =====");
    // `map` to transform future outputs,
    // `and_then` for chaining asynchronous operations,
    // `or_else` for error handling.

    // Using `map` to transform the output of a Future
    let future = fetch_data(1).map(|result| match result {
        Ok(data) => Ok(format!("Processed: {}", data)),
        Err(e) => Err(e),
    });
    println!("Map result: {:?}", future.await);

    // Using `and_then` for chaining futures
    let future = fetch_data(2).and_then(|data| async move {
        // Simulate additional processing
        time::sleep(Duration::from_millis(50)).await;
        Ok(format!("Enhanced: {}", data))
    });
    println!("AndThen result: {:?}", future.await);

    // Using `or_else` for error handling
    let future = fetch_data(3).or_else(|err| async move {
        println!("Recovering from error: {}", err);
        // Return a fallback value
        Ok::<_, String>(String::from("Fallback data"))
    });
    println!("OrElse result: {:?}", future.await);
}

Related Topics

  • Tokio.

See Also

futures_executor