NRG: Wait for goroutines to shut down when recreating group #5832

Merged · 3 commits · Nov 5, 2024
24 changes: 24 additions & 0 deletions server/jetstream_cluster.go
@@ -2099,8 +2099,32 @@ func (js *jetStream) createRaftGroup(accName string, rg *raftGroup, storage Stor
}

// Check if we already have this assigned.
+retry:
if node := s.lookupRaftNode(rg.Name); node != nil {
+		if node.State() == Closed {
+			// We're waiting for this node to finish shutting down before we replace it.
+			js.mu.Unlock()
+			node.WaitForStop()
+			js.mu.Lock()
+			goto retry
+		}
s.Debugf("JetStream cluster already has raft group %q assigned", rg.Name)
+		// Check and see if the group has the same peers. If not then we
+		// will update the known peers, which will send a peerstate if leader.
+		groupPeerIDs := append([]string{}, rg.Peers...)
+		var samePeers bool
+		if nodePeers := node.Peers(); len(rg.Peers) == len(nodePeers) {
+			nodePeerIDs := make([]string, 0, len(nodePeers))
+			for _, n := range nodePeers {
+				nodePeerIDs = append(nodePeerIDs, n.ID)
+			}
+			slices.Sort(groupPeerIDs)
+			slices.Sort(nodePeerIDs)
+			samePeers = slices.Equal(groupPeerIDs, nodePeerIDs)
+		}
+		if !samePeers {
+			node.UpdateKnownPeers(groupPeerIDs)
+		}
rg.node = node
js.mu.Unlock()
return nil
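Aside: note how the new retry loop drops js.mu around the blocking WaitForStop() call and then re-checks the lookup from the top. A standalone sketch of that unlock/wait/retry shape, with illustrative names (registry, worker) that are not from the PR:

// A standalone sketch of the unlock/wait/retry pattern, not the PR's code.
// "registry" and "worker" are illustrative stand-ins for the server's raft
// node map and the raft node itself.
package main

import (
	"fmt"
	"sync"
)

type worker struct {
	done chan struct{} // closed once the worker has fully stopped
}

func (w *worker) WaitForStop() { <-w.done }

type registry struct {
	mu      sync.Mutex
	workers map[string]*worker
}

// replace installs a fresh worker under name, but only after any closed
// predecessor has completely finished shutting down. In the real code the
// node unregisters itself; here we delete it ourselves after waiting.
func (r *registry) replace(name string) *worker {
	r.mu.Lock()
retry:
	if old, ok := r.workers[name]; ok {
		// Drop the lock while blocked so other work can proceed, then
		// re-check the map from scratch once the old worker is gone.
		r.mu.Unlock()
		old.WaitForStop()
		r.mu.Lock()
		delete(r.workers, name)
		goto retry
	}
	w := &worker{done: make(chan struct{})}
	r.workers[name] = w
	r.mu.Unlock()
	return w
}

func main() {
	r := &registry{workers: map[string]*worker{}}
	old := &worker{done: make(chan struct{})}
	r.workers["g1"] = old
	close(old.done)                     // simulate a worker that has already finished stopping
	fmt.Println(r.replace("g1") != old) // true: a fresh worker was installed
}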
3 changes: 3 additions & 0 deletions server/jetstream_cluster_3_test.go
@@ -3914,6 +3914,7 @@ func TestJetStreamClusterStreamNodeShutdownBugOnStop(t *testing.T) {
node.InstallSnapshot(mset.stateSnapshot())
// Stop the stream
mset.stop(false, false)
+	node.WaitForStop()

if numNodes := s.numRaftNodes(); numNodes != numNodesStart-1 {
t.Fatalf("RAFT nodes after stream stop incorrect: %d vs %d", numNodesStart, numNodes)
@@ -5805,6 +5806,8 @@ func TestJetStreamClusterDetectOrphanNRGs(t *testing.T) {

// Should only be meta NRG left.
require_True(t, s.numRaftNodes() == 1)
+	s.rnMu.RLock()
+	defer s.rnMu.RUnlock()
require_True(t, s.lookupRaftNode(sgn) == nil)
require_True(t, s.lookupRaftNode(ogn) == nil)
}
181 changes: 84 additions & 97 deletions server/raft.go
@@ -74,8 +74,8 @@ type RaftNode interface {
QuitC() <-chan struct{}
Created() time.Time
Stop()
+	WaitForStop()
	Delete()
-	Wipe()
RecreateInternalSubs() error
}

@@ -128,12 +128,13 @@ func (state RaftState) String() string {
type raft struct {
sync.RWMutex

	created time.Time // Time that the group was created
	accName string // Account name of the asset this raft group is for
	acc *Account // Account that NRG traffic will be sent/received in
	group string // Raft group
	sd string // Store directory
	id string // Node ID
+	wg sync.WaitGroup // Wait for running goroutines to exit on shutdown

wal WAL // WAL store (filestore or memstore)
wtype StorageType // WAL type, e.g. FileStorage or MemoryStorage
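Aside: the new wg field follows the standard sync.WaitGroup discipline used throughout this change: Add(1) strictly before the goroutine is spawned (see startRaftNode and catchupFollower below), Done() on exit, Wait() in WaitForStop() and Delete(). A minimal sketch of why the ordering matters:

// A minimal sketch of the Add-before-spawn rule the wg field relies on.
// Add(1) must happen before the goroutine starts; calling Add inside the
// goroutine itself would race with a concurrent Wait.
package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	quit := make(chan struct{})

	wg.Add(1) // increment strictly before spawning
	go func() {
		defer wg.Done()
		<-quit // stand-in for the run loop / runCatchup
	}()

	close(quit)
	wg.Wait() // cannot return before the goroutine above is counted and done
	fmt.Println("tracked goroutine exited")
}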
@@ -398,7 +399,7 @@ func (s *Server) initRaftNode(accName string, cfg *RaftConfig, labels pprofLabel
// If we fail to do this for some reason then this is fatal — we cannot
// continue setting up or the Raft node may be partially/totally isolated.
if err := n.RecreateInternalSubs(); err != nil {
-		n.shutdown(false)
+		n.shutdown()
return nil, err
}

@@ -526,6 +527,7 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe
}

// Start the run goroutine for the Raft state machine.
+	n.wg.Add(1)
s.startGoRoutine(n.run, labels)

return n, nil
@@ -681,8 +683,8 @@ func (s *Server) unregisterRaftNode(group string) {

// Returns how many Raft nodes are running in this server instance.
func (s *Server) numRaftNodes() int {
-	s.rnMu.Lock()
-	defer s.rnMu.Unlock()
+	s.rnMu.RLock()
+	defer s.rnMu.RUnlock()
return len(s.raftNodes)
}

@@ -1716,95 +1718,35 @@ func (n *raft) Created() time.Time {
}

func (n *raft) Stop() {
-	n.shutdown(false)
+	n.shutdown()
}

-func (n *raft) Delete() {
-	n.shutdown(true)
-}
-
-func (n *raft) shutdown(shouldDelete bool) {
-	n.Lock()
-
-	// Returned swap value is the previous state. It looks counter-intuitive
-	// to do this atomic operation with the lock held, but we have to do so in
-	// order to make sure that switchState() is not already running. If it is
-	// then it can potentially update the n.state back to a non-closed state,
-	// allowing shutdown() to be called again. If that happens then the below
-	// close(n.quit) will panic from trying to close an already-closed channel.
-	if n.state.Swap(int32(Closed)) == int32(Closed) {
-		// If we get called again with shouldDelete, in case we were called first with Stop() cleanup
-		if shouldDelete {
-			if wal := n.wal; wal != nil {
-				wal.Delete()
-			}
-			os.RemoveAll(n.sd)
-		}
-		n.Unlock()
-		return
-	}
-
-	close(n.quit)
-	if c := n.c; c != nil {
-		var subs []*subscription
-		c.mu.Lock()
-		for _, sub := range c.subs {
-			subs = append(subs, sub)
-		}
-		c.mu.Unlock()
-		for _, sub := range subs {
-			n.unsubscribe(sub)
-		}
-		c.closeConnection(InternalClient)
-		n.c = nil
-	}
-
-	s, g, wal := n.s, n.group, n.wal
-
-	// Unregistering ipQueues do not prevent them from push/pop
-	// just will remove them from the central monitoring map
-	queues := []interface {
-		unregister()
-		drain()
-	}{n.reqs, n.votes, n.prop, n.entry, n.resp, n.apply}
-	for _, q := range queues {
-		q.drain()
-		q.unregister()
-	}
-	sd := n.sd
-	n.Unlock()
-
-	s.unregisterRaftNode(g)
-
-	if wal != nil {
-		if shouldDelete {
-			wal.Delete()
-		} else {
-			wal.Stop()
-		}
-	}
-
-	if shouldDelete {
-		// Delete all our peer state and vote state and any snapshots.
-		os.RemoveAll(sd)
-		n.debug("Deleted")
-	} else {
-		n.debug("Shutdown")
-	}
-}
-
-// Wipe will force an on disk state reset and then call Delete().
-// Useful in case we have been stopped before this point.
-func (n *raft) Wipe() {
-	n.RLock()
-	wal := n.wal
-	n.RUnlock()
-	// Delete our underlying storage.
-	if wal != nil {
-		wal.Delete()
-	}
-	// Now call delete.
-	n.Delete()
-}
+func (n *raft) WaitForStop() {
+	if n.state.Load() == int32(Closed) {
+		n.wg.Wait()
+	}
+}
+
+func (n *raft) Delete() {
+	n.shutdown()
+	n.wg.Wait()
+
+	n.Lock()
+	defer n.Unlock()
+
+	if wal := n.wal; wal != nil {
+		wal.Delete()
+	}
+	os.RemoveAll(n.sd)
+	n.debug("Deleted")
+}
+
+func (n *raft) shutdown() {
+	// First call to Stop or Delete should close the quit chan
+	// to notify the runAs goroutines to stop what they're doing.
+	if n.state.Swap(int32(Closed)) != int32(Closed) {
+		close(n.quit)
+	}
+}

const (
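Aside: shutdown() is now idempotent by construction; the atomic Swap lets exactly one caller observe the transition into Closed, which is what TestNRGMultipleStopsDontPanic below exercises. A reduced sketch of the idiom (state constants and names here are illustrative):

// A reduced sketch of the idempotent shutdown idiom: the atomic Swap lets
// exactly one caller observe the transition into Closed, so the quit channel
// is closed exactly once no matter how many times shutdown is invoked.
package main

import (
	"fmt"
	"sync/atomic"
)

const (
	stateOpen int32 = iota
	stateClosed
)

type node struct {
	state atomic.Int32
	quit  chan struct{}
}

func (n *node) shutdown() {
	// Only the first caller sees a previous state that is not Closed.
	if n.state.Swap(stateClosed) != stateClosed {
		close(n.quit)
	}
}

func main() {
	n := &node{quit: make(chan struct{})}
	for i := 0; i < 10; i++ {
		n.shutdown() // safe: no double-close panic
	}
	fmt.Println(n.state.Load() == stateClosed) // true
}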
@@ -1924,6 +1866,7 @@ func (n *raft) resetElectWithLock(et time.Duration) {
func (n *raft) run() {
s := n.s
defer s.grWG.Done()
+	defer n.wg.Done()

// We want to wait for some routing to be enabled, so we will wait for
// at least a route, leaf or gateway connection to be established before
@@ -1956,6 +1899,7 @@ func (n *raft) run() {
// Send nil entry to signal the upper layers we are done doing replay/restore.
n.apply.push(nil)

+runner:
for s.isRunning() {
switch n.State() {
case Follower:
@@ -1965,9 +1909,47 @@
case Leader:
n.runAsLeader()
case Closed:
-			return
+			break runner
}
}

+	// If we've reached this point then we're shutting down, either because
+	// the server is stopping or because the Raft group is closing/closed.
+	n.Lock()
+	defer n.Unlock()
+
+	if c := n.c; c != nil {
+		var subs []*subscription
+		c.mu.Lock()
+		for _, sub := range c.subs {
+			subs = append(subs, sub)
+		}
+		c.mu.Unlock()
+		for _, sub := range subs {
+			n.unsubscribe(sub)
+		}
+		c.closeConnection(InternalClient)
+		n.c = nil
+	}
+
+	// Unregistering ipQueues do not prevent them from push/pop
+	// just will remove them from the central monitoring map
+	queues := []interface {
+		unregister()
+		drain()
+	}{n.reqs, n.votes, n.prop, n.entry, n.resp, n.apply}
+	for _, q := range queues {
+		q.drain()
+		q.unregister()
+	}
+
+	n.s.unregisterRaftNode(n.group)
+
+	if wal := n.wal; wal != nil {
+		wal.Stop()
+	}
+
+	n.debug("Shutdown")
}

func (n *raft) debug(format string, args ...any) {
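Aside: because the subscription, queue and WAL teardown now lives at the tail of run(), the wg.Wait() in WaitForStop() and Delete() returns only after cleanup has completed, not merely after the goroutines have returned. A minimal sketch of that guarantee, with a simplified node type that is not the PR's:

// Since teardown runs at the tail of the run goroutine, wg.Wait() doubles
// as a barrier for "cleanup finished", not just "goroutine returned".
package main

import (
	"fmt"
	"sync"
)

type node struct {
	wg      sync.WaitGroup
	quit    chan struct{}
	cleaned bool
}

func (n *node) run() {
	defer n.wg.Done()
	<-n.quit
	// Teardown happens here, inside the goroutine, so anyone blocked in
	// Wait() observes it once Wait() returns (Done synchronizes before
	// the return of Wait in the Go memory model).
	n.cleaned = true
}

func main() {
	n := &node{quit: make(chan struct{})}
	n.wg.Add(1)
	go n.run()
	close(n.quit)
	n.wg.Wait()
	fmt.Println(n.cleaned) // true: cleanup completed before Wait returned
}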
@@ -2062,7 +2044,6 @@ func (n *raft) runAsFollower() {
n.processAppendEntries()
case <-n.s.quitCh:
// The server is shutting down.
-			n.shutdown(false)
return
case <-n.quit:
// The Raft node is shutting down.
@@ -2475,7 +2456,6 @@ func (n *raft) runAsLeader() {
for n.State() == Leader {
select {
case <-n.s.quitCh:
-			n.shutdown(false)
return
case <-n.quit:
return
@@ -2671,7 +2651,6 @@ func (n *raft) runCatchup(ar *appendEntryResponse, indexUpdatesQ *ipQueue[uint64
for n.Leader() {
select {
case <-n.s.quitCh:
-			n.shutdown(false)
return
case <-n.quit:
return
@@ -2800,7 +2779,11 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) {
n.progress[ar.peer] = indexUpdates
n.Unlock()

-	n.s.startGoRoutine(func() { n.runCatchup(ar, indexUpdates) })
+	n.wg.Add(1)
+	n.s.startGoRoutine(func() {
+		defer n.wg.Done()
+		n.runCatchup(ar, indexUpdates)
+	})
}

func (n *raft) loadEntry(index uint64) (*appendEntry, error) {
@@ -3062,7 +3045,6 @@ func (n *raft) runAsCandidate() {
// Ignore proposals received from before the state change.
n.prop.drain()
case <-n.s.quitCh:
-			n.shutdown(false)
return
case <-n.quit:
return
@@ -4166,15 +4148,20 @@ func (n *raft) updateLeadChange(isLeader bool) {

// Lock should be held.
func (n *raft) switchState(state RaftState) {
+retry:
pstate := n.State()
if pstate == Closed {
return
}

+	// Set our state. If something else has changed our state
+	// then retry, this will either be a Stop or Delete call.
+	if !n.state.CompareAndSwap(int32(pstate), int32(state)) {
+		goto retry
+	}

// Reset the election timer.
n.resetElectionTimeout()
-	// Set our state.
-	n.state.Store(int32(state))

if pstate == Leader && state != Leader {
n.updateLeadChange(false)
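Aside: switchState() previously did a plain Store after checking for Closed, leaving a window in which it could overwrite a concurrent transition to Closed and resurrect a stopped node. The compare-and-swap retry closes that window; a reduced sketch (state constants here are illustrative):

// A reduced sketch of the CAS retry: a transition only succeeds against the
// state we just read, so a concurrent move to Closed can never be overwritten
// back to a live state.
package main

import (
	"fmt"
	"sync/atomic"
)

const (
	stateFollower int32 = iota
	stateLeader
	stateClosed
)

func switchTo(state *atomic.Int32, next int32) bool {
retry:
	prev := state.Load()
	if prev == stateClosed {
		return false // never leave Closed
	}
	if !state.CompareAndSwap(prev, next) {
		goto retry // lost a race (e.g. a concurrent Stop); re-read and re-check
	}
	return true
}

func main() {
	var st atomic.Int32
	st.Store(stateClosed)
	fmt.Println(switchTo(&st, stateLeader)) // false: Closed wins
}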
19 changes: 18 additions & 1 deletion server/raft_test.go
@@ -661,6 +661,9 @@ func TestNRGSystemClientCleanupFromAccount(t *testing.T) {
for _, sm := range rg {
sm.node().Stop()
}
+	for _, sm := range rg {
+		sm.node().WaitForStop()
+	}
}
finish := numClients()
require_Equal(t, start, finish)
@@ -988,7 +991,7 @@ func TestNRGWALEntryWithoutQuorumMustTruncate(t *testing.T) {
require_NoError(t, err)

// Stop the leader so it moves to another one.
-	n.shutdown(false)
+	n.shutdown()

// Wait for another leader to be picked
rg.waitOnLeader()
@@ -1650,3 +1653,17 @@ func TestNRGCancelCatchupWhenDetectingHigherTermDuringVoteRequest(t *testing.T)
require_NoError(t, err)
require_True(t, n.catchup == nil)
}

+func TestNRGMultipleStopsDontPanic(t *testing.T) {
+	n, cleanup := initSingleMemRaftNode(t)
+	defer cleanup()
+
+	defer func() {
+		p := recover()
+		require_True(t, p == nil)
+	}()
+
+	for i := 0; i < 10; i++ {
+		n.Stop()
+	}
+}