Event Bus with SharedFlow
Build a type-safe, in-process event bus using SharedFlow — the kind of thing
you reach for to decouple components inside a backend service. It’s the same idea
as Node.js EventEmitter or a Go channel fan-out, but type-safe: a subscriber
asks for events of a particular type and the compiler (plus a runtime
filterIsInstance) guarantees that’s all it gets.
What you’ll build
Section titled “What you’ll build”The centerpiece is one EventBus class backed by a single
MutableSharedFlow<Any>. Everything publishes onto it; subscribers carve out a
typed, optionally-filtered view of the stream.
-
An
EventBusclass with:publish(event: Any)— asuspendfunction that emits to all matching subscribers.subscribe<T>(filter: (T) -> Boolean = { true }): Flow<T>— subscribe to events of typeTwith an optional predicate. The default{ true }means “everything of this type”.- A configurable replay count so late subscribers still see recent events.
- Thread safety (you get this for free —
SharedFlowis concurrency-safe).
-
A few event types. Sealed classes are the idiomatic way to model a closed set of variants (think a TS discriminated union, or a Go interface with a fixed set of implementers):
UserEvent.LoggedIn/UserEvent.LoggedOutOrderEvent.Created/OrderEvent.ShippedSystemEvent(a plain data class here)
-
A demo: several subscribers each listening for a different slice of the stream, with one publisher emitting a mix of events.
How the fan-out works
Section titled “How the fan-out works”One publisher emits onto the shared flow; each subscriber gets its own filtered view of the same stream. Nothing is consumed or removed — every subscriber sees every event that matches its type and predicate.
flowchart TB P["Publisher"] -->|"publish(event)"| BUS["EventBus<br/>MutableSharedFlow<Any>"] BUS -->|"filterIsInstance<UserEvent>"| S1["UserEvent subscriber"] BUS -->|"filterIsInstance<OrderEvent.Created> + amount > 100"| S2["High-value order subscriber"] BUS -->|"filterIsInstance<OrderEvent>"| S3["All-orders subscriber"] BUS -->|"filterIsInstance<SystemEvent>"| S4["SystemEvent subscriber"] BUS -->|"asSharedFlow() (untyped)"| S5["Audit log subscriber"]
The worked solution
Section titled “The worked solution”A single-module Gradle project. One Main.kt holds the event types, the bus, and
a runnable demo.
Directoryevent-bus/
- build.gradle.kts coroutines dep + run config
- settings.gradle.kts project name
Directorysrc/main/kotlin/
- Main.kt event types, EventBus, demo
build.gradle.kts
Section titled “build.gradle.kts”The only runtime dependency is kotlinx-coroutines-core — that’s where
SharedFlow and the flow operators live. The test dependencies (turbine,
kotlinx-coroutines-test) are there for when you write flow tests; Turbine’s
.test { } block makes asserting on emissions clean.
plugins { kotlin("jvm") version "2.1.0" application}
repositories { mavenCentral()}
dependencies { implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.10.1")
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.10.1") testImplementation("app.cash.turbine:turbine:1.2.0") testImplementation(kotlin("test"))}
application { mainClass.set("MainKt")}
tasks.test { useJUnitPlatform()}rootProject.name = "event-bus"The event types
Section titled “The event types”Each event family is a sealed class with data class variants — a closed set of
possibilities the compiler knows about, which makes the when blocks in the
subscribers exhaustive (no else branch needed). SystemEvent is just a plain
data class here since it has a single shape.
import kotlinx.coroutines.*import kotlinx.coroutines.flow.*
// --- Event types ---
sealed class UserEvent { data class LoggedIn(val userId: String, val timestamp: Long = System.currentTimeMillis()) : UserEvent() data class LoggedOut(val userId: String, val timestamp: Long = System.currentTimeMillis()) : UserEvent()}
sealed class OrderEvent { data class Created(val orderId: String, val amount: Double) : OrderEvent() data class Shipped(val orderId: String, val trackingNumber: String) : OrderEvent()}
data class SystemEvent(val type: String, val message: String)The EventBus
Section titled “The EventBus”This is the whole bus — 30 lines. The interesting parts:
- The backing field is a single
MutableSharedFlow<Any>(replay = replay). ASharedFlowis a hot flow: it emits whether or not anyone is listening (unlike a coldFlow<T>, which restarts for each collector).replaycontrols how many of the most recent events a brand-new subscriber receives on connect — set it to1and a late subscriber still sees the last event. publishissuspendbecauseemitsuspends if a subscriber with a bounded buffer can’t keep up (back-pressure). For an unbounded/replay-only flow it effectively never suspends.subscribeisinline+reifiedsofilterIsInstance<T>()can check the runtime type — Kotlin erases generics like Java, andreifiedis what lets the type survive into the function body. The optionalfilteradds a predicate on top (e.g. only orders over $100). The result is a plainFlow<T>that each collector subscribes to independently.subscribeAll()returns the raw stream viaasSharedFlow()(a read-only view — callers can’t emit into your bus).subscriberCountexposessubscriptionCount.value, handy for diagnostics.
// --- Event Bus ---
class EventBus(replay: Int = 0) { private val _events = MutableSharedFlow<Any>(replay = replay)
/** * Publish an event to all subscribers. */ suspend fun publish(event: Any) { _events.emit(event) }
/** * Subscribe to events of a specific type with optional filtering. */ inline fun <reified T> subscribe(noinline filter: (T) -> Boolean = { true }): Flow<T> = _events .filterIsInstance<T>() .filter(filter)
/** * Subscribe to ALL events (untyped). */ fun subscribeAll(): Flow<Any> = _events.asSharedFlow()
/** * Number of active subscribers. */ val subscriberCount: Int get() = _events.subscriptionCount.value}The demo
Section titled “The demo”Each subscriber is its own coroutine launched with launch, collecting a
different slice of the bus. Notice subscriber 2 narrows to a single variant
(OrderEvent.Created) and applies a predicate ({ it.amount > 100.0 }), so the
$45 order is silently dropped for it but still seen by the all-orders and audit
subscribers. The delay(100) before publishing gives every coroutine time to
register — otherwise, with replay = 1, latecomers would only catch the most
recent event.
fun main() = runBlocking { val bus = EventBus(replay = 1) // Replay last event for late subscribers
// Subscriber 1: All user events val userSub = launch { bus.subscribe<UserEvent>().collect { event -> when (event) { is UserEvent.LoggedIn -> println("[UserSub] Login: ${event.userId}") is UserEvent.LoggedOut -> println("[UserSub] Logout: ${event.userId}") } } }
// Subscriber 2: Only high-value orders (> $100) val orderSub = launch { bus.subscribe<OrderEvent.Created> { it.amount > 100.0 }.collect { event -> println("[OrderSub] High-value order: ${event.orderId} = \$${event.amount}") } }
// Subscriber 3: All order events val allOrdersSub = launch { bus.subscribe<OrderEvent>().collect { event -> when (event) { is OrderEvent.Created -> println("[AllOrders] Created: ${event.orderId}") is OrderEvent.Shipped -> println("[AllOrders] Shipped: ${event.orderId} -> ${event.trackingNumber}") } } }
// Subscriber 4: System events val systemSub = launch { bus.subscribe<SystemEvent>().collect { event -> println("[System] ${event.type}: ${event.message}") } }
// Subscriber 5: Audit log — all events val auditLog = mutableListOf<String>() val auditSub = launch { bus.subscribeAll().collect { event -> auditLog.add("[${System.currentTimeMillis()}] $event") } }
// Give subscribers time to register delay(100)
// --- Publish events --- println("=".repeat(60)) println("Publishing events...") println("=".repeat(60))
bus.publish(UserEvent.LoggedIn("alice")) delay(50)
bus.publish(OrderEvent.Created("ORD-001", 250.0)) delay(50)
bus.publish(OrderEvent.Created("ORD-002", 45.0)) // Below $100 filter delay(50)
bus.publish(UserEvent.LoggedIn("bob")) delay(50)
bus.publish(OrderEvent.Shipped("ORD-001", "TRACK-XYZ")) delay(50)
bus.publish(SystemEvent("HEALTH", "All services nominal")) delay(50)
bus.publish(UserEvent.LoggedOut("alice")) delay(50)
// Let everything process delay(200)
// Print audit log println("=".repeat(60)) println("Audit Log (${auditLog.size} events):") auditLog.forEach { println(" $it") }
// Cleanup userSub.cancel() orderSub.cancel() allOrdersSub.cancel() systemSub.cancel() auditSub.cancel()
println("=".repeat(60)) println("Event bus demo complete.")}Run it
Section titled “Run it”./gradlew runYou’ll see each subscriber print its slice of the stream — the OrderSub
ignores the $45 order, while AllOrders and the audit log catch everything.
Test it
Section titled “Test it”./gradlew testThe turbine and kotlinx-coroutines-test dependencies are wired up for testing
flow emissions. Turbine’s flow.test { } extension lets you awaitItem() per
emission and assert the bus delivers (and filters) exactly what you expect.