Futures Crate
Recipe | Crates | Categories |
---|---|---|
Selecting Futures | ||
Joining Futures | ||
Map, then , either , flatten | ||
futures-util | [![futures-util][c-futures-util-badge]][c-futures-util] |
The futures
⮳ crate provides a number of core abstractions for writing asynchronous code.
In most cases, you will use this crate directly only when writing async code intended to work for multiple runtimes. Otherwise, use the utilities provided by the ecosystem of your choice - Tokio for example.
Selecting Futures
futures::future::Select
⮳ polls multiple futures and streams simultaneously, executing the branch for the future that finishes first. If multiple futures are ready, one will be pseudo-randomly selected at runtime.
use futures::{ future::FutureExt, // for `.fuse()` pin_mut, select, }; async fn task_one() { // ... } async fn task_two() { // ... } async fn race_tasks() { let t1 = task_one().fuse(); let t2 = task_two().fuse(); pin_mut!(t1, t2); select! { () = t1 => println!("task one completed first"), () = t2 => println!("task two completed first"), } } #[tokio::main] async fn main() { race_tasks().await; }
Joining Futures
use futures::join; async fn foo(i: u32) -> u32 { println!("{i}"); i } #[tokio::main] async fn main() { // The `join!` macro polls multiple futures simultaneously, returning // a tuple of all results once complete. assert_eq!(join!(foo(1), foo(2)), (1, 2)); // `join!` is variadic, so you can pass any number of futures // `join_all` create a future which represents a collection of the // outputs of the futures given. let futures = vec![foo(1), foo(2), foo(3)]; assert_eq!(futures::future::join_all(futures).await, [1, 2, 3]); }
Map, then
, either
, flatten
The futures
⮳ crate provides an extension trait that provides a variety of convenient adapters.
use anyhow::Result; use futures::future::FutureExt; #[tokio::main] async fn main() -> Result<()> { let future_of_1 = async { 1 }; // Map this future's output to a (possibly) different type, returning // a new future of the resulting type. let new_future = future_of_1.map(|x| x + 3); // Chain on a computation for when a future finished, passing the // result of the future to the provided closure f. let future_of_7 = new_future.then(|x| async move { x + 3 }); let seven = future_of_7.await; println!("{}", seven); assert_eq!(seven, 7); // Conditional `Either` future let x = 6; let future = if x > 10 { async { true }.left_future() } else { async { false }.right_future() }; let not_true: bool = future.await; assert!(!not_true); // Flatten nested futures let nested_future = async { async { 1 } }; let future = nested_future.flatten(); let flat = future.await; println!("{flat}"); assert_eq!(flat, 1); Ok(()) }
futures-util
Common utilities and extension traits for the futures-rs
⮳ library. Extensions to Rust's Future
and Stream
traits. Combinators and utilities for working with Futures
⮳, Streams
, Sinks
, and the AsyncRead
and AsyncWrite
traits.
use std::time::Duration; use futures::future; use futures_util::future::FutureExt; use futures_util::future::TryFutureExt; use futures_util::pin_mut; use tokio::time; // This example showcases various utilities for working with asynchronous code. // Add these dependencies to your Cargo.toml: // [dependencies] // futures = "0.3" // futures-util = "0.3" // tokio = { version = "1", features = ["full"] } // Simulated async function that returns a `Result`: async fn fetch_data(id: u32) -> Result<String, String> { // Simulate network delay time::sleep(Duration::from_millis(100)).await; if id % 3 == 0 { Err(format!("Error fetching data for id {}", id)) } else { Ok(format!("Data for id {}", id)) } } #[tokio::main] async fn main() { println!("\n===== Combining Futures ====="); // `select` for racing futures, // `join_all` for waiting on multiple futures, // `try_join` for handling errors in combined futures. // `ready` creates a future that is immediately ready with a value. let future1 = time::sleep(Duration::from_millis(100)).then(|_| future::ready(1)); let future2 = time::sleep(Duration::from_millis(50)).then(|_| future::ready(2)); // `select` requires `Future` + `Unpin` bounds. pin_mut!(future1); pin_mut!(future2); // Select between multiple futures: match future::select(future1, future2).await { future::Either::Left((val, _)) => { println!("Future 1 completed first with: {}", val) } future::Either::Right((val, _)) => { println!("Future 2 completed first with: {}", val) } } // Joining futures: let futures = vec![fetch_data(1), fetch_data(2)]; // The future returned by `join_all` will drive execution for all of its // underlying futures, collecting the results into a destination Vec<T> // in the same order as they were provided. let results: Vec<Result<String, String>> = future::join_all(futures).await; println!("Join results: {:?}", results); // `try_join`: // If successful, the returned future will finish with a tuple of both // results. If unsuccessful, it will complete with the first error // encountered. let results = future::try_join(fetch_data(1), fetch_data(4)).await; println!("try_join results: {:?}", results); println!("===== Future Extensions ====="); // `map` to transform future outputs, // `and_then` for chaining asynchronous operations, // `or_else` for error handling. // Using `map` to transform the output of a Future let future = fetch_data(1).map(|result| match result { Ok(data) => Ok(format!("Processed: {}", data)), Err(e) => Err(e), }); println!("Map result: {:?}", future.await); // Using `and_then` for chaining futures let future = fetch_data(2).and_then(|data| async move { // Simulate additional processing time::sleep(Duration::from_millis(50)).await; Ok(format!("Enhanced: {}", data)) }); println!("AndThen result: {:?}", future.await); // Using `or_else` for error handling let future = fetch_data(3).or_else(|err| async move { println!("Recovering from error: {}", err); // Return a fallback value Ok::<_, String>(String::from("Fallback data")) }); println!("OrElse result: {:?}", future.await); }
Related Topics
- Tokio.