Streams, StreamController & Operators
Real-time Data Flows — From Chat to Sensors
Open interactive version (quiz + challenge)Real-world analogy
A Future is like ordering one pizza — you get one delivery. A Stream is like subscribing to a meal service — pizzas keep arriving over time until you cancel. You can filter (only pepperoni), transform (cut into slices with map), and stop anytime (cancel subscription).
What is it?
Streams are Dart's mechanism for delivering multiple async values over time. StreamController creates streams, async*/yield generates them, and operators like .map/.where/.distinct transform them. They're the backbone of reactive programming in Flutter, especially with BLoC state management.
Real-world relevance
In a real-time collaboration app, WebSocket messages arrive as a Stream. Each new chat message is a stream event. The app uses .where() to filter messages for the current channel, .map() to convert JSON to Message objects, and StreamBuilder to rebuild the chat UI. Without streams, real-time features are impossible.
Key points
- Stream vs Future — Future delivers ONE value later. Stream delivers ZERO or MORE values over time. Use Future for single async operations (API call). Use Stream for ongoing data (WebSocket messages, sensor data, user input, BLoC state changes).
- Single-Subscription vs Broadcast — Single-subscription: only ONE listener allowed (default). Broadcast: multiple listeners allowed. File reads are single-sub. UI events should be broadcast. Interview: What happens if you listen to a single-subscription stream twice? Answer: StateError.
- StreamController — StreamController is how you CREATE streams. Add events with .sink.add(). Listen with .stream.listen(). Always close controllers to prevent memory leaks. In production, BLoC internally uses StreamControllers for event/state management.
- async* and yield — Generator functions use async* and yield to produce streams. yield emits one value. yield* delegates to another stream. This is cleaner than manually creating StreamControllers for simple stream sources.
- Stream Operators — .map() transforms events. .where() filters events. .distinct() removes consecutive duplicates. .debounceTime() (rxdart) waits for a pause. .switchMap() (rxdart) cancels previous and starts new. These are real-time app essentials.
- Stream Listening and Cancellation — .listen() returns a StreamSubscription. ALWAYS store it and call .cancel() in dispose(). Memory leak interview question: What happens if you don't cancel a stream subscription? Answer: the listener keeps running, wasting memory and potentially causing setState-after-dispose errors.
- StreamBuilder in Flutter — StreamBuilder listens to a Stream and rebuilds the UI on each event. Like FutureBuilder but for ongoing data. Used with BLoC pattern: StreamBuilder listens to bloc.state stream. Has snapshot.connectionState and snapshot.data.
- Error and Done Events — Streams can emit data, error, or done events. .addError() sends an error. .close() sends done. StreamBuilder handles errors in snapshot.hasError. Always handle all three states in production code.
- Stream Transformers — StreamTransformer converts one stream type to another. Used internally by operators like .map() and .where(). You can create custom transformers for complex operations like batching, throttling, or retry logic.
- Practical Stream Patterns — Polling: Stream.periodic(). Merge: StreamGroup.merge() from async package. Buffer: collect events into batches. Debounce: wait for user to stop typing before searching. These patterns appear in production apps constantly.
Code example
// Streams — Real-time Data Flows
import 'dart:async';
// Creating a stream with async* generator
Stream<int> countDown(int from) async* {
for (var i = from; i >= 0; i--) {
await Future.delayed(Duration(seconds: 1));
yield i; // Emits one value at a time
}
}
// Listening to a stream
void main() async {
final subscription = countDown(5).listen(
(value) => print('Count: $value'),
onError: (e) => print('Error: $e'),
onDone: () => print('Done!'),
);
// Cancel after 3 seconds (important for memory!)
await Future.delayed(Duration(seconds: 3));
await subscription.cancel();
}
// StreamController — manual stream creation
class ChatService {
final _controller = StreamController<String>.broadcast();
Stream<String> get messages => _controller.stream;
void sendMessage(String msg) {
_controller.sink.add(msg);
}
void sendError(String error) {
_controller.addError(Exception(error));
}
void dispose() {
_controller.close(); // ALWAYS close to prevent memory leaks
}
}
// Stream operators — transform the flow
void processMessages(ChatService chat) {
chat.messages
.where((msg) => msg.isNotEmpty) // Filter empty
.map((msg) => msg.trim().toUpperCase()) // Transform
.distinct() // Skip consecutive dupes
.listen((msg) => print('Processed: $msg'));
}
// Stream.periodic — polling pattern
Stream<DateTime> clock = Stream.periodic(
Duration(seconds: 1),
(_) => DateTime.now(),
);
// await for — consuming a stream
Future<void> processStream(Stream<int> stream) async {
await for (final value in stream) {
print(value);
if (value == 0) break; // Exit early
}
}
// Single-subscription vs broadcast
final singleSub = StreamController<int>(); // ONE listener only
final broadcast = StreamController<int>.broadcast(); // MANY listeners OK
// StreamBuilder in Flutter
// StreamBuilder<String>(
// stream: chatService.messages,
// builder: (context, snapshot) {
// if (snapshot.hasError) return Text('Error!');
// if (!snapshot.hasData) return CircularProgressIndicator();
// return Text(snapshot.data!);
// },
// )Line-by-line walkthrough
- 1. Defining an async* generator function that produces a Stream of ints
- 2. Loop from the starting number down to zero
- 3. Wait one second between emissions
- 4. yield sends a single value into the stream
- 5. Subscribing to the stream with .listen()
- 6. onError callback handles any errors in the stream
- 7. onDone callback fires when the stream closes
- 8. Cancelling the subscription — critical for preventing memory leaks
- 9. StreamController.broadcast() allows multiple listeners
- 10. Exposing the stream as a read-only getter
- 11. Adding a value to the stream through the sink
- 12. Always close the controller in dispose() to prevent leaks
- 13. Chaining operators: where filters, map transforms, distinct deduplicates
- 14. Stream.periodic creates a stream that emits on a timer interval
Spot the bug
class DataService {
final _controller = StreamController<String>();
Stream<String> get data => _controller.stream;
void addData(String value) {
_controller.add(value);
}
}
// In a widget:
final service = DataService();
service.data.listen((d) => print('Listener 1: $d'));
service.data.listen((d) => print('Listener 2: $d'));
service.addData('hello');Need a hint?
How many listeners can a default StreamController support?
Show answer
Default StreamController is single-subscription — the second .listen() throws a StateError. Fix: Use StreamController<String>.broadcast() to allow multiple listeners. Also missing: dispose method to close the controller.
Explain like I'm 5
A Future is like getting a letter in the mail — one letter, one delivery. A Stream is like subscribing to a magazine — a new issue shows up every month until you cancel. You can tell the mailman: only give me the science magazines (filter), and mark the important articles (transform). But remember to cancel when you move, or magazines pile up at your old address (memory leak)!
Fun fact
The reactive programming paradigm that Streams enable was formalized by Erik Meijer at Microsoft, who called it 'the dual of the Iterator pattern.' Instead of pulling values from a collection (Iterator), you push values to observers (Stream). This insight led to Rx (ReactiveX) which influenced Dart's Stream API.
Hands-on challenge
Build a SearchService class with a StreamController that debounces search queries (using a Timer to wait 300ms after the last keystroke) and emits only distinct, non-empty queries. Include proper disposal.
More resources
- Asynchronous Programming: Streams (Dart Official)
- StreamController API (Dart Official)
- Creating Streams in Dart (Dart Official)
- RxDart Package (pub.dev)