Order Pipeline + DLQ
Build an async order pipeline on aiokafka: a producer emits OrderPlaced
events to an orders topic, a consumer reads them and updates an in-memory
projection (a read model of order state), and any message that can’t be
processed — bad data, or a transient failure that keeps failing — is retried and
then parked on a dead letter queue (orders.DLQ) instead of blocking the
partition. It’s the canonical “accept fast, process later, never silently drop or
get stuck on a message” backend pattern.
If you’ve built a queue worker in TS (BullMQ) or Go (a channel + worker pool), this is the same shape — but Kafka gives you a durable, replayable log and consumer groups for horizontal scale, and the DLQ gives you a graveyard for poison messages.
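To make that shared shape concrete before Kafka enters the picture, here is a minimal in-process sketch using only `asyncio.Queue`. The producer enqueues and returns, a worker retries transient failures with backoff, and poison jobs are parked in a list standing in for the DLQ. Everything in it (`run_pipeline`, `handle`, the integer jobs and their failure rules) is hypothetical and illustrative. Unlike Kafka, this queue is neither durable nor replayable.

```python
import asyncio


class Transient(Exception):
    """A failure that might succeed on retry (hypothetical)."""


class Poison(Exception):
    """Bad data that will never succeed (hypothetical)."""


def handle(job: int) -> None:
    # hypothetical rules: negative jobs are poison, job 13 always fails transiently
    if job < 0:
        raise Poison(f"bad job {job}")
    if job == 13:
        raise Transient("dependency down")


async def run_pipeline(jobs: list[int], max_attempts: int = 3) -> tuple[list[int], list[int]]:
    """Toy in-process queue worker; returns (processed, dlq). Nothing is durable."""
    queue: asyncio.Queue[int] = asyncio.Queue()
    processed: list[int] = []
    dlq: list[int] = []

    async def worker() -> None:
        while True:
            job = await queue.get()
            try:
                for attempt in range(1, max_attempts + 1):
                    try:
                        handle(job)
                        processed.append(job)
                        break
                    except Transient:
                        if attempt == max_attempts:
                            dlq.append(job)  # retries exhausted: park it
                        else:
                            await asyncio.sleep(0.01 * 2 ** (attempt - 1))  # backoff
            except Poison:
                dlq.append(job)  # non-retryable: park immediately
            finally:
                queue.task_done()  # always move on so one bad job can't block the rest

    task = asyncio.create_task(worker())
    for job in jobs:
        queue.put_nowait(job)  # "accept fast": enqueue and return immediately
    await queue.join()  # wait until every job is processed or parked
    task.cancel()  # the worker loops forever; stop it once the queue drains
    return processed, dlq


print(asyncio.run(run_pipeline([1, 2, -5, 13, 3])))
```

Kafka replaces the `asyncio.Queue` with a partitioned, persistent log and replaces `task_done()` with an offset commit. The control flow inside the worker stays the same.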
What you’ll practice
- Modeling events as versioned Pydantic v2 models and (de)serializing them to bytes.
- An AIOKafkaProducer that publishes keyed OrderPlaced events.
- An AIOKafkaConsumer with a consumer group, manual commit, and async for processing.
- Maintaining a projection (read model) and making the consumer idempotent.
- Retry with backoff for transient failures, and a dead letter queue for poison messages.
- Graceful shutdown on SIGINT/SIGTERM.
Pipeline topology
The producer and consumer never talk directly. Kafka’s log sits between them, so the producer returns as soon as the broker acknowledges the write, and processing happens later in the worker. Failures are retried and then diverted to the DLQ. The projection is the queryable result.

```mermaid
flowchart LR
    P["producer.py<br/>emit OrderPlaced"] -->|"key=order_id"| T["orders topic<br/>P0 P1 P2 (keyed)"]
    T --> C["consumer.py (worker)<br/>async for msg:<br/>validate (Pydantic)<br/>update projection<br/>commit offset"]
    C -->|"poison / retries exhausted"| DLQ["orders.DLQ topic"]
    DLQ -.->|"inspect / replay (manual)"| T
```
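The `key=order_id` edge is what preserves per-order ordering. The producer hashes the key and takes it modulo the partition count, so every event for one order lands on the same partition. Kafka's default partitioner uses a murmur2 hash for this. The sketch below substitutes `zlib.crc32` purely for illustration, so its partition numbers will not match the ones Kafka actually picks. `partition_for` and the three-partition count are hypothetical.

```python
import zlib


def partition_for(key: bytes, num_partitions: int) -> int:
    """Hypothetical stand-in for a key partitioner (crc32, NOT Kafka's murmur2)."""
    return zlib.crc32(key) % num_partitions


NUM_PARTITIONS = 3  # assumption: the orders topic has partitions P0..P2
events = [b"order-0", b"order-1", b"order-0", b"order-2", b"order-0"]
placement = [(key.decode(), partition_for(key, NUM_PARTITIONS)) for key in events]
for key, partition in placement:
    print(f"{key} -> P{partition}")

# every event for order-0 maps to one partition, so their relative order is kept
assert len({p for k, p in placement if k == "order-0"}) == 1
```

One consequence of hash-mod-N is that adding partitions to the topic later changes where existing keys land. Per-key ordering therefore holds only while the partition count stays fixed.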
Requirements
- The shared Kafka stack running (docker compose up -d kafka kafka-ui).
- OrderPlaced carries an event_id (for idempotency), order_id (the partition key), user_id, amount, and a schema_version.
- The producer keys by order_id so all events for one order land on the same partition and stay in order.
- The consumer uses a named group and manual commit, and commits only after a message has been processed or parked (at-least-once delivery).
- Validation failures (malformed data, or a business rule such as a non-positive amount) are poison and go straight to the DLQ. Simulated transient failures are retried with backoff, then sent to the DLQ if they keep failing.
- The consumer is idempotent: a redelivered event_id is a no-op.
- Ctrl-C shuts the worker down cleanly (every finished message is already committed, and the worker leaves the group).
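At-least-once delivery and idempotency work together, as this miniature shows. If the worker crashes after updating the projection but before committing, Kafka redelivers the message, and the event_id ledger turns the second delivery into a no-op. This is a stdlib-only simulation. The `Event` dataclass, `apply`, and the list standing in for a partition are hypothetical stand-ins for OrderPlaced, the worker, and Kafka.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Event:
    """Hypothetical stand-in for OrderPlaced."""

    event_id: str
    order_id: str
    amount: str


projection: dict[str, str] = {}
seen: set[str] = set()  # idempotency ledger keyed by event_id
deliveries: list[str] = []  # every event_id the worker was handed, duplicates included


def apply(event: Event) -> None:
    deliveries.append(event.event_id)
    if event.event_id in seen:
        return  # redelivered: already applied, so this is a no-op
    projection[event.order_id] = event.amount
    seen.add(event.event_id)


log = [Event("e1", "order-0", "19.99"), Event("e2", "order-1", "5.00")]

# First run: both events are applied, but only offset 0 is committed before a crash.
committed = 0  # the group's committed offset = next offset to read
for offset, event in enumerate(log):
    apply(event)
    if offset == 0:
        committed = offset + 1
# (crash here: offset 1 was processed but never committed)

# Restart: resume from the committed offset, so e2 is delivered a second time.
for offset in range(committed, len(log)):
    apply(log[offset])
    committed = offset + 1

print(deliveries)  # e2 appears twice...
print(projection)  # ...but was applied once
```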
The worked solution
A tiny uv project: a single app package with the event model, producer, and consumer/worker.
```
order-pipeline/
├── pyproject.toml   uv project: aiokafka + pydantic
└── app/
    ├── __init__.py
    ├── events.py    OrderPlaced Pydantic model
    ├── producer.py  emits events to the orders topic
    └── consumer.py  worker: consume → project → retry/DLQ, graceful shutdown
```
Project setup: pyproject.toml
```shell
uv init order-pipeline && cd order-pipeline
uv add aiokafka pydantic
uv add --dev ruff ty
```

```toml
[project]
name = "order-pipeline"
version = "0.1.0"
requires-python = ">=3.13"
dependencies = [
    "aiokafka>=0.12",
    "pydantic>=2.11",
]

[dependency-groups]
dev = ["ruff", "ty"]
```

The event: app/events.py
A versioned Pydantic model. event_id is the idempotency key. extra="ignore" makes the model forward-compatible: a field added by a newer producer won’t break this consumer. model_dump_json() and model_validate_json() are the serialize/deserialize pair. amount is a Decimal, which Pydantic serializes to JSON as a string, so no float rounding creeps into money values.
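To show what extra="ignore" buys, the sketch below uses the standard library instead of Pydantic, so the mechanics are visible. A v1 reader keeps only the fields it knows and drops the rest, so a v2 producer can add a field without breaking it. The `OrderPlacedV1` dataclass and the `decode` helper are hypothetical. In the project itself, OrderPlaced.model_validate_json() does this for you, along with type validation.

```python
import json
from dataclasses import dataclass, fields
from decimal import Decimal


@dataclass(frozen=True)
class OrderPlacedV1:
    """Hypothetical v1 reader's view of the event (stdlib stand-in for the Pydantic model)."""

    event_id: str
    order_id: str
    user_id: int
    amount: Decimal
    schema_version: int = 1


def decode(raw: bytes) -> OrderPlacedV1:
    data = json.loads(raw)
    known = {f.name for f in fields(OrderPlacedV1)}
    kept = {k: v for k, v in data.items() if k in known}  # like extra="ignore"
    kept["amount"] = Decimal(kept["amount"])  # amount travels as a JSON string
    return OrderPlacedV1(**kept)  # a missing required field raises TypeError here


# a newer (v2) producer added a field this reader has never heard of
raw_v2 = json.dumps({
    "event_id": "e-1",
    "order_id": "order-0",
    "user_id": 100,
    "amount": "19.99",
    "schema_version": 2,
    "loyalty_tier": "gold",
}).encode()

event = decode(raw_v2)
print(event)
```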
```python
from datetime import datetime, timezone
from decimal import Decimal

from pydantic import BaseModel, ConfigDict, Field


def _now() -> datetime:
    return datetime.now(timezone.utc)


class OrderPlaced(BaseModel):
    model_config = ConfigDict(extra="ignore")  # forward-compatible reads

    event_id: str  # idempotency key (unique per event)
    order_id: str  # partition key (ordering per order)
    user_id: int
    amount: Decimal
    currency: str = "USD"
    occurred_at: datetime = Field(default_factory=_now)
    schema_version: int = 1
```

The producer: app/producer.py
enable_idempotence=True (which implies acks="all") prevents the producer from writing duplicates when it retries a send. Run it standalone to pump a few events, including two bad ones that exercise the DLQ path: one poison order and one that keeps failing transiently.
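Idempotence matters because, with acks="all", a write can succeed on the broker while its acknowledgement is lost, and the producer's retry would then write the record twice. An idempotent producer tags each record with its producer ID and a per-partition sequence number, and the broker drops any record whose sequence it has already written. The sketch below is a deliberately simplified in-memory model of that broker-side check. Real brokers track the last few batches per producer and also reject out-of-order sequences. `Broker`, `append`, and the field layout are hypothetical.

```python
from dataclasses import dataclass, field


@dataclass
class Broker:
    """Simplified model of one partition's duplicate check for idempotent producers."""

    log: list[bytes] = field(default_factory=list)
    last_seq: dict[int, int] = field(default_factory=dict)  # producer_id -> last sequence written

    def append(self, producer_id: int, seq: int, value: bytes) -> bool:
        """Return True if written, False if recognised as a duplicate retry."""
        if seq <= self.last_seq.get(producer_id, -1):
            return False  # already stored: acknowledge again, but don't write twice
        self.log.append(value)
        self.last_seq[producer_id] = seq
        return True


broker = Broker()
print(broker.append(producer_id=7, seq=0, value=b"order-0"))  # written
# the ack for seq=0 is "lost", so the producer retries the same record
print(broker.append(producer_id=7, seq=0, value=b"order-0"))  # duplicate, dropped
print(broker.append(producer_id=7, seq=1, value=b"order-1"))  # written
print(broker.log)
```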
```python
import asyncio
import uuid
from decimal import Decimal

from aiokafka import AIOKafkaProducer

from app.events import OrderPlaced

BOOTSTRAP = "localhost:29092"
ORDERS_TOPIC = "orders"


async def publish(producer: AIOKafkaProducer, event: OrderPlaced) -> None:
    await producer.send_and_wait(
        ORDERS_TOPIC,
        key=event.order_id.encode(),  # same order_id -> same partition
        value=event.model_dump_json().encode(),  # Pydantic -> JSON -> bytes
    )
    print(f"published {event.event_id} order={event.order_id} amount={event.amount}")


async def main() -> None:
    producer = AIOKafkaProducer(
        bootstrap_servers=BOOTSTRAP,
        acks="all",
        enable_idempotence=True,
    )
    await producer.start()
    try:
        # three good orders
        for i in range(3):
            await publish(producer, OrderPlaced(
                event_id=str(uuid.uuid4()),
                order_id=f"order-{i}",
                user_id=100 + i,
                amount=Decimal("19.99"),
            ))
        # a poison order: the model itself accepts amount <= 0, but the
        # consumer's business check rejects it (PoisonMessage) -> DLQ
        await publish(producer, OrderPlaced(
            event_id=str(uuid.uuid4()),
            order_id="order-bad",
            user_id=1,
            amount=Decimal("-5.00"),
        ))
        # a "flaky" order whose user_id triggers a simulated transient failure
        await publish(producer, OrderPlaced(
            event_id=str(uuid.uuid4()),
            order_id="order-flaky",
            user_id=666,
            amount=Decimal("42.00"),
        ))
    finally:
        await producer.stop()


if __name__ == "__main__":
    asyncio.run(main())
```

The worker: app/consumer.py
The heart of the project. One async for loop does it all: deserialize (validate), process into the projection, classify failures, retry transient ones, divert poison to the DLQ, and commit once each message has been handled or parked so the partition keeps flowing. A SIGINT/SIGTERM handler triggers a graceful shutdown.
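The per-message policy can be exercised without a broker. This stdlib sketch is hypothetical throughout: `handle`, `process`, plain dicts as payloads, and a list as the DLQ. It classifies failures, retries transient ones with the same 0.5 s doubling backoff, parks everything else, and only then returns the next offset to commit. If parking itself raised, the exception would propagate before any commit. `sleep` is injected so the demo records delays instead of waiting.

```python
import time
from collections.abc import Callable


class PoisonMessage(Exception):
    """Non-retryable: bad data."""


class TransientError(Exception):
    """May succeed on retry."""


def backoff_delays(max_attempts: int, base: float = 0.5) -> list[float]:
    """Delays between attempts: base, 2*base, 4*base, ... (one fewer than attempts)."""
    return [base * 2**i for i in range(max_attempts - 1)]


def handle(
    offset: int,
    payload: dict,
    process: Callable[[dict], None],
    dlq: list[tuple[int, str]],
    sleep: Callable[[float], object] = time.sleep,
    max_attempts: int = 3,
) -> int:
    """Process one message and return the next offset to commit."""
    delays = backoff_delays(max_attempts)
    try:
        for attempt in range(1, max_attempts + 1):
            try:
                process(payload)
                break
            except TransientError:
                if attempt == max_attempts:
                    raise  # exhausted: caught by the DLQ branch below
                sleep(delays[attempt - 1])
    except PoisonMessage as exc:
        dlq.append((offset, f"poison: {exc}"))
    except TransientError as exc:
        dlq.append((offset, f"exhausted: {exc}"))
    return offset + 1  # reached only once the message is handled or parked


calls: dict[int, int] = {}


def process(payload: dict) -> None:
    """Hypothetical business logic mirroring the worker's failure rules."""
    calls[payload["user_id"]] = calls.get(payload["user_id"], 0) + 1
    if payload["amount"] <= 0:
        raise PoisonMessage(f"non-positive amount {payload['amount']}")
    if payload["user_id"] == 666:
        raise TransientError("downstream user-service unavailable")


messages = [
    {"user_id": 100, "amount": 20},
    {"user_id": 1, "amount": -5},
    {"user_id": 666, "amount": 42},
]
dlq: list[tuple[int, str]] = []
slept: list[float] = []
committed = 0
for offset, payload in enumerate(messages):
    committed = handle(offset, payload, process, dlq, sleep=slept.append)

print(committed, dlq, slept)
```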
```python
import asyncio
import signal
from decimal import Decimal

from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from aiokafka.structs import ConsumerRecord
from pydantic import ValidationError

from app.events import OrderPlaced

BOOTSTRAP = "localhost:29092"
ORDERS_TOPIC = "orders"
DLQ_TOPIC = "orders.DLQ"
GROUP_ID = "order-processor"
MAX_ATTEMPTS = 3  # first try + 2 retries

# The projection (read model) + the idempotency ledger. In production these would
# be Postgres tables; here they're dicts to keep the project tiny.
projection: dict[str, dict] = {}
processed_event_ids: set[str] = set()


class PoisonMessage(Exception):
    """Non-retryable: bad data. Goes straight to the DLQ."""


class TransientError(Exception):
    """Might succeed on retry (DB blip, network): retry with backoff."""


def apply_to_projection(event: OrderPlaced) -> None:
    # idempotent: a redelivered event_id is a no-op
    if event.event_id in processed_event_ids:
        print(f"  duplicate {event.event_id}, skipping")
        return
    if event.amount <= Decimal("0"):
        raise PoisonMessage(f"non-positive amount {event.amount}")
    if event.user_id == 666:  # simulate a flaky dependency
        raise TransientError("downstream user-service unavailable")

    projection[event.order_id] = {
        "order_id": event.order_id,
        "user_id": event.user_id,
        "amount": str(event.amount),
        "status": "PLACED",
    }
    processed_event_ids.add(event.event_id)
    print(f"  projected {event.order_id} -> {projection[event.order_id]['status']}")


async def process_with_retries(event: OrderPlaced) -> None:
    delay = 0.5
    for attempt in range(1, MAX_ATTEMPTS + 1):
        try:
            apply_to_projection(event)
            return
        except TransientError as exc:
            if attempt == MAX_ATTEMPTS:
                raise  # exhausted -> caller sends to DLQ
            print(f"  attempt {attempt}/{MAX_ATTEMPTS} failed ({exc}); retrying in {delay}s")
            await asyncio.sleep(delay)
            delay *= 2  # exponential backoff: 0.5s, 1s, ...


async def to_dlq(producer: AIOKafkaProducer, msg: ConsumerRecord, reason: str) -> None:
    await producer.send_and_wait(
        DLQ_TOPIC,
        key=msg.key,
        value=msg.value,  # preserve raw bytes
        headers=[
            ("x-original-topic", msg.topic.encode()),
            ("x-original-partition", str(msg.partition).encode()),  # offsets are per partition
            ("x-original-offset", str(msg.offset).encode()),
            ("x-error", reason.encode()),
        ],
    )
    print(f"  -> DLQ ({reason})")


async def run() -> None:
    consumer = AIOKafkaConsumer(
        ORDERS_TOPIC,
        bootstrap_servers=BOOTSTRAP,
        group_id=GROUP_ID,
        auto_offset_reset="earliest",
        enable_auto_commit=False,  # we commit manually, after work
    )
    producer = AIOKafkaProducer(bootstrap_servers=BOOTSTRAP, enable_idempotence=True)
    await consumer.start()
    await producer.start()
    try:
        async for msg in consumer:
            print(f"got partition={msg.partition} offset={msg.offset} key={msg.key!r}")
            try:
                event = OrderPlaced.model_validate_json(msg.value)  # may be poison
                await process_with_retries(event)
            except (PoisonMessage, ValidationError) as exc:
                await to_dlq(producer, msg, f"poison: {exc}")
            except Exception as exc:  # retries exhausted (or an unexpected error)
                await to_dlq(producer, msg, f"exhausted: {exc}")
            # Commit only once the message is handled or parked. Deliberately not in
            # a `finally`: if to_dlq raises, or the task is cancelled mid-message,
            # the offset stays uncommitted and the message is redelivered, not dropped.
            await consumer.commit()
    finally:
        await producer.stop()
        await consumer.stop()  # leaves the group cleanly; offsets were committed per message


async def main() -> None:
    loop = asyncio.get_running_loop()
    worker = asyncio.create_task(run())
    for sig in (signal.SIGINT, signal.SIGTERM):
        # cancelling interrupts `async for` even while it sits idle waiting for messages
        loop.add_signal_handler(sig, worker.cancel)
    print(f"worker up: group={GROUP_ID}, topic={ORDERS_TOPIC} (Ctrl-C to stop)")
    try:
        await worker
    except asyncio.CancelledError:
        pass  # expected on Ctrl-C/SIGTERM; run()'s finally already stopped the clients
    print(f"\nfinal projection: {len(projection)} orders")


if __name__ == "__main__":
    asyncio.run(main())
```

Run it
1. Start Kafka from the shared infra (one-time per session):

   ```shell
   cd shared-infra && docker compose up -d kafka kafka-ui
   ```

2. Lint and type-check (the modern Python loop):

   ```shell
   uv run ruff check .
   uv run ty check .
   ```

3. Start the worker. It joins the group and waits for messages:

   ```shell
   uv run python -m app.consumer
   ```

4. In a second terminal, fire the producer (3 good + 1 poison + 1 flaky):

   ```shell
   uv run python -m app.producer
   ```

5. Watch the worker terminal: the three good orders land in the projection, the negative-amount order goes straight to the DLQ as poison, and order-flaky retries with backoff before exhausting to the DLQ.
6. Open Kafka UI at http://localhost:8090 and inspect the orders and orders.DLQ topics. Each DLQ message carries an x-error header explaining why it was parked.
7. Stop the worker with Ctrl-C. Offsets for finished messages are already committed, so it only has to stop its clients and leave the group cleanly (no waiting out the session timeout).
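If you'd rather inspect parked messages in code than in Kafka UI, note that aiokafka exposes a record's headers as a sequence of (str, bytes) pairs, the same shape to_dlq writes them in. The helper below, `decode_headers`, is hypothetical. It turns those pairs into a readable dict and is shown on hand-built sample headers rather than a live consumer, so the values are illustrative.

```python
def decode_headers(headers: list[tuple[str, bytes]]) -> dict[str, str]:
    """Turn Kafka-style (str, bytes) header pairs into a str -> str dict.

    Kafka allows repeated header keys; here a later duplicate overwrites an earlier one.
    """
    # errors="replace" keeps one malformed header from crashing the inspection tool
    return {key: value.decode("utf-8", errors="replace") for key, value in headers}


# sample headers shaped like the ones to_dlq attaches (values are illustrative)
sample = [
    ("x-original-topic", b"orders"),
    ("x-original-offset", b"3"),
    ("x-error", b"poison: non-positive amount -5.00"),
]
info = decode_headers(sample)
print(f"{info['x-original-topic']}@{info['x-original-offset']}: {info['x-error']}")
```

In a real inspection script, you would call it on `msg.headers` for each record read from orders.DLQ.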