Skip to content

Commit

Permalink
Merge pull request #1740 from xiang90/handleheartbeat
Browse files Browse the repository at this point in the history
raft: add handleHeartbeat
  • Loading branch information
xiang90 committed Nov 18, 2014
2 parents f94ff96 + bd4cfa2 commit e07ef69
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 8 deletions.
18 changes: 12 additions & 6 deletions raft/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,7 @@ func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry
default:
l.append(ci-1, ents[ci-from:]...)
}
tocommit := min(committed, lastnewi)
// if toCommit > commitIndex, set commitIndex = toCommit
if l.committed < tocommit {
l.committed = tocommit
}
l.commitTo(min(committed, lastnewi))
return lastnewi, true
}
return 0, false
Expand Down Expand Up @@ -125,6 +121,16 @@ func (l *raftLog) nextEnts() (ents []pb.Entry) {
return nil
}

func (l *raftLog) commitTo(tocommit uint64) {
// never decrease commit
if l.committed < tocommit {
if l.lastIndex() < tocommit {
panic("committed out of range")
}
l.committed = tocommit
}
}

func (l *raftLog) appliedTo(i uint64) {
if i == 0 {
return
Expand Down Expand Up @@ -179,7 +185,7 @@ func (l *raftLog) matchTerm(i, term uint64) bool {

func (l *raftLog) maybeCommit(maxIndex, term uint64) bool {
if maxIndex > l.committed && l.term(maxIndex) == term {
l.committed = maxIndex
l.commitTo(maxIndex)
return true
}
return false
Expand Down
32 changes: 32 additions & 0 deletions raft/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,38 @@ func TestUnstableEnts(t *testing.T) {
}
}

func TestCommitTo(t *testing.T) {
previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}, {Term: 3, Index: 3}}
commit := uint64(2)
tests := []struct {
commit uint64
wcommit uint64
wpanic bool
}{
{3, 3, false},
{1, 2, false}, // never decrease
{4, 0, true}, // commit out of range -> panic
}
for i, tt := range tests {
func() {
defer func() {
if r := recover(); r != nil {
if tt.wpanic != true {
t.Errorf("%d: panic = %v, want %v", i, true, tt.wpanic)
}
}
}()
raftLog := newLog()
raftLog.append(0, previousEnts...)
raftLog.committed = commit
raftLog.commitTo(tt.commit)
if raftLog.committed != tt.wcommit {
t.Errorf("#%d: committed = %d, want %d", i, raftLog.committed, tt.wcommit)
}
}()
}
}

func TestStableTo(t *testing.T) {
tests := []struct {
stable uint64
Expand Down
10 changes: 9 additions & 1 deletion raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,10 @@ func (r *raft) handleAppendEntries(m pb.Message) {
}
}

func (r *raft) handleHeartbeat(m pb.Message) {
r.raftLog.commitTo(m.Commit)
}

func (r *raft) handleSnapshot(m pb.Message) {
if r.restore(m.Snapshot) {
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
Expand Down Expand Up @@ -482,7 +486,11 @@ func stepFollower(r *raft, m pb.Message) {
case pb.MsgApp:
r.elapsed = 0
r.lead = m.From
r.handleAppendEntries(m)
if m.LogTerm == 0 && m.Index == 0 && len(m.Entries) == 0 {
r.handleHeartbeat(m)
} else {
r.handleAppendEntries(m)
}
case pb.MsgSnap:
r.elapsed = 0
r.handleSnapshot(m)
Expand Down
1 change: 0 additions & 1 deletion raft/raft_paper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,6 @@ func TestFollowerCheckMsgApp(t *testing.T) {
index uint64
wreject bool
}{
{ents[0].Term, ents[0].Index, false},
{ents[1].Term, ents[1].Index, false},
{ents[2].Term, ents[2].Index, false},
{ents[1].Term, ents[1].Index + 1, true},
Expand Down
29 changes: 29 additions & 0 deletions raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,35 @@ func TestHandleMsgApp(t *testing.T) {
}
}

// TestHandleHeartbeat ensures that the follower commits to the commit in the message.
func TestHandleHeartbeat(t *testing.T) {
commit := uint64(2)
tests := []struct {
m pb.Message
wCommit uint64
}{
{pb.Message{Type: pb.MsgApp, Term: 2, Commit: commit + 1}, commit + 1},
{pb.Message{Type: pb.MsgApp, Term: 2, Commit: commit - 1}, commit}, // do not decrease commit
}

for i, tt := range tests {
sm := &raft{
state: StateFollower,
HardState: pb.HardState{Term: 2},
raftLog: &raftLog{committed: 0, ents: []pb.Entry{{}, {Term: 1}, {Term: 2}, {Term: 3}}},
}
sm.raftLog.commitTo(commit)
sm.handleHeartbeat(tt.m)
if sm.raftLog.committed != tt.wCommit {
t.Errorf("#%d: committed = %d, want %d", i, sm.raftLog.committed, tt.wCommit)
}
m := sm.readMessages()
if len(m) != 0 {
t.Fatalf("#%d: msg = nil, want 0", i)
}
}
}

func TestRecvMsgVote(t *testing.T) {
tests := []struct {
state StateType
Expand Down

0 comments on commit e07ef69

Please sign in to comment.