Event-Driven with Kafka
You’ve driven Kafka from TypeScript (kafkajs) or Go (sarama /
confluent-kafka-go). In Python the modern, async-native choice is aiokafka:
an asyncio producer and consumer that plug straight into the async/await
world you built in Async & Concurrency and
Streams. No thread pools, no callback bridges — a
consumer is literally async for msg in consumer.
This module is about the patterns, not just the API: the event-driven mental model, delivery semantics, dead letter queues, idempotent consumers, a sober look at event sourcing and exactly-once, and how to wire all of this into a FastAPI app without shooting yourself in the foot.
The client: which library
| Library | Style | Use it when |
|---|---|---|
| aiokafka | Pure-Python, asyncio-native | Default for async apps (FastAPI, asyncio workers). What we teach here. |
| confluent-kafka | C wrapper (librdkafka), blocking | Highest throughput, richest config; but its API is sync, so you’d run it in a thread pool. |
| kafka-python | Pure-Python, sync | Legacy/simple scripts. No async. Maintenance has been spotty. |
```shell
uv add aiokafka pydantic
```

aiokafka reimplements the Kafka protocol on top of asyncio, so its calls are
coroutines — await producer.send_and_wait(...), async for msg in consumer.
That’s the whole reason to pick it over confluent-kafka in an async service: no
event-loop-blocking C calls, no executor juggling.
The mental model: events vs request/response
Coming from HTTP/gRPC, your reflex is request/response: the caller waits for a reply, and the callee must be up right now. Event-driven flips that. A producer appends a fact to a log and moves on; consumers read it later, at their own pace, maybe more than once, maybe years later. The producer doesn’t know (or care) who reads it.
| | Request/response (HTTP, gRPC) | Event-driven (Kafka) |
|---|---|---|
| Coupling | Caller knows the callee | Producer doesn’t know consumers |
| Timing | Synchronous: callee must be up | Asynchronous: consumers catch up later |
| Backpressure | Caller blocks / times out | Log buffers; consumers pull at their pace |
| Failure | Request fails, caller retries | Event persists; reprocess any time |
| Fan-out | One call, one handler | One event, N independent consumer groups |
| Shape | "Do this and tell me the result" | "This happened" (a fact in past tense) |
The naming convention follows the model: events are facts in the past tense —
OrderPlaced, PaymentCaptured, UserRegistered — not commands (PlaceOrder).
A command can be rejected; a fact already happened.
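Here is a minimal sketch of the split. It uses stdlib dataclasses as stand-ins for the Pydantic models introduced later, and the names PlaceOrder, OrderRejected and decide are hypothetical. Validation, and therefore rejection, happens on the command. Only an accepted command is turned into an immutable, past-tense event.

```python
from dataclasses import dataclass
from decimal import Decimal


@dataclass(frozen=True)
class PlaceOrder:
    """Command: imperative, addressed to one handler, may be rejected."""
    order_id: str
    amount: Decimal


@dataclass(frozen=True)
class OrderPlaced:
    """Event: past tense and immutable, a fact for anyone to read."""
    order_id: str
    amount: Decimal


class OrderRejected(Exception):
    """Raised when a command fails validation; no event is emitted."""


def decide(cmd: PlaceOrder) -> OrderPlaced:
    # The command is where validation (and rejection) happens...
    if cmd.amount <= 0:
        raise OrderRejected(f"order {cmd.order_id}: amount must be positive")
    # ...and only an accepted command turns into a fact.
    return OrderPlaced(order_id=cmd.order_id, amount=cmd.amount)
```

Freezing the event dataclass reflects "a fact already happened": once an event is built, it can't be edited.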
Topics, partitions, consumer groups, offsets
Five concepts carry almost everything. If you’ve used Kafka from any language, these are identical; only the client API changes.
| Concept | What it is | Analogy |
|---|---|---|
| Topic | Named, append-only log of messages | A Go channel name / an event “table” |
| Partition | One ordered shard of a topic. Order is guaranteed within a partition, not across. | A single-writer log file |
| Offset | A message’s position in its partition (monotonic int) | Array index / a bookmark |
| Key | Optional bytes; same key → same partition → ordered | Shard key |
| Consumer group | A set of consumers that share a topic’s partitions; each partition goes to exactly one member | A worker pool with sticky assignment |
Two rules that explain most surprises:
- Ordering is per-partition. Want all events for one order processed in order? Key by order_id so they land on the same partition.
- Parallelism is capped by partition count. A group of 10 consumers on a 6-partition topic leaves 4 idle. Pick partitions for your target concurrency up front. Kafka lets you add partitions later but never remove them, and adding them changes which partition each key hashes to, so per-key ordering breaks across the change.
flowchart LR
Prod["producer<br/>(key = order_id)"]
subgraph Topic["topic: orders (3 partitions)"]
P0["P0: [o0][o3][o6] ..."]
P1["P1: [o1][o4][o7] ..."]
P2["P2: [o2][o5][o8] ..."]
end
subgraph Group["consumer group 'order-proc'"]
A["consumer A"]
B["consumer B"]
end
Prod -->|"keyed by order_id"| P0
Prod -->|"keyed by order_id"| P1
Prod -->|"keyed by order_id"| P2
P0 -->|"owns + own offset"| A
P1 -->|"owns + own offset"| A
P2 -->|"owns + own offset"| B
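To see the parallelism cap in numbers, here is a simplified assignor that deals partitions out round-robin. It is an illustration only. Kafka’s real assignment strategies (range, round-robin, sticky, cooperative-sticky) differ in detail, which is why the diagram gives consumer A two adjacent partitions. The invariant is the same, though: each partition has exactly one owner in the group, so members beyond the partition count get nothing.

```python
def assign(partitions: int, consumers: list[str]) -> dict[str, list[int]]:
    """Give each partition to exactly one member, dealing them out round-robin."""
    assignment: dict[str, list[int]] = {c: [] for c in consumers}
    for p in range(partitions):
        assignment[consumers[p % len(consumers)]].append(p)
    return assignment


def idle(partitions: int, consumers: list[str]) -> list[str]:
    """Members that received no partition at all."""
    return [c for c, owned in assign(partitions, consumers).items() if not owned]


print(assign(3, ["A", "B"]))                  # {'A': [0, 2], 'B': [1]}
print(idle(6, [f"c{i}" for i in range(10)]))  # ['c6', 'c7', 'c8', 'c9']
```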
The offset is the only state a consumer keeps. “Where am I in each partition?” Commit it and a restart resumes there; reset it to replay. Everything about delivery guarantees comes down to when you commit relative to when you process.
Producing events
A typed event model with Pydantic
Don’t ship raw dicts onto a topic. Once a message is on the log, its shape is a
contract other services depend on. Model events as Pydantic v2 (see
Pydantic) and serialize with model_dump_json(), which gives you a JSON string
one .encode() away from bytes. On the way back in, model_validate_json() gives you validation.
```python
# app/events.py
from datetime import datetime, timezone
from decimal import Decimal

from pydantic import BaseModel, Field


def _now() -> datetime:
    return datetime.now(timezone.utc)


class OrderPlaced(BaseModel):
    event_id: str  # unique per event, used for idempotency
    order_id: str
    user_id: int
    amount: Decimal
    currency: str = "USD"
    occurred_at: datetime = Field(default_factory=_now)
    schema_version: int = 1  # see "Schema discipline" below
```

AIOKafkaProducer
The producer is an async object: await producer.start(), send, await producer.stop(). Serialization is just turning your model into bytes.
TypeScript (kafkajs):

```typescript
import { Kafka } from "kafkajs";

const kafka = new Kafka({ brokers: ["localhost:29092"] });
const producer = kafka.producer({ idempotent: true });
await producer.connect();

await producer.send({
  topic: "orders",
  messages: [{ key: order.orderId, value: JSON.stringify(order) }],
  acks: -1, // all replicas
});
await producer.disconnect();
```

Go (segmentio/kafka-go):

```go
w := &kafka.Writer{
	Addr:         kafka.TCP("localhost:29092"),
	Topic:        "orders",
	RequiredAcks: kafka.RequireAll, // acks=all
	Balancer:     &kafka.Hash{},    // key-based partitioning
}
defer w.Close()

value, _ := json.Marshal(order)
err := w.WriteMessages(ctx, kafka.Message{
	Key:   []byte(order.OrderID),
	Value: value,
})
if err != nil {
	log.Printf("publish failed: %v", err) // delivery error: retry or surface it
}
```

Python (aiokafka):

```python
from aiokafka import AIOKafkaProducer

from app.events import OrderPlaced


async def make_producer() -> AIOKafkaProducer:
    producer = AIOKafkaProducer(
        bootstrap_servers="localhost:29092",
        acks="all",               # wait for all in-sync replicas
        enable_idempotence=True,  # no duplicates on internal retry
    )
    await producer.start()
    return producer


async def publish_order(producer: AIOKafkaProducer, event: OrderPlaced) -> None:
    await producer.send_and_wait(
        topic="orders",
        key=event.order_id.encode(),             # same order_id -> same partition
        value=event.model_dump_json().encode(),  # Pydantic -> JSON -> bytes
    )
```

Across all three you build the message and send it yourself; there’s no
framework-magic serializer like the JVM world’s JsonSerializer. The Python
difference is that everything is a coroutine: send_and_wait returns once the
broker has acked the write, so if the await raises, that exception is your delivery error.
Keys & partitioning
The key decides the partition (partition = hash(key) % num_partitions). It’s how
you get ordering where you need it.
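Keyed messages don’t use a generic hash. The Java client’s default partitioner applies murmur2 to the key bytes, clears the sign bit, and takes the result modulo the partition count. To our knowledge, aiokafka’s default partitioner is a port of the same function, so the two clients agree on placement. Below is a pure-Python sketch of that computation, ported from our reading of the Java client’s Utils.murmur2. Treat it as illustrative and rely on the client’s own partitioner in real code.

```python
def murmur2(data: bytes) -> int:
    """Pure-Python port of the Java client's murmur2 (unsigned 32-bit result)."""
    mask = 0xFFFFFFFF
    m = 0x5BD1E995
    r = 24
    length = len(data)
    h = (0x9747B28C ^ length) & mask  # fixed seed used by the Java client

    # Mix the body four bytes at a time, read little-endian.
    for i in range(0, length - length % 4, 4):
        k = data[i] | (data[i + 1] << 8) | (data[i + 2] << 16) | (data[i + 3] << 24)
        k = (k * m) & mask
        k ^= k >> r
        k = (k * m) & mask
        h = (h * m) & mask
        h ^= k

    # Fold in the 1-3 trailing bytes (mirrors Java's fall-through switch).
    tail = length - length % 4
    rem = length % 4
    if rem == 3:
        h ^= data[tail + 2] << 16
    if rem >= 2:
        h ^= data[tail + 1] << 8
    if rem >= 1:
        h ^= data[tail]
        h = (h * m) & mask

    # Final avalanche.
    h ^= h >> 13
    h = (h * m) & mask
    h ^= h >> 15
    return h


def partition_for(key: bytes, num_partitions: int) -> int:
    """Same shape as the Java default: toPositive(murmur2(key)) % num_partitions."""
    return (murmur2(key) & 0x7FFFFFFF) % num_partitions
```

partition_for(b"order-42", 6) returns the same partition on every call, which is the whole point. The same formula explains why adding partitions reshuffles keys: the modulus changes.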
| Key | Effect | Use for |
|---|---|---|
| Entity id (order_id, user_id) | All events for that entity share a partition, so they’re ordered | Order/user lifecycles |
| None | Spread across partitions, no ordering | Metrics, independent events |
| Composite (f"{tenant}:{user}") | Ordered per (tenant, user) pair | Per-user streams in a multi-tenant system |
```python
# Ordered per order: every OrderPlaced/OrderPaid/OrderShipped for one order
# lands on the same partition and is consumed in sequence.
await producer.send_and_wait("orders", key=order_id.encode(), value=payload)

# Unordered: telemetry where order doesn't matter, so let the partitioner spread it.
await producer.send_and_wait("metrics", key=None, value=payload)
```

Delivery semantics & idempotence
Kafka offers three guarantees; you choose with config and commit timing.
| Guarantee | Meaning | How you get it |
|---|---|---|
| At-most-once | May lose, never duplicates | Commit offset before processing |
| At-least-once | Never lose, may duplicate | Commit after processing (the sane default) |
| Exactly-once | Processed once | Transactions + idempotent consumer (costly — see below) |
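The difference between the first two rows is only the order of two steps. Here is a toy simulation with no Kafka involved; consume and crash_and_restart are hypothetical helpers. As in Kafka, the committed value is the next offset to read. The crash lands between the two steps, and a restart resumes from whatever was committed.

```python
from typing import Optional


def consume(
    log: list[str], committed: int, commit_first: bool, crash_at: Optional[int] = None
) -> tuple[list[str], int]:
    """Process log from the committed offset, optionally crashing at offset crash_at.

    Returns (side effects that actually happened, committed offset afterwards).
    """
    effects: list[str] = []
    for offset in range(committed, len(log)):
        if commit_first:  # at-most-once: commit, then work
            committed = offset + 1
            if offset == crash_at:
                return effects, committed  # crashed after commit, before the work
            effects.append(log[offset])
        else:  # at-least-once: work, then commit
            effects.append(log[offset])
            if offset == crash_at:
                return effects, committed  # crashed after the work, before commit
            committed = offset + 1
    return effects, committed


def crash_and_restart(log: list[str], commit_first: bool, crash_at: int) -> list[str]:
    """One run that crashes, then a restart resuming from the committed offset."""
    first, committed = consume(log, 0, commit_first, crash_at)
    second, _ = consume(log, committed, commit_first)
    return first + second


print(crash_and_restart(["a", "b", "c"], commit_first=True, crash_at=1))   # ['a', 'c']: b lost
print(crash_and_restart(["a", "b", "c"], commit_first=False, crash_at=1))  # ['a', 'b', 'b', 'c']: b twice
```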
Set enable_idempotence=True on the producer (it implies acks="all"). This
stops producer-side duplicates: on an internal retry after a network blip, the
broker dedups by (producer_id, sequence_number), so a flaky connection doesn’t
write the same event twice. It does not dedup your application re-sending the
same logical event — that’s your job (give each event an event_id and dedup on
the consumer).
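The broker-side dedup can be pictured with a deliberately simplified model. ToyPartition is hypothetical; the real broker tracks sequences per producer and partition, per batch, and rejects gaps. The model shows both halves of the paragraph above. The client’s internal retry is dropped, but an application re-send arrives with a fresh sequence number and is written again.

```python
class ToyPartition:
    """Simplified partition that drops retries from an idempotent producer."""

    def __init__(self) -> None:
        self.log: list[bytes] = []
        self._last_seq: dict[int, int] = {}  # producer_id -> last sequence written

    def append(self, producer_id: int, seq: int, value: bytes) -> bool:
        """Return True if written, False if recognised as a retry and dropped."""
        if seq <= self._last_seq.get(producer_id, -1):
            return False  # already have it: ack again, don't write twice
        self.log.append(value)
        self._last_seq[producer_id] = seq
        return True


p = ToyPartition()
p.append(producer_id=7, seq=0, value=b"OrderPlaced e1")  # written, but the ack is lost
p.append(producer_id=7, seq=0, value=b"OrderPlaced e1")  # client's internal retry: deduped
p.append(producer_id=7, seq=1, value=b"OrderPlaced e1")  # app re-sends: new seq, NOT deduped
print(len(p.log))  # 2
```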
Consuming events
AIOKafkaConsumer and async for
This is where Python’s async model shines. A consumer is an async iterator: the
poll loop that kafkajs hides behind consumer.run() and that you write by hand in Go is just the async for.
TypeScript (kafkajs):

```typescript
const consumer = kafka.consumer({ groupId: "order-processor" });
await consumer.connect();
await consumer.subscribe({ topic: "orders", fromBeginning: true });

await consumer.run({
  eachMessage: async ({ message }) => {
    const order = JSON.parse(message.value!.toString());
    await handle(order); // kafkajs auto-commits after this resolves
  },
});
```

Go (segmentio/kafka-go):

```go
r := kafka.NewReader(kafka.ReaderConfig{
	Brokers: []string{"localhost:29092"},
	GroupID: "order-processor",
	Topic:   "orders",
})
defer r.Close()

for {
	m, err := r.FetchMessage(ctx) // explicit poll loop
	if err != nil {
		break
	}
	var order Order
	json.Unmarshal(m.Value, &order)
	handle(order)
	r.CommitMessages(ctx, m) // commit AFTER processing
}
```

Python (aiokafka):

```python
from aiokafka import AIOKafkaConsumer

from app.events import OrderPlaced


async def consume() -> None:
    consumer = AIOKafkaConsumer(
        "orders",
        bootstrap_servers="localhost:29092",
        group_id="order-processor",    # the consumer group
        auto_offset_reset="earliest",  # no committed offset? start from the top
        enable_auto_commit=False,      # we commit manually, after processing
    )
    await consumer.start()
    try:
        async for msg in consumer:  # the poll loop, as iteration
            event = OrderPlaced.model_validate_json(msg.value)
            await handle(event)      # do the work...
            await consumer.commit()  # ...then commit (at-least-once)
    finally:
        await consumer.stop()  # leave the group; offsets were committed in the loop
```

async for msg in consumer polls the broker and hands you ConsumerRecords
(.key, .value, .topic, .partition, .offset, .headers; key and value are raw bytes unless you pass a key_deserializer/value_deserializer). It never blocks
the event loop. Other coroutines run between messages. This is the same cold,
pull-driven iteration from the Streams module — a
Kafka consumer is just an async stream whose source is a partitioned log.
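To demystify async for, here is a hypothetical FakeConsumer that implements the same async-iterator protocol over an in-memory list. It is a stdlib-only sketch, not aiokafka’s implementation. Each __anext__ awaits (standing in for a broker fetch), and while it waits, a second coroutine gets to run.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Record:
    """Minimal stand-in for a consumer record."""
    partition: int
    offset: int
    value: bytes


class FakeConsumer:
    """Hypothetical consumer over one in-memory partition."""

    def __init__(self, values: list[bytes]) -> None:
        self._values = values
        self._position = 0

    def __aiter__(self) -> "FakeConsumer":
        return self

    async def __anext__(self) -> Record:
        if self._position >= len(self._values):
            raise StopAsyncIteration  # a real consumer would keep waiting instead
        await asyncio.sleep(0)  # stands in for awaiting a fetch; yields to the loop
        record = Record(0, self._position, self._values[self._position])
        self._position += 1
        return record


async def main() -> list[str]:
    seen: list[str] = []

    async def consume() -> None:
        async for msg in FakeConsumer([b"o1", b"o2", b"o3"]):
            seen.append(msg.value.decode())

    async def heartbeat() -> None:
        for _ in range(3):
            seen.append("tick")
            await asyncio.sleep(0)

    await asyncio.gather(consume(), heartbeat())
    return seen


print(asyncio.run(main()))
```

Unlike this fake, a real consumer doesn’t end when the topic is drained. It keeps waiting for new records, which is why shutdown needs an explicit signal.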
Manual vs auto commit
The single most important consumer decision. Commit = “I’m done with everything up to this offset; don’t redeliver it.”
| Mode | Config | Guarantee | Risk |
|---|---|---|---|
| Auto commit | enable_auto_commit=True (timer, default 5s) | At-most-once-ish | Commits on a timer, not after processing — crash after commit but before work = lost message |
| Manual commit | enable_auto_commit=False + await consumer.commit() | At-least-once | Crash before commit = redelivery (so make handlers idempotent) |
```python
# Auto-commit trap: the offset advances on a timer regardless of whether
# handle() finished. A crash mid-handle silently drops the message.
consumer = AIOKafkaConsumer(..., enable_auto_commit=True)  # convenient, lossy

# Manual: commit only after the work is durably done. Worst case is a
# redelivery on crash, which is fine if handle() is idempotent.
consumer = AIOKafkaConsumer(..., enable_auto_commit=False)
async for msg in consumer:
    await handle(msg)
    await consumer.commit()
```

Prefer manual commit + at-least-once + idempotent handlers. It’s the combination that survives crashes without losing data, and it’s far simpler to reason about than exactly-once.
Graceful shutdown
A consumer is a long-running worker, so it must stop cleanly. It should finish the message in hand, leave its offsets committed,
and leave the group (triggering a fast rebalance instead of a 10s session
timeout). This is the same shutdown discipline from
Async & Concurrency: catch the signal,
break the loop, run cleanup in finally.
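```python
import asyncio
import signal

from aiokafka import AIOKafkaConsumer

from app.events import OrderPlaced


async def run_consumer(stop: asyncio.Event) -> None:
    consumer = AIOKafkaConsumer(
        "orders",
        bootstrap_servers="localhost:29092",
        group_id="order-processor",
        enable_auto_commit=False,
    )
    await consumer.start()
    try:
        while not stop.is_set():
            # Wake up at least once a second, so a quiet topic can't block shutdown.
            batches = await consumer.getmany(timeout_ms=1000)
            for tp, records in batches.items():
                for msg in records:
                    event = OrderPlaced.model_validate_json(msg.value)
                    await handle(event)  # your business logic, as before
                    # getmany() has already moved the position past the whole batch,
                    # so commit exactly this record rather than calling bare commit().
                    await consumer.commit({tp: msg.offset + 1})
    finally:
        await consumer.stop()  # leaves the group cleanly


async def main() -> None:
    stop = asyncio.Event()
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, stop.set)  # flip the event on Ctrl-C / SIGTERM
    await run_consumer(stop)


if __name__ == "__main__":
    asyncio.run(main())
```

consumer.stop() in the finally is the whole point. It sends a graceful LeaveGroup (and, if auto-commit is on, commits one last time), so Kafka reassigns your partitions right away rather than waiting out the session timeout. The loop uses getmany(timeout_ms=...) because a bare async for only gets a chance to check the stop flag when a message arrives. On a quiet topic, SIGTERM would otherwise hang until the next event.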
Dead letter queues: never block on a poison message
Some messages will never succeed: malformed JSON, a schema you can’t parse, a business rule that rejects them forever. These are poison messages. Suppose your handler just raises and you never commit past the message. The worker crashes, restarts from the last committed offset, and hits the same message again, forever. The partition is stuck, and this head-of-line blocking halts every good message behind it.
The production pattern is retry transient failures a few times, then divert poison messages to a Dead Letter Queue (a separate topic) and move on.
flowchart TB
Orders["orders"] --> Consumer["consumer"]
Consumer --> Handle["handle()"]
Handle -->|"ok"| Commit["commit offset"]
Handle -->|"raises"| Transient{"transient?"}
Transient -->|"yes"| Retry["retry (n times, backoff)"]
Retry -->|"still failing"| DLQ["orders.DLQ<br/>(commit + continue)"]
Transient -->|"no (poison)"| DLQ
DLQ --> DLQConsumer["DLQ consumer:<br/>log / store / alert / replay"]
Unlike Spring Kafka’s DeadLetterPublishingRecoverer, aiokafka has no built-in
DLQ — you wire it explicitly, which is honestly clearer about what’s happening.
Classify the error, retry the transient ones, and republish the rest to the DLQ
with the failure reason in headers.
```python
import asyncio

from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from pydantic import ValidationError

from app.events import OrderPlaced

DLQ_TOPIC = "orders.DLQ"
MAX_RETRIES = 3


class PoisonMessage(Exception):
    """Non-retryable: bad data / failed validation. Straight to DLQ."""


async def to_dlq(producer: AIOKafkaProducer, msg, reason: str) -> None:
    await producer.send_and_wait(
        DLQ_TOPIC,
        key=msg.key,
        value=msg.value,  # preserve the raw bytes
        headers=[  # breadcrumbs for the DLQ consumer
            ("x-original-topic", msg.topic.encode()),
            ("x-original-partition", str(msg.partition).encode()),
            ("x-original-offset", str(msg.offset).encode()),
            ("x-error", reason.encode()),
        ],
    )


async def consume_with_dlq(
    consumer: AIOKafkaConsumer, producer: AIOKafkaProducer
) -> None:
    async for msg in consumer:
        try:
            event = OrderPlaced.model_validate_json(msg.value)
            await process_with_retries(event)  # transient retries
        except (ValidationError, PoisonMessage) as exc:  # unparseable or rejected
            await to_dlq(producer, msg, f"poison: {exc}")
        except Exception as exc:  # retries exhausted
            await to_dlq(producer, msg, f"exhausted: {exc}")
        # Handled or parked: advance. Deliberately NOT in a finally: if the DLQ
        # publish itself fails, skipping the commit keeps the message from being lost.
        await consumer.commit()


async def process_with_retries(event: OrderPlaced) -> None:
    delay = 0.5
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            await handle(event)
            return
        except PoisonMessage:
            raise  # don't retry poison
        except Exception:
            if attempt == MAX_RETRIES:
                raise  # give up -> caller sends to DLQ
            await asyncio.sleep(delay)  # exponential backoff
            delay *= 2
```

Two rules make this robust:
- Commit once the message is dealt with. Whether it was processed or parked on the DLQ, you’ve handled it, so advance the offset and keep the partition flowing.
- Separate poison from transient. A validation error (ValidationError, or your own PoisonMessage) will never succeed; retrying it just wastes the backoff window before the inevitable DLQ. A DB timeout might succeed next time, so retry those.
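The retry-then-park logic can be exercised without a broker. This sketch swaps the Kafka consumer for an in-memory list and the DLQ topic for a Python list. The names drain and TransientError, and the handler, are hypothetical. It keeps the same classification rules and shortens the backoff so it runs quickly.

```python
import asyncio
from collections.abc import Awaitable, Callable


class PoisonMessage(Exception):
    """Non-retryable: will never succeed, so park it immediately."""


class TransientError(Exception):
    """Hypothetical retryable failure, such as a database timeout."""


async def process_with_retries(
    handle: Callable[[str], Awaitable[None]], value: str,
    max_retries: int = 3, delay: float = 0.01,
) -> None:
    for attempt in range(1, max_retries + 1):
        try:
            await handle(value)
            return
        except PoisonMessage:
            raise  # never retry poison
        except Exception:
            if attempt == max_retries:
                raise  # exhausted: let the caller park it
            await asyncio.sleep(delay)  # exponential backoff
            delay *= 2


async def drain(
    log: list[str], handle: Callable[[str], Awaitable[None]]
) -> tuple[int, list[tuple[int, str, str]]]:
    """Run every record through handle(); return (committed offset, DLQ entries)."""
    committed = 0
    dlq: list[tuple[int, str, str]] = []
    for offset, value in enumerate(log):
        try:
            await process_with_retries(handle, value)
        except PoisonMessage as exc:
            dlq.append((offset, value, f"poison: {exc}"))
        except Exception as exc:
            dlq.append((offset, value, f"exhausted: {exc}"))
        committed = offset + 1  # processed or parked: the partition keeps flowing
    return committed, dlq


# Hypothetical handler: one poison record, one that recovers on the third try,
# and one whose dependency never comes back.
attempts: dict[str, int] = {}


async def handle(value: str) -> None:
    attempts[value] = attempts.get(value, 0) + 1
    if value == "bad-json":
        raise PoisonMessage("cannot parse")
    if value == "flaky" and attempts[value] < 3:
        raise TransientError("db timeout")
    if value == "db-down":
        raise TransientError("db unreachable")


committed, dlq = asyncio.run(drain(["ok", "bad-json", "flaky", "db-down", "ok-2"], handle))
print(committed, dlq)
```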
Idempotent consumers (the at-least-once survival kit)
At-least-once means your handler will occasionally see the same event twice:
a redelivery after a crash, a producer resend. The fix isn’t exactly-once
machinery; it’s making handle() safe to run twice. Two techniques, often combined:
```python
# 1. Dedup table: record event_ids you've handled, in the SAME transaction as
#    the work. A replay finds the id already present and no-ops.
#    db is an asyncpg-style connection. Make processed_events.event_id a
#    PRIMARY KEY so two racing deliveries can't both commit.
async def handle(event: OrderPlaced, db) -> None:
    async with db.transaction():
        already = await db.fetchval(
            "SELECT 1 FROM processed_events WHERE event_id = $1", event.event_id
        )
        if already:
            return  # seen it, skip
        await db.execute(
            "INSERT INTO orders (order_id, user_id, amount, status) "
            "VALUES ($1, $2, $3, 'PLACED')",
            event.order_id, event.user_id, event.amount,
        )
        await db.execute(
            "INSERT INTO processed_events (event_id) VALUES ($1)", event.event_id
        )
```

```python
# 2. Natural idempotency: write so re-running is a no-op. UPSERTs and
#    "set to this state" operations are inherently safe to repeat.
await db.execute(
    "INSERT INTO orders (order_id, status) VALUES ($1, 'PLACED') "
    "ON CONFLICT (order_id) DO NOTHING",
    event.order_id,
)
```

The dedup-table approach needs a stable, producer-assigned event_id (not the
Kafka offset — a republish to a DLQ or retry topic changes the offset but is the
same logical event). That’s exactly why OrderPlaced carries one.
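Here is a runnable version of the dedup-table technique against SQLite from the standard library; the schema is assumed for the example. It claims the event_id first with INSERT OR IGNORE and checks whether a row was actually inserted, a variant of the SELECT-then-INSERT above. Both writes share one transaction. If the business write fails, the claim is rolled back too, and a redelivery can try again.

```python
import sqlite3


def setup() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE orders (order_id TEXT PRIMARY KEY, amount TEXT, status TEXT);
        CREATE TABLE processed_events (event_id TEXT PRIMARY KEY);
        """
    )
    return conn


def handle(conn: sqlite3.Connection, event: dict[str, str]) -> bool:
    """Apply an OrderPlaced-shaped event once per event_id; return False for a replay."""
    with conn:  # one transaction: commit on success, roll back on any exception
        claimed = conn.execute(
            "INSERT OR IGNORE INTO processed_events (event_id) VALUES (?)",
            (event["event_id"],),
        ).rowcount
        if claimed == 0:
            return False  # event_id already recorded: this delivery is a duplicate
        conn.execute(
            # amount stored as TEXT so a Decimal like "9.99" round-trips exactly
            "INSERT INTO orders (order_id, amount, status) VALUES (?, ?, 'PLACED')",
            (event["order_id"], event["amount"]),
        )
    return True


conn = setup()
event = {"event_id": "e1", "order_id": "o1", "amount": "9.99"}
print(handle(conn, event))  # True: first delivery does the work
print(handle(conn, event))  # False: redelivery is a no-op
```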
Event sourcing, briefly
A close cousin: instead of storing current state and mutating it, store the stream of events as the source of truth and derive state by replaying them. Kafka’s durable, ordered, replayable log is a natural fit for the event store.
flowchart TB
subgraph Events["events (source of truth, append-only)"]
direction LR
E1["OrderPlaced(o1)"] --> E2["OrderPaid(o1)"] --> E3["OrderShipped(o1)"]
end
Events -->|"replay / fold"| Projections
subgraph Projections["projections (read models, rebuildable)"]
direction TB
OrdersTable["orders_table:<br/>o1 = {status: SHIPPED, ...}"]
DailyRevenue["daily_revenue:<br/>2026-06-19 = $4,210"]
end
- Events are immutable facts; you only ever append.
- Projections (a.k.a. read models) are derived views — a consumer folds the event stream into whatever shape a query needs (a Postgres table, a Redis cache). Because they’re derived, you can drop and rebuild a projection by replaying the log from offset 0, or build a brand-new view of historical data.
- Projection-building consumers must be idempotent — you’ll replay events when rebuilding, so applying the same event twice has to be safe.
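A projection is just a fold over the log. Here is a minimal sketch with hypothetical event and status names. apply() only moves an order forward through its lifecycle, so re-applying a duplicate or stale event leaves the state unchanged. rebuild() is the "replay from offset 0" described above.

```python
from dataclasses import dataclass
from functools import reduce

LIFECYCLE = ["PLACED", "PAID", "SHIPPED"]
STATUS_FOR = {"OrderPlaced": "PLACED", "OrderPaid": "PAID", "OrderShipped": "SHIPPED"}


@dataclass(frozen=True)
class Event:
    kind: str  # "OrderPlaced" | "OrderPaid" | "OrderShipped"
    order_id: str


def apply(orders: dict[str, str], event: Event) -> dict[str, str]:
    """Fold one event into the orders projection without mutating the old state."""
    new_status = STATUS_FOR[event.kind]
    current = orders.get(event.order_id)
    if current is not None and LIFECYCLE.index(current) >= LIFECYCLE.index(new_status):
        return orders  # duplicate or stale event: already at or past this status
    return {**orders, event.order_id: new_status}


def rebuild(events: list[Event]) -> dict[str, str]:
    # Replay from offset 0: start from an empty projection and fold the whole log.
    return reduce(apply, events, {})


log = [
    Event("OrderPlaced", "o1"),
    Event("OrderPaid", "o1"),
    Event("OrderPlaced", "o2"),
    Event("OrderShipped", "o1"),
]
orders_table = rebuild(log)  # {'o1': 'SHIPPED', 'o2': 'PLACED'}
```

Guarding on the lifecycle order, rather than blindly setting the status, is a design choice. It makes the fold safe even when a redelivered event arrives after a later one.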
Schema discipline
A message on a topic is a contract with every current and future consumer. Treat event schemas like a public API.
- Add fields, don’t break them. New fields must have defaults so that old messages (which omit them) still deserialize in new consumers, while old consumers simply ignore them. Pydantic models with defaults give you this for free. Never rename or retype a field: that’s a remove + add, and it breaks existing messages on the log.
- Be lenient on read. Configure the model to ignore unknown fields so a newer producer’s extra field doesn’t crash an older consumer. Pydantic v2 already ignores extras by default; setting extra="ignore" explicitly keeps anyone from flipping it to "forbid".
- Version explicitly. Carry a schema_version on the event (as OrderPlaced does). For a truly breaking change, don’t silently mutate v1: create a new versioned topic (orders.v2) and run both consumers during migration.
```python
from decimal import Decimal

from pydantic import BaseModel, ConfigDict


class OrderPlaced(BaseModel):
    # ignore unknown fields a newer producer might add -> forward compatible
    model_config = ConfigDict(extra="ignore")

    event_id: str
    order_id: str
    amount: Decimal
    schema_version: int = 1
    # v2 ADDED this, with a default -> old messages (no currency) still parse:
    currency: str = "USD"
```

Integrating with FastAPI
The natural question: where does Kafka live in a web app? The answer is split.
Producer → in the app, via lifespan. The web process produces events as a side
effect of handling requests (place an order → emit OrderPlaced). Create one
producer at startup, share it across requests, close it at shutdown — the same
lifespan pattern you’d use for a DB pool.
```python
from contextlib import asynccontextmanager
from decimal import Decimal

from aiokafka import AIOKafkaProducer
from fastapi import FastAPI, Request

from app.events import OrderPlaced


@asynccontextmanager
async def lifespan(app: FastAPI):
    app.state.producer = AIOKafkaProducer(
        bootstrap_servers="localhost:29092",
        acks="all",
        enable_idempotence=True,
    )
    await app.state.producer.start()  # one producer for the whole app
    try:
        yield
    finally:
        await app.state.producer.stop()  # flush + close on shutdown


app = FastAPI(lifespan=lifespan)


@app.post("/orders", status_code=202)  # 202 Accepted: queued, not done
async def place_order(req: Request) -> dict[str, str]:
    event = OrderPlaced(event_id="...", order_id="...", user_id=1, amount=Decimal("9.99"))
    producer: AIOKafkaProducer = req.app.state.producer
    await producer.send_and_wait(
        "orders",
        key=event.order_id.encode(),
        value=event.model_dump_json().encode(),
    )
    return {"order_id": event.order_id, "status": "accepted"}
```

Consumer → a separate worker process. Do not run the consumer loop inside the web process. Why:
- Different scaling axes. Web pods scale on request rate; consumers scale on partition count and processing lag. Coupling them wastes resources and caps your consumer parallelism at your web replica count.
- async for is an infinite loop. It would fight your request handlers for the event loop, and a slow message would add latency to HTTP traffic.
- Independent lifecycle. You want to deploy, restart, or scale the consumer without bouncing the API, and vice versa.
So the consumer is its own entrypoint, python -m app.worker (for example, the
graceful-shutdown consumer above saved as app/worker.py), deployed as a separate Deployment/process that scales independently. The
web app writes to Kafka; the worker reads. The order-pipeline sub-project builds
exactly this split.
Practice
Build the real thing: a producer emitting events, a consumer maintaining a projection, and retry + dead-letter handling for the messages that go bad.