Streams

Recipe | Crates | Categories
Streams | futures | cat-asynchronous

A future represents a single value that will eventually be produced, but many event sources naturally produce a sequence of values over time. Such a sequence is modeled by futures::stream::Stream, the asynchronous counterpart of Iterator.

use futures::stream::{self, Stream, StreamExt};

// Returns a stream that yields the integers 1 through 5.
// The function is not `async`: building the stream awaits nothing.
fn count_to_five() -> impl Stream<Item = u32> {
    stream::iter(1..=5)
}

#[tokio::main]
async fn main() {
    let mut stream = count_to_five();
    // `for` loops cannot iterate over a Stream, but imperative-style code
    // can use `while let` with `StreamExt::next` (or `TryStreamExt::try_next`
    // for streams of `Result`s):
    while let Some(num) = stream.next().await {
        println!("{}", num);
    }
}

Streams also offer combinator-style methods such as futures::prelude::stream::StreamExt::map, futures::prelude::stream::StreamExt::filter, and futures::prelude::stream::StreamExt::fold, along with their early-exit-on-error cousins futures::prelude::stream::TryStreamExt::try_filter and futures::prelude::stream::TryStreamExt::try_fold.

To process multiple items from a stream concurrently, use the futures::prelude::stream::StreamExt::for_each_concurrent and futures::prelude::stream::TryStreamExt::try_for_each_concurrent methods:

use std::fs;

use futures::StreamExt;
use tokio::fs::File;
use tokio::io;

type Result = std::result::Result<(), anyhow::Error>;

// Downloads the body of `url` and writes it to `filename`.
async fn download_file(url: &str, filename: &str) -> Result {
    let response = reqwest::get(url).await?;
    let content = response.bytes().await?;
    let mut file = File::create(filename).await?;
    io::copy(&mut content.as_ref(), &mut file).await?;
    Ok(())
}

#[tokio::main]
async fn main() -> Result {
    let urls = ["https://www.gutenberg.org/cache/epub/43/pg43.txt"]; // add more here...
    let filenames = ["temp/file1.txt"]; // add more here...
    // `create_dir_all` succeeds even if the directory already exists.
    fs::create_dir_all("temp")?;

    // Futures are lazy: nothing is downloaded until they are awaited below.
    let downloads = urls
        .iter()
        .zip(filenames.iter())
        .map(|(url, filename)| download_file(url, filename));

    // Run at most 4 downloads at a time, reporting each failure as it occurs.
    futures::stream::iter(downloads)
        .for_each_concurrent(4, |download| async move {
            if let Err(e) = download.await {
                println!("Error: {}", e);
                if let Some(source) = e.source() {
                    println!("  Caused by: {}", source);
                }
            }
        })
        .await;

    // Individual failures are printed above, so this only signals completion.
    println!("All downloads finished.");
    Ok(())
}

See also

The async-stream crate from the Tokio project provides macros for writing streams with async/await syntax.