Notification Service
Build a production-shaped notification microservice that uses every major piece of
the guide: a FastAPI REST API behind JWT auth, validating with Pydantic
v2, persisting to Postgres via async SQLAlchemy 2.0 (migrated with
Alembic), publishing events to Kafka with aiokafka; a separate worker
that consumes those events, simulates delivery per channel, retries transient
failures and parks poison messages on a DLQ; Redis for a sliding-window rate
limiter; structlog + Prometheus + OpenTelemetry for observability; a pytest +
Testcontainers suite; and a uv multi-stage Dockerfile plus a compose stack.
The API and the worker are one uv package with two entry points. This is the same
shape as a real service you’d run — accept fast over HTTP, deliver slowly off a queue.
What you’ll practice
- Scaffolding a real multi-edge service with uv: one package, two entry points.
- Modeling a discriminated union of notification channels with Pydantic v2.
- Async SQLAlchemy 2.0 + Alembic, with the repository as the only SQL seam.
- A JWT dependency (pyjwt) + argon2 hashing, and RBAC in a FastAPI dependency.
- A Redis sliding-window rate limiter (redis.asyncio).
- An aiokafka producer in the FastAPI lifespan, and a separate worker with bounded retries and a dead-letter queue.
- structlog, Prometheus, and OpenTelemetry wiring that the API and worker share.
- Unit + Testcontainers integration tests, and a multi-stage uv Docker image.
Requirements
A uv project named notification-service. Postgres, Redis, and Kafka from the
shared infra. Two runnable commands: uv run app (the API) and uv run worker. A
POST /v1/notifications that returns 202 with a QUEUED notification, and a
worker that flips it to SENT. Everything async, typed, and linted with ruff + ty.
The worked solution
notification-service/
- pyproject.toml
- alembic.ini
- Dockerfile
- docker-compose.yml
- src/app/
  - config.py
  - models/
    - schemas.py
    - domain.py
  - db/
    - engine.py
    - tables.py
    - repository.py
    - migrations/versions/0001_init.py
  - auth/
    - jwt.py
    - deps.py
  - cache/redis.py
  - events/
    - schemas.py
    - producer.py
    - consumer.py
  - observability/
    - logging.py
    - metrics.py
    - tracing.py
  - api/
    - main.py
    - routes.py
    - service.py
  - workers/main.py
- tests/
  - conftest.py
  - test_service.py
  - test_api.py
1. Project & tooling
One uv init, then add the whole stack. Note the app and worker entries under
[project.scripts]: they are how uv run app and uv run worker work.

    uv init --package notification-service && cd notification-service
    # Extras are quoted so the shell does not glob the brackets; they match pyproject.toml below.
    uv add fastapi "uvicorn[standard]" "sqlalchemy[asyncio]" asyncpg alembic aiokafka redis \
      pyjwt argon2-cffi pydantic-settings structlog prometheus-client \
      opentelemetry-distro
    uv add --dev pytest pytest-asyncio "testcontainers[postgres]" httpx ruff ty

    # pyproject.toml
    [project]
    name = "notification-service"
    version = "0.1.0"
    requires-python = ">=3.13"
    dependencies = [
      "fastapi", "uvicorn[standard]", "sqlalchemy[asyncio]", "asyncpg", "alembic",
      "aiokafka", "redis", "pyjwt", "argon2-cffi", "pydantic-settings",
      "structlog", "prometheus-client", "opentelemetry-distro",
    ]

    [project.scripts]
    app = "app.api.main:run"             # uv run app
    worker = "app.workers.main:run"      # uv run worker
    app-token = "app.auth.jwt:mint_cli"  # dev helper to print a JWT

    [tool.ruff]
    target-version = "py313"

    [tool.ruff.lint]
    select = ["E", "F", "I", "UP", "B", "ASYNC"]  # incl. async-lint + pyupgrade

    [tool.ty]
    # ty is the fast Astral type checker; `uv run ty check src`. mypy is the
    # mature equivalent if you need a plugin ty doesn't have yet.

    [tool.pytest.ini_options]
    asyncio_mode = "auto"

2. Settings — one source of config
pydantic-settings (Module 04) reads from the environment with typed defaults. The
shared-infra values are the local defaults; production overrides via env vars.

    # src/app/config.py
    from functools import lru_cache

    from pydantic_settings import BaseSettings, SettingsConfigDict

    class Settings(BaseSettings):
        model_config = SettingsConfigDict(env_prefix="NS_", env_file=".env")

        database_url: str = "postgresql+asyncpg://dev:dev@localhost:5432/app"
        redis_url: str = "redis://localhost:6379/0"
        kafka_bootstrap: str = "localhost:29092"
        kafka_topic: str = "notifications"
        kafka_dlq_topic: str = "notifications.DLQ"

        jwt_secret: str = "dev-only-change-me-256-bits-minimum-please"  # override in prod!
        jwt_algorithm: str = "HS256"
        jwt_ttl_seconds: int = 3600

        rate_limit_per_hour: int = 100
        max_retries: int = 3
        service_name: str = "notification-service"

    @lru_cache
    def settings() -> Settings:
        return Settings()

3. Domain & API models (Pydantic v2)
The instructive bit is the discriminated union of channels: each channel carries
its own fields, and Pydantic dispatches on the type literal. Adding a channel is a
new class, and match over it stays exhaustive. The status is a StrEnum (Module 02).
    # src/app/models/domain.py
    from enum import StrEnum

    class NotificationStatus(StrEnum):
        PENDING = "pending"      # persisted, not yet queued
        QUEUED = "queued"        # published to Kafka
        SENT = "sent"            # worker delivered it
        FAILED = "failed"        # delivery failed permanently
        CANCELLED = "cancelled"

    # Lifecycle rule lives with the domain, not scattered in services.
    def can_retry(status: NotificationStatus, retry_count: int, max_retries: int) -> bool:
        return status == NotificationStatus.FAILED and retry_count < max_retries

    # src/app/models/schemas.py
    from datetime import datetime
    from typing import Annotated, Literal
    from uuid import UUID

    from pydantic import BaseModel, Field

    from .domain import NotificationStatus

    class EmailChannel(BaseModel):
        type: Literal["email"] = "email"
        to: str
        subject: str = Field(min_length=1, max_length=200)

    class SmsChannel(BaseModel):
        type: Literal["sms"] = "sms"
        phone_number: str = Field(pattern=r"^\+[1-9]\d{6,14}$")

    class PushChannel(BaseModel):
        type: Literal["push"] = "push"
        device_token: str
        title: str

    # Pydantic dispatches on `type`; invalid/unknown channels are 422'd for free.
    Channel = Annotated[
        EmailChannel | SmsChannel | PushChannel,
        Field(discriminator="type"),
    ]

    class CreateNotificationRequest(BaseModel):
        user_id: str
        channel: Channel
        body: str = Field(min_length=1, max_length=4000)
        metadata: dict[str, str] = Field(default_factory=dict)

    class NotificationResponse(BaseModel):
        id: UUID
        user_id: str
        channel: Channel
        body: str
        status: NotificationStatus
        retry_count: int
        created_at: datetime
        sent_at: datetime | None = None

4. Database — async SQLAlchemy 2.0 + Alembic
The engine and session factory (Module 09). The channel is stored as a type column
plus a JSONB blob, so the discriminated union round-trips without a column per channel.
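The round trip itself is small enough to sketch without a database. This is a minimal illustration, assuming the channel has already been dumped to a dict; to_columns and from_columns are hypothetical names, and JSON text stands in for the JSONB column.

```python
import json


def to_columns(channel: dict) -> tuple[str, str]:
    """Split a dumped channel into (channel_type, channel_data as JSON text)."""
    return channel["type"], json.dumps(channel)


def from_columns(channel_type: str, channel_data: str) -> dict:
    """Rebuild the tagged payload that the union validator dispatches on."""
    return {"type": channel_type, **json.loads(channel_data)}
```

Because the rebuilt dict always carries the type tag, validating it against the union selects the same channel class that was stored.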
    # src/app/db/engine.py
    from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine

    from app.config import settings

    engine = create_async_engine(settings().database_url, pool_size=10, pool_pre_ping=True)
    Session = async_sessionmaker(engine, expire_on_commit=False)

    # src/app/db/tables.py
    from datetime import datetime
    from uuid import UUID, uuid4

    from sqlalchemy import DateTime, Integer, String, Text, func
    from sqlalchemy.dialects.postgresql import JSONB
    from sqlalchemy.dialects.postgresql import UUID as PG_UUID
    from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

    from app.models.domain import NotificationStatus

    class Base(DeclarativeBase):
        pass

    class NotificationRow(Base):
        __tablename__ = "notifications"

        id: Mapped[UUID] = mapped_column(PG_UUID(as_uuid=True), primary_key=True, default=uuid4)
        user_id: Mapped[str] = mapped_column(String(255), index=True)
        channel_type: Mapped[str] = mapped_column(String(32))
        channel_data: Mapped[dict] = mapped_column(JSONB)
        body: Mapped[str] = mapped_column(Text)
        status: Mapped[str] = mapped_column(String(32), default=NotificationStatus.PENDING, index=True)
        # `metadata` is reserved on declarative classes, hence the trailing underscore.
        metadata_: Mapped[dict] = mapped_column("metadata", JSONB, default=dict)
        retry_count: Mapped[int] = mapped_column(Integer, default=0)
        created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())
        sent_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)

The repository is the only place SQL lives. Everything else speaks Pydantic.
    # src/app/db/repository.py
    from datetime import datetime
    from uuid import UUID

    from sqlalchemy import update

    from app.db.engine import Session
    from app.db.tables import NotificationRow
    from app.models.domain import NotificationStatus
    from app.models.schemas import CreateNotificationRequest, NotificationResponse

    class NotificationRepository:
        async def create(self, req: CreateNotificationRequest) -> NotificationResponse:
            async with Session() as s, s.begin():
                row = NotificationRow(
                    user_id=req.user_id,
                    channel_type=req.channel.type,
                    channel_data=req.channel.model_dump(),
                    body=req.body,
                    metadata_=req.metadata,
                )
                s.add(row)
                await s.flush()  # INSERT now so defaults are populated before mapping
                return self._to_response(row)

        async def set_status(
            self, id: UUID, status: NotificationStatus, *, sent_at: datetime | None = None
        ) -> None:
            values: dict = {"status": status}
            if sent_at is not None:
                # Only touch sent_at when given, so other transitions don't null it out.
                values["sent_at"] = sent_at
            async with Session() as s, s.begin():
                await s.execute(
                    update(NotificationRow).where(NotificationRow.id == id).values(**values)
                )

        async def get(self, id: UUID) -> NotificationResponse | None:
            async with Session() as s:
                row = await s.get(NotificationRow, id)
                return self._to_response(row) if row else None

        @staticmethod
        def _to_response(row: NotificationRow) -> NotificationResponse:
            return NotificationResponse(
                id=row.id,
                user_id=row.user_id,
                channel={"type": row.channel_type, **row.channel_data},
                body=row.body,
                status=row.status,
                retry_count=row.retry_count,
                created_at=row.created_at,
                sent_at=row.sent_at,
            )

Schema is owned by Alembic, not by create_all. Generate with
uv run alembic revision --autogenerate -m init; the result is roughly:

    # migrations/versions/0001_init.py
    def upgrade() -> None:
        op.create_table(
            "notifications",
            sa.Column("id", postgresql.UUID(as_uuid=True), primary_key=True),
            sa.Column("user_id", sa.String(255), nullable=False),
            sa.Column("channel_type", sa.String(32), nullable=False),
            sa.Column("channel_data", postgresql.JSONB, nullable=False),
            sa.Column("body", sa.Text, nullable=False),
            sa.Column("status", sa.String(32), nullable=False, server_default="pending"),
            sa.Column("metadata", postgresql.JSONB, nullable=False, server_default="{}"),
            sa.Column("retry_count", sa.Integer, nullable=False, server_default="0"),
            sa.Column("created_at", sa.DateTime(timezone=True), nullable=False,
                      server_default=sa.func.now()),
            sa.Column("sent_at", sa.DateTime(timezone=True), nullable=True),
        )
        op.create_index("ix_notifications_user_id", "notifications", ["user_id"])
        op.create_index("ix_notifications_status", "notifications", ["status"])

5. Auth — JWT + argon2 (Module 13)
pyjwt signs and verifies; argon2-cffi hashes passwords (you’d verify against a
users table — shown minimal here). The dev CLI mints a token so you can curl.
    # src/app/auth/jwt.py
    import sys
    import time

    import jwt
    from argon2 import PasswordHasher
    from argon2.exceptions import VerifyMismatchError

    from app.config import settings

    ph = PasswordHasher()  # argon2id, the right default; never sha/md5 a password

    def hash_password(pw: str) -> str:
        return ph.hash(pw)

    def verify_password(hash_: str, pw: str) -> bool:
        try:
            return ph.verify(hash_, pw)
        except VerifyMismatchError:
            return False

    def mint(user_id: str, role: str = "user") -> str:
        s = settings()
        now = int(time.time())
        return jwt.encode(
            {"sub": user_id, "role": role, "iat": now, "exp": now + s.jwt_ttl_seconds},
            s.jwt_secret,
            algorithm=s.jwt_algorithm,
        )

    def decode(token: str) -> dict:
        s = settings()
        # Require the claims the dependency reads, so a token without `sub`
        # is rejected as invalid instead of raising KeyError later.
        return jwt.decode(
            token,
            s.jwt_secret,
            algorithms=[s.jwt_algorithm],
            options={"require": ["exp", "sub"]},
        )

    def mint_cli() -> None:  # uv run app-token --user user-1 --role user
        import argparse

        p = argparse.ArgumentParser()
        p.add_argument("--user", required=True)
        p.add_argument("--role", default="user")
        a = p.parse_args(sys.argv[1:])
        print(mint(a.user, a.role))

The FastAPI dependency turns a Bearer token into a CurrentUser. This is the
injection point routes depend on — no global state.
    # src/app/auth/deps.py
    from dataclasses import dataclass
    from typing import Annotated

    import jwt
    from fastapi import Depends, HTTPException, status
    from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer

    from app.auth.jwt import decode

    @dataclass
    class CurrentUser:
        user_id: str
        role: str

    _bearer = HTTPBearer()

    def current_user(
        cred: Annotated[HTTPAuthorizationCredentials, Depends(_bearer)],
    ) -> CurrentUser:
        try:
            payload = decode(cred.credentials)
        except jwt.InvalidTokenError as e:
            # `from e` keeps the cause and satisfies ruff's B904 (selected in pyproject.toml).
            raise HTTPException(
                status.HTTP_401_UNAUTHORIZED,
                "invalid or expired token",
                headers={"WWW-Authenticate": "Bearer"},
            ) from e
        return CurrentUser(user_id=payload["sub"], role=payload.get("role", "user"))

    CurrentUserDep = Annotated[CurrentUser, Depends(current_user)]

6. Redis sliding-window rate limiter (Module 10)
A sorted set per user, scored by timestamp. The check-and-add must be atomic, so it runs as a single Lua script: prune the old window, count, then reject-or-add. A pipeline can’t do this (the count→decision→add gap races under concurrency, and a score-based “roll back” would clobber other requests landing in the same instant).
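The three steps are easier to follow in plain Python before reading them as Lua. This is a single-process, in-memory sketch of the same prune, count, reject-or-add sequence; InMemorySlidingWindow is a hypothetical class for illustration only, it assumes timestamps arrive in non-decreasing order, and it offers none of the cross-process atomicity that the Redis script exists to provide.

```python
from bisect import bisect_right


class InMemorySlidingWindow:
    def __init__(self, limit: int, window_ms: int) -> None:
        self.limit = limit
        self.window_ms = window_ms
        self._hits: dict[str, list[int]] = {}  # key -> sorted request timestamps (ms)

    def allow(self, key: str, now_ms: int) -> bool:
        hits = self._hits.setdefault(key, [])
        # 1. Prune: drop timestamps at or before now - window
        #    (ZREMRANGEBYSCORE with bounds 0 .. now - window is inclusive too).
        del hits[: bisect_right(hits, now_ms - self.window_ms)]
        # 2. Count what is left, 3. reject or record this request.
        if len(hits) + 1 > self.limit:
            return False  # rejected requests are not recorded
        hits.append(now_ms)
        return True
```

With a limit of 2 per 1000 ms, requests at 0 and 10 pass, a third at 20 is rejected, and one at 1000 passes again because the request at 0 has aged out of the window.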
    # src/app/cache/redis.py
    import time
    import uuid

    from redis.asyncio import Redis

    from app.config import settings

    redis = Redis.from_url(settings().redis_url, decode_responses=True)

    # KEYS[1] = zset key; ARGV: now_ms, window_ms, limit, member, ttl_seconds
    SLIDING_WINDOW_LUA = """
    local now = tonumber(ARGV[1])
    local window = tonumber(ARGV[2])
    local limit = tonumber(ARGV[3])
    redis.call('ZREMRANGEBYSCORE', KEYS[1], 0, now - window)
    local count = redis.call('ZCARD', KEYS[1])
    if count + 1 > limit then
      return 0
    end
    redis.call('ZADD', KEYS[1], now, ARGV[4])
    redis.call('EXPIRE', KEYS[1], ARGV[5])
    return 1
    """

    class RateLimiter:
        WINDOW = 3600  # 1 hour

        def __init__(self, max_per_hour: int) -> None:
            self.max = max_per_hour

        async def allow(self, user_id: str) -> bool:
            key = f"rl:notify:{user_id}"
            now_ms = int(time.time() * 1000)
            member = f"{now_ms}-{uuid.uuid4()}"  # unique per request
            allowed = await redis.eval(
                SLIDING_WINDOW_LUA, 1, key,
                now_ms, self.WINDOW * 1000, self.max, member, self.WINDOW,
            )
            return bool(allowed)

7. Kafka — producer in the lifespan, worker with retry + DLQ (Module 11)
The event envelope is a versioned Pydantic model. Never publish raw domain rows.

    # src/app/events/schemas.py
    from uuid import UUID

    from pydantic import BaseModel

    from app.models.schemas import Channel

    class NotificationQueued(BaseModel):
        version: int = 1
        event_type: str = "notification.queued"
        notification_id: UUID
        user_id: str
        channel: Channel
        body: str

The producer is created and closed in the FastAPI lifespan, configured
idempotent (acks=all, enable_idempotence=True) so retries don’t duplicate.
    # src/app/events/producer.py
    from aiokafka import AIOKafkaProducer

    from app.config import settings
    from app.events.schemas import NotificationQueued

    class Producer:
        def __init__(self) -> None:
            self._p: AIOKafkaProducer | None = None

        async def start(self) -> None:
            self._p = AIOKafkaProducer(
                bootstrap_servers=settings().kafka_bootstrap,
                enable_idempotence=True,
                acks="all",
            )
            await self._p.start()

        async def stop(self) -> None:
            if self._p:
                await self._p.stop()

        async def publish(self, event: NotificationQueued) -> None:
            assert self._p is not None
            await self._p.send_and_wait(
                settings().kafka_topic,
                key=event.user_id.encode(),  # partition by user
                value=event.model_dump_json().encode(),
            )

    producer = Producer()

The worker is the second half of the system: an async for poll loop (Module 06)
that simulates delivery, retries transient failures with backoff, and republishes
poison/exhausted messages to the DLQ instead of blocking the partition.
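Stripped of Kafka, the retry-then-park policy is a small loop. The sketch below is an illustration under stated assumptions, not the worker's code: handle, its dead_letters list, and the injectable sleep parameter are hypothetical, and the list stands in for the DLQ topic.

```python
import asyncio
from collections.abc import Awaitable, Callable


class TransientError(Exception):
    """A failure worth retrying, such as a provider timeout."""


async def handle(
    message: str,
    deliver: Callable[[str], Awaitable[None]],
    dead_letters: list[tuple[str, str]],
    max_retries: int = 3,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> bool:
    """Return True if delivered; otherwise park the message and return False."""
    for attempt in range(max_retries):
        try:
            await deliver(message)
            return True
        except TransientError:
            if attempt < max_retries - 1:
                await sleep(2**attempt)  # exponential backoff: 1s, 2s, 4s, ...
        except Exception as exc:  # anything else is treated as permanent in this sketch
            dead_letters.append((message, repr(exc)))
            return False
    dead_letters.append((message, "retries exhausted"))
    return False
```

Either way the function returns instead of raising, so the caller can commit the offset and move on to the next message rather than blocking the partition.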
    # src/app/events/consumer.py
    import asyncio
    from datetime import UTC, datetime

    from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
    from pydantic import ValidationError

    from app.config import settings
    from app.db.repository import NotificationRepository
    from app.events.schemas import NotificationQueued
    from app.models.domain import NotificationStatus
    from app.observability.logging import log
    from app.observability.metrics import metrics

    class TransientError(Exception): ...  # retry these
    class PermanentError(Exception): ...  # straight to DLQ

    async def deliver(event: NotificationQueued) -> None:
        # Simulated provider call. Real impl: a per-channel adapter behind a Protocol,
        # wrapped in `async with asyncio.timeout(5):`.
        match event.channel.type:
            case "email" | "sms" | "push":
                await asyncio.sleep(0.05)  # pretend network
            case _:
                raise PermanentError(f"unknown channel {event.channel.type}")

    async def run_consumer() -> None:
        cfg = settings()
        repo = NotificationRepository()
        consumer = AIOKafkaConsumer(
            cfg.kafka_topic,
            bootstrap_servers=cfg.kafka_bootstrap,
            group_id="notification-worker",
            enable_auto_commit=False,
            auto_offset_reset="earliest",
        )
        dlq = AIOKafkaProducer(bootstrap_servers=cfg.kafka_bootstrap)
        await consumer.start()
        await dlq.start()
        log.info("worker_started", topic=cfg.kafka_topic)
        try:
            async for msg in consumer:  # backpressure for free
                try:
                    event = NotificationQueued.model_validate_json(msg.value)
                except ValidationError:
                    # Unparseable payload is a true poison message: park it and move on,
                    # otherwise the worker would crash on it again after every restart.
                    await _to_dlq(dlq, cfg.kafka_dlq_topic, msg, "invalid event payload")
                    metrics.notifications_failed.labels(reason="invalid").inc()
                    await consumer.commit()
                    continue
                try:
                    await _process_with_retry(event, repo, cfg.max_retries)
                    metrics.notifications_sent.labels(channel=event.channel.type).inc()
                except PermanentError as e:
                    await _to_dlq(dlq, cfg.kafka_dlq_topic, msg, str(e))
                    await repo.set_status(event.notification_id, NotificationStatus.FAILED)
                    metrics.notifications_failed.labels(reason="permanent").inc()
                # Commit only after the message was delivered or parked. An unexpected
                # error propagates uncommitted, so the message is redelivered.
                await consumer.commit()
        finally:
            await consumer.stop()
            await dlq.stop()

    async def _process_with_retry(
        event: NotificationQueued, repo: NotificationRepository, max_retries: int
    ) -> None:
        for attempt in range(max_retries):
            try:
                await deliver(event)
                await repo.set_status(
                    event.notification_id, NotificationStatus.SENT, sent_at=datetime.now(UTC)
                )
                log.info("delivered", notification_id=str(event.notification_id))
                return
            except TransientError:
                if attempt < max_retries - 1:  # no point sleeping after the last attempt
                    await asyncio.sleep(2**attempt)  # exponential backoff
        raise PermanentError("retries exhausted")  # -> DLQ

    async def _to_dlq(dlq: AIOKafkaProducer, topic: str, msg, reason: str) -> None:
        await dlq.send_and_wait(
            topic,
            key=msg.key,
            value=msg.value,
            headers=[("dlq-reason", reason.encode())],
        )
        log.warning("sent_to_dlq", reason=reason)

8. Observability (Module 15)
structlog for JSON logs, prometheus-client for counters/histograms exposed at
/metrics, OpenTelemetry for traces. The API and worker import the same modules.
    # src/app/observability/logging.py
    import structlog

    def configure_logging() -> None:
        structlog.configure(
            processors=[
                structlog.contextvars.merge_contextvars,
                structlog.processors.add_log_level,
                structlog.processors.TimeStamper(fmt="iso"),
                structlog.processors.JSONRenderer(),  # JSON to stdout for the platform
            ],
        )

    log = structlog.get_logger()

    # src/app/observability/metrics.py
    from prometheus_client import Counter

    class _Metrics:
        notifications_created = Counter("notifications_created_total", "created", ["channel"])
        notifications_sent = Counter("notifications_sent_total", "delivered", ["channel"])
        notifications_failed = Counter("notifications_failed_total", "failed", ["reason"])

    metrics = _Metrics()

    # src/app/observability/tracing.py
    # `opentelemetry-distro` provides the `opentelemetry-instrument` launcher. It auto-instruments
    # FastAPI, SQLAlchemy, aiokafka, and redis once the matching
    # `opentelemetry-instrumentation-*` packages are installed (e.g. via `opentelemetry-bootstrap`).
    # Zero-code path: run with `uv run opentelemetry-instrument uv run app` and set
    # OTEL_EXPORTER_OTLP_ENDPOINT. For explicit spans in the worker:
    from opentelemetry import trace

    tracer = trace.get_tracer("notification-service")

9. The FastAPI app & endpoints (Module 07)
The app wires observability, mounts routes, exposes /metrics and /health, and
starts/stops the Kafka producer in its lifespan. Routes are thin; the service
orchestrates (the create() spine is shown on the capstone overview).
    # src/app/api/main.py
    from contextlib import asynccontextmanager

    from fastapi import FastAPI
    from prometheus_client import make_asgi_app

    from app.api.routes import router
    from app.events.producer import producer
    from app.observability.logging import configure_logging, log

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        configure_logging()
        await producer.start()  # Kafka producer lives as long as the app
        log.info("api_started")
        yield
        await producer.stop()

    app = FastAPI(title="Notification Service", lifespan=lifespan)
    app.include_router(router, prefix="/v1")
    app.mount("/metrics", make_asgi_app())  # Prometheus scrape endpoint

    @app.get("/health")
    async def health() -> dict[str, str]:
        return {"status": "ok"}

    def run() -> None:  # uv run app
        import uvicorn

        uvicorn.run("app.api.main:app", host="0.0.0.0", port=8000)

    # src/app/api/routes.py
    from uuid import UUID

    from fastapi import APIRouter, HTTPException, status

    from app.api.service import Forbidden, NotificationService, RateLimited
    from app.auth.deps import CurrentUserDep
    from app.models.schemas import CreateNotificationRequest, NotificationResponse

    router = APIRouter(tags=["notifications"])
    service = NotificationService()

    @router.post("/notifications", status_code=status.HTTP_202_ACCEPTED)
    async def create(req: CreateNotificationRequest, user: CurrentUserDep) -> NotificationResponse:
        try:
            return await service.create(req, user)
        except Forbidden as e:
            raise HTTPException(status.HTTP_403_FORBIDDEN, str(e)) from e
        except RateLimited as e:
            raise HTTPException(status.HTTP_429_TOO_MANY_REQUESTS, str(e)) from e

    @router.get("/notifications/{notification_id}")
    async def get(notification_id: UUID, user: CurrentUserDep) -> NotificationResponse:
        try:
            n = await service.get(notification_id, user)
        except Forbidden as e:
            # service.get raises Forbidden for another user's notification; map it to 403
            # (otherwise it surfaces as a 500 and the isolation test below fails).
            raise HTTPException(status.HTTP_403_FORBIDDEN, str(e)) from e
        if n is None:
            raise HTTPException(status.HTTP_404_NOT_FOUND, "not found")
        return n

    # src/app/api/service.py
    from uuid import UUID

    from aiokafka.errors import KafkaError

    from app.auth.deps import CurrentUser
    from app.cache.redis import RateLimiter
    from app.config import settings
    from app.db.repository import NotificationRepository
    from app.events.producer import producer
    from app.events.schemas import NotificationQueued
    from app.models.domain import NotificationStatus
    from app.models.schemas import CreateNotificationRequest, NotificationResponse
    from app.observability.logging import log
    from app.observability.metrics import metrics

    class Forbidden(Exception): ...
    class RateLimited(Exception): ...

    class NotificationService:
        def __init__(self) -> None:
            self.repo = NotificationRepository()
            self.limiter = RateLimiter(settings().rate_limit_per_hour)

        async def create(
            self, req: CreateNotificationRequest, caller: CurrentUser
        ) -> NotificationResponse:
            if caller.role != "admin" and req.user_id != caller.user_id:
                raise Forbidden("cannot send notifications for other users")
            if not await self.limiter.allow(req.user_id):
                raise RateLimited(f"rate limit exceeded for {req.user_id}")

            n = await self.repo.create(req)  # persist PENDING first
            try:
                await producer.publish(NotificationQueued(
                    notification_id=n.id, user_id=n.user_id, channel=n.channel, body=n.body,
                ))
                await self.repo.set_status(n.id, NotificationStatus.QUEUED)
                n.status = NotificationStatus.QUEUED
            except KafkaError:
                log.error("queue_failed", notification_id=str(n.id))  # stays PENDING, recoverable
            metrics.notifications_created.labels(channel=req.channel.type).inc()
            return n

        async def get(self, id: UUID, caller: CurrentUser) -> NotificationResponse | None:
            n = await self.repo.get(id)
            if n and caller.role != "admin" and n.user_id != caller.user_id:
                raise Forbidden("not your notification")
            return n

10. The worker entry point
    # src/app/workers/main.py
    import asyncio

    from app.events.consumer import run_consumer
    from app.observability.logging import configure_logging

    def run() -> None:  # uv run worker
        configure_logging()
        asyncio.run(run_consumer())

11. Tests (Module 12)
A unit test of the orchestration logic with mocked infra: the rate limiter blocks before anything is persisted.

    # tests/test_service.py
    from unittest.mock import AsyncMock

    import pytest

    from app.api.service import NotificationService, RateLimited
    from app.auth.deps import CurrentUser
    from app.models.schemas import CreateNotificationRequest, EmailChannel

    @pytest.mark.asyncio
    async def test_rate_limited_request_is_not_persisted():
        svc = NotificationService()
        svc.limiter.allow = AsyncMock(return_value=False)
        svc.repo.create = AsyncMock()

        req = CreateNotificationRequest(
            user_id="u1", channel=EmailChannel(to="a@b.com", subject="Hi"), body="hi")
        with pytest.raises(RateLimited):
            await svc.create(req, CurrentUser(user_id="u1", role="user"))
        svc.repo.create.assert_not_called()  # never touched the DB

An integration test with Testcontainers: a real Postgres, the API driven
through httpx.AsyncClient over the ASGI transport, proving auth + persistence
actually work, and that one user can’t read another’s notification.

    # tests/test_api.py
    # Note: ASGITransport does not run the app's lifespan, so conftest.py (not shown)
    # is assumed to start or stub the Kafka producer and to provide Redis as well.
    import pytest
    from httpx import ASGITransport, AsyncClient

    from app.api.main import app
    from app.auth.jwt import mint

    @pytest.mark.asyncio
    async def test_create_returns_queued(pg_container):  # fixture starts Postgres + migrations
        token = mint("user-1", "user")
        transport = ASGITransport(app=app)
        async with AsyncClient(transport=transport, base_url="http://test") as c:
            r = await c.post(
                "/v1/notifications",
                headers={"Authorization": f"Bearer {token}"},
                json={"user_id": "user-1",
                      "channel": {"type": "email", "to": "a@b.com", "subject": "Hi"},
                      "body": "Welcome!"})
        assert r.status_code == 202
        assert r.json()["status"] in {"queued", "pending"}

    @pytest.mark.asyncio
    async def test_cannot_read_other_users_notification(pg_container):
        transport = ASGITransport(app=app)
        async with AsyncClient(transport=transport, base_url="http://test") as c:
            token1 = mint("user-1", "user")
            created = await c.post(
                "/v1/notifications",
                headers={"Authorization": f"Bearer {token1}"},
                json={"user_id": "user-1",
                      "channel": {"type": "email", "to": "a@b.com", "subject": "P"},
                      "body": "Private"})
            nid = created.json()["id"]
            token2 = mint("user-2", "user")
            resp = await c.get(f"/v1/notifications/{nid}",
                               headers={"Authorization": f"Bearer {token2}"})
        assert resp.status_code == 403

    uv run pytest                        # all
    uv run pytest tests/test_service.py  # unit only (no Docker)

12. Deploy — multi-stage uv Docker + compose (Module 17)
A multi-stage build: a builder stage installs deps into a venv with uv sync,
the final slim stage copies just the venv and source, runs as non-root, and the
same image runs the API or the worker depending on the command.
    # Dockerfile
    # ---- builder ----
    FROM ghcr.io/astral-sh/uv:python3.13-bookworm-slim AS builder
    ENV UV_COMPILE_BYTECODE=1 UV_LINK_MODE=copy
    WORKDIR /app
    COPY pyproject.toml uv.lock ./
    RUN --mount=type=cache,target=/root/.cache/uv \
        uv sync --frozen --no-install-project --no-dev
    COPY . .
    RUN --mount=type=cache,target=/root/.cache/uv uv sync --frozen --no-dev

    # ---- runtime ----
    FROM python:3.13-slim-bookworm
    RUN useradd -m -u 1000 app
    WORKDIR /app
    COPY --from=builder --chown=app:app /app /app
    ENV PATH="/app/.venv/bin:$PATH"
    USER app
    EXPOSE 8000
    # Override `command` in compose/K8s to `worker`. Default is the API.
    CMD ["app"]

    # docker-compose.yml
    services:
      api:
        build: .
        command: ["app"]
        ports: ["8000:8000"]
        environment:
          NS_DATABASE_URL: postgresql+asyncpg://dev:dev@postgres:5432/app
          NS_REDIS_URL: redis://redis:6379/0
          NS_KAFKA_BOOTSTRAP: kafka:9092
        depends_on: [postgres, redis, kafka]

      worker:
        build: .
        command: ["worker"]  # same image, different entry point
        environment:
          NS_DATABASE_URL: postgresql+asyncpg://dev:dev@postgres:5432/app
          NS_KAFKA_BOOTSTRAP: kafka:9092
        depends_on: [postgres, kafka]
      # postgres / redis / kafka: same definitions as shared-infra

On Kubernetes this is two Deployments off one image, one with args: ["app"] and one
with args: ["worker"], scaled independently, with the worker replicas joining the same
Kafka consumer group so partitions rebalance across them. (See Module 17.)
Run it
1. Start the backing stores:

       cd shared-infra && docker compose up -d   # postgres :5432, redis :6379, kafka :29092

2. Migrate and run the API:

       cd notification-service
       uv run alembic upgrade head
       uv run app   # http://localhost:8000/docs, /metrics

3. In a second terminal, run the worker:

       uv run worker

4. Mint a token and send one, then watch the worker deliver it:

       TOKEN=$(uv run app-token --user user-1 --role user)
       curl -s -X POST http://localhost:8000/v1/notifications \
         -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" \
         -d '{"user_id":"user-1","channel":{"type":"email","to":"a@b.com","subject":"Hi"},"body":"Welcome!"}' | jq
       # -> 202, status "queued"; worker logs {"event":"delivered", ...} and flips it to "sent"

5. Read it back, scrape metrics:

       curl -s -H "Authorization: Bearer $TOKEN" http://localhost:8000/v1/notifications/<id> | jq
       curl -s http://localhost:8000/metrics | grep notifications_