Channel Pipeline
Build a multi-stage producer-processor-consumer pipeline using coroutine channels — the Kotlin equivalent of a Go channel pipeline. Each stage is its own coroutine that reads from an input channel and emits onto a new one, so the whole thing streams concurrently and shuts down cleanly when the source dries up.
If you’ve wired up Go pipelines with chan int and a goroutine per stage, this is
the same shape. Kotlin’s produce { } builder is the analogue of “spawn a
goroutine that owns and closes an output channel.”
What you’ll build
A five-stage pipeline that flows numbers through transformation, filtering, and formatting before a consumer collects them:
- Producer: generates numbers 1..N and emits them on a ReceiveChannel<Int>.
- Stage 1 (Square): reads from the producer, emits value * value.
- Stage 2 (Filter): keeps only values above a configurable threshold.
- Stage 3 (Format): converts each number to a formatted String (e.g. "Result: 144").
- Consumer: collects every formatted string and prints them.
Each stage is a separate coroutine built with produce. On top of the core flow,
the solution also:
- Prints metrics — items handled per stage, plus end-to-end throughput (items/second).
- Supports a configurable number of workers per stage (fan-out).
- Demonstrates graceful shutdown — when the producer finishes, each stage drains its input and completes, closing its own output in turn.
    flowchart TB
        P["Producer (1..N)"] -->|"ReceiveChannel<Int>"| S["Square (x workers)"]
        S -->|"ReceiveChannel<Long>"| F["Filter (> threshold)"]
        F -->|"ReceiveChannel<Long>"| FM["Format"]
        FM -->|"ReceiveChannel<String>"| C["Consumer (collect + print)"]
The worked solution
A single-module Gradle project: one Main.kt holds every stage plus the
orchestration in main.
pipeline/
- build.gradle.kts: coroutines dep + application plugin
- settings.gradle.kts: project name
- src/main/kotlin/
  - Main.kt: all five stages plus the runBlocking driver
build.gradle.kts
The only runtime dependency is kotlinx-coroutines-core, which provides
produce, Channel, and the structured-concurrency machinery. The application
plugin gives you ./gradlew run against the top-level main (compiled to the
synthetic MainKt class).
    plugins {
        kotlin("jvm") version "2.1.0"
        application
    }

    repositories {
        mavenCentral()
    }

    dependencies {
        implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.10.1")

        testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.10.1")
        testImplementation(kotlin("test"))
    }

    application {
        mainClass.set("MainKt")
    }

    tasks.test {
        useJUnitPlatform()
    }

settings.gradle.kts

    rootProject.name = "pipeline"

The producer
produce { } is the key builder. It’s an extension on CoroutineScope that
launches a coroutine, hands you a ProducerScope<T> (so you can send), and
returns a ReceiveChannel<T> for downstream consumers. Crucially, the channel
is closed automatically when the block finishes — that’s the graceful-shutdown
signal that cascades down the pipeline. No manual close(), no Go-style
defer close(ch).
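To see the auto-close behavior in isolation, here is a minimal sketch. The names countTo and drainAll are hypothetical helpers for this example only, not part of the worked solution. It assumes nothing beyond kotlinx-coroutines-core and checks the closed state with receiveCatching rather than calling close() anywhere.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper for this sketch: emits 1..count, then simply returns.
// Nothing here calls close(); produce closes the channel when the block ends.
fun CoroutineScope.countTo(count: Int): ReceiveChannel<Int> = produce {
    for (i in 1..count) send(i)
}

// Drains the channel and reports whether it ended up closed.
fun drainAll(count: Int): Pair<List<Int>, Boolean> = runBlocking {
    val numbers = countTo(count)
    val received = mutableListOf<Int>()
    // The loop ends on its own once the channel is closed and empty.
    for (n in numbers) received.add(n)
    // A further receive attempt reports "closed" instead of suspending.
    val closed = numbers.receiveCatching().isClosed
    received to closed
}

fun main() {
    val (received, closed) = drainAll(3)
    println("received=$received closed=$closed") // received=[1, 2, 3] closed=true
}
```

The consumer side never needs a sentinel value or a done flag: the end of the for loop is the shutdown signal.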
The capacity = 10 argument gives the channel a 10-element buffer, so a fast
producer can run up to 10 items ahead of a slow consumer before send suspends.
That suspension is the backpressure, and it comes for free.
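The buffering can be observed directly with a small experiment. This is a sketch under stated assumptions: runAheadDemo is a hypothetical function, and it relies on runBlocking running everything on one thread, so a single yield() lets the producer go until its send suspends. With no receiver active, the number of completed sends should not exceed the buffer capacity.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical experiment: returns how many sends completed before anything
// was received, plus the items that were eventually drained.
fun runAheadDemo(capacity: Int, total: Int): Pair<Int, List<Int>> = runBlocking {
    var completedSends = 0
    val channel = produce<Int>(capacity = capacity) {
        for (i in 1..total) {
            send(i)          // suspends once the buffer is full
            completedSends++
        }
    }
    yield()                  // give the producer a turn before we receive anything
    val ranAhead = completedSends
    // Draining the channel resumes the producer until it finishes and closes.
    ranAhead to channel.toList()
}

fun main() {
    val (ranAhead, items) = runAheadDemo(capacity = 2, total = 5)
    println("sends completed before the first receive: $ranAhead")
    println("items received: $items")
}
```

A larger capacity lets the producer get further ahead at the cost of more buffered items in memory; a smaller one keeps the stages in closer lockstep.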
    fun CoroutineScope.produceNumbers(count: Int): ReceiveChannel<Int> = produce(capacity = 10) {
        for (i in 1..count) {
            send(i)
        }
        println("[Producer] Sent $count numbers, closing.")
    }

Stage 1: Square (with fan-out)
This stage takes a workers parameter and demonstrates fan-out: it launches
workers child coroutines that all read from the same input channel with
for (value in input). The channel hands each item to exactly one worker, so they
naturally load-balance. An AtomicInteger tallies items across the workers safely.
The stage sends its squared Long onto the output channel as work completes.
Once every worker job has joined (the input channel is exhausted and all loops
have ended), the produce block returns — closing the output channel and signaling
the next stage.
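The "exactly one worker per item" property is easy to check with a sketch that tags each output with the worker that handled it. tagWithWorker and fanOutDemo are hypothetical names for this illustration, and Dispatchers.Default is an assumption made here so the workers can actually run in parallel.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical stage: fans `workers` coroutines out over one input channel
// and emits (workerId, value) pairs.
fun CoroutineScope.tagWithWorker(
    input: ReceiveChannel<Int>,
    workers: Int,
): ReceiveChannel<Pair<Int, Int>> = produce {
    val jobs = (1..workers).map { workerId ->
        launch {
            // Each received value goes to whichever worker gets it first.
            for (value in input) send(workerId to value)
        }
    }
    jobs.joinAll() // the output closes only after every worker loop has ended
}

fun fanOutDemo(count: Int, workers: Int): List<Pair<Int, Int>> =
    runBlocking(Dispatchers.Default) {
        val input = produce<Int> { for (i in 1..count) send(i) }
        tagWithWorker(input, workers).toList()
    }

fun main() {
    val tagged = fanOutDemo(count = 20, workers = 3)
    // Every input appears exactly once, whichever worker took it.
    println(tagged.map { it.second }.sorted() == (1..20).toList()) // true
    // The split between workers is up to the scheduler and can vary per run.
    println(tagged.groupingBy { it.first }.eachCount())
}
```

One consequence worth keeping in mind: with more than one worker, items can reach the output in a different order than they entered, so downstream stages should not rely on ordering.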
    fun CoroutineScope.squareStage(
        input: ReceiveChannel<Int>,
        workers: Int = 1,
    ): ReceiveChannel<Long> = produce(capacity = 10) {
        val processed = AtomicInteger(0)
        val jobs = (1..workers).map { workerId ->
            launch {
                for (value in input) {
                    val squared = value.toLong() * value.toLong()
                    send(squared)
                    processed.incrementAndGet()
                }
            }
        }
        jobs.forEach { it.join() }
        println("[Square] Processed ${processed.get()} items with $workers worker(s).")
    }

Stage 2: Filter
A single-coroutine stage: read each Long, forward only the ones above the
threshold, and count both the passed and dropped items for the metrics line. Same
auto-close behavior — when input is drained, the block ends and the output
channel closes.
    fun CoroutineScope.filterStage(
        input: ReceiveChannel<Long>,
        threshold: Long,
    ): ReceiveChannel<Long> = produce(capacity = 10) {
        var passed = 0
        var dropped = 0
        for (value in input) {
            if (value > threshold) {
                send(value)
                passed++
            } else {
                dropped++
            }
        }
        println("[Filter] Passed $passed, dropped $dropped (threshold > $threshold).")
    }

Stage 3: Format
The simplest stage: turn each Long into a display String via a string template
("Result: $value"). It changes the channel’s element type from Long to
String, which is why the next channel is a ReceiveChannel<String>.
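The type change from one channel to the next can be made explicit with a generic stage. The sketch below is an illustration only: mapStage and formatDemo are hypothetical names, not part of the worked solution, and the input values are made up for the example.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical generic stage: reads T, applies `transform`, emits R.
// The element type of the output channel is whatever `transform` returns.
fun <T, R> CoroutineScope.mapStage(
    input: ReceiveChannel<T>,
    transform: suspend (T) -> R,
): ReceiveChannel<R> = produce(capacity = 10) {
    for (value in input) send(transform(value))
}

fun formatDemo(): List<String> = runBlocking {
    val longs: ReceiveChannel<Long> = produce<Long> {
        listOf(64L, 81L, 100L).forEach { send(it) }
    }
    // Long in, String out: the compiler infers ReceiveChannel<String>.
    val strings: ReceiveChannel<String> = mapStage(longs) { "Result: $it" }
    strings.toList()
}

fun main() {
    println(formatDemo()) // [Result: 64, Result: 81, Result: 100]
}
```

Keeping a dedicated formatStage, as the solution does, trades this generality for a place to hang per-stage metrics.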
    fun CoroutineScope.formatStage(
        input: ReceiveChannel<Long>,
    ): ReceiveChannel<String> = produce(capacity = 10) {
        var count = 0
        for (value in input) {
            send("Result: $value")
            count++
        }
        println("[Format] Formatted $count items.")
    }

The consumer
The terminal stage isn’t a produce builder (it doesn’t emit anything), so it’s
an ordinary suspend fun that loops the final channel into a list. When the
formatting channel closes, the loop ends and the collected results are returned.
    suspend fun consume(input: ReceiveChannel<String>): List<String> {
        val results = mutableListOf<String>()
        for (value in input) {
            results.add(value)
        }
        return results
    }

Wiring it together in main
runBlocking provides the CoroutineScope that every produce extension hangs
off, and it won’t return until all the child coroutines (every stage) have
finished — that’s structured concurrency doing the lifecycle management for you.
Building the pipeline is just chaining each stage’s output into the next stage’s
input. measureTimeMillis wraps the whole run so we can report throughput.
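Because runBlocking waits for every child, a pipeline whose source never finishes needs an explicit stop when the consumer has had enough. The following is a sketch of that situation, assuming an unbounded source; naturals, squares, and firstSquares are hypothetical names, and cancelChildren is the standard kotlinx.coroutines extension on CoroutineContext.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical unbounded source: it never finishes on its own.
fun CoroutineScope.naturals(): ReceiveChannel<Int> = produce {
    var n = 1
    while (true) send(n++)
}

fun CoroutineScope.squares(input: ReceiveChannel<Int>): ReceiveChannel<Long> = produce {
    for (value in input) send(value.toLong() * value)
}

fun firstSquares(howMany: Int): List<Long> = runBlocking {
    val results = squares(naturals())
    val taken = mutableListOf<Long>()
    repeat(howMany) { taken.add(results.receive()) }
    // Both stages are still active and waiting to send more values.
    // Without this call, runBlocking would wait for them indefinitely.
    coroutineContext.cancelChildren()
    taken
}

fun main() {
    println(firstSquares(5)) // [1, 4, 9, 16, 25]
}
```

The worked solution does not need this step because its producer is finite: the close signal reaches every stage and each one completes on its own.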
    fun main() = runBlocking {
        val count = 100
        val threshold = 50L
        val workerCount = 3

        println("Pipeline: produce(1..$count) -> square(x$workerCount workers) -> filter(>$threshold) -> format -> consume")
        println("=".repeat(80))

        val results: List<String>
        val elapsed = measureTimeMillis {
            // Build pipeline: each stage reads from the previous and returns a new channel
            val numbers = produceNumbers(count)
            val squared = squareStage(numbers, workers = workerCount)
            val filtered = filterStage(squared, threshold)
            val formatted = formatStage(filtered)
            results = consume(formatted)
        }

        // Print results
        println("=".repeat(80))
        println("Results (${results.size} items):")
        results.take(10).forEach { println(" $it") }
        if (results.size > 10) {
            println(" ... and ${results.size - 10} more")
        }

        // Metrics
        println("=".repeat(80))
        val throughput = if (elapsed > 0) results.size * 1000.0 / elapsed else 0.0
        println("Total time: ${elapsed}ms")
        println("Throughput: ${"%.1f".format(throughput)} items/sec")
        println("Pipeline complete.")
    }

The full file also imports the coroutine machinery at the top:
    import kotlinx.coroutines.*
    import kotlinx.coroutines.channels.*
    import java.util.concurrent.atomic.AtomicInteger
    import kotlin.system.measureTimeMillis

Run it
- Run the pipeline (the application plugin runs main directly):

      ./gradlew run

- Run the tests:

      ./gradlew test

Each stage prints its own metrics line as it drains, then main prints the first
10 results, total time, and throughput — so you can watch the stages complete in
order as the channels close from the producer downward.
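The ./gradlew test step only does something if there are tests under src/test/kotlin, and none are shown here. As a hedged sketch of what one could look like with the kotlinx-coroutines-test and kotlin("test") dependencies from build.gradle.kts: the class name PipelineTest and the stand-in stage keepAbove are hypothetical, with keepAbove mirroring the shape of the filter stage so the block stays self-contained.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.test.runTest
import kotlin.test.Test
import kotlin.test.assertEquals

// Hypothetical stand-in with the same shape as the filter stage.
fun CoroutineScope.keepAbove(
    input: ReceiveChannel<Long>,
    threshold: Long,
): ReceiveChannel<Long> = produce {
    for (value in input) if (value > threshold) send(value)
}

class PipelineTest {
    @Test
    fun keepsOnlyValuesAboveThreshold() = runTest {
        // runTest supplies the CoroutineScope that produce needs.
        val input = produce<Long> {
            listOf(1L, 49L, 50L, 64L, 100L).forEach { send(it) }
        }
        val kept = keepAbove(input, threshold = 50L).toList()
        // The comparison is strict, so 50 itself is dropped.
        assertEquals(listOf(64L, 100L), kept)
    }
}
```

Feeding a stage from a small hand-built channel like this keeps each test independent of the other stages.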