diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 4b74f765a19..2e30c476fb9 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -2050,7 +2050,13 @@ func (js *jetStream) createRaftGroup(accName string, rg *raftGroup, storage Stor } // Check if we already have this assigned. +retry: if node := s.lookupRaftNode(rg.Name); node != nil { + if node.State() == Closed { + // We're waiting for this node to finish shutting down before we replace it. + node.WaitForStop() + goto retry + } s.Debugf("JetStream cluster already has raft group %q assigned", rg.Name) rg.node = node js.mu.Unlock() diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 3317ccaab2c..f7824cb0dcf 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -5779,6 +5779,8 @@ func TestJetStreamClusterDetectOrphanNRGs(t *testing.T) { // Should only be meta NRG left. require_True(t, s.numRaftNodes() == 1) + s.rnMu.RLock() + defer s.rnMu.RUnlock() require_True(t, s.lookupRaftNode(sgn) == nil) require_True(t, s.lookupRaftNode(ogn) == nil) } diff --git a/server/raft.go b/server/raft.go index 56e69e37960..3cb604af584 100644 --- a/server/raft.go +++ b/server/raft.go @@ -74,6 +74,7 @@ type RaftNode interface { QuitC() <-chan struct{} Created() time.Time Stop() + WaitForStop() Delete() Wipe() RecreateInternalSubs() error @@ -128,12 +129,13 @@ func (state RaftState) String() string { type raft struct { sync.RWMutex - created time.Time // Time that the group was created - accName string // Account name of the asset this raft group is for - acc *Account // Account that NRG traffic will be sent/received in - group string // Raft group - sd string // Store directory - id string // Node ID + created time.Time // Time that the group was created + accName string // Account name of the asset this raft group is for + acc *Account // Account that NRG traffic will be sent/received in + group string // Raft group + sd string // Store directory + id string // Node ID + wg sync.WaitGroup // Wait for running goroutines to exit on shutdown wal WAL // WAL store (filestore or memstore) wtype StorageType // WAL type, e.g. FileStorage or MemoryStorage @@ -521,6 +523,7 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe s.registerRaftNode(n.group, n) // Start the run goroutine for the Raft state machine. + n.wg.Add(1) s.startGoRoutine(n.run, labels) return n, nil @@ -676,8 +679,8 @@ func (s *Server) unregisterRaftNode(group string) { // Returns how many Raft nodes are running in this server instance. func (s *Server) numRaftNodes() int { - s.rnMu.Lock() - defer s.rnMu.Unlock() + s.rnMu.RLock() + defer s.rnMu.RUnlock() return len(s.raftNodes) } @@ -1718,12 +1721,17 @@ func (n *raft) Stop() { n.shutdown(false) } +func (n *raft) WaitForStop() { + n.wg.Wait() +} + func (n *raft) Delete() { n.shutdown(true) } func (n *raft) shutdown(shouldDelete bool) { n.Lock() + defer n.Unlock() // Returned swap value is the previous state. It looks counter-intuitive // to do this atomic operation with the lock held, but we have to do so in @@ -1734,12 +1742,12 @@ func (n *raft) shutdown(shouldDelete bool) { if n.state.Swap(int32(Closed)) == int32(Closed) { // If we get called again with shouldDelete, in case we were called first with Stop() cleanup if shouldDelete { + n.wg.Wait() if wal := n.wal; wal != nil { wal.Delete() } os.RemoveAll(n.sd) } - n.Unlock() return } @@ -1758,6 +1766,13 @@ func (n *raft) shutdown(shouldDelete bool) { n.c = nil } + // Wait for goroutines to shut down before trying to clean up + // the log below, otherwise we might end up deleting the store + // from underneath running goroutines. + n.Unlock() + n.wg.Wait() + n.Lock() + s, g, wal := n.s, n.group, n.wal // Unregistering ipQueues do not prevent them from push/pop @@ -1771,7 +1786,6 @@ func (n *raft) shutdown(shouldDelete bool) { q.unregister() } sd := n.sd - n.Unlock() s.unregisterRaftNode(g) @@ -1922,6 +1936,7 @@ func (n *raft) resetElectWithLock(et time.Duration) { // the entire life of the Raft node once started. func (n *raft) run() { s := n.s + defer n.wg.Done() defer s.grWG.Done() // We want to wait for some routing to be enabled, so we will wait for @@ -2799,7 +2814,11 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) { n.progress[ar.peer] = indexUpdates n.Unlock() - n.s.startGoRoutine(func() { n.runCatchup(ar, indexUpdates) }) + n.wg.Add(1) + n.s.startGoRoutine(func() { + defer n.wg.Done() + n.runCatchup(ar, indexUpdates) + }) } func (n *raft) loadEntry(index uint64) (*appendEntry, error) {