Data Processing Pipeline with Flow
Build a file-processing pipeline with Kotlin Flow operators: read CSV data line
by line, parse it, filter and transform records, aggregate the results, and write
a summary out — all with proper dispatcher usage and error handling. If you’ve
ever chained .filter().map().reduce() over an array in TS, this is the same
shape, except each stage is a suspend-friendly, backpressure-aware step in an
async stream.
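To make the TS analogy concrete, here is a minimal sketch that writes the same filter/map/reduce chain twice, once over a List and once over a Flow. The computation and the names sumOfDoubledEvensList and sumOfDoubledEvensFlow are made up for illustration.

```kotlin
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.reduce
import kotlinx.coroutines.runBlocking

// Eager: filter builds a whole new list, map builds another, then reduce folds it.
fun sumOfDoubledEvensList(numbers: List<Int>): Int =
    numbers.filter { it % 2 == 0 }.map { it * 2 }.reduce { acc, n -> acc + n }

// Lazy and suspending: each element passes through filter and map one at a time,
// and nothing runs until the terminal reduce() collects the flow.
suspend fun sumOfDoubledEvensFlow(numbers: List<Int>): Int =
    numbers.asFlow().filter { it % 2 == 0 }.map { it * 2 }.reduce { acc, n -> acc + n }
```

Both return 12 for listOf(1, 2, 3, 4). The Flow version is a suspend function, so it has to be called from a coroutine, for example `runBlocking { sumOfDoubledEvensFlow(listOf(1, 2, 3, 4)) }`.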
What you’ll build
A six-stage pipeline. The first four stages are built from Flow operators; the last two run once the Flow has been collected:
- Read stage: read a CSV file line by line as a Flow<String>, on Dispatchers.IO.
- Parse stage: turn each line into a Transaction, skipping malformed lines.
- Filter stage: drop transactions below a minimum amount.
- Transform stage: normalize the data (uppercase category, round the amount).
- Aggregate stage: group by category, compute totals and averages.
- Output stage: write the summary to a file, back on Dispatchers.IO.
The data model:
```kotlin
data class Transaction(
    val id: String,
    val amount: Double,
    val category: String,
    val date: String,
    val description: String,
)

data class CategorySummary(
    val category: String,
    val count: Int,
    val totalAmount: Double,
    val averageAmount: Double,
)
```

And the input it consumes is a CSV with a header row, where some lines are deliberately malformed (the invalid amount on the last line below):

```csv
id,amount,category,date,description
TXN001,150.00,electronics,2024-01-15,Wireless headphones
TXN002,25.50,food,2024-01-15,Lunch delivery
TXN003,invalid,food,2024-01-16,Bad record
```

The pipeline at a glance
A Flow is a cold, sequential async stream: nothing runs until a terminal operator (toList, collect) pulls on it, and each upstream stage only produces the next value when the downstream is ready for it. That is the built-in backpressure. flowOn shifts the dispatcher for everything upstream of it (back to the previous flowOn), so the blocking file read happens on Dispatchers.IO while the CPU-bound parse, filter, and transform stages run on Dispatchers.Default. Where flowOn switches dispatchers it inserts a small bounded buffer, so the producer can run slightly ahead, but it still suspends as soon as that buffer is full.
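A small sketch that makes both properties visible, using a hypothetical traceColdFlow helper that logs every step: the flow { } block does nothing until collection starts, and with no flowOn or buffer in the chain each value travels all the way downstream before the next one is produced.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlocking

// Returns the log as it was before collection and as it is after collection.
fun traceColdFlow(): Pair<List<String>, List<String>> {
    val log = mutableListOf<String>()

    // Building the flow only describes the work: this block has not run yet.
    val numbers: Flow<Int> = flow {
        for (i in 1..3) {
            log += "emit $i"
            emit(i) // suspends until the downstream stages have finished with i
        }
    }
    val beforeCollect = log.toList() // still empty, because the flow is cold

    runBlocking {
        numbers
            .map { log += "map $it"; it * 10 }
            .collect { log += "collect $it" }
    }
    return beforeCollect to log.toList()
}
```

The second list comes out strictly interleaved ("emit 1", "map 1", "collect 10", "emit 2", ...), which is the sequential, pull-driven behaviour described above.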
```mermaid
flowchart TB
    R["readCsvLines (flow builder)"] -->|"flowOn Dispatchers.IO"| O1["onEach: count lines read"]
    O1 --> P["mapNotNull: parseLine, skip nulls"]
    P --> F["filter: amount >= minAmount"]
    F --> O2["onEach: count after filter"]
    O2 --> N["map: normalize"]
    N -->|"flowOn Dispatchers.Default"| C["catch: log pipeline errors"]
    C --> T["toList (terminal)"]
    T --> A["aggregate: groupBy category"]
    A --> W["writeSummary (Dispatchers.IO)"]
```
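To check which thread runs which part of a chain shaped like this diagram, here is a sketch with a hypothetical traceFlowOnThreads helper. It records the thread for a producer above flowOn(Dispatchers.IO), a map above flowOn(Dispatchers.Default), and an onEach below both, then compares them with the thread that called runBlocking.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

data class FlowThreads(val caller: Thread, val producer: Thread, val mapper: Thread, val collector: Thread)

fun traceFlowOnThreads(): FlowThreads = runBlocking {
    val caller = Thread.currentThread()
    lateinit var producer: Thread
    lateinit var mapper: Thread
    lateinit var collector: Thread

    flow {
        producer = Thread.currentThread() // upstream of flowOn(IO): the "read" stage
        emit(1)
    }
        .flowOn(Dispatchers.IO)
        .map { mapper = Thread.currentThread(); it } // upstream of flowOn(Default): the "transform" stages
        .flowOn(Dispatchers.Default)
        .onEach { collector = Thread.currentThread() } // below every flowOn: the collector's own context
        .toList()

    FlowThreads(caller, producer, mapper, collector)
}
```

Producer and mapper land on dispatcher worker threads (IO and Default share a pool, so they may even be the same worker), while everything after the last flowOn stays on the caller's thread.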
The worked solution
A single-module Gradle project: one Main.kt holds the whole pipeline.

```text
data-pipeline/
├── build.gradle.kts        coroutines deps + application plugin
├── settings.gradle.kts     project name
└── src/main/kotlin/
    └── Main.kt             data models, pipeline stages, and main
```
build.gradle.kts
The only runtime dependency is kotlinx-coroutines-core, which brings both coroutines and the Flow API; kotlinx-coroutines-test and kotlin("test") are test-only. The application plugin gives us ./gradlew run.
```kotlin
plugins {
    kotlin("jvm") version "2.1.0"
    application
}

repositories {
    mavenCentral()
}

dependencies {
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.10.1")

    testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.10.1")
    testImplementation(kotlin("test"))
}

application {
    // A top-level main in Main.kt compiles to the JVM class MainKt
    mainClass.set("MainKt")
}

tasks.test {
    useJUnitPlatform()
}
```

settings.gradle.kts:

```kotlin
rootProject.name = "data-pipeline"
```

The read stage: a flow { } builder
The flow { } builder lets you emit values one at a time. Here we open a BufferedReader, drop the header line, and emit each remaining line. The .flowOn(Dispatchers.IO) at the end moves the blocking file read onto the IO thread pool: the same reasoning as wrapping a blocking call in withContext(Dispatchers.IO), but for a whole stream. You can't simply wrap the emit calls in withContext inside flow { } instead, because Flow's context-preservation rule forbids emitting from a different context; flowOn is the operator that changes the upstream context legally.
```kotlin
fun readCsvLines(file: File): Flow<String> = flow {
    file.bufferedReader().use { reader ->
        reader.lineSequence()
            .drop(1) // skip header
            // Sequence.forEach is inline, so calling the suspending emit here is allowed
            .forEach { line -> emit(line) }
    }
}.flowOn(Dispatchers.IO)
```

lineSequence() is itself lazy, and emit suspends whenever the downstream cannot accept another line yet (with flowOn in place, that means once its small buffer is full), so a million-line file never materializes in memory at once. That's the backpressure paying off.
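To check that the read stage really is lazy, here is a sketch that swaps the file for a hypothetical countingLines sequence of a million fake lines and asks for only the first three. It assumes the same emit-from-a-sequence shape as readCsvLines. Without flowOn exactly three lines are pulled; with flowOn(Dispatchers.IO) the producer runs a little ahead into the channel buffer that flowOn inserts (Channel.BUFFERED, 64 elements by default in kotlinx.coroutines), so the count is somewhat higher but nowhere near a million.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Stand-in for reader.lineSequence(): a lazy million-line sequence that
// counts how many lines are actually pulled from it.
fun countingLines(pulled: AtomicInteger): Sequence<String> =
    generateSequence(1) { it + 1 }
        .take(1_000_000)
        .map { pulled.incrementAndGet(); "line $it" }

// Emits the sequence like readCsvLines does, but the collector only wants three lines.
// Returns how many lines were pulled before the flow stopped.
fun linesPulledForFirstThree(useFlowOn: Boolean): Int {
    val pulled = AtomicInteger(0)
    val lines: Flow<String> = flow {
        countingLines(pulled).forEach { line -> emit(line) }
    }
    val source = if (useFlowOn) lines.flowOn(Dispatchers.IO) else lines
    runBlocking { source.take(3).toList() }
    return pulled.get()
}
```

take(3) cancels the upstream after the third element, and because emit suspends (or the buffer fills), the sequence is never drained to the end.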
The parse, transform, and aggregate stages
parseLine splits on commas and returns null for anything malformed: fewer than five columns, or an amount that isn't a number (toDoubleOrNull() is the safe, exception-free parse). Returning null is what lets the pipeline use .mapNotNull { } to skip bad rows without a try/catch per line.
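The two parsing choices parseLine relies on can be tried in isolation. This sketch pulls them out as standalone helpers; splitFields and parseAmount are illustrative names, not part of the project.

```kotlin
// limit = 5 stops splitting after the fourth comma, so commas inside the
// description stay in the last field instead of creating extra columns.
fun splitFields(line: String): List<String> = line.split(",", limit = 5)

// toDoubleOrNull returns null for non-numeric input instead of throwing.
fun parseAmount(raw: String): Double? = raw.trim().toDoubleOrNull()
```

For "TXN009,12.00,food,2024-01-20,Tacos, salsa, chips", splitFields returns five fields with "Tacos, salsa, chips" as the description, whereas a plain split(",") gives seven pieces. parseAmount("invalid") is null and parseAmount(" 25.50 ") is 25.5. Neither helper understands quoted CSV fields, so a quoted comma in one of the first four columns would still break the column count; real-world CSV is safer with a dedicated parser.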
```kotlin
fun parseLine(line: String): Transaction? {
    // limit = 5: any commas after the fourth one stay inside the description
    val parts = line.split(",", limit = 5)
    if (parts.size < 5) return null
    // toDoubleOrNull returns null (instead of throwing) for values like "invalid"
    val amount = parts[1].trim().toDoubleOrNull() ?: return null
    return Transaction(
        id = parts[0].trim(),
        amount = amount,
        category = parts[2].trim(),
        date = parts[3].trim(),
        description = parts[4].trim(),
    )
}
```

normalize uses data class copy to produce a new record with an uppercased category and a rounded amount. Transaction's properties are all val, so the record is immutable and you transform it by copying, not mutating.
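A quick way to see copy-not-mutate in action, reusing the document's Transaction and normalize in a standalone file: the original record is untouched, and the normalized one compares equal by value to a freshly built record.

```kotlin
data class Transaction(
    val id: String,
    val amount: Double,
    val category: String,
    val date: String,
    val description: String,
)

// Same normalize as in the pipeline: copy returns a new instance with two fields changed.
fun normalize(tx: Transaction): Transaction = tx.copy(
    category = tx.category.uppercase(),
    amount = Math.round(tx.amount * 100.0) / 100.0,
)
```

Normalizing Transaction("TXN010", 25.456, "food", "2024-01-20", "Snacks") gives a record with category "FOOD" and amount 25.46, while the original still says "food" and 25.456. Data class equality is structural, which is also why the aggregate stage can group on the normalized category string directly.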
```kotlin
fun normalize(tx: Transaction): Transaction = tx.copy(
    category = tx.category.uppercase(),
    // Math.round returns a Long; dividing by 100.0 turns it back into a Double with two decimals
    amount = Math.round(tx.amount * 100.0) / 100.0,
)
```

The aggregate stage is plain collection math once the Flow has been drained to a List: groupBy { it.category } yields a Map<String, List<Transaction>>, and each group is then reduced to a CategorySummary. This is the same code you'd write over a regular list; Flow only governs how the items arrive, not how you summarize them.
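Draining to a List first means every surviving transaction sits in memory before aggregation starts. If that matters, the same summary can be built while the Flow is collected. This is a sketch of a swapped-in technique (Flow.fold into per-category running totals), not what the worked solution does; aggregateStreaming, Running, and round2 are hypothetical names.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.fold
import kotlinx.coroutines.runBlocking

data class Transaction(
    val id: String,
    val amount: Double,
    val category: String,
    val date: String,
    val description: String,
)

data class CategorySummary(
    val category: String,
    val count: Int,
    val totalAmount: Double,
    val averageAmount: Double,
)

// Running totals for one category: a count and a sum, never the records themselves.
class Running {
    var count = 0
    var sum = 0.0
}

fun round2(x: Double): Double = Math.round(x * 100.0) / 100.0

// Memory grows with the number of categories instead of the number of transactions.
suspend fun aggregateStreaming(transactions: Flow<Transaction>): List<CategorySummary> =
    transactions
        .fold(mutableMapOf<String, Running>()) { totals, tx ->
            val running = totals.getOrPut(tx.category) { Running() }
            running.count += 1
            running.sum += tx.amount
            totals // the same map is threaded through every step of the fold
        }
        .map { (category, r) ->
            CategorySummary(category, r.count, round2(r.sum), round2(r.sum / r.count))
        }
        .sortedByDescending { it.totalAmount }
```

For a 1000-row demo the toList-then-groupBy version is simpler and perfectly adequate; the fold only pays off when the input is much larger than the number of categories.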
```kotlin
fun aggregate(transactions: List<Transaction>): List<CategorySummary> =
    transactions
        .groupBy { it.category }
        .map { (category, txns) ->
            val total = txns.sumOf { it.amount } // computed once, reused for the average
            CategorySummary(
                category = category,
                count = txns.size,
                totalAmount = Math.round(total * 100.0) / 100.0,
                averageAmount = Math.round(total / txns.size * 100.0) / 100.0,
            )
        }
        .sortedByDescending { it.totalAmount }
```

The write stage
Writing is a suspend function that switches to Dispatchers.IO with withContext, the standard way to run a blocking call from a coroutine: the calling coroutine suspends while an IO worker thread does the blocking write, so the caller's thread is never frozen.
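A sketch of that thread hop in isolation. writeLinesOnIo is a hypothetical helper in the style of writeSummary that returns the worker thread it ran on, so the switch to Dispatchers.IO and the return to the caller are both observable.

```kotlin
import java.io.File
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Performs the blocking write on an IO worker and returns that worker thread.
suspend fun writeLinesOnIo(file: File, lines: List<String>): Thread = withContext(Dispatchers.IO) {
    file.bufferedWriter().use { writer ->
        for (line in lines) {
            writer.write(line)
            writer.newLine()
        }
    }
    Thread.currentThread()
}
```

Called from runBlocking, the returned thread differs from the caller's, and once withContext returns the coroutine is back on the caller's thread.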
```kotlin
suspend fun writeSummary(file: File, summaries: List<CategorySummary>) = withContext(Dispatchers.IO) {
    file.bufferedWriter().use { writer ->
        writer.write("category,count,total,average")
        writer.newLine()
        for (summary in summaries) {
            writer.write("${summary.category},${summary.count},${summary.totalAmount},${summary.averageAmount}")
            writer.newLine()
        }
    }
}
```

Wiring the stages together in main
This is where the Flow operators chain into the actual pipeline. Read it top to bottom; it follows the diagram above:
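- .onEach { totalRead++ }: a side-effect tap for progress/metrics; it passes each item through unchanged.
- .mapNotNull { ... }: parse, and drop the nulls (the bad rows), counting errors as we go.
- .filter { it.amount >= minAmount }: the filter stage.
- .onEach { totalFiltered++ }: a second tap that counts the records that survived the filter.
- .map { normalize(it) }: the transform stage.
- .flowOn(Dispatchers.Default): everything above this line (the counting, parse, filter, and transform) runs on the CPU-bound default dispatcher; the read stage already moved itself to IO with its own flowOn.
- .catch { e -> ... }: catches an exception thrown anywhere upstream, so it is logged instead of crashing main. It does not resume the stream: the flow completes at that point, and toList returns whatever was emitted before the failure. Per-record resilience comes from parseLine returning null, not from catch.
- .toList(): the terminal operator that finally runs the cold Flow.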
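To see exactly what catch does and does not do, here is a sketch with a hypothetical collectPastFailure helper: a flow of four numbers whose third element throws, with catch placed just before toList as in main.

```kotlin
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Returns the collected items and the message that catch observed.
fun collectPastFailure(): Pair<List<Int>, String?> = runBlocking {
    var caught: String? = null
    val items = flowOf(1, 2, 3, 4)
        .map { if (it == 3) throw IllegalStateException("bad record $it") else it }
        .catch { e -> caught = e.message } // logs and completes the flow; 4 is never emitted
        .toList()
    items to caught
}
```

The result is [1, 2] plus the message "bad record 3": the exception is contained, but everything after the failing element is lost. If a fallback value is wanted, catch can also emit one before completing.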
```kotlin
// At the top of Main.kt: the imports used by all of the stages above
import java.io.File
import kotlin.system.measureTimeMillis
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

fun main() = runBlocking {
    val inputFile = File("sample-transactions.csv")
    val outputFile = File("summary-output.csv")
    val minAmount = 50.0

    // Generate sample data if not present
    if (!inputFile.exists()) {
        println("Generating sample CSV with 1000 transactions...")
        generateSampleCsv(inputFile, 1000)
    }

    println("Processing ${inputFile.name}...")
    println("Filter: amount >= $minAmount") // matches the >= used by the filter stage
    println("=".repeat(60))

    // Updated from inside the flow; the flow is sequential, so they are never written concurrently
    var parseErrors = 0
    var totalRead = 0
    var totalFiltered = 0

    val elapsed = measureTimeMillis {
        val transactions = readCsvLines(inputFile)
            .onEach { totalRead++ }
            .mapNotNull { line ->
                parseLine(line).also { if (it == null) parseErrors++ }
            }
            .filter { it.amount >= minAmount }
            .onEach { totalFiltered++ }
            .map { normalize(it) }
            .flowOn(Dispatchers.Default)
            .catch { e -> println("Pipeline error: ${e.message}") }
            .toList()

        val summaries = aggregate(transactions)

        // Print results
        println("\nCategory Summaries:")
        println("-".repeat(60))
        println("%-15s %6s %12s %12s".format("Category", "Count", "Total", "Average"))
        println("-".repeat(60))
        for (s in summaries) {
            println("%-15s %6d %12.2f %12.2f".format(s.category, s.count, s.totalAmount, s.averageAmount))
        }

        // Write to file
        writeSummary(outputFile, summaries)
    }

    // Stats
    println("=".repeat(60))
    println("Lines read: $totalRead")
    println("Parse errors: $parseErrors")
    println("After filter: $totalFiltered")
    println("Time: ${elapsed}ms")
    println("Output written: ${outputFile.absolutePath}")
}
```

The harness also includes a generateSampleCsv helper (not shown here) that seeds a 1000-transaction file, with about 5% deliberately malformed rows, on first run, so the pipeline has something to chew on without you supplying input.
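Since the harness's generateSampleCsv is not shown, here is a hypothetical sketch of what such a helper could look like. The categories, date range, seed parameter, and description text are all assumptions; only the header, the row count, and the roughly 5% malformed amounts come from the text.

```kotlin
import java.io.File
import java.util.Locale
import kotlin.random.Random

// Hypothetical sketch, not the harness's real helper. Writes a header plus `count` rows;
// roughly 5% get the non-numeric amount "invalid" so parseLine has something to reject.
fun generateSampleCsv(file: File, count: Int, seed: Int = 42) {
    val random = Random(seed) // fixed seed: the same file on every run
    val categories = listOf("electronics", "food", "travel", "books", "clothing")
    file.bufferedWriter().use { writer ->
        writer.write("id,amount,category,date,description")
        writer.newLine()
        for (i in 1..count) {
            val amount = if (random.nextDouble() < 0.05) {
                "invalid" // deliberately malformed row
            } else {
                // Locale.ROOT keeps the decimal separator a '.', never a ',' that would add a column
                "%.2f".format(Locale.ROOT, random.nextDouble(1.0, 500.0))
            }
            val category = categories[random.nextInt(categories.size)]
            val date = "2024-01-%02d".format(Locale.ROOT, random.nextInt(1, 29))
            val id = "TXN%04d".format(Locale.ROOT, i)
            writer.write("$id,$amount,$category,$date,Sample purchase $i")
            writer.newLine()
        }
    }
}
```

The default seed keeps the call in main, generateSampleCsv(inputFile, 1000), compiling unchanged. The explicit Locale.ROOT matters because String.format otherwise uses the JVM's default locale, which on some systems writes 12,34 and silently turns a five-column row into six.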
Run it
1. Run the pipeline. On first run it generates sample-transactions.csv automatically:

   ```bash
   ./gradlew run
   ```

2. Inspect the written summary:

   ```bash
   cat summary-output.csv
   ```

3. Run the tests:

   ```bash
   ./gradlew test
   ```