When one receiver isn't enough
You are building a real-time dashboard. A sensor streams temperature readings every hundred milliseconds. Three widgets need to display the data: a gauge, a graph, and a log. You cannot send the reading to one widget and hope it forwards to the others. That creates a fragile chain where a single failure breaks the whole system. You need a mechanism where one sender pushes data, and multiple receivers each get their own copy independently.
That is the job of a broadcast channel.
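Rust's standard library and tokio both provide mpsc channels. The name stands for multi-producer, single-consumer: any number of senders can feed the channel, but each message goes to exactly one receiver. If you need one-to-many delivery, mpsc falls short. tokio::sync::broadcast fills that gap. It allows multiple receivers to subscribe to the same channel. Every message is delivered to every receiver that was subscribed when it was sent. If a receiver is slow, it falls behind, and if it falls further behind than the buffer can hold, it loses the oldest messages. A receiver that subscribes late only sees messages sent after it subscribed. The sender never blocks.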
Think of a radio station. The station transmits a signal. Every radio tuned to the frequency receives the same audio. If a radio is turned off, it misses the transmission. If a radio processes audio slowly, it falls behind the live stream. The station does not pause for slow radios. Broadcast channels work the same way. They prioritize throughput and independence over synchronization.
How broadcast channels work
A broadcast channel consists of a Sender and multiple Receiver instances. The channel holds a buffer with a fixed capacity. When the sender calls send, the message is placed in the buffer. The channel then notifies all receivers. Each receiver pulls the message from the buffer and clones the payload.
The payload type must implement Clone. The compiler enforces this requirement. If you try to send a type that does not implement Clone, you get E0277 (trait bound not satisfied). This is by design. Every receiver needs its own copy of the data. The channel cannot move the value to multiple places.
The buffer operates as a ring buffer. It has a head index pointing to the next write position and a tail index for each receiver. When the sender writes, the head advances. When a receiver reads, its tail advances. The distance between head and tail represents the lag. If the lag exceeds the buffer capacity, the sender has already overwritten values the receiver had not read. The channel cannot recover them. The receiver's next recv returns a Lagged error carrying the number of skipped messages, and its tail jumps forward to the oldest value still in the buffer.
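The head and tail bookkeeping can be sketched with a toy model in plain Rust. This is not tokio's implementation. Ring, Read, and lag_demo are hypothetical names, and the model is single-threaded and ignores notification entirely. It only illustrates how a lag larger than the capacity turns into a count of missed values.

```rust
/// Toy model of a broadcast ring buffer: one shared head, one tail per receiver.
struct Ring {
    slots: Vec<Option<u32>>,
    head: u64, // total number of values written so far
}

#[derive(Debug, PartialEq)]
enum Read {
    Value(u32),
    Lagged(u64),
    Empty,
}

impl Ring {
    fn new(capacity: usize) -> Self {
        Ring { slots: vec![None; capacity], head: 0 }
    }

    fn send(&mut self, value: u32) {
        let idx = (self.head % self.slots.len() as u64) as usize;
        // Once the ring is full, this overwrites the oldest value.
        self.slots[idx] = Some(value);
        self.head += 1;
    }

    /// `tail` is the receiver's own cursor: the position of the next value it expects.
    fn recv(&self, tail: &mut u64) -> Read {
        let cap = self.slots.len() as u64;
        if *tail == self.head {
            return Read::Empty;
        }
        let lag = self.head - *tail;
        if lag > cap {
            // Everything between `tail` and `oldest` has been overwritten.
            let oldest = self.head - cap;
            let missed = oldest - *tail;
            *tail = oldest;
            return Read::Lagged(missed);
        }
        let idx = (*tail % cap) as usize;
        *tail += 1;
        Read::Value(self.slots[idx].expect("slots inside the live window are filled"))
    }
}

/// Writes six values into a ring of four, then reads twice from position zero.
fn lag_demo() -> Vec<Read> {
    let mut ring = Ring::new(4);
    let mut tail = 0u64;
    for v in 0..6 {
        ring.send(v);
    }
    vec![ring.recv(&mut tail), ring.recv(&mut tail)]
}

fn main() {
    // The first read reports two missed values, the second returns the oldest survivor.
    println!("{:?}", lag_demo());
}
```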
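Convention aside: you may see advice to make channel capacities powers of two, because a ring buffer can then use bitwise masking instead of a modulo operation for index calculations. With tokio you do not need to do this by hand. Current versions of the implementation round the requested capacity up to the next power of two internally. Pick 16, 32, or 64 if you like round numbers, but size the buffer for the bursts your slowest receiver must absorb, not for the index math.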
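The index arithmetic behind this aside can be sketched in plain Rust. slot_index is a hypothetical helper written for illustration, not a tokio function. It shows that masking with capacity minus one gives the same slot as a modulo when the capacity is a power of two.

```rust
/// Maps an ever-growing position onto a slot index with a bit mask.
/// Only valid when `capacity` is a power of two.
fn slot_index(position: u64, capacity: usize) -> usize {
    debug_assert!(capacity.is_power_of_two());
    (position as usize) & (capacity - 1)
}

fn main() {
    let capacity = 16;
    for position in [0u64, 15, 16, 17, 1_000_003] {
        // The mask and the modulo agree for every position.
        let by_modulo = (position % capacity as u64) as usize;
        assert_eq!(slot_index(position, capacity), by_modulo);
    }
    // A capacity that is not a power of two can be rounded up before masking.
    println!("10 rounds up to {}", 10usize.next_power_of_two());
}
```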
Minimal example
Create a channel, spawn a sender, and spawn multiple receivers. Each receiver runs in its own task.
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    // Capacity 16: the channel retains the 16 most recent messages.
    let (tx, mut rx1) = broadcast::channel::<String>(16);

    // Create a second receiver. Each receiver is independent.
    // Convention: use tx.subscribe(). rx1.resubscribe() also works when the sender is out of reach.
    let mut rx2 = tx.subscribe();

    // Spawn a sender task
    tokio::spawn(async move {
        // send returns Result. Unwrap is acceptable in examples.
        // In production, handle the error: it means no receivers are currently subscribed.
        tx.send("Update A".to_string()).unwrap();
        tx.send("Update B".to_string()).unwrap();
    });

    // Receiver 1 task
    tokio::spawn(async move {
        while let Ok(msg) = rx1.recv().await {
            println!("Widget 1: {msg}");
        }
    });

    // Receiver 2 task
    tokio::spawn(async move {
        while let Ok(msg) = rx2.recv().await {
            println!("Widget 2: {msg}");
        }
    });

    // Keep main alive briefly to let tasks run
    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
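The channel function returns a Sender and an initial Receiver. The subscribe method creates additional receivers. Each receiver maintains its own position in the stream. Dropping a receiver does not affect others. The channel closes once every Sender is dropped: receivers drain what remains in the buffer and then see Closed. If every receiver is dropped instead, send returns an error, but the sender stays usable and new receivers can still subscribe.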
Anatomy of a message
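Messages flow through a shared ring buffer. The sender writes into the slot at the head position and advances the head. Each receiver reads the slot at its own tail, clones the value, and advances its tail. Receivers never touch each other's position, so one receiver's progress does not depend on another's. Do not assume the channel is lock-free, though. At the time of writing, tokio's implementation takes short-lived internal locks around the shared position and the slots. That is an implementation detail. The practical point is that send never waits for a slow receiver.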
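The send method returns Result<usize, SendError<T>>. On success, the usize is the number of receivers subscribed at the time of the send. It is not a guarantee that all of them will read the value, since a receiver can be dropped or lag before it gets there. The error occurs in exactly one case: there are no active receivers. A full buffer is not an error. The sender overwrites the oldest value, and any receiver that had not read it yet sees Lagged on its next recv. SendError hands the unsent value back to you. Because new receivers can still call tx.subscribe(), an error from send does not mean the channel is permanently closed. Check tx.receiver_count() if you want to know how many receivers are currently subscribed.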
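The recv method returns Result<T, RecvError>. The error enum has two variants. Lagged(n) indicates the receiver fell behind and n messages were skipped. Closed indicates every sender was dropped and the receiver has read everything left in the buffer. A Lagged error is not fatal. The receiver remains valid, and its position moves to the oldest message still in the buffer. You can continue receiving. The next call to recv normally returns that message, or another Lagged error if the sender has lapped the receiver again in the meantime.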
Convention aside: never unwrap the result of recv in production code. A Lagged error will panic your task. Handle the error explicitly. Decide whether to skip lagged messages, log a warning, or resubscribe.
Handling lag and errors
Lag is the defining characteristic of broadcast channels. Receivers can fall behind. When that happens, you must choose a recovery strategy.
If your application can tolerate missing data, skip the lagged messages. Continue the loop. The receiver will eventually catch up if the sender slows down or the receiver speeds up. If the lag persists, the receiver will keep receiving Lagged errors. You might want to cap the number of retries or resubscribe to get a fresh stream.
If your application cares more about freshness than about history, resubscribe. Replacing the lagged receiver with a new one from tx.subscribe(), or from rx.resubscribe() when the task has no access to the sender, puts the receiver at the current head. You lose the messages still queued for the old receiver, but you also reset the position. This is useful for dashboards where showing the latest state is better than showing stale data with gaps.
use tokio::sync::broadcast;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let (tx, mut rx) = broadcast::channel::<i32>(8);

    // Sender pushes data rapidly
    tokio::spawn(async move {
        for i in 0..20 {
            // send fails only when no receivers are subscribed. If they are all gone, stop sending.
            if tx.send(i).is_err() {
                break;
            }
            sleep(Duration::from_millis(10)).await;
        }
    });

    // Receiver simulates slow processing
    let receiver = tokio::spawn(async move {
        loop {
            match rx.recv().await {
                Ok(val) => {
                    println!("Received: {val}");
                    // Five times slower than the sender, so the buffer eventually wraps.
                    sleep(Duration::from_millis(50)).await;
                }
                Err(broadcast::error::RecvError::Lagged(n)) => {
                    println!("Missed {n} messages. Resubscribing to catch up.");
                    // resubscribe returns a fresh receiver at the current head.
                    // This discards the backlog and starts from the next message sent.
                    rx = rx.resubscribe();
                }
                Err(broadcast::error::RecvError::Closed) => {
                    println!("Channel closed.");
                    break;
                }
            }
        }
    });

    // Wait for the receiver to finish instead of sleeping for a fixed time.
    receiver.await.unwrap();
}
Resubscribing is a reset button. It does not recover lost data. It only recovers the ability to receive new data. Use it when the cost of lag is higher than the cost of skipping.
Realistic dashboard
Combine the concepts into a dashboard scenario. A sensor sends readings. Two widgets consume them. One widget updates frequently. The other updates slowly and handles lag by resubscribing.
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    // Capacity 32. Enough for short bursts.
    let (tx, mut rx_gauge) = broadcast::channel::<f64>(32);
    let mut rx_graph = tx.subscribe();

    // Sensor task
    let sensor = tokio::spawn(async move {
        let mut temp = 20.0;
        for _ in 0..100 {
            // Random walk. Requires the rand crate as a dependency.
            temp += rand::random::<f64>() - 0.5;
            // send returns Err only when no receivers are subscribed.
            // Dropping the message is acceptable for sensor data.
            let _ = tx.send(temp);
            sleep(Duration::from_millis(50)).await;
        }
    });

    // Gauge widget: fast, handles lag by skipping
    let gauge = tokio::spawn(async move {
        loop {
            match rx_gauge.recv().await {
                Ok(temp) => println!("Gauge: {temp:.1}°C"),
                // Keep going from the oldest reading still buffered.
                Err(RecvError::Lagged(n)) => println!("Gauge skipped {n} readings."),
                Err(RecvError::Closed) => break,
            }
        }
    });

    // Graph widget: slow, resubscribes on lag
    let graph = tokio::spawn(async move {
        loop {
            match rx_graph.recv().await {
                Ok(temp) => {
                    println!("Graph: {temp:.1}°C");
                    // Simulate slow rendering
                    sleep(Duration::from_millis(200)).await;
                }
                Err(RecvError::Lagged(n)) => {
                    println!("Graph lagged {n} messages. Resetting.");
                    // Jump to the current head. Only readings sent from now on are seen.
                    rx_graph = rx_graph.resubscribe();
                }
                Err(RecvError::Closed) => break,
            }
        }
    });

    // Wait for the sensor to finish and for both widgets to see the channel close.
    let _ = tokio::join!(sensor, gauge, graph);
}
The gauge keeps up and prints every reading. The graph falls behind because each render takes four times the sender interval, so its backlog grows by roughly three readings per render. After about two seconds the backlog exceeds the capacity of 32, recv returns Lagged, and the graph resubscribes to jump to the newest data. rx_graph.resubscribe() makes this possible without handing the Sender to the graph task. If the task does hold a clone of the Sender, tx.subscribe() does the same job, but the channel then only closes once every Sender clone is dropped.
Pitfalls
Broadcast channels have trade-offs. Understanding them prevents subtle bugs.
The send method does not block, and it does not fail when the buffer is full. It overwrites the oldest message, and any receiver that had not read that message gets a Lagged error on its next recv. The loss is silent on the sender side: send still returns Ok. The only error send reports is that no receivers are subscribed. If dropping messages is acceptable, use let _ = tx.send(msg) and handle Lagged on the receiving side. If dropping is not acceptable, you need backpressure. Broadcast channels do not provide backpressure. You must implement it yourself or choose a different channel.
Receivers are independent. broadcast::Receiver does not implement Clone. To get another receiver from an existing one, call rx.resubscribe(), which creates a new receiver at the current head. It does not inherit the backlog or the lag of the original receiver. An mpsc receiver cannot be duplicated at all, because its queue has a single consumer. In broadcast, every new receiver is a fresh subscription.
Large payloads hurt performance. The channel stores each message once and clones it for every receiver that reads it. If you have ten receivers and a large struct, that is up to ten deep copies per message, paid on the receiving side. Wrap the payload in Arc if it is expensive to clone. The channel then clones the pointer, which is cheap, and the data stays shared. Rc is not an option when receivers run in spawned tasks, because Rc is not Send.
Convention aside: prefer tx.subscribe() over rx.resubscribe() when the sender is in scope. Both produce a new receiver positioned at the current head. subscribe reads as a new subscription, which is what you are creating. Reach for resubscribe when a task only holds a receiver, for example to recover from lag.
Decision matrix
Choose the right channel for your communication pattern.
Use broadcast when you have one or more producers and multiple consumers, and every consumer should see every message. Use broadcast when consumers can tolerate gaps if they fall behind. Use broadcast when you need high throughput and independent receiver lifecycles.
Use mpsc when a single consumer processes messages. It is simpler, and nothing is cloned. Use mpsc when you need backpressure. With a bounded channel, send().await waits until the buffer has room. Use mpsc when messages should be delivered to exactly one receiver.
Use watch when consumers only care about the latest value. It avoids buffering and lag entirely. Use watch for configuration updates, state flags, or settings. Use watch when you want receivers to always see the current state without managing lag.
Use broadcast with small payloads. Large messages get cloned for every receiver, which burns CPU and memory. Wrap large data in Arc if you must use broadcast.
Broadcast channels trade backpressure for simplicity. If your receivers cannot keep up, messages vanish. Design your system to tolerate loss, or pick a different channel.