diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 8521103f35..4239f4aa45 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -2848,7 +2848,7 @@ func (mset *stream) resetClusteredState(err error) bool { } if node != nil { - if err == errCatchupTooManyRetries { + if errors.Is(err, errCatchupAbortedNoLeader) || err == errCatchupTooManyRetries { // Don't delete all state, could've just been temporarily unable to reach the leader. node.Stop() } else { @@ -8276,6 +8276,7 @@ var ( errCatchupStreamStopped = errors.New("stream has been stopped") // when a catchup is terminated due to the stream going away. errCatchupBadMsg = errors.New("bad catchup msg") errCatchupWrongSeqForSkip = errors.New("wrong sequence for skipped msg") + errCatchupAbortedNoLeader = errors.New("catchup aborted, no leader") errCatchupTooManyRetries = errors.New("catchup failed, too many retries") ) @@ -8379,7 +8380,7 @@ RETRY: releaseSyncOutSem() if n.GroupLeader() == _EMPTY_ { - return fmt.Errorf("catchup for stream '%s > %s' aborted, no leader", mset.account(), mset.name()) + return fmt.Errorf("%w for stream '%s > %s'", errCatchupAbortedNoLeader, mset.account(), mset.name()) } // If we have a sub clear that here. diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index a6b8342398..51028fec55 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -4090,77 +4090,114 @@ func TestJetStreamClusterConsumerReplicasAfterScale(t *testing.T) { require_Equal(t, len(ci.Cluster.Replicas), 2) } -func TestJetStreamClusterDesyncAfterCatchupTooManyRetries(t *testing.T) { - c := createJetStreamClusterExplicit(t, "R3S", 3) - defer c.shutdown() +func TestJetStreamClusterDesyncAfterErrorDuringCatchup(t *testing.T) { + tests := []struct { + title string + onErrorCondition func(server *Server, mset *stream) + }{ + { + title: "TooManyRetries", + onErrorCondition: func(server *Server, mset *stream) { + // Too many retries while processing snapshot is considered a cluster reset. + // If a leader is temporarily unavailable we shouldn't blow away our state. + require_True(t, isClusterResetErr(errCatchupTooManyRetries)) + mset.resetClusteredState(errCatchupTooManyRetries) + }, + }, + { + title: "AbortedNoLeader", + onErrorCondition: func(server *Server, mset *stream) { + for _, n := range server.raftNodes { + rn := n.(*raft) + if rn.accName == "$G" { + rn.updateLeader(noLeader) + } + } - nc, js := jsClientConnect(t, c.randomServer()) - defer nc.Close() + // Processing a snapshot while there's no leader elected is considered a cluster reset. + // If a leader is temporarily unavailable we shouldn't blow away our state. + var snap StreamReplicatedState + snap.LastSeq = 1_000 // ensure we can catchup based on the snapshot + err := mset.processSnapshot(&snap) + require_True(t, errors.Is(err, errCatchupAbortedNoLeader)) + require_True(t, isClusterResetErr(err)) + mset.resetClusteredState(err) + }, + }, + } - si, err := js.AddStream(&nats.StreamConfig{ - Name: "TEST", - Subjects: []string{"foo"}, - Replicas: 3, - }) - require_NoError(t, err) + for _, test := range tests { + t.Run(test.title, func(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() - streamLeader := si.Cluster.Leader - streamLeaderServer := c.serverByName(streamLeader) - nc.Close() - nc, js = jsClientConnect(t, streamLeaderServer) - defer nc.Close() + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() - servers := slices.DeleteFunc([]string{"S-1", "S-2", "S-3"}, func(s string) bool { - return s == streamLeader - }) + si, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, + }) + require_NoError(t, err) - // Publish 10 messages. - for i := 0; i < 10; i++ { - pubAck, err := js.Publish("foo", []byte("ok")) - require_NoError(t, err) - require_Equal(t, pubAck.Sequence, uint64(i+1)) - } + streamLeader := si.Cluster.Leader + streamLeaderServer := c.serverByName(streamLeader) + nc.Close() + nc, js = jsClientConnect(t, streamLeaderServer) + defer nc.Close() - outdatedServerName := servers[0] - clusterResetServerName := servers[1] + servers := slices.DeleteFunc([]string{"S-1", "S-2", "S-3"}, func(s string) bool { + return s == streamLeader + }) - outdatedServer := c.serverByName(outdatedServerName) - outdatedServer.Shutdown() - outdatedServer.WaitForShutdown() + // Publish 10 messages. + for i := 0; i < 10; i++ { + pubAck, err := js.Publish("foo", []byte("ok")) + require_NoError(t, err) + require_Equal(t, pubAck.Sequence, uint64(i+1)) + } - // Publish 10 more messages, one server will be behind. - for i := 0; i < 10; i++ { - pubAck, err := js.Publish("foo", []byte("ok")) - require_NoError(t, err) - require_Equal(t, pubAck.Sequence, uint64(i+11)) - } + outdatedServerName := servers[0] + clusterResetServerName := servers[1] - // We will not need the client anymore. - nc.Close() + outdatedServer := c.serverByName(outdatedServerName) + outdatedServer.Shutdown() + outdatedServer.WaitForShutdown() - // Shutdown stream leader so one server remains. - streamLeaderServer.Shutdown() - streamLeaderServer.WaitForShutdown() + // Publish 10 more messages, one server will be behind. + for i := 0; i < 10; i++ { + pubAck, err := js.Publish("foo", []byte("ok")) + require_NoError(t, err) + require_Equal(t, pubAck.Sequence, uint64(i+11)) + } - clusterResetServer := c.serverByName(clusterResetServerName) - acc, err := clusterResetServer.lookupAccount(globalAccountName) - require_NoError(t, err) - mset, err := acc.lookupStream("TEST") - require_NoError(t, err) + // We will not need the client anymore. + nc.Close() - // Too many retries while processing snapshot is considered a cluster reset. - // If a leader is temporarily unavailable we shouldn't blow away our state. - require_True(t, isClusterResetErr(errCatchupTooManyRetries)) - mset.resetClusteredState(errCatchupTooManyRetries) + // Shutdown stream leader so one server remains. + streamLeaderServer.Shutdown() + streamLeaderServer.WaitForShutdown() - // Stream leader stays offline, we only start the server with missing stream data. - // We expect that the reset server must not allow the outdated server to become leader, as that would result in desync. - c.restartServer(outdatedServer) - c.waitOnStreamLeader(globalAccountName, "TEST") + clusterResetServer := c.serverByName(clusterResetServerName) + acc, err := clusterResetServer.lookupAccount(globalAccountName) + require_NoError(t, err) + mset, err := acc.lookupStream("TEST") + require_NoError(t, err) + + // Run error condition. + test.onErrorCondition(clusterResetServer, mset) - // Outdated server must NOT become the leader. - newStreamLeaderServer := c.streamLeader(globalAccountName, "TEST") - require_Equal(t, newStreamLeaderServer.Name(), clusterResetServerName) + // Stream leader stays offline, we only start the server with missing stream data. + // We expect that the reset server must not allow the outdated server to become leader, as that would result in desync. + c.restartServer(outdatedServer) + c.waitOnStreamLeader(globalAccountName, "TEST") + + // Outdated server must NOT become the leader. + newStreamLeaderServer := c.streamLeader(globalAccountName, "TEST") + require_Equal(t, newStreamLeaderServer.Name(), clusterResetServerName) + }) + } } func TestJetStreamClusterHardKillAfterStreamAdd(t *testing.T) { diff --git a/server/store.go b/server/store.go index 867dc2c358..f31cf22083 100644 --- a/server/store.go +++ b/server/store.go @@ -724,7 +724,7 @@ func isOutOfSpaceErr(err error) bool { var errFirstSequenceMismatch = errors.New("first sequence mismatch") func isClusterResetErr(err error) bool { - return err == errLastSeqMismatch || err == ErrStoreEOF || err == errFirstSequenceMismatch || err == errCatchupTooManyRetries + return err == errLastSeqMismatch || err == ErrStoreEOF || err == errFirstSequenceMismatch || errors.Is(err, errCatchupAbortedNoLeader) || err == errCatchupTooManyRetries } // Copy all fields.