
Event-Driven with Kafka

You’ve driven Kafka from TypeScript (kafkajs) or Go (sarama / confluent-kafka-go). In Python the modern, async-native choice is aiokafka: an asyncio producer and consumer that plug straight into the async/await world you built in Async & Concurrency and Streams. No thread pools, no callback bridges — a consumer is literally async for msg in consumer.

This module is about the patterns, not just the API: the event-driven mental model, delivery semantics, dead letter queues, idempotent consumers, a sober look at event sourcing and exactly-once, and how to wire all of this into a FastAPI app without shooting yourself in the foot.

| Library | Style | Use it when |
| --- | --- | --- |
| aiokafka | Pure-Python, asyncio-native | Default for async apps (FastAPI, asyncio workers). What we teach here. |
| confluent-kafka | C wrapper (librdkafka), blocking | Highest throughput, richest config, but its API is sync, so you’d run it in a thread pool. |
| kafka-python | Pure-Python, sync | Legacy/simple scripts. No async. Maintenance has been spotty. |

uv add aiokafka pydantic

aiokafka reimplements the Kafka protocol on top of asyncio, so its calls are coroutines — await producer.send_and_wait(...), async for msg in consumer. That’s the whole reason to pick it over confluent-kafka in an async service: no event-loop-blocking C calls, no executor juggling.

The mental model: events vs request/response


Coming from HTTP/gRPC, your reflex is request/response: the caller waits for a reply, and the callee must be up right now. Event-driven flips that. A producer appends a fact to a log and moves on; consumers read it later, at their own pace, maybe more than once, maybe years later. The producer doesn’t know — or care — who reads it.

| | Request/response (HTTP, gRPC) | Event-driven (Kafka) |
| --- | --- | --- |
| Coupling | Caller knows the callee | Producer doesn’t know consumers |
| Timing | Synchronous: callee must be up | Asynchronous: consumers catch up later |
| Backpressure | Caller blocks / times out | Log buffers; consumers pull at their pace |
| Failure | Request fails, caller retries | Event persists; reprocess any time |
| Fan-out | One call, one handler | One event, N independent consumer groups |
| Shape | “Do this and tell me the result” | “This happened” (a fact in past tense) |

The naming convention follows the model: events are facts in the past tense (OrderPlaced, PaymentCaptured, UserRegistered), not commands (PlaceOrder). A command can be rejected; a fact already happened.

Topics, partitions, consumer groups, offsets


Five concepts carry almost everything. If you’ve used Kafka from any language these are identical — only the client API changes.

| Concept | What it is | Analogy |
| --- | --- | --- |
| Topic | Named, append-only log of messages | A Go channel name / an event “table” |
| Partition | One ordered shard of a topic. Order is guaranteed within a partition, not across. | A single-writer log file |
| Offset | A message’s position in its partition (monotonic int) | Array index / a bookmark |
| Key | Optional bytes; same key → same partition → ordered | Shard key |
| Consumer group | A set of consumers that share a topic’s partitions; each partition goes to exactly one member | A worker pool with sticky assignment |

Two rules that explain most surprises:

  1. Ordering is per-partition. Want all events for one order processed in order? Key by order_id so they land on the same partition.
  2. Parallelism is capped by partition count. A group of 10 consumers on a 6-partition topic leaves 4 idle. Pick partitions for your target concurrency up front — they’re hard to change later.
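
To make the second rule concrete, here is a minimal sketch of how a group’s partitions get dealt out. It is a simplification under stated assumptions: assign_partitions is a hypothetical helper, and Kafka’s real assignors (range, round-robin, sticky) differ in detail. The ceiling it shows is the same, though: within a group, a partition has exactly one owner.

```python
def assign_partitions(num_partitions: int, members: list[str]) -> dict[str, list[int]]:
    """Deal partitions out to group members one at a time (illustrative only)."""
    assignment: dict[str, list[int]] = {member: [] for member in members}
    for partition in range(num_partitions):
        owner = members[partition % len(members)]
        assignment[owner].append(partition)
    return assignment


# 10 consumers in one group, 6 partitions: only 6 members can own a partition.
group = [f"consumer-{i}" for i in range(10)]
assignment = assign_partitions(6, group)
idle = [member for member, parts in assignment.items() if not parts]
print(len(idle))  # 4
```

Going the other way, two consumers on the same topic would each own three partitions, which is why the partition count is the number to size for your target concurrency.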
[Diagram: Topics, partitions & consumer groups]

The offset is the only state a consumer keeps. “Where am I in each partition?” Commit it and a restart resumes there; reset it to replay. Everything about delivery guarantees comes down to when you commit relative to when you process.

Don’t ship raw dicts onto a topic — once a message is on the log, its shape is a contract other services depend on. Model events as Pydantic v2 (see Pydantic) and serialize with model_dump_json(), which gives you bytes-ready JSON and validation on the way back in.

app/events.py
from datetime import datetime, timezone
from decimal import Decimal

from pydantic import BaseModel, Field


def _now() -> datetime:
    return datetime.now(timezone.utc)


class OrderPlaced(BaseModel):
    event_id: str  # unique per event; used for idempotency
    order_id: str
    user_id: int
    amount: Decimal
    currency: str = "USD"
    occurred_at: datetime = Field(default_factory=_now)
    schema_version: int = 1  # see "Schema discipline" below

The producer is an async object: await producer.start(), send, await producer.stop(). Serialization is just turning your model into bytes.

// TypeScript (kafkajs)
import { Kafka } from "kafkajs";

const kafka = new Kafka({ brokers: ["localhost:29092"] });
const producer = kafka.producer({ idempotent: true });

await producer.connect();
await producer.send({
  topic: "orders",
  messages: [{ key: order.orderId, value: JSON.stringify(order) }],
  acks: -1, // all replicas
});
await producer.disconnect();

Across all three clients (kafkajs, sarama / confluent-kafka-go, aiokafka) you build the message and send it yourself; there’s no framework-magic serializer like the JVM world’s JsonSerializer. The Python difference is that everything is a coroutine: send_and_wait returns when the broker has acked, so an exception raised by the await is your delivery error.
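
For comparison with the kafkajs snippet, a minimal aiokafka sketch of the same start / send / stop flow might look like this. Treat it as an illustration under stated assumptions: publish_order and RecordingProducer are hypothetical names, the payload is plain JSON to keep the block dependency-free (with the Pydantic model you would pass event.model_dump_json().encode() instead), and main() needs aiokafka plus a broker on localhost:29092, so it is defined but not called here.

```python
import asyncio
import json


async def publish_order(producer, order: dict) -> None:
    """Serialize one order and wait for the broker's ack."""
    payload = json.dumps(order).encode()  # value must be bytes
    key = order["order_id"].encode()      # same order -> same partition
    await producer.send_and_wait("orders", key=key, value=payload)


async def main() -> None:
    """Real wiring: requires aiokafka and a reachable broker."""
    from aiokafka import AIOKafkaProducer  # local import: only the real run needs it

    producer = AIOKafkaProducer(
        bootstrap_servers="localhost:29092",
        enable_idempotence=True,
    )
    await producer.start()
    try:
        await publish_order(producer, {"order_id": "o-1", "amount": "9.99"})
    finally:
        await producer.stop()  # flushes pending sends before closing


class RecordingProducer:
    """Hypothetical in-memory stand-in so the helper runs without a broker."""

    def __init__(self) -> None:
        self.sent: list[tuple[str, bytes, bytes]] = []

    async def send_and_wait(self, topic: str, value: bytes, key: bytes) -> None:
        self.sent.append((topic, key, value))


fake = RecordingProducer()
asyncio.run(publish_order(fake, {"order_id": "o-1", "amount": "9.99"}))
print(fake.sent[0][0], fake.sent[0][1])  # orders b'o-1'
```

Against a real cluster you would run asyncio.run(main()); if the broker rejects the write or the request times out, the await inside publish_order raises, and that is where you handle the delivery error.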

The key decides the partition (partition = hash(key) % num_partitions). It’s how you get ordering where you need it.

| Key | Effect | Use for |
| --- | --- | --- |
| Entity id (order_id, user_id) | All events for that entity share a partition → ordered | Order/user lifecycles |
| None | Spread across partitions by the client’s partitioner, no ordering | Metrics, independent events |
| Composite (f"{tenant}:{user}") | Grouped ordering | Per-tenant streams |

# Ordered per order: every OrderPlaced/OrderPaid/OrderShipped for one order
# lands on the same partition and is consumed in sequence.
await producer.send_and_wait("orders", key=order_id.encode(), value=payload)

# Unordered: telemetry where order doesn't matter; let Kafka balance it.
await producer.send_and_wait("metrics", key=None, value=payload)
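
The key-to-partition mapping can be sketched in a few lines. This is an illustration, not Kafka’s partitioner: pick_partition is a hypothetical helper, and zlib.crc32 stands in for the murmur2 hash Kafka clients use by default, so the partition numbers will not match a real cluster. What carries over is the property that matters: the same key always maps to the same partition, so that key’s events keep their relative order.

```python
import zlib


def pick_partition(key: bytes, num_partitions: int) -> int:
    """Stable hash of the key, modulo the partition count (stand-in hash)."""
    return zlib.crc32(key) % num_partitions


events = [
    ("order-1", "OrderPlaced"),
    ("order-2", "OrderPlaced"),
    ("order-1", "OrderPaid"),
    ("order-1", "OrderShipped"),
]

# Append each event to the log of the partition its key hashes to.
partitions: dict[int, list[str]] = {}
for order_id, name in events:
    target = pick_partition(order_id.encode(), 6)
    partitions.setdefault(target, []).append(f"{order_id}:{name}")

# Every order-1 event sits in one partition, in the order it was sent.
home = pick_partition(b"order-1", 6)
order_1_events = [e for e in partitions[home] if e.startswith("order-1:")]
print(order_1_events)
```

Because the partition count is the modulus, changing it later can move existing keys to different partitions, which is one reason partition counts are awkward to change after the fact.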

Kafka offers three guarantees; you choose with config and commit timing.

| Guarantee | Meaning | How you get it |
| --- | --- | --- |
| At-most-once | May lose, never duplicates | Commit offset before processing |
| At-least-once | Never lose, may duplicate | Commit after processing (the sane default) |
| Exactly-once | Processed once | Transactions + idempotent consumer (costly; see below) |
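
The difference between the first two rows is only the order of two steps, which a small in-memory simulation makes visible. This is a toy model, not client code: simulate is a hypothetical function, the “log” is a Python list, and the crash is injected exactly once between the two steps while handling one chosen message.

```python
def simulate(log: list[str], commit_first: bool, crash_on: str) -> list[str]:
    """Return every message the handler saw, across one crash and restart."""
    committed = 0              # the only state that survives a restart
    handled: list[str] = []
    crashed = False
    while committed < len(log):        # each pass = one consumer (re)start
        offset = committed             # resume from the last committed offset
        try:
            while offset < len(log):
                msg = log[offset]
                if commit_first:
                    committed = offset + 1   # at-most-once: commit, then work
                else:
                    handled.append(msg)      # at-least-once: work, then commit
                if msg == crash_on and not crashed:
                    crashed = True
                    raise RuntimeError("crash between the two steps")
                if commit_first:
                    handled.append(msg)
                else:
                    committed = offset + 1
                offset += 1
        except RuntimeError:
            pass  # "restart" and pick up from `committed`
    return handled


at_most_once = simulate(["a", "b", "c"], commit_first=True, crash_on="b")
at_least_once = simulate(["a", "b", "c"], commit_first=False, crash_on="b")
print(at_most_once)   # ['a', 'c']
print(at_least_once)  # ['a', 'b', 'b', 'c']
```

Committing first loses “b”; committing last delivers it twice. The duplicate is the cheaper problem to have, because an idempotent handler absorbs it.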

Set enable_idempotence=True on the producer (it implies acks="all"). This stops producer-side duplicates: on an internal retry after a network blip, the broker dedups by (producer_id, sequence_number), so a flaky connection doesn’t write the same event twice. It does not dedup your application re-sending the same logical event — that’s your job (give each event an event_id and dedup on the consumer).

This is where Python’s async model shines. A consumer is an async iterator — the poll loop you hand-write in kafkajs or sarama is the async for.

// TypeScript (kafkajs)
const consumer = kafka.consumer({ groupId: "order-processor" });

await consumer.connect();
await consumer.subscribe({ topic: "orders", fromBeginning: true });
await consumer.run({
  eachMessage: async ({ message }) => {
    const order = JSON.parse(message.value!.toString());
    await handle(order); // kafkajs auto-commits after this resolves
  },
});

async for msg in consumer polls the broker, hands you decoded ConsumerRecords (.key, .value, .topic, .partition, .offset, .headers), and never blocks the event loop. Other coroutines run between messages. This is the same cold, pull-driven iteration from the Streams module — a Kafka consumer is just an async stream whose source is a partitioned log.
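
The aiokafka counterpart of the kafkajs consumer above could be sketched as follows. The names consume, remember, FakeRecord and fake_consumer are hypothetical, and the JSON payload shape is assumed. main() shows the real wiring (it needs aiokafka and a broker on localhost:29092, so it is defined but not called), while the last lines drive the same loop from an in-memory async generator to show that the loop only relies on async iteration.

```python
import asyncio
import json
from typing import Any, AsyncIterable, Awaitable, Callable

seen: list[str] = []


async def remember(order: dict) -> None:
    """Toy handler: record which orders were processed."""
    seen.append(order["order_id"])


async def consume(
    consumer: AsyncIterable[Any],
    handle: Callable[[dict], Awaitable[None]],
) -> None:
    # The whole poll loop: iterate records, decode, hand off to the handler.
    async for msg in consumer:
        await handle(json.loads(msg.value))


async def main() -> None:
    """Real wiring: requires aiokafka and a reachable broker."""
    from aiokafka import AIOKafkaConsumer  # local import: only the real run needs it

    consumer = AIOKafkaConsumer(
        "orders",
        bootstrap_servers="localhost:29092",
        group_id="order-processor",
        auto_offset_reset="earliest",  # start from the beginning if no offset is stored
    )
    await consumer.start()
    try:
        await consume(consumer, remember)
    finally:
        await consumer.stop()


class FakeRecord:
    """Hypothetical stand-in for a consumer record (only .value is used)."""

    def __init__(self, value: bytes) -> None:
        self.value = value


async def fake_consumer():
    for raw in (b'{"order_id": "o-1"}', b'{"order_id": "o-2"}'):
        yield FakeRecord(raw)


asyncio.run(consume(fake_consumer(), remember))
print(seen)  # ['o-1', 'o-2']
```

This sketch leaves commit behaviour at the client default; when and how to commit is a separate decision.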

The single most important consumer decision. Commit = “I’m done with everything up to this offset; don’t redeliver it.”

| Mode | Config | Guarantee | Risk |
| --- | --- | --- | --- |
| Auto commit | enable_auto_commit=True (timer, default 5s) | At-most-once-ish | Commits on a timer, not after processing: crash after commit but before work = lost message |
| Manual commit | enable_auto_commit=False + await consumer.commit() | At-least-once | Crash before commit = redelivery (so make handlers idempotent) |

# Auto-commit trap: the offset advances on a timer regardless of whether
# handle() finished. A crash mid-handle silently drops the message.
consumer = AIOKafkaConsumer(..., enable_auto_commit=True)  # convenient, lossy

# Manual: commit only after the work is durably done. Worst case is a
# redelivery on crash, which is fine if handle() is idempotent.
consumer = AIOKafkaConsumer(..., enable_auto_commit=False)
async for msg in consumer:
    await handle(msg)
    await consumer.commit()

Prefer manual commit + at-least-once + idempotent handlers. It’s the combination that survives crashes without losing data, and it’s far simpler to reason about than exactly-once.

A consumer is a long-running worker, so it must stop cleanly: finish (and commit) the message in flight, then leave the group, which triggers a fast rebalance instead of waiting out a 10s session timeout. This is the same shutdown discipline from Async & Concurrency: catch the signal, break the loop, run cleanup in finally.

app/worker.py
import asyncio
import signal

from aiokafka import AIOKafkaConsumer

from app.events import OrderPlaced

# handle() is the application's event handler, defined elsewhere.


async def run_consumer(stop: asyncio.Event) -> None:
    consumer = AIOKafkaConsumer(
        "orders",
        bootstrap_servers="localhost:29092",
        group_id="order-processor",
        enable_auto_commit=False,
    )
    await consumer.start()
    try:
        async for msg in consumer:
            # Checked once per record: on an idle topic the loop only sees
            # the stop flag when the next message arrives.
            if stop.is_set():
                break
            event = OrderPlaced.model_validate_json(msg.value)
            await handle(event)
            await consumer.commit()  # commit only after the work is done
    finally:
        await consumer.stop()  # leaves the group cleanly


async def main() -> None:
    stop = asyncio.Event()
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, stop.set)  # flip the event on Ctrl-C / SIGTERM
    await run_consumer(stop)


if __name__ == "__main__":
    asyncio.run(main())

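consumer.stop() in the finally is the whole point: it sends a graceful LeaveGroup, so Kafka reassigns your partitions promptly rather than waiting out the session timeout. With manual commits, the offsets were already committed after each message; stop() only performs a final commit of its own when auto-commit is enabled.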
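
One wrinkle with checking a stop flag inside async for: the check only runs when a record arrives, so a quiet topic can delay shutdown. A possible variation, sketched below, races the next fetch against the stop event. consume_until_stopped and IdleConsumer are hypothetical names; the only consumer methods assumed are getone() and commit(), which aiokafka’s consumer provides. It also assumes cancelling a pending getone() is harmless, which holds for at-least-once processing because nothing uncommitted is lost.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def consume_until_stopped(
    consumer: Any,
    stop: asyncio.Event,
    handle: Callable[[Any], Awaitable[None]],
) -> None:
    stop_task = asyncio.create_task(stop.wait())
    try:
        while not stop.is_set():
            fetch_task = asyncio.create_task(consumer.getone())
            # Wake up on whichever happens first: a record or the stop signal.
            await asyncio.wait(
                {fetch_task, stop_task}, return_when=asyncio.FIRST_COMPLETED
            )
            if not fetch_task.done():
                fetch_task.cancel()  # stop won the race; abandon the fetch
                await asyncio.gather(fetch_task, return_exceptions=True)
                break
            await handle(fetch_task.result())
            await consumer.commit()  # commit only after the work is done
    finally:
        stop_task.cancel()


class IdleConsumer:
    """Hypothetical stand-in: hands out queued records, then blocks forever."""

    def __init__(self) -> None:
        self.queue: asyncio.Queue[str] = asyncio.Queue()
        self.commits = 0

    async def getone(self) -> str:
        return await self.queue.get()

    async def commit(self) -> None:
        self.commits += 1


async def demo() -> tuple[list[str], int]:
    consumer = IdleConsumer()
    consumer.queue.put_nowait("order-1")
    stop = asyncio.Event()
    handled: list[str] = []

    async def handle(msg: str) -> None:
        handled.append(msg)

    worker = asyncio.create_task(consume_until_stopped(consumer, stop, handle))
    await asyncio.sleep(0.05)  # order-1 is processed, then the topic goes quiet
    stop.set()                 # what the signal handler would do
    await asyncio.wait_for(worker, timeout=1)  # returns without a new record
    return handled, consumer.commits


handled, commits = asyncio.run(demo())
print(handled, commits)  # ['order-1'] 1
```

In a worker, a loop like this would sit between consumer.start() and the consumer.stop() in finally, in place of the async for.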

Dead letter queues: never block on a poison message


Some messages will never succeed: malformed JSON, a schema you can’t parse, a business rule that rejects them forever. These are poison messages. If your handler just raises and the offset is never committed, every restart resumes at the same message and fails again, so the partition is stuck: head-of-line blocking that halts every good message behind it.

The production pattern is retry transient failures a few times, then divert poison messages to a Dead Letter Queue (a separate topic) and move on.

[Diagram: Retry and dead-letter flow]

Unlike Spring Kafka’s DeadLetterPublishingRecoverer, aiokafka has no built-in DLQ — you wire it explicitly, which is honestly clearer about what’s happening. Classify the error, retry the transient ones, and republish the rest to the DLQ with the failure reason in headers.

app/dlq.py
import asyncio

from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from pydantic import ValidationError

from app.events import OrderPlaced

DLQ_TOPIC = "orders.DLQ"
MAX_RETRIES = 3

# handle() is the application's event handler, defined elsewhere.


class PoisonMessage(Exception):
    """Non-retryable: bad data / failed validation. Straight to DLQ."""


async def to_dlq(producer: AIOKafkaProducer, msg, reason: str) -> None:
    await producer.send_and_wait(
        DLQ_TOPIC,
        key=msg.key,
        value=msg.value,  # preserve the raw bytes
        headers=[  # breadcrumbs for the DLQ consumer
            ("x-original-topic", msg.topic.encode()),
            ("x-original-offset", str(msg.offset).encode()),
            ("x-error", reason.encode()),
        ],
    )


async def consume_with_dlq(
    consumer: AIOKafkaConsumer, producer: AIOKafkaProducer
) -> None:
    async for msg in consumer:
        try:
            # Raises pydantic's ValidationError on malformed input (poison).
            event = OrderPlaced.model_validate_json(msg.value)
            await process_with_retries(event)  # transient retries
        except (PoisonMessage, ValidationError) as exc:  # will never succeed
            await to_dlq(producer, msg, f"poison: {exc}")
        except Exception as exc:  # retries exhausted
            await to_dlq(producer, msg, f"exhausted: {exc}")
        # Advance once the message is handled or parked, so the partition is
        # never stuck. If the DLQ publish itself fails, the exception
        # propagates and nothing is committed, so the message is not lost.
        await consumer.commit()


async def process_with_retries(event: OrderPlaced) -> None:
    delay = 0.5
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            await handle(event)
            return
        except PoisonMessage:
            raise  # don't retry poison
        except Exception:
            if attempt == MAX_RETRIES:
                raise  # give up -> caller sends to DLQ
            await asyncio.sleep(delay)  # backoff
            delay *= 2

Two rules make this robust:

  • Commit no matter what. Whether the message was processed or parked on the DLQ, you’ve dealt with it — advance the offset so the partition keeps flowing.
  • Separate poison from transient. A validation error (PoisonMessage) will never succeed; retrying it just wastes the backoff window before the inevitable DLQ. A DB timeout might succeed next time — retry those.
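
Parking a message is only half the story; at some point someone reads the DLQ. As a rough sketch of what a DLQ consumer could do with the x-original-topic / x-error headers written by to_dlq, the helpers below decode them and republish a parked record to the topic it came from. decode_headers, replay, ParkedRecord and RecordingProducer are hypothetical names, and whether to replay automatically, after a fix, or by hand is a policy decision this sketch does not make.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Sequence


def decode_headers(headers: Sequence[tuple[str, bytes]]) -> dict[str, str]:
    """Turn Kafka-style (name, bytes) header pairs into a plain dict."""
    return {name: value.decode() for name, value in headers}


async def replay(producer: Any, parked: Any) -> str:
    """Republish a parked record to its original topic; return that topic."""
    meta = decode_headers(parked.headers)
    topic = meta["x-original-topic"]
    # Same key as before, so the record lands with its entity's other events.
    await producer.send_and_wait(topic, key=parked.key, value=parked.value)
    return topic


@dataclass
class ParkedRecord:
    """Hypothetical stand-in for a record read from the DLQ topic."""

    key: bytes
    value: bytes
    headers: list[tuple[str, bytes]]


class RecordingProducer:
    """Hypothetical in-memory stand-in so the sketch runs without a broker."""

    def __init__(self) -> None:
        self.sent: list[tuple[str, bytes, bytes]] = []

    async def send_and_wait(self, topic: str, value: bytes, key: bytes) -> None:
        self.sent.append((topic, key, value))


parked = ParkedRecord(
    key=b"o-1",
    value=b'{"order_id": "o-1"}',
    headers=[
        ("x-original-topic", b"orders"),
        ("x-original-offset", b"42"),
        ("x-error", b"exhausted: db timeout"),
    ],
)
producer = RecordingProducer()
topic = asyncio.run(replay(producer, parked))
print(topic, decode_headers(parked.headers)["x-error"])  # orders exhausted: db timeout
```

A replayed record gets a new offset on the original topic, so consumers there should not treat the offset as the event’s identity.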

Idempotent consumers (the at-least-once survival kit)


At-least-once means your handler will occasionally see the same event twice — a redelivery after a crash, a producer resend. The fix isn’t exactly-once machinery; it’s making handle() safe to run twice. Two techniques, often combined:

app/idempotency.py
from app.events import OrderPlaced


# 1. Dedup table: record event_ids you've handled, in the SAME transaction as
#    the work. A replay finds the id already present and no-ops.
async def handle(event: OrderPlaced, db) -> None:
    async with db.transaction():
        already = await db.fetchval(
            "SELECT 1 FROM processed_events WHERE event_id = $1", event.event_id
        )
        if already:
            return  # seen it, skip
        await db.execute(
            "INSERT INTO orders (order_id, user_id, amount, status) "
            "VALUES ($1, $2, $3, 'PLACED')",
            event.order_id, event.user_id, event.amount,
        )
        await db.execute(
            "INSERT INTO processed_events (event_id) VALUES ($1)", event.event_id
        )


# 2. Natural idempotency: write so re-running is a no-op. UPSERTs and
#    "set to this state" operations are inherently safe to repeat.
#    (Wrapped in an illustrative function so the snippet is valid Python.)
async def handle_upsert(event: OrderPlaced, db) -> None:
    await db.execute(
        "INSERT INTO orders (order_id, status) VALUES ($1, 'PLACED') "
        "ON CONFLICT (order_id) DO NOTHING",
        event.order_id,
    )

The dedup-table approach needs a stable, producer-assigned event_id (not the Kafka offset — a republish to a DLQ or retry topic changes the offset but is the same logical event). That’s exactly why OrderPlaced carries one.
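
If you want to watch the dedup table absorb a redelivery, here is a self-contained sketch of the same idea. It swaps in the standard library’s sqlite3 and an in-memory database purely so it runs anywhere; the table names mirror the example above, while the synchronous handle and the dict-shaped event are simplifications for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # stand-in for the real database
conn.execute("CREATE TABLE processed_events (event_id TEXT PRIMARY KEY)")
conn.execute("CREATE TABLE orders (order_id TEXT PRIMARY KEY, status TEXT)")


def handle(conn: sqlite3.Connection, event: dict) -> bool:
    """Apply the event once; return False when it is a replay."""
    with conn:  # one transaction: commit on success, roll back on error
        seen = conn.execute(
            "SELECT 1 FROM processed_events WHERE event_id = ?",
            (event["event_id"],),
        ).fetchone()
        if seen:
            return False  # already handled: no-op
        conn.execute(
            "INSERT INTO orders (order_id, status) VALUES (?, 'PLACED')",
            (event["order_id"],),
        )
        conn.execute(
            "INSERT INTO processed_events (event_id) VALUES (?)",
            (event["event_id"],),
        )
    return True


event = {"event_id": "evt-1", "order_id": "o-1"}
first = handle(conn, event)
second = handle(conn, event)  # redelivery of the same logical event
order_count = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
print(first, second, order_count)  # True False 1
```

Because the order row and the event_id marker are written in one transaction, a crash between them rolls both back, and the redelivered event is processed as if for the first time.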

A close cousin: instead of storing current state and mutating it, store the stream of events as the source of truth and derive state by replaying them. Kafka’s durable, ordered, replayable log is a natural fit for the event store.

[Diagram: Event sourcing: events fold into projections]

  • Events are immutable facts; you only ever append.
  • Projections (a.k.a. read models) are derived views — a consumer folds the event stream into whatever shape a query needs (a Postgres table, a Redis cache). Because they’re derived, you can drop and rebuild a projection by replaying the log from offset 0, or build a brand-new view of historical data.
  • Projection-building consumers must be idempotent — you’ll replay events when rebuilding, so applying the same event twice has to be safe.
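
A projection is just a fold over the log, which fits in a few lines of plain Python. In this sketch the log is an in-memory list rather than a Kafka topic, and OrderProjection, STATUS_FOR and rebuild are hypothetical names; the event types reuse the OrderPlaced / OrderPaid / OrderShipped lifecycle mentioned earlier. The duplicate evt-3 in the log shows why the fold tracks applied event ids: without it, the paid counter would be off by one after a replay.

```python
from dataclasses import dataclass, field

STATUS_FOR = {"OrderPlaced": "PLACED", "OrderPaid": "PAID", "OrderShipped": "SHIPPED"}


@dataclass
class OrderProjection:
    """Read model derived entirely from events; safe to drop and rebuild."""

    status: dict[str, str] = field(default_factory=dict)
    paid_count: int = 0
    applied: set[str] = field(default_factory=set)

    def apply(self, event: dict) -> None:
        if event["event_id"] in self.applied:
            return  # replayed event: applying it again must change nothing
        self.applied.add(event["event_id"])
        self.status[event["order_id"]] = STATUS_FOR[event["type"]]
        if event["type"] == "OrderPaid":
            self.paid_count += 1


def rebuild(log: list[dict]) -> OrderProjection:
    """Fold the whole log, from the first event, into a fresh projection."""
    view = OrderProjection()
    for event in log:
        view.apply(event)
    return view


log = [
    {"event_id": "evt-1", "type": "OrderPlaced", "order_id": "o-1"},
    {"event_id": "evt-2", "type": "OrderPlaced", "order_id": "o-2"},
    {"event_id": "evt-3", "type": "OrderPaid", "order_id": "o-1"},
    {"event_id": "evt-3", "type": "OrderPaid", "order_id": "o-1"},  # duplicate
    {"event_id": "evt-4", "type": "OrderShipped", "order_id": "o-1"},
]

view = rebuild(log)
print(view.status, view.paid_count)  # {'o-1': 'SHIPPED', 'o-2': 'PLACED'} 1
```

Calling rebuild(log) again yields an equal projection, which is the property that lets you throw a read model away and regenerate it.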

A message on a topic is a contract with every current and future consumer. Treat event schemas like a public API.

  • Add fields, don’t break them. New fields must have defaults so old consumers (which ignore them) and new messages (which omit them) both deserialize. Pydantic models with defaults give you this for free. Never rename or retype a field — that’s a remove + add, and it breaks existing messages on the log.
  • Be lenient on read. Configure the model to ignore unknown fields so a newer producer’s extra field doesn’t crash an older consumer.
  • Version explicitly. Carry a schema_version on the event (as OrderPlaced does). For a truly breaking change, prefer a new versioned topic (orders.v2) and run both consumers during migration over silently mutating v1.
app/events.py
from decimal import Decimal

from pydantic import BaseModel, ConfigDict


class OrderPlaced(BaseModel):
    # ignore unknown fields a newer producer might add -> forward compatible
    model_config = ConfigDict(extra="ignore")

    event_id: str
    order_id: str
    amount: Decimal
    schema_version: int = 1
    # v2 ADDED this, with a default -> old messages (no currency) still parse:
    currency: str = "USD"
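
Carrying schema_version pays off when a consumer has to read old and new messages from the same log. One way to use it, sketched here with only the standard library, is to upgrade older payloads step by step before handing them to the rest of the code. This is an optional pattern rather than something Pydantic or Kafka does for you; UPCASTERS, upcast_v1_to_v2, CURRENT_VERSION and load_event are hypothetical names, and the “v2 adds currency” change mirrors the example above.

```python
import json


def upcast_v1_to_v2(event: dict) -> dict:
    """v2 added `currency`; older events get the documented default."""
    return {**event, "currency": event.get("currency", "USD"), "schema_version": 2}


CURRENT_VERSION = 2
UPCASTERS = {1: upcast_v1_to_v2}  # version -> function that lifts it one step


def load_event(raw: bytes) -> dict:
    """Parse a message and upgrade it to the current schema version."""
    event = json.loads(raw)
    version = event.get("schema_version", 1)  # unversioned means v1
    while version < CURRENT_VERSION:
        event = UPCASTERS[version](event)
        version = event["schema_version"]
    return event


old = load_event(b'{"event_id": "evt-1", "order_id": "o-1", "amount": "9.99"}')
new = load_event(
    b'{"event_id": "evt-2", "order_id": "o-2", "amount": "5.00",'
    b' "currency": "EUR", "schema_version": 2}'
)
print(old["currency"], old["schema_version"])  # USD 2
print(new["currency"], new["schema_version"])  # EUR 2
```

The handler then only ever sees the current shape, and each breaking step lives in one small, testable function.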

The natural question: where does Kafka live in a web app? The answer is split.

Producer → in the app, via lifespan. The web process produces events as a side effect of handling requests (place an order → emit OrderPlaced). Create one producer at startup, share it across requests, close it at shutdown — the same lifespan pattern you’d use for a DB pool.

app/main.py
from contextlib import asynccontextmanager
from decimal import Decimal

from aiokafka import AIOKafkaProducer
from fastapi import FastAPI, Request

from app.events import OrderPlaced


@asynccontextmanager
async def lifespan(app: FastAPI):
    app.state.producer = AIOKafkaProducer(
        bootstrap_servers="localhost:29092",
        acks="all",
        enable_idempotence=True,
    )
    await app.state.producer.start()  # one producer for the whole app
    try:
        yield
    finally:
        await app.state.producer.stop()  # flush + close on shutdown


app = FastAPI(lifespan=lifespan)


@app.post("/orders", status_code=202)  # 202 Accepted: queued, not done
async def place_order(req: Request) -> dict[str, str]:
    event = OrderPlaced(event_id="...", order_id="...", user_id=1, amount=Decimal("9.99"))
    producer: AIOKafkaProducer = req.app.state.producer
    await producer.send_and_wait(
        "orders",
        key=event.order_id.encode(),
        value=event.model_dump_json().encode(),
    )
    return {"order_id": event.order_id, "status": "accepted"}

Consumer → a separate worker process. Do not run the consumer loop inside the web process. Why:

  • Different scaling axes. Web pods scale on request rate; consumers scale on partition count and processing lag. Coupling them wastes resources and caps your consumer parallelism at your web replica count.
  • async for is an infinite loop. It would fight your request handlers for the event loop, and a slow message would add latency to HTTP traffic.
  • Independent lifecycle. You want to deploy, restart, or scale the consumer without bouncing the API, and vice versa.

So the consumer is its own entrypoint — python -m app.worker (the worker.py above) — deployed as a separate Deployment/process that scales independently. The web app writes to Kafka; the worker reads. The order-pipeline sub-project builds exactly this split.

Build the real thing: a producer emitting events, a consumer maintaining a projection, and retry + dead-letter handling for the messages that go bad.