Lesson 9 of 83 intermediate

Flow Fundamentals: Cold Streams, Operators & Collection

Kotlin Flow — reactive streams for modern Android data pipelines


Real-world analogy

Flow is like a newspaper subscription: nothing happens until someone subscribes (cold). Each subscriber gets their own fresh newspaper starting from the beginning. The operators (map/filter/onEach) are like editors who transform the newspaper before it reaches you.

What is it?

Kotlin Flow is a cold asynchronous stream that emits multiple values over time. It is built on coroutines and is the idiomatic way to model reactive data streams in Android. Flow is used for database queries (Room DAOs can return Flow<List<T>>), network polling, sensor data, and UI state streams. Understanding cold vs hot streams, operator placement, and flowOn is essential for senior interviews.
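A minimal sketch of the "cold" behavior, using only kotlinx.coroutines. The flow contents, the producerRuns counter, and the names coldFlowDemo and ColdDemoResult are illustrative. Defining the flow runs nothing, and each terminal collection restarts the producer from the beginning, just like each subscriber getting a fresh newspaper.

```kotlin
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Snapshot of what happened: producer runs before/after collecting, plus each collector's values.
data class ColdDemoResult(
    val runsBeforeCollect: Int,
    val firstCollector: List<Int>,
    val secondCollector: List<Int>,
    val runsAfterCollect: Int,
)

fun coldFlowDemo(): ColdDemoResult = runBlocking {
    var producerRuns = 0  // counts how many times the flow {} body starts

    // Defining the flow executes nothing; the block runs once per terminal collection.
    val numbers = flow {
        producerRuns++
        for (i in 1..3) emit(i)
    }

    val before = producerRuns       // still 0: nobody has collected yet
    val first = numbers.toList()    // collector #1 triggers producer run #1
    val second = numbers.toList()   // collector #2 gets its own run, starting again at 1
    ColdDemoResult(before, first, second, producerRuns)
}
```

A hot stream such as StateFlow behaves differently: it exists independently of collectors, and each new collector only sees the current and later values.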

Real-world relevance

In a school management platform using Room + Firebase, Room returns Flow<List<StudentEntity>>, a cold flow that re-emits whenever the underlying table changes. The ViewModel uses map to convert entities to UI models, filter to show only enrolled students, combine to merge in attendance data, and flowOn(Dispatchers.IO) for the database work. The UI collects with repeatOnLifecycle so that it stops collecting while the screen is in the background.
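Room is not needed to see how combine behaves. In this sketch, two hypothetical flow {} builders with delay stand in for the students and attendance queries (studentsFlow, attendanceFlow, dashboardFlow and dashboardSnapshot are illustrative names). combine stays silent until both sources have emitted once, then re-emits with the latest pair whenever either side changes. The timings are real delays spaced 100 ms apart so the interleaving is predictable.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the students query: re-emits when a row is added.
fun studentsFlow(): Flow<List<String>> = flow {
    emit(listOf("Asha"))             // t = 0 ms
    delay(200)
    emit(listOf("Asha", "Ben"))      // t = 200 ms
}

// Hypothetical stand-in for the attendance query: number of students marked present.
fun attendanceFlow(): Flow<Int> = flow {
    delay(100)
    emit(1)                          // t = 100 ms
    delay(200)
    emit(2)                          // t = 300 ms
}

// combine is silent until both sources have emitted, then re-emits on every change to either.
fun dashboardFlow(): Flow<String> =
    combine(
        studentsFlow().map { it.size },  // per-source transform, like toDomain() in the lesson
        attendanceFlow()
    ) { studentCount, present -> "$present/$studentCount present" }

fun dashboardSnapshot(): List<String> = runBlocking { dashboardFlow().toList() }
```

The students value at t = 0 produces no output on its own; the first combined value appears at t = 100 ms, once attendance has emitted.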

Key points

Code example

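// Imports assumed for this snippet (AndroidX Room + Lifecycle, kotlinx.coroutines)
import androidx.lifecycle.ViewModel
import androidx.lifecycle.viewModelScope
import androidx.room.Dao
import androidx.room.Query
import java.io.IOException
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*

// StudentEntity, AttendanceEntity, Student (+ toDomain()), DashboardData, UiState,
// Document and StudentApi are app types assumed to be defined elsewhere.

// Room returns Flow: it re-emits on every change to the queried table
@Dao
interface StudentDao {
    @Query("SELECT * FROM students WHERE section = :section")
    fun getStudentsBySection(section: String): Flow<List<StudentEntity>>

    // Assumed queries used below; table and column names are illustrative
    @Query("SELECT * FROM attendance WHERE section = :section")
    fun getAttendanceFlow(section: String): Flow<List<AttendanceEntity>>

    @Query("SELECT * FROM students WHERE name LIKE '%' || :query || '%'")
    fun searchStudents(query: String): Flow<List<StudentEntity>>
}

// Repository: transform and combine flows
class StudentRepository(
    private val dao: StudentDao,
    private val api: StudentApi
) {
    fun getEnrolledStudents(section: String): Flow<List<Student>> =
        dao.getStudentsBySection(section)
            .map { entities -> entities.map { it.toDomain() } }
            .filter { students -> students.isNotEmpty() }  // note: an empty section emits nothing
            .flowOn(Dispatchers.IO)  // map/filter run on IO; the collector keeps its own dispatcher

    // Combine two flows: emits whenever either one emits (once both have emitted at least once)
    fun getDashboardData(sectionId: String): Flow<DashboardData> =
        combine(
            dao.getStudentsBySection(sectionId).map { it.map { e -> e.toDomain() } },
            dao.getAttendanceFlow(sectionId)
        ) { students, attendance ->
            DashboardData(students, attendance)
        }.flowOn(Dispatchers.IO)

    // Retry up to 3 times with a fixed 1 s delay, for IOException only
    // (moved into the repository so that `api` is in scope)
    fun fetchWithRetry(id: String): Flow<Document> = flow {
        emit(api.fetchDocument(id))
    }.retry(3) { e ->
        if (e is IOException) {
            delay(1000)
            true  // retry
        } else false  // don't retry other exceptions
    }.catch {
        emit(Document.empty())  // fallback when retries run out or the error is not an IOException
    }

    // flatMapLatest for search-as-you-type (moved into the repository so that `dao` is in scope)
    @OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)  // debounce and flatMapLatest
    fun searchResults(queryFlow: Flow<String>): Flow<List<Student>> =
        queryFlow
            .debounce(300)
            .flatMapLatest { query ->
                if (query.isEmpty()) flowOf(emptyList<Student>())
                else dao.searchStudents(query)
                    .map { it.map { e -> e.toDomain() } }  // entities -> domain models
                    .flowOn(Dispatchers.IO)
            }
}

// ViewModel collecting Flow
class StudentViewModel(private val repo: StudentRepository) : ViewModel() {
    val students: StateFlow<UiState<List<Student>>> =
        repo.getEnrolledStudents("A1")
            .map<List<Student>, UiState<List<Student>>> { UiState.Success(it) }
            .catch { emit(UiState.Error(it.message ?: "Error")) }
            .stateIn(
                scope = viewModelScope,
                started = SharingStarted.WhileSubscribed(5000),
                initialValue = UiState.Loading
            )
}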
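retry only passes the exception to its predicate, so a delay that grows with each attempt is easier to write with retryWhen, which also receives the zero-based attempt number. Below is a sketch of exponential backoff built that way. retryWithBackoff and backoffDemo are hypothetical names, the fake upstream fails a chosen number of times, and the 10 ms base delay only keeps the demo fast (an app would use something closer to 1000 ms).

```kotlin
import java.io.IOException
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.retryWhen
import kotlinx.coroutines.flow.single
import kotlinx.coroutines.runBlocking

// Retries only IOExceptions, waiting baseDelayMs, then 2x, then 4x ... between attempts.
fun <T> Flow<T>.retryWithBackoff(maxRetries: Long = 3, baseDelayMs: Long = 1000): Flow<T> =
    retryWhen { cause, attempt ->
        // attempt is 0 on the first retry, 1 on the second, and so on
        if (cause is IOException && attempt < maxRetries) {
            delay(baseDelayMs shl attempt.toInt())
            true
        } else {
            false  // give up: the exception continues downstream to catch {}
        }
    }

// Hypothetical fetch that throws IOException `failures` times, then succeeds.
// Returns the emitted value and how many times the upstream ran.
fun backoffDemo(failures: Int): Pair<String, Int> = runBlocking {
    var calls = 0
    val value = flow<String> {
        calls++
        if (calls <= failures) throw IOException("transient failure #$calls")
        emit("document")
    }
        .retryWithBackoff(maxRetries = 3, baseDelayMs = 10)
        .catch { emit("fallback") }
        .single()
    value to calls
}
```

With two failures the third run succeeds; with five failures the upstream runs four times (the original attempt plus three retries) before catch supplies the fallback.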

Line-by-line walkthrough

  1. Flow<List<StudentEntity>> from Room: Room re-runs the query whenever the students table changes, and the Flow re-emits the new result
  2. .map { entities -> entities.map { it.toDomain() } }: converts each emitted entity list into a domain-model list; lazy, runs once per emission
  3. .filter { students -> students.isNotEmpty() }: keeps empty lists from reaching the UI; it sits upstream of flowOn, so it also runs on IO (an empty section therefore emits nothing at all)
  4. .flowOn(Dispatchers.IO): every operator above it (up to any earlier flowOn) runs on the IO dispatcher; collect {} and any operators below it run on the collector's dispatcher
  5. combine(flow1, flow2) { students, attendance -> }: once both flows have emitted at least once, emits a new DashboardData whenever students OR attendance changes
  6. .retry(3) { e -> if (e is IOException) { delay(1000); true } else false }: retries up to 3 times with a fixed 1 s delay, on IO errors only
  7. .catch { e -> emit(Document.empty()) }: if the retries are exhausted, or a non-IO exception is thrown, emits a safe fallback instead of crashing
  8. queryFlow.debounce(300): passes a query on only after 300 ms without a newer one, so not every keystroke triggers a query
  9. .flatMapLatest { query -> dao.searchStudents(query) }: cancels the previous query's flow as soon as a new query arrives
  10. .stateIn(viewModelScope, WhileSubscribed(5000), Loading): converts the cold Flow into a hot StateFlow; the upstream stays active for 5 s after the last collector leaves, so a configuration change does not restart it
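To check which context each operator actually runs in, the sketch below adds a CoroutineName to the flowOn context (an addition for illustration; the lesson passes only Dispatchers.IO) and records the current coroutine's name inside a map on each side of flowOn. coroutineNameHere and flowOnDemo are hypothetical helpers.

```kotlin
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Name of the coroutine currently running this code (null if it has none).
suspend fun coroutineNameHere(): String? = currentCoroutineContext()[CoroutineName]?.name

// Returns (name seen by the map above flowOn, name seen by the map below it).
fun flowOnDemo(): List<Pair<String?, String?>> = runBlocking(CoroutineName("collector")) {
    flowOf("row")
        .map { coroutineNameHere() }  // upstream of flowOn: runs in the flowOn context
        .flowOn(Dispatchers.IO + CoroutineName("io-upstream"))
        .map { upstreamName -> upstreamName to coroutineNameHere() }  // downstream: collector's context
        .toList()
}
```

The map above flowOn reports "io-upstream" (and runs on Dispatchers.IO), while the map below it reports "collector". This is the same rule that decides whether toDomain() runs on IO or on the main thread.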

Spot the bug

fun getStudents(): Flow<List<Student>> =
    dao.getAllStudents()
        .flowOn(Dispatchers.IO)
        .map { it.map { e -> e.toDomain() } }  // after flowOn
        .catch { e -> emit(emptyList()) }

fun collectInViewModel() {
    viewModelScope.launch {
        repository.getStudents().collect { students ->
            throw RuntimeException("UI error")  // bug
        }
    }
}
Hint
flowOn placement affects which operators run on which dispatcher. catch doesn't catch collect{} exceptions.
Answer
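Bug 1: .map{} is placed AFTER .flowOn(Dispatchers.IO), so it runs on the collector's dispatcher (Main, because viewModelScope uses Dispatchers.Main.immediate), not on IO. flowOn only changes the context of the operators upstream of it. The entity-to-domain mapping is CPU work that belongs off the main thread (IO or Default). Fix: move .map{} before .flowOn(Dispatchers.IO).
Bug 2: the RuntimeException thrown inside collect{} is NOT caught by the .catch{} operator, because catch only handles exceptions from upstream. The exception propagates out of the launch coroutine; viewModelScope has no CoroutineExceptionHandler installed by default, so it reaches the thread's uncaught exception handler and crashes the app. Fix: wrap the collect{} lambda body in try/catch, or move the error-prone logic into an upstream operator (for example onEach{} placed before a catch{}) and start collection with launchIn(viewModelScope).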
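One way to apply both fixes, sketched with hypothetical stand-ins: StudentEntity, Student, toDomain, collectStudents and fixDemo are illustrative, and the DAO flow is passed in as a parameter so the sketch runs without Room. map moves above flowOn, and the former collect{} body moves into onEach ahead of a catch, with launchIn starting the collection. In a real ViewModel the scope would be viewModelScope.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical stand-ins for the lesson's entity and domain types.
data class StudentEntity(val id: Int, val name: String)
data class Student(val id: Int, val displayName: String)
fun StudentEntity.toDomain() = Student(id, name.uppercase())

// Fix 1: map is upstream of flowOn, so the entity-to-domain mapping runs on Dispatchers.IO.
fun getStudents(source: Flow<List<StudentEntity>>): Flow<List<Student>> =
    source
        .map { entities -> entities.map { it.toDomain() } }
        .flowOn(Dispatchers.IO)
        .catch { emit(emptyList()) }  // covers failures in source and map, not in the collector

// Fix 2: the work formerly in collect {} moves into onEach, upstream of a second catch.
fun collectStudents(
    scope: CoroutineScope,
    source: Flow<List<StudentEntity>>,
    render: (List<Student>) -> Unit,
    onError: (Throwable) -> Unit,
): Job =
    getStudents(source)
        .onEach { students -> render(students) }
        .catch { e -> onError(e) }
        .launchIn(scope)

// Usage: render throws like the original bug, but the error is reported instead of crashing.
fun fixDemo(): Triple<List<Student>, List<Student>, List<String>> = runBlocking {
    val entities = flowOf(listOf(StudentEntity(1, "asha")))
    val mapped = getStudents(entities).first()
    val failing = getStudents(flow<List<StudentEntity>> { throw IllegalStateException("db closed") }).first()
    val errors = mutableListOf<String>()
    collectStudents(
        scope = this,
        source = entities,
        render = { throw RuntimeException("UI error") },
        onError = { errors += it.message ?: "unknown" },
    ).join()
    Triple(mapped, failing, errors)
}
```

Keeping two catch operators separates the concerns: the one in getStudents turns data-layer failures into an empty list, while the one after onEach handles UI-side failures.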

Explain like I'm 5

Flow is like a water tap — nothing comes out until you turn it on (collect). Turning the tap is 'collecting'. The pipes before the tap are operators (map/filter) that shape the water. flowOn is like choosing which water source (hot spring, cold mountain) feeds the pipes. Each person who turns their own tap gets their own fresh stream from the beginning.

Fun fact

Room's Flow<List<T>> queries are driven by Room's InvalidationTracker, which installs SQLite triggers on the observed tables and re-runs the query whenever one of them is modified. This means a single @Query returning Flow gives you a reactive database, with no manual refresh needed. Google's more recent architecture samples generally use Flow<List<T>> from Room rather than LiveData<List<T>>.

Hands-on challenge

Build a real-time document search for the SaaS collaboration app. Create a Flow pipeline that: 1) Takes a Flow of search queries. 2) Debounces by 300ms. 3) Uses flatMapLatest to cancel stale searches. 4) Queries Room with flowOn(Dispatchers.IO). 5) Maps entities to UI models. 6) Uses retry(2) for transient failures. 7) Uses catch to emit emptyList() on persistent failure. 8) Uses onEach to log search analytics. Convert the final Flow to StateFlow using stateIn with WhileSubscribed(5000).
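As a starting point for steps 2 and 3 only, here is a sketch that replaces Room with a hypothetical fakeSearch and records which queries actually start a search (searchPipeline and searchDemo are also illustrative names). The remaining steps (flowOn, mapping, retry, catch, onEach, stateIn) are left for the challenge.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.debounce
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical slow search standing in for the Room query in step 4.
fun fakeSearch(query: String): Flow<List<String>> = flow {
    delay(300)
    emit(listOf("results for $query"))
}

@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
fun searchPipeline(queries: Flow<String>, started: MutableList<String>): Flow<List<String>> =
    queries
        .debounce(300)             // step 2: wait for 300 ms without a newer query
        .flatMapLatest { query ->  // step 3: a newer query cancels the running search
            started += query
            if (query.isEmpty()) flowOf(emptyList<String>()) else fakeSearch(query)
        }

// Simulated typing: three fast keystrokes, a pause, then a final edit.
fun searchDemo(): Pair<List<String>, List<List<String>>> = runBlocking {
    val started = mutableListOf<String>()
    val typing = flow {
        emit("k")
        delay(50)
        emit("ko")
        delay(50)
        emit("kot")
        delay(400)      // "kot" passes debounce at ~400 ms and its search starts
        emit("kotlin")  // ~500 ms: cancels the "kot" search before it can finish
    }
    val results = searchPipeline(typing, started).toList()
    started to results
}
```

Only "kot" and "kotlin" ever reach the search, and only the "kotlin" search completes. debounce emits the final query as soon as the upstream completes, and flatMapLatest cancels the stale "kot" search.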

More resources

Back to course: Android Interview Mastery