Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TLA+ Trace validation #113

Merged
merged 1 commit into from
Apr 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,9 @@ type Config struct {
// This behavior will become unconditional in the future. See:
// https://github.com/etcd-io/raft/issues/83
StepDownOnRemoval bool

// raft state tracer
TraceLogger TraceLogger
}

func (c *Config) validate() error {
Expand Down Expand Up @@ -427,6 +430,8 @@ type raft struct {
// current term. Those will be handled as fast as first log is committed in
// current term.
pendingReadIndexMessages []pb.Message

traceLogger TraceLogger
}

func newRaft(c *Config) *raft {
Expand Down Expand Up @@ -456,8 +461,11 @@ func newRaft(c *Config) *raft {
disableProposalForwarding: c.DisableProposalForwarding,
disableConfChangeValidation: c.DisableConfChangeValidation,
stepDownOnRemoval: c.StepDownOnRemoval,
traceLogger: c.TraceLogger,
}

traceInitState(r)

lastID := r.raftLog.lastEntryID()
cfg, trk, err := confchange.Restore(confchange.Changer{
Tracker: r.trk,
Expand Down Expand Up @@ -580,11 +588,13 @@ func (r *raft) send(m pb.Message) {
// we err on the side of safety and omit a `&& !m.Reject` condition
// above.
r.msgsAfterAppend = append(r.msgsAfterAppend, m)
traceSendMessage(r, &m)
} else {
if m.To == r.id {
r.logger.Panicf("message should not be self-addressed when sending %s", m.Type)
}
r.msgs = append(r.msgs, m)
traceSendMessage(r, &m)
}
}

Expand Down Expand Up @@ -766,6 +776,8 @@ func (r *raft) appliedSnap(snap *pb.Snapshot) {
// index changed (in which case the caller should call r.bcastAppend). This can
// only be called in StateLeader.
func (r *raft) maybeCommit() bool {
defer traceCommit(r)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that the function is called maybeCommit, which means that it may not commit. But you always trace the commit?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The commit trace here makes the state machine take a step with the AdvanceCommitIndex action, which contains the same logic as maybeCommit. We expect the state machine to reach the same state that raft has after calling maybeCommit.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The commit trace here makes the state machine take a step with the AdvanceCommitIndex action, which contains the same logic as maybeCommit. We expect the state machine to reach the same state that raft has after calling maybeCommit.

  • I do not see the "same logic".
  • Do you mean that you always need to record an rsmCommit event in the trace, even if raft doesn't actually commit anything when (*raft).maybeCommit() is called?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rsmCommit tells the trace validator to try advancing the commit index, as the AdvanceCommitIndex action in the spec does. As you can see, newCommitIndex will have the same value as the current commitIndex if no uncommitted entries have quorum acks in the current term. This is the same behavior as maybeCommit.

\* Action: server i, which must be in the Leader state, recomputes its commit
\* index. The candidate value is the highest log index acknowledged by a quorum
\* of i's current configuration, and it is used only if the entry at that index
\* carries i's current term; otherwise the existing commitIndex[i] is kept.
\* The resulting value is handed to CommitTo (defined elsewhere in the spec).
AdvanceCommitIndex(i) ==
    /\ state[i] = Leader
    /\ LET \* The set of servers that agree up through index.
           Agree(index) == {k \in GetConfig(i) : matchIndex[i][k] >= index}
           logSize == Len(log[i])
           \* logSize == MaxLogLength
           \* The set of indexes in 1..logSize for which a quorum agrees
           \* (the maximum is taken below via Max).
           agreeIndexes == {index \in 1..logSize :
                                Agree(index) \in Quorum(GetConfig(i))}
           \* New value for commitIndex'[i]: falls back to the current
           \* commitIndex[i] when no index has quorum agreement or when the
           \* highest agreed entry is not from currentTerm[i].
           newCommitIndex ==
              IF /\ agreeIndexes /= {}
                 /\ log[i][Max(agreeIndexes)].term = currentTerm[i]
              THEN
                  Max(agreeIndexes)
              ELSE
                  commitIndex[i]
       IN
        /\ CommitTo(i, newCommitIndex)
    \* Every variable group listed here is left unchanged by this action.
    /\ UNCHANGED <<messageVars, serverVars, candidateVars, leaderVars, log, configVars, durableState>>


return r.raftLog.maybeCommit(entryID{term: r.Term, index: r.trk.Committed()})
}

Expand Down Expand Up @@ -815,6 +827,9 @@ func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) {
// Drop the proposal.
return false
}

traceReplicate(r, es...)

// use latest "last" index after truncate/append
li = r.raftLog.append(es...)
// The leader needs to self-ack the entries just appended once they have
Expand Down Expand Up @@ -880,6 +895,8 @@ func (r *raft) becomeFollower(term uint64, lead uint64) {
r.lead = lead
r.state = StateFollower
r.logger.Infof("%x became follower at term %d", r.id, r.Term)

traceBecomeFollower(r)
}

func (r *raft) becomeCandidate() {
Expand All @@ -893,6 +910,8 @@ func (r *raft) becomeCandidate() {
r.Vote = r.id
r.state = StateCandidate
r.logger.Infof("%x became candidate at term %d", r.id, r.Term)

traceBecomeCandidate(r)
}

func (r *raft) becomePreCandidate() {
Expand Down Expand Up @@ -938,6 +957,7 @@ func (r *raft) becomeLeader() {
// could be expensive.
r.pendingConfIndex = r.raftLog.lastIndex()

traceBecomeLeader(r)
emptyEnt := pb.Entry{Data: nil}
if !r.appendEntry(emptyEnt) {
// This won't happen because we just called reset() above.
Expand Down Expand Up @@ -1063,6 +1083,8 @@ func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int, rejected
}

func (r *raft) Step(m pb.Message) error {
traceReceiveMessage(r, &m)

// Handle the message term, which may result in our stepping down to a follower.
switch {
case m.Term == 0:
Expand Down Expand Up @@ -1311,6 +1333,7 @@ func stepLeader(r *raft, m pb.Message) error {
m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
} else {
r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1
traceChangeConfEvent(cc, r)
}
}
}
Expand Down Expand Up @@ -1950,6 +1973,8 @@ func (r *raft) applyConfChange(cc pb.ConfChangeV2) pb.ConfState {
//
// The inputs usually result from restoring a ConfState or applying a ConfChange.
func (r *raft) switchToConfig(cfg tracker.Config, trk tracker.ProgressMap) pb.ConfState {
traceConfChangeEvent(cfg, r)

r.trk.Config = cfg
r.trk.Progress = trk

Expand Down
2 changes: 2 additions & 0 deletions rawnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,8 @@ func (rn *RawNode) acceptReady(rd Ready) {
index := ents[len(ents)-1].Index
rn.raft.raftLog.acceptApplying(index, entsSize(ents), rn.applyUnstableEntries())
}

traceReady(rn.raft)
}

// applyUnstableEntries returns whether entries are allowed to be applied once
Expand Down
Loading
Loading