Fix desync after errCatchupAbortedNoLeader
Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
MauriceVanVeen committed Oct 10, 2024
1 parent c6c5407 commit c391623
Showing 3 changed files with 98 additions and 60 deletions.
5 changes: 3 additions & 2 deletions server/jetstream_cluster.go
@@ -2848,7 +2848,7 @@ func (mset *stream) resetClusteredState(err error) bool {
 	}
 
 	if node != nil {
-		if err == errCatchupTooManyRetries {
+		if errors.Is(err, errCatchupAbortedNoLeader) || err == errCatchupTooManyRetries {
 			// Don't delete all state, could've just been temporarily unable to reach the leader.
 			node.Stop()
 		} else {
@@ -8276,6 +8276,7 @@ var (
 	errCatchupStreamStopped   = errors.New("stream has been stopped") // when a catchup is terminated due to the stream going away.
 	errCatchupBadMsg          = errors.New("bad catchup msg")
 	errCatchupWrongSeqForSkip = errors.New("wrong sequence for skipped msg")
+	errCatchupAbortedNoLeader = errors.New("catchup failed, no leader")
 	errCatchupTooManyRetries  = errors.New("catchup failed, too many retries")
 )

@@ -8379,7 +8380,7 @@ RETRY:
 	releaseSyncOutSem()
 
 	if n.GroupLeader() == _EMPTY_ {
-		return fmt.Errorf("catchup for stream '%s > %s' aborted, no leader", mset.account(), mset.name())
+		return fmt.Errorf("catchup for stream '%s > %s' aborted, no leader: %w", mset.account(), mset.name(), errCatchupAbortedNoLeader)
 	}
 
 	// If we have a sub clear that here.
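Aside (not part of the commit): the updated check in resetClusteredState uses errors.Is rather than ==, because the catchup path above now returns errCatchupAbortedNoLeader wrapped via fmt.Errorf's %w verb. A minimal standalone sketch of that behavior, with hypothetical names chosen only for illustration:

package main

import (
	"errors"
	"fmt"
)

// Hypothetical sentinel, mirroring errCatchupAbortedNoLeader in the diff above.
var errNoLeader = errors.New("catchup failed, no leader")

// catchup wraps the sentinel with %w, as the stream catchup path now does.
func catchup() error {
	return fmt.Errorf("catchup for stream '%s > %s' aborted, no leader: %w", "$G", "TEST", errNoLeader)
}

func main() {
	err := catchup()
	fmt.Println(err == errNoLeader)          // false: plain equality misses the wrapped sentinel
	fmt.Println(errors.Is(err, errNoLeader)) // true: errors.Is unwraps and matches
}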
151 changes: 94 additions & 57 deletions server/jetstream_cluster_4_test.go
@@ -4090,77 +4090,114 @@ func TestJetStreamClusterConsumerReplicasAfterScale(t *testing.T) {
 	require_Equal(t, len(ci.Cluster.Replicas), 2)
 }
 
-func TestJetStreamClusterDesyncAfterCatchupTooManyRetries(t *testing.T) {
-	c := createJetStreamClusterExplicit(t, "R3S", 3)
-	defer c.shutdown()
+func TestJetStreamClusterDesyncAfterErrorDuringCatchup(t *testing.T) {
+	tests := []struct {
+		title            string
+		onErrorCondition func(server *Server, mset *stream)
+	}{
+		{
+			title: "TooManyRetries",
+			onErrorCondition: func(server *Server, mset *stream) {
+				// Too many retries while processing snapshot is considered a cluster reset.
+				// If a leader is temporarily unavailable we shouldn't blow away our state.
+				require_True(t, isClusterResetErr(errCatchupTooManyRetries))
+				mset.resetClusteredState(errCatchupTooManyRetries)
+			},
+		},
+		{
+			title: "AbortedNoLeader",
+			onErrorCondition: func(server *Server, mset *stream) {
+				for _, n := range server.raftNodes {
+					rn := n.(*raft)
+					if rn.accName == "$G" {
+						rn.updateLeader(noLeader)
+					}
+				}
 
-	nc, js := jsClientConnect(t, c.randomServer())
-	defer nc.Close()
+				// Processing a snapshot while there's no leader elected is considered a cluster reset.
+				// If a leader is temporarily unavailable we shouldn't blow away our state.
+				var snap StreamReplicatedState
+				snap.LastSeq = 1_000 // ensure we can catchup based on the snapshot
+				err := mset.processSnapshot(&snap)
+				require_True(t, errors.Is(err, errCatchupAbortedNoLeader))
+				require_True(t, isClusterResetErr(err))
+				mset.resetClusteredState(err)
+			},
+		},
+	}
 
-	si, err := js.AddStream(&nats.StreamConfig{
-		Name:     "TEST",
-		Subjects: []string{"foo"},
-		Replicas: 3,
-	})
-	require_NoError(t, err)
+	for _, test := range tests {
+		t.Run(test.title, func(t *testing.T) {
+			c := createJetStreamClusterExplicit(t, "R3S", 3)
+			defer c.shutdown()
 
-	streamLeader := si.Cluster.Leader
-	streamLeaderServer := c.serverByName(streamLeader)
-	nc.Close()
-	nc, js = jsClientConnect(t, streamLeaderServer)
-	defer nc.Close()
+			nc, js := jsClientConnect(t, c.randomServer())
+			defer nc.Close()
 
-	servers := slices.DeleteFunc([]string{"S-1", "S-2", "S-3"}, func(s string) bool {
-		return s == streamLeader
-	})
+			si, err := js.AddStream(&nats.StreamConfig{
+				Name:     "TEST",
+				Subjects: []string{"foo"},
+				Replicas: 3,
+			})
+			require_NoError(t, err)
 
-	// Publish 10 messages.
-	for i := 0; i < 10; i++ {
-		pubAck, err := js.Publish("foo", []byte("ok"))
-		require_NoError(t, err)
-		require_Equal(t, pubAck.Sequence, uint64(i+1))
-	}
+			streamLeader := si.Cluster.Leader
+			streamLeaderServer := c.serverByName(streamLeader)
+			nc.Close()
+			nc, js = jsClientConnect(t, streamLeaderServer)
+			defer nc.Close()
 
-	outdatedServerName := servers[0]
-	clusterResetServerName := servers[1]
+			servers := slices.DeleteFunc([]string{"S-1", "S-2", "S-3"}, func(s string) bool {
+				return s == streamLeader
+			})
 
-	outdatedServer := c.serverByName(outdatedServerName)
-	outdatedServer.Shutdown()
-	outdatedServer.WaitForShutdown()
+			// Publish 10 messages.
+			for i := 0; i < 10; i++ {
+				pubAck, err := js.Publish("foo", []byte("ok"))
+				require_NoError(t, err)
+				require_Equal(t, pubAck.Sequence, uint64(i+1))
+			}
 
-	// Publish 10 more messages, one server will be behind.
-	for i := 0; i < 10; i++ {
-		pubAck, err := js.Publish("foo", []byte("ok"))
-		require_NoError(t, err)
-		require_Equal(t, pubAck.Sequence, uint64(i+11))
-	}
+			outdatedServerName := servers[0]
+			clusterResetServerName := servers[1]
 
-	// We will not need the client anymore.
-	nc.Close()
+			outdatedServer := c.serverByName(outdatedServerName)
+			outdatedServer.Shutdown()
+			outdatedServer.WaitForShutdown()
 
-	// Shutdown stream leader so one server remains.
-	streamLeaderServer.Shutdown()
-	streamLeaderServer.WaitForShutdown()
+			// Publish 10 more messages, one server will be behind.
+			for i := 0; i < 10; i++ {
+				pubAck, err := js.Publish("foo", []byte("ok"))
+				require_NoError(t, err)
+				require_Equal(t, pubAck.Sequence, uint64(i+11))
+			}
 
-	clusterResetServer := c.serverByName(clusterResetServerName)
-	acc, err := clusterResetServer.lookupAccount(globalAccountName)
-	require_NoError(t, err)
-	mset, err := acc.lookupStream("TEST")
-	require_NoError(t, err)
+			// We will not need the client anymore.
+			nc.Close()
 
-	// Too many retries while processing snapshot is considered a cluster reset.
-	// If a leader is temporarily unavailable we shouldn't blow away our state.
-	require_True(t, isClusterResetErr(errCatchupTooManyRetries))
-	mset.resetClusteredState(errCatchupTooManyRetries)
+			// Shutdown stream leader so one server remains.
+			streamLeaderServer.Shutdown()
+			streamLeaderServer.WaitForShutdown()
 
-	// Stream leader stays offline, we only start the server with missing stream data.
-	// We expect that the reset server must not allow the outdated server to become leader, as that would result in desync.
-	c.restartServer(outdatedServer)
-	c.waitOnStreamLeader(globalAccountName, "TEST")
+			clusterResetServer := c.serverByName(clusterResetServerName)
+			acc, err := clusterResetServer.lookupAccount(globalAccountName)
+			require_NoError(t, err)
+			mset, err := acc.lookupStream("TEST")
+			require_NoError(t, err)
+
+			// Run error condition.
+			test.onErrorCondition(clusterResetServer, mset)
 
-	// Outdated server must NOT become the leader.
-	newStreamLeaderServer := c.streamLeader(globalAccountName, "TEST")
-	require_Equal(t, newStreamLeaderServer.Name(), clusterResetServerName)
+			// Stream leader stays offline, we only start the server with missing stream data.
+			// We expect that the reset server must not allow the outdated server to become leader, as that would result in desync.
+			c.restartServer(outdatedServer)
+			c.waitOnStreamLeader(globalAccountName, "TEST")
 
+			// Outdated server must NOT become the leader.
+			newStreamLeaderServer := c.streamLeader(globalAccountName, "TEST")
+			require_Equal(t, newStreamLeaderServer.Name(), clusterResetServerName)
+		})
+	}
 }
 
 func TestJetStreamClusterHardKillAfterStreamAdd(t *testing.T) {
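Aside (not part of the commit): the rewritten test above uses Go's table-driven subtest pattern, where each case pairs a name with a callback and t.Run executes each case as its own subtest. A generic sketch of that pattern, with illustrative names only:

package example // hypothetical package, not from the repository

import "testing"

func TestTableDrivenSketch(t *testing.T) {
	tests := []struct {
		title string
		run   func(t *testing.T)
	}{
		{title: "CaseA", run: func(t *testing.T) { /* trigger error condition A */ }},
		{title: "CaseB", run: func(t *testing.T) { /* trigger error condition B */ }},
	}
	for _, test := range tests {
		// Each case runs and is reported as a separate subtest.
		t.Run(test.title, test.run)
	}
}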
2 changes: 1 addition & 1 deletion server/store.go
@@ -724,7 +724,7 @@ func isOutOfSpaceErr(err error) bool {
 var errFirstSequenceMismatch = errors.New("first sequence mismatch")
 
 func isClusterResetErr(err error) bool {
-	return err == errLastSeqMismatch || err == ErrStoreEOF || err == errFirstSequenceMismatch || err == errCatchupTooManyRetries
+	return err == errLastSeqMismatch || err == ErrStoreEOF || err == errFirstSequenceMismatch || errors.Is(err, errCatchupAbortedNoLeader) || err == errCatchupTooManyRetries
 }
 
 // Copy all fields.
