Order Processing Pipeline

Build an async order pipeline on Spring Kafka: a REST endpoint publishes an OrderEvent to an orders topic, a consumer processes it (validate → persist → update status), and anything that fails is retried with exponential backoff and ultimately parked on a Dead Letter Topic (orders.DLT) for inspection. It’s the canonical “accept fast, process later, never silently drop a message” backend pattern.

If you’ve built a queue worker in TS (BullMQ) or Go (a channel + worker pool), this is the same shape — but Kafka gives you the durable log, consumer groups for horizontal scale, and Spring Kafka wires the retry/DLT machinery for you.

  1. An OrderProducer that publishes OrderEvent messages to the orders topic.
  2. An OrderConsumer (@KafkaListener) that validates the order, saves it to Postgres, and moves its status CREATED → PROCESSING → COMPLETED.
  3. Exponential-backoff retry (1s initial delay, 2x multiplier, stopping after 15s of accumulated backoff).
  4. A Dead Letter Topic (orders.DLT) that catches messages after retries are exhausted.
  5. A DLTConsumer that logs the failure and stores it in a failed_orders table.
  6. A GET /api/orders/{id}/status endpoint to poll processing status.

The producer and consumer never talk directly — Kafka’s log sits between them, so the POST returns immediately (202 Accepted) while processing happens asynchronously. Failures get retried, then diverted to the DLT.

[Diagram: order pipeline topology]

A standard Spring Boot layout — controller, producer, consumer, config, plus JPA model and Flyway migration:

  • order-pipeline/
    • build.gradle.kts: Spring Boot + Kafka + JPA deps
    • settings.gradle.kts: project name
    • src/main/kotlin/com/example/
      • Application.kt: Spring Boot entrypoint
      • config/
        • KafkaConfig.kt: topics, retry backoff, DLT recoverer
      • controller/
        • OrderController.kt: REST API
      • producer/
        • OrderProducer.kt: publishes to the orders topic
      • consumer/
        • OrderConsumer.kt: validates and processes orders
        • DLTConsumer.kt: handles dead letters
      • event/
        • OrderEvent.kt: the message payload
      • model/
        • Order.kt: JPA entity
      • service/
        • OrderService.kt: persistence + status updates
      • repository/
        • OrderRepository.kt: Spring Data JPA
    • src/main/resources/
      • application.yml: Kafka + datasource config
      • db/migration/V1__create_orders.sql: orders + failed_orders tables

The payload is a plain data class. Every field has a default, which makes Kotlin generate a no-arg constructor on the JVM. That matters on the consumer side: Jackson, which backs Spring Kafka's JsonDeserializer, can then reconstruct the object even if the Kotlin module is not registered. OrderItem is nested and serialized inline.

src/main/kotlin/com/example/event/OrderEvent.kt
package com.example.event
import java.time.Instant
data class OrderEvent(
    val orderId: String = "",
    val userId: Long = 0,
    val amount: Double = 0.0,
    val status: String = "CREATED",
    val items: List<OrderItem> = emptyList(),
    val timestamp: Instant = Instant.now()
)
data class OrderItem(
    val productId: String = "",
    val quantity: Int = 0,
    val price: Double = 0.0
)
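
To see what the defaults buy you, the sketch below redeclares a trimmed copy of the event (the name `SketchEvent` is made up for this illustration) and instantiates it reflectively through the parameterless constructor that Kotlin emits when every primary-constructor parameter has a default. It illustrates the mechanism only; it is not what Spring Kafka literally executes.

```kotlin
import java.time.Instant

// Hypothetical, trimmed copy of OrderEvent used only for this illustration.
data class SketchEvent(
    val orderId: String = "",
    val userId: Long = 0,
    val status: String = "CREATED",
    val timestamp: Instant = Instant.now()
)

// Instantiate through the parameterless constructor, the way a
// reflection-based mapper would before it populates the fields.
fun newViaNoArgConstructor(): SketchEvent =
    SketchEvent::class.java.getDeclaredConstructor().newInstance()
```

Remove one of the defaults and `getDeclaredConstructor()` throws NoSuchMethodException, because the parameterless constructor is only generated when all parameters have defaults.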

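This is where the resilience lives. Two NewTopic beans declare the topics (Spring creates them on startup via KafkaAdmin). The orders topic gets 6 partitions for parallelism; the DLT needs only 1. By default the recoverer targets the same partition number as the failed record; recent Spring Kafka versions (2.7 and later, as far as the reference docs describe it) fall back to letting the producer choose when that partition does not exist on the DLT, which is what makes a single-partition DLT workable here.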

The DefaultErrorHandler is the heart of it: when a @KafkaListener throws, this handler redelivers the record on the configured ExponentialBackOff. With a 1s initial interval, a 2x multiplier, and a maxElapsedTime of 15s, the delays are 1s, 2s, 4s, and 8s (15s in total), so a record gets four retries after its first delivery. Once those are exhausted, the DeadLetterPublishingRecoverer republishes the failed record to <topic>.DLT, here orders.DLT.

src/main/kotlin/com/example/config/KafkaConfig.kt
package com.example.config
import org.apache.kafka.clients.admin.NewTopic
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.config.TopicBuilder
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer
import org.springframework.kafka.listener.DefaultErrorHandler
import org.springframework.util.backoff.ExponentialBackOff
@Configuration
class KafkaConfig {
    companion object {
        const val ORDERS_TOPIC = "orders"
        const val ORDERS_DLT_TOPIC = "orders.DLT"
    }
    @Bean
    fun ordersTopic(): NewTopic = TopicBuilder
        .name(ORDERS_TOPIC)
        .partitions(6)
        .replicas(1)
        .build()
    @Bean
    fun deadLetterTopic(): NewTopic = TopicBuilder
        .name(ORDERS_DLT_TOPIC)
        .partitions(1)
        .replicas(1)
        .build()
    @Bean
    fun kafkaErrorHandler(kafkaTemplate: KafkaTemplate<String, Any>): DefaultErrorHandler {
        val recoverer = DeadLetterPublishingRecoverer(kafkaTemplate)
        val backoff = ExponentialBackOff(1000L, 2.0).apply {
            maxElapsedTime = 15000L
        }
        return DefaultErrorHandler(recoverer, backoff)
    }
}
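
The retry schedule that this configuration produces can be worked out by hand. The function below is a dependency-free sketch of the arithmetic, not Spring's ExponentialBackOff class itself; the name `backoffSchedule` is made up, and the 30s per-interval ceiling is an assumption about the library default.

```kotlin
// Reproduces the arithmetic of an exponential back-off with a total-elapsed cap.
// maxIntervalMs defaults to 30s here, which is an assumption about the library default.
fun backoffSchedule(
    initialMs: Long,
    multiplier: Double,
    maxElapsedMs: Long,
    maxIntervalMs: Long = 30_000L
): List<Long> {
    val delays = mutableListOf<Long>()
    var interval = initialMs
    var elapsed = 0L
    // Keep handing out delays until the accumulated back-off reaches the cap.
    while (elapsed < maxElapsedMs) {
        delays += interval
        elapsed += interval
        interval = minOf((interval * multiplier).toLong(), maxIntervalMs)
    }
    return delays
}
```

Calling `backoffSchedule(1_000L, 2.0, 15_000L)` yields delays of 1000, 2000, 4000, and 8000 ms. Raising maxElapsedTime slightly above 15s would admit a fifth retry at 16s, so the cap is worth choosing with the resulting retry count in mind.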

A thin wrapper over KafkaTemplate<String, Any>. The send takes a key (event.orderId) and a value (the event). The key controls partitioning — all events for the same orderId land on the same partition and are therefore processed in order.

src/main/kotlin/com/example/producer/OrderProducer.kt
package com.example.producer
import com.example.config.KafkaConfig
import com.example.event.OrderEvent
import org.slf4j.LoggerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Service
@Service
class OrderProducer(
    private val kafkaTemplate: KafkaTemplate<String, Any>
) {
    private val logger = LoggerFactory.getLogger(javaClass)
    fun sendOrder(event: OrderEvent) {
        logger.info("Sending order event: orderId={}, amount={}", event.orderId, event.amount)
        // send() is asynchronous and returns a future. The future is ignored here,
        // so a failed publish is not reported back to the caller.
        kafkaTemplate.send(KafkaConfig.ORDERS_TOPIC, event.orderId, event)
    }
}
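
The key-to-partition rule is easy to picture with a small stand-in. Kafka's default partitioner hashes the serialized key bytes with murmur2; the sketch below swaps in `String.hashCode()` purely to stay dependency-free, so the partition numbers it produces will not match a real cluster. The function name `partitionFor` is hypothetical.

```kotlin
// Simplified stand-in for key-based partitioning. The real default partitioner
// hashes the serialized key bytes with murmur2; String.hashCode() is used here
// only to keep the sketch dependency-free.
fun partitionFor(key: String, numPartitions: Int): Int {
    require(numPartitions > 0) { "numPartitions must be positive" }
    // floorMod keeps the result in 0 until numPartitions even for negative hashes.
    return Math.floorMod(key.hashCode(), numPartitions)
}
```

The property that matters is determinism: the same orderId always maps to the same partition for a fixed partition count, which is what preserves per-order ordering. Changing the partition count changes the mapping.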

The POST handler generates an orderId, builds the event, fires it at the producer, and returns 202 Accepted immediately — it does not wait for processing. That’s the async contract: the client gets an id and polls /{id}/status later. The nested CreateOrderRequest is the request body shape (Jackson + the Kotlin module deserialize it).

src/main/kotlin/com/example/controller/OrderController.kt
package com.example.controller
import com.example.event.OrderEvent
import com.example.event.OrderItem
import com.example.producer.OrderProducer
import com.example.service.OrderService
import org.springframework.http.HttpStatus
import org.springframework.http.ResponseEntity
import org.springframework.web.bind.annotation.*
import java.util.UUID
@RestController
@RequestMapping("/api/orders")
class OrderController(
    private val orderProducer: OrderProducer,
    private val orderService: OrderService
) {
    data class CreateOrderRequest(
        val userId: Long,
        val amount: Double,
        val items: List<OrderItem> = emptyList()
    )
    @PostMapping
    fun createOrder(@RequestBody request: CreateOrderRequest): ResponseEntity<Map<String, String>> {
        val orderId = UUID.randomUUID().toString()
        val event = OrderEvent(
            orderId = orderId,
            userId = request.userId,
            amount = request.amount,
            items = request.items,
            status = "CREATED"
        )
        orderProducer.sendOrder(event)
        return ResponseEntity.status(HttpStatus.ACCEPTED).body(
            mapOf("orderId" to orderId, "status" to "ACCEPTED")
        )
    }
    @GetMapping("/{id}/status")
    fun getOrderStatus(@PathVariable id: String): ResponseEntity<Map<String, String>> {
        val status = orderService.getOrderStatus(id)
        return ResponseEntity.ok(mapOf("orderId" to id, "status" to status))
    }
}
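
On the client side, the polling half of this contract might look like the sketch below. Everything in it is hypothetical: `pollUntilTerminal` is not part of the project, and `fetchStatus` stands in for an HTTP GET against /api/orders/{id}/status. The attempt budget matters because an order that never reaches the orders table keeps reporting UNKNOWN.

```kotlin
// Hypothetical client-side helper: polls a status source until a terminal
// status appears or the attempt budget is spent. fetchStatus stands in for
// an HTTP GET against /api/orders/{id}/status.
fun pollUntilTerminal(
    fetchStatus: () -> String,
    terminal: Set<String> = setOf("COMPLETED"),
    maxAttempts: Int = 10,
    delayMs: Long = 500L
): String? {
    repeat(maxAttempts) { attempt ->
        val status = fetchStatus()
        if (status in terminal) return status
        // Sleep between polls, but not after the final attempt.
        if (attempt < maxAttempts - 1) Thread.sleep(delayMs)
    }
    return null // gave up: still pending, or the order never reached the table
}
```

A null result does not distinguish "still queued" from "dead-lettered"; telling those apart would need the API to expose failures, which the endpoint shown here does not do.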

The @KafkaListener is the worker. Spring deserializes the JSON value straight into an OrderEvent parameter, so you write plain business logic. The key move is that throwing an exception is your error signal: Spring's DefaultErrorHandler catches it, retries per the backoff, and routes to the DLT when retries run out. So an invalid order just throws IllegalArgumentException (which is what require does) and the framework does the rest. Because a validation failure is deterministic, every retry fails the same way; the record still waits out the full backoff before it reaches the DLT.

src/main/kotlin/com/example/consumer/OrderConsumer.kt
package com.example.consumer
import com.example.config.KafkaConfig
import com.example.event.OrderEvent
import com.example.service.OrderService
import org.slf4j.LoggerFactory
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component
@Component
class OrderConsumer(
    private val orderService: OrderService
) {
    private val logger = LoggerFactory.getLogger(javaClass)
    @KafkaListener(
        topics = [KafkaConfig.ORDERS_TOPIC],
        groupId = "order-processor"
    )
    fun processOrder(event: OrderEvent) {
        logger.info("Processing order: orderId={}, userId={}, amount={}", event.orderId, event.userId, event.amount)
        // Validate: an invalid order throws, so it is retried and then sent to the DLT
        require(event.userId > 0 && event.amount > 0 && event.items.isNotEmpty()) {
            "Invalid order: $event"
        }
        // Persist and advance status CREATED -> PROCESSING -> COMPLETED
        orderService.processOrder(event)
        logger.info("Order processed successfully: orderId={}", event.orderId)
    }
}
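
The validation rule can be exercised on its own, without Kafka. The sketch below uses trimmed, hypothetical copies of the event types (`EventSketch`, `ItemSketch`) and the same `require` predicate as the listener, to show which exception the error handler ends up seeing.

```kotlin
// Hypothetical, trimmed shapes mirroring OrderEvent/OrderItem for this sketch.
data class ItemSketch(val productId: String, val quantity: Int, val price: Double)
data class EventSketch(
    val orderId: String,
    val userId: Long,
    val amount: Double,
    val items: List<ItemSketch>
)

// Same predicate as the listener: require() throws IllegalArgumentException
// when the condition is false.
fun validate(event: EventSketch) {
    require(event.userId > 0 && event.amount > 0 && event.items.isNotEmpty()) {
        "Invalid order: $event"
    }
}

// Returns the failure message, or null when the event is valid.
fun validationError(event: EventSketch): String? =
    runCatching { validate(event) }.exceptionOrNull()?.message
```

If you would rather send such records to the DLT without waiting, DefaultErrorHandler offers addNotRetryableExceptions(...) for marking exception types as not worth retrying; whether that suits your pipeline depends on which failures are transient.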

Persisting: Order.kt, OrderRepository.kt, OrderService.kt

The order is stored via Spring Data JPA. Order is the @Entity (note status and updatedAt are var, since they change as the order advances). OrderRepository is a one-line interface; Spring generates the implementation. OrderService saves the order, advances its status, and answers status queries for the controller; validation happens earlier, in the consumer.

src/main/kotlin/com/example/model/Order.kt
package com.example.model
import jakarta.persistence.*
import java.time.Instant
@Entity
@Table(name = "orders")
data class Order(
    @Id
    val orderId: String = "",
    val userId: Long = 0,
    val amount: Double = 0.0,
    var status: String = "CREATED",
    val createdAt: Instant = Instant.now(),
    var updatedAt: Instant = Instant.now()
)

src/main/kotlin/com/example/repository/OrderRepository.kt
package com.example.repository
import com.example.model.Order
import org.springframework.data.jpa.repository.JpaRepository
import org.springframework.stereotype.Repository
@Repository
interface OrderRepository : JpaRepository<Order, String>

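The service ties it together: save the order as PROCESSING, do the work, then flip it to COMPLETED. CREATED exists only on the event; it is never written to the table. The status query backs the polling endpoint and answers UNKNOWN until the first save has happened.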

src/main/kotlin/com/example/service/OrderService.kt
package com.example.service
import com.example.event.OrderEvent
import com.example.model.Order
import com.example.repository.OrderRepository
import org.springframework.stereotype.Service
import java.time.Instant
@Service
class OrderService(
    private val orderRepository: OrderRepository
) {
    fun processOrder(event: OrderEvent) {
        val order = Order(
            orderId = event.orderId,
            userId = event.userId,
            amount = event.amount,
            status = "PROCESSING"
        )
        orderRepository.save(order)
        // ... fulfilment work would go here ...
        order.status = "COMPLETED"
        order.updatedAt = Instant.now()
        orderRepository.save(order)
    }
    fun getOrderStatus(orderId: String): String =
        orderRepository.findById(orderId)
            .map { it.status }
            .orElse("UNKNOWN")
}
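
The status progression is carried by raw strings in the listing above. One way to make the allowed moves explicit, sketched here as an assumption rather than as part of the project, is a small enum with a transition table; the names `OrderStatus`, `allowedTransitions`, and `advance` are all hypothetical.

```kotlin
// Hypothetical enum replacing the raw status strings used in the listing.
enum class OrderStatus { CREATED, PROCESSING, COMPLETED }

// Allowed forward moves; anything not listed is rejected.
val allowedTransitions: Map<OrderStatus, Set<OrderStatus>> = mapOf(
    OrderStatus.CREATED to setOf(OrderStatus.PROCESSING),
    OrderStatus.PROCESSING to setOf(OrderStatus.COMPLETED),
    OrderStatus.COMPLETED to emptySet()
)

// Returns the new status, or throws IllegalStateException for an illegal move.
fun advance(from: OrderStatus, to: OrderStatus): OrderStatus {
    check(to in allowedTransitions.getValue(from)) { "Illegal transition $from -> $to" }
    return to
}
```

A guard like this is mostly useful once redelivery is in play: a record that is processed twice would otherwise be free to move a COMPLETED order back to PROCESSING.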

A second @KafkaListener on the DLT, in its own consumer group. When the DeadLetterPublishingRecoverer republishes a failed record it attaches headers — KafkaHeaders.DLT_ORIGINAL_TOPIC and KafkaHeaders.DLT_EXCEPTION_MESSAGE — which you read with @Header to record exactly what failed and why. The intent is to log the failure and persist it to the failed_orders table for manual review.

src/main/kotlin/com/example/consumer/DLTConsumer.kt
package com.example.consumer
import com.example.config.KafkaConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.slf4j.LoggerFactory
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.kafka.support.KafkaHeaders
import org.springframework.messaging.handler.annotation.Header
import org.springframework.stereotype.Component
@Component
class DLTConsumer {
    private val logger = LoggerFactory.getLogger(javaClass)
    @KafkaListener(
        topics = [KafkaConfig.ORDERS_DLT_TOPIC],
        groupId = "dlt-processor"
    )
    fun processDLT(
        record: ConsumerRecord<String, Any>,
        @Header(KafkaHeaders.DLT_ORIGINAL_TOPIC, required = false) originalTopic: String?,
        @Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE, required = false) errorMessage: String?
    ) {
        logger.error(
            "Dead letter received: originalTopic={}, key={}, error={}",
            originalTopic, record.key(), errorMessage
        )
        // Store in failed_orders table for manual review
    }
}
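
The listing stops at a comment where the failed_orders write would go. As a starting point, the sketch below shows only the mapping step, from what the listener receives to a row shape matching the migration's columns. `FailedOrderRow` and `toFailedOrderRow` are hypothetical names, the placeholder strings are arbitrary, and the payload is assumed to arrive already rendered as JSON text; the actual insert (JPA entity, JdbcTemplate, or otherwise) is left open.

```kotlin
// Hypothetical row shape matching the failed_orders columns in the migration
// (id and failed_at are left to the database defaults).
data class FailedOrderRow(
    val orderId: String?,
    val originalTopic: String,
    val errorMessage: String,
    val payload: String
)

// Builds the row from what the DLT listener receives. Both headers are optional
// in the listener signature, so fall back to explicit placeholders.
fun toFailedOrderRow(
    key: String?,
    payloadJson: String,
    originalTopic: String?,
    errorMessage: String?
): FailedOrderRow = FailedOrderRow(
    orderId = key,
    originalTopic = originalTopic ?: "unknown",
    errorMessage = errorMessage ?: "no exception message header",
    payload = payloadJson
)
```

Keeping the mapping separate from the listener makes it testable without a broker, and keeps the null handling for missing headers in one place.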

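No serialization code anywhere: it's all config. The producer uses Spring Kafka's JsonSerializer; the consumer uses JsonDeserializer, with spring.json.trusted.packages allowing com.example.* so it will deserialize your event types. acks: all plus enable.idempotence: true means the broker deduplicates the producer's own retries, so a retried send does not write the record twice. That is not end-to-end exactly-once: consumption is still at-least-once, so processing should tolerate redelivery. concurrency: 3 runs three listener threads against the 6-partition topic, typically two partitions per thread.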

src/main/resources/application.yml
server:
  port: 8085
spring:
  datasource:
    url: jdbc:postgresql://localhost:5432/kotlin_course
    username: dev
    password: dev
  jpa:
    hibernate:
      ddl-auto: validate
    show-sql: true
  flyway:
    enabled: true
  kafka:
    bootstrap-servers: localhost:29092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      acks: all
      retries: 3
      properties:
        enable.idempotence: true
    consumer:
      group-id: order-processor
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      auto-offset-reset: earliest
      properties:
        spring.json.trusted.packages: "com.example.*"
    listener:
      ack-mode: record
      concurrency: 3
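
How three listener threads share six partitions can be sketched with a range-style split. This is an illustration of the arithmetic only: the real assignment is made by whichever partition assignor the consumer is configured with, and the function name `assignPartitions` is made up.

```kotlin
// Range-style split of one topic's partitions across listener threads.
// The actual assignment depends on the configured partition assignor.
fun assignPartitions(numPartitions: Int, numConsumers: Int): List<List<Int>> {
    require(numConsumers > 0) { "numConsumers must be positive" }
    val base = numPartitions / numConsumers
    val extra = numPartitions % numConsumers
    var next = 0
    return List(numConsumers) { index ->
        // The first `extra` consumers take one additional partition.
        val count = base + if (index < extra) 1 else 0
        (next until next + count).toList().also { next += count }
    }
}
```

For the orders topic, `assignPartitions(6, 3)` gives each thread two partitions. The same listener concurrency applied to the single-partition DLT gives `assignPartitions(1, 3)`, where two of the three threads receive nothing and sit idle.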

The schema is owned by Flyway, not Hibernate (ddl-auto: validate only checks the entity matches). Two tables: orders for the happy path and failed_orders for dead letters.

src/main/resources/db/migration/V1__create_orders.sql
CREATE TABLE IF NOT EXISTS orders (
    order_id VARCHAR(36) PRIMARY KEY,
    user_id BIGINT NOT NULL,
    amount DOUBLE PRECISION NOT NULL,
    status VARCHAR(20) NOT NULL DEFAULT 'CREATED',
    created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS failed_orders (
    id BIGSERIAL PRIMARY KEY,
    order_id VARCHAR(36),
    original_topic VARCHAR(255),
    error_message TEXT,
    payload JSONB,
    failed_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);

  1. Start Kafka and Postgres from the shared infra (one-time per session):

    cd shared-infra && docker compose up -d postgres kafka kafka-ui
  2. Boot the app (starts on port 8085):

    ./gradlew bootRun
  3. Create a valid order — it returns 202 with an orderId:

    curl -X POST http://localhost:8085/api/orders \
    -H "Content-Type: application/json" \
    -d '{"userId": 1, "amount": 99.99, "items": [{"productId": "prod-1", "quantity": 2, "price": 49.99}]}'
  4. Poll its status, substituting the orderId from step 3 for {id} (processing is async, so give it a moment):

    curl http://localhost:8085/api/orders/{id}/status
  5. Send an invalid order to trip the retry → DLT path:

    curl -X POST http://localhost:8085/api/orders \
    -H "Content-Type: application/json" \
    -d '{"userId": -1, "amount": -100, "items": []}'
  6. Open Kafka UI at http://localhost:8090 to watch the orders and orders.DLT topics fill up.