Concurrent HTTP Fetcher

Build a concurrent URL fetcher that retrieves N URLs at once while capping how many requests run simultaneously and enforcing a per-request timeout — Python’s answer to a Go fan-out worker pool, but with a TaskGroup instead of a sync.WaitGroup and a Semaphore instead of a buffered channel. A single slow or failing URL must not sink the batch.

  • asyncio.run as the entry point and async def / await throughout.
  • asyncio.TaskGroup (3.11+) for structured fan-out — no leaked tasks.
  • asyncio.Semaphore to bound concurrency (the Go buffered-channel-as-semaphore pattern, made explicit).
  • asyncio.timeout (3.11+) for per-request deadlines.
  • httpx.AsyncClient for real, pooled, non-blocking HTTP.
  • Collecting a typed Success | Failure result per URL plus a summary.
  1. A FetchResult type with two variants:
    • Success(url, status, size, elapsed_ms)
    • Failure(url, error, elapsed_ms)
  2. A fetch_all(urls, *, max_concurrency, timeout_s) coroutine that:
    • launches one task per URL inside an asyncio.TaskGroup,
    • gates each through a shared asyncio.Semaphore(max_concurrency),
    • times each request out with asyncio.timeout(timeout_s),
    • turns a timeout or connection error into a Failure rather than crashing the batch.
  3. A reused httpx.AsyncClient (one client, connection pooling).
  4. Summary output: how many succeeded, how many failed, and the total elapsed time.

The shape maps cleanly from Go: instead of a fixed pool of worker goroutines pulling from a jobs channel, you launch one task per URL and let the Semaphore throttle them. The TaskGroup is the structured-concurrency replacement for WaitGroup — its async with block suspends until every child finishes (and cancels the rest if one fails fatally).

A single-module uv project. The whole program lives in src/fetcher/main.py.

  • concurrent-fetcher/
    • pyproject.toml: httpx dependency, ruff + ty config
    • src/
      • fetcher/
        • main.py: types, fetcher, and the entry point

Create the project and add the one runtime dependency. uv manages the Python version and the virtualenv for you — no pip, no python -m venv.

Terminal window
uv init concurrent-fetcher --package
cd concurrent-fetcher
uv add httpx
uv add --dev ruff ty
pyproject.toml
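[project]
name = "concurrent-fetcher"
version = "0.1.0"
requires-python = ">=3.13"
dependencies = ["httpx>=0.28"]

[dependency-groups]
dev = ["ruff", "ty"]

[project.scripts]
fetcher = "fetcher.main:run"

[build-system]
requires = ["uv_build>=0.8"]
build-backend = "uv_build"

# Assumption: uv_build looks for src/concurrent_fetcher/ by default (the
# normalized project name), so point it at src/fetcher/ explicitly.
[tool.uv.build-backend]
module-name = "fetcher"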

We model each outcome as a frozen dataclass and use a Success | Failure union as the result. This is the Python equivalent of a TypeScript discriminated union or a Go tagged result struct — match over the two variants is exhaustive in practice, and isinstance narrows the type for the checker.

src/fetcher/main.py
from __future__ import annotations

import asyncio
import time
from dataclasses import dataclass

import httpx


@dataclass(frozen=True, slots=True)
class Success:
    url: str
    status: int
    size: int
    elapsed_ms: float


@dataclass(frozen=True, slots=True)
class Failure:
    url: str
    error: str
    elapsed_ms: float


type FetchResult = Success | Failure  # PEP 695 type alias

The fetcher — concurrency, timeout, structure

This is the centerpiece. Three asyncio tools do all the work, and it’s worth seeing exactly which job each owns.

src/fetcher/main.py
async def fetch_one(
    client: httpx.AsyncClient,
    sem: asyncio.Semaphore,
    url: str,
    timeout_s: float,
) -> FetchResult:
    start = time.perf_counter()
    try:
        async with sem:  # bounded concurrency
            async with asyncio.timeout(timeout_s):  # per-request deadline
                resp = await client.get(url, follow_redirects=True)
        elapsed = (time.perf_counter() - start) * 1000
        return Success(url, resp.status_code, len(resp.content), elapsed)
    except TimeoutError:
        elapsed = (time.perf_counter() - start) * 1000
        return Failure(url, f"timeout after {timeout_s}s", elapsed)
    except httpx.HTTPError as exc:
        elapsed = (time.perf_counter() - start) * 1000
        return Failure(url, f"{type(exc).__name__}: {exc}", elapsed)


async def fetch_all(
    urls: list[str],
    *,
    max_concurrency: int = 5,
    timeout_s: float = 5.0,
) -> list[FetchResult]:
    sem = asyncio.Semaphore(max_concurrency)
    async with httpx.AsyncClient() as client:
        async with asyncio.TaskGroup() as tg:  # structured fan-out
            tasks = [
                tg.create_task(fetch_one(client, sem, url, timeout_s))
                for url in urls
            ]
        # The TaskGroup block has exited → every task is done.
        return [task.result() for task in tasks]

How the pieces fit together:

  • async with asyncio.TaskGroup() as tg: is the structured-concurrency boundary — Go’s errgroup collapsed into a block. fetch_all cannot return until every task created inside it finishes; leaking a task is impossible. Note that each fetch_one catches its own errors and returns a Failure, so a single bad URL never triggers the group’s fail-fast cancellation of its siblings — the batch always completes. (Let an exception escape fetch_one and you’d see the fail-fast behavior instead: siblings cancelled, an ExceptionGroup raised.)
  • async with sem: is what actually caps concurrency. All N tasks are scheduled at once, but only max_concurrency of them are past the Semaphore gate at any moment; the rest suspend (not block a thread) until a permit frees. This is the buffered-channel-as-semaphore pattern from Go, made explicit. The permit is released on block exit even if the body raises.
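  • async with asyncio.timeout(timeout_s): races the request against a deadline. If client.get overruns, asyncio cancels it and the block raises TimeoutError, which we convert to a Failure. The surrounding except httpx.HTTPError catches connection and DNS errors (and httpx’s own transport-level timeouts) and turns those into Failures too. An error status such as 404 is not an exception here, because nothing calls resp.raise_for_status(); it comes back as a Success carrying that status code.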
  • One httpx.AsyncClient, reused across all requests via async with — that gives you connection pooling and keep-alive. Creating a client per request would throw away the pool.

run() is a tiny sync wrapper around asyncio.run(main()) so it can serve as the project.scripts console entry point (you can’t make a script target async). It builds a URL list, fetches with a cap of 5, prints each result via match, and ends with a summary.

src/fetcher/main.py
async def main() -> None:
    urls = [
        "https://example.com",
        "https://httpbin.org/get",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/10",  # will time out
        "https://httpbin.org/status/404",
        "https://this-host-does-not-exist.invalid",  # will fail
        "https://api.github.com",
        "https://www.python.org",
    ]
    print(f"Fetching {len(urls)} URLs (max 5 concurrent, 5s timeout)...")
    start = time.perf_counter()
    results = await fetch_all(urls, max_concurrency=5, timeout_s=5.0)
    total_ms = (time.perf_counter() - start) * 1000

    for r in results:
        match r:
            case Success(url, status, size, elapsed):
                print(f"[OK] {status} {url} ({size}B, {elapsed:.0f}ms)")
            case Failure(url, error, elapsed):
                print(f"[FAIL] {url} ({elapsed:.0f}ms) - {error}")

    ok = sum(1 for r in results if isinstance(r, Success))
    fail = len(results) - ok
    print("-" * 60)
    print(f"Total: {len(results)} | OK: {ok} | Failed: {fail} | Time: {total_ms:.0f}ms")


def run() -> None:
    """Sync entry point for the `fetcher` console script."""
    asyncio.run(main())


if __name__ == "__main__":
    run()

Because the cap is 5, up to five requests are in flight at once, and a waiting task starts as soon as any permit frees. Wall-clock time is therefore far below the sum of every request’s duration: that’s the speedup concurrency buys over fetching one at a time, while the Semaphore keeps you kinder to the remote hosts than firing all N at once.

  1. Sync dependencies (creates the venv, installs httpx):

    Terminal window
    uv sync
  2. Run it — via the console script or the module directly:

    Terminal window
    uv run fetcher
    # or: uv run python -m fetcher.main
  3. Lint and type-check (every project in this guide does this):

    Terminal window
    uv run ruff check .
    uv run ruff format .
    uv run ty check

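The results print together once the whole batch has finished, followed by a summary like Total: 8 | OK: 6 | Failed: 2 | Time: 5180ms. The /delay/10 URL hits the 5s timeout and the .invalid host fails DNS, while the rest succeed. That includes /status/404: the code never calls resp.raise_for_status(), so a 404 response is recorded as a Success with status 404. Exact timing varies with the network.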