Skip to content

Commit

Permalink
raft: fast fail while dropping index reading
Browse files Browse the repository at this point in the history
  • Loading branch information
nolouch committed Apr 12, 2018
1 parent f46368c commit 99eff43
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 31 deletions.
45 changes: 22 additions & 23 deletions raft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ type msgWithResult struct {
// node is the canonical implementation of the Node interface
type node struct {
propc chan msgWithResult
recvc chan pb.Message
recvc chan msgWithResult
confc chan pb.ConfChange
confstatec chan pb.ConfState
readyc chan Ready
Expand All @@ -248,7 +248,7 @@ type node struct {
func newNode() node {
return node{
propc: make(chan msgWithResult),
recvc: make(chan pb.Message),
recvc: make(chan msgWithResult),
confc: make(chan pb.ConfChange),
confstatec: make(chan pb.ConfState),
readyc: make(chan Ready),
Expand Down Expand Up @@ -327,10 +327,15 @@ func (n *node) run(r *raft) {
pm.result <- err
close(pm.result)
}
case m := <-n.recvc:
case rm := <-n.recvc:
m := rm.m
// filter out response message from unknown From.
if pr := r.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) {
r.Step(m)
err := r.Step(m)
if rm.result != nil {
rm.result <- err
close(rm.result)
}
}
case cc := <-n.confc:
if cc.NodeID == None {
Expand Down Expand Up @@ -449,23 +454,17 @@ func (n *node) stepWait(ctx context.Context, m pb.Message) error {
// Step advances the state machine using msgs. The ctx.Err() will be returned,
// if any.
func (n *node) stepWithWaitOption(ctx context.Context, m pb.Message, wait bool) error {
if m.Type != pb.MsgProp {
select {
case n.recvc <- m:
return nil
case <-ctx.Done():
return ctx.Err()
case <-n.done:
return ErrStopped
}
}
ch := n.propc
pm := msgWithResult{m: m}
ch := n.recvc
msg := msgWithResult{m: m}
if wait {
pm.result = make(chan error, 1)
msg.result = make(chan error, 1)
}
if m.Type == pb.MsgProp {
ch = n.propc
}

select {
case ch <- pm:
case ch <- msg:
if !wait {
return nil
}
Expand All @@ -475,7 +474,7 @@ func (n *node) stepWithWaitOption(ctx context.Context, m pb.Message, wait bool)
return ErrStopped
}
select {
case rsp := <-pm.result:
case rsp := <-msg.result:
if rsp != nil {
return rsp
}
Expand Down Expand Up @@ -521,7 +520,7 @@ func (n *node) Status() Status {

func (n *node) ReportUnreachable(id uint64) {
select {
case n.recvc <- pb.Message{Type: pb.MsgUnreachable, From: id}:
case n.recvc <- msgWithResult{m: pb.Message{Type: pb.MsgUnreachable, From: id}}:
case <-n.done:
}
}
Expand All @@ -530,22 +529,22 @@ func (n *node) ReportSnapshot(id uint64, status SnapshotStatus) {
rej := status == SnapshotFailure

select {
case n.recvc <- pb.Message{Type: pb.MsgSnapStatus, From: id, Reject: rej}:
case n.recvc <- msgWithResult{m: pb.Message{Type: pb.MsgSnapStatus, From: id, Reject: rej}}:
case <-n.done:
}
}

func (n *node) TransferLeadership(ctx context.Context, lead, transferee uint64) {
select {
// manually set 'from' and 'to', so that leader can voluntarily transfers its leadership
case n.recvc <- pb.Message{Type: pb.MsgTransferLeader, From: transferee, To: lead}:
case n.recvc <- msgWithResult{m: pb.Message{Type: pb.MsgTransferLeader, From: transferee, To: lead}}:
case <-n.done:
case <-ctx.Done():
}
}

func (n *node) ReadIndex(ctx context.Context, rctx []byte) error {
return n.step(ctx, pb.Message{Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: rctx}}})
return n.stepWait(ctx, pb.Message{Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: rctx}}})
}

func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
Expand Down
50 changes: 48 additions & 2 deletions raft/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestNodeStep(t *testing.T) {
for i, msgn := range raftpb.MessageType_name {
n := &node{
propc: make(chan msgWithResult, 1),
recvc: make(chan raftpb.Message, 1),
recvc: make(chan msgWithResult, 1),
}
msgt := raftpb.MessageType(i)
n.Step(context.TODO(), raftpb.Message{Type: msgt})
Expand Down Expand Up @@ -473,7 +473,53 @@ func TestNodeProposeWaitDropped(t *testing.T) {

n.Stop()
if len(msgs) != 0 {
t.Fatalf("len(msgs) = %d, want %d", len(msgs), 1)
t.Fatalf("len(msgs) = %d, want %d", len(msgs), 0)
}
}

func TestNodeReadIndexWaitDropped(t *testing.T) {
msgs := []raftpb.Message{}
droppingMsg := []byte("test_dropping")
dropStep := func(r *raft, m raftpb.Message) error {
if m.Type == raftpb.MsgReadIndex && strings.Contains(m.String(), string(droppingMsg)) {
t.Logf("dropping message: %v", m.String())
return ErrReadIndexDropped
}
msgs = append(msgs, m)
return nil
}

n := newNode()
s := NewMemoryStorage()
r := newTestRaft(1, []uint64{1}, 10, 1, s)
go n.run(r)
n.Campaign(context.TODO())
for {
rd := <-n.Ready()
s.Append(rd.Entries)
// change the step function to dropStep until this raft becomes leader
if rd.SoftState.Lead == r.id {
r.step = dropStep
n.Advance()
break
}
n.Advance()
}
readIndexTimeout := time.Millisecond * 100
ctx, cancel := context.WithTimeout(context.Background(), readIndexTimeout)
// read index with cancel should be cancelled earyly if dropped
err := n.ReadIndex(ctx, droppingMsg)
if err != ErrReadIndexDropped {
t.Errorf("should drop read index: %v", err)
}
cancel()

n.Stop()
if len(msgs) != 0 {
t.Fatalf("len(msgs) = %d, want %d", len(msgs), 0)
}
if len(r.readStates) != 0 {
t.Fatalf("len(readStates) = %d, want %d", len(r.readStates), 0)
}
}

Expand Down
20 changes: 14 additions & 6 deletions raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,14 @@ const (
campaignTransfer CampaignType = "CampaignTransfer"
)

// ErrProposalDropped is returned when the proposal is ignored by some cases,
// so that the proposer can be notified and fail fast.
var ErrProposalDropped = errors.New("raft proposal dropped")
var (
// ErrProposalDropped is returned when the proposal is ignored by some cases,
// so that the proposer can be notified and fail fast.
ErrProposalDropped = errors.New("raft proposal dropped")
// ErrReadIndexDropped is returned when the index reading message is ignored by some cases,
// so that the reader can be notified and fail fast.
ErrReadIndexDropped = errors.New("raft read index dropped")
)

// lockedRand is a small wrapper around rand.Rand to provide
// synchronization. Only the methods needed by the code are exposed
Expand Down Expand Up @@ -958,7 +963,7 @@ func stepLeader(r *raft, m pb.Message) error {
if r.quorum() > 1 {
if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term {
// Reject read only request when this leader has not committed any log entry at its term.
return nil
return ErrReadIndexDropped
}

// thinking: use an interally defined context instead of the user given context.
Expand Down Expand Up @@ -1134,6 +1139,9 @@ func stepCandidate(r *raft, m pb.Message) error {
case pb.MsgProp:
r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
return ErrProposalDropped
case pb.MsgReadIndex:
r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term)
return ErrReadIndexDropped
case pb.MsgApp:
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
r.handleAppendEntries(m)
Expand Down Expand Up @@ -1209,14 +1217,14 @@ func stepFollower(r *raft, m pb.Message) error {
case pb.MsgReadIndex:
if r.lead == None {
r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term)
return nil
return ErrReadIndexDropped
}
m.To = r.lead
r.send(m)
case pb.MsgReadIndexResp:
if len(m.Entries) != 1 {
r.logger.Errorf("%x invalid format of MsgReadIndexResp from %x, entries count: %d", r.id, m.From, len(m.Entries))
return nil
return ErrReadIndexDropped
}
r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data})
}
Expand Down

0 comments on commit 99eff43

Please sign in to comment.