Flow & Reactive Streams

You already know async streams from RxJS Observables and Go channel pipelines. Kotlin’s Flow is the same idea: a cold asynchronous stream with built-in backpressure. This module maps Flow — its builders, operators, hot variants (SharedFlow/StateFlow), and error handling — onto the reactive tools you already reach for.

Same dependencies as Module 05: Coroutines. kotlinx-coroutines-core already includes Flow. Add the test helpers for working through this module:

build.gradle.kts
plugins {
    kotlin("jvm") version "2.1.0"
    application
}

repositories {
    mavenCentral()
}

dependencies {
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.10.1")
    testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.10.1")
    testImplementation("app.cash.turbine:turbine:1.2.0") // Flow testing
    testImplementation(kotlin("test"))
}

application {
    mainClass.set("MainKt")
}

tasks.test {
    useJUnitPlatform()
}

You already know async streams. Here’s the mapping:

| Concept | TypeScript (RxJS) | Go | Kotlin |
| --- | --- | --- | --- |
| Cold stream | Observable | Channel + goroutine (manual) | Flow |
| Hot stream | Subject / BehaviorSubject | Broadcast channel pattern | SharedFlow / StateFlow |
| Subscribe | .subscribe() | for val := range ch | .collect { } |
| Transform | .pipe(map(...)) | goroutine that reads/writes | .map { } |
| Backpressure | throttle, debounce, sample | Buffered channel back-pressure | buffer, conflate, collectLatest |
| Cancellation | .unsubscribe() | close(ch) / context cancel | Cancel the collecting coroutine |
| Error handling | catchError | Check err in loop | .catch { } |

Within Kotlin, Flow and Channel differ as follows:

| Feature | Flow | Channel |
| --- | --- | --- |
| Nature | Cold: nothing happens until collected | Hot: produces regardless of consumers |
| Consumers | Each collector gets its own stream | Multiple consumers share the same stream; each value goes to exactly one of them |
| Use case | Data transformation pipeline, API responses | Communication between coroutines |
| Backpressure | Built-in (collector controls pace) | Built-in (buffered or rendezvous) |
| Cancellation | Cancelling collector stops production | Must close channel explicitly |
| Analogy | Iterator that can suspend | Go channel |

Rule of thumb: Use Flow for data pipelines. Use Channel for coroutine-to-coroutine communication.
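The difference in who receives each value can be sketched in a few lines. This is a minimal illustration, not part of the module's own code: the helper names `coldNumbers`, `collectTwice`, and `drainWithTwoWorkers` are assumptions made up for the example.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: a cold flow that records each time its body starts.
fun coldNumbers(starts: MutableList<String>): Flow<Int> = flow {
    starts += "started"
    emit(1)
    emit(2)
}

// Collect the same Flow twice: the body runs once per collector.
suspend fun collectTwice(): Pair<List<Int>, Int> {
    val starts = mutableListOf<String>()
    val source = coldNumbers(starts)
    val first = source.toList()
    val second = source.toList()
    return (first + second) to starts.size
}

// Two workers read one Channel: each value is delivered to exactly one of them.
suspend fun drainWithTwoWorkers(): List<Int> = coroutineScope {
    val channel = Channel<Int>()
    launch {
        for (i in 1..4) channel.send(i)
        channel.close() // lets the workers' for-loops finish
    }
    val workers = List(2) {
        async {
            val received = mutableListOf<Int>()
            for (value in channel) received += value
            received
        }
    }
    // Which worker got which value is not fixed, so merge and sort the results.
    workers.awaitAll().flatten().sorted()
}

fun main() = runBlocking {
    val (values, starts) = collectTwice()
    println("Flow values: $values, body ran $starts times") // Flow values: [1, 2, 1, 2], body ran 2 times
    println("Channel values: ${drainWithTwoWorkers()}")     // Channel values: [1, 2, 3, 4]
}
```

Every Flow collector sees the full sequence, while the Channel hands out each value once. That is why a Channel suits work distribution between coroutines and a Flow suits a reusable pipeline.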

Cold streams: nothing happens until you collect

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun numbersFlow(): Flow<Int> = flow {
println("Flow started") // Only runs when collected
for (i in 1..3) {
delay(100)
emit(i)
}
println("Flow completed")
}
fun main() = runBlocking {
val myFlow = numbersFlow()
println("Flow created, nothing happening yet...")
println("Collecting first time:")
myFlow.collect { value -> println("Got: $value") }
println("\nCollecting second time:")
myFlow.collect { value -> println("Got again: $value") }
}
// Output:
// Flow created, nothing happening yet...
// Collecting first time:
// Flow started
// Got: 1
// Got: 2
// Got: 3
// Flow completed
//
// Collecting second time:
// Flow started
// Got again: 1
// Got again: 2
// Got again: 3
// Flow completed

Key insight: Each collect call runs the flow from scratch. This is like RxJS cold Observables — each subscriber gets its own independent stream.

import { Observable } from 'rxjs';
const numbers$ = new Observable<number>(subscriber => {
console.log("Stream started");
for (let i = 1; i <= 3; i++) {
subscriber.next(i);
}
subscriber.complete();
});
numbers$.subscribe(val => console.log(`Got: ${val}`));

Key Differences:

  • TS/RxJS: Observable pushes values to subscriber. Complex operator chains via .pipe().
  • Go: Channel is hot — goroutine runs immediately. Manual cancellation via context.
  • Kotlin: Flow is cold, pull-based. Collector drives the pace. Operators are simple extension functions.
import kotlinx.coroutines.flow.*
// Build a flow by emitting values
val myFlow: Flow<String> = flow {
emit("Hello")
emit("World")
}
import kotlinx.coroutines.flow.*
// Like listOf() but for flows
val numbers = flowOf(1, 2, 3, 4, 5)
// Like Observable.of(1, 2, 3) in RxJS

.asFlow() — convert collections/sequences

import kotlinx.coroutines.flow.*
// From a list
val fromList: Flow<Int> = listOf(1, 2, 3).asFlow()
// From a range
val fromRange: Flow<Int> = (1..10).asFlow()
// From a sequence (lazy)
val fromSequence: Flow<Int> = sequence {
yield(1)
yield(2)
yield(3)
}.asFlow()

channelFlow { } — for concurrent emission

Regular flow {} is sequential — you can only emit from the flow’s own coroutine. channelFlow allows concurrent emission from multiple coroutines:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
val concurrentFlow: Flow<String> = channelFlow {
// Launch concurrent producers
launch { send("from coroutine 1") }
launch { send("from coroutine 2") }
launch {
delay(100)
send("from coroutine 3")
}
}
fun main() = runBlocking {
concurrentFlow.collect { println(it) }
}

callbackFlow { } — bridging callback-based APIs

const fileChanges$ = new Observable<string>(subscriber => {
const watcher = watchDirectory(dir);
watcher.on('change', path => subscriber.next(path));
return () => watcher.close(); // cleanup on unsubscribe
});

Key Differences:

  • TS/RxJS: The Observable’s returned function is the teardown logic, run on unsubscribe.
  • Kotlin: trySend pushes non-blocking from the callback; awaitClose holds the flow open and runs the cleanup when the collector cancels.
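The Kotlin side of this bridge could look roughly like the sketch below. `DirectoryWatcher` is a hypothetical stand-in for a callback-based API (it is not a real library class), and `fileChanges` and `collectTwoChanges` are names invented for the example. Only `callbackFlow`, `trySend`, and `awaitClose` come from kotlinx.coroutines.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.*

// Hypothetical callback-based API standing in for a real file watcher.
class DirectoryWatcher {
    private var listener: ((String) -> Unit)? = null
    var closed = false
        private set

    fun onChange(callback: (String) -> Unit) { listener = callback }
    fun simulateChange(path: String) { listener?.invoke(path) }
    fun close() { closed = true; listener = null }
}

// Bridge the callback API into a cold Flow.
fun fileChanges(watcher: DirectoryWatcher): Flow<String> = callbackFlow {
    // trySend never suspends, so it is safe to call from a plain callback.
    watcher.onChange { path -> trySend(path) }
    // Keeps the flow open; the block runs when the collector cancels or the channel closes.
    awaitClose { watcher.close() }
}

// Collect two changes, then stop; take(2) cancels the flow, which triggers the cleanup.
suspend fun collectTwoChanges(watcher: DirectoryWatcher): List<String> = coroutineScope {
    val collected = async { fileChanges(watcher).take(2).toList() }
    delay(50) // give the collector time to register the callback
    watcher.simulateChange("a.txt")
    watcher.simulateChange("b.txt")
    collected.await()
}

fun main() = runBlocking {
    val watcher = DirectoryWatcher()
    println(collectTwoChanges(watcher)) // [a.txt, b.txt]
    println(watcher.closed)             // true
}
```

The `awaitClose` block plays the same role as the teardown function returned from the RxJS Observable constructor.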

Operators transform flows without collecting them. They return new Flow instances — nothing executes until collect is called.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
(1..5).asFlow()
.map { it * it }
.collect { println(it) }
// 1, 4, 9, 16, 25
}

RxJS: source$.pipe(map(x => x * x))

Go: Separate goroutine reading from input channel, writing to output channel

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
(1..10).asFlow()
.filter { it % 2 == 0 }
.collect { println(it) }
// 2, 4, 6, 8, 10
}

transform — flexible transformation (emit 0, 1, or N values)

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
(1..5).asFlow()
.transform { value ->
emit("Processing $value")
delay(100)
emit("Done with $value")
// Can also emit 0 items (acts like filter)
}
.collect { println(it) }
// Processing 1
// Done with 1
// Processing 2
// Done with 2
// ...
}

transform is like RxJS mergeMap/concatMap when you need to emit multiple values per input.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
(1..100).asFlow()
.take(5)
.collect { println(it) }
// 1, 2, 3, 4, 5
// Flow is cancelled after 5 items
}
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
(1..10).asFlow()
.drop(3)
.collect { println(it) }
// 4, 5, 6, 7, 8, 9, 10
}

onEach — side effects without changing values

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
(1..5).asFlow()
.onEach { println("About to process: $it") }
.map { it * 2 }
.onEach { println("After doubling: $it") }
.collect()
}

Like RxJS tap.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
// reduce: no initial value, uses first element as accumulator
val sum = (1..10).asFlow()
.reduce { accumulator, value -> accumulator + value }
println("Sum: $sum") // 55
// fold: with initial value
val product = (1..5).asFlow()
.fold(1) { acc, value -> acc * value }
println("Product: $product") // 120
}

toList, toSet — collect into collections

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
val list: List<Int> = (1..5).asFlow()
.map { it * it }
.toList()
println(list) // [1, 4, 9, 16, 25]
val set: Set<Int> = flowOf(1, 2, 2, 3, 3, 3).toSet()
println(set) // [1, 2, 3]
}
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
val first = (1..10).asFlow().first()
println("First: $first") // 1
val firstEven = (1..10).asFlow().first { it % 2 == 0 }
println("First even: $firstEven") // 2
val single = flowOf(42).single()
println("Single: $single") // 42
// Throws if flow has 0 or 2+ elements
}
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
data class LogEntry(val level: String, val message: String, val timestamp: Long)
fun logStream(): Flow<LogEntry> = flow {
val logs = listOf(
LogEntry("INFO", "Server started", 1000),
LogEntry("DEBUG", "Connection pool initialized", 1001),
LogEntry("ERROR", "Failed to connect to DB", 1002),
LogEntry("INFO", "Retrying connection", 1003),
LogEntry("ERROR", "Connection timeout", 1004),
LogEntry("INFO", "Connected successfully", 1005),
)
for (log in logs) {
delay(100)
emit(log)
}
}
fun main() = runBlocking {
logStream()
.filter { it.level == "ERROR" }
.map { "[${it.level}] ${it.message}" }
.onEach { println("Alert: $it") }
.toList()
.also { println("\nTotal errors: ${it.size}") }
}
// Alert: [ERROR] Failed to connect to DB
// Alert: [ERROR] Connection timeout
//
// Total errors: 2

This is the canonical Flow shape — a source feeding a chain of operators into a terminal collector:

[Diagram: Flow operator pipeline]

zip combines one element from each flow into a pair and completes when the shorter flow ends.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
val names = flowOf("Alice", "Bob", "Charlie")
val ages = flowOf(30, 25, 35)
names.zip(ages) { name, age -> "$name is $age" }
.collect { println(it) }
// Alice is 30
// Bob is 25
// Charlie is 35
}

RxJS equivalent: zip(names$, ages$).pipe(map(([name, age]) => ...))

combine takes the latest value from each flow whenever either emits, which makes it useful for merging state.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
val flow1 = flow {
emit("A1"); delay(100)
emit("A2"); delay(200)
emit("A3")
}
val flow2 = flow {
emit("B1"); delay(150)
emit("B2"); delay(150)
emit("B3")
}
flow1.combine(flow2) { a, b -> "$a + $b" }
.collect { println(it) }
// A1 + B1
// A2 + B1
// A2 + B2
// A3 + B2
// A3 + B3
}

RxJS equivalent: combineLatest([flow1$, flow2$])

merge emits all values from all flows in arrival order:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
val flow1 = flow {
delay(100); emit("A")
delay(200); emit("B")
}
val flow2 = flow {
delay(150); emit("1")
delay(150); emit("2")
}
merge(flow1, flow2).collect { println(it) }
// A (at 100ms)
// 1 (at 150ms)
// B (at 300ms)
// 2 (at 300ms)
}

RxJS equivalent: merge(flow1$, flow2$)

Go equivalent: Fan-in pattern, with multiple goroutines writing to one channel

| Operation | RxJS | Go | Kotlin Flow |
| --- | --- | --- | --- |
| Pair by position | zip() | Manual sync with 2 channels | zip() |
| Latest from each | combineLatest() | Manual with goroutines + select | combine() |
| Merge all | merge() | Fan-in: N goroutines → 1 channel | merge() |
| Concat | concat() | Read ch1 fully, then ch2 | flatMapConcat or emptyFlow + ... |
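Concatenation is the one row without a dedicated example in this module, so here is a minimal sketch of one way to do it. The `concat` helper is a name made up for illustration, not a kotlinx.coroutines function. It relies only on `flow { }` and `emitAll`.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: collect each flow fully, in order, before starting the next.
fun <T> concat(vararg flows: Flow<T>): Flow<T> = flow {
    for (f in flows) emitAll(f)
}

fun main() = runBlocking {
    val letters = flowOf("A", "B")
    val digits = flowOf("1", "2")
    println(concat(letters, digits).toList()) // [A, B, 1, 2]
}
```

Because `emitAll` suspends until the inner flow completes, the second flow is not even started until the first one has finished. That is the same guarantee RxJS `concat()` gives.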

FlatMap operators transform each value into a flow, then flatten the results. This is where the real power is for async pipelines.

flatMapConcat — sequential (one at a time)

Each inner flow completes before the next starts:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun fetchDetails(id: Int): Flow<String> = flow {
delay(100)
emit("Details for $id (part 1)")
delay(100)
emit("Details for $id (part 2)")
}
fun main() = runBlocking {
flowOf(1, 2, 3)
.flatMapConcat { id -> fetchDetails(id) }
.collect { println(it) }
// Details for 1 (part 1)
// Details for 1 (part 2)
// Details for 2 (part 1)
// Details for 2 (part 2)
// Details for 3 (part 1)
// Details for 3 (part 2)
}

RxJS equivalent: concatMap

flatMapMerge runs all inner flows concurrently:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
flowOf(1, 2, 3)
.flatMapMerge { id -> fetchDetails(id) }
.collect { println(it) }
// Results interleaved — all three running concurrently
// Details for 1 (part 1)
// Details for 2 (part 1)
// Details for 3 (part 1)
// Details for 1 (part 2)
// Details for 2 (part 2)
// Details for 3 (part 2)
}
// Control concurrency:
flowOf(1, 2, 3)
.flatMapMerge(concurrency = 2) { id -> fetchDetails(id) }
.collect { println(it) }

RxJS equivalent: mergeMap (with optional concurrent parameter)

flatMapLatest — cancel previous, keep latest

When a new value arrives, cancel the previous inner flow:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
flow {
emit(1); delay(50)
emit(2); delay(50)
emit(3)
}
.flatMapLatest { id ->
flow {
emit("Start $id")
delay(200) // This gets cancelled when next value arrives
emit("End $id")
}
}
.collect { println(it) }
// Start 1 (1 emitted, starts inner flow)
// Start 2 (2 emitted after 50ms, cancels inner flow for 1)
// Start 3 (3 emitted after 50ms, cancels inner flow for 2)
// End 3 (only 3's inner flow completes)
}

RxJS equivalent: switchMap — the most-used operator for search-as-you-type, autocomplete, etc.

| Operator | Behavior | Use Case | RxJS Equivalent |
| --- | --- | --- | --- |
| flatMapConcat | Sequential, in order | Ordered API calls, migrations | concatMap |
| flatMapMerge | Concurrent, interleaved | Batch processing, fan-out | mergeMap |
| flatMapLatest | Cancel previous | Search autocomplete, latest data | switchMap |

Flow operators run in the collector’s context by default:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking { // runs on main thread
flow {
println("Emitting on: ${Thread.currentThread().name}") // main
emit(1)
}
.map {
println("Mapping on: ${Thread.currentThread().name}") // main
it * 2
}
.collect {
println("Collecting on: ${Thread.currentThread().name}") // main
println("Value: $it")
}
}

flowOn changes the dispatcher for everything above it in the chain:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
flow {
println("Emitting on: ${Thread.currentThread().name}") // IO thread
emit(readFileFromDisk())
}
.map {
println("Mapping on: ${Thread.currentThread().name}") // IO thread
parseData(it)
}
.flowOn(Dispatchers.IO) // Everything ABOVE runs on IO
.map {
println("Processing on: ${Thread.currentThread().name}") // Default thread
processData(it)
}
.flowOn(Dispatchers.Default) // This map runs on Default
.collect {
println("Collecting on: ${Thread.currentThread().name}") // main
println("Result: $it")
}
}
fun readFileFromDisk(): String = "file-content"
fun parseData(s: String): String = s.uppercase()
fun processData(s: String): String = "processed: $s"

Each flowOn reassigns the dispatcher for the segment of the chain above it, leaving the terminal collect on the original context:

[Diagram: flowOn dispatcher boundaries]
// Everything runs on the event loop
const result = await process(data);
// For CPU-bound: spawn worker thread
const worker = new Worker('./process.js');

Key Differences:

  • TS: Everything runs on the event loop; CPU-bound work needs explicit worker threads.
  • Go: Goroutines are scheduled by the runtime across GOMAXPROCS threads — no explicit thread control.
  • Kotlin: Explicit control via dispatchers and flowOn.

Backpressure occurs when the producer is faster than the consumer. Flow handles this naturally (collector controls the pace), but sometimes you need more control.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
val start = System.currentTimeMillis()
fun elapsed() = System.currentTimeMillis() - start
flow {
for (i in 1..3) {
delay(100) // produces every 100ms
println("Emitted $i at ${elapsed()}ms") // log before emit(): emit() suspends until the collector has finished
emit(i)
}
}.collect { value ->
delay(300) // consumes every 300ms
println("Collected $value at ${elapsed()}ms")
}
}
// Emitted 1 at ~100ms
// Collected 1 at ~400ms (waits for consumer)
// Emitted 2 at ~500ms (producer blocked by slow consumer)
// Collected 2 at ~800ms
// Emitted 3 at ~900ms
// Collected 3 at ~1200ms
// Total: ~1200ms (sequential)

buffer — run producer and consumer concurrently

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
val start = System.currentTimeMillis()
fun elapsed() = System.currentTimeMillis() - start
flow {
for (i in 1..3) {
delay(100)
emit(i)
println("Emitted $i at ${elapsed()}ms")
}
}
.buffer() // Buffer emissions — producer doesn't wait for consumer
.collect { value ->
delay(300)
println("Collected $value at ${elapsed()}ms")
}
}
// Emitted 1 at ~100ms
// Emitted 2 at ~200ms (producer runs ahead!)
// Emitted 3 at ~300ms
// Collected 1 at ~400ms
// Collected 2 at ~700ms
// Collected 3 at ~1000ms
// Total: ~1000ms (producer runs ahead of consumer)

conflate keeps only the latest value when the consumer is slow:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
val start = System.currentTimeMillis()
fun elapsed() = System.currentTimeMillis() - start
flow {
for (i in 1..5) {
delay(100)
emit(i)
println("Emitted $i at ${elapsed()}ms")
}
}
.conflate() // Drop intermediate values
.collect { value ->
delay(300)
println("Collected $value at ${elapsed()}ms")
}
}
// Emitted 1 at ~100ms
// Emitted 2 at ~200ms
// Emitted 3 at ~300ms
// Collected 1 at ~400ms (started with 1)
// Emitted 4 at ~400ms
// Emitted 5 at ~500ms
// Collected 3 at ~700ms (or 4, depending on scheduling; 2 was dropped)
// Collected 5 at ~1000ms (the latest value is always delivered)

RxJS equivalent: throttle / sample

collectLatest cancels the previous collection when a new value arrives:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
val start = System.currentTimeMillis()
fun elapsed() = System.currentTimeMillis() - start
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}
.collectLatest { value ->
println("Started processing $value at ${elapsed()}ms")
delay(300) // slow processing — gets cancelled by next value
println("Finished processing $value at ${elapsed()}ms")
}
}
// Started processing 1 at ~100ms
// Started processing 2 at ~200ms (cancelled processing of 1)
// Started processing 3 at ~300ms (cancelled processing of 2)
// Started processing 4 at ~400ms (cancelled processing of 3)
// Started processing 5 at ~500ms (cancelled processing of 4)
// Finished processing 5 at ~800ms (only last one completes)

RxJS equivalent: switchMap behavior on the consumer side

| Strategy | Behavior | When to Use | RxJS Equivalent |
| --- | --- | --- | --- |
| Default (none) | Producer waits for consumer | Data integrity critical | Implicit backpressure |
| buffer() | Producer runs ahead, results queued | Speed matters, can handle memory | N/A (RxJS is push-based) |
| conflate() | Drop intermediate values | Only latest matters (telemetry, ticks) | throttle, sample |
| collectLatest | Cancel previous processing | Search/autocomplete, UI updates | switchMap |

Cold flows (regular Flow) produce a new stream for each collector. Hot flows share a single stream across multiple collectors.

SharedFlow emits values to all active collectors. Values emitted before collection starts are lost (unless replayed).

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
// replay = 0: no history, only new values
// replay = 1: new collectors get the last emitted value
val events = MutableSharedFlow<String>(replay = 0)
// Collector 1
val job1 = launch {
events.collect { println("Collector 1: $it") }
}
// Collector 2
val job2 = launch {
events.collect { println("Collector 2: $it") }
}
delay(100) // Give collectors time to start
// Emit events — both collectors receive them
events.emit("UserLoggedIn")
events.emit("OrderPlaced")
events.emit("PaymentProcessed")
delay(100)
job1.cancel()
job2.cancel()
}
// Collector 1: UserLoggedIn
// Collector 2: UserLoggedIn
// Collector 1: OrderPlaced
// Collector 2: OrderPlaced
// Collector 1: PaymentProcessed
// Collector 2: PaymentProcessed

A single emitter fans out to every active collector:

[Diagram: SharedFlow fan-out]

RxJS equivalent: Subject (replay=0) or ReplaySubject (replay>0)

Go equivalent: Fan-out pattern — but you’d need to manually manage multiple channels:

// Go: manual fan-out to multiple consumers
var subscribers []chan string

func broadcast(msg string) {
	for _, ch := range subscribers {
		ch <- msg // may block if this subscriber is not ready to receive
	}
}
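The effect of the replay parameter on late collectors can be checked directly through the flow's `replayCache`. The sketch below is an illustration only; the helper name `lateCollectorSees` is made up for the example.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Emit two events before anyone collects, then report what a late collector would be handed.
suspend fun lateCollectorSees(replay: Int): List<String> {
    val events = MutableSharedFlow<String>(replay = replay)
    events.emit("first")   // no collectors yet, so emit returns immediately
    events.emit("second")
    return events.replayCache // snapshot of the values replayed to a new collector
}

fun main() = runBlocking {
    println(lateCollectorSees(replay = 0)) // []
    println(lateCollectorSees(replay = 1)) // [second]
    println(lateCollectorSees(replay = 2)) // [first, second]
}
```

With replay = 0 both events are gone for good, which matches the behaviour of a plain RxJS Subject. A positive replay keeps the most recent values around, as a ReplaySubject does.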

StateFlow always has a current value. New collectors immediately get the latest value. Only emits when the value changes (deduplication).

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
data class AppState(val count: Int, val status: String)
fun main() = runBlocking {
val state = MutableStateFlow(AppState(count = 0, status = "idle"))
// Observer
val observer = launch {
state.collect { println("State: $it") }
}
delay(100)
// Update state
state.value = AppState(count = 1, status = "loading")
delay(50)
state.value = AppState(count = 1, status = "loading") // No emission — same value
delay(50)
state.value = AppState(count = 2, status = "done")
delay(100)
observer.cancel()
}
// State: AppState(count=0, status=idle) (initial value)
// State: AppState(count=1, status=loading)
// State: AppState(count=2, status=done)
// (No duplicate emission for the repeated loading state)

RxJS equivalent: BehaviorSubject

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
val counter = MutableStateFlow(0)
// Atomic update — thread-safe
val jobs = (1..100).map {
launch(Dispatchers.Default) {
repeat(100) {
counter.update { current -> current + 1 }
}
}
}
jobs.forEach { it.join() }
println("Counter: ${counter.value}") // 10000 — always correct
}

| Feature | SharedFlow | StateFlow |
| --- | --- | --- |
| Initial value | No | Yes (required) |
| Current value access | No .value | .value always available |
| Deduplication | No | Yes (no emit on same value) |
| Replay | Configurable (0, 1, N) | Always 1 (latest value) |
| Use case | Events (click, navigation) | State (loading, data, error) |
| RxJS equivalent | Subject / ReplaySubject | BehaviorSubject |

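The events-versus-state split from the comparison above can be sketched with one class that exposes both. `Downloader` and `observeDownloader` are hypothetical names invented for this example. The buffer size of 16 is an arbitrary assumption chosen so that `tryEmit` does not need to suspend.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical class: state lives in a StateFlow, one-off events in a SharedFlow.
class Downloader {
    private val _state = MutableStateFlow("idle")
    val state: StateFlow<String> = _state.asStateFlow()

    // extraBufferCapacity lets tryEmit hand values to collectors without suspending.
    private val _events = MutableSharedFlow<String>(extraBufferCapacity = 16)
    val events: SharedFlow<String> = _events.asSharedFlow()

    fun start() { _state.value = "loading"; _events.tryEmit("started") }
    fun finish() { _state.value = "done"; _events.tryEmit("finished") }
}

suspend fun observeDownloader(): Pair<String, List<String>> = coroutineScope {
    val downloader = Downloader()
    downloader.start() // nobody is collecting yet: the event is dropped, the state is kept

    val seen = mutableListOf<String>()
    val job = launch { downloader.events.collect { seen += it } }
    delay(50) // let the collector subscribe
    downloader.finish()
    delay(50) // let the collector handle the event
    job.cancel()
    downloader.state.value to seen.toList()
}

fun main() = runBlocking {
    val (state, events) = observeDownloader()
    println(state)  // done
    println(events) // [finished]
}
```

The "started" event is lost because no one was listening when it fired, while the state is still readable afterwards. This is the practical reason to model state with StateFlow and fire-and-forget notifications with SharedFlow.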
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
// Cold flow — each collector gets its own stream
val coldFlow = flow {
println("Cold flow started") // Prints for EACH collector
emit(1)
emit(2)
emit(3)
}
// Convert to shared (hot) flow
val hotFlow = coldFlow.shareIn(
scope = this,
started = SharingStarted.Lazily, // Start when first collector subscribes
replay = 1, // New collectors get the last value
)
// Both collectors share the SAME upstream flow
launch { hotFlow.take(3).collect { println("A: $it") } }
launch { hotFlow.take(3).collect { println("B: $it") } }
delay(500)
coroutineContext.cancelChildren()
}

| Strategy | Behavior | Use Case |
| --- | --- | --- |
| SharingStarted.Eagerly | Start immediately | Always-on event streams |
| SharingStarted.Lazily | Start on first collector | On-demand data sources |
| SharingStarted.WhileSubscribed(stopTimeoutMillis, replayExpirationMillis) | Active while collectors exist | Resource-efficient, stops when unused |

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun priceStream(): Flow<Double> = flow {
var price = 100.0
while (true) {
delay(1000)
price += (-5..5).random()
emit(price)
}
}
fun main() = runBlocking {
// Convert cold flow to StateFlow with initial value
val currentPrice: StateFlow<Double> = priceStream()
.stateIn(
scope = this,
started = SharingStarted.WhileSubscribed(5000),
initialValue = 100.0,
)
// Access current value at any time
println("Current price: ${currentPrice.value}")
// Collect updates
val job = launch {
currentPrice.take(5).collect { println("Price update: $it") }
}
job.join()
coroutineContext.cancelChildren()
}

catch handles exceptions from upstream operators (above it in the chain):

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun riskyFlow(): Flow<Int> = flow {
emit(1)
emit(2)
throw RuntimeException("Something went wrong")
emit(3) // Never reached
}
fun main() = runBlocking {
riskyFlow()
.catch { e ->
println("Caught: ${e.message}")
emit(-1) // Can emit a fallback value
}
.collect { println("Got: $it") }
}
// Got: 1
// Got: 2
// Caught: Something went wrong
// Got: -1
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
flowOf(1, 2, 3)
.catch { println("This won't catch collector errors") }
.collect { value ->
if (value == 2) throw RuntimeException("Collector error!")
println(value)
}
// 1
// Exception in thread "main" RuntimeException: Collector error!
}
// To handle collector errors, use onEach + catch:
fun main2() = runBlocking {
flowOf(1, 2, 3)
.onEach { value ->
if (value == 2) throw RuntimeException("Error!")
println(value)
}
.catch { println("Caught: ${it.message}") }
.collect()
// 1
// Caught: Error!
}
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
var attempt = 0
fun unreliableFlow(): Flow<String> = flow {
attempt++
if (attempt < 3) {
throw RuntimeException("Attempt $attempt failed")
}
emit("Success on attempt $attempt")
}
fun main() = runBlocking {
// Simple retry
unreliableFlow()
.retry(retries = 3) // Retry up to 3 times
.catch { println("All retries failed: ${it.message}") }
.collect { println(it) }
// Success on attempt 3
// Conditional retry with delay
attempt = 0
unreliableFlow()
.retryWhen { cause, attempt ->
println("Retry $attempt due to: ${cause.message}")
delay(100 * (attempt + 1)) // linear backoff: 100ms, 200ms, 300ms, ...
attempt < 5 // return true to retry, false to give up
}
.collect { println(it) }
}
source$.pipe(
  retry(3), // resubscribe up to 3 times first
  catchError(err => {
    console.log('Error:', err);
    return of('fallback'); // emit fallback once retries are exhausted
  }),
);

Key Differences:

  • TS/RxJS: catchError returns a replacement Observable; retry(3) resubscribes on error.
  • Go: No stream-level error channel — you check err per value and continue.
  • Kotlin: catch can emit a fallback; retry re-runs the upstream flow.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
flowOf(1, 2, 3)
.onStart { println("Flow starting...") }
.onEach { println("Processing: $it") }
.onCompletion { cause ->
if (cause == null) println("Flow completed successfully")
else println("Flow failed: ${cause.message}")
}
.collect()
// Flow starting...
// Processing: 1
// Processing: 2
// Processing: 3
// Flow completed successfully
println("---")
// onEmpty
emptyFlow<Int>()
.onEmpty {
println("Flow was empty, emitting default")
emit(0)
}
.collect { println("Got: $it") }
// Flow was empty, emitting default
// Got: 0
}

launchIn — collect in a separate coroutine

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
// Instead of:
launch {
myFlow.collect { process(it) }
}
// You can write:
myFlow
.onEach { process(it) }
.launchIn(this) // Returns Job — can be cancelled
delay(5000)
}
val myFlow = flowOf(1, 2, 3)
fun process(i: Int) = println("Process: $i")
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
// Collection → Flow
val flow1 = listOf(1, 2, 3).asFlow()
// Flow → Collection
val list: List<Int> = flowOf(1, 2, 3).toList()
val set: Set<Int> = flowOf(1, 1, 2, 2, 3).toSet()
}
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
// Channel → Flow
val channel = Channel<Int>()
launch {
channel.send(1)
channel.send(2)
channel.send(3)
channel.close()
}
val flowFromChannel: Flow<Int> = channel.consumeAsFlow()
flowFromChannel.collect { println(it) }
// Flow → Channel (via produce)
val channelFromFlow: ReceiveChannel<Int> = produce {
flowOf(10, 20, 30).collect { send(it) }
}
for (value in channelFromFlow) {
println(value)
}
}
import kotlinx.coroutines.flow.*
fun main() {
// Sequence → Flow
val seq = sequence {
yield(1)
yield(2)
yield(3)
}
val flow = seq.asFlow()
}
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.test.*
import kotlin.test.*
class FlowTest {
@Test
fun `test flow collects expected values`() = runTest {
val result = flowOf(1, 2, 3)
.map { it * 2 }
.toList()
assertEquals(listOf(2, 4, 6), result)
}
@Test
fun `test flow with delay`() = runTest {
// runTest auto-advances virtual time
val result = flow {
delay(1000); emit("a")
delay(1000); emit("b")
delay(1000); emit("c")
}.toList()
assertEquals(listOf("a", "b", "c"), result)
// Runs instantly — no real waiting
}
}

Turbine provides a cleaner API for testing flows:

import app.cash.turbine.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.test.*
import kotlin.test.*
class TurbineTest {
@Test
fun `test flow emissions with turbine`() = runTest {
flowOf(1, 2, 3).test {
assertEquals(1, awaitItem())
assertEquals(2, awaitItem())
assertEquals(3, awaitItem())
awaitComplete()
}
}
@Test
fun `test error flow with turbine`() = runTest {
flow {
emit(1)
throw RuntimeException("boom")
}.test {
assertEquals(1, awaitItem())
val error = awaitError()
assertEquals("boom", error.message)
}
}
@Test
fun `test SharedFlow with turbine`() = runTest {
val sharedFlow = MutableSharedFlow<String>()
sharedFlow.test {
sharedFlow.emit("hello")
assertEquals("hello", awaitItem())
sharedFlow.emit("world")
assertEquals("world", awaitItem())
}
}
}
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.test.*
import kotlin.test.*
class CounterViewModel {
private val _count = MutableStateFlow(0)
val count: StateFlow<Int> = _count.asStateFlow()
fun increment() {
_count.update { it + 1 }
}
fun decrement() {
_count.update { it - 1 }
}
}
class StateFlowTest {
@Test
fun `test counter increments`() = runTest {
val vm = CounterViewModel()
assertEquals(0, vm.count.value) // Initial value
vm.increment()
assertEquals(1, vm.count.value)
vm.increment()
vm.increment()
assertEquals(3, vm.count.value)
vm.decrement()
assertEquals(2, vm.count.value)
}
@Test
fun `test counter state flow with turbine`() = runTest {
val vm = CounterViewModel()
vm.count.test {
assertEquals(0, awaitItem()) // Initial
vm.increment()
assertEquals(1, awaitItem())
vm.increment()
assertEquals(2, awaitItem())
cancelAndIgnoreRemainingEvents()
}
}
}
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
// Simulate user typing: emits a longer query every 100ms
fun searchQueryFlow(): Flow<String> = flow {
val inputs = listOf("k", "ko", "kot", "kotl", "kotli", "kotlin")
for (input in inputs) {
delay(100) // user types every 100ms
emit(input)
}
}
fun searchApi(query: String): List<String> {
return listOf("$query result 1", "$query result 2")
}
@OptIn(FlowPreview::class)
fun main() = runBlocking {
searchQueryFlow()
.debounce(300) // Wait 300ms after last keystroke
.distinctUntilChanged() // Skip if query hasn't changed
.filter { it.length >= 3 } // Min 3 characters
.flatMapLatest { query -> // Cancel previous search
flow {
println("Searching for: $query")
emit(searchApi(query))
}
}
.collect { results ->
println("Results: $results")
}
}
// Searching for: kotlin (only the final query, after debounce)
// Results: [kotlin result 1, kotlin result 2]
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun pollEndpoint(intervalMs: Long): Flow<String> = flow {
while (true) {
val response = fetchStatus() // suspend function
emit(response)
delay(intervalMs)
}
}
suspend fun fetchStatus(): String {
delay(50) // simulate network
return "status: OK at ${System.currentTimeMillis()}"
}
fun main() = runBlocking {
pollEndpoint(1000)
.take(5)
.collect { println(it) }
}
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
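// Process items in batches. Recent kotlinx.coroutines releases also ship an
// experimental Flow.chunked; this local version shows how such an operator is built.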
fun <T> Flow<T>.chunked(size: Int): Flow<List<T>> = flow {
val buffer = mutableListOf<T>()
collect { value ->
buffer.add(value)
if (buffer.size >= size) {
emit(buffer.toList())
buffer.clear()
}
}
if (buffer.isNotEmpty()) {
emit(buffer.toList())
}
}
fun main() = runBlocking {
(1..25).asFlow()
.chunked(10)
.collect { batch ->
println("Processing batch of ${batch.size}: $batch")
delay(100) // simulate batch insert
}
// Processing batch of 10: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
// Processing batch of 10: [11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
// Processing batch of 5: [21, 22, 23, 24, 25]
}
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
data class Event(val type: String, val payload: Map<String, String>, val timestamp: Long)
data class ProcessedEvent(val type: String, val summary: String, val processedAt: Long)
fun eventSource(): Flow<Event> = flow {
val events = listOf(
Event("order.created", mapOf("orderId" to "123", "amount" to "99.99"), System.currentTimeMillis()),
Event("user.login", mapOf("userId" to "alice"), System.currentTimeMillis()),
Event("order.created", mapOf("orderId" to "124", "amount" to "49.99"), System.currentTimeMillis()),
Event("order.shipped", mapOf("orderId" to "123"), System.currentTimeMillis()),
Event("user.login", mapOf("userId" to "bob"), System.currentTimeMillis()),
)
for (event in events) {
delay(100)
emit(event)
}
}
fun main() = runBlocking {
eventSource()
.filter { it.type.startsWith("order.") }
.map { event ->
ProcessedEvent(
type = event.type,
summary = "Order event: ${event.payload}",
processedAt = System.currentTimeMillis(),
)
}
.onEach { println("Processing: ${it.type}") }
.buffer() // Don't slow down the source
.collect { println(" -> ${it.summary}") }
}

Pattern: parallel processing with flatMapMerge

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
suspend fun processItem(id: Int): String {
delay((100L..500L).random()) // variable processing time
return "Result-$id"
}
fun main() = runBlocking {
val start = System.currentTimeMillis()
(1..20).asFlow()
.flatMapMerge(concurrency = 5) { id -> // Process 5 at a time
flow {
emit(processItem(id))
}
}
.collect { println(it) }
println("Total time: ${System.currentTimeMillis() - start}ms")
// Much faster than sequential — 5 items processed concurrently
}

Put Flow to work — build a real pipeline and a real event bus.