Streams & Background Work
Module 05 gave you async/await,
TaskGroup, timeouts, and cancellation — the tools for running a finite set of
concurrent operations and waiting for them. This module is about streams:
data that arrives over time, one item at a time, possibly forever — and the
background work that processes it.
If you’ve reached for RxJS Observables, Kotlin Flow, or a Go channel pipeline,
you already have the mental model. Python’s primitive is humbler and more
built-in than any of those: a function with yield in it. An async generator
is a lazy, backpressure-aware, cancellable async stream, and async for is how
you consume one. There’s no library to import — it’s syntax.
The mental model
| Concept | TypeScript | Go | Python |
|---|---|---|---|
| Lazy async stream | AsyncIterable / RxJS Observable | <-chan T + goroutine | async generator (async def + yield) |
| Consume it | for await (const x of s) | for x := range ch | async for x in s |
| Transform | .pipe(map(...)) | goroutine reads/writes channels | async for in another async gen |
| Buffer / backpressure | RxJS bufferCount, push-based | buffered channel (make(chan T, n)) | asyncio.Queue(maxsize=n) |
| Fan-out / fan-in | merge, mergeMap | multiple goroutines + one channel | multiple tasks + one Queue |
| Background worker | setInterval, detached promise | go worker() | asyncio.create_task / TaskGroup |
| Cancellation | AbortController / unsubscribe() | close(ch) / ctx.Done() | cancel the task; GeneratorExit |
| Cleanup | finally in the generator | defer close(ch) | finally / aclose() |
Two distinct shapes hide in that table, and the whole module hinges on telling them apart:
- Pull-based streams (async generators): cold. Nothing runs until a consumer pulls with `async for`. The consumer sets the pace; the producer suspends at `yield` until asked for the next value. This is Kotlin `Flow` and RxJS cold Observables.
- Push-based queues (`asyncio.Queue`): hot. A producer task runs on its own and pushes items in; a consumer task pulls them out. The queue’s `maxsize` is your backpressure knob. This is a Go channel.
You build pipelines out of async generators, and you decouple producers from consumers (fan-out, fan-in, hot sources) with queues. Let’s build both.
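To make the pull/push distinction concrete, here is a minimal sketch that records the order of events for each shape. The names `pull_source`, `push_producer`, `demo_pull`, and `demo_push` are illustrative and not part of any library.

```python
import asyncio
from collections.abc import AsyncIterator


async def pull_source(log: list[str]) -> AsyncIterator[int]:
    """Cold: the body only advances when a consumer asks for the next value."""
    for i in range(2):
        log.append(f"produce {i}")
        yield i


async def push_producer(queue: asyncio.Queue, log: list[str]) -> None:
    """Hot: runs as its own task and pushes without waiting to be asked."""
    for i in range(2):
        log.append(f"produce {i}")
        await queue.put(i)


async def demo_pull() -> list[str]:
    log: list[str] = []
    gen = pull_source(log)  # creating the generator runs none of its body
    log.append("created")
    async for x in gen:
        log.append(f"consume {x}")
    return log


async def demo_push() -> list[str]:
    log: list[str] = []
    queue: asyncio.Queue = asyncio.Queue(maxsize=2)
    await asyncio.create_task(push_producer(queue, log))
    log.append("producer done")
    while not queue.empty():
        log.append(f"consume {queue.get_nowait()}")
    return log
```

In the pull version, production and consumption interleave one item at a time. In the push version the producer runs to completion before anything is read, which is possible here only because the queue has room for both items.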
Async generators: lazy async streams
A regular generator yields values lazily and synchronously. Make the function
async def and you can await between yields — now it’s an async generator,
and you consume it with async for.
TypeScript:

    // Async generator: async function* + yield
    async function* ticker(n: number): AsyncGenerator<number> {
      for (let i = 1; i <= n; i++) {
        await sleep(100); // await between yields
        yield i;
      }
    }

    for await (const x of ticker(3)) {
      console.log("got", x);
    }

Go:

    // Go has no generators: you model a stream as a channel + goroutine.
    func ticker(ctx context.Context, n int) <-chan int {
        ch := make(chan int)
        go func() {
            defer close(ch) // signals "done" to the range loop
            for i := 1; i <= n; i++ {
                select {
                case <-time.After(100 * time.Millisecond):
                case <-ctx.Done():
                    return // cancellation
                }
                ch <- i
            }
        }()
        return ch
    }

    for x := range ticker(ctx, 3) {
        fmt.Println("got", x)
    }

Python:

    import asyncio
    from collections.abc import AsyncIterator

    async def ticker(n: int) -> AsyncIterator[int]:
        for i in range(1, n + 1):
            await asyncio.sleep(0.1)  # await between yields
            yield i

    async def main() -> None:
        async for x in ticker(3):
            print("got", x)

    asyncio.run(main())

Key differences:
- TS: `async function*` + `for await`. Same idea, near-identical syntax.
- Go: no generators at all; you fake a stream with a channel and a goroutine. The goroutine is hot (runs immediately); `close(ch)` is your “stream done”; cancellation is manual via `ctx`.
- Python: `async def` with `yield` makes the function an async generator. It’s cold: calling `ticker(3)` runs nothing; the body only advances when `async for` pulls. Cancellation needs no extra plumbing: cancel the consuming task while it waits on the generator and `CancelledError` is raised inside the generator at its current `await`; a generator abandoned at a `yield` gets `GeneratorExit` when it is closed.
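A small sketch of that cancellation path, assuming the consumer is cancelled while it is waiting for the next value. The names `ticks`, `consume`, and `demo_cancel` are illustrative; the point is that the generator's `finally` runs before the consumer's task finishes cancelling.

```python
import asyncio
from collections.abc import AsyncIterator


async def ticks(events: list) -> AsyncIterator[int]:
    i = 0
    try:
        while True:
            await asyncio.sleep(0.01)  # the consumer's task is suspended here
            yield i
            i += 1
    finally:
        events.append("generator cleanup")


async def consume(events: list) -> None:
    async for x in ticks(events):
        events.append(x)


async def demo_cancel() -> list:
    events: list = []
    task = asyncio.create_task(consume(events))
    await asyncio.sleep(0.05)  # let a few values through
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        events.append("consumer cancelled")
    return events
```

How many integers land in `events` depends on timing, but the last two entries are always the generator's cleanup followed by the consumer's cancellation.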
Cold, like Kotlin Flow
Just like a Kotlin Flow or an RxJS cold Observable, an async generator does
nothing until consumed. The generator object itself is single-use, though: to run the stream again you call the function again, which starts from scratch:
    async def source() -> AsyncIterator[int]:
        print("source started")  # only prints when consumed
        for i in range(3):
            yield i

    async def main() -> None:
        gen = source()  # prints nothing
        print("created, nothing ran yet")

        async for x in gen:  # NOW "source started" prints
            print("first pass", x)

        # A generator object is single-use: exhausted after one pass.
        # To run again, call source() again for a fresh generator.
        async for x in source():
            print("second pass", x)

Async comprehensions
Comprehensions go async too: use async for inside them. Handy for draining a
stream into a list, or building one stream from another inline.
    # These lines assume they run inside an async function.

    # Drain an async stream into a list (terminal, like Flow.toList())
    nums = [x async for x in ticker(5)]

    # Async set / dict comprehensions work the same way
    seen = {x async for x in ticker(5)}
    by_id = {item.id: item async for item in fetch_items()}

    # A generator expression that is itself an async generator:
    squared = (x * x async for x in ticker(5))
    async for y in squared:
        print(y)

    # You can mix a sync condition into an async comprehension:
    evens = [x async for x in ticker(10) if x % 2 == 0]

The list comprehension form `[... async for ...]` is the idiomatic Python
equivalent of `flow.toList()` or RxJS `lastValueFrom(source$.pipe(toArray()))`: it runs the
cold stream to completion and collects the results. Use it only on bounded
streams; an infinite generator will hang it forever.
Building stream operators by hand
Kotlin gives you .map, .filter, .buffer, .debounce as built-in Flow
operators. RxJS gives you a hundred of them. Python’s stdlib gives you… async for and yield. That’s deliberate: an operator is just an async generator that
consumes another async generator. You write the handful you need, and they
compose exactly like a Unix pipe.
map and filter
TypeScript (RxJS):

    source$.pipe(
      map(x => x * 2),
      filter(x => x > 5),
    );

Go:

    // Each operator is a goroutine: read from in, write to out.
    func mapStage(in <-chan int, f func(int) int) <-chan int {
        out := make(chan int)
        go func() {
            defer close(out)
            for x := range in {
                out <- f(x)
            }
        }()
        return out
    }

Python:

    from collections.abc import AsyncIterator, Callable

    async def amap[T, R](
        src: AsyncIterator[T],
        fn: Callable[[T], R],
    ) -> AsyncIterator[R]:
        async for x in src:
            yield fn(x)

    async def afilter[T](
        src: AsyncIterator[T],
        pred: Callable[[T], bool],
    ) -> AsyncIterator[T]:
        async for x in src:
            if pred(x):
                yield x

Note the PEP 695 generics (`def amap[T, R](...)`, Python 3.12+): no `TypeVar` boilerplate.
Each operator takes a source stream and returns a new stream; nothing executes
until the final async for pulls. That’s the cold, pull-driven model paying off:
afilter only asks amap for a value when its own consumer asks it for one.
Composing into a pipeline
Because each operator returns an AsyncIterator, you nest them: the innermost
call is the source, each wrapping call is a stage:
    async def main() -> None:
        pipeline = afilter(
            amap(ticker(10), lambda x: x * 2),
            lambda x: x > 5,
        )
        async for x in pipeline:
            print(x)  # 6, 8, 10, 12, ... (skips 2, 4)

Nested calls read inside-out, which gets ugly fast. A tiny pipe helper flips it
to read top-to-bottom, like a Kotlin operator chain or a .pipe() in RxJS:
    from collections.abc import AsyncIterator, Callable
    from functools import reduce

    type Stage[T, R] = Callable[[AsyncIterator[T]], AsyncIterator[R]]

    def pipe[T](src: AsyncIterator[T], *stages: Stage) -> AsyncIterator:
        """Thread a source through a series of stage factories, left to right."""
        return reduce(lambda acc, stage: stage(acc), stages, src)

    async def main() -> None:
        result = pipe(
            ticker(10),
            lambda s: amap(s, lambda x: x * 2),
            lambda s: afilter(s, lambda x: x > 5),
        )
        async for x in result:
            print(x)

Diagram (Mermaid):

    flowchart LR
        S["ticker(10)"] -->|"async for"| M["amap: x*2"]
        M -->|"async for"| F["afilter: x>5"]
        F -->|"async for"| C["consumer / list comp"]
        C -.->|"pulls next"| F
        F -.->|"pulls next"| M
        M -.->|"pulls next"| S
The dashed arrows are the point: demand flows backwards. The consumer pulls, which pulls the filter, which pulls the map, which pulls the source. No item is produced until someone downstream wants it — backpressure for free.
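The backwards flow of demand is easy to observe by logging inside each stage. This is a sketch with illustrative names (`numbers`, `traced_map`, `traced_filter`, `run_trace`); the stages mirror the map and filter operators but record what they do.

```python
import asyncio
from collections.abc import AsyncIterator, Callable


async def numbers(n: int, trace: list[str]) -> AsyncIterator[int]:
    for i in range(1, n + 1):
        trace.append(f"source -> {i}")
        yield i


async def traced_map(
    src: AsyncIterator[int], fn: Callable[[int], int], trace: list[str]
) -> AsyncIterator[int]:
    async for x in src:
        trace.append(f"map {x}")
        yield fn(x)


async def traced_filter(
    src: AsyncIterator[int], pred: Callable[[int], bool], trace: list[str]
) -> AsyncIterator[int]:
    async for x in src:
        keep = pred(x)
        trace.append(f"filter {x}: {'keep' if keep else 'drop'}")
        if keep:
            yield x


async def run_trace() -> list[str]:
    trace: list[str] = []
    doubled = traced_map(numbers(3, trace), lambda x: x * 2, trace)
    pipeline = traced_filter(doubled, lambda x: x > 2, trace)
    async for x in pipeline:
        trace.append(f"consumer got {x}")
    return trace
```

The trace shows each item travelling through every stage before the source is asked for the next one: no stage runs ahead of the consumer.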
batch (chunked)
Grouping a stream into fixed-size batches is the single most useful custom
operator for backends — you batch rows before a bulk INSERT, events before a
flush. Kotlin calls it chunked; RxJS calls it bufferCount.
    from collections.abc import AsyncIterator

    async def batch[T](src: AsyncIterator[T], size: int) -> AsyncIterator[list[T]]:
        buf: list[T] = []
        async for x in src:
            buf.append(x)
            if len(buf) >= size:
                yield buf
                buf = []
        if buf:  # flush the partial final batch
            yield buf

    async def main() -> None:
        async for chunk in batch(ticker(25), 10):
            print(f"batch of {len(chunk)}: {chunk}")
        # batch of 10: [1..10]
        # batch of 10: [11..20]
        # batch of 5: [21..25]

throttle and debounce
These are time-based, and this is where people usually reach for a library, but they’re small. Throttle emits at most once per interval (rate limiting a firehose); debounce waits for a quiet gap before emitting the latest value (search-as-you-type).
    import asyncio
    from collections.abc import AsyncIterator

    _DONE = object()  # module-level sentinel: the source is exhausted

    async def throttle[T](src: AsyncIterator[T], interval: float) -> AsyncIterator[T]:
        """Emit the first value, then at most one value per `interval` seconds."""
        loop = asyncio.get_running_loop()
        last: float | None = None  # time of the last emitted value
        async for x in src:
            now = loop.time()
            if last is None or now - last >= interval:
                last = now
                yield x

    async def debounce[T](src: AsyncIterator[T], delay: float) -> AsyncIterator[T]:
        """Emit a value only after `delay` seconds of silence; keep the latest."""
        queue: asyncio.Queue[T | object] = asyncio.Queue()

        async def feed() -> None:
            try:
                async for x in src:
                    await queue.put(x)
            finally:
                queue.put_nowait(_DONE)  # always wake the consumer, even if src fails

        task = asyncio.create_task(feed())
        try:
            pending: T | None = None  # assumes the stream never yields None
            while True:
                if pending is None:
                    item = await queue.get()  # block for first item
                else:
                    try:
                        item = await asyncio.wait_for(queue.get(), timeout=delay)
                    except TimeoutError:
                        yield pending  # quiet gap: emit latest
                        pending = None
                        continue
                if item is _DONE:
                    if pending is not None:
                        yield pending
                    await task  # re-raise an error from the source, if any
                    return
                pending = item
        finally:
            task.cancel()

Debounce is the first operator that needs an internal task and queue, because it
has to consume ahead of what it emits (to know when the source went quiet). It
also shows the cleanup discipline this module keeps hammering: the finally
cancels the feeder task no matter how the consumer leaves.
merge: fan-in of multiple streams
Merging interleaves several streams into one as values arrive: RxJS merge,
Kotlin merge, the Go fan-in pattern. You can’t async for two generators at
once, so you spin up a task per source feeding a shared queue:
    import asyncio
    from collections.abc import AsyncIterator

    async def merge[T](*sources: AsyncIterator[T]) -> AsyncIterator[T]:
        done = object()  # sentinel: every source has finished
        queue: asyncio.Queue[T | object] = asyncio.Queue()

        async def drain(src: AsyncIterator[T]) -> None:
            async for x in src:
                await queue.put(x)

        async def run_all() -> None:
            try:
                async with asyncio.TaskGroup() as tg:
                    for src in sources:
                        tg.create_task(drain(src))
            finally:
                # Always wake the consumer, even if a source failed;
                # otherwise it would wait on queue.get() forever.
                queue.put_nowait(done)

        runner = asyncio.create_task(run_all())
        try:
            while True:
                item = await queue.get()
                if item is done:
                    await runner  # re-raise any source failure
                    return
                yield item
        finally:
            runner.cancel()

This is the bridge from cold pull-based generators to hot push-based queues, which is exactly the next section.
Backpressure with asyncio.Queue
An async generator’s backpressure is implicit: the producer can’t get ahead
because it’s suspended at yield. But sometimes you want the producer to run
ahead — to decouple a bursty source from a steady consumer, or to fan work out to
several workers. That’s what asyncio.Queue is for, and it’s the direct analog
of a buffered Go channel.
The single most important parameter is maxsize. A bounded queue (maxsize > 0)
gives you backpressure: when the queue is full, await queue.put(x) suspends
the producer until a consumer makes room. An unbounded queue (maxsize=0,
the default) will happily grow until you run out of memory if the producer
outpaces the consumer — a real production incident, not a hypothetical.
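A small sketch of what “suspends the producer” means in practice, assuming a queue of size 2. `demo_bounded_put` is an illustrative name; the `asyncio.Queue` calls are the standard ones.

```python
import asyncio


async def demo_bounded_put() -> list[str]:
    notes: list[str] = []
    queue: asyncio.Queue[int] = asyncio.Queue(maxsize=2)
    queue.put_nowait(1)
    queue.put_nowait(2)  # the queue is now full

    # The non-blocking variant fails loudly when there is no room...
    try:
        queue.put_nowait(3)
    except asyncio.QueueFull:
        notes.append("put_nowait raised QueueFull")

    # ...while the awaitable variant parks the producer until room appears.
    blocked = asyncio.create_task(queue.put(3))
    await asyncio.sleep(0)  # give the put() a chance to run
    notes.append(f"put finished while full: {blocked.done()}")

    queue.get_nowait()  # a consumer makes room
    await blocked       # the parked put() can now complete
    notes.append(f"size after room was made: {queue.qsize()}")
    return notes
```

The suspended `put` is the whole mechanism: the producer cannot outrun the consumer by more than `maxsize` items.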
TypeScript:

    // Node has no built-in bounded async queue. You either use a library
    // (e.g. async generators with a semaphore) or a stream with highWaterMark:
    import { Readable } from "node:stream";
    const stream = new Readable({ highWaterMark: 100 /* backpressure */ });

Go:

    // A bounded buffered channel IS the queue. Send blocks when full.
    ch := make(chan Job, 100) // maxsize = 100

    go func() {
        for _, job := range jobs {
            ch <- job // blocks when the buffer is full → backpressure
        }
        close(ch)
    }()

    for job := range ch { // consumer
        process(job)
    }

Python:

    import asyncio

    queue: asyncio.Queue[Job] = asyncio.Queue(maxsize=100)  # bounded

    async def producer() -> None:
        for job in jobs:
            await queue.put(job)  # suspends when full → backpressure
        await queue.put(SENTINEL)  # tell the consumer we're done

    async def consumer() -> None:
        while True:
            job = await queue.get()
            if job is SENTINEL:
                break
            await process(job)
            queue.task_done()

Key differences:
- Go: the buffered channel is the bounded queue, and `close(ch)` cleanly signals “no more values” to every `range` loop. Idiomatic and complete.
- Python: `asyncio.Queue(maxsize=n)` mirrors the buffered channel, but it has no `close()`: before Python 3.13 there is no built-in “done” signal (3.13 adds `Queue.shutdown()`). You signal completion yourself, usually with a sentinel object, or with `join()` / `task_done()` bookkeeping. This trips up Go developers constantly.
- TS: no first-class bounded async queue in the stdlib; you lean on Node streams’ `highWaterMark` or a library.
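One way to hide the sentinel from consumers is to wrap the queue in an async generator, which gives you something close to Go's `for x := range ch`. This is a sketch for a single consumer; `iter_queue`, `produce`, and `_CLOSED` are illustrative names, not stdlib APIs.

```python
import asyncio
from collections.abc import AsyncIterator, Iterable

_CLOSED = object()  # private sentinel meaning "the producer is finished"


async def iter_queue(queue: asyncio.Queue) -> AsyncIterator[object]:
    """Yield items from `queue` until the producer sends _CLOSED."""
    while True:
        item = await queue.get()
        if item is _CLOSED:
            return
        yield item


async def produce(queue: asyncio.Queue, items: Iterable[object]) -> None:
    for item in items:
        await queue.put(item)  # suspends while the queue is full
    await queue.put(_CLOSED)   # the hand-rolled equivalent of close(ch)


async def demo_iter_queue() -> list[object]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=2)
    producer = asyncio.create_task(produce(queue, ["a", "b", "c"]))
    received = [item async for item in iter_queue(queue)]
    await producer
    return received
```

With several consumers this wrapper is not enough on its own, because only one of them would see the single sentinel.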
Producer / consumer pipeline
Here’s the full bounded producer/consumer, supervised by a TaskGroup so a
crash in either side tears down both cleanly:
    import asyncio

    SENTINEL = object()

    async def producer(queue: asyncio.Queue[object]) -> None:
        for i in range(20):
            await asyncio.sleep(0.05)  # bursty source
            await queue.put(i)
            print(f" produced {i} (queue size {queue.qsize()})")
        await queue.put(SENTINEL)

    async def consumer(queue: asyncio.Queue[object]) -> None:
        while True:
            item = await queue.get()
            if item is SENTINEL:
                queue.task_done()
                return
            await asyncio.sleep(0.2)  # slow consumer
            print(f"consumed {item}")
            queue.task_done()

    async def main() -> None:
        queue: asyncio.Queue[object] = asyncio.Queue(maxsize=5)
        async with asyncio.TaskGroup() as tg:
            tg.create_task(producer(queue))
            tg.create_task(consumer(queue))

    asyncio.run(main())

Watch the output: the producer races ahead until queue.qsize() hits 5, then
stalls — await queue.put(...) is suspended — until the slow consumer drains
one. That stall is backpressure working. Set maxsize to 0 (unbounded) and the producer
never stalls: it finishes all 20 items in about a second while the consumer needs about
four, so the queue balloons to roughly 15 items. This is the lesson; feel it by changing the number.
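To see the effect as a number rather than in log output, the sketch below measures the deepest the queue ever gets. It is a simplified variant of the pipeline above (no real sleeps, so it runs instantly); `peak_depth` and `_STOP` are illustrative names.

```python
import asyncio

_STOP = object()


async def peak_depth(maxsize: int, n_items: int = 20) -> int:
    """Run a fast producer against a slower consumer; return the peak queue size."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
    peak = 0

    async def producer() -> None:
        nonlocal peak
        for i in range(n_items):
            await queue.put(i)  # suspends only when a bounded queue is full
            peak = max(peak, queue.qsize())
        await queue.put(_STOP)

    async def consumer() -> None:
        while True:
            item = await queue.get()
            if item is _STOP:
                return
            await asyncio.sleep(0)  # stand-in for work: yields to the event loop

    await asyncio.gather(producer(), consumer())
    return peak
```

With `maxsize=5` the peak is capped at 5; with `maxsize=0` the producer never suspends and all 20 items pile up before the consumer handles its first.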
Fan-out / fan-in
To go faster, fan the work out to N consumer tasks pulling from one queue, then
fan their results back into a results queue. This is a goroutine worker pool, or
RxJS mergeMap with a concurrency limit.
    import asyncio

    SENTINEL = object()

    async def worker(name: str, jobs: asyncio.Queue, results: asyncio.Queue) -> None:
        while True:
            job = await jobs.get()
            if job is SENTINEL:
                jobs.task_done()
                return
            await asyncio.sleep(0.1)  # simulate work
            await results.put(f"{name} did job {job}")
            jobs.task_done()

    async def main() -> None:
        jobs: asyncio.Queue = asyncio.Queue(maxsize=10)
        results: asyncio.Queue = asyncio.Queue()
        n_workers = 4

        async with asyncio.TaskGroup() as tg:
            # Fan-out: N workers share the jobs queue.
            for i in range(n_workers):
                tg.create_task(worker(f"w{i}", jobs, results))

            # Feed jobs, then one sentinel per worker so each one exits.
            async def feed() -> None:
                for j in range(20):
                    await jobs.put(j)
                for _ in range(n_workers):
                    await jobs.put(SENTINEL)
            tg.create_task(feed())

            # Fan-in: collect exactly 20 results.
            async def collect() -> None:
                for _ in range(20):
                    print(await results.get())
            tg.create_task(collect())

    asyncio.run(main())

Diagram (Mermaid):

    flowchart LR
        P["producer / feed"] --> Q["jobs Queue<br/>maxsize=10"]
        Q --> W0["worker w0"]
        Q --> W1["worker w1"]
        Q --> W2["worker w2"]
        Q --> W3["worker w3"]
        W0 --> R["results Queue"]
        W1 --> R
        W2 --> R
        W3 --> R
        R --> C["collector"]
The “one sentinel per worker” detail is the Python tax for the missing
close(): each worker consumes exactly one sentinel and exits, so you send N of
them. (An alternative when you can’t count sentinels up front: await jobs.join(), then
cancel the worker tasks. Inside a TaskGroup, though, sentinels let every
task finish on its own, which is what the group’s exit waits for.)
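For comparison, here is a sketch of the `join()`-then-cancel alternative outside a TaskGroup. The names `worker` and `run_pool` and the squaring "work" are illustrative; the pattern relies on every `get()` being matched by a `task_done()`.

```python
import asyncio
from collections.abc import Iterable


async def worker(jobs: asyncio.Queue, results: list[int]) -> None:
    while True:
        job = await jobs.get()  # cancellation lands here once the queue is drained
        try:
            await asyncio.sleep(0)     # stand-in for real work
            results.append(job * job)
        finally:
            jobs.task_done()           # always balance the get(), even on error


async def run_pool(items: Iterable[int], n_workers: int) -> list[int]:
    jobs: asyncio.Queue = asyncio.Queue(maxsize=4)
    results: list[int] = []
    workers = [asyncio.create_task(worker(jobs, results)) for _ in range(n_workers)]

    for item in items:
        await jobs.put(item)           # bounded: suspends while workers are busy

    await jobs.join()                  # returns once every job was marked done
    for w in workers:
        w.cancel()                     # no sentinels: stop the idle workers
    await asyncio.gather(*workers, return_exceptions=True)
    return results
```

The trade-off is that workers end by cancellation rather than by returning, so any cleanup they need belongs in a `finally`.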
Long-running background tasks
A stream pipeline is one kind of background work. The other is the detached worker: a task you kick off that runs alongside your request handlers (a metrics flusher, a cache warmer, a queue drainer). This is where Python’s footguns live.
create_task and the “fire and forget” trap
TypeScript:

    // "Fire and forget": the floating-promise footgun.
    doBackgroundWork(); // ⚠️ unhandled rejection if it throws

    // Better: keep the reference and attach a handler.
    const p = doBackgroundWork();
    p.catch(err => log.error(err));

Go:

    // A goroutine is fire-and-forget by design.
    go backgroundWork() // if it panics, it can crash the whole process

    // Discipline: recover inside, and use a context for shutdown.
    go func() {
        defer func() { recover() }()
        backgroundWork(ctx)
    }()

Python:

    import asyncio

    # ⚠️ THE TRAP: the event loop only keeps a *weak* reference to the task.
    # If nothing holds a strong reference, the GC can collect it mid-flight:
    # the task silently vanishes, and exceptions disappear with it.
    asyncio.create_task(background_work())  # may never finish!

    # ✅ Keep a strong reference until it's done.
    _tasks: set[asyncio.Task] = set()

    def spawn(coro) -> asyncio.Task:
        task = asyncio.create_task(coro)
        _tasks.add(task)
        task.add_done_callback(_tasks.discard)  # let it go when finished
        return task

TaskGroup for supervised background work
For background tasks with a known lifetime (the life of a request, a batch, a
test), asyncio.TaskGroup is the right tool, exactly as in
Module 05. It keeps strong references,
propagates exceptions, and cancels siblings on failure. The catch: the async with block waits for every task to finish before exiting — so it’s for work
that ends, not for daemons that run forever.
    import asyncio

    async def handle_batch(items: list[int]) -> None:
        async with asyncio.TaskGroup() as tg:
            for item in items:
                tg.create_task(process(item))
        # All tasks done here. If any raised, an ExceptionGroup propagates
        # and the others were cancelled. No leaked tasks. No weak-ref trap.

Graceful shutdown of a daemon task
For a task that runs for the lifetime of the service (not a request), you own
it explicitly: keep the reference, cancel it on shutdown, and await it so its
finally cleanup runs. This is the pattern FastAPI’s lifespan wraps for you
(forward reference: Module 07).
    import asyncio
    import contextlib

    async def metrics_flusher(interval: float) -> None:
        try:
            while True:
                await asyncio.sleep(interval)
                await flush_metrics()
        except asyncio.CancelledError:
            await flush_metrics()  # final flush on shutdown
            raise  # always re-raise CancelledError!
        finally:
            print("flusher stopped")

    async def main() -> None:
        task = asyncio.create_task(metrics_flusher(5.0))
        try:
            await run_service()  # the app's real work
        finally:
            task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await task  # wait for the finally/cleanup to run

Periodic work: the interval loop
The asyncio-native “do this every N seconds” is a while True + await asyncio.sleep(n), run as a background task. It’s the equivalent of
setInterval (TS) or a time.Ticker loop (Go) — but it’s a real cancellable
coroutine, so cleanup is structured.
    import asyncio
    import logging

    log = logging.getLogger(__name__)

    async def every(interval: float, fn) -> None:
        """Run fn() every `interval` seconds until cancelled."""
        while True:
            await asyncio.sleep(interval)
            try:
                await fn()
            except Exception:
                # Don't let one failure kill the loop.
                log.exception("periodic task failed")

Cancellation and cleanup in streams
Because async generators can run forever and hold resources (a file handle, a DB cursor, a websocket), cleanup is not optional. Two mechanisms cover it.
finally runs on cancellation
When the consumer stops early (a break, a cancellation, an exception
downstream), Python throws GeneratorExit into the generator at its suspended
yield once the generator is closed: explicitly via aclose(), or when the abandoned generator is finalized. Your finally block runs; this is where you close resources.
    import asyncio
    from collections.abc import AsyncIterator

    # `aopen` stands in for an async file opener (an assumption; it is not in the stdlib).

    async def tail_file(path: str) -> AsyncIterator[str]:
        f = await aopen(path)
        try:
            while True:
                line = await f.readline()
                if line:
                    yield line
                else:
                    await asyncio.sleep(0.1)  # poll for new lines
        finally:
            await f.close()  # runs when the generator is closed or cancelled
            print("file closed")

    async def main() -> None:
        async for line in tail_file("app.log"):
            print(line)
            if "FATAL" in line:
                break  # the generator is closed later: GeneratorExit → finally → file closed

aclose() for explicit, deterministic cleanup
A plain async for that breaks early leaves the generator suspended; it will eventually be closed,
the generator — but “eventually” depends on the GC, and on the event loop still
running. For deterministic cleanup (and to avoid “coroutine ignored
GeneratorExit” warnings at shutdown), close it yourself with aclose(), or use
contextlib.aclosing to make it async with-scoped:
    import asyncio
    from contextlib import aclosing

    async def main() -> None:
        # aclosing() guarantees gen.aclose() runs when the block exits,
        # exactly like Go's `defer close(ch)` or a Kotlin Flow's onCompletion.
        async with aclosing(tail_file("app.log")) as stream:
            async for line in stream:
                print(line)
                if "FATAL" in line:
                    break
        # gen.aclose() has run here: the file is closed, finally executed,
        # deterministically, before we move on.

Key differences:
- Go: `defer close(ch)` in the producer goroutine; consumers see the closed channel. Cleanup is the producer’s job and runs at function exit.
- Kotlin: `Flow` cleanup goes in `onCompletion {}` / `flow {}`’s own `try/finally`; cancelling the collector triggers it.
- Python: `try/finally` inside the generator, and `aclosing()` at the consumer for deterministic, scoped teardown. Prefer `aclosing` over relying on the GC.
Streaming responses (preview)
The payoff for all of this on the web tier: an async generator is a streaming HTTP response. Server-Sent Events (SSE), chunked responses, and streamed LLM tokens are all “yield bytes/events over time,” and FastAPI consumes an async generator directly.
    # Preview: full treatment in Module 07.
    import asyncio
    from collections.abc import AsyncIterator

    from fastapi import FastAPI
    from fastapi.responses import StreamingResponse

    app = FastAPI()

    async def event_stream() -> AsyncIterator[str]:
        for i in range(10):
            await asyncio.sleep(1)
            yield f"data: tick {i}\n\n"  # SSE wire format

    @app.get("/stream")
    async def stream() -> StreamingResponse:
        return StreamingResponse(event_stream(), media_type="text/event-stream")

The same async generator that powered your in-memory pipeline becomes an
HTTP stream with zero changes to the generator itself — the cold, pull-driven
model means FastAPI pulls one chunk, sends it, and pulls the next only when the
socket is ready. Backpressure all the way to the client.
Module 07 does this properly: SSE, StreamingResponse,
and cancellation when the client disconnects.
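Because the response body is just another stream, the SSE framing can itself be an operator. The sketch below is framework-free and uses illustrative names (`ticks`, `as_sse`, `collect_frames`); it assumes each event is JSON-serializable and emits only the `data:` field of the SSE format.

```python
import asyncio
import json
from collections.abc import AsyncIterator


async def ticks(n: int) -> AsyncIterator[dict]:
    """An illustrative event source."""
    for i in range(n):
        await asyncio.sleep(0)  # stand-in for waiting on real events
        yield {"tick": i}


async def as_sse(src: AsyncIterator[dict]) -> AsyncIterator[str]:
    """Frame each event as one SSE message: a data line plus a blank line."""
    async for event in src:
        yield f"data: {json.dumps(event)}\n\n"


async def collect_frames(n: int) -> list[str]:
    return [frame async for frame in as_sse(ticks(n))]
```

A generator like `as_sse(ticks(10))` is the kind of object the `StreamingResponse` preview above takes as its body.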
Choosing the right tool
| You have… | Use | Why |
|---|---|---|
| A transformation over a stream of items | async generator + async for | Cold, composable, automatic backpressure |
| A bursty producer + a steady consumer | asyncio.Queue(maxsize=n) | Decouple them; the buffer absorbs bursts |
| Multiple producers → one consumer | Queue, one task per producer | Fan-in |
| One stream → N parallel workers | Queue + N worker tasks | Fan-out worker pool |
| Background work with a known end | TaskGroup | Supervised, no leaks, exceptions propagate |
| A daemon for the service’s lifetime | create_task + keep ref + cancel on shutdown | Explicit lifecycle, graceful cleanup |
| Periodic in-process work | while True + asyncio.sleep | The asyncio setInterval |
| Persistent / distributed / cron jobs | APScheduler / Arq | A loop is not a job queue |
| A rich operator library | aiostream | Don’t reinvent RxJS |
Practice
Build a real in-process streaming pipeline end to end: an async-generator source, transform stages, bounded queues for backpressure, a windowed aggregator, and graceful shutdown.