diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index baf15307d5..ae90a71930 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -2099,8 +2099,32 @@ func (js *jetStream) createRaftGroup(accName string, rg *raftGroup, storage Stor } // Check if we already have this assigned. +retry: if node := s.lookupRaftNode(rg.Name); node != nil { + if node.State() == Closed { + // We're waiting for this node to finish shutting down before we replace it. + js.mu.Unlock() + node.WaitForStop() + js.mu.Lock() + goto retry + } s.Debugf("JetStream cluster already has raft group %q assigned", rg.Name) + // Check and see if the group has the same peers. If not then we + // will update the known peers, which will send a peerstate if leader. + groupPeerIDs := append([]string{}, rg.Peers...) + var samePeers bool + if nodePeers := node.Peers(); len(rg.Peers) == len(nodePeers) { + nodePeerIDs := make([]string, 0, len(nodePeers)) + for _, n := range nodePeers { + nodePeerIDs = append(nodePeerIDs, n.ID) + } + slices.Sort(groupPeerIDs) + slices.Sort(nodePeerIDs) + samePeers = slices.Equal(groupPeerIDs, nodePeerIDs) + } + if !samePeers { + node.UpdateKnownPeers(groupPeerIDs) + } rg.node = node js.mu.Unlock() return nil diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 446938ba54..6fc1102baa 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -3914,6 +3914,7 @@ func TestJetStreamClusterStreamNodeShutdownBugOnStop(t *testing.T) { node.InstallSnapshot(mset.stateSnapshot()) // Stop the stream mset.stop(false, false) + node.WaitForStop() if numNodes := s.numRaftNodes(); numNodes != numNodesStart-1 { t.Fatalf("RAFT nodes after stream stop incorrect: %d vs %d", numNodesStart, numNodes) @@ -5805,6 +5806,8 @@ func TestJetStreamClusterDetectOrphanNRGs(t *testing.T) { // Should only be meta NRG left. require_True(t, s.numRaftNodes() == 1) + s.rnMu.RLock() + defer s.rnMu.RUnlock() require_True(t, s.lookupRaftNode(sgn) == nil) require_True(t, s.lookupRaftNode(ogn) == nil) } diff --git a/server/raft.go b/server/raft.go index caf002095d..6577a60cb3 100644 --- a/server/raft.go +++ b/server/raft.go @@ -74,8 +74,8 @@ type RaftNode interface { QuitC() <-chan struct{} Created() time.Time Stop() + WaitForStop() Delete() - Wipe() RecreateInternalSubs() error } @@ -128,12 +128,13 @@ func (state RaftState) String() string { type raft struct { sync.RWMutex - created time.Time // Time that the group was created - accName string // Account name of the asset this raft group is for - acc *Account // Account that NRG traffic will be sent/received in - group string // Raft group - sd string // Store directory - id string // Node ID + created time.Time // Time that the group was created + accName string // Account name of the asset this raft group is for + acc *Account // Account that NRG traffic will be sent/received in + group string // Raft group + sd string // Store directory + id string // Node ID + wg sync.WaitGroup // Wait for running goroutines to exit on shutdown wal WAL // WAL store (filestore or memstore) wtype StorageType // WAL type, e.g. FileStorage or MemoryStorage @@ -398,7 +399,7 @@ func (s *Server) initRaftNode(accName string, cfg *RaftConfig, labels pprofLabel // If we fail to do this for some reason then this is fatal — we cannot // continue setting up or the Raft node may be partially/totally isolated. if err := n.RecreateInternalSubs(); err != nil { - n.shutdown(false) + n.shutdown() return nil, err } @@ -526,6 +527,7 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe } // Start the run goroutine for the Raft state machine. + n.wg.Add(1) s.startGoRoutine(n.run, labels) return n, nil @@ -681,8 +683,8 @@ func (s *Server) unregisterRaftNode(group string) { // Returns how many Raft nodes are running in this server instance. func (s *Server) numRaftNodes() int { - s.rnMu.Lock() - defer s.rnMu.Unlock() + s.rnMu.RLock() + defer s.rnMu.RUnlock() return len(s.raftNodes) } @@ -1716,95 +1718,35 @@ func (n *raft) Created() time.Time { } func (n *raft) Stop() { - n.shutdown(false) + n.shutdown() } -func (n *raft) Delete() { - n.shutdown(true) -} - -func (n *raft) shutdown(shouldDelete bool) { - n.Lock() - - // Returned swap value is the previous state. It looks counter-intuitive - // to do this atomic operation with the lock held, but we have to do so in - // order to make sure that switchState() is not already running. If it is - // then it can potentially update the n.state back to a non-closed state, - // allowing shutdown() to be called again. If that happens then the below - // close(n.quit) will panic from trying to close an already-closed channel. - if n.state.Swap(int32(Closed)) == int32(Closed) { - // If we get called again with shouldDelete, in case we were called first with Stop() cleanup - if shouldDelete { - if wal := n.wal; wal != nil { - wal.Delete() - } - os.RemoveAll(n.sd) - } - n.Unlock() - return - } - - close(n.quit) - if c := n.c; c != nil { - var subs []*subscription - c.mu.Lock() - for _, sub := range c.subs { - subs = append(subs, sub) - } - c.mu.Unlock() - for _, sub := range subs { - n.unsubscribe(sub) - } - c.closeConnection(InternalClient) - n.c = nil - } - - s, g, wal := n.s, n.group, n.wal - - // Unregistering ipQueues do not prevent them from push/pop - // just will remove them from the central monitoring map - queues := []interface { - unregister() - drain() - }{n.reqs, n.votes, n.prop, n.entry, n.resp, n.apply} - for _, q := range queues { - q.drain() - q.unregister() +func (n *raft) WaitForStop() { + if n.state.Load() == int32(Closed) { + n.wg.Wait() } - sd := n.sd - n.Unlock() +} - s.unregisterRaftNode(g) +func (n *raft) Delete() { + n.shutdown() + n.wg.Wait() - if wal != nil { - if shouldDelete { - wal.Delete() - } else { - wal.Stop() - } - } + n.Lock() + defer n.Unlock() - if shouldDelete { - // Delete all our peer state and vote state and any snapshots. - os.RemoveAll(sd) - n.debug("Deleted") - } else { - n.debug("Shutdown") + if wal := n.wal; wal != nil { + wal.Delete() } + os.RemoveAll(n.sd) + n.debug("Deleted") } -// Wipe will force an on disk state reset and then call Delete(). -// Useful in case we have been stopped before this point. -func (n *raft) Wipe() { - n.RLock() - wal := n.wal - n.RUnlock() - // Delete our underlying storage. - if wal != nil { - wal.Delete() +func (n *raft) shutdown() { + // First call to Stop or Delete should close the quit chan + // to notify the runAs goroutines to stop what they're doing. + if n.state.Swap(int32(Closed)) != int32(Closed) { + close(n.quit) } - // Now call delete. - n.Delete() } const ( @@ -1924,6 +1866,7 @@ func (n *raft) resetElectWithLock(et time.Duration) { func (n *raft) run() { s := n.s defer s.grWG.Done() + defer n.wg.Done() // We want to wait for some routing to be enabled, so we will wait for // at least a route, leaf or gateway connection to be established before @@ -1956,6 +1899,7 @@ func (n *raft) run() { // Send nil entry to signal the upper layers we are done doing replay/restore. n.apply.push(nil) +runner: for s.isRunning() { switch n.State() { case Follower: @@ -1965,9 +1909,47 @@ func (n *raft) run() { case Leader: n.runAsLeader() case Closed: - return + break runner + } + } + + // If we've reached this point then we're shutting down, either because + // the server is stopping or because the Raft group is closing/closed. + n.Lock() + defer n.Unlock() + + if c := n.c; c != nil { + var subs []*subscription + c.mu.Lock() + for _, sub := range c.subs { + subs = append(subs, sub) + } + c.mu.Unlock() + for _, sub := range subs { + n.unsubscribe(sub) } + c.closeConnection(InternalClient) + n.c = nil + } + + // Unregistering ipQueues do not prevent them from push/pop + // just will remove them from the central monitoring map + queues := []interface { + unregister() + drain() + }{n.reqs, n.votes, n.prop, n.entry, n.resp, n.apply} + for _, q := range queues { + q.drain() + q.unregister() } + + n.s.unregisterRaftNode(n.group) + + if wal := n.wal; wal != nil { + wal.Stop() + } + + n.debug("Shutdown") } func (n *raft) debug(format string, args ...any) { @@ -2062,7 +2044,6 @@ func (n *raft) runAsFollower() { n.processAppendEntries() case <-n.s.quitCh: // The server is shutting down. - n.shutdown(false) return case <-n.quit: // The Raft node is shutting down. @@ -2475,7 +2456,6 @@ func (n *raft) runAsLeader() { for n.State() == Leader { select { case <-n.s.quitCh: - n.shutdown(false) return case <-n.quit: return @@ -2671,7 +2651,6 @@ func (n *raft) runCatchup(ar *appendEntryResponse, indexUpdatesQ *ipQueue[uint64 for n.Leader() { select { case <-n.s.quitCh: - n.shutdown(false) return case <-n.quit: return @@ -2800,7 +2779,11 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) { n.progress[ar.peer] = indexUpdates n.Unlock() - n.s.startGoRoutine(func() { n.runCatchup(ar, indexUpdates) }) + n.wg.Add(1) + n.s.startGoRoutine(func() { + defer n.wg.Done() + n.runCatchup(ar, indexUpdates) + }) } func (n *raft) loadEntry(index uint64) (*appendEntry, error) { @@ -3062,7 +3045,6 @@ func (n *raft) runAsCandidate() { // Ignore proposals received from before the state change. n.prop.drain() case <-n.s.quitCh: - n.shutdown(false) return case <-n.quit: return @@ -4166,15 +4148,20 @@ func (n *raft) updateLeadChange(isLeader bool) { // Lock should be held. func (n *raft) switchState(state RaftState) { +retry: pstate := n.State() if pstate == Closed { return } + // Set our state. If something else has changed our state + // then retry, this will either be a Stop or Delete call. + if !n.state.CompareAndSwap(int32(pstate), int32(state)) { + goto retry + } + // Reset the election timer. n.resetElectionTimeout() - // Set our state. - n.state.Store(int32(state)) if pstate == Leader && state != Leader { n.updateLeadChange(false) diff --git a/server/raft_test.go b/server/raft_test.go index 587d00a36d..be26deae95 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -661,6 +661,9 @@ func TestNRGSystemClientCleanupFromAccount(t *testing.T) { for _, sm := range rg { sm.node().Stop() } + for _, sm := range rg { + sm.node().WaitForStop() + } } finish := numClients() require_Equal(t, start, finish) @@ -988,7 +991,7 @@ func TestNRGWALEntryWithoutQuorumMustTruncate(t *testing.T) { require_NoError(t, err) // Stop the leader so it moves to another one. - n.shutdown(false) + n.shutdown() // Wait for another leader to be picked rg.waitOnLeader() @@ -1650,3 +1653,17 @@ func TestNRGCancelCatchupWhenDetectingHigherTermDuringVoteRequest(t *testing.T) require_NoError(t, err) require_True(t, n.catchup == nil) } + +func TestNRGMultipleStopsDontPanic(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + defer func() { + p := recover() + require_True(t, p == nil) + }() + + for i := 0; i < 10; i++ { + n.Stop() + } +}