When data arrives in waves
You are building a dashboard that tracks server metrics. Data arrives from three different APIs at unpredictable times. You cannot block the main thread waiting for the next CPU usage percentage, or the UI freezes. You need a way to handle items as they show up, one by one, without holding up the rest of the program. That is where async streams come in.
A Stream is an asynchronous iterator. An iterator gives you items one by one. A stream does the same thing, but each item might require waiting for I/O, a network packet, or a timer. Think of an iterator as a conveyor belt in a factory where items are already there. You just grab the next one. A stream is like a delivery truck arriving at random intervals. You stand at the dock and wait for the truck. When it arrives, you unload a box. Then you wait for the next truck.
The Stream trait formalizes this waiting pattern. It defines a poll_next method that the runtime calls to check if a new item is ready. If not, the runtime puts your task to sleep and wakes it up later. Your code describes what to do with items. The runtime handles the waiting.
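To make the polling contract concrete, here is a minimal sketch that uses only the standard library. MiniStream is a hypothetical stand-in that copies the shape of the real trait's poll_next method. Countdown, NoopWaker, and drain are likewise illustrative names, not part of any crate. The drain function plays the part of the runtime for a stream that is always ready.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in with the same shape as the real `Stream` trait.
trait MiniStream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Yields `remaining`, `remaining - 1`, ..., 1 and then ends.
struct Countdown {
    remaining: u32,
}

impl MiniStream for Countdown {
    type Item = u32;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if self.remaining == 0 {
            // Ready(None) is the end-of-stream signal.
            return Poll::Ready(None);
        }
        let value = self.remaining;
        self.remaining -= 1;
        // This stream never waits, so every item is immediately ready.
        Poll::Ready(Some(value))
    }
}

/// A waker that does nothing; enough for a stream that never returns Pending.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Plays the role of the runtime: polls until the stream stops yielding items.
fn drain(mut stream: Countdown) -> Vec<u32> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut items = Vec::new();
    while let Poll::Ready(Some(item)) = Pin::new(&mut stream).poll_next(&mut cx) {
        items.push(item);
    }
    items
}
```

Calling drain(Countdown { remaining: 3 }) collects 3, 2, 1. A real runtime would also handle Poll::Pending by putting the task to sleep until its waker fires. This sketch can skip that because Countdown never waits.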
Minimal example
The Stream trait itself is low-level. You almost always work with streams through the StreamExt extension trait, which adds ergonomic methods like next, filter, and map.
use trpl::StreamExt;

/// Consumes a stream of integers and prints each value.
fn main() {
    // trpl::run drives the async block to completion.
    // (Some newer trpl releases name this function block_on.)
    trpl::run(async {
        // Create a stream from a vector.
        // This simulates data arriving over time.
        // The binding is mutable because next takes &mut self.
        let mut stream = trpl::stream_from_iter(vec![1, 2, 3]);

        // Loop until the stream ends.
        // next resolves to Option<T>. Some keeps the loop going.
        // None breaks the loop when the stream is exhausted.
        while let Some(value) = stream.next().await {
            println!("Received: {}", value);
        }
    });
}
How the runtime drives the loop
When you call stream.next().await, your task polls the stream through the future that next returns. If an item is ready, the future resolves to Some(item). If not, the .await point yields control back to the runtime, and the runtime executes other tasks. When the stream has an item, it signals the task's waker, and the runtime resumes your task where it left off.
The while let pattern handles the Option returned by next. Some(value) keeps the loop going. None breaks the loop, signaling the stream is exhausted. This pattern prevents blocking. Your code does not spin-wait. It sleeps until work arrives. The runtime manages the scheduling. You just write the logic for processing items.
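The following sketch shows one way this loop can work under the hood, using only the standard library. All names are hypothetical: MiniStream mirrors the shape of the real Stream trait, Next stands in for the future that StreamExt's next returns, Slow is a stream that answers Pending once before every item, and block_on is a toy single-future executor, not how a production runtime is built.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Hypothetical stand-in with the same shape as the real `Stream` trait.
trait MiniStream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// The future behind `next`: polling it polls the stream once.
struct Next<'a, S>(&'a mut S);

impl<S: MiniStream + Unpin> Future for Next<'_, S> {
    type Output = Option<S::Item>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        Pin::new(&mut *self.0).poll_next(cx)
    }
}

/// Yields 1..=limit, but answers Pending once before every item.
struct Slow {
    sent: u32,
    limit: u32,
    ready: bool,
}

impl MiniStream for Slow {
    type Item = u32;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if self.sent == self.limit {
            return Poll::Ready(None);
        }
        if !self.ready {
            // Not ready yet: schedule a wake-up, then hand control back.
            self.ready = true;
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        self.ready = false;
        self.sent += 1;
        Poll::Ready(Some(self.sent))
    }
}

/// Waker that unparks the executor thread.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Toy executor: poll the future, and park the thread while it is Pending.
fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = Box::pin(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return output,
            Poll::Pending => thread::park(), // sleep until woken
        }
    }
}

/// The familiar `while let` loop, driven by the toy executor.
fn collect_slow(limit: u32) -> Vec<u32> {
    block_on(async move {
        let mut stream = Slow { sent: 0, limit, ready: false };
        let mut seen = Vec::new();
        while let Some(value) = Next(&mut stream).await {
            seen.push(value);
        }
        seen
    })
}
```

Every Pending result sends the executor thread to sleep instead of spinning, and the waker is what brings it back. The loop body itself never sees Pending. It only ever observes Some or None.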
Treat the stream as a lazy pipeline. Combinators do not do work until you pull.
Realistic example: lazy pipelines
Streams shine when you chain operations. Combinators like filter and map work on streams just like they do on iterators. They are lazy. Calling filter does not process anything. It returns a new stream that wraps the original. The actual filtering happens inside the next call. This keeps memory usage low. You process one item at a time.
use std::time::Duration;
use trpl::StreamExt;

/// Demonstrates that stream combinators are lazy.
/// The filter closure only runs when next is called.
async fn lazy_pipeline() {
    println!("Creating stream...");

    // Build a pipeline.
    // Nothing executes yet. This just constructs the stream object.
    let mut stream = trpl::stream_from_iter(vec![1, 2, 3, 4, 5])
        .filter(|x| {
            println!("Filtering {}", x);
            *x % 2 == 0
        })
        .map(|x| x * 10);

    println!("Stream created. Waiting...");

    // A small delay to prove no filtering happened yet.
    trpl::sleep(Duration::from_millis(10)).await;

    println!("Consuming first item...");

    // Pulling the first item triggers the pipeline.
    // The filter runs on 1 (rejected), then 2 (accepted).
    if let Some(val) = stream.next().await {
        println!("Got {}", val);
    }
}
Call this function from an async context. You will see "Stream created. Waiting..." print before any "Filtering" line. The "Filtering" messages only appear after "Consuming first item...", and only for 1 and 2, because the example pulls a single item. The stream pulls items on demand. This is crucial for performance. If you have a stream of a million log lines but only need the first error, you stop calling next once you find it, and the remaining lines are never processed.
Combining streams
Real applications often have multiple sources. Stream libraries provide combinators to combine them. merge interleaves items from two streams of the same item type. Items arrive as soon as they are ready from either source. zip, found in the futures crate's StreamExt, pairs items up, waiting for both streams to produce an item before yielding the pair.
use trpl::StreamExt;

/// Merges two streams into one.
/// Items arrive as soon as they are ready from either source.
async fn merge_streams() {
    let s1 = trpl::stream_from_iter(vec![1, 2]);
    let s2 = trpl::stream_from_iter(vec![10, 20]);

    // merge creates a new stream that yields from s1 or s2.
    let mut merged = s1.merge(s2);

    while let Some(item) = merged.next().await {
        println!("Merged: {}", item);
    }
}
This pattern replaces complex state machines. Instead of tracking which source is active, you let the stream combinators handle the interleaving. Each time the merged stream is polled, it polls its sources and yields whichever item is ready.
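To show what "yields whichever is ready" can mean in practice, here is a simplified, std-only sketch of a merge combinator. It is an illustration under stated assumptions, not the implementation used by any library: MiniStream, Scripted, Merge, poll_side, NoopWaker, and merged_order are all hypothetical names, and the alternate-who-goes-first fairness rule is one possible design choice.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in with the same shape as the real `Stream` trait.
trait MiniStream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Test source: `Some(n)` steps are items, `None` steps mean "not ready yet".
struct Scripted(std::vec::IntoIter<Option<u32>>);

impl MiniStream for Scripted {
    type Item = u32;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<u32>> {
        match self.0.next() {
            Some(Some(item)) => Poll::Ready(Some(item)),
            Some(None) => {
                cx.waker().wake_by_ref(); // ask to be polled again
                Poll::Pending
            }
            None => Poll::Ready(None),
        }
    }
}

/// Simplified merge: a side becomes `None` once it is exhausted.
struct Merge<A, B> {
    a: Option<A>,
    b: Option<B>,
    a_first: bool,
}

/// Polls one side and returns its item if one is ready right now.
fn poll_side<S: MiniStream + Unpin>(side: &mut Option<S>, cx: &mut Context<'_>) -> Option<S::Item> {
    let polled = Pin::new(side.as_mut()?).poll_next(cx);
    match polled {
        Poll::Ready(Some(item)) => Some(item),
        Poll::Ready(None) => {
            *side = None; // exhausted: never poll this side again
            None
        }
        Poll::Pending => None,
    }
}

impl<A, B> MiniStream for Merge<A, B>
where
    A: MiniStream + Unpin,
    B: MiniStream<Item = A::Item> + Unpin,
{
    type Item = A::Item;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<A::Item>> {
        let this = &mut *self;
        // Alternate which side is asked first so neither source starves.
        let a_first = this.a_first;
        this.a_first = !a_first;
        for ask_a in [a_first, !a_first] {
            let polled = if ask_a { poll_side(&mut this.a, cx) } else { poll_side(&mut this.b, cx) };
            if let Some(item) = polled {
                return Poll::Ready(Some(item));
            }
        }
        // Done only when both sides are exhausted; otherwise something is still waiting.
        if this.a.is_none() && this.b.is_none() { Poll::Ready(None) } else { Poll::Pending }
    }
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Drives the merged stream to completion and records the arrival order.
fn merged_order(a: Vec<Option<u32>>, b: Vec<Option<u32>>) -> Vec<u32> {
    let mut merged = Merge {
        a: Some(Scripted(a.into_iter())),
        b: Some(Scripted(b.into_iter())),
        a_first: true,
    };
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut order = Vec::new();
    loop {
        match Pin::new(&mut merged).poll_next(&mut cx) {
            Poll::Ready(Some(item)) => order.push(item),
            Poll::Ready(None) => return order,
            Poll::Pending => continue, // a real runtime would sleep here
        }
    }
}
```

With two always-ready sources, merged_order(vec![Some(1), Some(2)], vec![Some(10), Some(20)]) alternates and returns [1, 10, 2, 20]. If the first source starts with a "not ready" step, as in merged_order(vec![None, Some(1)], vec![Some(10)]), the second source's item overtakes it and the result is [10, 1].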
Backpressure and channels
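Streams are pull-based, so a slow consumer simply pulls less often. Pair a stream with a bounded channel and that slowness propagates upstream as backpressure: when the buffer is full, the producer waits. This prevents memory exhaustion. A channel provides a Sender and a Receiver. The channel that trpl re-exports is unbounded, so the example below uses tokio's bounded channel directly and wraps the receiver in tokio_stream's ReceiverStream to consume it as a Stream.
use std::time::Duration;
use tokio_stream::{wrappers::ReceiverStream, StreamExt};

/// Shows backpressure via a bounded channel.
/// The sender waits, without blocking a thread, if the receiver is slow.
async fn channel_backpressure() {
    // Create a bounded channel.
    // The buffer size limits how many items can be in flight.
    let (tx, rx) = tokio::sync::mpsc::channel(2);

    // Spawn a producer task.
    tokio::spawn(async move {
        for i in 0..5 {
            // send awaits if the buffer is full.
            // This yields control, allowing the receiver to catch up.
            tx.send(i).await.unwrap();
            println!("Sent {}", i);
        }
        // tx is dropped here, which ends the stream.
    });

    // Wrap the receiver so it can be consumed as a stream.
    let mut rx = ReceiverStream::new(rx);

    // Consumer processes slowly.
    while let Some(item) = rx.next().await {
        // Simulate slow processing.
        tokio::time::sleep(Duration::from_millis(50)).await;
        println!("Processed {}", item);
    }
}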
The tx.send call awaits when the buffer is full. The producer yields control. The runtime switches to the consumer. The consumer processes an item, freeing a slot. The producer wakes up and sends the next item. This flow control happens automatically. You do not need manual locking or condition variables.
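The same flow-control idea can be observed without an async runtime. The sketch below is a thread-based analogue, not async code: it uses the standard library's bounded sync_channel, where send blocks the producer thread instead of yielding a task. The function name max_producer_lead and the 5 ms delay are illustrative assumptions. It measures how far the producer ever gets ahead of the consumer.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::sync_channel;
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Runs a fast producer against a slow consumer over a bounded channel and
/// returns the largest lead (items sent minus items processed) the producer saw.
fn max_producer_lead(capacity: usize, items: usize) -> usize {
    let (tx, rx) = sync_channel::<usize>(capacity);
    let processed = Arc::new(AtomicUsize::new(0));
    let processed_by_consumer = Arc::clone(&processed);

    let producer = thread::spawn(move || {
        let mut max_lead = 0;
        for i in 0..items {
            // Blocks while the buffer already holds `capacity` items.
            tx.send(i).unwrap();
            let lead = (i + 1) - processed.load(Ordering::SeqCst);
            max_lead = max_lead.max(lead);
        }
        // tx is dropped when this closure returns, which ends the consumer's loop.
        max_lead
    });

    for _item in rx {
        // Simulate slow processing, then record the item as done.
        thread::sleep(Duration::from_millis(5));
        processed_by_consumer.fetch_add(1, Ordering::SeqCst);
    }

    producer.join().unwrap()
}
```

With a capacity of 2, the producer can never be more than 3 items ahead: two waiting in the buffer plus the one the consumer is currently working on. An unbounded channel would let the lead grow to the full item count.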
Pitfalls and compiler errors
If you forget to import StreamExt, the compiler rejects your code with E0599 (no method named next found). The Stream trait itself does not have next. StreamExt adds it as an extension trait. Always import StreamExt when working with streams.
If you try to call next on an immutable stream, you get E0596 (cannot borrow as mutable). Consuming a stream changes its state. You must declare the stream variable as mut.
A subtle trap is blocking the runtime. If your stream logic calls a synchronous function that takes a long time, you hold up the entire executor. Async streams must yield control frequently. Use tokio::task::spawn_blocking for heavy CPU work, or ensure your I/O is non-blocking. If you block, you break the async contract. The runtime cannot make progress on other tasks while your task is stuck in a blocking call.
Don't block the executor. If you cannot yield, you are breaking the async contract.
Custom streams
If you need to wrap a C library callback or a custom event loop, you implement the Stream trait yourself. The core method is poll_next. The runtime calls it with a Context. You return Poll::Ready(Some(item)) if an item is ready, Poll::Ready(None) if the stream has ended, or Poll::Pending if you need to wait.
When returning Pending, you must first store the Waker from the Context somewhere your event source can reach it, and call wake on it when data arrives. Otherwise the runtime never polls you again. This is the low-level glue. Most of the time, you do not write this. You use combinators or existing crates. But understanding poll_next explains why streams are non-blocking. The runtime drives the loop, not your code. You just report readiness.
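As a rough illustration of the waker handshake, the following std-only sketch wraps a callback-style event source. Everything here is hypothetical: MiniStream mirrors the shape of the real trait, and Shared, EventSource, EventStream, CountingWaker, and poll_by_hand are names invented for the example. A real implementation would write impl Stream for EventStream with the same method body.

```rust
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in with the same shape as the real `Stream` trait.
trait MiniStream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// State shared between the event source (for example a callback) and the stream.
#[derive(Default)]
struct Shared {
    queue: VecDeque<u32>,
    closed: bool,
    waker: Option<Waker>,
}

/// Producer half: what a callback would hold on to.
struct EventSource(Arc<Mutex<Shared>>);

impl EventSource {
    /// Applies a change, then wakes the waiting task after releasing the lock.
    fn notify(&self, change: impl FnOnce(&mut Shared)) {
        let mut shared = self.0.lock().unwrap();
        change(&mut *shared);
        let waker = shared.waker.take();
        drop(shared);
        if let Some(waker) = waker {
            waker.wake();
        }
    }

    fn push(&self, item: u32) {
        self.notify(|shared| shared.queue.push_back(item));
    }

    fn close(&self) {
        self.notify(|shared| shared.closed = true);
    }
}

/// Consumer half: reports readiness and never blocks.
struct EventStream(Arc<Mutex<Shared>>);

impl MiniStream for EventStream {
    type Item = u32;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<u32>> {
        let mut shared = self.0.lock().unwrap();
        if let Some(item) = shared.queue.pop_front() {
            return Poll::Ready(Some(item));
        }
        if shared.closed {
            return Poll::Ready(None);
        }
        // Store the waker before returning Pending, while the lock is held,
        // so a concurrent push cannot be missed.
        shared.waker = Some(cx.waker().clone());
        Poll::Pending
    }
}

/// Test waker that counts how many times it was woken.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Polls by hand and returns each poll result plus the number of wake-ups.
fn poll_by_hand() -> (Vec<Poll<Option<u32>>>, usize) {
    let shared = Arc::new(Mutex::new(Shared::default()));
    let source = EventSource(Arc::clone(&shared));
    let mut stream = EventStream(shared);
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(Arc::clone(&counter));
    let mut cx = Context::from_waker(&waker);

    let mut polls = vec![Pin::new(&mut stream).poll_next(&mut cx)]; // no data yet
    source.push(7); // the "callback" fires and wakes the task
    polls.push(Pin::new(&mut stream).poll_next(&mut cx));
    source.close();
    polls.push(Pin::new(&mut stream).poll_next(&mut cx));
    (polls, counter.0.load(Ordering::SeqCst))
}
```

The three polls come back as Pending, Ready(Some(7)), and Ready(None), with exactly one wake-up recorded: the push that followed the Pending result. The close call does not wake anyone because no waker was registered at that point.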
When to use streams
Use Stream when you need to process a sequence of items that arrive asynchronously over time, including when you are wrapping an existing async API that produces events, like a WebSocket or a file watcher. Use Iterator when all data is available in memory and processing is purely computational. Use a channel when you need to send items from multiple producers to a single consumer across tasks. Use futures::stream::iter (or trpl::stream_from_iter, as in the examples above) when you have a collection and want to turn it into a stream for compatibility with async combinators.
Conventions
The community convention is to import StreamExt explicitly rather than rely on glob imports. An explicit import makes it clear where methods like next and filter come from. When consuming a stream, prefer while let Some(item) = stream.next().await over stream.for_each(|item| async { ... }). The for_each combinator (from the futures crate's StreamExt) takes a closure that returns a future, so you cannot break out of the loop early or propagate an error with ? from inside it. The while let loop is explicit, easier to debug, and gives you full control over the loop body.
Use while let. It is the standard pattern for a reason.