Data Pipeline

Build a sales-data pipeline that streams CSV records through a chain of lazy stages (parse → filter → group → aggregate) and prints a summary report. The point is composition without intermediate materialization: records flow through the parser one at a time, the way you’d consume a Go channel or an async stream, and the only list ever built is the one the report sections share. This is where Python’s generators, itertools, and dataclasses combine into something that reads like a small query language.

  • Generators (yield) and generator expressions for lazy evaluation
  • Composing stages with itertools (groupby, islice)
  • Modeling records with a frozen/slots dataclass and a @property
  • collections.defaultdict for group-then-aggregate
  • sorted(key=...), enumerate, f-string formatting (:,.2f)
  • Reading files lazily with pathlib
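
As a warm-up for the first two items, here is a minimal sketch of lazy stage composition. The names numbers, evens, squares, and pulled are hypothetical and not part of the project; the pulled list exists only to record how many source items the chain actually consumed.

```python
from collections.abc import Iterable, Iterator
from itertools import islice

pulled: list[int] = []  # records which source items were actually consumed


def numbers() -> Iterator[int]:
    """Hypothetical unbounded source; stands in for a very large file."""
    n = 0
    while True:
        n += 1
        pulled.append(n)
        yield n


def evens(items: Iterable[int]) -> Iterator[int]:
    # Generator expression: nothing runs until a consumer asks for a value.
    return (x for x in items if x % 2 == 0)


def squares(items: Iterable[int]) -> Iterator[int]:
    return (x * x for x in items)


# islice stops the whole chain after three results, so the infinite
# source is only advanced as far as needed.
first_three = list(islice(squares(evens(numbers())), 3))

print(first_three)  # [4, 16, 36]
print(pulled)       # [1, 2, 3, 4, 5, 6]
```

Each stage only pulls from the one before it on demand, which is why an unbounded source is safe here.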

The program reads CSV rows with columns date,region,product,quantity,unit_price, parses them, and prints a report with five sections (items 2 to 6 below).

  1. Parse each row into a Sale dataclass, streaming (a generator), skipping the header and silently dropping malformed rows.
  2. Print totals: total revenue, order count, average order value.
  3. Revenue by region, sorted by revenue descending.
  4. Top products by revenue, with total units.
  5. Daily trend — revenue per date, in date order.
  6. High-value orders — revenue over $100, highest first.

Sample input:

date,region,product,quantity,unit_price
2024-01-15,North,Widget,10,9.99
2024-01-15,South,Gadget,5,24.99
2024-01-16,North,Gadget,3,24.99
2024-01-16,East,Widget,20,9.99
2024-01-17,South,Widget,15,9.99
2024-01-17,North,Doohickey,8,4.99
2024-01-18,East,Gadget,12,24.99
2024-01-18,West,Widget,7,9.99
2024-01-19,North,Widget,25,9.99
2024-01-19,South,Doohickey,30,4.99

Expected output:

Parsed 10 sales records

=== Sales Report ===

Total Revenue: $1,458.65
Total Orders: 10
Average Order Value: $145.87

--- Revenue by Region ---
East $499.68 (2 orders)
North $464.54 (4 orders)
South $424.50 (3 orders)
West $69.93 (1 orders)

--- Top Products ---
1. Widget $769.23 (77 units)
2. Gadget $499.80 (20 units)
3. Doohickey $189.62 (38 units)

--- Daily Trend ---
2024-01-15: $224.85
2024-01-16: $274.77
2024-01-17: $189.77
2024-01-18: $369.81
2024-01-19: $399.45

--- High Value Orders (> $100) ---
2024-01-18, East: 12x Gadget = $299.88
2024-01-19, North: 25x Widget = $249.75
2024-01-16, East: 20x Widget = $199.80
2024-01-17, South: 15x Widget = $149.85
2024-01-19, South: 30x Doohickey = $149.70
2024-01-15, South: 5x Gadget = $124.95

A single-module uv project. No third-party deps — csv, itertools, and collections are all standard library. The CSV is read from a file via pathlib, parsed lazily, then reported.

  • data-pipeline/
    • pyproject.toml (created by uv init)
    • sales.csv (the sample data)
    • src/
      • data_pipeline/
        • __init__.py
        • pipeline.py (the whole pipeline)
Terminal window
uv init --package data-pipeline
cd data-pipeline
# no runtime deps to add — it's all stdlib
uv add --dev ruff ty
pyproject.toml
[project]
name = "data-pipeline"
version = "0.1.0"
requires-python = ">=3.13"
dependencies = []
[project.scripts]
pipeline = "data_pipeline.pipeline:main"
[build-system]
requires = ["uv_build>=0.8"]
build-backend = "uv_build"
[dependency-groups]
dev = ["ruff", "ty"]

Sale is frozen (records shouldn’t mutate after parsing) and uses slots (we may stream millions of these, so the memory saving matters). revenue is a @property: derived on access rather than stored, much like a TypeScript getter or a Kotlin computed property (val revenue get() = ...).

src/data_pipeline/pipeline.py
from dataclasses import dataclass


@dataclass(frozen=True, slots=True)
class Sale:
    date: str
    region: str
    product: str
    quantity: int
    unit_price: float

    @property
    def revenue(self) -> float:
        # Derived on access; not stored as a field.
        return self.quantity * self.unit_price


@dataclass(frozen=True, slots=True)
class GroupSummary:
    key: str
    revenue: float
    count: int
    units: int

parse_sales takes an iterable of lines and yields Sale objects one at a time; it never builds a list. csv.DictReader consumes the header itself, so we don’t have to skip it. A bad int/float raises ValueError, and a row with too few columns raises AttributeError or TypeError, because DictReader fills the missing fields with None. We catch all of these to skip the row, the streaming equivalent of the mapNotNull/toIntOrNull pattern you’d use elsewhere.

src/data_pipeline/pipeline.py
import csv
from collections.abc import Iterable, Iterator


def parse_sales(lines: Iterable[str]) -> Iterator[Sale]:
    reader = csv.DictReader(lines)
    for row in reader:  # lazy: pulls one row per request
        try:
            yield Sale(
                date=row["date"].strip(),
                region=row["region"].strip(),
                product=row["product"].strip(),
                quantity=int(row["quantity"]),
                unit_price=float(row["unit_price"]),
            )
        except (KeyError, ValueError, TypeError, AttributeError):
            # KeyError: column absent from the header. ValueError: bad number.
            # TypeError/AttributeError: short row, so missing fields are None.
            continue  # malformed row: skip it

summarize is the group-then-aggregate move. A defaultdict whose factory makes a fresh [revenue, count, units] accumulator removes the “is this key present yet?” bookkeeping. getattr(s, by) lets one function group by "region" or "product" — the key selector, passed as a string.

src/data_pipeline/pipeline.py
from collections import defaultdict


def summarize(sales: Iterable[Sale], by: str) -> list[GroupSummary]:
    # Each bucket is [revenue, count, units].
    acc: defaultdict[str, list[float]] = defaultdict(lambda: [0.0, 0, 0])
    for s in sales:
        bucket = acc[getattr(s, by)]  # auto-creates [0.0, 0, 0] if new
        bucket[0] += s.revenue
        bucket[1] += 1
        bucket[2] += s.quantity
    summaries = [
        GroupSummary(key, rev, int(cnt), int(units))
        for key, (rev, cnt, units) in acc.items()
    ]
    return sorted(summaries, key=lambda g: g.revenue, reverse=True)
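
The accumulator pattern is easier to see in isolation. This sketch uses plain (region, revenue, units) tuples taken from three sample rows rather than Sale objects, and it also shows one caveat of defaultdict: a plain lookup of a missing key inserts it.

```python
from collections import defaultdict

# (region, revenue, units) for three rows of the sample data
orders = [
    ("North", 99.90, 10),
    ("South", 124.95, 5),
    ("North", 74.97, 3),
]

# The factory runs once per missing key and returns a fresh list each time.
acc: defaultdict[str, list[float]] = defaultdict(lambda: [0.0, 0, 0])

for region, revenue, units in orders:
    bucket = acc[region]  # created on first access, reused afterwards
    bucket[0] += revenue
    bucket[1] += 1
    bucket[2] += units

for region, (revenue, count, units) in acc.items():
    print(f"{region}: ${revenue:,.2f} from {count} orders, {units} units")

# Caveat: reading a missing key creates it. Use `in` or .get() to probe.
had_west = "West" in acc  # membership test does not insert
_ = acc["West"]           # lookup inserts a fresh [0.0, 0, 0]
has_west = "West" in acc
print(had_west, has_west)  # False True
```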

Each section is a small pipeline. The daily trend uses itertools.groupby — which groups consecutive equal keys — so the input must be sorted by date first; that sort also gives us date order for free (ISO dates sort lexicographically). High-value orders is a generator expression piped straight into sorted.

src/data_pipeline/pipeline.py
from itertools import groupby


def generate_report(sales: list[Sale]) -> None:
    total = sum(s.revenue for s in sales)
    orders = len(sales)
    avg = total / orders if orders else 0.0

    print("=== Sales Report ===\n")
    print(f"Total Revenue: ${total:,.2f}")
    print(f"Total Orders: {orders}")
    print(f"Average Order Value: ${avg:,.2f}")

    print("\n--- Revenue by Region ---")
    for g in summarize(sales, "region"):
        print(f" {g.key:<6} ${g.revenue:,.2f} ({g.count} orders)")

    print("\n--- Top Products ---")
    for rank, g in enumerate(summarize(sales, "product"), start=1):
        print(f" {rank}. {g.key:<10} ${g.revenue:,.2f} ({g.units} units)")

    print("\n--- Daily Trend ---")
    by_date = sorted(sales, key=lambda s: s.date)  # groupby needs sorted input
    for date, group in groupby(by_date, key=lambda s: s.date):
        revenue = sum(s.revenue for s in group)
        print(f" {date}: ${revenue:,.2f}")

    print("\n--- High Value Orders (> $100) ---")
    high_value = (s for s in sales if s.revenue > 100.0)  # lazy genexp
    for s in sorted(high_value, key=lambda s: s.revenue, reverse=True):
        print(f" {s.date}, {s.region}: {s.quantity}x {s.product} = ${s.revenue:,.2f}")
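
The sorted-input requirement for groupby is worth seeing fail once. In this sketch the (date, units) pairs are taken from the sample rows but deliberately shuffled; the helper name units_per_date is hypothetical.

```python
from itertools import groupby

# (date, units) pairs, deliberately out of date order
rows = [("2024-01-16", 3), ("2024-01-15", 10), ("2024-01-16", 20)]


def units_per_date(pairs: list[tuple[str, int]]) -> list[tuple[str, int]]:
    """Hypothetical helper: sum units for each run of equal dates."""
    return [
        (date, sum(units for _, units in group))
        for date, group in groupby(pairs, key=lambda p: p[0])
    ]


# Unsorted: 2024-01-16 appears as two separate runs, so two groups.
unsorted_result = units_per_date(rows)

# Sorted first: one group per date, and the output is in date order.
sorted_result = units_per_date(sorted(rows, key=lambda p: p[0]))

print(unsorted_result)
# [('2024-01-16', 3), ('2024-01-15', 10), ('2024-01-16', 20)]
print(sorted_result)
# [('2024-01-15', 10), ('2024-01-16', 23)]
```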

main reads the file lazily with pathlib, runs it through the parser, and materializes the result into a list (the one non-streaming step) so the five sections can each iterate it. The “Parsed N records” log, the kind of side effect Kotlin would hang on .also { ... }, is just a print before reporting.

src/data_pipeline/pipeline.py
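from pathlib import Path


def main() -> None:
    # sales.csv sits at the project root, three levels above this module.
    csv_path = Path(__file__).parent.parent.parent / "sales.csv"
    # newline="" is what the csv module documentation asks for on file objects.
    with csv_path.open(encoding="utf-8", newline="") as f:
        sales = list(parse_sales(f))  # drain the stream once, then reuse
    print(f"Parsed {len(sales)} sales records\n")
    generate_report(sales)


if __name__ == "__main__":
    main()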
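
If a report only needed the totals, the list could be avoided entirely. The following is a sketch of that alternative, not part of the project: streaming_totals is a hypothetical helper that folds revenue and order count in a single pass, and io.StringIO stands in for the open file.

```python
import csv
import io
from collections.abc import Iterable


def streaming_totals(lines: Iterable[str]) -> tuple[float, int]:
    """Hypothetical helper: total revenue and order count in one pass."""
    total = 0.0
    count = 0
    for row in csv.DictReader(lines):
        try:
            revenue = int(row["quantity"]) * float(row["unit_price"])
        except (KeyError, ValueError, TypeError):
            continue  # malformed row: skip it
        total += revenue
        count += 1
    return total, count


# Two rows from the sample data; a real run would pass the open file.
sample = io.StringIO(
    "date,region,product,quantity,unit_price\n"
    "2024-01-15,North,Widget,10,9.99\n"
    "2024-01-15,South,Gadget,5,24.99\n"
)

total, count = streaming_totals(sample)
print(f"${total:,.2f} across {count} orders")  # $224.85 across 2 orders
```

The trade-off is that a stream can be consumed only once, so each extra report section would need either its own pass over the file or its own accumulator inside the same loop.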
sales.csv
date,region,product,quantity,unit_price
2024-01-15,North,Widget,10,9.99
2024-01-15,South,Gadget,5,24.99
2024-01-16,North,Gadget,3,24.99
2024-01-16,East,Widget,20,9.99
2024-01-17,South,Widget,15,9.99
2024-01-17,North,Doohickey,8,4.99
2024-01-18,East,Gadget,12,24.99
2024-01-18,West,Widget,7,9.99
2024-01-19,North,Widget,25,9.99
2024-01-19,South,Doohickey,30,4.99
  1. Sync the project (creates the venv, installs the dev tools):

    Terminal window
    uv sync
  2. Run the pipeline (the script entry point, or the module directly):

    Terminal window
    uv run pipeline
    # or: uv run python -m data_pipeline.pipeline
  3. Lint, format, and type-check (ty is Astral’s fast checker; mypy is the mature equivalent):

    Terminal window
    uv run ruff check .
    uv run ruff format .
    uv run ty check