Lesson 4 of 77 intermediate

Streams, StreamController & Operators

Real-time Data Flows — From Chat to Sensors

Open interactive version (quiz + challenge)

Real-world analogy

A Future is like ordering one pizza — you get one delivery. A Stream is like subscribing to a meal service — pizzas keep arriving over time until you cancel. You can filter (only pepperoni), transform (cut into slices with map), and stop anytime (cancel subscription).

What is it?

Streams are Dart's mechanism for delivering multiple async values over time. StreamController creates streams, async*/yield generates them, and operators like .map/.where/.distinct transform them. They're the backbone of reactive programming in Flutter, especially with BLoC state management.

Real-world relevance

In a real-time collaboration app, WebSocket messages arrive as a Stream. Each new chat message is a stream event. The app uses .where() to filter messages for the current channel, .map() to convert JSON to Message objects, and StreamBuilder to rebuild the chat UI. Without streams, real-time features are impossible.

Key points

Code example

// Streams — Real-time Data Flows

import 'dart:async';

// Creating a stream with async* generator
Stream<int> countDown(int from) async* {
  for (var i = from; i >= 0; i--) {
    await Future.delayed(Duration(seconds: 1));
    yield i;  // Emits one value at a time
  }
}

// Listening to a stream
void main() async {
  final subscription = countDown(5).listen(
    (value) => print('Count: $value'),
    onError: (e) => print('Error: $e'),
    onDone: () => print('Done!'),
  );

  // Cancel after 3 seconds (important for memory!)
  await Future.delayed(Duration(seconds: 3));
  await subscription.cancel();
}

// StreamController — manual stream creation
class ChatService {
  final _controller = StreamController<String>.broadcast();

  Stream<String> get messages => _controller.stream;

  void sendMessage(String msg) {
    _controller.sink.add(msg);
  }

  void sendError(String error) {
    _controller.addError(Exception(error));
  }

  void dispose() {
    _controller.close(); // ALWAYS close to prevent memory leaks
  }
}

// Stream operators — transform the flow
void processMessages(ChatService chat) {
  chat.messages
      .where((msg) => msg.isNotEmpty)           // Filter empty
      .map((msg) => msg.trim().toUpperCase())    // Transform
      .distinct()                                 // Skip consecutive dupes
      .listen((msg) => print('Processed: $msg'));
}

// Stream.periodic — polling pattern
Stream<DateTime> clock = Stream.periodic(
  Duration(seconds: 1),
  (_) => DateTime.now(),
);

// await for — consuming a stream
Future<void> processStream(Stream<int> stream) async {
  await for (final value in stream) {
    print(value);
    if (value == 0) break; // Exit early
  }
}

// Single-subscription vs broadcast
final singleSub = StreamController<int>();         // ONE listener only
final broadcast = StreamController<int>.broadcast(); // MANY listeners OK

// StreamBuilder in Flutter
// StreamBuilder<String>(
//   stream: chatService.messages,
//   builder: (context, snapshot) {
//     if (snapshot.hasError) return Text('Error!');
//     if (!snapshot.hasData) return CircularProgressIndicator();
//     return Text(snapshot.data!);
//   },
// )

Line-by-line walkthrough

  1. 1. Defining an async* generator function that produces a Stream of ints
  2. 2. Loop from the starting number down to zero
  3. 3. Wait one second between emissions
  4. 4. yield sends a single value into the stream
  5. 5. Subscribing to the stream with .listen()
  6. 6. onError callback handles any errors in the stream
  7. 7. onDone callback fires when the stream closes
  8. 8. Cancelling the subscription — critical for preventing memory leaks
  9. 9. StreamController.broadcast() allows multiple listeners
  10. 10. Exposing the stream as a read-only getter
  11. 11. Adding a value to the stream through the sink
  12. 12. Always close the controller in dispose() to prevent leaks
  13. 13. Chaining operators: where filters, map transforms, distinct deduplicates
  14. 14. Stream.periodic creates a stream that emits on a timer interval

Spot the bug

class DataService {
  final _controller = StreamController<String>();

  Stream<String> get data => _controller.stream;

  void addData(String value) {
    _controller.add(value);
  }
}

// In a widget:
final service = DataService();
service.data.listen((d) => print('Listener 1: $d'));
service.data.listen((d) => print('Listener 2: $d'));
service.addData('hello');
Need a hint?
How many listeners can a default StreamController support?
Show answer
Default StreamController is single-subscription — the second .listen() throws a StateError. Fix: Use StreamController<String>.broadcast() to allow multiple listeners. Also missing: dispose method to close the controller.

Explain like I'm 5

A Future is like getting a letter in the mail — one letter, one delivery. A Stream is like subscribing to a magazine — a new issue shows up every month until you cancel. You can tell the mailman: only give me the science magazines (filter), and mark the important articles (transform). But remember to cancel when you move, or magazines pile up at your old address (memory leak)!

Fun fact

The reactive programming paradigm that Streams enable was formalized by Erik Meijer at Microsoft, who called it 'the dual of the Iterator pattern.' Instead of pulling values from a collection (Iterator), you push values to observers (Stream). This insight led to Rx (ReactiveX) which influenced Dart's Stream API.

Hands-on challenge

Build a SearchService class with a StreamController that debounces search queries (using a Timer to wait 300ms after the last keystroke) and emits only distinct, non-empty queries. Include proper disposal.

More resources

Open interactive version (quiz + challenge) ← Back to course: Flutter Interview Mastery