Event-Driven with Kafka

You’ve used Kafka through kafkajs (TypeScript) or sarama/confluent-kafka-go (Go). On the JVM, Spring Kafka is the standard — it provides KafkaTemplate for producing and @KafkaListener for consuming. This module covers production patterns: serialization, error handling, dead letter topics, exactly-once semantics, and offset management.

If you’ve used Kafka before, this is a quick reminder. If not, here are the core concepts: a producer writes to a partitioned topic, and a consumer group divides the partitions among its members.

[Diagram: Producer → topic → consumer group]
| Concept | Description |
| --- | --- |
| Topic | Named stream of messages (like a queue/channel name) |
| Partition | Ordered, immutable sequence within a topic. Messages within a partition are ordered. |
| Offset | Position of a message within a partition (monotonically increasing) |
| Consumer Group | Set of consumers that cooperate to consume a topic. Each partition goes to exactly one consumer in the group. |
| Key | Optional message key; messages with the same key go to the same partition (ordering guarantee) |
| Broker | Kafka server node |

| Use Case | Tool | Why |
| --- | --- | --- |
| Fire-and-forget notifications | Redis Pub/Sub | Simplest, lowest latency |
| Task queue (process once) | RabbitMQ or Redis Streams | Built-in ack, simpler ops |
| Event sourcing, audit log | Kafka | Persistent, replayable, ordered |
| Data pipeline between services | Kafka | Decouples producers from consumers |
| Real-time analytics | Kafka | High throughput, consumer groups |
| Request/reply RPC | HTTP or gRPC | Kafka is not designed for request/reply |
build.gradle.kts
plugins {
kotlin("jvm") version "2.1.0"
kotlin("plugin.spring") version "2.1.0"
id("org.springframework.boot") version "3.4.1"
id("io.spring.dependency-management") version "1.1.7"
}
dependencies {
implementation("org.springframework.boot:spring-boot-starter")
implementation("org.springframework.kafka:spring-kafka")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
testImplementation("org.springframework.kafka:spring-kafka-test")
}
application.yml
spring:
  kafka:
    bootstrap-servers: localhost:29092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      acks: all # Wait for all replicas to acknowledge
      retries: 3 # Retry on transient failures
      properties:
        enable.idempotence: true # Prevent duplicate messages on retry
        max.in.flight.requests.per.connection: 5
    consumer:
      group-id: my-app-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      auto-offset-reset: earliest # Start from beginning if no committed offset
      properties:
        spring.json.trusted.packages: "com.example.*" # Trust classes for deserialization
    listener:
      ack-mode: record # Acknowledge each record individually
      concurrency: 3 # Number of consumer threads
src/main/kotlin/com/example/config/KafkaTopicConfig.kt
import org.apache.kafka.clients.admin.NewTopic
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.config.TopicBuilder
@Configuration
class KafkaTopicConfig {
@Bean
fun ordersTopic(): NewTopic = TopicBuilder
.name("orders")
.partitions(6)
.replicas(1) // Use 3 in production
.build()
@Bean
fun orderEventsTopic(): NewTopic = TopicBuilder
.name("order-events")
.partitions(6)
.replicas(1)
.config("retention.ms", "604800000") // 7 days
.build()
@Bean
fun deadLetterTopic(): NewTopic = TopicBuilder
.name("orders.DLT") // Convention: original-topic.DLT
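.partitions(6) // Match "orders": the default DLT resolver publishes to the same partition number as the failed record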
.replicas(1)
.build()
}
import { Kafka } from 'kafkajs';
const kafka = new Kafka({ brokers: ['localhost:29092'] });
const producer = kafka.producer();
await producer.connect();
await producer.send({
topic: 'orders',
messages: [
{ key: 'order-123', value: JSON.stringify({ orderId: 123, amount: 99.99 }) }
]
});

In TS and Go you build the message and call send yourself; in Kotlin the KafkaTemplate<String, Any> handles serialization and gives you sync, async (callback), and fire-and-forget variants from the same API.

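The message key determines which partition a message goes to. The default partitioner hashes the key, so messages with the same key land on the same partition as long as the topic’s partition count does not change. That is what preserves their relative order.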

@Service
class EventProducer(private val kafkaTemplate: KafkaTemplate<String, Any>) {
// All events for the same order go to the same partition (ordered)
fun sendOrderEvent(orderId: String, event: Any) {
kafkaTemplate.send("order-events", orderId, event)
// orderId = "order-123" → always partition X
// orderId = "order-456" → always partition Y
}
// All events for the same user go to the same partition
fun sendUserEvent(userId: String, event: Any) {
kafkaTemplate.send("user-events", userId, event)
}
// No key = the partitioner spreads records across partitions (no ordering guarantee)
fun sendMetricEvent(event: Any) {
kafkaTemplate.send("metrics", event)
}
}
| Key Strategy | Ordering Guarantee | Use Case |
| --- | --- | --- |
| Entity ID (e.g., orderId) | All events for same entity are ordered | Order lifecycle, user actions |
| No key (null) | No ordering (records are spread across partitions) | Metrics, logs, independent events |
| Composite key (userId:action) | Grouped ordering | User-action streams |
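
To make the key-to-partition mapping concrete, here is a minimal sketch in plain Kotlin. It is not Kafka’s actual partitioner: the real client hashes the serialized key bytes with murmur2, while this sketch uses `String.hashCode()` to stay dependency-free, so the partition numbers it prints will not match a real cluster. The function name `partitionFor` is hypothetical.

```kotlin
// Simplified stand-in for a key-hashing partitioner (assumption: hashCode instead of murmur2).
fun partitionFor(key: String, numPartitions: Int): Int {
    require(numPartitions > 0) { "numPartitions must be positive" }
    // floorMod keeps the result in 0 until numPartitions even when the hash is negative
    return Math.floorMod(key.hashCode(), numPartitions)
}

fun main() {
    val partitions = 6
    // The same key always maps to the same partition for a fixed partition count.
    for (key in listOf("order-123", "order-456", "order-123")) {
        println("$key -> partition ${partitionFor(key, partitions)}")
    }
    // With a different partition count the same key may map somewhere else.
    println("order-123 with 12 partitions -> partition ${partitionFor("order-123", 12)}")
}
```

The point of the sketch is the modulo step: the mapping depends on the partition count, which is why adding partitions to an existing topic can break per-key ordering for keys that are already in flight.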
const consumer = kafka.consumer({ groupId: 'order-processor' });
await consumer.connect();
await consumer.subscribe({ topic: 'orders', fromBeginning: true });
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const order = JSON.parse(message.value.toString());
console.log(`Processing order ${order.orderId} from partition ${partition}`);
}
});

Key differences: kafkajs and sarama hand you raw bytes, so you wire up the handler and the JSON decoding by hand. Spring’s @KafkaListener runs the poll loop for you and deserializes the payload into your OrderEvent data class, so the method body is just business logic.

@Component
class OrderConsumer {
// With full metadata
@KafkaListener(topics = ["orders"], groupId = "order-processor")
fun processOrderWithMetadata(
@Payload event: OrderEvent,
@Header(KafkaHeaders.RECEIVED_TOPIC) topic: String,
@Header(KafkaHeaders.RECEIVED_PARTITION) partition: Int,
@Header(KafkaHeaders.OFFSET) offset: Long,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) timestamp: Long,
@Header(KafkaHeaders.RECEIVED_KEY, required = false) key: String?
) {
println("Received from $topic-$partition @ offset $offset")
println("Key: $key, Timestamp: ${Instant.ofEpochMilli(timestamp)}")
println("Order: ${event.orderId}, Amount: ${event.amount}")
}
// With ConsumerRecord (raw access)
@KafkaListener(topics = ["orders"], groupId = "order-processor-raw")
fun processOrderRaw(record: ConsumerRecord<String, OrderEvent>) {
println("Key: ${record.key()}")
println("Value: ${record.value()}")
println("Partition: ${record.partition()}, Offset: ${record.offset()}")
record.headers().forEach { header ->
println("Header: ${header.key()} = ${String(header.value())}")
}
}
}

The raw form gives you a ConsumerRecord<String, OrderEvent> with key, value, partition, offset, and headers — equivalent to the message object you’d inspect in kafkajs.

@Component
class BatchOrderConsumer {
// Process messages in batches for higher throughput
@KafkaListener(
topics = ["orders"],
groupId = "order-batch-processor",
containerFactory = "batchListenerFactory"
)
fun processBatch(events: List<OrderEvent>) {
println("Processing batch of ${events.size} orders")
events.forEach { event ->
println(" Order: ${event.orderId}")
}
}
}
// Configuration for batch listener
@Configuration
class KafkaBatchConfig {
@Bean
fun batchListenerFactory(
consumerFactory: ConsumerFactory<String, OrderEvent>
): ConcurrentKafkaListenerContainerFactory<String, OrderEvent> {
val factory = ConcurrentKafkaListenerContainerFactory<String, OrderEvent>()
factory.consumerFactory = consumerFactory
factory.isBatchListener = true // Enable batch mode
factory.containerProperties.ackMode = ContainerProperties.AckMode.BATCH
return factory
}
}

A batch listener takes a List<OrderEvent> instead of a single event, so you can amortize work (one DB write, fewer commits) across a poll.

@Component
class MultiTopicConsumer {
// Listen to multiple topics
@KafkaListener(topics = ["orders", "payments", "shipments"], groupId = "event-logger")
fun logEvent(
@Payload event: Any,
@Header(KafkaHeaders.RECEIVED_TOPIC) topic: String
) {
println("[$topic] Event: $event")
}
// Pattern-based subscription
@KafkaListener(topicPattern = "order-.*", groupId = "order-all-events")
fun processAllOrderEvents(
@Payload event: Any,
@Header(KafkaHeaders.RECEIVED_TOPIC) topic: String
) {
println("Order event from $topic: $event")
}
}
@Component
class ConcurrentConsumer {
// 3 consumer threads — each gets a subset of partitions
@KafkaListener(
topics = ["orders"],
groupId = "concurrent-processor",
concurrency = "3" // 3 threads
)
fun process(event: OrderEvent) {
println("[${Thread.currentThread().name}] Processing: ${event.orderId}")
}
}

Concurrency rules:

  • concurrency = N creates N consumer threads in the same consumer group.
  • The topic’s partitions are divided among the threads, roughly total partitions / N per thread.
  • If concurrency > partition count, excess threads sit idle.
  • For 6 partitions and concurrency = 3: each thread gets 2 partitions.
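
The arithmetic behind these rules can be sketched in plain Kotlin. The function name `assignPartitions` is hypothetical, and the contiguous split is an assumption made for illustration; the real distribution is decided by the consumer group’s partition assignor.

```kotlin
// Splits partitions 0 until partitionCount across `concurrency` threads.
// The first (partitionCount % concurrency) threads get one extra partition.
fun assignPartitions(partitionCount: Int, concurrency: Int): List<List<Int>> {
    require(partitionCount >= 0 && concurrency > 0)
    val base = partitionCount / concurrency
    val extra = partitionCount % concurrency
    var next = 0
    return List(concurrency) { thread ->
        val size = base + if (thread < extra) 1 else 0
        val assigned = (next until next + size).toList()
        next += size
        assigned
    }
}

fun main() {
    println(assignPartitions(6, 3)) // [[0, 1], [2, 3], [4, 5]]
    println(assignPartitions(2, 3)) // [[0], [1], []]  (the third thread sits idle)
}
```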

Spring Kafka’s JsonSerializer and JsonDeserializer use Jackson. Enable them in the producer and consumer configuration:

application.yml
spring:
  kafka:
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "com.example.*"
        spring.json.value.default.type: "com.example.OrderEvent"
// Events are automatically serialized/deserialized as JSON
data class OrderEvent(
val orderId: String,
val userId: Long,
val amount: Double,
val status: String,
val items: List<OrderItem> = emptyList(),
val timestamp: Instant = Instant.now()
)
data class OrderItem(
val productId: String,
val quantity: Int,
val price: Double
)
// Producer — OrderEvent is automatically serialized to JSON
kafkaTemplate.send("orders", event.orderId, event)
// Consumer — JSON is automatically deserialized to OrderEvent
@KafkaListener(topics = ["orders"])
fun process(event: OrderEvent) { /* event is already deserialized */ }

Spring Kafka can include type information in headers so the consumer knows which class to deserialize to:

// Producer config — add type info to headers
@Bean
fun producerFactory(): ProducerFactory<String, Any> {
val config = mapOf(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:29092",
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to JsonSerializer::class.java,
// Include type info in headers
JsonSerializer.ADD_TYPE_INFO_HEADERS to true
)
return DefaultKafkaProducerFactory(config)
}
// Consumer config — use header-based type resolution
@Bean
fun consumerFactory(): ConsumerFactory<String, Any> {
val config = mapOf(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:29092",
ConsumerConfig.GROUP_ID_CONFIG to "my-group",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java,
JsonDeserializer.TRUSTED_PACKAGES to "com.example.*",
JsonDeserializer.USE_TYPE_INFO_HEADERS to true
)
return DefaultKafkaConsumerFactory(config)
}

When a topic contains multiple event types, model them as a sealed class and branch with an exhaustive when:

// Define event hierarchy
sealed class OrderDomainEvent {
abstract val orderId: String
abstract val timestamp: Instant
}
data class OrderCreated(
override val orderId: String,
val userId: Long,
val amount: Double,
override val timestamp: Instant = Instant.now()
) : OrderDomainEvent()
data class OrderPaid(
override val orderId: String,
val paymentId: String,
override val timestamp: Instant = Instant.now()
) : OrderDomainEvent()
data class OrderShipped(
override val orderId: String,
val trackingNumber: String,
override val timestamp: Instant = Instant.now()
) : OrderDomainEvent()
// Producer — send different event types to the same topic
@Service
class OrderEventProducer(private val kafkaTemplate: KafkaTemplate<String, Any>) {
fun orderCreated(orderId: String, userId: Long, amount: Double) {
val event = OrderCreated(orderId, userId, amount)
kafkaTemplate.send("order-events", orderId, event)
}
fun orderPaid(orderId: String, paymentId: String) {
val event = OrderPaid(orderId, paymentId)
kafkaTemplate.send("order-events", orderId, event)
}
fun orderShipped(orderId: String, trackingNumber: String) {
val event = OrderShipped(orderId, trackingNumber)
kafkaTemplate.send("order-events", orderId, event)
}
}
// Consumer — handle different types
@Component
class OrderEventConsumer {
@KafkaListener(topics = ["order-events"], groupId = "order-event-handler")
fun handle(event: OrderDomainEvent) {
when (event) {
is OrderCreated -> handleCreated(event)
is OrderPaid -> handlePaid(event)
is OrderShipped -> handleShipped(event)
}
}
private fun handleCreated(event: OrderCreated) {
println("Order ${event.orderId} created by user ${event.userId}")
}
private fun handlePaid(event: OrderPaid) {
println("Order ${event.orderId} paid: ${event.paymentId}")
}
private fun handleShipped(event: OrderShipped) {
println("Order ${event.orderId} shipped: ${event.trackingNumber}")
}
}

For production systems with strict schema governance, Apache Avro with the Confluent Schema Registry is a widely used choice. Avro provides:

  • Binary serialization (smaller, faster than JSON)
  • Schema evolution (add/remove fields safely)
  • Schema Registry enforces compatibility
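// build.gradle.kts (for Avro, not covered in depth here)
// Note: io.confluent artifacts come from Confluent's Maven repository
// (https://packages.confluent.io/maven/), not Maven Central.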
dependencies {
implementation("io.confluent:kafka-avro-serializer:7.6.0")
implementation("org.apache.avro:avro:1.11.3")
}
application.yml
spring:
  kafka:
    producer:
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
    consumer:
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
    properties:
      schema.registry.url: http://localhost:8081 # Shared by producer and consumer

When a consumer fails to process a message, you need a strategy. Without one, the consumer retries infinitely (or skips the message silently). The production pattern is: retry with backoff, then route the poison message to a Dead Letter Topic.

[Diagram: Retry then dead letter]

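By default, Spring Kafka’s DefaultErrorHandler retries a failed record immediately (up to 9 retries with no delay), then logs the error and moves on to the next record. This is usually not what you want.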

import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.listener.DefaultErrorHandler
import org.springframework.util.backoff.FixedBackOff
@Configuration
class KafkaErrorConfig {
@Bean
fun kafkaErrorHandler(): DefaultErrorHandler {
// Retry 3 times with 1 second between retries (4 delivery attempts in total)
val backoff = FixedBackOff(1000L, 3L) // interval in ms, max retries after the first attempt
val handler = DefaultErrorHandler(backoff)
// Don't retry certain exceptions (they'll always fail)
handler.addNotRetryableExceptions(
com.fasterxml.jackson.core.JsonParseException::class.java,
IllegalArgumentException::class.java
)
return handler
}
}

After all retries are exhausted, send the failed message to a Dead Letter Topic for later inspection:

// Manual DLT implementation in kafkajs
async function processWithDLT(message: KafkaMessage) {
try {
await processMessage(message);
} catch (error) {
await producer.send({
topic: 'orders.DLT',
messages: [{ key: message.key, value: message.value,
headers: { ...message.headers, 'error': error.message } }]
});
}
}

Key differences: in kafkajs you hand-roll the try/catch and re-publish to the DLT yourself. Spring’s DeadLetterPublishingRecoverer plus DefaultErrorHandler do the retry, backoff, and DLT routing declaratively — no try/catch in your listener.
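
The retry-then-dead-letter flow can also be modeled without any Kafka dependency, which makes the control flow easy to see. This is a sketch under stated assumptions, not Spring’s implementation: `Record`, `RetryingConsumer`, and `deadLetters` are hypothetical names, and an in-memory list stands in for the DLT.

```kotlin
// Plain-Kotlin model of "retry with a fixed backoff, then dead-letter".
data class Record(val key: String, val value: String)

class RetryingConsumer(
    private val maxRetries: Int,
    private val backoffMillis: Long,
    private val nonRetryable: Set<Class<out Exception>> = emptySet(),
    private val handler: (Record) -> Unit
) {
    // Stand-in for the dead letter topic: failed record plus the error message
    val deadLetters = mutableListOf<Pair<Record, String>>()
    var attempts = 0
        private set

    fun consume(record: Record) {
        var retriesLeft = maxRetries
        while (true) {
            attempts++
            try {
                handler(record)
                return // success
            } catch (e: Exception) {
                val fatal = nonRetryable.any { it.isInstance(e) }
                if (fatal || retriesLeft == 0) {
                    // Retries exhausted (or pointless): park the record and move on
                    deadLetters += record to (e.message ?: e.javaClass.simpleName)
                    return
                }
                retriesLeft--
                Thread.sleep(backoffMillis)
            }
        }
    }
}

fun main() {
    val consumer = RetryingConsumer(maxRetries = 3, backoffMillis = 10) { record ->
        if (record.value == "bad") throw IllegalStateException("cannot process ${record.key}")
    }
    consumer.consume(Record("order-1", "ok"))  // 1 attempt
    consumer.consume(Record("order-2", "bad")) // 1 attempt + 3 retries, then dead-lettered
    println("attempts=${consumer.attempts}, deadLetters=${consumer.deadLetters}")
}
```

Note that the failed record is skipped after it is parked, so one poison message does not block the records behind it.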

@Component
class DLTConsumer {
private val logger = LoggerFactory.getLogger(javaClass)
@KafkaListener(topics = ["orders.DLT"], groupId = "dlt-processor")
fun processDLT(
record: ConsumerRecord<String, Any>,
@Header(KafkaHeaders.DLT_ORIGINAL_TOPIC, required = false) originalTopic: String?,
@Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE, required = false) errorMessage: String?,
@Header(KafkaHeaders.DLT_ORIGINAL_OFFSET, required = false) originalOffset: ByteArray?
) {
logger.error("""
Dead letter received:
Original topic: $originalTopic
Original offset: ${originalOffset?.let { String(it) }}
Error: $errorMessage
Key: ${record.key()}
Value: ${record.value()}
""".trimIndent())
// Options:
// 1. Log and alert for manual investigation
// 2. Store in a database for retry later
// 3. Try to fix and re-publish to original topic
}
}

For more sophisticated retry patterns, Spring Kafka supports non-blocking retries using separate retry topics. Failed messages flow through a chain of delay topics before landing in the DLT, so the main topic is never blocked by one slow message.

import org.springframework.kafka.retrytopic.RetryTopicConfiguration
import org.springframework.kafka.retrytopic.RetryTopicConfigurationBuilder
@Configuration
class KafkaRetryConfig {
@Bean
fun retryTopicConfig(
kafkaTemplate: KafkaTemplate<String, Any>
): RetryTopicConfiguration {
return RetryTopicConfigurationBuilder
.newInstance()
.exponentialBackoff(1000, 2.0, 30000) // delay starts at 1s, doubles each retry, capped at 30s
.maxAttempts(5) // total attempts, including the first delivery
.autoCreateTopics(true, 1, 1.toShort()) // (shouldCreate, numPartitions, replicationFactor)
.includeTopics(listOf("orders"))
.create(kafkaTemplate)
}
}

This automatically creates a chain of topics:

[Diagram: Non-blocking retry topic chain]
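
The delay each message waits before its next attempt follows from the three backoff parameters. The sketch below reproduces that arithmetic in plain Kotlin; `backoffDelays` is a hypothetical helper, not part of Spring Kafka, and it assumes that 5 attempts means 4 retries after the first delivery.

```kotlin
// Delay before each retry: initial * multiplier^n, capped at maxMs.
fun backoffDelays(initialMs: Long, multiplier: Double, maxMs: Long, retries: Int): List<Long> {
    require(initialMs > 0 && multiplier >= 1.0 && maxMs >= initialMs && retries >= 0)
    val delays = mutableListOf<Long>()
    var current = initialMs.toDouble()
    repeat(retries) {
        delays += minOf(current.toLong(), maxMs)
        current = minOf(current * multiplier, maxMs.toDouble())
    }
    return delays
}

fun main() {
    // Same parameters as the configuration above, with 4 retries
    println(backoffDelays(1000, 2.0, 30000, 4)) // [1000, 2000, 4000, 8000]
    // With more retries the cap becomes visible
    println(backoffDelays(1000, 2.0, 30000, 6)) // [1000, 2000, 4000, 8000, 16000, 30000]
}
```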
| Feature | kafkajs (TS) | sarama (Go) | Spring Kafka |
| --- | --- | --- | --- |
| Retry | Manual | Manual | Built-in (DefaultErrorHandler) |
| Backoff | Manual | Manual | FixedBackOff, ExponentialBackOff |
| DLT | Manual | Manual | DeadLetterPublishingRecoverer |
| Non-blocking retry | Manual | Manual | @RetryableTopic / RetryTopicConfiguration |
| Per-exception rules | Manual | Manual | addNotRetryableExceptions() |

Kafka provides three delivery guarantees:

| Guarantee | Meaning | How |
| --- | --- | --- |
| At-most-once | Message may be lost, never duplicated | Auto-commit offsets before processing |
| At-least-once | Message never lost, may be duplicated | Commit offsets after processing (default) |
| Exactly-once | Message processed exactly once | Idempotent producer + transactional consumer |
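
The difference between the first two rows comes down to where the offset commit sits relative to processing. The following is a small simulation in plain Kotlin, not Kafka client code: `deliver` and `CommitTiming` are hypothetical names, and the "crash" is a single simulated failure followed by a restart from the last committed offset.

```kotlin
enum class CommitTiming { BEFORE_PROCESSING, AFTER_PROCESSING }

// Returns the side effects observed when the consumer crashes once at crashAtOffset.
fun deliver(messages: List<String>, timing: CommitTiming, crashAtOffset: Int): List<String> {
    val processed = mutableListOf<String>()
    var committed = 0 // next offset to read after a restart
    var crashed = false
    while (committed < messages.size) {
        var offset = committed
        try {
            while (offset < messages.size) {
                if (timing == CommitTiming.BEFORE_PROCESSING) committed = offset + 1
                if (offset == crashAtOffset && !crashed) {
                    crashed = true
                    // Commit-after case: the side effect happened, but the commit did not
                    if (timing == CommitTiming.AFTER_PROCESSING) processed += messages[offset]
                    throw IllegalStateException("crash at offset $offset")
                }
                processed += messages[offset]
                if (timing == CommitTiming.AFTER_PROCESSING) committed = offset + 1
                offset++
            }
        } catch (e: IllegalStateException) {
            // Restart: the outer loop resumes from the last committed offset
        }
    }
    return processed
}

fun main() {
    val messages = listOf("a", "b", "c")
    println(deliver(messages, CommitTiming.BEFORE_PROCESSING, 1)) // [a, c]        "b" is lost
    println(deliver(messages, CommitTiming.AFTER_PROCESSING, 1))  // [a, b, b, c]  "b" is duplicated
}
```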

Prevents duplicate messages caused by producer retries:

application.yml
spring:
  kafka:
    producer:
      properties:
        enable.idempotence: true # Default in newer Kafka versions
        acks: all
        max.in.flight.requests.per.connection: 5

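How it works: The broker assigns the producer an ID, and the producer attaches a per-partition sequence number to what it sends. When a retry arrives with a (producerId, partition, sequence) the broker has already written, the broker drops it. No code changes needed.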
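
The deduplication idea can be illustrated with a toy broker in plain Kotlin. This is a deliberately simplified model (the class `DedupingBroker` is hypothetical, and it tracks one sequence number per message rather than per batch), intended only to show why a retried send does not produce a second log entry.

```kotlin
// Toy broker: remembers the highest sequence seen per (producerId, partition).
class DedupingBroker {
    private val lastSequence = mutableMapOf<Pair<Long, Int>, Int>()
    val log = mutableListOf<String>()

    /** Returns true if the message was appended, false if it was a duplicate retry. */
    fun append(producerId: Long, partition: Int, sequence: Int, value: String): Boolean {
        val key = producerId to partition
        val last = lastSequence[key] ?: -1
        if (sequence <= last) return false // already written: this is a retried send
        lastSequence[key] = sequence
        log += value
        return true
    }
}

fun main() {
    val broker = DedupingBroker()
    broker.append(producerId = 1L, partition = 0, sequence = 0, value = "order-created")
    // The ack was lost, so the producer retries with the same sequence number
    val appended = broker.append(producerId = 1L, partition = 0, sequence = 0, value = "order-created")
    println("retry appended: $appended, log: ${broker.log}") // retry appended: false, log: [order-created]
}
```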

For atomic multi-message sends (all or nothing):

application.yml
spring:
  kafka:
    producer:
      transaction-id-prefix: order-tx- # Enables transactions
@Service
class TransactionalProducer(private val kafkaTemplate: KafkaTemplate<String, Any>) {
// All messages in this method are sent atomically
fun processOrderTransaction(order: OrderEvent) {
kafkaTemplate.executeInTransaction { template ->
// These are all committed or all rolled back
template.send("orders", order.orderId, order)
template.send("order-events", order.orderId,
OrderCreated(order.orderId, order.userId, order.amount))
template.send("analytics", "order-created",
mapOf("orderId" to order.orderId, "amount" to order.amount))
}
}
}

The most important exactly-once pattern: consume a message, process it, produce a result, and commit the offset — all atomically.

[Diagram: Consume-transform-produce transaction]
@Configuration
class ExactlyOnceConfig {
@Bean
fun producerFactory(): ProducerFactory<String, Any> {
val config = mapOf(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:29092",
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to JsonSerializer::class.java,
ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG to true,
ProducerConfig.TRANSACTIONAL_ID_CONFIG to "order-processor-tx"
)
return DefaultKafkaProducerFactory(config)
}
@Bean
fun kafkaTemplate(producerFactory: ProducerFactory<String, Any>): KafkaTemplate<String, Any> {
return KafkaTemplate(producerFactory)
}
@Bean
fun kafkaTransactionManager(
producerFactory: ProducerFactory<String, Any>
): KafkaTransactionManager<String, Any> {
return KafkaTransactionManager(producerFactory)
}
@Bean
fun listenerContainerFactory(
consumerFactory: ConsumerFactory<String, Any>,
kafkaTransactionManager: KafkaTransactionManager<String, Any>
): ConcurrentKafkaListenerContainerFactory<String, Any> {
val factory = ConcurrentKafkaListenerContainerFactory<String, Any>()
factory.consumerFactory = consumerFactory
factory.containerProperties.transactionManager = kafkaTransactionManager
// Consumer reads committed messages only
factory.containerProperties.kafkaConsumerProperties[
ConsumerConfig.ISOLATION_LEVEL_CONFIG
] = "read_committed"
return factory
}
}
@Component
class ExactlyOnceProcessor(private val kafkaTemplate: KafkaTemplate<String, Any>) {
// Consume → transform → produce (all in one transaction)
@KafkaListener(topics = ["raw-orders"], groupId = "order-enricher")
fun processAndForward(order: OrderEvent) {
// The consumer offset commit, processing, and producer send
// are all part of the same Kafka transaction
val enrichedOrder = enrichOrder(order)
kafkaTemplate.send("enriched-orders", order.orderId, enrichedOrder)
// Offset is committed as part of the producer transaction
}
private fun enrichOrder(order: OrderEvent): OrderEvent {
return order.copy(status = "ENRICHED")
}
}
| Scenario | Guarantee Needed | Why |
| --- | --- | --- |
| Logging, metrics | At-most-once | Losing a log line is OK |
| Order processing | At-least-once + idempotent consumer | Duplicate check is simpler than EOS |
| Financial transactions | Exactly-once | Cannot tolerate duplicates or loss |
| Consume-transform-produce | Exactly-once | Atomic pipeline processing |

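Instead of exactly-once semantics, make your consumer handle duplicates. Build a unique event ID from topic + partition + offset and skip anything you’ve already seen. This catches redelivery of the same record; an event that the producer published twice gets two different offsets, so deduplicate on a business ID such as orderId if that can happen.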

@Component
class IdempotentOrderConsumer(
private val orderRepository: OrderRepository,
private val processedEventRepository: ProcessedEventRepository
) {
@KafkaListener(topics = ["orders"], groupId = "idempotent-processor")
@Transactional // Database transaction
fun process(
event: OrderEvent,
@Header(KafkaHeaders.RECEIVED_TOPIC) topic: String,
@Header(KafkaHeaders.RECEIVED_PARTITION) partition: Int,
@Header(KafkaHeaders.OFFSET) offset: Long
) {
// Create a unique event ID from topic + partition + offset
val eventId = "$topic-$partition-$offset"
// Check if already processed
if (processedEventRepository.existsById(eventId)) {
println("Duplicate event $eventId, skipping")
return
}
// Process the event
orderRepository.save(Order(
orderId = event.orderId,
userId = event.userId,
amount = event.amount,
status = "PROCESSING"
))
// Mark as processed (in same DB transaction)
processedEventRepository.save(ProcessedEvent(
eventId = eventId,
processedAt = Instant.now()
))
}
}

Auto commit (the Kafka client’s default; Spring Kafka disables it unless you set it explicitly):

spring:
  kafka:
    consumer:
      enable-auto-commit: true
      auto-commit-interval: 5000 # Every 5 seconds

Manual commit (recommended for production):

spring:
  kafka:
    consumer:
      enable-auto-commit: false
    listener:
      ack-mode: MANUAL # or MANUAL_IMMEDIATE, RECORD, BATCH
// MANUAL — you control exactly when offset is committed
@KafkaListener(topics = ["orders"], groupId = "manual-ack")
fun processWithManualAck(
event: OrderEvent,
acknowledgment: Acknowledgment
) {
try {
processOrder(event)
acknowledgment.acknowledge() // Commit offset only after successful processing
} catch (e: Exception) {
// Don't acknowledge — message will be redelivered
throw e
}
}
// RECORD: the container commits after each record is processed (Spring manages it)
// spring.kafka.listener.ack-mode=RECORD
// BATCH: the container commits after all records in a poll batch are processed
// This is Spring Kafka's default ack mode
// spring.kafka.listener.ack-mode=BATCH
| Ack Mode | Behavior | Guarantee | Performance |
| --- | --- | --- | --- |
| RECORD | Commit after each record | At-least-once (per record) | Slower (many commits) |
| BATCH | Commit after all records in poll | At-least-once (per batch) | Better (fewer commits) |
| MANUAL | You call ack.acknowledge() | At-least-once (you decide) | Flexible |
| MANUAL_IMMEDIATE | Commit immediately on ack | At-least-once (immediate) | Flexible |
| Auto-commit | Timer-based (not message-based) | At-most-once possible | Best throughput |

Recommendation: Use RECORD or BATCH for most cases. Use MANUAL when you need precise control (e.g., after writing to a database).

@Component
class OffsetSeekConsumer : ConsumerSeekAware {
private lateinit var seekCallback: ConsumerSeekAware.ConsumerSeekCallback
override fun registerSeekCallback(callback: ConsumerSeekAware.ConsumerSeekCallback) {
this.seekCallback = callback
}
// Seek to beginning (replay all messages)
fun seekToBeginning(topic: String, partition: Int) {
seekCallback.seekToBeginning(topic, partition)
}
// Seek to specific offset
fun seekToOffset(topic: String, partition: Int, offset: Long) {
seekCallback.seek(topic, partition, offset)
}
// Seek to timestamp
fun seekToTimestamp(topic: String, partition: Int, timestamp: Long) {
seekCallback.seekToTimestamp(topic, partition, timestamp)
}
@KafkaListener(topics = ["orders"], groupId = "seek-consumer")
fun process(event: OrderEvent) {
println("Processing: ${event.orderId}")
}
}

When event schemas change over time (add fields, remove fields, rename), you need a strategy.

These changes are safe with JSON serialization:

// Version 1
data class OrderEvent(
val orderId: String,
val amount: Double,
val timestamp: Instant
)
// Version 2 — added optional field (backward compatible)
data class OrderEvent(
val orderId: String,
val amount: Double,
val timestamp: Instant,
val currency: String = "USD" // New field with default
)
// Version 3 — added another optional field
data class OrderEvent(
val orderId: String,
val amount: Double,
val timestamp: Instant,
val currency: String = "USD",
val metadata: Map<String, String> = emptyMap() // New field with default
)

Rules for safe JSON evolution:

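  • Add new fields with defaults → old consumers ignore them (as long as unknown properties are not rejected), and new consumers fall back to the default when reading old messages.
  • Don’t remove required fields → consumers that still expect them fail to deserialize the new messages.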
  • Don’t rename fields → same as remove + add.
  • Don’t change field types → deserialization fails.

Jackson Configuration for Forward Compatibility

import com.fasterxml.jackson.annotation.JsonIgnoreProperties
import com.fasterxml.jackson.databind.DeserializationFeature
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.registerKotlinModule
// Global: ignore unknown fields (consumer doesn't fail on new fields)
val objectMapper = ObjectMapper()
.registerKotlinModule()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
// Per-class: ignore unknown fields
@JsonIgnoreProperties(ignoreUnknown = true)
data class OrderEvent(
val orderId: String,
val amount: Double,
val timestamp: Instant
)

For breaking changes, use versioned topics — orders.v1 for the old format, orders.v2 for the new — and run both consumers during the migration:

// Consumer that handles both during migration
@Component
class VersionedConsumer {
@KafkaListener(topics = ["orders.v1"], groupId = "order-processor")
fun processV1(event: OrderEventV1) {
val converted = event.toV2()
processOrder(converted)
}
@KafkaListener(topics = ["orders.v2"], groupId = "order-processor")
fun processV2(event: OrderEventV2) {
processOrder(event)
}
}
build.gradle.kts
dependencies {
testImplementation("org.springframework.kafka:spring-kafka-test")
}
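import org.awaitility.Awaitility.await
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertNotNull
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Test
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.test.context.EmbeddedKafka
import org.springframework.kafka.test.utils.KafkaTestUtils
import java.time.Duration
@SpringBootTest
@EmbeddedKafka(
partitions = 1,
topics = ["orders", "orders.DLT"],
// Point the application at the embedded broker instead of localhost:29092
bootstrapServersProperty = "spring.kafka.bootstrap-servers"
)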
class OrderProcessingIntegrationTest {
@Autowired
lateinit var kafkaTemplate: KafkaTemplate<String, Any>
@Autowired
lateinit var orderRepository: OrderRepository
@Test
fun `should process order event`() {
// Given
val event = OrderEvent(
orderId = "test-001",
userId = 1L,
amount = 99.99,
status = "CREATED"
)
// When
kafkaTemplate.send("orders", event.orderId, event).get()
// Then — wait for async consumer to process
await().atMost(Duration.ofSeconds(10)).untilAsserted {
val order = orderRepository.findByOrderId("test-001")
assertNotNull(order)
assertEquals("PROCESSING", order!!.status)
}
}
@Test
fun `should send to DLT on failure`() {
// Given — an event that will cause processing failure
val badEvent = OrderEvent(
orderId = "bad-order",
userId = -1L, // Invalid user ID causes failure
amount = -100.0,
status = "CREATED"
)
// When
kafkaTemplate.send("orders", badEvent.orderId, badEvent).get()
// Then — message should end up in DLT
val consumer = createTestConsumer("orders.DLT")
val records = KafkaTestUtils.getRecords(consumer, Duration.ofSeconds(10))
assertTrue(records.count() > 0)
}
}

For tests that need a real Kafka broker:

build.gradle.kts
dependencies {
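testImplementation("org.testcontainers:kafka:1.20.4")
testImplementation("org.testcontainers:junit-jupiter:1.20.4") // Provides @Testcontainers and @Container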
}
@SpringBootTest
@Testcontainers
class KafkaTestcontainersTest {
companion object {
@Container
val kafka = KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.6.0"))
.withKraft()
@JvmStatic
@DynamicPropertySource
fun kafkaProperties(registry: DynamicPropertyRegistry) {
registry.add("spring.kafka.bootstrap-servers") { kafka.bootstrapServers }
}
}
@Autowired
lateinit var kafkaTemplate: KafkaTemplate<String, Any>
@Test
fun `should produce and consume messages`() {
val event = OrderEvent("order-1", 1L, 50.0, "CREATED")
kafkaTemplate.send("orders", event.orderId, event).get()
// Assert consumption...
}
}

Put the producer/consumer patterns to work — build real pipelines backed by the shared Kafka stack.