Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix forward bor snaps #10027

Merged
merged 5 commits into from
Apr 24, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
215 changes: 146 additions & 69 deletions eth/stagedsync/stage_bor_heimdall.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,10 @@ func BorHeimdallForward(

var blockNum uint64
var fetchTime time.Duration
var snapTime time.Duration
var snapInitTime time.Duration
var eventRecords int

lastSpanID, err := fetchRequiredHeimdallSpansIfNeeded(ctx, headNumber, tx, cfg, s.LogPrefix(), logger)
if err != nil {
return err
Expand All @@ -193,6 +196,8 @@ func BorHeimdallForward(
"lastStateSyncEventID", lastStateSyncEventID,
"total records", eventRecords,
"fetch time", fetchTime,
"snaps", snapTime,
"snap-init", snapInitTime,
"process time", time.Since(processStart),
)
}
Expand Down Expand Up @@ -224,13 +229,25 @@ func BorHeimdallForward(
return fmt.Errorf("verification failed for header %d: %x", blockNum, header.Hash())
}

snapStart := time.Now()

if cfg.blockReader.BorSnapshots().SegmentsMin() == 0 {
snapTime = snapTime + time.Since(snapStart)

// SegmentsMin is only set if running as an uploader process (check SnapshotsCfg.snapshotUploader and
// UploadLocationFlag) when we remove snapshots based on FrozenBlockLimit and number of uploaded snapshots
// avoid calling this if block for blockNums <= SegmentsMin to avoid reinsertion of snapshots
snap := loadSnapshot(blockNum, header.Hash(), cfg.borConfig, recents, signatures, cfg.snapDb, logger)

if snap == nil {
lastPersistedBlockNum, err := lastPersistedSnapshotBlock(ctx, cfg.snapDb)
if err != nil {
return err
}

// if the last time we persisted snapshots is too far away re-run the forward
// initialization process - this is to avoid memory growth due to recursion
// in persistValidatorSets
if snap == nil && blockNum-lastPersistedBlockNum > (snapshotPersistInterval*5) {
snap, err = initValidatorSets(
ctx,
tx,
Expand All @@ -239,6 +256,7 @@ func BorHeimdallForward(
cfg.heimdallClient,
chain,
blockNum,
lastPersistedBlockNum,
recents,
signatures,
cfg.snapDb,
Expand Down Expand Up @@ -267,6 +285,8 @@ func BorHeimdallForward(
}
}

snapTime = snapTime + time.Since(snapStart)

if err := checkBorHeaderExtraDataIfRequired(chain, header, cfg.borConfig); err != nil {
return err
}
Expand Down Expand Up @@ -312,6 +332,7 @@ func BorHeimdallForward(
"lastStateSyncEventID", lastStateSyncEventID,
"total records", eventRecords,
"fetch time", fetchTime,
"snap time", snapTime,
"process time", time.Since(processStart),
)

Expand Down Expand Up @@ -472,6 +493,25 @@ func persistValidatorSets(
return nil
}

// lastPersistedSnapshotBlock returns the block number of the most recently
// persisted bor proposer snapshot, read from the "bor-snapshot-progress" key
// in the kv.BorSeparate table of snapDb. It returns 0 (with a nil error) when
// no progress record exists yet.
func lastPersistedSnapshotBlock(ctx context.Context, snapDb kv.RwDB) (uint64, error) {
	var lastPersistedBlockNum uint64

	// Use the caller's ctx (not context.Background()) so the read-only view
	// respects cancellation and deadlines propagated from the stage loop.
	err := snapDb.View(ctx, func(tx kv.Tx) error {
		progressBytes, err := tx.GetOne(kv.BorSeparate, []byte("bor-snapshot-progress"))
		if err != nil {
			return err
		}

		// Progress is stored as an 8-byte big-endian uint64; a missing key or
		// malformed value is treated as "nothing persisted yet".
		if len(progressBytes) == 8 {
			lastPersistedBlockNum = binary.BigEndian.Uint64(progressBytes)
		}

		return nil
	})

	return lastPersistedBlockNum, err
}

func initValidatorSets(
ctx context.Context,
tx kv.RwTx,
Expand All @@ -480,6 +520,7 @@ func initValidatorSets(
heimdallClient heimdall.HeimdallClient,
chain consensus.ChainHeaderReader,
blockNum uint64,
lastPersistedBlockNum uint64,
recents *lru.ARCCache[libcommon.Hash, *bor.Snapshot],
signatures *lru.ARCCache[libcommon.Hash, libcommon.Address],
snapDb kv.RwDB,
Expand All @@ -492,92 +533,128 @@ func initValidatorSets(

var snap *bor.Snapshot

// Special handling of the headers in the snapshot
zeroHeader := chain.GetHeaderByNumber(0)
if zeroHeader != nil {
// get checkpoint data
hash := zeroHeader.Hash()
var parentHeader *types.Header
var firstBlockNum uint64

if zeroSnap := loadSnapshot(0, hash, config, recents, signatures, snapDb, logger); zeroSnap != nil {
return nil, nil
}
if lastPersistedBlockNum > 0 {
parentHeader = chain.GetHeaderByNumber(lastPersistedBlockNum)
snap = loadSnapshot(lastPersistedBlockNum, parentHeader.Hash(), config, recents, signatures, snapDb, logger)
firstBlockNum = lastPersistedBlockNum + 1
} else {
// Special handling of the headers in the snapshot
zeroHeader := chain.GetHeaderByNumber(0)

// get validators and current span
zeroSpanBytes, err := blockReader.Span(ctx, tx, 0)
if zeroHeader != nil {
// get checkpoint data
hash := zeroHeader.Hash()

if err != nil {
if _, err := fetchAndWriteHeimdallSpan(ctx, 0, tx, heimdallClient, logPrefix, logger); err != nil {
return nil, err
}
if snap = loadSnapshot(0, hash, config, recents, signatures, snapDb, logger); snap == nil {
// get validators and current span
zeroSpanBytes, err := blockReader.Span(ctx, tx, 0)

zeroSpanBytes, err = blockReader.Span(ctx, tx, 0)
if err != nil {
if _, err := fetchAndWriteHeimdallSpan(ctx, 0, tx, heimdallClient, logPrefix, logger); err != nil {
return nil, err
}

if err != nil {
return nil, err
zeroSpanBytes, err = blockReader.Span(ctx, tx, 0)

if err != nil {
return nil, err
}
}

if zeroSpanBytes == nil {
return nil, fmt.Errorf("zero span not found")
}

var zeroSpan heimdall.Span
if err = json.Unmarshal(zeroSpanBytes, &zeroSpan); err != nil {
return nil, err
}

// new snap shot
snap = bor.NewSnapshot(config, signatures, 0, hash, zeroSpan.ValidatorSet.Validators, logger)
if err := snap.Store(snapDb); err != nil {
return nil, fmt.Errorf("snap.Store (0): %w", err)
}

logger.Debug(fmt.Sprintf("[%s] Stored proposer snapshot to disk (init)", logPrefix), "number", 0, "hash", hash)
}
}

if zeroSpanBytes == nil {
return nil, fmt.Errorf("zero span not found")
parentHeader = zeroHeader
firstBlockNum = 1
lastPersistedBlockNum = 0
}
}

var zeroSpan heimdall.Span
if err = json.Unmarshal(zeroSpanBytes, &zeroSpan); err != nil {
return nil, err
}
g := errgroup.Group{}
g.SetLimit(estimate.AlmostAllCPUs())
defer func() {
_ = g.Wait() // goroutines used in this err group do not return err (check below)
}()

// new snap shot
snap = bor.NewSnapshot(config, signatures, 0, hash, zeroSpan.ValidatorSet.Validators, logger)
if err := snap.Store(snapDb); err != nil {
return nil, fmt.Errorf("snap.Store (0): %w", err)
}
logger.Debug(fmt.Sprintf("[%s] Stored proposer snapshot to disk", logPrefix), "number", 0, "hash", hash)
g := errgroup.Group{}
g.SetLimit(estimate.AlmostAllCPUs())
defer func() {
_ = g.Wait() // goroutines used in this err group do not return err (check below)
}()

batchSize := 128 // must be < InMemorySignatures
initialHeaders := make([]*types.Header, 0, batchSize)
parentHeader := zeroHeader
for i := uint64(1); i <= blockNum; i++ {
header := chain.GetHeaderByNumber(i)
{
// `snap.apply` bottleneck - is recover of signer.
// to speedup: recover signer in background goroutines and save in `sigcache`
// `batchSize` < `InMemorySignatures`: means all current batch will fit in cache - and
// `snap.apply` will find it there.
g.Go(func() error {
if header == nil {
return nil
}
_, _ = bor.Ecrecover(header, signatures, config)
batchSize := 128 // must be < InMemorySignatures
initialHeaders := make([]*types.Header, 0, batchSize)

var err error

for i := firstBlockNum; i <= blockNum; i++ {
header := chain.GetHeaderByNumber(i)
{
// `snap.apply` bottleneck - is recover of signer.
// to speedup: recover signer in background goroutines and save in `sigcache`
// `batchSize` < `InMemorySignatures`: means all current batch will fit in cache - and
// `snap.apply` will find it there.
g.Go(func() error {
if header == nil {
return nil
})
}
if header == nil {
return nil, fmt.Errorf("missing header persisting validator sets: (inside loop at %d)", i)
}
initialHeaders = append(initialHeaders, header)
if len(initialHeaders) == cap(initialHeaders) {
if snap, err = snap.Apply(parentHeader, initialHeaders, logger); err != nil {
return nil, fmt.Errorf("snap.Apply (inside loop): %w", err)
}
parentHeader = initialHeaders[len(initialHeaders)-1]
initialHeaders = initialHeaders[:0]
_, _ = bor.Ecrecover(header, signatures, config)
return nil
})
}
if header == nil {
return nil, fmt.Errorf("missing header persisting validator sets: (inside loop at %d)", i)
}
initialHeaders = append(initialHeaders, header)

if len(initialHeaders) == cap(initialHeaders) {

if snap, err = snap.Apply(parentHeader, initialHeaders, logger); err != nil {
return nil, fmt.Errorf("snap.Apply (inside loop): %w", err)
}
select {
case <-logEvery.C:
logger.Info(fmt.Sprintf("[%s] Computing validator proposer prorities (forward)", logPrefix), "blockNum", i)
default:

parentHeader = initialHeaders[len(initialHeaders)-1]
initialHeaders = initialHeaders[:0]

// If we've generated a new persistent snapshot, save to disk
if snap.Number%snapshotPersistInterval == 0 {
if err := snap.Store(snapDb); err != nil {
return nil, fmt.Errorf("snap.Store: %w", err)
}

lastPersistedBlockNum = snap.Number

logger.Trace(
fmt.Sprintf("[%s] Stored proposer snapshot to disk (init loop)", logPrefix),
"number", snap.Number,
"hash", snap.Hash,
)
}
}
if snap, err = snap.Apply(parentHeader, initialHeaders, logger); err != nil {
return nil, fmt.Errorf("snap.Apply (outside loop): %w", err)

select {
case <-logEvery.C:
logger.Info(fmt.Sprintf("[%s] Computing validator proposer prorities (forward)", logPrefix), "to", blockNum, "snapNum", i, "persisted", lastPersistedBlockNum)
default:
}
}

if snap, err = snap.Apply(parentHeader, initialHeaders, logger); err != nil {
return nil, fmt.Errorf("snap.Apply (outside loop): %w", err)
}

return snap, nil
}

Expand Down
Loading