Channel Pipeline

Build a multi-stage producer-processor-consumer pipeline using coroutine channels — the Kotlin equivalent of a Go channel pipeline. Each stage is its own coroutine that reads from an input channel and emits onto a new one, so the whole thing streams concurrently and shuts down cleanly when the source dries up.

If you’ve wired up Go pipelines with chan int and a goroutine per stage, this is the same shape. Kotlin’s produce { } builder is the analogue of “spawn a goroutine that owns and closes an output channel.”

This guide builds a five-stage pipeline that passes numbers through transformation, filtering, and formatting before a consumer collects them:

  1. Producer — generates numbers 1..N and emits them on a ReceiveChannel<Int>.
  2. Stage 1 — Square — reads from the producer, emits value * value.
  3. Stage 2 — Filter — keeps only values above a configurable threshold.
  4. Stage 3 — Format — converts each number to a formatted String (e.g. "Result: 144").
  5. Consumer — collects every formatted string and prints them.

Each stage is a separate coroutine built with produce. On top of the core flow, the solution also:

  • Prints metrics — items handled per stage, plus end-to-end throughput (items/second).
  • Supports a configurable number of workers per stage (fan-out).
  • Demonstrates graceful shutdown — when the producer finishes, each stage drains its input and completes, closing its own output in turn.

[Diagram: channel pipeline stages]

A single-module Gradle project — one Main.kt holds every stage plus the orchestration in main.

  • pipeline/
    • build.gradle.kts: coroutines dependency + application plugin
    • settings.gradle.kts: project name
    • src/main/kotlin/
      • Main.kt: all five stages plus the runBlocking driver

The only runtime dependency is kotlinx-coroutines-core, which provides produce, Channel, and the structured-concurrency machinery. The application plugin gives you ./gradlew run against the top-level main (compiled to the synthetic MainKt class).

build.gradle.kts
plugins {
    kotlin("jvm") version "2.1.0"
    application
}

repositories {
    mavenCentral()
}

dependencies {
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.10.1")
    testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.10.1")
    testImplementation(kotlin("test"))
}

application {
    mainClass.set("MainKt")
}

tasks.test {
    useJUnitPlatform()
}

settings.gradle.kts
rootProject.name = "pipeline"

produce { } is the key builder. It’s an extension on CoroutineScope that launches a coroutine, hands you a ProducerScope<T> (so you can send), and returns a ReceiveChannel<T> for downstream consumers. Crucially, the channel is closed automatically when the block finishes — that’s the graceful-shutdown signal that cascades down the pipeline. No manual close(), no Go-style defer close(ch).

The capacity = 10 argument gives each channel a 10-element buffer, so a fast producer can run up to 10 items ahead of a slow consumer. Once the buffer is full, send suspends until the consumer takes an item, and that suspension is the backpressure.

src/main/kotlin/Main.kt
fun CoroutineScope.produceNumbers(count: Int): ReceiveChannel<Int> = produce(capacity = 10) {
    for (i in 1..count) {
        send(i)
    }
    println("[Producer] Sent $count numbers, closing.")
}
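
To see auto-close and buffering in isolation, here is a minimal sketch. The names countTo and acceptedWithoutReceiver are hypothetical helpers for illustration and are not part of Main.kt. The file-level opt-in assumes produce is still marked experimental in your kotlinx.coroutines version.

```kotlin
@file:OptIn(ExperimentalCoroutinesApi::class)

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical producer: emits 1..n and then simply returns.
// There is no close() call; produce closes the channel when the block ends.
fun CoroutineScope.countTo(n: Int): ReceiveChannel<Int> = produce(capacity = 2) {
    for (i in 1..n) send(i)
}

// Hypothetical helper: counts how many non-suspending sends a buffered channel
// accepts when nobody is receiving. trySend fails instead of suspending.
fun acceptedWithoutReceiver(capacity: Int, attempts: Int): Int {
    val channel = Channel<Int>(capacity)
    val accepted = (1..attempts).count { channel.trySend(it).isSuccess }
    channel.close()
    return accepted
}

fun main(): Unit = runBlocking {
    val numbers = countTo(3)
    for (n in numbers) println(n)                 // 1, 2, 3; the loop ends once the channel is closed
    println(numbers.receiveCatching().isClosed)   // true
    println(acceptedWithoutReceiver(capacity = 2, attempts = 5))   // 2
}
```

With a capacity of 2, the third trySend fails because the buffer is full. In the pipeline the equivalent send call would suspend at that point rather than fail.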

This stage takes a workers parameter and demonstrates fan-out: it launches workers child coroutines that all read from the same input channel with for (value in input). The channel hands each item to exactly one worker, so they naturally load-balance. An AtomicInteger tallies items across the workers safely.

Each worker sends its squared value, as a Long, onto the shared output channel as soon as it is computed, so with more than one worker the output order is not guaranteed to match the input order. Once every worker job has joined (the input channel is exhausted and all loops have ended), the produce block returns, which closes the output channel and signals the next stage.

src/main/kotlin/Main.kt
fun CoroutineScope.squareStage(
    input: ReceiveChannel<Int>,
    workers: Int = 1,
): ReceiveChannel<Long> = produce(capacity = 10) {
    val processed = AtomicInteger(0)
    // Fan-out: every worker iterates the same input channel; each item goes to exactly one of them.
    val jobs = List(workers) {
        launch {
            for (value in input) {
                val squared = value.toLong() * value.toLong()
                send(squared)
                processed.incrementAndGet()
            }
        }
    }
    // Wait for all workers before reporting; returning from the block then closes the output.
    jobs.joinAll()
    println("[Square] Processed ${processed.get()} items with $workers worker(s).")
}
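
The fan-out behaviour can be checked with a small standalone sketch. The fanOutSquares function below is a hypothetical helper, not part of Main.kt; it assumes the same worker pattern as squareStage and collects the output with toList. Because the workers interleave, the sketch sorts the results before printing them.

```kotlin
@file:OptIn(ExperimentalCoroutinesApi::class)

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper: squares 1..count using `workers` coroutines that all
// read from one shared input channel, then returns whatever came out.
fun fanOutSquares(count: Int, workers: Int): List<Long> = runBlocking {
    require(workers >= 1) { "at least one worker is needed to drain the input" }
    val input = produce<Int>(capacity = 10) { for (i in 1..count) send(i) }
    val squared = produce<Long>(capacity = 10) {
        List(workers) {
            launch { for (value in input) send(value.toLong() * value) }
        }.joinAll()
    }
    squared.toList()
}

fun main() {
    val results = fanOutSquares(count = 6, workers = 3)
    println(results.sorted())   // [1, 4, 9, 16, 25, 36]
    println(results.size)       // 6: every input was handled by exactly one worker
}
```

If downstream code depends on input order, one option is to keep workers = 1 for that stage; restoring order after a fan-out is outside what this sketch covers.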

A single-coroutine stage: read each Long, forward only the ones above the threshold, and count both the passed and dropped items for the metrics line. Same auto-close behavior — when input is drained, the block ends and the output channel closes.

src/main/kotlin/Main.kt
fun CoroutineScope.filterStage(
    input: ReceiveChannel<Long>,
    threshold: Long,
): ReceiveChannel<Long> = produce(capacity = 10) {
    var passed = 0
    var dropped = 0
    for (value in input) {
        if (value > threshold) {
            send(value)
            passed++
        } else {
            dropped++
        }
    }
    println("[Filter] Passed $passed, dropped $dropped (threshold > $threshold).")
}

The simplest stage: turn each Long into a display String via a string template ("Result: $value"). It changes the channel’s element type from Long to String, which is why the next channel is a ReceiveChannel<String>.

src/main/kotlin/Main.kt
fun CoroutineScope.formatStage(
    input: ReceiveChannel<Long>,
): ReceiveChannel<String> = produce(capacity = 10) {
    var count = 0
    for (value in input) {
        send("Result: $value")
        count++
    }
    println("[Format] Formatted $count items.")
}
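
The single-coroutine stages all share one shape: loop over the input, transform, send. As an illustration of how the element type changes from one channel to the next, here is a sketch of a generic stage. Both mapStage and formatAll are hypothetical names introduced for this example; Main.kt does not define them.

```kotlin
@file:OptIn(ExperimentalCoroutinesApi::class)

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical generic stage: reads T values, emits R values.
// The output channel closes when the input is drained and the block returns.
fun <T, R> CoroutineScope.mapStage(
    input: ReceiveChannel<T>,
    transform: (T) -> R,
): ReceiveChannel<R> = produce(capacity = 10) {
    for (value in input) send(transform(value))
}

// Hypothetical driver: feeds a list through mapStage and collects the strings.
fun formatAll(values: List<Long>): List<String> = runBlocking {
    val source = produce<Long> { values.forEach { send(it) } }
    mapStage(source) { "Result: $it" }.toList()
}

fun main() {
    println(formatAll(listOf(64L, 81L, 144L)))   // [Result: 64, Result: 81, Result: 144]
}
```

A generic helper like this trades away the per-stage metrics line, which is one reason to keep the stages written out separately as above.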

The terminal stage isn’t a produce builder — it doesn’t emit anything — so it’s an ordinary suspend fun that loops the final channel into a list. When the formatting channel closes, the loop ends and the collected results are returned.

src/main/kotlin/Main.kt
suspend fun consume(input: ReceiveChannel<String>): List<String> {
    val results = mutableListOf<String>()
    for (value in input) {
        results.add(value)
    }
    return results
}
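
For comparison, kotlinx.coroutines ships a toList extension on ReceiveChannel that does the same draining. The sketch below uses a hypothetical consumeWithToList wrapper and a stand-in producer to show the equivalence; it is an alternative, not the code in Main.kt.

```kotlin
@file:OptIn(ExperimentalCoroutinesApi::class)

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical alternative to consume(): let the library drain the channel.
suspend fun consumeWithToList(input: ReceiveChannel<String>): List<String> = input.toList()

fun main(): Unit = runBlocking {
    // Stand-in for the format stage's output channel.
    val formatted = produce<String> {
        send("Result: 64")
        send("Result: 81")
    }
    println(consumeWithToList(formatted))   // [Result: 64, Result: 81]
}
```

The explicit loop in consume is still useful when you want to do something per item, such as printing progress while collecting.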

runBlocking provides the CoroutineScope that every produce extension hangs off, and it won’t return until all the child coroutines (every stage) have finished — that’s structured concurrency doing the lifecycle management for you. Building the pipeline is just chaining each stage’s output into the next stage’s input. measureTimeMillis wraps the whole run so we can report throughput.

src/main/kotlin/Main.kt
fun main() = runBlocking {
    val count = 100
    val threshold = 50L
    val workerCount = 3
    println("Pipeline: produce(1..$count) -> square(x$workerCount workers) -> filter(>$threshold) -> format -> consume")
    println("=".repeat(80))
    val results: List<String>
    val elapsed = measureTimeMillis {
        // Build pipeline: each stage reads from the previous and returns a new channel
        val numbers = produceNumbers(count)
        val squared = squareStage(numbers, workers = workerCount)
        val filtered = filterStage(squared, threshold)
        val formatted = formatStage(filtered)
        results = consume(formatted)
    }
    // Print results
    println("=".repeat(80))
    println("Results (${results.size} items):")
    results.take(10).forEach { println(" $it") }
    if (results.size > 10) {
        println(" ... and ${results.size - 10} more")
    }
    // Metrics
    println("=".repeat(80))
    val throughput = if (elapsed > 0) results.size * 1000.0 / elapsed else 0.0
    println("Total time: ${elapsed}ms")
    println("Throughput: ${"%.1f".format(throughput)} items/sec")
    println("Pipeline complete.")
}
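
As an optional variation on the timing code, the standard library's measureTimedValue returns the block's result together with the elapsed Duration, which avoids the late-assigned results variable. The sketch below is an assumption about how that could look, not the author's code: throughputPerSecond is a hypothetical helper, and a single producer stands in for the full stage chain.

```kotlin
@file:OptIn(ExperimentalCoroutinesApi::class)

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.DurationUnit
import kotlin.time.measureTimedValue

// Hypothetical helper: items per second, guarding against a zero-length measurement.
fun throughputPerSecond(items: Int, elapsed: Duration): Double {
    val seconds = elapsed.toDouble(DurationUnit.SECONDS)
    return if (seconds > 0.0) items / seconds else 0.0
}

fun main(): Unit = runBlocking {
    val (results, elapsed) = measureTimedValue {
        // Stand-in for the real pipeline: one producer drained straight into a list.
        produce<Int>(capacity = 10) { for (i in 1..100) send(i) }.toList()
    }
    println("Collected ${results.size} items in ${elapsed.inWholeMilliseconds}ms")
    println("Throughput: ${"%.1f".format(throughputPerSecond(results.size, elapsed))} items/sec")
    println(throughputPerSecond(93, 500.milliseconds))   // 186.0
}
```

Note that throughput computed from results.size counts only the items that reach the consumer, so anything the filter drops is not included in the figure.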

The full file also imports the coroutine machinery at the top:

src/main/kotlin/Main.kt
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import java.util.concurrent.atomic.AtomicInteger
import kotlin.system.measureTimeMillis

  1. Run the pipeline (the application plugin runs main directly):

    ./gradlew run
  2. Run the tests:

    ./gradlew test

Each stage prints its own metrics line as it drains, then main prints the first 10 results, total time, and throughput — so you can watch the stages complete in order as the channels close from the producer downward.
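
To sanity-check the numbers the run should report, the same transformation can be computed sequentially with plain collections. The expectedResults function is a hypothetical helper for this check only. With count = 100 and threshold = 50, the squares 1 through 49 (inputs 1 to 7) are dropped and the remaining 93 pass.

```kotlin
// Hypothetical reference implementation: square, filter, format, with no concurrency.
fun expectedResults(count: Int, threshold: Long): List<String> =
    (1..count)
        .map { it.toLong() * it }
        .filter { it > threshold }
        .map { "Result: $it" }

fun main() {
    val expected = expectedResults(count = 100, threshold = 50L)
    println(expected.size)      // 93
    println(expected.first())   // Result: 64
    println(expected.last())    // Result: 10000
}
```

Because the square stage runs three workers, the pipeline's output may arrive in a different order than this list, so compare the two as sets or after sorting.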