Event-Driven with Kafka
You’ve used Kafka through kafkajs (TypeScript) or sarama/confluent-kafka-go
(Go). On the JVM, Spring Kafka is the standard — it provides KafkaTemplate
for producing and @KafkaListener for consuming. This module covers production
patterns: serialization, error handling, dead letter topics, exactly-once
semantics, and offset management.
Kafka Refresher
If you’ve used Kafka before, this is a quick reminder. If not, here are the core concepts: a producer writes to a partitioned topic, and a consumer group divides the partitions among its members.
flowchart LR
    P["Producer"] -->|"orders topic"| T
    subgraph T["Topic: orders (partitioned)"]
        P0["Partition 0"]
        P1["Partition 1"]
        P2["Partition 2"]
        P3["Partition 3"]
    end
    subgraph G["Consumer Group"]
        C1["Consumer 1"]
        C2["Consumer 2"]
    end
    P0 --> C1
    P1 --> C1
    P2 --> C2
    P3 --> C2
| Concept | Description |
|---|---|
| Topic | Named stream of messages (like a queue/channel name) |
| Partition | Ordered, immutable sequence within a topic. Messages within a partition are ordered. |
| Offset | Position of a message within a partition (monotonically increasing) |
| Consumer Group | Set of consumers that cooperate to consume a topic. Each partition goes to exactly one consumer in the group. |
| Key | Optional message key — messages with the same key go to the same partition (ordering guarantee) |
| Broker | Kafka server node |
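To make partitions and offsets concrete, here is a minimal in-memory sketch. ToyTopic and its methods are hypothetical names invented for illustration; this is not a Kafka client, only a model of the idea that a partition is an append-only log and an offset is a position in it.

```kotlin
// Toy model of a partitioned topic (illustration only, not a Kafka client).
class ToyTopic(val name: String, val partitionCount: Int) {
    // Each partition is an append-only list; a record's offset is its index in that list.
    private val partitions: List<MutableList<String>> =
        List(partitionCount) { mutableListOf<String>() }

    // Appends a value to the given partition and returns the offset it was written at.
    fun append(partition: Int, value: String): Long {
        val log = partitions[partition]
        log.add(value)
        return (log.size - 1).toLong()
    }

    // Returns every record in a partition from the given offset onward,
    // which is what a consumer does when it resumes from a committed offset.
    fun read(partition: Int, fromOffset: Long): List<String> =
        partitions[partition].drop(fromOffset.toInt())
}
```

Offsets are independent per partition, so ordering only holds within one partition, never across the whole topic.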
When to Use Kafka vs Other Tools
| Use Case | Tool | Why |
|---|---|---|
| Fire-and-forget notifications | Redis Pub/Sub | Simplest, lowest latency |
| Task queue (process once) | RabbitMQ or Redis Streams | Built-in ack, simpler ops |
| Event sourcing, audit log | Kafka | Persistent, replayable, ordered |
| Data pipeline between services | Kafka | Decouples producers from consumers |
| Real-time analytics | Kafka | High throughput, consumer groups |
| Request/reply RPC | HTTP or gRPC | Kafka is not designed for request/reply |
Spring Kafka Setup
Dependencies
// build.gradle.kts
plugins {
    kotlin("jvm") version "2.1.0"
    kotlin("plugin.spring") version "2.1.0"
    id("org.springframework.boot") version "3.4.1"
    id("io.spring.dependency-management") version "1.1.7"
}

dependencies {
    implementation("org.springframework.boot:spring-boot-starter")
    implementation("org.springframework.kafka:spring-kafka")
    implementation("com.fasterxml.jackson.module:jackson-module-kotlin")

    testImplementation("org.springframework.kafka:spring-kafka-test")
}
Configuration
# application.yml
spring:
  kafka:
    bootstrap-servers: localhost:29092

    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      acks: all                    # Wait for all in-sync replicas to acknowledge
      retries: 3                   # Retry on transient failures
      properties:
        enable.idempotence: true   # Prevent duplicate messages on retry
        max.in.flight.requests.per.connection: 5

    consumer:
      group-id: my-app-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      auto-offset-reset: earliest  # Start from beginning if no committed offset
      properties:
        spring.json.trusted.packages: "com.example.*"  # Trust classes for deserialization

    listener:
      ack-mode: record             # Acknowledge each record individually
      concurrency: 3               # Number of consumer threads
Topic Configuration
import org.apache.kafka.clients.admin.NewTopic
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.config.TopicBuilder

@Configuration
class KafkaTopicConfig {

    @Bean
    fun ordersTopic(): NewTopic = TopicBuilder
        .name("orders")
        .partitions(6)
        .replicas(1)  // Use 3 in production
        .build()

    @Bean
    fun orderEventsTopic(): NewTopic = TopicBuilder
        .name("order-events")
        .partitions(6)
        .replicas(1)
        .config("retention.ms", "604800000")  // 7 days
        .build()

    @Bean
    fun deadLetterTopic(): NewTopic = TopicBuilder
        .name("orders.DLT")  // Convention: original-topic.DLT
        .partitions(1)
        .replicas(1)
        .build()
}
Producing Messages
KafkaTemplate Basics
// TypeScript (kafkajs)
import { Kafka } from 'kafkajs';

const kafka = new Kafka({ brokers: ['localhost:29092'] });
const producer = kafka.producer();
await producer.connect();

await producer.send({
  topic: 'orders',
  messages: [
    { key: 'order-123', value: JSON.stringify({ orderId: 123, amount: 99.99 }) }
  ]
});

// Go (sarama); errors are ignored here for brevity
config := sarama.NewConfig()
config.Producer.Return.Successes = true
producer, _ := sarama.NewSyncProducer([]string{"localhost:29092"}, config)

msg := &sarama.ProducerMessage{
    Topic: "orders",
    Key:   sarama.StringEncoder("order-123"),
    Value: sarama.StringEncoder(`{"orderId":123,"amount":99.99}`),
}
partition, offset, _ := producer.SendMessage(msg)

// Kotlin (Spring Kafka)
import java.time.Instant
import org.apache.kafka.clients.producer.ProducerRecord
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Service

data class OrderEvent(
    val orderId: String,
    val userId: Long,
    val amount: Double,
    val status: String,
    val timestamp: Instant = Instant.now()
)

@Service
class OrderProducer(private val kafkaTemplate: KafkaTemplate<String, Any>) {

    // Simple send (fire and forget with internal retries)
    fun sendOrder(event: OrderEvent) {
        kafkaTemplate.send("orders", event.orderId, event)
    }

    // Send with callback (non-blocking)
    fun sendOrderAsync(event: OrderEvent) {
        val future = kafkaTemplate.send("orders", event.orderId, event)
        future.whenComplete { result, ex ->
            if (ex != null) {
                println("Failed to send: ${ex.message}")
            } else {
                val metadata = result.recordMetadata
                println("Sent to ${metadata.topic()}-${metadata.partition()} @ offset ${metadata.offset()}")
            }
        }
    }

    // Send and wait (blocking)
    fun sendOrderSync(event: OrderEvent) {
        val result = kafkaTemplate.send("orders", event.orderId, event).get()
        val metadata = result.recordMetadata
        println("Sent to ${metadata.topic()}-${metadata.partition()} @ offset ${metadata.offset()}")
    }

    // Send to specific partition
    fun sendToPartition(event: OrderEvent, partition: Int) {
        kafkaTemplate.send("orders", partition, event.orderId, event)
    }

    // Send with headers
    fun sendWithHeaders(event: OrderEvent) {
        val record = ProducerRecord<String, Any>(
            "orders",       // topic
            null,           // partition (null = use partitioner)
            event.orderId,  // key
            event           // value
        )
        record.headers().add("event-type", "ORDER_CREATED".toByteArray())
        record.headers().add("source", "order-service".toByteArray())
        kafkaTemplate.send(record)
    }
}
In TS and Go you build the message and call send yourself; in Kotlin the
KafkaTemplate<String, Any> handles serialization and gives you sync, async
(callback), and fire-and-forget variants from the same API.
Message Key Strategy
The message key determines which partition a message goes to. Messages with the same key always go to the same partition (as long as the topic’s partition count does not change), which guarantees their relative order.
@Service
class EventProducer(private val kafkaTemplate: KafkaTemplate<String, Any>) {

    // All events for the same order go to the same partition (ordered)
    fun sendOrderEvent(orderId: String, event: Any) {
        kafkaTemplate.send("order-events", orderId, event)
        // orderId = "order-123" → always partition X
        // orderId = "order-456" → always partition Y
    }

    // All events for the same user go to the same partition
    fun sendUserEvent(userId: String, event: Any) {
        kafkaTemplate.send("user-events", userId, event)
    }

    // No key = records are spread across partitions (sticky per batch since Kafka 2.4),
    // so there is no ordering guarantee
    fun sendMetricEvent(event: Any) {
        kafkaTemplate.send("metrics", event)
    }
}
| Key Strategy | Ordering Guarantee | Use Case |
|---|---|---|
| Entity ID (e.g., orderId) | All events for same entity are ordered | Order lifecycle, user actions |
| No key (null) | No ordering (spread across partitions) | Metrics, logs, independent events |
| Composite key (userId:action) | Grouped ordering | User-action streams |
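The key-to-partition mapping can be sketched in a few lines. This is a simplified stand-in, not Kafka’s partitioner: the real client hashes the serialized key bytes with murmur2, while this sketch uses String.hashCode() to stay dependency-free. partitionFor and groupByPartition are hypothetical helper names.

```kotlin
// Simplified stand-in for key-based partitioning (assumption: hashCode() replaces
// the murmur2 hash that the real Kafka client applies to the serialized key).
fun partitionFor(key: String, numPartitions: Int): Int =
    Math.floorMod(key.hashCode(), numPartitions)

// Groups keys by the partition they would land on.
fun groupByPartition(keys: List<String>, numPartitions: Int): Map<Int, List<String>> =
    keys.groupBy { partitionFor(it, numPartitions) }
```

Because the result depends on numPartitions, the same key can map to a different partition after a topic is repartitioned, which is why ordering by key only holds while the partition count stays fixed.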
Consuming Messages
@KafkaListener Basics
// TypeScript (kafkajs)
const consumer = kafka.consumer({ groupId: 'order-processor' });
await consumer.connect();
await consumer.subscribe({ topic: 'orders', fromBeginning: true });

await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    const order = JSON.parse(message.value.toString());
    console.log(`Processing order ${order.orderId} from partition ${partition}`);
  }
});

// Go (sarama)
type OrderHandler struct{}

func (h OrderHandler) Setup(_ sarama.ConsumerGroupSession) error   { return nil }
func (h OrderHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (h OrderHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for msg := range claim.Messages() {
        var order OrderEvent
        json.Unmarshal(msg.Value, &order)
        fmt.Printf("Processing order %s\n", order.OrderID)
        session.MarkMessage(msg, "")
    }
    return nil
}

// Kotlin (Spring Kafka)
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.kafka.support.KafkaHeaders
import org.springframework.messaging.handler.annotation.Header
import org.springframework.messaging.handler.annotation.Payload
import org.springframework.stereotype.Component

@Component
class OrderConsumer {

    // Simple consumer
    @KafkaListener(topics = ["orders"], groupId = "order-processor")
    fun processOrder(event: OrderEvent) {
        println("Processing order: ${event.orderId}, amount: ${event.amount}")
    }
}
Key differences: kafkajs and sarama make you wire up the poll loop and JSON
decoding by hand. Spring’s @KafkaListener runs the loop for you and deserializes
the payload into your OrderEvent data class, so the method body is just business
logic.
Accessing Message Metadata
@Component
class OrderConsumer {

    // With full metadata
    @KafkaListener(topics = ["orders"], groupId = "order-processor")
    fun processOrderWithMetadata(
        @Payload event: OrderEvent,
        @Header(KafkaHeaders.RECEIVED_TOPIC) topic: String,
        @Header(KafkaHeaders.RECEIVED_PARTITION) partition: Int,
        @Header(KafkaHeaders.OFFSET) offset: Long,
        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) timestamp: Long,
        @Header(KafkaHeaders.RECEIVED_KEY, required = false) key: String?
    ) {
        println("Received from $topic-$partition @ offset $offset")
        println("Key: $key, Timestamp: ${Instant.ofEpochMilli(timestamp)}")
        println("Order: ${event.orderId}, Amount: ${event.amount}")
    }

    // With ConsumerRecord (raw access)
    @KafkaListener(topics = ["orders"], groupId = "order-processor-raw")
    fun processOrderRaw(record: ConsumerRecord<String, OrderEvent>) {
        println("Key: ${record.key()}")
        println("Value: ${record.value()}")
        println("Partition: ${record.partition()}, Offset: ${record.offset()}")
        record.headers().forEach { header ->
            println("Header: ${header.key()} = ${String(header.value())}")
        }
    }
}
The raw form gives you a ConsumerRecord<String, OrderEvent> with key, value,
partition, offset, and headers, equivalent to the message object you’d inspect
in kafkajs.
Batch Consumption
@Component
class BatchOrderConsumer {

    // Process messages in batches for higher throughput
    @KafkaListener(
        topics = ["orders"],
        groupId = "order-batch-processor",
        containerFactory = "batchListenerFactory"
    )
    fun processBatch(events: List<OrderEvent>) {
        println("Processing batch of ${events.size} orders")
        events.forEach { event ->
            println("  Order: ${event.orderId}")
        }
    }
}

// Configuration for batch listener
@Configuration
class KafkaBatchConfig {

    @Bean
    fun batchListenerFactory(
        consumerFactory: ConsumerFactory<String, OrderEvent>
    ): ConcurrentKafkaListenerContainerFactory<String, OrderEvent> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, OrderEvent>()
        factory.consumerFactory = consumerFactory
        factory.isBatchListener = true  // Enable batch mode
        factory.containerProperties.ackMode = ContainerProperties.AckMode.BATCH
        return factory
    }
}
A batch listener takes a List<OrderEvent> instead of a single event, so you can
amortize work (one DB write, fewer commits) across a poll.
Multiple Topics and Pattern Matching
@Component
class MultiTopicConsumer {

    // Listen to multiple topics
    @KafkaListener(topics = ["orders", "payments", "shipments"], groupId = "event-logger")
    fun logEvent(
        @Payload event: Any,
        @Header(KafkaHeaders.RECEIVED_TOPIC) topic: String
    ) {
        println("[$topic] Event: $event")
    }

    // Pattern-based subscription
    @KafkaListener(topicPattern = "order-.*", groupId = "order-all-events")
    fun processAllOrderEvents(
        @Payload event: Any,
        @Header(KafkaHeaders.RECEIVED_TOPIC) topic: String
    ) {
        println("Order event from $topic: $event")
    }
}
Concurrency
@Component
class ConcurrentConsumer {

    // 3 consumer threads, each of which gets a subset of the partitions
    @KafkaListener(
        topics = ["orders"],
        groupId = "concurrent-processor",
        concurrency = "3"  // 3 threads
    )
    fun process(event: OrderEvent) {
        println("[${Thread.currentThread().name}] Processing: ${event.orderId}")
    }
}
Concurrency rules:
- concurrency = N creates N consumer threads in the same consumer group.
- The partitions are divided among the threads, so each thread gets roughly total partitions / N of them.
- If concurrency > partition count, excess threads sit idle.
- For 6 partitions and concurrency = 3: each thread gets 2 partitions.
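The rules above can be checked with a small model. This is only an illustration: in a real application the consumer group’s partition assignor decides who gets which partition, and it may hand out contiguous ranges rather than the interleaved pattern used here. assignToThreads is a hypothetical helper.

```kotlin
// Illustrative only: distributes partition numbers across consumer threads.
// Thread i takes partitions i, i + concurrency, i + 2 * concurrency, ...
fun assignToThreads(partitionCount: Int, concurrency: Int): List<List<Int>> {
    require(partitionCount >= 0 && concurrency > 0) { "invalid sizes" }
    return List(concurrency) { thread ->
        (thread until partitionCount step concurrency).toList()
    }
}
```

With 6 partitions and 3 threads every thread ends up with 2 partitions; with 2 partitions and 3 threads the third list is empty, which corresponds to an idle thread.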
Serialization
JSON Serialization (Default)
Spring Kafka’s JsonSerializer and JsonDeserializer are built on Jackson. Spring Boot defaults to String serializers, so configure the JSON ones explicitly:
spring:
  kafka:
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "com.example.*"
        spring.json.value.default.type: "com.example.OrderEvent"

// Events are automatically serialized/deserialized as JSON
data class OrderEvent(
    val orderId: String,
    val userId: Long,
    val amount: Double,
    val status: String,
    val items: List<OrderItem> = emptyList(),
    val timestamp: Instant = Instant.now()
)

data class OrderItem(
    val productId: String,
    val quantity: Int,
    val price: Double
)

// Producer: OrderEvent is automatically serialized to JSON
kafkaTemplate.send("orders", event.orderId, event)

// Consumer: JSON is automatically deserialized to OrderEvent
@KafkaListener(topics = ["orders"])
fun process(event: OrderEvent) { /* event is already deserialized */ }
Type Information in Headers
Spring Kafka can include type information in headers so the consumer knows which class to deserialize to:
// Producer config: add type info to headers
@Bean
fun producerFactory(): ProducerFactory<String, Any> {
    val config = mapOf(
        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:29092",
        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to JsonSerializer::class.java,
        // Include type info in headers
        JsonSerializer.ADD_TYPE_INFO_HEADERS to true
    )
    return DefaultKafkaProducerFactory(config)
}

// Consumer config: use header-based type resolution
@Bean
fun consumerFactory(): ConsumerFactory<String, Any> {
    val config = mapOf(
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:29092",
        ConsumerConfig.GROUP_ID_CONFIG to "my-group",
        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java,
        JsonDeserializer.TRUSTED_PACKAGES to "com.example.*",
        JsonDeserializer.USE_TYPE_INFO_HEADERS to true
    )
    return DefaultKafkaConsumerFactory(config)
}
Multi-Type Topics
When a topic contains multiple event types, model them as a sealed class and
branch with an exhaustive when:
// Define event hierarchy
sealed class OrderDomainEvent {
    abstract val orderId: String
    abstract val timestamp: Instant
}

data class OrderCreated(
    override val orderId: String,
    val userId: Long,
    val amount: Double,
    override val timestamp: Instant = Instant.now()
) : OrderDomainEvent()

data class OrderPaid(
    override val orderId: String,
    val paymentId: String,
    override val timestamp: Instant = Instant.now()
) : OrderDomainEvent()

data class OrderShipped(
    override val orderId: String,
    val trackingNumber: String,
    override val timestamp: Instant = Instant.now()
) : OrderDomainEvent()

// Producer: send different event types to the same topic
@Service
class OrderEventProducer(private val kafkaTemplate: KafkaTemplate<String, Any>) {

    fun orderCreated(orderId: String, userId: Long, amount: Double) {
        val event = OrderCreated(orderId, userId, amount)
        kafkaTemplate.send("order-events", orderId, event)
    }

    fun orderPaid(orderId: String, paymentId: String) {
        val event = OrderPaid(orderId, paymentId)
        kafkaTemplate.send("order-events", orderId, event)
    }

    fun orderShipped(orderId: String, trackingNumber: String) {
        val event = OrderShipped(orderId, trackingNumber)
        kafkaTemplate.send("order-events", orderId, event)
    }
}

// Consumer: handle different types
@Component
class OrderEventConsumer {

    @KafkaListener(topics = ["order-events"], groupId = "order-event-handler")
    fun handle(event: OrderDomainEvent) {
        when (event) {
            is OrderCreated -> handleCreated(event)
            is OrderPaid -> handlePaid(event)
            is OrderShipped -> handleShipped(event)
        }
    }

    private fun handleCreated(event: OrderCreated) {
        println("Order ${event.orderId} created by user ${event.userId}")
    }

    private fun handlePaid(event: OrderPaid) {
        println("Order ${event.orderId} paid: ${event.paymentId}")
    }

    private fun handleShipped(event: OrderShipped) {
        println("Order ${event.orderId} shipped: ${event.trackingNumber}")
    }
}
Avro Serialization (Mention)
For production systems with strict schema governance, Apache Avro with the Confluent Schema Registry is a widely used choice. Avro provides:
- Binary serialization (smaller, faster than JSON)
- Schema evolution (add/remove fields safely)
- Schema Registry enforces compatibility
// build.gradle.kts (for Avro; not covered in depth here)
dependencies {
    implementation("io.confluent:kafka-avro-serializer:7.6.0")
    implementation("org.apache.avro:avro:1.11.3")
}

spring:
  kafka:
    producer:
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
    consumer:
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
    properties:
      schema.registry.url: http://localhost:8081  # shared by producer and consumer
Error Handling & Dead Letter Topics
When a consumer fails to process a message, you need a strategy. Without an explicit one you get the framework default: a few immediate retries, after which the record is logged and skipped. The production pattern is: retry with backoff, then route the poison message to a Dead Letter Topic.
flowchart LR
T["orders topic"] --> C["Consumer"]
C -->|"success"| OK["Commit offset"]
C -->|"throws"| R{"Retries left?"}
R -->|"yes, backoff"| C
R -->|"exhausted"| DLT["orders.DLT"]
DLT --> I["DLT consumer: log / store / replay"]
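The flow in the diagram can be modelled without Kafka at all. The sketch below is an in-memory illustration under stated assumptions: RetryingProcessor and DeadLetter are hypothetical names, the dead-letter "topic" is just a list, and the backoff sleep is left out.

```kotlin
// In-memory sketch of "retry, then dead-letter" (no Kafka involved; names are hypothetical).
data class DeadLetter(val value: String, val error: String)

class RetryingProcessor(
    private val maxRetries: Int,
    private val handler: (String) -> Unit,
) {
    // Stands in for the dead letter topic.
    val deadLetters = mutableListOf<DeadLetter>()

    // Returns the number of delivery attempts made for this record.
    fun process(value: String): Int {
        var attempts = 0
        while (true) {
            attempts++
            try {
                handler(value)
                return attempts  // success: this is where the offset would be committed
            } catch (e: Exception) {
                if (attempts > maxRetries) {
                    // Retries exhausted: park the record instead of blocking the partition.
                    deadLetters += DeadLetter(value, e.message ?: e.javaClass.simpleName)
                    return attempts
                }
                // A real error handler would wait for the backoff interval here.
            }
        }
    }
}
```

With maxRetries = 3, a record that always fails is delivered four times (one initial attempt plus three retries) before it is parked.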
Default Error Handler
By default (Spring Kafka 2.8 and later), DefaultErrorHandler retries a failed record up to 9 times with no delay between attempts, then logs the error and skips the record. This is usually not what you want.
Retry with Backoff
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.listener.DefaultErrorHandler
import org.springframework.util.backoff.FixedBackOff

@Configuration
class KafkaErrorConfig {

    @Bean
    fun kafkaErrorHandler(): DefaultErrorHandler {
        // Retry 3 times with 1 second between retries
        val backoff = FixedBackOff(1000L, 3L)  // interval in ms, max retries (up to 4 deliveries in total)
        val handler = DefaultErrorHandler(backoff)

        // Don't retry certain exceptions (they'll always fail)
        handler.addNotRetryableExceptions(
            com.fasterxml.jackson.core.JsonParseException::class.java,
            IllegalArgumentException::class.java
        )

        return handler
    }
}
Dead Letter Topic (DLT)
After all retries are exhausted, send the failed message to a Dead Letter Topic for later inspection:
// Manual DLT implementation in kafkajs
async function processWithDLT(message: KafkaMessage) {
  try {
    await processMessage(message);
  } catch (error) {
    await producer.send({
      topic: 'orders.DLT',
      messages: [{
        key: message.key,
        value: message.value,
        headers: { ...message.headers, 'error': error.message }
      }]
    });
  }
}

// Kotlin (Spring Kafka)
import org.apache.kafka.common.TopicPartition
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer
import org.springframework.kafka.listener.DefaultErrorHandler
import org.springframework.util.backoff.ExponentialBackOff

@Configuration
class KafkaDLTConfig {

    @Bean
    fun kafkaErrorHandler(
        kafkaTemplate: KafkaTemplate<String, Any>
    ): DefaultErrorHandler {
        // Send to DLT after retries exhausted
        val recoverer = DeadLetterPublishingRecoverer(kafkaTemplate) { record, _ ->
            // Custom DLT topic naming
            TopicPartition("${record.topic()}.DLT", -1)  // negative = let the producer pick the partition
        }

        // Exponential backoff: 1s, 2s, 4s, 8s, then give up
        val backoff = ExponentialBackOff(1000L, 2.0).apply {
            maxElapsedTime = 15000L  // Give up after 15 seconds total
        }

        return DefaultErrorHandler(recoverer, backoff).apply {
            addNotRetryableExceptions(
                com.fasterxml.jackson.core.JsonParseException::class.java
            )
        }
    }
}
Key differences: in kafkajs you hand-roll the try/catch and re-publish to the
DLT yourself. Spring’s DeadLetterPublishingRecoverer plus DefaultErrorHandler
do the retry, backoff, and DLT routing declaratively, with no try/catch in your
listener.
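To see where the "1s, 2s, 4s, 8s, then give up" schedule comes from, the arithmetic can be written out in plain Kotlin. This is a model of the idea, not Spring’s ExponentialBackOff class: backoffSchedule is a hypothetical function, and it ignores the cap on a single interval that a real backoff policy usually applies.

```kotlin
// Lists the delays an exponential backoff produces, stopping once the
// accumulated delay reaches maxElapsedMs. Plain-Kotlin model, not Spring's class.
fun backoffSchedule(initialMs: Long, multiplier: Double, maxElapsedMs: Long): List<Long> {
    require(initialMs > 0 && multiplier >= 1.0) { "backoff must grow from a positive interval" }
    val delays = mutableListOf<Long>()
    var next = initialMs
    var elapsed = 0L
    while (elapsed < maxElapsedMs) {
        delays += next
        elapsed += next
        next = (next * multiplier).toLong()
    }
    return delays
}
```

For an initial interval of 1000 ms, a multiplier of 2.0, and a 15000 ms budget, the delays are 1000, 2000, 4000, and 8000 ms: they add up to exactly 15000, so no fifth retry is scheduled.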
Consuming from DLT
@Component
class DLTConsumer {

    private val logger = LoggerFactory.getLogger(javaClass)

    @KafkaListener(topics = ["orders.DLT"], groupId = "dlt-processor")
    fun processDLT(
        record: ConsumerRecord<String, Any>,
        @Header(KafkaHeaders.DLT_ORIGINAL_TOPIC, required = false) originalTopic: String?,
        @Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE, required = false) errorMessage: String?,
        @Header(KafkaHeaders.DLT_ORIGINAL_OFFSET, required = false) originalOffset: ByteArray?
    ) {
        // The original offset header is an 8-byte big-endian long, not text
        val offset = originalOffset?.let { java.nio.ByteBuffer.wrap(it).getLong() }

        logger.error("""
            Dead letter received:
              Original topic: $originalTopic
              Original offset: $offset
              Error: $errorMessage
              Key: ${record.key()}
              Value: ${record.value()}
        """.trimIndent())

        // Options:
        // 1. Log and alert for manual investigation
        // 2. Store in a database for retry later
        // 3. Try to fix and re-publish to original topic
    }
}
Non-Blocking Retry Topics
For more sophisticated retry patterns, Spring Kafka supports non-blocking retries using separate retry topics. Failed messages flow through a chain of delay topics before landing in the DLT, so the main topic is never blocked by one slow message.
import org.springframework.kafka.retrytopic.RetryTopicConfiguration
import org.springframework.kafka.retrytopic.RetryTopicConfigurationBuilder

@Configuration
class KafkaRetryConfig {

    @Bean
    fun retryTopicConfig(
        kafkaTemplate: KafkaTemplate<String, Any>
    ): RetryTopicConfiguration {
        return RetryTopicConfigurationBuilder
            .newInstance()
            .exponentialBackoff(1000, 2.0, 30000)    // 1s, 2s, 4s, 8s, ... capped at 30s
            .maxAttempts(5)                          // 1 initial attempt + 4 retries
            .autoCreateTopics(true, 1, 1.toShort())  // (numPartitions, replFactor)
            .includeTopics(listOf("orders"))
            .create(kafkaTemplate)
    }
}
This automatically creates a chain of topics, one retry topic per retry. With the default suffixes, the DLT for this mechanism is named orders-dlt rather than orders.DLT:
flowchart LR
    O["orders"] --> R0["orders-retry-0 (1s)"]
    R0 --> R1["orders-retry-1 (2s)"]
    R1 --> R2["orders-retry-2 (4s)"]
    R2 --> R3["orders-retry-3 (8s)"]
    R3 --> DLT["orders-dlt"]
Error Handling Comparison
| Feature | kafkajs (TS) | sarama (Go) | Spring Kafka |
|---|---|---|---|
| Retry | Manual | Manual | Built-in (DefaultErrorHandler) |
| Backoff | Manual | Manual | FixedBackOff, ExponentialBackOff |
| DLT | Manual | Manual | DeadLetterPublishingRecoverer |
| Non-blocking retry | Manual | Manual | @RetryableTopic / RetryTopicConfiguration |
| Per-exception rules | Manual | Manual | addNotRetryableExceptions() |
Exactly-Once Semantics
Kafka provides three delivery guarantees:
| Guarantee | Meaning | How |
|---|---|---|
| At-most-once | Message may be lost, never duplicated | Auto-commit offsets before processing |
| At-least-once | Message never lost, may be duplicated | Commit offsets after processing (default) |
| Exactly-once | Message processed exactly once | Idempotent producer + transactional consumer |
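The difference between the first two rows comes down to whether the offset is committed before or after processing. The simulation below is a sketch under simple assumptions: simulateConsumer is a hypothetical function, the consumer crashes exactly once while handling one record, and it then restarts from the last committed offset.

```kotlin
// Simulates a consumer that crashes once on a given record and then restarts
// from the last committed offset. No Kafka client involved.
fun simulateConsumer(
    records: List<String>,
    crashOn: String,
    commitBeforeProcessing: Boolean,
): List<String> {
    val processed = mutableListOf<String>()
    var committed = 0      // offset the consumer resumes from after a restart
    var crashed = false    // the simulated crash happens only once
    var offset = 0
    while (offset < records.size) {
        val record = records[offset]
        if (commitBeforeProcessing) committed = offset + 1
        if (record == crashOn && !crashed) {
            crashed = true
            // Commit-after case: the side effect happened, but the commit was lost.
            if (!commitBeforeProcessing) processed += record
            offset = committed  // restart from the last committed offset
            continue
        }
        processed += record
        if (!commitBeforeProcessing) committed = offset + 1
        offset++
    }
    return processed
}
```

For records a, b, c with a crash on b, committing first yields a, c (b is lost: at-most-once), while committing afterwards yields a, b, b, c (b is duplicated: at-least-once).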
Idempotent Producer
Prevents duplicate messages caused by producer retries:
spring:
  kafka:
    producer:
      properties:
        enable.idempotence: true  # Default in newer Kafka versions
        acks: all
        max.in.flight.requests.per.connection: 5
How it works: The producer assigns a sequence number to each message. The
broker deduplicates per partition based on (producerId, sequence). No code changes needed.
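The deduplication idea fits in a few lines. ToyBroker is a hypothetical class for illustration, and it simplifies heavily: a real broker tracks sequence numbers per partition and per batch, and rejects gaps in the sequence as well as repeats.

```kotlin
// Toy broker that drops retried sends, keyed by producer id and sequence number.
// Simplification: real brokers track sequences per partition and per batch.
class ToyBroker {
    private val lastSequence = mutableMapOf<Long, Int>()
    val log = mutableListOf<String>()

    // Returns true if the record was appended, false if it was a duplicate retry.
    fun append(producerId: Long, sequence: Int, value: String): Boolean {
        val last = lastSequence[producerId] ?: -1
        if (sequence <= last) return false  // already seen: the earlier ack was lost, not the write
        lastSequence[producerId] = sequence
        log += value
        return true
    }
}
```

A retry carries the same sequence number as the original send, so the second append is ignored and the log holds the record once.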
Transactional Producer
For atomic multi-message sends (all or nothing):
spring:
  kafka:
    producer:
      transaction-id-prefix: order-tx-  # Enables transactions

@Service
class TransactionalProducer(private val kafkaTemplate: KafkaTemplate<String, Any>) {

    // All messages in this method are sent atomically
    fun processOrderTransaction(order: OrderEvent) {
        kafkaTemplate.executeInTransaction { template ->
            // These are all committed or all rolled back
            template.send("orders", order.orderId, order)
            template.send("order-events", order.orderId,
                OrderCreated(order.orderId, order.userId, order.amount))
            template.send("analytics", "order-created",
                mapOf("orderId" to order.orderId, "amount" to order.amount))
        }
    }
}
Consume-Transform-Produce Pattern
The most important exactly-once pattern: consume a message, process it, produce a result, and commit the offset, all atomically.
sequenceDiagram
    participant In as raw-orders
    participant C as Consumer
    participant P as Producer
    participant Out as enriched-orders
    In->>C: poll(order)
    C->>C: enrichOrder(order)
    Note over C,P: single Kafka transaction
    C->>P: send(enriched)
    P->>Out: enriched-orders
    P->>In: commit consumer offset
@Configuration
class ExactlyOnceConfig {

    @Bean
    fun producerFactory(): ProducerFactory<String, Any> {
        val config = mapOf(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:29092",
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to JsonSerializer::class.java,
            ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG to true,
            ProducerConfig.TRANSACTIONAL_ID_CONFIG to "order-processor-tx"
        )
        return DefaultKafkaProducerFactory(config)
    }

    @Bean
    fun kafkaTemplate(producerFactory: ProducerFactory<String, Any>): KafkaTemplate<String, Any> {
        return KafkaTemplate(producerFactory)
    }

    @Bean
    fun kafkaTransactionManager(
        producerFactory: ProducerFactory<String, Any>
    ): KafkaTransactionManager<String, Any> {
        return KafkaTransactionManager(producerFactory)
    }

    @Bean
    fun listenerContainerFactory(
        consumerFactory: ConsumerFactory<String, Any>,
        kafkaTransactionManager: KafkaTransactionManager<String, Any>
    ): ConcurrentKafkaListenerContainerFactory<String, Any> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, Any>()
        factory.consumerFactory = consumerFactory
        factory.containerProperties.transactionManager = kafkaTransactionManager
        // Consumer reads committed messages only
        factory.containerProperties.kafkaConsumerProperties[
            ConsumerConfig.ISOLATION_LEVEL_CONFIG
        ] = "read_committed"
        return factory
    }
}

@Component
class ExactlyOnceProcessor(private val kafkaTemplate: KafkaTemplate<String, Any>) {

    // Consume → transform → produce (all in one transaction)
    @KafkaListener(topics = ["raw-orders"], groupId = "order-enricher")
    fun processAndForward(order: OrderEvent) {
        // The consumer offset commit, processing, and producer send
        // are all part of the same Kafka transaction
        val enrichedOrder = enrichOrder(order)
        kafkaTemplate.send("enriched-orders", order.orderId, enrichedOrder)
        // Offset is committed as part of the producer transaction
    }

    private fun enrichOrder(order: OrderEvent): OrderEvent {
        return order.copy(status = "ENRICHED")
    }
}
When to Use Exactly-Once
| Scenario | Guarantee Needed | Why |
|---|---|---|
| Logging, metrics | At-most-once | Losing a log line is OK |
| Order processing | At-least-once + idempotent consumer | Duplicate check is simpler than EOS |
| Financial transactions | Exactly-once | Cannot tolerate duplicates or loss |
| Consume-transform-produce | Exactly-once | Atomic pipeline processing |
Idempotent Consumer Pattern
Instead of exactly-once semantics, make your consumer handle duplicates. Build a
unique event ID from topic + partition + offset and skip anything you’ve already
seen:
@Component
class IdempotentOrderConsumer(
    private val orderRepository: OrderRepository,
    private val processedEventRepository: ProcessedEventRepository
) {
    @KafkaListener(topics = ["orders"], groupId = "idempotent-processor")
    @Transactional  // Database transaction
    fun process(
        event: OrderEvent,
        @Header(KafkaHeaders.RECEIVED_TOPIC) topic: String,
        @Header(KafkaHeaders.RECEIVED_PARTITION) partition: Int,
        @Header(KafkaHeaders.OFFSET) offset: Long
    ) {
        // Create a unique event ID from topic + partition + offset
        val eventId = "$topic-$partition-$offset"

        // Check if already processed
        if (processedEventRepository.existsById(eventId)) {
            println("Duplicate event $eventId, skipping")
            return
        }

        // Process the event
        orderRepository.save(Order(
            orderId = event.orderId,
            userId = event.userId,
            amount = event.amount,
            status = "PROCESSING"
        ))

        // Mark as processed (in same DB transaction)
        processedEventRepository.save(ProcessedEvent(
            eventId = eventId,
            processedAt = Instant.now()
        ))
    }
}
Offset Management
Auto Commit vs Manual Commit
Auto commit (the Kafka client default; Spring Kafka sets enable.auto.commit to false unless you override it):
spring:
  kafka:
    consumer:
      enable-auto-commit: true
      auto-commit-interval: 5000  # Every 5 seconds
Manual commit (recommended for production):
spring:
  kafka:
    consumer:
      enable-auto-commit: false
    listener:
      ack-mode: MANUAL  # or MANUAL_IMMEDIATE, RECORD, BATCH
Acknowledgment Modes
// MANUAL: you control exactly when the offset is committed
@KafkaListener(topics = ["orders"], groupId = "manual-ack")
fun processWithManualAck(
    event: OrderEvent,
    acknowledgment: Acknowledgment
) {
    try {
        processOrder(event)
        acknowledgment.acknowledge()  // Commit offset only after successful processing
    } catch (e: Exception) {
        // Don't acknowledge: the message will be redelivered
        throw e
    }
}

// RECORD: the container commits after each record is processed (Spring manages it)
// spring.kafka.listener.ack-mode=RECORD

// BATCH: the container commits after all records from a poll are processed
// This is Spring Kafka's default ack mode
// spring.kafka.listener.ack-mode=BATCH
Ack Mode Comparison
| Ack Mode | Behavior | Guarantee | Performance |
|---|---|---|---|
| RECORD | Commit after each record | At-least-once (per record) | Slower (many commits) |
| BATCH | Commit after all records in poll | At-least-once (per batch) | Better (fewer commits) |
| MANUAL | You call ack.acknowledge() | At-least-once (you decide) | Flexible |
| MANUAL_IMMEDIATE | Commit immediately on ack | At-least-once (immediate) | Flexible |
| Auto-commit | Timer-based (not message-based) | At-most-once possible | Best throughput |
Recommendation: Use RECORD or BATCH for most cases. Use MANUAL when you
need precise control (e.g., after writing to a database).
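The performance column can be made concrete by counting commits. This is a toy model with hypothetical names (ToyAckMode, commitCount); it assumes one commit per record in RECORD mode and one commit per non-empty poll in BATCH mode, and it ignores everything else that affects throughput.

```kotlin
// Counts offset commits for a series of polls under two ack modes (toy model).
enum class ToyAckMode { RECORD, BATCH }

fun commitCount(pollSizes: List<Int>, mode: ToyAckMode): Int = when (mode) {
    ToyAckMode.RECORD -> pollSizes.sum()            // one commit per record
    ToyAckMode.BATCH -> pollSizes.count { it > 0 }  // one commit per non-empty poll
}
```

Three polls of 500, 500, and 120 records mean 1120 commits in RECORD mode but only 3 in BATCH mode. The trade-off is that a failure in BATCH mode can replay up to a whole poll of records.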
Seeking to Specific Offsets
@Component
class OffsetSeekConsumer : ConsumerSeekAware {

    private lateinit var seekCallback: ConsumerSeekAware.ConsumerSeekCallback

    override fun registerSeekCallback(callback: ConsumerSeekAware.ConsumerSeekCallback) {
        this.seekCallback = callback
    }

    // Seek to beginning (replay all messages)
    fun seekToBeginning(topic: String, partition: Int) {
        seekCallback.seekToBeginning(topic, partition)
    }

    // Seek to specific offset
    fun seekToOffset(topic: String, partition: Int, offset: Long) {
        seekCallback.seek(topic, partition, offset)
    }

    // Seek to timestamp
    fun seekToTimestamp(topic: String, partition: Int, timestamp: Long) {
        seekCallback.seekToTimestamp(topic, partition, timestamp)
    }

    @KafkaListener(topics = ["orders"], groupId = "seek-consumer")
    fun process(event: OrderEvent) {
        println("Processing: ${event.orderId}")
    }
}
Schema Evolution
When event schemas change over time (fields added, removed, or renamed), you need a strategy that keeps existing producers and consumers working.
Backward-Compatible Changes (Safe)
These changes are safe with JSON serialization:
```kotlin
// Version 1
data class OrderEvent(
    val orderId: String,
    val amount: Double,
    val timestamp: Instant
)

// Version 2: added optional field (backward compatible)
data class OrderEvent(
    val orderId: String,
    val amount: Double,
    val timestamp: Instant,
    val currency: String = "USD" // New field with default
)

// Version 3: added another optional field
data class OrderEvent(
    val orderId: String,
    val amount: Double,
    val timestamp: Instant,
    val currency: String = "USD",
    val metadata: Map<String, String> = emptyMap() // New field with default
)
```
Rules for safe JSON evolution:
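- Add new fields with defaults → old consumers ignore them (as long as unknown properties are not rejected), and new consumers fall back to the defaults when reading old messages.
- Don’t remove required fields → consumers that still expect them can no longer deserialize new messages.
- Don’t rename fields → a rename is the same as a remove plus an add.
- Don’t change field types → deserialization fails.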
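The first rule can be checked in both directions with a small round trip. This is a minimal sketch assuming `jackson-module-kotlin` is on the classpath; `OrderEventV1` and `OrderEventV2` are hypothetical, trimmed-down stand-ins for two versions of the event (no `timestamp`, so no Java time module is needed).

```kotlin
import com.fasterxml.jackson.databind.DeserializationFeature
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue

// Hypothetical event versions, named V1/V2 only for this sketch.
data class OrderEventV1(val orderId: String, val amount: Double)
data class OrderEventV2(val orderId: String, val amount: Double, val currency: String = "USD")

// Unknown properties are ignored so that old readers tolerate new fields.
val mapper = jacksonObjectMapper()
    .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)

fun main() {
    val v1Json = mapper.writeValueAsString(OrderEventV1("o-1", 10.0))
    val v2Json = mapper.writeValueAsString(OrderEventV2("o-2", 20.0, "EUR"))

    // New consumer, old message: the missing field takes its default.
    val upgraded: OrderEventV2 = mapper.readValue(v1Json)
    println(upgraded.currency) // USD

    // Old consumer, new message: the unknown "currency" field is ignored.
    val downgraded: OrderEventV1 = mapper.readValue(v2Json)
    println(downgraded.orderId) // o-2
}
```

If `FAIL_ON_UNKNOWN_PROPERTIES` were left at Jackson's default of `true`, the second read would throw instead, which is why the "old consumers ignore them" half of the rule depends on mapper configuration.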
Jackson Configuration for Forward Compatibility
```kotlin
import com.fasterxml.jackson.annotation.JsonIgnoreProperties
import com.fasterxml.jackson.databind.DeserializationFeature
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.registerKotlinModule
import java.time.Instant

// Global: ignore unknown fields (consumer doesn't fail on new fields)
val objectMapper = ObjectMapper()
    .registerKotlinModule()
    .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)

// Per-class: ignore unknown fields
@JsonIgnoreProperties(ignoreUnknown = true)
data class OrderEvent(
    val orderId: String,
    val amount: Double,
    val timestamp: Instant
)
```
Versioned Topics Strategy
For breaking changes, use versioned topics (`orders.v1` for the old format,
`orders.v2` for the new) and run both consumers during the migration:
```kotlin
// Consumer that handles both during migration
@Component
class VersionedConsumer {

    @KafkaListener(topics = ["orders.v1"], groupId = "order-processor")
    fun processV1(event: OrderEventV1) {
        val converted = event.toV2()
        processOrder(converted)
    }

    @KafkaListener(topics = ["orders.v2"], groupId = "order-processor")
    fun processV2(event: OrderEventV2) {
        processOrder(event)
    }
}
```
Testing Kafka
Embedded Kafka for Integration Tests
```kotlin
dependencies {
    testImplementation("org.springframework.kafka:spring-kafka-test")
}
```
```kotlin
import java.time.Duration
import org.awaitility.Awaitility.await
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertNotNull
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Test
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.test.context.EmbeddedKafka
import org.springframework.kafka.test.utils.KafkaTestUtils

@SpringBootTest
@EmbeddedKafka(
    partitions = 1,
    topics = ["orders", "orders.DLT"],
    // Fixed port: assumes 9092 is free on the test machine
    brokerProperties = [
        "listeners=PLAINTEXT://localhost:9092",
        "port=9092"
    ]
)
class OrderProcessingIntegrationTest {

    @Autowired
    lateinit var kafkaTemplate: KafkaTemplate<String, Any>

    @Autowired
    lateinit var orderRepository: OrderRepository

    @Test
    fun `should process order event`() {
        // Given
        val event = OrderEvent(
            orderId = "test-001",
            userId = 1L,
            amount = 99.99,
            status = "CREATED"
        )

        // When
        kafkaTemplate.send("orders", event.orderId, event).get()

        // Then: wait for the async consumer to process
        await().atMost(Duration.ofSeconds(10)).untilAsserted {
            val order = orderRepository.findByOrderId("test-001")
            assertNotNull(order)
            assertEquals("PROCESSING", order!!.status)
        }
    }

    @Test
    fun `should send to DLT on failure`() {
        // Given: an event that will cause processing failure
        val badEvent = OrderEvent(
            orderId = "bad-order",
            userId = -1L, // Invalid user ID causes failure
            amount = -100.0,
            status = "CREATED"
        )

        // When
        kafkaTemplate.send("orders", badEvent.orderId, badEvent).get()

        // Then: message should end up in DLT
        // createTestConsumer is a test helper (not shown) that subscribes to the topic
        val consumer = createTestConsumer("orders.DLT")
        val records = KafkaTestUtils.getRecords(consumer, Duration.ofSeconds(10))
        assertTrue(records.count() > 0)
    }
}
```
Testcontainers Kafka
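For tests that need a real Kafka broker:
```kotlin
dependencies {
    testImplementation("org.testcontainers:kafka:1.20.4")
    // Provides @Testcontainers and @Container for JUnit 5
    testImplementation("org.testcontainers:junit-jupiter:1.20.4")
}
```
```kotlin
import org.junit.jupiter.api.Test
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.test.context.DynamicPropertyRegistry
import org.springframework.test.context.DynamicPropertySource
import org.testcontainers.containers.KafkaContainer
import org.testcontainers.junit.jupiter.Container
import org.testcontainers.junit.jupiter.Testcontainers
import org.testcontainers.utility.DockerImageName

@SpringBootTest
@Testcontainers
class KafkaTestcontainersTest {

    companion object {
        @Container
        val kafka = KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.6.0"))
            .withKraft()

        // Point Spring at the container's dynamically assigned broker address
        @JvmStatic
        @DynamicPropertySource
        fun kafkaProperties(registry: DynamicPropertyRegistry) {
            registry.add("spring.kafka.bootstrap-servers") { kafka.bootstrapServers }
        }
    }

    @Autowired
    lateinit var kafkaTemplate: KafkaTemplate<String, Any>

    @Test
    fun `should produce and consume messages`() {
        val event = OrderEvent("order-1", 1L, 50.0, "CREATED")
        kafkaTemplate.send("orders", event.orderId, event).get()

        // Assert consumption...
    }
}
```
Practice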
Put the producer/consumer patterns to work: build real pipelines backed by the shared Kafka stack.