
Streams & Background Work

Module 05 gave you async/await, TaskGroup, timeouts, and cancellation — the tools for running a finite set of concurrent operations and waiting for them. This module is about streams: data that arrives over time, one item at a time, possibly forever — and the background work that processes it.

If you’ve reached for RxJS Observables, Kotlin Flow, or a Go channel pipeline, you already have the mental model. Python’s primitive is humbler and more built-in than any of those: a function with yield in it. An async generator is a lazy, backpressure-aware, cancellable async stream, and async for is how you consume one. There’s no library to import — it’s syntax.

| Concept | TypeScript | Go | Python |
|---|---|---|---|
| Lazy async stream | AsyncIterable / RxJS Observable | <-chan T + goroutine | async generator (async def + yield) |
| Consume it | for await (const x of s) | for x := range ch | async for x in s |
| Transform | .pipe(map(...)) | goroutine reads/writes channels | async for in another async gen |
| Buffer / backpressure | RxJS bufferCount, push-based | buffered channel (make(chan T, n)) | asyncio.Queue(maxsize=n) |
| Fan-out / fan-in | merge, mergeMap | multiple goroutines + one channel | multiple tasks + one Queue |
| Background worker | setInterval, detached promise | go worker() | asyncio.create_task / TaskGroup |
| Cancellation | AbortController / unsubscribe() | close(ch) / ctx.Done() | cancel the task; GeneratorExit |
| Cleanup | finally in the generator | defer close(ch) | finally / aclose() |

Two distinct shapes hide in that table, and the whole module hinges on telling them apart:

  • Pull-based streams (async generators) — cold. Nothing runs until a consumer pulls with async for. The consumer sets the pace; the producer suspends at yield until asked for the next value. This is Kotlin Flow and RxJS cold Observables.
  • Push-based queues (asyncio.Queue) — hot. A producer task runs on its own and pushes items in; a consumer task pulls them out. The queue’s maxsize is your backpressure knob. This is a Go channel.

You build pipelines out of async generators, and you decouple producers from consumers (fan-out, fan-in, hot sources) with queues. Let’s build both.

A regular generator yields values lazily and synchronously. Make the function async def and you can await between yields — now it’s an async generator, and you consume it with async for.

// Async generator: async function* + yield
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

async function* ticker(n: number): AsyncGenerator<number> {
  for (let i = 1; i <= n; i++) {
    await sleep(100); // await between yields
    yield i;
  }
}

for await (const x of ticker(3)) {
  console.log("got", x);
}

Key differences:

  • TS: async function* + for await. Same idea, near-identical syntax.
  • Go: no generators at all — you fake a stream with a channel and a goroutine. The goroutine is hot (runs immediately); close(ch) is your “stream done”; cancellation is manual via ctx.
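  • Python: async def with yield makes the function an async generator. It’s cold: calling ticker(3) runs nothing, and the body only advances when async for pulls. Cancellation needs no extra plumbing. Cancel the consuming task and CancelledError is raised at whatever await the generator is paused on. If the generator is instead parked at a yield, it is later closed with GeneratorExit. Either way its finally blocks run.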
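For comparison, here is a Python counterpart of the TS ticker. Later examples in this module call ticker(n) without defining it, so this is a minimal sketch under that assumption. The delay parameter is an illustrative addition and is not part of the original example.

```python
import asyncio
from collections.abc import AsyncIterator

async def ticker(n: int, delay: float = 0.1) -> AsyncIterator[int]:
    """Yield 1..n, awaiting `delay` seconds before each value."""
    for i in range(1, n + 1):
        await asyncio.sleep(delay)  # await between yields
        yield i

async def main() -> None:
    gen = ticker(3)  # cold: nothing has run yet
    async for x in gen:  # each iteration pulls exactly one value
        print("got", x)

asyncio.run(main())
```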

Just like a Kotlin Flow or an RxJS cold Observable, an async generator does nothing until consumed, and each consumption restarts it from scratch:

import asyncio
from collections.abc import AsyncIterator

async def source() -> AsyncIterator[int]:
    print("source started")  # only prints when consumed
    for i in range(3):
        yield i

async def main() -> None:
    gen = source()  # prints nothing
    print("created, nothing ran yet")
    async for x in gen:  # NOW "source started" prints
        print("first pass", x)
    # A generator object is single-use: exhausted after one pass.
    # To run again, call source() again for a fresh generator.
    async for x in source():
        print("second pass", x)

asyncio.run(main())

Comprehensions go async too: use async for inside them. Handy for draining a stream into a list, or building one stream from another inline.

# Run these inside an async def: async list/set/dict comprehensions are a
# SyntaxError elsewhere. ticker() is the async generator from above;
# fetch_items() stands for any async iterable of objects with an .id.
async def demo() -> None:
    # Drain an async stream into a list (terminal, like Flow.toList())
    nums = [x async for x in ticker(5)]
    # Async set / dict comprehensions work the same way
    seen = {x async for x in ticker(5)}
    by_id = {item.id: item async for item in fetch_items()}
    # A generator expression that is itself an async generator:
    squared = (x * x async for x in ticker(5))
    async for y in squared:
        print(y)
    # You can mix a sync condition into an async comprehension:
    evens = [x async for x in ticker(10) if x % 2 == 0]

The list comprehension form [... async for ...] is the idiomatic Python equivalent of flow.toList() or RxJS lastValueFrom(source$.pipe(toArray())). It runs the cold stream to completion and collects the results. Use it only on bounded streams, because on an infinite generator it never returns.

Kotlin gives you .map, .filter, .buffer, .debounce as built-in Flow operators. RxJS gives you a hundred of them. Python’s stdlib gives you… async for and yield. That’s deliberate: an operator is just an async generator that consumes another async generator. You write the handful you need, and they compose exactly like a Unix pipe.

source$.pipe(
  map(x => x * 2),
  filter(x => x > 5),
);

Note the PEP 695 generics (def amap[T, R](...)) — no TypeVar boilerplate. Each operator takes a source stream and returns a new stream; nothing executes until the final async for pulls. That’s the cold, pull-driven model paying off: afilter only asks amap for a value when its own consumer asks it for one.

Because each operator returns an AsyncIterator, you nest them — the innermost call is the source, each wrapping call is a stage:

async def main() -> None:
    pipeline = afilter(
        amap(ticker(10), lambda x: x * 2),
        lambda x: x > 5,
    )
    async for x in pipeline:
        print(x)  # 6, 8, 10, 12, ... (skips 2, 4)

Nested calls read inside-out, which gets ugly fast. A tiny pipe helper flips it to read top-to-bottom, like a Kotlin operator chain or a .pipe() in RxJS:

from collections.abc import AsyncIterator, Callable
from functools import reduce

type Stage[T, R] = Callable[[AsyncIterator[T]], AsyncIterator[R]]

def pipe[T](src: AsyncIterator[T], *stages: Stage) -> AsyncIterator:
    """Thread a source through a series of stage factories, left to right."""
    return reduce(lambda acc, stage: stage(acc), stages, src)

async def main() -> None:
    result = pipe(
        ticker(10),
        lambda s: amap(s, lambda x: x * 2),
        lambda s: afilter(s, lambda x: x > 5),
    )
    async for x in result:
        print(x)
[Diagram: Async generator pipeline (pull-driven)]

The dashed arrows are the point: demand flows backwards. The consumer pulls, which pulls the filter, which pulls the map, which pulls the source. No item is produced until someone downstream wants it — backpressure for free.
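One way to see demand flowing backwards is to log each stage. In this sketch the stage names are illustrative. Every value travels all the way to the consumer before the source is asked for the next one.

```python
import asyncio
from collections.abc import AsyncIterator

trace: list[str] = []

async def source() -> AsyncIterator[int]:
    for i in range(3):
        trace.append(f"source yields {i}")
        yield i

async def doubled(src: AsyncIterator[int]) -> AsyncIterator[int]:
    async for x in src:
        trace.append(f"map gets {x}")
        yield x * 2

async def main() -> None:
    async for y in doubled(source()):
        trace.append(f"consumer gets {y}")

asyncio.run(main())
print("\n".join(trace))
```

The trace interleaves source, map, and consumer for each value in turn. A pipeline that materialized each stage into a list first would instead record all three source yields before anything reached the consumer.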

Grouping a stream into fixed-size batches is the single most useful custom operator for backends — you batch rows before a bulk INSERT, events before a flush. Kotlin calls it chunked; RxJS calls it bufferCount.

from collections.abc import AsyncIterator

async def batch[T](src: AsyncIterator[T], size: int) -> AsyncIterator[list[T]]:
    buf: list[T] = []
    async for x in src:
        buf.append(x)
        if len(buf) >= size:
            yield buf
            buf = []
    if buf:  # flush the partial final batch
        yield buf

async def main() -> None:
    async for chunk in batch(ticker(25), 10):
        print(f"batch of {len(chunk)}: {chunk}")
    # batch of 10: [1..10]
    # batch of 10: [11..20]
    # batch of 5: [21..25]

Throttle and debounce are time-based operators. They’re where people usually reach for a library, but both are small. Throttle emits at most once per interval (rate-limiting a firehose). Debounce waits for a quiet gap before emitting the latest value (search-as-you-type).

import asyncio
from collections.abc import AsyncIterator
from typing import cast

_DONE = object()   # sentinel: the source is exhausted
_EMPTY = object()  # sentinel: nothing pending (so None stays a legal value)

async def throttle[T](src: AsyncIterator[T], interval: float) -> AsyncIterator[T]:
    """Emit the first value, then at most one value per `interval` seconds."""
    loop = asyncio.get_running_loop()
    last = float("-inf")  # so the very first value always passes
    async for x in src:
        now = loop.time()
        if now - last >= interval:
            last = now
            yield x
        # values that arrive inside the window are dropped

async def debounce[T](src: AsyncIterator[T], delay: float) -> AsyncIterator[T]:
    """Emit a value only after `delay` seconds of silence; keep the latest."""
    queue: asyncio.Queue[object] = asyncio.Queue()

    async def feed() -> None:
        try:
            async for x in src:
                await queue.put(x)
        finally:
            queue.put_nowait(_DONE)  # always wake the consumer, even if src raised

    task = asyncio.create_task(feed())
    try:
        pending: object = _EMPTY
        while True:
            if pending is _EMPTY:
                item = await queue.get()  # block for the first item
            else:
                try:
                    item = await asyncio.wait_for(queue.get(), timeout=delay)
                except TimeoutError:
                    yield cast(T, pending)  # quiet gap → emit latest
                    pending = _EMPTY
                    continue
            if item is _DONE:
                if pending is not _EMPTY:
                    yield cast(T, pending)
                await task  # re-raise the source's exception, if any
                return
            pending = item
    finally:
        task.cancel()

Debounce is the first operator that needs an internal task and queue, because it has to consume ahead of what it emits (to know when the source went quiet). It also shows the cleanup discipline this module keeps hammering: the finally cancels the feeder task no matter how the consumer leaves.

Merging interleaves several streams into one as values arrive — RxJS merge, Kotlin merge, the Go fan-in pattern. You can’t async for two generators at once, so you spin up a task per source feeding a shared queue:

import asyncio
from collections.abc import AsyncIterator
from typing import cast

async def merge[T](*sources: AsyncIterator[T]) -> AsyncIterator[T]:
    queue: asyncio.Queue[object] = asyncio.Queue()
    _DONE = object()

    async def drain(src: AsyncIterator[T]) -> None:
        async for x in src:
            await queue.put(x)

    async def run_all() -> None:
        try:
            async with asyncio.TaskGroup() as tg:
                for src in sources:
                    tg.create_task(drain(src))
        finally:
            queue.put_nowait(_DONE)  # all drains finished (or one of them failed)

    runner = asyncio.create_task(run_all())
    try:
        while True:
            item = await queue.get()
            if item is _DONE:
                await runner  # surface a source's failure instead of hanging
                return
            yield cast(T, item)
    finally:
        runner.cancel()

This is the bridge from cold pull-based generators to hot push-based queues — which is exactly the next section.

An async generator’s backpressure is implicit: the producer can’t get ahead because it’s suspended at yield. But sometimes you want the producer to run ahead — to decouple a bursty source from a steady consumer, or to fan work out to several workers. That’s what asyncio.Queue is for, and it’s the direct analog of a buffered Go channel.

The single most important parameter is maxsize. A bounded queue (maxsize > 0) gives you backpressure: when the queue is full, await queue.put(x) suspends the producer until a consumer makes room. An unbounded queue (maxsize=0, the default) will happily grow until you run out of memory if the producer outpaces the consumer — a real production incident, not a hypothetical.

// Node has no built-in bounded async queue. You either use a library
// (e.g. async generators with a semaphore) or a stream with highWaterMark:
import { Readable } from "node:stream";
const stream = new Readable({ highWaterMark: 100 /* backpressure */ });

Key differences:

  • Go: the buffered channel is the bounded queue, and close(ch) cleanly signals “no more values” to every range loop. Idiomatic and complete.
  • Python: asyncio.Queue(maxsize=n) mirrors the buffered channel, but before Python 3.13 it has no close, so there is no built-in “done” signal. You signal completion yourself, usually with a sentinel object, or with join()/task_done() bookkeeping. This trips up Go developers constantly. (Python 3.13 added Queue.shutdown(), the closest analog to close(ch).)
  • TS: no first-class bounded async queue in the stdlib; you lean on Node streams’ highWaterMark or a library.
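Here is a sketch of the join()/task_done() style of completion, as an alternative to sentinels. It assumes long-lived workers that never exit on their own. The producer waits on queue.join() until every item has been marked done, then cancels the workers explicitly. The doubling "work" is a placeholder.

```python
import asyncio

async def worker(queue: asyncio.Queue[int], results: list[int]) -> None:
    while True:  # runs until cancelled; no sentinel involved
        item = await queue.get()
        try:
            results.append(item * 2)  # placeholder for real work
        finally:
            queue.task_done()  # exactly one task_done() per get(), or join() hangs

async def main() -> list[int]:
    queue: asyncio.Queue[int] = asyncio.Queue(maxsize=5)
    results: list[int] = []
    workers = [asyncio.create_task(worker(queue, results)) for _ in range(3)]
    for i in range(10):
        await queue.put(i)  # suspends while the queue is full
    await queue.join()  # returns once every item has been task_done()
    for w in workers:
        w.cancel()  # the workers loop forever, so stop them explicitly
    await asyncio.gather(*workers, return_exceptions=True)
    return results

print(sorted(asyncio.run(main())))  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```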

Here’s the full bounded producer/consumer, supervised by a TaskGroup so a crash in either side tears down both cleanly:

pipeline.py
import asyncio

SENTINEL = object()

async def producer(queue: asyncio.Queue[object]) -> None:
    for i in range(20):
        await asyncio.sleep(0.05)  # bursty source
        await queue.put(i)
        print(f"  produced {i} (queue size {queue.qsize()})")
    await queue.put(SENTINEL)

async def consumer(queue: asyncio.Queue[object]) -> None:
    while True:
        item = await queue.get()
        if item is SENTINEL:
            queue.task_done()
            return
        await asyncio.sleep(0.2)  # slow consumer
        print(f"consumed {item}")
        queue.task_done()

async def main() -> None:
    queue: asyncio.Queue[object] = asyncio.Queue(maxsize=5)
    async with asyncio.TaskGroup() as tg:
        tg.create_task(producer(queue))
        tg.create_task(consumer(queue))

asyncio.run(main())

Watch the output. The producer races ahead until queue.qsize() hits 5, then stalls (await queue.put(...) stays suspended) until the slow consumer drains an item. That stall is backpressure working. Set maxsize to 0 and nothing ever stalls the producer. It finishes all 20 puts in about a second, while the consumer, at 0.2 s per item, has finished only about four. The backlog climbs to around 15 items, and with a real firehose in place of this toy source the queue balloons. This is the lesson; feel it by changing the number.

To go faster, fan the work out to N consumer tasks pulling from one queue, then fan their results back into a results queue. This is a goroutine worker pool, or RxJS mergeMap with a concurrency limit.

fanout.py
import asyncio

SENTINEL = object()

async def worker(name: str, jobs: asyncio.Queue, results: asyncio.Queue) -> None:
    while True:
        job = await jobs.get()
        if job is SENTINEL:
            jobs.task_done()
            return
        await asyncio.sleep(0.1)  # simulate work
        await results.put(f"{name} did job {job}")
        jobs.task_done()

async def main() -> None:
    jobs: asyncio.Queue = asyncio.Queue(maxsize=10)
    results: asyncio.Queue = asyncio.Queue()
    n_workers = 4
    async with asyncio.TaskGroup() as tg:
        # Fan-out: N workers share the jobs queue.
        for i in range(n_workers):
            tg.create_task(worker(f"w{i}", jobs, results))

        # Feed jobs, then one sentinel per worker so each one exits.
        async def feed() -> None:
            for j in range(20):
                await jobs.put(j)
            for _ in range(n_workers):
                await jobs.put(SENTINEL)

        tg.create_task(feed())

        # Fan-in: collect exactly 20 results.
        async def collect() -> None:
            for _ in range(20):
                print(await results.get())

        tg.create_task(collect())

asyncio.run(main())
[Diagram: Fan-out / fan-in with bounded queues]

The “one sentinel per worker” detail is the Python tax for the missing close(). Each worker consumes exactly one sentinel and exits, so you send N of them. (The alternative when you’d rather not count is to await jobs.join() and then cancel the workers. Inside a TaskGroup, though, sentinels let every task finish on its own instead of being cancelled, which keeps the group’s exit clean.)

A stream pipeline is one kind of background work. The other is the detached worker: a task you kick off that runs alongside your request handlers — a metrics flusher, a cache warmer, a queue drainer. This is where Python’s footguns live.

create_task and the “fire and forget” trap

// "Fire and forget" — the floating-promise footgun.
doBackgroundWork(); // ⚠️ unhandled rejection if it throws
// Better: keep the reference and attach a handler.
const p = doBackgroundWork();
p.catch(err => log.error(err));
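In Python, the counterpart of keeping the promise reference is keeping the task reference. The event loop holds only a weak reference to tasks, so a task nobody references can be garbage-collected before it finishes. The asyncio documentation for create_task() recommends keeping tasks in a collection. The sketch below follows that pattern and adds a done-callback that logs failures in place of p.catch(...). fire_and_forget and do_background_work are illustrative names.

```python
import asyncio
import logging
from collections.abc import Coroutine
from typing import Any

# The loop keeps only weak references to tasks, so hold strong ones here
# until each task finishes.
background_tasks: set[asyncio.Task[Any]] = set()

def _report_failure(task: asyncio.Task[Any]) -> None:
    """Done-callback: log an exception instead of losing it silently."""
    if not task.cancelled() and task.exception() is not None:
        logging.error("background task failed", exc_info=task.exception())

def fire_and_forget(coro: Coroutine[Any, Any, Any]) -> asyncio.Task[Any]:
    task = asyncio.create_task(coro)
    background_tasks.add(task)                        # strong reference
    task.add_done_callback(background_tasks.discard)  # release it when done
    task.add_done_callback(_report_failure)           # like p.catch(...) in TS
    return task

async def do_background_work() -> None:  # hypothetical workload
    await asyncio.sleep(0.01)

async def main() -> None:
    fire_and_forget(do_background_work())
    await asyncio.sleep(0.05)  # demo only: give the task time to finish

asyncio.run(main())
print("still tracked:", len(background_tasks))  # 0 once the task is done
```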

For background tasks with a known lifetime (the life of a request, a batch, a test), asyncio.TaskGroup is the right tool, exactly as in Module 05. It keeps strong references, propagates exceptions, and cancels siblings on failure. The catch: the async with block waits for every task to finish before exiting — so it’s for work that ends, not for daemons that run forever.

import asyncio

async def handle_batch(items: list[int]) -> None:
    # process(item) stands for your per-item coroutine.
    async with asyncio.TaskGroup() as tg:
        for item in items:
            tg.create_task(process(item))
    # All tasks done here. If any raised, an ExceptionGroup propagates
    # and the others were cancelled. No leaked tasks. No weak-ref trap.

For a task that runs for the lifetime of the service (not a request), you own it explicitly: keep the reference, cancel it on shutdown, and await it so its finally cleanup runs. This is the pattern FastAPI’s lifespan wraps for you (forward reference: Module 07).

import asyncio
import contextlib

# flush_metrics() and run_service() stand for your app's own coroutines.
async def metrics_flusher(interval: float) -> None:
    try:
        while True:
            await asyncio.sleep(interval)
            await flush_metrics()
    except asyncio.CancelledError:
        await flush_metrics()  # final flush on shutdown
        raise  # always re-raise CancelledError!
    finally:
        print("flusher stopped")

async def main() -> None:
    task = asyncio.create_task(metrics_flusher(5.0))
    try:
        await run_service()  # the app's real work
    finally:
        task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await task  # wait for the finally/cleanup to run

The asyncio-native “do this every N seconds” is a while True + await asyncio.sleep(n), run as a background task. It’s the equivalent of setInterval (TS) or a time.Ticker loop (Go) — but it’s a real cancellable coroutine, so cleanup is structured.

import asyncio
import logging
from collections.abc import Awaitable, Callable

log = logging.getLogger(__name__)

async def every(interval: float, fn: Callable[[], Awaitable[None]]) -> None:
    """Run fn() every `interval` seconds until cancelled."""
    while True:
        await asyncio.sleep(interval)
        try:
            await fn()
        except Exception:
            log.exception("periodic task failed")  # don't let one failure kill the loop
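One caveat on every(): it sleeps interval after each call finishes. The real period is therefore interval plus however long fn() takes, and the schedule drifts. If you need a fixed rate, one option is to compute each deadline from the loop clock, as in this sketch. every_fixed_rate is a hypothetical name. Skipping missed ticks rather than bursting to catch up is a design choice, not the only option.

```python
import asyncio
import contextlib
import logging
from collections.abc import Awaitable, Callable

log = logging.getLogger(__name__)

async def every_fixed_rate(interval: float, fn: Callable[[], Awaitable[None]]) -> None:
    """Call fn() at start + interval, start + 2*interval, ... until cancelled."""
    loop = asyncio.get_running_loop()
    next_tick = loop.time() + interval
    while True:
        await asyncio.sleep(max(0.0, next_tick - loop.time()))
        try:
            await fn()
        except Exception:
            log.exception("periodic task failed")  # one failure doesn't end the schedule
        next_tick += interval
        now = loop.time()
        if next_tick < now:
            # fn() overran one or more ticks: skip them instead of firing in a burst
            missed = (now - next_tick) // interval + 1
            next_tick += missed * interval

async def main() -> int:
    calls = 0

    async def job() -> None:  # hypothetical periodic job
        nonlocal calls
        calls += 1

    task = asyncio.create_task(every_fixed_rate(0.01, job))
    await asyncio.sleep(0.055)
    task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await task
    return calls

print("job ran", asyncio.run(main()), "times")
```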

Because async generators can run forever and hold resources (a file handle, a DB cursor, a websocket), cleanup is not optional. Two mechanisms cover it.

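When the consumer stops early (a break, a cancellation, an exception downstream), the generator still gets a chance to clean up. A cancellation lands as CancelledError at whatever await the generator is paused on. A generator parked at a yield is closed by throwing GeneratorExit in at that yield. Either way your finally block runs, and that is where you close resources.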

import asyncio
from collections.abc import AsyncIterator

async def tail_file(path: str) -> AsyncIterator[str]:
    f = await aopen(path)  # aopen: stand-in for an async file API (e.g. aiofiles)
    try:
        while True:
            line = await f.readline()
            if line:
                yield line
            else:
                await asyncio.sleep(0.1)  # poll for new lines
    finally:
        await f.close()  # ALWAYS runs, even on break/cancel
        print("file closed")

async def main() -> None:
    async for line in tail_file("app.log"):
        print(line)
        if "FATAL" in line:
            break  # abandons the generator; it is closed (GeneratorExit → finally) when the loop finalizes it

aclose() for explicit, deterministic cleanup


A generator that runs to completion executes its finally immediately. One abandoned by a break is closed only eventually: the event loop schedules aclose() once the generator is garbage-collected, or at the latest when the loop shuts down. For deterministic cleanup, close it yourself with aclose(), or use contextlib.aclosing to make it async with-scoped:

from contextlib import aclosing

async def main() -> None:
    # aclosing() guarantees stream.aclose() runs when the block exits,
    # much like Go's `defer close(ch)` or a Kotlin Flow's onCompletion.
    async with aclosing(tail_file("app.log")) as stream:
        async for line in stream:
            print(line)
            if "FATAL" in line:
                break
    # aclose() has run here: the file is closed and finally executed,
    # deterministically, before we move on.

Key differences:

  • Go: defer close(ch) in the producer goroutine; consumers see the closed channel. Cleanup is the producer’s job and runs at function exit.
  • Kotlin: Flow cleanup goes in onCompletion {} / flow {}’s own try/finally; cancelling the collector triggers it.
  • Python: try/finally inside the generator, and aclosing() at the consumer for deterministic, scoped teardown. Prefer aclosing over relying on the GC.

The payoff for all of this on the web tier: an async generator is a streaming HTTP response. Server-Sent Events (SSE), chunked responses, and streamed LLM tokens are all “yield bytes/events over time,” and FastAPI consumes an async generator directly.

# Preview: full treatment in Module 07.
import asyncio
from collections.abc import AsyncIterator

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

async def event_stream() -> AsyncIterator[str]:
    for i in range(10):
        await asyncio.sleep(1)
        yield f"data: tick {i}\n\n"  # SSE wire format

@app.get("/stream")
async def stream() -> StreamingResponse:
    return StreamingResponse(event_stream(), media_type="text/event-stream")

The same async generator that powered your in-memory pipeline becomes an HTTP stream with zero changes to the generator itself. Because the model is cold and pull-driven, StreamingResponse pulls one chunk, awaits sending it, and only then pulls the next. A slow client therefore slows the generator down instead of piling chunks up in memory, provided the ASGI server applies flow control to the socket. Module 07 does this properly: SSE, StreamingResponse, and cancellation when the client disconnects.

| You have… | Use | Why |
|---|---|---|
| A transformation over a stream of items | async generator + async for | Cold, composable, automatic backpressure |
| A bursty producer + a steady consumer | asyncio.Queue(maxsize=n) | Decouple them; the buffer absorbs bursts |
| Multiple producers → one consumer | Queue, one task per producer | Fan-in |
| One stream → N parallel workers | Queue + N worker tasks | Fan-out worker pool |
| Background work with a known end | TaskGroup | Supervised, no leaks, exceptions propagate |
| A daemon for the service’s lifetime | create_task + keep ref + cancel on shutdown | Explicit lifecycle, graceful cleanup |
| Periodic in-process work | while True + asyncio.sleep | The asyncio setInterval |
| Persistent / distributed / cron jobs | APScheduler / Arq | A loop is not a job queue |
| A rich operator library | aiostream | Don’t reinvent RxJS |

Build a real in-process streaming pipeline end to end — an async-generator source, transform stages, bounded queues for backpressure, a windowed aggregator, and graceful shutdown.