Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raft: clean-up log conflict search #126318

Merged
merged 3 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 57 additions & 41 deletions pkg/raft/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,61 +106,77 @@ func (l *raftLog) String() string {
// maybeAppend returns (0, false) if the entries cannot be appended. Otherwise,
// it returns (last index of new entries, true).
func (l *raftLog) maybeAppend(a logSlice) (lastnewi uint64, ok bool) {
if !l.matchTerm(a.prev) {
match, ok := l.findConflict(a)
if !ok {
return 0, false
}
// TODO(pav-kv): propagate logSlice down the stack. It will be used all the
// way down in unstable, for safety checks, and for useful bookkeeping.

lastnewi = a.prev.index + uint64(len(a.entries))
ci := l.findConflict(a.entries)
switch {
case ci == 0:
case ci <= l.committed:
l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed)
default:
offset := a.prev.index + 1
if ci-offset > uint64(len(a.entries)) {
l.logger.Panicf("index, %d, is out of range [%d]", ci-offset, len(a.entries))
}
l.append(a.entries[ci-offset:]...)
}
return lastnewi, true

// Fast-forward to the first mismatching or missing entry.
// NB: prev.index <= match.index <= a.lastIndex(), so the sub-slicing is safe.
a.entries = a.entries[match.index-a.prev.index:]
a.prev = match

// TODO(pav-kv): pass the logSlice down the stack, for safety checks and
// bookkeeping in the unstable structure.
l.append(a.entries...)
return a.lastIndex(), true
}

func (l *raftLog) append(ents ...pb.Entry) {
if len(ents) == 0 {
return
}
if after := ents[0].Index - 1; after < l.committed {
l.logger.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed)
if first := ents[0].Index; first <= l.committed {
l.logger.Panicf("entry %d is already committed [committed(%d)]", first, l.committed)
}
l.unstable.truncateAndAppend(ents)
}

// findConflict finds the index of the conflict.
// It returns the index of the first conflicting entry between the existing
// entries and the given entries, if there is any.
// If there are no conflicting entries, and the existing entries contain
// all the given entries, zero will be returned.
// If there are no conflicting entries, but the given entries contain new
// entries, the index of the first new entry will be returned.
// An entry is considered to be conflicting if it has the same index but
// a different term.
// The index of the given entries MUST be continuously increasing.
func (l *raftLog) findConflict(ents []pb.Entry) uint64 {
for i := range ents {
if id := pbEntryID(&ents[i]); !l.matchTerm(id) {
if id.index <= l.lastIndex() {
// TODO(pav-kv): can simply print %+v of the id. This will change the
// log format though.
l.logger.Infof("found conflict at index %d [existing term: %d, conflicting term: %d]",
id.index, l.zeroTermOnOutOfBounds(l.term(id.index)), id.term)
}
return id.index
// findConflict finds the last entry in the given log slice that matches the
// log. The next entry either mismatches, or is missing.
//
// If the slice partially/fully matches, this method returns true. The returned
// entryID is the ID of the last matching entry. It can be s.prev if it is the
// only matching entry. It is guaranteed that the returned entryID.index is in
// the [s.prev.index, s.lastIndex()] range.
//
// All the entries up to the returned entryID are already present in the log,
// and do not need to be appended again. The caller can safely fast-forward an
// append request to the next entry after it.
//
// Returns false if the given slice mismatches the log entirely, i.e. the s.prev
// entry has a mismatching entryID.term. In this case an append request cannot
// proceed.
func (l *raftLog) findConflict(s logSlice) (entryID, bool) {
if !l.matchTerm(s.prev) {
return entryID{}, false
}

// TODO(pav-kv): add a fast-path here using the Log Matching property of raft.
// Check the term match at min(s.lastIndex(), l.lastIndex()) entry, and fall
// back to conflict search only if it mismatches.
// TODO(pav-kv): also, there should be no mismatch if s.term == l.accTerm, so
// the fast-path can avoid this one check too.
//
// TODO(pav-kv): every matchTerm call in the linear scan below can fall back
// to fetching an entry from storage. This is inefficient, we can improve it.
// Logs that don't match at one index don't match at any index above it. So we
// can use binary search to find the fork.
match := s.prev
for i := range s.entries {
id := pbEntryID(&s.entries[i])
if l.matchTerm(id) {
match = id
continue
}
if id.index <= l.lastIndex() {
// TODO(pav-kv): should simply print %+v of the id.
l.logger.Infof("found conflict at index %d [existing term: %d, conflicting term: %d]",
id.index, l.zeroTermOnOutOfBounds(l.term(id.index)), id.term)
}
return match, true
}
return 0
return match, true // all entries match
}

// findConflictByTerm returns a best guess on where this log ends matching
Expand Down
62 changes: 39 additions & 23 deletions pkg/raft/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,32 +27,48 @@ import (

func TestFindConflict(t *testing.T) {
previousEnts := index(1).terms(1, 2, 3)
tests := []struct {
ents []pb.Entry
wconflict uint64
ids := make([]entryID, 1, len(previousEnts)+1) // dummy (0, 0) at index 0
for i := range previousEnts {
ids = append(ids, pbEntryID(&previousEnts[i]))
}
for _, tt := range []struct {
prev entryID
ents []pb.Entry
notOk bool
want entryID
}{
// no conflict, empty ent
{nil, 0},
// prev does not match the log
{prev: entryID{term: 10, index: 1}, notOk: true},
{prev: entryID{term: 4, index: 1}, ents: index(2).terms(4, 4), notOk: true},
{prev: entryID{term: 5, index: 2}, ents: index(3).terms(5, 6), notOk: true},
// no conflict, empty entries
{ents: nil, want: ids[0]},
// no conflict
{index(1).terms(1, 2, 3), 0},
{index(2).terms(2, 3), 0},
{index(3).terms(3), 0},
{prev: ids[0], ents: index(1).terms(1, 2, 3), want: ids[3]},
{prev: ids[1], ents: index(2).terms(2, 3), want: ids[3]},
{prev: ids[2], ents: index(3).terms(3), want: ids[3]},
// no conflict, but has new entries
{index(1).terms(1, 2, 3, 4, 4), 4},
{index(2).terms(2, 3, 4, 5), 4},
{index(3).terms(3, 4, 4), 4},
{index(4).terms(4, 4), 4},
// conflicts with existing entries
{index(1).terms(4, 4), 1},
{index(2).terms(1, 4, 4), 2},
{index(3).terms(1, 2, 4, 4), 3},
}

for i, tt := range tests {
t.Run(fmt.Sprint(i), func(t *testing.T) {
raftLog := newLog(NewMemoryStorage(), raftLogger)
raftLog.append(previousEnts...)
require.Equal(t, tt.wconflict, raftLog.findConflict(tt.ents))
{prev: ids[0], ents: index(1).terms(1, 2, 3, 4, 4), want: ids[3]},
{prev: ids[1], ents: index(2).terms(2, 3, 4, 4), want: ids[3]},
{prev: ids[2], ents: index(3).terms(3, 4, 4), want: ids[3]},
{prev: ids[3], ents: index(4).terms(4, 4), want: ids[3]},
// passes prev check, but conflicts with existing entries
{prev: ids[0], ents: index(1).terms(4, 4), want: ids[0]},
{prev: ids[1], ents: index(2).terms(1, 4, 4), want: ids[1]},
{prev: ids[2], ents: index(3).terms(2, 2, 4, 4), want: ids[2]},
// out of bounds
{prev: entryID{term: 3, index: 10}, ents: index(11).terms(3), notOk: true},
// just touching the right bound, but still out of bounds
{prev: entryID{term: 3, index: 4}, ents: index(5).terms(3, 3, 4), notOk: true},
} {
t.Run("", func(t *testing.T) {
log := newLog(NewMemoryStorage(), discardLogger)
log.append(previousEnts...)
app := logSlice{term: 100, prev: tt.prev, entries: tt.ents}
require.NoError(t, app.valid())
match, ok := log.findConflict(app)
require.Equal(t, !tt.notOk, ok)
require.Equal(t, tt.want, match)
})
}
}
Expand Down
Loading