How to use async streams

Use the Stream trait to produce and consume a sequence of values asynchronously in Rust, without blocking the executor.

When one value isn't enough

You are building a log watcher that reads lines from a file as they appear, or a WebSocket client that receives messages indefinitely. You write an async function, but you hit a wall: an async function returns one value, and you need a sequence of values that arrive over time. You can't return multiple times. You need a stream.

In Python, you'd use an async generator with yield. In JavaScript, you'd use an async iterator. Rust has Stream. It gives you the same capability: produce values lazily over time, interleaved with async work, without blocking the executor.

Reach for Stream when you need to model a sequence of events, a chunked download, or a real-time feed. A single Future resolves once. A Stream resolves many times.

Streams are async iterators

An iterator produces values synchronously. You call .next() and get the next item immediately. A stream produces values asynchronously. You call .next().await and the stream might pause while it waits for I/O, a timer, or a network packet before handing you the next item.

Think of an iterator as a deck of cards sitting on the table. You flip them one by one instantly. A stream is a card dealer who fetches each card from a vault. You ask for a card, the dealer runs to the vault, brings it back, and hands it to you. You can't get the next card until the dealer returns.

The Stream trait defines this behavior. It is defined in the futures-core crate and re-exported by futures as futures::Stream and by the Tokio project's tokio-stream crate as tokio_stream::Stream. The trait looks intimidating because it involves Pin and Poll, but you rarely implement it by hand. You use macros or combinators instead.
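To make the Pin and Poll machinery less abstract, here is a minimal sketch of a hand-written stream. So that it compiles with only the standard library, it declares a local Stream trait that mirrors the shape of futures_core::Stream. The Counter type, the NoopWaker, and the drain helper are hypothetical names introduced for illustration, not part of any crate.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Local stand-in with the same shape as `futures_core::Stream` (assumption:
/// declared here only so the sketch needs no external crates).
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Hypothetical stream that counts from `next` up to, but not including, `limit`.
struct Counter {
    next: u32,
    limit: u32,
}

impl Stream for Counter {
    type Item = u32;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if self.next < self.limit {
            let value = self.next;
            self.next += 1;
            // Ready(Some(..)) hands one item to the consumer.
            Poll::Ready(Some(value))
        } else {
            // Ready(None) signals that the stream is finished.
            Poll::Ready(None)
        }
    }
}

/// A waker that does nothing. Counter never returns Pending, so it is never used.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls the counter until it stops producing items and collects them.
fn drain(mut counter: Counter) -> Vec<u32> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut pinned = Pin::new(&mut counter);
    let mut items = Vec::new();
    while let Poll::Ready(Some(value)) = pinned.as_mut().poll_next(&mut cx) {
        items.push(value);
    }
    items
}

fn main() {
    // prints [0, 1, 2]
    println!("{:?}", drain(Counter { next: 0, limit: 3 }));
}
```

A real stream that waits on I/O would return Poll::Pending and arrange for the waker to be called later. That bookkeeping is what the macro and the combinators spare you from writing.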

A Future is a promise of one result. A Stream is a pipeline of results.

Creating a stream with async-stream

The standard library does not include a stream! macro. The community standard is the async-stream crate. It provides a macro that lets you write stream logic that looks like a normal async function, using yield to emit values.

Add async-stream and futures to your dependencies, along with tokio as the runtime for the examples below. The futures crate provides the Stream trait and the StreamExt extension trait, which adds methods like .next(), .filter(), and .map() to streams.

use async_stream::stream;
use futures::{pin_mut, Stream, StreamExt};
use std::time::Duration;

/// Yields numbers from 0 to 4 with a delay between each.
/// This is a plain fn, not an async fn: building the stream needs no await.
fn count_with_delay() -> impl Stream<Item = u32> {
    stream! {
        for i in 0..5 {
            // yield pauses the stream and hands the value to the consumer.
            // The stream resumes here when the consumer calls .next().await again.
            yield i;

            // Async work inside the stream is allowed.
            // This simulates waiting for external data.
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
    }
}

#[tokio::main]
async fn main() {
    let stream = count_with_delay();
    // Streams built with stream! are not Unpin, so pin them before calling .next().
    pin_mut!(stream);

    // Consume the stream by awaiting .next() repeatedly.
    // StreamExt::next returns Option<Item>; None means the stream is finished.
    while let Some(value) = stream.next().await {
        println!("Got: {}", value);
    }
}

The stream! macro handles the plumbing. It rewrites each yield into an await on an internal sender and wraps your code in an async block, which the compiler turns into a state machine. The resulting value implements the Stream trait. You write imperative logic; the macro and the compiler do the rest.

Let the macro manage the state. You focus on the logic.

How the state machine works

When you call count_with_delay(), you don't get numbers immediately. You get a Stream object. This object holds the code and the current position. It knows it hasn't started yet.

When you call .next().await, the stream runs until it hits yield. It hands you the value 0 and pauses. The stream object now remembers it is paused right after the first yield.

The next .next().await resumes execution. It runs the sleep, then hits the next yield, hands you 1, and pauses again. This continues until the loop finishes. When the stream reaches the end of the stream! block, .next().await returns None.

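You never write this state machine yourself, and the macro does not emit a hand-rolled enum either. It turns the block into an async block, and the compiler compiles that block into a state machine the same way it does for any async fn. Conceptually, every yield and every .await is a suspension point where the state machine saves its position and local variables so it can resume later.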

Async streams turn complex I/O loops into clean, reusable pipelines.

Real-world pattern: filtering async data

Streams shine when you need to filter, transform, or combine async data sources. You can chain combinators just like iterators, but every step can await.

Imagine a function that reads lines from a file and yields only those containing "ERROR". You want to process errors as they appear, not wait for the whole file to be read.

use async_stream::stream;
use futures::{pin_mut, Stream, StreamExt};
use tokio::fs;
use tokio::io::AsyncBufReadExt;

/// Yields lines from a file that contain "ERROR".
/// The stream emits values as soon as they are found.
async fn error_lines(path: &str) -> impl Stream<Item = String> {
    // Open the file asynchronously.
    // Unwrap is used here for brevity; production code should handle errors.
    let file = fs::File::open(path).await.unwrap();
    let reader = tokio::io::BufReader::new(file);

    // Lines is not itself a Stream. Its next_line() method returns
    // io::Result<Option<String>>, with Ok(None) at end of file.
    let mut lines = reader.lines();

    stream! {
        while let Some(line) = lines.next_line().await.unwrap() {
            // Filter logic inside the stream.
            // Only yield lines that match the condition.
            if line.contains("ERROR") {
                yield line;
            }
        }
    }
}

#[tokio::main]
async fn main() {
    // error_lines is async because it opens the file, so await it to get the stream.
    let errors = error_lines("app.log").await;
    pin_mut!(errors);

    // Process errors as they arrive.
    // Each .next().await drives the file read forward; the stream does nothing between calls.
    while let Some(error_line) = errors.next().await {
        println!("Alert: {}", error_line);
    }
}

The stream! macro lets you mix async I/O with stream logic naturally. You can use if, while, match, and await inside the block. Streams are pull-based, so backpressure comes for free: the block only advances when the consumer asks for the next item.

Convention aside: The community prefers async-stream for user code because it reads like normal Rust. Library authors sometimes implement Stream manually to avoid the macro overhead, but that requires writing a state machine by hand and handling Pin manually. For 99% of use cases, async-stream is the right choice.

Async streams let you write reactive code without callback hell.

Pitfalls and compiler errors

Streams have rules. The compiler enforces them strictly. The most common issues involve borrowing and moving values.

Yielding references fails

You cannot yield a reference to a variable that lives inside the stream. Locals declared in the stream! block are stored in the stream object itself, so a reference to one would borrow from the stream. The Stream trait fixes a single Item type with no lifetime tied to the stream, so it has no way to express that borrow.

use async_stream::stream;
use futures::Stream;

fn bad_stream() -> impl Stream<Item = &'static str> {
    stream! {
        let msg = String::from("hello");
        // This fails. msg lives inside the stream's own state.
        // A borrow of it cannot satisfy the 'static Item type.
        yield msg.as_str();
    }
}

The compiler rejects this with a lifetime error, typically E0597 (borrowed value does not live long enough): msg is dropped when the stream finishes, but the Item type promises a 'static reference.

Fix this by yielding owned data. Clone the string or move it into the yield.

use async_stream::stream;
use futures::Stream;

fn good_stream() -> impl Stream<Item = String> {
    stream! {
        let msg = String::from("hello");
        // yield takes ownership of msg.
        // The consumer now owns the String.
        yield msg;
    }
}

Yield owned values. The borrow checker will enforce it, and your code will be safer for the effort.

Yielding moves values

yield moves the value. Once a non-Copy value has been yielded, you cannot use it again.

use async_stream::stream;
use futures::Stream;

fn double_yield() -> impl Stream<Item = String> {
    stream! {
        let x = String::from("hello");
        yield x;
        // This fails. x was moved into the first yield.
        // E0382: use of moved value `x`.
        yield x;
    }
}

The compiler rejects this with E0382 (use of moved value). Copy types such as u32 are unaffected, because yielding them copies the value. If you need to yield the same non-Copy data more than once, clone it, or wrap it in a reference-counted pointer like Rc<T> or Arc<T> so that each yield hands out a cheap clone of the pointer.

Trait bounds and return types

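When you return impl Stream<Item = T>, the signature only promises Stream, but auto traits such as Send still leak through from the concrete type. Whether the stream is Send depends on what it contains: if the block yields a value that is not Send, or holds one across a yield or .await, the stream is not Send either. This matters when you want to move the stream to another thread or hand it to a spawner that requires Send, such as tokio::spawn.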

use async_stream::stream;
use futures::Stream;
use std::rc::Rc;

// This stream is not Send because Rc is not Send.
fn non_send_stream() -> impl Stream<Item = Rc<u32>> {
    stream! {
        yield Rc::new(42);
    }
}

// If you try to send this to another thread, you get E0277.
// The error says the trait bound Send is not satisfied.

The compiler rejects cross-thread usage with E0277 (trait bound not satisfied). Use Arc<T> instead of Rc<T> if you need thread safety.

Convention aside: Use Rc::clone(&data) instead of data.clone() when working with reference-counted types in streams. The explicit form signals to readers that you are cloning the pointer, not the underlying data. It prevents confusion when scanning the code.

Choosing the right stream tool

Rust offers several ways to work with streams. Pick the tool that matches your complexity.

Use async-stream when you need to write imperative logic with yield, if, and loops to produce a sequence of async values. This is the default choice for most applications.

Use futures::stream combinators when you already have a stream and need to map, filter, or chain it without writing a custom generator. Combinators like .filter_map() and .take() compose cleanly.

Use a manual Stream implementation when you are building a low-level library and need to avoid the overhead of the async-stream macro. This requires writing a state machine enum and handling Pin manually.

Use tokio_stream when your stream is tightly coupled to Tokio types like mpsc::Receiver or watch::Receiver. The crate provides adapters that turn Tokio channels into streams seamlessly.

Don't write a state machine by hand if a macro or a combinator does it for you.

Where to go next