
Notification Microservice

Build a production-quality notification microservice that ties together every module in the course: a REST API backed by PostgreSQL, Redis for caching and rate limiting, Kafka for asynchronous delivery, JWT authentication via Spring Security, OpenAPI docs, structured logging, Prometheus metrics, a Testcontainers test suite, and a Docker Compose stack. This is the capstone — the same shape as a real service you’d run in production.

This is a Spring Boot service (spring-boot-starter-web + Spring MVC), so if you’re coming from Express/Nest in TypeScript or net/http/chi in Go, the controller-service-repository layering will feel familiar — Spring just wires the layers together with annotations and dependency injection.

  • Domain modeling — JPA entities, a channel enum, a status state machine.
  • Database layer — Spring Data JPA repositories + Flyway migrations.
  • REST API — Spring MVC controllers with request DTOs and validation.
  • Kafka integration — produce notification events, consume and process them async.
  • Redis caching — cache user preferences, rate-limit delivery per user.
  • JWT security — a OncePerRequestFilter that validates tokens and populates the security context.
  • OpenAPI docs — springdoc annotations on the controllers, Swagger UI at /swagger-ui.html.
  • Observability — structured logging, Micrometer + Prometheus metrics, actuator health.
  • Testing — unit tests with mocks, integration tests with Testcontainers.
  • Deployment — a docker-compose.yml for the whole stack (Postgres, Redis, Kafka, Prometheus, Grafana).

A standard Gradle + Spring Boot tree, organized by layer: api (controllers, DTOs, plugins), domain (models + services), infrastructure (database, cache, messaging, auth), and config (Spring beans). Below are the highlighted files — the ones we walk through in the tour.

  • notification-service/
    • build.gradle.kts - Spring Boot, JPA, Kafka, Redis, security, JWT deps
    • docker-compose.yml - Postgres, Redis, Kafka, Kafka UI, Prometheus, Grafana
    • src/
      • main/
        • kotlin/com/example/notifications/
          • Application.kt - @SpringBootApplication entrypoint
          • api/
            • routes/
              • NotificationController.kt - REST endpoints for notifications
              • AuthController.kt - mints JWT tokens for testing
              • PreferenceController.kt - user channel preferences
              • HealthController.kt - health endpoint
            • dto/ - request/response data classes
            • plugins/GlobalExceptionHandler.kt - maps exceptions to responses
          • domain/
            • model/
              • Notification.kt - JPA entity + channel/status enums
              • UserPreference.kt - per-channel opt-in entity
            • service/
              • NotificationService.kt - core business logic
              • UserPreferenceService.kt - preference logic
          • infrastructure/
            • database/
              • NotificationRepository.kt - Spring Data JPA repo
              • UserPreferenceRepository.kt
            • messaging/
              • NotificationProducer.kt - publishes to Kafka
              • NotificationConsumer.kt - consumes + delivers async
            • cache/NotificationCache.kt - Redis caching + rate limiting
            • auth/JwtService.kt - sign/verify JWTs
          • config/
            • SecurityConfig.kt - Spring Security + JWT filter
            • KafkaConfig.kt - Kafka bean wiring
          • di/AppModule.kt - extra bean definitions
        • resources/
          • application.yml - datasource, Kafka, JWT, actuator config
          • db/migration/
            • V1__create_notifications.sql - notifications table
            • V2__create_user_preferences.sql - preferences table
          • openapi/documentation.yaml
      • test/kotlin/com/example/notifications/
        • unit/ - service + domain unit tests
        • integration/DatabaseIntegrationTest.kt - Testcontainers

Two views. First the service topology: a request hits the REST API, passes through the JWT security filter, lands in the service layer, and fans out to three backing stores — PostgreSQL for durable state, Redis for caching and rate limits, and Kafka for asynchronous delivery.

[Diagram: Service topology]

Second the notification lifecycle: the synchronous request path persists a PENDING row and publishes an event; the asynchronous consumer picks it up, delivers it, and transitions the status to SENT (or FAILED, with retry).

[Diagram: Notification flow]

The center of the service is the Notification entity. It’s a JPA data class annotated with @Entity/@Table. Note the Kotlin idioms: default values on every property (so a bare Notification() call works, and the compiler also emits the no-argument constructor that JPA requires), a nullable failureReason: String?, and a companion object constant for MAX_RETRIES. The channel and status are Kotlin enum classes persisted as strings via @Enumerated(EnumType.STRING).

src/main/kotlin/com/example/notifications/domain/model/Notification.kt
package com.example.notifications.domain.model

import jakarta.persistence.*
import java.time.Instant

@Entity
@Table(name = "notifications")
data class Notification(
    @Id
    val id: String = java.util.UUID.randomUUID().toString(),
    val userId: String = "",
    @Enumerated(EnumType.STRING)
    val channel: NotificationChannel = NotificationChannel.EMAIL,
    @Column(columnDefinition = "TEXT")
    val body: String = "",
    @Enumerated(EnumType.STRING)
    var status: NotificationStatus = NotificationStatus.PENDING,
    var failureReason: String? = null,
    var retryCount: Int = 0,
    val createdAt: Instant = Instant.now(),
    var updatedAt: Instant = Instant.now(),
    var sentAt: Instant? = null
) {
    companion object {
        const val MAX_RETRIES = 3
    }

    fun canRetry(): Boolean = retryCount < MAX_RETRIES && status == NotificationStatus.FAILED
}

enum class NotificationChannel {
    EMAIL, SMS, PUSH, WEBHOOK
}

enum class NotificationStatus {
    PENDING,
    QUEUED,
    SENT,
    DELIVERED,
    FAILED,
    CANCELLED
}
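The enum lists the states but not which moves between them are legal. One way to make the "status state machine" explicit is a transition table, sketched below in plain Kotlin. The table itself is an assumption for illustration (the course code does not define one), and allowedTransitions and canTransition are hypothetical names.

```kotlin
// Mirrors the NotificationStatus enum above so the sketch compiles on its own.
enum class NotificationStatus { PENDING, QUEUED, SENT, DELIVERED, FAILED, CANCELLED }

// ASSUMPTION: this transition table is illustrative; the course code does not define one.
val allowedTransitions: Map<NotificationStatus, Set<NotificationStatus>> = mapOf(
    NotificationStatus.PENDING to setOf(
        NotificationStatus.QUEUED, NotificationStatus.SENT,
        NotificationStatus.FAILED, NotificationStatus.CANCELLED
    ),
    NotificationStatus.QUEUED to setOf(
        NotificationStatus.SENT, NotificationStatus.FAILED, NotificationStatus.CANCELLED
    ),
    NotificationStatus.SENT to setOf(NotificationStatus.DELIVERED),
    // A failed notification may be queued again for a retry.
    NotificationStatus.FAILED to setOf(NotificationStatus.QUEUED),
    // Terminal states: nothing leaves them.
    NotificationStatus.DELIVERED to emptySet(),
    NotificationStatus.CANCELLED to emptySet()
)

// True when moving from `from` to `to` is listed in the table.
fun canTransition(from: NotificationStatus, to: NotificationStatus): Boolean =
    to in allowedTransitions.getValue(from)
```

A service method that changes status could call canTransition first and reject the update when it returns false, instead of overwriting the status unconditionally.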

The NotificationController is a Spring MVC @RestController. Each method maps an HTTP verb + path to a handler; @RequestBody deserializes JSON into a DTO, and the injected Authentication gives you the authenticated user (populated by the JWT filter further down). The @Operation/@Tag annotations feed springdoc to build the OpenAPI docs.

src/main/kotlin/com/example/notifications/api/routes/NotificationController.kt
@RestController
@RequestMapping("/api/notifications")
@Tag(name = "Notifications", description = "Notification management endpoints")
class NotificationController(
    private val notificationService: NotificationService
) {
    @PostMapping
    @Operation(summary = "Send a notification")
    fun createNotification(
        @RequestBody request: CreateNotificationRequest,
        authentication: Authentication
    ): ResponseEntity<NotificationResponse> {
        val notification = Notification(
            userId = request.userId,
            channel = NotificationChannel.valueOf(request.channel.uppercase()),
            body = request.body
        )
        val created = notificationService.createNotification(notification)
        return ResponseEntity.status(HttpStatus.CREATED).body(NotificationResponse.from(created))
    }

    @GetMapping("/{id}")
    @Operation(summary = "Get notification by ID")
    fun getNotification(@PathVariable id: String): ResponseEntity<NotificationResponse> {
        val notification = notificationService.getNotification(id)
        return if (notification != null) {
            ResponseEntity.ok(NotificationResponse.from(notification))
        } else {
            ResponseEntity.notFound().build()
        }
    }

    @GetMapping
    @Operation(summary = "List notifications for the authenticated user")
    fun listNotifications(authentication: Authentication): ResponseEntity<List<NotificationResponse>> {
        val userId = authentication.name
        val notifications = notificationService.getNotificationsByUser(userId)
        return ResponseEntity.ok(notifications.map { NotificationResponse.from(it) })
    }
}

The request DTO is a plain Kotlin data class — Jackson (with the Kotlin module) handles the JSON, and metadata: Map<String, String> carries channel-specific extras like an email subject:

src/main/kotlin/com/example/notifications/api/dto/NotificationRequest.kt
data class CreateNotificationRequest(
    val userId: String,
    val channel: String,
    val body: String,
    val metadata: Map<String, String> = emptyMap()
)
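Because channel arrives as a free-form string, the controller's NotificationChannel.valueOf(...) call throws IllegalArgumentException for any value that is not an enum constant. A minimal sketch of a lenient alternative follows; parseChannel is a hypothetical helper, not part of the scaffold, and it returns null so the caller can decide how to report the bad input.

```kotlin
// Mirrors the NotificationChannel enum from the entity file so the sketch compiles on its own.
enum class NotificationChannel { EMAIL, SMS, PUSH, WEBHOOK }

// Hypothetical helper: case-insensitive lookup that returns null for unknown
// channels instead of throwing like NotificationChannel.valueOf does.
fun parseChannel(raw: String): NotificationChannel? =
    NotificationChannel.values().firstOrNull { it.name.equals(raw.trim(), ignoreCase = true) }
```

With this shape, a controller could map a null result to a 400 response with a message that lists the accepted channels.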

NotificationService is the business-logic core, marked @Service. It persists a notification as PENDING, then (in the finished version) publishes to Kafka for async delivery. updateStatus is what the consumer calls back into to move a notification through its lifecycle.

The starter scaffold leaves the full workflow as // TODO — that’s your work: wire the producer in, check preferences, and consult the rate limiter before saving.

src/main/kotlin/com/example/notifications/domain/service/NotificationService.kt
@Service
class NotificationService(
    private val notificationRepository: NotificationRepository
) {
    private val logger = LoggerFactory.getLogger(javaClass)

    fun createNotification(notification: Notification): Notification {
        logger.info("Creating notification: userId={}, channel={}", notification.userId, notification.channel)
        val saved = notificationRepository.save(
            notification.copy(status = NotificationStatus.PENDING, createdAt = Instant.now())
        )
        // TODO: Publish to Kafka topic for async processing
        // kafkaProducer.send(saved)
        return saved
    }

    fun getNotification(id: String): Notification? =
        notificationRepository.findById(id).orElse(null)

    fun getNotificationsByUser(userId: String): List<Notification> =
        notificationRepository.findByUserId(userId)

    fun updateStatus(id: String, status: NotificationStatus, failureReason: String? = null): Notification? {
        val notification = notificationRepository.findById(id).orElse(null) ?: return null
        notification.status = status
        notification.updatedAt = Instant.now()
        notification.failureReason = failureReason
        if (status == NotificationStatus.SENT) {
            notification.sentAt = Instant.now()
        }
        return notificationRepository.save(notification)
    }
}
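The order of the TODO steps matters: check preferences and the rate limiter first, persist, then publish. The sketch below shows that ordering in plain Kotlin, with the collaborators passed in as functions so it runs without Spring. Draft, CreateResult, and CreateWorkflow are hypothetical names for illustration; in the real service the collaborators would be the preference service, NotificationCache, the repository, and NotificationProducer.

```kotlin
// Hypothetical stand-in for the Notification entity, kept minimal for the sketch.
data class Draft(val userId: String, val channel: String, val body: String)

sealed interface CreateResult {
    data class Accepted(val saved: Draft) : CreateResult
    data class Rejected(val reason: String) : CreateResult
}

// ASSUMPTION: collaborators are modelled as plain functions; names are illustrative.
class CreateWorkflow(
    private val channelEnabled: (userId: String, channel: String) -> Boolean,
    private val isRateLimited: (userId: String) -> Boolean,
    private val save: (Draft) -> Draft,
    private val publish: (Draft) -> Unit
) {
    fun create(draft: Draft): CreateResult {
        // 1. Respect the user's per-channel opt-in.
        if (!channelEnabled(draft.userId, draft.channel)) {
            return CreateResult.Rejected("channel disabled by user preference")
        }
        // 2. Consult the rate limiter before doing any writes.
        if (isRateLimited(draft.userId)) {
            return CreateResult.Rejected("rate limit exceeded")
        }
        // 3. Persist first, so the consumer can find the row when the event arrives.
        val saved = save(draft)
        // 4. Hand off to the asynchronous delivery path.
        publish(saved)
        return CreateResult.Accepted(saved)
    }
}
```

Returning a result type rather than throwing keeps the rejection reasons visible to the controller, which can then choose a status code for each case.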

The synchronous request returns fast; actual delivery happens off a Kafka topic. The producer wraps a KafkaTemplate<String, Any> and sends the notification keyed by id, so all events for one notification land on the same partition:

src/main/kotlin/com/example/notifications/infrastructure/messaging/NotificationProducer.kt
@Component
class NotificationProducer(
    private val kafkaTemplate: KafkaTemplate<String, Any>
) {
    private val logger = LoggerFactory.getLogger(javaClass)

    companion object {
        const val TOPIC = "notifications"
    }

    fun send(notification: Notification) {
        logger.info("Publishing notification to Kafka: id={}, channel={}", notification.id, notification.channel)
        kafkaTemplate.send(TOPIC, notification.id, notification)
    }
}
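Why does keying by id pin a notification to one partition? For keyed records, Kafka's default partitioner hashes the serialized key (with murmur2) and takes the result modulo the partition count, so the same key always maps to the same partition for a fixed number of partitions. The sketch below illustrates the idea only: partitionFor is a hypothetical function and it uses String.hashCode() as a stand-in for murmur2, so its partition numbers will not match a real broker's.

```kotlin
// Illustration only: String.hashCode() stands in for Kafka's murmur2 hash of the key bytes.
fun partitionFor(key: String, numPartitions: Int): Int {
    require(numPartitions > 0) { "numPartitions must be positive" }
    // floorMod keeps the result in 0 until numPartitions even for negative hashes.
    return Math.floorMod(key.hashCode(), numPartitions)
}
```

The mapping is only stable while the partition count stays the same; adding partitions to the topic changes where existing keys land.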

The consumer is a @KafkaListener on that same topic, in its own consumer group. Spring deserializes the JSON straight back into a Notification. In the finished version it simulates delivery per channel; the scaffold leaves that step as a TODO and calls updateStatus to mark the row SENT, or FAILED with the exception message if anything throws, which is the state a retry would start from:

src/main/kotlin/com/example/notifications/infrastructure/messaging/NotificationConsumer.kt
@Component
class NotificationConsumer(
    private val notificationService: NotificationService
) {
    private val logger = LoggerFactory.getLogger(javaClass)

    @KafkaListener(
        topics = [NotificationProducer.TOPIC],
        groupId = "notification-processor"
    )
    fun processNotification(notification: Notification) {
        logger.info("Processing notification: id={}, channel={}", notification.id, notification.channel)
        try {
            // TODO: Simulate delivery based on channel type
            // TODO: Update status to SENT on success
            notificationService.updateStatus(notification.id, NotificationStatus.SENT)
            logger.info("Notification delivered: id={}", notification.id)
        } catch (e: Exception) {
            logger.error("Notification delivery failed: id={}", notification.id, e)
            notificationService.updateStatus(notification.id, NotificationStatus.FAILED, e.message)
        }
    }
}
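The consumer above records the failure but does not yet decide whether to try again. A minimal sketch of that decision follows, using the same bound as Notification.MAX_RETRIES. FailureAction and onDeliveryFailure are hypothetical names, and the exponential backoff with a one-second base is an assumption for illustration, not something the scaffold prescribes.

```kotlin
sealed interface FailureAction {
    data class Retry(val attempt: Int, val delayMs: Long) : FailureAction
    object GiveUp : FailureAction
}

// ASSUMPTION: exponential backoff (base * 2^retryCount); maxRetries defaults to
// the same value as Notification.MAX_RETRIES.
fun onDeliveryFailure(
    retryCount: Int,
    maxRetries: Int = 3,
    baseDelayMs: Long = 1_000L
): FailureAction =
    if (retryCount < maxRetries) {
        FailureAction.Retry(attempt = retryCount + 1, delayMs = baseDelayMs shl retryCount)
    } else {
        FailureAction.GiveUp
    }
```

On Retry the consumer would increment retryCount and re-publish the notification after the delay; on GiveUp it would leave the row FAILED with its failureReason.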

The serialization is configured in application.yml: JsonSerializer on the producer, and JsonDeserializer with spring.json.trusted.packages: "com.example.*" on the consumer so it’ll deserialize into your domain types:

src/main/resources/application.yml (kafka section)
spring:
  kafka:
    bootstrap-servers: localhost:29092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      group-id: notification-processor
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      auto-offset-reset: earliest
      properties:
        spring.json.trusted.packages: "com.example.*"

Persistence is Spring Data JPA: declare an interface extending JpaRepository<Notification, String> and Spring generates the implementation at runtime. The query methods (findByUserId, findByStatus) are derived — Spring parses the method name into a query. No SQL, no boilerplate.

src/main/kotlin/com/example/notifications/infrastructure/database/NotificationRepository.kt
@Repository
interface NotificationRepository : JpaRepository<Notification, String> {
    fun findByUserId(userId: String): List<Notification>
    fun findByStatus(status: NotificationStatus): List<Notification>
}

The schema itself is owned by Flyway, not Hibernate — note ddl-auto: validate in application.yml, which means Hibernate only checks the schema matches the entity, it never mutates it. The table comes from a versioned migration:

src/main/resources/db/migration/V1__create_notifications.sql
CREATE TABLE IF NOT EXISTS notifications (
    id VARCHAR(36) PRIMARY KEY,
    user_id VARCHAR(255) NOT NULL,
    channel VARCHAR(20) NOT NULL,
    body TEXT NOT NULL,
    status VARCHAR(20) NOT NULL DEFAULT 'PENDING',
    failure_reason TEXT,
    retry_count INT NOT NULL DEFAULT 0,
    created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
    sent_at TIMESTAMP WITH TIME ZONE
);

CREATE INDEX idx_notifications_user_id ON notifications(user_id);
CREATE INDEX idx_notifications_status ON notifications(status);
CREATE INDEX idx_notifications_created_at ON notifications(created_at);

NotificationCache wraps a StringRedisTemplate for two jobs: caching user preferences (cache-aside) and rate-limiting delivery per user. The scaffold ships a simple counter-with-TTL rate limiter; the full version uses a sliding window.

src/main/kotlin/com/example/notifications/infrastructure/cache/NotificationCache.kt
@Component
class NotificationCache(
    private val redisTemplate: StringRedisTemplate
) {
    companion object {
        private const val RATE_LIMIT_PREFIX = "rate:notify:"
        private const val PREFERENCE_PREFIX = "pref:"
    }

    // Check if the user has exceeded their notification rate limit.
    // TODO: implement a sliding-window limiter (Redis sorted set).
    fun isRateLimited(userId: String, maxPerHour: Int): Boolean {
        return false
    }

    fun recordNotificationSent(userId: String) {
        val key = "$RATE_LIMIT_PREFIX$userId"
        redisTemplate.opsForValue().increment(key)
        redisTemplate.expire(key, Duration.ofHours(1))
    }
}
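Note that the scaffold's counter resets its TTL on every increment, so the key only expires after a full hour with no sends. A sliding window avoids that by remembering the timestamp of each send. The sketch below shows the algorithm in memory so it runs without Redis; SlidingWindowLimiter is a hypothetical class, and the comments name the sorted-set commands (ZREMRANGEBYSCORE, ZCARD, ZADD) that each step would correspond to if the timestamps lived in a Redis sorted set per user.

```kotlin
import java.time.Duration

// In-memory stand-in for one Redis sorted set per user (score = send time in ms).
// ASSUMPTION: class and parameter names are illustrative, not from the scaffold.
class SlidingWindowLimiter(
    private val window: Duration = Duration.ofHours(1),
    private val now: () -> Long = System::currentTimeMillis
) {
    private val sentAt = HashMap<String, ArrayDeque<Long>>()

    // Returns true and records the send if the user is under the limit, false otherwise.
    @Synchronized
    fun tryAcquire(userId: String, maxPerWindow: Int): Boolean {
        val nowMs = now()
        val cutoff = nowMs - window.toMillis()
        val timestamps = sentAt.getOrPut(userId) { ArrayDeque() }
        // ZREMRANGEBYSCORE: drop sends that have fallen out of the window.
        while (timestamps.isNotEmpty() && timestamps.first() <= cutoff) {
            timestamps.removeFirst()
        }
        // ZCARD: count what is left inside the window.
        if (timestamps.size >= maxPerWindow) return false
        // ZADD: record this send.
        timestamps.addLast(nowMs)
        return true
    }
}
```

The injected clock makes the window easy to test. In a Redis-backed version the three steps would need to run atomically, for example in a transaction or a script, so that concurrent requests cannot slip past the limit.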

JWTs are signed and verified by JwtService using the jjwt library. The signing key is derived from a configured secret (@Value("\${jwt.secret:...}") — a Spring property with a default). validateToken returns the Claims or null on any failure, so callers branch on null rather than catching exceptions.

src/main/kotlin/com/example/notifications/infrastructure/auth/JwtService.kt
@Service
class JwtService(
    @Value("\${jwt.secret:my-super-secret-key-that-is-at-least-256-bits-long}")
    private val secret: String,
    @Value("\${jwt.expiration-ms:3600000}")
    private val expirationMs: Long
) {
    private val key: SecretKey by lazy {
        Keys.hmacShaKeyFor(secret.toByteArray())
    }

    fun generateToken(username: String, role: String): String {
        val now = Date()
        val expiry = Date(now.time + expirationMs)
        return Jwts.builder()
            .subject(username)
            .claim("role", role)
            .issuedAt(now)
            .expiration(expiry)
            .signWith(key)
            .compact()
    }

    fun validateToken(token: String): Claims? = try {
        Jwts.parser()
            .verifyWith(key)
            .build()
            .parseSignedClaims(token)
            .payload
    } catch (e: Exception) {
        null
    }

    fun getUsernameFromToken(token: String): String? = validateToken(token)?.subject

    fun getRoleFromToken(token: String): String? = validateToken(token)?.get("role", String::class.java)
}
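To see what signing and verifying amount to for an HMAC-signed token, here is a minimal sketch using only the JDK. It builds the three dot-separated, base64url-encoded parts of an HS256 JWT and recomputes the signature on verification. It is an illustration of the token format rather than a replacement for jjwt: signHs256 and verifyHs256 are hypothetical helpers, the payload is passed in as a raw JSON string, and expiry and other claims are not checked.

```kotlin
import java.security.MessageDigest
import java.util.Base64
import javax.crypto.Mac
import javax.crypto.spec.SecretKeySpec

val b64Url: Base64.Encoder = Base64.getUrlEncoder().withoutPadding()

fun hmacSha256(secret: ByteArray, data: String): ByteArray {
    val mac = Mac.getInstance("HmacSHA256")
    mac.init(SecretKeySpec(secret, "HmacSHA256"))
    return mac.doFinal(data.toByteArray(Charsets.UTF_8))
}

// Produces header.payload.signature, each part base64url-encoded without padding.
fun signHs256(payloadJson: String, secret: ByteArray): String {
    val header = b64Url.encodeToString("""{"alg":"HS256","typ":"JWT"}""".toByteArray(Charsets.UTF_8))
    val payload = b64Url.encodeToString(payloadJson.toByteArray(Charsets.UTF_8))
    val signature = b64Url.encodeToString(hmacSha256(secret, "${header}.${payload}"))
    return "${header}.${payload}.${signature}"
}

// Checks the signature only; claims such as exp are NOT validated in this sketch.
fun verifyHs256(token: String, secret: ByteArray): Boolean {
    val parts = token.split(".")
    if (parts.size != 3) return false
    val expected = hmacSha256(secret, "${parts[0]}.${parts[1]}")
    val actual = try {
        Base64.getUrlDecoder().decode(parts[2])
    } catch (e: IllegalArgumentException) {
        return false
    }
    // Constant-time comparison, so timing does not reveal how many bytes matched.
    return MessageDigest.isEqual(expected, actual)
}
```

Anyone holding the secret can mint valid tokens, which is why the default value baked into the @Value annotation is only suitable for local runs.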

SecurityConfig is where it all plugs into Spring Security: a stateless filter chain (no sessions), public routes for /auth/**, /actuator/**, and Swagger, and everything else authenticated. The custom JwtAuthFilter runs before Spring’s username/password filter — it reads the Bearer token, validates it, and on success builds an Authentication with a ROLE_<role> authority and puts it in the SecurityContextHolder (which is what fed authentication.name back in the controller).

src/main/kotlin/com/example/notifications/config/SecurityConfig.kt
@Configuration
@EnableWebSecurity
class SecurityConfig(
    private val jwtAuthFilter: JwtAuthFilter
) {
    @Bean
    fun securityFilterChain(http: HttpSecurity): SecurityFilterChain {
        http
            .csrf { it.disable() }
            .sessionManagement { it.sessionCreationPolicy(SessionCreationPolicy.STATELESS) }
            .authorizeHttpRequests { auth ->
                auth
                    .requestMatchers("/auth/**").permitAll()
                    .requestMatchers("/actuator/**").permitAll()
                    // /swagger-ui.html is the documented entry point; /swagger-ui/** alone does not match it
                    .requestMatchers("/swagger-ui.html", "/swagger-ui/**", "/v3/api-docs/**").permitAll()
                    .anyRequest().authenticated()
            }
            .addFilterBefore(jwtAuthFilter, UsernamePasswordAuthenticationFilter::class.java)
        return http.build()
    }
}

@Component
class JwtAuthFilter(
    private val jwtService: JwtService
) : OncePerRequestFilter() {
    override fun doFilterInternal(
        request: HttpServletRequest,
        response: HttpServletResponse,
        filterChain: FilterChain
    ) {
        val authHeader = request.getHeader("Authorization")
        if (authHeader != null && authHeader.startsWith("Bearer ")) {
            val token = authHeader.substring(7)
            val claims = jwtService.validateToken(token)
            if (claims != null) {
                val username = claims.subject
                val role = claims.get("role", String::class.java) ?: "USER"
                val authorities = listOf(SimpleGrantedAuthority("ROLE_$role"))
                val authentication = UsernamePasswordAuthenticationToken(username, null, authorities)
                SecurityContextHolder.getContext().authentication = authentication
            }
        }
        // Always continue the chain; unauthenticated requests are rejected later by the authorization rules.
        filterChain.doFilter(request, response)
    }
}
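The header handling inside the filter is easy to pull out and unit-test without a servlet container. A minimal sketch, assuming two hypothetical helpers (extractBearerToken and toAuthority) that mirror the filter's logic: strip the "Bearer " prefix, and fall back to USER when the token carries no role claim.

```kotlin
// Hypothetical helper: returns the token after "Bearer ", or null when the header
// is missing, uses another scheme, or carries an empty token.
fun extractBearerToken(authorizationHeader: String?): String? {
    val prefix = "Bearer "
    if (authorizationHeader == null || !authorizationHeader.startsWith(prefix)) return null
    return authorizationHeader.substring(prefix.length).trim().takeIf { it.isNotEmpty() }
}

// Hypothetical helper: same ROLE_<role> convention and USER default as the filter.
fun toAuthority(role: String?): String = "ROLE_${role ?: "USER"}"
```

Unlike the filter's substring(7), this version also treats a bare "Bearer " header as absent, which saves a pointless validation call.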

The service needs Postgres, Redis, and Kafka running. The course’s shared infra stack provides all three; this exercise also ships its own docker-compose.yml with the full observability stack (Prometheus + Grafana).

  1. Start the infrastructure (Postgres, Redis, Kafka, plus Kafka UI, Prometheus, Grafana):

    cd shared-infra && docker compose up -d
    # or, from this exercise's own stack:
    docker compose up -d
  2. Run the application (starts on port 8088):

    ./gradlew bootRun
  3. Mint a JWT (the /auth/token route is unauthenticated, for testing):

    TOKEN=$(curl -s -X POST http://localhost:8088/auth/token \
      -H "Content-Type: application/json" \
      -d '{"username": "testuser", "role": "USER"}' | jq -r '.token')
  4. Send a notification:

    curl -X POST http://localhost:8088/api/notifications \
      -H "Content-Type: application/json" \
      -H "Authorization: Bearer $TOKEN" \
      -d '{
        "userId": "user-1",
        "channel": "email",
        "body": "Welcome to the notification service!",
        "metadata": {"subject": "Welcome"}
      }'
  5. Read it back, check health, and scrape metrics:

    curl -H "Authorization: Bearer $TOKEN" http://localhost:8088/api/notifications
    curl http://localhost:8088/actuator/health
    curl http://localhost:8088/actuator/prometheus | grep notification

The OpenAPI docs live at http://localhost:8088/swagger-ui.html, the Kafka UI at http://localhost:8090, and Grafana at http://localhost:3000 (admin/admin).

# Unit tests (mocked dependencies)
./gradlew test

Integration tests use Testcontainers, so they require Docker — they spin up real Postgres and Kafka containers per run, giving you confidence the JPA mappings and migrations actually work against the real database.