Capstone: Notification Service
This is where everything you’ve learned comes together. We build a Notification Service: a real microservice that accepts notification requests over a REST API or a Kafka topic, persists them in PostgreSQL, caches and rate-limits with Redis, authenticates with JWT, exposes OpenAPI docs, emits structured logs and Prometheus metrics, ships a full test suite with Testcontainers, and deploys via Docker Compose with a GitHub Actions pipeline.
This page is a guided tour, not a line-by-line dump. We show the instructive core of each layer; the full, runnable code is in the practice project linked at the bottom.
Architecture overview
A client (or an upstream service emitting Kafka events) hits the service. Routes pass through logging, metrics, rate-limit, and JWT middleware into the service layer, which fans out to three backing stores: PostgreSQL for durable state, Redis for caching and rate limiting, and Kafka for asynchronous delivery.
flowchart TB
C["Clients (REST API)"] --> GW["API Gateway / LB"]
GW --> R
subgraph SVC["Notification Service"]
R["Routes (Ktor)"] --> MW["Middleware: logging, metrics, rate limit"]
MW --> SEC["Security: JWT auth, RBAC"]
SEC --> SL["Service layer<br/>NotificationService<br/>UserPreferenceService"]
SL --> REPO["DB Repository"]
SL --> CACHE["Redis cache & rate limit"]
SL --> MSG["Kafka producer / consumer"]
end
REPO --> PG[("PostgreSQL")]
CACHE --> RD[("Redis")]
MSG --> KB[("Kafka broker")]
Module reference map
Every part of this capstone ties back to a specific course module:
| Component | Module | What you learned |
|---|---|---|
| Kotlin language, data classes, sealed interfaces | 02 | Type system, null safety |
| Collections, sequences, scope functions | 03 | Data transformation |
| Generics, OOP patterns | 04 | Repository pattern, DI |
| Coroutines, structured concurrency | 05 | Async operations |
| Flow, reactive streams | 06 | Event processing |
| Gradle build system | 07 | Project setup, dependencies |
| Spring Boot / Ktor framework | 08 / 09 | REST API framework |
| PostgreSQL + Flyway | 10 | Database access, migrations |
| Redis caching | 11 | Cache-aside, rate limiting |
| Kafka events | 12 | Event-driven architecture |
| Testing + Testcontainers | 13 | Unit + integration tests |
| JWT auth, RBAC | 14 | Security middleware |
| OpenAPI, serialization | 15 | API documentation |
| Logging, metrics, health checks | 16 | Structured logging, Prometheus |
| Advanced patterns (sealed, DSL) | 17 | Domain modeling |
| Docker, deployment | 18 | Containerization, CI/CD |
Project setup
The build is a single Ktor application module with a Gradle version catalog. The
catalog (in settings.gradle.kts) pins Ktor, Exposed, Koin, Flyway, and
Testcontainers; build.gradle.kts wires the kotlin("jvm") and serialization
plugins and references the catalog aliases. Here’s the shape of the build script —
the catalog itself is long, so see the practice project for the full list.
plugins {
    kotlin("jvm") version libs.versions.kotlin
    kotlin("plugin.serialization") version libs.versions.kotlin
    application
}

application {
    mainClass.set("com.example.notifications.ApplicationKt")
}

dependencies {
    // Ktor server + content negotiation, auth, metrics, status pages…
    implementation(libs.ktor.server.core)
    implementation(libs.ktor.server.netty)
    implementation(libs.ktor.server.auth.jwt)

    // Database: Exposed + Postgres + HikariCP + Flyway
    implementation(libs.exposed.core)
    implementation(libs.postgresql)
    implementation(libs.flyway.core)

    // Redis (Jedis), Kafka clients, Koin DI, Logback + Micrometer
    implementation(libs.jedis)
    implementation(libs.kafka.clients)
    implementation(libs.koin.ktor)
    implementation(libs.micrometer.prometheus)

    // Testing: kotlin-test, Ktor test host, Testcontainers, MockK
    testImplementation(libs.testcontainers.postgresql)
    testImplementation(libs.testcontainers.kafka)
    testImplementation(libs.mockk)
}

kotlin { jvmToolchain(21) }

Package structure
The layout follows a clean layered shape — domain (models + business logic),
infrastructure (database, cache, messaging, auth), api (routes, DTOs,
plugins), and di (the Koin module). Each directory maps back to a course module.
- src/main/kotlin/com/example/notifications/
  - Application.kt entry point, server configuration
  - config/
    - AppConfig.kt configuration data classes
    - DatabaseConfig.kt HikariCP + Flyway setup
    - RedisConfig.kt Jedis pool setup
    - KafkaConfig.kt producer + consumer config
  - domain/
    - model/ domain models (Modules 02, 17)
      - Notification.kt
      - NotificationChannel.kt
      - NotificationStatus.kt
      - UserPreference.kt
    - service/ business logic (Module 05)
      - NotificationService.kt
      - UserPreferenceService.kt
  - infrastructure/
    - database/ Exposed tables + repos (Module 10)
      - …
    - cache/ Redis operations (Module 11)
      - …
    - messaging/ Kafka producer + consumer (Module 12)
      - …
    - auth/ JWT utilities (Module 14)
      - …
  - api/
    - routes/ HTTP routes (Module 08/09)
      - …
    - dto/ request/response DTOs (Module 15)
      - …
    - plugins/ Ktor plugins (Serialization, Security, Monitoring, ErrorHandling)
      - …
  - di/
    - AppModule.kt Koin DI module (Module 04)
- src/main/resources/
  - application.conf HOCON configuration
  - db/migration/ Flyway migrations (Module 10)
    - …
  - logback.xml logging config (Module 16)
  - openapi/documentation.yaml OpenAPI spec (Module 15)
- src/test/kotlin/com/example/notifications/
  - unit/ domain + service unit tests
    - …
  - integration/ database, Kafka, API integration tests
    - …
  - TestContainerConfig.kt shared test infra
Domain modeling
The domain leans on Kotlin’s type system (Module 02) and sealed interfaces (Module
17). A NotificationChannel is a sealed interface — each channel carries its
own delivery data, and a when over it is exhaustive, checked by the compiler.
@SerialName gives each variant a stable type discriminator in JSON.
@Serializable
sealed interface NotificationChannel {
    @Serializable
    @SerialName("email")
    data class Email(val to: String, val subject: String) : NotificationChannel

    @Serializable
    @SerialName("sms")
    data class Sms(val phoneNumber: String) : NotificationChannel

    @Serializable
    @SerialName("push")
    data class Push(val deviceToken: String, val title: String) : NotificationChannel

    @Serializable
    @SerialName("webhook")
    data class Webhook(
        val url: String,
        val headers: Map<String, String> = emptyMap()
    ) : NotificationChannel
}

The Notification entity validates itself in an init block (require(...)),
keeps its lifecycle as an enum, and exposes domain logic like canRetry() so
business rules live with the data, not scattered across services.
@Serializable
data class Notification(
    val id: String = generateId(),
    val userId: String,
    val channel: NotificationChannel,
    val body: String,
    val status: NotificationStatus = NotificationStatus.PENDING,
    val metadata: Map<String, String> = emptyMap(),
    @Serializable(with = InstantSerializer::class)
    val createdAt: Instant = Clock.System.now(),
    val sentAt: Instant? = null,
    val failureReason: String? = null,
    val retryCount: Int = 0
) {
    init {
        require(body.isNotBlank()) { "Notification body must not be blank" }
        require(retryCount >= 0) { "Retry count must be non-negative" }
    }

    companion object {
        const val MAX_RETRIES = 3
    }

    fun canRetry(): Boolean =
        retryCount < MAX_RETRIES && status == NotificationStatus.FAILED
}

The lifecycle states form a small state machine that the rest of the service
respects (only PENDING/QUEUED can be cancelled; only FAILED can be retried):
stateDiagram-v2
    [*] --> PENDING
    PENDING --> QUEUED: queued to Kafka
    PENDING --> CANCELLED: cancel
    QUEUED --> SENT: delivered to provider
    QUEUED --> CANCELLED: cancel
    SENT --> DELIVERED: confirmed
    SENT --> FAILED: delivery error
    FAILED --> QUEUED: retry (< MAX_RETRIES)
    DELIVERED --> [*]
    CANCELLED --> [*]
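The diagram above can be read as a transition table. Here is a minimal sketch of that idea, using a stand-in enum and a hypothetical `canTransition` helper (neither name is confirmed by the practice project); the retry edge is additionally gated on the retry budget, mirroring `canRetry()`:

```kotlin
// Simplified stand-in for the project's NotificationStatus enum.
enum class NotificationStatus { PENDING, QUEUED, SENT, DELIVERED, FAILED, CANCELLED }

// Mirrors Notification.MAX_RETRIES from the entity above.
const val MAX_RETRIES = 3

// Allowed edges, copied one-to-one from the state diagram.
val allowedTransitions: Map<NotificationStatus, Set<NotificationStatus>> = mapOf(
    NotificationStatus.PENDING to setOf(NotificationStatus.QUEUED, NotificationStatus.CANCELLED),
    NotificationStatus.QUEUED to setOf(NotificationStatus.SENT, NotificationStatus.CANCELLED),
    NotificationStatus.SENT to setOf(NotificationStatus.DELIVERED, NotificationStatus.FAILED),
    NotificationStatus.FAILED to setOf(NotificationStatus.QUEUED),
    NotificationStatus.DELIVERED to emptySet(),
    NotificationStatus.CANCELLED to emptySet(),
)

// Hypothetical helper: true when `from -> to` is an edge in the diagram.
// Leaving FAILED (a retry) also requires retries to be left.
fun canTransition(
    from: NotificationStatus,
    to: NotificationStatus,
    retryCount: Int = 0,
): Boolean {
    if (to !in allowedTransitions.getValue(from)) return false
    if (from == NotificationStatus.FAILED) return retryCount < MAX_RETRIES
    return true
}
```

Keeping the edges in one map means a service method can reject an illegal status update with a single lookup instead of scattering `if` checks.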
User preferences gate which channels are allowed and the per-hour cap. Note the
when over the sealed channel — exhaustive, so adding a channel forces you to
handle it here:
@Serializable
data class UserPreference(
    val userId: String,
    val enabledChannels: Set<String> = setOf("email"),
    val quietHoursStart: Int? = null, // hour 0-23, null = none
    val quietHoursEnd: Int? = null,
    val maxPerHour: Int = 10
) {
    fun isChannelEnabled(channel: NotificationChannel): Boolean {
        val name = when (channel) {
            is NotificationChannel.Email -> "email"
            is NotificationChannel.Sms -> "sms"
            is NotificationChannel.Push -> "push"
            is NotificationChannel.Webhook -> "webhook"
        }
        return name in enabledChannels
    }
}

The API surface uses DTOs (CreateNotificationRequest, NotificationResponse,
PagedResponse<T>, ErrorResponse) with Notification.toResponse() extension
functions doing the mapping — request validation lives in the DTO’s init block,
keeping routes thin.
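To make the DTO paragraph concrete, here is a small sketch of validation in an `init` block plus a mapping extension. The types are simplified stand-ins (no `@Serializable`, no channel field), and the field names and the `totalPages` helper are assumptions rather than the practice project's actual definitions:

```kotlin
// Simplified stand-in for the domain entity shown earlier.
data class Notification(
    val id: String,
    val userId: String,
    val body: String,
    val status: String,
)

// Request DTO: invalid input fails at construction, before any route logic runs.
data class CreateNotificationRequest(val userId: String, val body: String) {
    init {
        require(userId.isNotBlank()) { "userId must not be blank" }
        require(body.isNotBlank()) { "body must not be blank" }
    }
}

// Response DTO: only the fields the API chooses to expose.
data class NotificationResponse(
    val id: String,
    val userId: String,
    val body: String,
    val status: String,
)

// Generic page envelope; totalPages is derived with ceiling division.
data class PagedResponse<T>(
    val items: List<T>,
    val page: Int,
    val pageSize: Int,
    val total: Long,
) {
    init {
        require(pageSize > 0) { "pageSize must be positive" }
    }

    val totalPages: Int
        get() = ((total + pageSize - 1) / pageSize).toInt()
}

// Mapping lives in an extension function, so routes only call toResponse().
fun Notification.toResponse(): NotificationResponse =
    NotificationResponse(id = id, userId = userId, body = body, status = status)
```

With this shape a route can respond with `saved.toResponse()` and let a failed `require` surface as an `IllegalArgumentException`, which the error-handling plugin described later maps to a 400.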
Database layer
Reference: Module 10 — Database Access (PostgreSQL).
Schema changes are versioned with Flyway migrations under db/migration/.
The notifications table stores the channel as a type column plus a JSONB blob,
and is indexed on the access patterns the API actually uses.
CREATE TABLE notifications (
    id             VARCHAR(36) PRIMARY KEY,
    user_id        VARCHAR(255) NOT NULL,
    channel_type   VARCHAR(50) NOT NULL,
    channel_data   JSONB NOT NULL,
    body           TEXT NOT NULL,
    status         VARCHAR(50) NOT NULL DEFAULT 'PENDING',
    metadata       JSONB NOT NULL DEFAULT '{}',
    created_at     TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at     TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    sent_at        TIMESTAMPTZ,
    failure_reason TEXT,
    retry_count    INTEGER NOT NULL DEFAULT 0,
    CONSTRAINT chk_status CHECK (status IN (
        'PENDING', 'QUEUED', 'SENT', 'DELIVERED', 'FAILED', 'CANCELLED'
    ))
);

CREATE INDEX idx_notifications_user_status ON notifications(user_id, status);
CREATE INDEX idx_notifications_created_at ON notifications(created_at DESC);

DatabaseConfig.init() builds a HikariCP pool, runs the migrations on startup,
then connects Exposed to that same pool:
object DatabaseConfig {
    fun init(config: AppConfig.DatabaseConfig) {
        val dataSource = HikariDataSource(HikariConfig().apply {
            jdbcUrl = config.url
            username = config.username
            password = config.password
            driverClassName = "org.postgresql.Driver"
            maximumPoolSize = config.maxPoolSize
        })

        Flyway.configure()
            .dataSource(dataSource)
            .locations("classpath:db/migration")
            .load()
            .migrate()

        Database.connect(dataSource)
    }
}

The repository maps Notification to and from Exposed Table rows, running each
query as a suspending transaction on the IO dispatcher via a dbQuery {} helper.
The pagination query is the instructive one — it returns the page plus the total
count so the API can build a PagedResponse:
suspend fun findByUserId(
    userId: String,
    page: Int = 1,
    pageSize: Int = 20
): Pair<List<Notification>, Long> = dbQuery {
    val total = NotificationTable
        .selectAll()
        .where { NotificationTable.userId eq userId }
        .count()

    val notifications = NotificationTable
        .selectAll()
        .where { NotificationTable.userId eq userId }
        .orderBy(NotificationTable.createdAt to SortOrder.DESC)
        .limit(pageSize)
        .offset(((page - 1) * pageSize).toLong())
        .map { it.toNotification() }

    notifications to total
}

// Run database queries on the IO dispatcher
suspend fun <T> dbQuery(block: suspend () -> T): T =
    newSuspendedTransaction(Dispatchers.IO) { block() }

REST API endpoints
Reference: Module 09 — Ktor.
The entry point installs plugins, wires Koin DI, then mounts routes. Health checks
are public; everything else is behind authenticate("jwt"):
fun Application.configureApp() {
    val appConfig = AppConfig.load()
    DatabaseConfig.init(appConfig.database)

    configureSerialization()
    configureSecurity(appConfig.jwt)
    configureMonitoring()
    configureErrorHandling()

    install(Koin) {
        slf4jLogger()
        modules(appModule(appConfig))
    }

    routing {
        healthRoutes()
        authenticate("jwt") {
            notificationRoutes()
            preferenceRoutes()
        }
    }
}

A route reads the JWT principal, enforces authorization (a user may only act on
their own data unless their role claim is admin), then delegates to the
service and folds the Result into an HTTP response. Here’s the create handler:
post {
    val principal = call.principal<JWTPrincipal>()
        ?: return@post call.respond(HttpStatusCode.Unauthorized)
    val callerUserId = principal.subject
        ?: return@post call.respond(HttpStatusCode.Unauthorized)

    val request = call.receive<CreateNotificationRequest>()

    // Authorization: only admins can send for other users
    val role = principal.payload.getClaim("role").asString()
    if (role != "admin" && request.userId != callerUserId) {
        return@post call.respond(
            HttpStatusCode.Forbidden,
            ErrorResponse("forbidden", "Cannot send notifications for other users")
        )
    }

    notificationService.create(request).fold(
        onSuccess = { call.respond(HttpStatusCode.Created, it.toResponse()) },
        onFailure = {
            call.respond(
                HttpStatusCode.BadRequest,
                ErrorResponse("validation_error", it.message ?: "Invalid request")
            )
        }
    )
}

Health routes stay outside auth and add a readiness probe that actually pings the
database, returning 503 if a dependency is down:
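// Hypothetical response DTO: a Map mixing String and Map values is not
// encodable by kotlinx.serialization out of the box, so give it a type.
@Serializable
data class ReadinessResponse(val status: String, val checks: Map<String, String>)

fun Route.healthRoutes() {
    route("/health") {
        get { call.respond(mapOf("status" to "UP")) }

        get("/ready") {
            val checks = mutableMapOf<String, String>()
            try {
                // exec() is a Transaction member, so reach it via the current transaction
                dbQuery { TransactionManager.current().exec("SELECT 1") }
                checks["database"] = "UP"
            } catch (e: Exception) {
                checks["database"] = "DOWN: ${e.message}"
            }
            val allUp = checks.values.all { it == "UP" }
            val status = if (allUp) HttpStatusCode.OK else HttpStatusCode.ServiceUnavailable
            call.respond(status, ReadinessResponse(if (allUp) "UP" else "DOWN", checks))
        }
    }
}

Kafka integration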
Reference: Module 12 — Event-Driven with Kafka.
Notifications can also arrive (and are dispatched) asynchronously over Kafka. The
producer is configured for idempotence (enable.idempotence=true, acks=all)
so retries don’t create duplicates; the consumer commits offsets manually only
after successful processing, and failures route to a dead-letter queue.
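Those guarantees come down to a handful of client settings. A minimal sketch, assuming plain string (de)serializers and a caller-supplied group id; `enable.idempotence`, `acks`, and `enable.auto.commit` are standard Kafka client keys, but the helper names are hypothetical and the practice project's KafkaConfig may set more:

```kotlin
import java.util.Properties

private const val STRING_SERIALIZER =
    "org.apache.kafka.common.serialization.StringSerializer"
private const val STRING_DESERIALIZER =
    "org.apache.kafka.common.serialization.StringDeserializer"

// Producer settings: idempotence plus acks=all, so broker-side retries
// do not write the same record twice.
fun producerProps(bootstrapServers: String): Properties = Properties().apply {
    setProperty("bootstrap.servers", bootstrapServers)
    setProperty("key.serializer", STRING_SERIALIZER)
    setProperty("value.serializer", STRING_SERIALIZER)
    setProperty("enable.idempotence", "true")
    setProperty("acks", "all")
}

// Consumer settings: auto-commit off, so offsets only move when the
// application calls commitSync() itself.
fun consumerProps(bootstrapServers: String, groupId: String): Properties = Properties().apply {
    setProperty("bootstrap.servers", bootstrapServers)
    setProperty("group.id", groupId)
    setProperty("key.deserializer", STRING_DESERIALIZER)
    setProperty("value.deserializer", STRING_DESERIALIZER)
    setProperty("enable.auto.commit", "false")
}
```

These `Properties` objects are what you would hand to the `KafkaProducer` and `KafkaConsumer` constructors.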
flowchart LR
API["POST /api/notifications"] --> SVC["NotificationService.create"]
SVC --> DB[("PostgreSQL")]
SVC --> CACHE[("Redis")]
SVC -->|"produce"| T["notifications topic"]
T -->|"poll"| CONS["NotificationConsumer"]
CONS -->|"success: commitSync"| DONE["status SENT/DELIVERED"]
CONS -->|"failure"| DLQ["notifications.dlq"]
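The consumer half of that diagram can be modelled without a broker. This is an in-memory sketch only: `Record`, `BatchResult`, and `processBatch` are hypothetical stand-ins, not Kafka classes. It shows the rule the flow implies: a record that fails goes to the dead-letter list, and the offset advances only once every record in the batch has been handled one way or the other.

```kotlin
// Hypothetical stand-in for a polled Kafka record.
data class Record(val offset: Long, val value: String)

// Outcome of one batch: the offset to commit (null for an empty batch)
// and the records that were routed to the dead-letter queue.
data class BatchResult(val committedOffset: Long?, val deadLettered: List<Record>)

fun processBatch(records: List<Record>, handle: (Record) -> Unit): BatchResult {
    val deadLettered = mutableListOf<Record>()
    for (record in records) {
        try {
            handle(record)
        } catch (e: Exception) {
            // Failure path: park the record instead of blocking the partition.
            deadLettered += record
        }
    }
    // Kafka commits the position of the next record to read: last offset + 1.
    val committed = records.lastOrNull()?.let { it.offset + 1 }
    return BatchResult(committed, deadLettered)
}
```

If the process crashes before the commit, the whole batch is polled again on restart, which is why handlers in an at-least-once setup should tolerate seeing the same record twice.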
The producer wraps a KafkaProducer<String, String>, keys records by userId
(for partition locality), and serializes a NotificationEvent envelope:
suspend fun send(notification: Notification) {
    val event = NotificationEvent(
        eventType = "notification.created",
        notification = CreateNotificationRequest(
            userId = notification.userId,
            channel = notification.channel,
            body = notification.body,
            metadata = notification.metadata
        )
    )

    val record = ProducerRecord(
        KafkaConfig.NOTIFICATION_TOPIC,
        notification.userId, // key = userId for partition locality
        json.encodeToString(event)
    )

    // Suspend until the broker acknowledges (suspendCancellableCoroutine from
    // kotlinx.coroutines), so a failed send reaches the caller as an exception
    // instead of only being logged inside the callback.
    withContext(Dispatchers.IO) {
        suspendCancellableCoroutine<Unit> { cont ->
            producer.send(record) { metadata, exception ->
                if (exception != null) {
                    logger.error("Failed to send to Kafka", exception)
                    cont.resumeWithException(exception)
                } else {
                    logger.info("Sent: topic={}, partition={}, offset={}",
                        metadata.topic(), metadata.partition(), metadata.offset())
                    cont.resume(Unit)
                }
            }
        }
    }
}

The consumer runs its poll loop in a coroutine, dispatches by eventType, ships any
record it can’t process to the DLQ, and commits once the whole batch has been handled:
fun start(scope: CoroutineScope) {
    consumer.subscribe(listOf(KafkaConfig.NOTIFICATION_TOPIC))

    scope.launch(Dispatchers.IO) {
        while (running && isActive) {
            val records = consumer.poll(Duration.ofMillis(1000))
            for (record in records) {
                try {
                    processRecord(record)
                } catch (e: Exception) {
                    logger.error("Failed: partition={}, offset={}",
                        record.partition(), record.offset(), e)
                    handleProcessingError(record, e) // -> DLQ
                }
            }
            // Commit once every record is either processed or dead-lettered
            if (!records.isEmpty) consumer.commitSync()
        }
    }
}

Redis caching and rate limiting
Reference: Module 11 — Redis & Caching Patterns.
Redis does two jobs here. First, a cache-aside layer over notifications with a
5-minute TTL: getNotification reads the cache, then falls back to the DB on a miss.
Second, a sliding-window rate limiter built on a Redis sorted set, where each
request is a member scored by its timestamp. The trim-count-add-expire sequence is
pipelined so it costs one round trip. A pipeline is not a transaction, so concurrent
requests can interleave and the count is approximate under load; we remove the
just-added entry if it would exceed the cap:
suspend fun checkRateLimit(userId: String, maxPerHour: Int): Boolean =
    withContext(Dispatchers.IO) {
        jedisPool.resource.use { jedis ->
            val key = "$RATE_LIMIT_PREFIX$userId"
            val now = System.currentTimeMillis()
            val windowStart = now - (RATE_LIMIT_WINDOW_SECONDS * 1000)
            // Unique member (needs java.util.UUID): a bare timestamp would make two
            // requests in the same millisecond collapse into one sorted-set entry.
            val member = "$now:${UUID.randomUUID()}"

            val pipeline = jedis.pipelined()
            pipeline.zremrangeByScore(key, 0.0, windowStart.toDouble()) // drop old
            val countResponse = pipeline.zcard(key)                     // count window
            pipeline.zadd(key, now.toDouble(), member)                  // add this one
            pipeline.expire(key, RATE_LIMIT_WINDOW_SECONDS)
            pipeline.sync()

            val allowed = countResponse.get() < maxPerHour
            if (!allowed) jedis.zrem(key, member) // roll back the add
            allowed
        }
    }

Security — JWT authentication
Reference: Module 14 — Security & Authentication.
The JwtService signs HMAC-256 tokens carrying the userId as subject and a
role claim; the Ktor jwt auth provider verifies issuer/audience and turns a
valid token into a JWTPrincipal the routes read:
fun Application.configureSecurity(jwtConfig: AppConfig.JwtConfig) {
    val jwtService = JwtService(jwtConfig)

    install(Authentication) {
        jwt("jwt") {
            realm = "Notification Service"
            verifier(jwtService.verifier())
            validate { credential ->
                if (credential.payload.subject != null) JWTPrincipal(credential.payload)
                else null
            }
            challenge { _, _ ->
                call.respond(
                    HttpStatusCode.Unauthorized,
                    ErrorResponse("unauthorized", "Invalid or missing token")
                )
            }
        }
    }
}

Configuration follows the 12-factor approach — every value loads from an
environment variable with a sensible local default, in AppConfig.load(). Secrets
like JWT_SECRET default to a dev placeholder you must override in production.
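As a rough illustration of that loading pattern, here is a sketch that reads from an environment map with local defaults. Only `JWT_SECRET` is named in the text; `DATABASE_URL`, `DB_MAX_POOL_SIZE`, `APP_ENV`, the default values, and the production guard are all assumptions, and the config classes are simplified stand-ins for the nested `AppConfig` types:

```kotlin
// Simplified stand-ins for the project's nested AppConfig classes.
data class DatabaseConfig(val url: String, val maxPoolSize: Int)
data class JwtConfig(val secret: String)
data class AppConfig(val database: DatabaseConfig, val jwt: JwtConfig)

// Placeholder that is only acceptable on a developer machine.
const val DEV_JWT_SECRET = "dev-only-secret-change-me"

// Taking the environment as a Map (defaulting to the real one) keeps the
// loader testable without touching process state.
fun loadConfig(env: Map<String, String> = System.getenv()): AppConfig {
    val production = env["APP_ENV"] == "production" // assumed variable name
    val secret = env["JWT_SECRET"] ?: DEV_JWT_SECRET

    // Assumed guard: refuse to boot in production with the dev placeholder.
    check(!(production && secret == DEV_JWT_SECRET)) {
        "JWT_SECRET must be set in production"
    }

    return AppConfig(
        database = DatabaseConfig(
            url = env["DATABASE_URL"] ?: "jdbc:postgresql://localhost:5432/notifications",
            maxPoolSize = env["DB_MAX_POOL_SIZE"]?.toIntOrNull() ?: 10,
        ),
        jwt = JwtConfig(secret),
    )
}
```

Failing fast at startup is one way to enforce the "must override in production" rule; the practice project may handle it differently.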
OpenAPI and observability
The API publishes an OpenAPI 3.0 spec at src/main/resources/openapi/documentation.yaml
and serves Swagger UI via configureSwagger() — after start it’s at
http://localhost:8080/swagger. The spec defines bearerAuth security, the
NotificationChannel oneOf discriminated union, and every response schema.
Observability (Module 16) is three plugins:
- CallLogging with MDC fields (requestId, userId) for structured logs; logback.xml switches between a human console pattern and JSON output based on a LOG_FORMAT env var.
- MicrometerMetrics with a Prometheus registry and JVM binders, scraped at /metrics.
- StatusPages mapping IllegalArgumentException → 400, ContentTransformationException → 400, and any other Exception → a logged 500 with a generic ErrorResponse so internals never leak.
fun Application.configureMonitoring() {
    install(CallLogging) {
        level = Level.INFO
        filter { call -> call.request.path().startsWith("/api") }
        mdc("requestId") {
            it.request.header("X-Request-ID") ?: UUID.randomUUID().toString()
        }
        mdc("userId") { it.principal<JWTPrincipal>()?.subject ?: "anonymous" }
    }

    val registry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
    install(MicrometerMetrics) {
        this.registry = registry
        meterBinders = listOf(JvmMemoryMetrics(), JvmGcMetrics(), ProcessorMetrics())
    }
    routing {
        get("/metrics") { call.respondText(registry.scrape()) }
    }
}

Testing
Reference: Module 13 — Testing JVM Applications.
The suite has two tiers. Unit tests (MockK) cover domain invariants and
service orchestration with no infrastructure — e.g. create fails when the rate
limit is exceeded, verified with coVerify(exactly = 0) { repository.create(any()) }.
Integration tests spin up real Postgres and Kafka with Testcontainers from
a shared base class, and drive the API with Ktor’s testApplication.
abstract class IntegrationTestBase {
    companion object {
        val postgres = PostgreSQLContainer("postgres:16-alpine").apply {
            withDatabaseName("notifications_test")
            withUsername("test")
            withPassword("test")
        }
        val kafka = KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.7.0"))

        init {
            postgres.start()
            kafka.start()
        }
    }

    protected val testConfig = AppConfig(
        database = AppConfig.DatabaseConfig(postgres.jdbcUrl, postgres.username, postgres.password),
        redis = AppConfig.RedisConfig(),
        kafka = AppConfig.KafkaConfig(bootstrapServers = kafka.bootstrapServers),
        jwt = AppConfig.JwtConfig(secret = "test-secret-key-minimum-256-bits-long")
    )
}

An API test proves end-to-end auth and isolation — one user cannot read another’s notification:
@Test
fun `users cannot access other users notifications`() = testApplication {
    application { configureApp() }
    val user1 = jwtService.generateToken("user-1", "user")
    val user2 = jwtService.generateToken("user-2", "user")

    val created = client.post("/api/notifications") {
        bearerAuth(user1)
        contentType(ContentType.Application.Json)
        setBody("""{"userId":"user-1","channel":{"type":"email","to":"u@test.com","subject":"P"},"body":"Private"}""")
    }
    val id = Json.parseToJsonElement(created.bodyAsText())
        .jsonObject["id"]!!.jsonPrimitive.content

    val resp = client.get("/api/notifications/$id") { bearerAuth(user2) }
    assertEquals(HttpStatusCode.Forbidden, resp.status)
}

Run them by tier:
./gradlew test                                                      # everything
./gradlew test --tests "com.example.notifications.unit.*"           # unit only
./gradlew test --tests "com.example.notifications.integration.*"    # integration only

Deployment
Reference: Module 18 — Deployment & Production.
A multi-stage Dockerfile builds with the JDK image and runs on a slim JRE
image as a non-root user, with a HEALTHCHECK hitting /health:
FROM eclipse-temurin:21-jdk-alpine AS build
WORKDIR /app
COPY gradle/ gradle/
COPY gradlew build.gradle.kts settings.gradle.kts ./
RUN ./gradlew dependencies --no-daemon || true
COPY src/ src/
RUN ./gradlew installDist --no-daemon

FROM eclipse-temurin:21-jre-alpine
WORKDIR /app
RUN addgroup -S appgroup && adduser -S appuser -G appgroup
COPY --from=build /app/build/install/notification-service/ ./
RUN chown -R appuser:appgroup /app
USER appuser
HEALTHCHECK --interval=30s --timeout=5s --retries=3 \
    CMD wget -qO- http://localhost:8080/health || exit 1
EXPOSE 8080
ENTRYPOINT ["./bin/notification-service"]

docker-compose.yml brings up the whole stack — the app plus Postgres, Redis, and
a KRaft-mode Kafka, with optional dev (Kafka UI) and monitoring (Prometheus +
Grafana) profiles. Services wait on each other via healthchecks. A GitHub Actions
workflow builds, runs unit then integration tests, uploads reports, and (on
main) builds the Docker image with layer caching.
1. Start the full stack:

   docker compose up -d                  # core: app, postgres, redis, kafka
   docker compose --profile dev up -d    # + Kafka UI
   docker compose ps                     # all services should be "healthy"

2. Exercise the API (a real deploy gets the token from an auth service):

   curl -s -X POST http://localhost:8080/api/notifications \
     -H "Authorization: Bearer $TOKEN" \
     -H "Content-Type: application/json" \
     -d '{"userId":"user-1","channel":{"type":"email","to":"user@example.com","subject":"Welcome!"},"body":"Your account is ready."}' | jq

3. Verify health, metrics, and docs:

   curl -s http://localhost:8080/health/ready | jq
   curl -s http://localhost:8080/metrics | head -20
   open http://localhost:8080/swagger
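The curl call above needs a `$TOKEN`. For local experiments only, an HS256 token can be minted with nothing but the JDK. This is a sketch under assumptions: `devToken` and `hs256` are hypothetical helpers, not the project's JwtService, and the issuer and audience you pass must match whatever the service's verifier is configured to expect.

```kotlin
import java.util.Base64
import javax.crypto.Mac
import javax.crypto.spec.SecretKeySpec

// JWTs use base64url without padding for all three segments.
val base64Url: Base64.Encoder = Base64.getUrlEncoder().withoutPadding()

// HMAC-SHA256 over the signing input, base64url-encoded.
fun hs256(secret: String, signingInput: String): String {
    val mac = Mac.getInstance("HmacSHA256")
    mac.init(SecretKeySpec(secret.toByteArray(Charsets.UTF_8), "HmacSHA256"))
    return base64Url.encodeToString(mac.doFinal(signingInput.toByteArray(Charsets.UTF_8)))
}

// Dev-only helper: values are interpolated without JSON escaping, so pass
// plain identifiers. The subject and role claims follow the Security section.
fun devToken(
    secret: String,
    userId: String,
    role: String,
    issuer: String,
    audience: String,
    expiresAtEpochSeconds: Long = System.currentTimeMillis() / 1000 + 3600,
): String {
    val header = """{"alg":"HS256","typ":"JWT"}"""
    val payload =
        """{"sub":"$userId","role":"$role","iss":"$issuer","aud":"$audience","exp":$expiresAtEpochSeconds}"""
    val signingInput = listOf(header, payload)
        .joinToString(".") { base64Url.encodeToString(it.toByteArray(Charsets.UTF_8)) }
    return "$signingInput.${hs256(secret, signingInput)}"
}
```

Print the result of `devToken(...)` with the same secret the service was started with, export it as `TOKEN`, and the curl command should authenticate. Never use a hand-rolled signer like this outside local development.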
Putting it all together
The NotificationService is the orchestrator that pulls every layer into one
flow. create() reads top to bottom: check preferences, enforce the channel
allow-list, check the Redis rate limit, build the entity, persist it, cache it,
then produce to Kafka and mark it QUEUED. If Kafka fails, the row is still saved
so a retry mechanism can pick it up — durability before delivery.
suspend fun create(request: CreateNotificationRequest): Result<Notification> {
    val preference = preferenceService.getPreference(request.userId) // 1. prefs (cached)

    if (!preference.isChannelEnabled(request.channel)) {             // 2. channel allowed?
        return Result.failure(
            IllegalArgumentException("Channel ${channelName(request.channel)} is disabled")
        )
    }
    if (!cache.checkRateLimit(request.userId, preference.maxPerHour)) { // 3. rate limit (Redis)
        return Result.failure(
            IllegalStateException("Rate limit exceeded: max ${preference.maxPerHour} per hour")
        )
    }

    val notification = Notification(                                 // 4. build entity
        userId = request.userId,
        channel = request.channel,
        body = request.body,
        metadata = request.metadata
    )
    val saved = repository.create(notification)                      // 5. persist (Postgres)
    cache.cacheNotification(saved)                                   // 6. cache (Redis)

    try {                                                            // 7. queue (Kafka)
        producer.send(saved)
        repository.updateStatus(saved.id, NotificationStatus.QUEUED)
    } catch (e: Exception) {
        logger.error("Failed to queue notification: {}", saved.id, e) // saved, not queued
    }

    return Result.success(saved)
}

Koin wires the graph in one module — pool, cache, producer, repositories,
services, the JWT service, and the Kafka consumer — and the consumer is started
(and gracefully stopped) from configureApp():
fun appModule(config: AppConfig) = module {
    single { RedisConfig.createPool(config.redis) }
    single { NotificationCache(get()) }
    single { NotificationProducer(config.kafka.bootstrapServers) }
    single { NotificationRepository() }
    single { UserPreferenceService(get(), get()) }
    single { NotificationService(get(), get(), get(), get()) }
    single { JwtService(config.jwt) }
    single { NotificationConsumer(config.kafka.bootstrapServers, get(), get()) }
}

Architecture decisions
| Decision | Why |
|---|---|
| Ktor over Spring Boot | Lighter weight, explicit configuration, Kotlin-first |
| Exposed over JPA | Kotlin DSL for SQL, better coroutine support |
| Koin over Spring DI | Simple, lightweight, no annotation processing |
| Sealed interfaces for channels | Exhaustive when enforced by the compiler |
| Redis sorted sets for rate limiting | Sliding window, atomic-ish operations |
| Idempotent Kafka producer | Avoids duplicate notifications on retry |
| Testcontainers for integration tests | Real infrastructure, no mocks |
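The sliding-window row is easier to reason about with the Redis part removed. Below is an in-memory sketch of the same idea, one timestamp queue per user, trimmed on every check. `SlidingWindowLimiter` is a hypothetical class for illustration; the service itself keeps this state in a Redis sorted set so it is shared across instances.

```kotlin
// In-memory sliding-window limiter: allows at most maxPerWindow hits per
// user within any window of windowMillis.
class SlidingWindowLimiter(
    private val maxPerWindow: Int,
    private val windowMillis: Long = 3_600_000, // one hour
) {
    // Per-user request timestamps, oldest first.
    private val hits = HashMap<String, ArrayDeque<Long>>()

    @Synchronized
    fun tryAcquire(userId: String, nowMillis: Long = System.currentTimeMillis()): Boolean {
        val window = hits.getOrPut(userId) { ArrayDeque() }
        val windowStart = nowMillis - windowMillis

        // Drop entries that slid out of the window (the ZREMRANGEBYSCORE step).
        while (window.isNotEmpty() && window.first() <= windowStart) {
            window.removeFirst()
        }

        // Count what is left (the ZCARD step) and reject at the cap.
        if (window.size >= maxPerWindow) return false

        // Record this request (the ZADD step).
        window.addLast(nowMillis)
        return true
    }
}
```

Unlike a fixed hourly counter, the window moves with each request, so a burst at the end of one hour cannot be followed immediately by a second full burst at the start of the next.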
Where to go next
You now have a complete reference implementation. From here you can add more channels (Slack, Teams), notification templates with variable substitution, WebSocket delivery-status updates, distributed tracing with OpenTelemetry, a Kubernetes deploy (Module 18), or a frontend with Compose Multiplatform (Module 19).
Practice
Build the whole thing yourself — the project skeleton and a worked reference walk you through every layer above, from domain models to the CI pipeline.