Skip to content

Commit

Permalink
raft: provide protection against unbounded Raft log growth
Browse files Browse the repository at this point in the history
The suggested pattern for Raft proposals is that they be retried
periodically until they succeed. This turns out to be an issue
when a leader cannot commit entries because the leader will continue
to append re-proposed entries to its log without committing anything.
This can result in the uncommitted tail of a leader's log growing
without bound until it is able to commit entries.

This change add a safeguard to protect against this case where a
leader's log can grow without bound during loss of quorum scenarios.
It does so by introducing a new, optional ``MaxUncommittedEntriesSize
configuration. This config limits the max aggregate size of uncommitted
entries that may be appended to a leader's log. Once this limit
is exceeded, proposals will begin to return ErrProposalDropped
errors.

See cockroachdb/cockroach#27772
  • Loading branch information
nvanbenschoten committed Oct 11, 2018
1 parent b046a37 commit a76382b
Show file tree
Hide file tree
Showing 8 changed files with 275 additions and 25 deletions.
13 changes: 7 additions & 6 deletions contrib/raftexample/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,12 +274,13 @@ func (rc *raftNode) startRaft() {
rpeers[i] = raft.Peer{ID: uint64(i + 1)}
}
c := &raft.Config{
ID: uint64(rc.id),
ElectionTick: 10,
HeartbeatTick: 1,
Storage: rc.raftStorage,
MaxSizePerMsg: 1024 * 1024,
MaxInflightMsgs: 256,
ID: uint64(rc.id),
ElectionTick: 10,
HeartbeatTick: 1,
Storage: rc.raftStorage,
MaxSizePerMsg: 1024 * 1024,
MaxInflightMsgs: 256,
MaxUncommittedEntriesSize: 1 << 30,
}

if oldwal {
Expand Down
1 change: 1 addition & 0 deletions raft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,7 @@ func (n *node) run(r *raft) {

r.msgs = nil
r.readStates = nil
r.reduceUncommittedSize(rd.CommittedEntries)
advancec = n.advancec
case <-advancec:
if applyingToI != 0 {
Expand Down
54 changes: 54 additions & 0 deletions raft/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -997,3 +997,57 @@ func TestNodeCommitPaginationAfterRestart(t *testing.T) {
)
}
}

// TestNodeBoundedLogGrowthWithPartition tests a scenario where a leader is
// partitioned from a quorum of nodes. It verifies that the leader's log is
// protected from unbounded growth even as new entries continue to be proposed.
// This protection is provided by the MaxUncommittedEntriesSize configuration.
func TestNodeBoundedLogGrowthWithPartition(t *testing.T) {
const maxEntries = 16
data := []byte("testdata")
testEntry := raftpb.Entry{Data: data}
maxEntrySize := uint64(maxEntries * testEntry.Size())

s := NewMemoryStorage()
cfg := newTestConfig(1, []uint64{1}, 10, 1, s)
cfg.MaxUncommittedEntriesSize = maxEntrySize
r := newRaft(cfg)
n := newNode()
go n.run(r)
defer n.Stop()
n.Campaign(context.TODO())

rd := readyWithTimeout(&n)
if len(rd.CommittedEntries) != 1 {
t.Fatalf("expected 1 (empty) entry, got %d", len(rd.CommittedEntries))
}
s.Append(rd.Entries)
n.Advance()

// Simulate a network partition while we make our proposals by never
// committing anything. These proposals should not cause the leader's
// log to grow indefinitely.
for i := 0; i < 1024; i++ {
n.Propose(context.TODO(), data)
}

// Check the size of leader's uncommitted log tail. It should not exceed the
// MaxUncommittedEntriesSize limit.
checkUncommitted := func(exp uint64) {
t.Helper()
if a := r.uncommittedSize; exp != a {
t.Fatalf("expected %d uncommitted entry bytes, found %d", exp, a)
}
}
checkUncommitted(maxEntrySize)

// Recover from the partition. The uncommitted tail of the Raft log should
// disappear as entries are committed.
rd = readyWithTimeout(&n)
if len(rd.CommittedEntries) != maxEntries {
t.Fatalf("expected %d entries, got %d", maxEntries, len(rd.CommittedEntries))
}
s.Append(rd.Entries)
n.Advance()
checkUncommitted(0)
}
75 changes: 70 additions & 5 deletions raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,11 @@ type Config struct {
// replication. Note: math.MaxUint64 for unlimited, 0 for at most one entry per
// message.
MaxSizePerMsg uint64
// MaxUncommittedEntriesSize limits the aggregate size of the uncommitted
// entries that may be appended to a leader's log. Once this limit is
// exceeded, proposals will begin to return ErrProposalDropped errors.
// Note: 0 for no limit.
MaxUncommittedEntriesSize uint64
// MaxInflightMsgs limits the max number of in-flight append messages during
// optimistic replication phase. The application transportation layer usually
// has its own sending buffer over TCP/UDP. Setting MaxInflightMsgs to avoid
Expand Down Expand Up @@ -215,6 +220,12 @@ func (c *Config) validate() error {
return errors.New("storage cannot be nil")
}

if c.MaxUncommittedEntriesSize < 0 {
return errors.New("max uncommitted entry size cannot be less than 0")
} else if c.MaxUncommittedEntriesSize == 0 {
c.MaxUncommittedEntriesSize = noLimit
}

if c.MaxInflightMsgs <= 0 {
return errors.New("max inflight messages must be greater than 0")
}
Expand All @@ -241,11 +252,12 @@ type raft struct {
// the log
raftLog *raftLog

maxInflight int
maxMsgSize uint64
prs map[uint64]*Progress
learnerPrs map[uint64]*Progress
matchBuf uint64Slice
maxMsgSize uint64
maxUncommittedSize uint64
maxInflight int
prs map[uint64]*Progress
learnerPrs map[uint64]*Progress
matchBuf uint64Slice

state StateType

Expand All @@ -268,6 +280,10 @@ type raft struct {
// be proposed if the leader's applied index is greater than this
// value.
pendingConfIndex uint64
// an estimate of the size of the uncommitted tail of the Raft log. Used to
// prevent unbounded log growth. Only maintained by the leader. Reset on
// term changes.
uncommittedSize uint64

readOnly *readOnly

Expand Down Expand Up @@ -326,6 +342,7 @@ func newRaft(c *Config) *raft {
raftLog: raftlog,
maxMsgSize: c.MaxSizePerMsg,
maxInflight: c.MaxInflightMsgs,
maxUncommittedSize: c.MaxUncommittedEntriesSize,
prs: make(map[uint64]*Progress),
learnerPrs: make(map[uint64]*Progress),
electionTimeout: c.ElectionTick,
Expand Down Expand Up @@ -616,6 +633,7 @@ func (r *raft) reset(term uint64) {
})

r.pendingConfIndex = 0
r.uncommittedSize = 0
r.readOnly = newReadOnly(r.readOnly.option)
}

Expand Down Expand Up @@ -954,6 +972,10 @@ func stepLeader(r *raft, m pb.Message) error {
r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee)
return ErrProposalDropped
}
if !r.increaseUncommittedSize(m.Entries) {
r.logger.Debugf("%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal", r.id)
return ErrProposalDropped
}

for i, e := range m.Entries {
if e.Type == pb.EntryConfChange {
Expand Down Expand Up @@ -1462,6 +1484,49 @@ func (r *raft) abortLeaderTransfer() {
r.leadTransferee = None
}

// increaseUncommittedSize computes the size of the proposed entries and
// determines whether they would push leader over its maxUncommittedSize limit.
// If the new entries would exceed the limit, the method returns false. If not,
// the increase in uncommitted entry size is recorded and the method returns
// true.
func (r *raft) increaseUncommittedSize(ents []pb.Entry) bool {
var s uint64
for _, e := range ents {
s += uint64(e.Size())
}

if r.uncommittedSize > 0 && r.uncommittedSize+s > r.maxUncommittedSize {
// If the uncommitted tail of the Raft log is empty, allow any size
// proposal. Otherwise, limit the size of the uncommitted tail of the
// log and drop any proposal that would push the size over the limit.
return false
}
r.uncommittedSize += s
return true
}

// reduceUncommittedSize accounts for the newly committed entries by decreasing
// the uncommitted entry size limit.
func (r *raft) reduceUncommittedSize(ents []pb.Entry) {
if r.uncommittedSize == 0 {
// Fast-path for followers, who do not track or enforce the limit.
return
}

var s uint64
for _, e := range ents {
s += uint64(e.Size())
}
if s > r.uncommittedSize {
// uncommittedSize may underestimate the size of the uncommitted Raft
// log tail but will never overestimate it. Saturate at 0 instead of
// allowing overflow.
r.uncommittedSize = 0
} else {
r.uncommittedSize -= s
}
}

func numOfPendingConf(ents []pb.Entry) int {
n := 0
for i := range ents {
Expand Down
69 changes: 67 additions & 2 deletions raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,71 @@ func TestProgressFlowControl(t *testing.T) {
}
}

func TestUncommittedEntryLimit(t *testing.T) {
const maxEntries = 16
testEntry := pb.Entry{Data: []byte("testdata")}
maxEntrySize := maxEntries * testEntry.Size()

cfg := newTestConfig(1, []uint64{1, 2, 3}, 5, 1, NewMemoryStorage())
cfg.MaxUncommittedEntriesSize = uint64(maxEntrySize)
r := newRaft(cfg)
r.becomeCandidate()
r.becomeLeader()

// Set the two followers to the replicate state. Commit to tail of log.
const numFollowers = 2
r.prs[2].becomeReplicate()
r.prs[3].becomeReplicate()
r.uncommittedSize = 0

// Send proposals to r1. The first 5 entries should be appended to the log.
propMsg := pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{testEntry}}
propEnts := make([]pb.Entry, maxEntries)
for i := 0; i < maxEntries; i++ {
if err := r.Step(propMsg); err != nil {
t.Fatalf("proposal resulted in error: %v", err)
}
propEnts[i] = testEntry
}

// Send one more proposal to r1. It should be rejected.
if err := r.Step(propMsg); err != ErrProposalDropped {
t.Fatalf("proposal not dropped: %v", err)
}

// Read messages and reduce the uncommitted size as if we had committed
// these entries.
ms := r.readMessages()
if e := maxEntries * numFollowers; len(ms) != e {
t.Fatalf("expected %d messages, got %d", e, len(ms))
}
r.reduceUncommittedSize(propEnts)

// Send a single large proposal to r1. Should be accepted even though it
// pushes us above the limit because we were beneath it before the proposal.
propEnts = make([]pb.Entry, 2*maxEntries)
for i := range propEnts {
propEnts[i] = testEntry
}
propMsgLarge := pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: propEnts}
if err := r.Step(propMsgLarge); err != nil {
t.Fatalf("proposal resulted in error: %v", err)
}

// Send one more proposal to r1. It should be rejected, again.
if err := r.Step(propMsg); err != ErrProposalDropped {
t.Fatalf("proposal not dropped: %v", err)
}

// Read messages and reduce the uncommitted size as if we had committed
// these entries.
ms = r.readMessages()
if e := 1 * numFollowers; len(ms) != e {
t.Fatalf("expected %d messages, got %d", e, len(ms))
}
r.reduceUncommittedSize(propEnts)
}

func TestLeaderElection(t *testing.T) {
testLeaderElection(t, false)
}
Expand Down Expand Up @@ -4138,8 +4203,8 @@ func (nw *network) drop(from, to uint64, perc float64) {
}

func (nw *network) cut(one, other uint64) {
nw.drop(one, other, 2.0) // always drop
nw.drop(other, one, 2.0) // always drop
nw.drop(one, other, 1.0) // always drop
nw.drop(other, one, 1.0) // always drop
}

func (nw *network) isolate(id uint64) {
Expand Down
26 changes: 14 additions & 12 deletions raft/rafttest/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,13 @@ type node struct {
func startNode(id uint64, peers []raft.Peer, iface iface) *node {
st := raft.NewMemoryStorage()
c := &raft.Config{
ID: id,
ElectionTick: 10,
HeartbeatTick: 1,
Storage: st,
MaxSizePerMsg: 1024 * 1024,
MaxInflightMsgs: 256,
ID: id,
ElectionTick: 10,
HeartbeatTick: 1,
Storage: st,
MaxSizePerMsg: 1024 * 1024,
MaxInflightMsgs: 256,
MaxUncommittedEntriesSize: 1 << 30,
}
rn := raft.StartNode(c, peers)
n := &node{
Expand Down Expand Up @@ -125,12 +126,13 @@ func (n *node) restart() {
// wait for the shutdown
<-n.stopc
c := &raft.Config{
ID: n.id,
ElectionTick: 10,
HeartbeatTick: 1,
Storage: n.storage,
MaxSizePerMsg: 1024 * 1024,
MaxInflightMsgs: 256,
ID: n.id,
ElectionTick: 10,
HeartbeatTick: 1,
Storage: n.storage,
MaxSizePerMsg: 1024 * 1024,
MaxInflightMsgs: 256,
MaxUncommittedEntriesSize: 1 << 30,
}
n.Node = raft.RestartNode(c)
n.start()
Expand Down
1 change: 1 addition & 0 deletions raft/rawnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ func (rn *RawNode) Step(m pb.Message) error {
func (rn *RawNode) Ready() Ready {
rd := rn.newReady()
rn.raft.msgs = nil
rn.raft.reduceUncommittedSize(rd.CommittedEntries)
return rd
}

Expand Down
Loading

0 comments on commit a76382b

Please sign in to comment.