Flow Fundamentals: Cold Streams, Operators & Collection
Kotlin Flow — reactive streams for modern Android data pipelines
What is it?
Kotlin Flow is a cold asynchronous stream that emits multiple values over time. It is built on coroutines and is the idiomatic way to model reactive data streams in Android. Flow is used for database queries (Room returns Flow<List<T>>), network polling, sensor data, and UI state streams. Understanding cold vs hot, operator placement, and flowOn is essential for senior interviews.
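To make "cold" concrete, here is a minimal sketch that uses only kotlinx.coroutines. The names numbers() and producerRuns are illustrative and not part of any library; the counter exists only to show when the producer block actually runs.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Illustrative counter: how many times the producer block has started.
var producerRuns = 0

fun numbers(): Flow<Int> = flow {
    producerRuns++ // runs once per collector, not once per Flow instance
    emit(1)
    emit(2)
}

fun main(): Unit = runBlocking {
    val cold = numbers()
    println(producerRuns)  // 0: creating the Flow runs nothing

    println(cold.toList()) // [1, 2]
    println(cold.toList()) // [1, 2]: the producer ran again from scratch
    println(producerRuns)  // 2
}
```

Building the Flow has no side effects; each terminal operator (toList() here) starts an independent run of the producer.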
Real-world relevance
In a school management platform using Room + Firebase, Room returns Flow<List<StudentEntity>>, a cold flow that re-emits whenever the database changes. The ViewModel uses map to convert entities to UI models, filter to show only enrolled students, combine to merge attendance data, and flowOn(Dispatchers.IO) to keep that upstream work off the main thread. The UI collects using repeatOnLifecycle so that collection stops while the UI is in the background.
Key points
- Flow is cold — A Flow does nothing until it is collected — the producer code inside flow { } runs fresh for EACH collector, and only when collect {} is called. This is fundamentally different from hot streams (SharedFlow/StateFlow) which produce values regardless of collectors.
- flow {} builder — flow { emit(value) } creates a cold flow. The lambda runs only when collected. Can call suspend functions and delay inside. Each collect{} call re-runs the entire producer from scratch — like re-running a function.
- flowOf and asFlow — flowOf(1, 2, 3) creates a finite flow from fixed values. listOf(1,2,3).asFlow() converts a collection to a Flow. Use channelFlow { } for producer-consumer patterns that need concurrent emissions.
- Terminal operators — collect { }, first(), toList(), reduce(), fold() — these are terminal operators that trigger collection. They are suspend functions. The flow does not produce values until one of these is called. collect {} collects every value; first() collects just one.
- Intermediate operators: map/filter/onEach — map { transform it } returns a new Flow with transformed values. filter { predicate } returns only matching values. onEach { sideEffect(it) } performs a side effect for each item without transforming (use for logging, analytics). All lazy — no execution until collect.
- catch operator — flow.catch { e -> emit(defaultValue) } catches exceptions from upstream and can emit fallback values. ONLY catches upstream exceptions — not exceptions in collect{}. Place catch before the terminal operator.
- retry and retryWhen — flow.retry(3) { e -> e is IOException } retries the entire flow up to 3 times when the exception matches. Use retryWhen { cause, attempt -> delay(exponentialBackoff(attempt)); true } for a custom strategy such as exponential backoff (exponentialBackoff is a placeholder helper here, not a library function).
- flowOn operator — flow.flowOn(Dispatchers.IO) changes the dispatcher for all UPSTREAM operators. The downstream collect{} runs on the caller's dispatcher. flowOn is the correct way to change a Flow's execution context; switching context inside the producer instead (calling emit() from withContext inside flow { }) violates Flow's context-preservation rule and throws IllegalStateException.
- Flow vs Sequence vs Collection — Collection: eager, all in memory. Sequence: lazy, synchronous (no suspend). Flow: lazy, asynchronous (suspend-capable, works across coroutines, can use delay). Use Flow for async data pipelines, Sequence for sync lazy transformations, Collection when you need random access.
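- combine and zip operators — combine(flow1, flow2) { a, b -> f(a,b) } re-emits when EITHER flow emits, once every flow has produced at least one value. flow1.zip(flow2) { a, b -> f(a,b) } emits only when BOTH have emitted a new value (paired by position) and completes when either flow completes. Note that zip is an extension function on Flow, not a top-level function like combine. Use combine for reactive UI that depends on multiple independent data sources.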
- flatMapLatest and flatMapConcat — flatMapLatest { searchTerm -> searchFlow(searchTerm) } — cancels previous inner flow when a new value arrives. Perfect for search-as-you-type. flatMapConcat processes inner flows sequentially. flatMapMerge processes concurrently.
- Flow completion and onCompletion — flow.onCompletion { cause -> } runs when the flow completes (normally or due to exception/cancellation). cause is null on normal completion. Use for cleanup, analytics, hiding loading indicators regardless of success/failure.
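The multi-flow operators in the list above are easier to tell apart side by side. The following is a minimal sketch using only kotlinx.coroutines; fakeSearch and the three helper functions are hypothetical names invented for the illustration. The number of intermediate values combine produces depends on timing, so only its last emission is shown.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.zip
import kotlinx.coroutines.runBlocking

// zip pairs values by position and stops when the shorter flow completes.
suspend fun zipped(): List<String> =
    flowOf(1, 2, 3).zip(flowOf("a", "b")) { n, s -> "$n$s" }.toList()

// combine re-runs the transform with the latest value from each flow.
// Intermediate pairs vary with timing, but the final emission always
// pairs the last value of each flow.
suspend fun combined(): List<String> =
    combine(flowOf(1, 2, 3), flowOf("a", "b", "c")) { n, s -> "$n$s" }.toList()

// Hypothetical search call: each query takes 50 ms to answer.
fun fakeSearch(query: String): Flow<String> = flow {
    delay(50)
    emit("results for '$query'")
}

// flatMapLatest cancels the in-flight inner flow when a new query arrives,
// so only the last query gets far enough to emit.
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun latestOnly(): List<String> =
    flowOf("k", "ko", "kot").flatMapLatest { fakeSearch(it) }.toList()

fun main(): Unit = runBlocking {
    println(zipped())          // [1a, 2b]
    println(combined().last()) // 3c
    println(latestOnly())      // [results for 'kot']
}
```

Swapping flatMapLatest for flatMapConcat in latestOnly() would let all three searches finish in order, which is the trade-off to weigh for search-as-you-type.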
Code example
import androidx.lifecycle.ViewModel
import androidx.lifecycle.viewModelScope
import androidx.room.Dao
import androidx.room.Query
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import java.io.IOException

// Room returns Flow: re-emits on every DB change
@Dao
interface StudentDao {
    @Query("SELECT * FROM students WHERE section = :section")
    fun getStudentsBySection(section: String): Flow<List<StudentEntity>>
    // getAttendanceFlow and searchStudents, used below, are assumed to be declared here too
}

// Repository: transform and combine flows
class StudentRepository(
    private val dao: StudentDao,
    private val api: StudentApi
) {
    fun getEnrolledStudents(section: String): Flow<List<Student>> =
        dao.getStudentsBySection(section)
            .map { entities -> entities.map { it.toDomain() } }
            .filter { students -> students.isNotEmpty() }
            .flowOn(Dispatchers.IO) // upstream operators on IO; collect on caller's dispatcher

    // Combine two flows: emits when either changes, once both have emitted
    fun getDashboardData(sectionId: String): Flow<DashboardData> =
        combine(
            dao.getStudentsBySection(sectionId).map { it.map { e -> e.toDomain() } },
            dao.getAttendanceFlow(sectionId)
        ) { students, attendance ->
            DashboardData(students, attendance)
        }.flowOn(Dispatchers.IO)

    // Retry up to 3 times with a fixed 1 s delay (IOException only)
    fun fetchWithRetry(id: String): Flow<Document> = flow {
        emit(api.fetchDocument(id))
    }.retry(3) { e ->
        if (e is IOException) {
            delay(1000)
            true // retry
        } else false // don't retry other exceptions
    }.catch { e ->
        emit(Document.empty()) // fallback on persistent failure
    }

    // flatMapLatest for search-as-you-type
    // (debounce is @FlowPreview and flatMapLatest is @ExperimentalCoroutinesApi; opt in as needed)
    fun searchResults(queryFlow: Flow<String>): Flow<List<Student>> =
        queryFlow
            .debounce(300)
            .flatMapLatest { query ->
                if (query.isEmpty()) flowOf(emptyList())
                else dao.searchStudents(query).flowOn(Dispatchers.IO)
            }
}
// ViewModel collecting Flow
class StudentViewModel(private val repo: StudentRepository) : ViewModel() {
    val students: StateFlow<UiState<List<Student>>> =
        repo.getEnrolledStudents("A1")
            .map<List<Student>, UiState<List<Student>>> { UiState.Success(it) }
            .catch { emit(UiState.Error(it.message ?: "Error")) }
            .stateIn(
                scope = viewModelScope,
                started = SharingStarted.WhileSubscribed(5000),
                initialValue = UiState.Loading
            )
}
Line-by-line walkthrough
- 1. Flow<List<StudentEntity>> from Room — Room automatically re-runs the query when the students table changes; the Flow re-emits
- 2. .map { entities -> entities.map { it.toDomain() } } — transforms each DB entity list to domain model list; lazy, runs per emission
- 3. .filter { students -> students.isNotEmpty() } — drops empty lists from reaching the UI; upstream from flowOn
- 4. .flowOn(Dispatchers.IO) — all operators above this line run on IO thread pool; collect{} runs on caller's dispatcher
- 5. combine(flow1, flow2) { students, attendance -> } — emits a new DashboardData whenever students OR attendance changes
- 6. .retry(3) { e -> if (e is IOException) { delay(1000); true } else false } — retries with delay on IO errors only
- 7. .catch { e -> emit(Document.empty()) } — if all 3 retries fail, or a non-IOException is thrown, emit a safe fallback instead of crashing
- 8. queryFlow.debounce(300) — waits 300ms of silence before emitting; prevents over-querying on each keystroke
- 9. .flatMapLatest { query -> dao.searchStudents(query) } — cancels previous DB query when new query arrives
- 10. .stateIn(viewModelScope, WhileSubscribed(5000), Loading) — converts cold Flow to hot StateFlow; 5s grace period on config change
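Steps 6 and 7 can be checked without a network by counting attempts. This is a minimal sketch under stated assumptions: FlakySource and loadWithFallback are hypothetical stand-ins for the API call and for fetchWithRetry, and the delay between retries is left out so the example finishes instantly.

```kotlin
import java.io.IOException
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.retry
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical source that throws IOException for the first `failures` collections.
class FlakySource(private val failures: Int) {
    var attempts = 0
        private set

    fun load(): Flow<String> = flow {
        attempts++
        if (attempts <= failures) throw IOException("attempt $attempts failed")
        emit("payload")
    }
}

// Same operator order as fetchWithRetry: retry first, catch as the last resort.
suspend fun loadWithFallback(source: FlakySource, log: MutableList<String>): List<String> =
    source.load()
        .retry(3) { e -> e is IOException }
        .catch { emit("fallback") }
        // cause is null here in both runs because catch already handled the failure
        .onCompletion { cause -> log += if (cause == null) "completed" else "failed" }
        .toList()

fun main(): Unit = runBlocking {
    val log = mutableListOf<String>()

    val recovers = FlakySource(failures = 2)
    println(loadWithFallback(recovers, log))      // [payload]
    println(recovers.attempts)                    // 3

    val neverRecovers = FlakySource(failures = 10)
    println(loadWithFallback(neverRecovers, log)) // [fallback]
    println(neverRecovers.attempts)               // 4: one initial attempt plus three retries

    println(log)                                  // [completed, completed]
}
```

Note that retry(3) means three retries after the first attempt, so the producer can run up to four times before catch sees the exception.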
Spot the bug
fun getStudents(): Flow<List<Student>> =
    dao.getAllStudents()
        .flowOn(Dispatchers.IO)
        .map { it.map { e -> e.toDomain() } } // after flowOn
        .catch { e -> emit(emptyList()) }

fun collectInViewModel() {
    viewModelScope.launch {
        repository.getStudents().collect { students ->
            throw RuntimeException("UI error") // bug
        }
    }
}
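One reading of this exercise, based on the key points earlier: the map sits downstream of flowOn, so it runs on the collector's dispatcher rather than on IO, and the exception thrown inside collect { } is never seen by catch, which only handles upstream failures. The sketch below shows one possible fix in plain kotlinx.coroutines. The entities() function and the Observed class are hypothetical stand-ins for the DAO and for the ViewModel's state, and this is not necessarily the answer the original page intended.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Stand-in for dao.getAllStudents(); a real DAO is not needed to show the behavior.
fun entities(): Flow<List<Int>> = flowOf(listOf(1, 2, 3))

// Records what happened so the result can be inspected afterwards.
class Observed {
    var mapThread = ""
    val handled = mutableListOf<String>()
}

suspend fun collectSafely(): Observed {
    val seen = Observed()
    entities()
        .map { list ->
            seen.mapThread = Thread.currentThread().name // upstream of flowOn: an IO worker thread
            list.map { it * 2 }
        }
        .flowOn(Dispatchers.IO)
        .onEach { throw RuntimeException("UI error") }   // consumer logic moved upstream of catch
        .catch { e -> seen.handled += e.message ?: "unknown" }
        .collect()                                       // terminal operator with an empty body
    return seen
}

fun main(): Unit = runBlocking {
    val seen = collectSafely()
    println(seen.handled)                                  // [UI error]
    println(seen.mapThread != Thread.currentThread().name) // true: map did not run on the caller's thread
}
```

Moving the per-item work into onEach keeps it inside the part of the chain that catch can see. A try/catch around the body of collect { } would be another way to handle the same failure.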
More resources
- Kotlin Flow Documentation (kotlinlang.org)
- Android Flow in ViewModel (developer.android.com)
- Room with Flow (developer.android.com)
- StateFlow and SharedFlow (developer.android.com)
- Flow operators reference (kotlinlang.org)