Coroutines & Structured Concurrency

You already know concurrent programming — async/await in TypeScript, goroutines in Go. This module shows you the Kotlin way: coroutines mapped to the primitives you already use, plus Kotlin’s standout feature, structured concurrency, which gives you parent-child lifecycle management you have to build by hand elsewhere.

Add coroutine dependencies to your build.gradle.kts:

build.gradle.kts
plugins {
kotlin("jvm") version "2.1.0"
application
}
repositories {
mavenCentral()
}
dependencies {
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.10.1")
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.10.1")
testImplementation(kotlin("test"))
}
application {
mainClass.set("MainKt")
}
tasks.test {
useJUnitPlatform()
}

You already use concurrency daily. Here’s the mental model:

| Concept | TypeScript | Go | Kotlin |
| --- | --- | --- | --- |
| Lightweight task | async function + event loop | goroutine | coroutine |
| Concurrency model | Single-threaded event loop | M:N green threads (goroutines on OS threads) | Suspendable computations on thread pools |
| Blocking avoidance | Non-blocking by nature (everything is async) | Goroutines yield at I/O automatically | suspend functions yield cooperatively |
| Parallelism | Worker threads / Promise.all | goroutines + GOMAXPROCS | Dispatchers control thread pools |
| Structured lifecycle | No built-in hierarchy | No built-in hierarchy (errgroup helps) | Built-in: parent-child scope hierarchy |

import kotlinx.coroutines.*
fun main() = runBlocking {
val jobs = List(100_000) {
launch {
delay(1000)
}
}
println("Launched ${jobs.size} coroutines")
jobs.forEach { it.join() }
println("All done")
}

100,000 coroutines use a few MB of memory. Try that with OS threads.

Let’s do the same thing in all three languages: run two tasks concurrently, collect results.

async function fetchUser(): Promise<string> {
await new Promise(r => setTimeout(r, 1000));
return "Alice";
}
async function fetchOrder(): Promise<string> {
await new Promise(r => setTimeout(r, 1000));
return "Order-42";
}
async function main() {
const start = Date.now();
const [user, order] = await Promise.all([fetchUser(), fetchOrder()]);
console.log(`${user}, ${order} in ${Date.now() - start}ms`);
// Alice, Order-42 in ~1000ms
}
main();

Key Differences:

  • TS: Promise.all runs tasks concurrently on the single event loop thread.
  • Go: goroutines + WaitGroup for synchronization; result passed via pointer.
  • Kotlin: async returns a Deferred<T> (like a typed Promise); .await() gets the result.
  • Kotlin’s delay() is non-blocking — it suspends the coroutine, freeing the thread.
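The Kotlin side of this comparison might look like the sketch below. It reuses the names fetchUser and fetchOrder from the TypeScript version, with delay standing in for real I/O; loadBoth is a hypothetical helper name introduced only for this illustration.

```kotlin
import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

// Stand-ins for real I/O: delay() suspends without blocking the thread
suspend fun fetchUser(): String { delay(1000); return "Alice" }
suspend fun fetchOrder(): String { delay(1000); return "Order-42" }

// Hypothetical helper: start both fetches, then await both results
suspend fun loadBoth(): Pair<String, String> = coroutineScope {
    val user = async { fetchUser() }   // starts right away
    val order = async { fetchOrder() } // runs concurrently with fetchUser
    user.await() to order.await()
}

fun main() = runBlocking {
    val elapsed = measureTimeMillis {
        val (user, order) = loadBoth()
        println("$user, $order")
    }
    println("Took about ${elapsed}ms") // roughly one second, not two
}
```

Both async calls start before either await, which is what makes the two fetches overlap.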

Structured concurrency is Kotlin’s biggest advantage over Go and TypeScript for concurrent code. In Go, unsupervised goroutines get orphaned; in TS, floating promises become unhandled rejections. Kotlin makes the parent scope responsible for its children.

function handleRequest() {
fetchFromDB(); // Floating promise — no one awaits it
callExternalAPI(); // If this throws, it's an unhandled rejection
sendNotification(); // Fire and forget — hope it works
}

Key Differences:

  • TS / Go: leaked work is on you to track and cancel — every Go developer has dealt with leaked goroutines.
  • Kotlin: coroutineScope won’t return until every child finishes, and failure or cancellation propagates automatically.
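For contrast with the floating promises above, here is a minimal Kotlin sketch of the same handler. The three functions are hypothetical stand-ins named after the TypeScript calls, and the failure in callExternalAPI is simulated.

```kotlin
import kotlinx.coroutines.*
import java.io.IOException

// Hypothetical stand-ins for the three calls in the TypeScript snippet
suspend fun fetchFromDB() { delay(100) }
suspend fun callExternalAPI() { delay(50); throw IOException("API down") }
suspend fun sendNotification() { delay(200); println("Notification sent") }

suspend fun handleRequest() {
    // coroutineScope returns only when all three children are done or cancelled
    coroutineScope {
        launch { fetchFromDB() }
        launch { callExternalAPI() }  // fails after 50 ms
        launch { sendNotification() } // cancelled before it can print
    }
}

fun main() = runBlocking {
    try {
        handleRequest()
    } catch (e: IOException) {
        println("Request failed: ${e.message}") // Request failed: API down
    }
}
```

Nothing outlives handleRequest: the failure cancels the two siblings and surfaces at the call site as an ordinary exception.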

Rules of structured concurrency:

  1. Every coroutine has a parent scope.
  2. A parent scope does not complete until all children complete.
  3. If a parent is cancelled, all children are cancelled.
  4. If a child fails (uncaught exception), the parent is cancelled (and thus all siblings).

A scope forms a tree. Cancellation and failure flow along its edges:

Figure: Coroutine scope hierarchy (a root scope with children Job A and Job B; Job A has children A1 and A2)
  • Cancel Job A → A1 and A2 are cancelled; B is unaffected.
  • Cancel root → everything is cancelled.
  • A1 throws → A2 is cancelled, then A is cancelled (propagates up).
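The first bullet can be reproduced with a small sketch. The job names mirror the bullets above; cancelSubtree is a hypothetical helper that returns the cancellation state of A and B.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: builds the A / A1 / A2 / B tree and cancels only A
suspend fun cancelSubtree(): Pair<Boolean, Boolean> = coroutineScope {
    val jobA = launch {
        launch { delay(1000); println("A1 finished") } // child A1
        launch { delay(1000); println("A2 finished") } // child A2
    }
    val jobB = launch { delay(300); println("B finished") }

    delay(100)
    jobA.cancelAndJoin() // cancels A, which cancels A1 and A2
    jobB.join()          // B is a sibling of A and keeps running

    jobA.isCancelled to jobB.isCancelled
}

fun main() = runBlocking {
    val (aCancelled, bCancelled) = cancelSubtree()
    println("A cancelled: $aCancelled, B cancelled: $bCancelled")
    // B finished
    // A cancelled: true, B cancelled: false
}
```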

coroutineScope fails fast: one child’s failure cancels its siblings. supervisorScope isolates children: one failure leaves the others running.

import kotlinx.coroutines.*
// coroutineScope: one child fails → all siblings cancelled
suspend fun strictScope() = coroutineScope {
launch {
delay(1000)
println("Task 1 done") // Never prints — cancelled by Task 2's failure
}
launch {
delay(500)
throw RuntimeException("Task 2 failed")
}
}
// supervisorScope: one child fails → siblings continue
suspend fun lenientScope() = supervisorScope {
launch {
delay(1000)
println("Task 1 done") // DOES print — not affected by Task 2
}
launch {
delay(500)
throw RuntimeException("Task 2 failed")
// Exception is NOT propagated to parent
}
}
fun main() = runBlocking {
try {
strictScope()
} catch (e: RuntimeException) {
println("Caught from strict: ${e.message}")
}
println("---")
lenientScope()
println("Supervisor scope completed")
}

Coroutine Builders: launch, async, runBlocking

| Builder | Returns | Use For | Blocks Thread? |
| --- | --- | --- | --- |
| launch | Job | Fire-and-forget work (side effects) | No |
| async | Deferred<T> | Concurrent computation returning a value | No |
| runBlocking | T | Bridge from blocking to suspend world | Yes |

import kotlinx.coroutines.*
fun main() = runBlocking {
val job: Job = launch {
println("Working on thread: ${Thread.currentThread().name}")
delay(1000)
println("Done!")
}
println("Launched")
job.join() // Optional: wait for completion
// Without join(), runBlocking still waits (structured concurrency)
}

| Pattern | TypeScript | Go | Kotlin |
| --- | --- | --- | --- |
| Fire and forget | void function call (dangerous) | go func() | launch { } |
| Wait for completion | Can’t easily (no handle) | wg.Wait() | job.join() |
| Cancel | No built-in way | context.WithCancel | job.cancel() |

import kotlinx.coroutines.*
fun main() = runBlocking {
val deferred: Deferred<Int> = async {
delay(1000)
42
}
println("Computing...")
val result: Int = deferred.await()
println("Result: $result")
}

Deferred<T> is Kotlin’s Promise<T> (TS) or the result you’d get from a channel (Go):

| Pattern | TypeScript | Go | Kotlin |
| --- | --- | --- | --- |
| Start concurrent computation | const p = fetchData() | ch := make(chan int); go func() { ch <- compute() }() | val d = async { compute() } |
| Get result | await p | result := <-ch | d.await() |
| Multiple concurrent | Promise.all([p1, p2]) | Read from multiple channels | awaitAll(d1, d2) |

Launch independent fetches concurrently, then combine — total time is the slowest branch, not the sum.

import kotlinx.coroutines.*
data class UserProfile(val name: String, val orders: List<String>, val prefs: Map<String, String>)
suspend fun fetchName(): String { delay(300); return "Alice" }
suspend fun fetchOrders(): List<String> { delay(500); return listOf("ORD-1", "ORD-2") }
suspend fun fetchPrefs(): Map<String, String> { delay(200); return mapOf("theme" to "dark") }
suspend fun getUserProfile(): UserProfile = coroutineScope {
val name = async { fetchName() }
val orders = async { fetchOrders() }
val prefs = async { fetchPrefs() }
// All three run concurrently — total time ≈ 500ms (max of the three)
UserProfile(name.await(), orders.await(), prefs.await())
}
fun main() = runBlocking {
val start = System.currentTimeMillis()
val profile = getUserProfile()
println("$profile in ${System.currentTimeMillis() - start}ms")
// UserProfile(name=Alice, orders=[ORD-1, ORD-2], prefs={theme=dark}) in ~500ms
}

runBlocking — bridge to the suspend world

Use runBlocking only at the edges: main(), tests, and rare bridges from blocking code to coroutines.

import kotlinx.coroutines.*
fun main() = runBlocking { // <-- Blocks the main thread until all children complete
launch { delay(200); println("Task 1") }
launch { delay(100); println("Task 2") }
println("Started")
}
// Output:
// Started
// Task 2
// Task 1

A dispatcher decides which thread (or pool) a coroutine runs on. Unlike Go, where the runtime schedules goroutines for you, Kotlin lets you choose explicitly.

| Dispatcher | Thread Pool | Use For | Go Equivalent | TS Equivalent |
| --- | --- | --- | --- | --- |
| Dispatchers.Default | Shared pool (CPU cores) | CPU-bound work (parsing, sorting, computation) | GOMAXPROCS worker threads | Worker threads |
| Dispatchers.IO | Elastic pool (by default up to 64 threads or the number of cores, whichever is larger) | I/O-bound work (network, disk, DB) | goroutine on I/O | async/await (event loop) |
| Dispatchers.Unconfined | No specific thread | Testing, rare edge cases | N/A | N/A |
| Dispatchers.Main | Main/UI thread | Android/Desktop UI only; skip for backend | N/A | N/A |

import kotlinx.coroutines.*
fun main() = runBlocking {
launch(Dispatchers.Default) {
// CPU-intensive work
val result = (1..1_000_000).sumOf { it.toLong() }
println("CPU work on: ${Thread.currentThread().name}") // DefaultDispatcher-worker-1
}
launch(Dispatchers.IO) {
// I/O work (file, network, database)
println("IO work on: ${Thread.currentThread().name}") // DefaultDispatcher-worker-2 (IO shares Default's pool)
}
launch(Dispatchers.Unconfined) {
println("Unconfined before delay: ${Thread.currentThread().name}") // main
delay(100)
println("Unconfined after delay: ${Thread.currentThread().name}") // some worker thread
}
}

import kotlinx.coroutines.*
suspend fun readFile(path: String): String = withContext(Dispatchers.IO) {
// Switch to IO dispatcher for file read
java.io.File(path).readText()
}
suspend fun parseJson(json: String): Map<String, Any> = withContext(Dispatchers.Default) {
// Switch to Default dispatcher for CPU-bound parsing
// (simplified — use kotlinx.serialization in real code)
mapOf("parsed" to json)
}
fun main() = runBlocking {
val content = readFile("/etc/hostname")
val parsed = parseJson(content)
println(parsed)
}

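limitedParallelism caps how many coroutines execute at the same time on a dispatcher, which makes it the closest Kotlin equivalent of a Go worker pool built from a buffered channel. The limit applies to running code: a coroutine that suspends frees its slot, so use a Semaphore when you need to cap in-flight suspending work.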

// Go equivalent: buffered channel as semaphore
sem := make(chan struct{}, 2) // limit to 2 concurrent
for i := 0; i < 10; i++ {
go func(i int) {
sem <- struct{}{} // acquire
defer func() { <-sem }() // release
doWork(i)
}(i)
}
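A Kotlin counterpart might look like the following sketch. It assumes the work is a blocking call (Thread.sleep stands in for doWork), and peakConcurrency is a hypothetical helper that measures how many tasks were observed running at once.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: returns the highest number of tasks seen running at the same moment
suspend fun peakConcurrency(tasks: Int, limit: Int): Int = coroutineScope {
    // A view of Dispatchers.IO that runs at most `limit` coroutines in parallel
    val limited = Dispatchers.IO.limitedParallelism(limit)
    val running = AtomicInteger(0)
    val peak = AtomicInteger(0)

    val jobs = List(tasks) {
        launch(limited) {
            val now = running.incrementAndGet()
            peak.updateAndGet { seen -> maxOf(seen, now) }
            Thread.sleep(50) // blocking call standing in for doWork(i)
            running.decrementAndGet()
        }
    }
    jobs.joinAll() // wait before reading the peak
    peak.get()
}

fun main() = runBlocking {
    println("Peak concurrency: ${peakConcurrency(tasks = 10, limit = 2)}") // never more than 2
}
```

The blocking call matters here: with delay() instead of Thread.sleep, each coroutine would release its slot while suspended and more than two tasks could be in flight.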

suspend is Kotlin’s counterpart to TypeScript’s async keyword: it marks a function that can pause and resume without blocking a thread.

// This function can suspend (yield the thread) at delay() and fetchFromNetwork()
suspend fun processData(): String {
delay(100) // suspends here — thread is free to do other work
val data = fetchFromNetwork() // suspends here too
return data.uppercase()
}
suspend fun fetchFromNetwork(): String {
delay(500) // simulate network call
return "response-data"
}

Rules:

  • suspend functions can call other suspend functions.
  • suspend functions can only be called from coroutines or other suspend functions.
  • Regular functions cannot call suspend functions directly.

// COMPILE ERROR:
fun regularFunction() {
delay(100) // Error: suspend function 'delay' should be called only from a coroutine
}
// OK:
suspend fun suspendFunction() {
delay(100) // Fine — we're in a suspend context
}

// Must mark with 'async', returns Promise<T>
async function getData(): Promise<string> {
const response = await fetch("https://api.example.com/data");
return await response.text();
}

Key difference: Kotlin suspend functions return the actual type (String), not a wrapper (Promise<String>). The suspension is invisible to the caller — it looks like synchronous code.

Channels are the primary way to communicate between coroutines — just like Go channels.

ch := make(chan string, 10) // buffered channel, capacity 10
ch <- "hello" // send
msg := <-ch // receive
close(ch) // close
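The same four operations in Kotlin, as a minimal sketch (channelBasics is a hypothetical function name used only here):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

// Hypothetical helper mirroring the Go snippet line by line
suspend fun channelBasics(): String {
    val ch = Channel<String>(10) // buffered channel, capacity 10
    ch.send("hello")             // send: suspends only when the buffer is full
    val msg = ch.receive()       // receive: suspends until a value is available
    ch.close()                   // close: no further sends are accepted
    return msg
}

fun main() = runBlocking {
    println(channelBasics()) // hello
}
```

Unlike Go's channel operators, send and receive are suspend functions, so they can only be called from a coroutine or another suspend function.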
| Type | Kotlin | Go Equivalent | Behavior |
| --- | --- | --- | --- |
| Rendezvous | Channel<T>() or Channel<T>(0) | make(chan T) | Sender suspends until receiver is ready |
| Buffered | Channel<T>(10) | make(chan T, 10) | Sender suspends when buffer is full |
| Unlimited | Channel<T>(Channel.UNLIMITED) | N/A | Never suspends sender (unbounded buffer) |
| Conflated | Channel<T>(Channel.CONFLATED) | N/A | Keeps only latest value, drops old |

func producer(ch chan<- int) {
for i := 0; i < 5; i++ {
ch <- i
}
close(ch)
}
func main() {
ch := make(chan int)
go producer(ch)
for val := range ch { // iterate until closed
fmt.Println(val)
}
}

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() = runBlocking {
val channel = Channel<Int>()
launch {
for (i in 1..5) {
channel.send(i)
}
channel.close()
}
// Method 1: for loop (like Go's range)
for (value in channel) {
println(value)
}
// Method 2: consumeEach (auto-closes on completion or exception)
val channel2 = Channel<Int>()
launch {
for (i in 1..5) channel2.send(i)
channel2.close()
}
channel2.consumeEach { println(it) }
}

select {
case msg := <-ch1:
fmt.Println("From ch1:", msg)
case msg := <-ch2:
fmt.Println("From ch2:", msg)
case <-time.After(1 * time.Second):
fmt.Println("Timeout")
}
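Kotlin has a select expression with the same shape. The sketch below assumes two String channels; firstMessage is a hypothetical helper name. The onTimeout clause is marked experimental in kotlinx.coroutines, hence the opt-in.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*

// Hypothetical helper: returns whichever message arrives first, or "Timeout" after one second
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun firstMessage(ch1: ReceiveChannel<String>, ch2: ReceiveChannel<String>): String =
    select {
        ch1.onReceive { msg -> "From ch1: $msg" }
        ch2.onReceive { msg -> "From ch2: $msg" }
        onTimeout(1000) { "Timeout" }
    }

fun main() = runBlocking {
    val ch1 = Channel<String>()
    val ch2 = Channel<String>()
    launch { delay(100); ch2.send("hello") }

    println(firstMessage(ch1, ch2)) // From ch2: hello
    println(firstMessage(ch1, ch2)) // Timeout (nothing else is sent)
}
```

Unlike Go's statement form, Kotlin's select is an expression: each clause returns a value and select returns the value of the clause that won.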

Fan-out is the worker pool pattern: distribute work across multiple coroutines, each pulling from the same jobs channel.

Figure: Fan-out worker pool

func worker(id int, jobs <-chan int, results chan<- string, wg *sync.WaitGroup) {
defer wg.Done()
for j := range jobs {
time.Sleep(100 * time.Millisecond)
results <- fmt.Sprintf("worker %d processed job %d", id, j)
}
}
func main() {
jobs := make(chan int, 100)
results := make(chan string, 100)
var wg sync.WaitGroup
// Start 3 workers
for w := 1; w <= 3; w++ {
wg.Add(1)
go worker(w, jobs, results, &wg)
}
// Send jobs
for j := 1; j <= 9; j++ {
jobs <- j
}
close(jobs)
// Collect
go func() { wg.Wait(); close(results) }()
for r := range results {
fmt.Println(r)
}
}
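A Kotlin version of the same pool could be sketched as follows. The channel capacities and the 100 ms simulated work are carried over from the Go code; runPool is a hypothetical function name.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

// Hypothetical helper: runs jobCount jobs on workerCount workers and returns their reports
suspend fun runPool(workerCount: Int, jobCount: Int): List<String> = coroutineScope {
    val jobs = Channel<Int>(100)
    val results = Channel<String>(100)

    // Start the workers; each pulls from the same jobs channel until it is closed
    val workers = List(workerCount) { index ->
        launch {
            for (j in jobs) {
                delay(100) // simulate work
                results.send("worker ${index + 1} processed job $j")
            }
        }
    }

    // Send the jobs, then close the channel so the workers' loops end
    launch {
        for (j in 1..jobCount) jobs.send(j)
        jobs.close()
    }

    // Close results once every worker has finished (the role of wg.Wait() in Go)
    launch {
        workers.joinAll()
        results.close()
    }

    // Collect until results is closed
    val collected = mutableListOf<String>()
    for (r in results) collected += r
    collected
}

fun main() = runBlocking {
    runPool(workerCount = 3, jobCount = 9).forEach(::println)
}
```

Each job is received by exactly one worker, so the nine results arrive in an order that depends on scheduling.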

Fan-in merges multiple channels into one.

Figure: Fan-in merge

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun CoroutineScope.producer(name: String, delayMs: Long): ReceiveChannel<String> = produce {
var count = 0
while (true) {
delay(delayMs)
send("$name-${count++}")
}
}
fun CoroutineScope.fanIn(vararg channels: ReceiveChannel<String>): ReceiveChannel<String> = produce {
for (channel in channels) {
launch {
for (msg in channel) {
send(msg)
}
}
}
}
fun main() = runBlocking {
val fast = producer("fast", 200)
val slow = producer("slow", 500)
val merged = fanIn(fast, slow)
repeat(10) {
println(merged.receive())
}
coroutineContext.cancelChildren() // cancel all producers
}

A pipeline chains producers and consumers: each stage processes its input and forwards the result to the next.

Figure: Channel pipeline

func generate(count int) <-chan int {
ch := make(chan int)
go func() {
for i := 1; i <= count; i++ {
ch <- i
}
close(ch)
}()
return ch
}
func square(in <-chan int) <-chan int {
ch := make(chan int)
go func() {
for v := range in {
ch <- v * v
}
close(ch)
}()
return ch
}

The pattern is nearly identical. Kotlin’s produce builder handles the goroutine launch and channel creation in one step.
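A Kotlin counterpart to the two Go stages could look like this sketch. It keeps the Go function names generate and square; produce is marked experimental in kotlinx.coroutines, hence the opt-in.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Stage 1: emit 1..count, then the channel closes when the block returns
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.generate(count: Int): ReceiveChannel<Int> = produce {
    for (i in 1..count) send(i)
}

// Stage 2: square every value received from the previous stage
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.square(input: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
    for (v in input) send(v * v)
}

fun main() = runBlocking {
    for (v in square(generate(5))) {
        println(v) // 1, 4, 9, 16, 25
    }
}
```

There is no explicit close call: produce closes its channel when the block completes, which ends the downstream for loop.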

In a regular coroutineScope, an exception in any child cancels all siblings and propagates to the parent.

import kotlinx.coroutines.*
fun main() = runBlocking {
try {
coroutineScope {
launch {
delay(1000)
println("Task 1 completed") // Never runs
}
launch {
delay(500)
throw RuntimeException("Task 2 failed!")
}
}
} catch (e: RuntimeException) {
println("Caught: ${e.message}") // Caught: Task 2 failed!
}
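}

import kotlinx.coroutines.*
fun main() = runBlocking {
    // supervisorScope keeps each failure local so the two builders can be compared.
    // In a plain coroutineScope (or directly in runBlocking), either failure
    // would cancel the parent and its other children right away.
    supervisorScope {
        val handler = CoroutineExceptionHandler { _, e -> println("Handler got: ${e.message}") }
        // launch: the exception is reported as soon as it is thrown
        val job = launch(handler) {
            throw RuntimeException("launch exception")
        }
        job.join()
        // async: the exception is stored in the Deferred and rethrown by await()
        val deferred = async {
            throw RuntimeException("async exception")
        }
        try {
            deferred.await() // Exception thrown here
        } catch (e: RuntimeException) {
            println("Caught: ${e.message}")
        }
    }
}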

For launch-based coroutines (not async), you can install a global handler:

import kotlinx.coroutines.*
fun main() = runBlocking {
val handler = CoroutineExceptionHandler { _, exception ->
println("Global handler caught: ${exception.message}")
}
// Must be on a NEW scope (not coroutineScope which always propagates)
val scope = CoroutineScope(SupervisorJob() + handler)
scope.launch {
throw RuntimeException("Something broke")
}
scope.launch {
delay(1000)
println("I'm still running!") // This prints — supervised
}
delay(1500) // wait for scope's children
}

import kotlinx.coroutines.*
fun main() = runBlocking {
supervisorScope {
val child1 = launch {
println("Child 1 starting")
throw RuntimeException("Child 1 failed")
}
val child2 = launch {
delay(100) // Give child1 time to fail
println("Child 2 is fine!") // This DOES print
}
// Wait for both
child1.join() // joins without re-throwing
child2.join()
}
println("Supervisor scope done")
}

// Promise.all: one rejection rejects all
try {
await Promise.all([task1(), task2()]);
} catch (e) {
// First rejection — others may still be running!
}
// Promise.allSettled: get all results regardless
const results = await Promise.allSettled([task1(), task2()]);
results.forEach(r => {
if (r.status === 'fulfilled') console.log(r.value);
if (r.status === 'rejected') console.log(r.reason);
});

import kotlinx.coroutines.*
fun main() = runBlocking {
val job = launch {
repeat(1000) { i ->
println("Working $i ...")
delay(200) // <-- cancellation is checked here
}
}
delay(1000)
println("Cancelling...")
job.cancel() // request cancellation
job.join() // wait for it to finish
// job.cancelAndJoin() // shorthand for cancel + join
println("Cancelled")
}

Cancellation is cooperative: it takes effect only at suspension points (delay, yield, withContext, etc.). A CPU-bound loop never reaches one, so it must check isActive (or call ensureActive()) itself.
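A minimal sketch of a cancellable CPU-bound loop follows. The busy loop is a placeholder for real computation, and spinUntilCancelled is a hypothetical helper name.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: spins on a worker thread until cancelled, returns the iteration count
suspend fun spinUntilCancelled(): Long = coroutineScope {
    var iterations = 0L
    val job = launch(Dispatchers.Default) {
        // No suspension point inside the loop, so cancellation is only noticed via isActive
        while (isActive) {
            iterations++
        }
    }
    delay(100)
    job.cancelAndJoin() // returns promptly because the loop checks isActive
    iterations
}

fun main() = runBlocking {
    println("Stopped after ${spinUntilCancelled()} iterations")
}
```

With while (true) instead of while (isActive), cancelAndJoin would never return, because the loop would never observe the cancellation request.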

const controller = new AbortController();
const signal = controller.signal;
async function work(signal: AbortSignal) {
while (!signal.aborted) {
await doWork();
}
}
controller.abort(); // Cancel

import kotlinx.coroutines.*
fun main() = runBlocking {
// Throws TimeoutCancellationException if not done in time
try {
val result = withTimeout(1000) {
delay(2000) // Takes too long
"done"
}
} catch (e: TimeoutCancellationException) {
println("Timed out: ${e.message}")
}
// Returns null on timeout (no exception)
val result = withTimeoutOrNull(1000) {
delay(500)
"done"
}
println("Result: $result") // Result: done
val timedOut = withTimeoutOrNull(1000) {
delay(2000)
"done"
}
println("Result: $timedOut") // Result: null
}

| Pattern | TypeScript | Go | Kotlin |
| --- | --- | --- | --- |
| Timeout | Promise.race([task, timeout]) | context.WithTimeout(ctx, time) | withTimeout(time) { } |
| Timeout returning null | Custom wrapper | Check ctx.Err() | withTimeoutOrNull(time) { } |

After cancellation, suspend functions throw CancellationException. Wrap cleanup that must suspend in withContext(NonCancellable).

import kotlinx.coroutines.*
fun main() = runBlocking {
val job = launch {
try {
repeat(1000) { i ->
println("Working $i")
delay(200)
}
} finally {
// After cancellation, suspend functions throw CancellationException
// Use NonCancellable to run cleanup suspend functions
withContext(NonCancellable) {
println("Cleaning up...")
delay(500) // This works because we're NonCancellable
println("Cleanup done")
}
}
}
delay(500)
job.cancelAndJoin()
println("Done")
}

import kotlinx.coroutines.*
// BROKEN: race condition
var counter = 0
fun main() = runBlocking {
val jobs = (1..100).map {
launch(Dispatchers.Default) {
repeat(1000) {
counter++ // Not thread-safe!
}
}
}
jobs.forEach { it.join() }
println("Counter = $counter") // Less than 100000 — race condition!
}

import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger
fun main() = runBlocking {
val counter = AtomicInteger(0)
val jobs = (1..100).map {
launch(Dispatchers.Default) {
repeat(1000) {
counter.incrementAndGet()
}
}
}
jobs.forEach { it.join() }
println("Counter = ${counter.get()}") // 100000 — correct!
}

Solution 2: Mutex (coroutine-friendly lock)

var mu sync.Mutex
var counter int
for i := 0; i < 100; i++ {
go func() {
for j := 0; j < 1000; j++ {
mu.Lock()
counter++
mu.Unlock()
}
}()
}
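The Kotlin counterpart, sketched with the same 100 x 1000 increments as the earlier counter examples (countWithMutex is a hypothetical helper name):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical helper: increments a shared counter from many coroutines under a Mutex
suspend fun countWithMutex(coroutines: Int, increments: Int): Int {
    val mutex = Mutex()
    var counter = 0
    coroutineScope {
        repeat(coroutines) {
            launch(Dispatchers.Default) {
                repeat(increments) {
                    // withLock suspends while the lock is held elsewhere and always unlocks afterwards
                    mutex.withLock { counter++ }
                }
            }
        }
    } // coroutineScope returns once every child has finished
    return counter
}

fun main() = runBlocking {
    println("Counter = ${countWithMutex(100, 1000)}") // Counter = 100000
}
```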

Why Mutex instead of synchronized? Kotlin’s Mutex suspends instead of blocking the thread. This means other coroutines can run while waiting for the lock:

// BAD: synchronized blocks the thread
synchronized(lock) {
// Other coroutines on this thread are blocked
}
// GOOD: Mutex suspends the coroutine
mutex.withLock {
// Other coroutines can run while we wait
}
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
fun main() = runBlocking {
val semaphore = Semaphore(permits = 3) // Max 3 concurrent
val jobs = (1..10).map { i ->
launch {
semaphore.withPermit {
println("Task $i running")
delay(1000)
println("Task $i done")
}
}
}
jobs.forEach { it.join() }
}

Solution 4: Channel as actor (message passing)

Instead of protecting shared state with locks, send messages to a single coroutine that owns the state (the actor pattern).

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
sealed class CounterMsg
data object Increment : CounterMsg()
data class GetCount(val response: CompletableDeferred<Int>) : CounterMsg()
fun CoroutineScope.counterActor(): Channel<CounterMsg> {
val channel = Channel<CounterMsg>()
launch {
var counter = 0
for (msg in channel) {
when (msg) {
is Increment -> counter++
is GetCount -> msg.response.complete(counter)
}
}
}
return channel
}
fun main() = runBlocking {
val counter = counterActor()
val jobs = (1..100).map {
launch {
repeat(1000) {
counter.send(Increment)
}
}
}
jobs.forEach { it.join() }
val response = CompletableDeferred<Int>()
counter.send(GetCount(response))
println("Counter = ${response.await()}") // 100000 — correct!
counter.close()
}

| Approach | When to Use | Go Equivalent |
| --- | --- | --- |
| AtomicInteger / AtomicReference | Simple counters, flags | atomic.AddInt64 |
| Mutex | Protecting complex state | sync.Mutex |
| Semaphore | Limiting concurrency | Buffered channel as semaphore |
| Channel/Actor | State owned by single coroutine | Goroutine + channel |
| Single-thread confinement | State confined to one thread via newSingleThreadContext | N/A |

The kotlinx-coroutines-test library gives you runTest, which auto-advances virtual time so delay(5000) completes instantly.

import kotlinx.coroutines.*
import kotlinx.coroutines.test.*
import kotlin.test.*
class MyServiceTest {
@Test
fun `test suspending function`() = runTest {
// runTest auto-advances virtual time, so delay(5000) completes instantly
val result = fetchData()
assertEquals("data", result)
}
private suspend fun fetchData(): String {
delay(5000) // This doesn't actually wait 5 seconds in runTest
return "data"
}
}

import kotlinx.coroutines.*
import kotlinx.coroutines.test.*
import kotlin.test.*
class TimerTest {
@Test
fun `test delayed behavior`() = runTest {
var ticks = 0
val job = launch {
while (true) {
delay(1000)
ticks++
}
}
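// Time hasn't advanced yet
assertEquals(0, ticks)
// Advance time by 3 seconds. advanceTimeBy stops just short of tasks due
// at exactly the new time, so runCurrent() executes the tick scheduled at 3000
advanceTimeBy(3000)
runCurrent()
assertEquals(3, ticks)
// Advance one more second and run the tick scheduled at 4000
advanceTimeBy(1000)
runCurrent()
assertEquals(4, ticks)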
job.cancel()
}
}

import kotlinx.coroutines.*
import kotlinx.coroutines.test.*
import kotlin.test.*
class DispatcherTest {
    @Test
    fun `test with injected dispatcher`() = runTest {
        val testDispatcher = StandardTestDispatcher(testScheduler)
        val service = MyService(testDispatcher)
        service.startProcessing(this) // launch in the test scope so runTest tracks the work
        // Nothing has run yet with StandardTestDispatcher
        advanceUntilIdle() // Run all pending coroutines
        assertEquals(true, service.isComplete)
    }
}
class MyService(private val dispatcher: CoroutineDispatcher) {
    var isComplete = false
        private set
    // The scope is a plain parameter: a member extension on CoroutineScope
    // could not be called as service.startProcessing()
    fun startProcessing(scope: CoroutineScope) {
        scope.launch(dispatcher) {
            delay(1000)
            isComplete = true
        }
    }
}

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.test.*
import kotlin.test.*
class ChannelTest {
@Test
fun `test producer`() = runTest {
val channel = produce {
send(1)
send(2)
send(3)
}
assertEquals(1, channel.receive())
assertEquals(2, channel.receive())
assertEquals(3, channel.receive())
}
@Test
fun `test exception is thrown`() = runTest {
assertFailsWith<IllegalArgumentException> {
coroutineScope {
launch {
throw IllegalArgumentException("bad input")
}
}
}
}
@Test
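fun `test supervisor handles failure`() = runTest {
var task2Completed = false
// Without a handler, runTest reports the child's uncaught exception and fails the test
val handler = CoroutineExceptionHandler { _, _ -> }
supervisorScope {
launch(handler) {
throw RuntimeException("task 1 failed")
}
launch {
delay(100)
task2Completed = true
}
}
assertTrue(task2Completed)
}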
}

Concurrent map (like Promise.all / WaitGroup)

import kotlinx.coroutines.*
suspend fun <T, R> Iterable<T>.concurrentMap(transform: suspend (T) -> R): List<R> =
coroutineScope {
map { async { transform(it) } }.awaitAll()
}
fun main() = runBlocking {
val urls = listOf("url1", "url2", "url3", "url4", "url5")
val results = urls.concurrentMap { url ->
delay(500) // simulate HTTP fetch
"Response from $url"
}
println(results)
}

import kotlinx.coroutines.*
suspend fun <T> retry(
times: Int = 3,
initialDelayMs: Long = 100,
maxDelayMs: Long = 5000,
factor: Double = 2.0,
block: suspend () -> T,
): T {
var currentDelay = initialDelayMs
repeat(times - 1) {
try {
return block()
} catch (e: Exception) {
println("Retry after ${currentDelay}ms: ${e.message}")
}
delay(currentDelay)
currentDelay = (currentDelay * factor).toLong().coerceAtMost(maxDelayMs)
}
return block() // Last attempt — let exception propagate
}
fun main() = runBlocking {
var attempt = 0
val result = retry(times = 3) {
attempt++
if (attempt < 3) throw RuntimeException("Attempt $attempt failed")
"Success on attempt $attempt"
}
println(result) // Success on attempt 3
}

import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Semaphore
class RateLimiter(permits: Int, private val periodMs: Long) {
    private val semaphore = Semaphore(permits)
    // Background scope that hands permits back; it is unstructured, so call close() when done
    private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
    suspend fun <T> execute(block: suspend () -> T): T {
        semaphore.acquire() // suspends once `permits` calls have started within the current period
        scope.launch {
            delay(periodMs)
            semaphore.release() // return the permit only after the period has elapsed
        }
        return block()
    }
    fun close() = scope.cancel()
}

import kotlinx.coroutines.*
import kotlinx.coroutines.selects.*
suspend fun <T> raceOf(vararg blocks: suspend () -> T): T = coroutineScope {
    val racers = blocks.map { block -> async { block() } }
    val winner = select<T> {
        racers.forEach { racer -> racer.onAwait { it } }
    }
    // coroutineScope waits for its children, so cancel the losers explicitly;
    // otherwise raceOf would not return until the slowest block finished
    racers.forEach { it.cancel() }
    winner
}
fun main() = runBlocking {
val result = raceOf(
{ delay(300); "slow" },
{ delay(100); "fast" },
{ delay(200); "medium" },
)
println(result) // fast
}

| Pattern | TypeScript | Go | Kotlin |
| --- | --- | --- | --- |
| Run concurrently | Promise.all([...]) | go func(); wg.Wait() | coroutineScope { async {} } |
| First wins | Promise.race([...]) | select on channels | select { onAwait {} } |
| Worker pool | Manual with queues | goroutines + buffered channel | Channel + N launch |
| Timeout | Promise.race([p, timeout]) | context.WithTimeout | withTimeout(ms) {} |
| Retry | for loop with try/catch | for loop with err != nil | retry {} (custom) |
| Mutex | N/A (single-threaded) | sync.Mutex | Mutex (suspending) |
| Rate limit | Custom / library | time.Ticker + channel | Semaphore + delay |
| Event broadcast | EventEmitter | Fan-out from channel | SharedFlow (see Module 06) |
| Pipeline | .pipe() streams | Chained channels | Chained produce or Flow |
| Cancel | AbortController | cancel() returned by context.WithCancel | job.cancel() / scope cancel |

Put structured concurrency and channels to work in two hands-on builds.