From 5f1a4b66efb0a02962476cf554323d9fd817ddc5 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Mon, 26 Aug 2024 22:29:21 +0100 Subject: [PATCH 1/3] NRG: Wait for goroutines to shutdown when recreating group This should fix some logical races where multiple sets of goroutines can fight over the same store directory, i.e. when shutting down and recreating a group rapidly. Signed-off-by: Neil Twigg --- server/jetstream_cluster.go | 8 ++++++ server/jetstream_cluster_3_test.go | 2 ++ server/raft.go | 41 ++++++++++++++++++++++-------- 3 files changed, 40 insertions(+), 11 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index baf15307d52..a109b76a83d 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -2099,7 +2099,15 @@ func (js *jetStream) createRaftGroup(accName string, rg *raftGroup, storage Stor } // Check if we already have this assigned. +retry: if node := s.lookupRaftNode(rg.Name); node != nil { + if node.State() == Closed { + // We're waiting for this node to finish shutting down before we replace it. + js.mu.Unlock() + node.WaitForStop() + js.mu.Lock() + goto retry + } s.Debugf("JetStream cluster already has raft group %q assigned", rg.Name) rg.node = node js.mu.Unlock() diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 446938ba54f..0c58aeb05d6 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -5805,6 +5805,8 @@ func TestJetStreamClusterDetectOrphanNRGs(t *testing.T) { // Should only be meta NRG left. require_True(t, s.numRaftNodes() == 1) + s.rnMu.RLock() + defer s.rnMu.RUnlock() require_True(t, s.lookupRaftNode(sgn) == nil) require_True(t, s.lookupRaftNode(ogn) == nil) } diff --git a/server/raft.go b/server/raft.go index caf002095db..45dd31c03b2 100644 --- a/server/raft.go +++ b/server/raft.go @@ -74,6 +74,7 @@ type RaftNode interface { QuitC() <-chan struct{} Created() time.Time Stop() + WaitForStop() Delete() Wipe() RecreateInternalSubs() error @@ -128,12 +129,13 @@ func (state RaftState) String() string { type raft struct { sync.RWMutex - created time.Time // Time that the group was created - accName string // Account name of the asset this raft group is for - acc *Account // Account that NRG traffic will be sent/received in - group string // Raft group - sd string // Store directory - id string // Node ID + created time.Time // Time that the group was created + accName string // Account name of the asset this raft group is for + acc *Account // Account that NRG traffic will be sent/received in + group string // Raft group + sd string // Store directory + id string // Node ID + wg sync.WaitGroup // Wait for running goroutines to exit on shutdown wal WAL // WAL store (filestore or memstore) wtype StorageType // WAL type, e.g. FileStorage or MemoryStorage @@ -526,6 +528,7 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe } // Start the run goroutine for the Raft state machine. + n.wg.Add(1) s.startGoRoutine(n.run, labels) return n, nil @@ -681,8 +684,8 @@ func (s *Server) unregisterRaftNode(group string) { // Returns how many Raft nodes are running in this server instance. func (s *Server) numRaftNodes() int { - s.rnMu.Lock() - defer s.rnMu.Unlock() + s.rnMu.RLock() + defer s.rnMu.RUnlock() return len(s.raftNodes) } @@ -1719,12 +1722,17 @@ func (n *raft) Stop() { n.shutdown(false) } +func (n *raft) WaitForStop() { + n.wg.Wait() +} + func (n *raft) Delete() { n.shutdown(true) } func (n *raft) shutdown(shouldDelete bool) { n.Lock() + defer n.Unlock() // Returned swap value is the previous state. It looks counter-intuitive // to do this atomic operation with the lock held, but we have to do so in @@ -1735,12 +1743,12 @@ func (n *raft) shutdown(shouldDelete bool) { if n.state.Swap(int32(Closed)) == int32(Closed) { // If we get called again with shouldDelete, in case we were called first with Stop() cleanup if shouldDelete { + n.wg.Wait() if wal := n.wal; wal != nil { wal.Delete() } os.RemoveAll(n.sd) } - n.Unlock() return } @@ -1759,6 +1767,13 @@ func (n *raft) shutdown(shouldDelete bool) { n.c = nil } + // Wait for goroutines to shut down before trying to clean up + // the log below, otherwise we might end up deleting the store + // from underneath running goroutines. + n.Unlock() + n.wg.Wait() + n.Lock() + s, g, wal := n.s, n.group, n.wal // Unregistering ipQueues do not prevent them from push/pop @@ -1772,7 +1787,6 @@ func (n *raft) shutdown(shouldDelete bool) { q.unregister() } sd := n.sd - n.Unlock() s.unregisterRaftNode(g) @@ -1923,6 +1937,7 @@ func (n *raft) resetElectWithLock(et time.Duration) { // the entire life of the Raft node once started. func (n *raft) run() { s := n.s + defer n.wg.Done() defer s.grWG.Done() // We want to wait for some routing to be enabled, so we will wait for @@ -2800,7 +2815,11 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) { n.progress[ar.peer] = indexUpdates n.Unlock() - n.s.startGoRoutine(func() { n.runCatchup(ar, indexUpdates) }) + n.wg.Add(1) + n.s.startGoRoutine(func() { + defer n.wg.Done() + n.runCatchup(ar, indexUpdates) + }) } func (n *raft) loadEntry(index uint64) (*appendEntry, error) { From e4e3deea0111fef2f9734d4a891500f452cb5def Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Tue, 1 Oct 2024 10:57:17 +0100 Subject: [PATCH 2/3] NRG: Update group peers if mismatched Signed-off-by: Neil Twigg --- server/jetstream_cluster.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index a109b76a83d..ae90a719300 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -2109,6 +2109,22 @@ retry: goto retry } s.Debugf("JetStream cluster already has raft group %q assigned", rg.Name) + // Check and see if the group has the same peers. If not then we + // will update the known peers, which will send a peerstate if leader. + groupPeerIDs := append([]string{}, rg.Peers...) + var samePeers bool + if nodePeers := node.Peers(); len(rg.Peers) == len(nodePeers) { + nodePeerIDs := make([]string, 0, len(nodePeers)) + for _, n := range nodePeers { + nodePeerIDs = append(nodePeerIDs, n.ID) + } + slices.Sort(groupPeerIDs) + slices.Sort(nodePeerIDs) + samePeers = slices.Equal(groupPeerIDs, nodePeerIDs) + } + if !samePeers { + node.UpdateKnownPeers(groupPeerIDs) + } rg.node = node js.mu.Unlock() return nil From 56221998a00234f546c9efd79aaaf4865bf9d88c Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Tue, 5 Nov 2024 11:45:48 +0000 Subject: [PATCH 3/3] NRG: Refactor shutdown, update `switchState` to CAS Signed-off-by: Neil Twigg --- server/jetstream_cluster_3_test.go | 1 + server/raft.go | 160 ++++++++++++----------------- server/raft_test.go | 19 +++- 3 files changed, 83 insertions(+), 97 deletions(-) diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 0c58aeb05d6..6fc1102baa3 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -3914,6 +3914,7 @@ func TestJetStreamClusterStreamNodeShutdownBugOnStop(t *testing.T) { node.InstallSnapshot(mset.stateSnapshot()) // Stop the stream mset.stop(false, false) + node.WaitForStop() if numNodes := s.numRaftNodes(); numNodes != numNodesStart-1 { t.Fatalf("RAFT nodes after stream stop incorrect: %d vs %d", numNodesStart, numNodes) diff --git a/server/raft.go b/server/raft.go index 45dd31c03b2..6577a60cb36 100644 --- a/server/raft.go +++ b/server/raft.go @@ -76,7 +76,6 @@ type RaftNode interface { Stop() WaitForStop() Delete() - Wipe() RecreateInternalSubs() error } @@ -400,7 +399,7 @@ func (s *Server) initRaftNode(accName string, cfg *RaftConfig, labels pprofLabel // If we fail to do this for some reason then this is fatal — we cannot // continue setting up or the Raft node may be partially/totally isolated. if err := n.RecreateInternalSubs(); err != nil { - n.shutdown(false) + n.shutdown() return nil, err } @@ -1719,106 +1718,35 @@ func (n *raft) Created() time.Time { } func (n *raft) Stop() { - n.shutdown(false) + n.shutdown() } func (n *raft) WaitForStop() { - n.wg.Wait() + if n.state.Load() == int32(Closed) { + n.wg.Wait() + } } func (n *raft) Delete() { - n.shutdown(true) -} + n.shutdown() + n.wg.Wait() -func (n *raft) shutdown(shouldDelete bool) { n.Lock() defer n.Unlock() - // Returned swap value is the previous state. It looks counter-intuitive - // to do this atomic operation with the lock held, but we have to do so in - // order to make sure that switchState() is not already running. If it is - // then it can potentially update the n.state back to a non-closed state, - // allowing shutdown() to be called again. If that happens then the below - // close(n.quit) will panic from trying to close an already-closed channel. - if n.state.Swap(int32(Closed)) == int32(Closed) { - // If we get called again with shouldDelete, in case we were called first with Stop() cleanup - if shouldDelete { - n.wg.Wait() - if wal := n.wal; wal != nil { - wal.Delete() - } - os.RemoveAll(n.sd) - } - return - } - - close(n.quit) - if c := n.c; c != nil { - var subs []*subscription - c.mu.Lock() - for _, sub := range c.subs { - subs = append(subs, sub) - } - c.mu.Unlock() - for _, sub := range subs { - n.unsubscribe(sub) - } - c.closeConnection(InternalClient) - n.c = nil - } - - // Wait for goroutines to shut down before trying to clean up - // the log below, otherwise we might end up deleting the store - // from underneath running goroutines. - n.Unlock() - n.wg.Wait() - n.Lock() - - s, g, wal := n.s, n.group, n.wal - - // Unregistering ipQueues do not prevent them from push/pop - // just will remove them from the central monitoring map - queues := []interface { - unregister() - drain() - }{n.reqs, n.votes, n.prop, n.entry, n.resp, n.apply} - for _, q := range queues { - q.drain() - q.unregister() - } - sd := n.sd - - s.unregisterRaftNode(g) - - if wal != nil { - if shouldDelete { - wal.Delete() - } else { - wal.Stop() - } - } - - if shouldDelete { - // Delete all our peer state and vote state and any snapshots. - os.RemoveAll(sd) - n.debug("Deleted") - } else { - n.debug("Shutdown") + if wal := n.wal; wal != nil { + wal.Delete() } + os.RemoveAll(n.sd) + n.debug("Deleted") } -// Wipe will force an on disk state reset and then call Delete(). -// Useful in case we have been stopped before this point. -func (n *raft) Wipe() { - n.RLock() - wal := n.wal - n.RUnlock() - // Delete our underlying storage. - if wal != nil { - wal.Delete() +func (n *raft) shutdown() { + // First call to Stop or Delete should close the quit chan + // to notify the runAs goroutines to stop what they're doing. + if n.state.Swap(int32(Closed)) != int32(Closed) { + close(n.quit) } - // Now call delete. - n.Delete() } const ( @@ -1937,8 +1865,8 @@ func (n *raft) resetElectWithLock(et time.Duration) { // the entire life of the Raft node once started. func (n *raft) run() { s := n.s - defer n.wg.Done() defer s.grWG.Done() + defer n.wg.Done() // We want to wait for some routing to be enabled, so we will wait for // at least a route, leaf or gateway connection to be established before @@ -1971,6 +1899,7 @@ func (n *raft) run() { // Send nil entry to signal the upper layers we are done doing replay/restore. n.apply.push(nil) +runner: for s.isRunning() { switch n.State() { case Follower: @@ -1980,9 +1909,47 @@ func (n *raft) run() { case Leader: n.runAsLeader() case Closed: - return + break runner } } + + // If we've reached this point then we're shutting down, either because + // the server is stopping or because the Raft group is closing/closed. + n.Lock() + defer n.Unlock() + + if c := n.c; c != nil { + var subs []*subscription + c.mu.Lock() + for _, sub := range c.subs { + subs = append(subs, sub) + } + c.mu.Unlock() + for _, sub := range subs { + n.unsubscribe(sub) + } + c.closeConnection(InternalClient) + n.c = nil + } + + // Unregistering ipQueues do not prevent them from push/pop + // just will remove them from the central monitoring map + queues := []interface { + unregister() + drain() + }{n.reqs, n.votes, n.prop, n.entry, n.resp, n.apply} + for _, q := range queues { + q.drain() + q.unregister() + } + + n.s.unregisterRaftNode(n.group) + + if wal := n.wal; wal != nil { + wal.Stop() + } + + n.debug("Shutdown") } func (n *raft) debug(format string, args ...any) { @@ -2077,7 +2044,6 @@ func (n *raft) runAsFollower() { n.processAppendEntries() case <-n.s.quitCh: // The server is shutting down. - n.shutdown(false) return case <-n.quit: // The Raft node is shutting down. @@ -2490,7 +2456,6 @@ func (n *raft) runAsLeader() { for n.State() == Leader { select { case <-n.s.quitCh: - n.shutdown(false) return case <-n.quit: return @@ -2686,7 +2651,6 @@ func (n *raft) runCatchup(ar *appendEntryResponse, indexUpdatesQ *ipQueue[uint64 for n.Leader() { select { case <-n.s.quitCh: - n.shutdown(false) return case <-n.quit: return @@ -3081,7 +3045,6 @@ func (n *raft) runAsCandidate() { // Ignore proposals received from before the state change. n.prop.drain() case <-n.s.quitCh: - n.shutdown(false) return case <-n.quit: return @@ -4185,15 +4148,20 @@ func (n *raft) updateLeadChange(isLeader bool) { // Lock should be held. func (n *raft) switchState(state RaftState) { +retry: pstate := n.State() if pstate == Closed { return } + // Set our state. If something else has changed our state + // then retry, this will either be a Stop or Delete call. + if !n.state.CompareAndSwap(int32(pstate), int32(state)) { + goto retry + } + // Reset the election timer. n.resetElectionTimeout() - // Set our state. - n.state.Store(int32(state)) if pstate == Leader && state != Leader { n.updateLeadChange(false) diff --git a/server/raft_test.go b/server/raft_test.go index 587d00a36d9..be26deae95d 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -661,6 +661,9 @@ func TestNRGSystemClientCleanupFromAccount(t *testing.T) { for _, sm := range rg { sm.node().Stop() } + for _, sm := range rg { + sm.node().WaitForStop() + } } finish := numClients() require_Equal(t, start, finish) @@ -988,7 +991,7 @@ func TestNRGWALEntryWithoutQuorumMustTruncate(t *testing.T) { require_NoError(t, err) // Stop the leader so it moves to another one. - n.shutdown(false) + n.shutdown() // Wait for another leader to be picked rg.waitOnLeader() @@ -1650,3 +1653,17 @@ func TestNRGCancelCatchupWhenDetectingHigherTermDuringVoteRequest(t *testing.T) require_NoError(t, err) require_True(t, n.catchup == nil) } + +func TestNRGMultipleStopsDontPanic(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + defer func() { + p := recover() + require_True(t, p == nil) + }() + + for i := 0; i < 10; i++ { + n.Stop() + } +}