
Order Pipeline + DLQ

Build an async order pipeline on aiokafka: a producer emits OrderPlaced events to an orders topic, a consumer reads them and updates an in-memory projection (a read model of order state), and any message that can’t be processed is parked on a dead letter queue (orders.DLQ) instead of blocking the partition. Bad data is parked immediately; a transient failure is first retried with backoff and parked only if it keeps failing. It’s the canonical “accept fast, process later, never silently drop or get stuck on a message” backend pattern.

If you’ve built a queue worker in TS (BullMQ) or Go (a channel + worker pool), this is the same shape — but Kafka gives you a durable, replayable log and consumer groups for horizontal scale, and the DLQ gives you a graveyard for poison messages.

What you’ll learn:

  1. Modeling events as versioned Pydantic v2 models and (de)serializing them to bytes.
  2. An AIOKafkaProducer that publishes keyed OrderPlaced events.
  3. An AIOKafkaConsumer with a consumer group, manual commit, and an async processing loop.
  4. Maintaining a projection (read model) and making the consumer idempotent.
  5. Retry with backoff for transient failures, and a dead letter queue for poison messages.
  6. Graceful shutdown on SIGINT/SIGTERM.

The producer and consumer never talk directly. Kafka’s log sits between them, so the producer returns as soon as the broker acknowledges the write, not when the order has been processed; processing happens in the worker. Transient failures are retried, then diverted to the DLQ; poison messages go to the DLQ directly. The projection is the queryable result.
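To see that shape without a broker, here is a Kafka-free sketch built on a few assumptions. An in-process asyncio.Queue stands in for the orders topic (unlike Kafka it is neither durable nor replayable). Plain lists stand in for the projection and orders.DLQ. The names handle, worker, demo, Poison, and Transient are hypothetical and not part of the project. The sketch only illustrates the routing: poison goes to the DLQ on the first failure, while transient failures are retried with a shortened doubling backoff and parked once the attempt budget runs out.

```python
import asyncio
import contextlib
from decimal import Decimal

MAX_ATTEMPTS = 3  # same attempt budget as the tutorial's MAX_RETRIES


class Poison(Exception):
    """Hypothetical stand-in for a non-retryable failure (bad data)."""


class Transient(Exception):
    """Hypothetical stand-in for a failure that might clear up on retry."""


async def handle(event: dict) -> str:
    # Hypothetical rules that mirror the tutorial's demo data.
    if event["amount"] <= 0:
        raise Poison("non-positive amount")
    if event["user_id"] == 666:
        raise Transient("downstream unavailable")
    return event["order_id"]


async def worker(log: asyncio.Queue, projected: list, dlq: list) -> None:
    while True:
        event = await log.get()
        for attempt in range(1, MAX_ATTEMPTS + 1):
            try:
                projected.append(await handle(event))
                break
            except Poison as exc:  # never retried
                dlq.append((event["order_id"], f"poison: {exc}"))
                break
            except Transient as exc:
                if attempt == MAX_ATTEMPTS:
                    dlq.append((event["order_id"], f"exhausted: {exc}"))
                else:
                    await asyncio.sleep(0.01 * 2 ** (attempt - 1))  # shortened backoff
        log.task_done()


async def demo() -> tuple[list, list]:
    log: asyncio.Queue = asyncio.Queue()  # stands in for the orders topic
    projected: list = []  # stands in for the projection
    dlq: list = []  # stands in for orders.DLQ
    task = asyncio.create_task(worker(log, projected, dlq))
    for event in (
        {"order_id": "order-0", "user_id": 100, "amount": Decimal("19.99")},
        {"order_id": "order-bad", "user_id": 1, "amount": Decimal("-5.00")},
        {"order_id": "order-flaky", "user_id": 666, "amount": Decimal("42.00")},
    ):
        log.put_nowait(event)  # "accept fast": enqueueing never waits for processing
    await log.join()  # block until every event has been handled or parked
    task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await task
    return projected, dlq


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Running it reports one projected order and two parked ones, in send order. What the queue cannot model is what Kafka adds: offsets, commits, and redelivery after a crash.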

Figure: Order pipeline topology (diagram not reproduced)
Requirements:

  • The shared Kafka stack running (docker compose up -d kafka kafka-ui).
  • OrderPlaced carries an event_id (for idempotency), order_id (the partition key), user_id, amount, and a schema_version, plus currency and occurred_at, which have defaults.
  • The producer keys by order_id so all events for one order land on the same partition and stay ordered.
  • The consumer uses a named group and manual commit, committing only after processing (at-least-once delivery).
  • Validation failures and business-rule violations (such as a non-positive amount) are poison → straight to the DLQ. Simulated transient failures retry with backoff, then go to the DLQ if still failing.
  • The consumer is idempotent: a redelivered event_id is a no-op.
  • Ctrl-C shuts the worker down cleanly: it finishes the message in flight, commits its offset, and leaves the group.
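Keying by order_id preserves per-order ordering because the producer hashes the key to choose a partition, and Kafka guarantees order only within a partition. The sketch below is a simplified model, not Kafka’s partitioner. It uses zlib.crc32 where Kafka’s default partitioner uses murmur2, so the actual partition numbers will differ. NUM_PARTITIONS and the extra event names (paid, shipped) are hypothetical. The property it demonstrates still carries over: equal keys always map to the same partition.

```python
import zlib
from collections import defaultdict

NUM_PARTITIONS = 3  # hypothetical partition count for the orders topic


def partition_for(key: bytes, num_partitions: int = NUM_PARTITIONS) -> int:
    # Stand-in hash: Kafka's default partitioner uses murmur2, not CRC32, so
    # real partition numbers differ. What carries over: a given key always
    # maps to the same partition.
    return zlib.crc32(key) % num_partitions


def assign(events: list[tuple[str, str]]) -> dict[int, list[tuple[str, str]]]:
    """Route (order_id, event_name) pairs to partitions, keeping send order."""
    partitions: dict[int, list[tuple[str, str]]] = defaultdict(list)
    for order_id, name in events:
        partitions[partition_for(order_id.encode())].append((order_id, name))
    return dict(partitions)


if __name__ == "__main__":
    sent = [
        ("order-1", "placed"),
        ("order-2", "placed"),
        ("order-1", "paid"),  # hypothetical follow-up events
        ("order-2", "paid"),
        ("order-1", "shipped"),
    ]
    for partition, events in sorted(assign(sent).items()):
        print(partition, events)
```

The guarantee holds only while the partition count stays fixed. Adding partitions to the topic remaps keys.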

A tiny uv project: a single app package with the event model, producer, and consumer/worker.

  • order-pipeline/
    • pyproject.toml: uv project (aiokafka + pydantic)
    • app/
      • __init__.py
      • events.py: OrderPlaced Pydantic model
      • producer.py: emits events to the orders topic
      • consumer.py: worker (consume → project → retry/DLQ, graceful shutdown)
```sh
uv init order-pipeline && cd order-pipeline
uv add aiokafka pydantic
uv add --dev ruff ty
```
pyproject.toml

```toml
[project]
name = "order-pipeline"
version = "0.1.0"
requires-python = ">=3.13"
dependencies = [
    "aiokafka>=0.12",
    "pydantic>=2.11",
]

[dependency-groups]
dev = ["ruff", "ty"]
```

A versioned Pydantic model. event_id is the idempotency key; extra="ignore" makes it forward-compatible (a newer producer’s extra field won’t break this consumer). model_dump_json() / model_validate_json() are the serialize / deserialize pair.

app/events.py
```python
from datetime import datetime, timezone
from decimal import Decimal

from pydantic import BaseModel, ConfigDict, Field


def _now() -> datetime:
    return datetime.now(timezone.utc)


class OrderPlaced(BaseModel):
    model_config = ConfigDict(extra="ignore")  # forward-compatible reads

    event_id: str  # idempotency key (unique per event)
    order_id: str  # partition key (ordering per order)
    user_id: int
    amount: Decimal
    currency: str = "USD"
    occurred_at: datetime = Field(default_factory=_now)
    schema_version: int = 1
```
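To make the forward-compatibility point concrete without depending on Pydantic, here is a stdlib-only stand-in for what extra="ignore" does on read. OrderPlacedV1 and decode are hypothetical names, and the dataclass covers only a subset of the real model’s fields. The sketch assumes the wire format that model_dump_json() produces, in which a Decimal is serialized as a JSON string such as "19.99".

```python
import json
from dataclasses import dataclass, fields
from decimal import Decimal


@dataclass(frozen=True)
class OrderPlacedV1:
    """Hypothetical stdlib stand-in for a subset of the OrderPlaced model."""

    event_id: str
    order_id: str
    user_id: int
    amount: Decimal
    schema_version: int = 1


def decode(raw: bytes) -> OrderPlacedV1:
    data = json.loads(raw)
    known = {f.name for f in fields(OrderPlacedV1)}
    # Tolerant reader: drop keys this version does not know, like extra="ignore".
    kept = {key: value for key, value in data.items() if key in known}
    kept["amount"] = Decimal(kept["amount"])  # Decimal travels as a JSON string
    return OrderPlacedV1(**kept)


if __name__ == "__main__":
    # A payload from a hypothetical newer producer that added a "coupon" field.
    raw = (
        b'{"event_id": "e1", "order_id": "order-1", "user_id": 7, '
        b'"amount": "19.99", "schema_version": 2, "coupon": "SPRING"}'
    )
    print(decode(raw))
```

Pydantic does this filtering for you, and it also coerces and validates types. The takeaway is that an older consumer can still read a newer producer’s payload, provided no field it relies on was removed or changed type.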

enable_idempotence=True (which implies acks="all") stops producer-side duplicates on retry. Run it standalone to pump a few events — including a couple of bad ones to exercise the DLQ path.

app/producer.py
```python
import asyncio
import uuid
from decimal import Decimal

from aiokafka import AIOKafkaProducer

from app.events import OrderPlaced

BOOTSTRAP = "localhost:29092"
ORDERS_TOPIC = "orders"


async def publish(producer: AIOKafkaProducer, event: OrderPlaced) -> None:
    await producer.send_and_wait(
        ORDERS_TOPIC,
        key=event.order_id.encode(),  # same order_id -> same partition
        value=event.model_dump_json().encode(),  # Pydantic -> JSON -> bytes
    )
    print(f"published {event.event_id} order={event.order_id} amount={event.amount}")


async def main() -> None:
    producer = AIOKafkaProducer(
        bootstrap_servers=BOOTSTRAP,
        acks="all",
        enable_idempotence=True,
    )
    await producer.start()
    try:
        # three good orders
        for i in range(3):
            await publish(producer, OrderPlaced(
                event_id=str(uuid.uuid4()),
                order_id=f"order-{i}",
                user_id=100 + i,
                amount=Decimal("19.99"),
            ))
        # a poison order: the model accepts -5.00, but the consumer's business
        # rule (amount must be > 0) rejects it -> DLQ
        await publish(producer, OrderPlaced(
            event_id=str(uuid.uuid4()),
            order_id="order-bad",
            user_id=1,
            amount=Decimal("-5.00"),
        ))
        # a "flaky" order whose user_id triggers a simulated transient failure
        await publish(producer, OrderPlaced(
            event_id=str(uuid.uuid4()),
            order_id="order-flaky",
            user_id=666,
            amount=Decimal("42.00"),
        ))
    finally:
        await producer.stop()


if __name__ == "__main__":
    asyncio.run(main())
```
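What enable_idempotence=True buys you can be shown with a deliberately simplified model. PartitionLog is a hypothetical name. Real Kafka tracks a producer id, an epoch, and a per-partition sequence number for each batch, while this sketch keeps only a producer id and a per-record sequence. In both, a retried send that the broker has already written is acknowledged but not appended a second time.

```python
from dataclasses import dataclass, field


@dataclass
class PartitionLog:
    """Simplified model of a partition that deduplicates idempotent sends."""

    records: list[str] = field(default_factory=list)
    last_seq: dict[int, int] = field(default_factory=dict)  # producer id -> last seq

    def append(self, producer_id: int, seq: int, value: str) -> bool:
        last = self.last_seq.get(producer_id, -1)
        if seq <= last:
            return False  # duplicate of a write we already have: ack, don't append
        if seq != last + 1:
            raise ValueError(f"out-of-order sequence {seq}, expected {last + 1}")
        self.records.append(value)
        self.last_seq[producer_id] = seq
        return True


if __name__ == "__main__":
    log = PartitionLog()
    log.append(42, 0, "order-0")
    log.append(42, 1, "order-1")
    log.append(42, 1, "order-1")  # the ack was lost, so the producer retried
    print(log.records)
```

In the real protocol the client and broker handle all of this for you. The producer code only has to set the flag.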

The heart of the project. One consume loop does it all: deserialize (validate), process into the projection, classify failures, retry transient ones, divert poison to the DLQ, and commit each record once it has been handled or parked so the partition keeps flowing. A signal handler flips a stop event for graceful shutdown. The loop polls with a timeout so it notices that event even when no messages are arriving.

app/consumer.py

```python
import asyncio
import signal
from decimal import Decimal

from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from aiokafka.structs import ConsumerRecord, TopicPartition
from pydantic import ValidationError

from app.events import OrderPlaced

BOOTSTRAP = "localhost:29092"
ORDERS_TOPIC = "orders"
DLQ_TOPIC = "orders.DLQ"
GROUP_ID = "order-processor"
MAX_RETRIES = 3  # total attempts per message, including the first one

# The projection (read model) + the idempotency ledger. In production these would
# be Postgres tables; here they're dicts to keep the project tiny.
projection: dict[str, dict] = {}
processed_event_ids: set[str] = set()


class PoisonMessage(Exception):
    """Non-retryable: bad data. Goes straight to the DLQ."""


class TransientError(Exception):
    """Might succeed on retry (DB blip, network): retry with backoff."""


def apply_to_projection(event: OrderPlaced) -> None:
    # idempotent: a redelivered event_id is a no-op
    if event.event_id in processed_event_ids:
        print(f"  duplicate {event.event_id}, skipping")
        return
    if event.amount <= Decimal("0"):
        raise PoisonMessage(f"non-positive amount {event.amount}")
    if event.user_id == 666:  # simulate a flaky dependency
        raise TransientError("downstream user-service unavailable")
    projection[event.order_id] = {
        "order_id": event.order_id,
        "user_id": event.user_id,
        "amount": str(event.amount),
        "status": "PLACED",
    }
    processed_event_ids.add(event.event_id)
    print(f"  projected {event.order_id} -> {projection[event.order_id]['status']}")


async def process_with_retries(event: OrderPlaced) -> None:
    delay = 0.5
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            apply_to_projection(event)
            return
        except TransientError as exc:
            if attempt == MAX_RETRIES:
                raise  # exhausted -> caller sends to DLQ
            print(f"  attempt {attempt}/{MAX_RETRIES} failed ({exc}); retrying in {delay}s")
            await asyncio.sleep(delay)
            delay *= 2


async def to_dlq(producer: AIOKafkaProducer, msg: ConsumerRecord, reason: str) -> None:
    await producer.send_and_wait(
        DLQ_TOPIC,
        key=msg.key,
        value=msg.value,  # preserve the raw bytes
        headers=[
            ("x-original-topic", msg.topic.encode()),
            ("x-original-partition", str(msg.partition).encode()),
            ("x-original-offset", str(msg.offset).encode()),  # unique only per partition
            ("x-error", reason.encode()),
        ],
    )
    print(f"  -> DLQ ({reason})")


async def handle(producer: AIOKafkaProducer, msg: ConsumerRecord) -> None:
    """Project one record, or park it on the DLQ if it can't be processed."""
    print(f"got partition={msg.partition} offset={msg.offset} key={msg.key!r}")
    try:
        event = OrderPlaced.model_validate_json(msg.value)  # may be poison
        await process_with_retries(event)
    except (PoisonMessage, ValidationError) as exc:
        await to_dlq(producer, msg, f"poison: {exc}")
    except Exception as exc:  # retries exhausted (or an unexpected error)
        await to_dlq(producer, msg, f"exhausted: {exc}")


async def run(stop: asyncio.Event) -> None:
    consumer = AIOKafkaConsumer(
        ORDERS_TOPIC,
        bootstrap_servers=BOOTSTRAP,
        group_id=GROUP_ID,
        auto_offset_reset="earliest",
        enable_auto_commit=False,  # we commit manually, after the work is done
    )
    producer = AIOKafkaProducer(bootstrap_servers=BOOTSTRAP, enable_idempotence=True)
    await consumer.start()
    await producer.start()
    try:
        while not stop.is_set():
            # Poll with a timeout rather than `async for`: an idle `async for` waits
            # for the next message, so the stop flag would not be checked until then.
            batches = await consumer.getmany(timeout_ms=1000)
            for tp, records in batches.items():
                for msg in records:
                    if stop.is_set():
                        return  # uncommitted records are redelivered on restart
                    await handle(producer, msg)
                    # Handled or parked: commit just past this record. Deliberately
                    # not in a `finally`: if the DLQ write fails, the exception stops
                    # the worker and the record is redelivered instead of dropped.
                    await consumer.commit({tp: msg.offset + 1})
    finally:
        await producer.stop()
        await consumer.stop()  # leaves the group (offsets were committed per record)


async def main() -> None:
    stop = asyncio.Event()
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, stop.set)
    print(f"worker up: group={GROUP_ID}, topic={ORDERS_TOPIC} (Ctrl-C to stop)")
    await run(stop)
    print(f"\nfinal projection: {len(projection)} orders")


if __name__ == "__main__":
    asyncio.run(main())
```
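Commit-after-processing and the event_id ledger are both necessary for the following reason. If the worker crashes after updating the projection but before its commit lands, Kafka redelivers that record after the restart. The tutorial’s status-per-order projection happens to tolerate a repeat, because writing "PLACED" twice gives the same result. The sketch below therefore uses a hypothetical per-user spend total, which does not tolerate repeats. project_totals and the (event_id, user_id, amount) delivery tuples are illustrative assumptions.

```python
from decimal import Decimal


def project_totals(
    deliveries: list[tuple[str, int, Decimal]], dedupe: bool = True
) -> dict[int, Decimal]:
    """Fold (event_id, user_id, amount) deliveries into a per-user spend total."""
    totals: dict[int, Decimal] = {}
    seen: set[str] = set()  # plays the role of processed_event_ids
    for event_id, user_id, amount in deliveries:
        if dedupe and event_id in seen:
            continue  # redelivery after a crash: already applied, skip it
        totals[user_id] = totals.get(user_id, Decimal("0")) + amount
        seen.add(event_id)
    return totals


if __name__ == "__main__":
    # e2 was applied, then the worker crashed before committing, so e2 arrives again.
    deliveries = [
        ("e1", 100, Decimal("19.99")),
        ("e2", 100, Decimal("5.00")),
        ("e2", 100, Decimal("5.00")),
    ]
    print(project_totals(deliveries))
    print(project_totals(deliveries, dedupe=False))
```

With deduplication the total is 24.99; without it, the redelivered event is counted twice (29.99). In a real store, the ledger insert and the projection update need to happen atomically, for example in one database transaction. Otherwise a crash between the two reopens the gap.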
  1. Start Kafka from the shared infra (one-time per session):

    ```sh
    cd shared-infra && docker compose up -d kafka kafka-ui
    ```

  2. Lint and type-check with ruff and ty:

    ```sh
    uv run ruff check .
    uv run ty check .
    ```

  3. Start the worker. It joins the group and waits for messages:

    ```sh
    uv run python -m app.consumer
    ```

  4. In a second terminal, run the producer (3 good + 1 poison + 1 flaky):

    ```sh
    uv run python -m app.producer
    ```

  5. Watch the worker terminal: the three good orders land in the projection, and the negative-amount order goes straight to the DLQ as poison. order-flaky is retried with backoff (0.5 s, then 1 s), and its third failed attempt sends it to the DLQ.

  6. Open Kafka UI at http://localhost:8090 and inspect the orders and orders.DLQ topics. Each DLQ message carries an x-error header explaining why it was parked.

  7. Stop the worker with Ctrl-C. Offsets are already committed after each message, and consumer.stop() leaves the group explicitly, so the group rebalances right away instead of waiting out the session timeout.
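If you inspect parked messages outside Kafka UI, you can decode the headers that to_dlq attaches into something readable. This is a minimal sketch. It assumes headers arrive as the sequence of (str, bytes) pairs that aiokafka exposes as ConsumerRecord.headers. DlqInfo and parse_dlq_headers are hypothetical names, and any header other than the three read below is ignored.

```python
from collections.abc import Sequence
from dataclasses import dataclass


@dataclass(frozen=True)
class DlqInfo:
    """Hypothetical readable view of the headers written by to_dlq."""

    original_topic: str
    original_offset: int
    error: str


def parse_dlq_headers(headers: Sequence[tuple[str, bytes]]) -> DlqInfo:
    # Kafka header values are raw bytes; to_dlq wrote UTF-8 text.
    decoded = {key: value.decode("utf-8", errors="replace") for key, value in headers}
    return DlqInfo(
        original_topic=decoded["x-original-topic"],
        original_offset=int(decoded["x-original-offset"]),
        error=decoded["x-error"],
    )


if __name__ == "__main__":
    print(parse_dlq_headers([
        ("x-original-topic", b"orders"),
        ("x-original-offset", b"3"),
        ("x-error", b"poison: non-positive amount -5.00"),
    ]))
```

For example, a separate consumer subscribed to orders.DLQ with its own group_id could call parse_dlq_headers(msg.headers) on each record and log the result.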