Skip to content

Commit

Permalink
[FIXED] Don't become candidate while processing stream snapshot
Browse files Browse the repository at this point in the history
Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
  • Loading branch information
MauriceVanVeen committed Oct 30, 2024
1 parent e339076 commit 474a4b5
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 2 deletions.
18 changes: 18 additions & 0 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1546,6 +1546,11 @@ func (js *jetStream) metaSnapshot() []byte {
}

func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecovering bool) error {
// We're about to process the snapshot, we can clear inflight status early.
if mg := js.getMetaGroup(); mg != nil {
mg.ClearInflightSnapshot()
}

var wsas []writeableStreamAssignment
if len(buf) > 0 {
jse, err := s2.Decode(nil, buf)
Expand Down Expand Up @@ -4959,6 +4964,11 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea
for _, e := range ce.Entries {
if e.Type == EntrySnapshot {
if !isLeader {
// We're about to process the snapshot, we can clear inflight status early.
if n := o.node; n != nil {
n.ClearInflightSnapshot()
}

// No-op needed?
state, err := decodeConsumerState(e.Data)
if err != nil {
Expand Down Expand Up @@ -8287,6 +8297,14 @@ func (mset *stream) processSnapshot(snap *StreamReplicatedState) (e error) {
qname := fmt.Sprintf("[ACC:%s] stream '%s' snapshot", mset.acc.Name, mset.cfg.Name)
mset.mu.Unlock()

defer func() {
// Only clear inflight if successful, or we're not going to reset cluster state.
// Otherwise, we don't want to become candidate/leader for the stream while resetting cluster state.
if e == nil || !isClusterResetErr(e) {
n.ClearInflightSnapshot()
}
}()

// Bug that would cause this to be empty on stream update.
if subject == _EMPTY_ {
return errCatchupCorruptSnapshot
Expand Down
17 changes: 15 additions & 2 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type RaftNode interface {
ApplyQ() *ipQueue[*CommittedEntry]
PauseApply() error
ResumeApply()
ClearInflightSnapshot()
LeadChangeC() <-chan bool
QuitC() <-chan struct{}
Created() time.Time
Expand Down Expand Up @@ -200,6 +201,8 @@ type raft struct {
hcommit uint64 // The commit at the time that applies were paused
pobserver bool // Whether we were an observer at the time that applies were paused

inflightSnapshot bool // Indicates a snapshot is inflight, i.e. it's in the apply queue. Blocks us from becoming candidate.

prop *ipQueue[*Entry] // Proposals
entry *ipQueue[*appendEntry] // Append entries
resp *ipQueue[*appendEntryResponse] // Append entries responses
Expand Down Expand Up @@ -1039,6 +1042,13 @@ func (n *raft) ResumeApply() {
}
}

// ClearInflightSnapshot resets flag of a snapshot being inflight.
func (n *raft) ClearInflightSnapshot() {
n.Lock()
defer n.Unlock()
n.inflightSnapshot = false
}

// Applied is a callback that must be called by the upper layer when it
// has successfully applied the committed entries that it received from the
// apply queue. It will return the number of entries and an estimation of the
Expand Down Expand Up @@ -1134,7 +1144,7 @@ func (n *raft) InstallSnapshot(data []byte) error {

// Check that a catchup isn't already taking place. If it is then we won't
// allow installing snapshots until it is done.
if len(n.progress) > 0 || n.paused {
if len(n.progress) > 0 || n.inflightSnapshot {
return errCatchupsRunning
}

Expand Down Expand Up @@ -1281,6 +1291,7 @@ func (n *raft) setupLastSnapshot() {
n.commit = snap.lastIndex
n.applied = snap.lastIndex
n.apply.push(newCommittedEntry(n.commit, []*Entry{{EntrySnapshot, snap.data}}))
n.inflightSnapshot = true
if _, err := n.wal.Compact(snap.lastIndex + 1); err != nil {
n.setWriteErrLocked(err)
}
Expand Down Expand Up @@ -3478,6 +3489,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {

// Now send snapshot to upper levels. Only send the snapshot, not the peerstate entry.
n.apply.push(newCommittedEntry(n.commit, ae.entries[:1]))
n.inflightSnapshot = true
n.Unlock()
return
}
Expand Down Expand Up @@ -4228,7 +4240,8 @@ func (n *raft) switchToCandidate() {
defer n.Unlock()

// If we are catching up or are in observer mode we can not switch.
if n.observer || n.paused {
// Or we're a follower and are about to or actively processing a snapshot.
if n.observer || n.paused || n.inflightSnapshot {
return
}

Expand Down
38 changes: 38 additions & 0 deletions server/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1499,3 +1499,41 @@ func TestNRGDontRemoveSnapshotIfTruncateToApplied(t *testing.T) {
require_NoError(t, err)
require_Equal(t, len(files), 1)
}

func TestNRGDontSwitchToCandidateWithInflightSnapshot(t *testing.T) {
n, cleanup := initSingleMemRaftNode(t)
defer cleanup()

// Create a sample snapshot entry, the content doesn't matter.
snapshotEntries := []*Entry{
newEntry(EntrySnapshot, nil),
newEntry(EntryPeerState, encodePeerState(&peerState{n.peerNames(), n.csz, n.extSt})),
}

nats0 := "S1Nunr6R" // "nats-0"

// Timeline.
aeTriggerCatchup := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil})
aeCatchupSnapshot := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: snapshotEntries})

// Switch follower into catchup.
n.processAppendEntry(aeTriggerCatchup, n.aesub)
require_True(t, n.catchup != nil)
require_Equal(t, n.catchup.pterm, 0) // n.pterm
require_Equal(t, n.catchup.pindex, 0) // n.pindex

// Follower receives a snapshot, marking a snapshot as inflight as the apply queue is async.
n.processAppendEntry(aeCatchupSnapshot, n.catchup.sub)

// Try to switch to candidate, it should be blocked since the snapshot is not processed yet.
n.switchToCandidate()
require_Equal(t, n.State(), Follower)

// Simulate snapshot being processed by the upper layer.
n.ClearInflightSnapshot()

// Retry becoming candidate, snapshot is processed so can now do so.
n.switchToCandidate()
require_Equal(t, n.State(), Candidate)

}

0 comments on commit 474a4b5

Please sign in to comment.