NRG: Wait for goroutines to shut down when recreating group
This should fix some logical races where multiple sets of goroutines
can fight over the same store directory, i.e. when shutting down and
recreating a group rapidly.

Signed-off-by: Neil Twigg <neil@nats.io>
neilalexander committed Sep 18, 2024
1 parent 83c77b4 commit 61b9a2e
Showing 3 changed files with 38 additions and 11 deletions.
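The race being fixed: two generations of the same group can overlap in time, with the old node's goroutines still touching the store directory while a new node is created over the same path. A minimal sketch of the fix pattern, using hypothetical names rather than the server's API: every goroutine a node owns is tracked on a sync.WaitGroup, and a successor waits for that group to drain before taking over the directory.

package main

import (
    "fmt"
    "sync"
)

// node stands in for one generation of a raft group (hypothetical type).
type node struct {
    sd string // store directory this generation owns
    wg sync.WaitGroup
}

func startNode(sd string) *node {
    n := &node{sd: sd}
    n.wg.Add(1) // count the goroutine before it starts
    go func() {
        defer n.wg.Done()
        // ... read/write files under n.sd ...
    }()
    return n
}

// WaitForStop blocks until every goroutine of this generation has
// exited, so a successor can safely reuse n.sd.
func (n *node) WaitForStop() { n.wg.Wait() }

func main() {
    old := startNode("/tmp/group-a")
    old.WaitForStop() // without this, the next line races the old goroutines
    _ = startNode("/tmp/group-a")
    fmt.Println("recreated without overlap")
}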
6 changes: 6 additions & 0 deletions server/jetstream_cluster.go
@@ -2050,7 +2050,13 @@ func (js *jetStream) createRaftGroup(accName string, rg *raftGroup, storage Stor
     }
 
     // Check if we already have this assigned.
+retry:
     if node := s.lookupRaftNode(rg.Name); node != nil {
+        if node.State() == Closed {
+            // We're waiting for this node to finish shutting down before we replace it.
+            node.WaitForStop()
+            goto retry
+        }
         s.Debugf("JetStream cluster already has raft group %q assigned", rg.Name)
         rg.node = node
         js.mu.Unlock()
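The new retry label turns the lookup into a wait-and-recheck loop: if the registered node is Closed, block until its goroutines are gone, then look the group up again, since the stopped node unregisters itself on the way out. Written as a loop rather than a goto, the control flow is roughly the following (a sketch with stand-in types, not the actual server code):

package sketch

// Stand-ins for the server's types; values and shapes are assumptions.
type RaftState int

const Closed RaftState = 3

type RaftNode interface {
    State() RaftState
    WaitForStop()
}

// lookupOrWait returns a live node to reuse, or nil once no stale
// node remains and the caller may create a fresh group.
func lookupOrWait(lookup func() RaftNode) RaftNode {
    for {
        node := lookup()
        if node == nil {
            return nil // nothing registered any more
        }
        if node.State() != Closed {
            return node // live node already assigned: reuse it
        }
        node.WaitForStop() // drain the old goroutines, then re-check
    }
}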
2 changes: 2 additions & 0 deletions server/jetstream_cluster_3_test.go
@@ -5779,6 +5779,8 @@ func TestJetStreamClusterDetectOrphanNRGs(t *testing.T) {
 
     // Should only be meta NRG left.
     require_True(t, s.numRaftNodes() == 1)
+    s.rnMu.RLock()
+    defer s.rnMu.RUnlock()
     require_True(t, s.lookupRaftNode(sgn) == nil)
     require_True(t, s.lookupRaftNode(ogn) == nil)
 }
41 changes: 30 additions & 11 deletions server/raft.go
Expand Up @@ -74,6 +74,7 @@ type RaftNode interface {
QuitC() <-chan struct{}
Created() time.Time
Stop()
WaitForStop()
Delete()
Wipe()
RecreateInternalSubs() error
@@ -128,12 +129,13 @@ func (state RaftState) String() string {
 type raft struct {
     sync.RWMutex
 
-    created time.Time // Time that the group was created
-    accName string    // Account name of the asset this raft group is for
-    acc     *Account  // Account that NRG traffic will be sent/received in
-    group   string    // Raft group
-    sd      string    // Store directory
-    id      string    // Node ID
+    created time.Time      // Time that the group was created
+    accName string         // Account name of the asset this raft group is for
+    acc     *Account       // Account that NRG traffic will be sent/received in
+    group   string         // Raft group
+    sd      string         // Store directory
+    id      string         // Node ID
+    wg      sync.WaitGroup // Wait for running goroutines to exit on shutdown
 
     wal   WAL         // WAL store (filestore or memstore)
     wtype StorageType // WAL type, e.g. FileStorage or MemoryStorage
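The single new field carries the whole fix. The discipline applied across the rest of the diff is the usual WaitGroup triad: wg.Add(1) immediately before each goroutine is started, defer wg.Done() as one of the goroutine's first statements, and wg.Wait() at the points that must not proceed until all of them are gone. In miniature, as a self-contained sketch:

package main

import (
    "fmt"
    "sync"
)

type worker struct{ wg sync.WaitGroup }

func (w *worker) spawn(task func()) {
    w.wg.Add(1) // before go, so a concurrent Wait cannot miss this goroutine
    go func() {
        defer w.wg.Done() // runs even if task panics
        task()
    }()
}

func (w *worker) shutdown() {
    // ... signal tasks to stop (elided) ...
    w.wg.Wait() // block until every spawned goroutine has returned
}

func main() {
    var w worker
    w.spawn(func() { fmt.Println("task finished") })
    w.shutdown()
}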
@@ -521,6 +523,7 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe
     s.registerRaftNode(n.group, n)
 
     // Start the run goroutine for the Raft state machine.
+    n.wg.Add(1)
     s.startGoRoutine(n.run, labels)
 
     return n, nil
@@ -676,8 +679,8 @@ func (s *Server) unregisterRaftNode(group string) {
 
 // Returns how many Raft nodes are running in this server instance.
 func (s *Server) numRaftNodes() int {
-    s.rnMu.Lock()
-    defer s.rnMu.Unlock()
+    s.rnMu.RLock()
+    defer s.rnMu.RUnlock()
     return len(s.raftNodes)
 }
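numRaftNodes only reads the map, so the write lock was stronger than needed; downgrading to RLock lets it run concurrently with other readers, including the test above, which now holds rnMu.RLock across its lookups. The general shape, as a sketch:

package sketch

import "sync"

type node struct{}

type registry struct {
    mu    sync.RWMutex
    nodes map[string]*node
}

func (r *registry) count() int {
    r.mu.RLock() // shared: any number of readers at once
    defer r.mu.RUnlock()
    return len(r.nodes)
}

func (r *registry) register(name string, n *node) {
    r.mu.Lock() // exclusive: mutations wait out all readers
    defer r.mu.Unlock()
    if r.nodes == nil {
        r.nodes = make(map[string]*node)
    }
    r.nodes[name] = n
}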

@@ -1718,12 +1721,17 @@ func (n *raft) Stop() {
     n.shutdown(false)
 }
 
+func (n *raft) WaitForStop() {
+    n.wg.Wait()
+}
+
 func (n *raft) Delete() {
     n.shutdown(true)
 }
 
 func (n *raft) shutdown(shouldDelete bool) {
     n.Lock()
+    defer n.Unlock()
 
     // Returned swap value is the previous state. It looks counter-intuitive
     // to do this atomic operation with the lock held, but we have to do so in
@@ -1734,12 +1742,12 @@ func (n *raft) shutdown(shouldDelete bool) {
     if n.state.Swap(int32(Closed)) == int32(Closed) {
         // If we get called again with shouldDelete, in case we were called first with Stop() cleanup
         if shouldDelete {
+            n.wg.Wait()
             if wal := n.wal; wal != nil {
                 wal.Delete()
             }
             os.RemoveAll(n.sd)
         }
-        n.Unlock()
         return
     }
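The Swap makes shutdown idempotent: exactly one caller observes a non-Closed previous state and runs the teardown below, while repeat callers fall into this branch and, at most, upgrade an earlier Stop() to a delete. Note the repeat-caller path also gains an n.wg.Wait(), since the first caller may have set the state but not yet finished draining its goroutines, and deleting the WAL under them would reintroduce the race. The latch pattern in isolation (a sketch; the state value is illustrative):

package sketch

import "sync/atomic"

const closedState int32 = 3 // stand-in for the server's Closed value

type node struct{ state atomic.Int32 }

// shutdown tears the node down exactly once, no matter how many
// callers race into it.
func (n *node) shutdown() (first bool) {
    if n.state.Swap(closedState) == closedState {
        return false // someone already closed the node
    }
    // ... the one and only teardown path ...
    return true
}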

@@ -1758,6 +1766,13 @@ func (n *raft) shutdown(shouldDelete bool) {
         n.c = nil
     }
 
+    // Wait for goroutines to shut down before trying to clean up
+    // the log below, otherwise we might end up deleting the store
+    // from underneath running goroutines.
+    n.Unlock()
+    n.wg.Wait()
+    n.Lock()
+
     s, g, wal := n.s, n.group, n.wal
 
     // Unregistering ipQueues do not prevent them from push/pop
@@ -1771,7 +1786,6 @@ func (n *raft) shutdown(shouldDelete bool) {
         q.unregister()
     }
     sd := n.sd
-    n.Unlock()
 
     s.unregisterRaftNode(g)
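Dropping the lock around the Wait is the subtle part: the tracked goroutines may need n's lock to reach their exit path, so waiting while holding it can deadlock, yet the cleanup that follows must still run under the lock. Hence unlock, wait, re-lock. The hazard in isolation, as a runnable sketch:

package main

import (
    "sync"
    "time"
)

func main() {
    var mu sync.Mutex
    var wg sync.WaitGroup

    wg.Add(1)
    go func() {
        defer wg.Done()
        time.Sleep(10 * time.Millisecond)
        mu.Lock() // the goroutine needs the lock to finish up
        mu.Unlock()
    }()

    mu.Lock()
    // wg.Wait() here would deadlock: the goroutine is parked on mu.
    mu.Unlock()
    wg.Wait() // safe: we are no longer holding the lock
    mu.Lock() // re-acquire for the remaining cleanup
    mu.Unlock()
}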

@@ -1922,6 +1936,7 @@ func (n *raft) resetElectWithLock(et time.Duration) {
 // the entire life of the Raft node once started.
 func (n *raft) run() {
     s := n.s
+    defer n.wg.Done()
     defer s.grWG.Done()
 
     // We want to wait for some routing to be enabled, so we will wait for
@@ -2799,7 +2814,11 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) {
     n.progress[ar.peer] = indexUpdates
     n.Unlock()
 
-    n.s.startGoRoutine(func() { n.runCatchup(ar, indexUpdates) })
+    n.wg.Add(1)
+    n.s.startGoRoutine(func() {
+        defer n.wg.Done()
+        n.runCatchup(ar, indexUpdates)
+    })
 }
 
 func (n *raft) loadEntry(index uint64) (*appendEntry, error) {
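Note that wg.Add(1) sits outside the closure. The Add must happen before any Wait that is supposed to cover the goroutine; if it ran inside the closure, a concurrent WaitForStop could observe a zero counter and return while the catchup goroutine was still about to start. A sketch of the safe shape:

package sketch

import "sync"

// spawn starts work tracked by wg. Raising the counter before the
// goroutine exists is what makes a concurrent wg.Wait() reliable;
// calling wg.Add(1) inside the goroutine would race with Wait.
func spawn(wg *sync.WaitGroup, work func()) {
    wg.Add(1)
    go func() {
        defer wg.Done()
        work()
    }()
}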
