Streams

Futures are about a single value that will eventually be produced, but many event sources naturally produce a futures::stream::Stream of values over time.

use futures::Stream;
use futures::stream;
use futures::stream::StreamExt;

async fn count_to_five() -> impl Stream<Item = u32> {
    stream::iter(1..=5)
}

#[tokio::main]
async fn main() {
    let mut stream = count_to_five().await;
    // `for` loops are not usable with Streams, but for imperative-style
    // code, `while let` and the `next`/`try_next` functions can be used:
    while let Some(num) = stream.next().await {
        println!("{}", num);
    }
}

There are combinator-style methods such as futures::prelude::stream::StreamExt::map, futures::prelude::stream::StreamExt::filter, and futures::prelude::stream::StreamExt::fold, as well as their early-exit-on-error cousins futures::prelude::stream::TryStreamExt::try_filter and futures::prelude::stream::TryStreamExt::try_fold.
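
For example, here is a minimal sketch combining these combinators. The numbers and the "boom" error are made up for illustration; the dependencies are the same futures and tokio crates used in the examples below.

use futures::stream::{self, StreamExt, TryStreamExt};

#[tokio::main]
async fn main() {
    // `map`, `filter`, and `fold` operate on plain streams.
    // Note that the `filter` predicate and the `fold` step return futures.
    let sum = stream::iter(1u32..=10)
        .map(|n| n * 2)
        .filter(|n| futures::future::ready(n % 3 == 0))
        .fold(0u32, |acc, n| async move { acc + n })
        .await;
    println!("Sum: {}", sum); // 6 + 12 + 18 = 36

    // `try_filter` and `try_fold` operate on streams of `Result`s
    // and exit early at the first `Err` they encounter.
    let total = stream::iter(vec![Ok::<u32, String>(1), Ok(2), Err("boom".to_string()), Ok(3)])
        .try_filter(|n| futures::future::ready(n % 2 == 1))
        .try_fold(0u32, |acc, n| async move { Ok(acc + n) })
        .await;
    println!("{:?}", total); // Err("boom")
}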

To process multiple items from a stream concurrently, use the futures::prelude::stream::StreamExt::for_each_concurrent and futures::prelude::stream::TryStreamExt::try_for_each_concurrent methods:

use std::error::Error;
use std::fs;

use futures::StreamExt;
use tokio::fs::File;
use tokio::io;

type Result = std::result::Result<(), anyhow::Error>;

async fn download_file(url: &str, filename: &str) -> Result {
    let response = reqwest::get(url).await?;
    let content = response.bytes().await?;
    let mut file = File::create(filename).await?;
    io::copy(&mut content.as_ref(), &mut file).await?;
    Ok(())
}

#[tokio::main]
async fn main() -> Result {
    let urls = ["https://www.gutenberg.org/cache/epub/43/pg43.txt"]; // add more here...
    let filenames = ["temp/file1.txt"]; // add more here...
    if !fs::exists("temp")? {
        fs::create_dir("temp")?;
    }

    let futures = urls
        .iter()
        .zip(filenames.iter())
        .map(|(url, filename)| download_file(url, filename));

    // Download up to 4 files concurrently.
    // Errors are reported, but do not cancel the other downloads.
    let fut = futures::stream::iter(futures).for_each_concurrent(
        4,
        |fut| async move {
            if let Err(e) = fut.await {
                println!("Error: {}", e);
                if let Some(source) = e.source() {
                    println!("  Caused by: {}", source);
                }
            }
        },
    );

    fut.await;

    println!("Downloaded files successfully!");
    Ok(())
}
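
Note that for_each_concurrent expects its closure to return a future with output (), so errors have to be handled inside the closure, as above. try_for_each_concurrent instead exits as soon as an error occurs and returns it. A minimal sketch (the check function and its failure condition are made up for illustration):

use futures::stream::{self, TryStreamExt};

// Hypothetical validation step that fails for one id.
async fn check(id: u32) -> Result<(), String> {
    if id == 3 {
        Err(format!("id {} failed", id))
    } else {
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    // The stream itself must yield `Result`s for the `Try*` combinators.
    let result = stream::iter((1..=5).map(Ok::<u32, String>))
        // Run up to 2 checks at a time; exit at the first `Err`.
        .try_for_each_concurrent(2, check)
        .await;
    println!("{:?}", result); // Err("id 3 failed")
}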

The following example showcases various utilities for working with streams:

use std::time::Duration;

use futures_util::stream;
use futures_util::stream::StreamExt;
// You may also need:
// use futures_util::stream::TryStreamExt;
use tokio::time;

// Add these dependencies to your Cargo.toml:
// [dependencies]
// futures = "0.3"
// futures-util = "0.3"
// tokio = { version = "1", features = ["full"] }

// Simulated async function that returns a `Result`:
async fn fetch_data(id: u32) -> Result<String, String> {
    // Simulate network delay
    time::sleep(Duration::from_millis(100)).await;
    if id % 3 == 0 {
        Err(format!("Error fetching data for id {}", id))
    } else {
        Ok(format!("Data for id {}", id))
    }
}

#[tokio::main]
async fn main() {
    println!("\n===== Stream Utilities =====");

    // Create a stream of futures:
    // `iter` converts an `Iterator` into a `Stream`
    let stream = stream::iter(vec![1, 2, 3, 4, 5]).map(fetch_data);

    // Process stream with `for_each_concurrent`, which
    // runs this stream to completion, executing the provided asynchronous
    // closure for each element on the stream concurrently as elements become
    // available. This is similar to `StreamExt::for_each`, but the futures
    // produced by the closure are run concurrently (but not in parallel - this
    // combinator does not introduce any threads).
    stream
        .for_each_concurrent(2, |future| async {
            match future.await {
                Ok(data) => println!("Processed concurrently: {}", data),
                Err(e) => println!("Error: {}", e),
            }
        })
        .await;

    println!("\n===== Stream Transformations =====");

    // Create a stream with successful and error results.
    let stream = stream::iter(vec![
        Ok::<_, String>("Item 1"),
        Ok("Item 2"),
        Err("Error 1".to_string()),
        Ok("Item 3"),
        Err("Error 2".to_string()),
    ]);

    // Using `filter_map` on a stream.
    // Filters the values produced by this stream
    // while simultaneously mapping them to a different type.
    let filtered: Vec<_> = stream
        .filter_map(|res| async move {
            match res {
                Ok(item) => Some(format!("Filtered: {}", item)),
                Err(_) => None, // Filter out errors
            }
        })
        .collect()
        .await;

    println!("Filtered stream results: {:?}", filtered);
}

See Also

See also the async-stream crate from the Tokio project.
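
The async-stream crate lets you define streams imperatively with yield statements. A minimal sketch, assuming async-stream, futures-util, and tokio as dependencies:

use async_stream::stream;
use futures_util::pin_mut;
use futures_util::stream::StreamExt;

#[tokio::main]
async fn main() {
    // `stream!` builds a stream from imperative code with `yield`.
    let s = stream! {
        for i in 0..3u32 {
            yield i;
        }
    };
    // The generated stream is not `Unpin`, so pin it before iterating.
    pin_mut!(s);
    while let Some(value) = s.next().await {
        println!("got {}", value);
    }
}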
