Notification Microservice
Build a production-quality notification microservice that ties together every module in the course: a REST API backed by PostgreSQL, Redis for caching and rate limiting, Kafka for asynchronous delivery, JWT authentication via Spring Security, OpenAPI docs, structured logging, Prometheus metrics, a Testcontainers test suite, and a Docker Compose stack. This is the capstone — the same shape as a real service you’d run in production.
This is a Spring Boot service (spring-boot-starter-web + Spring MVC), so if
you’re coming from Express/Nest in TypeScript or net/http/chi in Go, the
controller-service-repository layering will feel familiar — Spring just wires the
layers together with annotations and dependency injection.
What it demonstrates
- Domain modeling: JPA entities, a channel enum, a status state machine.
- Database layer: Spring Data JPA repositories + Flyway migrations.
- REST API: Spring MVC controllers with request DTOs and validation.
- Kafka integration: produce notification events, consume and process them async.
- Redis caching: cache user preferences, rate-limit delivery per user.
- JWT security: a OncePerRequestFilter that validates tokens and populates the security context.
- OpenAPI docs: springdoc annotations on the controllers, Swagger UI at /swagger-ui.html.
- Observability: structured logging, Micrometer + Prometheus metrics, actuator health.
- Testing: unit tests with mocks, integration tests with Testcontainers.
- Deployment: a docker-compose.yml for the whole stack (Postgres, Redis, Kafka, Prometheus, Grafana).
Project layout
A standard Gradle + Spring Boot tree, organized by layer: api (controllers,
DTOs, plugins), domain (models + services), infrastructure (database, cache,
messaging, auth), and config (Spring beans). Below are the highlighted files,
the ones we walk through in the tour.
notification-service/
- build.gradle.kts  Spring Boot, JPA, Kafka, Redis, security, JWT deps
- docker-compose.yml  Postgres, Redis, Kafka, Kafka UI, Prometheus, Grafana
- src/
  - main/
    - kotlin/com/example/notifications/
      - Application.kt  @SpringBootApplication entrypoint
      - api/
        - routes/
          - NotificationController.kt  REST endpoints for notifications
          - AuthController.kt  mints JWT tokens for testing
          - PreferenceController.kt  user channel preferences
          - HealthController.kt  health endpoint
        - dto/  request/response data classes
          - …
        - plugins/GlobalExceptionHandler.kt  maps exceptions to responses
      - domain/
        - model/
          - Notification.kt  JPA entity + channel/status enums
          - UserPreference.kt  per-channel opt-in entity
        - service/
          - NotificationService.kt  core business logic
          - UserPreferenceService.kt  preference logic
      - infrastructure/
        - database/
          - NotificationRepository.kt  Spring Data JPA repo
          - UserPreferenceRepository.kt
        - messaging/
          - NotificationProducer.kt  publishes to Kafka
          - NotificationConsumer.kt  consumes + delivers async
        - cache/  NotificationCache.kt Redis caching + rate limiting
          - …
        - auth/  JwtService.kt sign/verify JWTs
          - …
      - config/
        - SecurityConfig.kt  Spring Security + JWT filter
        - KafkaConfig.kt  Kafka bean wiring
        - di/AppModule.kt  extra bean definitions
    - resources/
      - application.yml  datasource, Kafka, JWT, actuator config
      - db/migration/
        - V1__create_notifications.sql  notifications table
        - V2__create_user_preferences.sql  preferences table
      - openapi/documentation.yaml
  - test/kotlin/com/example/notifications/
    - unit/  service + domain unit tests
      - …
    - integration/DatabaseIntegrationTest.kt  Testcontainers
Architecture
Two views. First the service topology: a request hits the REST API, passes through the JWT security filter, lands in the service layer, and fans out to three backing stores (PostgreSQL for durable state, Redis for caching and rate limits, and Kafka for asynchronous delivery).
flowchart TB
C["Client (curl / upstream)"] --> SEC["JWT security filter"]
SEC --> CTRL["NotificationController"]
CTRL --> SVC["NotificationService"]
SVC --> DB[("PostgreSQL")]
SVC --> CACHE[("Redis cache + rate limit")]
SVC --> PROD["NotificationProducer"]
PROD -->|"notifications topic"| K["Kafka"]
K --> CONS["NotificationConsumer"]
CONS -->|"deliver + updateStatus"| SVC
ACT["Actuator /prometheus"] -.-> SVC
Second the notification lifecycle: the synchronous request path persists a
PENDING row and publishes an event; the asynchronous consumer picks it up,
delivers it, and transitions the status to SENT (or FAILED, with retry).
sequenceDiagram
participant Client
participant API as NotificationController
participant Svc as NotificationService
participant DB as PostgreSQL
participant Kafka
participant Cons as NotificationConsumer
Client->>API: POST /api/notifications (Bearer JWT)
API->>Svc: createNotification()
Svc->>DB: save(status=PENDING)
Svc->>Kafka: send("notifications", id, notification)
API-->>Client: 201 Created
Kafka->>Cons: processNotification()
Cons->>Svc: updateStatus(id, SENT)
Svc->>DB: save(status=SENT, sentAt=now)
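The lifecycle above can be read as a small state machine. What follows is a minimal sketch, assuming the status values that the entity in the next section defines (PENDING, QUEUED, SENT, DELIVERED, FAILED, CANCELLED). Only PENDING to SENT and PENDING to FAILED come from the diagram; every other entry in the table is an assumption made for illustration. The names allowedTransitions, canTransition, and transition are hypothetical and not part of the scaffold.

```kotlin
// Local copy of the status values so this sketch compiles on its own.
enum class NotificationStatus { PENDING, QUEUED, SENT, DELIVERED, FAILED, CANCELLED }

// Assumed transition table. Only PENDING -> SENT and PENDING -> FAILED are
// taken from the sequence diagram; the rest is illustrative.
val allowedTransitions: Map<NotificationStatus, Set<NotificationStatus>> = mapOf(
    NotificationStatus.PENDING to setOf(
        NotificationStatus.QUEUED,
        NotificationStatus.SENT,
        NotificationStatus.FAILED,
        NotificationStatus.CANCELLED,
    ),
    NotificationStatus.QUEUED to setOf(
        NotificationStatus.SENT,
        NotificationStatus.FAILED,
        NotificationStatus.CANCELLED,
    ),
    NotificationStatus.SENT to setOf(NotificationStatus.DELIVERED),
    // A retry puts a failed notification back at the start of the pipeline.
    NotificationStatus.FAILED to setOf(NotificationStatus.PENDING),
    // Terminal states: nothing may follow them.
    NotificationStatus.DELIVERED to emptySet(),
    NotificationStatus.CANCELLED to emptySet(),
)

// True when the table allows moving from one status to the other.
fun canTransition(from: NotificationStatus, to: NotificationStatus): Boolean =
    to in allowedTransitions.getValue(from)

// Returns the new status, or throws IllegalArgumentException for an illegal move.
fun transition(from: NotificationStatus, to: NotificationStatus): NotificationStatus {
    require(canTransition(from, to)) { "Illegal transition: $from -> $to" }
    return to
}
```

A guard like this could sit at the top of updateStatus so that, for example, a late consumer callback cannot move a CANCELLED notification to SENT.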
The domain model
The center of the service is the Notification entity. It’s a JPA data class
annotated with @Entity/@Table. Note the Kotlin idioms: default values on every
property (so Kotlin generates the no-argument constructor that JPA requires, and
callers can pass only the fields they care about), a nullable
failureReason: String?, and a companion object constant for MAX_RETRIES. The
channel and status are Kotlin enum classes persisted as strings via
@Enumerated(EnumType.STRING).
package com.example.notifications.domain.model

import jakarta.persistence.*
import java.time.Instant

@Entity
@Table(name = "notifications")
data class Notification(
    @Id
    val id: String = java.util.UUID.randomUUID().toString(),
    val userId: String = "",
    @Enumerated(EnumType.STRING)
    val channel: NotificationChannel = NotificationChannel.EMAIL,
    @Column(columnDefinition = "TEXT")
    val body: String = "",
    @Enumerated(EnumType.STRING)
    var status: NotificationStatus = NotificationStatus.PENDING,
    var failureReason: String? = null,
    var retryCount: Int = 0,
    val createdAt: Instant = Instant.now(),
    var updatedAt: Instant = Instant.now(),
    var sentAt: Instant? = null
) {
    companion object {
        const val MAX_RETRIES = 3
    }

    // A notification is retryable only while it is FAILED and under the retry cap.
    fun canRetry(): Boolean = retryCount < MAX_RETRIES && status == NotificationStatus.FAILED
}

enum class NotificationChannel {
    EMAIL, SMS, PUSH, WEBHOOK
}

enum class NotificationStatus {
    PENDING, QUEUED, SENT, DELIVERED, FAILED, CANCELLED
}

The API layer
The NotificationController is a Spring MVC @RestController. Each method maps an
HTTP verb + path to a handler; @RequestBody deserializes JSON into a DTO, and the
injected Authentication gives you the authenticated user (populated by the JWT
filter further down). The @Operation/@Tag annotations feed springdoc to build
the OpenAPI docs.
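One detail in this section’s controller is worth a closer look: it converts the request’s channel string with NotificationChannel.valueOf(request.channel.uppercase()), and valueOf throws IllegalArgumentException for a name that is not an enum constant. How that surfaces to the client depends on what GlobalExceptionHandler maps, which this page does not show. A minimal sketch of a null-returning alternative follows; parseChannel is a hypothetical helper, not part of the scaffold.

```kotlin
// Local copy of the channel enum so this sketch compiles on its own.
enum class NotificationChannel { EMAIL, SMS, PUSH, WEBHOOK }

// Returns the matching channel, or null when the client sent an unknown name.
// Matching ignores case and surrounding whitespace.
fun parseChannel(raw: String): NotificationChannel? =
    enumValues<NotificationChannel>().firstOrNull {
        it.name.equals(raw.trim(), ignoreCase = true)
    }
```

In a handler, the null case could be turned into a 400 with something like `parseChannel(request.channel) ?: return ResponseEntity.badRequest().build()`, leaving the exception path for genuinely unexpected failures.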
@RestController
@RequestMapping("/api/notifications")
@Tag(name = "Notifications", description = "Notification management endpoints")
class NotificationController(
    private val notificationService: NotificationService
) {

    @PostMapping
    @Operation(summary = "Send a notification")
    fun createNotification(
        @RequestBody request: CreateNotificationRequest,
        authentication: Authentication
    ): ResponseEntity<NotificationResponse> {
        val notification = Notification(
            userId = request.userId,
            channel = NotificationChannel.valueOf(request.channel.uppercase()),
            body = request.body
        )
        val created = notificationService.createNotification(notification)
        return ResponseEntity.status(HttpStatus.CREATED).body(NotificationResponse.from(created))
    }

    @GetMapping("/{id}")
    @Operation(summary = "Get notification by ID")
    fun getNotification(@PathVariable id: String): ResponseEntity<NotificationResponse> {
        val notification = notificationService.getNotification(id)
        return if (notification != null) {
            ResponseEntity.ok(NotificationResponse.from(notification))
        } else {
            ResponseEntity.notFound().build()
        }
    }

    @GetMapping
    @Operation(summary = "List notifications for the authenticated user")
    fun listNotifications(authentication: Authentication): ResponseEntity<List<NotificationResponse>> {
        // authentication.name is the JWT subject set by the auth filter.
        val userId = authentication.name
        val notifications = notificationService.getNotificationsByUser(userId)
        return ResponseEntity.ok(notifications.map { NotificationResponse.from(it) })
    }
}

The request DTO is a plain Kotlin data class. Jackson (with the Kotlin module)
handles the JSON, and metadata: Map<String, String> carries channel-specific
extras like an email subject:
data class CreateNotificationRequest(
    val userId: String,
    val channel: String,
    val body: String,
    val metadata: Map<String, String> = emptyMap()
)

The service layer
NotificationService is the business-logic core, marked @Service. It persists a
notification as PENDING, then (in the finished version) publishes to Kafka for
async delivery. updateStatus is what the consumer calls back into to move a
notification through its lifecycle.
The starter scaffold leaves the full workflow as a // TODO. That’s your work: wire
the producer in, check preferences, and consult the rate limiter before saving.
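One possible shape for that finished workflow is sketched below, with the three checks in the order the paragraph above lists them. Everything here is hypothetical: the interfaces stand in for UserPreferenceService, NotificationCache, NotificationRepository, and NotificationProducer, Draft stands in for the Notification entity, and the default of 100 per hour is an assumed limit, not a value from the project.

```kotlin
// Hypothetical stand-ins for the real Spring beans, so the sketch runs without Spring.
fun interface PreferenceCheck { fun isEnabled(userId: String, channel: String): Boolean }
fun interface RateLimitCheck { fun isRateLimited(userId: String, maxPerHour: Int): Boolean }
fun interface Store { fun save(draft: Draft): Draft }
fun interface Publisher { fun send(saved: Draft) }

// Simplified stand-in for the Notification entity.
data class Draft(
    val id: String,
    val userId: String,
    val channel: String,
    val body: String,
    val status: String = "PENDING",
)

// Outcome of a create attempt; the caller decides which HTTP status each maps to.
sealed interface CreateResult {
    data class Accepted(val notification: Draft) : CreateResult
    object OptedOut : CreateResult
    object RateLimited : CreateResult
}

class CreateNotificationWorkflow(
    private val preferences: PreferenceCheck,
    private val rateLimit: RateLimitCheck,
    private val store: Store,
    private val publisher: Publisher,
    private val maxPerHour: Int = 100, // assumed limit
) {
    fun create(draft: Draft): CreateResult {
        // 1. Respect the user's per-channel opt-in before doing any work.
        if (!preferences.isEnabled(draft.userId, draft.channel)) return CreateResult.OptedOut
        // 2. Consult the rate limiter before saving.
        if (rateLimit.isRateLimited(draft.userId, maxPerHour)) return CreateResult.RateLimited
        // 3. Persist as PENDING, then hand off to the async pipeline.
        val saved = store.save(draft.copy(status = "PENDING"))
        publisher.send(saved)
        return CreateResult.Accepted(saved)
    }
}
```

Returning a result type rather than throwing keeps the rejection reasons explicit. The scaffold’s actual NotificationService, with the TODO still in place, follows.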
@Service
class NotificationService(
    private val notificationRepository: NotificationRepository
) {
    private val logger = LoggerFactory.getLogger(javaClass)

    fun createNotification(notification: Notification): Notification {
        logger.info("Creating notification: userId={}, channel={}", notification.userId, notification.channel)

        val saved = notificationRepository.save(
            notification.copy(status = NotificationStatus.PENDING, createdAt = Instant.now())
        )

        // TODO: Publish to Kafka topic for async processing
        // kafkaProducer.send(saved)

        return saved
    }

    fun getNotification(id: String): Notification? =
        notificationRepository.findById(id).orElse(null)

    fun getNotificationsByUser(userId: String): List<Notification> =
        notificationRepository.findByUserId(userId)

    // Returns null when no notification with this id exists.
    fun updateStatus(id: String, status: NotificationStatus, failureReason: String? = null): Notification? {
        val notification = notificationRepository.findById(id).orElse(null) ?: return null
        notification.status = status
        notification.updatedAt = Instant.now()
        notification.failureReason = failureReason
        if (status == NotificationStatus.SENT) {
            notification.sentAt = Instant.now()
        }
        return notificationRepository.save(notification)
    }
}

Kafka: producer and consumer
The synchronous request returns fast; actual delivery happens off a Kafka topic.
The producer wraps a KafkaTemplate<String, Any> and sends the notification keyed
by id, so all events for one notification land on the same partition:
@Component
class NotificationProducer(
    private val kafkaTemplate: KafkaTemplate<String, Any>
) {
    private val logger = LoggerFactory.getLogger(javaClass)

    companion object {
        const val TOPIC = "notifications"
    }

    fun send(notification: Notification) {
        logger.info("Publishing notification to Kafka: id={}, channel={}", notification.id, notification.channel)
        // The notification id is the record key, which determines the partition.
        kafkaTemplate.send(TOPIC, notification.id, notification)
    }
}

The consumer is a @KafkaListener on that same topic, in its own consumer group.
Spring deserializes the JSON straight back into a Notification. It simulates
delivery, then calls updateStatus to mark the row SENT (or FAILED, with the
failure message, for retry):
@Component
class NotificationConsumer(
    private val notificationService: NotificationService
) {
    private val logger = LoggerFactory.getLogger(javaClass)

    @KafkaListener(
        topics = [NotificationProducer.TOPIC],
        groupId = "notification-processor"
    )
    fun processNotification(notification: Notification) {
        logger.info("Processing notification: id={}, channel={}", notification.id, notification.channel)

        try {
            // TODO: Simulate delivery based on channel type
            // TODO: Update status to SENT on success
            notificationService.updateStatus(notification.id, NotificationStatus.SENT)
            logger.info("Notification delivered: id={}", notification.id)
        } catch (e: Exception) {
            logger.error("Notification delivery failed: id={}", notification.id, e)
            notificationService.updateStatus(notification.id, NotificationStatus.FAILED, e.message)
        }
    }
}

The serialization is configured in application.yml: JsonSerializer on the
producer, JsonDeserializer with spring.json.trusted.packages: "com.example.*"
on the consumer so it’ll deserialize into your domain types:
spring:
  kafka:
    bootstrap-servers: localhost:29092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      group-id: notification-processor
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      auto-offset-reset: earliest
      properties:
        spring.json.trusted.packages: "com.example.*"

The database layer
Persistence is Spring Data JPA: declare an interface extending
JpaRepository<Notification, String> and Spring generates the implementation at
runtime. The query methods (findByUserId, findByStatus) are derived: Spring
parses the method name into a query. No SQL, no boilerplate.
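Conceptually, each derived query is a filter on the property named in the method. A hand-written in-memory equivalent makes that visible and can double as a fake in unit tests. This is an illustrative sketch only: InMemoryNotificationRepository and NotificationRow are hypothetical names, and the class does not implement Spring’s JpaRepository interface.

```kotlin
// Local copies so this sketch compiles on its own.
enum class NotificationStatus { PENDING, QUEUED, SENT, DELIVERED, FAILED, CANCELLED }

// Simplified stand-in for the Notification entity.
data class NotificationRow(val id: String, val userId: String, val status: NotificationStatus)

class InMemoryNotificationRepository {
    // Keyed by id, in insertion order.
    private val rows = LinkedHashMap<String, NotificationRow>()

    // Insert or overwrite by primary key.
    fun save(row: NotificationRow): NotificationRow {
        rows[row.id] = row
        return row
    }

    fun findById(id: String): NotificationRow? = rows[id]

    // Plays the role of a query filtering on the userId property.
    fun findByUserId(userId: String): List<NotificationRow> =
        rows.values.filter { it.userId == userId }

    // Plays the role of a query filtering on the status property.
    fun findByStatus(status: NotificationStatus): List<NotificationRow> =
        rows.values.filter { it.status == status }
}
```

The real repository, where Spring supplies all of this from the method names alone, is declared next.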
@Repository
interface NotificationRepository : JpaRepository<Notification, String> {
    fun findByUserId(userId: String): List<Notification>
    fun findByStatus(status: NotificationStatus): List<Notification>
}

The schema itself is owned by Flyway, not Hibernate. Note ddl-auto: validate
in application.yml, which means Hibernate only checks that the schema matches the
entities and never mutates it. The table comes from a versioned migration:
CREATE TABLE IF NOT EXISTS notifications (
    id VARCHAR(36) PRIMARY KEY,
    user_id VARCHAR(255) NOT NULL,
    channel VARCHAR(20) NOT NULL,
    body TEXT NOT NULL,
    status VARCHAR(20) NOT NULL DEFAULT 'PENDING',
    failure_reason TEXT,
    retry_count INT NOT NULL DEFAULT 0,
    created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
    sent_at TIMESTAMP WITH TIME ZONE
);

CREATE INDEX idx_notifications_user_id ON notifications(user_id);
CREATE INDEX idx_notifications_status ON notifications(status);
CREATE INDEX idx_notifications_created_at ON notifications(created_at);

The cache layer
NotificationCache wraps a StringRedisTemplate for two jobs: caching user
preferences (cache-aside) and rate-limiting delivery per user. The scaffold ships
only the recording half of a simple counter-with-TTL rate limiter (isRateLimited
is a stub that always returns false); the full version uses a sliding window.
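To show what the sliding window means in practice, here is a minimal in-memory sketch of the algorithm, with a per-user queue of timestamps standing in for a Redis sorted set. In Redis the same three steps would typically map to ZREMRANGEBYSCORE (evict old entries), ZCARD (count what is left), and ZADD (record a send). SlidingWindowLimiter, its constructor parameters, and the injectable clock are assumptions for illustration; the class is not thread-safe and is not the project’s implementation.

```kotlin
// In-memory sliding-window limiter: a user is limited once they have
// maxPerHour sends inside the most recent window.
class SlidingWindowLimiter(
    private val windowMillis: Long = 60 * 60 * 1000L,
    private val clock: () -> Long = System::currentTimeMillis, // injectable for tests
) {
    // Per-user send timestamps, oldest first.
    private val sentAt = HashMap<String, ArrayDeque<Long>>()

    fun isRateLimited(userId: String, maxPerHour: Int): Boolean {
        val timestamps = sentAt[userId] ?: return false
        evictExpired(timestamps)
        return timestamps.size >= maxPerHour
    }

    fun recordNotificationSent(userId: String) {
        val timestamps = sentAt.getOrPut(userId) { ArrayDeque() }
        evictExpired(timestamps)
        timestamps.addLast(clock())
    }

    // Drop every timestamp that has fallen out of the window.
    private fun evictExpired(timestamps: ArrayDeque<Long>) {
        val cutoff = clock() - windowMillis
        while (timestamps.isNotEmpty() && timestamps.first() <= cutoff) {
            timestamps.removeFirst()
        }
    }
}
```

Unlike a fixed counter, the window moves with the clock, so a burst just before a reset cannot be followed immediately by a second full burst. The scaffold’s NotificationCache, with the limiter still stubbed out, follows.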
@Component
class NotificationCache(
    private val redisTemplate: StringRedisTemplate
) {
    companion object {
        private const val RATE_LIMIT_PREFIX = "rate:notify:"
        private const val PREFERENCE_PREFIX = "pref:"
    }

    // Check if the user has exceeded their notification rate limit.
    // TODO: implement a sliding-window limiter (Redis sorted set).
    fun isRateLimited(userId: String, maxPerHour: Int): Boolean {
        return false
    }

    // Bump the per-user counter and (re)set its one-hour TTL.
    fun recordNotificationSent(userId: String) {
        val key = "$RATE_LIMIT_PREFIX$userId"
        redisTemplate.opsForValue().increment(key)
        redisTemplate.expire(key, Duration.ofHours(1))
    }
}

The auth layer
JWTs are signed and verified by JwtService using the jjwt library. The signing
key is derived from a configured secret (@Value("\${jwt.secret:...}"), a Spring
property with a default). validateToken returns the Claims or null on any
failure, so callers branch on null rather than catching exceptions.
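To make “signed and verified” concrete, here is a minimal sketch of what an HS256 token is at the byte level, using only JDK classes. It is not how JwtService works internally: jjwt also parses the claims and rejects expired tokens, none of which this sketch does. signToken and verifyToken are hypothetical names, and the payload is passed around as a raw JSON string. The sketch keeps the same null-on-failure contract described above.

```kotlin
import java.security.MessageDigest
import java.util.Base64
import javax.crypto.Mac
import javax.crypto.spec.SecretKeySpec

val urlEncoder: Base64.Encoder = Base64.getUrlEncoder().withoutPadding()
val urlDecoder: Base64.Decoder = Base64.getUrlDecoder()

// HMAC-SHA256 over the given text, keyed by the shared secret.
fun hmacSha256(secret: String, data: String): ByteArray {
    val mac = Mac.getInstance("HmacSHA256")
    mac.init(SecretKeySpec(secret.toByteArray(), "HmacSHA256"))
    return mac.doFinal(data.toByteArray())
}

// Builds header.payload.signature, each part base64url-encoded without padding.
fun signToken(payloadJson: String, secret: String): String {
    val header = urlEncoder.encodeToString("""{"alg":"HS256","typ":"JWT"}""".toByteArray())
    val payload = urlEncoder.encodeToString(payloadJson.toByteArray())
    val signature = urlEncoder.encodeToString(hmacSha256(secret, "$header.$payload"))
    return "$header.$payload.$signature"
}

// Returns the payload JSON when the signature matches, or null on any failure.
// Simplification: the header and any expiry claim are not inspected.
fun verifyToken(token: String, secret: String): String? = try {
    val parts = token.split(".")
    require(parts.size == 3) { "A signed JWT has exactly three segments" }
    val (header, payload, signature) = parts
    val expected = hmacSha256(secret, "$header.$payload")
    // MessageDigest.isEqual compares in constant time.
    if (MessageDigest.isEqual(expected, urlDecoder.decode(signature))) {
        urlDecoder.decode(payload).decodeToString()
    } else {
        null
    }
} catch (e: Exception) {
    null
}
```

Because the signature covers both the header and the payload, changing either segment or using a different secret makes verification fail. The project’s JwtService, which delegates all of this to jjwt, follows.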
@Service
class JwtService(
    @Value("\${jwt.secret:my-super-secret-key-that-is-at-least-256-bits-long}")
    private val secret: String,
    @Value("\${jwt.expiration-ms:3600000}")
    private val expirationMs: Long
) {
    private val key: SecretKey by lazy {
        Keys.hmacShaKeyFor(secret.toByteArray())
    }

    fun generateToken(username: String, role: String): String {
        val now = Date()
        val expiry = Date(now.time + expirationMs)

        return Jwts.builder()
            .subject(username)
            .claim("role", role)
            .issuedAt(now)
            .expiration(expiry)
            .signWith(key)
            .compact()
    }

    // Returns the claims, or null if the token is malformed, expired, or badly signed.
    fun validateToken(token: String): Claims? = try {
        Jwts.parser()
            .verifyWith(key)
            .build()
            .parseSignedClaims(token)
            .payload
    } catch (e: Exception) {
        null
    }

    fun getUsernameFromToken(token: String): String? =
        validateToken(token)?.subject

    fun getRoleFromToken(token: String): String? =
        validateToken(token)?.get("role", String::class.java)
}

SecurityConfig is where it all plugs into Spring Security: a stateless filter
chain (no sessions), public routes for /auth/**, /actuator/**, and Swagger, and
everything else authenticated. The custom JwtAuthFilter runs before Spring’s
username/password filter. It reads the Bearer token, validates it, and on
success builds an Authentication with a ROLE_<role> authority and puts it in
the SecurityContextHolder (which is what fed authentication.name back in the
controller).
@Configuration
@EnableWebSecurity
class SecurityConfig(
    private val jwtAuthFilter: JwtAuthFilter
) {
    @Bean
    fun securityFilterChain(http: HttpSecurity): SecurityFilterChain {
        http
            .csrf { it.disable() }
            .sessionManagement { it.sessionCreationPolicy(SessionCreationPolicy.STATELESS) }
            .authorizeHttpRequests { auth ->
                auth
                    .requestMatchers("/auth/**").permitAll()
                    .requestMatchers("/actuator/**").permitAll()
                    // /swagger-ui.html is not covered by /swagger-ui/**, so it is listed explicitly.
                    .requestMatchers("/swagger-ui.html", "/swagger-ui/**", "/v3/api-docs/**").permitAll()
                    .anyRequest().authenticated()
            }
            .addFilterBefore(jwtAuthFilter, UsernamePasswordAuthenticationFilter::class.java)

        return http.build()
    }
}

@Component
class JwtAuthFilter(
    private val jwtService: JwtService
) : OncePerRequestFilter() {

    override fun doFilterInternal(
        request: HttpServletRequest,
        response: HttpServletResponse,
        filterChain: FilterChain
    ) {
        val authHeader = request.getHeader("Authorization")

        if (authHeader != null && authHeader.startsWith("Bearer ")) {
            // Strip the "Bearer " prefix (7 characters).
            val token = authHeader.substring(7)
            val claims = jwtService.validateToken(token)

            if (claims != null) {
                val username = claims.subject
                val role = claims.get("role", String::class.java) ?: "USER"
                val authorities = listOf(SimpleGrantedAuthority("ROLE_$role"))
                val authentication = UsernamePasswordAuthenticationToken(username, null, authorities)
                SecurityContextHolder.getContext().authentication = authentication
            }
        }

        // Always continue the chain; unauthenticated requests are rejected later by the authorization rules.
        filterChain.doFilter(request, response)
    }
}

Run it
The service needs Postgres, Redis, and Kafka running. The course’s shared infra
stack provides all three; this exercise also ships its own docker-compose.yml
with the full observability stack (Prometheus + Grafana).
1. Start the infrastructure (Postgres, Redis, Kafka, plus Kafka UI, Prometheus, Grafana):

   cd shared-infra && docker compose up -d
   # or, from this exercise's own stack:
   docker compose up -d

2. Run the application (starts on port 8088):

   ./gradlew bootRun

3. Mint a JWT (the /auth/token route is unauthenticated, for testing):

   TOKEN=$(curl -s -X POST http://localhost:8088/auth/token \
     -H "Content-Type: application/json" \
     -d '{"username": "testuser", "role": "USER"}' | jq -r '.token')

4. Send a notification (userId matches the token’s username, because the list endpoint in the next step filters by the authenticated user):

   curl -X POST http://localhost:8088/api/notifications \
     -H "Content-Type: application/json" \
     -H "Authorization: Bearer $TOKEN" \
     -d '{
       "userId": "testuser",
       "channel": "email",
       "body": "Welcome to the notification service!",
       "metadata": {"subject": "Welcome"}
     }'

5. Read it back, check health, and scrape metrics:

   curl -H "Authorization: Bearer $TOKEN" http://localhost:8088/api/notifications
   curl http://localhost:8088/actuator/health
   curl http://localhost:8088/actuator/prometheus | grep notification
The OpenAPI docs live at http://localhost:8088/swagger-ui.html, the Kafka UI at
http://localhost:8090, and Grafana at http://localhost:3000 (admin/admin).
Run the tests
# Unit tests (mocked dependencies)
./gradlew test

Integration tests use Testcontainers, so they require Docker: they spin up real Postgres and Kafka containers per run, giving you confidence the JPA mappings and migrations actually work against the real database.