Real-Time Processing Pipeline
Build a complete in-process real-time processing pipeline with nothing but
asyncio: a simulated event source (an async generator) feeds a chain of
transform stages (parse → filter → enrich), which feed a windowed
aggregator that emits a rolling summary every few seconds. The stages are
decoupled by bounded asyncio.Queues so a slow stage applies backpressure to
a fast one, and the whole thing shuts down gracefully on Ctrl-C — every
stage drains, every resource closes.
If you’ve built a Kotlin Flow pipeline or chained RxJS operators, this is the
same shape. The difference: you’ll see the parts — the generator, the queues,
the worker tasks — instead of them hiding behind operator names. That’s the
point.
What you’ll practice
- Async generators as a stream source (async def + yield).
- Hand-built stream operators (parse, filter, enrich) as async generators that consume async generators.
- asyncio.Queue(maxsize=...) for backpressure between stages.
- A windowed aggregator: time-bucketed reduce, the bread-and-butter of real-time analytics.
- asyncio.TaskGroup to supervise the stages.
- Graceful shutdown: sentinel propagation, CancelledError handling, and finally/aclose() cleanup.
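As a warm-up for the second bullet, here is a minimal sketch of stream operators written as async generators that consume async generators. The names numbers, only_even, squared and collect are hypothetical and not part of the worked solution; they only illustrate the shape.

```python
import asyncio
from collections.abc import AsyncIterator


async def numbers(limit: int) -> AsyncIterator[int]:
    """Hypothetical source: yield 0..limit-1, pausing to mimic I/O."""
    for n in range(limit):
        await asyncio.sleep(0)  # hand control back to the event loop
        yield n


async def only_even(src: AsyncIterator[int]) -> AsyncIterator[int]:
    """Filter operator: pass through even values only."""
    async for n in src:
        if n % 2 == 0:
            yield n


async def squared(src: AsyncIterator[int]) -> AsyncIterator[int]:
    """Map operator: transform each value."""
    async for n in src:
        yield n * n


async def collect(limit: int) -> list[int]:
    # Compose the operators by nesting; nothing runs until we iterate.
    return [n async for n in squared(only_even(numbers(limit)))]


if __name__ == "__main__":
    print(asyncio.run(collect(6)))  # [0, 4, 16]
```

In a chain like this each item is pulled through every operator before the next one is requested, so the operators never run concurrently. The worked solution puts queues between the stages to get that concurrency.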
Requirements
- The source emits simulated events forever (until cancelled), at a slightly irregular rate, as raw str lines (simulating a socket/log feed).
- parse turns each raw line into an Event, dropping malformed lines.
- filter drops events below a severity threshold.
- enrich adds a derived field (a region looked up from an async “service”, with a small simulated latency — this is the stage that benefits from buffering).
- The aggregator keeps a tumbling time window (e.g. 2s) and, when the window closes, emits a summary: count and per-type breakdown for that window.
- Stages are connected by bounded queues; the pipeline must apply backpressure, not grow memory without bound.
- Ctrl-C triggers a clean shutdown: the source stops, in-flight items drain, the final partial window flushes, and every task exits.
The pipeline at a glance
flowchart LR
    SRC["event_source()<br/>async generator"] --> Q1["raw Queue<br/>maxsize=50"]
    Q1 --> PRS["parse worker"]
    PRS --> Q2["parsed Queue<br/>maxsize=50"]
    Q2 --> FLT["filter worker"]
    FLT --> Q3["filtered Queue<br/>maxsize=50"]
    Q3 --> ENR["enrich worker<br/>(async lookup)"]
    ENR --> Q4["enriched Queue<br/>maxsize=50"]
    Q4 --> AGG["windowed aggregator<br/>(2s tumbling)"]
    AGG --> OUT["print summary"]

Each stage is a task that reads from its input queue, does its work, and writes
to its output queue. The bounded queues are the backpressure: if enrich (slow,
it awaits a lookup) falls behind, its input queue fills, which stalls filter,
which stalls parse, which stalls the source. The whole chain self-throttles to
the slowest stage — exactly like a buffered Go channel pipeline.
The worked solution
A single-module uv project. The whole pipeline lives in src/pipeline/main.py;
the data model is a dataclass.
realtime-pipeline/
- pyproject.toml (uv project, ruff + ty config)
- src/
  - pipeline/
    - __init__.py
    - main.py (source, stages, aggregator, wiring, shutdown)
pyproject.toml
No runtime dependencies: asyncio is stdlib. The dev group carries ruff and
ty. (If you wanted the operator-library route, this is where aiostream would
go; we’re doing it by hand.)
[project]
name = "realtime-pipeline"
version = "0.1.0"
description = "An in-process async streaming pipeline with bounded queues"
requires-python = ">=3.13"
dependencies = []

[dependency-groups]
dev = ["ruff", "ty"]

[build-system]
requires = ["uv_build>=0.8"]
build-backend = "uv_build"

[tool.ruff]
target-version = "py313"

[tool.ruff.lint]
select = ["E", "F", "I", "UP", "B", "ASYNC"]

[tool.ty.rules]
# ty is young; loosen as needed while it matures.

The data model and the source
The source is an async generator that fabricates raw event lines forever. Real feeds are messy, so ~1 in 8 lines is deliberately malformed to exercise the parse stage’s error handling.
from __future__ import annotations

import asyncio
import random
import signal
from collections import Counter
from collections.abc import AsyncGenerator
from dataclasses import dataclass, field

# --- Data model -----------------------------------------------------------

SEVERITY = {"debug": 10, "info": 20, "warn": 30, "error": 40}


@dataclass(slots=True)
class Event:
    ts: float
    type: str  # e.g. "order", "auth", "payment"
    severity: str  # debug | info | warn | error
    user_id: int
    region: str = ""  # filled in by the enrich stage


# --- Stage 0: the source (async generator) --------------------------------

_TYPES = ("order", "auth", "payment", "search")
_SEVS = ("debug", "info", "info", "warn", "error")  # weighted toward info


# Annotated as AsyncGenerator (not AsyncIterator) so type checkers can see
# the aclose() method that contextlib.aclosing relies on.
async def event_source() -> AsyncGenerator[str, None]:
    """Simulate a raw event feed: emit malformed-ish CSV lines forever."""
    seq = 0
    try:
        while True:
            await asyncio.sleep(random.uniform(0.01, 0.06))  # irregular rate
            seq += 1
            if seq % 8 == 0:
                yield "GARBAGE;not;a;valid;line"  # malformed
                continue
            ts = asyncio.get_running_loop().time()
            etype = random.choice(_TYPES)
            sev = random.choice(_SEVS)
            uid = random.randint(1, 5)
            yield f"{ts:.3f},{etype},{sev},{uid}"
    finally:
        print("[source] stopped")  # runs when the generator is closed or cancelled

The finally is the source’s cleanup hook. When the consumer closes the
generator with aclose(), Python raises GeneratorExit at the yield where the
generator is suspended, and finally runs. If the consuming task is cancelled
while the generator is sleeping instead, CancelledError surfaces at the await
asyncio.sleep and finally runs just the same. A real source would close its
socket here.
The transform stages
Each stage is a coroutine that loops: read input queue → transform → write output queue, and propagates a sentinel when its input is exhausted so the next stage knows to stop. This is the explicit version of an async-generator chain; queues sit between stages so they run concurrently with backpressure.
# The "no more items" signal. asyncio.Queue has no close(); Python 3.13 adds
# Queue.shutdown(), but a sentinel keeps the drain order explicit.
SENTINEL = object()


async def parse_stage(src: asyncio.Queue, out: asyncio.Queue) -> None:
    """str -> Event, dropping malformed lines."""
    while True:
        line = await src.get()
        if line is SENTINEL:
            await out.put(SENTINEL)
            return
        parts = line.split(",")
        if len(parts) != 4:
            continue  # drop malformed
        try:
            event = Event(
                ts=float(parts[0]),
                type=parts[1],
                severity=parts[2],
                user_id=int(parts[3]),
            )
        except ValueError:
            continue  # drop unparseable
        await out.put(event)


async def filter_stage(src: asyncio.Queue, out: asyncio.Queue, min_sev: str) -> None:
    """Drop events below the severity threshold."""
    threshold = SEVERITY[min_sev]
    while True:
        event = await src.get()
        if event is SENTINEL:
            await out.put(SENTINEL)
            return
        if SEVERITY.get(event.severity, 0) >= threshold:
            await out.put(event)


# A tiny async "service" the enrich stage calls; note the latency.
async def lookup_region(user_id: int) -> str:
    await asyncio.sleep(0.03)  # simulated I/O; this is the slow stage
    return ("us", "eu", "apac", "sa", "af")[user_id % 5]

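async def enrich_stage(src: asyncio.Queue, out: asyncio.Queue) -> None:
    """Add a region by calling an async lookup service."""
    while True:
        event = await src.get()
        if event is SENTINEL:
            await out.put(SENTINEL)
            return
        event.region = await lookup_region(event.user_id)  # the await that costs
        await out.put(event)

enrich_stage is the slowest stage on purpose: every event pays a 30 ms lookup,
which caps it at about 33 events per second. Whenever events arrive faster than
that, the bounded queues push the delay all the way back to the source, so
memory stays flat. At the simulated rates (a raw line roughly every 35 ms, of
which about 70% survive parse and filter) enrich keeps up, so to watch
backpressure in action raise the lookup latency or speed up the source. Under
that load, removing maxsize lets the upstream queues grow without bound, which
is the classic incident.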
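Whether the enrich stage actually falls behind depends on arrival rate versus service rate. The following back-of-the-envelope sketch uses only the figures from the code above (the uniform(0.01, 0.06) sleep, every eighth line malformed, one of five severity slots being debug, and the 30 ms lookup). It is a rough estimate that ignores scheduling overhead.

```python
# Rough steady-state estimate; all figures come from the tutorial's code.
mean_gap = (0.01 + 0.06) / 2   # mean of random.uniform(0.01, 0.06), in seconds
raw_rate = 1 / mean_gap        # raw lines per second leaving the source
parse_pass = 7 / 8             # every 8th line is malformed and dropped
filter_pass = 4 / 5            # 1 of the 5 _SEVS slots is "debug" (below "info")
arrival_rate = raw_rate * parse_pass * filter_pass  # events/s reaching enrich

service_rate = 1 / 0.03        # enrich handles one 30 ms lookup at a time
utilization = arrival_rate / service_rate

print(
    f"arrival {arrival_rate:.1f}/s, capacity {service_rate:.1f}/s, "
    f"utilization {utilization:.0%}"
)
```

About 20 events per second arrive against a capacity of about 33, so the queues stay near empty at the default settings. That matches the roughly 40 events per 2-second window in the sample output. Utilization passes 100% once the lookup takes longer than about 50 ms, and from that point the bounded queues start throttling the source.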
The windowed aggregator
The aggregator is where streaming gets interesting. It keeps a tumbling
window: it accumulates events into the current window and, every window
seconds, emits a summary and starts a fresh window. We drive the window boundary
with asyncio.wait_for on the queue get() — if no event arrives before the
window deadline, the timeout fires and we flush.
@dataclass(slots=True)
class WindowSummary:
    started_at: float
    count: int
    by_type: dict[str, int] = field(default_factory=dict)
    by_region: dict[str, int] = field(default_factory=dict)


async def windowed_aggregator(src: asyncio.Queue, window: float) -> None:
    """Tumbling window: every `window` seconds, summarize what arrived."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + window
    by_type: Counter[str] = Counter()
    by_region: Counter[str] = Counter()
    count = 0
    window_start = loop.time()

    def flush() -> None:
        nonlocal count, window_start, deadline
        if count:
            summary = WindowSummary(
                started_at=window_start,
                count=count,
                by_type=dict(by_type),
                by_region=dict(by_region),
            )
            print(
                f"[window] {summary.count:3d} events | "
                f"types={summary.by_type} | regions={summary.by_region}"
            )
        by_type.clear()
        by_region.clear()
        count = 0
        window_start = loop.time()
        deadline = window_start + window

    while True:
        timeout = max(0.0, deadline - loop.time())
        try:
            event = await asyncio.wait_for(src.get(), timeout=timeout)
        except TimeoutError:
            flush()  # window elapsed with no new event
            continue

        if event is SENTINEL:
            flush()  # flush the final partial window
            return

        count += 1
        by_type[event.type] += 1
        by_region[event.region] += 1
        if loop.time() >= deadline:
            flush()  # window elapsed

This is a hand-rolled equivalent of a Kotlin Flow chunked-by-time, or RxJS
bufferTime followed by a scan/reduce. The nonlocal closure keeps the
window state without a class; the flush() is the only place that resets it,
which keeps the boundary logic in one spot.
Wiring it together with TaskGroup and graceful shutdown
main builds the bounded queues, then a TaskGroup supervises every stage. The
source-pump task is what turns the cold generator into a feed: it pulls from the
generator and puts into the first queue, and on shutdown it sends the sentinel
that cascades down the whole chain.
async def pump_source(out: asyncio.Queue, stop: asyncio.Event) -> None:
    """Drive the cold generator into the first queue until told to stop."""
    from contextlib import aclosing

    async with aclosing(event_source()) as stream:  # deterministic cleanup
        async for line in stream:
            if stop.is_set():
                break
            await out.put(line)  # backpressure happens here
    await out.put(SENTINEL)  # cascade the done-signal


async def run_pipeline() -> None:
    q_raw: asyncio.Queue = asyncio.Queue(maxsize=50)
    q_parsed: asyncio.Queue = asyncio.Queue(maxsize=50)
    q_filtered: asyncio.Queue = asyncio.Queue(maxsize=50)
    q_enriched: asyncio.Queue = asyncio.Queue(maxsize=50)
    stop = asyncio.Event()

    # Ctrl-C / SIGTERM set the stop event; the source then drains cleanly.
    # add_signal_handler is Unix-only; it raises NotImplementedError on Windows.
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, stop.set)

    print("[pipeline] running — Ctrl-C to stop")
    async with asyncio.TaskGroup() as tg:
        tg.create_task(pump_source(q_raw, stop))
        tg.create_task(parse_stage(q_raw, q_parsed))
        tg.create_task(filter_stage(q_parsed, q_filtered, min_sev="info"))
        tg.create_task(enrich_stage(q_filtered, q_enriched))
        tg.create_task(windowed_aggregator(q_enriched, window=2.0))
        # TaskGroup exits only when EVERY task has returned, i.e. the sentinel
        # has cascaded source -> parse -> filter -> enrich -> aggregator and each
        # stage returned. Clean, ordered shutdown with no leaked tasks.
    print("[pipeline] drained and stopped")


def main() -> None:
    try:
        asyncio.run(run_pipeline())
    except KeyboardInterrupt:
        pass  # already handled via the signal handler / stop event


if __name__ == "__main__":
    main()

The shutdown story is the part worth studying. Ctrl-C sets stop; pump_source
sees it, breaks its async for, the aclosing context closes the generator (its
finally prints [source] stopped), and then it puts a single SENTINEL. That
sentinel flows down the chain — each stage forwards it and returns — until the
aggregator gets it, flushes its final partial window, and returns. The
TaskGroup block exits only once every task has returned. No CancelledError
fireworks, no leaked tasks, no lost in-flight data. That ordered drain is the
difference between a toy and a pipeline you’d ship.
Run it
- Scaffold and enter the project (or copy the files above into this layout):

  uv init realtime-pipeline --package
  cd realtime-pipeline
  uv add --dev ruff ty

- Run the pipeline. It prints a window summary every ~2 seconds; let it run a few windows, then hit Ctrl-C and watch the ordered shutdown:

  uv run pipeline

  This assumes pyproject.toml exposes a pipeline command under [project.scripts] (for example pipeline = "pipeline.main:main"); the listing above does not declare one.

  You’ll see output like:

  [pipeline] running — Ctrl-C to stop
  [window]  41 events | types={'order': 12, 'payment': 9, ...} | regions={'us': 8, ...}
  [window]  39 events | types={'auth': 10, 'search': 11, ...} | regions={'eu': 9, ...}
  ^C[source] stopped
  [window]  18 events | types={...} | regions={...}
  [pipeline] drained and stopped

- Lint and type-check:

  uv run ruff check .
  uv run ty check

  (ty is the fast Astral type checker; swap in uv run mypy src if your team standardized on the mature option; the code is annotated for both.)