Data Pipeline
Build a sales-data pipeline that streams CSV records through a chain of lazy
stages (parse → filter → group → aggregate) and prints a summary report. The
point is composition without materialization: records are parsed one at a time,
the way you’d consume a Go channel or an async stream, so the parser never holds
the whole file in memory. This is where Python’s generators, itertools, and
dataclasses combine into something that reads like a small query language.
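Here is a minimal sketch of that laziness. It uses three hypothetical stages (numbers, evens, squares) rather than the project's real ones. islice asks for three results, and the pulled list records which source items were actually produced. Even though the source is infinite, it only runs as far as the downstream stages demand.

```python
from collections.abc import Iterator
from itertools import islice

# Hypothetical stages for illustration; not part of the project.
pulled: list[int] = []  # records every item the source actually produced


def numbers() -> Iterator[int]:
    # Infinite source: safe only because every later stage is lazy.
    n = 0
    while True:
        pulled.append(n)
        yield n
        n += 1


def evens(xs: Iterator[int]) -> Iterator[int]:
    # Filter stage written as a generator expression.
    return (x for x in xs if x % 2 == 0)


def squares(xs: Iterator[int]) -> Iterator[int]:
    # Transform stage written as a generator function.
    for x in xs:
        yield x * x


first_three = list(islice(squares(evens(numbers())), 3))
print(first_three)  # [0, 4, 16]
print(pulled)       # [0, 1, 2, 3, 4]: nothing past 4 was ever generated
```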
What you’ll practice
- Generators (yield) and generator expressions for lazy evaluation
- Composing stages with itertools (groupby, islice)
- Modeling records with a frozen/slots dataclass and a @property
- collections.defaultdict for group-then-aggregate
- sorted(key=...), enumerate, and f-string formatting (:,.2f)
- Reading files lazily with pathlib
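This quick illustration covers the formatting and numbering tools from the list above. The format specs are standard Python. The numbers and labels are made up.

```python
# ",.2f": thousands separator plus exactly two decimal places.
money = f"${1234567.5:,.2f}"
# "<6": left-align the text in a 6-character field, padding with spaces.
padded = f"[{'East':<6}]"
# enumerate(..., start=1): number items from 1 instead of 0.
ranked = list(enumerate(["Widget", "Gadget"], start=1))

print(money)   # $1,234,567.50
print(padded)  # [East  ]
print(ranked)  # [(1, 'Widget'), (2, 'Gadget')]
```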
Requirements
The program reads CSV rows with columns date,region,product,quantity,unit_price.
It parses each row into a Sale dataclass as a stream (a generator), skipping the
header and silently dropping malformed rows. It then prints a report with five
sections:
- Totals: total revenue, order count, average order value.
- Revenue by region, sorted by revenue descending.
- Top products by revenue, with total units.
- Daily trend: revenue per date, in date order.
- High-value orders: revenue over $100, highest first.
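The totals section does not strictly need a list: count, sum, and average can all come from one pass over a stream. The sketch below uses a hypothetical totals() helper, which is not the project's code (the worked solution sums over a list instead). The prices are made up so the float arithmetic stays exact.

```python
from collections.abc import Iterable


def totals(revenues: Iterable[float]) -> tuple[float, int, float]:
    """Hypothetical helper: total, count, and average in a single pass."""
    total = 0.0
    count = 0
    for r in revenues:  # works on a generator; no list is built
        total += r
        count += 1
    avg = total / count if count else 0.0  # empty input gives 0.0, not ZeroDivisionError
    return total, count, avg


# A generator expression feeds the helper lazily.
result = totals(q * p for q, p in [(4, 2.5), (2, 8.0)])
empty = totals([])
print(result)  # (26.0, 2, 13.0)
print(empty)   # (0.0, 0, 0.0)
```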
Example input
```csv
date,region,product,quantity,unit_price
2024-01-15,North,Widget,10,9.99
2024-01-15,South,Gadget,5,24.99
2024-01-16,North,Gadget,3,24.99
2024-01-16,East,Widget,20,9.99
2024-01-17,South,Widget,15,9.99
2024-01-17,North,Doohickey,8,4.99
2024-01-18,East,Gadget,12,24.99
2024-01-18,West,Widget,7,9.99
2024-01-19,North,Widget,25,9.99
2024-01-19,South,Doohickey,30,4.99
```
Expected output
```
Parsed 10 sales records

=== Sales Report ===

Total Revenue: $1,458.65
Total Orders: 10
Average Order Value: $145.87

--- Revenue by Region ---
  East   $499.68 (2 orders)
  North  $464.54 (4 orders)
  South  $424.50 (3 orders)
  West   $69.93 (1 orders)

--- Top Products ---
  1. Widget     $769.23 (77 units)
  2. Gadget     $499.80 (20 units)
  3. Doohickey  $189.62 (38 units)

--- Daily Trend ---
  2024-01-15: $224.85
  2024-01-16: $274.77
  2024-01-17: $189.77
  2024-01-18: $369.81
  2024-01-19: $399.45

--- High Value Orders (> $100) ---
  2024-01-18, East: 12x Gadget = $299.88
  2024-01-19, North: 25x Widget = $249.75
  2024-01-16, East: 20x Widget = $199.80
  2024-01-17, South: 15x Widget = $149.85
  2024-01-19, South: 30x Doohickey = $149.70
  2024-01-15, South: 5x Gadget = $124.95
```
The worked solution
A single-module uv project with no third-party dependencies: csv, itertools, and
collections are all in the standard library. The CSV is read from a file via
pathlib, parsed lazily, then reported.
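This small sketch shows what "read lazily" means here. It writes a throwaway file into a temporary directory; the tiny.csv name and its contents are made up. An open file object is an iterator of lines, so csv.DictReader can hand back one row at a time. The file is still read through an internal buffer, but rows are produced on demand.

```python
import csv
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "tiny.csv"  # hypothetical file, removed afterwards
    path.write_text("date,region\n2024-01-15,North\n2024-01-16,East\n", encoding="utf-8")

    with path.open(encoding="utf-8", newline="") as f:
        reader = csv.DictReader(f)  # the header line becomes reader.fieldnames
        first = next(reader)        # pulls a single data row from the line iterator
        print(reader.fieldnames)    # ['date', 'region']
        print(first)                # {'date': '2024-01-15', 'region': 'North'}
        print(reader.line_num)      # 2: header plus one data line consumed so far
```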
- data-pipeline/
  - pyproject.toml (created by uv init)
  - sales.csv (the sample data)
  - src/
    - data_pipeline/
      - __init__.py
      - pipeline.py (the whole pipeline)
Project setup
```bash
uv init --package data-pipeline
cd data-pipeline
# no runtime deps to add: it's all stdlib
uv add --dev ruff ty
```
pyproject.toml:
```toml
[project]
name = "data-pipeline"
version = "0.1.0"
requires-python = ">=3.13"
dependencies = []

[project.scripts]
pipeline = "data_pipeline.pipeline:main"

[build-system]
requires = ["uv_build>=0.8"]
build-backend = "uv_build"

[dependency-groups]
dev = ["ruff", "ty"]
```
The record model
Sale is frozen (records shouldn’t mutate after parsing) and uses slots (we may
stream millions of these, and dropping the per-instance __dict__ saves memory).
revenue is a @property: it is derived on access rather than stored, like a
TypeScript getter or a Kotlin property with a custom getter (val revenue get() = ...).
```python
from dataclasses import dataclass


@dataclass(frozen=True, slots=True)
class Sale:
    date: str
    region: str
    product: str
    quantity: int
    unit_price: float

    @property
    def revenue(self) -> float:
        return self.quantity * self.unit_price


@dataclass(frozen=True, slots=True)
class GroupSummary:
    # One row of a grouped report: the group key plus its aggregates.
    key: str
    revenue: float
    count: int
    units: int
```
Parsing as a generator (stage 1)
parse_sales takes an iterable of lines and yields Sale objects one at a time; it
never builds a list. csv.DictReader consumes the header itself, so we don’t have
to skip it. A bad int or float raises ValueError, which we catch to skip the row.
This is the streaming equivalent of Kotlin’s mapNotNull/toIntOrNull pattern.
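Two details of csv.DictReader matter for the skip-on-error logic. This example shows them on an in-memory file (io.StringIO) with made-up rows. First, the header becomes the field names rather than a data row. Second, a row with too few fields is padded with None (DictReader's default restval). int() rejects None with TypeError, not ValueError, so check which exceptions an except clause actually lists.

```python
import csv
import io

text = io.StringIO(
    "date,region,product,quantity,unit_price\n"
    "2024-01-15,North,Widget,10,9.99\n"
    "2024-01-16,East,Gadget,lots,24.99\n"  # non-numeric quantity
    "2024-01-17,West,Widget\n"             # short row: two fields missing
)
rows = list(csv.DictReader(text))  # the header line becomes the field names

errors: list[str] = []
for row in rows:
    try:
        int(row["quantity"])
        errors.append("ok")
    except (TypeError, ValueError) as exc:
        # "lots" raises ValueError; the short row's None raises TypeError
        errors.append(type(exc).__name__)

print(len(rows))            # 3
print(rows[2]["quantity"])  # None
print(errors)               # ['ok', 'ValueError', 'TypeError']
```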
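```python
import csv
from collections.abc import Iterable, Iterator


def parse_sales(lines: Iterable[str]) -> Iterator[Sale]:
    reader = csv.DictReader(lines)  # reads the header row as field names
    for row in reader:  # lazy: pulls one row per request
        try:
            sale = Sale(
                date=row["date"].strip(),
                region=row["region"].strip(),
                product=row["product"].strip(),
                quantity=int(row["quantity"]),
                unit_price=float(row["unit_price"]),
            )
        except (KeyError, ValueError, TypeError, AttributeError):
            # KeyError: column missing from the header
            # ValueError: non-numeric quantity or unit_price
            # TypeError/AttributeError: short row; DictReader fills gaps with None
            continue  # malformed row: skip it
        yield sale  # yield outside the try so consumer errors are never swallowed
```
Group-and-aggregate (stage 2)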
summarize is the group-then-aggregate move. A defaultdict whose factory makes a
fresh [revenue, count, units] accumulator removes the “is this key present
yet?” bookkeeping. getattr(s, by) lets one function group by "region" or
"product": the key selector is passed as a string.
```python
from collections import defaultdict


def summarize(sales: Iterable[Sale], by: str) -> list[GroupSummary]:
    # Each bucket is [revenue, count, units].
    acc: defaultdict[str, list[float]] = defaultdict(lambda: [0.0, 0, 0])
    for s in sales:
        bucket = acc[getattr(s, by)]  # auto-creates [0.0, 0, 0] if new
        bucket[0] += s.revenue
        bucket[1] += 1
        bucket[2] += s.quantity
    summaries = [
        GroupSummary(key, rev, int(cnt), int(units))
        for key, (rev, cnt, units) in acc.items()
    ]
    return sorted(summaries, key=lambda g: g.revenue, reverse=True)
```
The report (stage 3)
Each section is a small pipeline. The daily trend uses itertools.groupby, which
only groups consecutive equal keys, so the input must be sorted by date first.
That sort also gives us date order for free, because ISO dates (YYYY-MM-DD) sort
correctly as plain strings. The high-value section feeds a generator expression
straight into sorted.
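The sort matters because groupby starts a new group every time the key changes. On unsorted input, one date is split across several groups. The dates below are made up.

```python
from itertools import groupby

dates = ["2024-01-16", "2024-01-15", "2024-01-16"]

unsorted_groups = [(k, len(list(g))) for k, g in groupby(dates)]
sorted_groups = [(k, len(list(g))) for k, g in groupby(sorted(dates))]

print(unsorted_groups)  # [('2024-01-16', 1), ('2024-01-15', 1), ('2024-01-16', 1)]
print(sorted_groups)    # [('2024-01-15', 1), ('2024-01-16', 2)]

# ISO dates compare correctly as strings, including across month and year boundaries.
iso_order = sorted(["2024-01-10", "2023-12-31", "2024-01-09"])
print(iso_order)        # ['2023-12-31', '2024-01-09', '2024-01-10']
```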
```python
from itertools import groupby


def generate_report(sales: list[Sale]) -> None:
    total = sum(s.revenue for s in sales)
    orders = len(sales)
    avg = total / orders if orders else 0.0  # guard against an empty file

    print("=== Sales Report ===\n")
    print(f"Total Revenue: ${total:,.2f}")
    print(f"Total Orders: {orders}")
    print(f"Average Order Value: ${avg:,.2f}")

    print("\n--- Revenue by Region ---")
    for g in summarize(sales, "region"):
        print(f"  {g.key:<6} ${g.revenue:,.2f} ({g.count} orders)")

    print("\n--- Top Products ---")
    for rank, g in enumerate(summarize(sales, "product"), start=1):
        print(f"  {rank}. {g.key:<10} ${g.revenue:,.2f} ({g.units} units)")

    print("\n--- Daily Trend ---")
    by_date = sorted(sales, key=lambda s: s.date)  # groupby needs sorted input
    for date, group in groupby(by_date, key=lambda s: s.date):
        revenue = sum(s.revenue for s in group)
        print(f"  {date}: ${revenue:,.2f}")

    print("\n--- High Value Orders (> $100) ---")
    high_value = (s for s in sales if s.revenue > 100.0)  # lazy genexp
    for s in sorted(high_value, key=lambda s: s.revenue, reverse=True):
        print(f"  {s.date}, {s.region}: {s.quantity}x {s.product} = ${s.revenue:,.2f}")
```
Wiring it together
main opens the file with pathlib and hands the file object, itself a lazy
iterator of lines, to the parser. It then materializes the result into a list.
This is the one non-streaming step, and it is needed because the report iterates
the records several times, while a generator can only be consumed once. The
“Parsed N records” log, which Kotlin code might write with also, is just a print
before reporting.
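A generator is single-use, which is why main drains the parser into a list. A minimal sketch with a hypothetical three-item generator:

```python
from collections.abc import Iterator


def three() -> Iterator[int]:
    # Hypothetical stand-in for parse_sales(f): yields each item once.
    yield from (1, 2, 3)


stream = three()
first_pass = sum(stream)   # 6
second_pass = sum(stream)  # 0: the generator is already exhausted

items = list(three())      # materialize once...
print(first_pass, second_pass)  # 6 0
print(sum(items), max(items))   # 6 3  ...then iterate as often as needed
```

itertools.tee can split one iterator into several, but it buffers whatever one branch has consumed ahead of the others. For a report that walks the whole dataset several times, a plain list is simpler and no worse on memory.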
```python
from pathlib import Path


def main() -> None:
    # src/data_pipeline/pipeline.py -> project root, where sales.csv lives
    csv_path = Path(__file__).parent.parent.parent / "sales.csv"

    # newline="" is how the csv module expects its input files to be opened
    with csv_path.open(encoding="utf-8", newline="") as f:
        sales = list(parse_sales(f))  # drain the stream once, then reuse

    print(f"Parsed {len(sales)} sales records\n")
    generate_report(sales)


if __name__ == "__main__":
    main()
```
Run it
- Sync the project (creates the venv and installs the dev tools):
  ```bash
  uv sync
  ```
- Run the pipeline (the script entry point, or the module directly):
  ```bash
  uv run pipeline
  # or: uv run python -m data_pipeline.pipeline
  ```
- Lint, format, and type-check (ty is Astral’s fast checker; mypy is the mature equivalent):
  ```bash
  uv run ruff check .
  uv run ruff format .
  uv run ty check
  ```