diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 9b03f6bee69..c078f7ac1e6 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1546,6 +1546,11 @@ func (js *jetStream) metaSnapshot() []byte { } func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecovering bool) error { + // We're about to process the snapshot, we can clear inflight status early. + if mg := js.getMetaGroup(); mg != nil { + mg.ClearInflightSnapshot() + } + var wsas []writeableStreamAssignment if len(buf) > 0 { jse, err := s2.Decode(nil, buf) @@ -4959,6 +4964,11 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea for _, e := range ce.Entries { if e.Type == EntrySnapshot { if !isLeader { + // We're about to process the snapshot, we can clear inflight status early. + if n := o.node; n != nil { + n.ClearInflightSnapshot() + } + // No-op needed? state, err := decodeConsumerState(e.Data) if err != nil { @@ -8287,6 +8297,14 @@ func (mset *stream) processSnapshot(snap *StreamReplicatedState) (e error) { qname := fmt.Sprintf("[ACC:%s] stream '%s' snapshot", mset.acc.Name, mset.cfg.Name) mset.mu.Unlock() + defer func() { + // Only clear inflight if successful, or we're not going to reset cluster state. + // Otherwise, we don't want to become candidate/leader for the stream while resetting cluster state. + if e == nil || !isClusterResetErr(e) { + n.ClearInflightSnapshot() + } + }() + // Bug that would cause this to be empty on stream update. if subject == _EMPTY_ { return errCatchupCorruptSnapshot diff --git a/server/raft.go b/server/raft.go index 76597d8d660..d54404cace5 100644 --- a/server/raft.go +++ b/server/raft.go @@ -70,6 +70,7 @@ type RaftNode interface { ApplyQ() *ipQueue[*CommittedEntry] PauseApply() error ResumeApply() + ClearInflightSnapshot() LeadChangeC() <-chan bool QuitC() <-chan struct{} Created() time.Time @@ -200,6 +201,8 @@ type raft struct { hcommit uint64 // The commit at the time that applies were paused pobserver bool // Whether we were an observer at the time that applies were paused + inflightSnapshot bool // Indicates a snapshot is inflight, i.e. it's in the apply queue. Blocks us from becoming candidate. + prop *ipQueue[*Entry] // Proposals entry *ipQueue[*appendEntry] // Append entries resp *ipQueue[*appendEntryResponse] // Append entries responses @@ -1039,6 +1042,13 @@ func (n *raft) ResumeApply() { } } +// ClearInflightSnapshot resets flag of a snapshot being inflight. +func (n *raft) ClearInflightSnapshot() { + n.Lock() + defer n.Unlock() + n.inflightSnapshot = false +} + // Applied is a callback that must be called by the upper layer when it // has successfully applied the committed entries that it received from the // apply queue. It will return the number of entries and an estimation of the @@ -1134,7 +1144,7 @@ func (n *raft) InstallSnapshot(data []byte) error { // Check that a catchup isn't already taking place. If it is then we won't // allow installing snapshots until it is done. - if len(n.progress) > 0 || n.paused { + if len(n.progress) > 0 || n.inflightSnapshot { return errCatchupsRunning } @@ -1281,6 +1291,7 @@ func (n *raft) setupLastSnapshot() { n.commit = snap.lastIndex n.applied = snap.lastIndex n.apply.push(newCommittedEntry(n.commit, []*Entry{{EntrySnapshot, snap.data}})) + n.inflightSnapshot = true if _, err := n.wal.Compact(snap.lastIndex + 1); err != nil { n.setWriteErrLocked(err) } @@ -3478,6 +3489,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { // Now send snapshot to upper levels. Only send the snapshot, not the peerstate entry. n.apply.push(newCommittedEntry(n.commit, ae.entries[:1])) + n.inflightSnapshot = true n.Unlock() return } @@ -4228,7 +4240,8 @@ func (n *raft) switchToCandidate() { defer n.Unlock() // If we are catching up or are in observer mode we can not switch. - if n.observer || n.paused { + // Or we're a follower and are about to or actively processing a snapshot. + if n.observer || n.paused || n.inflightSnapshot { return } diff --git a/server/raft_test.go b/server/raft_test.go index 1a6c5e83878..1a31cb274ad 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -1499,3 +1499,41 @@ func TestNRGDontRemoveSnapshotIfTruncateToApplied(t *testing.T) { require_NoError(t, err) require_Equal(t, len(files), 1) } + +func TestNRGDontSwitchToCandidateWithInflightSnapshot(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + // Create a sample snapshot entry, the content doesn't matter. + snapshotEntries := []*Entry{ + newEntry(EntrySnapshot, nil), + newEntry(EntryPeerState, encodePeerState(&peerState{n.peerNames(), n.csz, n.extSt})), + } + + nats0 := "S1Nunr6R" // "nats-0" + + // Timeline. + aeTriggerCatchup := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil}) + aeCatchupSnapshot := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: snapshotEntries}) + + // Switch follower into catchup. + n.processAppendEntry(aeTriggerCatchup, n.aesub) + require_True(t, n.catchup != nil) + require_Equal(t, n.catchup.pterm, 0) // n.pterm + require_Equal(t, n.catchup.pindex, 0) // n.pindex + + // Follower receives a snapshot, marking a snapshot as inflight as the apply queue is async. + n.processAppendEntry(aeCatchupSnapshot, n.catchup.sub) + + // Try to switch to candidate, it should be blocked since the snapshot is not processed yet. + n.switchToCandidate() + require_Equal(t, n.State(), Follower) + + // Simulate snapshot being processed by the upper layer. + n.ClearInflightSnapshot() + + // Retry becoming candidate, snapshot is processed so can now do so. + n.switchToCandidate() + require_Equal(t, n.State(), Candidate) + +}