
Event Sourcing Lite

Build a small event-sourced system: every change to a bank account is an immutable event published to Kafka, and the account’s “current state” is a projection rebuilt by replaying those events into a PostgreSQL read model. Nothing mutates a row in place — the event stream is the source of truth, and the projection is just a cache you can throw away and rebuild.

If you’re coming from a CRUD-with-an-ORM background, this is the inversion: instead of UPDATE accounts SET balance = ?, you append a MoneyDeposited event and let a consumer fold it into the read model. The log is the database.

Commands hit a REST API, the command handler validates them against the current projection and emits events to a Kafka topic. A projector consumes that topic, writes each event to an append-only event_log, and updates the account_projections read model. Queries read from the projection, never from the log.

[Diagram: event-sourcing flow]

  1. Define the domain events: AccountCreated, MoneyDeposited, MoneyWithdrawn, AccountClosed.
  2. Implement a command handler that validates each command and publishes an event to Kafka.
  3. Implement a projector that consumes events and updates the account_projections table.
  4. Store every event in an append-only event_log table.
  5. Serve the query endpoint from the projection table, not the event log.
  6. Make the consumer idempotent — replaying the same event must not double the balance (use topic-partition-offset as a stable event ID).

The REST surface:

Method  URL                           Description
POST    /api/accounts                 Create a new account
POST    /api/accounts/{id}/deposit    Deposit money
POST    /api/accounts/{id}/withdraw   Withdraw money
POST    /api/accounts/{id}/close      Close the account
GET     /api/accounts/{id}            Current account state (from the projection)
GET     /api/accounts/{id}/events     Event history for an account

A single Spring Boot module. The event definitions, command handler, and projector are the interesting parts; the controller and repository are thin glue.

  • event-sourcing/
    • build.gradle.kts Spring Boot + Kafka + JPA + Flyway
    • settings.gradle.kts project name
    • src/main/kotlin/com/example/
      • Application.kt Spring Boot entry point
      • config/KafkaConfig.kt topic definition
      • event/AccountEvent.kt the sealed event hierarchy
      • producer/AccountCommandHandler.kt validate + publish
      • consumer/AccountProjector.kt consume + project
      • controller/AccountController.kt REST endpoints
      • model/AccountProjection.kt JPA read-model entity
      • repository/AccountRepository.kt Spring Data repo
    • src/main/resources/
      • application.yml port, datasource, Kafka serializers
      • db/migration/ Flyway schema

The four state changes are modelled as a sealed class hierarchy — Kotlin’s answer to a TypeScript discriminated union or a Go interface with a fixed set of implementers. Because the set is closed, the when in the projector can be checked for exhaustiveness by the compiler. Every event carries an accountId and a timestamp; the rest of the fields are event-specific.

src/main/kotlin/com/example/event/AccountEvent.kt
package com.example.event

import java.time.Instant

sealed class AccountEvent {
    abstract val accountId: String
    abstract val timestamp: Instant
}

data class AccountCreated(
    override val accountId: String,
    val ownerId: String,
    val ownerName: String,
    override val timestamp: Instant = Instant.now()
) : AccountEvent()

data class MoneyDeposited(
    override val accountId: String,
    val amount: Double,
    override val timestamp: Instant = Instant.now()
) : AccountEvent()

data class MoneyWithdrawn(
    override val accountId: String,
    val amount: Double,
    override val timestamp: Instant = Instant.now()
) : AccountEvent()

data class AccountClosed(
    override val accountId: String,
    val reason: String = "",
    override val timestamp: Instant = Instant.now()
) : AccountEvent()
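
To make the "state is a fold over events" idea concrete, here is a minimal sketch in plain Kotlin with no Kafka or database involved. The event classes are trimmed copies of the ones above (timestamps and ownerId dropped), and AccountState, applyEvent, and replay are hypothetical names introduced only for this illustration.

```kotlin
// Trimmed copies of the events above so the sketch compiles on its own.
sealed class AccountEvent { abstract val accountId: String }
data class AccountCreated(override val accountId: String, val ownerName: String) : AccountEvent()
data class MoneyDeposited(override val accountId: String, val amount: Double) : AccountEvent()
data class MoneyWithdrawn(override val accountId: String, val amount: Double) : AccountEvent()
data class AccountClosed(override val accountId: String) : AccountEvent()

// Hypothetical in-memory state; stands in for one account_projections row.
data class AccountState(val balance: Double = 0.0, val status: String = "ACTIVE")

// One step of the fold. There is no `else` branch: the compiler checks
// that every subclass of the sealed hierarchy is covered.
fun applyEvent(state: AccountState, event: AccountEvent): AccountState = when (event) {
    is AccountCreated -> AccountState()
    is MoneyDeposited -> state.copy(balance = state.balance + event.amount)
    is MoneyWithdrawn -> state.copy(balance = state.balance - event.amount)
    is AccountClosed -> state.copy(status = "CLOSED")
}

// Rebuilding the state is a left fold over the event history.
fun replay(events: List<AccountEvent>): AccountState =
    events.fold(AccountState(), ::applyEvent)

fun main() {
    val history = listOf(
        AccountCreated("acc-1", "Alice"),
        MoneyDeposited("acc-1", 100.0),
        MoneyWithdrawn("acc-1", 25.5),
    )
    println(replay(history)) // AccountState(balance=74.5, status=ACTIVE)
}
```

Because replay is a pure function of the history, discarding the state and calling it again yields the same result, which is the property that makes the projection disposable.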

These travel over Kafka as JSON. Spring’s JsonSerializer writes a type header so the matching JsonDeserializer can rebuild the concrete subclass on the consumer side — that’s why application.yml trusts com.example.*:

src/main/resources/application.yml
server:
  port: 8086
spring:
  datasource:
    url: jdbc:postgresql://localhost:5432/kotlin_course
    username: dev
    password: dev
  jpa:
    hibernate:
      ddl-auto: validate
    show-sql: true
  flyway:
    enabled: true
  kafka:
    bootstrap-servers: localhost:29092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      group-id: account-projector
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      auto-offset-reset: earliest
      properties:
        spring.json.trusted.packages: "com.example.*"
    listener:
      ack-mode: record

The account ID is the Kafka message key, so all events for one account land on the same partition and are consumed in order, which matters because each balance change depends on the ones before it. Ordering holds per partition only, not across accounts. The topic is declared with 6 partitions:

src/main/kotlin/com/example/config/KafkaConfig.kt
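package com.example.config

import org.apache.kafka.clients.admin.NewTopic
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.config.TopicBuilder

@Configuration
class KafkaConfig {
    companion object {
        const val ACCOUNT_EVENTS_TOPIC = "account-events"
    }

    // Declares the topic so it is created on startup if it does not exist yet.
    @Bean
    fun accountEventsTopic(): NewTopic = TopicBuilder
        .name(ACCOUNT_EVENTS_TOPIC)
        .partitions(6)
        .replicas(1)
        .build()
}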
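
Why the key pins an account to one partition can be sketched without a broker. Kafka's default partitioner hashes the serialized key (with murmur2) and takes the result modulo the partition count; the hypothetical partitionFor below substitutes String.hashCode() to stay dependency-free, so the partition numbers it prints will not match a real cluster. The property it demonstrates is the same: equal keys always map to the same partition.

```kotlin
// Stand-in for Kafka's key-based partitioning. Real Kafka hashes the serialized
// key with murmur2; String.hashCode() is used here only as an illustration.
fun partitionFor(key: String, partitions: Int): Int =
    Math.floorMod(key.hashCode(), partitions)

fun main() {
    val partitions = 6 // matches .partitions(6) in KafkaConfig
    val accountId = "3f2b6c1e-0000-4000-8000-000000000001" // hypothetical account id

    val first = partitionFor(accountId, partitions)
    val second = partitionFor(accountId, partitions)

    // Same key, same partition, so the account's events keep their order.
    println(first == second) // true
}
```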

The command handler is the only place that validates business rules. It reads the current state from the projection (via the repository), checks the invariants, and — only if they hold — publishes an event. Note what it does not do: it never writes the balance itself. The new balance only exists once the projector has consumed the event.

src/main/kotlin/com/example/producer/AccountCommandHandler.kt
package com.example.producer

import com.example.config.KafkaConfig
import com.example.event.*
import com.example.repository.AccountRepository
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Service
import java.util.UUID

@Service
class AccountCommandHandler(
    private val kafkaTemplate: KafkaTemplate<String, Any>,
    private val accounts: AccountRepository
) {
    fun createAccount(ownerId: String, ownerName: String): String {
        val accountId = UUID.randomUUID().toString()
        val event = AccountCreated(accountId, ownerId, ownerName)
        kafkaTemplate.send(KafkaConfig.ACCOUNT_EVENTS_TOPIC, accountId, event)
        return accountId
    }

    fun deposit(accountId: String, amount: Double) {
        require(amount > 0) { "Amount must be positive" }
        val acct = accounts.findById(accountId)
            .orElseThrow { IllegalArgumentException("Account not found") }
        require(acct.status != "CLOSED") { "Account is closed" }
        kafkaTemplate.send(
            KafkaConfig.ACCOUNT_EVENTS_TOPIC, accountId,
            MoneyDeposited(accountId, amount)
        )
    }

    fun withdraw(accountId: String, amount: Double) {
        require(amount > 0) { "Amount must be positive" }
        val acct = accounts.findById(accountId)
            .orElseThrow { IllegalArgumentException("Account not found") }
        require(acct.status != "CLOSED") { "Account is closed" }
        require(acct.balance >= amount) { "Insufficient funds" }
        kafkaTemplate.send(
            KafkaConfig.ACCOUNT_EVENTS_TOPIC, accountId,
            MoneyWithdrawn(accountId, amount)
        )
    }

    fun closeAccount(accountId: String, reason: String) {
        val acct = accounts.findById(accountId)
            .orElseThrow { IllegalArgumentException("Account not found") }
        require(acct.status != "CLOSED") { "Account already closed" }
        kafkaTemplate.send(
            KafkaConfig.ACCOUNT_EVENTS_TOPIC, accountId,
            AccountClosed(accountId, reason)
        )
    }
}

require(condition) { "message" } is Kotlin’s argument-precondition helper: it throws IllegalArgumentException with that message when the condition is false — the idiomatic guard clause, like an early throw new Error(...) in TS or if !ok { return err } in Go.
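
A small standalone sketch of that behaviour, using a hypothetical validateWithdrawal helper that mirrors the guards in the command handler:

```kotlin
// Hypothetical helper mirroring the guards in AccountCommandHandler.withdraw.
fun validateWithdrawal(balance: Double, amount: Double) {
    require(amount > 0) { "Amount must be positive" }
    require(balance >= amount) { "Insufficient funds" }
}

fun main() {
    // Both conditions hold, so nothing is thrown.
    validateWithdrawal(balance = 100.0, amount = 25.5)

    // The second guard fails; runCatching captures the exception for inspection.
    val failure = runCatching { validateWithdrawal(balance = 10.0, amount = 25.5) }
        .exceptionOrNull()
    println(failure is IllegalArgumentException) // true
    println(failure?.message)                    // Insufficient funds
}
```

The message lambda is only evaluated when the condition fails, so building an expensive message costs nothing on the happy path.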

The read model is a plain JPA entity. Its balance, status, and lastUpdated fields are var (mutable) because the projector folds events into them; the identifying fields stay val. The events themselves are immutable; the projection is the running total.

src/main/kotlin/com/example/model/AccountProjection.kt
@Entity
@Table(name = "account_projections")
data class AccountProjection(
    @Id
    val accountId: String = "",
    val ownerId: String = "",
    val ownerName: String = "",
    var balance: Double = 0.0,
    var status: String = "ACTIVE",
    var lastUpdated: Instant = Instant.now()
)

src/main/kotlin/com/example/repository/AccountRepository.kt
@Repository
interface AccountRepository : JpaRepository<AccountProjection, String>

This is the heart of the system. The @KafkaListener receives each AccountEvent, deduplicates it by its topic-partition-offset, appends it to event_log, then folds it into the projection. The when (event) is exhaustive over the sealed hierarchy, so adding a new event type later is a compile error until you handle it (on Kotlin 1.7 and later, where a non-exhaustive when statement over a sealed type is an error rather than a warning).

src/main/kotlin/com/example/consumer/AccountProjector.kt
package com.example.consumer

import com.example.config.KafkaConfig
import com.example.event.*
import com.example.model.AccountProjection
import com.example.repository.AccountRepository
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.kafka.support.KafkaHeaders
import org.springframework.messaging.handler.annotation.Header
import org.springframework.messaging.handler.annotation.Payload
import org.springframework.stereotype.Component
import org.springframework.transaction.annotation.Transactional
import java.time.Instant

@Component
class AccountProjector(
    private val accounts: AccountRepository,
    private val eventLog: EventLogRepository
) {
    @KafkaListener(topics = [KafkaConfig.ACCOUNT_EVENTS_TOPIC], groupId = "account-projector")
    @Transactional
    fun handleEvent(
        @Payload event: AccountEvent,
        @Header(KafkaHeaders.RECEIVED_TOPIC) topic: String,
        @Header(KafkaHeaders.RECEIVED_PARTITION) partition: Int,
        @Header(KafkaHeaders.OFFSET) offset: Long
    ) {
        // Stable, replay-proof event id: topic-partition-offset
        val eventId = "$topic-$partition-$offset"
        if (eventLog.existsByEventId(eventId)) return // idempotent: already applied

        eventLog.save(EventLogEntry(eventId, event)) // append-only log

        when (event) { // fold into the read model
            is AccountCreated -> accounts.save(
                AccountProjection(event.accountId, event.ownerId, event.ownerName)
            )
            is MoneyDeposited -> update(event.accountId) { it.balance += event.amount }
            is MoneyWithdrawn -> update(event.accountId) { it.balance -= event.amount }
            is AccountClosed -> update(event.accountId) { it.status = "CLOSED" }
        }
    }

    // Loads the projection, applies the change, and stamps the update time.
    // Events for an unknown account are silently ignored.
    private fun update(accountId: String, change: (AccountProjection) -> Unit) {
        accounts.findById(accountId).ifPresent { acct ->
            change(acct)
            acct.lastUpdated = Instant.now()
            accounts.save(acct)
        }
    }
}

The idempotency trick is the key takeaway. A Kafka consumer that commits offsets after processing gets at-least-once delivery, so the same record can arrive twice: after a rebalance, after a crash between processing and commit, after a manual offset reset, or when a consumer group with no committed offsets starts from the beginning because of auto-offset-reset: earliest. The (topic, partition, offset) triple uniquely identifies a record within a cluster, so storing it in a UNIQUE column lets the consumer skip anything it has already applied. Without that guard, a replay would deposit the money twice.
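
The deduplication guard can be sketched in memory, without Kafka or PostgreSQL. Delivered and InMemoryProjector are hypothetical names for this illustration only; the mutable set plays the role of the UNIQUE event_id column.

```kotlin
// Hypothetical stand-in for a consumed Kafka record: its coordinates plus a deposit amount.
data class Delivered(val topic: String, val partition: Int, val offset: Long, val amount: Double)

class InMemoryProjector {
    // Plays the role of the UNIQUE event_id column in event_log.
    private val appliedIds = mutableSetOf<String>()

    var balance = 0.0
        private set

    // Returns true if the record was applied, false if it was a duplicate and skipped.
    fun handle(record: Delivered): Boolean {
        val eventId = "${record.topic}-${record.partition}-${record.offset}"
        if (!appliedIds.add(eventId)) return false // add() returns false when already present
        balance += record.amount
        return true
    }
}

fun main() {
    val projector = InMemoryProjector()
    val deposit = Delivered("account-events", 2, 41L, 100.0)

    println(projector.handle(deposit)) // true
    println(projector.handle(deposit)) // false: the redelivery is ignored
    println(projector.balance)         // 100.0
}
```

One limitation of this choice of ID: it identifies a record's position in the topic, not the business intent behind it. If the producer publishes the same deposit twice, the two records get different offsets and both are applied.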

Flyway creates two tables. account_projections is the disposable read model; event_log is the durable source of truth — append-only, with the event_id UNIQUE constraint backing the idempotency check.

src/main/resources/db/migration/V1__create_account_projections.sql
CREATE TABLE IF NOT EXISTS account_projections (
    account_id   VARCHAR(36) PRIMARY KEY,
    owner_id     VARCHAR(255) NOT NULL,
    owner_name   VARCHAR(255) NOT NULL,
    balance      DOUBLE PRECISION NOT NULL DEFAULT 0.0,
    status       VARCHAR(20) NOT NULL DEFAULT 'ACTIVE',
    last_updated TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);

src/main/resources/db/migration/V2__create_event_log.sql
CREATE TABLE IF NOT EXISTS event_log (
    id         BIGSERIAL PRIMARY KEY,
    event_id   VARCHAR(100) UNIQUE NOT NULL,
    account_id VARCHAR(36) NOT NULL,
    event_type VARCHAR(50) NOT NULL,
    payload    JSONB NOT NULL,
    timestamp  TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_event_log_account_id ON event_log(account_id);
CREATE INDEX idx_event_log_timestamp ON event_log(timestamp);

Thin glue: parse the request, call the command handler, return 202 Accepted. Queries read straight from the repository (the projection), never from the log.

src/main/kotlin/com/example/controller/AccountController.kt
@RestController
@RequestMapping("/api/accounts")
class AccountController(
    private val commandHandler: AccountCommandHandler,
    private val accountRepository: AccountRepository,
    private val eventLog: EventLogRepository // event history for the /events endpoint
) {
    data class CreateAccountRequest(val ownerId: String, val ownerName: String)
    data class DepositRequest(val amount: Double)
    data class WithdrawRequest(val amount: Double)
    data class CloseRequest(val reason: String = "")

    @PostMapping
    fun createAccount(@RequestBody request: CreateAccountRequest): ResponseEntity<Map<String, String>> {
        val accountId = commandHandler.createAccount(request.ownerId, request.ownerName)
        return ResponseEntity.status(HttpStatus.ACCEPTED).body(
            mapOf("accountId" to accountId, "status" to "PROCESSING")
        )
    }

    @PostMapping("/{id}/deposit")
    fun deposit(@PathVariable id: String, @RequestBody request: DepositRequest): ResponseEntity<Map<String, String>> {
        commandHandler.deposit(id, request.amount)
        return ResponseEntity.accepted().body(mapOf("status" to "PROCESSING"))
    }

    @PostMapping("/{id}/withdraw")
    fun withdraw(@PathVariable id: String, @RequestBody request: WithdrawRequest): ResponseEntity<Map<String, String>> {
        commandHandler.withdraw(id, request.amount)
        return ResponseEntity.accepted().body(mapOf("status" to "PROCESSING"))
    }

    @PostMapping("/{id}/close")
    fun close(@PathVariable id: String, @RequestBody request: CloseRequest): ResponseEntity<Map<String, String>> {
        commandHandler.closeAccount(id, request.reason)
        return ResponseEntity.accepted().body(mapOf("status" to "PROCESSING"))
    }

    // Reads the current state from the projection, never from the log.
    @GetMapping("/{id}")
    fun getAccount(@PathVariable id: String): ResponseEntity<Any> {
        val account = accountRepository.findById(id)
        return if (account.isPresent) ResponseEntity.ok(account.get())
        else ResponseEntity.notFound().build()
    }

    @GetMapping("/{id}/events")
    fun getEvents(@PathVariable id: String): ResponseEntity<Any> =
        ResponseEntity.ok(eventLog.findByAccountIdOrderByTimestamp(id))
}
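
Because every command returns 202 before the projector has consumed the event, a GET issued immediately after a POST may still see the old state, or a 404 for a brand-new account. One way a client could cope is to retry the read for a short while. The sketch below is an assumption about client behaviour, not part of the service; pollUntilPresent is a hypothetical helper, and the lambda in main simulates a projection that becomes visible on the third read.

```kotlin
// Hypothetical client-side helper: retry a lookup until it yields a value
// or the attempts run out. Returns null if nothing showed up in time.
fun <T : Any> pollUntilPresent(attempts: Int, delayMs: Long, fetch: () -> T?): T? {
    repeat(attempts) {
        fetch()?.let { return it } // non-local return: repeat and let are inline
        Thread.sleep(delayMs)
    }
    return null
}

fun main() {
    var calls = 0
    // Simulates a projection row that appears on the third read.
    val status = pollUntilPresent(attempts = 5, delayMs = 10) {
        calls++
        if (calls < 3) null else "ACTIVE"
    }
    println("$status after $calls calls") // ACTIVE after 3 calls
}
```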

This exercise needs Kafka and PostgreSQL running. Bring them up from the shared infra stack first (see the Event-Driven with Kafka module for the full setup).

  1. Start the infrastructure:

    cd shared-infra
    docker compose up -d postgres kafka kafka-ui
  2. Run the app (starts on port 8086):

    ./gradlew bootRun
  3. Create an account — the response gives you the accountId:

    curl -X POST http://localhost:8086/api/accounts \
    -H "Content-Type: application/json" \
    -d '{"ownerId": "user-1", "ownerName": "Alice"}'
  4. Deposit and withdraw (substitute the real id):

    curl -X POST http://localhost:8086/api/accounts/{id}/deposit \
    -H "Content-Type: application/json" -d '{"amount": 100.00}'
    curl -X POST http://localhost:8086/api/accounts/{id}/withdraw \
    -H "Content-Type: application/json" -d '{"amount": 25.50}'
  5. Read the current state (from the projection) and the event history (from the log):

    curl http://localhost:8086/api/accounts/{id}
    curl http://localhost:8086/api/accounts/{id}/events