
Data Processing Pipeline with Flow

Build a file-processing pipeline with Kotlin Flow operators: read CSV data line by line, parse it, filter and transform records, aggregate the results, and write a summary out — all with proper dispatcher usage and error handling. If you’ve ever chained .filter().map().reduce() over an array in TS, this is the same shape, except each stage is a suspend-friendly, backpressure-aware step in an async stream.

A six-stage pipeline, each stage a Flow operator:

  1. Read stage — read a CSV file line by line as a Flow<String>, on Dispatchers.IO.
  2. Parse stage — turn each line into a Transaction, skipping malformed lines.
  3. Filter stage — drop transactions below a minimum amount.
  4. Transform stage — normalize the data (uppercase category, round the amount).
  5. Aggregate stage — group by category, compute totals and averages.
  6. Output stage — write the summary to a file, back on Dispatchers.IO.

The data model:

data class Transaction(
    val id: String,
    val amount: Double,
    val category: String,
    val date: String,
    val description: String,
)

data class CategorySummary(
    val category: String,
    val count: Int,
    val totalAmount: Double,
    val averageAmount: Double,
)

And the input it consumes — a CSV with a header row, where some lines are deliberately malformed (the invalid amount below):

id,amount,category,date,description
TXN001,150.00,electronics,2024-01-15,Wireless headphones
TXN002,25.50,food,2024-01-15,Lunch delivery
TXN003,invalid,food,2024-01-16,Bad record

A Flow is a cold, sequential async stream: nothing runs until a terminal operator (toList, collect) pulls on it, and each upstream stage only produces the next value when the downstream is ready for it. That is the built-in backpressure. flowOn changes the dispatcher for the operators upstream of it that don't already have their own flowOn, so the blocking file read happens on Dispatchers.IO while the CPU-bound parse, filter, and transform stages run on Dispatchers.Default.
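A minimal sketch of both ideas, using a hypothetical three-number flow instead of the CSV (runColdFlowDemo and ColdFlowDemo are illustrative names, not part of the project): it records when the producer starts relative to the terminal toList() call, and which threads the two sides of flowOn run on.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// What happened, in order, plus the threads that ran each side of flowOn.
data class ColdFlowDemo(
    val events: List<String>,
    val producerThread: String,
    val collectorThread: String,
)

fun runColdFlowDemo(): ColdFlowDemo = runBlocking {
    // The main thread touches these only before collection starts and after
    // toList() returns, so the producer never races with it.
    val events = mutableListOf<String>()
    var producerThread = ""

    val numbers = flow {
        producerThread = Thread.currentThread().name
        events.add("producer started")
        for (i in 1..3) emit(i)
    }.flowOn(Dispatchers.Default) // the flow { } body above runs on Default

    events.add("flow defined") // the body has not run yet: the flow is cold

    // toList() is the terminal operator that actually starts the producer.
    val doubled = numbers.map { it * 2 }.toList()
    events.add("collected $doubled")

    ColdFlowDemo(events.toList(), producerThread, Thread.currentThread().name)
}
```

The events come back in the order "flow defined", "producer started", "collected [2, 4, 6]", and the producer's thread differs from the runBlocking thread that collected the values.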

[Diagram: Flow pipeline stages]

A single-module Gradle project — one Main.kt holds the whole pipeline.

  • data-pipeline/
    • build.gradle.kts: coroutines deps + application plugin
    • settings.gradle.kts: project name
    • src/main/kotlin/
      • Main.kt: data models, pipeline stages, and main

The only runtime dependency is kotlinx-coroutines-core, which brings both coroutines and the Flow API. The application plugin gives us ./gradlew run.

build.gradle.kts
plugins {
    kotlin("jvm") version "2.1.0"
    application
}

repositories {
    mavenCentral()
}

dependencies {
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.10.1")
    testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.10.1")
    testImplementation(kotlin("test"))
}

application {
    mainClass.set("MainKt")
}

tasks.test {
    useJUnitPlatform()
}
settings.gradle.kts
rootProject.name = "data-pipeline"

The flow { } builder lets you emit values one at a time. Here we open a BufferedReader, drop the header line, and emit each remaining line. The .flowOn(Dispatchers.IO) at the end moves the blocking file read onto the IO thread pool — the same reasoning as wrapping a blocking call in withContext(Dispatchers.IO), but for a whole stream.

src/main/kotlin/Main.kt
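// Top of Main.kt: imports used by every snippet in this file
import java.io.File
import kotlin.system.measureTimeMillis
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Read stage: emit each data line of the CSV, skipping the header row
fun readCsvLines(file: File): Flow<String> = flow {
    file.bufferedReader().use { reader ->
        reader.lineSequence()
            .drop(1) // skip header
            .forEach { line ->
                emit(line)
            }
    }
}.flowOn(Dispatchers.IO) // the blocking read runs on the IO dispatcher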

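lineSequence() is itself lazy, and emit suspends whenever downstream can't accept another value, so a million-line file never materializes in memory at the read stage (the toList() at the end of the pipeline does hold every record that survives the filter). One nuance: when flowOn switches dispatchers, it runs the producer in its own coroutine and hands values across through a channel with a default buffer of 64 elements, so the reader can run up to that many lines ahead of the collector before emit suspends. Memory still stays bounded; that's the backpressure paying off.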
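To see that bound in action, this sketch swaps the file for a hypothetical endless sequence of CSV lines (endlessCsvLines and measurePull are illustrative names) and asks for only five items with take(5). Without flowOn the producer pulls exactly five lines; with flowOn(Dispatchers.IO) it may pull up to about one buffer's worth more, but never the whole (infinite) input.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Stand-in for a huge CSV file: an endless, lazily generated sequence of lines.
fun endlessCsvLines(): Sequence<String> =
    generateSequence(1) { it + 1 }.map { i -> "TXN$i,10.00,food,2024-01-15,Item $i" }

// Returns (items the collector received, lines the producer actually pulled).
fun measurePull(wanted: Int, withFlowOn: Boolean): Pair<Int, Int> = runBlocking {
    val pulled = AtomicInteger(0) // may be written on an IO thread, read here afterwards
    val source: Flow<String> = flow {
        for (line in endlessCsvLines()) {
            pulled.incrementAndGet()
            emit(line) // suspends until downstream (or the flowOn buffer) can take it
        }
    }
    val upstream = if (withFlowOn) source.flowOn(Dispatchers.IO) else source
    // take() cancels the producer once it has enough items.
    val received = upstream.take(wanted).toList()
    received.size to pulled.get()
}
```

In both cases the infinite producer stops as soon as take() is satisfied; the flowOn version just overshoots by whatever it had already buffered.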

The parse, transform, and aggregate stages


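parseLine splits on commas and returns null for anything malformed: too few columns, or an amount that isn't a number (toDoubleOrNull() is the safe, exception-free parse). Because of limit = 5, any extra commas end up inside the description field instead of creating extra columns; quoted fields aren't handled, so this is a deliberately simple parser, not a full CSV reader. Returning null is what lets the pipeline use .mapNotNull { } to skip bad rows without a try/catch per line.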

src/main/kotlin/Main.kt
fun parseLine(line: String): Transaction? {
    val parts = line.split(",", limit = 5)
    if (parts.size < 5) return null
    val amount = parts[1].trim().toDoubleOrNull() ?: return null
    return Transaction(
        id = parts[0].trim(),
        amount = amount,
        category = parts[2].trim(),
        date = parts[3].trim(),
        description = parts[4].trim(),
    )
}
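Here is that skip in action on a few sample lines, using the Flow version of mapNotNull. Transaction and parseLine are repeated so the block compiles alone; the TXN004 and TXN005 rows are made-up inputs chosen to hit the too-few-columns and comma-in-description cases.

```kotlin
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

data class Transaction(
    val id: String,
    val amount: Double,
    val category: String,
    val date: String,
    val description: String,
)

// Same logic as parseLine above, repeated so this block compiles on its own.
fun parseLine(line: String): Transaction? {
    val parts = line.split(",", limit = 5)
    if (parts.size < 5) return null
    val amount = parts[1].trim().toDoubleOrNull() ?: return null
    return Transaction(parts[0].trim(), amount, parts[2].trim(), parts[3].trim(), parts[4].trim())
}

// Runs a handful of sample lines through mapNotNull; null results are dropped.
fun parseSample(): List<Transaction> = runBlocking {
    flowOf(
        "TXN001,150.00,electronics,2024-01-15,Wireless headphones",
        "TXN003,invalid,food,2024-01-16,Bad record",         // non-numeric amount -> null
        "TXN004,12.00,food",                                  // too few columns -> null
        "TXN005,40.00,books,2024-01-17,Kotlin, 2nd edition", // extra comma stays in description
    ).mapNotNull { parseLine(it) }.toList()
}
```

Only TXN001 and TXN005 survive, and TXN005 keeps its full description "Kotlin, 2nd edition" thanks to limit = 5.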

normalize uses the data class copy function to produce a new record with an uppercased category and an amount rounded to two decimal places. Every Transaction property is a val, so you transform by copying, not mutating.

src/main/kotlin/Main.kt
fun normalize(tx: Transaction): Transaction = tx.copy(
    category = tx.category.uppercase(),
    amount = Math.round(tx.amount * 100.0) / 100.0,
)
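A quick check of what copy does and doesn't touch, with made-up input values (normalizeDemo is an illustrative helper): the original record is unchanged, and the copy differs only in the two properties passed to copy.

```kotlin
data class Transaction(
    val id: String,
    val amount: Double,
    val category: String,
    val date: String,
    val description: String,
)

// Same body as normalize above, repeated so this block compiles on its own.
fun normalize(tx: Transaction): Transaction = tx.copy(
    category = tx.category.uppercase(),
    amount = Math.round(tx.amount * 100.0) / 100.0, // Long / Double -> Double
)

// Returns the untouched original next to its normalized copy.
fun normalizeDemo(): Pair<Transaction, Transaction> {
    val original = Transaction("TXN001", 150.456, "electronics", "2024-01-15", "Wireless headphones")
    return original to normalize(original)
}
```

Rounding a binary Double to two places is approximate; if exact cents matter, java.math.BigDecimal with setScale(2, RoundingMode.HALF_UP) is the usual alternative.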

The aggregate stage is plain collection math once the Flow has been drained to a List: groupBy { it.category } yields a Map<String, List<Transaction>>, then each group folds down to a CategorySummary. This is the same code you’d write over a regular list — Flow only governs how the items arrive, not how you summarize them.

src/main/kotlin/Main.kt
fun aggregate(transactions: List<Transaction>): List<CategorySummary> =
    transactions
        .groupBy { it.category }
        .map { (category, txns) ->
            val total = txns.sumOf { it.amount } // computed once, used for total and average
            CategorySummary(
                category = category,
                count = txns.size,
                totalAmount = Math.round(total * 100.0) / 100.0,
                averageAmount = Math.round(total / txns.size * 100.0) / 100.0,
            )
        }
        .sortedByDescending { it.totalAmount }
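One design alternative, not what this tutorial does: because main drains the Flow with toList() before aggregating, every filtered transaction sits in memory at once. Folding straight over the Flow with the fold terminal operator keeps only one running count and total per category. In the sketch below, aggregateStreaming, Acc, and aggregateStreamingBlocking are hypothetical names; since it adds amounts per category in the same encounter order as groupBy plus sumOf, it should produce the same summaries as aggregate.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.fold
import kotlinx.coroutines.runBlocking

data class Transaction(
    val id: String,
    val amount: Double,
    val category: String,
    val date: String,
    val description: String,
)

data class CategorySummary(
    val category: String,
    val count: Int,
    val totalAmount: Double,
    val averageAmount: Double,
)

// Hypothetical running state per category: a count and an unrounded total.
data class Acc(val count: Int, val total: Double)

// Aggregates while the Flow is collected, instead of after toList().
suspend fun aggregateStreaming(transactions: Flow<Transaction>): List<CategorySummary> {
    val byCategory = transactions.fold(mutableMapOf<String, Acc>()) { acc, tx ->
        val prev = acc[tx.category] ?: Acc(0, 0.0)
        acc[tx.category] = Acc(prev.count + 1, prev.total + tx.amount)
        acc // fold threads the same map through every step
    }
    return byCategory
        .map { (category, a) ->
            CategorySummary(
                category = category,
                count = a.count,
                totalAmount = Math.round(a.total * 100.0) / 100.0,
                averageAmount = Math.round(a.total / a.count * 100.0) / 100.0,
            )
        }
        .sortedByDescending { it.totalAmount }
}

// Convenience wrapper for trying it on an in-memory list.
fun aggregateStreamingBlocking(transactions: List<Transaction>): List<CategorySummary> =
    runBlocking { aggregateStreaming(transactions.asFlow()) }
```

In the pipeline you would call aggregateStreaming on the flow in place of .toList() followed by aggregate(...); the tradeoff is that you no longer have the filtered transactions as a list afterwards.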

Writing is a suspend function that flips to Dispatchers.IO with withContext — the standard way to run a blocking call from a coroutine without freezing the calling thread.

src/main/kotlin/Main.kt
suspend fun writeSummary(file: File, summaries: List<CategorySummary>) = withContext(Dispatchers.IO) {
    file.bufferedWriter().use { writer ->
        writer.write("category,count,total,average")
        writer.newLine()
        for (summary in summaries) {
            writer.write("${summary.category},${summary.count},${summary.totalAmount},${summary.averageAmount}")
            writer.newLine()
        }
    }
}
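To see exactly what lands in summary-output.csv, this sketch writes two made-up summaries to a temporary file and reads them back. writeAndReadBack is an illustrative helper; writeSummary and CategorySummary are repeated so the block compiles alone.

```kotlin
import java.io.File
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

data class CategorySummary(
    val category: String,
    val count: Int,
    val totalAmount: Double,
    val averageAmount: Double,
)

// Same body as writeSummary above, repeated so this block compiles on its own.
suspend fun writeSummary(file: File, summaries: List<CategorySummary>) = withContext(Dispatchers.IO) {
    file.bufferedWriter().use { writer ->
        writer.write("category,count,total,average")
        writer.newLine()
        for (summary in summaries) {
            writer.write("${summary.category},${summary.count},${summary.totalAmount},${summary.averageAmount}")
            writer.newLine()
        }
    }
}

// Writes to a throwaway temp file and reads it back, so you can see the exact CSV text.
fun writeAndReadBack(summaries: List<CategorySummary>): List<String> = runBlocking {
    val tmp = File.createTempFile("summary-", ".csv")
    try {
        writeSummary(tmp, summaries)
        tmp.readLines()
    } finally {
        tmp.delete()
    }
}
```

The file gets Double's default toString, so a total of 150.0 is written as 150.0, while the console table in main formats the same value as 150.00 with %12.2f.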

This is where the Flow operators chain into the actual pipeline. Read it top-to-bottom — it reads like the diagram above:

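  • .onEach { totalRead++ } (and, after the filter, .onEach { totalFiltered++ }): side-effect taps for progress and metrics; each passes items through unchanged.
  • .mapNotNull { ... }: parse, and drop the nulls (the bad rows), counting errors as we go.
  • .filter { it.amount >= minAmount }: the filter stage.
  • .map { normalize(it) }: the transform stage.
  • .flowOn(Dispatchers.Default): the operators above it (the onEach taps, parse, filter, transform) run on the CPU-bound default dispatcher. The read stage stays on IO, because the flowOn(Dispatchers.IO) inside readCsvLines already applies to it.
  • .catch { e -> ... }: catches an exception thrown anywhere upstream (an IOException during the read, for example) and completes the flow normally instead of rethrowing, so toList() returns whatever was collected before the failure. It cannot resume the stream past the failing element; skipping individual bad records is the job of parseLine and mapNotNull.
  • .toList(): the terminal operator that finally runs the cold Flow.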
src/main/kotlin/Main.kt
fun main() = runBlocking {
    val inputFile = File("sample-transactions.csv")
    val outputFile = File("summary-output.csv")
    val minAmount = 50.0

    // Generate sample data if not present
    if (!inputFile.exists()) {
        println("Generating sample CSV with 1000 transactions...")
        generateSampleCsv(inputFile, 1000)
    }

    println("Processing ${inputFile.name}...")
    println("Filter: amount >= $minAmount") // matches the >= used in .filter below
    println("=".repeat(60))

    var parseErrors = 0
    var totalRead = 0
    var totalFiltered = 0

    val elapsed = measureTimeMillis {
        val transactions = readCsvLines(inputFile)
            .onEach { totalRead++ }
            .mapNotNull { line ->
                parseLine(line).also { if (it == null) parseErrors++ }
            }
            .filter { it.amount >= minAmount }
            .onEach { totalFiltered++ }
            .map { normalize(it) }
            .flowOn(Dispatchers.Default)
            .catch { e -> println("Pipeline error: ${e.message}") }
            .toList()

        val summaries = aggregate(transactions)

        // Print results
        println("\nCategory Summaries:")
        println("-".repeat(60))
        println("%-15s %6s %12s %12s".format("Category", "Count", "Total", "Average"))
        println("-".repeat(60))
        for (s in summaries) {
            println("%-15s %6d %12.2f %12.2f".format(s.category, s.count, s.totalAmount, s.averageAmount))
        }

        // Write to file
        writeSummary(outputFile, summaries)
    }

    // Stats
    println("=".repeat(60))
    println("Lines read: $totalRead")
    println("Parse errors: $parseErrors")
    println("After filter: $totalFiltered")
    println("Time: ${elapsed}ms")
    println("Output written: ${outputFile.absolutePath}")
}
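The .catch step is easy to over-read. This minimal sketch uses a hypothetical in-memory flow in place of the CSV (catchDemo is an illustrative name) and throws on the third element to show what catch actually does: the failing element ends the stream, catch swallows the error, and toList() returns only what arrived before it.

```kotlin
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Returns what toList() produced and the message the catch block saw.
fun catchDemo(): Pair<List<Int>, String?> = runBlocking {
    var caught: String? = null
    val collected = flow { for (i in 1..5) emit(i) }
        .map { if (it == 3) throw IllegalStateException("bad item $it") else it * 10 }
        .catch { e -> caught = e.message } // swallow the error; the flow then completes normally
        .toList()
    collected to caught
}
```

Items 4 and 5 are never produced. If a per-element failure should be skipped instead, handle it inside the element-level operator, the way parseLine does by returning null.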

The harness also includes a generateSampleCsv helper that seeds a 1000-transaction file (with ~5% deliberately malformed rows) on first run, so the pipeline has something to chew on without you supplying input.
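The tutorial doesn't show that helper. If you're writing your own, here is a minimal hypothetical sketch that matches the generateSampleCsv(inputFile, 1000) call in main; the category list, amount range, dates, fixed seed, and every-20th-row corruption (exactly 5% rather than roughly 5%) are all assumptions.

```kotlin
import java.io.File
import java.util.Locale
import kotlin.random.Random

// Hypothetical sketch: categories, amounts, dates, and the corruption rule are illustrative.
fun generateSampleCsv(file: File, count: Int, random: Random = Random(42)) {
    val categories = listOf("electronics", "food", "books", "travel", "clothing")
    file.bufferedWriter().use { writer ->
        writer.write("id,amount,category,date,description")
        writer.newLine()
        for (i in 1..count) {
            val id = "TXN" + i.toString().padStart(4, '0')
            // Every 20th row gets a non-numeric amount, so 5% of rows are malformed.
            val amount = if (i % 20 == 0) {
                "invalid"
            } else {
                // Locale.ROOT keeps the decimal separator a '.', whatever the JVM locale.
                String.format(Locale.ROOT, "%.2f", random.nextDouble(1.0, 500.0))
            }
            val category = categories[random.nextInt(categories.size)]
            val day = random.nextInt(1, 29).toString().padStart(2, '0')
            writer.write("$id,$amount,$category,2024-01-$day,Sample item $i")
            writer.newLine()
        }
    }
}
```

The fixed seed makes every generated file identical, which keeps the printed summaries reproducible between runs; pass a different Random to vary the data.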

  1. Run the pipeline. On first run it generates sample-transactions.csv automatically:

    ./gradlew run
  2. Inspect the written summary:

    cat summary-output.csv
  3. Run the tests:

    ./gradlew test