
Event Bus with SharedFlow

Build a type-safe, in-process event bus using SharedFlow — the kind of thing you reach for to decouple components inside a backend service. It’s the same idea as Node.js EventEmitter or a Go channel fan-out, but type-safe: a subscriber asks for events of a particular type and the compiler (plus a runtime filterIsInstance) guarantees that’s all it gets.

The centerpiece is one EventBus class backed by a single MutableSharedFlow<Any>. Everything publishes onto it; subscribers carve out a typed, optionally-filtered view of the stream.

  1. An EventBus class with:

    • publish(event: Any) — a suspend function that emits to all matching subscribers.
    • subscribe<T>(filter: (T) -> Boolean = { true }): Flow<T> — subscribe to events of type T with an optional predicate. The default { true } means “everything of this type”.
    • A configurable replay count so late subscribers still see recent events.
    • Thread safety (you get this for free — SharedFlow is concurrency-safe).
  2. A few event types. Sealed classes are the idiomatic way to model a closed set of variants (think a TS discriminated union, or a Go interface with a fixed set of implementers):

    • UserEvent.LoggedIn / UserEvent.LoggedOut
    • OrderEvent.Created / OrderEvent.Shipped
    • SystemEvent (a plain data class here)
  3. A demo: several subscribers each listening for a different slice of the stream, with one publisher emitting a mix of events.

One publisher emits onto the shared flow; each subscriber gets its own filtered view of the same stream. Nothing is consumed or removed — every subscriber sees every event that matches its type and predicate.

Figure: Event bus fan-out.

A single-module Gradle project. One Main.kt holds the event types, the bus, and a runnable demo.

  • event-bus/
    • build.gradle.kts (coroutines dependency + run config)
    • settings.gradle.kts (project name)
    • src/main/kotlin/
      • Main.kt (event types, EventBus, demo)

The only runtime dependency is kotlinx-coroutines-core — that’s where SharedFlow and the flow operators live. The test dependencies (turbine, kotlinx-coroutines-test) are there for when you write flow tests; Turbine’s .test { } block makes asserting on emissions clean.

build.gradle.kts
plugins {
    kotlin("jvm") version "2.1.0"
    application
}

repositories {
    mavenCentral()
}

dependencies {
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.10.1")
    testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.10.1")
    testImplementation("app.cash.turbine:turbine:1.2.0")
    testImplementation(kotlin("test"))
}

application {
    mainClass.set("MainKt")
}

tasks.test {
    useJUnitPlatform()
}

settings.gradle.kts
rootProject.name = "event-bus"

Each event family is a sealed class with data class variants — a closed set of possibilities the compiler knows about, which makes the when blocks in the subscribers exhaustive (no else branch needed). SystemEvent is just a plain data class here since it has a single shape.

src/main/kotlin/Main.kt
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// --- Event types ---
sealed class UserEvent {
    data class LoggedIn(val userId: String, val timestamp: Long = System.currentTimeMillis()) : UserEvent()
    data class LoggedOut(val userId: String, val timestamp: Long = System.currentTimeMillis()) : UserEvent()
}

sealed class OrderEvent {
    data class Created(val orderId: String, val amount: Double) : OrderEvent()
    data class Shipped(val orderId: String, val trackingNumber: String) : OrderEvent()
}

data class SystemEvent(val type: String, val message: String)
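
The exhaustiveness point is easiest to see in isolation. The sketch below uses a hypothetical PaymentEvent family (not part of this project) so that it compiles on its own; describe is likewise an illustrative name.

```kotlin
// Hypothetical event family, defined here only so the sketch is self-contained.
sealed class PaymentEvent {
    data class Authorized(val paymentId: String, val amount: Double) : PaymentEvent()
    data class Refunded(val paymentId: String, val amount: Double) : PaymentEvent()
}

// `when` is used as an expression over a sealed type, so the compiler requires
// every variant to be covered and no `else` branch is needed. Adding a third
// subclass to PaymentEvent makes this function fail to compile until a
// matching branch is added.
fun describe(event: PaymentEvent): String = when (event) {
    is PaymentEvent.Authorized -> "authorized ${event.paymentId}"
    is PaymentEvent.Refunded -> "refunded ${event.paymentId}"
}

fun main() {
    println(describe(PaymentEvent.Authorized("PAY-1", 20.0)))
    println(describe(PaymentEvent.Refunded("PAY-1", 20.0)))
}
```

The same mechanism is what lets the subscribers in the demo match on UserEvent and OrderEvent without a fallback branch.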

This is the whole bus — 30 lines. The interesting parts:

  • The backing field is a single MutableSharedFlow<Any>(replay = replay). A SharedFlow is a hot flow: it emits whether or not anyone is listening (unlike a cold Flow<T>, which restarts for each collector). replay controls how many of the most recent events a brand-new subscriber receives on connect — set it to 1 and a late subscriber still sees the last event.
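  • publish is suspend because emit can suspend (back-pressure). With the settings used here (no extra buffer, the default BufferOverflow.SUSPEND), emit waits while the buffer is full and a slower subscriber has not yet taken the oldest value; with replay = 0 that means it waits until every subscriber has received the event. If there are no subscribers, emit returns immediately.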
  • subscribe is inline + reified so filterIsInstance<T>() can check the runtime type — Kotlin erases generics like Java, and reified is what lets the type survive into the function body. The optional filter adds a predicate on top (e.g. only orders over $100). The result is a plain Flow<T> that each collector subscribes to independently.
  • subscribeAll() returns the raw stream via asSharedFlow() (a read-only view — callers can’t emit into your bus). subscriberCount exposes subscriptionCount.value, handy for diagnostics.
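
The replay and back-pressure behaviour described in these bullets can be observed on a bare MutableSharedFlow, independent of the bus. This is a minimal sketch; the function names lateSubscriberSees and tryEmitWithSubscriber are made up for illustration.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// What does a late subscriber receive first on a flow created with replay = 1?
fun lateSubscriberSees(): String = runBlocking {
    val flow = MutableSharedFlow<String>(replay = 1)
    // No subscribers yet, so these emits return immediately;
    // only the most recent value stays in the replay cache.
    flow.emit("first")
    flow.emit("second")
    flow.first() // a new collector is handed the replayed value
}

// Does tryEmit succeed on an unbuffered flow that already has a subscriber?
// tryEmit never suspends, so it reports failure where emit would have to wait.
fun tryEmitWithSubscriber(): Boolean = runBlocking {
    val flow = MutableSharedFlow<String>() // replay = 0, no extra buffer
    // UNDISPATCHED runs the collector up to its first suspension point,
    // so it is subscribed by the time launch returns.
    val subscriber = launch(start = CoroutineStart.UNDISPATCHED) { flow.collect { } }
    val accepted = flow.tryEmit("event")
    subscriber.cancel()
    accepted
}

fun main() {
    println("late subscriber sees: ${lateSubscriberSees()}")
    println("tryEmit accepted: ${tryEmitWithSubscriber()}")
}
```

If publish must never suspend, one option is to construct the flow with extraBufferCapacity greater than zero and onBufferOverflow = BufferOverflow.DROP_OLDEST, at the cost of silently dropping events for slow subscribers.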
src/main/kotlin/Main.kt
// --- Event Bus ---
class EventBus(replay: Int = 0) {
    // Internal + @PublishedApi rather than private: a public inline function
    // such as subscribe() is not allowed to reference private members.
    @PublishedApi
    internal val _events = MutableSharedFlow<Any>(replay = replay)

    /**
     * Publish an event to all subscribers.
     */
    suspend fun publish(event: Any) {
        _events.emit(event)
    }

    /**
     * Subscribe to events of a specific type with optional filtering.
     */
    inline fun <reified T> subscribe(noinline filter: (T) -> Boolean = { true }): Flow<T> =
        _events
            .filterIsInstance<T>()
            .filter(filter)

    /**
     * Subscribe to ALL events (untyped).
     */
    fun subscribeAll(): Flow<Any> = _events.asSharedFlow()

    /**
     * Number of active subscribers.
     */
    val subscriberCount: Int
        get() = _events.subscriptionCount.value
}

Each subscriber is its own coroutine launched with launch, collecting a different slice of the bus. Notice subscriber 2 narrows to a single variant (OrderEvent.Created) and applies a predicate ({ it.amount > 100.0 }), so the $45 order is silently dropped for it but still seen by the all-orders and audit subscribers. The delay(100) before publishing gives every coroutine time to register — otherwise, with replay = 1, latecomers would only catch the most recent event.

src/main/kotlin/Main.kt
fun main() = runBlocking {
    val bus = EventBus(replay = 1) // Replay last event for late subscribers

    // Subscriber 1: All user events
    val userSub = launch {
        bus.subscribe<UserEvent>().collect { event ->
            when (event) {
                is UserEvent.LoggedIn -> println("[UserSub] Login: ${event.userId}")
                is UserEvent.LoggedOut -> println("[UserSub] Logout: ${event.userId}")
            }
        }
    }

    // Subscriber 2: Only high-value orders (> $100)
    val orderSub = launch {
        bus.subscribe<OrderEvent.Created> { it.amount > 100.0 }.collect { event ->
            println("[OrderSub] High-value order: ${event.orderId} = \$${event.amount}")
        }
    }

    // Subscriber 3: All order events
    val allOrdersSub = launch {
        bus.subscribe<OrderEvent>().collect { event ->
            when (event) {
                is OrderEvent.Created -> println("[AllOrders] Created: ${event.orderId}")
                is OrderEvent.Shipped -> println("[AllOrders] Shipped: ${event.orderId} -> ${event.trackingNumber}")
            }
        }
    }

    // Subscriber 4: System events
    val systemSub = launch {
        bus.subscribe<SystemEvent>().collect { event ->
            println("[System] ${event.type}: ${event.message}")
        }
    }

    // Subscriber 5: Audit log (all events)
    val auditLog = mutableListOf<String>()
    val auditSub = launch {
        bus.subscribeAll().collect { event ->
            auditLog.add("[${System.currentTimeMillis()}] $event")
        }
    }

    // Give subscribers time to register
    delay(100)

    // --- Publish events ---
    println("=".repeat(60))
    println("Publishing events...")
    println("=".repeat(60))

    bus.publish(UserEvent.LoggedIn("alice"))
    delay(50)
    bus.publish(OrderEvent.Created("ORD-001", 250.0))
    delay(50)
    bus.publish(OrderEvent.Created("ORD-002", 45.0)) // Below $100 filter
    delay(50)
    bus.publish(UserEvent.LoggedIn("bob"))
    delay(50)
    bus.publish(OrderEvent.Shipped("ORD-001", "TRACK-XYZ"))
    delay(50)
    bus.publish(SystemEvent("HEALTH", "All services nominal"))
    delay(50)
    bus.publish(UserEvent.LoggedOut("alice"))
    delay(50)

    // Let everything process
    delay(200)

    // Print audit log
    println("=".repeat(60))
    println("Audit Log (${auditLog.size} events):")
    auditLog.forEach { println(" $it") }

    // Cleanup
    userSub.cancel()
    orderSub.cancel()
    allOrdersSub.cancel()
    systemSub.cancel()
    auditSub.cancel()

    println("=".repeat(60))
    println("Event bus demo complete.")
}

Terminal window
./gradlew run

You’ll see each subscriber print its slice of the stream — the OrderSub ignores the $45 order, while AllOrders and the audit log catch everything.
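
The fixed delay(100) in the demo is a simple way to let subscribers register, but it is a guess. A possible alternative is to suspend until subscriptionCount reports the expected number of collectors. The sketch below does this on a bare MutableSharedFlow; applying it to EventBus would require exposing subscriptionCount as a flow, which the bus in this project does not do, and collectAfterAllSubscribed is an illustrative name.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

fun collectAfterAllSubscribed(): Pair<List<String>, List<Int>> = runBlocking {
    // replay = 0: anything emitted before a collector subscribes is lost to it.
    val events = MutableSharedFlow<Any>()

    // Two typed subscribers; take(n) lets each one finish after n matching events.
    val strings = async { events.filterIsInstance<String>().take(2).toList() }
    val ints = async { events.filterIsInstance<Int>().take(1).toList() }

    // Suspend until both collectors have actually subscribed,
    // instead of sleeping for an arbitrary amount of time.
    events.subscriptionCount.first { it >= 2 }

    events.emit("a")
    events.emit(1)
    events.emit("b")

    strings.await() to ints.await()
}

fun main() {
    val (strings, ints) = collectAfterAllSubscribed()
    println("strings=$strings ints=$ints")
}
```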

Terminal window
./gradlew test

The turbine and kotlinx-coroutines-test dependencies are wired up for testing flow emissions. Turbine’s flow.test { } extension lets you awaitItem() per emission and assert the bus delivers (and filters) exactly what you expect.
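
As a starting point, a flow test might look like the sketch below. It exercises the same filterIsInstance + filter chain the bus uses, but on a bare MutableSharedFlow so the file compiles on its own; OrderPlaced and SharedFlowFilterTest are hypothetical names, and in the real project you would call bus.subscribe<OrderEvent.Created> { ... } instead. It would live under src/test/kotlin/.

```kotlin
import app.cash.turbine.test
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.test.runTest
import kotlin.test.Test
import kotlin.test.assertEquals

// Hypothetical stand-in for OrderEvent.Created.
data class OrderPlaced(val orderId: String, val amount: Double)

class SharedFlowFilterTest {
    @Test
    fun onlyHighValueOrdersAreDelivered() = runTest {
        val events = MutableSharedFlow<Any>()
        val highValue = events
            .filterIsInstance<OrderPlaced>()
            .filter { it.amount > 100.0 }

        highValue.test {
            // Emit inside the test block: Turbine is already collecting here,
            // so a replay = 0 flow does not lose the events.
            events.emit(OrderPlaced("ORD-001", 250.0))
            assertEquals("ORD-001", awaitItem().orderId)

            events.emit(OrderPlaced("ORD-002", 45.0)) // below the threshold
            events.emit("not an order")               // wrong type
            expectNoEvents()
        }
    }
}
```

Emitting before calling test { } is a common mistake with hot flows: with replay = 0 the event is gone by the time Turbine subscribes, and awaitItem() times out.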