Streams
Recipe | Crates | Categories |
---|---|---|
Streams |
TODO
Futures are about a single value that will eventually be produced, but many event sources naturally produce a futures::stream::Stream
of values over time.
use futures::stream; use futures::stream::StreamExt; use futures::Stream; async fn count_to_five() -> impl Stream<Item = u32> { stream::iter(1..=5) } #[tokio::main] async fn main() { let mut stream = count_to_five().await; // `for` loops are not usable with Streams, but for imperative-style // code, `while let` and the `next`/`try_next` functions can be used: while let Some(num) = stream.next().await { println!("{}", num); } }
There are combinator-style methods such as futures::prelude::stream::StreamExt::map
⮳, futures::prelude::stream::StreamExt::filter
⮳, and futures::prelude::stream::StreamExt::fold
⮳, and their early-exit-on-error cousins futures::prelude::stream::TryStreamExt::try_filter
⮳, and futures::prelude::stream::TryStreamExt::try_fold
⮳.
To process multiple items from a stream concurrently, use the futures::prelude::stream::StreamExt::for_each_concurrent
⮳ and futures::prelude::stream::TryStreamExt::try_for_each_concurrent
⮳ methods:
use futures::StreamExt;
use tokio::fs::File;
use tokio::io;
type Result = std::result::Result<(), anyhow::Error>;
async fn download_file(url: &str, filename: &str) -> Result {
let response = reqwest::get(url).await?;
let content = response.bytes().await?;
let mut file = File::create(filename).await?;
io::copy(&mut content.as_ref(), &mut file).await?;
Ok(())
}
#[tokio::main]
async fn main() -> Result {
let urls = ["https://www.gutenberg.org/cache/epub/43/pg43.txt"]; // add more here...
let filenames = ["temp/file1.txt"]; // add more here...
let futures = urls
.iter()
.zip(filenames.iter())
.map(|(url, filename)| download_file(url, filename));
let fut = futures::stream::iter(futures).for_each_concurrent(
4,
|fut| async move {
if let Err(e) = fut.await {
println!("Error: {}", e);
if let Some(source) = e.source() {
println!(" Caused by: {}", source);
}
}
},
);
fut.await;
println!("Downloaded files successfully!");
Ok(())
}
See also
TODO: add more?