Skip to content

Commit

Permalink
raft: add handleHeartbeat
Browse files Browse the repository at this point in the history
handleHeartbeat commits to the commit index in the message. It never decreases the
commit index of the raft state machine.
  • Loading branch information
xiang90 committed Nov 18, 2014
1 parent 1635844 commit 24a9bfe
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 7 deletions.
16 changes: 11 additions & 5 deletions raft/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,7 @@ func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry
default:
l.append(ci-1, ents[ci-from:]...)
}
tocommit := min(committed, lastnewi)
// if toCommit > commitIndex, set commitIndex = toCommit
if l.committed < tocommit {
l.committed = tocommit
}
l.commitTo(min(committed, lastnewi))
return lastnewi, true
}
return 0, false
Expand Down Expand Up @@ -125,6 +121,16 @@ func (l *raftLog) nextEnts() (ents []pb.Entry) {
return nil
}

func (l *raftLog) commitTo(tocommit uint64) {
// never decrease commit
if l.committed < tocommit {
if l.lastIndex() < tocommit {
panic("committed out of range")
}
l.committed = tocommit
}
}

func (l *raftLog) appliedTo(i uint64) {
if i == 0 {
return
Expand Down
10 changes: 9 additions & 1 deletion raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,10 @@ func (r *raft) handleAppendEntries(m pb.Message) {
}
}

func (r *raft) handleHeartbeat(m pb.Message) {
r.raftLog.commitTo(m.Commit)
}

func (r *raft) handleSnapshot(m pb.Message) {
if r.restore(m.Snapshot) {
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
Expand Down Expand Up @@ -482,7 +486,11 @@ func stepFollower(r *raft, m pb.Message) {
case pb.MsgApp:
r.elapsed = 0
r.lead = m.From
r.handleAppendEntries(m)
if m.LogTerm == 0 && m.Index == 0 && len(m.Entries) == 0 {
r.handleHeartbeat(m)
} else {
r.handleAppendEntries(m)
}
case pb.MsgSnap:
r.elapsed = 0
r.handleSnapshot(m)
Expand Down
1 change: 0 additions & 1 deletion raft/raft_paper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,6 @@ func TestFollowerCheckMsgApp(t *testing.T) {
index uint64
wreject bool
}{
{ents[0].Term, ents[0].Index, false},
{ents[1].Term, ents[1].Index, false},
{ents[2].Term, ents[2].Index, false},
{ents[1].Term, ents[1].Index + 1, true},
Expand Down
29 changes: 29 additions & 0 deletions raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,35 @@ func TestHandleMsgApp(t *testing.T) {
}
}

// TestHandleHeartbeat ensures that the follower commits to the commit in the message.
func TestHandleHeartbeat(t *testing.T) {
commit := uint64(2)
tests := []struct {
m pb.Message
wCommit uint64
}{
{pb.Message{Type: pb.MsgApp, Term: 2, Commit: commit + 1}, commit + 1},
{pb.Message{Type: pb.MsgApp, Term: 2, Commit: commit - 1}, commit}, // do not decrease commit
}

for i, tt := range tests {
sm := &raft{
state: StateFollower,
HardState: pb.HardState{Term: 2},
raftLog: &raftLog{committed: 0, ents: []pb.Entry{{}, {Term: 1}, {Term: 2}, {Term: 3}}},
}
sm.raftLog.commitTo(2)
sm.handleHeartbeat(tt.m)
if sm.raftLog.committed != tt.wCommit {
t.Errorf("#%d: committed = %d, want %d", i, sm.raftLog.committed, tt.wCommit)
}
m := sm.readMessages()
if len(m) != 0 {
t.Fatalf("#%d: msg = nil, want 0", i)
}
}
}

func TestRecvMsgVote(t *testing.T) {
tests := []struct {
state StateType
Expand Down

0 comments on commit 24a9bfe

Please sign in to comment.