
Real-Time Processing Pipeline

Build a complete in-process real-time processing pipeline with nothing but asyncio: a simulated event source (an async generator) feeds a chain of transform stages (parse → filter → enrich), which feed a windowed aggregator that emits a rolling summary every few seconds. The stages are decoupled by bounded asyncio.Queues so a slow stage applies backpressure to a fast one, and the whole thing shuts down gracefully on Ctrl-C — every stage drains, every resource closes.

If you’ve built a Kotlin Flow pipeline or chained RxJS operators, this is the same shape. The difference: you’ll see the parts — the generator, the queues, the worker tasks — instead of them hiding behind operator names. That’s the point.

  • Async generators as a stream source (async def + yield).
  • Hand-built stream operators (parse, filter, enrich) as queue-connected coroutines, the explicit form of async generators that consume async generators.
  • asyncio.Queue(maxsize=...) for backpressure between stages.
  • A windowed aggregator — time-bucketed reduce, the bread-and-butter of real-time analytics.
  • asyncio.TaskGroup to supervise the stages.
  • Graceful shutdown: sentinel propagation, CancelledError handling, and finally/aclose() cleanup.
  1. The source emits simulated events forever (until cancelled), at a slightly irregular rate, as raw str lines (simulating a socket/log feed).
  2. parse turns each raw line into an Event, dropping malformed lines.
  3. filter drops events below a severity threshold.
  4. enrich adds a derived field (a region looked up from an async “service”, with a small simulated latency — this is the stage that benefits from buffering).
  5. The aggregator keeps a tumbling time window (e.g. 2s) and, when the window closes, emits a summary: count and per-type breakdown for that window.
  6. Stages are connected by bounded queues; the pipeline must apply backpressure, not grow memory without bound.
  7. Ctrl-C triggers a clean shutdown: the source stops, in-flight items drain, the final partial window flushes, and every task exits.
[Diagram: Real-time pipeline, stages decoupled by bounded queues (source → q_raw → parse → q_parsed → filter → q_filtered → enrich → q_enriched → aggregator)]

Each stage is a task that reads from its input queue, does its work, and writes to its output queue. The bounded queues are the backpressure: if enrich (slow, it awaits a lookup) falls behind, its input queue fills, which stalls filter, which stalls parse, which stalls the source. The whole chain self-throttles to the slowest stage — exactly like a buffered Go channel pipeline.
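To see that self-throttling in isolation, here is a minimal sketch, separate from the project code, with one fast producer and one slow consumer sharing a bounded queue. The names producer, consumer, and demo, and the use of None as an end marker, are assumptions made for this illustration only.

```python
import asyncio


async def producer(q: asyncio.Queue, n: int, depths: list[int]) -> None:
    """Fast producer: put() suspends whenever the queue is full."""
    for i in range(n):
        await q.put(i)            # waits here once maxsize items are queued
        depths.append(q.qsize())  # record how deep the queue got
    await q.put(None)             # None marks end-of-stream in this demo


async def consumer(q: asyncio.Queue, seen: list[int]) -> None:
    """Slow consumer: each item costs a little simulated I/O."""
    while (item := await q.get()) is not None:
        await asyncio.sleep(0.002)
        seen.append(item)


async def demo(maxsize: int = 5, n: int = 40) -> tuple[int, list[int]]:
    q: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
    depths: list[int] = []
    seen: list[int] = []
    await asyncio.gather(producer(q, n, depths), consumer(q, seen))
    return max(depths), seen


if __name__ == "__main__":
    deepest, seen = asyncio.run(demo())
    print(f"deepest queue: {deepest}, items delivered: {len(seen)}")
```

The producer never gets more than maxsize items ahead of the consumer, and every item still arrives in order. Nothing is dropped; the fast side simply waits.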

A single-module uv project. The whole pipeline lives in src/pipeline/main.py; the data model is a dataclass.

  • realtime-pipeline/
    • pyproject.toml (uv project, ruff + ty config)
    • src/
      • pipeline/
        • __init__.py
        • main.py (source, stages, aggregator, wiring, shutdown)

No runtime dependencies — asyncio is stdlib. The dev group carries ruff and ty. (If you wanted the operator-library route, this is where aiostream would go; we’re doing it by hand.)

pyproject.toml
[project]
name = "realtime-pipeline"
version = "0.1.0"
description = "An in-process async streaming pipeline with bounded queues"
requires-python = ">=3.13"
dependencies = []

# Console script behind `uv run pipeline`.
[project.scripts]
pipeline = "pipeline.main:main"

[dependency-groups]
dev = ["ruff", "ty"]

[build-system]
requires = ["uv_build>=0.8"]
build-backend = "uv_build"

# The module lives in src/pipeline/, not the default src/realtime_pipeline/.
[tool.uv.build-backend]
module-name = "pipeline"

[tool.ruff]
target-version = "py313"

[tool.ruff.lint]
select = ["E", "F", "I", "UP", "B", "ASYNC"]

[tool.ty.rules]
# ty is young; loosen as needed while it matures.

The source is an async generator that fabricates raw event lines forever. Real feeds are messy, so ~1 in 8 lines is deliberately malformed to exercise the parse stage’s error handling.

src/pipeline/main.py
from __future__ import annotations

import asyncio
import random
import signal
from collections import Counter
from collections.abc import AsyncIterator
from dataclasses import dataclass, field

# --- Data model -----------------------------------------------------------

SEVERITY = {"debug": 10, "info": 20, "warn": 30, "error": 40}


@dataclass(slots=True)
class Event:
    ts: float
    type: str  # e.g. "order", "auth", "payment"
    severity: str  # debug | info | warn | error
    user_id: int
    region: str = ""  # filled in by the enrich stage


# --- Stage 0: the source (async generator) --------------------------------

_TYPES = ("order", "auth", "payment", "search")
_SEVS = ("debug", "info", "info", "warn", "error")  # weighted toward info


async def event_source() -> AsyncIterator[str]:
    """Simulate a raw event feed: emit malformed-ish CSV lines forever."""
    seq = 0
    try:
        while True:
            await asyncio.sleep(random.uniform(0.01, 0.06))  # irregular rate
            seq += 1
            if seq % 8 == 0:
                yield "GARBAGE;not;a;valid;line"  # malformed
                continue
            ts = asyncio.get_running_loop().time()
            etype = random.choice(_TYPES)
            sev = random.choice(_SEVS)
            uid = random.randint(1, 5)
            yield f"{ts:.3f},{etype},{sev},{uid}"
    finally:
        print("[source] stopped")  # runs on close or cancel

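The finally is the source’s cleanup hook. When the consumer closes the generator with aclose(), GeneratorExit is raised at the yield where the generator is suspended; if the consuming task is cancelled while the generator is inside await asyncio.sleep, CancelledError is raised there instead. Either way, finally runs. A real source would close its socket here.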

Each stage is a coroutine that loops: read input queue → transform → write output queue, and propagates a sentinel when its input is exhausted so the next stage knows to stop. This is the explicit version of an async-generator chain; queues sit between stages so they run concurrently with backpressure.

src/pipeline/main.py
SENTINEL = object()  # the "no more items" signal (asyncio.Queue has no close())


async def parse_stage(src: asyncio.Queue, out: asyncio.Queue) -> None:
    """str -> Event, dropping malformed lines."""
    while True:
        line = await src.get()
        if line is SENTINEL:
            await out.put(SENTINEL)
            return
        parts = line.split(",")
        if len(parts) != 4:
            continue  # drop malformed
        try:
            event = Event(
                ts=float(parts[0]),
                type=parts[1],
                severity=parts[2],
                user_id=int(parts[3]),
            )
        except ValueError:
            continue  # drop unparseable
        await out.put(event)


async def filter_stage(src: asyncio.Queue, out: asyncio.Queue, min_sev: str) -> None:
    """Drop events below the severity threshold."""
    threshold = SEVERITY[min_sev]
    while True:
        event = await src.get()
        if event is SENTINEL:
            await out.put(SENTINEL)
            return
        if SEVERITY.get(event.severity, 0) >= threshold:
            await out.put(event)


# A tiny async "service" the enrich stage calls; note the latency.
async def lookup_region(user_id: int) -> str:
    await asyncio.sleep(0.03)  # simulated I/O: this is the slow stage
    return ("us", "eu", "apac", "sa", "af")[user_id % 5]


async def enrich_stage(src: asyncio.Queue, out: asyncio.Queue) -> None:
    """Add a region by calling an async lookup service."""
    while True:
        event = await src.get()
        if event is SENTINEL:
            await out.put(SENTINEL)
            return
        event.region = await lookup_region(event.user_id)  # the await that costs
        await out.put(event)

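enrich_stage is the slow stage on purpose: every event pays a 30 ms lookup, which caps it at roughly 33 events per second. At the default source rate only about 20 events per second reach it (about 28 lines per second from the source, minus the 1 in 8 malformed lines and the 1 in 5 debug events), so the queues stay nearly empty. Raise the lookup latency or speed up the source and the picture changes: q_filtered fills, and the bounded queues push that latency back through filter and parse to the source, so memory stays flat. Remove the maxsize in that overloaded state and watch q_filtered grow without bound: the classic incident.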
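For contrast with the queue-connected stages above, here is a sketch of the same parse and filter steps written as a plain async-generator chain. The function names lines, parse, keep_severity, and collect, and the tuple record type, are assumptions for this illustration and are not part of main.py.

```python
import asyncio
from collections.abc import AsyncIterator

Record = tuple[float, str, str, int]  # (ts, type, severity, user_id)


async def lines() -> AsyncIterator[str]:
    """A finite stand-in for event_source."""
    for raw in (
        "1.0,order,info,1",
        "GARBAGE;not;a;valid;line",
        "2.0,auth,debug,2",
        "3.0,payment,error,3",
    ):
        await asyncio.sleep(0)
        yield raw


async def parse(src: AsyncIterator[str]) -> AsyncIterator[Record]:
    async for line in src:
        parts = line.split(",")
        if len(parts) != 4:
            continue  # drop malformed
        try:
            record = (float(parts[0]), parts[1], parts[2], int(parts[3]))
        except ValueError:
            continue  # drop unparseable
        yield record


async def keep_severity(
    src: AsyncIterator[Record], allowed: set[str]
) -> AsyncIterator[Record]:
    async for record in src:
        if record[2] in allowed:
            yield record


async def collect() -> list[str]:
    chain = keep_severity(parse(lines()), {"info", "warn", "error"})
    return [etype async for _, etype, _, _ in chain]


if __name__ == "__main__":
    print(asyncio.run(collect()))
```

This form is shorter, but it is pull-based: each stage only advances when the consumer asks for the next item, so nothing is buffered and no two stages make progress at the same time. The queues are what buy concurrency between stages.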

The aggregator is where streaming gets interesting. It keeps a tumbling window: it accumulates events into the current window and, every window seconds, emits a summary and starts a fresh window. We drive the window boundary with asyncio.wait_for on the queue get() — if no event arrives before the window deadline, the timeout fires and we flush.

src/pipeline/main.py
@dataclass(slots=True)
class WindowSummary:
    started_at: float
    count: int
    by_type: dict[str, int] = field(default_factory=dict)
    by_region: dict[str, int] = field(default_factory=dict)


async def windowed_aggregator(src: asyncio.Queue, window: float) -> None:
    """Tumbling window: every `window` seconds, summarize what arrived."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + window
    by_type: Counter[str] = Counter()
    by_region: Counter[str] = Counter()
    count = 0
    window_start = loop.time()

    def flush() -> None:
        nonlocal count, window_start, deadline
        if count:
            summary = WindowSummary(
                started_at=window_start,
                count=count,
                by_type=dict(by_type),
                by_region=dict(by_region),
            )
            print(
                f"[window] {summary.count:3d} events | "
                f"types={summary.by_type} | regions={summary.by_region}"
            )
        # Always start a fresh window, even if the old one was empty.
        by_type.clear()
        by_region.clear()
        count = 0
        window_start = loop.time()
        deadline = window_start + window

    while True:
        timeout = max(0.0, deadline - loop.time())
        try:
            event = await asyncio.wait_for(src.get(), timeout=timeout)
        except TimeoutError:
            flush()  # window elapsed with no new event
            continue
        if event is SENTINEL:
            flush()  # flush the final partial window
            return
        count += 1
        by_type[event.type] += 1
        by_region[event.region] += 1
        if loop.time() >= deadline:
            flush()  # window elapsed

This is a hand-rolled equivalent of a Kotlin Flow chunked-by-time, or RxJS bufferTime followed by a scan/reduce. The nonlocal closure keeps the window state without a class; the flush() is the only place that resets it, which keeps the boundary logic in one spot.
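One way to convince yourself that the boundary logic behaves is to run a stripped-down copy with a short window and a scripted feed. The sketch below is such a harness, not the project code: count_windows, DONE, and late_events are hypothetical names, and it returns per-window counts instead of printing summaries.

```python
import asyncio

DONE = object()  # end-of-stream marker, playing the role of SENTINEL


async def count_windows(src: asyncio.Queue, window: float) -> list[int]:
    """Return the event count of each non-empty window, in order."""
    loop = asyncio.get_running_loop()
    counts: list[int] = []
    current = 0
    deadline = loop.time() + window

    def flush() -> None:
        nonlocal current, deadline
        if current:
            counts.append(current)
        current = 0
        deadline = loop.time() + window

    while True:
        timeout = max(0.0, deadline - loop.time())
        try:
            item = await asyncio.wait_for(src.get(), timeout=timeout)
        except asyncio.TimeoutError:  # alias of TimeoutError on Python 3.11+
            flush()  # quiet stream: the timeout closes the window
            continue
        if item is DONE:
            flush()  # final partial window
            return counts
        current += 1
        if loop.time() >= deadline:
            flush()  # busy stream: close the window on arrival


async def demo() -> list[int]:
    q: asyncio.Queue = asyncio.Queue()
    for i in range(3):
        q.put_nowait(i)  # a burst that lands in the first window

    async def late_events() -> None:
        await asyncio.sleep(0.25)  # stay silent for a couple of 0.1 s windows
        q.put_nowait("late-1")
        q.put_nowait("late-2")
        q.put_nowait(DONE)

    feeder = asyncio.create_task(late_events())
    counts = await count_windows(q, window=0.1)
    await feeder
    return counts


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The burst should come back as one window and the late pair as a later one. Empty windows in between produce no entry, which mirrors the `if count:` guard in flush().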

Wiring it together with TaskGroup and graceful shutdown


main builds the bounded queues, then a TaskGroup supervises every stage. The source-pump task is what turns the cold generator into a feed: it pulls from the generator and puts into the first queue, and on shutdown it sends the sentinel that cascades down the whole chain.

src/pipeline/main.py
async def pump_source(out: asyncio.Queue, stop: asyncio.Event) -> None:
    """Drive the cold generator into the first queue until told to stop."""
    from contextlib import aclosing

    async with aclosing(event_source()) as stream:  # deterministic cleanup
        async for line in stream:
            if stop.is_set():
                break
            await out.put(line)  # backpressure happens here
    await out.put(SENTINEL)  # cascade the done-signal


async def run_pipeline() -> None:
    q_raw: asyncio.Queue = asyncio.Queue(maxsize=50)
    q_parsed: asyncio.Queue = asyncio.Queue(maxsize=50)
    q_filtered: asyncio.Queue = asyncio.Queue(maxsize=50)
    q_enriched: asyncio.Queue = asyncio.Queue(maxsize=50)
    stop = asyncio.Event()

    # Ctrl-C / SIGTERM set the stop event; the source then drains cleanly.
    # Unix only: add_signal_handler is not implemented on Windows event loops.
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, stop.set)

    print("[pipeline] running — Ctrl-C to stop")
    async with asyncio.TaskGroup() as tg:
        tg.create_task(pump_source(q_raw, stop))
        tg.create_task(parse_stage(q_raw, q_parsed))
        tg.create_task(filter_stage(q_parsed, q_filtered, min_sev="info"))
        tg.create_task(enrich_stage(q_filtered, q_enriched))
        tg.create_task(windowed_aggregator(q_enriched, window=2.0))
    # TaskGroup exits only when EVERY task has returned, i.e. the sentinel
    # has cascaded source -> parse -> filter -> enrich -> aggregator and each
    # stage returned. Clean, ordered shutdown with no leaked tasks.
    print("[pipeline] drained and stopped")


def main() -> None:
    try:
        asyncio.run(run_pipeline())
    except KeyboardInterrupt:
        # SIGINT normally goes to the handler above; this only covers a
        # Ctrl-C that lands before the handler is installed.
        pass


if __name__ == "__main__":
    main()

The shutdown story is the part worth studying. Ctrl-C sets stop; pump_source sees it when the generator yields its next line, breaks its async for (dropping that one line), the aclosing context closes the generator (its finally prints [source] stopped), and then it puts a single SENTINEL. That sentinel flows down the chain, with each stage forwarding it and returning, until the aggregator gets it, flushes its final partial window, and returns. The TaskGroup block exits only once every task has returned. No CancelledError fireworks, no leaked tasks, and nothing already queued is lost. That ordered drain is the difference between a toy and a pipeline you’d ship.

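  1. Scaffold and enter the project, then replace the generated pyproject.toml and src/ contents with the files above (uv init --package names the module after the project, realtime_pipeline, rather than pipeline):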

    Terminal window
    uv init realtime-pipeline --package
    cd realtime-pipeline
    uv add --dev ruff ty
  2. Run the pipeline. It prints a window summary every ~2 seconds; let it run a few windows, then hit Ctrl-C and watch the ordered shutdown:

    Terminal window
    uv run pipeline

    You’ll see output like:

    [pipeline] running — Ctrl-C to stop
    [window] 41 events | types={'order': 12, 'payment': 9, ...} | regions={'us': 8, ...}
    [window] 39 events | types={'auth': 10, 'search': 11, ...} | regions={'eu': 9, ...}
    ^C[source] stopped
    [window] 18 events | types={...} | regions={...}
    [pipeline] drained and stopped
  3. Lint and type-check:

    Terminal window
    uv run ruff check .
    uv run ty check

    (ty is the fast Astral type checker. If your team standardized on the mature option, run uv run --with mypy mypy src instead, since mypy is not in the dev group; the code is annotated for both.)