Kotlin Coroutines & Flows
Patterns for structured concurrency, Flow-based reactive streams, and coroutine testing in Android and Kotlin Multiplatform projects.
When to Activate
- Writing async code with Kotlin coroutines
- Using Flow, StateFlow, or SharedFlow for reactive data
- Handling concurrent operations (parallel loading, debounce, retry)
- Testing coroutines and Flows
- Managing coroutine scopes and cancellation
Structured Concurrency
Scope Hierarchy
Application
└── viewModelScope (ViewModel)
    └── coroutineScope { } (structured child)
        ├── async { } (concurrent task)
        └── async { } (concurrent task)
Always use structured concurrency — never GlobalScope:
kotlin
// BAD
GlobalScope.launch { fetchData() }

// GOOD: scoped to ViewModel lifecycle
viewModelScope.launch { fetchData() }

// GOOD: scoped to composable lifecycle
LaunchedEffect(key) { fetchData() }
Parallel Decomposition
Use coroutineScope + async for parallel work:
kotlin
suspend fun loadDashboard(): Dashboard = coroutineScope {
    val items = async { itemRepository.getRecent() }
    val stats = async { statsRepository.getToday() }
    val profile = async { userRepository.getCurrent() }
    Dashboard(
        items = items.await(),
        stats = stats.await(),
        profile = profile.await()
    )
}
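To make the concurrency visible, here is a minimal runnable sketch. The two loader functions and their 100 ms delays are hypothetical stand-ins for the repository calls above. Because both async blocks start immediately, the total time should be close to 100 ms rather than 200 ms.

```kotlin
import kotlin.system.measureTimeMillis
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical loaders that simulate network latency with delay().
suspend fun loadItems(): List<String> { delay(100); return listOf("a", "b") }
suspend fun loadStats(): Int { delay(100); return 42 }

// Both async blocks start right away; await() only suspends until each result is ready.
suspend fun loadSummary(): Pair<List<String>, Int> = coroutineScope {
    val items = async { loadItems() }
    val stats = async { loadStats() }
    items.await() to stats.await()
}

fun main() = runBlocking {
    val elapsed = measureTimeMillis { println(loadSummary()) }
    println("took about $elapsed ms")
}
```

If either loader throws, coroutineScope cancels the other child and rethrows the exception to the caller.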
SupervisorScope
Use supervisorScope when child failures should not cancel siblings:
kotlin
suspend fun syncAll() = supervisorScope {
    launch { syncItems() } // failure here won't cancel syncStats
    launch { syncStats() }
    launch { syncSettings() }
}
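The sketch below illustrates the isolation under stated assumptions: the three jobs are hypothetical stand-ins for the sync functions above, and a CoroutineExceptionHandler is installed on each child so the failure is recorded rather than left uncaught. Children launched directly in supervisorScope report their exceptions to such a handler the way root coroutines do.

```kotlin
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.supervisorScope

data class SyncReport(val completed: List<String>, val failed: List<String>)

// The lists are not thread-safe. That is acceptable here only because the demo is
// called from runBlocking, which runs every child on the single calling thread.
suspend fun syncAllDemo(): SyncReport {
    val completed = mutableListOf<String>()
    val failed = mutableListOf<String>()
    val handler = CoroutineExceptionHandler { _, e -> failed += e.message ?: "unknown" }
    supervisorScope {
        launch(handler) { delay(10); throw IllegalStateException("items") } // fails early
        launch(handler) { delay(50); completed += "stats" }                 // still finishes
        launch(handler) { delay(50); completed += "settings" }              // still finishes
    }
    return SyncReport(completed, failed)
}

fun main() = runBlocking {
    println(syncAllDemo())
}
```

Without a handler (or a try/catch inside each child), the failure would reach the thread's uncaught exception handler, which crashes an Android app.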
Flow Patterns
Cold Flow — One-Shot to Stream Conversion
kotlin
fun observeItems(): Flow<List<Item>> = flow {
    // Re-emits whenever the database changes
    itemDao.observeAll()
        .map { entities -> entities.map { it.toDomain() } }
        .collect { emit(it) }
}
StateFlow for UI State
kotlin
class DashboardViewModel(
    observeProgress: ObserveUserProgressUseCase
) : ViewModel() {
    val progress: StateFlow<UserProgress> = observeProgress()
        .stateIn(
            scope = viewModelScope,
            started = SharingStarted.WhileSubscribed(5_000),
            initialValue = UserProgress.EMPTY
        )
}
WhileSubscribed(5_000) keeps the upstream flow active for 5 seconds after the last subscriber disappears, so a configuration change (where the UI briefly stops collecting and then resumes) does not restart the upstream.
Combining Multiple Flows
kotlin
val uiState: StateFlow<HomeState> = combine(
    itemRepository.observeItems(),
    settingsRepository.observeTheme(),
    userRepository.observeProfile()
) { items, theme, profile ->
    HomeState(items = items, theme = theme, profile = profile)
}.stateIn(viewModelScope, SharingStarted.WhileSubscribed(5_000), HomeState())
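As a small runnable illustration of how combine behaves: it emits once every source has produced at least one value, then re-emits whenever any source changes. The HomeStateDemo type and the two MutableStateFlow sources are hypothetical simplifications of the repositories above.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking

// Hypothetical, reduced version of the UI state.
data class HomeStateDemo(val items: List<String>, val darkTheme: Boolean)

// Builds the combined state from the latest value of each source.
fun combinedState(
    items: Flow<List<String>>,
    darkTheme: Flow<Boolean>
): Flow<HomeStateDemo> = combine(items, darkTheme) { i, d -> HomeStateDemo(i, d) }

fun main() = runBlocking {
    val items = MutableStateFlow(listOf("a"))
    val darkTheme = MutableStateFlow(false)
    val state = combinedState(items, darkTheme)

    println(state.first()) // HomeStateDemo(items=[a], darkTheme=false)
    darkTheme.value = true
    println(state.first()) // HomeStateDemo(items=[a], darkTheme=true)
}
```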
Flow Operators
kotlin
// Debounce search input
searchQuery
    .debounce(300)
    .distinctUntilChanged()
    .flatMapLatest { query -> repository.search(query) }
    .catch { emit(emptyList()) }
    .collect { results -> _state.update { it.copy(results = results) } }

// Retry with exponential backoff
fun fetchWithRetry(): Flow<Data> = flow { emit(api.fetch()) }
    .retryWhen { cause, attempt ->
        if (cause is IOException && attempt < 3) {
            delay(1000L * (1 shl attempt.toInt()))
            true
        } else {
            false
        }
    }
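The retry pattern can be exercised end to end with a fake API. In this sketch, FlakyApi and the baseDelayMs parameter are hypothetical; the delays are shortened so the example finishes quickly. The attempt counter passed to retryWhen starts at 0, so the waits are 1x, 2x, and 4x the base delay.

```kotlin
import java.io.IOException
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.retryWhen
import kotlinx.coroutines.flow.single
import kotlinx.coroutines.runBlocking

// Hypothetical API that fails a fixed number of times before succeeding.
class FlakyApi(private val failuresBeforeSuccess: Int) {
    var calls = 0
        private set

    fun fetch(): String {
        calls++
        if (calls <= failuresBeforeSuccess) throw IOException("attempt $calls failed")
        return "payload"
    }
}

// Retries up to 3 times on IOException, doubling the delay each time.
fun fetchWithRetry(api: FlakyApi, baseDelayMs: Long = 10L): Flow<String> =
    flow { emit(api.fetch()) }
        .retryWhen { cause, attempt ->
            if (cause is IOException && attempt < 3) {
                delay(baseDelayMs * (1L shl attempt.toInt()))
                true
            } else {
                false
            }
        }

fun main() = runBlocking {
    val api = FlakyApi(failuresBeforeSuccess = 2)
    println(fetchWithRetry(api).single()) // payload
    println(api.calls)                    // 3
}
```

Each retry re-runs the whole flow block, which is why the API is called again rather than resumed.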
SharedFlow for One-Time Events
kotlin
class ItemListViewModel : ViewModel() {
    private val _effects = MutableSharedFlow<Effect>()
    val effects: SharedFlow<Effect> = _effects.asSharedFlow()

    sealed interface Effect {
        data class ShowSnackbar(val message: String) : Effect
        data class NavigateTo(val route: String) : Effect
    }

    private fun deleteItem(id: String) {
        viewModelScope.launch {
            repository.delete(id)
            _effects.emit(Effect.ShowSnackbar("Item deleted"))
        }
    }
}

// Collect in Composable
LaunchedEffect(Unit) {
    viewModel.effects.collect { effect ->
        when (effect) {
            is Effect.ShowSnackbar -> snackbarHostState.showSnackbar(effect.message)
            is Effect.NavigateTo -> navController.navigate(effect.route)
        }
    }
}
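One property of this pattern is worth seeing in isolation: a MutableSharedFlow created with default arguments has no replay cache, so an event emitted while nobody is collecting is dropped. The sketch below is a minimal demonstration with hypothetical event strings, not part of the ViewModel above.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking

// Returns the first event a late collector actually receives.
fun firstDeliveredEffect(): String = runBlocking {
    val effects = MutableSharedFlow<String>() // replay = 0, no extra buffer

    // No subscribers yet: emit returns immediately and the value is dropped.
    effects.emit("emitted before anyone collects")

    // UNDISPATCHED runs the collector up to its first suspension point,
    // so it is already subscribed when the next emit happens.
    val firstSeen = async(start = CoroutineStart.UNDISPATCHED) { effects.first() }

    effects.emit("emitted while collecting")
    firstSeen.await()
}

fun main() {
    println(firstDeliveredEffect()) // emitted while collecting
}
```

If events must survive a gap with no collector, options include a replay or buffer setting on the SharedFlow or a Channel exposed as a Flow; which one fits depends on the app.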
Dispatchers
kotlin
// CPU-intensive work
withContext(Dispatchers.Default) { parseJson(largePayload) }

// IO-bound work
withContext(Dispatchers.IO) { database.query() }

// Main thread (UI): the default in viewModelScope
withContext(Dispatchers.Main) { updateUi() }
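In KMP, use Dispatchers.Default and Dispatchers.Main, which are declared in common code (on some targets, such as plain JVM, Main needs a platform artifact that provides it). Dispatchers.IO is not available on every target: it exists on JVM/Android and, in recent kotlinx.coroutines releases, on Kotlin/Native, but not on JS. In shared code, fall back to Dispatchers.Default or provide the dispatcher via DI.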
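A minimal sketch of the DI approach, assuming a hypothetical AppDispatchers interface (the names here are illustrative and not from any library). Shared code depends on the interface, and each platform supplies a binding.

```kotlin
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical abstraction: shared code asks for a dispatcher by role
// instead of referencing Dispatchers.IO directly.
interface AppDispatchers {
    val default: CoroutineDispatcher
    val io: CoroutineDispatcher
}

// Binding for JVM/Android, where Dispatchers.IO exists.
object JvmAppDispatchers : AppDispatchers {
    override val default: CoroutineDispatcher = Dispatchers.Default
    override val io: CoroutineDispatcher = Dispatchers.IO
}

// Binding for targets without Dispatchers.IO: reuse Default for both roles.
object FallbackAppDispatchers : AppDispatchers {
    override val default: CoroutineDispatcher = Dispatchers.Default
    override val io: CoroutineDispatcher = Dispatchers.Default
}

// Example consumer: CPU-bound parsing runs on the injected default dispatcher.
class PayloadParser(private val dispatchers: AppDispatchers) {
    suspend fun countFields(payload: String): Int =
        withContext(dispatchers.default) { payload.split(',').size }
}

fun main() = runBlocking {
    println(PayloadParser(JvmAppDispatchers).countFields("a,b,c")) // 3
}
```

Injecting dispatchers also makes tests simpler, since a test can pass a TestDispatcher for every role.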
Cancellation
Cooperative Cancellation
Long-running loops must check for cancellation:
kotlin
suspend fun processItems(items: List<Item>) = coroutineScope {
    for (item in items) {
        ensureActive() // throws CancellationException if cancelled
        process(item)
    }
}
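To see the check take effect, the following sketch runs a loop of blocking work on Dispatchers.Default and cancels it after 50 ms. The loop size and the Thread.sleep call are hypothetical placeholders for real non-suspending work; the exact number of processed items depends on timing.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Starts a long loop, cancels it shortly after, and returns how many items were processed.
fun processUntilCancelled(totalItems: Int = 10_000): Int = runBlocking {
    val processed = AtomicInteger(0) // written on a worker thread, read on the caller's thread
    val job = launch(Dispatchers.Default) {
        repeat(totalItems) {
            ensureActive()  // exits the loop with CancellationException once the job is cancelled
            Thread.sleep(1) // simulated blocking work that never suspends
            processed.incrementAndGet()
        }
    }
    delay(50)
    job.cancelAndJoin() // returns only after the loop has actually stopped
    processed.get()
}

fun main() {
    println("processed ${processUntilCancelled()} of 10000 items before cancellation")
}
```

Without the ensureActive() call, cancelAndJoin() would wait for all 10,000 iterations, because nothing in the loop body observes cancellation.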
Cleanup with try/finally
kotlin
viewModelScope.launch {
    try {
        _state.update { it.copy(isLoading = true) }
        val data = repository.fetch()
        _state.update { it.copy(data = data) }
    } finally {
        _state.update { it.copy(isLoading = false) } // always runs, even on cancellation
    }
}
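The finally block above only calls non-suspending code. If cleanup itself needs to suspend, it has to run inside withContext(NonCancellable), because a cancelled coroutine cannot suspend normally. The sketch below shows both cases with a hypothetical log list in place of real state.

```kotlin
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import kotlinx.coroutines.yield

// Cancels a job mid-work and returns the order in which its steps ran.
fun cleanupLog(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val job = launch {
        try {
            log += "loading"
            delay(10_000) // cancelled while suspended here
        } finally {
            log += "cleanup" // non-suspending cleanup runs as-is
            withContext(NonCancellable) {
                delay(10) // suspending cleanup is only possible inside NonCancellable
                log += "suspending cleanup"
            }
        }
    }
    yield()             // let the job start and reach delay()
    job.cancelAndJoin() // cancel, then wait for the finally block to finish
    log
}

fun main() {
    println(cleanupLog()) // [loading, cleanup, suspending cleanup]
}
```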
Testing
Testing StateFlow with Turbine
kotlin
@Test
fun `search updates item list`() = runTest {
    val fakeRepository = FakeItemRepository().apply { emit(testItems) }
    val viewModel = ItemListViewModel(GetItemsUseCase(fakeRepository))

    viewModel.state.test {
        assertEquals(ItemListState(), awaitItem()) // initial

        viewModel.onSearch("query")
        val loading = awaitItem()
        assertTrue(loading.isLoading)

        val loaded = awaitItem()
        assertFalse(loaded.isLoading)
        assertEquals(1, loaded.items.size)
    }
}
Testing with TestDispatcher
kotlin
@Test
fun `parallel load completes correctly`() = runTest {
    val viewModel = DashboardViewModel(
        itemRepo = FakeItemRepo(),
        statsRepo = FakeStatsRepo()
    )

    viewModel.load()
    advanceUntilIdle()

    val state = viewModel.state.value
    assertNotNull(state.items)
    assertNotNull(state.stats)
}
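A test like the one above relies on viewModelScope, which dispatches on Dispatchers.Main, so Main usually has to be replaced with a TestDispatcher first. The sketch below shows that setup without Android classes: a plain CoroutineScope(Dispatchers.Main) stands in for viewModelScope (an assumption made for illustration), and it requires the kotlinx-coroutines-test artifact.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.StandardTestDispatcher
import kotlinx.coroutines.test.advanceUntilIdle
import kotlinx.coroutines.test.resetMain
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.test.setMain

// Runs a Main-dispatched coroutine under virtual time and returns what it produced.
@OptIn(ExperimentalCoroutinesApi::class)
fun loadOnMainWithVirtualTime(): String? {
    var result: String? = null
    // Replace Main so code that dispatches on it runs on the test scheduler.
    Dispatchers.setMain(StandardTestDispatcher())
    try {
        runTest {
            // Stand-in for viewModelScope (hypothetical; a real ViewModel creates its own).
            val scope = CoroutineScope(Dispatchers.Main)
            scope.launch {
                delay(1_000) // skipped instantly: virtual time
                result = "loaded"
            }
            advanceUntilIdle() // runs all pending work, including the Main-dispatched coroutine
            scope.cancel()
        }
    } finally {
        Dispatchers.resetMain() // always restore Main for other tests
    }
    return result
}

fun main() {
    println(loadOnMainWithVirtualTime())
}
```

In a JUnit project, the setMain/resetMain pair is commonly moved into a reusable test rule or extension so each test class does not repeat it.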
Faking Flows
kotlin
class FakeItemRepository : ItemRepository {
    private val _items = MutableStateFlow<List<Item>>(emptyList())

    override fun observeItems(): Flow<List<Item>> = _items

    fun emit(items: List<Item>) { _items.value = items }

    override suspend fun getItemsByCategory(category: String): Result<List<Item>> {
        return Result.success(_items.value.filter { it.category == category })
    }
}
Anti-Patterns to Avoid
- Using GlobalScope: leaks coroutines and bypasses structured cancellation
- Collecting Flows in init {} without a scope: launch the collection in viewModelScope instead
- Using MutableStateFlow with mutable collections: always publish immutable copies, e.g. _state.update { it.copy(list = it.list + newItem) }
- Catching CancellationException: let it propagate (or rethrow it) so cancellation works properly
- Using flowOn(Dispatchers.Main) to control where collection runs: flowOn only changes the upstream context; collection always runs in the collector's own context
- Creating a Flow in a @Composable without remember: the flow is recreated on every recomposition
References
See skill: compose-multiplatform-patterns for UI consumption of Flows.
See skill: android-clean-architecture for where coroutines fit in layers.