Skip to content

Commit

Permalink
De-flake RAFT adder tests (#6265)
Browse files Browse the repository at this point in the history
We didn't wait before for `smLoop(sm stateMachine)` to exit. This change
introduces a wait group so we don't have multiple go routines running.

Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
  • Loading branch information
derekcollison authored Dec 16, 2024
2 parents 7bd91dc + 83f573d commit b961672
Showing 1 changed file with 18 additions and 5 deletions.
23 changes: 18 additions & 5 deletions server/raft_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
type stateMachine interface {
server() *Server
node() RaftNode
waitGroup() *sync.WaitGroup
// This will call forward as needed so can be called on any node.
propose(data []byte)
// When entries have been committed and can be applied.
Expand Down Expand Up @@ -157,9 +158,13 @@ func (c *cluster) createRaftGroupWithPeers(name string, servers []*Server, smf s
// Driver program for the state machine.
// Should be run in its own go routine.
func smLoop(sm stateMachine) {
s, n := sm.server(), sm.node()
s, n, wg := sm.server(), sm.node(), sm.waitGroup()
qch, lch, aq := n.QuitC(), n.LeadChangeC(), n.ApplyQ()

// Wait group used to allow waiting until we exit from here.
wg.Add(1)
defer wg.Done()

for {
select {
case <-s.quitCh:
Expand All @@ -185,6 +190,7 @@ type stateAdder struct {
sync.Mutex
s *Server
n RaftNode
wg sync.WaitGroup
cfg *RaftConfig
sum int64
lch chan bool
Expand All @@ -196,12 +202,19 @@ func (a *stateAdder) server() *Server {
defer a.Unlock()
return a.s
}

func (a *stateAdder) node() RaftNode {
a.Lock()
defer a.Unlock()
return a.n
}

func (a *stateAdder) waitGroup() *sync.WaitGroup {
a.Lock()
defer a.Unlock()
return &a.wg
}

func (a *stateAdder) propose(data []byte) {
a.Lock()
defer a.Unlock()
Expand Down Expand Up @@ -243,10 +256,10 @@ func (a *stateAdder) proposeDelta(delta int64) {

// Stop the group.
func (a *stateAdder) stop() {
a.Lock()
defer a.Unlock()
a.n.Stop()
a.n.WaitForStop()
n, wg := a.node(), a.waitGroup()
n.Stop()
n.WaitForStop()
wg.Wait()
}

// Restart the group
Expand Down

0 comments on commit b961672

Please sign in to comment.