diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index c693b1747631..8a408a0fd1ad 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -1207,12 +1207,6 @@ type asyncReady struct { // It is not required to consume or store SoftState. *raft.SoftState - // ReadStates can be used for node to serve linearizable read requests locally - // when its applied index is greater than the index in ReadState. - // Note that the readState will be returned when raft receives msgReadIndex. - // The returned is only valid for the request that requested to read. - ReadStates []raft.ReadState - // Messages specifies outbound messages to other peers and to local storage // threads. These messages can be sent in any order. // @@ -1224,9 +1218,8 @@ type asyncReady struct { // makeAsyncReady constructs an asyncReady from the provided Ready. func makeAsyncReady(rd raft.Ready) asyncReady { return asyncReady{ - SoftState: rd.SoftState, - ReadStates: rd.ReadStates, - Messages: rd.Messages, + SoftState: rd.SoftState, + Messages: rd.Messages, } } diff --git a/pkg/raft/BUILD.bazel b/pkg/raft/BUILD.bazel index d4cdc81d1711..cd4b2e2ed41d 100644 --- a/pkg/raft/BUILD.bazel +++ b/pkg/raft/BUILD.bazel @@ -11,7 +11,6 @@ go_library( "node.go", "raft.go", "rawnode.go", - "read_only.go", "status.go", "storage.go", "types.go", diff --git a/pkg/raft/node.go b/pkg/raft/node.go index c6a4d90773cb..9bd6de72cbe5 100644 --- a/pkg/raft/node.go +++ b/pkg/raft/node.go @@ -68,12 +68,6 @@ type Ready struct { // Messages slice. pb.HardState - // ReadStates can be used for node to serve linearizable read requests locally - // when its applied index is greater than the index in ReadState. - // Note that the readState will be returned when raft receives msgReadIndex. - // The returned is only valid for the request that requested to read. - ReadStates []ReadState - // Entries specifies entries to be saved to stable storage BEFORE // Messages are sent. // @@ -213,19 +207,8 @@ type Node interface { // from the leader recently). However, 3 can not campaign unilaterally, a // quorum have to agree that the leader is dead, which avoids disrupting the // leader if individual nodes are wrong about it being dead. - // - // This does nothing with ReadOnlyLeaseBased, since it would allow a new - // leader to be elected without the old leader knowing. ForgetLeader(ctx context.Context) error - // ReadIndex request a read state. The read state will be set in the ready. - // Read state has a read index. Once the application advances further than the read - // index, any linearizable read requests issued before the read request can be - // processed safely. The read state will have the same rctx attached. - // Note that request can be lost without notice, therefore it is user's job - // to ensure read index retries. - ReadIndex(ctx context.Context, rctx []byte) error - // Status returns the current status of the raft state machine. Status() Status // ReportUnreachable reports the given node is not reachable for the last send. @@ -607,7 +590,3 @@ func (n *node) TransferLeadership(ctx context.Context, lead, transferee uint64) func (n *node) ForgetLeader(ctx context.Context) error { return n.step(ctx, pb.Message{Type: pb.MsgForgetLeader}) } - -func (n *node) ReadIndex(ctx context.Context, rctx []byte) error { - return n.step(ctx, pb.Message{Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: rctx}}}) -} diff --git a/pkg/raft/node_test.go b/pkg/raft/node_test.go index 4d20fa424f8c..d17bf49d7003 100644 --- a/pkg/raft/node_test.go +++ b/pkg/raft/node_test.go @@ -193,51 +193,6 @@ func TestDisableProposalForwarding(t *testing.T) { require.Empty(t, r3.msgs) } -// TestNodeReadIndexToOldLeader ensures that raftpb.MsgReadIndex to old leader -// gets forwarded to the new leader and 'send' method does not attach its term. -func TestNodeReadIndexToOldLeader(t *testing.T) { - r1 := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3))) - r2 := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3))) - r3 := newTestRaft(3, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3))) - - nt := newNetwork(r1, r2, r3) - - // elect r1 as leader - nt.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgHup}) - - var testEntries = []raftpb.Entry{{Data: []byte("testdata")}} - - // send readindex request to r2(follower) - r2.Step(raftpb.Message{From: 2, To: 2, Type: raftpb.MsgReadIndex, Entries: testEntries}) - - // verify r2(follower) forwards this message to r1(leader) with term not set - require.Len(t, r2.msgs, 1) - readIndxMsg1 := raftpb.Message{From: 2, To: 1, Type: raftpb.MsgReadIndex, Entries: testEntries} - require.Equal(t, readIndxMsg1, r2.msgs[0]) - - // send readindex request to r3(follower) - r3.Step(raftpb.Message{From: 3, To: 3, Type: raftpb.MsgReadIndex, Entries: testEntries}) - - // verify r3(follower) forwards this message to r1(leader) with term not set as well. - require.Len(t, r3.msgs, 1) - readIndxMsg2 := raftpb.Message{From: 3, To: 1, Type: raftpb.MsgReadIndex, Entries: testEntries} - require.Equal(t, readIndxMsg2, r3.msgs[0]) - - // now elect r3 as leader - nt.send(raftpb.Message{From: 3, To: 3, Type: raftpb.MsgHup}) - - // let r1 steps the two messages previously we got from r2, r3 - r1.Step(readIndxMsg1) - r1.Step(readIndxMsg2) - - // verify r1(follower) forwards these messages again to r3(new leader) - require.Len(t, r1.msgs, 2) - readIndxMsg3 := raftpb.Message{From: 2, To: 3, Type: raftpb.MsgReadIndex, Entries: testEntries} - require.Equal(t, readIndxMsg3, r1.msgs[0]) - readIndxMsg3 = raftpb.Message{From: 3, To: 3, Type: raftpb.MsgReadIndex, Entries: testEntries} - require.Equal(t, readIndxMsg3, r1.msgs[1]) -} - // TestNodeProposeConfig ensures that node.ProposeConfChange sends the given configuration proposal // to the underlying raft. func TestNodeProposeConfig(t *testing.T) { diff --git a/pkg/raft/raft.go b/pkg/raft/raft.go index 5c33949f5213..61168a9ce8a5 100644 --- a/pkg/raft/raft.go +++ b/pkg/raft/raft.go @@ -56,20 +56,6 @@ const ( numStates ) -type ReadOnlyOption int - -const ( - // ReadOnlySafe guarantees the linearizability of the read only request by - // communicating with the quorum. It is the default and suggested option. - ReadOnlySafe ReadOnlyOption = iota - // ReadOnlyLeaseBased ensures linearizability of the read only request by - // relying on the leader lease. It can be affected by clock drift. - // If the clock drift is unbounded, leader might keep the lease longer than it - // should (clock can move backward/pause without any bound). ReadIndex is not safe - // in that case. - ReadOnlyLeaseBased -) - // Possible values for CampaignType const ( // campaignPreElection represents the first phase of a normal election when @@ -231,19 +217,6 @@ type Config struct { // rejoins the cluster. PreVote bool - // ReadOnlyOption specifies how the read only request is processed. - // - // ReadOnlySafe guarantees the linearizability of the read only request by - // communicating with the quorum. It is the default and suggested option. - // - // ReadOnlyLeaseBased ensures linearizability of the read only request by - // relying on the leader lease. It can be affected by clock drift. - // If the clock drift is unbounded, leader might keep the lease longer than it - // should (clock can move backward/pause without any bound). ReadIndex is not safe - // in that case. - // CheckQuorum MUST be enabled if ReadOnlyOption is ReadOnlyLeaseBased. - ReadOnlyOption ReadOnlyOption - // Logger is the logger used for raft log. For multinode which can host // multiple raft group, each raft group can have its own logger Logger Logger @@ -331,10 +304,6 @@ func (c *Config) validate() error { c.Logger = getLogger() } - if c.ReadOnlyOption == ReadOnlyLeaseBased && !c.CheckQuorum { - return errors.New("CheckQuorum must be enabled when ReadOnlyOption is ReadOnlyLeaseBased") - } - return nil } @@ -344,8 +313,6 @@ type raft struct { Term uint64 Vote uint64 - readStates []ReadState - // the log raftLog *raftLog @@ -396,8 +363,6 @@ type raft struct { // term changes. uncommittedSize entryPayloadSize - readOnly *readOnly - // number of ticks since it reached last electionTimeout when it is leader // or candidate. // number of ticks since it reached last electionTimeout or received a @@ -424,12 +389,6 @@ type raft struct { step stepFunc logger Logger - - // pendingReadIndexMessages is used to store messages of type MsgReadIndex - // that can't be answered as new leader didn't committed any log in - // current term. Those will be handled as fast as first log is committed in - // current term. - pendingReadIndexMessages []pb.Message } func newRaft(c *Config) *raft { @@ -455,7 +414,6 @@ func newRaft(c *Config) *raft { logger: c.Logger, checkQuorum: c.CheckQuorum, preVote: c.PreVote, - readOnly: newReadOnly(c.ReadOnlyOption), disableProposalForwarding: c.DisableProposalForwarding, disableConfChangeValidation: c.DisableConfChangeValidation, stepDownOnRemoval: c.StepDownOnRemoval, @@ -528,11 +486,9 @@ func (r *raft) send(m pb.Message) { if m.Term != 0 { r.logger.Panicf("term should not be set when sending %s (was %d)", m.Type, m.Term) } - // do not attach term to MsgProp, MsgReadIndex - // proposals are a way to forward to the leader and - // should be treated as local message. - // MsgReadIndex is also forwarded to leader. - if m.Type != pb.MsgProp && m.Type != pb.MsgReadIndex { + // Do not attach term to MsgProp. Proposals are a way to forward to the + // leader, and should be treated as local message. + if m.Type != pb.MsgProp { m.Term = r.Term } } @@ -682,7 +638,7 @@ func (r *raft) maybeSendSnapshot(to uint64, pr *tracker.Progress) bool { } // sendHeartbeat sends a heartbeat RPC to the given peer. -func (r *raft) sendHeartbeat(to uint64, ctx []byte) { +func (r *raft) sendHeartbeat(to uint64) { pr := r.trk.Progress[to] // Attach the commit as min(to.matched, r.committed). // When the leader sends out heartbeat message, @@ -692,10 +648,9 @@ func (r *raft) sendHeartbeat(to uint64, ctx []byte) { // an unmatched index. commit := min(pr.Match, r.raftLog.committed) r.send(pb.Message{ - To: to, - Type: pb.MsgHeartbeat, - Commit: commit, - Context: ctx, + To: to, + Type: pb.MsgHeartbeat, + Commit: commit, }) pr.SentCommit(commit) } @@ -713,20 +668,11 @@ func (r *raft) bcastAppend() { // bcastHeartbeat sends RPC, without entries to all the peers. func (r *raft) bcastHeartbeat() { - lastCtx := r.readOnly.lastPendingRequestCtx() - if len(lastCtx) == 0 { - r.bcastHeartbeatWithCtx(nil) - } else { - r.bcastHeartbeatWithCtx([]byte(lastCtx)) - } -} - -func (r *raft) bcastHeartbeatWithCtx(ctx []byte) { r.trk.Visit(func(id uint64, _ *tracker.Progress) { if id == r.id { return } - r.sendHeartbeat(id, ctx) + r.sendHeartbeat(id) }) } @@ -800,7 +746,6 @@ func (r *raft) reset(term uint64) { r.pendingConfIndex = 0 r.uncommittedSize = 0 - r.readOnly = newReadOnly(r.readOnly.option) } func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) { @@ -1325,25 +1270,7 @@ func stepLeader(r *raft, m pb.Message) error { } r.bcastAppend() return nil - case pb.MsgReadIndex: - // only one voting member (the leader) in the cluster - if r.trk.IsSingleton() { - if resp := r.responseToReadIndexReq(m, r.raftLog.committed); resp.To != None { - r.send(resp) - } - return nil - } - - // Postpone read only request when this leader has not committed - // any log entry at its term. - if !r.committedEntryInCurrentTerm() { - r.pendingReadIndexMessages = append(r.pendingReadIndexMessages, m) - return nil - } - - sendMsgReadIndexResponse(r, m) - return nil case pb.MsgForgetLeader: return nil // noop on leader } @@ -1522,9 +1449,6 @@ func stepLeader(r *raft, m pb.Message) error { } if r.maybeCommit() { - // committed index has progressed for the term, so it is safe - // to respond to pending read index requests - releasePendingReadIndexMessages(r) r.bcastAppend() } else if r.id != m.From && pr.CanBumpCommit(r.raftLog.committed) { // This node may be missing the latest commit index, so send it. @@ -1571,20 +1495,6 @@ func stepLeader(r *raft, m pb.Message) error { r.sendAppend(m.From) } - if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 { - return nil - } - - if r.trk.Voters.VoteResult(r.readOnly.recvAck(m.From, m.Context)) != quorum.VoteWon { - return nil - } - - rss := r.readOnly.advance(m) - for _, rs := range rss { - if resp := r.responseToReadIndexReq(rs.req, rs.index); resp.To != None { - r.send(resp) - } - } case pb.MsgSnapStatus: if pr.State != tracker.StateSnapshot { return nil @@ -1724,10 +1634,6 @@ func stepFollower(r *raft, m pb.Message) error { m.To = r.lead r.send(m) case pb.MsgForgetLeader: - if r.readOnly.option == ReadOnlyLeaseBased { - r.logger.Error("ignoring MsgForgetLeader due to ReadOnlyLeaseBased") - return nil - } if r.lead != None { r.logger.Infof("%x forgetting leader %x at term %d", r.id, r.lead, r.Term) r.lead = None @@ -1738,19 +1644,6 @@ func stepFollower(r *raft, m pb.Message) error { // know we are not recovering from a partition so there is no need for the // extra round trip. r.hup(campaignTransfer) - case pb.MsgReadIndex: - if r.lead == None { - r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term) - return nil - } - m.To = r.lead - r.send(m) - case pb.MsgReadIndexResp: - if len(m.Entries) != 1 { - r.logger.Errorf("%x invalid format of MsgReadIndexResp from %x, entries count: %d", r.id, m.From, len(m.Entries)) - return nil - } - r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data}) } return nil } @@ -1811,7 +1704,7 @@ func (r *raft) handleAppendEntries(m pb.Message) { func (r *raft) handleHeartbeat(m pb.Message) { r.raftLog.commitTo(m.Commit) - r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context}) + r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp}) } func (r *raft) handleSnapshot(m pb.Message) { @@ -2038,31 +1931,6 @@ func (r *raft) abortLeaderTransfer() { r.leadTransferee = None } -// committedEntryInCurrentTerm return true if the peer has committed an entry in its term. -func (r *raft) committedEntryInCurrentTerm() bool { - // NB: r.Term is never 0 on a leader, so if zeroTermOnOutOfBounds returns 0, - // we won't see it as a match with r.Term. - return r.raftLog.zeroTermOnOutOfBounds(r.raftLog.term(r.raftLog.committed)) == r.Term -} - -// responseToReadIndexReq constructs a response for `req`. If `req` comes from the peer -// itself, a blank value will be returned. -func (r *raft) responseToReadIndexReq(req pb.Message, readIndex uint64) pb.Message { - if req.From == None || req.From == r.id { - r.readStates = append(r.readStates, ReadState{ - Index: readIndex, - RequestCtx: req.Entries[0].Data, - }) - return pb.Message{} - } - return pb.Message{ - Type: pb.MsgReadIndexResp, - To: req.From, - Index: readIndex, - Entries: req.Entries, - } -} - // increaseUncommittedSize computes the size of the proposed entries and // determines whether they would push leader over its maxUncommittedSize limit. // If the new entries would exceed the limit, the method returns false. If not, @@ -2099,40 +1967,3 @@ func (r *raft) reduceUncommittedSize(s entryPayloadSize) { r.uncommittedSize -= s } } - -func releasePendingReadIndexMessages(r *raft) { - if len(r.pendingReadIndexMessages) == 0 { - // Fast path for the common case to avoid a call to storage.LastIndex() - // via committedEntryInCurrentTerm. - return - } - if !r.committedEntryInCurrentTerm() { - r.logger.Error("pending MsgReadIndex should be released only after first commit in current term") - return - } - - msgs := r.pendingReadIndexMessages - r.pendingReadIndexMessages = nil - - for _, m := range msgs { - sendMsgReadIndexResponse(r, m) - } -} - -func sendMsgReadIndexResponse(r *raft, m pb.Message) { - // thinking: use an internally defined context instead of the user given context. - // We can express this in terms of the term and index instead of a user-supplied value. - // This would allow multiple reads to piggyback on the same message. - switch r.readOnly.option { - // If more than the local vote is needed, go through a full broadcast. - case ReadOnlySafe: - r.readOnly.addRequest(r.raftLog.committed, m) - // The local node automatically acks the request. - r.readOnly.recvAck(r.id, m.Entries[0].Data) - r.bcastHeartbeatWithCtx(m.Entries[0].Data) - case ReadOnlyLeaseBased: - if resp := r.responseToReadIndexReq(m, r.raftLog.committed); resp.To != None { - r.send(resp) - } - } -} diff --git a/pkg/raft/raft_test.go b/pkg/raft/raft_test.go index e02fe8719441..1c96b9191c52 100644 --- a/pkg/raft/raft_test.go +++ b/pkg/raft/raft_test.go @@ -1191,39 +1191,6 @@ func TestHandleHeartbeatResp(t *testing.T) { require.Empty(t, msgs) } -// TestRaftFreesReadOnlyMem ensures raft will free read request from -// readOnly readIndexQueue and pendingReadIndex map. -// related issue: https://github.com/etcd-io/etcd/issues/7571 -func TestRaftFreesReadOnlyMem(t *testing.T) { - sm := newTestRaft(1, 5, 1, newTestMemoryStorage(withPeers(1, 2))) - sm.becomeCandidate() - sm.becomeLeader() - sm.raftLog.commitTo(sm.raftLog.lastIndex()) - - ctx := []byte("ctx") - - // leader starts linearizable read request. - // more info: raft dissertation 6.4, step 2. - sm.Step(pb.Message{From: 2, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: ctx}}}) - msgs := sm.readMessages() - require.Len(t, msgs, 1) - require.Equal(t, pb.MsgHeartbeat, msgs[0].Type) - require.Equal(t, ctx, msgs[0].Context) - require.Len(t, sm.readOnly.readIndexQueue, 1) - require.Len(t, sm.readOnly.pendingReadIndex, 1) - _, ok := sm.readOnly.pendingReadIndex[string(ctx)] - require.True(t, ok) - - // heartbeat responses from majority of followers (1 in this case) - // acknowledge the authority of the leader. - // more info: raft dissertation 6.4, step 3. - sm.Step(pb.Message{From: 2, Type: pb.MsgHeartbeatResp, Context: ctx}) - require.Empty(t, sm.readOnly.readIndexQueue) - require.Empty(t, sm.readOnly.pendingReadIndex) - _, ok = sm.readOnly.pendingReadIndex[string(ctx)] - require.False(t, ok) -} - // TestMsgAppRespWaitReset verifies the resume behavior of a leader // MsgAppResp. func TestMsgAppRespWaitReset(t *testing.T) { @@ -1906,212 +1873,6 @@ func TestDisruptiveFollowerPreVote(t *testing.T) { require.Equal(t, StateLeader, n1.state) } -func TestReadOnlyOptionSafe(t *testing.T) { - a := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3))) - b := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3))) - c := newTestRaft(3, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3))) - - nt := newNetwork(a, b, c) - setRandomizedElectionTimeout(b, b.electionTimeout+1) - - for i := 0; i < b.electionTimeout; i++ { - b.tick() - } - nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) - - require.Equal(t, StateLeader, a.state) - - tests := []struct { - sm *raft - proposals int - wri uint64 - wctx []byte - }{ - {a, 10, 11, []byte("ctx1")}, - {b, 10, 21, []byte("ctx2")}, - {c, 10, 31, []byte("ctx3")}, - {a, 10, 41, []byte("ctx4")}, - {b, 10, 51, []byte("ctx5")}, - {c, 10, 61, []byte("ctx6")}, - } - - for i, tt := range tests { - for j := 0; j < tt.proposals; j++ { - nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}}) - } - - nt.send(pb.Message{From: tt.sm.id, To: tt.sm.id, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: tt.wctx}}}) - - r := tt.sm - assert.NotEmpty(t, r.readStates, "#%d", i) - rs := r.readStates[0] - assert.Equal(t, tt.wri, rs.Index, "#%d", i) - assert.Equal(t, tt.wctx, rs.RequestCtx, "#%d", i) - r.readStates = nil - } -} - -func TestReadOnlyWithLearner(t *testing.T) { - s := newTestMemoryStorage(withPeers(1), withLearners(2)) - a := newTestLearnerRaft(1, 10, 1, s) - b := newTestLearnerRaft(2, 10, 1, newTestMemoryStorage(withPeers(1), withLearners(2))) - - nt := newNetwork(a, b) - setRandomizedElectionTimeout(b, b.electionTimeout+1) - - for i := 0; i < b.electionTimeout; i++ { - b.tick() - } - nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) - - require.Equal(t, StateLeader, a.state) - - tests := []struct { - sm *raft - proposals int - wri uint64 - wctx []byte - }{ - {a, 10, 11, []byte("ctx1")}, - {b, 10, 21, []byte("ctx2")}, - {a, 10, 31, []byte("ctx3")}, - {b, 10, 41, []byte("ctx4")}, - } - - for i, tt := range tests { - for j := 0; j < tt.proposals; j++ { - nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}}) - nextEnts(a, s) // append the entries on the leader - } - - nt.send(pb.Message{From: tt.sm.id, To: tt.sm.id, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: tt.wctx}}}) - - r := tt.sm - require.NotEmpty(t, r.readStates, "#%d", i) - rs := r.readStates[0] - assert.Equal(t, tt.wri, rs.Index, "#%d", i) - assert.Equal(t, tt.wctx, rs.RequestCtx, "#%d", i) - r.readStates = nil - } -} - -func TestReadOnlyOptionLease(t *testing.T) { - a := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3))) - b := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3))) - c := newTestRaft(3, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3))) - a.readOnly.option = ReadOnlyLeaseBased - b.readOnly.option = ReadOnlyLeaseBased - c.readOnly.option = ReadOnlyLeaseBased - a.checkQuorum = true - b.checkQuorum = true - c.checkQuorum = true - - nt := newNetwork(a, b, c) - setRandomizedElectionTimeout(b, b.electionTimeout+1) - - for i := 0; i < b.electionTimeout; i++ { - b.tick() - } - nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) - - require.Equal(t, StateLeader, a.state) - - tests := []struct { - sm *raft - proposals int - wri uint64 - wctx []byte - }{ - {a, 10, 11, []byte("ctx1")}, - {b, 10, 21, []byte("ctx2")}, - {c, 10, 31, []byte("ctx3")}, - {a, 10, 41, []byte("ctx4")}, - {b, 10, 51, []byte("ctx5")}, - {c, 10, 61, []byte("ctx6")}, - } - - for i, tt := range tests { - for j := 0; j < tt.proposals; j++ { - nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}}) - } - - nt.send(pb.Message{From: tt.sm.id, To: tt.sm.id, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: tt.wctx}}}) - - r := tt.sm - rs := r.readStates[0] - assert.Equal(t, tt.wri, rs.Index, "#%d", i) - assert.Equal(t, tt.wctx, rs.RequestCtx, "#%d", i) - r.readStates = nil - } -} - -// TestReadOnlyForNewLeader ensures that a leader only accepts MsgReadIndex message -// when it commits at least one log entry at it term. -func TestReadOnlyForNewLeader(t *testing.T) { - nodeConfigs := []struct { - id uint64 - committed uint64 - applied uint64 - compactIndex uint64 - }{ - {1, 1, 1, 0}, - {2, 2, 2, 2}, - {3, 2, 2, 2}, - } - peers := make([]stateMachine, 0) - for _, c := range nodeConfigs { - storage := newTestMemoryStorage(withPeers(1, 2, 3)) - require.NoError(t, storage.Append(index(1).terms(1, 1))) - storage.SetHardState(pb.HardState{Term: 1, Commit: c.committed}) - if c.compactIndex != 0 { - storage.Compact(c.compactIndex) - } - cfg := newTestConfig(c.id, 10, 1, storage) - cfg.Applied = c.applied - raft := newRaft(cfg) - peers = append(peers, raft) - } - nt := newNetwork(peers...) - - // Drop MsgApp to forbid peer a to commit any log entry at its term after it becomes leader. - nt.ignore(pb.MsgApp) - // Force peer a to become leader. - nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) - - sm := nt.peers[1].(*raft) - require.Equal(t, StateLeader, sm.state) - - // Ensure peer a drops read only request. - var windex uint64 = 4 - wctx := []byte("ctx") - nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: wctx}}}) - require.Empty(t, sm.readStates) - - nt.recover() - - // Force peer a to commit a log entry at its term - for i := 0; i < sm.heartbeatTimeout; i++ { - sm.tick() - } - nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}}) - require.Equal(t, uint64(4), sm.raftLog.committed) - lastLogTerm := sm.raftLog.zeroTermOnOutOfBounds(sm.raftLog.term(sm.raftLog.committed)) - require.Equal(t, sm.Term, lastLogTerm) - - // Ensure peer a processed postponed read only request after it committed an entry at its term. - require.Len(t, sm.readStates, 1) - rs := sm.readStates[0] - require.Equal(t, windex, rs.Index) - require.Equal(t, wctx, rs.RequestCtx) - - // Ensure peer a accepts read only request after it committed an entry at its term. - nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: wctx}}}) - require.Len(t, sm.readStates, 2) - rs = sm.readStates[1] - require.Equal(t, windex, rs.Index) - require.Equal(t, wctx, rs.RequestCtx) -} - func TestLeaderAppResp(t *testing.T) { // initial progress: match = 0; next = 3 tests := []struct { diff --git a/pkg/raft/raftpb/raft.proto b/pkg/raft/raftpb/raft.proto index d37565e0cd72..d31b7bb39430 100644 --- a/pkg/raft/raftpb/raft.proto +++ b/pkg/raft/raftpb/raft.proto @@ -55,8 +55,6 @@ enum MessageType { MsgCheckQuorum = 12; MsgTransferLeader = 13; MsgTimeoutNow = 14; - MsgReadIndex = 15; - MsgReadIndexResp = 16; MsgPreVote = 17; MsgPreVoteResp = 18; MsgStorageAppend = 19; @@ -67,6 +65,8 @@ enum MessageType { // NOTE: when adding new message types, remember to update the isLocalMsg and // isResponseMsg arrays in raft/util.go and update the corresponding tests in // raft/util_test.go. + + reserved 15, 16; // used to be MsgReadIndex(Resp) } message Message { diff --git a/pkg/raft/rafttest/interaction_env_handler_add_nodes.go b/pkg/raft/rafttest/interaction_env_handler_add_nodes.go index a8f2602f7dbb..395f59c2ff77 100644 --- a/pkg/raft/rafttest/interaction_env_handler_add_nodes.go +++ b/pkg/raft/rafttest/interaction_env_handler_add_nodes.go @@ -60,15 +60,6 @@ func (env *InteractionEnv) handleAddNodes(t *testing.T, d datadriven.TestData) e arg.Scan(t, i, &cfg.MaxCommittedSizePerReady) case "disable-conf-change-validation": arg.Scan(t, i, &cfg.DisableConfChangeValidation) - case "read-only": - switch arg.Vals[i] { - case "safe": - cfg.ReadOnlyOption = raft.ReadOnlySafe - case "lease-based": - cfg.ReadOnlyOption = raft.ReadOnlyLeaseBased - default: - return fmt.Errorf("invalid read-only option %q", arg.Vals[i]) - } case "step-down-on-removal": arg.Scan(t, i, &cfg.StepDownOnRemoval) } diff --git a/pkg/raft/rawnode.go b/pkg/raft/rawnode.go index 4a4279abac87..9a1bdd7d693e 100644 --- a/pkg/raft/rawnode.go +++ b/pkg/raft/rawnode.go @@ -160,9 +160,6 @@ func (rn *RawNode) readyWithoutAccept() Ready { if r.raftLog.hasNextUnstableSnapshot() { rd.Snapshot = *r.raftLog.nextUnstableSnapshot() } - if len(r.readStates) != 0 { - rd.ReadStates = r.readStates - } rd.MustSync = MustSync(r.hardState(), rn.prevHardSt, len(rd.Entries)) if rn.asyncStorageWrites { @@ -412,9 +409,6 @@ func (rn *RawNode) acceptReady(rd Ready) { if !IsEmptyHardState(rd.HardState) { rn.prevHardSt = rd.HardState } - if len(rd.ReadStates) != 0 { - rn.raft.readStates = nil - } if !rn.asyncStorageWrites { if len(rn.stepsOnAdvance) != 0 { rn.raft.logger.Panicf("two accepted Ready structs without call to Advance") @@ -469,9 +463,6 @@ func (rn *RawNode) HasReady() bool { if r.raftLog.hasNextUnstableEnts() || r.raftLog.hasNextCommittedEnts(rn.applyUnstableEntries()) { return true } - if len(r.readStates) != 0 { - return true - } return false } @@ -553,11 +544,3 @@ func (rn *RawNode) TransferLeader(transferee uint64) { func (rn *RawNode) ForgetLeader() error { return rn.raft.Step(pb.Message{Type: pb.MsgForgetLeader}) } - -// ReadIndex requests a read state. The read state will be set in ready. -// Read State has a read index. Once the application advances further than the read -// index, any linearizable read requests issued before the read request can be -// processed safely. The read state will have the same rctx attached. -func (rn *RawNode) ReadIndex(rctx []byte) { - _ = rn.raft.Step(pb.Message{Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: rctx}}}) -} diff --git a/pkg/raft/rawnode_test.go b/pkg/raft/rawnode_test.go index 097d7770eaa8..43e1cba1fa22 100644 --- a/pkg/raft/rawnode_test.go +++ b/pkg/raft/rawnode_test.go @@ -62,12 +62,7 @@ func (a *rawNodeAdapter) Ready() <-chan Ready { return nil } // Node takes more contexts. Easy enough to fix. -func (a *rawNodeAdapter) Campaign(context.Context) error { return a.RawNode.Campaign() } -func (a *rawNodeAdapter) ReadIndex(_ context.Context, rctx []byte) error { - a.RawNode.ReadIndex(rctx) - // RawNode swallowed the error in ReadIndex, it probably should not do that. - return nil -} +func (a *rawNodeAdapter) Campaign(context.Context) error { return a.RawNode.Campaign() } func (a *rawNodeAdapter) Step(_ context.Context, m pb.Message) error { return a.RawNode.Step(m) } func (a *rawNodeAdapter) Propose(_ context.Context, data []byte) error { return a.RawNode.Propose(data) @@ -507,53 +502,6 @@ func TestRawNodeProposeAddDuplicateNode(t *testing.T) { assert.Equal(t, ccdata2, entries[2].Data) } -// TestRawNodeReadIndex ensures that Rawnode.ReadIndex sends the MsgReadIndex message -// to the underlying raft. It also ensures that ReadState can be read out. -func TestRawNodeReadIndex(t *testing.T) { - var msgs []pb.Message - appendStep := func(r *raft, m pb.Message) error { - msgs = append(msgs, m) - return nil - } - wrs := []ReadState{{Index: uint64(1), RequestCtx: []byte("somedata")}} - - s := newTestMemoryStorage(withPeers(1)) - c := newTestConfig(1, 10, 1, s) - rawNode, err := NewRawNode(c) - require.NoError(t, err) - - rawNode.raft.readStates = wrs - // ensure the ReadStates can be read out - assert.True(t, rawNode.HasReady()) - rd := rawNode.Ready() - assert.Equal(t, wrs, rd.ReadStates) - s.Append(rd.Entries) - rawNode.Advance(rd) - // ensure raft.readStates is reset after advance - assert.Nil(t, rawNode.raft.readStates) - - wrequestCtx := []byte("somedata2") - rawNode.Campaign() - for { - rd = rawNode.Ready() - s.Append(rd.Entries) - - if rd.SoftState.Lead == rawNode.raft.id { - rawNode.Advance(rd) - - // Once we are the leader, issue a ReadIndex request - rawNode.raft.step = appendStep - rawNode.ReadIndex(wrequestCtx) - break - } - rawNode.Advance(rd) - } - // ensure that MsgReadIndex message is sent to the underlying raft - require.Len(t, msgs, 1) - assert.Equal(t, pb.MsgReadIndex, msgs[0].Type) - assert.Equal(t, wrequestCtx, msgs[0].Entries[0].Data) -} - // TestBlockProposal from node_test.go has no equivalent in rawNode because there is // no leader check in RawNode. diff --git a/pkg/raft/read_only.go b/pkg/raft/read_only.go deleted file mode 100644 index 661138f64bc5..000000000000 --- a/pkg/raft/read_only.go +++ /dev/null @@ -1,124 +0,0 @@ -// This code has been modified from its original form by Cockroach Labs, Inc. -// All modifications are Copyright 2024 Cockroach Labs, Inc. -// -// Copyright 2016 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package raft - -import pb "github.com/cockroachdb/cockroach/pkg/raft/raftpb" - -// ReadState provides state for read only query. -// It's caller's responsibility to call ReadIndex first before getting -// this state from ready, it's also caller's duty to differentiate if this -// state is what it requests through RequestCtx, eg. given a unique id as -// RequestCtx -type ReadState struct { - Index uint64 - RequestCtx []byte -} - -type readIndexStatus struct { - req pb.Message - index uint64 - // NB: this never records 'false', but it's more convenient to use this - // instead of a map[uint64]struct{} due to the API of quorum.VoteResult. If - // this becomes performance sensitive enough (doubtful), quorum.VoteResult - // can change to an API that is closer to that of CommittedIndex. - acks map[uint64]bool -} - -type readOnly struct { - option ReadOnlyOption - pendingReadIndex map[string]*readIndexStatus - readIndexQueue []string -} - -func newReadOnly(option ReadOnlyOption) *readOnly { - return &readOnly{ - option: option, - pendingReadIndex: make(map[string]*readIndexStatus), - } -} - -// addRequest adds a read only request into readonly struct. -// `index` is the commit index of the raft state machine when it received -// the read only request. -// `m` is the original read only request message from the local or remote node. -func (ro *readOnly) addRequest(index uint64, m pb.Message) { - s := string(m.Entries[0].Data) - if _, ok := ro.pendingReadIndex[s]; ok { - return - } - ro.pendingReadIndex[s] = &readIndexStatus{index: index, req: m, acks: make(map[uint64]bool)} - ro.readIndexQueue = append(ro.readIndexQueue, s) -} - -// recvAck notifies the readonly struct that the raft state machine received -// an acknowledgment of the heartbeat that attached with the read only request -// context. -func (ro *readOnly) recvAck(id uint64, context []byte) map[uint64]bool { - rs, ok := ro.pendingReadIndex[string(context)] - if !ok { - return nil - } - - rs.acks[id] = true - return rs.acks -} - -// advance advances the read only request queue kept by the readonly struct. -// It dequeues the requests until it finds the read only request that has -// the same context as the given `m`. -func (ro *readOnly) advance(m pb.Message) []*readIndexStatus { - var ( - i int - found bool - ) - - ctx := string(m.Context) - var rss []*readIndexStatus - - for _, okctx := range ro.readIndexQueue { - i++ - rs, ok := ro.pendingReadIndex[okctx] - if !ok { - panic("cannot find corresponding read state from pending map") - } - rss = append(rss, rs) - if okctx == ctx { - found = true - break - } - } - - if found { - ro.readIndexQueue = ro.readIndexQueue[i:] - for _, rs := range rss { - delete(ro.pendingReadIndex, string(rs.req.Entries[0].Data)) - } - return rss - } - - return nil -} - -// lastPendingRequestCtx returns the context of the last pending read only -// request in readonly struct. -func (ro *readOnly) lastPendingRequestCtx() string { - if len(ro.readIndexQueue) == 0 { - return "" - } - return ro.readIndexQueue[len(ro.readIndexQueue)-1] -} diff --git a/pkg/raft/testdata/forget_leader_read_only_lease_based.txt b/pkg/raft/testdata/forget_leader_read_only_lease_based.txt deleted file mode 100644 index 4d4bd1883605..000000000000 --- a/pkg/raft/testdata/forget_leader_read_only_lease_based.txt +++ /dev/null @@ -1,30 +0,0 @@ -log-level none ----- -ok - -add-nodes 3 voters=(1,2,3) index=10 checkquorum=true read-only=lease-based ----- -ok - -campaign 1 ----- -ok - -stabilize ----- -ok - -log-level debug ----- -ok - -# ForgetLeader fails with lease-based reads, as it's not safe. -forget-leader 2 ----- -ERROR ignoring MsgForgetLeader due to ReadOnlyLeaseBased - -raft-state ----- -1: StateLeader (Voter) Term:1 Lead:1 -2: StateFollower (Voter) Term:1 Lead:1 -3: StateFollower (Voter) Term:1 Lead:1 diff --git a/pkg/raft/util.go b/pkg/raft/util.go index d93778606f48..a1894eb24c13 100644 --- a/pkg/raft/util.go +++ b/pkg/raft/util.go @@ -46,7 +46,6 @@ var isResponseMsg = [...]bool{ pb.MsgVoteResp: true, pb.MsgHeartbeatResp: true, pb.MsgUnreachable: true, - pb.MsgReadIndexResp: true, pb.MsgPreVoteResp: true, pb.MsgStorageAppendResp: true, pb.MsgStorageApplyResp: true, @@ -117,9 +116,6 @@ func DescribeReady(rd Ready, f EntryFormatter) string { fmt.Fprintf(&buf, "HardState %s", DescribeHardState(rd.HardState)) buf.WriteByte('\n') } - if len(rd.ReadStates) > 0 { - fmt.Fprintf(&buf, "ReadStates %v\n", rd.ReadStates) - } if len(rd.Entries) > 0 { buf.WriteString("Entries:\n") fmt.Fprint(&buf, DescribeEntries(rd.Entries, f)) diff --git a/pkg/raft/util_test.go b/pkg/raft/util_test.go index 18a5b6054717..624a27928d06 100644 --- a/pkg/raft/util_test.go +++ b/pkg/raft/util_test.go @@ -90,8 +90,6 @@ func TestIsLocalMsg(t *testing.T) { {pb.MsgHeartbeat, false}, {pb.MsgHeartbeatResp, false}, {pb.MsgTimeoutNow, false}, - {pb.MsgReadIndex, false}, - {pb.MsgReadIndexResp, false}, {pb.MsgPreVote, false}, {pb.MsgPreVoteResp, false}, {pb.MsgStorageAppend, true}, @@ -127,8 +125,6 @@ func TestIsResponseMsg(t *testing.T) { {pb.MsgHeartbeat, false}, {pb.MsgHeartbeatResp, true}, {pb.MsgTimeoutNow, false}, - {pb.MsgReadIndex, false}, - {pb.MsgReadIndexResp, true}, {pb.MsgPreVote, false}, {pb.MsgPreVoteResp, true}, {pb.MsgStorageAppend, false},