Observability

A service you can’t see into is a service you can’t operate. Observability rests on three pillars: logs (what happened), metrics (how much, how often, how fast), and traces (where time went across services). Python in 2026 has a clean, vendor-neutral story for all three: structlog for structured logging, the Prometheus client for metrics, and OpenTelemetry for tracing — and the OTel SDK can carry your metrics too.

If you’re reaching for print() to debug a running service, stop. print writes unstructured text to stdout with no level, no timestamp, no context, and no way to filter or correlate it. We’ll name it once here as the thing structured logging replaces, then never again.

| Concept | TypeScript | Go | Python |
| --- | --- | --- | --- |
| Structured logging | pino | slog / zap / zerolog | structlog |
| Stdlib logging | console | log | logging (structlog wraps it) |
| Log context | AsyncLocalStorage | context.Context | contextvars + bound loggers |
| Metrics client | prom-client | client_golang | prometheus-client |
| Metrics endpoint | custom /metrics | promhttp.Handler() | make_asgi_app() mounted on FastAPI |
| Tracing | @opentelemetry/sdk-node | go.opentelemetry.io/otel | opentelemetry-sdk |
| Auto-instrumentation | @opentelemetry/auto-instrumentations-node | manual mostly | opentelemetry-instrumentation-* |
| Health checks | custom route | custom route | custom FastAPI route |
| Error tracking | Sentry SDK | Sentry SDK | sentry-sdk |
| Profiler | 0x / clinic | pprof | py-spy (no code changes) |

The whole module’s dependency set, added to a uv project:

uv add structlog prometheus-client \
opentelemetry-distro opentelemetry-exporter-otlp \
opentelemetry-instrumentation-fastapi \
opentelemetry-instrumentation-httpx \
opentelemetry-instrumentation-sqlalchemy

Pillar 1 — Structured logging with structlog

The stdlib logging module works and is everywhere, but its default output is a human-formatted string. Structured logging means every line is a JSON object with typed fields you can filter, aggregate, and correlate in Loki, ELK, or Datadog — no fragile regex parsing. structlog is the modern Python way to get there. It’s the equivalent of choosing pino over console.log in Node, or leaning on slog’s structured handler in Go.

# print/logging default (a string a regex has to claw fields out of)
2026-06-19 10:23:45 INFO user created id=123 name=Alice
# structlog JSON (machine-parseable, every field typed and queryable)
{"event": "user created", "user_id": 123, "name": "Alice", "level": "info", "timestamp": "2026-06-19T10:23:45.123Z"}

The same “log an event with fields” three ways

import pino from "pino";
const logger = pino();
logger.info({ userId: 123, name: "Alice" }, "user created");
logger.error({ err }, "failed to connect");

Key differences:

  • structlog’s first positional arg is the event (the message); everything else is **kwargs that become structured fields — exactly like pino’s object or slog’s key/value varargs.
  • structlog.get_logger() returns a lazy logger. It does almost no work until you actually emit, so you can call it at module top level.
  • Field values are kept as native types (user_id=123 stays an int in the JSON), not stringified into the message.

Configuring structlog (JSON for prod, pretty for dev)

structlog is a pipeline of processors. Each log call flows through the chain: add a timestamp, add the level, merge in context, then render. Configure it once at startup.

app/logging_config.py
import logging
import sys

import structlog


def configure_logging(*, json_logs: bool, level: str = "INFO") -> None:
    # Shared processors run for every event.
    shared: list = [
        structlog.contextvars.merge_contextvars,  # pull in request-scoped context
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso", utc=True),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,  # render exc_info into a string
    ]
    # The final renderer: JSON in prod, colorized key=value in dev.
    renderer = (
        structlog.processors.JSONRenderer()
        if json_logs
        else structlog.dev.ConsoleRenderer()
    )
    structlog.configure(
        processors=[*shared, renderer],
        wrapper_class=structlog.make_filtering_bound_logger(
            logging.getLevelNamesMapping()[level]
        ),
        logger_factory=structlog.PrintLoggerFactory(),
        cache_logger_on_first_use=True,
    )
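
To make the pipeline idea concrete, here is a stdlib-only sketch of how an event dict travels through a processor chain. It mimics the (logger, method_name, event_dict) call shape that structlog processors use, but run_pipeline, add_level, add_service, and render_json are hypothetical names for illustration, not structlog APIs.

```python
import json
from typing import Any, Callable

EventDict = dict[str, Any]
Processor = Callable[[Any, str, EventDict], Any]


def add_level(logger: Any, method_name: str, event_dict: EventDict) -> EventDict:
    # The method that was called (info, error, ...) becomes the level field.
    event_dict["level"] = method_name
    return event_dict


def add_service(logger: Any, method_name: str, event_dict: EventDict) -> EventDict:
    # A static field added to every event.
    event_dict["service"] = "api"
    return event_dict


def render_json(logger: Any, method_name: str, event_dict: EventDict) -> str:
    # The last processor turns the dict into the final output line.
    return json.dumps(event_dict, sort_keys=True)


def run_pipeline(
    processors: list[Processor], method_name: str, event: str, **fields: Any
) -> Any:
    # Each processor receives the previous processor's return value.
    result: Any = {"event": event, **fields}
    for processor in processors:
        result = processor(None, method_name, result)
    return result


if __name__ == "__main__":
    chain = [add_level, add_service, render_json]
    print(run_pipeline(chain, "info", "server started", port=8000))
```

The order matters: enrichment steps come first and the renderer comes last, which is why the real configuration appends the renderer after the shared processors.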

structlog respects the stdlib level names you already know:

| Level | When | Example |
| --- | --- | --- |
| debug | Developer detail, off in prod | log.debug("cache lookup", key=k) |
| info | Normal operations | log.info("server started", port=8000) |
| warning | Unexpected but handled | log.warning("retrying", attempt=n) |
| error | An operation failed | log.error("db write failed", err=str(e)) |
| exception | An error with the current traceback | log.exception("unhandled") |

make_filtering_bound_logger(INFO) drops anything below INFO before the processor chain runs — so a disabled log.debug(...) costs almost nothing, the same way pino’s level gate or slog’s Enabled() check works.

This is where structured logging earns its keep. Bind fields to a logger once, and every subsequent line carries them — no threading a logger object through every function call. There are two ways to bind.

1. Bound loggers — explicit, returns a new logger with extra fields baked in:

log = structlog.get_logger()
request_log = log.bind(request_id="abc-123", path="/api/tasks")
request_log.info("handling request") # includes request_id + path
request_log.info("db query done", rows=4) # also includes request_id + path

2. contextvars — implicit, ambient context for the whole async task, the direct analogue of Node’s AsyncLocalStorage and Go’s context.Context. This is what you want for request-scoped data like a correlation ID:

import { AsyncLocalStorage } from "node:async_hooks";

const als = new AsyncLocalStorage<{ requestId: string }>();

als.run({ requestId: id }, () => {
  // every logger.child() in here can read requestId
  handle();
});

contextvars are async-safe: each asyncio task gets its own copy, so two concurrent requests never leak each other’s context. The merge_contextvars processor (first in our pipeline above) is what pulls these bound values into every event.
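
A stdlib-only sketch of that isolation property follows. It uses contextvars.ContextVar directly, the mechanism structlog's contextvars helpers build on; the request_id variable and the handle coroutine are hypothetical names.

```python
import asyncio
import contextvars

# Ambient, task-local storage for the current request's ID.
request_id: contextvars.ContextVar[str] = contextvars.ContextVar(
    "request_id", default="-"
)


async def handle(rid: str, delay: float) -> tuple[str, str]:
    request_id.set(rid)
    # Yield to the event loop so the other task runs in between.
    await asyncio.sleep(delay)
    # Read the value back: it is still this task's own ID.
    return rid, request_id.get()


async def main() -> list[tuple[str, str]]:
    # gather() wraps each coroutine in a task, and each task gets a copy
    # of the current context, so the two set() calls cannot collide.
    return await asyncio.gather(handle("req-a", 0.02), handle("req-b", 0.01))


if __name__ == "__main__":
    print(asyncio.run(main()))
```

With structlog you would call structlog.contextvars.bind_contextvars(request_id=...) instead of setting the variable yourself, and merge_contextvars would copy the value into each event.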

Request-scoped context with FastAPI middleware

The standard pattern: a middleware that mints (or reads) a correlation ID, binds it to contextvars, and tags the response. Every log line within that request — yours and the framework’s — then carries the ID.

app/middleware.py
import uuid

import structlog
from starlette.types import ASGIApp, Receive, Scope, Send


class RequestContextMiddleware:
    def __init__(self, app: ASGIApp) -> None:
        self.app = app

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return
        headers = dict(scope["headers"])
        # Honor an inbound correlation ID (from a gateway / upstream service),
        # otherwise generate one. Header names are lowercase bytes in ASGI.
        incoming = headers.get(b"x-request-id")
        request_id = incoming.decode() if incoming else str(uuid.uuid4())
        # Clear first so nothing leaks in from a previous request.
        structlog.contextvars.clear_contextvars()
        structlog.contextvars.bind_contextvars(
            request_id=request_id,
            method=scope["method"],
            path=scope["path"],
        )

        async def send_with_request_id(message) -> None:
            # Tag the response so the caller can quote the ID back to you.
            if message["type"] == "http.response.start":
                message.setdefault("headers", []).append(
                    (b"x-request-id", request_id.encode())
                )
            await send(message)

        await self.app(scope, receive, send_with_request_id)

Your dependencies (uvicorn, SQLAlchemy, httpx) log via stdlib logging, not structlog. To get one consistent JSON stream, route stdlib records through structlog’s renderer with ProcessorFormatter:

app/logging_config.py (continued)
def configure_stdlib_bridge(*, json_logs: bool) -> None:
    renderer = (
        structlog.processors.JSONRenderer()
        if json_logs
        else structlog.dev.ConsoleRenderer()
    )
    formatter = structlog.stdlib.ProcessorFormatter(
        processor=renderer,
        # Processors applied to records that did not originate in structlog.
        foreign_pre_chain=[
            structlog.contextvars.merge_contextvars,
            structlog.processors.add_log_level,
            structlog.processors.TimeStamper(fmt="iso", utc=True),
        ],
    )
    # StreamHandler defaults to stderr; write to stdout so stdlib records
    # share a stream with the PrintLoggerFactory output configured earlier.
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(formatter)
    root = logging.getLogger()
    root.handlers = [handler]
    root.setLevel(logging.INFO)
    # Let uvicorn's loggers propagate to root instead of double-printing.
    for name in ("uvicorn", "uvicorn.error", "uvicorn.access"):
        logging.getLogger(name).handlers = []
        logging.getLogger(name).propagate = True

Now uvicorn’s access logs and your log.info(...) calls land in the same JSON stream, both carrying the bound request_id.

Prometheus is pull-based: your app exposes a /metrics text endpoint, and a Prometheus server scrapes it on a timer. This is the inverse of push-based systems like StatsD. The prometheus-client library is the direct sibling of Node’s prom-client and Go’s client_golang — same metric types, same exposition format.

| Type | Goes | Use for |
| --- | --- | --- |
| Counter | up only (resets on restart) | total requests, errors, items processed |
| Gauge | up and down | in-flight requests, queue depth, connection pool size |
| Histogram | buckets + sum + count | request latency, payload sizes (gives you percentiles) |
| Summary | client-side quantiles | latency when you can’t aggregate across instances (prefer Histogram) |

import { Counter, Histogram } from "prom-client";

const requests = new Counter({
  name: "http_requests_total",
  help: "Total HTTP requests",
  labelNames: ["method", "path", "status"],
});

const latency = new Histogram({
  name: "http_request_duration_seconds",
  help: "Request latency",
  labelNames: ["method", "path"],
});

requests.inc({ method: "GET", path: "/api/tasks", status: "200" });

Key differences:

  • Declare metrics as module-level singletons. The default registry is global, and re-declaring the same metric name raises ValueError (“Duplicated timeseries in CollectorRegistry”), so define once and import everywhere. (This bites people who declare metrics inside a function.)
  • .labels(...) returns the specific child series, then you .inc() / .observe() on it — same shape as Go’s .WithLabelValues(...).
  • Histogram buckets are fixed at declaration and chosen by you. There’s no auto-bucketing; pick buckets that straddle your latency SLO.
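
A possible Python counterpart, sketched as the app/metrics_defs.py module that the middleware imports from. The constant names REQUESTS, LATENCY, and IN_PROGRESS come from that import, and the metric names and labels mirror the Node snippet; the bucket boundaries and the http_requests_in_progress gauge name are assumptions you should adapt to your own SLO.

```python
from prometheus_client import Counter, Gauge, Histogram

# Module-level singletons: declared once, imported wherever they are needed.
REQUESTS = Counter(
    "http_requests_total",
    "Total HTTP requests",
    ["method", "path", "status"],
)

LATENCY = Histogram(
    "http_request_duration_seconds",
    "Request latency in seconds",
    ["method", "path"],
    # Assumed boundaries; choose values that straddle your latency SLO.
    buckets=(0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0),
)

IN_PROGRESS = Gauge(
    "http_requests_in_progress",
    "HTTP requests currently being handled",
)
```

Usage then follows the .labels(...) pattern described above, for example REQUESTS.labels(method="GET", path="/api/tasks", status="200").inc().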

prometheus-client ships an ASGI app you mount directly — no hand-written route:

app/metrics.py
import time

from prometheus_client import make_asgi_app
from starlette.types import ASGIApp, Receive, Scope, Send

from app.metrics_defs import IN_PROGRESS, LATENCY, REQUESTS

# Mounted in main.py: app.mount("/metrics", metrics_app)
metrics_app = make_asgi_app()


class PrometheusMiddleware:
    """Records a counter + latency histogram for every HTTP request."""

    def __init__(self, app: ASGIApp) -> None:
        self.app = app

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return
        method = scope["method"]
        status_holder = {"code": 500}

        async def send_wrapper(message) -> None:
            if message["type"] == "http.response.start":
                status_holder["code"] = message["status"]
            await send(message)

        IN_PROGRESS.inc()
        start = time.perf_counter()
        try:
            await self.app(scope, receive, send_wrapper)
        finally:
            elapsed = time.perf_counter() - start
            IN_PROGRESS.dec()
            # Use the route template, NOT the raw path (see the cardinality
            # caution). This assumes FastAPI's router has stored the matched
            # route on the scope, which is only true after the app has run;
            # anything unmatched (404s, mounts) shares one label value.
            route = scope.get("route")
            path = getattr(route, "path", "unmatched")
            LATENCY.labels(method=method, path=path).observe(elapsed)
            REQUESTS.labels(
                method=method, path=path, status=str(status_holder["code"])
            ).inc()

Two methodologies keep you from drowning in metrics:

| Method | For | The three signals |
| --- | --- | --- |
| RED | request-driven services (your API) | Rate, Errors, Duration |
| USE | resources (CPU, pool, queue) | Utilization, Saturation, Errors |

The middleware above gives you RED for free: rate (http_requests_total), errors (filter by status=~"5.."), duration (the histogram). For USE, a gauge on your DB pool’s in-use connections and your Kafka consumer lag covers the resources that actually fall over.

Cardinality: the one pitfall that bites everyone

Every unique combination of label values is a separate time series Prometheus must store and index. Put a high-cardinality value in a label and you get a “cardinality explosion” — millions of series, an OOM’d Prometheus, and a surprise bill.

# CATASTROPHE: user_id and raw path are unbounded.
REQUESTS.labels(method="GET", path=f"/users/{user_id}", status="200").inc()
#                                            ^^^^^^^ a new series per user, forever

# GOOD: use the route TEMPLATE, keep labels low-cardinality.
REQUESTS.labels(method="GET", path="/users/{id}", status="200").inc()
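
The arithmetic behind the explosion is worth seeing once. This sketch computes the worst-case series count for one metric as the product of distinct values per label; series_count is a hypothetical helper, and the label counts are made-up illustrative numbers.

```python
from math import prod


def series_count(label_cardinalities: dict[str, int]) -> int:
    """Upper bound on time series for one metric.

    Every unique combination of label values is its own series, so the
    worst case is the product of the distinct values each label can take.
    """
    return prod(label_cardinalities.values())


# Route templates: a small, fixed set of paths.
bounded = series_count({"method": 5, "path": 20, "status": 10})

# Raw paths containing a user ID: one "path" value per user.
unbounded = series_count({"method": 5, "path": 1_000_000, "status": 10})

if __name__ == "__main__":
    print(f"templated paths: {bounded:,} series")
    print(f"raw paths:       {unbounded:,} series")
```

A histogram multiplies the figure again, because each bucket is stored as a separate series.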

A trace is the end-to-end journey of one request. Each unit of work is a span (an HTTP handler, a DB query, an outbound call). Spans nest into a tree; they all share one trace ID, and each links to its parent via span ID. Context propagation carries the trace ID across process boundaries (via the traceparent header) so a request through three services is one connected trace.

| Term | Meaning |
| --- | --- |
| Trace | The whole request journey across services |
| Span | One timed unit of work within a trace |
| Trace ID | Shared by every span in one trace |
| Span ID | Identifies a single span |
| Context propagation | Passing trace context across service calls (traceparent header) |
| Exporter | Ships spans to a backend (OTLP → collector → Jaeger/Tempo) |
| Sampler | Decides which traces to keep (cost control) |

This is the big win over Go’s mostly manual tracing. The opentelemetry-instrumentation-* packages monkey-patch your libraries to emit spans with zero application code changes. Wire FastAPI, httpx, and SQLAlchemy in one place:

app/tracing.py
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor


def configure_tracing(
    app, *, service_name: str, otlp_endpoint: str
) -> TracerProvider:
    resource = Resource.create({"service.name": service_name})
    provider = TracerProvider(resource=resource)
    # Batch + export over OTLP gRPC to the collector (default :4317).
    provider.add_span_processor(
        BatchSpanProcessor(OTLPSpanExporter(endpoint=otlp_endpoint))
    )
    trace.set_tracer_provider(provider)
    # One call each; these patch the libraries to emit spans automatically.
    FastAPIInstrumentor.instrument_app(app)
    HTTPXClientInstrumentor().instrument()
    # Caveat: a bare instrument() should only cover engines created through
    # the patched factory functions afterwards. For an engine that already
    # exists, the instrumentor also accepts engine=... (engine.sync_engine
    # for an AsyncEngine).
    SQLAlchemyInstrumentor().instrument(enable_commenter=True)
    # Return the concrete provider so the caller can .shutdown() it on exit.
    return provider

That alone gives you a span per HTTP request, a child span per outbound httpx call (with the traceparent header propagated automatically), and a child span per SQL query — connected into one trace.

Auto-instrumentation covers the framework; you add spans for the operations you care about — same as tracer.startSpan() in Node or tracer.Start(ctx, ...) in Go.

app/service.py
from opentelemetry import trace

tracer = trace.get_tracer(__name__)


async def process_order(order_id: str) -> Order:
    with tracer.start_as_current_span("process_order") as span:
        span.set_attribute("order.id", order_id)
        try:
            await validate(order_id)  # auto-spanned DB/HTTP calls nest here
            await charge(order_id)
            span.set_attribute("order.status", "completed")
            return Order(order_id, status="completed")
        except PaymentError as exc:
            # Attach the exception to the span and mark the span as failed.
            span.record_exception(exc)
            span.set_status(trace.Status(trace.StatusCode.ERROR, str(exc)))
            raise

The with block handles start/end for you (no explicit span.end() as in Node, and no defer span.End() as in Go), and start_as_current_span makes it the parent for any spans created inside, including the auto-instrumented ones.

The payoff of running all three pillars: stitch a trace_id into every log line so “this error log” links straight to “this trace.” Add a structlog processor that reads the active span:

app/logging_config.py (processor)
from opentelemetry import trace


def add_trace_context(logger, method_name, event_dict):
    span = trace.get_current_span()
    ctx = span.get_span_context()
    # Outside a span the context is invalid, so the event is left untouched.
    if ctx.is_valid:
        event_dict["trace_id"] = format(ctx.trace_id, "032x")
        event_dict["span_id"] = format(ctx.span_id, "016x")
    return event_dict

Drop add_trace_context into your processor chain and every log emitted inside a span carries trace_id. Now a log search in Loki links to the exact trace in Jaeger — logs, metrics, and traces all joined by IDs.

Kubernetes distinguishes two probes, and conflating them causes restart loops:

| Probe | Question | Endpoint | Failure means |
| --- | --- | --- | --- |
| Liveness | “Is the process wedged?” | /healthz | Kill and restart the pod |
| Readiness | “Can it serve traffic now?” | /readyz | Pull it from the load balancer (don’t restart) |

/healthz should be cheap and dependency-free — it answers “is the event loop alive?” /readyz checks the things you need to serve a request (DB, Redis) and returns 503 when they’re down, so traffic drains without a kill.

app/health.py
from typing import Annotated

from fastapi import APIRouter, Depends, Request, Response, status
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncEngine

router = APIRouter(tags=["health"])


def get_engine(request: Request) -> AsyncEngine:
    # The engine is stashed on app.state in the lifespan (see main.py).
    # A FastAPI dependency is how a route gets at it; a plain typed
    # parameter would be treated as a query param, not injected.
    return request.app.state.engine


@router.get("/healthz")
async def healthz() -> dict[str, str]:
    # Liveness: no dependencies. If the loop runs, we're alive.
    return {"status": "ok"}


@router.get("/readyz")
async def readyz(
    response: Response,
    engine: Annotated[AsyncEngine, Depends(get_engine)],
) -> dict[str, object]:
    checks: dict[str, str] = {}
    ready = True
    try:
        async with engine.connect() as conn:
            await conn.execute(text("SELECT 1"))
        checks["postgres"] = "ok"
    # Readiness must never raise, so catch everything and report it.
    except Exception as exc:  # noqa: BLE001
        checks["postgres"] = f"down: {exc}"
        ready = False
    if not ready:
        response.status_code = status.HTTP_503_SERVICE_UNAVAILABLE
    return {"ready": ready, "checks": checks}

On SIGTERM, you want in-flight requests to finish, the trace exporter to flush its buffer, and pools to close — then exit. uvicorn handles connection draining; you own resource cleanup via FastAPI’s lifespan:

app/main.py (lifespan)
from contextlib import asynccontextmanager

from sqlalchemy.ext.asyncio import create_async_engine

from app.tracing import configure_tracing


@asynccontextmanager
async def lifespan(app):
    engine = create_async_engine(...)
    app.state.engine = engine
    # configure_tracing returns the concrete TracerProvider it created, so
    # we can call .shutdown() on *that*. get_tracer_provider() may hand back
    # the API's no-op provider, which has no shutdown() (AttributeError).
    provider = configure_tracing(app, service_name="api", otlp_endpoint=...)
    yield
    # Shutdown: close the pool, then flush pending spans last so anything
    # emitted during cleanup is exported too.
    await engine.dispose()
    provider.shutdown()  # forces BatchSpanProcessor to flush

Profiling — py-spy. When a service is slow or pinning a CPU in production, py-spy is the pprof of Python: a sampling profiler that attaches to a running process by PID with no code changes and no restart. py-spy dump --pid 1234 prints every thread’s stack right now; py-spy top --pid 1234 gives a live top-style view; py-spy record -o flame.svg --pid 1234 produces a flamegraph. Because it reads memory from outside the process, it adds near-zero overhead — safe to run against prod.

Error tracking — Sentry. Metrics tell you the error rate; Sentry tells you the error. sentry-sdk captures unhandled exceptions with full stack traces, local variables, request context, and release/version — and its FastAPI integration wires in with a single sentry_sdk.init(dsn=..., traces_sample_rate=0.1). It also reads the OTel trace context, so a Sentry issue links back to the trace. Same role as @sentry/node or sentry-go.

You can’t instrument everything on day one, and you shouldn’t try. Priority order for a new service:

  1. Structured JSON logs with a request ID. Highest value, lowest effort. The moment you can grep one correlation ID across a request, debugging changes.
  2. RED metrics on HTTP — the request counter + latency histogram from the middleware above. That’s your “is it up, is it fast, is it erroring” dashboard.
  3. Health endpoints: /healthz and /readyz, because K8s needs them to route traffic sanely.
  4. Tracing, auto-instrumented. Turn it on; you don’t need manual spans yet. The framework + DB + HTTP spans alone explain most “why is this endpoint slow.”
  5. Manual spans and custom business metrics — last, and only where a real question demands them. Don’t pre-instrument code nobody is asking about.

The discipline that matters most isn’t adding telemetry — it’s cardinality and cost. Sampled traces (1–10% in prod), low-cardinality metric labels, and logs at INFO (not DEBUG) in production keep your observability bill smaller than your compute bill. Telemetry you can’t afford to keep is telemetry you don’t have.
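
To see why a 10% sample still yields complete traces, here is a stdlib-only sketch of ratio-based sampling keyed on the trace ID. In a real service you would configure one of the OpenTelemetry SDK's samplers (such as TraceIdRatioBased) rather than write this yourself; should_sample is a hypothetical function that only illustrates the idea.

```python
import random

TRACE_ID_LIMIT = 1 << 64  # the decision uses the low 64 bits of the trace ID


def should_sample(trace_id: int, ratio: float) -> bool:
    """Keep roughly `ratio` of all traces, decided from the trace ID alone.

    Because the decision is a pure function of the trace ID, every service
    that sees the same trace makes the same choice, so kept traces are
    complete rather than missing random spans.
    """
    if not 0.0 <= ratio <= 1.0:
        raise ValueError("ratio must be between 0 and 1")
    bound = round(ratio * TRACE_ID_LIMIT)
    return (trace_id & (TRACE_ID_LIMIT - 1)) < bound


if __name__ == "__main__":
    rng = random.Random(42)
    trace_ids = [rng.getrandbits(128) for _ in range(10_000)]
    kept = sum(should_sample(tid, 0.10) for tid in trace_ids)
    print(f"kept {kept} of {len(trace_ids)} traces")
```

The trade-off is that rare failures can fall outside the sample, which is one reason error logs and metrics stay unsampled.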

| Concern | TypeScript | Go | Python |
| --- | --- | --- | --- |
| Structured logging | pino | slog / zap | structlog (JSON renderer) |
| Log context | AsyncLocalStorage | context.Context | contextvars + bind_contextvars |
| Metrics | prom-client | client_golang | prometheus-client |
| /metrics | custom route | promhttp.Handler() | make_asgi_app() mount |
| Tracing | @opentelemetry/sdk-node | go.opentelemetry.io/otel | opentelemetry-sdk + -instrumentation-* |
| Auto-instrument | auto-instrumentations-node | mostly manual | opentelemetry-instrument (zero code) |
| Trace/log link | inject trace_id | inject trace_id | structlog processor reads active span |
| Profiling | clinic / 0x | pprof | py-spy (no restart) |
| Error tracking | @sentry/node | sentry-go | sentry-sdk |

Wire all three pillars into one FastAPI service — structured logs with a request ID, a Prometheus /metrics endpoint, OpenTelemetry traces exported to a collector, and health probes — then watch a request flow through every pillar at once.