Flow & Reactive Streams
You already know async streams from RxJS Observables and Go channel pipelines.
Kotlin’s Flow is the same idea: a cold asynchronous stream with built-in
backpressure. This module maps Flow — its builders, operators, hot variants
(SharedFlow/StateFlow), and error handling — onto the reactive tools you
already reach for.
Same dependencies as Module 05: Coroutines —
kotlinx-coroutines-core already includes Flow. Add the test helpers for working
through this module:
    plugins {
        kotlin("jvm") version "2.1.0"
        application
    }

    repositories {
        mavenCentral()
    }

    dependencies {
        implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.10.1")

        testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.10.1")
        testImplementation("app.cash.turbine:turbine:1.2.0") // Flow testing
        testImplementation(kotlin("test"))
    }

    application {
        mainClass.set("MainKt")
    }

    tasks.test {
        useJUnitPlatform()
    }

Why Flow?
You already know async streams. Here’s the mapping:

| Concept | TypeScript (RxJS) | Go | Kotlin |
|---|---|---|---|
| Cold stream | Observable | Channel + goroutine (manual) | Flow |
| Hot stream | Subject / BehaviorSubject | Broadcast channel pattern | SharedFlow / StateFlow |
| Subscribe | .subscribe() | for val := range ch | .collect { } |
| Transform | .pipe(map(...)) | goroutine that reads/writes | .map { } |
| Backpressure | throttle, debounce, sample | Buffered channel back-pressure | buffer, conflate, collectLatest |
| Cancellation | .unsubscribe() | close(ch) / context cancel | Cancel the collecting coroutine |
| Error handling | catchError | Check err in loop | .catch { } |
Flow vs Channel (when to use which)

| Feature | Flow | Channel |
|---|---|---|
| Nature | Cold: nothing happens until collected | Hot: exists and accepts sends independently of any consumer |
| Consumers | Each collector gets its own stream | Consumers share one stream; each value is delivered to exactly one of them |
| Use case | Data transformation pipeline, API responses | Communication between coroutines |
| Backpressure | Built-in (collector controls pace) | Built-in (buffered or rendezvous) |
| Cancellation | Cancelling collector stops production | Must close channel explicitly |
| Analogy | Iterator that can suspend | Go channel |
Rule of thumb: Use Flow for data pipelines. Use Channel for
coroutine-to-coroutine communication.
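
The difference in the "Consumers" row is easy to see in code. The sketch below is a minimal illustration, not part of the original module; the function names `collectTwice` and `consumeTwice` are made up for the example. A flow replays its body for each collector, while two consumers of one channel split the values between them.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*

// Each collector re-runs the flow body, so both lists contain 1, 2, 3.
suspend fun collectTwice(): Pair<List<Int>, List<Int>> {
    val numbers = flow { for (i in 1..3) emit(i) }
    return numbers.toList() to numbers.toList()
}

// Two consumers read from one channel; every value reaches exactly one of them.
suspend fun consumeTwice(): List<Int> = coroutineScope {
    val channel = Channel<Int>()
    launch {
        for (i in 1..6) channel.send(i)
        channel.close() // without close() the consumers would wait forever
    }
    val a = async { val seen = mutableListOf<Int>(); for (v in channel) seen += v; seen }
    val b = async { val seen = mutableListOf<Int>(); for (v in channel) seen += v; seen }
    // How the values are split between a and b is not guaranteed; the union is.
    (a.await() + b.await()).sorted()
}

fun main() = runBlocking {
    println(collectTwice()) // ([1, 2, 3], [1, 2, 3])
    println(consumeTwice()) // [1, 2, 3, 4, 5, 6]
}
```

If every consumer must see every value, a channel is the wrong tool; that is what SharedFlow (covered later in this module) is for.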
Flow basics
Cold streams: nothing happens until you collect

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*

    fun numbersFlow(): Flow<Int> = flow {
        println("Flow started") // Only runs when collected
        for (i in 1..3) {
            delay(100)
            emit(i)
        }
        println("Flow completed")
    }

    fun main() = runBlocking {
        val myFlow = numbersFlow()
        println("Flow created, nothing happening yet...")

        println("Collecting first time:")
        myFlow.collect { value -> println("Got: $value") }

        println("\nCollecting second time:")
        myFlow.collect { value -> println("Got again: $value") }
    }
    // Output:
    // Flow created, nothing happening yet...
    // Collecting first time:
    // Flow started
    // Got: 1
    // Got: 2
    // Got: 3
    // Flow completed
    //
    // Collecting second time:
    // Flow started
    // Got again: 1
    // Got again: 2
    // Got again: 3
    // Flow completed

Key insight: Each collect call runs the flow from scratch. This is like RxJS
cold Observables — each subscriber gets its own independent stream.
Comparison: creating an async stream
TypeScript (RxJS):

    import { Observable } from 'rxjs';

    const numbers$ = new Observable<number>(subscriber => {
      console.log("Stream started");
      for (let i = 1; i <= 3; i++) {
        subscriber.next(i);
      }
      subscriber.complete();
    });

    numbers$.subscribe(val => console.log(`Got: ${val}`));

Go:

    func numbers(ctx context.Context) <-chan int {
        ch := make(chan int)
        go func() {
            defer close(ch)
            for i := 1; i <= 3; i++ {
                select {
                case ch <- i:
                case <-ctx.Done():
                    return
                }
            }
        }()
        return ch
    }

    ch := numbers(ctx)
    for val := range ch {
        fmt.Println("Got:", val)
    }

Kotlin:

    fun numbers(): Flow<Int> = flow {
        for (i in 1..3) {
            emit(i)
        }
    }

    numbers().collect { println("Got: $it") }

Key Differences:
- TS/RxJS: Observable pushes values to subscriber. Complex operator chains via `.pipe()`.
- Go: Channel is hot — goroutine runs immediately. Manual cancellation via context.
- Kotlin: Flow is cold, pull-based. Collector drives the pace. Operators are simple extension functions.
Flow builders
flow { } — the main builder

    import kotlinx.coroutines.flow.*

    // Build a flow by emitting values
    val myFlow: Flow<String> = flow {
        emit("Hello")
        emit("World")
    }

flowOf() — from fixed values

    import kotlinx.coroutines.flow.*

    // Like listOf() but for flows
    val numbers = flowOf(1, 2, 3, 4, 5)

    // Like Observable.of(1, 2, 3) in RxJS

.asFlow() — convert collections/sequences

    import kotlinx.coroutines.flow.*

    // From a list
    val fromList: Flow<Int> = listOf(1, 2, 3).asFlow()

    // From a range
    val fromRange: Flow<Int> = (1..10).asFlow()

    // From a sequence (lazy)
    val fromSequence: Flow<Int> = sequence {
        yield(1)
        yield(2)
        yield(3)
    }.asFlow()

channelFlow { } — for concurrent emission
Regular flow {} is sequential — you can only emit from the flow’s own coroutine.
channelFlow allows concurrent emission from multiple coroutines:

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*

    val concurrentFlow: Flow<String> = channelFlow {
        // Launch concurrent producers
        launch { send("from coroutine 1") }
        launch { send("from coroutine 2") }
        launch {
            delay(100)
            send("from coroutine 3")
        }
    }

    fun main() = runBlocking {
        concurrentFlow.collect { println(it) }
    }

callbackFlow { } — bridging callback-based APIs
TypeScript (RxJS):

    const fileChanges$ = new Observable<string>(subscriber => {
      const watcher = watchDirectory(dir);
      watcher.on('change', path => subscriber.next(path));
      return () => watcher.close(); // cleanup on unsubscribe
    });

Kotlin:

    import kotlinx.coroutines.*
    import kotlinx.coroutines.channels.*
    import kotlinx.coroutines.flow.*

    // Wrapping a callback-based event source.
    // FileChangeListener, registerWatcher and unregisterWatcher stand in for
    // whatever callback API you are bridging; they are not library functions.
    fun fileWatcher(directory: String): Flow<String> = callbackFlow {
        val watcher = object : FileChangeListener {
            override fun onFileChanged(path: String) {
                trySend(path) // non-blocking send from callback
            }
        }

        // Register the watcher
        registerWatcher(directory, watcher)

        // Suspend until the flow collector cancels
        awaitClose {
            // Cleanup when collection stops
            unregisterWatcher(directory, watcher)
        }
    }

Key Differences:
- TS/RxJS: The Observable’s returned function is the teardown logic, run on unsubscribe.
- Kotlin: `trySend` pushes non-blocking from the callback; `awaitClose` holds the flow open and runs the cleanup when the collector cancels.
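
To try the register/`awaitClose` lifecycle without a real file watcher, here is a self-contained sketch. The `Ticker` class, its methods, and the `ticks()` extension are hypothetical stand-ins for a listener-based library; only `callbackFlow`, `trySend`, and `awaitClose` are real kotlinx.coroutines APIs.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.*

// Hypothetical callback API, standing in for a real listener-based library.
class Ticker {
    private val listeners = mutableListOf<(Int) -> Unit>()
    val listenerCount: Int get() = listeners.size
    fun register(listener: (Int) -> Unit) { listeners += listener }
    fun unregister(listener: (Int) -> Unit) { listeners -= listener }
    fun fire(value: Int) = listeners.toList().forEach { it(value) }
}

// Bridge: register on collection, unregister when the collector goes away.
fun Ticker.ticks(): Flow<Int> = callbackFlow {
    val listener: (Int) -> Unit = { trySend(it) }
    register(listener)
    awaitClose { unregister(listener) }
}

suspend fun demo(): Pair<List<Int>, Int> = coroutineScope {
    val ticker = Ticker()
    val received = mutableListOf<Int>()
    val job = launch { ticker.ticks().collect { received += it } }
    delay(50)           // give the collector time to register its listener
    ticker.fire(1)
    ticker.fire(2)
    delay(50)           // give the collector time to receive both values
    job.cancelAndJoin() // cancelling the collector runs the awaitClose block
    received to ticker.listenerCount
}

fun main() = runBlocking {
    println(demo()) // ([1, 2], 0)
}
```

The listener count dropping back to zero is the point: cleanup is tied to the collector's lifetime, so a cancelled collector cannot leak a registered callback.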
Flow operators
Operators transform flows without collecting them. They return new Flow
instances — nothing executes until collect is called.
map — transform each value

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*

    fun main() = runBlocking {
        (1..5).asFlow()
            .map { it * it }
            .collect { println(it) }
        // 1, 4, 9, 16, 25
    }

RxJS: source$.pipe(map(x => x * x))
Go: Separate goroutine reading from input channel, writing to output channel
filter — keep matching values

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*

    fun main() = runBlocking {
        (1..10).asFlow()
            .filter { it % 2 == 0 }
            .collect { println(it) }
        // 2, 4, 6, 8, 10
    }

transform — flexible transformation (emit 0, 1, or N values)

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*

    fun main() = runBlocking {
        (1..5).asFlow()
            .transform { value ->
                emit("Processing $value")
                delay(100)
                emit("Done with $value")
                // Can also emit 0 items (acts like filter)
            }
            .collect { println(it) }
        // Processing 1
        // Done with 1
        // Processing 2
        // Done with 2
        // ...
    }

transform is like RxJS mergeMap/concatMap when you need to emit multiple
values per input.
take — limit items

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*

    fun main() = runBlocking {
        (1..100).asFlow()
            .take(5)
            .collect { println(it) }
        // 1, 2, 3, 4, 5
        // Flow is cancelled after 5 items
    }

drop — skip first N items

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*

    fun main() = runBlocking {
        (1..10).asFlow()
            .drop(3)
            .collect { println(it) }
        // 4, 5, 6, 7, 8, 9, 10
    }

onEach — side effects without changing values

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*

    fun main() = runBlocking {
        (1..5).asFlow()
            .onEach { println("About to process: $it") }
            .map { it * 2 }
            .onEach { println("After doubling: $it") }
            .collect()
    }

Like RxJS tap.
reduce and fold — terminal aggregation

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*

    fun main() = runBlocking {
        // reduce: no initial value, uses first element as accumulator
        val sum = (1..10).asFlow()
            .reduce { accumulator, value -> accumulator + value }
        println("Sum: $sum") // 55

        // fold: with initial value
        val product = (1..5).asFlow()
            .fold(1) { acc, value -> acc * value }
        println("Product: $product") // 120
    }

toList, toSet — collect into collections

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*

    fun main() = runBlocking {
        val list: List<Int> = (1..5).asFlow()
            .map { it * it }
            .toList()
        println(list) // [1, 4, 9, 16, 25]

        val set: Set<Int> = flowOf(1, 2, 2, 3, 3, 3).toSet()
        println(set) // [1, 2, 3]
    }

first, single — get specific elements

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*

    fun main() = runBlocking {
        val first = (1..10).asFlow().first()
        println("First: $first") // 1

        val firstEven = (1..10).asFlow().first { it % 2 == 0 }
        println("First even: $firstEven") // 2

        val single = flowOf(42).single()
        println("Single: $single") // 42
        // Throws if flow has 0 or 2+ elements
    }

Chaining operators (pipeline)

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*

    data class LogEntry(val level: String, val message: String, val timestamp: Long)

    fun logStream(): Flow<LogEntry> = flow {
        val logs = listOf(
            LogEntry("INFO", "Server started", 1000),
            LogEntry("DEBUG", "Connection pool initialized", 1001),
            LogEntry("ERROR", "Failed to connect to DB", 1002),
            LogEntry("INFO", "Retrying connection", 1003),
            LogEntry("ERROR", "Connection timeout", 1004),
            LogEntry("INFO", "Connected successfully", 1005),
        )
        for (log in logs) {
            delay(100)
            emit(log)
        }
    }

    fun main() = runBlocking {
        val errors = logStream()
            .filter { it.level == "ERROR" }
            .map { "[${it.level}] ${it.message}" }
            .onEach { println("Alert: $it") }
            .toList()
        // Printing last keeps the block's result Unit, so main() stays a valid entry point
        println("\nTotal errors: ${errors.size}")
    }
    // Alert: [ERROR] Failed to connect to DB
    // Alert: [ERROR] Connection timeout
    //
    // Total errors: 2

This is the canonical Flow shape — a source feeding a chain of operators into a terminal collector:
flowchart LR
S["logStream()"] --> F["filter { ERROR }"]
F --> M["map { format }"]
M --> O["onEach { alert }"]
O --> C["toList() collector"]
Combining flows
zip — pair elements from two flows
Combines one element from each flow into a pair. Completes when the shorter flow ends.

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*

    fun main() = runBlocking {
        val names = flowOf("Alice", "Bob", "Charlie")
        val ages = flowOf(30, 25, 35)

        names.zip(ages) { name, age -> "$name is $age" }
            .collect { println(it) }
        // Alice is 30
        // Bob is 25
        // Charlie is 35
    }

RxJS equivalent: zip(names$, ages$).pipe(map(([name, age]) => ...))
combine — latest from each flow
Takes the latest value from each flow whenever either emits. Useful for merging state.

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*

    fun main() = runBlocking {
        val flow1 = flow {
            emit("A1"); delay(100)
            emit("A2"); delay(200)
            emit("A3")
        }
        val flow2 = flow {
            emit("B1"); delay(150)
            emit("B2"); delay(150)
            emit("B3")
        }

        flow1.combine(flow2) { a, b -> "$a + $b" }
            .collect { println(it) }
        // A1 + B1
        // A2 + B1
        // A2 + B2
        // A3 + B2
        // A3 + B3
    }

RxJS equivalent: combineLatest([flow1$, flow2$])
merge — interleave multiple flows
All values from all flows in arrival order:

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*

    fun main() = runBlocking {
        val flow1 = flow {
            delay(100); emit("A")
            delay(200); emit("B")
        }
        val flow2 = flow {
            delay(150); emit("1")
            delay(150); emit("2")
        }

        merge(flow1, flow2).collect { println(it) }
        // A (at 100ms)
        // 1 (at 150ms)
        // B (at 300ms)
        // 2 (at 300ms)
    }

RxJS equivalent: merge(flow1$, flow2$)
Go equivalent: Fan-in pattern — multiple goroutines writing to one channel
Comparison: combining async streams

| Operation | RxJS | Go | Kotlin Flow |
|---|---|---|---|
| Pair by position | zip() | Manual sync with 2 channels | zip() |
| Latest from each | combineLatest() | Manual with goroutines + select | combine() |
| Merge all | merge() | Fan-in: N goroutines → 1 channel | merge() |
| Concat | concat() | Read ch1 fully, then ch2 | `flow { emitAll(a); emitAll(b) }`, or flatMapConcat over a flow of flows |
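
Flow has no dedicated `concat` function, so the "Concat" row deserves a concrete shape. One way to write it, sketched here with a hypothetical helper named `concat`, is a `flow { }` builder that calls `emitAll` on each source in turn:

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: collect `first` to completion, then `second`.
fun <T> concat(first: Flow<T>, second: Flow<T>): Flow<T> = flow {
    emitAll(first)  // suspends until `first` completes
    emitAll(second) // only then starts collecting `second`
}

fun main() = runBlocking {
    val result = concat(flowOf(1, 2), flowOf(3, 4)).toList()
    println(result) // [1, 2, 3, 4]
}
```

Because both sources are cold, `second` does no work at all until `first` has finished.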
FlatMap operators
FlatMap operators transform each value into a flow, then flatten the results. This is where the real power is for async pipelines.
flatMapConcat — sequential (one at a time)
Each inner flow completes before the next starts:

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*

    fun fetchDetails(id: Int): Flow<String> = flow {
        delay(100)
        emit("Details for $id (part 1)")
        delay(100)
        emit("Details for $id (part 2)")
    }

    fun main() = runBlocking {
        flowOf(1, 2, 3)
            .flatMapConcat { id -> fetchDetails(id) }
            .collect { println(it) }
        // Details for 1 (part 1)
        // Details for 1 (part 2)
        // Details for 2 (part 1)
        // Details for 2 (part 2)
        // Details for 3 (part 1)
        // Details for 3 (part 2)
    }

RxJS equivalent: concatMap
flatMapMerge — concurrent (all at once)
All inner flows run concurrently:

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*

    // fetchDetails() is the function from the flatMapConcat example
    fun main() = runBlocking {
        flowOf(1, 2, 3)
            .flatMapMerge { id -> fetchDetails(id) }
            .collect { println(it) }
        // Results interleaved — all three running concurrently
        // Details for 1 (part 1)
        // Details for 2 (part 1)
        // Details for 3 (part 1)
        // Details for 1 (part 2)
        // Details for 2 (part 2)
        // Details for 3 (part 2)
    }

    // Control concurrency (fragment; call from inside a coroutine):
    flowOf(1, 2, 3)
        .flatMapMerge(concurrency = 2) { id -> fetchDetails(id) }
        .collect { println(it) }

RxJS equivalent: mergeMap (with optional concurrent parameter)
flatMapLatest — cancel previous, keep latest
When a new value arrives, cancel the previous inner flow:

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*

    fun main() = runBlocking {
        flow {
            emit(1); delay(50)
            emit(2); delay(50)
            emit(3)
        }
            .flatMapLatest { id ->
                flow {
                    emit("Start $id")
                    delay(200) // This gets cancelled when next value arrives
                    emit("End $id")
                }
            }
            .collect { println(it) }
        // Start 1 (1 emitted, starts inner flow)
        // Start 2 (2 emitted after 50ms, cancels inner flow for 1)
        // Start 3 (3 emitted after 50ms, cancels inner flow for 2)
        // End 3 (only 3's inner flow completes)
    }

RxJS equivalent: switchMap — the most-used operator for search-as-you-type,
autocomplete, etc.
When to use which FlatMap

| Operator | Behavior | Use Case | RxJS Equivalent |
|---|---|---|---|
| `flatMapConcat` | Sequential, in order | Ordered API calls, migrations | concatMap |
| `flatMapMerge` | Concurrent, interleaved | Batch processing, fan-out | mergeMap |
| `flatMapLatest` | Cancel previous | Search autocomplete, latest data | switchMap |
Context & threading: flowOn
The problem
Flow operators run in the collector’s context by default:

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*

    fun main() = runBlocking { // runs on main thread
        flow {
            println("Emitting on: ${Thread.currentThread().name}") // main
            emit(1)
        }
            .map {
                println("Mapping on: ${Thread.currentThread().name}") // main
                it * 2
            }
            .collect {
                println("Collecting on: ${Thread.currentThread().name}") // main
                println("Value: $it")
            }
    }

flowOn changes upstream context
flowOn changes the dispatcher for everything above it in the chain:

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*

    fun main() = runBlocking {
        flow {
            println("Emitting on: ${Thread.currentThread().name}") // IO thread
            emit(readFileFromDisk())
        }
            .map {
                println("Mapping on: ${Thread.currentThread().name}") // IO thread
                parseData(it)
            }
            .flowOn(Dispatchers.IO) // Everything ABOVE runs on IO
            .map {
                println("Processing on: ${Thread.currentThread().name}") // Default thread
                processData(it)
            }
            .flowOn(Dispatchers.Default) // This map runs on Default
            .collect {
                println("Collecting on: ${Thread.currentThread().name}") // main
                println("Result: $it")
            }
    }

    fun readFileFromDisk(): String = "file-content"
    fun parseData(s: String): String = s.uppercase()
    fun processData(s: String): String = "processed: $s"

Each flowOn reassigns the dispatcher for the segment of the chain above it,
leaving the terminal collect on the original context:
flowchart LR
subgraph IO["Dispatchers.IO"]
E["emit(readFile)"] --> M1["map { parse }"]
end
subgraph DEF["Dispatchers.Default"]
M2["map { process }"]
end
subgraph MAIN["main"]
C["collect"]
end
M1 --> M2 --> C
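
A related pitfall is worth a short sketch: inside `flow { }`, wrapping `emit` in `withContext` is rejected at runtime, which is why `flowOn` is the way to move upstream work. The names `wrong`, `right`, and `tryCollect` below are illustrative only.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Anti-pattern: switching context around emit inside the flow builder.
val wrong: Flow<Int> = flow {
    withContext(Dispatchers.Default) { emit(1) }
}

// Supported: keep the builder context-free and move it with flowOn.
val right: Flow<Int> = flow { emit(1) }.flowOn(Dispatchers.Default)

// Collects the flow and captures either the values or the failure.
suspend fun tryCollect(source: Flow<Int>): Result<List<Int>> =
    runCatching { source.toList() }

fun main() = runBlocking {
    // The builder detects the context switch and throws IllegalStateException
    println(tryCollect(wrong).exceptionOrNull() is IllegalStateException) // true
    println(tryCollect(right).getOrNull()) // [1]
}
```

The failure message itself points at `flowOn`, so this mistake tends to be caught the first time the flow is collected.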
Comparison: thread/context switching
TypeScript:

    // Everything runs on the event loop
    const result = await process(data);

    // For CPU-bound: spawn worker thread
    const worker = new Worker('./process.js');

Go:

    // Go doesn't let you control which thread a goroutine runs on.
    // The runtime handles scheduling across GOMAXPROCS threads.
    go process(data) // runtime decides the thread

Kotlin:

    // Explicit: emit on IO, process on Default, collect on Main
    flow { emit(readFile()) }
        .flowOn(Dispatchers.IO)
        .map { process(it) }
        .flowOn(Dispatchers.Default)
        .collect { updateUI(it) }

Key Differences:
- TS: Everything runs on the event loop; CPU-bound work needs explicit worker threads.
- Go: Goroutines are scheduled by the runtime across `GOMAXPROCS` threads — no explicit thread control.
- Kotlin: Explicit control via dispatchers and `flowOn`.
Backpressure handling
Backpressure occurs when the producer is faster than the consumer. Flow handles this naturally (collector controls the pace), but sometimes you need more control.
Default behavior: sequential

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*

    fun main() = runBlocking {
        val start = System.currentTimeMillis()
        fun elapsed() = System.currentTimeMillis() - start

        flow {
            for (i in 1..3) {
                delay(100) // produces every 100ms
                println("Emitting $i at ${elapsed()}ms")
                emit(i) // suspends until the collector has finished with this value
            }
        }.collect { value ->
            delay(300) // consumes every 300ms
            println("Collected $value at ${elapsed()}ms")
        }
    }
    // Emitting 1 at ~100ms
    // Collected 1 at ~400ms
    // Emitting 2 at ~500ms (producer was suspended in emit while the consumer worked)
    // Collected 2 at ~800ms
    // Emitting 3 at ~900ms
    // Collected 3 at ~1200ms
    // Total: ~1200ms (sequential)

buffer — run producer and consumer concurrently

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*

    fun main() = runBlocking {
        val start = System.currentTimeMillis()
        fun elapsed() = System.currentTimeMillis() - start

        flow {
            for (i in 1..3) {
                delay(100)
                emit(i)
                println("Emitted $i at ${elapsed()}ms")
            }
        }
            .buffer() // Buffer emissions — producer doesn't wait for consumer
            .collect { value ->
                delay(300)
                println("Collected $value at ${elapsed()}ms")
            }
    }
    // Emitted 1 at ~100ms
    // Emitted 2 at ~200ms (producer runs ahead!)
    // Emitted 3 at ~300ms
    // Collected 1 at ~400ms
    // Collected 2 at ~700ms
    // Collected 3 at ~1000ms
    // Total: ~1000ms (producer runs ahead of consumer)

conflate — drop intermediate values
Keep only the latest value when the consumer is slow:

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*

    fun main() = runBlocking {
        val start = System.currentTimeMillis()
        fun elapsed() = System.currentTimeMillis() - start

        flow {
            for (i in 1..5) {
                delay(100)
                emit(i)
                println("Emitted $i at ${elapsed()}ms")
            }
        }
            .conflate() // Drop intermediate values
            .collect { value ->
                delay(300)
                println("Collected $value at ${elapsed()}ms")
            }
    }
    // Emitted 1 at ~100ms
    // Emitted 2 at ~200ms
    // Emitted 3 at ~300ms
    // Collected 1 at ~400ms (started with 1)
    // Emitted 4 at ~400ms
    // Emitted 5 at ~500ms
    // Collected 3 (or 4) at ~700ms (picks up whichever value was latest at ~400ms)
    // Collected 5 at ~1000ms (the final value is never dropped)

RxJS equivalent: throttle / sample
collectLatest — cancel slow processing
When a new value arrives, cancel the previous collection:

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*

    fun main() = runBlocking {
        val start = System.currentTimeMillis()
        fun elapsed() = System.currentTimeMillis() - start

        flow {
            for (i in 1..5) {
                delay(100)
                emit(i)
            }
        }
            .collectLatest { value ->
                println("Started processing $value at ${elapsed()}ms")
                delay(300) // slow processing — gets cancelled by next value
                println("Finished processing $value at ${elapsed()}ms")
            }
    }
    // Started processing 1 at ~100ms
    // Started processing 2 at ~200ms (cancelled processing of 1)
    // Started processing 3 at ~300ms (cancelled processing of 2)
    // Started processing 4 at ~400ms (cancelled processing of 3)
    // Started processing 5 at ~500ms (cancelled processing of 4)
    // Finished processing 5 at ~800ms (only last one completes)

RxJS equivalent: switchMap behavior on the consumer side
Backpressure summary

| Strategy | Behavior | When to Use | RxJS Equivalent |
|---|---|---|---|
| Default (none) | Producer waits for consumer | Data integrity critical | Implicit backpressure |
| `buffer()` | Producer runs ahead, results queued | Speed matters, can handle memory | N/A (RxJS is push-based) |
| `conflate()` | Drop intermediate values | Only latest matters (telemetry, ticks) | throttle, sample |
| `collectLatest` | Cancel previous processing | Search/autocomplete, UI updates | switchMap |
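
Between "queue everything" and "keep only the latest" there is a middle ground: `buffer` accepts a capacity and an overflow policy. The sketch below is an illustration under assumed numbers (capacity 2, a 50 ms consumer); `slowCollect` is a made-up name. It keeps a small bounded queue and discards the oldest queued value when the producer gets too far ahead.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.*

// A fast producer feeding a slow consumer through a bounded, lossy buffer.
suspend fun slowCollect(): List<Int> {
    val seen = mutableListOf<Int>()
    flow { for (i in 1..10) emit(i) } // no delay: producer is much faster
        .buffer(capacity = 2, onBufferOverflow = BufferOverflow.DROP_OLDEST)
        .collect { value ->
            delay(50) // slow consumer
            seen += value
        }
    return seen
}

fun main() = runBlocking {
    val seen = slowCollect()
    // Which values are dropped depends on scheduling; the newest value survives.
    println("Collected ${seen.size} of 10 values, last = ${seen.last()}")
}
```

With `DROP_OLDEST` the producer never suspends and memory stays bounded, at the cost of losing older values; `BufferOverflow.SUSPEND` (the default) keeps every value but makes the producer wait once the buffer is full.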
Hot streams: SharedFlow & StateFlow
Cold flows (regular Flow) produce a new stream for each collector. Hot flows
share a single stream across multiple collectors.
SharedFlow — event bus
SharedFlow emits values to all active collectors. Values emitted before
collection starts are lost (unless replayed).

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*

    fun main() = runBlocking {
        // replay = 0: no history, only new values
        // replay = 1: new collectors get the last emitted value
        val events = MutableSharedFlow<String>(replay = 0)

        // Collector 1
        val job1 = launch {
            events.collect { println("Collector 1: $it") }
        }

        // Collector 2
        val job2 = launch {
            events.collect { println("Collector 2: $it") }
        }

        delay(100) // Give collectors time to start

        // Emit events — both collectors receive them
        events.emit("UserLoggedIn")
        events.emit("OrderPlaced")
        events.emit("PaymentProcessed")

        delay(100)
        job1.cancel()
        job2.cancel()
    }
    // Collector 1: UserLoggedIn
    // Collector 2: UserLoggedIn
    // Collector 1: OrderPlaced
    // Collector 2: OrderPlaced
    // Collector 1: PaymentProcessed
    // Collector 2: PaymentProcessed

A single emitter fans out to every active collector:
flowchart LR
E["MutableSharedFlow.emit"] --> C1["Collector 1"]
E --> C2["Collector 2"]
E --> C3["Collector N"]
RxJS equivalent: Subject (replay=0) or ReplaySubject (replay>0)
Go equivalent: Fan-out pattern — but you’d need to manually manage multiple channels:

    // Go: manual fan-out to multiple consumers
    var subscribers []chan string

    func broadcast(msg string) {
        for _, ch := range subscribers {
            ch <- msg
        }
    }

StateFlow — observable state
StateFlow always has a current value. New collectors immediately get the latest
value. Only emits when the value changes (deduplication).

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*

    data class AppState(val count: Int, val status: String)

    fun main() = runBlocking {
        val state = MutableStateFlow(AppState(count = 0, status = "idle"))

        // Observer
        val observer = launch {
            state.collect { println("State: $it") }
        }

        delay(100)

        // Update state
        state.value = AppState(count = 1, status = "loading")
        delay(50)
        state.value = AppState(count = 1, status = "loading") // No emission — same value
        delay(50)
        state.value = AppState(count = 2, status = "done")

        delay(100)
        observer.cancel()
    }
    // State: AppState(count=0, status=idle) (initial value)
    // State: AppState(count=1, status=loading)
    // State: AppState(count=2, status=done)
    // (No duplicate emission for the repeated loading state)

RxJS equivalent: BehaviorSubject
StateFlow with update (atomic modify)

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*

    fun main() = runBlocking {
        val counter = MutableStateFlow(0)

        // Atomic update — thread-safe
        val jobs = (1..100).map {
            launch(Dispatchers.Default) {
                repeat(100) {
                    counter.update { current -> current + 1 }
                }
            }
        }
        jobs.forEach { it.join() }
        println("Counter: ${counter.value}") // 10000 — always correct
    }

SharedFlow vs StateFlow

| Feature | SharedFlow | StateFlow |
|---|---|---|
| Initial value | No | Yes (required) |
| Current value access | No .value | .value always available |
| Deduplication | No | Yes (no emit on same value) |
| Replay | Configurable (0, 1, N) | Always 1 (latest value) |
| Use case | Events (click, navigation) | State (loading, data, error) |
| RxJS equivalent | Subject / ReplaySubject | BehaviorSubject |
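
The "Replay" row is the one that most often surprises people, so here is a small sketch of what a late collector actually sees. `lateCollector` is a hypothetical helper; the 100 ms timeout is only there because collecting a SharedFlow never completes on its own.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Emits two events with no collectors attached, then subscribes late.
suspend fun lateCollector(replay: Int): List<String> {
    val events = MutableSharedFlow<String>(replay = replay)
    events.emit("first")  // no subscribers: emit returns immediately
    events.emit("second")

    val seen = mutableListOf<String>()
    // A SharedFlow never completes, so stop listening after 100 ms.
    withTimeoutOrNull<Unit>(100) {
        events.collect { seen += it }
    }
    return seen
}

fun main() = runBlocking {
    println(lateCollector(replay = 0)) // []
    println(lateCollector(replay = 1)) // [second]
    println(lateCollector(replay = 2)) // [first, second]
}
```

With `replay = 0` both events are gone by the time the collector arrives, which is the right behavior for one-shot events and the wrong one for state.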
Converting cold flow to hot

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*

    fun main() = runBlocking {
        // Cold flow — each collector gets its own stream
        val coldFlow = flow {
            println("Cold flow started") // Prints for EACH collector
            emit(1)
            emit(2)
            emit(3)
        }

        // Convert to shared (hot) flow
        val hotFlow = coldFlow.shareIn(
            scope = this,
            started = SharingStarted.Lazily, // Start when first collector subscribes
            replay = 1, // New collectors get the last value
        )

        // Both collectors share the SAME upstream flow
        launch { hotFlow.take(3).collect { println("A: $it") } }
        launch { hotFlow.take(3).collect { println("B: $it") } }

        delay(500)
        coroutineContext.cancelChildren()
    }

SharingStarted strategies

| Strategy | Behavior | Use Case |
|---|---|---|
| `SharingStarted.Eagerly` | Start immediately | Always-on event streams |
| `SharingStarted.Lazily` | Start on first collector | On-demand data sources |
| `SharingStarted.WhileSubscribed(stopTimeout, replayExpiry)` | Active while collectors exist | Resource-efficient, stops when unused |
stateIn — convert to StateFlow

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*

    fun priceStream(): Flow<Double> = flow {
        var price = 100.0
        while (true) {
            delay(1000)
            price += (-5..5).random()
            emit(price)
        }
    }

    fun main() = runBlocking {
        // Convert cold flow to StateFlow with initial value
        val currentPrice: StateFlow<Double> = priceStream()
            .stateIn(
                scope = this,
                started = SharingStarted.WhileSubscribed(5000),
                initialValue = 100.0,
            )

        // Access current value at any time
        println("Current price: ${currentPrice.value}")

        // Collect updates
        val job = launch {
            currentPrice.take(5).collect { println("Price update: $it") }
        }
        job.join()
        coroutineContext.cancelChildren()
    }

Exception handling
catch operator
catch handles exceptions from upstream operators (above it in the chain):

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*

    fun riskyFlow(): Flow<Int> = flow {
        emit(1)
        emit(2)
        throw RuntimeException("Something went wrong")
        emit(3) // Never reached
    }

    fun main() = runBlocking {
        riskyFlow()
            .catch { e ->
                println("Caught: ${e.message}")
                emit(-1) // Can emit a fallback value
            }
            .collect { println("Got: $it") }
    }
    // Got: 1
    // Got: 2
    // Caught: Something went wrong
    // Got: -1

catch does not see exceptions thrown downstream of it, including those thrown inside collect:

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*

    fun main() = runBlocking {
        flowOf(1, 2, 3)
            .catch { println("This won't catch collector errors") }
            .collect { value ->
                if (value == 2) throw RuntimeException("Collector error!")
                println(value)
            }
        // 1
        // Exception in thread "main" java.lang.RuntimeException: Collector error!
    }

    // To handle collector errors, use onEach + catch:
    fun main2() = runBlocking {
        flowOf(1, 2, 3)
            .onEach { value ->
                if (value == 2) throw RuntimeException("Error!")
                println(value)
            }
            .catch { println("Caught: ${it.message}") }
            .collect()
        // 1
        // Caught: Error!
    }

retry and retryWhen

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*

    var attempt = 0

    fun unreliableFlow(): Flow<String> = flow {
        attempt++
        if (attempt < 3) {
            throw RuntimeException("Attempt $attempt failed")
        }
        emit("Success on attempt $attempt")
    }

    fun main() = runBlocking {
        // Simple retry
        unreliableFlow()
            .retry(retries = 3) // Retry up to 3 times
            .catch { println("All retries failed: ${it.message}") }
            .collect { println(it) }
        // Success on attempt 3

        // Conditional retry with delay
        attempt = 0
        unreliableFlow()
            // The lambda's `attempt` is the zero-based retry index; it shadows the top-level var
            .retryWhen { cause, attempt ->
                println("Retry $attempt due to: ${cause.message}")
                delay(100 * (attempt + 1)) // linear backoff: 100ms, 200ms, ...
                attempt < 5 // return true to retry, false to give up
            }
            .collect { println(it) }
    }

Comparison: error handling in streams
TypeScript (RxJS):

    source$.pipe(
      retry(3), // resubscribe first; only a final failure reaches catchError
      catchError(err => {
        console.log('Error:', err);
        return of('fallback'); // emit fallback
      }),
    );

Go:

    for val := range ch {
        result, err := process(val)
        if err != nil {
            log.Printf("Error: %v", err)
            continue // skip and continue
        }
        output <- result
    }

Kotlin:

    source
        .retry(3) // must come before catch, or catch handles the error and retry never fires
        .catch { emit("fallback") }
        .collect { println(it) }

Key Differences:
- TS/RxJS: `catchError` returns a replacement Observable; `retry(3)` resubscribes on error.
- Go: No stream-level error channel — you check `err` per value and `continue`.
- Kotlin: `catch` can `emit` a fallback; `retry` re-runs the upstream flow.
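
Operator order matters when combining the two: `catch` and `retry` each act only on what is upstream of them. The sketch below uses a hypothetical `flaky` source (not from the original module) to show both orderings side by side.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical source: fails on its first `failures` collections, then succeeds.
fun flaky(failures: Int): Flow<String> {
    var attempts = 0
    return flow {
        attempts++
        if (attempts <= failures) throw IllegalStateException("attempt $attempts failed")
        emit("ok after $attempts attempts")
    }
}

fun main() = runBlocking {
    // retry first: catch would only see an error that survives all retries
    val retryThenCatch = flaky(2).retry(3).catch { emit("fallback") }.toList()
    println(retryThenCatch) // [ok after 3 attempts]

    // catch first: the error is already handled, so retry never fires
    val catchThenRetry = flaky(2).catch { emit("fallback") }.retry(3).toList()
    println(catchThenRetry) // [fallback]
}
```

The same reasoning applies to RxJS: `catchError` placed before `retry` swallows the error, so there is nothing left to retry.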
Flow lifecycle
onStart, onCompletion, onEmpty

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*

    fun main() = runBlocking {
        flowOf(1, 2, 3)
            .onStart { println("Flow starting...") }
            .onEach { println("Processing: $it") }
            .onCompletion { cause ->
                if (cause == null) println("Flow completed successfully")
                else println("Flow failed: ${cause.message}")
            }
            .collect()
        // Flow starting...
        // Processing: 1
        // Processing: 2
        // Processing: 3
        // Flow completed successfully

        println("---")

        // onEmpty
        emptyFlow<Int>()
            .onEmpty {
                println("Flow was empty, emitting default")
                emit(0)
            }
            .collect { println("Got: $it") }
        // Flow was empty, emitting default
        // Got: 0
    }

launchIn — collect in a separate coroutine

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*

    fun main() = runBlocking {
        // Instead of:
        launch {
            myFlow.collect { process(it) }
        }

        // You can write:
        myFlow
            .onEach { process(it) }
            .launchIn(this) // Returns Job — can be cancelled

        delay(5000)
    }

    val myFlow = flowOf(1, 2, 3)
    fun process(i: Int) = println("Process: $i")

Converting between types
Flow to/from collections

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    // Collection → Flow
    val flow1 = listOf(1, 2, 3).asFlow()

    // Flow → Collection
    val list: List<Int> = flowOf(1, 2, 3).toList()
    val set: Set<Int> = flowOf(1, 1, 2, 2, 3).toSet()
}

Flow to/from channel
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    // Channel → Flow
    val channel = Channel<Int>()
    launch {
        channel.send(1)
        channel.send(2)
        channel.send(3)
        channel.close()
    }
    val flowFromChannel: Flow<Int> = channel.consumeAsFlow()
    flowFromChannel.collect { println(it) }

    // Flow → Channel (via produce)
    val channelFromFlow: ReceiveChannel<Int> = produce {
        flowOf(10, 20, 30).collect { send(it) }
    }
    for (value in channelFromFlow) {
        println(value)
    }
}

Flow to/from sequence
import kotlinx.coroutines.flow.*

fun main() {
    // Sequence → Flow
    val seq = sequence {
        yield(1)
        yield(2)
        yield(3)
    }
    val flow = seq.asFlow()
}

Testing flows
Basic flow testing with runTest

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.test.*
import kotlin.test.*

class FlowTest {

    @Test
    fun `test flow collects expected values`() = runTest {
        val result = flowOf(1, 2, 3)
            .map { it * 2 }
            .toList()

        assertEquals(listOf(2, 4, 6), result)
    }

    @Test
    fun `test flow with delay`() = runTest {
        // runTest auto-advances virtual time
        val result = flow {
            delay(1000); emit("a")
            delay(1000); emit("b")
            delay(1000); emit("c")
        }.toList()

        assertEquals(listOf("a", "b", "c"), result)
        // Runs instantly, with no real waiting
    }
}

Testing with Turbine
Turbine provides a cleaner API for testing flows:

import app.cash.turbine.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.test.*
import kotlin.test.*

class TurbineTest {

    @Test
    fun `test flow emissions with turbine`() = runTest {
        flowOf(1, 2, 3).test {
            assertEquals(1, awaitItem())
            assertEquals(2, awaitItem())
            assertEquals(3, awaitItem())
            awaitComplete()
        }
    }

    @Test
    fun `test error flow with turbine`() = runTest {
        flow {
            emit(1)
            throw RuntimeException("boom")
        }.test {
            assertEquals(1, awaitItem())
            val error = awaitError()
            assertEquals("boom", error.message)
        }
    }

    @Test
    fun `test SharedFlow with turbine`() = runTest {
        val sharedFlow = MutableSharedFlow<String>()

        sharedFlow.test {
            sharedFlow.emit("hello")
            assertEquals("hello", awaitItem())

            sharedFlow.emit("world")
            assertEquals("world", awaitItem())
        }
    }
}

Testing StateFlow
import app.cash.turbine.test // needed for the Turbine-based test below
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.test.*
import kotlin.test.*

class CounterViewModel {
    private val _count = MutableStateFlow(0)
    val count: StateFlow<Int> = _count.asStateFlow()

    fun increment() {
        _count.update { it + 1 }
    }

    fun decrement() {
        _count.update { it - 1 }
    }
}

class StateFlowTest {

    @Test
    fun `test counter increments`() = runTest {
        val vm = CounterViewModel()

        assertEquals(0, vm.count.value) // Initial value

        vm.increment()
        assertEquals(1, vm.count.value)

        vm.increment()
        vm.increment()
        assertEquals(3, vm.count.value)

        vm.decrement()
        assertEquals(2, vm.count.value)
    }

    @Test
    fun `test counter state flow with turbine`() = runTest {
        val vm = CounterViewModel()

        vm.count.test {
            assertEquals(0, awaitItem()) // Initial

            vm.increment()
            assertEquals(1, awaitItem())

            vm.increment()
            assertEquals(2, awaitItem())

            cancelAndIgnoreRemainingEvents()
        }
    }
}

Real-world patterns
Pattern: search with debounce

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Simulate user typing: emits progressively longer queries at a fixed pace
fun searchQueryFlow(): Flow<String> = flow {
    val inputs = listOf("k", "ko", "kot", "kotl", "kotli", "kotlin")
    for (input in inputs) {
        delay(100) // user types every 100ms
        emit(input)
    }
}

fun searchApi(query: String): List<String> {
    return listOf("$query result 1", "$query result 2")
}

// debounce is marked @FlowPreview; flatMapLatest is marked @ExperimentalCoroutinesApi
@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
fun main() = runBlocking {
    searchQueryFlow()
        .debounce(300) // Wait 300ms after last keystroke
        .distinctUntilChanged() // Skip if query hasn't changed
        .filter { it.length >= 3 } // Min 3 characters
        .flatMapLatest { query -> // Cancel previous search
            flow {
                println("Searching for: $query")
                emit(searchApi(query))
            }
        }
        .collect { results ->
            println("Results: $results")
        }
}
// Searching for: kotlin (only the final query, after debounce)
// Results: [kotlin result 1, kotlin result 2]

Pattern: polling with Flow
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun pollEndpoint(intervalMs: Long): Flow<String> = flow {
    while (true) {
        val response = fetchStatus() // suspend function
        emit(response)
        delay(intervalMs)
    }
}

suspend fun fetchStatus(): String {
    delay(50) // simulate network
    return "status: OK at ${System.currentTimeMillis()}"
}

fun main() = runBlocking {
    pollEndpoint(1000)
        .take(5)
        .collect { println(it) }
}

Pattern: batch processing
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Process items in batches
fun <T> Flow<T>.chunked(size: Int): Flow<List<T>> = flow {
    val buffer = mutableListOf<T>()
    collect { value ->
        buffer.add(value)
        if (buffer.size >= size) {
            emit(buffer.toList())
            buffer.clear()
        }
    }
    // Flush the final, partially filled batch
    if (buffer.isNotEmpty()) {
        emit(buffer.toList())
    }
}

fun main() = runBlocking {
    (1..25).asFlow()
        .chunked(10)
        .collect { batch ->
            println("Processing batch of ${batch.size}: $batch")
            delay(100) // simulate batch insert
        }
    // Processing batch of 10: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    // Processing batch of 10: [11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
    // Processing batch of 5: [21, 22, 23, 24, 25]
}

Pattern: event processing pipeline
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

data class Event(val type: String, val payload: Map<String, String>, val timestamp: Long)
data class ProcessedEvent(val type: String, val summary: String, val processedAt: Long)

fun eventSource(): Flow<Event> = flow {
    val events = listOf(
        Event("order.created", mapOf("orderId" to "123", "amount" to "99.99"), System.currentTimeMillis()),
        Event("user.login", mapOf("userId" to "alice"), System.currentTimeMillis()),
        Event("order.created", mapOf("orderId" to "124", "amount" to "49.99"), System.currentTimeMillis()),
        Event("order.shipped", mapOf("orderId" to "123"), System.currentTimeMillis()),
        Event("user.login", mapOf("userId" to "bob"), System.currentTimeMillis()),
    )
    for (event in events) {
        delay(100)
        emit(event)
    }
}

fun main() = runBlocking {
    eventSource()
        .filter { it.type.startsWith("order.") }
        .map { event ->
            ProcessedEvent(
                type = event.type,
                summary = "Order event: ${event.payload}",
                processedAt = System.currentTimeMillis(),
            )
        }
        .onEach { println("Processing: ${it.type}") }
        .buffer() // Don't slow down the source
        .collect { println(" -> ${it.summary}") }
}

Pattern: parallel processing with flatMapMerge
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun processItem(id: Int): String {
    delay((100L..500L).random()) // variable processing time
    return "Result-$id"
}

fun main() = runBlocking {
    val start = System.currentTimeMillis()

    (1..20).asFlow()
        .flatMapMerge(concurrency = 5) { id -> // Process 5 at a time
            flow { emit(processItem(id)) }
        }
        .collect { println(it) }

    println("Total time: ${System.currentTimeMillis() - start}ms")
    // Much faster than sequential: up to 5 items are processed concurrently
}

Practice
Put Flow to work: build a real pipeline and a real event bus.