Concurrent HTTP Fetcher
Build a URL fetcher that retrieves N URLs concurrently while capping how many
requests are in flight at once and enforcing a per-request timeout. It is Python’s answer to
a Go fan-out worker pool, but with a TaskGroup instead of a sync.WaitGroup and a
Semaphore instead of a buffered channel. A single slow or failing URL must not sink
the batch.
What you’ll practice
- asyncio.run as the entry point and async def / await throughout.
- asyncio.TaskGroup (3.11+) for structured fan-out, with no leaked tasks.
- asyncio.Semaphore to bound concurrency (the Go buffered-channel-as-semaphore pattern, made explicit).
- asyncio.timeout (3.11+) for per-request deadlines.
- httpx.AsyncClient for real, pooled, non-blocking HTTP.
- Collecting a typed Success | Failure result per URL plus a summary.
Requirements
- A FetchResult type with two variants:
  - Success(url, status, size, elapsed_ms)
  - Failure(url, error, elapsed_ms)
- A fetch_all(urls, *, max_concurrency, timeout_s) coroutine that:
  - launches one task per URL inside an asyncio.TaskGroup,
  - gates each through a shared asyncio.Semaphore(max_concurrency),
  - times each request out with asyncio.timeout(timeout_s),
  - turns a timeout or connection error into a Failure rather than crashing the batch.
- A reused httpx.AsyncClient (one client, connection pooling).
- Summary output: how many succeeded, how many failed, and the total elapsed time.
The shape maps cleanly from Go: instead of a fixed pool of worker goroutines pulling
from a jobs channel, you launch one task per URL and let the Semaphore throttle them.
The TaskGroup is the structured-concurrency replacement for WaitGroup — its
async with block suspends until every child finishes (and cancels the rest if one
fails fatally).
The worked solution
A single-module uv project. The whole program lives in src/fetcher/main.py.

concurrent-fetcher/
- pyproject.toml    httpx dependency, ruff + ty config
- src/
  - fetcher/
    - main.py    types, fetcher, and the entry point
Project setup
Create the project and add the one runtime dependency. uv manages the Python
version and the virtualenv for you — no pip, no python -m venv.

    uv init concurrent-fetcher --package
    cd concurrent-fetcher
    uv add httpx
    uv add --dev ruff ty

pyproject.toml:

    [project]
    name = "concurrent-fetcher"
    version = "0.1.0"
    requires-python = ">=3.13"
    dependencies = ["httpx>=0.28"]

    [dependency-groups]
    dev = ["ruff", "ty"]

    [project.scripts]
    fetcher = "fetcher.main:run"

    [build-system]
    requires = ["uv_build>=0.8"]
    build-backend = "uv_build"

The result type
We model each outcome as a frozen dataclass and use a Success | Failure union as
the result. This is the Python equivalent of a TypeScript discriminated union or a
Go tagged result struct — match over the two variants is exhaustive in practice,
and isinstance narrows the type for the checker.

    from __future__ import annotations

    import asyncio
    import time
    from dataclasses import dataclass

    import httpx


    @dataclass(frozen=True, slots=True)
    class Success:
        url: str
        status: int
        size: int
        elapsed_ms: float


    @dataclass(frozen=True, slots=True)
    class Failure:
        url: str
        error: str
        elapsed_ms: float


    type FetchResult = Success | Failure  # PEP 695 type alias

The fetcher: concurrency, timeout, structure
This is the centerpiece. Three asyncio tools do all the work, and it’s worth seeing exactly which job each owns.

    async def fetch_one(
        client: httpx.AsyncClient,
        sem: asyncio.Semaphore,
        url: str,
        timeout_s: float,
    ) -> FetchResult:
        # The clock starts before the semaphore, so elapsed_ms includes
        # any time spent waiting for a permit.
        start = time.perf_counter()
        try:
            async with sem:  # bounded concurrency
                async with asyncio.timeout(timeout_s):  # per-request deadline
                    resp = await client.get(url, follow_redirects=True)
            elapsed = (time.perf_counter() - start) * 1000
            # Any HTTP status, including 4xx/5xx, is recorded as a Success.
            return Success(url, resp.status_code, len(resp.content), elapsed)
        except TimeoutError:
            elapsed = (time.perf_counter() - start) * 1000
            return Failure(url, f"timeout after {timeout_s}s", elapsed)
        except httpx.HTTPError as exc:
            elapsed = (time.perf_counter() - start) * 1000
            return Failure(url, f"{type(exc).__name__}: {exc}", elapsed)


    async def fetch_all(
        urls: list[str],
        *,
        max_concurrency: int = 5,
        timeout_s: float = 5.0,
    ) -> list[FetchResult]:
        sem = asyncio.Semaphore(max_concurrency)
        async with httpx.AsyncClient() as client:
            async with asyncio.TaskGroup() as tg:  # structured fan-out
                tasks = [
                    tg.create_task(fetch_one(client, sem, url, timeout_s))
                    for url in urls
                ]
            # The TaskGroup block has exited, so every task is done.
            return [task.result() for task in tasks]

How the pieces fit together:
- async with asyncio.TaskGroup() as tg: is the structured-concurrency boundary, Go’s errgroup collapsed into a block. fetch_all cannot return until every task created inside it finishes; leaking a task is impossible. Note that each fetch_one catches its own timeout and httpx errors and returns a Failure, so a single bad URL never triggers the group’s fail-fast cancellation of its siblings, and the batch always completes. (Let an exception escape fetch_one and you’d see the fail-fast behavior instead: siblings cancelled, an ExceptionGroup raised.)
- async with sem: is what actually caps concurrency. All N tasks are scheduled at once, but only max_concurrency of them are past the Semaphore gate at any moment; the rest suspend (not block a thread) until a permit frees. This is the buffered-channel-as-semaphore pattern from Go, made explicit. The permit is released on block exit even if the body raises.
- async with asyncio.timeout(timeout_s): races the request against a deadline. If client.get overruns, asyncio cancels it and the block raises TimeoutError, which we convert to a Failure. The surrounding except httpx.HTTPError catches transport-level errors (connection failures, DNS errors, httpx’s own timeouts) and turns those into Failures too. It does not catch error statuses: the code never calls resp.raise_for_status(), so a 404 or 500 comes back as a Success carrying that status code.
- One httpx.AsyncClient, reused across all requests via async with. That gives you connection pooling and keep-alive. Creating a client per request would throw away the pool.
The entry point
run() is a tiny sync wrapper around asyncio.run(main()) so it can serve as the
project.scripts console entry point (you can’t make a script target async). It
builds a URL list, fetches with a cap of 5, prints each result via match, and ends
with a summary.

    async def main() -> None:
        urls = [
            "https://example.com",
            "https://httpbin.org/get",
            "https://httpbin.org/delay/1",
            "https://httpbin.org/delay/10",  # will time out
            "https://httpbin.org/status/404",
            "https://this-host-does-not-exist.invalid",  # will fail
            "https://api.github.com",
            "https://www.python.org",
        ]

        print(f"Fetching {len(urls)} URLs (max 5 concurrent, 5s timeout)...")
        start = time.perf_counter()
        results = await fetch_all(urls, max_concurrency=5, timeout_s=5.0)
        total_ms = (time.perf_counter() - start) * 1000

        for r in results:
            match r:
                case Success(url, status, size, elapsed):
                    print(f"[OK] {status} {url} ({size}B, {elapsed:.0f}ms)")
                case Failure(url, error, elapsed):
                    print(f"[FAIL] {url} ({elapsed:.0f}ms) - {error}")

        ok = sum(1 for r in results if isinstance(r, Success))
        fail = len(results) - ok
        print("-" * 60)
        print(f"Total: {len(results)} | OK: {ok} | Failed: {fail} | Time: {total_ms:.0f}ms")


    def run() -> None:
        """Sync entry point for the `fetcher` console script."""
        asyncio.run(main())


    if __name__ == "__main__":
        run()

Because the cap is 5, the wall-clock time is roughly the work split into waves of 5,
not the sum of every request. That is the speedup the Semaphore buys over fetching
one at a time, while staying kinder to the remote hosts than firing all N at once.
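
A rough way to reason about that wall-clock time is to simulate the gate. The sketch below is a back-of-the-envelope model, not part of the project: estimate_wall_clock is a hypothetical helper, the durations are made up, and it assumes requests are admitted in list order as soon as any of the cap slots frees up (so it models a sliding window rather than strict waves).

```python
import heapq


def estimate_wall_clock(durations: list[float], cap: int) -> float:
    """Finish time if each request starts as soon as one of `cap` slots is free."""
    if cap < 1:
        raise ValueError("cap must be at least 1")
    slots = [0.0] * cap  # the time at which each slot next becomes free
    heapq.heapify(slots)
    finish = 0.0
    for d in durations:  # requests are admitted in list order
        start = heapq.heappop(slots)  # earliest-free slot
        end = start + d
        finish = max(finish, end)
        heapq.heappush(slots, end)
    return finish


if __name__ == "__main__":
    one_second_each = [1.0] * 8
    print(estimate_wall_clock(one_second_each, cap=1))  # 8.0, fully sequential
    print(estimate_wall_clock(one_second_each, cap=5))  # 2.0, two "waves"
    print(estimate_wall_clock(one_second_each, cap=8))  # 1.0, everything at once
```

When one request is much slower than the rest, the total tends to be dominated by that request rather than by the number of waves, which is why a single five-second timeout sets the floor for the sample run.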
Run it
1. Sync dependencies (creates the venv, installs httpx):

       uv sync

2. Run it, via the console script or the module directly:

       uv run fetcher
       # or: uv run python -m fetcher.main

3. Lint and type-check (every project in this guide does this):

       uv run ruff check .
       uv run ruff format .
       uv run ty check

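The per-URL lines print once the whole batch has finished, followed by a summary like
Total: 8 | OK: 6 | Failed: 2 | Time: 5180ms. The /delay/10 URL hits the 5s
timeout and the .invalid host fails DNS, while the rest count as OK. That includes the
/status/404 URL, because fetch_one records the status code without treating it as an
error. Exact timing varies with the network.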