Coroutines & Structured Concurrency
You already know concurrent programming — async/await in TypeScript, goroutines
in Go. This module shows you the Kotlin way: coroutines mapped to the primitives you
already use, plus Kotlin’s standout feature, structured concurrency, which gives
you parent-child lifecycle management you have to build by hand elsewhere.
Add the coroutine dependencies to your build.gradle.kts:

```kotlin
plugins {
    kotlin("jvm") version "2.1.0"
    application
}

repositories {
    mavenCentral()
}

dependencies {
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.10.1")
    testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.10.1")
    testImplementation(kotlin("test"))
}

application {
    mainClass.set("MainKt")
}

tasks.test {
    useJUnitPlatform()
}
```

Why Coroutines?
You already use concurrency daily. Here’s the mental model:
| Concept | TypeScript | Go | Kotlin |
|---|---|---|---|
| Lightweight task | async function + event loop | goroutine | coroutine |
| Concurrency model | Single-threaded event loop | M:N green threads (goroutines on OS threads) | Suspendable computations on thread pools |
| Blocking avoidance | Non-blocking by nature (everything is async) | Goroutines yield at I/O automatically | suspend functions yield cooperatively |
| Parallelism | Worker threads (Promise.all is concurrent, not parallel) | goroutines across GOMAXPROCS OS threads | Multi-threaded dispatchers (Dispatchers.Default, Dispatchers.IO) |
| Structured lifecycle | No built-in hierarchy | No built-in hierarchy (errgroup helps) | Built-in: parent-child scope hierarchy |
How many can you run?
```kotlin
import kotlinx.coroutines.*

fun main() = runBlocking {
    val jobs = List(100_000) {
        launch { delay(1000) }
    }
    println("Launched ${jobs.size} coroutines")
    jobs.forEach { it.join() }
    println("All done")
}
```

A suspended coroutine is a small heap object rather than a thread with its own stack, so 100,000 of them fit comfortably in a default JVM heap. Try that with OS threads.
Your First Coroutine
Let’s do the same thing in all three languages: run two tasks concurrently, collect results.
async function fetchUser(): Promise<string> { await new Promise(r => setTimeout(r, 1000)); return "Alice";}
async function fetchOrder(): Promise<string> { await new Promise(r => setTimeout(r, 1000)); return "Order-42";}
async function main() { const start = Date.now(); const [user, order] = await Promise.all([fetchUser(), fetchOrder()]); console.log(`${user}, ${order} in ${Date.now() - start}ms`); // Alice, Order-42 in ~1000ms}
main();package main
import ( "fmt" "sync" "time")
func fetchUser(wg *sync.WaitGroup, result *string) { defer wg.Done() time.Sleep(1 * time.Second) *result = "Alice"}
func fetchOrder(wg *sync.WaitGroup, result *string) { defer wg.Done() time.Sleep(1 * time.Second) *result = "Order-42"}
func main() { start := time.Now() var user, order string var wg sync.WaitGroup wg.Add(2) go fetchUser(&wg, &user) go fetchOrder(&wg, &order) wg.Wait() fmt.Printf("%s, %s in %v\n", user, order, time.Since(start)) // Alice, Order-42 in ~1s}import kotlinx.coroutines.*
suspend fun fetchUser(): String { delay(1000) // non-blocking sleep (like setTimeout, not Thread.sleep) return "Alice"}
suspend fun fetchOrder(): String { delay(1000) return "Order-42"}
fun main() = runBlocking { val start = System.currentTimeMillis() val user = async { fetchUser() } val order = async { fetchOrder() } println("${user.await()}, ${order.await()} in ${System.currentTimeMillis() - start}ms") // Alice, Order-42 in ~1000ms}Key Differences:
- TS: Promise.all runs tasks concurrently on the single event loop thread.
- Go: goroutines + WaitGroup for synchronization; results are passed back via pointers.
- Kotlin: async returns a Deferred<T> (like a typed Promise); .await() gets the result.
- Kotlin’s delay() is non-blocking: it suspends the coroutine and frees the thread.
Structured Concurrency
This is Kotlin’s biggest advantage over Go and TypeScript for concurrent code. In Go, unsupervised goroutines get orphaned; in TS, floating promises become unhandled rejections. Kotlin makes the parent scope responsible for its children.
function handleRequest() { fetchFromDB(); // Floating promise — no one awaits it callExternalAPI(); // If this throws, it's an unhandled rejection sendNotification(); // Fire and forget — hope it works}func handleRequest() { go fetchFromDB() // Who cancels this if handleRequest returns? go callExternalAPI() // What if this panics? go sendNotification() // Does anyone wait for this? // All three goroutines are now orphaned}suspend fun handleRequest() = coroutineScope { val db = async { fetchFromDB() } // child of this scope val api = async { callExternalAPI() } // child of this scope val notification = launch { sendNotification() } // child of this scope
// coroutineScope waits for ALL children to complete // If any child fails, ALL siblings are cancelled // If this scope is cancelled, ALL children are cancelled val result = db.await() + api.await() result}Key Differences:
- TS / Go: leaked work is on you to track and cancel — every Go developer has dealt with leaked goroutines.
- Kotlin: coroutineScope won’t return until every child finishes, and failure or cancellation propagates automatically.
Rules of structured concurrency:
- Every coroutine has a parent scope.
- A parent scope does not complete until all children complete.
- If a parent is cancelled, all children are cancelled.
- If a child fails (uncaught exception), the parent is cancelled (and thus all siblings).
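The second and third rules can be observed directly. The sketch below is only an illustration: cancelParentDemo is a hypothetical helper (not part of kotlinx.coroutines) that cancels a parent job and records what its two children do. It assumes it is called from a single-threaded context such as runBlocking, because the event list is not thread-safe.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: records what happens when a parent with two children is cancelled.
suspend fun cancelParentDemo(): List<String> = coroutineScope {
    val events = mutableListOf<String>()
    val parent = launch {
        launch {
            try { delay(10_000) } finally { events += "child 1 cancelled" }
        }
        launch {
            try { delay(10_000) } finally { events += "child 2 cancelled" }
        }
    }
    delay(100)             // let both children start and suspend in delay()
    parent.cancelAndJoin() // cancelling the parent cancels both children, and
                           // the join only returns after both have finished
    events += "parent completed"
    events
}

fun main() = runBlocking {
    cancelParentDemo().forEach(::println)
}
```

"parent completed" is always recorded last, because the parent cannot complete before both children have run their finally blocks.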
Scope hierarchy
Scopes form a tree. Cancellation and failure flow along its edges:
flowchart TB R["runBlocking (root scope)"] --> A["launch (Job A)"] R --> B["launch (Job B)"] A --> A1["async (Job A1)"] A --> A2["async (Job A2)"] B --> B1["launch (Job B1)"] B --> B2["launch (Job B2)"]
- Cancel Job A → A1 and A2 are cancelled; B is unaffected.
- Cancel root → everything is cancelled.
- A1 throws → A fails and cancels A2; the failure then propagates to the root, which cancels B too (unless a supervisorScope or SupervisorJob sits in between).
coroutineScope vs supervisorScope
coroutineScope fails fast: one child’s failure cancels its siblings.
supervisorScope isolates children: one failure leaves the others running.
import kotlinx.coroutines.*
// coroutineScope: one child fails → all siblings cancelledsuspend fun strictScope() = coroutineScope { launch { delay(1000) println("Task 1 done") // Never prints — cancelled by Task 2's failure } launch { delay(500) throw RuntimeException("Task 2 failed") }}
// supervisorScope: one child fails → siblings continuesuspend fun lenientScope() = supervisorScope { launch { delay(1000) println("Task 1 done") // DOES print — not affected by Task 2 } launch { delay(500) throw RuntimeException("Task 2 failed") // Exception is NOT propagated to parent }}
fun main() = runBlocking { try { strictScope() } catch (e: RuntimeException) { println("Caught from strict: ${e.message}") }
println("---")
lenientScope() println("Supervisor scope completed")}Coroutine Builders: launch, async, runBlocking
| Builder | Returns | Use For | Blocks Thread? |
|---|---|---|---|
| launch | Job | Fire-and-forget work (side effects) | No |
| async | Deferred<T> | Concurrent computation returning a value | No |
| runBlocking | T | Bridge from blocking to suspend world | Yes |
launch — fire and forget
Section titled “launch — fire and forget”import kotlinx.coroutines.*
fun main() = runBlocking { val job: Job = launch { println("Working on thread: ${Thread.currentThread().name}") delay(1000) println("Done!") } println("Launched") job.join() // Optional: wait for completion // Without join(), runBlocking still waits (structured concurrency)}| Pattern | TypeScript | Go | Kotlin |
|---|---|---|---|
| Fire and forget | Call an async function without await (dangerous) | go func() | launch { } |
| Wait for completion | await the promise (only if you kept a reference) | wg.Wait() | job.join() |
| Cancel | No built-in way; pass an AbortSignal by hand | context.WithCancel | job.cancel() |
async — get a result back
Section titled “async — get a result back”import kotlinx.coroutines.*
fun main() = runBlocking { val deferred: Deferred<Int> = async { delay(1000) 42 } println("Computing...") val result: Int = deferred.await() println("Result: $result")}Deferred<T> is Kotlin’s Promise<T> (TS) or the result you’d get from a channel (Go):
| Pattern | TypeScript | Go | Kotlin |
|---|---|---|---|
| Start concurrent computation | const p = fetchData() | ch := make(chan int); go func() { ch <- compute() }() | val d = async { compute() } |
| Get result | await p | result := <-ch | d.await() |
| Multiple concurrent | Promise.all([p1, p2]) | Read from multiple channels | awaitAll(d1, d2) |
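As a rough illustration of the last row, the sketch below starts one async per input and collects the results with awaitAll. The names square and squareAll are made up for this example, and the delay stands in for real work.

```kotlin
import kotlinx.coroutines.*

// Hypothetical stand-in for a real lookup; the delay simulates I/O.
suspend fun square(n: Int): Int {
    delay(100)
    return n * n
}

suspend fun squareAll(inputs: List<Int>): List<Int> = coroutineScope {
    inputs.map { n -> async { square(n) } } // start every computation first
        .awaitAll()                          // then wait; results keep input order
}

fun main() = runBlocking {
    println(squareAll(listOf(1, 2, 3))) // [1, 4, 9]
}
```

Like Promise.all, awaitAll fails as soon as any of the Deferred values fails, instead of waiting for the earlier ones in list order.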
Parallel decomposition with async
Launch independent fetches concurrently, then combine: total time is that of the slowest branch, not the sum.
import kotlinx.coroutines.*
data class UserProfile(val name: String, val orders: List<String>, val prefs: Map<String, String>)
suspend fun fetchName(): String { delay(300); return "Alice" }suspend fun fetchOrders(): List<String> { delay(500); return listOf("ORD-1", "ORD-2") }suspend fun fetchPrefs(): Map<String, String> { delay(200); return mapOf("theme" to "dark") }
suspend fun getUserProfile(): UserProfile = coroutineScope { val name = async { fetchName() } val orders = async { fetchOrders() } val prefs = async { fetchPrefs() }
// All three run concurrently — total time ≈ 500ms (max of the three) UserProfile(name.await(), orders.await(), prefs.await())}
fun main() = runBlocking { val start = System.currentTimeMillis() val profile = getUserProfile() println("$profile in ${System.currentTimeMillis() - start}ms") // UserProfile(name=Alice, orders=[ORD-1, ORD-2], prefs={theme=dark}) in ~500ms}runBlocking — bridge to the suspend world
Use runBlocking only at the edges: main(), tests, and rare bridges from blocking
code to coroutines.
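A bridge from blocking code might look like the following sketch. Both loadGreeting and loadGreetingBlocking are hypothetical names invented for this example; the point is only that the blocking wrapper sits at the boundary and is never called from inside a coroutine.

```kotlin
import kotlinx.coroutines.*

suspend fun loadGreeting(): String {
    delay(100) // stands in for real suspending I/O
    return "hello"
}

// Blocking facade for callers that cannot suspend (hypothetical legacy entry point).
// It blocks the calling thread until loadGreeting() returns.
fun loadGreetingBlocking(): String = runBlocking { loadGreeting() }

fun main() {
    println(loadGreetingBlocking()) // hello
}
```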
import kotlinx.coroutines.*
fun main() = runBlocking { // <-- Blocks the main thread until all children complete launch { delay(200); println("Task 1") } launch { delay(100); println("Task 2") } println("Started")}// Output:// Started// Task 2// Task 1Coroutine Context & Dispatchers
A dispatcher decides which thread (or pool) a coroutine runs on. Unlike Go, where the runtime schedules goroutines for you, Kotlin lets you choose explicitly.
| Dispatcher | Thread Pool | Use For | Go Equivalent | TS Equivalent |
|---|---|---|---|---|
| Dispatchers.Default | Shared pool sized to the number of CPU cores (minimum 2) | CPU-bound work (parsing, sorting, computation) | GOMAXPROCS worker threads | Worker threads |
| Dispatchers.IO | Elastic pool (by default up to 64 threads, or the core count if larger) | I/O-bound work (network, disk, DB) | goroutine on I/O | async/await (event loop) |
| Dispatchers.Unconfined | No specific thread | Testing, rare edge cases | N/A | N/A |
| Dispatchers.Main | Main/UI thread | Android/Desktop UI only; skip for backend | N/A | N/A |
Using dispatchers
Section titled “Using dispatchers”import kotlinx.coroutines.*
fun main() = runBlocking { launch(Dispatchers.Default) { // CPU-intensive work val result = (1..1_000_000).sumOf { it.toLong() } println("CPU work on: ${Thread.currentThread().name}") // DefaultDispatcher-worker-1 }
launch(Dispatchers.IO) { // I/O work (file, network, database) println("IO work on: ${Thread.currentThread().name}") // DefaultDispatcher-worker-2 (IO shares Default's pool) }
launch(Dispatchers.Unconfined) { println("Unconfined before delay: ${Thread.currentThread().name}") // main delay(100) println("Unconfined after delay: ${Thread.currentThread().name}") // some worker thread }}Switching context with withContext
Section titled “Switching context with withContext”import kotlinx.coroutines.*
suspend fun readFile(path: String): String = withContext(Dispatchers.IO) { // Switch to IO dispatcher for file read java.io.File(path).readText()}
suspend fun parseJson(json: String): Map<String, Any> = withContext(Dispatchers.Default) { // Switch to Default dispatcher for CPU-bound parsing // (simplified — use kotlinx.serialization in real code) mapOf("parsed" to json)}
fun main() = runBlocking { val content = readFile("/etc/hostname") val parsed = parseJson(content) println(parsed)}Custom dispatchers (limit parallelism)
limitedParallelism caps how many coroutines execute at the same time on a dispatcher. It is the closest Kotlin counterpart to a Go worker pool built from a buffered channel, with one difference: a coroutine only occupies a slot while it is actually running on a thread. A coroutine suspended in delay() gives its slot back, so the Kotlin example uses a blocking call to make the limit visible. To hold a slot across suspension points, use a Semaphore (see Shared Mutable State).

```go
// Go equivalent: buffered channel as semaphore
sem := make(chan struct{}, 2) // limit to 2 concurrent
for i := 0; i < 10; i++ {
    go func(i int) {
        sem <- struct{}{}        // acquire
        defer func() { <-sem }() // release
        doWork(i)
    }(i)
}
```

```kotlin
import kotlinx.coroutines.*

@OptIn(ExperimentalCoroutinesApi::class)
fun main() = runBlocking {
    // At most 2 coroutines execute on this view of Dispatchers.IO at any moment
    val limitedDispatcher = Dispatchers.IO.limitedParallelism(2)

    val jobs = (1..10).map { i ->
        launch(limitedDispatcher) {
            println("Task $i started on ${Thread.currentThread().name}")
            Thread.sleep(1000) // blocking work (stands in for a blocking I/O call)
            println("Task $i done")
        }
    }
    jobs.forEach { it.join() }
}
// Only 2 tasks run at a time, even though 10 were launched
```

Suspending Functions
suspend is Kotlin’s counterpart to the async keyword in TS: it marks a function that can pause and
resume without blocking a thread.
// This function can suspend (yield the thread) at delay() and fetchFromNetwork()suspend fun processData(): String { delay(100) // suspends here — thread is free to do other work val data = fetchFromNetwork() // suspends here too return data.uppercase()}
suspend fun fetchFromNetwork(): String { delay(500) // simulate network call return "response-data"}Rules:
- suspend functions can call other suspend functions.
- suspend functions can only be called from coroutines or other suspend functions.
- Regular functions cannot call suspend functions directly.
// COMPILE ERROR:fun regularFunction() { delay(100) // Error: suspend function 'delay' should be called only from a coroutine}
// OK:suspend fun suspendFunction() { delay(100) // Fine — we're in a suspend context}Marking async functions
Section titled “Marking async functions”// Must mark with 'async', returns Promise<T>async function getData(): Promise<string> { const response = await fetch("https://api.example.com/data"); return await response.text();}// No special marking — any function can be run as goroutine// But you need channels/WaitGroup for resultsfunc getData() string { resp, _ := http.Get("https://api.example.com/data") body, _ := io.ReadAll(resp.Body) return string(body)}
// To run concurrently:ch := make(chan string)go func() { ch <- getData() }()result := <-ch// Must mark with 'suspend', returns T directly (no wrapper type)suspend fun getData(): String { val response = httpClient.get("https://api.example.com/data") return response.bodyAsText()}Key difference: Kotlin suspend functions return the actual type (String), not
a wrapper (Promise<String>). The suspension is invisible to the caller — it looks
like synchronous code.
Channels
Channels are the primary way to communicate between coroutines, just like Go channels.
ch := make(chan string, 10) // buffered channel, capacity 10ch <- "hello" // sendmsg := <-ch // receiveclose(ch) // closeimport kotlinx.coroutines.*import kotlinx.coroutines.channels.*
fun main() = runBlocking { val channel = Channel<String>(capacity = 10) // buffered channel, capacity 10
launch { channel.send("hello") // send (suspend if full) channel.close() // close }
val msg = channel.receive() // receive (suspend if empty) println(msg) // hello}Channel types
Section titled “Channel types”| Type | Kotlin | Go Equivalent | Behavior |
|---|---|---|---|
| Rendezvous | Channel<T>() or Channel<T>(0) | make(chan T) | Sender suspends until receiver is ready |
| Buffered | Channel<T>(10) | make(chan T, 10) | Sender suspends when buffer is full |
| Unlimited | Channel<T>(Channel.UNLIMITED) | N/A | Never suspends sender (unbounded buffer) |
| Conflated | Channel<T>(Channel.CONFLATED) | N/A | Keeps only latest value, drops old |
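To make the table concrete, here is a small sketch that pushes three values into each channel type while no receiver is running and then reads back whatever the channel kept. The helper name drain is invented for this example; it uses trySend, which returns immediately instead of suspending when there is no room.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Sends 1, 2, 3 with no receiver running, then reads whatever the channel kept.
suspend fun drain(channel: Channel<Int>): List<Int> {
    for (i in 1..3) channel.trySend(i) // never suspends; fails if there is no room
    channel.close()                    // already buffered values stay readable
    val kept = mutableListOf<Int>()
    for (value in channel) kept += value
    return kept
}

fun main() = runBlocking {
    println(drain(Channel<Int>()))                  // [] (rendezvous: no receiver, nothing accepted)
    println(drain(Channel<Int>(2)))                 // [1, 2] (buffer full at the third send)
    println(drain(Channel<Int>(Channel.UNLIMITED))) // [1, 2, 3]
    println(drain(Channel<Int>(Channel.CONFLATED))) // [3] (only the latest value is kept)
}
```

With the suspending send() instead of trySend, the rendezvous and buffered cases would suspend the sender rather than drop the value.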
Producer-consumer pattern
Section titled “Producer-consumer pattern”func producer(ch chan<- int) { for i := 0; i < 5; i++ { ch <- i } close(ch)}
func main() { ch := make(chan int) go producer(ch) for val := range ch { // iterate until closed fmt.Println(val) }}import kotlinx.coroutines.*import kotlinx.coroutines.channels.*
fun CoroutineScope.produceNumbers(): ReceiveChannel<Int> = produce { for (i in 0 until 5) { send(i) } // Channel is closed automatically when produce block completes}
fun main() = runBlocking { val numbers = produceNumbers() for (value in numbers) { // iterate until closed println(value) }}Iterating over channels
Section titled “Iterating over channels”import kotlinx.coroutines.*import kotlinx.coroutines.channels.*
fun main() = runBlocking { val channel = Channel<Int>()
launch { for (i in 1..5) { channel.send(i) } channel.close() }
// Method 1: for loop (like Go's range) for (value in channel) { println(value) }
// Method 2: consumeEach (auto-closes on completion or exception) val channel2 = Channel<Int>() launch { for (i in 1..5) channel2.send(i) channel2.close() } channel2.consumeEach { println(it) }}Select expression (like Go’s select)
```go
select {
case msg := <-ch1:
    fmt.Println("From ch1:", msg)
case msg := <-ch2:
    fmt.Println("From ch2:", msg)
case <-time.After(1 * time.Second):
    fmt.Println("Timeout")
}
```

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*

@OptIn(ExperimentalCoroutinesApi::class) // onTimeout is marked experimental
fun main() = runBlocking {
    val ch1 = Channel<String>()
    val ch2 = Channel<String>()

    launch { delay(100); ch1.send("from ch1") }
    launch { delay(200); ch2.send("from ch2") }

    val result = select<String> {
        ch1.onReceive { "Ch1: $it" }
        ch2.onReceive { "Ch2: $it" }
        onTimeout(1000) { "Timeout" }
    }
    println(result) // Ch1: from ch1

    // Nobody will ever receive from ch2, so its sender would stay suspended
    // and keep runBlocking from returning. Cancel it explicitly.
    coroutineContext.cancelChildren()
}
```

Fan-Out & Fan-In Patterns
Fan-out: one producer, multiple consumers
This is the worker pool pattern: distribute work across multiple coroutines, each pulling from the same jobs channel.
flowchart LR J["jobs channel"] --> W1["Worker 1"] J --> W2["Worker 2"] J --> W3["Worker 3"] W1 --> R["results channel"] W2 --> R W3 --> R
func worker(id int, jobs <-chan int, results chan<- string, wg *sync.WaitGroup) { defer wg.Done() for j := range jobs { time.Sleep(100 * time.Millisecond) results <- fmt.Sprintf("worker %d processed job %d", id, j) }}
func main() { jobs := make(chan int, 100) results := make(chan string, 100) var wg sync.WaitGroup
// Start 3 workers for w := 1; w <= 3; w++ { wg.Add(1) go worker(w, jobs, results, &wg) }
// Send jobs for j := 1; j <= 9; j++ { jobs <- j } close(jobs)
// Collect go func() { wg.Wait(); close(results) }() for r := range results { fmt.Println(r) }}import kotlinx.coroutines.*import kotlinx.coroutines.channels.*
fun main() = runBlocking { val jobs = Channel<Int>(capacity = 100) val results = Channel<String>(capacity = 100)
// Start 3 workers (fan-out) val workers = (1..3).map { workerId -> launch { for (job in jobs) { // Each worker pulls from the same channel delay(100) results.send("worker $workerId processed job $job") } } }
// Send jobs launch { for (j in 1..9) { jobs.send(j) } jobs.close() }
// Wait for workers, then close results launch { workers.forEach { it.join() } results.close() }
// Collect results for (result in results) { println(result) }}Fan-in: multiple producers, one consumer
Merge multiple channels into one.
flowchart LR P1["Producer: fast"] --> M["fanIn (merge)"] P2["Producer: slow"] --> M M --> C["Consumer"]
import kotlinx.coroutines.*import kotlinx.coroutines.channels.*
fun CoroutineScope.producer(name: String, delayMs: Long): ReceiveChannel<String> = produce { var count = 0 while (true) { delay(delayMs) send("$name-${count++}") }}
fun CoroutineScope.fanIn(vararg channels: ReceiveChannel<String>): ReceiveChannel<String> = produce { for (channel in channels) { launch { for (msg in channel) { send(msg) } } }}
fun main() = runBlocking { val fast = producer("fast", 200) val slow = producer("slow", 500)
val merged = fanIn(fast, slow)
repeat(10) { println(merged.receive()) }
coroutineContext.cancelChildren() // cancel all producers}Pipeline pattern
Chain producers and consumers: each stage processes and forwards to the next.
flowchart LR G["generateNumbers"] -->|"Int"| S["square"] S -->|"Int"| F["filterEven"] F -->|"Int"| O["Consumer (println)"]
func generate(count int) <-chan int { ch := make(chan int) go func() { for i := 1; i <= count; i++ { ch <- i } close(ch) }() return ch}
func square(in <-chan int) <-chan int { ch := make(chan int) go func() { for v := range in { ch <- v * v } close(ch) }() return ch}import kotlinx.coroutines.*import kotlinx.coroutines.channels.*
fun CoroutineScope.generateNumbers(count: Int): ReceiveChannel<Int> = produce { for (i in 1..count) { send(i) }}
fun CoroutineScope.square(input: ReceiveChannel<Int>): ReceiveChannel<Int> = produce { for (value in input) { send(value * value) }}
fun CoroutineScope.filterEven(input: ReceiveChannel<Int>): ReceiveChannel<Int> = produce { for (value in input) { if (value % 2 == 0) { send(value) } }}
fun main() = runBlocking { // Pipeline: generate → square → filter val numbers = generateNumbers(10) val squared = square(numbers) val evenSquares = filterEven(squared)
    for (value in evenSquares) {
        println(value) // 4, 16, 36, 64, 100
    }
}

The pattern is nearly identical to the Go version. Kotlin’s produce builder launches the coroutine and creates the channel in one step, and it closes the channel when the block completes.
Exception Handling
Section titled “Exception Handling”The default behavior
In a regular coroutineScope, an exception in any child cancels all siblings and
propagates to the parent.
import kotlinx.coroutines.*
fun main() = runBlocking { try { coroutineScope { launch { delay(1000) println("Task 1 completed") // Never runs } launch { delay(500) throw RuntimeException("Task 2 failed!") } } } catch (e: RuntimeException) { println("Caught: ${e.message}") // Caught: Task 2 failed! }}launch vs async exception behavior
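launch reports an uncaught exception as soon as it is thrown; async stores it in the Deferred and rethrows it from await(). With a regular parent Job, a failing async child still cancels its parent right away, even before await() is called, so this example runs inside supervisorScope to keep the two behaviors apart.

```kotlin
import kotlinx.coroutines.*

fun main() = runBlocking {
    supervisorScope {
        // launch: the exception is reported immediately. Under a supervisor it goes
        // to the coroutine's CoroutineExceptionHandler instead of cancelling the parent.
        val handler = CoroutineExceptionHandler { _, e ->
            println("Handler got: ${e.message}")
        }
        launch(handler) {
            throw RuntimeException("launch exception")
        }

        // async: the exception is stored in the Deferred and thrown on .await()
        val deferred = async {
            throw RuntimeException("async exception")
        }
        try {
            deferred.await() // Exception thrown here
        } catch (e: RuntimeException) {
            println("Caught: ${e.message}")
        }
    }
}
```

CoroutineExceptionHandler
For launch-based coroutines (not async), you can install a catch-all handler on the scope: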
import kotlinx.coroutines.*
fun main() = runBlocking { val handler = CoroutineExceptionHandler { _, exception -> println("Global handler caught: ${exception.message}") }
// Must be on a NEW scope (not coroutineScope which always propagates) val scope = CoroutineScope(SupervisorJob() + handler)
scope.launch { throw RuntimeException("Something broke") }
scope.launch { delay(1000) println("I'm still running!") // This prints — supervised }
delay(1500) // wait for scope's children}supervisorScope for independent children
Section titled “supervisorScope for independent children”import kotlinx.coroutines.*
fun main() = runBlocking { supervisorScope { val child1 = launch { println("Child 1 starting") throw RuntimeException("Child 1 failed") }
val child2 = launch { delay(100) // Give child1 time to fail println("Child 2 is fine!") // This DOES print }
// Wait for both child1.join() // joins without re-throwing child2.join() } println("Supervisor scope done")}Error handling in concurrent code
Section titled “Error handling in concurrent code”// Promise.all: one rejection rejects alltry { await Promise.all([task1(), task2()]);} catch (e) { // First rejection — others may still be running!}
// Promise.allSettled: get all results regardlessconst results = await Promise.allSettled([task1(), task2()]);results.forEach(r => { if (r.status === 'fulfilled') console.log(r.value); if (r.status === 'rejected') console.log(r.reason);});g, ctx := errgroup.WithContext(context.Background())g.Go(func() error { return task1(ctx) })g.Go(func() error { return task2(ctx) })if err := g.Wait(); err != nil { // First error — context cancels others}// Like Promise.all — fail fast (coroutineScope)coroutineScope { val r1 = async { task1() } // if this fails, task2 is cancelled val r2 = async { task2() } process(r1.await(), r2.await())}
// Like Promise.allSettled — collect all results (supervisorScope)supervisorScope { val r1 = async { runCatching { task1() } } val r2 = async { runCatching { task2() } } val results = listOf(r1.await(), r2.await()) results.forEach { result -> result.onSuccess { println("OK: $it") } result.onFailure { println("ERR: ${it.message}") } }}Cancellation & Timeouts
Section titled “Cancellation & Timeouts”Cancelling a coroutine
Section titled “Cancelling a coroutine”import kotlinx.coroutines.*
fun main() = runBlocking { val job = launch { repeat(1000) { i -> println("Working $i ...") delay(200) // <-- cancellation is checked here } }
delay(1000) println("Cancelling...") job.cancel() // request cancellation job.join() // wait for it to finish // job.cancelAndJoin() // shorthand for cancel + join println("Cancelled")}Cooperative cancellation
Cancellation is cooperative: a coroutine only notices it at suspension points (delay, yield, withContext, etc.).
CPU-bound loops that never suspend must check isActive (or call ensureActive()) themselves.
const controller = new AbortController();const signal = controller.signal;
async function work(signal: AbortSignal) { while (!signal.aborted) { await doWork(); }}controller.abort(); // Cancelctx, cancel := context.WithCancel(context.Background())go func() { for { select { case <-ctx.Done(): // Check cancellation return default: doWork() } }}()cancel() // Request cancellationimport kotlinx.coroutines.*
fun main() = runBlocking { val job = launch(Dispatchers.Default) { var i = 0 while (isActive) { // Check cancellation manually i++ if (i % 1_000_000 == 0) println("Iteration $i") } println("Loop ended at $i") }
delay(100) job.cancelAndJoin() println("Done")}Timeouts
Section titled “Timeouts”import kotlinx.coroutines.*
fun main() = runBlocking { // Throws TimeoutCancellationException if not done in time try { val result = withTimeout(1000) { delay(2000) // Takes too long "done" } } catch (e: TimeoutCancellationException) { println("Timed out: ${e.message}") }
// Returns null on timeout (no exception) val result = withTimeoutOrNull(1000) { delay(500) "done" } println("Result: $result") // Result: done
val timedOut = withTimeoutOrNull(1000) { delay(2000) "done" } println("Result: $timedOut") // Result: null}| Pattern | TypeScript | Go | Kotlin |
|---|---|---|---|
| Timeout | Promise.race([task, timeout]) | context.WithTimeout(ctx, time) | withTimeout(time) { } |
| Timeout returning null | Custom wrapper | Check ctx.Err() | withTimeoutOrNull(time) { } |
Non-cancellable cleanup
Once a coroutine is cancelled, any suspending call it makes throws CancellationException, including calls inside a finally block. Wrap cleanup
that must suspend in withContext(NonCancellable).
import kotlinx.coroutines.*
fun main() = runBlocking { val job = launch { try { repeat(1000) { i -> println("Working $i") delay(200) } } finally { // After cancellation, suspend functions throw CancellationException // Use NonCancellable to run cleanup suspend functions withContext(NonCancellable) { println("Cleaning up...") delay(500) // This works because we're NonCancellable println("Cleanup done") } } }
delay(500) job.cancelAndJoin() println("Done")}Shared Mutable State
Section titled “Shared Mutable State”The problem
Section titled “The problem”import kotlinx.coroutines.*
// BROKEN: race conditionvar counter = 0
fun main() = runBlocking { val jobs = (1..100).map { launch(Dispatchers.Default) { repeat(1000) { counter++ // Not thread-safe! } } } jobs.forEach { it.join() } println("Counter = $counter") // Less than 100000 — race condition!}Solution 1: AtomicInteger
Section titled “Solution 1: AtomicInteger”import kotlinx.coroutines.*import java.util.concurrent.atomic.AtomicInteger
fun main() = runBlocking { val counter = AtomicInteger(0)
val jobs = (1..100).map { launch(Dispatchers.Default) { repeat(1000) { counter.incrementAndGet() } } } jobs.forEach { it.join() } println("Counter = ${counter.get()}") // 100000 — correct!}Solution 2: Mutex (coroutine-friendly lock)
Section titled “Solution 2: Mutex (coroutine-friendly lock)”var mu sync.Mutexvar counter int
for i := 0; i < 100; i++ { go func() { for j := 0; j < 1000; j++ { mu.Lock() counter++ mu.Unlock() } }()}import kotlinx.coroutines.*import kotlinx.coroutines.sync.Muteximport kotlinx.coroutines.sync.withLock
fun main() = runBlocking { val mutex = Mutex() var counter = 0
val jobs = (1..100).map { launch(Dispatchers.Default) { repeat(1000) { mutex.withLock { counter++ } } } } jobs.forEach { it.join() } println("Counter = $counter") // 100000 — correct!}Why Mutex instead of synchronized? Kotlin’s Mutex suspends instead of
blocking the thread. This means other coroutines can run while waiting for the lock:
// BAD: synchronized blocks the threadsynchronized(lock) { // Other coroutines on this thread are blocked}
// GOOD: Mutex suspends the coroutinemutex.withLock { // Other coroutines can run while we wait}Solution 3: Semaphore (limit concurrency)
Section titled “Solution 3: Semaphore (limit concurrency)”import kotlinx.coroutines.*import kotlinx.coroutines.sync.Semaphoreimport kotlinx.coroutines.sync.withPermit
fun main() = runBlocking { val semaphore = Semaphore(permits = 3) // Max 3 concurrent
val jobs = (1..10).map { i -> launch { semaphore.withPermit { println("Task $i running") delay(1000) println("Task $i done") } } } jobs.forEach { it.join() }}Solution 4: Channel as actor (message passing)
Instead of protecting shared state with locks, send messages to a single coroutine that owns the state (the actor pattern).
import kotlinx.coroutines.*import kotlinx.coroutines.channels.*
sealed class CounterMsgdata object Increment : CounterMsg()data class GetCount(val response: CompletableDeferred<Int>) : CounterMsg()
fun CoroutineScope.counterActor(): Channel<CounterMsg> { val channel = Channel<CounterMsg>() launch { var counter = 0 for (msg in channel) { when (msg) { is Increment -> counter++ is GetCount -> msg.response.complete(counter) } } } return channel}
fun main() = runBlocking { val counter = counterActor()
val jobs = (1..100).map { launch { repeat(1000) { counter.send(Increment) } } } jobs.forEach { it.join() }
val response = CompletableDeferred<Int>() counter.send(GetCount(response)) println("Counter = ${response.await()}") // 100000 — correct! counter.close()}Shared state summary
| Approach | When to Use | Go Equivalent |
|---|---|---|
| AtomicInteger / AtomicReference | Simple counters, flags | atomic.AddInt64 |
| Mutex | Protecting complex state | sync.Mutex |
| Semaphore | Limiting concurrency | Buffered channel as semaphore |
| Channel/Actor | State owned by single coroutine | Goroutine + channel |
| Single-thread confinement (newSingleThreadContext) | State only ever touched from one thread | N/A |
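Single-thread confinement is the only row without an example above. One possible sketch, assuming a limitedParallelism(1) view of Dispatchers.Default is an acceptable substitute for a dedicated thread from newSingleThreadContext: every increment runs sequentially, so no lock or atomic is needed. The name confinedCount is made up for this example.

```kotlin
import kotlinx.coroutines.*

// The opt-in is only needed on library versions where limitedParallelism
// is still marked experimental; it is harmless otherwise.
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun confinedCount(): Int = coroutineScope {
    // A view of Dispatchers.Default that runs at most one coroutine at a time
    val confined = Dispatchers.Default.limitedParallelism(1)
    var counter = 0

    val jobs = (1..100).map {
        launch(confined) {
            repeat(1000) { counter++ } // safe: increments never run in parallel
        }
    }
    jobs.joinAll()
    counter
}

fun main() = runBlocking {
    println("Counter = ${confinedCount()}") // Counter = 100000
}
```

The trade-off is the same as with an actor: correctness comes from serializing all access, so the confined section should stay short.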
Testing Coroutines
The kotlinx-coroutines-test library gives you runTest, which auto-advances
virtual time so delay(5000) completes instantly.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.test.*
import kotlin.test.*

class MyServiceTest {

    @Test
    fun `test suspending function`() = runTest {
        // runTest auto-advances virtual time, so delay(5000) completes instantly
        val result = fetchData()
        assertEquals("data", result)
    }

    private suspend fun fetchData(): String {
        delay(5000) // This doesn't actually wait 5 seconds in runTest
        return "data"
    }
}
```

Controlling virtual time

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.test.*
import kotlin.test.*

class TimerTest {

    @Test
    fun `test delayed behavior`() = runTest {
        var ticks = 0
        val job = launch {
            while (true) {
                delay(1000)
                ticks++
            }
        }

        // Time hasn't advanced yet
        assertEquals(0, ticks)

        // Advance time by 3 seconds. advanceTimeBy does not run tasks scheduled
        // exactly at the new time, so runCurrent() executes the tick due at t = 3000.
        advanceTimeBy(3000)
        runCurrent()
        assertEquals(3, ticks)

        // Advance one more second to reach the next tick
        advanceTimeBy(1000)
        runCurrent()
        assertEquals(4, ticks)

        job.cancel()
    }
}
```

Testing with TestDispatcher

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.test.*
import kotlin.test.*

class DispatcherTest {

    @Test
    fun `test with injected dispatcher`() = runTest {
        val testDispatcher = StandardTestDispatcher(testScheduler)

        val service = MyService(testDispatcher)
        service.startProcessing(this) // launch inside the test scope

        // Nothing has run yet with StandardTestDispatcher
        assertFalse(service.isComplete)
        advanceUntilIdle() // Run all pending coroutines

        assertEquals(true, service.isComplete)
    }
}

class MyService(private val dispatcher: CoroutineDispatcher) {
    var isComplete = false
        private set

    // The scope is a parameter so that callers (and tests) control the lifecycle
    fun startProcessing(scope: CoroutineScope) {
        scope.launch(dispatcher) {
            delay(1000)
            isComplete = true
        }
    }
}
```

Testing channels and exceptions

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.test.*
import kotlin.test.*

class ChannelTest {

    @Test
    fun `test producer`() = runTest {
        val channel = produce {
            send(1)
            send(2)
            send(3)
        }

        assertEquals(1, channel.receive())
        assertEquals(2, channel.receive())
        assertEquals(3, channel.receive())
    }

    @Test
    fun `test exception is thrown`() = runTest {
        assertFailsWith<IllegalArgumentException> {
            coroutineScope {
                launch { throw IllegalArgumentException("bad input") }
            }
        }
    }

    @Test
    fun `test supervisor handles failure`() = runTest {
        var task2Completed = false
        // Without a handler, runTest reports the uncaught failure and fails the test
        val handler = CoroutineExceptionHandler { _, _ -> }

        supervisorScope {
            launch(handler) { throw RuntimeException("task 1 failed") }
            launch {
                delay(100)
                task2Completed = true
            }
        }

        assertTrue(task2Completed)
    }
}
```

Concurrency Patterns Cheat Sheet
Concurrent map (like Promise.all / WaitGroup)

```kotlin
import kotlinx.coroutines.*

suspend fun <T, R> Iterable<T>.concurrentMap(transform: suspend (T) -> R): List<R> =
    coroutineScope {
        map { async { transform(it) } }.awaitAll()
    }

fun main() = runBlocking {
    val urls = listOf("url1", "url2", "url3", "url4", "url5")

    val results = urls.concurrentMap { url ->
        delay(500) // simulate HTTP fetch
        "Response from $url"
    }

    println(results)
}
```

Retry with exponential backoff

```kotlin
import kotlinx.coroutines.*

suspend fun <T> retry(
    times: Int = 3,
    initialDelayMs: Long = 100,
    maxDelayMs: Long = 5000,
    factor: Double = 2.0,
    block: suspend () -> T,
): T {
    var currentDelay = initialDelayMs
    repeat(times - 1) {
        try {
            return block()
        } catch (e: CancellationException) {
            throw e // cancellation is not a failure to retry
        } catch (e: Exception) {
            println("Retry after ${currentDelay}ms: ${e.message}")
        }
        delay(currentDelay)
        currentDelay = (currentDelay * factor).toLong().coerceAtMost(maxDelayMs)
    }
    return block() // Last attempt: let the exception propagate
}

fun main() = runBlocking {
    var attempt = 0
    val result = retry(times = 3) {
        attempt++
        if (attempt < 3) throw RuntimeException("Attempt $attempt failed")
        "Success on attempt $attempt"
    }
    println(result) // Success on attempt 3
}
```

Rate limiter

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Semaphore

class RateLimiter(private val permits: Int, private val periodMs: Long) {
    private val semaphore = Semaphore(permits)

    // Background scope used only to hand permits back after the period elapses
    private val releaseScope = CoroutineScope(Dispatchers.Default)

    suspend fun <T> execute(block: suspend () -> T): T {
        semaphore.acquire()
        try {
            return block()
        } finally {
            // Keep the permit for periodMs after the call finishes, then release it.
            // withPermit would release immediately, which limits concurrency, not rate.
            releaseScope.launch {
                delay(periodMs)
                semaphore.release()
            }
        }
    }
}
```

First result wins (like Promise.race)

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.selects.*

suspend fun <T> raceOf(vararg blocks: suspend () -> T): T = coroutineScope {
    val contenders = blocks.map { block -> async { block() } }
    val winner = select<T> {
        contenders.forEach { deferred -> deferred.onAwait { it } }
    }
    // coroutineScope waits for all of its children, so cancel the losers explicitly.
    // Otherwise raceOf would not return until the slowest block finished.
    contenders.forEach { it.cancel() }
    winner
}

fun main() = runBlocking {
    val result = raceOf(
        { delay(300); "slow" },
        { delay(100); "fast" },
        { delay(200); "medium" },
    )
    println(result) // fast
}
```

Full comparison table
| Pattern | TypeScript | Go | Kotlin |
|---|---|---|---|
| Run concurrently | Promise.all([...]) | go func(); wg.Wait() | coroutineScope { async {} } |
| First wins | Promise.race([...]) | select {} on channels | select { onAwait {} } |
| Worker pool | Manual with queues | goroutines + buffered channel | Channel + N launch |
| Timeout | Promise.race([p, timeout]) | context.WithTimeout | withTimeout(ms) {} |
| Retry | for loop with try/catch | for loop with err != nil | retry {} (custom) |
| Mutex | N/A (single-threaded) | sync.Mutex | Mutex (suspending) |
| Rate limit | Custom / library | time.Ticker + channel | Semaphore + delay |
| Event broadcast | EventEmitter | Fan-out from channel | SharedFlow (see Module 06) |
| Pipeline | .pipe() streams | Chained channels | Chained produce or Flow |
| Cancel | AbortController | context.WithCancel + cancel() | job.cancel() / scope cancel |
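
The worker pool row ("Channel + N launch") can be sketched like this. It is an illustrative sketch under stated assumptions: `processWithPool` and its parameters are hypothetical names, results come back in completion order rather than input order, and a failure in any worker cancels the whole pool through `coroutineScope`.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper: `workers` coroutines pull items from one shared channel.
suspend fun <T, R> processWithPool(
    inputs: List<T>,
    workers: Int,
    handle: suspend (T) -> R,
): List<R> = coroutineScope {
    val queue = Channel<T>()                     // rendezvous channel feeding the workers
    val results = Channel<R>(Channel.UNLIMITED)  // sends to this channel never suspend

    // Producer: push every input, then close so the workers' loops end
    launch {
        for (item in inputs) queue.send(item)
        queue.close()
    }

    // N workers compete for items; each item is handled by exactly one worker
    val pool = List(workers) {
        launch {
            for (item in queue) results.send(handle(item))
        }
    }

    pool.joinAll()
    results.close()
    results.toList() // completion order, not input order
}

fun main() = runBlocking {
    val squares = processWithPool((1..10).toList(), workers = 3) { n ->
        delay(100) // simulate work
        n * n
    }
    println(squares.sorted()) // [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
}
```

To bound the total time, the call can be wrapped in `withTimeout(ms) { ... }`, which cancels the producer and all workers together if the deadline passes.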
Practice
Put structured concurrency and channels to work in two hands-on builds.