Consensus, Orchestration & Failover
What you will learn: how a failover orchestrator detects a dead leader and elects a new one, and how that workflow is built from goroutines, buffered channels, mutexes, and a tree of cancellable contexts — the way real distributed-systems concurrency is wired in Go.
Prerequisites: this builds directly on concurrency (goroutines, channels, fan-out/fan-in, sync.WaitGroup), context (cancellation, deadlines, context trees), and sync & memory model (mutex, sync.Cond, sync.Map, lock granularity). It also leans on generics, interfaces, and errors, and references gRPC & protobuf and service anatomy.
In the multigres cluster, multiorch is the brain that keeps a PostgreSQL shard available. When the primary dies, it is the component that notices, picks a survivor, and re-points everyone at the new leader. Get that wrong and you get split-brain (two primaries accepting writes) or data loss. So this is where the codebase’s concurrency discipline is at its most deliberate: every goroutine has an owner, every RPC has a deadline tied to a parent context, and shared state is either cloned or guarded.
This page is the most concurrency-dense in the project track, and it rewards a top-down read. We’ll go: the protocol vocabulary, then the three-loop engine that owns the goroutine lifecycles, then the worked election state machine, then the supporting primitives, and finally the safety properties that keep it from corrupting the cluster.
The contract: the appointment protocol
Everything the coordinator does is implemented against a handful of gRPC RPCs on the pooler. The appointment protocol itself is small (recruit, then concurrently promote and inform), and it is worth reading because the shape of the protocol is what makes the concurrency safe.
```protobuf
// Appointment protocol: Recruit, then concurrent Promote (to the leader) and
// SetPrimary (to the other cohort members).

// Recruit asks a pooler to revoke all terms below the one specified and
// record the coordinator's exclusive claim on that term.
rpc Recruit(consensusdata.RecruitRequest) returns (consensusdata.RecruitResponse);

// Promote sends a complete shard-state proposal to a pooler. The designated
// leader promotes its postgres; all other cohort members point replication at
// the new primary.
rpc Promote(consensusdata.PromoteRequest) returns (consensusdata.PromoteResponse);

// SetPrimary tells a pooler which postgres primary to follow at a given
// consensus term.
rpc SetPrimary(consensusdata.SetPrimaryRequest) returns (consensusdata.SetPrimaryResponse);
```

Two of the messages carry the load-bearing safety semantics:
- CoordinatorProposal bundles a TermRevocation (the authority to act: “revoke all terms below N”), the proposed leader address, and the full proposed rule. The coordinator must revoke previous terms before it may propose. That is how a stale coordinator’s proposals get rejected: their term is below the new revocation floor.
- SetPrimaryRequest is the idempotency anchor. Its contract is worth quoting verbatim:

```protobuf
// ... if the supplied rule is strictly higher it applies the
// change (standby: update primary_conninfo; stale primary: demote), otherwise
// it returns success without changes. Idempotent under retries and safe
// against out-of-order delivery from stale recovery rounds.
```

That “strictly higher rule” guarantee means a duplicate or late SetPrimary from an earlier, slower recovery round is a harmless no-op. Hold onto that: it is exactly why the coordinator can safely fire RPCs at nodes it isn’t sure about (see the election section below).
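A minimal sketch can make the two guards concrete. It assumes a term and a rule can each be reduced to a single int64 and keeps everything in memory; poolerState, Recruit, CheckProposal and SetPrimary here are hypothetical stand-ins, not the generated gRPC API or the pooler's real logic.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ErrStaleTerm is returned for a term that has been revoked (hypothetical).
var ErrStaleTerm = errors.New("term revoked")

// poolerState is a hypothetical, heavily simplified view of what a pooler guards.
// Terms and rules are reduced to int64 counters for illustration only.
type poolerState struct {
	mu          sync.Mutex
	floor       int64  // terms below floor are revoked; floor itself is the claimed term
	appliedRule int64  // highest rule applied so far
	primary     string // postgres primary this pooler follows
}

// Recruit revokes all terms below term and records an exclusive claim on it.
// A claim on a term at or below the current floor is refused.
func (s *poolerState) Recruit(term int64) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if term <= s.floor {
		return ErrStaleTerm
	}
	s.floor = term
	return nil
}

// CheckProposal rejects proposals whose term is below the revocation floor.
func (s *poolerState) CheckProposal(term int64) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if term < s.floor {
		return ErrStaleTerm
	}
	return nil
}

// SetPrimary applies the change only for a strictly higher rule. Duplicates and
// late arrivals from older rounds "succeed" without changing anything.
func (s *poolerState) SetPrimary(rule int64, primary string) (applied bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if rule <= s.appliedRule {
		return false
	}
	s.appliedRule, s.primary = rule, primary
	return true
}

func main() {
	s := &poolerState{}
	fmt.Println(s.Recruit(7))            // claim term 7
	fmt.Println(s.CheckProposal(6))      // stale coordinator: rejected
	fmt.Println(s.SetPrimary(2, "pg-b")) // applied
	fmt.Println(s.SetPrimary(2, "pg-b")) // duplicate: no-op
	fmt.Println(s.SetPrimary(1, "pg-a")) // late, older round: no-op
	fmt.Println(s.primary)
}
```

The design point is in SetPrimary: a stale or duplicate request returns success instead of an error, so a sender can retry blindly without ever undoing newer work.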
The cohort, term/TermRevocation, quorum, and DurabilityPolicy vocabulary all live in these proto files. See gRPC & protobuf for how the .proto becomes the typed client the coordinator calls.
The engine: three loops, one store, one shutdown context
The engine is deliberately simple: three loops at different cadences, all communicating through a single in-memory store. The loops never call each other directly; they coordinate only through the shared store, which keeps their interactions easy to reason about.
```mermaid
flowchart TB
    HC["Healthcheck Loop<br/>(every 5s)"]
    RC["Recovery Loop<br/>(every 1s)"]
    MT["Maintenance Loop"]
    Store[("State Store<br/>(in-memory)")]
    HC --> Store
    RC --> Store
    MT --> Store
    subgraph Engine
        HC
        RC
        MT
        Store
    end
```
The recovery loop (1s) is where failover lives: it reads health snapshots, detects problems, and acts. The healthcheck loop keeps the store fresh; the maintenance loop handles bookkeeping.
The Engine struct: two mutexes, one shutdown context
```go
type Engine struct {
	// ...
	poolerStore  *store.PoolerStore
	healthStream *HealthStream

	mu                sync.Mutex // protects shardWatchTargets
	shardWatchTargets []config.WatchTarget

	detectedProblemsMu sync.Mutex
	detectedProblems   []types.Problem // for metrics and gRPC diagnostics

	bookkeepingRunner *timer.PeriodicRunner
	recoveryRunner    *timer.PeriodicRunner

	// Root of the cancellation tree for the whole service
	shutdownCtx context.Context
	cancel      context.CancelFunc
}
```

Two things are worth internalising here.
Two mutexes, two invariants. mu guards shardWatchTargets (config reload); detectedProblemsMu guards detectedProblems (read by metrics and gRPC handlers). They are not interchangeable. The rule is one mutex per independent piece of mutable state — finer granularity means less contention and clearer reasoning. Using a single lock for both would couple a slow gRPC diagnostics read to a config reload for no reason. See lock granularity.
A single shutdownCtx is the root of the cancellation tree for the entire service. Stop() cancels it, then stops both runners, the pooler watcher, and the health stream:

```go
func (re *Engine) Stop() {
	re.cancel()
	re.bookkeepingRunner.Stop()
	re.recoveryRunner.Stop()
	re.poolerWatcher.Stop()
	re.healthStream.Shutdown()
}
```

Everything downstream, down to every per-RPC deadline in an election, is derived from this one context. Cancel the root and the whole tree unwinds.
PeriodicRunner: the loop primitive that owns the goroutine
Each loop is a *timer.PeriodicRunner. This is the canonical “goroutine lifecycle owned by a parent context” idiom, hand-built so that Stop() is synchronous: when it returns, any in-flight callback has finished.
Its machinery is a small state machine (stopped / running / stopping) guarded by a sync.Mutex plus sync.Cond, with a sync.WaitGroup tracking the one in-flight callback and a context.CancelFunc to interrupt it. Two methods carry the safety reasoning.
execute() implements backpressure:
```go
func (r *PeriodicRunner) execute() {
	r.mu.Lock()
	r.wg.Add(1)
	callback := r.callback
	ctx := r.ctx
	// Release lock during callback execution to avoid blocking Stop()
	r.mu.Unlock()

	callback(ctx) // <-- runs WITHOUT the lock held
	r.wg.Done()

	r.mu.Lock()
	defer r.mu.Unlock()
	if r.state != running {
		return
	}
	// Schedule next execution only after this one completes (backpressure)
	r.scheduleNext()
}
```

Two deliberate choices:
- The callback runs with the lock released. A recovery cycle can take seconds (it makes RPCs); holding mu across it would block Stop() and every Running() check.
- The next tick is scheduled only after the current callback returns. There is never more than one cycle in flight. This backpressure prevents a slow recovery cycle from piling up overlapping cycles that would race each other on the shared store.
Stop() waits outside the lock:
```go
func (r *PeriodicRunner) Stop() {
	r.mu.Lock()
	if r.state != running {
		r.mu.Unlock()
		return
	}
	r.state = stopping
	if r.cancel != nil {
		r.cancel() // interrupt the in-flight callback
	}
	if r.timer != nil {
		r.timer.Stop()
		r.timer = nil
	}
	r.mu.Unlock() // <-- release BEFORE waiting

	// Wait for any in-flight callback to complete (outside lock to avoid deadlock)
	r.wg.Wait()

	r.mu.Lock()
	r.state = stopped
	r.cond.Broadcast()
	r.mu.Unlock()
}
```

The stopping state plus sync.Cond lets a concurrent Start() block until a Stop() fully finishes, so Start/Stop/Start cycles are safe.
From detection to election: the recovery cycle
The recovery loop’s callback is performRecoveryCycle. It is a classic fan-out / fan-in with sync.WaitGroup: detect problems, group them by shard, then process each shard concurrently.
```mermaid
flowchart TB
    Start(["Recovery tick (1s)"]) --> Detect["Run analyzers over store"]
    Detect --> Group["Group problems by shard"]
    Group --> FanOut{"For each shard"}
    FanOut -->|goroutine| S1["processShardProblems"]
    FanOut -->|goroutine| S2["processShardProblems"]
    FanOut -->|goroutine| S3["processShardProblems"]
    S1 --> Recheck["recheckProblem<br/>(re-validate vs fresh state)"]
    S2 --> Recheck
    S3 --> Recheck
    Recheck --> Gone{"Still exists?"}
    Gone -->|No, self-healed| Skip(["Skip"])
    Gone -->|Yes| Bound["WithTimeout(action.Timeout)"]
    Bound --> Exec["RecoveryAction.Execute<br/>(AppointLeader)"]
    Exec --> Wait["wg.Wait(): cycle is synchronous"]
    Skip --> Wait
```
```go
// Process each shard independently in parallel
var wg sync.WaitGroup
for _, shardProblems := range problemsByShard {
	wg.Add(1)
	go func(problems []types.Problem) {
		defer wg.Done()
		re.processShardProblems(ctx, problems[0].ShardKey, problems)
	}(shardProblems)
}
wg.Wait()
```

Shards are independent failure domains, so they recover in parallel; wg.Wait() makes the whole cycle synchronous, so the backpressure above measures real cycle time.
Detection itself is interface-driven. analysis.DefaultAnalyzers(...) returns analyzers that each Analyze a shard view and emit problems. The “leader is dead” decision lives in its own analyzer, and most of its logic suppresses false positives, which makes it a good window into distributed-systems edge cases. It declines to fail over if, for example, replicas are still connected to the supposedly dead leader, or if the “deadness” is just TCP-keepalive staleness. One of those suppressions is marked in the source like this:

```go
// Suppress failover during a known pg_promote() window.
```

When it does fire, it emits one shard-scoped problem at PriorityEmergency whose recovery action is an AppointLeaderAction. That action is the bridge into consensus.
attemptRecovery: re-validate, then bound the action
Before acting on a detected problem, attemptRecovery re-checks it against fresh store state, then wraps the action in a deadline:
```go
// Force re-poll to validate the problem still exists
stillExists, err := re.recheckProblem(ctx, problem)
// ... if !stillExists: skip ...

// Execute recovery action under a bounded context
ctx, cancel := context.WithTimeout(ctx, problem.RecoveryAction.Metadata().Timeout)
defer cancel()
err = problem.RecoveryAction.Execute(ctx, problem)
```

recheckProblem is an idempotency guard: in the gap between detection and action, the cluster may have self-healed (or another orchestrator may have acted). Acting on stale state is how you cause unnecessary failovers, so it forces a fresh poll, re-runs analysis on current store state, and bails if the problem is gone.
The action’s timeout comes from its own metadata. For appointment:
```go
Timeout: 2*timeouts.RuleWriteTimeout + 5*time.Second,
```

This is context-tree composition done right. The action context must be long enough to cover its two sequential RPC phases (Recruit, then Promote/SetPrimary), each bounded by RuleWriteTimeout, plus margin. Size it shorter and the outer deadline would cancel a phase mid-flight; detach it (use context.Background()) and a Stop() couldn’t cancel an in-flight election. See context.
AppointLeaderAction.Execute gathers the cohort, re-verifies that no healthy primary already exists, then calls into the coordinator. That hands off to the worked example below.
The worked example: the election state machine
This is the heart of the chapter. coordinatorLedRuleChange.Run implements the appointment protocol as a four-phase state machine. The same code serves both failover and bootstrap, parameterised by callbacks (tryBuildProposal, checkProposalPossible) that the coordinator wires up.
```mermaid
stateDiagram-v2
    [*] --> Preflight
    Preflight --> Aborted: another coordinator recruited recently
    note right of Preflight
        checkRecentAcceptance (4s backoff)
        checkProposalPossible (feasible?)
        Emit(Started)
    end note
    Preflight --> Recruit: ok
    Recruit --> Collect: goroutine per cohort member, results to buffered chan
    Collect --> Promote: first viable proposal (fan-in, early commit)
    Collect --> Failed: no quorum-satisfying proposal
    Promote --> WaitAll: leader gets Promote, others get SetPrimary
    WaitAll --> Success: leader Promote succeeded
    WaitAll --> Failed: leader Promote failed (fail fast)
    Success --> [*]
    Failed --> [*]
```
Phase 1 — fan-out into a buffered channel
```go
func (r *coordinatorLedRuleChange) dispatchRecruit(ctx context.Context, cohort []*multiorchdatapb.PoolerHealthState, revocation *clustermetadatapb.TermRevocation) chan recruitResult {
	recruited := make(chan recruitResult, len(cohort))
	for _, p := range cohort {
		go func() {
			recruited <- recruitResult{pooler: p, cs: r.recruit(ctx, p, revocation)}
		}()
	}
	return recruited
}
```

One goroutine per cohort member, each writing its result to a channel. Capturing p directly in the closure is safe because, since Go 1.22, each loop iteration gets its own copy of the variable. The channel capacity is len(cohort), exactly the number of producers. This is load-bearing, not incidental (see Phase 2).
Phase 2 — fan-in with early commit
```go
func (r *coordinatorLedRuleChange) collectRecruitsAndBuildProposal(/* ... */) (*consensusdatapb.PromoteRequest, string, error) {
	var err error
	var p *consensusdatapb.CoordinatorProposal
	statuses := make([]*clustermetadatapb.ConsensusStatus, 0, len(cohort))
	for range len(cohort) {
		rr := <-recruited
		if rr.cs == nil {
			continue
		}
		statuses = append(statuses, rr.cs)
		p, err = r.tryBuildProposal(revocation, statuses)
		if err == nil {
			// ... build PromoteRequest, return at the FIRST viable proposal
			return propReq, leaderKey, nil
		}
	}
	// ...
}
```

The coordinator accumulates each recruit’s ConsensusStatus (which carries that node’s WAL position) and, after each arrival, asks tryBuildProposal whether a durability-quorum-satisfying proposal exists yet. The moment one does, it commits to that leader and returns, without waiting for the slowest nodes. This is consensus-correct because only committed writes are durable across the quorum, and the chosen leader carries every committed write: a set of recruits that satisfies the durability quorum must overlap the nodes that acknowledged each committed write, and the leader is the most advanced node in that set.

The coordinator never invents a leader: it picks the most advanced of the recruited nodes, and only commits once the proposal satisfies the durability quorum. The trade-off: uncommitted WAL on a late-arriving, more advanced node is discarded (that node gets pg_rewind’d on rejoin) rather than preserved. That is correct, because uncommitted means not durable.
Phase 3/4 — propose to everyone, wait for all, fail fast on the leader
```go
promoteResults := r.dispatchPromote(ctx, cohort, propReq, leaderKey)
// Phase 4: Wait for every Promote to return, not just the leader's.
leaderErr := r.waitForPromotes(ctx, cohort, promoteResults)
```

dispatchPromote again fans out into a buffered chan promoteResult of size len(cohort): the leader gets Promote, everyone else gets SetPrimary. waitForPromotes then reads the results, with one asymmetry:
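```go
for range len(cohort) {
	pr := <-promoteResults
	if pr.err != nil {
		if pr.isLeader {
			return pr.err // leader failed: nothing to inform anyone about
		}
		// non-leader failure: log a warning, keep waiting
	}
}
```

Returning early on a leader failure leaves some results unread, which is safe for the same reason as in Phase 1: every producer has its own buffer slot, so none of them blocks.

Per-RPC deadline composition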
Every leaf RPC wraps the inherited context:
```go
func (r *coordinatorLedRuleChange) recruit(ctx context.Context, /* ... */) *clustermetadatapb.ConsensusStatus {
	rpcCtx, cancel := context.WithTimeout(ctx, timeouts.RuleWriteTimeout)
	defer cancel()
	resp, err := r.coordinator.rpcClient.Recruit(rpcCtx, p.MultiPooler, /* ... */)
	// ...
}
```

The full chain is engine.shutdownCtx → recovery cycle ctx → attemptRecovery’s WithTimeout(ctx, action.Timeout) → per-RPC WithTimeout(ctx, RuleWriteTimeout). Cancel the engine and every in-flight Recruit/Promote unwinds. The defer cancel() on each RPC releases its timer immediately on return, so timers don’t pile up. This is the context tree from context made concrete.
Supporting concurrency primitives
ProtoStore[K, V]: thread-safe shared state via deep-clone
The store every loop reads and writes is generic:
```go
type ProtoStore[K comparable, V proto.Message] struct {
	mu    sync.Mutex
	items map[K]V
}
```

A sync.Mutex plus a map, where every Get/Set/Range deep-clones via proto.Clone:
```go
func (s *ProtoStore[K, V]) Get(key K) (V, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	v, ok := s.items[key]
	if !ok {
		var zero V
		return zero, false
	}
	// Return a deep clone so callers get an isolated copy
	cloned := proto.Clone(v).(V)
	return cloned, true
}
```

This is the “defend against shared mutable state by handing out copies” pattern. A caller can mutate what it gets back without ever touching the canonical copy under the lock, so there is no aliasing between goroutines. It uses generics with a proto.Message constraint, so the same store works for every proto type (see generics).
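A stdlib-only sketch of the clone-on-access idea. To avoid a protobuf dependency it replaces the proto.Message constraint and proto.Clone with a hypothetical Cloner interface; CloneStore and poolerHealth are illustrative names. Cloning on Set as well as on Get is what ensures no caller keeps an alias in either direction.

```go
package main

import (
	"fmt"
	"sync"
)

// Cloner stands in for proto.Message + proto.Clone: anything that can deep-copy itself.
type Cloner[V any] interface {
	Clone() V
}

// CloneStore is a hypothetical analogue of ProtoStore: a mutex plus a map,
// deep-copying on the way in and on the way out.
type CloneStore[K comparable, V Cloner[V]] struct {
	mu    sync.Mutex
	items map[K]V
}

func NewCloneStore[K comparable, V Cloner[V]]() *CloneStore[K, V] {
	return &CloneStore[K, V]{items: make(map[K]V)}
}

func (s *CloneStore[K, V]) Set(key K, v V) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.items[key] = v.Clone() // the caller keeps no alias to the stored value
}

func (s *CloneStore[K, V]) Get(key K) (V, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	v, ok := s.items[key]
	if !ok {
		var zero V
		return zero, false
	}
	return v.Clone(), true
}

// poolerHealth is a toy value with a reference-typed field worth protecting.
type poolerHealth struct {
	Host   string
	Labels map[string]string
}

func (p *poolerHealth) Clone() *poolerHealth {
	c := &poolerHealth{Host: p.Host, Labels: make(map[string]string, len(p.Labels))}
	for k, v := range p.Labels {
		c.Labels[k] = v
	}
	return c
}

func main() {
	s := NewCloneStore[string, *poolerHealth]()
	s.Set("p1", &poolerHealth{Host: "10.0.0.1", Labels: map[string]string{"role": "replica"}})
	got, _ := s.Get("p1")
	got.Labels["role"] = "primary" // mutates only the caller's copy
	again, _ := s.Get("p1")
	fmt.Println(again.Labels["role"]) // replica
}
```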
Why mutex+map and not sync.Map? sync.Map is tuned for write-once-read-many, but the recovery engine is write-heavy (health updates every few seconds, topology churn), where sync.Map’s optimisations don’t pay off.
DoUpdate gives an atomic read-modify-write under one lock acquisition:
```go
func (s *ProtoStore[K, V]) DoUpdate(key K, fn func(value V) V) {
	s.mu.Lock()
	defer s.mu.Unlock()
	// Skip the update if the key doesn't exist to avoid accidentally creating new entries.
	if v, ok := s.items[key]; ok {
		var zero V
		if newValue := fn(v); any(newValue) != any(zero) {
			s.items[key] = newValue
		}
	}
}
```

Compare this to the coordinator’s policyCache, a sync.Map: a durability-policy-per-database cache that is written once and read many times, which is a legitimate sync.Map use. Same codebase, two different shared-state tools chosen by access pattern. That’s the lesson.
HealthStream — a long-lived goroutine per pooler
Detection needs fresh data, which comes from one long-lived gRPC stream per pooler. Start launches it:
```go
hs.wg.Go(func() {
	defer func() {
		hs.mu.Lock()
		delete(hs.streams, poolerID)
		hs.mu.Unlock()
	}()
	hs.runStream(ctx, poolerID, entry)
})
```

The goroutine removes itself from the map via defer when it exits, so Stop only has to call entry.cancel() and let the goroutine clean up. Because only the owning goroutine ever deletes its entry, there is no race over the deletion. runStream loops over a retry iterator and reads current pooler metadata from the store on each reconnect (so host/port changes are picked up):
```go
r := retry.New(streamReconnectInitialBackoff, streamReconnectMaxBackoff, retry.WithInitialDelay())
for _, err := range r.Attempts(ctx) {
	if err != nil {
		return
	}
	// ... streamOnce(...) ; on success r.Reset() ...
}
```

The subtle part is the staleness watchdog inside streamOnce:
```go
watchdogCtx, cancelWatchdog := context.WithCancel(ctx)
defer cancelWatchdog()

// resetCh carries the new timer duration whenever a message is received.
// Buffered so the recv loop never blocks on the watchdog goroutine.
resetCh := make(chan time.Duration, 1)
go func() {
	current := initialStaleness
	timer := time.NewTimer(current)
	defer timer.Stop()
	for {
		select {
		case d := <-resetCh:
			current = d
			if !timer.Stop() {
				select {
				case <-timer.C:
				default:
				}
			}
			timer.Reset(current)
		case <-timer.C:
			// no message in time -> cancel the stream
			cancelWatchdog()
			return
		case <-watchdogCtx.Done():
			return
		}
	}
}()
stream, err := hs.rpcClient.ManagerHealthStream(watchdogCtx, poolerHealth.MultiPooler)
```

Each received message sends a fresh window on resetCh, and the watchdog re-arms its timer with it (the Stop-then-drain sequence is the idiom time.Timer.Reset has historically required). If the timer fires first, the watchdog cancels watchdogCtx, which tears down the stream, and runStream’s retry loop reconnects.

timeouts: deadlines as documented constants
All the deadlines live in one place, with rationale comments. The numbers were tuned against real failover tests:
```go
const RuleWriteTimeout = 30 * time.Second                    // per election RPC
const DefaultHealthStreamStalenessTimeout = 90 * time.Second // watchdog window
const DefaultSnapshotInterval = 5 * time.Second              // proactive snapshot rate
const PollResponseWait = 500 * time.Millisecond              // cap on poll-for-fresh-snapshot
```

Centralising them keeps the action timeout (2*RuleWriteTimeout + 5s, which works out to 65s with the values above) and the per-RPC timeout in lockstep: change one constant and the relationship holds.
Observability and the anti-split-brain safeguards
eventlog: typed lifecycle events
Run brackets the election with structured events through a tiny interface-driven logger:
```go
type Event interface {
	EventType() string
	LogAttrs() []slog.Attr
}

func Emit(ctx context.Context, logger *slog.Logger, outcome Outcome, event Event, extra ...any) {
	level := slog.LevelInfo
	if outcome == Failed {
		level = slog.LevelError
	}
	// ...
	logger.LogAttrs(ctx, level, "multigres.event", attrs...)
}
```

Outcome is Started/Success/Failed; Failed logs at ERROR, the rest at INFO; ctx is threaded for OpenTelemetry correlation (see mterrors & observability). The interface lets any typed event plug in; see interfaces.
The event for a promotion is PrimaryPromotion:
```go
type PrimaryPromotion struct {
	NewPrimary   string
	ProposedTerm int64
	Reason       string
	RecruitMs    *int64
	PromoteMs    *int64
}
```

RecruitMs/PromoteMs are pointers so they can be “not applicable” (nil) on the Started event and on a recruitment failure that never reached the promote phase. Run splits latency into the two phases using a monotonic clock (time.Since), so wall-clock adjustments don’t corrupt the numbers. It emits Started before any RPC and Success/Failed at the end.
Idempotency is enforced at every layer
Split-brain prevention is not assumed; it is defended at multiple independent points, and removing any one re-opens the door:
| Guard | What it prevents |
|---|---|
| checkRecentAcceptance (4s backoff) | Two coordinators recruiting at once → thrashing |
| checkProposalPossible pre-vote | Starting a recruit round that can’t reach quorum |
| recheckProblem against fresh state | Acting on a problem that already self-healed |
| SetPrimary applies only strictly-higher rule | A stale/duplicate RPC demoting the new primary |
| TermRevocation floor | A stale coordinator’s proposals being accepted |
```go
// checkRecentAcceptance: back off if another coordinator may be mid-election
const backoffWindow = 4 * time.Second
// ... if a cohort member accepted a revocation within 4s, return UNAVAILABLE and abort.
```

AppointLeader is documented idempotent precisely because these guards make a retried election safe. That’s what lets attemptRecovery mark the action retryable.
Checkpoints
Why is the recruited channel in dispatchRecruit buffered with capacity len(cohort), and what fails if it’s unbuffered?

collectRecruitsAndBuildProposal commits at the first viable proposal and stops reading. The goroutines for slow nodes still need to send their one result and exit. With a buffer of len(cohort) (one slot per producer) every goroutine can send and return even after the consumer has left. Unbuffered, those goroutines would block forever on the send: one leaked goroutine per slow node on every election.

Why does PeriodicRunner.Stop() call r.wg.Wait() after releasing r.mu?

Holding r.mu across wg.Wait() would turn any lock use during the in-flight callback into a deadlock. Stop() would sit in wg.Wait() waiting for the callback to return, while the callback (or anything it calls that takes r.mu, such as a Running() check) would sit in r.mu.Lock() waiting for Stop() to release the lock. Neither could proceed. Note that execute() calls r.wg.Done() before it re-acquires r.mu, so that re-lock is not the problem; the danger is any acquisition of r.mu while the callback is still running. Even without a deadlock, holding the lock would stall every Running() check and any concurrent Start() for as long as the slowest callback runs. Cancel under the lock (to interrupt), but wait outside it.

Phase 4 waits for every cohort member’s RPC but returns immediately if the leader’s fails. Why the asymmetry?

The rule change commits when the leader’s Promote succeeds. Non-leader SetPrimary calls still need to complete so those nodes learn the new primary (avoiding re-wiring next cycle), so we wait for them. But if the leader fails, there is no new primary for anyone to learn about, so waiting buys nothing: fail fast.

Why use a mutex+map ProtoStore for pooler state but a sync.Map for the policy cache?

Access pattern. Pooler state is write-heavy (health/topology churn every few seconds), where sync.Map’s write-once-read-many optimisations don’t help and proto.Clone-on-access is what’s actually needed. The policy cache is written once per database and read many times, the textbook sync.Map case.

Exercises
- Trace the failover end-to-end. Starting from a dead leader, follow the path through the real files and write down, at each hop, which context deadline is set and which goroutines are spawned: leader_is_dead_analyzer.go Analyze → recovery_loop.go performRecoveryCycle (the sync.WaitGroup fan-out) → attemptRecovery (recheckProblem + WithTimeout) → actions/appoint_leader.go Execute → consensus/coordinator.go AppointLeader/runFailover → consensus/rule_change.go Run (the four phases).
- Concurrency audit of rule_change.go. In your own words, explain (a) why recruited is buffered to len(cohort), (b) what would deadlock or leak if it were unbuffered, and (c) why collectRecruitsAndBuildProposal may return before reading all len(cohort) results.
- Lock-discipline diagram. Read periodic_runner.go Stop() and execute(). Draw the lock acquire/release timeline for a Stop() that races an in-flight callback, and explain exactly where a deadlock would appear if wg.Wait() were moved inside the held lock. Then define “backpressure” in this context and name the bug it prevents under a slow recovery cycle.
- Deadline-chain grep. Run grep -rn "context.WithTimeout" go/services/multiorch. For each hit, identify the parent context and the timeout value, then explain how the chain ties back to engine.shutdownCtx so a Stop() cancels an in-flight election. Cross-check that appoint_leader.go’s Metadata().Timeout (2*RuleWriteTimeout + 5s) is at least the two sequential RuleWriteTimeout phases in rule_change.go.
- Extend the event model (read-only). Sketch a new typed Event for events.go (e.g. a hypothetical CohortReconcile) implementing EventType()/LogAttrs(), and point at one place in the recovery actions where Emit(ctx, logger, Started/Success/Failed, ...) is already used as a template. Describe the diff; don’t write the file.
Continue to mterrors & observability to see how Emit, OpenTelemetry spans, and mterrors wrapping turn this subsystem’s events into traces and structured errors.