diff --git a/pkg/base/config.go b/pkg/base/config.go
index 5bb86a2f4984..30b286b809c8 100644
--- a/pkg/base/config.go
+++ b/pkg/base/config.go
@@ -344,6 +344,10 @@ type RaftConfig struct {
 	// unless overridden.
 	RaftElectionTimeoutTicks int
 
+	// RefreshPendingCommandsWhenLeader specifies whether Raft leaders repropose
+	// pending commands every electionTicks.
+	RefreshPendingCommandsWhenLeader bool
+
 	// RangeLeaseRaftElectionTimeoutMultiplier specifies what multiple the leader
 	// lease active duration should be of the raft election timeout.
 	RangeLeaseRaftElectionTimeoutMultiplier float64
diff --git a/pkg/storage/client_raft_test.go b/pkg/storage/client_raft_test.go
index 61997004e87c..9eb4809d906a 100644
--- a/pkg/storage/client_raft_test.go
+++ b/pkg/storage/client_raft_test.go
@@ -42,6 +42,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
+	"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
@@ -1126,6 +1127,139 @@ func TestRefreshPendingCommands(t *testing.T) {
 	}
 }
 
+// Test that when a Raft group is not able to establish a quorum, its Raft log
+// does not grow without bound. It tests two different scenarios where this
+// used to be possible (see #27772):
+// 1. The leader proposes a command and cannot establish a quorum. The leader
+//    continually re-proposes the command.
+// 2. The follower proposes a command and forwards it to the leader, who cannot
+//    establish a quorum. The follower continually re-proposes and forwards the
+//    command to the leader.
+func TestLogGrowthWhenRefreshingPendingCommands(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	sc := storage.TestStoreConfig(nil)
+	// Drop the raft tick interval to permit more proposal refreshes.
+	sc.RaftTickInterval = 50 * time.Millisecond
+	// Disable leader transfers during leaseholder changes so that we
+	// can easily create leader-not-leaseholder scenarios.
+	sc.TestingKnobs.DisableLeaderFollowsLeaseholder = true
+	// Disable periodic gossip tasks which can move the range 1 lease
+	// unexpectedly.
+	sc.TestingKnobs.DisablePeriodicGossips = true
+	mtc := &multiTestContext{storeConfig: &sc}
+	defer mtc.Stop()
+	mtc.Start(t, 5)
+
+	const rangeID = roachpb.RangeID(1)
+	mtc.replicateRange(rangeID, 1, 2, 3, 4)
+
+	// Raft leadership is kept on node 0.
+	leaderRepl, err := mtc.Store(0).GetReplica(rangeID)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Put some data in the range so we'll have something to test for.
+	incArgs := incrementArgs([]byte("a"), 5)
+	if _, err := client.SendWrapped(context.Background(), mtc.stores[0].TestSender(), incArgs); err != nil {
+		t.Fatal(err)
+	}
+
+	// Wait for all nodes to catch up.
+	mtc.waitForValues(roachpb.Key("a"), []int64{5, 5, 5, 5, 5})
+
+	// Test proposing on the leader and proposing on a follower. Neither should
+	// result in unbounded raft log growth.
+	testutils.RunTrueAndFalse(t, "proposeOnFollower", func(t *testing.T, proposeOnFollower bool) {
+		// Restart any nodes that are down.
+		for _, s := range []int{2, 3, 4} {
+			if mtc.Store(s) == nil {
+				mtc.restartStore(s)
+			}
+		}
+
+		// Determine which node to propose on. Transfer lease to that node.
+		var propIdx, otherIdx int
+		if !proposeOnFollower {
+			propIdx, otherIdx = 0, 1
+		} else {
+			propIdx, otherIdx = 1, 0
+		}
+		propNode := mtc.stores[propIdx].TestSender()
+		mtc.transferLease(context.TODO(), rangeID, otherIdx, propIdx)
+		testutils.SucceedsSoon(t, func() error {
+			// Lease transfers may not be immediately observed by the new
+			// leaseholder. Wait until the new leaseholder is aware.
+			repl, err := mtc.Store(propIdx).GetReplica(rangeID)
+			if err != nil {
+				t.Fatal(err)
+			}
+			repDesc, err := repl.GetReplicaDescriptor()
+			if err != nil {
+				t.Fatal(err)
+			}
+			if lease, _ := repl.GetLease(); lease.Replica != repDesc {
+				return errors.Errorf("lease not transferred yet; found %v", lease)
+			}
+			return nil
+		})
+
+		// Stop enough nodes to prevent a quorum.
+		for _, s := range []int{2, 3, 4} {
+			mtc.stopStore(s)
+		}
+
+		// Determine the current raft log size.
+		initLogSize := leaderRepl.GetRaftLogSize()
+
+		// While a majority of nodes are down, write some data.
+		putRes := make(chan *roachpb.Error)
+		go func() {
+			putArgs := putArgs([]byte("b"), make([]byte, 8<<10 /* 8 KB */))
+			_, err := client.SendWrapped(context.Background(), propNode, putArgs)
+			putRes <- err
+		}()
+
+		// Wait for a bit and watch for Raft log growth.
+		wait := time.After(1 * time.Second)
+		ticker := time.Tick(100 * time.Millisecond)
+	Loop:
+		for {
+			select {
+			case <-wait:
+				break Loop
+			case <-ticker:
+				// Verify that the leader is node 0.
+				status := leaderRepl.RaftStatus()
+				if status == nil || status.RaftState != raft.StateLeader {
+					t.Fatalf("raft leader should be node 0, but got status %+v", status)
+				}
+
+				// Check raft log size.
+				const logSizeLimit = 32 << 10 // 32 KB
+				curLogSize := leaderRepl.GetRaftLogSize()
+				logSize := curLogSize - initLogSize
+				logSizeStr := humanizeutil.IBytes(logSize)
+				if logSize > logSizeLimit {
+					t.Fatalf("raft log size grew to %s", logSizeStr)
+				}
+				t.Logf("raft log size grew to %s", logSizeStr)
+			case err := <-putRes:
+				t.Fatalf("write finished with quorum unavailable; err=%v", err)
+			}
+		}
+
+		// Start enough nodes to establish a quorum.
+		mtc.restartStore(2)
+
+		// The write should now succeed.
+		if err := <-putRes; err != nil {
+			t.Fatal(err)
+		}
+	})
+}
+
 // TestStoreRangeUpReplicate verifies that the replication queue will notice
 // under-replicated ranges and replicate them. Also tests that preemptive
 // snapshots which contain sideloaded proposals don't panic the receiving end.
diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go
index 3c5f1bb7ca9d..2c66bdef72df 100644
--- a/pkg/storage/replica.go
+++ b/pkg/storage/replica.go
@@ -382,8 +382,13 @@ type Replica struct {
 		// map must only be referenced while Replica.mu is held, except if the
 		// element is removed from the map first. The notable exception is the
 		// contained RaftCommand, which we treat as immutable.
-		proposals         map[storagebase.CmdIDKey]*ProposalData
-		internalRaftGroup *raft.RawNode
+		proposals map[storagebase.CmdIDKey]*ProposalData
+		// proposalsForwarded is maintained by Raft leaders and stores in-flight
+		// commands that were forwarded to the leader during its current term.
+		// The set allows leaders to detect duplicate forwarded commands and
+		// avoid re-proposing the same forwarded command multiple times.
+		proposalsForwarded map[storagebase.CmdIDKey]struct{}
+		internalRaftGroup  *raft.RawNode
 		// The ID of the replica within the Raft group. May be 0 if the replica has
 		// been created from a preemptive snapshot (i.e. before being added to the
 		// Raft group). The replica ID will be non-zero whenever the replica is
@@ -692,6 +697,7 @@ func (r *Replica) initRaftMuLockedReplicaMuLocked(
 	r.cmdQMu.Unlock()
 
 	r.mu.proposals = map[storagebase.CmdIDKey]*ProposalData{}
+	r.mu.proposalsForwarded = map[storagebase.CmdIDKey]struct{}{}
 	r.mu.checksums = map[uuid.UUID]ReplicaChecksum{}
 	// Clear the internal raft group in case we're being reset. Since we're
 	// reloading the raft state below, it isn't safe to use the existing raft
@@ -851,6 +857,9 @@ func (r *Replica) cancelPendingCommandsLocked() {
 		r.cleanupFailedProposalLocked(p)
 		p.finishApplication(pr)
 	}
+	for cmdID := range r.mu.proposalsForwarded {
+		delete(r.mu.proposalsForwarded, cmdID)
+	}
 }
 
 // cleanupFailedProposalLocked cleans up after a proposal that has failed. It
@@ -3747,6 +3756,33 @@ func (r *Replica) stepRaftGroup(req *RaftMessageRequest) error {
 		// other replica is not quiesced, so we don't need to wake the leader.
 		r.unquiesceLocked()
 		r.refreshLastUpdateTimeForReplicaLocked(req.FromReplica.ReplicaID)
+		if req.Message.Type == raftpb.MsgProp {
+			// A proposal was forwarded to this replica.
+			if r.mu.replicaID == r.mu.leaderID {
+				// This replica is the leader. Record that the proposal
+				// was seen and drop the proposal if it was already seen.
+				// This prevents duplicate forwarded proposals from each
+				// being appended to a leader's raft log.
+				allSeen := true
+				for _, e := range req.Message.Entries {
+					switch e.Type {
+					case raftpb.EntryNormal:
+						cmdID, _ := DecodeRaftCommand(e.Data)
+						if _, ok := r.mu.proposalsForwarded[cmdID]; !ok {
+							r.mu.proposalsForwarded[cmdID] = struct{}{}
+							allSeen = false
+						}
+					case raftpb.EntryConfChange:
+						allSeen = false
+					default:
+						log.Fatalf(context.TODO(), "unexpected Raft entry: %v", e)
+					}
+				}
+				if allSeen {
+					return false /* unquiesceAndWakeLeader */, nil
+				}
+			}
+		}
 		err := raftGroup.Step(req.Message)
 		if err == raft.ErrProposalDropped {
 			// A proposal was forwarded to this replica but we couldn't propose it.
@@ -3855,6 +3891,10 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
 			if !r.store.TestingKnobs().DisableRefreshReasonNewLeader {
 				refreshReason = reasonNewLeader
 			}
+			// Clear the forwarded proposal set. No-op if not previously the leader.
+			for cmdID := range r.mu.proposalsForwarded {
+				delete(r.mu.proposalsForwarded, cmdID)
+			}
 			leaderID = roachpb.ReplicaID(rd.SoftState.Lead)
 		}
 
@@ -4213,6 +4253,7 @@ func (r *Replica) tick(livenessMap map[roachpb.NodeID]bool) (bool, error) {
 	r.mu.ticks++
 	r.mu.internalRaftGroup.Tick()
 	if !r.store.TestingKnobs().DisableRefreshReasonTicks &&
+		(r.mu.replicaID != r.mu.leaderID || r.store.cfg.RefreshPendingCommandsWhenLeader) &&
 		r.mu.ticks%r.store.cfg.RaftElectionTimeoutTicks == 0 {
 		// RaftElectionTimeoutTicks is a reasonable approximation of how long we
 		// should wait before deciding that our previous proposal didn't go
@@ -4220,6 +4261,13 @@ func (r *Replica) tick(livenessMap map[roachpb.NodeID]bool) (bool, error) {
 		// RaftElectionTimeoutTicks to refreshProposalsLocked means that commands
 		// will be refreshed when they have been pending for 1 to 2 election
 		// cycles.
+		//
+		// However, we don't refresh proposals if we are the leader because
+		// doing so would be useless. The commands tracked by a leader replica
+		// were either all proposed when the replica was a leader or were
+		// re-proposed when the replica became a leader. Either way, they are
+		// guaranteed to be in the leader's Raft log so re-proposing won't do
+		// anything.
 		r.refreshProposalsLocked(
 			r.store.cfg.RaftElectionTimeoutTicks, reasonTicks,
 		)
@@ -4314,6 +4362,9 @@ func (r *Replica) maybeTransferRaftLeader(
 	if !r.isLeaseValidRLocked(l, now) {
 		return
 	}
+	if r.store.TestingKnobs().DisableLeaderFollowsLeaseholder {
+		return
+	}
 	if pr, ok := status.Progress[uint64(l.Replica.ReplicaID)]; ok && pr.Match >= status.Commit {
 		log.VEventf(ctx, 1, "transferring raft leadership to replica ID %v", l.Replica.ReplicaID)
 		r.store.metrics.RangeRaftLeaderTransfers.Inc(1)
@@ -5024,6 +5075,9 @@ func (r *Replica) processRaftCommand(
 		delete(r.mu.proposals, idKey)
 	}
 
+	// Delete the entry from the forwarded proposals set, if present.
+	delete(r.mu.proposalsForwarded, idKey)
+
 	leaseIndex, proposalRetry, forcedErr := r.checkForcedErrLocked(ctx, idKey, raftCmd, proposal, proposedLocally)
 	r.mu.Unlock()
diff --git a/pkg/storage/replica_proposal.go b/pkg/storage/replica_proposal.go
index 9cd31af75205..5290b9a9f4a2 100644
--- a/pkg/storage/replica_proposal.go
+++ b/pkg/storage/replica_proposal.go
@@ -310,7 +310,8 @@ func (r *Replica) leasePostApply(ctx context.Context, newLease roachpb.Lease) {
 		r.txnWaitQueue.Clear(true /* disable */)
 	}
 
-	if !iAmTheLeaseHolder && r.IsLeaseValid(newLease, r.store.Clock().Now()) {
+	if !iAmTheLeaseHolder && r.IsLeaseValid(newLease, r.store.Clock().Now()) &&
+		!r.store.TestingKnobs().DisableLeaderFollowsLeaseholder {
 		// If this replica is the raft leader but it is not the new lease holder,
 		// then try to transfer the raft leadership to match the lease. We like it
 		// when leases and raft leadership are collocated because that facilitates
diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go
index 88d57dbe9e06..c549457d40b5 100644
--- a/pkg/storage/replica_test.go
+++ b/pkg/storage/replica_test.go
@@ -8127,6 +8127,9 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) {
 	cfg := TestStoreConfig(nil)
 	// Disable ticks which would interfere with the manual ticking in this test.
 	cfg.RaftTickInterval = math.MaxInt32
+	// Enable this so that pending commands are reproposed even though the
+	// replica will become the Raft leader.
+	cfg.RefreshPendingCommandsWhenLeader = true
 	stopper := stop.NewStopper()
 	defer stopper.Stop(context.TODO())
 	tc.StartWithStoreConfig(t, stopper, cfg)
diff --git a/pkg/storage/store.go b/pkg/storage/store.go
index 34d384a68190..4d855bab5f3f 100644
--- a/pkg/storage/store.go
+++ b/pkg/storage/store.go
@@ -759,6 +759,9 @@ type StoreTestingKnobs struct {
 	DisableScanner bool
 	// DisablePeriodicGossips disables periodic gossiping.
 	DisablePeriodicGossips bool
+	// DisableLeaderFollowsLeaseholder disables attempts to transfer raft
+	// leadership when it diverges from the range's leaseholder.
+	DisableLeaderFollowsLeaseholder bool
 	// DisableRefreshReasonNewLeader disables refreshing pending commands when a new
 	// leader is discovered.
 	DisableRefreshReasonNewLeader bool
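
Note: as a rough illustration of the stepRaftGroup change above, the sketch below isolates the forwarded-proposal deduplication idea: a leader records the command IDs carried by forwarded MsgProp entries in a set and drops a forwarded proposal once every entry in it has already been seen, so refreshed proposals from a follower cannot keep appending duplicates to the leader's log. This is a minimal, self-contained sketch, not part of the diff; the cmdIDKey type and trackForwardedProposal helper are hypothetical names standing in for storagebase.CmdIDKey and the logic embedded in stepRaftGroup.

package main

import "fmt"

// cmdIDKey stands in for storagebase.CmdIDKey; it is a hypothetical type used
// only to keep this sketch self-contained.
type cmdIDKey string

// trackForwardedProposal records the command IDs carried by a forwarded
// proposal in seen and reports whether every ID had already been recorded.
// When it returns true, a leader can drop the proposal instead of stepping it
// into Raft, which is what bounds log growth while proposals are refreshed.
func trackForwardedProposal(seen map[cmdIDKey]struct{}, ids []cmdIDKey) bool {
	allSeen := true
	for _, id := range ids {
		if _, ok := seen[id]; !ok {
			seen[id] = struct{}{}
			allSeen = false
		}
	}
	return allSeen
}

func main() {
	seen := make(map[cmdIDKey]struct{})
	// First forward of command "c1": unseen, so the leader proposes it.
	fmt.Println(trackForwardedProposal(seen, []cmdIDKey{"c1"})) // false
	// The follower refreshes and forwards "c1" again: already seen, so drop it.
	fmt.Println(trackForwardedProposal(seen, []cmdIDKey{"c1"})) // true
}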