Concurrency: Goroutines & Channels
What you will learn: Go’s concurrency primitives — goroutines, channels, select, sync.WaitGroup, and errgroup — and the fan-out/fan-in, signal-coalescing, watchdog, and worker-loop patterns that show up across a real distributed system. We’ll read the routing, consensus, and pooling code of multigres (“Vitess for Postgres”) to see each pattern in production.
Prerequisites: pointers, values & memory (channels carry pointers and structs by value), errors (errgroup propagates the first error), and ideally interfaces. Cancellation is covered in depth in context; locks and the memory model in sync & the memory model. This page leans on both — read them next or alongside.
Goroutines and the go statement
A goroutine is a function executing concurrently with other goroutines in the same address space. You start one by prefixing a call with go. The statement returns immediately; the goroutine runs independently and its return value is discarded.

    func work(id int) int { return id * 2 }

    go work(1) // launched; the returned int is thrown away
    go func() { // common form: an anonymous closure
        fmt.Println("hi")
    }()
    fmt.Println("main continues immediately")

There is no handle, no “thread object,” no join. If you want the result, a goroutine’s completion, or an error back, you must arrange a channel, a WaitGroup, or a context; the language gives you nothing automatic.
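As a minimal sketch of that plumbing, the result of work can be carried back over a channel. The helper name workAsync is hypothetical and exists only for this illustration:

```go
package main

import "fmt"

func work(id int) int { return id * 2 }

// workAsync (hypothetical helper) runs work in a goroutine and returns a
// channel that will carry its single result.
func workAsync(id int) <-chan int {
	out := make(chan int, 1) // capacity 1: the goroutine can finish even if nobody reads
	go func() {
		out <- work(id)
	}()
	return out
}

func main() {
	res := workAsync(21)
	fmt.Println("main continues immediately")
	fmt.Println(<-res) // blocks until the goroutine has sent its result
}
```

The capacity of 1 is a deliberate choice: a caller that never reads the result does not strand the goroutine on its send.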
The cost model
A goroutine is not an OS thread. It starts with a tiny (a few KB) stack that grows and shrinks on demand, and the Go runtime multiplexes many goroutines onto a small pool of OS threads (at most GOMAXPROCS of them execute Go code at once; the default is the number of available CPUs). Creating, parking, and waking a goroutine is cheap, orders of magnitude cheaper than a thread. That’s why the patterns below spawn one goroutine per cohort member or one per RPC without a second thought: a failover involving a dozen poolers launches a dozen goroutines, and that’s entirely normal.
The flip side: cheap to spawn does not mean free to leak. A goroutine blocked forever on a channel that nobody will ever send to (or close) sits in memory until the process exits. Most of this page is really about giving every goroutine a guaranteed exit path.
Loop-variable capture (Go 1.22+)
In multigres’s consensus code, one goroutine is launched per cohort member, and the closure captures the loop variable p directly:

    recruited := make(chan recruitResult, len(cohort))
    for _, p := range cohort {
        go func() {
            recruited <- recruitResult{pooler: p, cs: r.recruit(ctx, p, revocation)}
        }()
    }
    return recruited

Each goroutine references p and gets the right p, because since Go 1.22 each iteration of a for ... range (and three-clause for) creates a fresh loop variable. (The new semantics apply when the module’s go.mod declares go 1.22 or later.) Before 1.22, all goroutines would have captured the same shared p and most would observe the final cohort member; you had to write p := p inside the loop to shadow it.
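For code that still has to build under pre-1.22 semantics, the explicit copy behaves the same on every Go version. A small sketch; doubleAll is a hypothetical function, not part of multigres:

```go
package main

import (
	"fmt"
	"sort"
)

// doubleAll (hypothetical) launches one goroutine per input and collects the
// doubled values. The explicit copy `v := v` gives each closure its own
// variable on every Go version; from Go 1.22 on it is redundant but harmless.
func doubleAll(in []int) []int {
	out := make(chan int, len(in)) // one slot per producer
	for _, v := range in {
		v := v // per-iteration copy (required before Go 1.22)
		go func() { out <- v * 2 }()
	}
	res := make([]int, 0, len(in))
	for range in {
		res = append(res, <-out)
	}
	sort.Ints(res) // arrival order is nondeterministic
	return res
}

func main() {
	fmt.Println(doubleAll([]int{1, 2, 3}))
}
```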
Channels
A channel is a typed, thread-safe conduit. chan T carries values of type T. Sends (ch <- v) and receives (v := <-ch) synchronize the two goroutines and, crucially, establish a happens-before relationship, so data written before a send is visible after the matching receive (see sync & the memory model).
Unbuffered vs buffered
    unbuf := make(chan int)  // capacity 0: send blocks until a receiver is ready
    buf := make(chan int, 3) // capacity 3: send blocks only when 3 are queued

An unbuffered channel is a synchronous rendezvous: the sender blocks until a receiver takes the value, and vice versa. A buffered channel lets the sender deposit up to cap values without a waiting receiver; it blocks only when full, and a receive blocks only when empty.
The single most important sizing decision in this codebase: buffer a results channel to the number of producers so producers never block on send, even if the collector stops reading early.
    recruited := make(chan recruitResult, len(cohort))

Why len(cohort) and not 0 or some fixed number? The collector reads results in a loop and may return early the moment it can build a valid proposal:

    for range len(cohort) {
        rr := <-recruited
        if rr.cs == nil {
            continue
        }
        statuses = append(statuses, rr.cs)
        propReq, leaderKey, err = r.tryBuildProposal(revocation, statuses)
        if err == nil {
            return propReq, leaderKey, nil // <-- early return, stops reading
        }
    }

If recruited were unbuffered, the recruit goroutines whose results are never read would block forever on recruited <- ... and leak. Because the buffer holds exactly len(cohort) slots, every producer can deposit its result and exit even if the collector walked away after the third one. The slots are reclaimed by GC when the channel becomes unreachable.
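The effect can be checked in isolation. The sketch below is not the consensus code: firstEven and its inputs are hypothetical, and the WaitGroup is there only so the caller can confirm that every producer exited after the collector returned early.

```go
package main

import (
	"fmt"
	"sync"
)

// firstEven (hypothetical) fans out one goroutine per input, returns the first
// even value it sees, and stops reading. It also returns a WaitGroup so the
// caller can confirm that every producer still managed to exit.
func firstEven(in []int) (int, bool, *sync.WaitGroup) {
	results := make(chan int, len(in)) // one slot per producer: sends never block
	wg := &sync.WaitGroup{}
	for _, v := range in {
		wg.Add(1)
		go func(v int) {
			defer wg.Done()
			results <- v // always succeeds, even after the collector has left
		}(v)
	}
	for range in {
		if v := <-results; v%2 == 0 {
			return v, true, wg // early return: remaining results stay in the buffer
		}
	}
	return 0, false, wg
}

func main() {
	v, ok, wg := firstEven([]int{1, 3, 4, 5, 7})
	wg.Wait() // returns, because no producer is stuck on a send
	fmt.Println(v, ok)
}
```

With make(chan int) instead, wg.Wait() in main could hang, because producers whose values are never read would stay blocked on their send.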
Channel direction
A channel type can be restricted to send-only (chan<- T) or receive-only (<-chan T) in a function signature. The arrow points the way data flows. This is documentation enforced by the compiler: a function that only consumes a watch stream takes <-chan Event and physically cannot send or close it.

    func consume(in <-chan int) { for v := range in { _ = v } } // can only receive
    func produce(out chan<- int) { out <- 1; close(out) }       // can only send/close

The codebase exposes topology watch results this way: conn.WatchRecursive returns (initial, changes, err) where changes is a receive-only channel the watcher loops over; see the case event, ok := <-changes example below.
The empty-struct signal channel
struct{} is the zero-width type: it occupies no memory. A chan struct{} is the idiomatic Go “signal” channel: it carries no data, only the event of a send or a close. The codebase uses it for stop signals and wakeups:

    type timeoutThread struct {
        buf      *Buffer
        notifyCh chan struct{} // signaled when the queue changes (enqueue or eviction)
        stopCh   chan struct{} // closed to stop the goroutine
    }

notifyCh <- struct{}{} means “something changed, wake up”; close(stopCh) means “stop.” No bytes cross the channel; the synchronization is the message.
Closing a channel
close(ch) marks a channel done. Three behaviors follow, and all three are load-bearing here:
- A receive on a closed channel returns immediately with the zero value, forever. This makes close a one-shot broadcast: close stopCh once and every goroutine blocked on <-stopCh unblocks at once.
- The two-value receive v, ok := <-ch reports ok == false when the channel is closed and drained, so you can tell a real zero value from “closed.”
- for v := range ch receives until the channel is closed and drained, then ends the loop.
    done := make(chan struct{})
    go func() {
        <-done // blocks until close(done); zero value, ignored
        fmt.Println("stopped")
    }()
    close(done) // broadcasts to every receiver waiting on done

The nil channel
A nil channel (the zero value of chan T) blocks forever on both send and receive. Inside a select, a case on a nil channel is effectively disabled and is never selected. This is occasionally a deliberate tool (set a channel variable to nil to switch off one branch of a select), but far more often a nil channel is an accidental silent deadlock: you forgot to make it, so the goroutine parks and never wakes. If a goroutine seems stuck on a channel op, check whether the channel was actually initialized.
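The deliberate use looks roughly like this. The sketch assumes a hypothetical merge function that reads two inputs until both are closed; it is an illustration, not code from multigres:

```go
package main

import "fmt"

// merge (hypothetical) collects values from a and b until both are closed.
// When one input closes, its variable is set to nil so its select case can
// never fire again; otherwise the closed channel would be ready on every pass.
func merge(a, b <-chan int) []int {
	var out []int
	for a != nil || b != nil {
		select {
		case v, ok := <-a:
			if !ok {
				a = nil // disable this case
				continue
			}
			out = append(out, v)
		case v, ok := <-b:
			if !ok {
				b = nil // disable this case
				continue
			}
			out = append(out, v)
		}
	}
	return out
}

func main() {
	a, b := make(chan int, 2), make(chan int, 1)
	a <- 1
	a <- 2
	b <- 3
	close(a)
	close(b)
	fmt.Println(len(merge(a, b)))
}
```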
select: multiplexing channel operations
select waits on multiple channel operations at once and proceeds with whichever is ready. If several are ready, it picks one at random (preventing starvation). With a default clause, select becomes non-blocking: if no case is ready, default runs immediately.

    select {
    case v := <-ch1:
        use(v)
    case ch2 <- x:
        // sent
    case <-ctx.Done():
        return ctx.Err()
    default:
        // nothing ready right now; don't block
    }

The watch loop: ctx + a closed-detection receive
A discovery loop selects over the context (for shutdown) and a topology watch stream, using the two-value receive to notice when the upstream watch closes:

    for {
        select {
        case <-gd.ctx.Done():
            return
        case event, ok := <-changes:
            if !ok {
                gd.logger.Info("Cell watch channel closed, will reconnect")
                return
            }
            gd.processCellChange(event)
        }
    }

Two exit paths, both clean: the owner cancels gd.ctx (graceful shutdown), or the upstream watch closes changes (ok == false, time to reconnect). There is no third path where this goroutine blocks indefinitely; that’s what makes it leak-free. The cancellation half of this pattern is the universal contract described in context.
The non-blocking “try-send” wakeup, and coalescing
The most reusable concurrency idiom in this codebase is a buffered(1) channel plus a non-blocking send to wake a worker at most once and coalesce a burst of events into a single wakeup:

    // notify signals the timeout thread to re-check the queue head.
    // Non-blocking: if a notification is already pending, this is a no-op.
    func (tt *timeoutThread) notify() {
        select {
        case tt.notifyCh <- struct{}{}:
        default:
        }
    }

notifyCh has capacity 1. The first notify() deposits a token (send succeeds). A second notify() before the worker has drained finds the buffer full, so the send is not ready, so default runs and the call returns immediately. The result: N notifications collapse into one pending wakeup. The worker only needs to know “something changed since I last looked” (it re-reads the actual queue under a lock), so dropping the redundant signals is correct, not lossy.
Discovery uses the identical idiom with a separate notification queue held under a mutex — so a dropped signal is harmless because the data lives in the queue, not the channel:
    func (gd *GlobalPoolerDiscovery) queueNotification(notif poolerNotification) {
        gd.notificationsMu.Lock()
        gd.notifications = append(gd.notifications, notif)
        gd.notificationsMu.Unlock()

        // Wake up processor (non-blocking - buffered channel)
        select {
        case gd.notifySignal <- struct{}{}:
        default:
            // already signaled, processor will drain buffer
        }
    }

The matching consumer waits on the signal, then drains the whole queue in a loop, so one wakeup can process many enqueued notifications (see “Worker / event-loop serialization” below).
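The coalescing is easy to observe directly. The following sketch uses a hypothetical queue type (not the discovery code) and checks that five pushes leave exactly one pending wakeup while all five items remain in the queue:

```go
package main

import (
	"fmt"
	"sync"
)

// queue (hypothetical) pairs a mutex-guarded slice with a capacity-1 signal.
type queue struct {
	mu     sync.Mutex
	items  []int
	signal chan struct{}
}

func newQueue() *queue { return &queue{signal: make(chan struct{}, 1)} }

// push stores the item, then does a non-blocking send on the signal channel.
func (q *queue) push(v int) {
	q.mu.Lock()
	q.items = append(q.items, v)
	q.mu.Unlock()
	select {
	case q.signal <- struct{}{}:
	default: // a wakeup is already pending
	}
}

// drain takes everything queued so far.
func (q *queue) drain() []int {
	q.mu.Lock()
	defer q.mu.Unlock()
	items := q.items
	q.items = nil
	return items
}

func main() {
	q := newQueue()
	for i := 0; i < 5; i++ {
		q.push(i)
	}
	fmt.Println("pending wakeups:", len(q.signal))
	<-q.signal
	fmt.Println("items drained:", len(q.drain()))
}
```

The data lives in the slice and the channel carries only "look again", which is why a skipped send loses nothing.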
Timeouts with time.Timer and the reset dance
Selecting on a timer’s channel gives you a timeout branch. time.NewTimer(d) returns a *time.Timer whose C field is a <-chan time.Time that fires once after d. The timeout thread waits for the head entry’s deadline, a notification, or a stop, all at once:

    select {
    case <-timer.C:
        // head entry's deadline reached: evict it
        tt.evictHead()
    case <-tt.notifyCh:
        // queue changed: re-check head
        continue
    case <-tt.stopCh:
        return
    }

The subtle part is reusing a timer across loop iterations. Under the timer semantics in effect before Go 1.23, you cannot simply call Reset on a timer that has already fired (or might fire concurrently): a stale tick can be sitting in timer.C and would fire spuriously the next time around. The canonical Stop-then-drain dance handles it:

    // Reset the timer. We drain it first to avoid races.
    if !timer.Stop() {
        select {
        case <-timer.C:
        default:
        }
    }
    timer.Reset(delay)

Stop() returns false if the timer had already fired (or was already stopped). In that case there may be a pending value in timer.C, so the inner non-blocking receive drains it; default covers the case where nothing is buffered (someone else already consumed it). Only after the channel is known-empty is Reset safe. For modules whose go.mod declares Go 1.23 or later, timer channels are unbuffered and Stop and Reset guarantee that no stale value is delivered afterwards, so the drain is no longer required there; it remains harmless.
Fan-out / fan-in
Fan-out spawns N goroutines to do work in parallel; fan-in collects their N results back into one place. The two variants in this codebase differ in how the collector knows it is done.
flowchart LR
C["Caller"] --> D{"fan-out"}
D --> W1["goroutine 1"]
D --> W2["goroutine 2"]
D --> W3["goroutine N"]
W1 -->|send| R[("results channel")]
W2 -->|send| R
W3 -->|send| R
R --> F{"fan-in / collect"}
F --> C
Variant A: count the results (no closer goroutine)
When the producer count is known up front, the collector simply reads exactly that many times. This is the recruit/promote dispatch in consensus:

    // dispatch: one goroutine per cohort member, buffered to len(cohort)
    recruited := make(chan recruitResult, len(cohort))
    for _, p := range cohort {
        go func() {
            recruited <- recruitResult{pooler: p, cs: r.recruit(ctx, p, revocation)}
        }()
    }

    // collect: read exactly len(cohort) results
    for range len(cohort) {
        rr := <-recruited
        // ...
    }

No close is needed and no WaitGroup is needed: the collector reads precisely len(cohort) values, the buffer guarantees no producer blocks, and the channel is never closed (closing is unnecessary when the consumer counts). for range len(cohort) is the Go 1.22 “range over an int” form; it iterates len(cohort) times.
Variant B: drain until close (the closer goroutine)
When the consumer wants to range the channel (or doesn’t know in advance how many values arrive), someone must close the channel so range/the final receive terminates. But no single producer can close it (a producer doesn’t know it’s the last one, and double-close panics). The fix is a dedicated closer goroutine that waits for all producers via a WaitGroup, then closes:
flowchart LR
subgraph Producers
G1["go fn()"]
G2["go fn()"]
G3["go fn()"]
end
G1 -->|wg.Done| WG[("WaitGroup")]
G2 -->|wg.Done| WG
G3 -->|wg.Done| WG
G1 -.->|err only| EC[("errCh (buffered)")]
G2 -.->|err only| EC
G3 -.->|err only| EC
WG -->|wg.Wait| CL["closer goroutine"]
CL -->|close| EC
EC --> RX["return <-errCh"]
    errCh := make(chan error, len(h.funcs))
    var wg sync.WaitGroup

    for _, f := range h.funcs {
        wg.Add(1)
        go func(fn func() error) {
            defer wg.Done()
            if err := fn(); err != nil {
                errCh <- err
            }
        }(f)
    }

    // Close the channel once every goroutine completes.
    go func() {
        wg.Wait()
        close(errCh)
    }()

    // Return the first error, or nil if the channel closes without one.
    return <-errCh

This is fail-fast fan-in: the single <-errCh either returns the first error pushed by a failing func, or, if every func succeeded so nobody sent, blocks until the closer goroutine runs close(errCh), at which point the receive yields the zero value (nil). The closer goroutine is the linchpin: without it, the <-errCh in the all-succeed case would block forever (nobody sends, nobody closes) and leak.
Why does hooks.go need a closer goroutine but rule_change.go doesn’t? Because rule_change.go counts (for range len(cohort)) — it always reads exactly as many values as it sent, so the channel never needs to signal “done.” hooks.go instead wants a single receive that resolves to “first error or nil-on-completion,” and the only way a single <-errCh can mean “everyone finished with no error” is for the channel to be closed — hence the wg.Wait(); close(errCh) goroutine. Different termination contracts, different machinery.
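The same closer goroutine also serves the other case this variant mentions, a consumer that ranges over the channel. A minimal sketch with a hypothetical sumSquares function:

```go
package main

import (
	"fmt"
	"sync"
)

// sumSquares (hypothetical) fans out one goroutine per input and ranges over
// the results. The closer goroutine is what lets the range loop end.
func sumSquares(in []int) int {
	results := make(chan int) // unbuffered is fine: the consumer reads everything
	var wg sync.WaitGroup
	for _, v := range in {
		wg.Add(1)
		go func(v int) {
			defer wg.Done()
			results <- v * v
		}(v)
	}
	go func() {
		wg.Wait()      // every producer has sent and returned
		close(results) // closed exactly once, by the only goroutine that closes
	}()
	total := 0
	for sq := range results { // ends when results is closed and drained
		total += sq
	}
	return total
}

func main() {
	fmt.Println(sumSquares([]int{1, 2, 3, 4}))
}
```

Here the channel can stay unbuffered because the consumer never stops early; if it could, the buffer-per-producer sizing would be needed again.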
sync.WaitGroup and the Go 1.25 Go method
A WaitGroup is a counter: Add(n) increments, Done() decrements (usually defer-ed), and Wait() blocks until the count returns to zero. It’s the standard way to join a set of goroutines.
The pre-1.25 form (still everywhere in the codebase) is the three-line ritual:
    var wg sync.WaitGroup
    for _, shardProblems := range problemsByShard {
        wg.Add(1)
        go func(problems []types.Problem) {
            defer wg.Done()
            re.processShardProblems(ctx, problems[0].ShardKey, problems)
        }(shardProblems)
    }
    wg.Wait()

Go 1.25 added WaitGroup.Go, which does the Add(1) / go / defer Done() for you in one call. It still launches a goroutine; it is not synchronous. A shard-status fan-out uses it:

    var (
        mu sync.Mutex
        wg sync.WaitGroup
    )
    for _, p := range poolers {
        wg.Go(func() {
            rpcCtx, cancel := context.WithTimeout(ctx, timeouts.RemoteOperationTimeout)
            defer cancel()
            resp, err := c.rpcClient.Status(rpcCtx, p, &multipoolermanagerdatapb.StatusRequest{})
            // ... guarded by mu when writing shared results
        })
    }

wg.Go(fn) is exactly wg.Add(1); go func(){ defer wg.Done(); fn() }(). You can also pass a method value directly: discovery does gd.wg.Go(gd.processNotifications), launching the method as a goroutine.
errgroup: fan-out with first-error propagation
golang.org/x/sync/errgroup is a WaitGroup that also collects errors and (optionally) cancels a shared context on the first failure. g.Go(func() error {...}) runs a task; g.Wait() blocks until all return and yields the first non-nil error (or nil). errgroup.WithContext(parent) derives a child context that is cancelled the first time a task returns a non-nil error (or when Wait returns, whichever comes first), so siblings can bail out.
A state manager fans the same state-change event out to every component and stops at the first failure:
    g, ctx := errgroup.WithContext(ctx)
    for _, c := range ssm.components {
        g.Go(func() error {
            return c.OnStateChange(ctx, poolerType, servingStatus)
        })
    }
    if err := g.Wait(); err != nil {
        return err
    }

Note that the first line shadows ctx with the derived one, then passes that derived ctx into each OnStateChange. So if component A fails, the derived ctx is cancelled and components B and C, if they honor ctx, abort their in-flight work promptly. That last clause is the whole point: cancellation is cooperative, so a component that ignores ctx keeps running until it finishes on its own, and g.Wait() keeps waiting for it.
Sometimes you want errgroup’s run-in-parallel, return-the-first-error behavior but don’t need the cancellation context. A topology client’s Close() discards the derived context with _:

    g, _ := errgroup.WithContext(context.TODO())

    g.Go(func() error {
        if ts.globalTopo != nil {
            if err := ts.globalTopo.Close(); err != nil {
                return fmt.Errorf("failed to close global topo: %w", err)
            }
            ts.globalTopo = nil
        }
        return nil
    })

    g.Go(func() error {
        // ... close all cell connections ...
        return nil
    })
    err := g.Wait()

Discarding the context (g, _ :=) is reasonable here because closing topology connections is short, must run to completion regardless (you want every connection closed even if one errors), and there’s no in-flight operation worth cancelling. Cancelling siblings would be actively wrong: a global-topo close failure should not prevent the cell connections from closing. The context.TODO() is a placeholder signaling “no real context applies”; a zero-value errgroup.Group would behave the same way, since it has no context to cancel. Note that g.Wait() still reports only the first error, so a second close failure is dropped. (Errors are still wrapped with %w; see errors.)
Goroutine lifecycle and leak avoidance
Every long-lived goroutine needs an owner and an exit path. The contract this codebase follows: a long-lived goroutine selects on a cancellable context (or a stop channel), and the owner, on shutdown, cancels and then waits. Cancelling alone tells the goroutine to stop; only Wait() confirms it actually has.
A HealthStream is the textbook example. It owns a parent context, its cancel function, and a WaitGroup:
    mu      sync.Mutex
    streams map[topoclient.ComponentID]*streamEntry

    // Parent context; cancelled by Stop().
    ctx    context.Context
    cancel context.CancelFunc
    wg     sync.WaitGroup

Start launches one goroutine per pooler with wg.Go, and the goroutine removes its own map entry via defer before returning:

    ctx, cancel := context.WithCancel(hs.ctx) // per-pooler child context
    entry := &streamEntry{cancel: cancel}
    hs.streams[poolerID] = entry

    hs.wg.Go(func() {
        defer func() {
            hs.mu.Lock()
            delete(hs.streams, poolerID)
            hs.mu.Unlock()
        }()
        hs.runStream(ctx, poolerID, entry)
    })

Two layers of context: a parent hs.ctx (cancelled to stop everything) and a per-pooler child (cancelled to stop just one stream). The defer-driven self-deletion means the goroutine cleans up its own bookkeeping no matter how it exits, under the same mutex that Stop() uses, so there’s no window where a dead goroutine’s entry lingers.
Shutdown is the canonical cancel-then-wait:
    func (hs *HealthStream) Shutdown() {
        hs.cancel()
        hs.wg.Wait()
    }

A self-contained watchdog goroutine
Inside runStream, an inner goroutine implements a staleness watchdog: if no health message arrives within a timeout, it cancels the stream’s context, which makes the blocking stream.Recv() return an error and triggers a reconnect. It combines everything above: a buffered(1) reset channel, the timer drain dance, and a three-way select:

    resetCh := make(chan time.Duration, 1)
    go func() {
        current := initialStaleness
        timer := time.NewTimer(current)
        defer timer.Stop()
        for {
            select {
            case d := <-resetCh:
                current = d
                if !timer.Stop() {
                    select {
                    case <-timer.C:
                    default:
                    }
                }
                timer.Reset(current)
            case <-timer.C:
                hs.logger.WarnContext(ctx, "health stream stale", "pooler_id", poolerID)
                cancelWatchdog()
                return
            case <-watchdogCtx.Done():
                return
            }
        }
    }()

Three branches: a reset (rearm the timer with the new staleness value), a fire (no message in time → cancel the watchdog context, which kills the gRPC stream → return), or the watchdog context being cancelled externally (the stream ended for another reason → return). The recv loop feeds it via the same non-blocking buffered(1) send you saw earlier, so resetting the watchdog cannot block the recv loop:

    select {
    case resetCh <- timeout:
    default:
        // a reset is already pending; the watchdog will pick it up
    }

The “cancel a context to unblock a blocking Recv” trick is worth internalizing: many blocking I/O calls in Go (gRPC streams, DB queries, HTTP requests) take a context, and cancelling it is how you interrupt them from another goroutine. That’s how this watchdog converts “no data for 90s” into “terminate the stream cleanly.”
Worker / event-loop serialization: synchronization by communication
A classic alternative to “share state under a mutex” is “give the state to one goroutine and send it requests over a channel.” Go’s motto: do not communicate by sharing memory; share memory by communicating. A LISTEN/NOTIFY Listener is built this way: one reader goroutine and one event-loop writer share a single TCP connection without locking the connection, because only the event loop ever touches it:
flowchart LR
C1["caller A"] -->|request + done| RQ[("requests channel")]
C2["caller B"] -->|request + done| RQ
RQ --> EL["event-loop goroutine<br/>(sole owner)"]
EL -->|exclusive access| RES["TCP conn + subscription state"]
EL -.->|close(req.done)| C1
EL -.->|close(req.done)| C2
    type request struct {
        typ     requestType
        channel string
        subCh   chan *sqltypes.Notification
        done    chan struct{} // closed when the request is processed
    }

Callers submit a request on the serialized requests channel and then block on its done channel until the single event-loop goroutine has processed it:

    req := request{typ: reqUnsubscribe, channel: channel, subCh: subCh, done: make(chan struct{})}
    select {
    case l.requests <- req:
        <-req.done // wait for the event loop to process & close(req.done)
    case <-l.stopped:
        // listener is shutting down; give up
    }

The done chan struct{} is a per-request completion signal: the event loop runs close(req.done) after applying the command, which unblocks this caller. The outer select against l.stopped ensures the caller never blocks forever if the listener has shut down, which is a second clean exit path. State (subscription refcounts, the connection) lives entirely inside the event-loop goroutine, so there’s no mutex around it: the channel handoff is the synchronization. Use this pattern when a single resource (a connection, a file, a piece of mutable state) needs serialized access and a lock would force every caller to reason about lock ordering.
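A self-contained miniature of the pattern, under stated assumptions: the counter type, its request shape, and the separate stop/stopped channels are hypothetical and simpler than the Listener. The state being protected is a single integer owned by the loop goroutine.

```go
package main

import "fmt"

// request (hypothetical) asks the loop to add delta to its private total.
type request struct {
	delta int
	reply *int          // written by the loop before done is closed
	done  chan struct{} // closed when the request has been processed
}

type counter struct {
	requests chan request
	stop     chan struct{} // closed by Stop
	stopped  chan struct{} // closed by the loop when it exits
}

func newCounter() *counter {
	c := &counter{
		requests: make(chan request),
		stop:     make(chan struct{}),
		stopped:  make(chan struct{}),
	}
	go c.loop()
	return c
}

// loop is the sole owner of total, so no mutex is needed.
func (c *counter) loop() {
	defer close(c.stopped)
	total := 0
	for {
		select {
		case req := <-c.requests:
			total += req.delta
			*req.reply = total
			close(req.done)
		case <-c.stop:
			return
		}
	}
}

// Add submits a request and waits for the loop to process it.
func (c *counter) Add(delta int) (int, bool) {
	var reply int
	req := request{delta: delta, reply: &reply, done: make(chan struct{})}
	select {
	case c.requests <- req:
		<-req.done // the close happens after the write to reply
		return reply, true
	case <-c.stopped:
		return 0, false // the loop has exited; give up
	}
}

// Stop asks the loop to exit and waits until it has.
func (c *counter) Stop() { close(c.stop); <-c.stopped }

func main() {
	c := newCounter()
	c.Add(2)
	total, ok := c.Add(3)
	fmt.Println(total, ok)
	c.Stop()
	_, ok = c.Add(1)
	fmt.Println(ok)
}
```

Because Stop waits for the loop to exit, an Add issued after Stop returns can only take the stopped branch, so it fails instead of blocking.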
Bounding concurrency: semaphore.Weighted vs a buffered channel
A buffered channel of capacity N is the simplest counting semaphore (acquire = send, release = receive). This codebase uses the more explicit golang.org/x/sync/semaphore.Weighted when it wants a global resource limit with a non-blocking try-acquire on a hot path. The buffer service caps total in-flight buffered requests this way:

    // bufferSizeSema limits the total number of buffered requests globally.
    bufferSizeSema *semaphore.Weighted
    // ...
    bufferSizeSema: semaphore.NewWeighted(int64(config.Size.Get())),

On the latency-sensitive admit path it uses TryAcquire, which returns false immediately instead of blocking when full (so it can evict the oldest entry to make room rather than stalling the caller):

    if !b.bufferSizeSema.TryAcquire(1) {
        // Buffer is full. Evict the oldest entry globally to make room.
        // ...
    }

Slots are returned with b.bufferSizeSema.Release(1) when an entry drains or is evicted. Weighted also supports Acquire(ctx, n) (blocking, context-aware) and weighted acquisitions (n > 1), which a plain buffered channel can’t express cleanly.
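For comparison, the buffered-channel semaphore mentioned at the top of this section can be sketched with the standard library alone. The sema type and its method names are hypothetical; the try-acquire is the same select/default idiom used for wakeups:

```go
package main

import "fmt"

// sema (hypothetical) is a counting semaphore built on a buffered channel:
// each queued token is one held slot.
type sema chan struct{}

func newSema(n int) sema { return make(sema, n) }

// tryAcquire takes a slot if one is free and never blocks.
func (s sema) tryAcquire() bool {
	select {
	case s <- struct{}{}:
		return true
	default:
		return false
	}
}

// acquire blocks until a slot is free.
func (s sema) acquire() { s <- struct{}{} }

// release returns a slot; call it once per successful acquire.
func (s sema) release() { <-s }

func main() {
	s := newSema(2)
	fmt.Println(s.tryAcquire(), s.tryAcquire(), s.tryAcquire())
	s.release()
	fmt.Println(s.tryAcquire())
}
```

This covers single-slot acquisition only. Taking n slots at once would need n separate sends, which is not atomic, and that gap is one reason to reach for semaphore.Weighted.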
Capstone: a channel hand-off race (the connection-pool waitlist)
The connection pool’s waitlist is the advanced case: it shows why a naive timeout path can lose a connection. A waiter blocks in a select over three things: the pool closing, its context expiring, and an unbuffered channel on which a returner will hand it a connection:

    select {
    case <-closeChan:
        // ... try to remove ourselves from the list ...
        if removed {
            return nil, ErrPoolClosed
        }
        // if we couldn't remove ourselves, another goroutine is handing us a connection
        return <-elem.Value.conn, nil

    case <-ctx.Done():
        // ... try to remove ourselves from the list ...
        if removed {
            return nil, context.Cause(ctx)
        }
        // another goroutine is handing us a connection
        return <-elem.Value.conn, nil

    case conn := <-elem.Value.conn:
        return conn, nil
    }

The race: a waiter’s context can expire at the same instant a returner picks this waiter and sends a connection on elem.Value.conn. The waiter wakes on ctx.Done() and tries to remove itself from the waitlist. If it succeeds, no returner committed to it, so it returns the timeout error. But if it fails to remove itself (removed == false), a returner has already pulled this element off the list and is about to send (or has sent) a connection on the waiter’s channel. The returner’s send:

    target.Value.conn <- conn
    runtime.Gosched()

is on an unbuffered channel, so it blocks until someone receives. If the timed-out waiter just returned an error and walked away, that returner would block forever (leak) and the connection would be lost (never returned to the pool). So the waiter must return <-elem.Value.conn: receive the in-flight connection it’s being handed, even though it timed out. The runtime.Gosched() after the send yields the returner’s goroutine to the scheduler, giving the just-woken waiter a chance to run and complete the receive promptly.
Checkpoints
Why is the results channel in dispatchRecruit created as make(chan recruitResult, len(cohort)) rather than unbuffered?
The collector can return early as soon as it builds a valid proposal, abandoning the channel. With a buffer sized to the number of producers, every recruit goroutine can deposit its result and exit without a waiting receiver. An unbuffered channel would block, and thus leak, every producer whose result is never read.

After calling close(stopCh), what happens to ten goroutines all blocked on a receive from it, and why must stop() be the only caller of close(stopCh)?
All ten unblock at once: a receive on a closed channel returns the zero value immediately and forever, so close is a one-shot broadcast. stop() must be the sole closer because closing an already-closed channel panics; the buffer’s lifecycle guarantees stop() runs once.

HealthStream.Shutdown() calls hs.cancel() then hs.wg.Wait(). Why isn’t cancel() alone enough?
cancel() only requests that the stream goroutines stop; it returns immediately while they may still be mid-execution. wg.Wait() blocks until they have actually exited, so the caller can safely tear down shared resources without racing live goroutines.

Why does ErrorHooks.Fire() need a wg.Wait(); close(errCh) goroutine, but collectRecruitsAndBuildProposal does not?
Fire() resolves to a single receive on errCh that must mean “first error, or nil when everyone finished.” The only way one receive can detect “all done, no error” is for the channel to be closed, and no individual producer can close it (double-close panics), so a closer goroutine waits on the WaitGroup and closes. collectRecruits… instead counts (for range len(cohort)), reading exactly as many values as it sent, so the channel never needs a close signal.

Exercises
- In timeout_thread.go, trace every way run() can return or block, and confirm there is no path where it parks forever with no possible producer/closer. Then explain precisely why notifyCh is make(chan struct{}, 1): capacity 1, not 0 and not 10.
- Grep for the Go 1.25 wg.Go( and .Go(func method across the services. List at least four non-test call sites and, for each, name (a) what cancels the goroutine (which context or stop channel) and (b) where it is joined (which Wait()). Flag any site whose owner does not wait for it.
- Compare the fan-in in tools/event/hooks.go (ErrorHooks.Fire: errCh + closer goroutine + single receive) with consensus/rule_change.go (collectRecruitsAndBuildProposal: buffered channel + for range len(cohort)). Write a paragraph explaining why one needs the closer goroutine and the other does not, in terms of each consumer’s termination contract.
- In connpool/waitlist.go, explain the race that makes return <-elem.Value.conn necessary in both the closeChan and ctx.Done() select cases. What two bad things happen if you returned an error there instead, given that the returner sends on an unbuffered channel?
- Grep for errgroup.WithContext across the codebase. For each call site, state whether the derived context is kept (g, ctx :=) or discarded (g, _ :=), and justify why discarding it is acceptable in topoclient/store.go’s Close().
Continue to context — cancellation, deadlines, ctx.Done() in select, and the WithCancel/WithTimeout patterns that every long-lived goroutine on this page relies on.
See also
- sync & the memory model: sync.WaitGroup, sync.Mutex, sync.Pool, semaphore.Weighted, runtime.Gosched, and the happens-before guarantees channels provide.
- errors: errgroup first-error propagation and fmt.Errorf("...: %w", err) wrapping inside g.Go closures.
- stdlib & idioms: time.Timer/Ticker, the chan struct{} signal idiom, and select/default.
- consensus & failover: the multiorch recovery engine, rule_change fan-out, and HealthStream in their real failover context.
- service anatomy: how services own background goroutines and wire start/shutdown lifecycles.
- idioms & gotchas: distilled goroutine-leak and timer-reset gotchas.
- orientation: repo map for locating go/services and go/common.