Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raft: Avoid scanning raft log in becomeLeader #9073

Merged
merged 1 commit into from
Jan 9, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions raft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,6 @@ func (n *node) run(r *raft) {
}
case cc := <-n.confc:
if cc.NodeID == None {
r.resetPendingConf()
select {
case n.confstatec <- pb.ConfState{Nodes: r.nodes()}:
case <-n.done:
Expand All @@ -344,7 +343,6 @@ func (n *node) run(r *raft) {
}
r.removeNode(cc.NodeID)
case pb.ConfChangeUpdateNode:
r.resetPendingConf()
default:
panic("unexpected conf type")
}
Expand Down
6 changes: 5 additions & 1 deletion raft/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ func TestNodeProposeAddDuplicateNode(t *testing.T) {
n.Tick()
case rd := <-n.Ready():
s.Append(rd.Entries)
applied := false
for _, e := range rd.Entries {
rdyEntries = append(rdyEntries, e)
switch e.Type {
Expand All @@ -356,10 +357,13 @@ func TestNodeProposeAddDuplicateNode(t *testing.T) {
var cc raftpb.ConfChange
cc.Unmarshal(e.Data)
n.ApplyConfChange(cc)
applyConfChan <- struct{}{}
applied = true
}
}
n.Advance()
if applied {
applyConfChan <- struct{}{}
}
}
}
}()
Expand Down
36 changes: 20 additions & 16 deletions raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,13 @@ type raft struct {
// leadTransferee is id of the leader transfer target when its value is not zero.
// Follow the procedure defined in raft thesis 3.10.
leadTransferee uint64
// New configuration is ignored if there exists unapplied configuration.
pendingConf bool
// Only one conf change may be pending (in the log, but not yet
// applied) at a time. This is enforced via pendingConfIndex, which
// is set to a value >= the log index of the latest pending
// configuration change (if any). Config changes are only allowed to
// be proposed if the leader's applied index is greater than this
// value.
pendingConfIndex uint64

readOnly *readOnly

Expand Down Expand Up @@ -578,7 +583,7 @@ func (r *raft) reset(term uint64) {
}
})

r.pendingConf = false
r.pendingConfIndex = 0
r.readOnly = newReadOnly(r.readOnly.option)
}

Expand Down Expand Up @@ -682,12 +687,13 @@ func (r *raft) becomeLeader() {
r.logger.Panicf("unexpected error getting uncommitted entries (%v)", err)
}

nconf := numOfPendingConf(ents)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that using pendingConfIndex only reduce calling numOfPendingConf here, can this reduce the performance too much?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should not have a significant negative impact on performance. The only thing affected is the ability to propose new config changes, and the impact is small. The worst case scenario is when you have one up-to-date follower and one follower that is behind, then the leader dies and the up-to-date follower becomes the new leader.

Before, the new leader could immediately propose a config change, but that config change wouldn't be applied until the other follower catches up (acknowledging the log entries, but not necessarily applying them)

With this change, the follower must catch up before any config change can be proposed. So this only adds one round trip to membership changes proposed immediately after an election.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it

if nconf > 1 {
panic("unexpected multiple uncommitted config entry")
}
if nconf == 1 {
r.pendingConf = true
// Conservatively set the pendingConfIndex to the last index in the
// log. There may or may not be a pending config change, but it's
// safe to delay any future proposals until we commit all our
// pending log entries, and scanning the entire tail of the log
// could be expensive.
if len(ents) > 0 {
r.pendingConfIndex = ents[len(ents)-1].Index
}

r.appendEntry(pb.Entry{Data: nil})
Expand Down Expand Up @@ -901,11 +907,13 @@ func stepLeader(r *raft, m pb.Message) {

for i, e := range m.Entries {
if e.Type == pb.EntryConfChange {
if r.pendingConf {
r.logger.Infof("propose conf %s ignored since pending unapplied configuration", e.String())
if r.pendingConfIndex > r.raftLog.applied {
r.logger.Infof("propose conf %s ignored since pending unapplied configuration [index %d, applied %d]",
e.String(), r.pendingConfIndex, r.raftLog.applied)
m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
} else {
r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1
}
r.pendingConf = true
}
}
r.appendEntry(m.Entries...)
Expand Down Expand Up @@ -1270,7 +1278,6 @@ func (r *raft) addLearner(id uint64) {
}

func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) {
r.pendingConf = false
pr := r.getProgress(id)
if pr == nil {
r.setProgress(id, 0, r.raftLog.lastIndex()+1, isLearner)
Expand Down Expand Up @@ -1306,7 +1313,6 @@ func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) {

func (r *raft) removeNode(id uint64) {
r.delProgress(id)
r.pendingConf = false

// do not try to commit or abort transferring if there is no nodes in the cluster.
if len(r.prs) == 0 && len(r.learnerPrs) == 0 {
Expand All @@ -1324,8 +1330,6 @@ func (r *raft) removeNode(id uint64) {
}
}

func (r *raft) resetPendingConf() { r.pendingConf = false }

func (r *raft) setProgress(id, match, next uint64, isLearner bool) {
if !isLearner {
delete(r.learnerPrs, id)
Expand Down
73 changes: 21 additions & 52 deletions raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2736,8 +2736,8 @@ func TestStepConfig(t *testing.T) {
if g := r.raftLog.lastIndex(); g != index+1 {
t.Errorf("index = %d, want %d", g, index+1)
}
if !r.pendingConf {
t.Errorf("pendingConf = %v, want true", r.pendingConf)
if r.pendingConfIndex != index+1 {
t.Errorf("pendingConfIndex = %d, want %d", r.pendingConfIndex, index+1)
}
}

Expand All @@ -2751,7 +2751,7 @@ func TestStepIgnoreConfig(t *testing.T) {
r.becomeLeader()
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
index := r.raftLog.lastIndex()
pendingConf := r.pendingConf
pendingConfIndex := r.pendingConfIndex
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
wents := []pb.Entry{{Type: pb.EntryNormal, Term: 1, Index: 3, Data: nil}}
ents, err := r.raftLog.entries(index+1, noLimit)
Expand All @@ -2761,72 +2761,50 @@ func TestStepIgnoreConfig(t *testing.T) {
if !reflect.DeepEqual(ents, wents) {
t.Errorf("ents = %+v, want %+v", ents, wents)
}
if r.pendingConf != pendingConf {
t.Errorf("pendingConf = %v, want %v", r.pendingConf, pendingConf)
if r.pendingConfIndex != pendingConfIndex {
t.Errorf("pendingConfIndex = %d, want %d", r.pendingConfIndex, pendingConfIndex)
}
}

// TestRecoverPendingConfig tests that new leader recovers its pendingConf flag
// TestNewLeaderPendingConfig tests that new leader sets its pendingConfigIndex
// based on uncommitted entries.
func TestRecoverPendingConfig(t *testing.T) {
func TestNewLeaderPendingConfig(t *testing.T) {
tests := []struct {
entType pb.EntryType
wpending bool
addEntry bool
wpendingIndex uint64
}{
{pb.EntryNormal, false},
{pb.EntryConfChange, true},
{false, 0},
{true, 1},
}
for i, tt := range tests {
r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
r.appendEntry(pb.Entry{Type: tt.entType})
if tt.addEntry {
r.appendEntry(pb.Entry{Type: pb.EntryNormal})
}
r.becomeCandidate()
r.becomeLeader()
if r.pendingConf != tt.wpending {
t.Errorf("#%d: pendingConf = %v, want %v", i, r.pendingConf, tt.wpending)
if r.pendingConfIndex != tt.wpendingIndex {
t.Errorf("#%d: pendingConfIndex = %d, want %d",
i, r.pendingConfIndex, tt.wpendingIndex)
}
}
}

// TestRecoverDoublePendingConfig tests that new leader will panic if
// there exist two uncommitted config entries.
func TestRecoverDoublePendingConfig(t *testing.T) {
func() {
defer func() {
if err := recover(); err == nil {
t.Errorf("expect panic, but nothing happens")
}
}()
r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
r.appendEntry(pb.Entry{Type: pb.EntryConfChange})
r.appendEntry(pb.Entry{Type: pb.EntryConfChange})
r.becomeCandidate()
r.becomeLeader()
}()
}

// TestAddNode tests that addNode could update pendingConf and nodes correctly.
// TestAddNode tests that addNode could update nodes correctly.
func TestAddNode(t *testing.T) {
r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
r.pendingConf = true
r.addNode(2)
if r.pendingConf {
t.Errorf("pendingConf = %v, want false", r.pendingConf)
}
nodes := r.nodes()
wnodes := []uint64{1, 2}
if !reflect.DeepEqual(nodes, wnodes) {
t.Errorf("nodes = %v, want %v", nodes, wnodes)
}
}

// TestAddLearner tests that addLearner could update pendingConf and nodes correctly.
// TestAddLearner tests that addLearner could update nodes correctly.
func TestAddLearner(t *testing.T) {
r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
r.pendingConf = true
r.addLearner(2)
if r.pendingConf {
t.Errorf("pendingConf = %v, want false", r.pendingConf)
}
nodes := r.nodes()
wnodes := []uint64{1, 2}
if !reflect.DeepEqual(nodes, wnodes) {
Expand All @@ -2841,7 +2819,6 @@ func TestAddLearner(t *testing.T) {
// immediately when checkQuorum is set.
func TestAddNodeCheckQuorum(t *testing.T) {
r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
r.pendingConf = true
r.checkQuorum = true

r.becomeCandidate()
Expand Down Expand Up @@ -2872,15 +2849,11 @@ func TestAddNodeCheckQuorum(t *testing.T) {
}
}

// TestRemoveNode tests that removeNode could update pendingConf, nodes and
// TestRemoveNode tests that removeNode could update nodes and
// and removed list correctly.
func TestRemoveNode(t *testing.T) {
r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
r.pendingConf = true
r.removeNode(2)
if r.pendingConf {
t.Errorf("pendingConf = %v, want false", r.pendingConf)
}
w := []uint64{1}
if g := r.nodes(); !reflect.DeepEqual(g, w) {
t.Errorf("nodes = %v, want %v", g, w)
Expand All @@ -2894,15 +2867,11 @@ func TestRemoveNode(t *testing.T) {
}
}

// TestRemoveLearner tests that removeNode could update pendingConf, nodes and
// TestRemoveLearner tests that removeNode could update nodes and
// and removed list correctly.
func TestRemoveLearner(t *testing.T) {
r := newTestLearnerRaft(1, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage())
r.pendingConf = true
r.removeNode(2)
if r.pendingConf {
t.Errorf("pendingConf = %v, want false", r.pendingConf)
}
w := []uint64{1}
if g := r.nodes(); !reflect.DeepEqual(g, w) {
t.Errorf("nodes = %v, want %v", g, w)
Expand Down
2 changes: 0 additions & 2 deletions raft/rawnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,6 @@ func (rn *RawNode) ProposeConfChange(cc pb.ConfChange) error {
// ApplyConfChange applies a config change to the local node.
func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState {
if cc.NodeID == None {
rn.raft.resetPendingConf()
return &pb.ConfState{Nodes: rn.raft.nodes()}
}
switch cc.Type {
Expand All @@ -180,7 +179,6 @@ func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState {
case pb.ConfChangeRemoveNode:
rn.raft.removeNode(cc.NodeID)
case pb.ConfChangeUpdateNode:
rn.raft.resetPendingConf()
default:
panic("unexpected conf type")
}
Expand Down