Consensus, Orchestration & Failover

What you will learn: how a failover orchestrator detects a dead leader and elects a new one, and how that workflow is built from goroutines, buffered channels, mutexes, and a tree of cancellable contexts — the way real distributed-systems concurrency is wired in Go.

Prerequisites: this builds directly on concurrency (goroutines, channels, fan-out/fan-in, sync.WaitGroup), context (cancellation, deadlines, context trees), and sync & memory model (mutex, sync.Cond, sync.Map, lock granularity). It also leans on generics, interfaces, and errors, and references gRPC & protobuf and service anatomy.


In the multigres cluster, multiorch is the brain that keeps a PostgreSQL shard available. When the primary dies, it is the component that notices, picks a survivor, and re-points everyone at the new leader. Get that wrong and you get split-brain (two primaries accepting writes) or data loss. So this is where the codebase’s concurrency discipline is at its most deliberate: every goroutine has an owner, every RPC has a deadline tied to a parent context, and shared state is either cloned or guarded.

This page is the most concurrency-dense in the project track, and it rewards a top-down read. In order, we cover the protocol vocabulary, the three-loop engine that owns the goroutine lifecycles, the worked election state machine, the supporting primitives, and finally the safety properties that keep it from corrupting the cluster.


Everything the coordinator does is implemented against a handful of gRPC RPCs on the pooler. The appointment protocol itself is small — recruit, then concurrently promote and inform — and it’s worth reading because the shape of the protocol is what makes the concurrency safe.

proto/consensusservice.proto
// Appointment protocol: Recruit, then concurrent Promote (to the leader) and
// SetPrimary (to the other cohort members).
// Recruit asks a pooler to revoke all terms below the one specified and
// record the coordinator's exclusive claim on that term.
rpc Recruit(consensusdata.RecruitRequest) returns (consensusdata.RecruitResponse);
// Promote sends a complete shard-state proposal to a pooler. The designated
// leader promotes its postgres; all other cohort members point replication at
// the new primary.
rpc Promote(consensusdata.PromoteRequest) returns (consensusdata.PromoteResponse);
// SetPrimary tells a pooler which postgres primary to follow at a given
// consensus term.
rpc SetPrimary(consensusdata.SetPrimaryRequest) returns (consensusdata.SetPrimaryResponse);

Two of the messages carry the load-bearing safety semantics:

  • CoordinatorProposal bundles a TermRevocation (the authority to act — “revoke all terms below N”), the proposed leader address, and the full proposed rule. The coordinator must revoke previous terms before it may propose. That’s how a stale coordinator’s proposals get rejected: their term is below the new revocation floor.
  • SetPrimaryRequest is the idempotency anchor. Its contract is worth quoting verbatim:
proto/consensusdata.proto
// ... if the supplied rule is strictly higher it applies the
// change (standby: update primary_conninfo; stale primary: demote), otherwise
// it returns success without changes. Idempotent under retries and safe
// against out-of-order delivery from stale recovery rounds.

That “strictly higher rule” guarantee means a duplicate or late SetPrimary from an earlier, slower recovery round is a harmless no-op. Hold onto that — it’s exactly why the coordinator can safely fire RPCs at nodes it isn’t sure about (see the election section below).
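The two rules are easier to see in a small, hypothetical model of the pooler side. Everything in this sketch (poolerState, recruit, setPrimary, errStale) is an illustrative assumption and not the multigres API. The real term and rule are proto messages, and here each one is collapsed to a single int64. The model shows two things: a Recruit below the revocation floor is refused, and a late or duplicate SetPrimary succeeds without changing anything.

```go
package main

import (
	"errors"
	"fmt"
)

// errStale is returned when a term is below the revocation floor or is
// already exclusively claimed by a different coordinator.
var errStale = errors.New("stale or conflicting term")

// poolerState is a hypothetical model of a pooler's consensus state.
// The real term and rule are proto messages; here each is one int64.
type poolerState struct {
	floor       int64  // terms strictly below this are revoked
	claimant    string // coordinator holding the exclusive claim on floor
	appliedRule int64  // highest rule applied so far
	primary     string // postgres primary this pooler follows
}

// recruit models Recruit: revoke all terms below term and record the
// coordinator's exclusive claim on it. Re-recruiting by the same
// coordinator at the same term is accepted (idempotent).
func (s *poolerState) recruit(term int64, coordinator string) error {
	if term < s.floor || (term == s.floor && s.claimant != "" && s.claimant != coordinator) {
		return errStale
	}
	s.floor, s.claimant = term, coordinator
	return nil
}

// setPrimary models the SetPrimary contract: apply only a strictly higher
// rule; anything else succeeds as a no-op, so retries and late deliveries
// from an earlier recovery round cannot undo a newer decision.
func (s *poolerState) setPrimary(rule int64, primary string) (changed bool) {
	if rule <= s.appliedRule {
		return false
	}
	s.appliedRule, s.primary = rule, primary
	return true
}

func main() {
	s := &poolerState{}
	fmt.Println(s.recruit(5, "orch-a"))  // <nil>: first claim on term 5
	fmt.Println(s.recruit(4, "orch-b"))  // stale or conflicting term
	fmt.Println(s.setPrimary(5, "pg-2")) // true: strictly higher rule applied
	fmt.Println(s.setPrimary(3, "pg-1")) // false: late rule from an older round
	fmt.Println(s.primary)               // pg-2
}
```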

The cohort, term/TermRevocation, quorum, and DurabilityPolicy vocabulary all live in these proto files. See gRPC & protobuf for how the .proto becomes the typed client the coordinator calls.


The engine: three loops, one store, one shutdown context

The engine is deliberately simple: three loops running at different cadences, all communicating through a single in-memory store. The loops never call each other directly. They coordinate only through the shared store, which keeps their interactions easy to reason about.

[Diagram: The three-loop engine]

The recovery loop (1s) is where failover lives: it reads health snapshots, detects problems, and acts. The healthcheck loop keeps the store fresh; the maintenance loop handles bookkeeping.

The Engine struct: two mutexes, one shutdown context

go/services/multiorch/recovery/engine.go
type Engine struct {
	// ...
	poolerStore  *store.PoolerStore
	healthStream *HealthStream

	mu                sync.Mutex // protects shardWatchTargets
	shardWatchTargets []config.WatchTarget

	detectedProblemsMu sync.Mutex
	detectedProblems   []types.Problem // for metrics and gRPC diagnostics

	bookkeepingRunner *timer.PeriodicRunner
	recoveryRunner    *timer.PeriodicRunner

	// Root of the cancellation tree for the whole service
	shutdownCtx context.Context
	cancel      context.CancelFunc
}

Two things are worth internalising here.

Two mutexes, two invariants. mu guards shardWatchTargets (config reload); detectedProblemsMu guards detectedProblems (read by metrics and gRPC handlers). They are not interchangeable. The rule is one mutex per independent piece of mutable state — finer granularity means less contention and clearer reasoning. Using a single lock for both would couple a slow gRPC diagnostics read to a config reload for no reason. See lock granularity.

A single shutdownCtx is the root of the cancellation tree for the entire service. Stop() cancels it, then stops each runner:

go/services/multiorch/recovery/engine.go
func (re *Engine) Stop() {
	re.cancel()
	re.bookkeepingRunner.Stop()
	re.recoveryRunner.Stop()
	re.poolerWatcher.Stop()
	re.healthStream.Shutdown()
}

Everything downstream — every per-RPC deadline in an election — is derived from this one context. Cancel the root and the whole tree unwinds.

PeriodicRunner: the loop primitive that owns the goroutine

Each loop is a *timer.PeriodicRunner. This is the canonical “goroutine lifecycle owned by a parent context” idiom, hand-built so that Stop() is synchronous: when it returns, the in-flight callback is guaranteed done.

Its machinery is a small state machine (stopped / running / stopping) guarded by a sync.Mutex plus sync.Cond, with a sync.WaitGroup tracking the one in-flight callback and a context.CancelFunc to interrupt it. Two methods carry the safety reasoning.

execute() implements backpressure:

go/tools/timer/periodic_runner.go
func (r *PeriodicRunner) execute() {
	r.mu.Lock()
	r.wg.Add(1)
	callback := r.callback
	ctx := r.ctx
	// Release lock during callback execution to avoid blocking Stop()
	r.mu.Unlock()

	callback(ctx) // <-- runs WITHOUT the lock held
	r.wg.Done()

	r.mu.Lock()
	defer r.mu.Unlock()
	if r.state != running {
		return
	}
	// Schedule next execution only after this one completes (backpressure)
	r.scheduleNext()
}

Two deliberate choices:

  • The callback runs with the lock released. A recovery cycle can take seconds (it makes RPCs); holding mu across it would block Stop() and every Running() check.
  • The next tick is scheduled only after the current callback returns. There is never more than one cycle in flight. This backpressure prevents a slow recovery cycle from piling up overlapping cycles that would race each other on the shared store.

Stop() waits outside the lock:

go/tools/timer/periodic_runner.go
func (r *PeriodicRunner) Stop() {
	r.mu.Lock()
	if r.state != running {
		r.mu.Unlock()
		return
	}
	r.state = stopping
	if r.cancel != nil {
		r.cancel() // interrupt the in-flight callback
	}
	if r.timer != nil {
		r.timer.Stop()
		r.timer = nil
	}
	r.mu.Unlock() // <-- release BEFORE waiting

	// Wait for any in-flight callback to complete (outside lock to avoid deadlock)
	r.wg.Wait()

	r.mu.Lock()
	r.state = stopped
	r.cond.Broadcast()
	r.mu.Unlock()
}

The stopping state plus sync.Cond lets a concurrent Start() block until a Stop() fully finishes, so Start/Stop/Start cycles are safe.


From detection to election: the recovery cycle

The recovery loop’s callback is performRecoveryCycle. It is a classic fan-out / fan-in with sync.WaitGroup: detect problems, group them by shard, then process each shard concurrently.

[Diagram: The recovery cycle]
go/services/multiorch/recovery/recovery_loop.go
// Process each shard independently in parallel
var wg sync.WaitGroup
for _, shardProblems := range problemsByShard {
	wg.Add(1)
	go func(problems []types.Problem) {
		defer wg.Done()
		re.processShardProblems(ctx, problems[0].ShardKey, problems)
	}(shardProblems)
}
wg.Wait()

Shards are independent failure domains, so they recover in parallel; wg.Wait() makes the whole cycle synchronous, so the backpressure above measures real cycle time.

Detection itself is interface-driven. analysis.DefaultAnalyzers(...) returns analyzers that each Analyze a shard view and emit problems. The "leader is dead" decision lives in its own analyzer. Most of that analyzer's logic suppresses false positives, which makes it a good window into distributed-systems edge cases. It declines to fail over if, for example, replicas are still connected to the supposedly dead leader, if the "deadness" is just TCP-keepalive staleness, or during a known pg_promote() window:

go/services/multiorch/recovery/analysis/leader_is_dead_analyzer.go
// Suppress failover during a known pg_promote() window.

When it does fire, it emits one shard-scoped problem at PriorityEmergency whose recovery action is an AppointLeaderAction. That action is the bridge into consensus.
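Interface-driven detection fits in a few lines. The analyzer interface, shardView, priorityEmergency, and the suppression fields below are all hypothetical simplifications. The real analyzers see much richer state, and TCP-keepalive staleness is not modelled here. The point of the sketch is that each analyzer is an independent Analyze call, and the dead-leader analyzer consists mostly of early returns that decline to report.

```go
package main

import "fmt"

// shardView and problem are hypothetical simplifications of the real types.
type shardView struct {
	Shard            string
	LeaderReachable  bool
	ReplicasAttached int  // replicas still streaming from the old leader
	PromoteInFlight  bool // a pg_promote() is known to be in progress
}

type problem struct {
	Shard    string
	Code     string
	Priority int
}

const priorityEmergency = 100 // hypothetical value

// analyzer mirrors the idea of the real analyzers: inspect a view, emit problems.
type analyzer interface {
	Analyze(v shardView) []problem
}

type leaderIsDead struct{}

func (leaderIsDead) Analyze(v shardView) []problem {
	switch {
	case v.LeaderReachable:
		return nil
	case v.ReplicasAttached > 0: // replicas still see the leader: suppress
		return nil
	case v.PromoteInFlight: // known pg_promote() window: suppress
		return nil
	}
	return []problem{{Shard: v.Shard, Code: "leader-is-dead", Priority: priorityEmergency}}
}

// detect runs every analyzer against one shard view.
func detect(v shardView, analyzers []analyzer) []problem {
	var out []problem
	for _, a := range analyzers {
		out = append(out, a.Analyze(v)...)
	}
	return out
}

func main() {
	as := []analyzer{leaderIsDead{}}
	fmt.Println(len(detect(shardView{Shard: "db/0", ReplicasAttached: 2}, as))) // 0
	fmt.Println(len(detect(shardView{Shard: "db/0"}, as)))                      // 1
}
```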

attemptRecovery: re-validate, then bound the action

Before acting on a detected problem, attemptRecovery re-checks it against fresh store state, then wraps the action in a deadline:

go/services/multiorch/recovery/recovery_loop.go
// Force re-poll to validate the problem still exists
stillExists, err := re.recheckProblem(ctx, problem)
// ... if !stillExists: skip ...
// Execute recovery action under a bounded context
ctx, cancel := context.WithTimeout(ctx, problem.RecoveryAction.Metadata().Timeout)
defer cancel()
err = problem.RecoveryAction.Execute(ctx, problem)

recheckProblem is an idempotency guard: in the roughly one second between detection and action, the cluster may have self-healed (or another orchestrator may have acted). Acting on stale state is how you cause unnecessary failovers, so it re-runs analysis on current store state and bails if the problem is gone.

The action’s timeout comes from its own metadata. For appointment:

go/services/multiorch/recovery/actions/appoint_leader.go
Timeout: 2*timeouts.RuleWriteTimeout + 5*time.Second,

This is context-tree composition done right. The action context must be long enough to cover its two sequential RPC phases (Recruit, then Promote/SetPrimary), each bounded by RuleWriteTimeout, plus margin. Size it shorter and the outer deadline would cancel a phase mid-flight; detach it (use context.Background()) and a Stop() couldn’t cancel an in-flight election. See context.

AppointLeaderAction.Execute gathers the cohort, re-verifies that no healthy primary already exists, then calls into the coordinator. That hands off to the worked example below.


The worked example: the election state machine

This is the heart of the chapter. coordinatorLedRuleChange.Run implements the appointment protocol as a four-phase state machine. The same code serves both failover and bootstrap, parameterised by callbacks (tryBuildProposal, checkProposalPossible) that the coordinator wires up.

[Diagram: The appointment state machine]

Phase 1 — fan-out into a buffered channel

go/services/multiorch/consensus/rule_change.go
func (r *coordinatorLedRuleChange) dispatchRecruit(ctx context.Context, cohort []*multiorchdatapb.PoolerHealthState, revocation *clustermetadatapb.TermRevocation) chan recruitResult {
	recruited := make(chan recruitResult, len(cohort))
	for _, p := range cohort {
		go func() { // Go 1.22+ loop semantics: each goroutine captures its own p
			recruited <- recruitResult{pooler: p, cs: r.recruit(ctx, p, revocation)}
		}()
	}
	return recruited
}

One goroutine per cohort member, each writing its result to a channel. The channel capacity is len(cohort), exactly the number of producers. This is load-bearing, not incidental (see Phase 2).

Phase 2: collect recruits and commit at the first viable proposal

go/services/multiorch/consensus/rule_change.go
func (r *coordinatorLedRuleChange) collectRecruitsAndBuildProposal(/* ... */) (*consensusdatapb.PromoteRequest, string, error) {
	var err error
	var p *consensusdatapb.CoordinatorProposal
	statuses := make([]*clustermetadatapb.ConsensusStatus, 0, len(cohort))
	for range len(cohort) {
		rr := <-recruited
		if rr.cs == nil {
			continue
		}
		statuses = append(statuses, rr.cs)
		p, err = r.tryBuildProposal(revocation, statuses)
		if err == nil {
			// ... build PromoteRequest, return at the FIRST viable proposal
			return propReq, leaderKey, nil
		}
	}
	// ...
}

The coordinator accumulates each recruit’s ConsensusStatus (which carries that node’s WAL position) and, after each arrival, asks tryBuildProposal whether a durability-quorum-satisfying proposal exists yet. The moment one does, it commits to that leader and returns — without waiting for the slowest nodes. This is consensus-correct because only committed writes are durable across the quorum, and the chosen leader carries every committed write by definition.

The coordinator never invents a leader. As soon as the recruits collected so far satisfy the durability quorum, it picks the most-advanced node among them. The trade-off is that uncommitted WAL on a late-arriving, more-advanced node is discarded rather than preserved, and that node gets pg_rewind'd when it rejoins. This is correct, because uncommitted means not durable.

Phase 3/4 — propose to everyone, wait for all, fail fast on the leader

go/services/multiorch/consensus/rule_change.go
promoteResults := r.dispatchPromote(ctx, cohort, propReq, leaderKey)
// Phase 4: Wait for every Promote to return, not just the leader's.
leaderErr := r.waitForPromotes(ctx, cohort, promoteResults)

dispatchPromote again fans out into a buffered chan promoteResult of size len(cohort): the leader gets Promote, everyone else gets SetPrimary. waitForPromotes then drains all of them, with one asymmetry:

go/services/multiorch/consensus/rule_change.go
for range len(cohort) {
	pr := <-promoteResults
	if pr.err != nil {
		if pr.isLeader {
			return pr.err // leader failed: nothing to inform anyone about
		}
		// non-leader failure: log a warning, keep waiting
	}
}

Every leaf RPC wraps the inherited context:

go/services/multiorch/consensus/rule_change.go
func (r *coordinatorLedRuleChange) recruit(ctx context.Context, /* ... */) *clustermetadatapb.ConsensusStatus {
	rpcCtx, cancel := context.WithTimeout(ctx, timeouts.RuleWriteTimeout)
	defer cancel()
	resp, err := r.coordinator.rpcClient.Recruit(rpcCtx, p.MultiPooler, /* ... */)
	// ...
}

The full chain is engine.shutdownCtx → recovery cycle ctx → attemptRecovery's WithTimeout(ctx, action.Timeout) → per-RPC WithTimeout(ctx, RuleWriteTimeout). Cancel the engine and every in-flight Recruit/Promote unwinds. The defer cancel() on each RPC releases its timer as soon as the RPC returns, so timers don't pile up. This is the context tree from context made concrete.


ProtoStore[K, V] — thread-safe shared state via deep-clone

The store every loop reads and writes is generic:

go/services/multiorch/store/store.go
type ProtoStore[K comparable, V proto.Message] struct {
	mu    sync.Mutex
	items map[K]V
}

A sync.Mutex plus a map, where every Get/Set/Range deep-clones via proto.Clone:

go/services/multiorch/store/store.go
func (s *ProtoStore[K, V]) Get(key K) (V, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	v, ok := s.items[key]
	if !ok {
		var zero V
		return zero, false
	}
	// Return a deep clone so callers get an isolated copy
	cloned := proto.Clone(v).(V)
	return cloned, true
}

This is the "defend against shared mutable state by handing out copies" pattern. A caller can mutate what it gets back without touching the canonical copy under the lock, so no two goroutines ever alias the same value. The store uses generics with a proto.Message constraint, so the same store works for every proto type (see generics).

Why mutex+map and not sync.Map? sync.Map is tuned for write-once-read-many, but the recovery engine is write-heavy (health updates every few seconds, topology churn), where sync.Map’s optimisations don’t pay off.
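The clone-on-access idea does not depend on protobuf. In this stdlib-only sketch, the cloner constraint stands in for proto.Message plus proto.Clone. cloneStore, health, and their methods are hypothetical. The DoUpdate here also omits the real one's zero-value check. A caller that mutates its copy leaves the stored value untouched.

```go
package main

import (
	"fmt"
	"sync"
)

// cloner is a hypothetical constraint standing in for proto.Message + proto.Clone.
type cloner[T any] interface {
	Clone() T
}

// cloneStore hands out deep copies so callers never alias the canonical value.
type cloneStore[K comparable, V cloner[V]] struct {
	mu    sync.Mutex
	items map[K]V
}

func newCloneStore[K comparable, V cloner[V]]() *cloneStore[K, V] {
	return &cloneStore[K, V]{items: make(map[K]V)}
}

func (s *cloneStore[K, V]) Set(k K, v V) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.items[k] = v.Clone() // keep a private copy, not the caller's pointer
}

func (s *cloneStore[K, V]) Get(k K) (V, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	v, ok := s.items[k]
	if !ok {
		var zero V
		return zero, false
	}
	return v.Clone(), true
}

// DoUpdate is an atomic read-modify-write under one lock; missing keys are skipped.
func (s *cloneStore[K, V]) DoUpdate(k K, fn func(V) V) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if v, ok := s.items[k]; ok {
		s.items[k] = fn(v)
	}
}

// health is a toy value; Clone copies the slice so the copy is deep.
type health struct {
	Primary  string
	Replicas []string
}

func (h *health) Clone() *health {
	if h == nil {
		return nil
	}
	c := *h
	c.Replicas = append([]string(nil), h.Replicas...)
	return &c
}

func main() {
	s := newCloneStore[string, *health]()
	s.Set("pg-a", &health{Primary: "pg-a", Replicas: []string{"pg-b"}})
	got, _ := s.Get("pg-a")
	got.Replicas[0] = "mutated" // only the caller's copy changes
	again, _ := s.Get("pg-a")
	fmt.Println(again.Replicas[0]) // pg-b
}
```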

DoUpdate gives an atomic read-modify-write under one lock acquisition:

go/services/multiorch/store/store.go
func (s *ProtoStore[K, V]) DoUpdate(key K, fn func(value V) V) {
	s.mu.Lock()
	defer s.mu.Unlock()
	// Skip the update if the key doesn't exist to avoid accidentally creating new entries.
	if v, ok := s.items[key]; ok {
		var zero V
		if newValue := fn(v); any(newValue) != any(zero) {
			s.items[key] = newValue
		}
	}
}

Compare this to the coordinator’s policyCache, a sync.Map: a durability-policy-per-database cache that is written once and read many times — a legitimate sync.Map use. Same codebase, two different shared-state tools chosen by access pattern. That’s the lesson.

HealthStream — a long-lived goroutine per pooler

Detection needs fresh data, which comes from one long-lived gRPC stream per pooler. Start launches it:

go/services/multiorch/recovery/health_stream.go
hs.wg.Go(func() {
	defer func() {
		hs.mu.Lock()
		delete(hs.streams, poolerID)
		hs.mu.Unlock()
	}()
	hs.runStream(ctx, poolerID, entry)
})

The goroutine removes itself from the map via defer when it exits, so Stop only has to call entry.cancel() and let the goroutine clean up — no shared deletion race. runStream loops over a retry iterator and reads current pooler metadata from the store on each reconnect (so host/port changes are picked up):

go/services/multiorch/recovery/health_stream.go
r := retry.New(streamReconnectInitialBackoff, streamReconnectMaxBackoff, retry.WithInitialDelay())
for _, err := range r.Attempts(ctx) {
	if err != nil {
		return
	}
	// ... streamOnce(...) ; on success r.Reset() ...
}

The subtle part is the staleness watchdog inside streamOnce:

go/services/multiorch/recovery/health_stream.go
watchdogCtx, cancelWatchdog := context.WithCancel(ctx)
defer cancelWatchdog()

// resetCh carries the new timer duration whenever a message is received.
// Buffered so the recv loop never blocks on the watchdog goroutine.
resetCh := make(chan time.Duration, 1)

go func() {
	current := initialStaleness
	timer := time.NewTimer(current)
	defer timer.Stop()
	for {
		select {
		case d := <-resetCh:
			current = d
			if !timer.Stop() {
				select {
				case <-timer.C:
				default:
				}
			}
			timer.Reset(current)
		case <-timer.C:
			// no message in time -> cancel the stream
			cancelWatchdog()
			return
		case <-watchdogCtx.Done():
			return
		}
	}
}()

stream, err := hs.rpcClient.ManagerHealthStream(watchdogCtx, poolerHealth.MultiPooler)

timeouts — deadlines as documented constants

All the deadlines live in one place, with rationale comments. The numbers were tuned against real failover tests:

go/common/timeouts/rpc.go
const RuleWriteTimeout = 30 * time.Second // per election RPC
const DefaultHealthStreamStalenessTimeout = 90 * time.Second // watchdog window
const DefaultSnapshotInterval = 5 * time.Second // proactive snapshot rate
const PollResponseWait = 500 * time.Millisecond // cap on poll-for-fresh-snapshot

Centralising them keeps the action timeout (2*RuleWriteTimeout + 5s) and the per-RPC timeout in lockstep — change one constant and the relationship holds.


Observability and the anti-split-brain safeguards

Run brackets the election with structured events through a tiny interface-driven logger:

go/common/eventlog/eventlog.go
type Event interface {
	EventType() string
	LogAttrs() []slog.Attr
}

func Emit(ctx context.Context, logger *slog.Logger, outcome Outcome, event Event, extra ...any) {
	level := slog.LevelInfo
	if outcome == Failed {
		level = slog.LevelError
	}
	// ... logger.LogAttrs(ctx, level, "multigres.event", attrs...)
}

Outcome is Started/Success/Failed; Failed logs at ERROR, the rest at INFO; ctx is threaded for OpenTelemetry correlation (see mterrors & observability). The interface lets any typed event plug in — see interfaces.

The event for a promotion is PrimaryPromotion:

go/common/eventlog/events.go
type PrimaryPromotion struct {
	NewPrimary   string
	ProposedTerm int64
	Reason       string
	RecruitMs    *int64
	PromoteMs    *int64
}

RecruitMs/PromoteMs are pointers so they can be “not applicable” (nil) on the Started event and on a recruitment failure that never reached the promote phase. Run splits latency into the two phases using a monotonic clock (time.Since), so wall-clock adjustments don’t corrupt the numbers. It emits Started before any RPC and Success/Failed at the end.

Split-brain prevention is not assumed — it’s defended at multiple independent points, and removing any one re-opens the door:

| Guard | What it prevents |
|---|---|
| checkRecentAcceptance (4s backoff) | Two coordinators recruiting at once → thrashing |
| checkProposalPossible pre-vote | Starting a recruit round that can't reach quorum |
| recheckProblem against fresh state | Acting on a problem that already self-healed |
| SetPrimary applies only strictly-higher rule | A stale/duplicate RPC demoting the new primary |
| TermRevocation floor | A stale coordinator's proposals being accepted |

go/services/multiorch/consensus/rule_change.go
// checkRecentAcceptance: back off if another coordinator may be mid-election
const backoffWindow = 4 * time.Second
// ... if a cohort member accepted a revocation within 4s, return UNAVAILABLE and abort.

AppointLeader is documented as idempotent precisely because these guards make a retried election safe. That is what lets attemptRecovery mark the action retryable.


Why is the recruited channel in dispatchRecruit buffered with capacity len(cohort), and what fails if it's unbuffered?
collectRecruitsAndBuildProposal commits at the first viable proposal and stops reading. The goroutines for slow nodes still need to send their one result and exit. With a buffer of len(cohort) (one slot per producer), every goroutine can send and return even after the consumer has stopped reading. Unbuffered, those goroutines would block forever on the send, leaking one goroutine per slow node on every election.

Why does PeriodicRunner.Stop() call r.wg.Wait() after releasing r.mu?
The in-flight callback must be able to finish. In the execute() shown above, wg.Done() runs before execute() re-acquires r.mu, so the danger is in the callback itself. It runs while Stop() is waiting, and anything it calls that takes r.mu (a Running() check, for example) would block if Stop() still held the lock. The callback would never return, wg.Done() would never run, and Wait would never return: deadlock. Cancel under the lock (to interrupt), but wait outside it.

Phase 4 waits for every cohort member's RPC but returns immediately if the leader's fails. Why the asymmetry?
The rule change commits when the leader's Promote succeeds. Non-leader SetPrimary calls still need to complete so those nodes learn the new primary (avoiding re-wiring on the next cycle), so we wait for them. But if the leader fails, there is no new primary for anyone to learn about. Waiting buys nothing, so we fail fast.

Why use a mutex+map ProtoStore for pooler state but a sync.Map for the policy cache?
Access pattern. Pooler state is write-heavy (health/topology churn every few seconds), where sync.Map's write-once-read-many optimisations don't help and proto.Clone-on-access is what's actually needed. The policy cache is written once per database and read many times, which is the textbook sync.Map case.

  1. Trace the failover end-to-end. Starting from a dead leader, follow the path through the real files and write down, at each hop, which context deadline is set and which goroutines are spawned: leader_is_dead_analyzer.go Analyze → recovery_loop.go performRecoveryCycle (the sync.WaitGroup fan-out) → attemptRecovery (recheckProblem + WithTimeout) → actions/appoint_leader.go Execute → consensus/coordinator.go AppointLeader/runFailover → consensus/rule_change.go Run (the four phases).

  2. Concurrency audit of rule_change.go. In your own words, explain (a) why recruited is buffered to len(cohort), (b) what would deadlock or leak if it were unbuffered, and (c) why collectRecruitsAndBuildProposal may return before reading all len(cohort) results.

  3. Lock-discipline diagram. Read periodic_runner.go Stop() and execute(). Draw the lock acquire/release timeline for a Stop() that races an in-flight callback, and explain exactly where a deadlock would appear if wg.Wait() were moved inside the held lock. Then define “backpressure” in this context and name the bug it prevents under a slow recovery cycle.

  4. Deadline-chain grep. Run grep -rn "context.WithTimeout" go/services/multiorch. For each hit, identify the parent context and the timeout value, then explain how the chain ties back to engine.shutdownCtx so a Stop() cancels an in-flight election. Cross-check that appoint_leader.go’s Metadata().Timeout (2*RuleWriteTimeout + 5s) is at least the two sequential RuleWriteTimeout phases in rule_change.go.

  5. Extend the event model (read-only). Sketch a new typed Event for events.go (e.g. a hypothetical CohortReconcile) implementing EventType()/LogAttrs(), and point at one place in the recovery actions where Emit(ctx, logger, Started/Success/Failed, ...) is already used as a template. Describe the diff; don’t write the file.


Continue to mterrors & observability to see how Emit, OpenTelemetry spans, and mterrors wrapping turn this subsystem’s events into traces and structured errors.