Streams
Recipe | Crates | Categories |
---|---|---|
Streams |
[streams.incl: fix (P1)](https://github.com/john-cd/rust_howto/issues/219)
Futures are about a single value that will eventually be produced, but many event sources naturally produce a futures::stream::Stream
of values over time.
use futures::Stream; use futures::stream; use futures::stream::StreamExt; async fn count_to_five() -> impl Stream<Item = u32> { stream::iter(1..=5) } #[tokio::main] async fn main() { let mut stream = count_to_five().await; // `for` loops are not usable with Streams, but for imperative-style // code, `while let` and the `next`/`try_next` functions can be used: while let Some(num) = stream.next().await { println!("{}", num); } }
There are combinator-style methods such as futures::prelude::stream::StreamExt::map
⮳, futures::prelude::stream::StreamExt::filter
⮳, and futures::prelude::stream::StreamExt::fold
⮳, and their early-exit-on-error cousins futures::prelude::stream::TryStreamExt::try_filter
⮳, and futures::prelude::stream::TryStreamExt::try_fold
⮳.
To process multiple items from a stream concurrently, use the futures::prelude::stream::StreamExt::for_each_concurrent
⮳ and futures::prelude::stream::TryStreamExt::try_for_each_concurrent
⮳ methods:
use std::fs;
use futures::StreamExt;
use tokio::fs::File;
use tokio::io;
type Result = std::result::Result<(), anyhow::Error>;
async fn download_file(url: &str, filename: &str) -> Result {
let response = reqwest::get(url).await?;
let content = response.bytes().await?;
let mut file = File::create(filename).await?;
io::copy(&mut content.as_ref(), &mut file).await?;
Ok(())
}
#[tokio::main]
async fn main() -> Result {
let urls = ["https://www.gutenberg.org/cache/epub/43/pg43.txt"]; // add more here...
let filenames = ["temp/file1.txt"]; // add more here...
if !fs::exists("temp")? {
fs::create_dir("temp")?;
}
let futures = urls
.iter()
.zip(filenames.iter())
.map(|(url, filename)| download_file(url, filename));
let fut = futures::stream::iter(futures).for_each_concurrent(
4,
|fut| async move {
match fut.await {
Err(e) => {
println!("Error: {}", e);
match e.source() {
Some(source) => {
println!(" Caused by: {}", source);
}
_ => {}
}
}
_ => {}
}
},
);
fut.await;
println!("Downloaded files successfully!");
Ok(())
}
See also
[P1 add more. streams2.rs is noplayground because it requires a network. rewrite](https://github.com/john-cd/rust_howto/issues/645)