Advanced Coroutines: callbackFlow, channelFlow & Custom Dispatchers
Master coroutine internals for complex async patterns and concurrent data streams
Open interactive version (quiz + challenge)Real-world analogy
What is it?
Advanced Coroutines covers the lower-level coroutine and Flow APIs needed for complex async patterns: callbackFlow bridges callback-based APIs to Flow, channelFlow enables concurrent emissions, custom Dispatchers control threading, Mutex/Semaphore manage shared state, and Flow operators like conflate/buffer/debounce handle backpressure and timing. These are essential tools for production Android apps dealing with real-world async complexity.
Real-world relevance
callbackFlow is everywhere: wrapping Firebase Realtime Database listeners, location updates from FusedLocationProviderClient, Bluetooth scan results, sensor data streams, and WebSocket connections. channelFlow powers concurrent data aggregation like loading user profile + posts + stories simultaneously into a single stream. Mutex protects in-memory caches in multi-threaded environments. debounce powers every search bar. SharedFlow with WhileSubscribed manages expensive data streams in ViewModels.
Key points
- callbackFlow for Callback APIs — callbackFlow creates a Flow from callback-based APIs (listeners, SDKs, platform APIs). Inside the builder, register callbacks that call `trySend(value)` to emit values. Use `awaitClose { unregisterCallback() }` as the last statement to keep the flow active and handle cleanup when the collector cancels. Without awaitClose, the flow completes immediately. Channel capacity defaults to BUFFERED (64 elements).
- channelFlow for Concurrent Emissions — channelFlow allows launching multiple coroutines that emit concurrently into the same flow. Unlike regular `flow {}` builder which is sequential and restricted to the same CoroutineContext, channelFlow provides a ProducerScope where you can `launch {}` child coroutines that each call `send()` or `trySend()`. It's backed by a Channel, so emissions from different coroutines are thread-safe.
- awaitClose & ProducerScope — awaitClose suspends the callbackFlow/channelFlow until the downstream collector cancels or the scope is cancelled. The lambda runs cleanup code (unregister listeners, close connections). ProducerScope extends CoroutineScope and SendChannel, providing `send()` (suspending), `trySend()` (non-suspending), `close()`, and `channel`. Use `trySend()` in callbacks since callbacks aren't suspend functions.
- Custom CoroutineDispatcher — Create custom dispatchers by extending CoroutineDispatcher and overriding `dispatch(context, block)`. Use cases: dedicated thread for database operations, limiting concurrency with `Dispatchers.IO.limitedParallelism(n)`, or wrapping Java executors with `executor.asCoroutineDispatcher()`. In production, `limitedParallelism()` is preferred over custom dispatchers for concurrency control.
- CoroutineContext Composition — CoroutineContext is a set of Elements combined with `+` operator. Elements include: Job (lifecycle), CoroutineDispatcher (threading), CoroutineName (debugging), CoroutineExceptionHandler (error handling). Combining contexts: `Dispatchers.IO + CoroutineName("fetcher") + exceptionHandler`. Right-side elements override left-side for same key. Access elements via `coroutineContext[Job]` or `coroutineContext[CoroutineName]`.
- Mutex & Shared Mutable State — Mutex provides mutual exclusion for coroutines (like synchronized but non-blocking). Use `mutex.withLock {}` to protect shared mutable state. Unlike synchronized blocks, Mutex suspends instead of blocking the thread. For atomic counters, use `AtomicInteger` or `MutableStateFlow.update {}`. Semaphore(permits) limits concurrent access to a resource (e.g., max 5 parallel network requests).
- Flow Operators: conflate, buffer, debounce — `buffer(capacity)` decouples producer and consumer into separate coroutines — producer doesn't wait for slow consumer. `conflate()` is buffer with CONFLATED capacity: drops intermediate values, consumer always gets the latest. `debounce(timeoutMillis)` waits for a pause in emissions before delivering (search-as-you-type). `sample(periodMillis)` emits the latest value at fixed intervals.
- Flow Operators: flatMapLatest, flatMapMerge, flatMapConcat — `flatMapLatest {}` cancels the previous inner flow when a new value arrives — ideal for search queries. `flatMapMerge(concurrency) {}` runs inner flows concurrently up to the concurrency limit. `flatMapConcat {}` runs inner flows sequentially. `transformLatest {}` is the transform version of flatMapLatest for more control.
- Testing: UnconfinedTestDispatcher vs StandardTestDispatcher — StandardTestDispatcher queues coroutines — they only execute when you call `advanceUntilIdle()`, `advanceTimeBy()`, or `runCurrent()`. Gives precise control over execution order. UnconfinedTestDispatcher executes coroutines eagerly (immediately at launch point). Use Standard for testing timing-sensitive code and Unconfined for simpler tests where execution order doesn't matter.
- Turbine for Flow Testing — Turbine library provides `flow.test {}` for ergonomic Flow testing. Inside the block: `awaitItem()` gets the next emission, `awaitComplete()` verifies completion, `awaitError()` verifies error, `cancelAndIgnoreRemainingEvents()` for cleanup. Handles timeouts automatically. Much cleaner than manually collecting into lists.
- Exception Handling in Flows — `catch {}` operator catches upstream exceptions and can `emit()` fallback values. It doesn't catch downstream exceptions. `onCompletion { cause -> }` runs on completion (normal or exceptional) for cleanup. `retry(retries) { cause -> }` and `retryWhen { cause, attempt -> }` implement retry logic. Use `flowOn(dispatcher)` to change upstream context without affecting downstream.
- SharedFlow & StateFlow Internals — SharedFlow is a hot flow backed by a replay cache + buffer. `MutableSharedFlow(replay, extraBufferCapacity, onBufferOverflow)` configures behavior. StateFlow is a SharedFlow with replay=1 that requires initial value and uses distinctUntilChanged. `shareIn(scope, started, replay)` and `stateIn(scope, started, initialValue)` convert cold flows to hot. SharingStarted.WhileSubscribed(5000) is the recommended production config.
Code example
// === callbackFlow: Wrapping Location Updates ===
fun locationUpdates(
client: FusedLocationProviderClient,
request: LocationRequest
): Flow<Location> = callbackFlow {
val callback = object : LocationCallback() {
override fun onLocationResult(result: LocationResult) {
result.lastLocation?.let { location ->
trySend(location).isSuccess
}
}
}
// Check permission before requesting
client.requestLocationUpdates(
request,
callback,
Looper.getMainLooper()
).addOnFailureListener { e ->
close(e) // Close the flow with error
}
// CRITICAL: keeps flow alive & cleans up on cancel
awaitClose {
client.removeLocationUpdates(callback)
}
}
// === channelFlow: Concurrent Data Loading ===
fun loadDashboard(
userApi: UserApi,
postApi: PostApi,
analyticsApi: AnalyticsApi
): Flow<DashboardState> = channelFlow {
// All three load concurrently
launch {
val user = userApi.getProfile()
send(DashboardState.UserLoaded(user))
}
launch {
val posts = postApi.getRecent(limit = 10)
send(DashboardState.PostsLoaded(posts))
}
launch {
val stats = analyticsApi.getWeeklyStats()
send(DashboardState.StatsLoaded(stats))
}
}
// === Mutex for Thread-Safe Cache ===
class InMemoryCache<K, V> {
private val mutex = Mutex()
private val map = mutableMapOf<K, V>()
suspend fun getOrPut(key: K, compute: suspend () -> V): V {
// Fast path: read without lock
map[key]?.let { return it }
// Slow path: compute with lock
return mutex.withLock {
// Double-check after acquiring lock
map[key] ?: compute().also { map[key] = it }
}
}
suspend fun evict(key: K): V? = mutex.withLock {
map.remove(key)
}
suspend fun clear() = mutex.withLock {
map.clear()
}
}
// === Flow Operators: Search with debounce ===
class SearchViewModel(
private val repository: SearchRepository
) : ViewModel() {
private val _query = MutableStateFlow("")
val query: StateFlow<String> = _query.asStateFlow()
val searchResults: StateFlow<SearchUiState> = _query
.debounce(300L)
.distinctUntilChanged()
.filter { it.length >= 2 }
.flatMapLatest { query ->
flow {
emit(SearchUiState.Loading)
val results = repository.search(query)
emit(SearchUiState.Success(results))
}.catch { e ->
emit(SearchUiState.Error(e.message ?: "Unknown"))
}
}
.stateIn(
scope = viewModelScope,
started = SharingStarted.WhileSubscribed(5_000),
initialValue = SearchUiState.Idle
)
fun onQueryChanged(newQuery: String) {
_query.value = newQuery
}
}
// === Semaphore for Limited Concurrency ===
class ImageDownloader(
private val httpClient: HttpClient,
maxConcurrent: Int = 4
) {
private val semaphore = Semaphore(maxConcurrent)
suspend fun downloadAll(
urls: List<String>
): List<Result<ByteArray>> = coroutineScope {
urls.map { url ->
async {
semaphore.withPermit {
runCatching {
httpClient.download(url)
}
}
}
}.awaitAll()
}
}
// === Custom Dispatcher with limitedParallelism ===
object AppDispatchers {
// Max 4 concurrent DB operations
val database = Dispatchers.IO.limitedParallelism(4)
// Single-threaded for file I/O ordering
val fileIO = Dispatchers.IO.limitedParallelism(1)
// Dedicated for heavy computation
val computation = Dispatchers.Default.limitedParallelism(2)
}
// Usage
class DatabaseRepository(private val dao: UserDao) {
suspend fun getUser(id: String): User =
withContext(AppDispatchers.database) {
dao.findById(id)
}
}
// === Testing with Turbine ===
class SearchViewModelTest {
@Test
fun `search emits loading then results`() = runTest {
val fakeRepo = FakeSearchRepository(
results = listOf("Kotlin", "KSP")
)
val viewModel = SearchViewModel(fakeRepo)
viewModel.searchResults.test {
assertEquals(SearchUiState.Idle, awaitItem())
viewModel.onQueryChanged("Kot")
advanceTimeBy(350) // past debounce
assertEquals(SearchUiState.Loading, awaitItem())
assertEquals(
SearchUiState.Success(listOf("Kotlin", "KSP")),
awaitItem()
)
cancelAndIgnoreRemainingEvents()
}
}
@Test
fun `search cancels previous on new query`() = runTest {
val slowRepo = FakeSearchRepository(
delay = 1000L,
results = listOf("Result")
)
val viewModel = SearchViewModel(slowRepo)
viewModel.searchResults.test {
awaitItem() // Idle
viewModel.onQueryChanged("first")
advanceTimeBy(350)
assertEquals(SearchUiState.Loading, awaitItem())
// New query before first completes
viewModel.onQueryChanged("second")
advanceTimeBy(350)
assertEquals(SearchUiState.Loading, awaitItem())
advanceTimeBy(1100)
val result = awaitItem()
assertTrue(result is SearchUiState.Success)
cancelAndIgnoreRemainingEvents()
}
}
}Line-by-line walkthrough
- 1. locationUpdates uses callbackFlow to bridge the callback-based FusedLocationProviderClient to a Flow.
- 2. trySend() is used inside onLocationResult because it's a regular function, not a suspend function.
- 3. addOnFailureListener calls close(e) to terminate the flow with an error if location registration fails.
- 4. awaitClose is CRITICAL — without it, the flow would complete immediately. The lambda removes the callback on cancellation.
- 5. loadDashboard uses channelFlow to launch 3 concurrent coroutines, each loading different data and sending results.
- 6. InMemoryCache uses Mutex.withLock for thread-safe access. The double-check pattern avoids redundant computation.
- 7. SearchViewModel chains debounce(300) -> distinctUntilChanged -> filter -> flatMapLatest for search-as-you-type.
- 8. flatMapLatest cancels the previous search API call when a new query arrives, preventing stale results.
- 9. stateIn with WhileSubscribed(5000) keeps the upstream alive for 5 seconds after the last collector, surviving rotation.
- 10. ImageDownloader uses Semaphore(4) to limit concurrent downloads, preventing server overload.
- 11. AppDispatchers uses limitedParallelism to create bounded dispatcher views without creating new thread pools.
- 12. Tests use Turbine's flow.test {} with advanceTimeBy to precisely control debounce timing and verify emissions.
Spot the bug
fun firebaseMessages(
ref: DatabaseReference
): Flow<Message> = callbackFlow {
val listener = object : ValueEventListener {
override fun onDataChange(snapshot: DataSnapshot) {
val msg = snapshot.getValue(Message::class.java)
send(msg!!)
}
override fun onCancelled(error: DatabaseError) {
close(error.toException())
}
}
ref.addValueEventListener(listener)
}Need a hint?
Show answer
Explain like I'm 5
Fun fact
Hands-on challenge
More resources
- Kotlin Coroutines Flow Guide (Kotlin Documentation)
- callbackFlow: A Primitives Guide (Android Developers Medium)
- Advanced Coroutines with Kotlin Flow (Android Developers YouTube)
- Turbine - Flow Testing Library (Cash App)