
Notification Service

Build a production-shaped notification microservice that uses every major piece of the guide: a FastAPI REST API behind JWT auth, validating with Pydantic v2, persisting to Postgres via async SQLAlchemy 2.0 (migrated with Alembic), publishing events to Kafka with aiokafka; a separate worker that consumes those events, simulates delivery per channel, retries transient failures and parks poison messages on a DLQ; Redis for a sliding-window rate limiter; structlog + Prometheus + OpenTelemetry for observability; a pytest + Testcontainers suite; and a uv multi-stage Dockerfile plus a compose stack.

The API and the worker are one uv package with two entry points. This is the same shape as a real service you’d run — accept fast over HTTP, deliver slowly off a queue.

  1. Scaffolding a real two-process service (API + worker) with uv: one package, two entry points.
  2. Modeling a discriminated union of notification channels with Pydantic v2.
  3. Async SQLAlchemy 2.0 + Alembic, with the repository as the only SQL seam.
  4. A JWT dependency (pyjwt) + argon2 hashing, and role-based access checks (RBAC) on the user that dependency injects.
  5. A Redis sliding-window rate limiter (redis.asyncio).
  6. An aiokafka producer in the FastAPI lifespan, and a separate worker with bounded retries and a dead-letter queue.
  7. structlog, Prometheus, and OpenTelemetry wiring that the API and worker share.
  8. Unit + Testcontainers integration tests, and a multi-stage uv Docker image.

A uv project named notification-service. Postgres, Redis, and Kafka from the shared infra. Two runnable commands: uv run app (the API) and uv run worker. A POST /v1/notifications that returns 202 with a QUEUED notification, and a worker that flips it to SENT. Everything async, typed, linted with ruff, and type-checked with ty.

  • notification-service/
    • pyproject.toml
    • alembic.ini
    • Dockerfile
    • docker-compose.yml
    • src/app/
      • config.py
      • models/
        • schemas.py
        • domain.py
      • db/
        • engine.py
        • tables.py
        • repository.py
        • migrations/versions/0001_init.py
      • auth/
        • jwt.py
        • deps.py
      • cache/redis.py
      • events/
        • schemas.py
        • producer.py
        • consumer.py
      • observability/
        • logging.py
        • metrics.py
        • tracing.py
      • api/
        • main.py
        • routes.py
        • service.py
      • workers/main.py
    • tests/
      • conftest.py
      • test_service.py
      • test_api.py

One uv init, then add the whole stack. Note the [project.scripts] table. Its app and worker entries are what make uv run app and uv run worker work. The third entry, app-token, is a dev helper that mints tokens.

uv init --package notification-service && cd notification-service
# extras are quoted so shells like zsh don't treat [...] as a glob
uv add fastapi "uvicorn[standard]" "sqlalchemy[asyncio]" asyncpg alembic aiokafka redis \
  pyjwt argon2-cffi pydantic-settings structlog prometheus-client \
  opentelemetry-distro
uv add --dev pytest pytest-asyncio "testcontainers[postgres]" httpx ruff ty
pyproject.toml
[project]
name = "notification-service"
version = "0.1.0"
requires-python = ">=3.13"
dependencies = [
    "fastapi", "uvicorn[standard]", "sqlalchemy[asyncio]", "asyncpg", "alembic",
    "aiokafka", "redis", "pyjwt", "argon2-cffi", "pydantic-settings",
    "structlog", "prometheus-client", "opentelemetry-distro",
]
[project.scripts]
app = "app.api.main:run"             # uv run app
worker = "app.workers.main:run"      # uv run worker
app-token = "app.auth.jwt:mint_cli"  # dev helper that prints a JWT
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
# The import package is `app` (src/app), not the src/notification_service that
# `uv init --package` generates, so tell the build backend where it lives.
packages = ["src/app"]
[tool.ruff]
target-version = "py313"
[tool.ruff.lint]
select = ["E", "F", "I", "UP", "B", "ASYNC"]  # incl. async lint rules + pyupgrade
[tool.ty]
# ty is Astral's fast type checker: `uv run ty check src`. mypy is the
# mature alternative if you need a plugin ty doesn't have yet.
[tool.pytest.ini_options]
asyncio_mode = "auto"

pydantic-settings (Module 04) reads from the environment with typed defaults. The shared-infra values are the local defaults; production overrides via env vars.

src/app/config.py
from functools import lru_cache
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
    # Fields load from NS_-prefixed env vars (e.g. NS_DATABASE_URL) or .env; env vars win.
    model_config = SettingsConfigDict(env_prefix="NS_", env_file=".env")
    database_url: str = "postgresql+asyncpg://dev:dev@localhost:5432/app"
    redis_url: str = "redis://localhost:6379/0"
    kafka_bootstrap: str = "localhost:29092"
    kafka_topic: str = "notifications"
    kafka_dlq_topic: str = "notifications.DLQ"
    jwt_secret: str = "dev-only-change-me-256-bits-minimum-please"  # override in prod!
    jwt_algorithm: str = "HS256"
    jwt_ttl_seconds: int = 3600
    rate_limit_per_hour: int = 100
    max_retries: int = 3
    service_name: str = "notification-service"
@lru_cache
def settings() -> Settings:
    return Settings()  # built once per process, then cached

The instructive bit is the discriminated union of channels. Each channel carries its own fields, and Pydantic dispatches on the `type` literal. Adding a channel means adding one class to the union. A `match` over the union that ends in `assert_never` will then fail type checking until the new case is handled. The status is a StrEnum (Module 02).

src/app/models/domain.py
from enum import StrEnum
class NotificationStatus(StrEnum):
    PENDING = "pending"      # persisted, not yet queued
    QUEUED = "queued"        # published to Kafka
    SENT = "sent"            # worker delivered it
    FAILED = "failed"        # delivery failed permanently
    CANCELLED = "cancelled"
# Lifecycle rule lives with the domain, not scattered in services.
def can_retry(status: NotificationStatus, retry_count: int, max_retries: int) -> bool:
    return status == NotificationStatus.FAILED and retry_count < max_retries
src/app/models/schemas.py
from datetime import datetime
from typing import Annotated, Literal
from uuid import UUID
from pydantic import BaseModel, Field
from .domain import NotificationStatus
class EmailChannel(BaseModel):
    type: Literal["email"] = "email"
    to: str
    subject: str = Field(min_length=1, max_length=200)
class SmsChannel(BaseModel):
    type: Literal["sms"] = "sms"
    phone_number: str = Field(pattern=r"^\+[1-9]\d{6,14}$")
class PushChannel(BaseModel):
    type: Literal["push"] = "push"
    device_token: str
    title: str
# Pydantic dispatches on `type`: invalid or unknown channels are 422'd for free.
Channel = Annotated[
    EmailChannel | SmsChannel | PushChannel,
    Field(discriminator="type"),
]
class CreateNotificationRequest(BaseModel):
    user_id: str
    channel: Channel
    body: str = Field(min_length=1, max_length=4000)
    metadata: dict[str, str] = Field(default_factory=dict)
class NotificationResponse(BaseModel):
    id: UUID
    user_id: str
    channel: Channel
    body: str
    status: NotificationStatus
    retry_count: int
    created_at: datetime
    sent_at: datetime | None = None

4. Database — async SQLAlchemy 2.0 + Alembic


The engine and session factory (Module 09). The channel is stored as a type column plus a JSONB blob, so the discriminated union round-trips without a column per channel.

src/app/db/engine.py
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from app.config import settings
engine = create_async_engine(settings().database_url, pool_size=10, pool_pre_ping=True)
Session = async_sessionmaker(engine, expire_on_commit=False)
src/app/db/tables.py
from datetime import datetime
from uuid import UUID, uuid4
from sqlalchemy import DateTime, Integer, String, Text, func
from sqlalchemy.dialects.postgresql import JSONB, UUID as PG_UUID
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from app.models.domain import NotificationStatus
class Base(DeclarativeBase): ...
class NotificationRow(Base):
    __tablename__ = "notifications"
    id: Mapped[UUID] = mapped_column(PG_UUID(as_uuid=True), primary_key=True, default=uuid4)
    user_id: Mapped[str] = mapped_column(String(255), index=True)
    channel_type: Mapped[str] = mapped_column(String(32))
    channel_data: Mapped[dict] = mapped_column(JSONB)
    body: Mapped[str] = mapped_column(Text)
    status: Mapped[str] = mapped_column(String(32), default=NotificationStatus.PENDING, index=True)
    # `metadata` is reserved on declarative classes, so the attribute is metadata_.
    metadata_: Mapped[dict] = mapped_column("metadata", JSONB, default=dict)
    retry_count: Mapped[int] = mapped_column(Integer, default=0)
    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())
    sent_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)

The repository is the only place SQL lives. Everything else speaks Pydantic.

src/app/db/repository.py
from datetime import datetime
from uuid import UUID
from sqlalchemy import update
from app.db.engine import Session
from app.db.tables import NotificationRow
from app.models.domain import NotificationStatus
from app.models.schemas import CreateNotificationRequest, NotificationResponse
class NotificationRepository:
    async def create(self, req: CreateNotificationRequest) -> NotificationResponse:
        async with Session() as s, s.begin():
            row = NotificationRow(
                user_id=req.user_id,
                channel_type=req.channel.type,
                channel_data=req.channel.model_dump(),
                body=req.body,
                metadata_=req.metadata,
            )
            s.add(row)
            await s.flush()
            # Load server-generated columns (created_at) now. Lazy-loading them
            # later would need implicit IO, which an AsyncSession can't do.
            await s.refresh(row)
            return self._to_response(row)
    async def set_status(
        self, id: UUID, status: NotificationStatus, *, sent_at: datetime | None = None
    ) -> None:
        stmt = update(NotificationRow).where(NotificationRow.id == id)
        if status == NotificationStatus.QUEUED:
            # The worker can mark a row SENT before the API gets here. Only
            # promote PENDING -> QUEUED so that SENT is never overwritten.
            stmt = stmt.where(NotificationRow.status == NotificationStatus.PENDING)
        async with Session() as s, s.begin():
            await s.execute(stmt.values(status=status, sent_at=sent_at))
    async def get(self, id: UUID) -> NotificationResponse | None:
        async with Session() as s:
            row = await s.get(NotificationRow, id)
            return self._to_response(row) if row else None
    @staticmethod
    def _to_response(row: NotificationRow) -> NotificationResponse:
        return NotificationResponse(
            id=row.id,
            user_id=row.user_id,
            channel={"type": row.channel_type, **row.channel_data},
            body=row.body,
            status=row.status,
            retry_count=row.retry_count,
            created_at=row.created_at,
            sent_at=row.sent_at,
        )

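Schema is owned by Alembic, not by create_all. Create the migration environment once with uv run alembic init -t async src/app/db/migrations. This uses the async template because the engine runs on asyncpg. Then set target_metadata = Base.metadata in the generated env.py and point it at the same database URL, so autogenerate can diff against tables.py. Generate with uv run alembic revision --autogenerate -m init; the result is roughly: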

src/app/db/migrations/versions/0001_init.py
"""init: create the notifications table."""
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql
revision = "0001"
down_revision = None
branch_labels = None
depends_on = None
def upgrade() -> None:
    op.create_table(
        "notifications",
        sa.Column("id", postgresql.UUID(as_uuid=True), primary_key=True),
        sa.Column("user_id", sa.String(255), nullable=False),
        sa.Column("channel_type", sa.String(32), nullable=False),
        sa.Column("channel_data", postgresql.JSONB, nullable=False),
        sa.Column("body", sa.Text, nullable=False),
        sa.Column("status", sa.String(32), nullable=False, server_default="pending"),
        sa.Column("metadata", postgresql.JSONB, nullable=False, server_default="{}"),
        sa.Column("retry_count", sa.Integer, nullable=False, server_default="0"),
        sa.Column(
            "created_at", sa.DateTime(timezone=True), nullable=False,
            server_default=sa.func.now(),
        ),
        sa.Column("sent_at", sa.DateTime(timezone=True), nullable=True),
    )
    op.create_index("ix_notifications_user_id", "notifications", ["user_id"])
    op.create_index("ix_notifications_status", "notifications", ["status"])
def downgrade() -> None:
    op.drop_index("ix_notifications_status", table_name="notifications")
    op.drop_index("ix_notifications_user_id", table_name="notifications")
    op.drop_table("notifications")

pyjwt signs and verifies; argon2-cffi hashes passwords (you’d verify against a users table — shown minimal here). The dev CLI mints a token so you can curl.

src/app/auth/jwt.py
import argparse
import sys
import time
import jwt
from argon2 import PasswordHasher
from argon2.exceptions import VerifyMismatchError
from app.config import settings
ph = PasswordHasher()  # argon2id, the right default; never sha/md5 a password
def hash_password(pw: str) -> str:
    return ph.hash(pw)
def verify_password(hash_: str, pw: str) -> bool:
    try:
        return ph.verify(hash_, pw)
    except VerifyMismatchError:
        return False
def mint(user_id: str, role: str = "user") -> str:
    s = settings()
    now = int(time.time())
    return jwt.encode(
        {"sub": user_id, "role": role, "iat": now, "exp": now + s.jwt_ttl_seconds},
        s.jwt_secret,
        algorithm=s.jwt_algorithm,
    )
def decode(token: str) -> dict:
    s = settings()
    # Pin the algorithm and require the claims callers rely on. A token without
    # `exp` would otherwise never expire, and one without `sub` would KeyError later.
    return jwt.decode(
        token, s.jwt_secret, algorithms=[s.jwt_algorithm],
        options={"require": ["exp", "sub"]},
    )
def mint_cli() -> None:  # uv run app-token --user user-1 --role user
    p = argparse.ArgumentParser(description="Print a dev JWT")
    p.add_argument("--user", required=True)
    p.add_argument("--role", default="user")
    a = p.parse_args(sys.argv[1:])
    print(mint(a.user, a.role))

The FastAPI dependency turns a Bearer token into a CurrentUser. This is the injection point routes depend on — no global state.

src/app/auth/deps.py
from dataclasses import dataclass
from typing import Annotated
import jwt
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from app.auth.jwt import decode
@dataclass
class CurrentUser:
    user_id: str
    role: str
_bearer = HTTPBearer()
def current_user(
    cred: Annotated[HTTPAuthorizationCredentials, Depends(_bearer)],
) -> CurrentUser:
    try:
        payload = decode(cred.credentials)
    except jwt.InvalidTokenError:  # bad signature, expired, or malformed
        raise HTTPException(
            status.HTTP_401_UNAUTHORIZED,
            "invalid or expired token",
            headers={"WWW-Authenticate": "Bearer"},
        ) from None  # don't chain the JWT internals into the 401
    return CurrentUser(user_id=payload["sub"], role=payload.get("role", "user"))
CurrentUserDep = Annotated[CurrentUser, Depends(current_user)]

6. Redis sliding-window rate limiter (Module 10)


A sorted set per user, scored by timestamp. The check-and-add must be atomic, so it runs as a single Lua script: prune the old window, count, then reject-or-add. A pipeline can’t do this (the count→decision→add gap races under concurrency, and a score-based “roll back” would clobber other requests landing in the same instant).

src/app/cache/redis.py
import time
import uuid
from redis.asyncio import Redis
from app.config import settings
# One client per process, named so it doesn't shadow the `redis` package.
redis_client = Redis.from_url(settings().redis_url, decode_responses=True)
# KEYS[1] = zset key; ARGV: now_ms, window_ms, limit, member, ttl_seconds
SLIDING_WINDOW_LUA = """
local now = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local limit = tonumber(ARGV[3])
redis.call('ZREMRANGEBYSCORE', KEYS[1], 0, now - window)
local count = redis.call('ZCARD', KEYS[1])
if count + 1 > limit then
return 0
end
redis.call('ZADD', KEYS[1], now, ARGV[4])
redis.call('EXPIRE', KEYS[1], ARGV[5])
return 1
"""
class RateLimiter:
    WINDOW = 3600  # seconds (1 hour)
    def __init__(self, max_per_hour: int) -> None:
        self.max = max_per_hour
    async def allow(self, user_id: str) -> bool:
        key = f"rl:notify:{user_id}"
        now_ms = int(time.time() * 1000)
        member = f"{now_ms}-{uuid.uuid4()}"  # unique, so same-millisecond hits don't collapse
        allowed = await redis_client.eval(
            SLIDING_WINDOW_LUA, 1,
            key, now_ms, self.WINDOW * 1000, self.max, member, self.WINDOW,
        )
        return bool(allowed)
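The Lua script's decision logic is easier to test in isolation. The hypothetical `InMemorySlidingWindow` reproduces it step for step for a single process with no Redis:

1. Prune entries at or before `now - window`.
2. Count what remains.
3. Reject, or record the request.

It assumes timestamps arrive in non-decreasing order, which lets a deque stand in for the sorted set. It models the algorithm and does not replace the atomic Redis version.

```python
from collections import defaultdict, deque


class InMemorySlidingWindow:
    """Single-process model of SLIDING_WINDOW_LUA (hypothetical helper)."""

    def __init__(self, limit: int, window_ms: int) -> None:
        self.limit = limit
        self.window_ms = window_ms
        self._hits: defaultdict[str, deque[int]] = defaultdict(deque)

    def allow(self, key: str, now_ms: int) -> bool:
        hits = self._hits[key]
        # ZREMRANGEBYSCORE key 0 (now - window): drop hits at or before the cutoff.
        cutoff = now_ms - self.window_ms
        while hits and hits[0] <= cutoff:
            hits.popleft()
        # ZCARD + limit check: reject without recording the attempt.
        if len(hits) + 1 > self.limit:
            return False
        # ZADD: record this request (the real script also refreshes the key's TTL).
        hits.append(now_ms)
        return True
```

Rejected requests are not recorded, which matches the script's early `return 0`. Only the Redis version stays correct across multiple API replicas.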

7. Kafka — producer in the lifespan, worker with retry + DLQ (Module 11)


The event envelope is a versioned Pydantic model — never publish raw domain rows.

src/app/events/schemas.py
from uuid import UUID
from pydantic import BaseModel
from app.models.schemas import Channel
class NotificationQueued(BaseModel):
    version: int = 1
    event_type: str = "notification.queued"
    notification_id: UUID
    user_id: str
    channel: Channel
    body: str

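The producer is created and closed in the FastAPI lifespan. It is configured as idempotent (acks="all", enable_idempotence=True), so its own internal retries after a lost acknowledgement can't write the same record twice. That covers the producer's retries only. It does not dedupe a second publish() from application code, so consumers should still tolerate duplicates.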

src/app/events/producer.py
from aiokafka import AIOKafkaProducer
from app.config import settings
from app.events.schemas import NotificationQueued
class Producer:
    def __init__(self) -> None:
        self._p: AIOKafkaProducer | None = None
    async def start(self) -> None:
        self._p = AIOKafkaProducer(
            bootstrap_servers=settings().kafka_bootstrap,
            enable_idempotence=True,
            acks="all",
        )
        await self._p.start()
    async def stop(self) -> None:
        if self._p is not None:
            await self._p.stop()
            self._p = None
    async def publish(self, event: NotificationQueued) -> None:
        if self._p is None:  # an explicit error survives `python -O`, unlike assert
            raise RuntimeError("producer not started; is the app lifespan running?")
        await self._p.send_and_wait(
            settings().kafka_topic,
            key=event.user_id.encode(),  # same user -> same partition -> ordered per user
            value=event.model_dump_json().encode(),
        )
producer = Producer()
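Here is a toy model of why `enable_idempotence=True` stops the producer's own retries from duplicating records:

- the broker assigns the producer an id;
- the producer numbers each batch per partition;
- the broker drops any batch whose sequence number it has already written.

The hypothetical `ToyPartitionLog` below models one partition with one record per batch. Real brokers, roughly, track the last few batches per producer and reject sequence gaps with an out-of-order error. The sketch approximates that error with a ValueError.

```python
from dataclasses import dataclass, field


@dataclass
class ToyPartitionLog:
    """One partition's log plus simplified per-producer sequence bookkeeping."""

    records: list[bytes] = field(default_factory=list)
    last_seq: dict[int, int] = field(default_factory=dict)  # producer id -> last written seq

    def append(self, producer_id: int, seq: int, value: bytes) -> bool:
        """Return True if written, False if dropped as a duplicate retry."""
        last = self.last_seq.get(producer_id, -1)
        if seq <= last:
            # Already written: the ack was lost and the producer resent the batch.
            return False
        if seq != last + 1:
            raise ValueError(f"out-of-order sequence {seq}, expected {last + 1}")
        self.records.append(value)
        self.last_seq[producer_id] = seq
        return True
```

The dedup is scoped to one producer session and one partition. A second `publish()` call from application code gets a fresh sequence number and is written again.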

The worker is the second half of the system: an async for poll loop (Module 06) that simulates delivery, retries transient failures with backoff, and republishes poison/exhausted messages to the DLQ instead of blocking the partition.

src/app/events/consumer.py
import asyncio
from datetime import UTC, datetime
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from pydantic import ValidationError
from app.config import settings
from app.db.repository import NotificationRepository
from app.events.schemas import NotificationQueued
from app.models.domain import NotificationStatus
from app.observability.logging import log
from app.observability.metrics import metrics
class TransientError(Exception): ...  # retry these
class PermanentError(Exception): ...  # straight to DLQ
async def deliver(event: NotificationQueued) -> None:
    # Simulated provider call. A real implementation would be a per-channel
    # adapter behind a Protocol, wrapped in `async with asyncio.timeout(5):`.
    match event.channel.type:
        case "email" | "sms" | "push":
            await asyncio.sleep(0.05)  # pretend network
        case _:
            raise PermanentError(f"unknown channel {event.channel.type}")
async def run_consumer() -> None:
    cfg = settings()
    repo = NotificationRepository()
    consumer = AIOKafkaConsumer(
        cfg.kafka_topic,
        bootstrap_servers=cfg.kafka_bootstrap,
        group_id="notification-worker",
        enable_auto_commit=False,
        auto_offset_reset="earliest",
    )
    dlq = AIOKafkaProducer(bootstrap_servers=cfg.kafka_bootstrap)
    await consumer.start()
    await dlq.start()
    log.info("worker_started", topic=cfg.kafka_topic)
    try:
        async for msg in consumer:  # one message at a time: backpressure for free
            try:
                event = NotificationQueued.model_validate_json(msg.value)
            except ValidationError:
                # Poison message: it will never parse, so park it instead of crash-looping.
                await _to_dlq(dlq, cfg.kafka_dlq_topic, msg, "invalid payload")
                metrics.notifications_failed.labels(reason="invalid").inc()
                await consumer.commit()
                continue
            try:
                await _process_with_retry(event, repo, cfg.max_retries)
                metrics.notifications_sent.labels(channel=event.channel.type).inc()
            except PermanentError as e:
                await _to_dlq(dlq, cfg.kafka_dlq_topic, msg, str(e))
                await repo.set_status(event.notification_id, NotificationStatus.FAILED)
                metrics.notifications_failed.labels(reason="permanent").inc()
            # Commit only after the message was delivered or parked on the DLQ. Any
            # other exception skips the commit and stops the worker, so the message
            # is redelivered after a restart (at-least-once).
            await consumer.commit()
    finally:
        await consumer.stop()
        await dlq.stop()
async def _process_with_retry(
    event: NotificationQueued, repo: NotificationRepository, max_retries: int
) -> None:
    for attempt in range(max_retries):
        try:
            await deliver(event)
        except TransientError:
            if attempt + 1 < max_retries:  # don't sleep after the final attempt
                await asyncio.sleep(2**attempt)  # exponential backoff: 1s, 2s, 4s, ...
            continue
        await repo.set_status(
            event.notification_id, NotificationStatus.SENT, sent_at=datetime.now(UTC)
        )
        log.info("delivered", notification_id=str(event.notification_id))
        return
    raise PermanentError("retries exhausted")  # the caller parks it on the DLQ
async def _to_dlq(dlq: AIOKafkaProducer, topic: str, msg, reason: str) -> None:
    # Keep the original key and bytes so the message can be replayed after a fix.
    await dlq.send_and_wait(
        topic,
        key=msg.key,
        value=msg.value,
        headers=[("dlq-reason", reason.encode())],
    )
    log.warning("sent_to_dlq", reason=reason)
log.warning("sent_to_dlq", reason=reason)
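The retry/DLQ control flow can be exercised without Kafka. In this sketch, all names are hypothetical:

- `deliver` is any zero-argument coroutine function;
- the DLQ is a plain list;
- `sleep` is injectable, so a test can record the backoff delays instead of waiting.

It follows the worker's rules: retry only `TransientError`, back off 1s, 2s, 4s, ... between attempts, and park the message once attempts run out or a `PermanentError` is raised.

```python
import asyncio
from collections.abc import Awaitable, Callable


class TransientError(Exception): ...


class PermanentError(Exception): ...


async def process_with_retry(
    deliver: Callable[[], Awaitable[None]],
    max_retries: int,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> int:
    """Return the attempt number that succeeded; raise PermanentError when exhausted."""
    for attempt in range(max_retries):
        try:
            await deliver()
        except TransientError:
            if attempt + 1 < max_retries:  # no pointless sleep after the last try
                await sleep(2**attempt)
            continue
        return attempt + 1
    raise PermanentError("retries exhausted")


async def handle(
    msg: bytes,
    deliver: Callable[[], Awaitable[None]],
    dlq: list[tuple[bytes, str]],
    max_retries: int = 3,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> str:
    try:
        await process_with_retry(deliver, max_retries, sleep)
    except PermanentError as e:
        dlq.append((msg, str(e)))  # park it so the partition keeps moving
        return "dlq"
    return "sent"
```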

structlog for JSON logs, prometheus-client for counters exposed at /metrics, and OpenTelemetry for traces. The API and worker import the same modules.

src/app/observability/logging.py
import structlog
def configure_logging() -> None:
    structlog.configure(
        processors=[
            structlog.contextvars.merge_contextvars,
            structlog.processors.add_log_level,
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.JSONRenderer(),  # JSON to stdout for the platform
        ],
    )
log = structlog.get_logger()
src/app/observability/metrics.py
from prometheus_client import Counter
class _Metrics:
    notifications_created = Counter("notifications_created_total", "created", ["channel"])
    notifications_sent = Counter("notifications_sent_total", "delivered", ["channel"])
    notifications_failed = Counter("notifications_failed_total", "failed", ["reason"])
metrics = _Metrics()
src/app/observability/tracing.py
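# `opentelemetry-distro` provides the SDK wiring. Each library instrumentation
# (FastAPI, SQLAlchemy, aiokafka, redis) is its own package: list the ones that
# match your installed libraries with `uv run opentelemetry-bootstrap -a requirements`
# and `uv add` them, plus opentelemetry-exporter-otlp for the OTLP exporter.
# Zero-code path: `uv run opentelemetry-instrument app` with
# OTEL_EXPORTER_OTLP_ENDPOINT set. For explicit spans in the worker:
from opentelemetry import trace
tracer = trace.get_tracer("notification-service")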

9. The FastAPI app & endpoints (Module 07)


The app wires observability, mounts routes, exposes /metrics and /health, and starts/stops the Kafka producer in its lifespan. Routes are thin; the service orchestrates (the create() spine is shown on the capstone overview).

src/app/api/main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
from prometheus_client import make_asgi_app
from app.api.routes import router
from app.events.producer import producer
from app.observability.logging import configure_logging, log
@asynccontextmanager
async def lifespan(app: FastAPI):
    configure_logging()
    await producer.start()  # the Kafka producer lives as long as the app
    log.info("api_started")
    yield
    await producer.stop()
app = FastAPI(title="Notification Service", lifespan=lifespan)
app.include_router(router, prefix="/v1")
# Prometheus scrape endpoint. As a Mount it answers at /metrics/; a request for
# /metrics is redirected there (Prometheus and `curl -L` follow the redirect).
app.mount("/metrics", make_asgi_app())
@app.get("/health")
async def health() -> dict[str, str]:
    return {"status": "ok"}
def run() -> None:  # uv run app
    import uvicorn
    uvicorn.run("app.api.main:app", host="0.0.0.0", port=8000)
src/app/api/routes.py
from uuid import UUID
from fastapi import APIRouter, HTTPException, status
from app.api.service import Forbidden, NotificationService, RateLimited
from app.auth.deps import CurrentUserDep
from app.models.schemas import CreateNotificationRequest, NotificationResponse
router = APIRouter(tags=["notifications"])
service = NotificationService()
@router.post("/notifications", status_code=status.HTTP_202_ACCEPTED)
async def create(req: CreateNotificationRequest, user: CurrentUserDep) -> NotificationResponse:
    try:
        return await service.create(req, user)
    except Forbidden as e:
        raise HTTPException(status.HTTP_403_FORBIDDEN, str(e)) from e
    except RateLimited as e:
        raise HTTPException(status.HTTP_429_TOO_MANY_REQUESTS, str(e)) from e
@router.get("/notifications/{notification_id}")
async def get(notification_id: UUID, user: CurrentUserDep) -> NotificationResponse:
    try:
        n = await service.get(notification_id, user)
    except Forbidden as e:  # service.get raises this for another user's notification
        raise HTTPException(status.HTTP_403_FORBIDDEN, str(e)) from e
    if n is None:
        raise HTTPException(status.HTTP_404_NOT_FOUND, "not found")
    return n
src/app/api/service.py
from uuid import UUID
from aiokafka.errors import KafkaError
from app.auth.deps import CurrentUser
from app.cache.redis import RateLimiter
from app.config import settings
from app.db.repository import NotificationRepository
from app.events.producer import producer
from app.events.schemas import NotificationQueued
from app.models.domain import NotificationStatus
from app.models.schemas import CreateNotificationRequest, NotificationResponse
from app.observability.logging import log
from app.observability.metrics import metrics
class Forbidden(Exception): ...
class RateLimited(Exception): ...
class NotificationService:
    def __init__(self) -> None:
        self.repo = NotificationRepository()
        self.limiter = RateLimiter(settings().rate_limit_per_hour)
    async def create(
        self, req: CreateNotificationRequest, caller: CurrentUser
    ) -> NotificationResponse:
        if caller.role != "admin" and req.user_id != caller.user_id:
            raise Forbidden("cannot send notifications for other users")
        if not await self.limiter.allow(req.user_id):
            raise RateLimited(f"rate limit exceeded for {req.user_id}")
        n = await self.repo.create(req)  # persist PENDING first
        try:
            await producer.publish(
                NotificationQueued(
                    notification_id=n.id, user_id=n.user_id, channel=n.channel, body=n.body,
                )
            )
            await self.repo.set_status(n.id, NotificationStatus.QUEUED)
            n.status = NotificationStatus.QUEUED
        except KafkaError:
            # The row stays PENDING. It is recoverable only if something (e.g. a
            # sweeper job, not shown) later republishes PENDING rows.
            log.error("queue_failed", notification_id=str(n.id))
        metrics.notifications_created.labels(channel=req.channel.type).inc()
        return n
    async def get(self, id: UUID, caller: CurrentUser) -> NotificationResponse | None:
        n = await self.repo.get(id)
        if n and caller.role != "admin" and n.user_id != caller.user_id:
            raise Forbidden("not your notification")
        return n
src/app/workers/main.py
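import asyncio
from prometheus_client import start_http_server
from app.events.consumer import run_consumer
from app.observability.logging import configure_logging
# Assumed port. The worker has no HTTP app, so without its own endpoint the
# counters it increments (sent / failed) are never scraped.
WORKER_METRICS_PORT = 9100
def run() -> None:  # uv run worker
    configure_logging()
    start_http_server(WORKER_METRICS_PORT)  # serves /metrics from a background thread
    asyncio.run(run_consumer())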

A unit test of the orchestration logic with mocked infra — the rate limiter blocks before anything is persisted:

tests/test_service.py
from unittest.mock import AsyncMock
import pytest
from app.api.service import NotificationService, RateLimited
from app.auth.deps import CurrentUser
from app.models.schemas import CreateNotificationRequest, EmailChannel
@pytest.mark.asyncio
async def test_rate_limited_request_is_not_persisted():
    svc = NotificationService()
    svc.limiter.allow = AsyncMock(return_value=False)
    svc.repo.create = AsyncMock()
    req = CreateNotificationRequest(
        user_id="u1", channel=EmailChannel(to="a@b.com", subject="Hi"), body="hi"
    )
    with pytest.raises(RateLimited):
        await svc.create(req, CurrentUser(user_id="u1", role="user"))
    svc.repo.create.assert_not_called()  # never touched the DB

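An integration test with Testcontainers: a real Postgres, with the API driven through httpx.AsyncClient over the ASGI transport. It proves that auth and persistence actually work and that one user can't read another's notification. The pg_container fixture lives in tests/conftest.py (not shown). ASGITransport does not run the app's lifespan, so the Kafka producer is never started in these tests. The fixture therefore has to point the repository at the container and stub or supply both the producer and the Redis-backed rate limiter.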

tests/test_api.py
import pytest
from httpx import ASGITransport, AsyncClient
from app.api.main import app
from app.auth.jwt import mint
@pytest.mark.asyncio
async def test_create_returns_queued(pg_container):  # fixture starts Postgres + migrations
    token = mint("user-1", "user")
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as c:
        r = await c.post(
            "/v1/notifications",
            headers={"Authorization": f"Bearer {token}"},
            json={
                "user_id": "user-1",
                "channel": {"type": "email", "to": "a@b.com", "subject": "Hi"},
                "body": "Welcome!",
            },
        )
    assert r.status_code == 202
    assert r.json()["status"] in {"queued", "pending"}
@pytest.mark.asyncio
async def test_cannot_read_other_users_notification(pg_container):
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as c:
        token1 = mint("user-1", "user")
        created = await c.post(
            "/v1/notifications",
            headers={"Authorization": f"Bearer {token1}"},
            json={
                "user_id": "user-1",
                "channel": {"type": "email", "to": "a@b.com", "subject": "P"},
                "body": "Private",
            },
        )
        assert created.status_code == 202  # fail here, not with a confusing KeyError
        nid = created.json()["id"]
        token2 = mint("user-2", "user")
        resp = await c.get(
            f"/v1/notifications/{nid}",
            headers={"Authorization": f"Bearer {token2}"},
        )
    assert resp.status_code == 403
uv run pytest # all
uv run pytest tests/test_service.py # unit only (no Docker)

12. Deploy — multi-stage uv Docker + compose (Module 17)


A multi-stage build: a builder stage installs deps into a venv with uv sync, the final slim stage copies just the venv and source, runs as non-root, and the same image runs the API or the worker depending on the command.

Dockerfile
# ---- builder ----
FROM ghcr.io/astral-sh/uv:python3.13-bookworm-slim AS builder
ENV UV_COMPILE_BYTECODE=1 UV_LINK_MODE=copy
WORKDIR /app
# Dependencies first, so this layer stays cached until pyproject.toml/uv.lock change.
COPY pyproject.toml uv.lock ./
RUN --mount=type=cache,target=/root/.cache/uv \
    uv sync --frozen --no-install-project --no-dev
# Needs a .dockerignore that excludes .venv, or the host's venv overwrites the one just built.
COPY . .
RUN --mount=type=cache,target=/root/.cache/uv uv sync --frozen --no-dev
# ---- runtime ----
# Same Debian/Python base as the builder image, so the venv's interpreter links resolve.
FROM python:3.13-slim-bookworm
RUN useradd -m -u 1000 app
WORKDIR /app
COPY --from=builder --chown=app:app /app /app
ENV PATH="/app/.venv/bin:$PATH"
USER app
EXPOSE 8000
# Override with `worker` (compose `command`, K8s `args`). Default is the API.
CMD ["app"]
docker-compose.yml (the app + worker; infra is shared-infra)
services:
  api:
    build: .
    command: ["app"]
    ports: ["8000:8000"]
    environment:
      NS_DATABASE_URL: postgresql+asyncpg://dev:dev@postgres:5432/app
      NS_REDIS_URL: redis://redis:6379/0
      NS_KAFKA_BOOTSTRAP: kafka:9092
    depends_on: [postgres, redis, kafka]
  worker:
    build: .
    command: ["worker"]  # same image, different entry point
    environment:
      NS_DATABASE_URL: postgresql+asyncpg://dev:dev@postgres:5432/app
      NS_KAFKA_BOOTSTRAP: kafka:9092
    depends_on: [postgres, kafka]
  # postgres / redis / kafka: same definitions as shared-infra

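On Kubernetes this is two Deployments off one image, with args: ["app"] and args: ["worker"], scaled independently. The worker replicas join the same Kafka consumer group, so partitions rebalance across them. Each partition goes to at most one member of the group, so worker replicas beyond the topic's partition count sit idle. (See Module 17.)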

  1. Start the backing stores:

    cd shared-infra && docker compose up -d # postgres :5432, redis :6379, kafka :29092
  2. Migrate and run the API:

    cd notification-service
    uv run alembic upgrade head
    uv run app # http://localhost:8000/docs, /metrics
  3. In a second terminal, run the worker:

    uv run worker
  4. Mint a token and send one — watch the worker deliver it:

    TOKEN=$(uv run app-token --user user-1 --role user)
    curl -s -X POST http://localhost:8000/v1/notifications \
    -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" \
    -d '{"user_id":"user-1","channel":{"type":"email","to":"a@b.com","subject":"Hi"},"body":"Welcome!"}' | jq
    # -> 202, status "queued"; worker logs {"event":"delivered", ...} and flips it to "sent"
  5. Read it back, scrape metrics:

    curl -s -H "Authorization: Bearer $TOKEN" http://localhost:8000/v1/notifications/<id> | jq
    curl -sL http://localhost:8000/metrics | grep notifications_   # -L follows the redirect to /metrics/