Skip to content

Commit

Permalink
When calculating a snapshot, retrieve entries in batches. (hypermodei…
Browse files Browse the repository at this point in the history
…nc#3409)

Calculating a snapshot can lead to an OOM issue since the entries might
not be able to fit into memory. This PR changes the logic to instead
retrieve the entries in batches of 64MB.
  • Loading branch information
martinmr authored and dna2github committed Jul 19, 2019
1 parent e5b3209 commit 7ab7a93
Showing 1 changed file with 41 additions and 23 deletions.
64 changes: 41 additions & 23 deletions worker/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1225,15 +1225,10 @@ func (n *node) calculateSnapshot(discardN int) (*pb.Snapshot, error) {
}
span.Annotatef(nil, "Found Raft entries: %d", last-first)

entries, err := n.Store.Entries(first, last+1, math.MaxUint64)
if err != nil {
span.Annotatef(nil, "Error: %v", err)
return nil, err
}

if num := posting.Oracle().NumPendingTxns(); num > 0 {
glog.V(2).Infof("Num pending txns: %d", num)
}

// We can't rely upon the Raft entries to determine the minPendingStart,
// because there are many cases during mutations where we don't commit or
// abort the transaction. This might happen due to an early error thrown.
Expand All @@ -1248,37 +1243,60 @@ func (n *node) calculateSnapshot(discardN int) (*pb.Snapshot, error) {
minPendingStart := posting.Oracle().MinPendingStartTs()
maxCommitTs := snap.ReadTs
var snapshotIdx uint64
for _, entry := range entries {
if entry.Type != raftpb.EntryNormal {
continue
}
var proposal pb.Proposal
if err := proposal.Unmarshal(entry.Data); err != nil {

// Trying to retrieve all entries at once might cause out-of-memory issues in
// cases where the raft log is too big to fit into memory. Instead of retrieving
// all entries at once, retrieve them in batches of 64MB.
var lastEntry raftpb.Entry
for batchFirst := first; batchFirst <= last; {
entries, err := n.Store.Entries(batchFirst, last+1, 64<<20)
if err != nil {
span.Annotatef(nil, "Error: %v", err)
return nil, err
}
if proposal.Mutations != nil {
start := proposal.Mutations.StartTs
if start >= minPendingStart && snapshotIdx == 0 {
snapshotIdx = entry.Index - 1
}

// Exit early from the loop if no entries were found.
if len(entries) == 0 {
break
}
if proposal.Delta != nil {
for _, txn := range proposal.Delta.GetTxns() {
maxCommitTs = x.Max(maxCommitTs, txn.CommitTs)

// Store the last entry (as it might be needed outside the loop) and set the
// start of the new batch at the entry following it.
lastEntry = entries[len(entries)-1]
batchFirst = lastEntry.Index + 1

for _, entry := range entries {
if entry.Type != raftpb.EntryNormal {
continue
}
var proposal pb.Proposal
if err := proposal.Unmarshal(entry.Data); err != nil {
span.Annotatef(nil, "Error: %v", err)
return nil, err
}
if proposal.Mutations != nil {
start := proposal.Mutations.StartTs
if start >= minPendingStart && snapshotIdx == 0 {
snapshotIdx = entry.Index - 1
}
}
if proposal.Delta != nil {
for _, txn := range proposal.Delta.GetTxns() {
maxCommitTs = x.Max(maxCommitTs, txn.CommitTs)
}
}
}
}

if maxCommitTs == 0 {
span.Annotate(nil, "maxCommitTs is zero")
return nil, nil
}
if snapshotIdx <= 0 {
// It is possible that there are no pending transactions. In that case,
// snapshotIdx would be zero.
if len(entries) > 0 {
snapshotIdx = entries[len(entries)-1].Index
}
snapshotIdx = lastEntry.Index
span.Annotatef(nil, "snapshotIdx is zero. Using last entry's index: %d", snapshotIdx)
}

Expand Down

0 comments on commit 7ab7a93

Please sign in to comment.