Lesson 75 of 83 advanced

Advanced Coroutines: callbackFlow, channelFlow & Custom Dispatchers

Master coroutine internals for complex async patterns and concurrent data streams

Open interactive version (quiz + challenge)

Real-world analogy

Think of coroutines like a restaurant kitchen. Regular flows are a single chef cooking dishes one at a time. channelFlow is having multiple chefs cooking simultaneously and putting dishes on a shared counter. callbackFlow is having an external delivery driver (callback API) dropping off ingredients through a window — you need awaitClose to keep the window open. Custom dispatchers are like deciding which kitchen stations (threads) each chef works at. Mutex is the single knife that chefs must take turns using, preventing them from cutting at the same time.

What is it?

Advanced Coroutines covers the lower-level coroutine and Flow APIs needed for complex async patterns: callbackFlow bridges callback-based APIs to Flow, channelFlow enables concurrent emissions, custom Dispatchers control threading, Mutex/Semaphore manage shared state, and Flow operators like conflate/buffer/debounce handle backpressure and timing. These are essential tools for production Android apps dealing with real-world async complexity.

Real-world relevance

callbackFlow is everywhere: wrapping Firebase Realtime Database listeners, location updates from FusedLocationProviderClient, Bluetooth scan results, sensor data streams, and WebSocket connections. channelFlow powers concurrent data aggregation like loading user profile + posts + stories simultaneously into a single stream. Mutex protects in-memory caches in multi-threaded environments. debounce powers every search bar. SharedFlow with WhileSubscribed manages expensive data streams in ViewModels.

Key points

Code example

// === callbackFlow: Wrapping Location Updates ===
fun locationUpdates(
    client: FusedLocationProviderClient,
    request: LocationRequest
): Flow<Location> = callbackFlow {
    val callback = object : LocationCallback() {
        override fun onLocationResult(result: LocationResult) {
            result.lastLocation?.let { location ->
                trySend(location).isSuccess
            }
        }
    }

    // Check permission before requesting
    client.requestLocationUpdates(
        request,
        callback,
        Looper.getMainLooper()
    ).addOnFailureListener { e ->
        close(e) // Close the flow with error
    }

    // CRITICAL: keeps flow alive & cleans up on cancel
    awaitClose {
        client.removeLocationUpdates(callback)
    }
}

// === channelFlow: Concurrent Data Loading ===
fun loadDashboard(
    userApi: UserApi,
    postApi: PostApi,
    analyticsApi: AnalyticsApi
): Flow<DashboardState> = channelFlow {
    // All three load concurrently
    launch {
        val user = userApi.getProfile()
        send(DashboardState.UserLoaded(user))
    }
    launch {
        val posts = postApi.getRecent(limit = 10)
        send(DashboardState.PostsLoaded(posts))
    }
    launch {
        val stats = analyticsApi.getWeeklyStats()
        send(DashboardState.StatsLoaded(stats))
    }
}

// === Mutex for Thread-Safe Cache ===
class InMemoryCache<K, V> {
    private val mutex = Mutex()
    private val map = mutableMapOf<K, V>()

    suspend fun getOrPut(key: K, compute: suspend () -> V): V {
        // Fast path: read without lock
        map[key]?.let { return it }

        // Slow path: compute with lock
        return mutex.withLock {
            // Double-check after acquiring lock
            map[key] ?: compute().also { map[key] = it }
        }
    }

    suspend fun evict(key: K): V? = mutex.withLock {
        map.remove(key)
    }

    suspend fun clear() = mutex.withLock {
        map.clear()
    }
}

// === Flow Operators: Search with debounce ===
class SearchViewModel(
    private val repository: SearchRepository
) : ViewModel() {

    private val _query = MutableStateFlow("")
    val query: StateFlow<String> = _query.asStateFlow()

    val searchResults: StateFlow<SearchUiState> = _query
        .debounce(300L)
        .distinctUntilChanged()
        .filter { it.length >= 2 }
        .flatMapLatest { query ->
            flow {
                emit(SearchUiState.Loading)
                val results = repository.search(query)
                emit(SearchUiState.Success(results))
            }.catch { e ->
                emit(SearchUiState.Error(e.message ?: "Unknown"))
            }
        }
        .stateIn(
            scope = viewModelScope,
            started = SharingStarted.WhileSubscribed(5_000),
            initialValue = SearchUiState.Idle
        )

    fun onQueryChanged(newQuery: String) {
        _query.value = newQuery
    }
}

// === Semaphore for Limited Concurrency ===
class ImageDownloader(
    private val httpClient: HttpClient,
    maxConcurrent: Int = 4
) {
    private val semaphore = Semaphore(maxConcurrent)

    suspend fun downloadAll(
        urls: List<String>
    ): List<Result<ByteArray>> = coroutineScope {
        urls.map { url ->
            async {
                semaphore.withPermit {
                    runCatching {
                        httpClient.download(url)
                    }
                }
            }
        }.awaitAll()
    }
}

// === Custom Dispatcher with limitedParallelism ===
object AppDispatchers {
    // Max 4 concurrent DB operations
    val database = Dispatchers.IO.limitedParallelism(4)

    // Single-threaded for file I/O ordering
    val fileIO = Dispatchers.IO.limitedParallelism(1)

    // Dedicated for heavy computation
    val computation = Dispatchers.Default.limitedParallelism(2)
}

// Usage
class DatabaseRepository(private val dao: UserDao) {
    suspend fun getUser(id: String): User =
        withContext(AppDispatchers.database) {
            dao.findById(id)
        }
}

// === Testing with Turbine ===
class SearchViewModelTest {
    @Test
    fun `search emits loading then results`() = runTest {
        val fakeRepo = FakeSearchRepository(
            results = listOf("Kotlin", "KSP")
        )
        val viewModel = SearchViewModel(fakeRepo)

        viewModel.searchResults.test {
            assertEquals(SearchUiState.Idle, awaitItem())

            viewModel.onQueryChanged("Kot")
            advanceTimeBy(350) // past debounce

            assertEquals(SearchUiState.Loading, awaitItem())
            assertEquals(
                SearchUiState.Success(listOf("Kotlin", "KSP")),
                awaitItem()
            )

            cancelAndIgnoreRemainingEvents()
        }
    }

    @Test
    fun `search cancels previous on new query`() = runTest {
        val slowRepo = FakeSearchRepository(
            delay = 1000L,
            results = listOf("Result")
        )
        val viewModel = SearchViewModel(slowRepo)

        viewModel.searchResults.test {
            awaitItem() // Idle

            viewModel.onQueryChanged("first")
            advanceTimeBy(350)
            assertEquals(SearchUiState.Loading, awaitItem())

            // New query before first completes
            viewModel.onQueryChanged("second")
            advanceTimeBy(350)
            assertEquals(SearchUiState.Loading, awaitItem())

            advanceTimeBy(1100)
            val result = awaitItem()
            assertTrue(result is SearchUiState.Success)

            cancelAndIgnoreRemainingEvents()
        }
    }
}

Line-by-line walkthrough

  1. 1. locationUpdates uses callbackFlow to bridge the callback-based FusedLocationProviderClient to a Flow.
  2. 2. trySend() is used inside onLocationResult because it's a regular function, not a suspend function.
  3. 3. addOnFailureListener calls close(e) to terminate the flow with an error if location registration fails.
  4. 4. awaitClose is CRITICAL — without it, the flow would complete immediately. The lambda removes the callback on cancellation.
  5. 5. loadDashboard uses channelFlow to launch 3 concurrent coroutines, each loading different data and sending results.
  6. 6. InMemoryCache uses Mutex.withLock for thread-safe access. The double-check pattern avoids redundant computation.
  7. 7. SearchViewModel chains debounce(300) -> distinctUntilChanged -> filter -> flatMapLatest for search-as-you-type.
  8. 8. flatMapLatest cancels the previous search API call when a new query arrives, preventing stale results.
  9. 9. stateIn with WhileSubscribed(5000) keeps the upstream alive for 5 seconds after the last collector, surviving rotation.
  10. 10. ImageDownloader uses Semaphore(4) to limit concurrent downloads, preventing server overload.
  11. 11. AppDispatchers uses limitedParallelism to create bounded dispatcher views without creating new thread pools.
  12. 12. Tests use Turbine's flow.test {} with advanceTimeBy to precisely control debounce timing and verify emissions.

Spot the bug

fun firebaseMessages(
    ref: DatabaseReference
): Flow<Message> = callbackFlow {
    val listener = object : ValueEventListener {
        override fun onDataChange(snapshot: DataSnapshot) {
            val msg = snapshot.getValue(Message::class.java)
            send(msg!!)
        }
        override fun onCancelled(error: DatabaseError) {
            close(error.toException())
        }
    }
    ref.addValueEventListener(listener)
}
Need a hint?
There are two bugs: one will crash at compile time, the other will cause the flow to complete instantly.
Show answer
Bug 1: `send()` is a suspend function but `onDataChange` is not a suspend function — use `trySend()` instead. Bug 2: Missing `awaitClose { ref.removeEventListener(listener) }` at the end — without it, the callbackFlow completes immediately after registering the listener and also leaks the listener. Also, `msg!!` could crash on null — should use `msg?.let { trySend(it) }` for safety.

Explain like I'm 5

Imagine you're listening to a walkie-talkie (callbackFlow) — someone on the other end talks whenever they want, and you just listen. You keep the walkie-talkie on (awaitClose) until you're done. channelFlow is like having multiple walkie-talkies all feeding into one speaker. Mutex is like a talking stick — only the person holding it can speak. And debounce is like waiting for someone to stop talking before you respond, so you don't interrupt them mid-sentence!

Fun fact

The callbackFlow builder was originally called channelFlow when first proposed, causing massive confusion. The Kotlin team split it into two: callbackFlow for wrapping callback APIs (with the awaitClose requirement) and channelFlow for general concurrent emissions. If you forget awaitClose in callbackFlow, you get an IllegalStateException at runtime — this was a deliberate design choice to prevent the extremely common bug of flows that complete instantly because the callback hasn't fired yet.

Hands-on challenge

Build a real-time stock ticker system: (1) Create a callbackFlow that wraps a WebSocket connection (use OkHttp WebSocketListener), emitting StockPrice updates and handling connection errors with retry, (2) Build a channelFlow that concurrently subscribes to 3 different stock symbols and merges their updates into a single stream, (3) Implement an InMemoryCache using Mutex that stores the latest price for each symbol with TTL expiration, (4) Create a ViewModel that uses debounce for user search, flatMapLatest to switch symbol subscriptions, and stateIn with WhileSubscribed(5000), (5) Write comprehensive tests using Turbine and StandardTestDispatcher that verify debounce timing, flow cancellation on new queries, and error recovery.

More resources

Open interactive version (quiz + challenge) ← Back to course: Android Interview Mastery