Streams
Recipe | Crates | Categories |
---|---|---|
Streams |
Futures are about a single value that will eventually be produced, but many event sources naturally produce a futures::stream::Stream
of values over time.
use futures::Stream; use futures::stream; use futures::stream::StreamExt; async fn count_to_five() -> impl Stream<Item = u32> { stream::iter(1..=5) } #[tokio::main] async fn main() { let mut stream = count_to_five().await; // `for` loops are not usable with Streams, but for imperative-style // code, `while let` and the `next`/`try_next` functions can be used: while let Some(num) = stream.next().await { println!("{}", num); } }
There are combinator-style methods such as futures::prelude::stream::StreamExt::map
⮳, futures::prelude::stream::StreamExt::filter
⮳, and futures::prelude::stream::StreamExt::fold
⮳, as well as their early-exit-on-error cousins futures::prelude::stream::TryStreamExt::try_filter
⮳ and futures::prelude::stream::TryStreamExt::try_fold
⮳.
To process multiple items from a stream concurrently, use the futures::prelude::stream::StreamExt::for_each_concurrent
⮳ and futures::prelude::stream::TryStreamExt::try_for_each_concurrent
⮳ methods:
use std::fs;
use futures::StreamExt;
use tokio::fs::File;
use tokio::io;
type Result = std::result::Result<(), anyhow::Error>;
/// Downloads the resource at `url` and writes its body to the file `filename`.
///
/// The response body is read in full with `bytes()` before it is copied into
/// the newly created file.
///
/// # Errors
///
/// Returns an error if the request fails, if the server replies with an HTTP
/// error status (4xx or 5xx), if the body cannot be read, or if the file
/// cannot be created or written.
async fn download_file(url: &str, filename: &str) -> Result {
    // `reqwest::get` succeeds for any HTTP status; reject 4xx/5xx explicitly so
    // that an error page is never saved as if it were the requested file.
    let response = reqwest::get(url).await?.error_for_status()?;
    let content = response.bytes().await?;
    let mut file = File::create(filename).await?;
    // `&[u8]` acts as the reader here, so the buffered body is copied into the file.
    io::copy(&mut content.as_ref(), &mut file).await?;
    Ok(())
}
/// Downloads each URL into its paired file name, running at most four
/// downloads concurrently, and reports every failure with its direct cause.
///
/// Individual download failures are logged rather than propagated; the final
/// message states whether all downloads succeeded or how many failed.
///
/// # Errors
///
/// Returns an error only if the `temp` output directory cannot be created.
#[tokio::main]
async fn main() -> Result {
    let urls = ["https://www.gutenberg.org/cache/epub/43/pg43.txt"]; // add more here...
    let filenames = ["temp/file1.txt"]; // add more here...

    // `create_dir_all` succeeds when the directory already exists, which avoids
    // a separate (and racy) existence check before creating it.
    fs::create_dir_all("temp")?;

    // Calling `download_file` only builds the futures; they start running once
    // the stream below polls them.
    let downloads = urls
        .iter()
        .zip(filenames.iter())
        .map(|(url, filename)| download_file(url, filename));

    // Tracks failed downloads so the summary printed at the end is accurate.
    // A `Cell` is sufficient: the closure's futures are all driven from this
    // one `await`, not from spawned tasks.
    let failures = std::cell::Cell::new(0usize);
    let failures = &failures;

    futures::stream::iter(downloads)
        .for_each_concurrent(4, move |download| async move {
            if let Err(e) = download.await {
                failures.set(failures.get() + 1);
                println!("Error: {}", e);
                if let Some(source) = e.source() {
                    println!(" Caused by: {}", source);
                }
            }
        })
        .await;

    if failures.get() == 0 {
        println!("Downloaded files successfully!");
    } else {
        println!("{} download(s) failed.", failures.get());
    }
    Ok(())
}
The following example showcases various utilities for working with streams:
use std::time::Duration;
use futures_util::stream;
use futures_util::stream::StreamExt;
// You may also need:
// use futures_util::stream::TryStreamExt;
use tokio::time;
// Add these dependencies to your Cargo.toml:
// [dependencies]
// futures = "0.3"
// futures-util = "0.3"
// tokio = { version = "1", features = ["full"] }
// Simulated async function that returns a `Result`:
/// Pretends to fetch the record for `id` from a remote service.
///
/// Sleeps for 100 ms to imitate network latency, then returns `Err` for every
/// `id` divisible by three and `Ok` for all other ids.
async fn fetch_data(id: u32) -> Result<String, String> {
    // Stand-in for the round trip to a real server.
    let latency = Duration::from_millis(100);
    time::sleep(latency).await;

    match id % 3 {
        0 => Err(format!("Error fetching data for id {}", id)),
        _ => Ok(format!("Data for id {}", id)),
    }
}
/// Demonstrates `for_each_concurrent` and `filter_map` on small in-memory
/// streams, printing the outcome of each step.
#[tokio::main]
async fn main() {
    println!("\n===== Stream Utilities =====");

    // `iter` turns an `Iterator` into a `Stream`; `map` then wraps every id in
    // a `fetch_data` future, giving a stream of futures.
    let ids = vec![1, 2, 3, 4, 5];
    let pending_fetches = stream::iter(ids).map(fetch_data);

    // `for_each_concurrent` drives the stream to completion and runs the
    // closure's futures concurrently as items become available, here with a
    // limit of two. It is like `StreamExt::for_each`, except that the futures
    // overlap; they are concurrent but not parallel, because this combinator
    // does not introduce any threads.
    pending_fetches
        .for_each_concurrent(2, |fetch| async move {
            match fetch.await {
                Err(e) => println!("Error: {}", e),
                Ok(data) => println!("Processed concurrently: {}", data),
            }
        })
        .await;

    println!("\n===== Stream Transformations =====");

    // A mix of successful and failed results to feed through the stream.
    let mixed = vec![
        Ok::<_, String>("Item 1"),
        Ok("Item 2"),
        Err("Error 1".to_string()),
        Ok("Item 3"),
        Err("Error 2".to_string()),
    ];

    // `filter_map` filters and maps in one pass: errors become `None` and are
    // dropped, while successes are rewritten into labelled strings.
    let kept: Vec<_> = stream::iter(mixed)
        .filter_map(|res| async move { res.ok().map(|item| format!("Filtered: {}", item)) })
        .collect()
        .await;

    println!("Filtered stream results: {:?}", kept);
}
See Also
See also Tokio async-stream
⮳.