diff --git a/server/consumer.go b/server/consumer.go index ebe97f65b5..a91c192544 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -1563,6 +1563,16 @@ func (o *consumer) updateDeliveryInterest(localInterest bool) bool { return false } +const ( + defaultConsumerNotActiveStartInterval = 30 * time.Second + defaultConsumerNotActiveMaxInterval = 5 * time.Minute +) + +var ( + consumerNotActiveStartInterval = defaultConsumerNotActiveStartInterval + consumerNotActiveMaxInterval = defaultConsumerNotActiveMaxInterval +) + func (o *consumer) deleteNotActive() { o.mu.Lock() if o.mset == nil { @@ -1628,12 +1638,8 @@ func (o *consumer) deleteNotActive() { // Check to make sure we went away. // Don't think this needs to be a monitored go routine. go func() { - const ( - startInterval = 30 * time.Second - maxInterval = 5 * time.Minute - ) - jitter := time.Duration(rand.Int63n(int64(startInterval))) - interval := startInterval + jitter + jitter := time.Duration(rand.Int63n(int64(consumerNotActiveStartInterval))) + interval := consumerNotActiveStartInterval + jitter ticker := time.NewTicker(interval) defer ticker.Stop() for range ticker.C { @@ -1648,7 +1654,7 @@ func (o *consumer) deleteNotActive() { if nca != nil && nca == ca { s.Warnf("Consumer assignment for '%s > %s > %s' not cleaned up, retrying", acc, stream, name) meta.ForwardProposal(removeEntry) - if interval < maxInterval { + if interval < consumerNotActiveMaxInterval { interval *= 2 ticker.Reset(interval) } diff --git a/server/filestore.go b/server/filestore.go index c17a4795e6..236ef62be4 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -10023,14 +10023,22 @@ func (alg StoreCompression) Decompress(buf []byte) ([]byte, error) { // sets O_SYNC on the open file if SyncAlways is set. The dios semaphore is // handled automatically by this function, so don't wrap calls to it in dios. func (fs *fileStore) writeFileWithOptionalSync(name string, data []byte, perm fs.FileMode) error { + if fs.fcfg.SyncAlways { + return writeFileWithSync(name, data, perm) + } <-dios defer func() { dios <- struct{}{} }() - flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC - if fs.fcfg.SyncAlways { - flags |= os.O_SYNC - } + return os.WriteFile(name, data, perm) +} + +func writeFileWithSync(name string, data []byte, perm fs.FileMode) error { + <-dios + defer func() { + dios <- struct{}{} + }() + flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC | os.O_SYNC f, err := os.OpenFile(name, flags, perm) if err != nil { return err diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 1ad3fa52f1..e8456c0948 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1431,10 +1431,6 @@ func (js *jetStream) monitorCluster() { aq.recycle(&ces) case isLeader = <-lch: - // For meta layer synchronize everyone to our state on becoming leader. - if isLeader && n.ApplyQ().len() == 0 { - n.SendSnapshot(js.metaSnapshot()) - } // Process the change. js.processLeaderChange(isLeader) if isLeader { @@ -2129,8 +2125,32 @@ func (js *jetStream) createRaftGroup(accName string, rg *raftGroup, storage Stor } // Check if we already have this assigned. +retry: if node := s.lookupRaftNode(rg.Name); node != nil { + if node.State() == Closed { + // We're waiting for this node to finish shutting down before we replace it. + js.mu.Unlock() + node.WaitForStop() + js.mu.Lock() + goto retry + } s.Debugf("JetStream cluster already has raft group %q assigned", rg.Name) + // Check and see if the group has the same peers. 
If not then we + // will update the known peers, which will send a peerstate if leader. + groupPeerIDs := append([]string{}, rg.Peers...) + var samePeers bool + if nodePeers := node.Peers(); len(rg.Peers) == len(nodePeers) { + nodePeerIDs := make([]string, 0, len(nodePeers)) + for _, n := range nodePeers { + nodePeerIDs = append(nodePeerIDs, n.ID) + } + slices.Sort(groupPeerIDs) + slices.Sort(nodePeerIDs) + samePeers = slices.Equal(groupPeerIDs, nodePeerIDs) + } + if !samePeers { + node.UpdateKnownPeers(groupPeerIDs) + } rg.node = node js.mu.Unlock() return nil @@ -8959,17 +8979,6 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) { // mset.store never changes after being set, don't need lock. mset.store.FastState(&state) - // Reset notion of first if this request wants sequences before our starting sequence - // and we would have nothing to send. If we have partial messages still need to send skips for those. - // We will keep sreq's first sequence to not create sequence mismatches on the follower, but we extend the last to our current state. - if sreq.FirstSeq < state.FirstSeq && state.FirstSeq > sreq.LastSeq { - s.Debugf("Catchup for stream '%s > %s' resetting request first sequence from %d to %d", - mset.account(), mset.name(), sreq.FirstSeq, state.FirstSeq) - if state.LastSeq > sreq.LastSeq { - sreq.LastSeq = state.LastSeq - } - } - // Setup sequences to walk through. seq, last := sreq.FirstSeq, sreq.LastSeq mset.setCatchupPeer(sreq.Peer, last-seq) @@ -9133,25 +9142,10 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) { if drOk && dr.First > 0 { sendDR() } - // Check for a condition where our state's first is now past the last that we could have sent. - // If so reset last and continue sending. - var state StreamState - mset.mu.RLock() - mset.store.FastState(&state) - mset.mu.RUnlock() - if last < state.FirstSeq { - last = state.LastSeq - } - // Recheck our exit condition. - if seq == last { - if drOk && dr.First > 0 { - sendDR() - } - s.Noticef("Catchup for stream '%s > %s' complete", mset.account(), mset.name()) - // EOF - s.sendInternalMsgLocked(sendSubject, _EMPTY_, nil, nil) - return false - } + s.Noticef("Catchup for stream '%s > %s' complete", mset.account(), mset.name()) + // EOF + s.sendInternalMsgLocked(sendSubject, _EMPTY_, nil, nil) + return false } select { case <-remoteQuitCh: diff --git a/server/jetstream_cluster_2_test.go b/server/jetstream_cluster_2_test.go index 9a3d8abbb3..d24351a18a 100644 --- a/server/jetstream_cluster_2_test.go +++ b/server/jetstream_cluster_2_test.go @@ -6647,12 +6647,24 @@ func TestJetStreamClusterSnapshotBeforePurgeAndCatchup(t *testing.T) { return nil }) - // Make sure we only sent 1002 sync catchup msgs. - // This is for the new messages, the delete range, and the EOF. + // Make sure we only sent 2 sync catchup msgs. + // This is for the delete range, and the EOF. 
nmsgs, _, _ := sub.Pending() - if nmsgs != 1002 { - t.Fatalf("Expected only 1002 sync catchup msgs to be sent signaling eof, but got %d", nmsgs) + if nmsgs != 2 { + t.Fatalf("Expected only 2 sync catchup msgs to be sent signaling eof, but got %d", nmsgs) } + + msg, err := sub.NextMsg(0) + require_NoError(t, err) + mbuf := msg.Data[1:] + dr, err := decodeDeleteRange(mbuf) + require_NoError(t, err) + require_Equal(t, dr.First, 1001) + require_Equal(t, dr.Num, 1000) + + msg, err = sub.NextMsg(0) + require_NoError(t, err) + require_Equal(t, len(msg.Data), 0) } func TestJetStreamClusterStreamResetWithLargeFirstSeq(t *testing.T) { diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index cfd7596a76..a5b32c0e7b 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -1600,6 +1600,11 @@ func TestJetStreamClusterParallelConsumerCreation(t *testing.T) { } func TestJetStreamClusterGhostEphemeralsAfterRestart(t *testing.T) { + consumerNotActiveStartInterval = time.Second * 5 + defer func() { + consumerNotActiveStartInterval = defaultConsumerNotActiveStartInterval + }() + c := createJetStreamClusterExplicit(t, "R3S", 3) defer c.shutdown() @@ -1632,6 +1637,7 @@ func TestJetStreamClusterGhostEphemeralsAfterRestart(t *testing.T) { time.Sleep(2 * time.Second) // Restart first and wait so that we know it will try cleanup without a metaleader. + // It will fail as there's no metaleader at that time, it should keep retrying on an interval. c.restartServer(rs) time.Sleep(time.Second) @@ -1643,8 +1649,9 @@ func TestJetStreamClusterGhostEphemeralsAfterRestart(t *testing.T) { defer nc.Close() subj := fmt.Sprintf(JSApiConsumerListT, "TEST") - checkFor(t, 10*time.Second, 200*time.Millisecond, func() error { - m, err := nc.Request(subj, nil, time.Second) + checkFor(t, 20*time.Second, 200*time.Millisecond, func() error { + // Request will take at most 4 seconds if some consumers can't be found. + m, err := nc.Request(subj, nil, 5*time.Second) if err != nil { return err } @@ -3910,6 +3917,7 @@ func TestJetStreamClusterStreamNodeShutdownBugOnStop(t *testing.T) { node.InstallSnapshot(mset.stateSnapshot()) // Stop the stream mset.stop(false, false) + node.WaitForStop() if numNodes := s.numRaftNodes(); numNodes != numNodesStart-1 { t.Fatalf("RAFT nodes after stream stop incorrect: %d vs %d", numNodesStart, numNodes) @@ -5801,6 +5809,8 @@ func TestJetStreamClusterDetectOrphanNRGs(t *testing.T) { // Should only be meta NRG left. 
require_True(t, s.numRaftNodes() == 1) + s.rnMu.RLock() + defer s.rnMu.RUnlock() require_True(t, s.lookupRaftNode(sgn) == nil) require_True(t, s.lookupRaftNode(ogn) == nil) } diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index a8d5505c68..fe587d1d05 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -18,10 +18,11 @@ package server import ( "context" - "encoding/binary" "encoding/json" "errors" "fmt" + "io" + "io/fs" "math/rand" "os" "path" @@ -2465,85 +2466,6 @@ func TestJetStreamClusterKeyValueDesyncAfterHardKill(t *testing.T) { c.waitOnClusterReady() c.waitOnAllCurrent() - getStreamDetails := func(t *testing.T, c *cluster, accountName, streamName string) *StreamDetail { - t.Helper() - srv := c.streamLeader(accountName, streamName) - if srv == nil { - return nil - } - jsz, err := srv.Jsz(&JSzOptions{Accounts: true, Streams: true, Consumer: true}) - require_NoError(t, err) - for _, acc := range jsz.AccountDetails { - if acc.Name == accountName { - for _, stream := range acc.Streams { - if stream.Name == streamName { - return &stream - } - } - } - } - t.Error("Could not find account details") - return nil - } - - checkState := func(t *testing.T, c *cluster, accountName, streamName string) error { - t.Helper() - - leaderSrv := c.streamLeader(accountName, streamName) - if leaderSrv == nil { - return fmt.Errorf("no leader server found for stream %q", streamName) - } - streamLeader := getStreamDetails(t, c, accountName, streamName) - if streamLeader == nil { - return fmt.Errorf("no leader found for stream %q", streamName) - } - var errs []error - for _, srv := range c.servers { - if srv == leaderSrv { - // Skip self - continue - } - acc, err := srv.LookupAccount(accountName) - require_NoError(t, err) - stream, err := acc.lookupStream(streamName) - require_NoError(t, err) - state := stream.state() - - if state.Msgs != streamLeader.State.Msgs { - err := fmt.Errorf("[%s] Leader %v has %d messages, Follower %v has %d messages", - streamName, leaderSrv, streamLeader.State.Msgs, - srv, state.Msgs, - ) - errs = append(errs, err) - } - if state.FirstSeq != streamLeader.State.FirstSeq { - err := fmt.Errorf("[%s] Leader %v FirstSeq is %d, Follower %v is at %d", - streamName, leaderSrv, streamLeader.State.FirstSeq, - srv, state.FirstSeq, - ) - errs = append(errs, err) - } - if state.LastSeq != streamLeader.State.LastSeq { - err := fmt.Errorf("[%s] Leader %v LastSeq is %d, Follower %v is at %d", - streamName, leaderSrv, streamLeader.State.LastSeq, - srv, state.LastSeq, - ) - errs = append(errs, err) - } - if state.NumDeleted != streamLeader.State.NumDeleted { - err := fmt.Errorf("[%s] Leader %v NumDeleted is %d, Follower %v is at %d", - streamName, leaderSrv, streamLeader.State.NumDeleted, - srv, state.NumDeleted, - ) - errs = append(errs, err) - } - } - if len(errs) > 0 { - return errors.Join(errs...) 
- } - return nil - } - err = checkState(t, c, "$G", "KV_inconsistency") require_NoError(t, err) } @@ -2602,84 +2524,6 @@ func TestJetStreamClusterKeyValueSync(t *testing.T) { var counter int64 var errorCounter int64 - getStreamDetails := func(t *testing.T, c *cluster, accountName, streamName string) *StreamDetail { - t.Helper() - srv := c.streamLeader(accountName, streamName) - if srv == nil { - return nil - } - jsz, err := srv.Jsz(&JSzOptions{Accounts: true, Streams: true, Consumer: true}) - require_NoError(t, err) - for _, acc := range jsz.AccountDetails { - if acc.Name == accountName { - for _, stream := range acc.Streams { - if stream.Name == streamName { - return &stream - } - } - } - } - t.Error("Could not find account details") - return nil - } - checkState := func(t *testing.T, c *cluster, accountName, streamName string) error { - t.Helper() - - leaderSrv := c.streamLeader(accountName, streamName) - if leaderSrv == nil { - return fmt.Errorf("no leader server found for stream %q", streamName) - } - streamLeader := getStreamDetails(t, c, accountName, streamName) - if streamLeader == nil { - return fmt.Errorf("no leader found for stream %q", streamName) - } - var errs []error - for _, srv := range c.servers { - if srv == leaderSrv { - // Skip self - continue - } - acc, err := srv.LookupAccount(accountName) - require_NoError(t, err) - stream, err := acc.lookupStream(streamName) - require_NoError(t, err) - state := stream.state() - - if state.Msgs != streamLeader.State.Msgs { - err := fmt.Errorf("[%s] Leader %v has %d messages, Follower %v has %d messages", - streamName, leaderSrv, streamLeader.State.Msgs, - srv, state.Msgs, - ) - errs = append(errs, err) - } - if state.FirstSeq != streamLeader.State.FirstSeq { - err := fmt.Errorf("[%s] Leader %v FirstSeq is %d, Follower %v is at %d", - streamName, leaderSrv, streamLeader.State.FirstSeq, - srv, state.FirstSeq, - ) - errs = append(errs, err) - } - if state.LastSeq != streamLeader.State.LastSeq { - err := fmt.Errorf("[%s] Leader %v LastSeq is %d, Follower %v is at %d", - streamName, leaderSrv, streamLeader.State.LastSeq, - srv, state.LastSeq, - ) - errs = append(errs, err) - } - if state.NumDeleted != streamLeader.State.NumDeleted { - err := fmt.Errorf("[%s] Leader %v NumDeleted is %d, Follower %v is at %d\nSTATE_A: %+v\nSTATE_B: %+v\n", - streamName, leaderSrv, streamLeader.State.NumDeleted, - srv, state.NumDeleted, streamLeader.State, state, - ) - errs = append(errs, err) - } - } - if len(errs) > 0 { - return errors.Join(errs...) - } - return nil - } - checkMsgsEqual := func(t *testing.T, accountName, streamName string) error { // Gather all the streams replicas and compare contents. 
msets := make(map[*Server]*stream) @@ -3727,9 +3571,7 @@ func TestJetStreamClusterDesyncAfterErrorDuringCatchup(t *testing.T) { for _, n := range server.raftNodes { rn := n.(*raft) if rn.accName == "$G" { - rn.Lock() rn.updateLeader(noLeader) - rn.Unlock() } } @@ -3819,6 +3661,191 @@ func TestJetStreamClusterDesyncAfterErrorDuringCatchup(t *testing.T) { } } +func TestJetStreamClusterDontInstallSnapshotWhenStoppingStream(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Retention: nats.WorkQueuePolicy, + Replicas: 3, + }) + require_NoError(t, err) + + _, err = js.Publish("foo", nil) + require_NoError(t, err) + + // Wait for all servers to have applied everything. + var maxApplied uint64 + checkFor(t, 5*time.Second, 100*time.Millisecond, func() error { + maxApplied = 0 + for _, s := range c.servers { + acc, err := s.lookupAccount(globalAccountName) + if err != nil { + return err + } + mset, err := acc.lookupStream("TEST") + if err != nil { + return err + } + _, _, applied := mset.node.Progress() + if maxApplied == 0 { + maxApplied = applied + } else if applied < maxApplied { + return fmt.Errorf("applied not high enough, expected %d, got %d", applied, maxApplied) + } else if applied > maxApplied { + return fmt.Errorf("applied higher on one server, expected %d, got %d", applied, maxApplied) + } + } + return nil + }) + + // Install a snapshot on a follower. + s := c.randomNonStreamLeader(globalAccountName, "TEST") + acc, err := s.lookupAccount(globalAccountName) + require_NoError(t, err) + mset, err := acc.lookupStream("TEST") + require_NoError(t, err) + err = mset.node.InstallSnapshot(mset.stateSnapshotLocked()) + require_NoError(t, err) + + // Validate the snapshot reflects applied. + validateStreamState := func(snap *snapshot) { + t.Helper() + require_Equal(t, snap.lastIndex, maxApplied) + ss, err := DecodeStreamState(snap.data) + require_NoError(t, err) + require_Equal(t, ss.FirstSeq, 1) + require_Equal(t, ss.LastSeq, 1) + } + snap, err := mset.node.(*raft).loadLastSnapshot() + require_NoError(t, err) + validateStreamState(snap) + + // Simulate a message being stored, but not calling Applied yet. + err = mset.processJetStreamMsg("foo", _EMPTY_, nil, nil, 1, time.Now().UnixNano()) + require_NoError(t, err) + + // Simulate the stream being stopped before we're able to call Applied. + // If we'd install a snapshot during this, which would be a race condition, + // we'd store a snapshot with state that's ahead of applied. + err = mset.stop(false, false) + require_NoError(t, err) + + // Validate the snapshot is the same as before. + snap, err = mset.node.(*raft).loadLastSnapshot() + require_NoError(t, err) + validateStreamState(snap) +} + +func TestJetStreamClusterDontInstallSnapshotWhenStoppingConsumer(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Retention: nats.WorkQueuePolicy, + Replicas: 3, + }) + require_NoError(t, err) + + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + Durable: "CONSUMER", + Replicas: 3, + AckPolicy: nats.AckExplicitPolicy, + }) + require_NoError(t, err) + + // Add a message and let the consumer ack it, this moves the consumer's RAFT applied up. 
+ _, err = js.Publish("foo", nil) + require_NoError(t, err) + sub, err := js.PullSubscribe("foo", "CONSUMER") + require_NoError(t, err) + msgs, err := sub.Fetch(1) + require_NoError(t, err) + require_Len(t, len(msgs), 1) + err = msgs[0].AckSync() + require_NoError(t, err) + + // Wait for all servers to have applied everything. + var maxApplied uint64 + checkFor(t, 5*time.Second, 100*time.Millisecond, func() error { + maxApplied = 0 + for _, s := range c.servers { + acc, err := s.lookupAccount(globalAccountName) + if err != nil { + return err + } + mset, err := acc.lookupStream("TEST") + if err != nil { + return err + } + o := mset.lookupConsumer("CONSUMER") + if o == nil { + return errors.New("consumer not found") + } + _, _, applied := o.node.Progress() + if maxApplied == 0 { + maxApplied = applied + } else if applied < maxApplied { + return fmt.Errorf("applied not high enough, expected %d, got %d", applied, maxApplied) + } else if applied > maxApplied { + return fmt.Errorf("applied higher on one server, expected %d, got %d", applied, maxApplied) + } + } + return nil + }) + + // Install a snapshot on a follower. + s := c.randomNonStreamLeader(globalAccountName, "TEST") + acc, err := s.lookupAccount(globalAccountName) + require_NoError(t, err) + mset, err := acc.lookupStream("TEST") + require_NoError(t, err) + o := mset.lookupConsumer("CONSUMER") + require_NotNil(t, o) + snapBytes, err := o.store.EncodedState() + require_NoError(t, err) + err = o.node.InstallSnapshot(snapBytes) + require_NoError(t, err) + + // Validate the snapshot reflects applied. + validateStreamState := func(snap *snapshot) { + t.Helper() + require_Equal(t, snap.lastIndex, maxApplied) + state, err := decodeConsumerState(snap.data) + require_NoError(t, err) + require_Equal(t, state.Delivered.Consumer, 1) + require_Equal(t, state.Delivered.Stream, 1) + } + snap, err := o.node.(*raft).loadLastSnapshot() + require_NoError(t, err) + validateStreamState(snap) + + // Simulate a message being delivered, but not calling Applied yet. + err = o.store.UpdateDelivered(2, 2, 1, time.Now().UnixNano()) + require_NoError(t, err) + + // Simulate the consumer being stopped before we're able to call Applied. + // If we'd install a snapshot during this, which would be a race condition, + // we'd store a snapshot with state that's ahead of applied. + err = o.stop() + require_NoError(t, err) + + // Validate the snapshot is the same as before. + snap, err = o.node.(*raft).loadLastSnapshot() + require_NoError(t, err) + validateStreamState(snap) +} + func TestJetStreamClusterDesyncAfterRestartReplacesLeaderSnapshot(t *testing.T) { c := createJetStreamClusterExplicit(t, "R3S", 3) defer c.shutdown() @@ -3836,7 +3863,6 @@ func TestJetStreamClusterDesyncAfterRestartReplacesLeaderSnapshot(t *testing.T) // Reconnect to the leader. 
leader := c.streamLeader(globalAccountName, "TEST") nc.Close() - nc, js = jsClientConnect(t, leader) defer nc.Close() @@ -3994,215 +4020,144 @@ func TestJetStreamClusterMetaSnapshotMustNotIncludePendingConsumers(t *testing.T } } -func TestJetStreamClusterConsumerDontSendSnapshotOnLeaderChange(t *testing.T) { +func TestJetStreamClusterDesyncAfterPublishToLeaderWithoutQuorum(t *testing.T) { c := createJetStreamClusterExplicit(t, "R3S", 3) defer c.shutdown() - nc, js := jsClientConnect(t, c.randomServer()) defer nc.Close() - - _, err := js.AddStream(&nats.StreamConfig{ + si, err := js.AddStream(&nats.StreamConfig{ Name: "TEST", Subjects: []string{"foo"}, Replicas: 3, }) require_NoError(t, err) - - _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ - Durable: "CONSUMER", - Replicas: 3, - AckPolicy: nats.AckExplicitPolicy, + streamLeader := si.Cluster.Leader + streamLeaderServer := c.serverByName(streamLeader) + nc.Close() + nc, js = jsClientConnect(t, streamLeaderServer) + defer nc.Close() + servers := slices.DeleteFunc([]string{"S-1", "S-2", "S-3"}, func(s string) bool { + return s == streamLeader }) - require_NoError(t, err) - - // Add a message and let the consumer ack it, this moves the consumer's RAFT applied up to 1. - _, err = js.Publish("foo", nil) - require_NoError(t, err) - sub, err := js.PullSubscribe("foo", "CONSUMER") - require_NoError(t, err) - msgs, err := sub.Fetch(1) - require_NoError(t, err) - require_Len(t, len(msgs), 1) - err = msgs[0].AckSync() - require_NoError(t, err) - - // We don't need the client anymore. + // Stop followers so further publishes will not have quorum. + followerName1 := servers[0] + followerName2 := servers[1] + followerServer1 := c.serverByName(followerName1) + followerServer2 := c.serverByName(followerName2) + followerServer1.Shutdown() + followerServer2.Shutdown() + followerServer1.WaitForShutdown() + followerServer2.WaitForShutdown() + // Although this request will time out, it will be added to the stream leader's WAL. + _, err = js.Publish("foo", []byte("first"), nats.AckWait(time.Second)) + require_NotNil(t, err) + require_Equal(t, err, nats.ErrTimeout) + // Now shut down the leader as well. nc.Close() - - lookupConsumer := func(s *Server) *consumer { - t.Helper() - mset, err := s.lookupAccount(globalAccountName) - require_NoError(t, err) - acc, err := mset.lookupStream("TEST") - require_NoError(t, err) - o := acc.lookupConsumer("CONSUMER") - require_NotNil(t, o) - return o - } - - // Grab current consumer leader before moving all into observer mode. - cl := c.consumerLeader(globalAccountName, "TEST", "CONSUMER") - for _, s := range c.servers { - // Put all consumer's RAFT into observer mode, this will prevent all servers from trying to become leader. - o := lookupConsumer(s) - o.node.SetObserver(true) - if s != cl { - // For all followers, pause apply so they only store messages in WAL but not apply and possibly snapshot. 
- err = o.node.PauseApply() - require_NoError(t, err) - } - } - - updateDeliveredBuffer := func() []byte { - var b [4*binary.MaxVarintLen64 + 1]byte - b[0] = byte(updateDeliveredOp) - n := 1 - n += binary.PutUvarint(b[n:], 100) - n += binary.PutUvarint(b[n:], 100) - n += binary.PutUvarint(b[n:], 1) - n += binary.PutVarint(b[n:], time.Now().UnixNano()) - return b[:n] - } - - updateAcksBuffer := func() []byte { - var b [2*binary.MaxVarintLen64 + 1]byte - b[0] = byte(updateAcksOp) - n := 1 - n += binary.PutUvarint(b[n:], 100) - n += binary.PutUvarint(b[n:], 100) - return b[:n] - } - - // Store an uncommitted entry into our WAL, which will be committed and applied later. - co := lookupConsumer(cl) - rn := co.node.(*raft) - rn.Lock() - entries := []*Entry{{EntryNormal, updateDeliveredBuffer()}, {EntryNormal, updateAcksBuffer()}} - ae := encode(t, rn.buildAppendEntry(entries)) - err = rn.storeToWAL(ae) - minPindex := rn.pindex - rn.Unlock() + streamLeaderServer.Shutdown() + streamLeaderServer.WaitForShutdown() + // Only restart the (previous) followers. + followerServer1 = c.restartServer(followerServer1) + c.restartServer(followerServer2) + c.waitOnStreamLeader(globalAccountName, "TEST") + nc, js = jsClientConnect(t, followerServer1) + defer nc.Close() + // Publishing a message will now have quorum. + pubAck, err := js.Publish("foo", []byte("first, this is a retry")) require_NoError(t, err) - - // Simulate leader change, we do this so we can check what happens in the upper layer logic. - rn.leadc <- true - rn.SetObserver(false) - - // Since upper layer is async, we don't know whether it will or will not act on the leader change. - // Wait for some time to check if it does. - time.Sleep(2 * time.Second) - rn.RLock() - maxPindex := rn.pindex - rn.RUnlock() - - r := c.randomNonConsumerLeader(globalAccountName, "TEST", "CONSUMER") - ro := lookupConsumer(r) - rn = ro.node.(*raft) - - checkFor(t, 5*time.Second, time.Second, func() error { - rn.RLock() - defer rn.RUnlock() - if rn.pindex < maxPindex { - return fmt.Errorf("rn.pindex too low, expected %d, got %d", maxPindex, rn.pindex) - } - return nil - }) - - // We should only have 'Normal' entries. - // If we'd get a 'Snapshot' entry, that would mean it had incomplete state and would be reverting committed state. - var state StreamState - rn.wal.FastState(&state) - for seq := minPindex; seq <= maxPindex; seq++ { - ae, err = rn.loadEntry(seq) + require_Equal(t, pubAck.Sequence, 1) + // Bring up the previous stream leader. + c.restartServer(streamLeaderServer) + c.waitOnAllCurrent() + c.waitOnStreamLeader(globalAccountName, "TEST") + // Check all servers ended up with the last published message, which had quorum. 
+ for _, s := range c.servers { + c.waitOnStreamCurrent(s, globalAccountName, "TEST") + acc, err := s.lookupAccount(globalAccountName) require_NoError(t, err) - for _, entry := range ae.entries { - require_Equal(t, entry.Type, EntryNormal) - } + mset, err := acc.lookupStream("TEST") + require_NoError(t, err) + state := mset.state() + require_Equal(t, state.Msgs, 1) + require_Equal(t, state.Bytes, 55) } } -func TestJetStreamClusterDontInstallSnapshotWhenStoppingStream(t *testing.T) { +func TestJetStreamClusterPreserveWALDuringCatchupWithMatchingTerm(t *testing.T) { c := createJetStreamClusterExplicit(t, "R3S", 3) defer c.shutdown() - nc, js := jsClientConnect(t, c.randomServer()) defer nc.Close() - _, err := js.AddStream(&nats.StreamConfig{ - Name: "TEST", - Subjects: []string{"foo"}, - Retention: nats.WorkQueuePolicy, - Replicas: 3, + Name: "TEST", + Subjects: []string{"foo.>"}, + Replicas: 3, }) + nc.Close() require_NoError(t, err) - - _, err = js.Publish("foo", nil) - require_NoError(t, err) - - // Wait for all servers to have applied everything. - var maxApplied uint64 - checkFor(t, 5*time.Second, 100*time.Millisecond, func() error { - maxApplied = 0 - for _, s := range c.servers { - acc, err := s.lookupAccount(globalAccountName) - if err != nil { - return err - } - mset, err := acc.lookupStream("TEST") - if err != nil { - return err - } - _, _, applied := mset.node.Progress() - if maxApplied == 0 { - maxApplied = applied - } else if applied < maxApplied { - return fmt.Errorf("applied not high enough, expected %d, got %d", applied, maxApplied) - } else if applied > maxApplied { - return fmt.Errorf("applied higher on one server, expected %d, got %d", applied, maxApplied) + // Pick one server that will only store a part of the messages in its WAL. + rs := c.randomNonStreamLeader(globalAccountName, "TEST") + ts := time.Now().UnixNano() + // Manually add 3 append entries to each node's WAL, except for one node who is one behind. + var scratch [1024]byte + for _, s := range c.servers { + for _, n := range s.raftNodes { + rn := n.(*raft) + if rn.accName == globalAccountName { + for i := uint64(0); i < 3; i++ { + // One server will be one behind and need to catchup. + if s.Name() == rs.Name() && i >= 2 { + break + } + esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, i, ts, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + rn.Lock() + ae := rn.buildAppendEntry(entries) + ae.buf, err = ae.encode(scratch[:]) + require_NoError(t, err) + err = rn.storeToWAL(ae) + rn.Unlock() + require_NoError(t, err) + } } } - return nil - }) - - // Install a snapshot on a follower. - s := c.randomNonStreamLeader(globalAccountName, "TEST") - acc, err := s.lookupAccount(globalAccountName) - require_NoError(t, err) - mset, err := acc.lookupStream("TEST") - require_NoError(t, err) - err = mset.node.InstallSnapshot(mset.stateSnapshotLocked()) - require_NoError(t, err) - - // Validate the snapshot reflects applied. - validateStreamState := func(snap *snapshot) { - t.Helper() - require_Equal(t, snap.lastIndex, maxApplied) - ss, err := DecodeStreamState(snap.data) + } + // Restart all. + c.stopAll() + c.restartAll() + c.waitOnAllCurrent() + c.waitOnStreamLeader(globalAccountName, "TEST") + rs = c.serverByName(rs.Name()) + // Check all servers ended up with all published messages, which had quorum. 
+ for _, s := range c.servers { + c.waitOnStreamCurrent(s, globalAccountName, "TEST") + acc, err := s.lookupAccount(globalAccountName) require_NoError(t, err) - require_Equal(t, ss.FirstSeq, 1) - require_Equal(t, ss.LastSeq, 1) + mset, err := acc.lookupStream("TEST") + require_NoError(t, err) + state := mset.state() + require_Equal(t, state.Msgs, 3) + require_Equal(t, state.Bytes, 99) + } + // Check that the first two published messages came from our WAL, and + // the last came from a catchup by another leader. + for _, n := range rs.raftNodes { + rn := n.(*raft) + if rn.accName == globalAccountName { + ae, err := rn.loadEntry(2) + require_NoError(t, err) + require_True(t, ae.leader == rn.ID()) + ae, err = rn.loadEntry(3) + require_NoError(t, err) + require_True(t, ae.leader == rn.ID()) + ae, err = rn.loadEntry(4) + require_NoError(t, err) + require_True(t, ae.leader != rn.ID()) + } } - snap, err := mset.node.(*raft).loadLastSnapshot() - require_NoError(t, err) - validateStreamState(snap) - - // Simulate a message being stored, but not calling Applied yet. - err = mset.processJetStreamMsg("foo", _EMPTY_, nil, nil, 1, time.Now().UnixNano()) - require_NoError(t, err) - - // Simulate the stream being stopped before we're able to call Applied. - // If we'd install a snapshot during this, which would be a race condition, - // we'd store a snapshot with state that's ahead of applied. - err = mset.stop(false, false) - require_NoError(t, err) - - // Validate the snapshot is the same as before. - snap, err = mset.node.(*raft).loadLastSnapshot() - require_NoError(t, err) - validateStreamState(snap) } -func TestJetStreamClusterDontInstallSnapshotWhenStoppingConsumer(t *testing.T) { +func TestJetStreamClusterHardKillAfterStreamAdd(t *testing.T) { c := createJetStreamClusterExplicit(t, "R3S", 3) defer c.shutdown() @@ -4210,98 +4165,69 @@ func TestJetStreamClusterDontInstallSnapshotWhenStoppingConsumer(t *testing.T) { defer nc.Close() _, err := js.AddStream(&nats.StreamConfig{ - Name: "TEST", - Subjects: []string{"foo"}, - Retention: nats.WorkQueuePolicy, - Replicas: 3, - }) - require_NoError(t, err) - - _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ - Durable: "CONSUMER", - Replicas: 3, - AckPolicy: nats.AckExplicitPolicy, + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, }) require_NoError(t, err) - // Add a message and let the consumer ack it, this moves the consumer's RAFT applied up. - _, err = js.Publish("foo", nil) - require_NoError(t, err) - sub, err := js.PullSubscribe("foo", "CONSUMER") - require_NoError(t, err) - msgs, err := sub.Fetch(1) - require_NoError(t, err) - require_Len(t, len(msgs), 1) - err = msgs[0].AckSync() - require_NoError(t, err) - - // Wait for all servers to have applied everything. 
- var maxApplied uint64 - checkFor(t, 5*time.Second, 100*time.Millisecond, func() error { - maxApplied = 0 - for _, s := range c.servers { - acc, err := s.lookupAccount(globalAccountName) + copyDir := func(dst, src string) error { + srcFS := os.DirFS(src) + return fs.WalkDir(srcFS, ".", func(p string, d os.DirEntry, err error) error { if err != nil { return err } - mset, err := acc.lookupStream("TEST") + newPath := path.Join(dst, p) + if d.IsDir() { + return os.MkdirAll(newPath, defaultDirPerms) + } + r, err := srcFS.Open(p) if err != nil { return err } - o := mset.lookupConsumer("CONSUMER") - if o == nil { - return errors.New("consumer not found") - } - _, _, applied := o.node.Progress() - if maxApplied == 0 { - maxApplied = applied - } else if applied < maxApplied { - return fmt.Errorf("applied not high enough, expected %d, got %d", applied, maxApplied) - } else if applied > maxApplied { - return fmt.Errorf("applied higher on one server, expected %d, got %d", applied, maxApplied) + defer r.Close() + + w, err := os.OpenFile(newPath, os.O_CREATE|os.O_WRONLY, defaultFilePerms) + if err != nil { + return err } - } - return nil - }) + defer w.Close() + _, err = io.Copy(w, r) + return err + }) + } - // Install a snapshot on a follower. - s := c.randomNonStreamLeader(globalAccountName, "TEST") - acc, err := s.lookupAccount(globalAccountName) - require_NoError(t, err) - mset, err := acc.lookupStream("TEST") - require_NoError(t, err) - o := mset.lookupConsumer("CONSUMER") - require_NotNil(t, o) - snapBytes, err := o.store.EncodedState() - require_NoError(t, err) - err = o.node.InstallSnapshot(snapBytes) - require_NoError(t, err) + // Simulate being hard killed by: + // 1. copy directories before shutdown + copyToSrcMap := make(map[string]string) + for _, s := range c.servers { + sd := s.StoreDir() + copySd := path.Join(t.TempDir(), JetStreamStoreDir) + err = copyDir(copySd, sd) + require_NoError(t, err) + copyToSrcMap[copySd] = sd + } - // Validate the snapshot reflects applied. - validateStreamState := func(snap *snapshot) { - t.Helper() - require_Equal(t, snap.lastIndex, maxApplied) - state, err := decodeConsumerState(snap.data) + // 2. stop all + nc.Close() + c.stopAll() + + // 3. revert directories to before shutdown + for cp, dest := range copyToSrcMap { + err = os.RemoveAll(dest) + require_NoError(t, err) + err = copyDir(dest, cp) require_NoError(t, err) - require_Equal(t, state.Delivered.Consumer, 1) - require_Equal(t, state.Delivered.Stream, 1) } - snap, err := o.node.(*raft).loadLastSnapshot() - require_NoError(t, err) - validateStreamState(snap) - // Simulate a message being delivered, but not calling Applied yet. - err = o.store.UpdateDelivered(2, 2, 1, time.Now().UnixNano()) - require_NoError(t, err) + // 4. restart + c.restartAll() + c.waitOnAllCurrent() - // Simulate the consumer being stopped before we're able to call Applied. - // If we'd install a snapshot during this, which would be a race condition, - // we'd store a snapshot with state that's ahead of applied. - err = o.stop() - require_NoError(t, err) + nc, js = jsClientConnect(t, c.randomServer()) + defer nc.Close() - // Validate the snapshot is the same as before. - snap, err = o.node.(*raft).loadLastSnapshot() + // Stream should exist still and not be removed after hard killing all servers, so expect no error. 
+ _, err = js.StreamInfo("TEST") require_NoError(t, err) - validateStreamState(snap) } diff --git a/server/norace_test.go b/server/norace_test.go index e9cf7be692..a0bfac991d 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -6577,6 +6577,11 @@ func TestNoRaceJetStreamConsumerCreateTimeNumPending(t *testing.T) { } func TestNoRaceJetStreamClusterGhostConsumers(t *testing.T) { + consumerNotActiveStartInterval = time.Second * 5 + defer func() { + consumerNotActiveStartInterval = defaultConsumerNotActiveStartInterval + }() + c := createJetStreamClusterExplicit(t, "GHOST", 3) defer c.shutdown() diff --git a/server/raft.go b/server/raft.go index 267297ab23..aaf8a8d14f 100644 --- a/server/raft.go +++ b/server/raft.go @@ -74,8 +74,8 @@ type RaftNode interface { QuitC() <-chan struct{} Created() time.Time Stop() + WaitForStop() Delete() - Wipe() } type WAL interface { @@ -127,11 +127,12 @@ func (state RaftState) String() string { type raft struct { sync.RWMutex - created time.Time // Time that the group was created - accName string // Account name of the asset this raft group is for - group string // Raft group - sd string // Store directory - id string // Node ID + created time.Time // Time that the group was created + accName string // Account name of the asset this raft group is for + group string // Raft group + sd string // Store directory + id string // Node ID + wg sync.WaitGroup // Wait for running goroutines to exit on shutdown wal WAL // WAL store (filestore or memstore) wtype StorageType // WAL type, e.g. FileStorage or MemoryStorage @@ -198,15 +199,14 @@ type raft struct { hcommit uint64 // The commit at the time that applies were paused pobserver bool // Whether we were an observer at the time that applies were paused - prop *ipQueue[*proposedEntry] // Proposals - entry *ipQueue[*appendEntry] // Append entries - resp *ipQueue[*appendEntryResponse] // Append entries responses - apply *ipQueue[*CommittedEntry] // Apply queue (committed entries to be passed to upper layer) - reqs *ipQueue[*voteRequest] // Vote requests - votes *ipQueue[*voteResponse] // Vote responses - stepdown *ipQueue[string] // Stepdown requests - leadc chan bool // Leader changes - quit chan struct{} // Raft group shutdown + prop *ipQueue[*proposedEntry] // Proposals + entry *ipQueue[*appendEntry] // Append entries + resp *ipQueue[*appendEntryResponse] // Append entries responses + apply *ipQueue[*CommittedEntry] // Apply queue (committed entries to be passed to upper layer) + reqs *ipQueue[*voteRequest] // Vote requests + votes *ipQueue[*voteResponse] // Vote responses + leadc chan bool // Leader changes + quit chan struct{} // Raft group shutdown } type proposedEntry struct { @@ -396,7 +396,6 @@ func (s *Server) initRaftNode(accName string, cfg *RaftConfig, labels pprofLabel entry: newIPQueue[*appendEntry](s, qpfx+"appendEntry"), resp: newIPQueue[*appendEntryResponse](s, qpfx+"appendEntryResponse"), apply: newIPQueue[*CommittedEntry](s, qpfx+"committedEntry"), - stepdown: newIPQueue[string](s, qpfx+"stepdown"), accName: accName, leadc: make(chan bool, 32), observer: cfg.Observer, @@ -482,11 +481,6 @@ func (s *Server) initRaftNode(accName string, cfg *RaftConfig, labels pprofLabel } } } - } else if n.pterm == 0 && n.pindex == 0 { - // We have recovered no state, either through our WAL or snapshots, - // so inherit from term from our tav.idx file and pindex from our last sequence. - n.pterm = n.term - n.pindex = state.LastSeq } // Make sure to track ourselves. 
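// The raft changes in this patch replace the old fire-and-forget shutdown with a
// WaitGroup-tracked lifecycle: goroutines call n.wg.Add(1) before they start and
// defer n.wg.Done(), shutdown() swaps the state to Closed exactly once before
// closing the quit channel, and WaitForStop()/Delete() wait on the WaitGroup
// before any teardown. A minimal runnable sketch of that pattern follows, using a
// hypothetical worker type (start/shutdown/waitForStop are illustrative names,
// not the real raft struct or its API):

package main

import (
	"sync"
	"sync/atomic"
)

type worker struct {
	wg     sync.WaitGroup
	closed atomic.Bool
	quit   chan struct{}
}

func (w *worker) start() {
	// Register with the WaitGroup before the goroutine runs, as the patch does
	// before starting the run and catchup goroutines.
	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		<-w.quit // stand-in for the run loop
	}()
}

func (w *worker) shutdown() {
	// Only the first caller closes quit, mirroring the state swap to Closed.
	if w.closed.CompareAndSwap(false, true) {
		close(w.quit)
	}
}

func (w *worker) waitForStop() {
	// Waiting is only meaningful once shutdown has been requested.
	if w.closed.Load() {
		w.wg.Wait()
	}
}

func main() {
	w := &worker{quit: make(chan struct{})}
	w.start()
	w.shutdown()
	w.waitForStop() // returns only after the goroutine exits, like node.WaitForStop() in the tests above
}
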
@@ -504,7 +498,7 @@ func (s *Server) initRaftNode(accName string, cfg *RaftConfig, labels pprofLabel // If we fail to do this for some reason then this is fatal — we cannot // continue setting up or the Raft node may be partially/totally isolated. if err := n.createInternalSubs(); err != nil { - n.shutdown(false) + n.shutdown() return nil, err } @@ -541,6 +535,7 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe } // Start the run goroutine for the Raft state machine. + n.wg.Add(1) s.startGoRoutine(n.run, labels) return n, nil @@ -593,8 +588,8 @@ func (s *Server) unregisterRaftNode(group string) { // Returns how many Raft nodes are running in this server instance. func (s *Server) numRaftNodes() int { - s.rnMu.Lock() - defer s.rnMu.Unlock() + s.rnMu.RLock() + defer s.rnMu.RUnlock() return len(s.raftNodes) } @@ -884,7 +879,7 @@ func (n *raft) PauseApply() error { // If we are currently a candidate make sure we step down. if n.State() == Candidate { - n.stepdown.push(noLeader) + n.stepdownLocked(noLeader) } n.debug("Pausing our apply channel") @@ -1047,31 +1042,23 @@ func (n *raft) InstallSnapshot(data []byte) error { } if n.applied == 0 { + n.debug("Not snapshotting as there are no applied entries") return errNoSnapAvailable } - n.debug("Installing snapshot of %d bytes", len(data)) - - var term uint64 + term := n.pterm if ae, _ := n.loadEntry(n.applied); ae != nil { - // Use the term from the most recently applied entry if possible. term = ae.term - } else if ae, _ = n.loadFirstEntry(); ae != nil { - // Otherwise see if we can find the term from the first entry. - term = ae.term - } else { - // Last resort is to use the last pterm that we knew of. - term = n.pterm } - snap := &snapshot{ + n.debug("Installing snapshot of %d bytes", len(data)) + + return n.installSnapshot(&snapshot{ lastTerm: term, lastIndex: n.applied, peerstate: encodePeerState(&peerState{n.peerNames(), n.csz, n.extSt}), data: data, - } - - return n.installSnapshot(snap) + }) } // Install the snapshot. @@ -1081,11 +1068,7 @@ func (n *raft) installSnapshot(snap *snapshot) error { sn := fmt.Sprintf(snapFileT, snap.lastTerm, snap.lastIndex) sfile := filepath.Join(snapDir, sn) - <-dios - err := os.WriteFile(sfile, n.encodeSnapshot(snap), defaultFilePerms) - dios <- struct{}{} - - if err != nil { + if err := writeFileWithSync(sfile, n.encodeSnapshot(snap), defaultFilePerms); err != nil { // We could set write err here, but if this is a temporary situation, too many open files etc. // we want to retry and snapshots are not fatal. return err @@ -1272,6 +1255,21 @@ func (n *raft) Leader() bool { return n.State() == Leader } +// stepdown immediately steps down the Raft node to the +// follower state. This will take the lock itself. +func (n *raft) stepdown(newLeader string) { + n.Lock() + defer n.Unlock() + n.stepdownLocked(newLeader) +} + +// stepdownLocked immediately steps down the Raft node to the +// follower state. This requires the lock is already held. +func (n *raft) stepdownLocked(newLeader string) { + n.debug("Stepping down") + n.switchToFollowerLocked(newLeader) +} + // isCatchingUp returns true if a catchup is currently taking place. 
func (n *raft) isCatchingUp() bool { n.RLock() @@ -1479,8 +1477,6 @@ func (n *raft) StepDown(preferred ...string) error { n.vote = noVote n.writeTermVote() - stepdown := n.stepdown - prop := n.prop n.Unlock() if len(preferred) > 0 && maybeLeader == noLeader { @@ -1488,15 +1484,18 @@ func (n *raft) StepDown(preferred ...string) error { } // If we have a new leader selected, transfer over to them. + // Send the append entry directly rather than via the proposals queue, + // as we will switch to follower state immediately and will blow away + // the contents of the proposal queue in the process. if maybeLeader != noLeader { - n.debug("Selected %q for new leader", maybeLeader) - prop.push(newProposedEntry(newEntry(EntryLeaderTransfer, []byte(maybeLeader)), _EMPTY_)) - } else { - // Force us to stepdown here. - n.debug("Stepping down") - stepdown.push(noLeader) + n.debug("Selected %q for new leader, stepping down due to leadership transfer", maybeLeader) + ae := newEntry(EntryLeaderTransfer, []byte(maybeLeader)) + n.sendAppendEntry([]*Entry{ae}) } + // Force us to stepdown here. + n.stepdown(noLeader) + return nil } @@ -1625,95 +1624,35 @@ func (n *raft) Created() time.Time { } func (n *raft) Stop() { - n.shutdown(false) + n.shutdown() } -func (n *raft) Delete() { - n.shutdown(true) -} - -func (n *raft) shutdown(shouldDelete bool) { - n.Lock() - - // Returned swap value is the previous state. It looks counter-intuitive - // to do this atomic operation with the lock held, but we have to do so in - // order to make sure that switchState() is not already running. If it is - // then it can potentially update the n.state back to a non-closed state, - // allowing shutdown() to be called again. If that happens then the below - // close(n.quit) will panic from trying to close an already-closed channel. - if n.state.Swap(int32(Closed)) == int32(Closed) { - // If we get called again with shouldDelete, in case we were called first with Stop() cleanup - if shouldDelete { - if wal := n.wal; wal != nil { - wal.Delete() - } - os.RemoveAll(n.sd) - } - n.Unlock() - return - } - - close(n.quit) - if c := n.c; c != nil { - var subs []*subscription - c.mu.Lock() - for _, sub := range c.subs { - subs = append(subs, sub) - } - c.mu.Unlock() - for _, sub := range subs { - n.unsubscribe(sub) - } - c.closeConnection(InternalClient) - n.c = nil - } - - s, g, wal := n.s, n.group, n.wal - - // Unregistering ipQueues do not prevent them from push/pop - // just will remove them from the central monitoring map - queues := []interface { - unregister() - drain() - }{n.reqs, n.votes, n.prop, n.entry, n.resp, n.apply, n.stepdown} - for _, q := range queues { - q.drain() - q.unregister() +func (n *raft) WaitForStop() { + if n.state.Load() == int32(Closed) { + n.wg.Wait() } - sd := n.sd - n.Unlock() +} - s.unregisterRaftNode(g) +func (n *raft) Delete() { + n.shutdown() + n.wg.Wait() - if wal != nil { - if shouldDelete { - wal.Delete() - } else { - wal.Stop() - } - } + n.Lock() + defer n.Unlock() - if shouldDelete { - // Delete all our peer state and vote state and any snapshots. - os.RemoveAll(sd) - n.debug("Deleted") - } else { - n.debug("Shutdown") + if wal := n.wal; wal != nil { + wal.Delete() } + os.RemoveAll(n.sd) + n.debug("Deleted") } -// Wipe will force an on disk state reset and then call Delete(). -// Useful in case we have been stopped before this point. -func (n *raft) Wipe() { - n.RLock() - wal := n.wal - n.RUnlock() - // Delete our underlying storage. 
- if wal != nil { - wal.Delete() +func (n *raft) shutdown() { + // First call to Stop or Delete should close the quit chan + // to notify the runAs goroutines to stop what they're doing. + if n.state.Swap(int32(Closed)) != int32(Closed) { + close(n.quit) } - // Now call delete. - n.Delete() } const ( @@ -1834,6 +1773,7 @@ func (n *raft) resetElectWithLock(et time.Duration) { func (n *raft) run() { s := n.s defer s.grWG.Done() + defer n.wg.Done() // We want to wait for some routing to be enabled, so we will wait for // at least a route, leaf or gateway connection to be established before @@ -1866,6 +1806,7 @@ func (n *raft) run() { // Send nil entry to signal the upper layers we are done doing replay/restore. n.apply.push(nil) +runner: for s.isRunning() { switch n.State() { case Follower: @@ -1875,9 +1816,47 @@ func (n *raft) run() { case Leader: n.runAsLeader() case Closed: - return + break runner } } + + // If we've reached this point then we're shutting down, either because + // the server is stopping or because the Raft group is closing/closed. + n.Lock() + defer n.Unlock() + + if c := n.c; c != nil { + var subs []*subscription + c.mu.Lock() + for _, sub := range c.subs { + subs = append(subs, sub) + } + c.mu.Unlock() + for _, sub := range subs { + n.unsubscribe(sub) + } + c.closeConnection(InternalClient) + n.c = nil + } + + // Unregistering ipQueues do not prevent them from push/pop + // just will remove them from the central monitoring map + queues := []interface { + unregister() + drain() + }{n.reqs, n.votes, n.prop, n.entry, n.resp, n.apply} + for _, q := range queues { + q.drain() + q.unregister() + } + + n.s.unregisterRaftNode(n.group) + + if wal := n.wal; wal != nil { + wal.Stop() + } + + n.debug("Shutdown") } func (n *raft) debug(format string, args ...any) { @@ -1963,7 +1942,7 @@ func (n *raft) processAppendEntries() { // runAsFollower is called by run and will block for as long as the node is // running in the follower state. func (n *raft) runAsFollower() { - for { + for n.State() == Follower { elect := n.electTimer() select { @@ -1972,7 +1951,6 @@ func (n *raft) runAsFollower() { n.processAppendEntries() case <-n.s.quitCh: // The server is shutting down. - n.shutdown(false) return case <-n.quit: // The Raft node is shutting down. @@ -2005,22 +1983,17 @@ func (n *raft) runAsFollower() { n.debug("Ignoring old vote response, we have stepped down") n.votes.popOne() case <-n.resp.ch: - // We're receiving append entry responses from the network, probably because - // we have only just stepped down and they were already in flight. Ignore them. - n.resp.popOne() + // Ignore append entry responses received from before the state change. + n.resp.drain() + case <-n.prop.ch: + // Ignore proposals received from before the state change. + n.prop.drain() case <-n.reqs.ch: // We've just received a vote request from the network. // Because of drain() it is possible that we get nil from popOne(). if voteReq, ok := n.reqs.popOne(); ok { n.processVoteRequest(voteReq) } - case <-n.stepdown.ch: - // We've received a stepdown request, start following the new leader if - // we can. 
- if newLeader, ok := n.stepdown.popOne(); ok { - n.switchToFollower(newLeader) - return - } } } } @@ -2376,7 +2349,7 @@ func (n *raft) runAsLeader() { fsub, err := n.subscribe(psubj, n.handleForwardedProposal) if err != nil { n.warn("Error subscribing to forwarded proposals: %v", err) - n.stepdown.push(noLeader) + n.stepdownLocked(noLeader) n.Unlock() return } @@ -2384,7 +2357,7 @@ func (n *raft) runAsLeader() { if err != nil { n.warn("Error subscribing to forwarded remove peer proposals: %v", err) n.unsubscribe(fsub) - n.stepdown.push(noLeader) + n.stepdownLocked(noLeader) n.Unlock() return } @@ -2410,7 +2383,6 @@ func (n *raft) runAsLeader() { for n.State() == Leader { select { case <-n.s.quitCh: - n.shutdown(false) return case <-n.quit: return @@ -2431,15 +2403,6 @@ func (n *raft) runAsLeader() { n.doRemovePeerAsLeader(string(b.Data)) } entries = append(entries, b.Entry) - // If this is us sending out a leadership transfer stepdown inline here. - if b.Type == EntryLeaderTransfer { - // Send out what we have and switch to follower. - n.sendAppendEntry(entries) - n.prop.recycle(&es) - n.debug("Stepping down due to leadership transfer") - n.switchToFollower(noLeader) - return - } // Increment size. sz += len(b.Data) + 1 // If below thresholds go ahead and send. @@ -2470,7 +2433,7 @@ func (n *raft) runAsLeader() { } case <-lq.C: if n.lostQuorum() { - n.switchToFollower(noLeader) + n.stepdown(noLeader) return } case <-n.votes.ch: @@ -2480,7 +2443,7 @@ func (n *raft) runAsLeader() { continue } if vresp.term > n.Term() { - n.switchToFollower(noLeader) + n.stepdown(noLeader) return } n.trackPeer(vresp.peer) @@ -2489,11 +2452,6 @@ func (n *raft) runAsLeader() { if voteReq, ok := n.reqs.popOne(); ok { n.processVoteRequest(voteReq) } - case <-n.stepdown.ch: - if newLeader, ok := n.stepdown.popOne(); ok { - n.switchToFollower(newLeader) - return - } case <-n.entry.ch: n.processAppendEntries() } @@ -2627,7 +2585,6 @@ func (n *raft) runCatchup(ar *appendEntryResponse, indexUpdatesQ *ipQueue[uint64 for n.Leader() { select { case <-n.s.quitCh: - n.shutdown(false) return case <-n.quit: return @@ -2664,7 +2621,7 @@ func (n *raft) sendSnapshotToFollower(subject string) (uint64, error) { snap, err := n.loadLastSnapshot() if err != nil { // We need to stepdown here when this happens. - n.stepdown.push(noLeader) + n.stepdownLocked(noLeader) // We need to reset our state here as well. n.resetWAL() return 0, err @@ -2730,7 +2687,7 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) { n.warn("Request from follower for entry at index [%d] errored for state %+v - %v", start, state, err) if err == ErrStoreEOF { // If we are here we are seeing a request for an item beyond our state, meaning we should stepdown. - n.stepdown.push(noLeader) + n.stepdownLocked(noLeader) n.Unlock() arPool.Put(ar) return @@ -2742,7 +2699,7 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) { // If we are here we are seeing a request for an item we do not have, meaning we should stepdown. // This is possible on a reset of our WAL but the other side has a snapshot already. // If we do not stepdown this can cycle. 
- n.stepdown.push(noLeader) + n.stepdownLocked(noLeader) n.Unlock() arPool.Put(ar) return @@ -2756,7 +2713,11 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) { n.progress[ar.peer] = indexUpdates n.Unlock() - n.s.startGoRoutine(func() { n.runCatchup(ar, indexUpdates) }) + n.wg.Add(1) + n.s.startGoRoutine(func() { + defer n.wg.Done() + n.runCatchup(ar, indexUpdates) + }) } func (n *raft) loadEntry(index uint64) (*appendEntry, error) { @@ -2795,7 +2756,7 @@ func (n *raft) applyCommit(index uint64) error { if err != ErrStoreClosed && err != ErrStoreEOF { n.warn("Got an error loading %d index: %v - will reset", index, err) if n.State() == Leader { - n.stepdown.push(n.selectNextLeader()) + n.stepdownLocked(n.selectNextLeader()) } // Reset and cancel any catchup. n.resetWAL() @@ -2872,7 +2833,7 @@ func (n *raft) applyCommit(index uint64) error { // If this is us and we are the leader we should attempt to stepdown. if peer == n.id && n.State() == Leader { - n.stepdown.push(n.selectNextLeader()) + n.stepdownLocked(n.selectNextLeader()) } // Remove from string intern map. @@ -3003,16 +2964,18 @@ func (n *raft) runAsCandidate() { n.ID(): {}, } - for { + for n.State() == Candidate { elect := n.electTimer() select { case <-n.entry.ch: n.processAppendEntries() case <-n.resp.ch: - // Ignore - n.resp.popOne() + // Ignore append entry responses received from before the state change. + n.resp.drain() + case <-n.prop.ch: + // Ignore proposals received from before the state change. + n.prop.drain() case <-n.s.quitCh: - n.shutdown(false) return case <-n.quit: return @@ -3046,8 +3009,8 @@ func (n *raft) runAsCandidate() { n.term = vresp.term n.vote = noVote n.writeTermVote() - n.stepdown.push(noLeader) n.lxfer = false + n.stepdownLocked(noLeader) n.Unlock() } case <-n.reqs.ch: @@ -3055,11 +3018,6 @@ func (n *raft) runAsCandidate() { if voteReq, ok := n.reqs.popOne(); ok { n.processVoteRequest(voteReq) } - case <-n.stepdown.ch: - if newLeader, ok := n.stepdown.popOne(); ok { - n.switchToFollower(newLeader) - return - } } } } @@ -3171,7 +3129,7 @@ func (n *raft) truncateWAL(term, index uint64) { } } // Set after we know we have truncated properly. - n.term, n.pterm, n.pindex = term, term, index + n.pterm, n.pindex = term, index } // Reset our WAL. This is equivalent to truncating all data from the log. @@ -3220,7 +3178,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { n.writeTermVote() } n.debug("Received append entry from another leader, stepping down to %q", ae.leader) - n.stepdown.push(ae.leader) + n.stepdownLocked(ae.leader) } else { // Let them know we are the leader. ar := newAppendEntryResponse(n.term, n.pindex, n.id, false) @@ -3237,19 +3195,18 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { // another node has taken on the leader role already, so we should convert // to a follower of that node instead. if n.State() == Candidate { - // Ignore old terms, otherwise we might end up stepping down incorrectly. - // Needs to be ahead of our pterm (last log index), as an isolated node - // could have bumped its vote term up considerably past this point. - if ae.term >= n.pterm { + // If we have a leader in the current term or higher, we should stepdown, + // write the term and vote if the term of the request is higher. + if ae.term >= n.term { // If the append entry term is newer than the current term, erase our // vote. 
if ae.term > n.term { + n.term = ae.term n.vote = noVote + n.writeTermVote() } n.debug("Received append entry in candidate state from %q, converting to follower", ae.leader) - n.term = ae.term - n.writeTermVote() - n.stepdown.push(ae.leader) + n.stepdownLocked(ae.leader) } } @@ -3304,7 +3261,6 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { // If this term is greater than ours. if ae.term > n.term { - n.pterm = ae.pterm n.term = ae.term n.vote = noVote if isNew { @@ -3312,8 +3268,15 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { } if n.State() != Follower { n.debug("Term higher than ours and we are not a follower: %v, stepping down to %q", n.State(), ae.leader) - n.stepdown.push(ae.leader) + n.stepdownLocked(ae.leader) } + } else if ae.term < n.term && !catchingUp && isNew { + n.debug("Rejected AppendEntry from a leader (%s) with term %d which is less than ours", ae.leader, ae.term) + ar := newAppendEntryResponse(n.term, n.pindex, n.id, false) + n.Unlock() + n.sendRPC(ae.reply, _EMPTY_, ar.encode(arbuf)) + arPool.Put(ar) + return } if isNew && n.leader != ae.leader && n.State() == Follower { @@ -3324,29 +3287,44 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { n.updateLeadChange(false) } - if (isNew && ae.pterm != n.pterm) || ae.pindex != n.pindex { +RETRY: + if ae.pterm != n.pterm || ae.pindex != n.pindex { // Check if this is a lower or equal index than what we were expecting. if ae.pindex <= n.pindex { - n.debug("AppendEntry detected pindex less than ours: %d:%d vs %d:%d", ae.pterm, ae.pindex, n.pterm, n.pindex) + n.debug("AppendEntry detected pindex less than/equal to ours: %d:%d vs %d:%d", ae.pterm, ae.pindex, n.pterm, n.pindex) var ar *appendEntryResponse - var success bool - if eae, _ := n.loadEntry(ae.pindex); eae == nil { + + if ae.pindex < n.commit { + // If we have already committed this entry, just mark success. + success = true + } else if eae, _ := n.loadEntry(ae.pindex); eae == nil { // If terms are equal, and we are not catching up, we have simply already processed this message. // So we will ACK back to the leader. This can happen on server restarts based on timings of snapshots. if ae.pterm == n.pterm && !catchingUp { success = true + } else if ae.pindex == n.pindex { + // Check if only our terms do not match here. + // Make sure pterms match and we take on the leader's. + // This prevents constant spinning. + n.truncateWAL(ae.pterm, ae.pindex) } else { n.resetWAL() } + } else if eae.term == ae.pterm { + // If terms match we can delete all entries past this one, and then continue storing the current entry. + n.truncateWAL(eae.term, eae.pindex+1) + goto RETRY } else { - // If terms mismatched, or we got an error loading, delete that entry and all others past it. + // If terms mismatched, delete that entry and all others past it. // Make sure to cancel any catchups in progress. // Truncate will reset our pterm and pindex. Only do so if we have an entry. n.truncateWAL(eae.pterm, eae.pindex) } - // Cancel regardless. - n.cancelCatchup() + // Cancel regardless if unsuccessful. + if !success { + n.cancelCatchup() + } // Create response. ar = newAppendEntryResponse(ae.pterm, ae.pindex, n.id, success) @@ -3369,16 +3347,6 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { return } - // Check if only our terms do not match here. - if ae.pindex == n.pindex { - // Make sure pterms match and we take on the leader's. - // This prevents constant spinning. 
- n.truncateWAL(ae.pterm, ae.pindex) - n.cancelCatchup() - n.Unlock() - return - } - if ps, err := decodePeerState(ae.entries[1].Data); err == nil { n.processPeerState(ps) // Also need to copy from client's buffer. @@ -3418,21 +3386,16 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { n.apply.push(newCommittedEntry(n.commit, ae.entries[:1])) n.Unlock() return - - } else { - n.debug("AppendEntry did not match %d %d with %d %d", ae.pterm, ae.pindex, n.pterm, n.pindex) - // Reset our term. - n.term = n.pterm - if ae.pindex > n.pindex { - // Setup our state for catching up. - inbox := n.createCatchup(ae) - ar := newAppendEntryResponse(n.pterm, n.pindex, n.id, false) - n.Unlock() - n.sendRPC(ae.reply, inbox, ar.encode(arbuf)) - arPool.Put(ar) - return - } } + + // Setup our state for catching up. + n.debug("AppendEntry did not match %d %d with %d %d", ae.pterm, ae.pindex, n.pterm, n.pindex) + inbox := n.createCatchup(ae) + ar := newAppendEntryResponse(n.pterm, n.pindex, n.id, false) + n.Unlock() + n.sendRPC(ae.reply, inbox, ar.encode(arbuf)) + arPool.Put(ar) + return } // Save to our WAL if we have entries. @@ -3571,9 +3534,8 @@ func (n *raft) processAppendEntryResponse(ar *appendEntryResponse) { n.term = ar.term n.vote = noVote n.writeTermVote() - n.warn("Detected another leader with higher term, will stepdown and reset") - n.stepdown.push(noLeader) - n.resetWAL() + n.warn("Detected another leader with higher term, will stepdown") + n.stepdownLocked(noLeader) n.Unlock() arPool.Put(ar) } else if ar.reply != _EMPTY_ { @@ -3620,7 +3582,7 @@ func (n *raft) storeToWAL(ae *appendEntry) error { if index := ae.pindex + 1; index != seq { n.warn("Wrong index, ae is %+v, index stored was %d, n.pindex is %d, will reset", ae, seq, n.pindex) if n.State() == Leader { - n.stepdown.push(n.selectNextLeader()) + n.stepdownLocked(n.selectNextLeader()) } // Reset and cancel any catchup. n.resetWAL() @@ -3814,12 +3776,7 @@ func writePeerState(sd string, ps *peerState) error { if _, err := os.Stat(psf); err != nil && !os.IsNotExist(err) { return err } - - <-dios - err := os.WriteFile(psf, encodePeerState(ps), defaultFilePerms) - dios <- struct{}{} - - return err + return writeFileWithSync(psf, encodePeerState(ps), defaultFilePerms) } func readPeerState(sd string) (ps *peerState, err error) { @@ -3843,12 +3800,7 @@ func writeTermVote(sd string, wtv []byte) error { if _, err := os.Stat(psf); err != nil && !os.IsNotExist(err) { return err } - - <-dios - err := os.WriteFile(psf, wtv, defaultFilePerms) - dios <- struct{}{} - - return err + return writeFileWithSync(psf, wtv, defaultFilePerms) } // readTermVote will read the largest term and who we voted from to stable storage. @@ -4020,9 +3972,10 @@ func (n *raft) processVoteRequest(vr *voteRequest) error { if n.State() != Follower { n.debug("Stepping down from %s, detected higher term: %d vs %d", strings.ToLower(n.State().String()), vr.term, n.term) - n.stepdown.push(noLeader) - n.term = vr.term + n.stepdownLocked(noLeader) } + n.cancelCatchup() + n.term = vr.term n.vote = noVote n.writeTermVote() } @@ -4124,20 +4077,26 @@ func (n *raft) updateLeadChange(isLeader bool) { // Lock should be held. func (n *raft) switchState(state RaftState) { +retry: pstate := n.State() if pstate == Closed { return } + // Set our state. If something else has changed our state + // then retry, this will either be a Stop or Delete call. + if !n.state.CompareAndSwap(int32(pstate), int32(state)) { + goto retry + } + // Reset the election timer. 
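+	// Every transition re-arms the timer, so the node times out relative to its new state.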
n.resetElectionTimeout() - // Set our state. - n.state.Store(int32(state)) if pstate == Leader && state != Leader { n.updateLeadChange(false) - // Drain the response queue. + // Drain the append entry response and proposal queues. n.resp.drain() + n.prop.drain() } else if state == Leader && pstate != Leader { if len(n.pae) > 0 { n.pae = make(map[uint64]*appendEntry) @@ -4154,13 +4113,17 @@ const ( ) func (n *raft) switchToFollower(leader string) { + n.Lock() + defer n.Unlock() + + n.switchToFollowerLocked(leader) +} + +func (n *raft) switchToFollowerLocked(leader string) { if n.State() == Closed { return } - n.Lock() - defer n.Unlock() - n.debug("Switching to follower") n.lxfer = false @@ -4177,7 +4140,9 @@ func (n *raft) switchToCandidate() { defer n.Unlock() // If we are catching up or are in observer mode we can not switch. - if n.observer || n.paused { + // Avoid petitioning to become leader if we're behind on applies. + if n.observer || n.paused || n.applied < n.commit { + n.resetElect(minElectionTimeout / 4) return } diff --git a/server/raft_test.go b/server/raft_test.go index 27a1aae4d3..7573ae482b 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -14,6 +14,8 @@ package server import ( + "bytes" + "errors" "fmt" "math" "math/rand" @@ -185,6 +187,21 @@ func TestNRGRecoverFromFollowingNoLeader(t *testing.T) { require_NotEqual(t, rg.leader().node().Term(), term) } +func TestNRGInlineStepdown(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + rg := c.createMemRaftGroup("TEST", 3, newStateAdder) + rg.waitOnLeader() + + // When StepDown() completes, we should not be the leader. Before, + // this would not be guaranteed as the stepdown could be processed + // some time later. + n := rg.leader().node().(*raft) + require_NoError(t, n.StepDown()) + require_NotEqual(t, n.State(), Leader) +} + func TestNRGObserverMode(t *testing.T) { c := createJetStreamClusterExplicit(t, "R3S", 3) defer c.shutdown() @@ -221,6 +238,53 @@ func TestNRGObserverMode(t *testing.T) { } } +func TestNRGAEFromOldLeader(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, _ := jsClientConnect(t, c.leader(), nats.UserInfo("admin", "s3cr3t!")) + defer nc.Close() + + rg := c.createMemRaftGroup("TEST", 3, newStateAdder) + rg.waitOnLeader() + + // Listen out for catchup requests. + ch := make(chan *nats.Msg, 16) + _, err := nc.ChanSubscribe(fmt.Sprintf(raftCatchupReply, ">"), ch) + require_NoError(t, err) + + // Start next term so that we can reuse term 1 in the next step. + leader := rg.leader().node().(*raft) + leader.StepDown() + time.Sleep(time.Millisecond * 100) + rg.waitOnLeader() + require_Equal(t, leader.Term(), 2) + leader = rg.leader().node().(*raft) + + // Send an append entry with an outdated term. Beforehand, doing + // so would have caused a WAL reset and then would have triggered + // a Raft-level catchup. + ae := &appendEntry{ + term: 1, + pindex: 0, + leader: leader.id, + reply: nc.NewRespInbox(), + } + payload, err := ae.encode(nil) + require_NoError(t, err) + resp, err := nc.Request(leader.asubj, payload, time.Second) + require_NoError(t, err) + + // Wait for the response, the server should have rejected it. + ar := leader.decodeAppendEntryResponse(resp.Data) + require_NotNil(t, ar) + require_Equal(t, ar.success, false) + + // No catchup should happen at this point because no reset should + // have happened. 
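+	// require_NoChanRead below asserts that no catchup request shows up on ch
+	// within the two second window.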
+ require_NoChanRead(t, ch, time.Second*2) +} + // TestNRGSimpleElection tests that a simple election succeeds. It is // simple because the group hasn't processed any entries and hasn't // suffered any interruptions of any kind, therefore there should be @@ -277,6 +341,12 @@ func TestNRGSimpleElection(t *testing.T) { re := decodeVoteResponse(msg.Data) require_True(t, re != nil) + // Ignore old vote responses that could be in-flight. + if re.term < vr.term { + i-- + continue + } + // The vote should have been granted. require_Equal(t, re.granted, true) @@ -296,6 +366,30 @@ func TestNRGSimpleElection(t *testing.T) { } } +func TestNRGSwitchStateClearsQueues(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + s := c.servers[0] // RunBasicJetStreamServer not available + + n := &raft{ + prop: newIPQueue[*proposedEntry](s, "prop"), + resp: newIPQueue[*appendEntryResponse](s, "resp"), + leadc: make(chan bool, 1), // for switchState + } + n.state.Store(int32(Leader)) + require_Equal(t, n.prop.len(), 0) + require_Equal(t, n.resp.len(), 0) + + n.prop.push(&proposedEntry{}) + n.resp.push(&appendEntryResponse{}) + require_Equal(t, n.prop.len(), 1) + require_Equal(t, n.resp.len(), 1) + + n.switchState(Follower) + require_Equal(t, n.prop.len(), 0) + require_Equal(t, n.resp.len(), 0) +} + func TestNRGStepDownOnSameTermDoesntClearVote(t *testing.T) { c := createJetStreamClusterExplicit(t, "R3S", 3) defer c.shutdown() @@ -373,6 +467,14 @@ func TestNRGUnsuccessfulVoteRequestDoesntResetElectionTimer(t *testing.T) { defer nc.Close() rg := c.createRaftGroup("TEST", 3, newStateAdder) + + // Because the election timer is quite high, we want to kick a node into + // campaigning before it naturally needs to, otherwise the test takes a + // long time just to pick a leader. + for _, n := range rg { + n.node().Campaign() + break + } rg.waitOnLeader() leader := rg.leader().node().(*raft) follower := rg.nonLeader().node().(*raft) @@ -444,7 +546,7 @@ func TestNRGInvalidTAVDoesntPanic(t *testing.T) { c.waitOnAllCurrent() } -func TestNRGCandidateStepsDownAfterAE(t *testing.T) { +func TestNRGAssumeHighTermAfterCandidateIsolation(t *testing.T) { c := createJetStreamClusterExplicit(t, "R3S", 3) defer c.shutdown() c.waitOnLeader() @@ -455,32 +557,38 @@ func TestNRGCandidateStepsDownAfterAE(t *testing.T) { rg := c.createRaftGroup("TEST", 3, newStateAdder) rg.waitOnLeader() - // Pick a random follower node. Bump the term up by a considerable + // Bump the term up on one of the follower nodes by a considerable // amount and force it into the candidate state. This is what happens // after a period of time in isolation. - n := rg.nonLeader().node().(*raft) - n.Lock() - n.term += 100 - n.switchState(Candidate) - n.Unlock() + follower := rg.nonLeader().node().(*raft) + follower.Lock() + follower.term += 100 + follower.switchState(Candidate) + follower.Unlock() + + follower.requestVote() + time.Sleep(time.Millisecond * 100) + + // The candidate will shortly send a vote request. When that happens, + // the rest of the nodes in the cluster should move up to that term, + // even though they will not grant the vote. + nterm := follower.term + for _, n := range rg { + require_Equal(t, n.node().Term(), nterm) + } - // Have the leader push through something on the current term just - // for good measure, although the heartbeats probably work too. + // Have the leader send out a proposal, which will force the candidate + // back into follower state. 
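+	// The append entry for that proposal carries a term at least as high as the
+	// candidate's bumped term, satisfying the ae.term >= n.term check.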
+ rg.waitOnLeader() rg.leader().(*stateAdder).proposeDelta(1) + rg.waitOnTotal(t, 1) - // Wait for the leader to receive the next append entry from the - // current leader. What should happen is that the node steps down - // and starts following the leader, as nothing in the log of the - // follower is newer than the term of the leader. - checkFor(t, time.Second, 50*time.Millisecond, func() error { - if n.State() == Candidate { - return fmt.Errorf("shouldn't still be candidate state") - } - if nterm, lterm := n.Term(), rg.leader().node().Term(); nterm != lterm { - return fmt.Errorf("follower term %d should match leader term %d", nterm, lterm) - } - return nil - }) + // The candidate should have switched to a follower on a term equal to + // or newer than the candidate had. + for _, n := range rg { + require_NotEqual(t, n.node().State(), Candidate) + require_True(t, n.node().Term() >= nterm) + } } // Test to make sure this does not cause us to truncate our wal or enter catchup state. @@ -559,6 +667,9 @@ func TestNRGSystemClientCleanupFromAccount(t *testing.T) { for _, sm := range rg { sm.node().Stop() } + for _, sm := range rg { + sm.node().WaitForStop() + } } finish := numClients() require_Equal(t, start, finish) @@ -603,6 +714,664 @@ func TestNRGLeavesObserverAfterPause(t *testing.T) { checkState(false, false) } +func TestNRGCandidateDoesntRevertTermAfterOldAE(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + rg := c.createMemRaftGroup("TEST", 3, newStateAdder) + rg.waitOnLeader() + + // Bump the term up a few times. + for i := 0; i < 3; i++ { + rg.leader().node().StepDown() + time.Sleep(time.Millisecond * 50) // Needed because stepdowns not synchronous + rg.waitOnLeader() + } + + leader := rg.leader().node().(*raft) + follower := rg.nonLeader().node().(*raft) + + // Sanity check that we are where we expect to be. + require_Equal(t, leader.term, 4) + require_Equal(t, follower.term, 4) + + // At this point the active term is 4 and pterm is 4, force the + // term up to 9. This won't bump the pterm. + rg.lockAll() + for _, n := range rg { + n.node().(*raft).term += 5 + } + rg.unlockAll() + + // Build an AE that has a term newer than the pterm but older than + // the term. Give it to the follower in candidate state. + ae := newAppendEntry(leader.id, 6, leader.commit, leader.pterm, leader.pindex, nil) + follower.switchToCandidate() + follower.processAppendEntry(ae, nil) + + // The candidate must not have reverted back to term 6. + require_NotEqual(t, follower.term, 6) +} + +func TestNRGTermDoesntRollBackToPtermOnCatchup(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, _ := jsClientConnect(t, c.leader(), nats.UserInfo("admin", "s3cr3t!")) + defer nc.Close() + + rg := c.createMemRaftGroup("TEST", 3, newStateAdder) + rg.waitOnLeader() + + // Propose some entries so that we have entries in the log that have pterm 1. + lsm := rg.leader().(*stateAdder) + for i := 0; i < 5; i++ { + lsm.proposeDelta(1) + rg.waitOnTotal(t, int64(i)+1) + } + + // Check that everyone is where they are supposed to be. + rg.lockAll() + for _, n := range rg { + rn := n.node().(*raft) + require_Equal(t, rn.term, 1) + require_Equal(t, rn.pterm, 1) + require_Equal(t, rn.pindex, 6) + } + rg.unlockAll() + + // Force a stepdown so that we move up to term 2. + rg.leader().node().(*raft).switchToFollower(noLeader) + rg.waitOnLeader() + leader := rg.leader().node().(*raft) + + // Now make sure everyone has moved up to term 2. 
Additionally we're + // going to prune back the follower logs to term 1 as this is what will + // create the right conditions for the catchup. + rg.lockAll() + for _, n := range rg { + rn := n.node().(*raft) + require_Equal(t, rn.term, 2) + + if !rn.Leader() { + rn.truncateWAL(1, 6) + require_Equal(t, rn.term, 2) // rn.term must stay the same + require_Equal(t, rn.pterm, 1) + require_Equal(t, rn.pindex, 6) + } + } + // This will make followers run a catchup. + ae := newAppendEntry(leader.id, leader.term, leader.commit, leader.pterm, leader.pindex, nil) + rg.unlockAll() + + arInbox := nc.NewRespInbox() + arCh := make(chan *nats.Msg, 2) + _, err := nc.ChanSubscribe(arInbox, arCh) + require_NoError(t, err) + + // In order to trip this condition, we need to send an append entry that + // will trick the followers into running a catchup. In the process they + // were setting the term back to pterm which is incorrect. + b, err := ae.encode(nil) + require_NoError(t, err) + require_NoError(t, nc.PublishMsg(&nats.Msg{ + Subject: fmt.Sprintf(raftAppendSubj, "TEST"), + Reply: arInbox, + Data: b, + })) + + // Wait for both followers to respond to the append entry and then verify + // that none of the nodes should have reverted back to term 1. + require_ChanRead(t, arCh, time.Second*5) // First follower + require_ChanRead(t, arCh, time.Second*5) // Second follower + for _, n := range rg { + require_NotEqual(t, n.node().Term(), 1) + } +} + +func TestNRGNoResetOnAppendEntryResponse(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, _ := jsClientConnect(t, c.leader(), nats.UserInfo("admin", "s3cr3t!")) + defer nc.Close() + + rg := c.createRaftGroup("TEST", 3, newStateAdder) + rg.waitOnLeader() + c.waitOnAllCurrent() + + leader := rg.leader().node().(*raft) + follower := rg.nonLeader().node().(*raft) + lsm := rg.leader().(*stateAdder) + + // Subscribe for append entries that aren't heartbeats and respond to + // each of them as though it's a non-success and with a higher term. + // The higher term in this case is what would cause the leader previously + // to reset the entire log which it shouldn't do. + _, err := nc.Subscribe(fmt.Sprintf(raftAppendSubj, "TEST"), func(msg *nats.Msg) { + if ae, err := follower.decodeAppendEntry(msg.Data, nil, msg.Reply); err == nil && len(ae.entries) > 0 { + ar := newAppendEntryResponse(ae.term+1, ae.commit, follower.id, false) + require_NoError(t, msg.Respond(ar.encode(nil))) + } + }) + require_NoError(t, err) + + // Generate an append entry that the subscriber above can respond to. + c.waitOnAllCurrent() + lsm.proposeDelta(5) + rg.waitOnTotal(t, 5) + + // The was-leader should now have stepped down, make sure that it + // didn't blow away its log in the process. + rg.lockAll() + defer rg.unlockAll() + require_Equal(t, leader.State(), Follower) + require_NotEqual(t, leader.pterm, 0) + require_NotEqual(t, leader.pindex, 0) +} + +func TestNRGCandidateDontStepdownDueToLeaderOfPreviousTerm(t *testing.T) { + // This test relies on nodes not hitting their election timer too often. 
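+	// Raise the timeouts for the duration of the test and restore them afterwards,
+	// so no node campaigns on its own while we hand-craft candidate and leader state.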
+ origMinTimeout, origMaxTimeout, origHBInterval := minElectionTimeout, maxElectionTimeout, hbInterval + minElectionTimeout, maxElectionTimeout, hbInterval = time.Second*5, time.Second*10, time.Second + defer func() { + minElectionTimeout, maxElectionTimeout, hbInterval = origMinTimeout, origMaxTimeout, origHBInterval + }() + + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + c.waitOnLeader() + + nc, _ := jsClientConnect(t, c.leader(), nats.UserInfo("admin", "s3cr3t!")) + defer nc.Close() + + rg := c.createRaftGroup("TEST", 3, newStateAdder) + rg.waitOnLeader() + + var ( + candidatePterm uint64 = 50 + candidatePindex uint64 = 70 + candidateTerm uint64 = 100 + ) + + // Create a candidate that has received entries while they were a follower in a previous term + candidate := rg.nonLeader().node().(*raft) + candidate.Lock() + candidate.switchState(Candidate) + candidate.pterm = candidatePterm + candidate.pindex = candidatePindex + candidate.term = candidateTerm + candidate.Unlock() + + // Leader term is behind candidate + leader := rg.leader().node().(*raft) + leader.Lock() + leader.term = candidatePterm + leader.pterm = candidatePterm + leader.pindex = candidatePindex + leader.Unlock() + + // Subscribe to the append entry subject. + sub, err := nc.SubscribeSync(leader.asubj) + require_NoError(t, err) + + // Get the first append entry that we receive, should be heartbeat from leader of prev term + msg, err := sub.NextMsg(5 * time.Second) + require_NoError(t, err) + + // Stop nodes from progressing so we can check state + rg.lockAll() + defer rg.unlockAll() + + // Decode the append entry + ae, err := leader.decodeAppendEntry(msg.Data, nil, msg.Reply) + require_NoError(t, err) + + // Check that the append entry is from the leader + require_Equal(t, ae.leader, leader.id) + // Check that it came from the leader before it updated its term with the response from the candidate + require_Equal(t, ae.term, candidatePterm) + + // Check that the candidate hasn't stepped down + require_Equal(t, candidate.State(), Candidate) + // Check that the candidate's term is still ahead of the leader's term + require_True(t, candidate.term > ae.term) +} + +func TestNRGRemoveLeaderPeerDeadlockBug(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + rg := c.createMemRaftGroup("TEST", 3, newStateAdder) + rg.waitOnLeader() + + n := rg.leader().node().(*raft) + leader := n.ID() + + // Propose to remove the leader as a peer. Will lead to a deadlock with bug. 
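+	// Once the removal is applied, applyCommit steps the leader down inline and
+	// leadership should move to one of the remaining peers (checked below).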
+ require_NoError(t, n.ProposeRemovePeer(leader)) + rg.waitOnLeader() + + checkFor(t, 10*time.Second, 200*time.Millisecond, func() error { + nl := n.GroupLeader() + if nl != leader { + return nil + } + return errors.New("Leader has not moved") + }) +} + +func TestNRGWALEntryWithoutQuorumMustTruncate(t *testing.T) { + tests := []struct { + title string + modify func(rg smGroup) + }{ + { + // state equals, only need to remove the entry + title: "equal", + modify: func(rg smGroup) {}, + }, + { + // state diverged, need to replace the entry + title: "diverged", + modify: func(rg smGroup) { + rg.leader().(*stateAdder).proposeDelta(11) + }, + }, + } + + for _, test := range tests { + t.Run(test.title, func(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + rg := c.createRaftGroup("TEST", 3, newStateAdder) + rg.waitOnLeader() + + var err error + var scratch [1024]byte + + // Simulate leader storing an AppendEntry in WAL but being hard killed before it can propose to its peers. + n := rg.leader().node().(*raft) + esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + n.Lock() + ae := n.buildAppendEntry(entries) + ae.buf, err = ae.encode(scratch[:]) + require_NoError(t, err) + err = n.storeToWAL(ae) + n.Unlock() + require_NoError(t, err) + + // Stop the leader so it moves to another one. + n.shutdown() + + // Wait for another leader to be picked + rg.waitOnLeader() + + // Make a modification, specific to this test. + test.modify(rg) + + // Restart the previous leader that contains the stored AppendEntry without quorum. + for _, a := range rg { + if a.node().ID() == n.ID() { + sa := a.(*stateAdder) + sa.restart() + break + } + } + + // The previous leader's WAL should truncate to remove the AppendEntry only it has. + // Eventually all WALs for all peers must match. + checkFor(t, 5*time.Second, 200*time.Millisecond, func() error { + var expected [][]byte + for _, a := range rg { + an := a.node().(*raft) + var state StreamState + an.wal.FastState(&state) + if len(expected) > 0 && int(state.LastSeq-state.FirstSeq+1) != len(expected) { + return fmt.Errorf("WAL is different: too many entries") + } + // Loop over all entries in the WAL, checking if the contents for all RAFT nodes are equal. + for index := state.FirstSeq; index <= state.LastSeq; index++ { + ae, err := an.loadEntry(index) + if err != nil { + return err + } + seq := int(index) + if len(expected) < seq { + expected = append(expected, ae.buf) + } else if !bytes.Equal(expected[seq-1], ae.buf) { + return fmt.Errorf("WAL is different: stored bytes differ") + } + } + } + return nil + }) + }) + } +} + +func TestNRGTermNoDecreaseAfterWALReset(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + rg := c.createRaftGroup("TEST", 3, newStateAdder) + rg.waitOnLeader() + + l := rg.leader().node().(*raft) + l.Lock() + l.term = 20 + l.Unlock() + + esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + l.Lock() + ae := l.buildAppendEntry(entries) + l.Unlock() + + for _, f := range rg { + if f.node().ID() != l.ID() { + fn := f.node().(*raft) + fn.processAppendEntry(ae, fn.aesub) + require_Equal(t, fn.term, 20) // Follower's term gets upped as expected. + } + } + + // Lower the term, simulating the followers receiving a message from an old term/leader. 
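+	// The followers should answer with a failed append entry response carrying
+	// their own term, leaving both their term and WAL untouched.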
+ ae.term = 3 + for _, f := range rg { + if f.node().ID() != l.ID() { + fn := f.node().(*raft) + fn.processAppendEntry(ae, fn.aesub) + require_Equal(t, fn.term, 20) // Follower should reject and the term stays the same. + + fn.Lock() + fn.resetWAL() + fn.Unlock() + fn.processAppendEntry(ae, fn.aesub) + require_Equal(t, fn.term, 20) // Follower should reject again, even after reset, term stays the same. + } + } +} + +func TestNRGCatchupDoesNotTruncateUncommittedEntriesWithQuorum(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + // Create a sample entry, the content doesn't matter, just that it's stored. + esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + + nats0 := "S1Nunr6R" // "nats-0" + nats1 := "yrzKKRBu" // "nats-1" + + // Timeline, for first leader + aeInitial := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries}) + aeHeartbeat := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil}) + aeUncommitted := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: entries}) + aeNoQuorum := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 2, entries: entries}) + + // Timeline, after leader change + aeMissed := encode(t, &appendEntry{leader: nats1, term: 2, commit: 1, pterm: 1, pindex: 2, entries: entries}) + aeCatchupTrigger := encode(t, &appendEntry{leader: nats1, term: 2, commit: 1, pterm: 2, pindex: 3, entries: entries}) + aeHeartbeat2 := encode(t, &appendEntry{leader: nats1, term: 2, commit: 2, pterm: 2, pindex: 4, entries: nil}) + aeHeartbeat3 := encode(t, &appendEntry{leader: nats1, term: 2, commit: 4, pterm: 2, pindex: 4, entries: nil}) + + // Initial case is simple, just store the entry. + n.processAppendEntry(aeInitial, n.aesub) + require_Equal(t, n.wal.State().Msgs, 1) + entry, err := n.loadEntry(1) + require_NoError(t, err) + require_Equal(t, entry.leader, nats0) + + // Heartbeat, makes sure commit moves up. + n.processAppendEntry(aeHeartbeat, n.aesub) + require_Equal(t, n.commit, 1) + + // We get one entry that has quorum (but we don't know that yet), so it stays uncommitted for a bit. + n.processAppendEntry(aeUncommitted, n.aesub) + require_Equal(t, n.wal.State().Msgs, 2) + entry, err = n.loadEntry(2) + require_NoError(t, err) + require_Equal(t, entry.leader, nats0) + + // We get one entry that has NO quorum (but we don't know that yet). + n.processAppendEntry(aeNoQuorum, n.aesub) + require_Equal(t, n.wal.State().Msgs, 3) + entry, err = n.loadEntry(3) + require_NoError(t, err) + require_Equal(t, entry.leader, nats0) + + // We've just had a leader election, and we missed one message from the previous leader. + // We should truncate the last message. + n.processAppendEntry(aeCatchupTrigger, n.aesub) + require_Equal(t, n.wal.State().Msgs, 2) + require_True(t, n.catchup == nil) + + // We get a heartbeat that prompts us to catchup. + n.processAppendEntry(aeHeartbeat2, n.aesub) + require_Equal(t, n.wal.State().Msgs, 2) + require_Equal(t, n.commit, 1) // Commit should not change, as we missed an item. + require_True(t, n.catchup != nil) + require_Equal(t, n.catchup.pterm, 1) // n.pterm + require_Equal(t, n.catchup.pindex, 2) // n.pindex + + // We now notice the leader indicated a different entry at the (no quorum) index, should save that. 
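+	// Our pterm/pindex (1/2) match aeMissed, so it is simply stored as entry 3 via the catchup subject.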
+ n.processAppendEntry(aeMissed, n.catchup.sub) + require_Equal(t, n.wal.State().Msgs, 3) + require_True(t, n.catchup != nil) + + // We now get the entry that initially triggered us to catchup, it should be added. + n.processAppendEntry(aeCatchupTrigger, n.catchup.sub) + require_Equal(t, n.wal.State().Msgs, 4) + require_True(t, n.catchup != nil) + entry, err = n.loadEntry(4) + require_NoError(t, err) + require_Equal(t, entry.leader, nats1) + + // Heartbeat, makes sure we commit (and reset catchup, as we're now up-to-date). + n.processAppendEntry(aeHeartbeat3, n.aesub) + require_Equal(t, n.commit, 4) + require_True(t, n.catchup == nil) +} + +func TestNRGCatchupCanTruncateMultipleEntriesWithoutQuorum(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + // Create a sample entry, the content doesn't matter, just that it's stored. + esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + + nats0 := "S1Nunr6R" // "nats-0" + nats1 := "yrzKKRBu" // "nats-1" + + // Timeline, for first leader + aeInitial := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries}) + aeHeartbeat := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil}) + aeNoQuorum1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: entries}) + aeNoQuorum2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 2, entries: entries}) + + // Timeline, after leader change + aeMissed1 := encode(t, &appendEntry{leader: nats1, term: 2, commit: 1, pterm: 1, pindex: 1, entries: entries}) + aeMissed2 := encode(t, &appendEntry{leader: nats1, term: 2, commit: 1, pterm: 2, pindex: 2, entries: entries}) + aeCatchupTrigger := encode(t, &appendEntry{leader: nats1, term: 2, commit: 1, pterm: 2, pindex: 3, entries: entries}) + aeHeartbeat2 := encode(t, &appendEntry{leader: nats1, term: 2, commit: 2, pterm: 2, pindex: 4, entries: nil}) + aeHeartbeat3 := encode(t, &appendEntry{leader: nats1, term: 2, commit: 4, pterm: 2, pindex: 4, entries: nil}) + + // Initial case is simple, just store the entry. + n.processAppendEntry(aeInitial, n.aesub) + require_Equal(t, n.wal.State().Msgs, 1) + entry, err := n.loadEntry(1) + require_NoError(t, err) + require_Equal(t, entry.leader, nats0) + + // Heartbeat, makes sure commit moves up. + n.processAppendEntry(aeHeartbeat, n.aesub) + require_Equal(t, n.commit, 1) + + // We get one entry that has NO quorum (but we don't know that yet). + n.processAppendEntry(aeNoQuorum1, n.aesub) + require_Equal(t, n.wal.State().Msgs, 2) + entry, err = n.loadEntry(2) + require_NoError(t, err) + require_Equal(t, entry.leader, nats0) + + // We get another entry that has NO quorum (but we don't know that yet). + n.processAppendEntry(aeNoQuorum2, n.aesub) + require_Equal(t, n.wal.State().Msgs, 3) + entry, err = n.loadEntry(3) + require_NoError(t, err) + require_Equal(t, entry.leader, nats0) + + // We've just had a leader election, and we missed messages from the previous leader. + // We should truncate the last message. + n.processAppendEntry(aeCatchupTrigger, n.aesub) + require_Equal(t, n.wal.State().Msgs, 2) + require_True(t, n.catchup == nil) + + // We get a heartbeat that prompts us to catchup. + n.processAppendEntry(aeHeartbeat2, n.aesub) + require_Equal(t, n.wal.State().Msgs, 2) + require_Equal(t, n.commit, 1) // Commit should not change, as we missed an item. 
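+	// The heartbeat's pterm/pindex (2/4) don't match ours (1/2), so we stay in catchup state.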
+ require_True(t, n.catchup != nil) + require_Equal(t, n.catchup.pterm, 1) // n.pterm + require_Equal(t, n.catchup.pindex, 2) // n.pindex + + // We now notice the leader indicated a different entry at the (no quorum) index. We should truncate again. + n.processAppendEntry(aeMissed2, n.catchup.sub) + require_Equal(t, n.wal.State().Msgs, 1) + require_True(t, n.catchup == nil) + + // We get a heartbeat that prompts us to catchup. + n.processAppendEntry(aeHeartbeat2, n.aesub) + require_Equal(t, n.wal.State().Msgs, 1) + require_Equal(t, n.commit, 1) // Commit should not change, as we missed an item. + require_True(t, n.catchup != nil) + require_Equal(t, n.catchup.pterm, 1) // n.pterm + require_Equal(t, n.catchup.pindex, 1) // n.pindex + + // We now get caught up with the missed messages. + n.processAppendEntry(aeMissed1, n.catchup.sub) + require_Equal(t, n.wal.State().Msgs, 2) + require_True(t, n.catchup != nil) + + n.processAppendEntry(aeMissed2, n.catchup.sub) + require_Equal(t, n.wal.State().Msgs, 3) + require_True(t, n.catchup != nil) + + // We now get the entry that initially triggered us to catchup, it should be added. + n.processAppendEntry(aeCatchupTrigger, n.catchup.sub) + require_Equal(t, n.wal.State().Msgs, 4) + require_True(t, n.catchup != nil) + entry, err = n.loadEntry(4) + require_NoError(t, err) + require_Equal(t, entry.leader, nats1) + + // Heartbeat, makes sure we commit (and reset catchup, as we're now up-to-date). + n.processAppendEntry(aeHeartbeat3, n.aesub) + require_Equal(t, n.commit, 4) + require_True(t, n.catchup == nil) +} + +func TestNRGCatchupDoesNotTruncateCommittedEntriesDuringRedelivery(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + // Create a sample entry, the content doesn't matter, just that it's stored. + esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + + nats0 := "S1Nunr6R" // "nats-0" + + // Timeline. + aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries}) + aeMsg2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: entries}) + aeHeartbeat1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 2, pterm: 1, pindex: 2, entries: nil}) + aeMsg3 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 2, entries: entries}) + aeHeartbeat2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 3, pterm: 1, pindex: 3, entries: nil}) + + // Initial case is simple, just store the entry. + n.processAppendEntry(aeMsg1, n.aesub) + require_Equal(t, n.wal.State().Msgs, 1) + entry, err := n.loadEntry(1) + require_NoError(t, err) + require_Equal(t, entry.leader, nats0) + + // Deliver a message. + n.processAppendEntry(aeMsg2, n.aesub) + require_Equal(t, n.wal.State().Msgs, 2) + entry, err = n.loadEntry(2) + require_NoError(t, err) + require_Equal(t, entry.leader, nats0) + + // Heartbeat, makes sure commit moves up. + n.processAppendEntry(aeHeartbeat1, n.aesub) + require_Equal(t, n.commit, 2) + + // Deliver another message. + n.processAppendEntry(aeMsg3, n.aesub) + require_Equal(t, n.wal.State().Msgs, 3) + entry, err = n.loadEntry(3) + require_NoError(t, err) + require_Equal(t, entry.leader, nats0) + + // Simulate receiving an old entry as a redelivery. We should not truncate as that lowers our commit. + n.processAppendEntry(aeMsg1, n.aesub) + require_Equal(t, n.commit, 2) + + // Heartbeat, makes sure we commit. 
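+	// aeHeartbeat2 advertises commit 3, so the third stored entry becomes committed too.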
+ n.processAppendEntry(aeHeartbeat2, n.aesub) + require_Equal(t, n.commit, 3) +} + +func TestNRGCatchupFromNewLeaderWithIncorrectPterm(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + // Create a sample entry, the content doesn't matter, just that it's stored. + esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + + nats0 := "S1Nunr6R" // "nats-0" + + // Timeline. + aeMsg := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 1, pindex: 0, entries: entries}) + aeHeartbeat := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil}) + + // Heartbeat, triggers catchup. + n.processAppendEntry(aeHeartbeat, n.aesub) + require_Equal(t, n.commit, 0) // Commit should not change, as we missed an item. + require_True(t, n.catchup != nil) + require_Equal(t, n.catchup.pterm, 0) // n.pterm + require_Equal(t, n.catchup.pindex, 0) // n.pindex + + // First catchup message has the incorrect pterm, stop catchup and re-trigger later with the correct pterm. + n.processAppendEntry(aeMsg, n.catchup.sub) + require_True(t, n.catchup == nil) + require_Equal(t, n.pterm, 1) + require_Equal(t, n.pindex, 0) + + // Heartbeat, triggers catchup. + n.processAppendEntry(aeHeartbeat, n.aesub) + require_Equal(t, n.commit, 0) // Commit should not change, as we missed an item. + require_True(t, n.catchup != nil) + require_Equal(t, n.catchup.pterm, 1) // n.pterm + require_Equal(t, n.catchup.pindex, 0) // n.pindex + + // Now we get the message again and can continue to store it. + n.processAppendEntry(aeMsg, n.catchup.sub) + require_Equal(t, n.wal.State().Msgs, 1) + entry, err := n.loadEntry(1) + require_NoError(t, err) + require_Equal(t, entry.leader, nats0) + + // Now heartbeat is able to commit the entry. + n.processAppendEntry(aeHeartbeat, n.aesub) + require_Equal(t, n.commit, 1) +} + func TestNRGDontRemoveSnapshotIfTruncateToApplied(t *testing.T) { n, cleanup := initSingleMemRaftNode(t) defer cleanup() @@ -648,3 +1417,217 @@ func TestNRGDontRemoveSnapshotIfTruncateToApplied(t *testing.T) { require_NoError(t, err) require_Equal(t, len(files), 1) } + +func TestNRGDontSwitchToCandidateWithInflightSnapshot(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + // Create a sample snapshot entry, the content doesn't matter. + snapshotEntries := []*Entry{ + newEntry(EntrySnapshot, nil), + newEntry(EntryPeerState, encodePeerState(&peerState{n.peerNames(), n.csz, n.extSt})), + } + + nats0 := "S1Nunr6R" // "nats-0" + + // Timeline. + aeTriggerCatchup := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil}) + aeCatchupSnapshot := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: snapshotEntries}) + + // Switch follower into catchup. + n.processAppendEntry(aeTriggerCatchup, n.aesub) + require_True(t, n.catchup != nil) + require_Equal(t, n.catchup.pterm, 0) // n.pterm + require_Equal(t, n.catchup.pindex, 0) // n.pindex + + // Follower receives a snapshot, marking a snapshot as inflight as the apply queue is async. + n.processAppendEntry(aeCatchupSnapshot, n.catchup.sub) + require_Equal(t, n.pindex, 1) + require_Equal(t, n.commit, 1) + + // Try to switch to candidate, it should be blocked since the snapshot is not processed yet. + n.switchToCandidate() + require_Equal(t, n.State(), Follower) + + // Simulate snapshot being processed by the upper layer. 
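+	// Applied advances n.applied up to the commit index, clearing the applied < commit
+	// guard that blocks switchToCandidate.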
+ n.Applied(1) + + // Retry becoming candidate, snapshot is processed so can now do so. + n.switchToCandidate() + require_Equal(t, n.State(), Candidate) +} + +func TestNRGDontSwitchToCandidateWithMultipleInflightSnapshots(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + // Create a sample snapshot entry, the content doesn't matter. + snapshotEntries := []*Entry{ + newEntry(EntrySnapshot, nil), + newEntry(EntryPeerState, encodePeerState(&peerState{n.peerNames(), n.csz, n.extSt})), + } + + nats0 := "S1Nunr6R" // "nats-0" + + // Timeline. + aeSnapshot1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: snapshotEntries}) + aeSnapshot2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: snapshotEntries}) + aeHeartbeat := encode(t, &appendEntry{leader: nats0, term: 1, commit: 2, pterm: 1, pindex: 2, entries: nil}) + + // Simulate snapshots being sent to us. + n.processAppendEntry(aeSnapshot1, n.aesub) + require_Equal(t, n.pindex, 1) + require_Equal(t, n.commit, 0) + require_Equal(t, n.applied, 0) + + n.processAppendEntry(aeSnapshot2, n.aesub) + require_Equal(t, n.pindex, 2) + require_Equal(t, n.commit, 1) + require_Equal(t, n.applied, 0) + + n.processAppendEntry(aeHeartbeat, n.aesub) + require_Equal(t, n.pindex, 2) + require_Equal(t, n.commit, 2) + require_Equal(t, n.applied, 0) + + for i := uint64(1); i <= 2; i++ { + // Try to switch to candidate, it should be blocked since the snapshot is not processed yet. + n.switchToCandidate() + require_Equal(t, n.State(), Follower) + + // Simulate snapshot being processed by the upper layer. + n.Applied(i) + } + + // Retry becoming candidate, all snapshots processed so can now do so. + n.switchToCandidate() + require_Equal(t, n.State(), Candidate) +} + +func TestNRGRecoverPindexPtermOnlyIfLogNotEmpty(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, _ := jsClientConnect(t, c.leader(), nats.UserInfo("admin", "s3cr3t!")) + defer nc.Close() + + rg := c.createRaftGroup("TEST", 3, newStateAdder) + rg.waitOnLeader() + + gn := rg[0].(*stateAdder) + rn := rg[0].node().(*raft) + + // Delete the msgs and snapshots, leaving the only remaining trace + // of the term in the TAV file. + store := filepath.Join(gn.cfg.Store) + require_NoError(t, rn.wal.Truncate(0)) + require_NoError(t, os.RemoveAll(filepath.Join(store, "msgs"))) + require_NoError(t, os.RemoveAll(filepath.Join(store, "snapshots"))) + + for _, gn := range rg { + gn.stop() + } + rg[0].restart() + rn = rg[0].node().(*raft) + + // Both should be zero as, without any snapshots or log entries, + // the log is considered empty and therefore we account as such. + require_Equal(t, rn.pterm, 0) + require_Equal(t, rn.pindex, 0) +} + +func TestNRGCancelCatchupWhenDetectingHigherTermDuringVoteRequest(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + // Create a sample entry, the content doesn't matter, just that it's stored. + esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + + nats0 := "S1Nunr6R" // "nats-0" + + // Timeline. + aeCatchupTrigger := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: entries}) + aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries}) + + // Truncate to simulate we missed one message and need to catchup. 
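+	// aeCatchupTrigger is ahead of our empty log (pterm/pindex 1/1 vs 0/0), so it only
+	// sets up catchup state instead of being stored.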
+ n.processAppendEntry(aeCatchupTrigger, n.aesub) + require_True(t, n.catchup != nil) + require_Equal(t, n.catchup.pterm, 0) // n.pterm + require_Equal(t, n.catchup.pindex, 0) // n.pindex + + // Process first message as part of the catchup. + catchupSub := n.catchup.sub + n.processAppendEntry(aeMsg1, catchupSub) + require_True(t, n.catchup != nil) + + // Receiving a vote request should cancel our catchup. + // Otherwise, we could receive catchup messages after this that provides the previous leader with quorum. + // If the new leader doesn't have these entries, the previous leader would desync since it would commit them. + err := n.processVoteRequest(&voteRequest{2, 1, 1, nats0, "reply"}) + require_NoError(t, err) + require_True(t, n.catchup == nil) +} + +func TestNRGMultipleStopsDontPanic(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + defer func() { + p := recover() + require_True(t, p == nil) + }() + + for i := 0; i < 10; i++ { + n.Stop() + } +} + +func TestNRGTruncateDownToCommitted(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + // Create a sample entry, the content doesn't matter, just that it's stored. + esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + + nats0 := "S1Nunr6R" // "nats-0" + nats1 := "yrzKKRBu" // "nats-1" + + // Timeline, we are leader + aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries}) + aeMsg2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: entries}) + + // Timeline, after leader change + aeMsg3 := encode(t, &appendEntry{leader: nats1, term: 2, commit: 0, pterm: 1, pindex: 1, entries: entries}) + aeHeartbeat := encode(t, &appendEntry{leader: nats1, term: 2, commit: 2, pterm: 2, pindex: 2, entries: nil}) + + // Simply receive first message. + n.processAppendEntry(aeMsg1, n.aesub) + require_Equal(t, n.commit, 0) + require_Equal(t, n.wal.State().Msgs, 1) + entry, err := n.loadEntry(1) + require_NoError(t, err) + require_Equal(t, entry.leader, nats0) + + // Receive second message, which commits the first message. + n.processAppendEntry(aeMsg2, n.aesub) + require_Equal(t, n.commit, 1) + require_Equal(t, n.wal.State().Msgs, 2) + entry, err = n.loadEntry(2) + require_NoError(t, err) + require_Equal(t, entry.leader, nats0) + + // We receive an entry from another leader, should truncate down to commit / remove the second message. + // After doing so, we should also be able to immediately store the message after. + n.processAppendEntry(aeMsg3, n.aesub) + require_Equal(t, n.commit, 1) + require_Equal(t, n.wal.State().Msgs, 2) + entry, err = n.loadEntry(2) + require_NoError(t, err) + require_Equal(t, entry.leader, nats1) + + // Heartbeat moves commit up. + n.processAppendEntry(aeHeartbeat, n.aesub) + require_Equal(t, n.commit, 2) +}
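As an aside for readers following the switchState change above: the pattern is a plain compare-and-swap retry loop. The sketch below is a minimal, standalone illustration of that idea and is not code from this patch; toyNode and toyState are invented names, and the real logic (including the Closed check, lock discipline, and leader-change notifications) lives in switchState in raft.go.

package main

import (
	"fmt"
	"sync/atomic"
)

type toyState int32

const (
	toyFollower toyState = iota
	toyCandidate
	toyLeader
	toyClosed
)

// toyNode models just enough of a node to show the CAS-based state switch:
// a concurrent shutdown may move the state to toyClosed, and that terminal
// state must never be overwritten by a late transition.
type toyNode struct {
	state atomic.Int32
}

func (n *toyNode) switchState(to toyState) {
	for {
		prev := toyState(n.state.Load())
		if prev == toyClosed {
			// Closed is terminal; never switch away from it.
			return
		}
		// Only install the new state if nothing changed underneath us;
		// otherwise re-read the state and try again.
		if n.state.CompareAndSwap(int32(prev), int32(to)) {
			return
		}
	}
}

func main() {
	n := &toyNode{}
	n.switchState(toyCandidate)
	n.switchState(toyLeader)
	fmt.Println("state:", toyState(n.state.Load())) // state: 2
}

The compare-and-swap is what guarantees that a shutdown racing with a state switch wins: once the state reads as closed, the loop returns without writing anything.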