Skip to content

Commit

Permalink
TLA+ spec of raft consensus algorithm in etcd implementation and mode…
Browse files Browse the repository at this point in the history
…l based trace validation

Signed-off-by: Joshua Zhang <joshuazh@microsoft.com>
  • Loading branch information
joshuazh-x committed Apr 6, 2024
1 parent ebcf5af commit 64ced68
Show file tree
Hide file tree
Showing 14 changed files with 7,359 additions and 0 deletions.
29 changes: 29 additions & 0 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,9 @@ type Config struct {
// This behavior will become unconditional in the future. See:
// https://github.com/etcd-io/raft/issues/83
StepDownOnRemoval bool

// raft state tracer
TraceLogger TraceLogger
}

func (c *Config) validate() error {
Expand Down Expand Up @@ -427,6 +430,8 @@ type raft struct {
// current term. Those will be handled as fast as first log is committed in
// current term.
pendingReadIndexMessages []pb.Message

traceLogger TraceLogger
}

func newRaft(c *Config) *raft {
Expand Down Expand Up @@ -456,8 +461,11 @@ func newRaft(c *Config) *raft {
disableProposalForwarding: c.DisableProposalForwarding,
disableConfChangeValidation: c.DisableConfChangeValidation,
stepDownOnRemoval: c.StepDownOnRemoval,
traceLogger: c.TraceLogger,
}

traceInitState(r)

lastID := r.raftLog.lastEntryID()
cfg, trk, err := confchange.Restore(confchange.Changer{
Tracker: r.trk,
Expand Down Expand Up @@ -580,11 +588,13 @@ func (r *raft) send(m pb.Message) {
// we err on the side of safety and omit a `&& !m.Reject` condition
// above.
r.msgsAfterAppend = append(r.msgsAfterAppend, m)
traceSendMessage(r, &m)
} else {
if m.To == r.id {
r.logger.Panicf("message should not be self-addressed when sending %s", m.Type)
}
r.msgs = append(r.msgs, m)
traceSendMessage(r, &m)
}
}

Expand Down Expand Up @@ -766,6 +776,8 @@ func (r *raft) appliedSnap(snap *pb.Snapshot) {
// index changed (in which case the caller should call r.bcastAppend). This can
// only be called in StateLeader.
func (r *raft) maybeCommit() bool {
defer traceCommit(r)

return r.raftLog.maybeCommit(entryID{term: r.Term, index: r.trk.Committed()})
}

Expand Down Expand Up @@ -815,6 +827,13 @@ func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) {
// Drop the proposal.
return false
}
if StateTraceDeployed {
for i := range es {
if es[i].Type == pb.EntryNormal {
traceReplicate(r)
}
}
}
// use latest "last" index after truncate/append
li = r.raftLog.append(es...)
// The leader needs to self-ack the entries just appended once they have
Expand Down Expand Up @@ -880,6 +899,8 @@ func (r *raft) becomeFollower(term uint64, lead uint64) {
r.lead = lead
r.state = StateFollower
r.logger.Infof("%x became follower at term %d", r.id, r.Term)

traceBecomeFollower(r)
}

func (r *raft) becomeCandidate() {
Expand All @@ -893,6 +914,8 @@ func (r *raft) becomeCandidate() {
r.Vote = r.id
r.state = StateCandidate
r.logger.Infof("%x became candidate at term %d", r.id, r.Term)

traceBecomeCandidate(r)
}

func (r *raft) becomePreCandidate() {
Expand Down Expand Up @@ -938,6 +961,7 @@ func (r *raft) becomeLeader() {
// could be expensive.
r.pendingConfIndex = r.raftLog.lastIndex()

traceBecomeLeader(r)
emptyEnt := pb.Entry{Data: nil}
if !r.appendEntry(emptyEnt) {
// This won't happen because we just called reset() above.
Expand Down Expand Up @@ -1063,6 +1087,8 @@ func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int, rejected
}

func (r *raft) Step(m pb.Message) error {
traceReceiveMessage(r, &m)

// Handle the message term, which may result in our stepping down to a follower.
switch {
case m.Term == 0:
Expand Down Expand Up @@ -1311,6 +1337,7 @@ func stepLeader(r *raft, m pb.Message) error {
m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
} else {
r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1
traceChangeConfEvent(cc, r)
}
}
}
Expand Down Expand Up @@ -1950,6 +1977,8 @@ func (r *raft) applyConfChange(cc pb.ConfChangeV2) pb.ConfState {
//
// The inputs usually result from restoring a ConfState or applying a ConfChange.
func (r *raft) switchToConfig(cfg tracker.Config, trk tracker.ProgressMap) pb.ConfState {
traceConfChangeEvent(cfg, r)

r.trk.Config = cfg
r.trk.Progress = trk

Expand Down
2 changes: 2 additions & 0 deletions rawnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,8 @@ func (rn *RawNode) acceptReady(rd Ready) {
index := ents[len(ents)-1].Index
rn.raft.raftLog.acceptApplying(index, entsSize(ents), rn.applyUnstableEntries())
}

traceReady(rn.raft)
}

// applyUnstableEntries returns whether entries are allowed to be applied once
Expand Down
Loading

0 comments on commit 64ced68

Please sign in to comment.