Async & Structured Concurrency
You already write concurrent code: async/await in TypeScript, goroutines and
channels in Go. Python’s asyncio is closest to the Node model — a single-threaded
event loop running cooperative tasks — but the modern API (3.11+) added
structured concurrency (TaskGroup, asyncio.timeout) that finally makes it
competitive with Go’s errgroup + context discipline. This module maps every
piece onto what you already know.
The mental model: one thread, cooperative
Section titled “The mental model: one thread, cooperative”The single most important thing to internalize: asyncio is not parallelism. It
is one OS thread running an event loop that switches between tasks at await
points. This is exactly Node’s model. It is not Go’s model.
| Concept | TypeScript | Go | Python |
|---|---|---|---|
| Unit of concurrency | async function / Promise | goroutine | coroutine (async def) |
| Scheduling model | Single-threaded event loop | M:N green threads on OS threads | Single-threaded event loop |
| True parallelism? | No (one thread) | Yes (GOMAXPROCS threads) | No (one thread, GIL) |
| Yields at | every await | I/O + scheduler preemption | every await |
| CPU-bound work | blocks the loop | runs in parallel | blocks the loop |
| Cancellation | AbortController | context.Context | CancelledError / Task.cancel() |
| Structured lifecycle | none (floating promises) | none built in (errgroup helps) | TaskGroup (3.11+) |
Coroutines are lazy
Section titled “Coroutines are lazy”This trips up everyone coming from JS. A coroutine doesn’t run when you call it — it returns a coroutine object that runs only when awaited or scheduled.
import asyncio
async def greet() -> str: print("running greet") return "hi"
async def main() -> None: coro = greet() # nothing prints — coro is just an object print("not run yet") result = await coro # NOW greet() runs, prints "running greet" print(result)
asyncio.run(main())# not run yet# running greet# hiIn TypeScript, greet() would have executed immediately and returned a Promise.
In Python you get a “coroutine was never awaited” RuntimeWarning if you forget —
Python’s gentle reminder that you dropped a promise on the floor.
Function coloring — the honest take
Section titled “Function coloring — the honest take”async is contagious, in Python as in TypeScript. To await something, your
function must be async; to call an async function meaningfully, you need an
event loop. This is the “function coloring” problem, and it’s real:
- You can’t call an
async deffrom ordinary sync code withoutasyncio.run()(at the program edge) — the same way you can’tawaitoutside anasync functionin JS. - Mixing a sync library into an async codebase means either finding an async
equivalent (
requests→httpx) or pushing the blocking call to a thread (see the blocking trap below).
Go has no function coloring: any function can run in a goroutine, and blocking
calls yield automatically. That is genuinely nicer. Python’s bet is that explicit
await points make the suspension behavior visible — you can see exactly where your
code can be interrupted. Decide per project whether your stack is async (FastAPI,
Litestar, async SQLAlchemy) or sync (Django classic, Flask); don’t straddle.
Running coroutines
Section titled “Running coroutines”asyncio.run(main()) is the one entry point you need. It creates an event loop,
runs the coroutine to completion, and tears the loop down. Call it once, at the top
of your program — never inside a coroutine.
async function fetchUser(): Promise<string> { await new Promise((r) => setTimeout(r, 1000)); return "Alice";}
async function main(): Promise<void> { const start = Date.now(); const user = await fetchUser(); console.log(`${user} in ${Date.now() - start}ms`);}
main();package main
import ( "fmt" "time")
func fetchUser() string { time.Sleep(1 * time.Second) return "Alice"}
func main() { start := time.Now() user := fetchUser() fmt.Printf("%s in %v\n", user, time.Since(start))}import asyncioimport time
async def fetch_user() -> str: await asyncio.sleep(1) # non-blocking sleep — yields the loop return "Alice"
async def main() -> None: start = time.perf_counter() user = await fetch_user() print(f"{user} in {(time.perf_counter() - start) * 1000:.0f}ms")
asyncio.run(main())Running things concurrently
Section titled “Running things concurrently”The example above runs one task. To get a speedup you run independent work
concurrently. Three tools, in increasing order of preference: gather, then
TaskGroup (the modern default).
Here’s the canonical “fetch two things at once” in all three languages:
async function fetchUser(): Promise<string> { await new Promise((r) => setTimeout(r, 1000)); return "Alice";}async function fetchOrder(): Promise<string> { await new Promise((r) => setTimeout(r, 1000)); return "Order-42";}
async function main(): Promise<void> { const start = Date.now(); const [user, order] = await Promise.all([fetchUser(), fetchOrder()]); console.log(`${user}, ${order} in ${Date.now() - start}ms`); // Alice, Order-42 in ~1000ms}main();func main() { start := time.Now() var user, order string var wg sync.WaitGroup wg.Add(2) go func() { defer wg.Done(); time.Sleep(time.Second); user = "Alice" }() go func() { defer wg.Done(); time.Sleep(time.Second); order = "Order-42" }() wg.Wait() fmt.Printf("%s, %s in %v\n", user, order, time.Since(start)) // Alice, Order-42 in ~1s}import asyncio, time
async def fetch_user() -> str: await asyncio.sleep(1) return "Alice"
async def fetch_order() -> str: await asyncio.sleep(1) return "Order-42"
async def main() -> None: start = time.perf_counter() user, order = await asyncio.gather(fetch_user(), fetch_order()) print(f"{user}, {order} in {(time.perf_counter() - start) * 1000:.0f}ms") # Alice, Order-42 in ~1000ms
asyncio.run(main())asyncio.gather(*coros) is Python’s Promise.all: it schedules each coroutine as a
task, runs them concurrently on the loop, and returns results in input order.
Total time is the slowest branch (~1s), not the sum (~2s).
Structured concurrency: TaskGroup is the default
Section titled “Structured concurrency: TaskGroup is the default”gather works, but it has the same flaw as Promise.all and bare goroutines:
leaks and lost errors. If one coroutine raises while the others are still
running, gather returns (or raises) but doesn’t necessarily cancel the siblings,
and a fire-and-forget asyncio.create_task whose handle you drop becomes the
asyncio version of a floating promise — it can vanish with its exception swallowed.
Go developers solved this with errgroup; Python 3.11 added asyncio.TaskGroup,
which gives you the same guarantees as a language primitive. Prefer it over
gather for anything non-trivial.
// Promise.all — concurrent, but a rejection doesn't cancel the others;// they keep running with their results discarded.const [user, order] = await Promise.all([fetchUser(), fetchOrder()]);// errgroup — first error cancels the shared context; Wait() returns it.g, ctx := errgroup.WithContext(context.Background())var user, order stringg.Go(func() error { var e error; user, e = fetchUser(ctx); return e })g.Go(func() error { var e error; order, e = fetchOrder(ctx); return e })if err := g.Wait(); err != nil { // first error; siblings were cancelled via ctx}# TaskGroup — the structured-concurrency primitive (3.11+).async def main() -> None: async with asyncio.TaskGroup() as tg: user_task = tg.create_task(fetch_user()) order_task = tg.create_task(fetch_order()) # The `async with` block doesn't exit until ALL tasks finish. # If any task raises, the rest are cancelled and the error # propagates out of the block. print(user_task.result(), order_task.result())What TaskGroup guarantees — the same contract as Go’s errgroup, but enforced by
the async with block:
- The block does not exit until every task created in it has finished.
- If any task raises, all sibling tasks are cancelled, then the block re-raises.
- If the surrounding code is cancelled, all tasks in the group are cancelled.
- You cannot leak a task — its lifetime is bound to the
async withscope.
gather vs TaskGroup — when each fits
Section titled “gather vs TaskGroup — when each fits”asyncio.gather | asyncio.TaskGroup (3.11+) | |
|---|---|---|
| Style | functional, returns a result list | block-scoped, structured |
| Result order | preserves input order | read each task’s .result() |
| On first error | siblings keep running (unless return_exceptions) | siblings cancelled |
| Multiple errors | only the first surfaces | bundled into an ExceptionGroup |
| Leak safety | can leak if you mix in stray create_task | scope-bound, can’t leak |
| Use when | quick fan-out of homogeneous work you fully control | the default for anything real |
One more gather mode worth knowing: return_exceptions=True makes it behave like
Promise.allSettled — every result comes back, exceptions included as values rather
than raised. Useful when you genuinely want partial results and no cancellation.
results = await asyncio.gather(*tasks, return_exceptions=True)for r in results: if isinstance(r, Exception): print("failed:", r) else: print("ok:", r)ExceptionGroup and except*
Section titled “ExceptionGroup and except*”Because a TaskGroup can have several tasks fail at once, it raises an
ExceptionGroup (a 3.11 language feature) bundling all of them. You unpack it
with the new except* syntax:
try: async with asyncio.TaskGroup() as tg: tg.create_task(might_fail_a()) tg.create_task(might_fail_b())except* ValueError as eg: print(f"{len(eg.exceptions)} ValueError(s):", eg.exceptions)except* ConnectionError as eg: print(f"{len(eg.exceptions)} connection error(s):", eg.exceptions)Go flattens to a single first error; Python keeps them all. Each except* block
runs for whichever exception types matched, with the rest left in the group.
Cancellation & timeouts
Section titled “Cancellation & timeouts”Cancellation in asyncio is cooperative, just like Go’s context: a cancelled
task receives a CancelledError raised at its next await point. Code between
awaits runs to completion — cancellation can’t interrupt a tight CPU loop with no
suspension points (the asyncio analogue of a goroutine that never checks ctx.Done()).
| Concept | TypeScript | Go | Python |
|---|---|---|---|
| Cancellation token | AbortController / AbortSignal | context.Context | the Task itself / CancelledError |
| Request cancellation | controller.abort() | cancel() | task.cancel() |
| Observe cancellation | check signal.aborted | <-ctx.Done() | CancelledError raised at await |
| Timeout | AbortSignal.timeout(ms) | context.WithTimeout | async with asyncio.timeout(s): |
| Deadline propagation | manual (pass the signal) | ctx flows down the call tree | implicit (cancels the awaited task tree) |
| Cleanup on cancel | finally | defer | try/finally (+ shield if needed) |
Timeouts with asyncio.timeout
Section titled “Timeouts with asyncio.timeout”Python 3.11 added asyncio.timeout(), a context manager that’s far cleaner than the
old asyncio.wait_for. It cancels whatever is running inside the block when the
deadline passes:
// AbortSignal.timeout cancels fetch after 2sconst res = await fetch(url, { signal: AbortSignal.timeout(2000) });ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)defer cancel()result, err := doWork(ctx) // doWork must respect ctx.Done()if errors.Is(err, context.DeadlineExceeded) { // timed out}try: async with asyncio.timeout(2.0): # seconds result = await do_work() # cancelled if it overrunsexcept TimeoutError: print("timed out")When the deadline fires, asyncio raises CancelledError inside the block to unwind
do_work, then converts it to a TimeoutError at the async with. You catch
TimeoutError; the inner code sees normal cancellation. There’s also
asyncio.timeout_at(loop.time() + 2) for absolute deadlines — the equivalent of
context.WithDeadline.
Handling cancellation cleanly
Section titled “Handling cancellation cleanly”Treat CancelledError like Go’s ctx.Done(): clean up and stop. Don’t swallow it —
re-raise so the cancellation propagates.
async def worker() -> None: try: while True: await do_one_unit() # cancellation lands at an await point except asyncio.CancelledError: await flush_buffers() # cleanup, like `defer` raise # ALWAYS re-raise — don't suppress cancellationCoordination primitives
Section titled “Coordination primitives”For producer/consumer and bounded concurrency, asyncio ships async-aware versions of the primitives you know — all of which suspend rather than block the thread.
asyncio.Queue — producer/consumer (≈ Go channels)
Section titled “asyncio.Queue — producer/consumer (≈ Go channels)”An asyncio.Queue is the closest thing to a Go channel: producers put, consumers
get, both suspend when the queue is full/empty. The differences: a Python queue
has no close(), so you signal completion with a sentinel or task_done()/join();
and there’s no select statement (reach for asyncio.wait with FIRST_COMPLETED,
or just structure with tasks).
func main() { jobs := make(chan int, 100) results := make(chan string, 100) var wg sync.WaitGroup
for w := 1; w <= 3; w++ { // fan-out: 3 workers wg.Add(1) go func(id int) { defer wg.Done() for j := range jobs { // range until closed results <- fmt.Sprintf("worker %d did job %d", id, j) } }(w) } for j := 1; j <= 9; j++ { jobs <- j } close(jobs)
go func() { wg.Wait(); close(results) }() for r := range results { fmt.Println(r) }}import asyncio
async def worker(wid: int, jobs: asyncio.Queue[int], results: asyncio.Queue[str]) -> None: while True: job = await jobs.get() # suspends if empty try: await results.put(f"worker {wid} did job {job}") finally: jobs.task_done() # mark this item complete
async def main() -> None: jobs: asyncio.Queue[int] = asyncio.Queue() results: asyncio.Queue[str] = asyncio.Queue()
async with asyncio.TaskGroup() as tg: workers = [tg.create_task(worker(i, jobs, results)) for i in range(1, 4)] for j in range(1, 10): await jobs.put(j) await jobs.join() # wait until every job is task_done() for w in workers: # no channel close — cancel the workers w.cancel()
while not results.empty(): print(results.get_nowait())
asyncio.run(main())The shape maps cleanly: jobs.join() + task_done() replaces close(jobs) +
range, and cancelling the workers inside the TaskGroup replaces the
wg.Wait(); close(results) goroutine. Because the workers are created in the group,
the cancellation is clean and nothing leaks.
asyncio.Semaphore — bounded concurrency
Section titled “asyncio.Semaphore — bounded concurrency”When you launch N tasks but only want K running at once (rate-limiting an upstream,
capping DB connections), use a Semaphore — identical in spirit to Go’s
buffered-channel-as-semaphore.
sem := make(chan struct{}, 5) // at most 5 in flightfor _, u := range urls { go func(u string) { sem <- struct{}{} // acquire defer func() { <-sem }() // release fetch(u) }(u)}sem = asyncio.Semaphore(5) # at most 5 in flight
async def fetch_guarded(url: str) -> str: async with sem: # acquire on enter, release on exit (even on error) return await fetch(url)
async def main() -> None: async with asyncio.TaskGroup() as tg: for url in urls: tg.create_task(fetch_guarded(url))All N tasks are scheduled at once, but async with sem: lets only 5 past the gate at
a time; the rest suspend (not block) until a permit frees up. This is the exact
pattern the Concurrent HTTP Fetcher exercise builds on.
gather vs as_completed — order vs latency
Section titled “gather vs as_completed — order vs latency”When you want results as they finish rather than in submission order, use
asyncio.as_completed. It’s the streaming counterpart to gather:
async def main() -> None: coros = [fetch(u) for u in urls] # Yields each result the moment its coroutine completes — fastest first. for earliest in asyncio.as_completed(coros): result = await earliest print("done:", result)| Want | Use |
|---|---|
| All results, input order, fail-fast cancellation | TaskGroup |
| All results, input order, one shot | asyncio.gather |
All results regardless of failures (allSettled) | gather(..., return_exceptions=True) |
| Results streamed as they complete | asyncio.as_completed |
First to finish wins (Promise.race) | asyncio.wait(..., return_when=FIRST_COMPLETED) |
Async context managers & iterators (briefly)
Section titled “Async context managers & iterators (briefly)”Two pieces of async syntax you’ll use constantly:
async with— an async context manager. Same job aswith(acquire/release a resource), but__aenter__/__aexit__canawait. You’ve already seen it:TaskGroup,timeout,Semaphore, and every async client (httpx.AsyncClient, DB sessions) are async context managers.async for— iterate an async generator, awaiting each item. This is how you stream rows from a DB cursor or chunks from an HTTP response.
async def tail(path: str): # an async generator async with aiofiles.open(path) as f: async for line in f: # await each line yield line.rstrip()
async def main() -> None: async for line in tail("app.log"): print(line)Streaming with async generators — backpressure, aiostream, queues as pipelines — is
the focus of Module 06: Streams & Background Work. Here,
just recognize the syntax.
Real async HTTP with httpx
Section titled “Real async HTTP with httpx”asyncio only gives you the loop and primitives; you need async-aware libraries to
get any benefit. The blocking requests library (you’ll see it everywhere) stalls the
loop. The modern choice is httpx, whose AsyncClient is a drop-in async HTTP
client with a requests-like API.
import asyncioimport httpx
async def fetch_json(client: httpx.AsyncClient, url: str) -> dict: resp = await client.get(url, timeout=5.0) resp.raise_for_status() return resp.json()
async def main() -> None: # One client, reused — connection pooling, HTTP/2, keep-alive. async with httpx.AsyncClient() as client: async with asyncio.TaskGroup() as tg: tasks = [ tg.create_task(fetch_json(client, f"https://httpbin.org/anything/{i}")) for i in range(5) ] for t in tasks: print(t.result()["url"])
asyncio.run(main())The blocking-call trap
Section titled “The blocking-call trap”One thread, one loop: a blocking call freezes the entire service. CPU-bound work
(image resizing, a big JSON parse, password hashing with argon2), a sync library with
no async version, or time.sleep — any of these blocks every other coroutine for its
full duration. There’s no scheduler to preempt it the way Go would.
The escape hatch is to push that work onto a thread (or process) pool so the loop keeps spinning:
// In Go you barely think about this — the scheduler runs the blocking// call on its own OS thread and keeps other goroutines going.hash := argon2.IDKey(pw, salt, 1, 64*1024, 4, 32) // just call itimport asynciofrom argon2 import PasswordHasher
ph = PasswordHasher()
async def hash_password(pw: str) -> str: # to_thread runs the blocking CPU work on a thread pool, # so the event loop stays responsive. (3.9+) return await asyncio.to_thread(ph.hash, pw)
# For CPU-bound work that the GIL would serialize anyway, use a# ProcessPoolExecutor to get real parallelism:async def parse_huge(data: bytes) -> dict: loop = asyncio.get_running_loop() with ProcessPoolExecutor() as pool: return await loop.run_in_executor(pool, expensive_parse, data)Rules of thumb:
- I/O that has an async library (HTTP, Postgres, Redis) → use the async library
(
httpx,asyncpg,redis.asyncio). Don’t thread it. - Blocking I/O with no async option (some SDK, a sync DB driver) →
asyncio.to_thread(fn, *args). Threads are fine here; the GIL releases during I/O. - CPU-bound work →
loop.run_in_executor(ProcessPoolExecutor(), ...)for true parallelism, because the GIL serializes CPU-bound threads. (More in Module 16.)
Cheat sheet
Section titled “Cheat sheet”| Pattern | TypeScript | Go | Python (asyncio) |
|---|---|---|---|
| Mark async | async function | (any func + go) | async def |
| Await one | await p | <-ch / blocking call | await coro |
| Entry point | top-level await / main() | func main() | asyncio.run(main()) |
| Non-blocking sleep | setTimeout + Promise | time.Sleep (in goroutine) | await asyncio.sleep(s) |
| Run all, ordered | Promise.all | errgroup / WaitGroup | asyncio.gather |
| Structured group | — | errgroup.WithContext | asyncio.TaskGroup |
| All settled | Promise.allSettled | per-goroutine error capture | gather(..., return_exceptions=True) |
| First wins | Promise.race | select | asyncio.wait(FIRST_COMPLETED) |
| Stream as done | — | select loop on channels | asyncio.as_completed |
| Channel | — | chan T | asyncio.Queue |
| Bounded concurrency | manual | buffered channel | asyncio.Semaphore |
| Timeout | AbortSignal.timeout | context.WithTimeout | async with asyncio.timeout(s) |
| Cancel | AbortController.abort | cancel() | task.cancel() / CancelledError |
| Offload blocking | worker thread | (automatic) | asyncio.to_thread / run_in_executor |
| Lock | — (single thread) | sync.Mutex | asyncio.Lock (rarely needed) |
Practice
Section titled “Practice”Put the event loop, TaskGroup, Semaphore, and asyncio.timeout to work in a
real, network-bound build.