Order Processing Pipeline
Build an async order pipeline on Spring Kafka: a REST endpoint publishes an
OrderEvent to an orders topic, a consumer processes it (validate → persist →
update status), and anything that fails is retried with exponential backoff and
ultimately parked on a Dead Letter Topic (orders.DLT) for inspection. It’s the
canonical “accept fast, process later, never silently drop a message” backend
pattern.
If you’ve built a queue worker in TS (BullMQ) or Go (a channel + worker pool), this is the same shape — but Kafka gives you the durable log, consumer groups for horizontal scale, and Spring Kafka wires the retry/DLT machinery for you.
What you’ll build
- An OrderProducer that publishes OrderEvent messages to the orders topic.
- An OrderConsumer (@KafkaListener) that validates the order, saves it to Postgres, and moves its status CREATED → PROCESSING → COMPLETED.
- Exponential-backoff retry (1s initial, 2x multiplier, capped at 15s elapsed).
- A Dead Letter Topic (orders.DLT) that catches messages after retries are exhausted.
- A DLTConsumer that logs the failure and stores it in a failed_orders table.
- A GET /api/orders/{id}/status endpoint to poll processing status.
Pipeline topology
The producer and consumer never talk directly: Kafka’s log sits between them, so
the POST returns immediately (202 Accepted) while processing happens
asynchronously. Failures get retried, then diverted to the DLT.
flowchart LR
POST["POST /api/orders"] --> PROD["OrderProducer"]
PROD -->|"orders topic"| K["Kafka"]
K --> CONS["OrderConsumer (group: order-processor)"]
CONS -->|"valid"| DB[("Postgres orders")]
CONS -.->|"retries exhausted"| DLT["orders.DLT topic"]
DLT --> DLTC["DLTConsumer (group: dlt-processor)"]
DLTC --> FDB[("Postgres failed_orders")]
The worked solution
A standard Spring Boot layout (controller, producer, consumer, config, plus JPA model and Flyway migration):
order-pipeline/
- build.gradle.kts    Spring Boot + Kafka + JPA deps
- settings.gradle.kts    project name
- src/main/kotlin/com/example/
  - Application.kt    Spring Boot entrypoint
  - config/
    - KafkaConfig.kt    topics, retry backoff, DLT recoverer
  - controller/
    - OrderController.kt    REST API
  - producer/
    - OrderProducer.kt    publishes to the orders topic
  - consumer/
    - OrderConsumer.kt    processes orders
    - DLTConsumer.kt    handles dead letters
  - event/
    - OrderEvent.kt    the message payload
  - model/
    - Order.kt    JPA entity
  - service/
    - OrderService.kt    validation + persistence
  - repository/
    - OrderRepository.kt    Spring Data JPA
- src/main/resources/
  - application.yml    Kafka + datasource config
  - db/migration/V1__create_orders.sql    orders + failed_orders tables
The message: OrderEvent.kt
The payload is a plain data class. Every field has a default, and that matters:
with defaults on every parameter the Kotlin compiler also generates a no-arg
constructor, which Jackson (the library behind Spring Kafka’s JsonDeserializer)
can use to reconstruct the object on the consumer side. OrderItem is nested and
serialized inline.
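The effect of the defaults can be checked without Kafka or Jackson. The sketch below is a standalone illustration: EventWithDefaults and EventWithoutDefaults are hypothetical stand-ins (not classes from the project), and hasNoArgConstructor is a helper invented for this example. It uses plain Java reflection to show that a parameterless constructor only exists when every constructor parameter has a default.

```kotlin
import java.time.Instant

// Hypothetical stand-in mirroring OrderEvent: every parameter has a default,
// so the Kotlin compiler also emits a parameterless JVM constructor.
data class EventWithDefaults(
    val orderId: String = "",
    val status: String = "CREATED",
    val timestamp: Instant = Instant.now()
)

// Hypothetical counter-example: orderId has no default,
// so no parameterless constructor is generated.
data class EventWithoutDefaults(
    val orderId: String,
    val status: String = "CREATED"
)

// True when the class exposes a zero-parameter constructor to Java reflection.
fun hasNoArgConstructor(type: Class<*>): Boolean =
    type.declaredConstructors.any { it.parameterCount == 0 }

fun main() {
    println(hasNoArgConstructor(EventWithDefaults::class.java))
    println(hasNoArgConstructor(EventWithoutDefaults::class.java))

    // Instantiating through the no-arg constructor yields the default values.
    val blank = EventWithDefaults::class.java.getDeclaredConstructor().newInstance()
    println(blank.status)
}
```

Whether a given deserializer setup strictly needs that constructor depends on how its ObjectMapper is configured, so treat this as an explanation of the defaults rather than a hard requirement.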
package com.example.event
import java.time.Instant
data class OrderEvent(
    val orderId: String = "",
    val userId: Long = 0,
    val amount: Double = 0.0,
    val status: String = "CREATED",
    val items: List<OrderItem> = emptyList(),
    val timestamp: Instant = Instant.now()
)

data class OrderItem(
    val productId: String = "",
    val quantity: Int = 0,
    val price: Double = 0.0
)

Topics + retry + DLT: KafkaConfig.kt
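This is where the resilience lives. Two NewTopic beans declare the topics (Spring
creates them on startup via KafkaAdmin). The orders topic gets 6 partitions for
parallelism; the DLT is declared with only 1. One caveat: by default the
DeadLetterPublishingRecoverer targets the same partition number as the failed
record, and the Spring Kafka reference recommends giving the DLT at least as many
partitions as the source topic. A single-partition DLT therefore relies on a
custom destination resolver or on the producer choosing the partition, so check
the behavior on the Spring Kafka version you run.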
The DefaultErrorHandler is the heart of it: when a @KafkaListener throws, this
handler retries on the configured ExponentialBackOff (1s, then 2s, then 4s…
capped at 15s total elapsed), and once retries are exhausted the
DeadLetterPublishingRecoverer republishes the failed record to
<topic>.DLT — here, orders.DLT.
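To make the retry schedule concrete, here is a small standalone model of an exponential backoff with a total-elapsed cap. It is a simplified sketch, not Spring's ExponentialBackOff itself: backoffDelays is a hypothetical function, and it assumes the cap is checked before each delay is handed out and that a single delay never exceeds a 30s ceiling.

```kotlin
// Simplified model of exponential backoff with a cap on total elapsed backoff.
// Assumptions (not taken from the project): the cap is checked before each
// delay is issued, and maxIntervalMs bounds any single delay.
fun backoffDelays(
    initialMs: Long,
    multiplier: Double,
    maxElapsedMs: Long,
    maxIntervalMs: Long = 30_000L
): List<Long> {
    val delays = mutableListOf<Long>()
    var next = initialMs
    var elapsed = 0L
    while (elapsed < maxElapsedMs) {
        delays += next
        elapsed += next
        next = minOf((next * multiplier).toLong(), maxIntervalMs)
    }
    return delays
}

fun main() {
    val delays = backoffDelays(initialMs = 1000L, multiplier = 2.0, maxElapsedMs = 15000L)
    println(delays)
    // Delivery attempts under this model: the first try plus one per delay.
    println(delays.size + 1)
}
```

Under these assumptions the configured values yield delays of 1s, 2s, 4s, and 8s, which add up to exactly the 15s cap, so a record that keeps failing is attempted five times before the recoverer takes over.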
package com.example.config
import org.apache.kafka.clients.admin.NewTopic
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.config.TopicBuilder
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer
import org.springframework.kafka.listener.DefaultErrorHandler
import org.springframework.util.backoff.ExponentialBackOff

@Configuration
class KafkaConfig {

    companion object {
        const val ORDERS_TOPIC = "orders"
        const val ORDERS_DLT_TOPIC = "orders.DLT"
    }

    @Bean
    fun ordersTopic(): NewTopic = TopicBuilder
        .name(ORDERS_TOPIC)
        .partitions(6)
        .replicas(1)
        .build()

    @Bean
    fun deadLetterTopic(): NewTopic = TopicBuilder
        .name(ORDERS_DLT_TOPIC)
        .partitions(1)
        .replicas(1)
        .build()

    @Bean
    fun kafkaErrorHandler(kafkaTemplate: KafkaTemplate<String, Any>): DefaultErrorHandler {
        val recoverer = DeadLetterPublishingRecoverer(kafkaTemplate)

        val backoff = ExponentialBackOff(1000L, 2.0).apply {
            maxElapsedTime = 15000L
        }

        return DefaultErrorHandler(recoverer, backoff)
    }
}

Publishing: OrderProducer.kt
A thin wrapper over KafkaTemplate<String, Any>. The send takes a key
(event.orderId) and a value (the event). The key controls partitioning — all
events for the same orderId land on the same partition and are therefore
processed in order.
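The "same key, same partition" property can be illustrated with a toy partitioner. This is a simplified stand-in: Kafka's default partitioner hashes the serialized key bytes with murmur2, whereas the hypothetical partitionFor below uses String.hashCode() purely to show the idea.

```kotlin
// Toy key-based partitioner. NOT Kafka's algorithm (which uses murmur2 on the
// serialized key); hashCode() is used only to illustrate determinism per key.
fun partitionFor(key: String, partitionCount: Int): Int =
    Math.floorMod(key.hashCode(), partitionCount)

fun main() {
    val partitions = 6
    val keys = listOf("order-a", "order-b", "order-a", "order-c", "order-a")

    // Group the positions of a keyed event stream by the partition each lands on.
    // Every "order-a" event ends up in the same group, preserving its relative order.
    val byPartition = keys.withIndex()
        .groupBy({ partitionFor(it.value, partitions) }, { it.index })
    println(byPartition)
}
```

The ordering guarantee holds per key only while the partition count stays the same; adding partitions to a topic changes where new events for an existing key land.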
package com.example.producer
import com.example.config.KafkaConfig
import com.example.event.OrderEvent
import org.slf4j.LoggerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Service

@Service
class OrderProducer(
    private val kafkaTemplate: KafkaTemplate<String, Any>
) {
    private val logger = LoggerFactory.getLogger(javaClass)

    fun sendOrder(event: OrderEvent) {
        logger.info("Sending order event: orderId={}, amount={}", event.orderId, event.amount)
        kafkaTemplate.send(KafkaConfig.ORDERS_TOPIC, event.orderId, event)
    }
}

The REST entrypoint: OrderController.kt
The POST handler generates an orderId, builds the event, fires it at the
producer, and returns 202 Accepted immediately — it does not wait for
processing. That’s the async contract: the client gets an id and polls
/{id}/status later. The nested CreateOrderRequest is the request body shape
(Jackson + the Kotlin module deserialize it).
package com.example.controller
import com.example.event.OrderEvent
import com.example.event.OrderItem
import com.example.producer.OrderProducer
import com.example.service.OrderService
import org.springframework.http.HttpStatus
import org.springframework.http.ResponseEntity
import org.springframework.web.bind.annotation.*
import java.util.UUID

@RestController
@RequestMapping("/api/orders")
class OrderController(
    private val orderProducer: OrderProducer,
    private val orderService: OrderService
) {

    data class CreateOrderRequest(
        val userId: Long,
        val amount: Double,
        val items: List<OrderItem> = emptyList()
    )

    @PostMapping
    fun createOrder(@RequestBody request: CreateOrderRequest): ResponseEntity<Map<String, String>> {
        val orderId = UUID.randomUUID().toString()
        val event = OrderEvent(
            orderId = orderId,
            userId = request.userId,
            amount = request.amount,
            items = request.items,
            status = "CREATED"
        )
        orderProducer.sendOrder(event)
        return ResponseEntity.status(HttpStatus.ACCEPTED).body(
            mapOf("orderId" to orderId, "status" to "ACCEPTED")
        )
    }

    @GetMapping("/{id}/status")
    fun getOrderStatus(@PathVariable id: String): ResponseEntity<Map<String, String>> {
        val status = orderService.getOrderStatus(id)
        return ResponseEntity.ok(mapOf("orderId" to id, "status" to status))
    }
}

Consuming: OrderConsumer.kt
The @KafkaListener is the worker. Spring deserializes the JSON value straight
into an OrderEvent parameter, so you write plain business logic. The key move is
that throwing an exception is your error signal — Spring’s DefaultErrorHandler
catches it, retries per the backoff, and routes to the DLT when retries run out.
So an invalid order just throws IllegalArgumentException and the framework does
the rest.
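A validation failure is deterministic, so retrying it cannot succeed. As an optional variant (it changes the walkthrough's behavior, since invalid orders would then skip the backoff), the handler can be told to treat IllegalArgumentException as not retryable. The sketch below assumes the same recoverer and backoff as KafkaConfig; validationAwareErrorHandler is a hypothetical name, and the function would replace the body of the existing bean rather than sit alongside it.

```kotlin
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer
import org.springframework.kafka.listener.DefaultErrorHandler
import org.springframework.util.backoff.ExponentialBackOff

// Sketch of a variant error handler (hypothetical name): same recoverer and
// backoff as KafkaConfig, but validation failures bypass the retries and are
// handed to the recoverer immediately.
fun validationAwareErrorHandler(kafkaTemplate: KafkaTemplate<String, Any>): DefaultErrorHandler {
    val recoverer = DeadLetterPublishingRecoverer(kafkaTemplate)
    val backoff = ExponentialBackOff(1000L, 2.0).apply { maxElapsedTime = 15000L }

    return DefaultErrorHandler(recoverer, backoff).apply {
        // require(...) in the listener throws IllegalArgumentException;
        // classify it as not retryable so it goes straight to the DLT.
        addNotRetryableExceptions(IllegalArgumentException::class.java)
    }
}
```

The trade-off is that IllegalArgumentException is a broad type: any library code that throws it would also skip retries, so a dedicated exception class for validation errors may be the safer choice.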
package com.example.consumer
import com.example.config.KafkaConfig
import com.example.event.OrderEvent
import com.example.service.OrderService
import org.slf4j.LoggerFactory
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component

@Component
class OrderConsumer(
    private val orderService: OrderService
) {

    private val logger = LoggerFactory.getLogger(javaClass)

    @KafkaListener(
        topics = [KafkaConfig.ORDERS_TOPIC],
        groupId = "order-processor"
    )
    fun processOrder(event: OrderEvent) {
        logger.info("Processing order: orderId={}, userId={}, amount={}", event.orderId, event.userId, event.amount)

        // Validate: an invalid order throws, so it goes to retry then the DLT
        require(event.userId > 0 && event.amount > 0 && event.items.isNotEmpty()) {
            "Invalid order: $event"
        }

        // Persist and advance status CREATED -> PROCESSING -> COMPLETED
        orderService.processOrder(event)

        logger.info("Order processed successfully: orderId={}", event.orderId)
    }
}

Persisting: Order.kt, OrderRepository.kt, OrderService.kt
The order is stored via Spring Data JPA. Order is the @Entity (note status
and updatedAt are var, since they change as the order advances). OrderRepository
is a one-line interface; Spring generates the implementation. OrderService
saves the order and advances its status (validation itself happens in
OrderConsumer), and answers status queries for the controller.
package com.example.model
import jakarta.persistence.*
import java.time.Instant

@Entity
@Table(name = "orders")
data class Order(
    @Id
    val orderId: String = "",
    val userId: Long = 0,
    val amount: Double = 0.0,
    var status: String = "CREATED",
    val createdAt: Instant = Instant.now(),
    var updatedAt: Instant = Instant.now()
)

package com.example.repository

import com.example.model.Order
import org.springframework.data.jpa.repository.JpaRepository
import org.springframework.stereotype.Repository

@Repository
interface OrderRepository : JpaRepository<Order, String>

The service ties it together: save the order as PROCESSING, do the work, then
flip it to COMPLETED. The status query backs the polling endpoint.
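The project stores the status as a plain String. One way to guard the CREATED → PROCESSING → COMPLETED flow against typos or skipped steps is an enum that knows its legal successor. OrderStatus below is a hypothetical type, not part of the original code, shown as a standalone sketch.

```kotlin
// Hypothetical enum modelling the status flow described in the text.
enum class OrderStatus {
    CREATED, PROCESSING, COMPLETED;

    // The only legal successor of each status; COMPLETED is terminal.
    fun next(): OrderStatus? = when (this) {
        CREATED -> PROCESSING
        PROCESSING -> COMPLETED
        COMPLETED -> null
    }

    // A transition is allowed only to the immediate successor.
    fun canMoveTo(target: OrderStatus): Boolean = next() == target
}

fun main() {
    println(OrderStatus.CREATED.canMoveTo(OrderStatus.PROCESSING))
    println(OrderStatus.CREATED.canMoveTo(OrderStatus.COMPLETED))
    // Parsing the stored string form back into the enum.
    println(OrderStatus.valueOf("PROCESSING").next())
}
```

If the entity adopted such an enum, JPA's @Enumerated(EnumType.STRING) would keep the existing VARCHAR column readable; that mapping is an assumption about how you might wire it in, not something the project does.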
package com.example.service
import com.example.event.OrderEvent
import com.example.model.Order
import com.example.repository.OrderRepository
import org.springframework.stereotype.Service
import java.time.Instant

@Service
class OrderService(
    private val orderRepository: OrderRepository
) {

    fun processOrder(event: OrderEvent) {
        val order = Order(
            orderId = event.orderId,
            userId = event.userId,
            amount = event.amount,
            status = "PROCESSING"
        )
        orderRepository.save(order)

        // ... fulfilment work would go here ...

        order.status = "COMPLETED"
        order.updatedAt = Instant.now()
        orderRepository.save(order)
    }

    fun getOrderStatus(orderId: String): String =
        orderRepository.findById(orderId)
            .map { it.status }
            .orElse("UNKNOWN")
}

Handling dead letters: DLTConsumer.kt
A second @KafkaListener on the DLT, in its own consumer group. When the
DeadLetterPublishingRecoverer republishes a failed record it attaches headers —
KafkaHeaders.DLT_ORIGINAL_TOPIC and KafkaHeaders.DLT_EXCEPTION_MESSAGE — which
you read with @Header to record exactly what failed and why. The intent is to log
the failure and persist it to the failed_orders table for manual review.
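The listing leaves the persistence step as a comment. One possible way to fill it in is a small JDBC-backed helper. Everything here is an assumption layered on the original: FailedOrderStore and its save method are hypothetical, and the sketch presumes that Spring Boot auto-configures a JdbcTemplate and an ObjectMapper in this application and that the failed_orders table matches the Flyway migration.

```kotlin
import com.fasterxml.jackson.databind.ObjectMapper
import org.springframework.jdbc.core.JdbcTemplate
import org.springframework.stereotype.Component

// Hypothetical helper the DLTConsumer could call; not part of the original project.
@Component
class FailedOrderStore(
    private val jdbcTemplate: JdbcTemplate,
    private val objectMapper: ObjectMapper
) {
    fun save(orderId: String?, originalTopic: String?, errorMessage: String?, payload: Any?) {
        // Serialize the record value back to JSON so Postgres can store it as JSONB.
        val json = payload?.let { objectMapper.writeValueAsString(it) }

        jdbcTemplate.update(
            """
            INSERT INTO failed_orders (order_id, original_topic, error_message, payload)
            VALUES (?, ?, ?, CAST(? AS JSONB))
            """.trimIndent(),
            orderId, originalTopic, errorMessage, json
        )
    }
}
```

With this in place, DLTConsumer would take a FailedOrderStore as a constructor parameter and call save(record.key(), originalTopic, errorMessage, record.value()) after logging. A JPA entity for failed_orders would work as well; plain JDBC is used here only to keep the sketch short.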
package com.example.consumer
import com.example.config.KafkaConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.slf4j.LoggerFactory
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.kafka.support.KafkaHeaders
import org.springframework.messaging.handler.annotation.Header
import org.springframework.stereotype.Component

@Component
class DLTConsumer {

    private val logger = LoggerFactory.getLogger(javaClass)

    @KafkaListener(
        topics = [KafkaConfig.ORDERS_DLT_TOPIC],
        groupId = "dlt-processor"
    )
    fun processDLT(
        record: ConsumerRecord<String, Any>,
        @Header(KafkaHeaders.DLT_ORIGINAL_TOPIC, required = false) originalTopic: String?,
        @Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE, required = false) errorMessage: String?
    ) {
        logger.error(
            "Dead letter received: originalTopic={}, key={}, error={}",
            originalTopic, record.key(), errorMessage
        )

        // Store in failed_orders table for manual review
    }
}

Wiring: application.yml
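No serialization code anywhere: it’s all config. The producer uses Spring Kafka’s
JsonSerializer; the consumer uses JsonDeserializer with
spring.json.trusted.packages allow-listing com.example.* so it’ll deserialize
your event types. acks: all + enable.idempotence: true make the broker wait for
all in-sync replicas and de-duplicate producer retries, so publishing is safe
against lost or doubled writes. Consumption is still at-least-once: a record can
be redelivered after a failure or rebalance, so processing should tolerate
repeats. concurrency: 3 runs three listener threads against the 6-partition
topic, so on a single app instance each thread typically handles two partitions.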
server:
  port: 8085

spring:
  datasource:
    url: jdbc:postgresql://localhost:5432/kotlin_course
    username: dev
    password: dev
  jpa:
    hibernate:
      ddl-auto: validate
    show-sql: true
  flyway:
    enabled: true
  kafka:
    bootstrap-servers: localhost:29092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      acks: all
      retries: 3
      properties:
        enable.idempotence: true
    consumer:
      group-id: order-processor
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      auto-offset-reset: earliest
      properties:
        spring.json.trusted.packages: "com.example.*"
    listener:
      ack-mode: record
      concurrency: 3

The schema is owned by Flyway, not Hibernate (ddl-auto: validate only checks the
entity matches). Two tables: orders for the happy path and failed_orders for
dead letters.
CREATE TABLE IF NOT EXISTS orders (
    order_id VARCHAR(36) PRIMARY KEY,
    user_id BIGINT NOT NULL,
    amount DOUBLE PRECISION NOT NULL,
    status VARCHAR(20) NOT NULL DEFAULT 'CREATED',
    created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);

CREATE TABLE IF NOT EXISTS failed_orders (
    id BIGSERIAL PRIMARY KEY,
    order_id VARCHAR(36),
    original_topic VARCHAR(255),
    error_message TEXT,
    payload JSONB,
    failed_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);

Run it
- Start Kafka and Postgres from the shared infra (one-time per session):
  cd shared-infra && docker compose up -d postgres kafka kafka-ui
- Boot the app (starts on port 8085):
  ./gradlew bootRun
- Create a valid order; it returns 202 with an orderId:
  curl -X POST http://localhost:8085/api/orders \
    -H "Content-Type: application/json" \
    -d '{"userId": 1, "amount": 99.99, "items": [{"productId": "prod-1", "quantity": 2, "price": 49.99}]}'
- Poll its status (async, so give it a moment), substituting the returned orderId for {id}:
  curl http://localhost:8085/api/orders/{id}/status
- Send an invalid order to trip the retry → DLT path (with the configured backoff, expect roughly 15s of retries before it reaches the DLT):
  curl -X POST http://localhost:8085/api/orders \
    -H "Content-Type: application/json" \
    -d '{"userId": -1, "amount": -100, "items": []}'
- Open Kafka UI at http://localhost:8090 to watch the orders and orders.DLT topics fill up.