Fix desync after errCatchupAbortedNoLeader #5986

Merged · 1 commit · Oct 11, 2024

5 changes: 3 additions & 2 deletions server/jetstream_cluster.go
@@ -2848,7 +2848,7 @@ func (mset *stream) resetClusteredState(err error) bool {
 	}
 
 	if node != nil {
-		if err == errCatchupTooManyRetries {
+		if errors.Is(err, errCatchupAbortedNoLeader) || err == errCatchupTooManyRetries {
 			// Don't delete all state, could've just been temporarily unable to reach the leader.
 			node.Stop()
 		} else {
@@ -8276,6 +8276,7 @@ var (
 	errCatchupStreamStopped   = errors.New("stream has been stopped") // when a catchup is terminated due to the stream going away.
 	errCatchupBadMsg          = errors.New("bad catchup msg")
 	errCatchupWrongSeqForSkip = errors.New("wrong sequence for skipped msg")
+	errCatchupAbortedNoLeader = errors.New("catchup aborted, no leader")
 	errCatchupTooManyRetries  = errors.New("catchup failed, too many retries")
 )

@@ -8379,7 +8380,7 @@ RETRY:
 	releaseSyncOutSem()
 
 	if n.GroupLeader() == _EMPTY_ {
-		return fmt.Errorf("catchup for stream '%s > %s' aborted, no leader", mset.account(), mset.name())
+		return fmt.Errorf("%w for stream '%s > %s'", errCatchupAbortedNoLeader, mset.account(), mset.name())
 	}
 
 	// If we have a sub clear that here.
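
The hunks above change how the "no leader" condition is reported: instead of an ad-hoc fmt.Errorf string, the catchup path now wraps the new errCatchupAbortedNoLeader sentinel with %w, so resetClusteredState can still recognize it via errors.Is even though the message carries the stream name. A minimal, standalone sketch of that wrap-and-match pattern (the names errAbortedNoLeader and catchup are illustrative, not taken from the server code):

package main

import (
	"errors"
	"fmt"
)

// Sentinel error, mirroring errCatchupAbortedNoLeader.
var errAbortedNoLeader = errors.New("catchup aborted, no leader")

// catchup stands in for the server's catchup path: wrapping with %w adds
// per-stream context to the message without losing the error's identity.
func catchup(account, stream string) error {
	return fmt.Errorf("%w for stream '%s > %s'", errAbortedNoLeader, account, stream)
}

func main() {
	err := catchup("$G", "TEST")
	fmt.Println(err)                                // catchup aborted, no leader for stream '$G > TEST'
	fmt.Println(err == errAbortedNoLeader)          // false: the wrapped error is a distinct value
	fmt.Println(errors.Is(err, errAbortedNoLeader)) // true: errors.Is walks the wrap chain
}

The new test below asserts both properties: errors.Is(err, errCatchupAbortedNoLeader) and isClusterResetErr(err) hold for the error returned by processSnapshot.
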
151 changes: 94 additions & 57 deletions server/jetstream_cluster_4_test.go
@@ -4090,77 +4090,114 @@ func TestJetStreamClusterConsumerReplicasAfterScale(t *testing.T) {
 	require_Equal(t, len(ci.Cluster.Replicas), 2)
 }
 
-func TestJetStreamClusterDesyncAfterCatchupTooManyRetries(t *testing.T) {
-	c := createJetStreamClusterExplicit(t, "R3S", 3)
-	defer c.shutdown()
+func TestJetStreamClusterDesyncAfterErrorDuringCatchup(t *testing.T) {
+	tests := []struct {
+		title            string
+		onErrorCondition func(server *Server, mset *stream)
+	}{
+		{
+			title: "TooManyRetries",
+			onErrorCondition: func(server *Server, mset *stream) {
+				// Too many retries while processing snapshot is considered a cluster reset.
+				// If a leader is temporarily unavailable we shouldn't blow away our state.
+				require_True(t, isClusterResetErr(errCatchupTooManyRetries))
+				mset.resetClusteredState(errCatchupTooManyRetries)
+			},
+		},
+		{
+			title: "AbortedNoLeader",
+			onErrorCondition: func(server *Server, mset *stream) {
+				for _, n := range server.raftNodes {
+					rn := n.(*raft)
+					if rn.accName == "$G" {
+						rn.updateLeader(noLeader)
+					}
+				}
 
-	nc, js := jsClientConnect(t, c.randomServer())
-	defer nc.Close()
+				// Processing a snapshot while there's no leader elected is considered a cluster reset.
+				// If a leader is temporarily unavailable we shouldn't blow away our state.
+				var snap StreamReplicatedState
+				snap.LastSeq = 1_000 // ensure we can catchup based on the snapshot
+				err := mset.processSnapshot(&snap)
+				require_True(t, errors.Is(err, errCatchupAbortedNoLeader))
+				require_True(t, isClusterResetErr(err))
+				mset.resetClusteredState(err)
+			},
+		},
+	}
 
-	si, err := js.AddStream(&nats.StreamConfig{
-		Name:     "TEST",
-		Subjects: []string{"foo"},
-		Replicas: 3,
-	})
-	require_NoError(t, err)
+	for _, test := range tests {
+		t.Run(test.title, func(t *testing.T) {
+			c := createJetStreamClusterExplicit(t, "R3S", 3)
+			defer c.shutdown()
 
-	streamLeader := si.Cluster.Leader
-	streamLeaderServer := c.serverByName(streamLeader)
-	nc.Close()
-	nc, js = jsClientConnect(t, streamLeaderServer)
-	defer nc.Close()
+			nc, js := jsClientConnect(t, c.randomServer())
+			defer nc.Close()
 
-	servers := slices.DeleteFunc([]string{"S-1", "S-2", "S-3"}, func(s string) bool {
-		return s == streamLeader
-	})
+			si, err := js.AddStream(&nats.StreamConfig{
+				Name:     "TEST",
+				Subjects: []string{"foo"},
+				Replicas: 3,
+			})
+			require_NoError(t, err)
 
-	// Publish 10 messages.
-	for i := 0; i < 10; i++ {
-		pubAck, err := js.Publish("foo", []byte("ok"))
-		require_NoError(t, err)
-		require_Equal(t, pubAck.Sequence, uint64(i+1))
-	}
+			streamLeader := si.Cluster.Leader
+			streamLeaderServer := c.serverByName(streamLeader)
+			nc.Close()
+			nc, js = jsClientConnect(t, streamLeaderServer)
+			defer nc.Close()
 
-	outdatedServerName := servers[0]
-	clusterResetServerName := servers[1]
+			servers := slices.DeleteFunc([]string{"S-1", "S-2", "S-3"}, func(s string) bool {
+				return s == streamLeader
+			})
 
-	outdatedServer := c.serverByName(outdatedServerName)
-	outdatedServer.Shutdown()
-	outdatedServer.WaitForShutdown()
+			// Publish 10 messages.
+			for i := 0; i < 10; i++ {
+				pubAck, err := js.Publish("foo", []byte("ok"))
+				require_NoError(t, err)
+				require_Equal(t, pubAck.Sequence, uint64(i+1))
+			}
 
-	// Publish 10 more messages, one server will be behind.
-	for i := 0; i < 10; i++ {
-		pubAck, err := js.Publish("foo", []byte("ok"))
-		require_NoError(t, err)
-		require_Equal(t, pubAck.Sequence, uint64(i+11))
-	}
+			outdatedServerName := servers[0]
+			clusterResetServerName := servers[1]
 
-	// We will not need the client anymore.
-	nc.Close()
+			outdatedServer := c.serverByName(outdatedServerName)
+			outdatedServer.Shutdown()
+			outdatedServer.WaitForShutdown()
 
-	// Shutdown stream leader so one server remains.
-	streamLeaderServer.Shutdown()
-	streamLeaderServer.WaitForShutdown()
+			// Publish 10 more messages, one server will be behind.
+			for i := 0; i < 10; i++ {
+				pubAck, err := js.Publish("foo", []byte("ok"))
+				require_NoError(t, err)
+				require_Equal(t, pubAck.Sequence, uint64(i+11))
+			}
 
-	clusterResetServer := c.serverByName(clusterResetServerName)
-	acc, err := clusterResetServer.lookupAccount(globalAccountName)
-	require_NoError(t, err)
-	mset, err := acc.lookupStream("TEST")
-	require_NoError(t, err)
+			// We will not need the client anymore.
+			nc.Close()
 
-	// Too many retries while processing snapshot is considered a cluster reset.
-	// If a leader is temporarily unavailable we shouldn't blow away our state.
-	require_True(t, isClusterResetErr(errCatchupTooManyRetries))
-	mset.resetClusteredState(errCatchupTooManyRetries)
+			// Shutdown stream leader so one server remains.
+			streamLeaderServer.Shutdown()
+			streamLeaderServer.WaitForShutdown()
 
-	// Stream leader stays offline, we only start the server with missing stream data.
-	// We expect that the reset server must not allow the outdated server to become leader, as that would result in desync.
-	c.restartServer(outdatedServer)
-	c.waitOnStreamLeader(globalAccountName, "TEST")
+			clusterResetServer := c.serverByName(clusterResetServerName)
+			acc, err := clusterResetServer.lookupAccount(globalAccountName)
+			require_NoError(t, err)
+			mset, err := acc.lookupStream("TEST")
+			require_NoError(t, err)
+
+			// Run error condition.
+			test.onErrorCondition(clusterResetServer, mset)
 
-	// Outdated server must NOT become the leader.
-	newStreamLeaderServer := c.streamLeader(globalAccountName, "TEST")
-	require_Equal(t, newStreamLeaderServer.Name(), clusterResetServerName)
+			// Stream leader stays offline, we only start the server with missing stream data.
+			// We expect that the reset server must not allow the outdated server to become leader, as that would result in desync.
+			c.restartServer(outdatedServer)
+			c.waitOnStreamLeader(globalAccountName, "TEST")
+
+			// Outdated server must NOT become the leader.
+			newStreamLeaderServer := c.streamLeader(globalAccountName, "TEST")
+			require_Equal(t, newStreamLeaderServer.Name(), clusterResetServerName)
+		})
+	}
 }
 
 func TestJetStreamClusterHardKillAfterStreamAdd(t *testing.T) {

2 changes: 1 addition & 1 deletion server/store.go
@@ -724,7 +724,7 @@ func isOutOfSpaceErr(err error) bool {
 var errFirstSequenceMismatch = errors.New("first sequence mismatch")
 
 func isClusterResetErr(err error) bool {
-	return err == errLastSeqMismatch || err == ErrStoreEOF || err == errFirstSequenceMismatch || err == errCatchupTooManyRetries
+	return err == errLastSeqMismatch || err == ErrStoreEOF || err == errFirstSequenceMismatch || errors.Is(err, errCatchupAbortedNoLeader) || err == errCatchupTooManyRetries
 }
 
 // Copy all fields.
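
Note that only the errCatchupAbortedNoLeader clause uses errors.Is here: that sentinel now reaches callers wrapped with per-stream context (see the fmt.Errorf("%w ...") hunk in server/jetstream_cluster.go above), so a plain == comparison would not match the wrapped form. The other sentinels keep ==, which presumably only works as long as they are returned unwrapped.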