Event Sourcing Lite
Build a small event-sourced system: every change to a bank account is an immutable event published to Kafka, and the account’s “current state” is a projection rebuilt by replaying those events into a PostgreSQL read model. Nothing mutates a row in place — the event stream is the source of truth, and the projection is just a cache you can throw away and rebuild.
If you’re coming from a CRUD-with-an-ORM background, this is the inversion: instead
of UPDATE accounts SET balance = ?, you append a MoneyDeposited event and let a
consumer fold it into the read model. The log is the database.
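The "fold" mentioned above can be sketched in plain Kotlin with no Kafka or database involved. The types below (`Event`, `Deposited`, `Withdrawn`, `Balance`) and the use of integer cents are hypothetical simplifications for illustration; they are not the event classes used later in this exercise.

```kotlin
// Hypothetical, simplified event and state types for illustration only.
sealed class Event
data class Deposited(val cents: Long) : Event()
data class Withdrawn(val cents: Long) : Event()

data class Balance(val cents: Long = 0)

// Apply one event to the current state and return the next state.
fun applyEvent(state: Balance, event: Event): Balance = when (event) {
    is Deposited -> state.copy(cents = state.cents + event.cents)
    is Withdrawn -> state.copy(cents = state.cents - event.cents)
}

// The "current state" is a left fold over the whole log, starting from an empty balance.
fun replay(log: List<Event>): Balance = log.fold(Balance(), ::applyEvent)

fun main() {
    val log = listOf(Deposited(10_000), Withdrawn(2_550), Deposited(500))
    println(replay(log)) // prints Balance(cents=7950)
}
```

Because `replay` depends only on the log, discarding the state and calling it again gives the same result, which is the property that makes the read model disposable.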
How it fits together
Commands hit a REST API, the command handler validates them against the current
projection and emits events to a Kafka topic. A projector consumes that topic,
writes each event to an append-only event_log, and updates the
account_projections read model. Queries read from the projection, never from the
log.
flowchart LR
C["REST client"] -->|"POST /api/accounts/{id}/deposit"| H["Command Handler"]
H -->|"validate vs projection"| H
H -->|"AccountEvent"| K["Kafka topic<br/>account-events"]
K --> P["Account Projector<br/>(consumer)"]
P -->|"append-only"| L["event_log table"]
P -->|"fold into read model"| R["account_projections table"]
Q["GET /api/accounts/{id}"] --> R
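The flow in the diagram can be approximated with in-memory stand-ins to see where state actually changes. Everything in this sketch is hypothetical (`InMemorySystem`, `Evt`, the queue standing in for the topic, the map standing in for the projection table); it only mirrors the shape of the pipeline, not the Spring code that follows.

```kotlin
// Hypothetical in-memory stand-ins for the Kafka topic and the two tables.
sealed class Evt { abstract val accountId: String }
data class Created(override val accountId: String) : Evt()
data class Deposited(override val accountId: String, val amount: Long) : Evt()

class InMemorySystem {
    private val topic = ArrayDeque<Evt>()          // stands in for the account-events topic
    val eventLog = mutableListOf<Evt>()            // stands in for the event_log table
    val projection = mutableMapOf<String, Long>()  // stands in for account_projections

    // Write side: publish only, nothing is written to the projection here.
    fun create(accountId: String) {
        topic.addLast(Created(accountId))
    }

    // Write side: validate against the projection, then publish.
    fun deposit(accountId: String, amount: Long) {
        require(amount > 0) { "Amount must be positive" }
        require(accountId in projection) { "Account not found" }
        topic.addLast(Deposited(accountId, amount))
    }

    // Read side: drain the topic, append to the log, fold into the projection.
    fun runProjector() {
        while (topic.isNotEmpty()) {
            val event = topic.removeFirst()
            eventLog += event
            when (event) {
                is Created -> projection[event.accountId] = 0L
                is Deposited ->
                    projection[event.accountId] = projection.getValue(event.accountId) + event.amount
            }
        }
    }
}

fun main() {
    val system = InMemorySystem()
    system.create("acc-1")
    system.runProjector()
    system.deposit("acc-1", 100)
    println(system.projection["acc-1"]) // prints 0: the event is published but not yet projected
    system.runProjector()
    println(system.projection["acc-1"]) // prints 100
}
```

The gap between the two `println` calls is the eventual consistency a client sees between a command being accepted and the query endpoint reflecting it.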
Requirements
- Define the domain events: AccountCreated, MoneyDeposited, MoneyWithdrawn, AccountClosed.
- Implement a command handler that validates each command and publishes an event to Kafka.
- Implement a projector that consumes events and updates the account_projections table.
- Store every event in an append-only event_log table.
- Serve the query endpoint from the projection table, not the event log.
- Make the consumer idempotent: replaying the same event must not double the balance (use topic-partition-offset as a stable event ID).
The REST surface:
| Method | URL | Description |
|---|---|---|
| POST | /api/accounts | Create a new account |
| POST | /api/accounts/{id}/deposit | Deposit money |
| POST | /api/accounts/{id}/withdraw | Withdraw money |
| POST | /api/accounts/{id}/close | Close the account |
| GET | /api/accounts/{id} | Current account state (from the projection) |
| GET | /api/accounts/{id}/events | Event history for an account |
The worked solution
A single Spring Boot module. The event definitions, command handler, and projector are the interesting parts; the controller and repository are thin glue.
event-sourcing/
- build.gradle.kts Spring Boot + Kafka + JPA + Flyway
- settings.gradle.kts project name
- src/main/kotlin/com/example/
  - Application.kt Spring Boot entry point
  - config/KafkaConfig.kt topic definition
  - event/AccountEvent.kt the sealed event hierarchy
  - producer/AccountCommandHandler.kt validate + publish
  - consumer/AccountProjector.kt consume + project
  - controller/AccountController.kt REST endpoints
  - model/AccountProjection.kt JPA read-model entity
  - repository/AccountRepository.kt Spring Data repo
- src/main/resources/
  - application.yml port, datasource, Kafka serializers
  - db/migration/ Flyway schema
    - …
The events
The four state changes are modelled as a sealed class hierarchy, Kotlin’s answer
to a TypeScript discriminated union or a Go interface with a fixed set of
implementers. Because the set is closed, the when in the projector can be checked
for exhaustiveness by the compiler. Every event carries an accountId and a
timestamp; the rest of the fields are event-specific.
package com.example.event

import java.time.Instant

sealed class AccountEvent {
    abstract val accountId: String
    abstract val timestamp: Instant
}

data class AccountCreated(
    override val accountId: String,
    val ownerId: String,
    val ownerName: String,
    override val timestamp: Instant = Instant.now()
) : AccountEvent()

data class MoneyDeposited(
    override val accountId: String,
    val amount: Double,
    override val timestamp: Instant = Instant.now()
) : AccountEvent()

data class MoneyWithdrawn(
    override val accountId: String,
    val amount: Double,
    override val timestamp: Instant = Instant.now()
) : AccountEvent()

data class AccountClosed(
    override val accountId: String,
    val reason: String = "",
    override val timestamp: Instant = Instant.now()
) : AccountEvent()

These travel over Kafka as JSON. Spring’s JsonSerializer writes a type header so
the matching JsonDeserializer can rebuild the concrete subclass on the consumer
side, which is why application.yml trusts com.example.*:
server:
  port: 8086

spring:
  datasource:
    url: jdbc:postgresql://localhost:5432/kotlin_course
    username: dev
    password: dev
  jpa:
    hibernate:
      ddl-auto: validate
    show-sql: true
  flyway:
    enabled: true
  kafka:
    bootstrap-servers: localhost:29092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      group-id: account-projector
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      auto-offset-reset: earliest
      properties:
        spring.json.trusted.packages: "com.example.*"
    listener:
      ack-mode: record

The account ID is the Kafka message key, so all events for one account land on the same partition and are processed in order, which is essential when balance changes depend on prior ones. The topic is declared with 6 partitions:
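package com.example.config

import org.apache.kafka.clients.admin.NewTopic
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.config.TopicBuilder

@Configuration
class KafkaConfig {

    companion object {
        const val ACCOUNT_EVENTS_TOPIC = "account-events"
    }

    // Declares the topic so it is created on startup if it does not exist yet.
    @Bean
    fun accountEventsTopic(): NewTopic = TopicBuilder
        .name(ACCOUNT_EVENTS_TOPIC)
        .partitions(6)
        .replicas(1)
        .build()
}

The command handler (write side)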
The command handler is the only place that validates business rules. It reads the current state from the projection (via the repository), checks the invariants, and publishes an event only if they hold. Because the projection lags the topic, that check runs against state that may be slightly stale. Note what the handler does not do: it never writes the balance itself. The new balance only exists once the projector has consumed the event.
package com.example.producer

import com.example.config.KafkaConfig
import com.example.event.*
import com.example.repository.AccountRepository
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Service
import java.util.UUID

@Service
class AccountCommandHandler(
    private val kafkaTemplate: KafkaTemplate<String, Any>,
    private val accounts: AccountRepository
) {

    fun createAccount(ownerId: String, ownerName: String): String {
        val accountId = UUID.randomUUID().toString()
        val event = AccountCreated(accountId, ownerId, ownerName)
        kafkaTemplate.send(KafkaConfig.ACCOUNT_EVENTS_TOPIC, accountId, event)
        return accountId
    }

    fun deposit(accountId: String, amount: Double) {
        require(amount > 0) { "Amount must be positive" }
        val acct = accounts.findById(accountId)
            .orElseThrow { IllegalArgumentException("Account not found") }
        require(acct.status != "CLOSED") { "Account is closed" }
        kafkaTemplate.send(
            KafkaConfig.ACCOUNT_EVENTS_TOPIC, accountId,
            MoneyDeposited(accountId, amount)
        )
    }

    fun withdraw(accountId: String, amount: Double) {
        require(amount > 0) { "Amount must be positive" }
        val acct = accounts.findById(accountId)
            .orElseThrow { IllegalArgumentException("Account not found") }
        require(acct.status != "CLOSED") { "Account is closed" }
        require(acct.balance >= amount) { "Insufficient funds" }
        kafkaTemplate.send(
            KafkaConfig.ACCOUNT_EVENTS_TOPIC, accountId,
            MoneyWithdrawn(accountId, amount)
        )
    }

    fun closeAccount(accountId: String, reason: String) {
        val acct = accounts.findById(accountId)
            .orElseThrow { IllegalArgumentException("Account not found") }
        require(acct.status != "CLOSED") { "Account already closed" }
        kafkaTemplate.send(
            KafkaConfig.ACCOUNT_EVENTS_TOPIC, accountId,
            AccountClosed(accountId, reason)
        )
    }
}

require(condition) { "message" } is Kotlin’s argument-precondition helper: it
throws IllegalArgumentException with that message when the condition is false. It is
the idiomatic guard clause, like an early throw new Error(...) in TS or
if !ok { return err } in Go.
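To see the guard clauses in isolation, here is a minimal sketch that reuses the withdraw checks without Kafka or the repository. `Account` and `validateWithdraw` are hypothetical names introduced only for this example.

```kotlin
// Hypothetical stand-in for the projection row the handler validates against.
data class Account(val balance: Double, val status: String)

// Same guard clauses as the withdraw command, with the I/O removed.
fun validateWithdraw(account: Account, amount: Double) {
    require(amount > 0) { "Amount must be positive" }
    require(account.status != "CLOSED") { "Account is closed" }
    require(account.balance >= amount) { "Insufficient funds" }
}

fun main() {
    val account = Account(balance = 50.0, status = "ACTIVE")
    validateWithdraw(account, 20.0) // all checks hold, returns normally

    // runCatching captures the exception so its type and message can be inspected.
    val failure = runCatching { validateWithdraw(account, 80.0) }.exceptionOrNull()
    println(failure is IllegalArgumentException) // prints true
    println(failure?.message)                    // prints Insufficient funds
}
```

The message lambda is only evaluated when the condition fails, so building an expensive message costs nothing on the happy path.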
The projection (read model)
The read model is a plain JPA entity. The fields the projector changes (balance,
status, lastUpdated) are var (mutable) because events are folded into them; the
identifying fields stay val. The events themselves are immutable; the projection
is the running total.
@Entity
@Table(name = "account_projections")
data class AccountProjection(
    @Id val accountId: String = "",
    val ownerId: String = "",
    val ownerName: String = "",
    var balance: Double = 0.0,
    var status: String = "ACTIVE",
    var lastUpdated: Instant = Instant.now()
)

@Repository
interface AccountRepository : JpaRepository<AccountProjection, String>

The projector (consume + project)
This is the heart of the system. The @KafkaListener receives each AccountEvent,
deduplicates it by its topic-partition-offset, appends it to event_log, then
folds it into the projection. The when (event) is exhaustive over the sealed
hierarchy, so adding a new event type later is a compile error until you handle it.
package com.example.consumer

import com.example.config.KafkaConfig
import com.example.event.*
import com.example.model.AccountProjection
import com.example.repository.AccountRepository
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.kafka.support.KafkaHeaders
import org.springframework.messaging.handler.annotation.Header
import org.springframework.messaging.handler.annotation.Payload
import org.springframework.stereotype.Component
import org.springframework.transaction.annotation.Transactional

@Component
class AccountProjector(
    private val accounts: AccountRepository,
    private val eventLog: EventLogRepository
) {

    @KafkaListener(topics = [KafkaConfig.ACCOUNT_EVENTS_TOPIC], groupId = "account-projector")
    @Transactional
    fun handleEvent(
        @Payload event: AccountEvent,
        @Header(KafkaHeaders.RECEIVED_TOPIC) topic: String,
        @Header(KafkaHeaders.RECEIVED_PARTITION) partition: Int,
        @Header(KafkaHeaders.OFFSET) offset: Long
    ) {
        // Stable, replay-proof event id: topic-partition-offset
        val eventId = "$topic-$partition-$offset"
        if (eventLog.existsByEventId(eventId)) return // idempotent: already applied

        eventLog.save(EventLogEntry(eventId, event)) // append-only log

        when (event) { // fold into the read model
            is AccountCreated -> accounts.save(
                AccountProjection(event.accountId, event.ownerId, event.ownerName)
            )
            is MoneyDeposited -> update(event.accountId) { it.balance += event.amount }
            is MoneyWithdrawn -> update(event.accountId) { it.balance -= event.amount }
            is AccountClosed -> update(event.accountId) { it.status = "CLOSED" }
        }
    }

    // Loads the projection row, applies the change, and stamps the update time.
    // Events for an unknown account are silently ignored.
    private fun update(accountId: String, change: (AccountProjection) -> Unit) {
        accounts.findById(accountId).ifPresent { acct ->
            change(acct)
            acct.lastUpdated = java.time.Instant.now()
            accounts.save(acct)
        }
    }
}

The idempotency trick is the key takeaway. A Kafka consumer that commits offsets
after processing gets at-least-once delivery, so the same event can arrive twice
(a rebalance before the offset is committed, a consumer group with no committed
offsets starting over under auto-offset-reset: earliest, a manual replay). The
(topic, partition, offset) triple uniquely identifies a delivered record, so
storing it in a UNIQUE column lets the consumer skip anything it has already
applied. Without that guard, a replay would deposit the money twice.
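The deduplication idea can be tried without Kafka or a database. In this sketch a `MutableSet` plays the role of the UNIQUE event_id column; `Delivered` and `IdempotentProjector` are hypothetical names, and amounts are plain `Long` values to keep the example small.

```kotlin
// Hypothetical record type: only the coordinates Kafka attaches to a delivered record, plus a payload.
data class Delivered(val topic: String, val partition: Int, val offset: Long, val amount: Long)

class IdempotentProjector {
    private val seen = mutableSetOf<String>() // plays the role of the UNIQUE event_id column
    var balance = 0L
        private set

    // Returns true if the record was applied, false if it was a duplicate.
    fun handle(record: Delivered): Boolean {
        val eventId = "${record.topic}-${record.partition}-${record.offset}"
        if (!seen.add(eventId)) return false // add() returns false when the id is already present
        balance += record.amount
        return true
    }
}

fun main() {
    val projector = IdempotentProjector()
    val deposit = Delivered("account-events", 3, 42L, amount = 100)
    println(projector.handle(deposit)) // prints true
    println(projector.handle(deposit)) // prints false: the redelivery is skipped
    println(projector.balance)         // prints 100
}
```

In the real projector the set lookup and the balance update sit in one database transaction, so a crash between them cannot leave the event marked as applied with the balance unchanged.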
The append-only log
Flyway creates two tables. account_projections is the disposable read model;
event_log is the durable source of truth — append-only, with the event_id
UNIQUE constraint backing the idempotency check.
CREATE TABLE IF NOT EXISTS account_projections (
    account_id   VARCHAR(36) PRIMARY KEY,
    owner_id     VARCHAR(255) NOT NULL,
    owner_name   VARCHAR(255) NOT NULL,
    balance      DOUBLE PRECISION NOT NULL DEFAULT 0.0,
    status       VARCHAR(20) NOT NULL DEFAULT 'ACTIVE',
    last_updated TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);

CREATE TABLE IF NOT EXISTS event_log (
    id         BIGSERIAL PRIMARY KEY,
    event_id   VARCHAR(100) UNIQUE NOT NULL,
    account_id VARCHAR(36) NOT NULL,
    event_type VARCHAR(50) NOT NULL,
    payload    JSONB NOT NULL,
    timestamp  TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_event_log_account_id ON event_log(account_id);
CREATE INDEX idx_event_log_timestamp ON event_log(timestamp);

The controller
Thin glue: parse the request, call the command handler, return 202 Accepted.
Queries read straight from the repository (the projection), never from the log.
@RestController
@RequestMapping("/api/accounts")
class AccountController(
    private val commandHandler: AccountCommandHandler,
    private val accountRepository: AccountRepository,
    private val eventLog: EventLogRepository // needed by getEvents below
) {

    data class CreateAccountRequest(val ownerId: String, val ownerName: String)
    data class DepositRequest(val amount: Double)
    data class WithdrawRequest(val amount: Double)
    data class CloseRequest(val reason: String = "")

    @PostMapping
    fun createAccount(@RequestBody request: CreateAccountRequest): ResponseEntity<Map<String, String>> {
        val accountId = commandHandler.createAccount(request.ownerId, request.ownerName)
        return ResponseEntity.status(HttpStatus.ACCEPTED).body(
            mapOf("accountId" to accountId, "status" to "PROCESSING")
        )
    }

    @PostMapping("/{id}/deposit")
    fun deposit(@PathVariable id: String, @RequestBody request: DepositRequest): ResponseEntity<Map<String, String>> {
        commandHandler.deposit(id, request.amount)
        return ResponseEntity.accepted().body(mapOf("status" to "PROCESSING"))
    }

    @PostMapping("/{id}/withdraw")
    fun withdraw(@PathVariable id: String, @RequestBody request: WithdrawRequest): ResponseEntity<Map<String, String>> {
        commandHandler.withdraw(id, request.amount)
        return ResponseEntity.accepted().body(mapOf("status" to "PROCESSING"))
    }

    @PostMapping("/{id}/close")
    fun close(@PathVariable id: String, @RequestBody request: CloseRequest): ResponseEntity<Map<String, String>> {
        commandHandler.closeAccount(id, request.reason)
        return ResponseEntity.accepted().body(mapOf("status" to "PROCESSING"))
    }

    // Reads the current state from the projection, not from the log.
    @GetMapping("/{id}")
    fun getAccount(@PathVariable id: String): ResponseEntity<Any> {
        val account = accountRepository.findById(id)
        return if (account.isPresent) ResponseEntity.ok(account.get())
        else ResponseEntity.notFound().build()
    }

    // Event history for one account, oldest first.
    @GetMapping("/{id}/events")
    fun getEvents(@PathVariable id: String): ResponseEntity<Any> =
        ResponseEntity.ok(eventLog.findByAccountIdOrderByTimestamp(id))
}

Run and test
This exercise needs Kafka and PostgreSQL running. Bring them up from the shared infra stack first (see the Event-Driven with Kafka module for the full setup).
1. Start the infrastructure:

       cd shared-infra
       docker compose up -d postgres kafka kafka-ui

2. Run the app (starts on port 8086):

       ./gradlew bootRun

3. Create an account. The response gives you the accountId:

       curl -X POST http://localhost:8086/api/accounts \
         -H "Content-Type: application/json" \
         -d '{"ownerId": "user-1", "ownerName": "Alice"}'

4. Deposit and withdraw (substitute the real id):

       curl -X POST http://localhost:8086/api/accounts/{id}/deposit \
         -H "Content-Type: application/json" -d '{"amount": 100.00}'
       curl -X POST http://localhost:8086/api/accounts/{id}/withdraw \
         -H "Content-Type: application/json" -d '{"amount": 25.50}'

5. Read the current state (from the projection) and the event history (from the log):

       curl http://localhost:8086/api/accounts/{id}
       curl http://localhost:8086/api/accounts/{id}/events