datastream repopulation logic changes and removal of batch partially processed (#992)

* datastream repopulation logic changes and removal of batch partially processed

the "partially processed" batch flag is removed in favour of simply sealing the WIP batch on a restart

* refactor of checking for stream gap in sequencing

* do not connect to datastream during startup

connecting during startup causes timeout problems on new nodes, which can spend a long time running the L1 sync

* refactor of batch end logic in sequencing

* tidy up and comments around new datastream handling in sequencer
hexoscott authored Aug 22, 2024
1 parent d142ab1 commit 569ec3b
Showing 9 changed files with 78 additions and 124 deletions.
21 changes: 1 addition & 20 deletions eth/backend.go
@@ -753,9 +753,6 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
         latestHeader := backend.dataStream.GetHeader()
         if latestHeader.TotalEntries == 0 {
             log.Info("[dataStream] setting the stream progress to 0")
-            if err := stages.SaveStageProgress(tx, stages.DataStream, 0); err != nil {
-                return nil, err
-            }
             backend.preStartTasks.WarmUpDataStream = true
         }
     }
@@ -1052,23 +1049,7 @@ func newEtherMan(cfg *ethconfig.Config, l2ChainName, url string) *etherman.Clien
 
 // creates a datastream client with default parameters
 func initDataStreamClient(ctx context.Context, cfg *ethconfig.Zk, latestForkId uint16) *client.StreamClient {
-    // datastream
-    // Create client
-    log.Info("Starting datastream client...")
-    // retry connection
-    datastreamClient := client.NewClient(ctx, cfg.L2DataStreamerUrl, cfg.DatastreamVersion, cfg.L2DataStreamerTimeout, latestForkId)
-
-    for i := 0; i < 30; i++ {
-        // Start client (connect to the server)
-        if err := datastreamClient.Start(); err != nil {
-            log.Warn(fmt.Sprintf("Error when starting datastream client, retrying... Error: %s", err))
-            time.Sleep(1 * time.Second)
-        } else {
-            log.Info("Datastream client initialized...")
-            return datastreamClient
-        }
-    }
-    panic("datastream client could not be initialized")
+    return client.NewClient(ctx, cfg.L2DataStreamerUrl, cfg.DatastreamVersion, cfg.L2DataStreamerTimeout, latestForkId)
 }
 
 func (backend *Ethereum) Init(stack *node.Node, config *ethconfig.Config) error {
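The retry loop above no longer runs at startup; initDataStreamClient now only constructs the client. A minimal sketch, assuming a hypothetical lazy wrapper (streamStarter and ensureStarted are illustrative names, not the repository's actual wiring), of how the connection can instead be attempted with retries the first time the stream is needed:

package datastream

import (
    "fmt"
    "log"
    "time"
)

// streamClient captures only the call this sketch relies on; the real
// *client.StreamClient returned by client.NewClient exposes Start() error.
type streamClient interface {
    Start() error
}

// streamStarter defers the connection until a caller actually needs the stream,
// so a fresh node that spends a long time in L1 sync never hits a startup timeout.
type streamStarter struct {
    client  streamClient
    started bool
}

// ensureStarted connects on first use, retrying a bounded number of times.
func (s *streamStarter) ensureStarted(retries int, wait time.Duration) error {
    if s.started {
        return nil
    }
    var err error
    for i := 0; i < retries; i++ {
        if err = s.client.Start(); err == nil {
            s.started = true
            return nil
        }
        log.Printf("datastream client start failed, retrying: %v", err)
        time.Sleep(wait)
    }
    return fmt.Errorf("datastream client could not be started after %d attempts: %w", retries, err)
}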
25 changes: 0 additions & 25 deletions zk/hermez_db/db.go
@@ -1610,31 +1610,6 @@ func (db *HermezDbReader) GetInvalidBatch(batchNo uint64) (bool, error) {
     return len(v) > 0, nil
 }
 
-func (db *HermezDb) WriteIsBatchPartiallyProcessed(batchNo uint64) error {
-    return db.tx.Put(BATCH_PARTIALLY_PROCESSED, Uint64ToBytes(batchNo), []byte{1})
-}
-
-func (db *HermezDb) DeleteIsBatchPartiallyProcessed(batchNo uint64) error {
-    return db.tx.Delete(BATCH_PARTIALLY_PROCESSED, Uint64ToBytes(batchNo))
-}
-
-func (db *HermezDbReader) GetIsBatchPartiallyProcessed(batchNo uint64) (bool, error) {
-    v, err := db.tx.GetOne(BATCH_PARTIALLY_PROCESSED, Uint64ToBytes(batchNo))
-    if err != nil {
-        return false, err
-    }
-    return len(v) > 0, nil
-}
-
-func (db *HermezDb) TruncateIsBatchPartiallyProcessed(fromBatch, toBatch uint64) error {
-    for batch := fromBatch; batch <= toBatch; batch++ {
-        if err := db.DeleteIsBatchPartiallyProcessed(batch); err != nil {
-            return err
-        }
-    }
-    return nil
-}
-
 func (db *HermezDb) WriteLocalExitRootForBatchNo(batchNo uint64, root common.Hash) error {
     return db.tx.Put(LOCAL_EXIT_ROOTS, Uint64ToBytes(batchNo), root.Bytes())
 }
21 changes: 17 additions & 4 deletions zk/stages/stage_dataStreamCatchup.go
@@ -12,6 +12,7 @@ import (
     "github.com/ledgerwatch/erigon/zk/datastream/server"
     "github.com/ledgerwatch/erigon/zk/hermez_db"
     "github.com/ledgerwatch/log/v3"
+    "github.com/ledgerwatch/erigon/zk/sequencer"
 )
 
 type DataStreamCatchupCfg struct {
@@ -80,12 +81,24 @@ func CatchupDatastream(ctx context.Context, logPrefix string, tx kv.RwTx, stream
     srv := server.NewDataStreamServer(stream, chainId)
     reader := hermez_db.NewHermezDbReader(tx)
 
-    finalBlockNumber, err := stages.GetStageProgress(tx, stages.Execution)
-    if err != nil {
-        return 0, err
+    var (
+        err              error
+        finalBlockNumber uint64
+    )
+
+    if sequencer.IsSequencer() {
+        finalBlockNumber, err = stages.GetStageProgress(tx, stages.DataStream)
+        if err != nil {
+            return 0, err
+        }
+    } else {
+        finalBlockNumber, err = stages.GetStageProgress(tx, stages.Execution)
+        if err != nil {
+            return 0, err
+        }
     }
 
-    previousProgress, err := stages.GetStageProgress(tx, stages.DataStream)
+    previousProgress, err := srv.GetHighestBlockNumber()
     if err != nil {
         return 0, err
     }
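To make the role-dependent bounds explicit: a sequencer now catches the stream up only to what the DataStream stage has recorded as written, an RPC node catches up to its Execution progress, and the starting point is read from the stream server itself. A compact sketch of that selection using a hypothetical helper (the real stage reads are exactly those shown in the hunk above):

package datastream

// catchupBounds is a hypothetical distillation of the bounds used by
// CatchupDatastream after this change: publish blocks from the stream's own
// highest block up to the node's trusted ceiling.
func catchupBounds(isSequencer bool, dataStreamProgress, executionProgress, streamHighestBlock uint64) (from, to uint64) {
    if isSequencer {
        // a sequencer only republishes blocks it has already recorded as sent
        // to the stream (stages.DataStream progress)
        to = dataStreamProgress
    } else {
        // an RPC node regenerates the stream from everything it has executed
        // (stages.Execution progress)
        to = executionProgress
    }
    return streamHighestBlock, to
}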
33 changes: 16 additions & 17 deletions zk/stages/stage_sequence_execute.go
@@ -46,11 +46,6 @@ func SpawnSequencingStage(
         return err
     }
 
-    isLastBatchPariallyProcessed, err := sdb.hermezDb.GetIsBatchPartiallyProcessed(lastBatch)
-    if err != nil {
-        return err
-    }
-
     forkId, err := prepareForkId(lastBatch, executionAt, sdb.hermezDb)
     if err != nil {
         return err
@@ -66,7 +61,7 @@ func SpawnSequencingStage(
     var block *types.Block
     runLoopBlocks := true
     batchContext := newBatchContext(ctx, &cfg, &historyCfg, s, sdb)
-    batchState := newBatchState(forkId, prepareBatchNumber(lastBatch, isLastBatchPariallyProcessed), !isLastBatchPariallyProcessed && cfg.zk.HasExecutors(), cfg.zk.L1SyncStartBlock > 0, cfg.txPool)
+    batchState := newBatchState(forkId, lastBatch+1, cfg.zk.HasExecutors(), cfg.zk.L1SyncStartBlock > 0, cfg.txPool)
     blockDataSizeChecker := newBlockDataChecker()
     streamWriter := newSequencerBatchStreamWriter(batchContext, batchState, lastBatch) // using lastBatch (rather than batchState.batchNumber) is not mistake
 
@@ -79,31 +74,35 @@ func SpawnSequencingStage(
         if err = cfg.datastreamServer.WriteWholeBatchToStream(logPrefix, sdb.tx, sdb.hermezDb.HermezDbReader, lastBatch, injectedBatchBatchNumber); err != nil {
             return err
         }
+        if err = stages.SaveStageProgress(sdb.tx, stages.DataStream, 1); err != nil {
+            return err
+        }
 
         return sdb.tx.Commit()
     }
 
+    // handle cases where the last batch wasn't committed to the data stream.
+    // this could occur because we're migrating from an RPC node to a sequencer
+    // or because the sequencer was restarted and not all processes completed (like waiting from remote executor)
+    // we consider the data stream as verified by the executor so treat it as "safe" and unwind blocks beyond there
+    // if we identify any. During normal operation this function will simply check and move on without performing
+    // any action.
+    isUnwinding, err := handleBatchEndChecks(batchContext, batchState, executionAt, u)
+    if err != nil || isUnwinding {
+        return err
+    }
+
     tryHaltSequencer(batchContext, batchState.batchNumber)
 
     if err := utils.UpdateZkEVMBlockCfg(cfg.chainConfig, sdb.hermezDb, logPrefix); err != nil {
         return err
     }
 
-    batchCounters, err := prepareBatchCounters(batchContext, batchState, isLastBatchPariallyProcessed)
+    batchCounters, err := prepareBatchCounters(batchContext, batchState, nil)
     if err != nil {
         return err
     }
 
-    if !isLastBatchPariallyProcessed {
-        // handle case where batch wasn't closed properly
-        // close it before starting a new one
-        // this occurs when sequencer was switched from syncer or sequencer datastream files were deleted
-        // and datastream was regenerated
-        if err = finalizeLastBatchInDatastreamIfNotFinalized(batchContext, batchState, executionAt); err != nil {
-            return err
-        }
-    }
-
     if batchState.isL1Recovery() {
         if cfg.zk.L1SyncStopBatch > 0 && batchState.batchNumber > cfg.zk.L1SyncStopBatch {
             log.Info(fmt.Sprintf("[%s] L1 recovery has completed!", logPrefix), "batch", batchState.batchNumber)
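The net effect on the stage's start-up path: a restart no longer resumes a partially processed batch; any leftover work-in-progress batch is sealed (and a stream gap unwound) before a fresh batch number is opened. A sketch of that ordering, with a hypothetical sequencingPass wrapper standing in for the body of SpawnSequencingStage:

package sequencing

// sequencingPass is an illustrative outline only; the real stage also prepares
// fork IDs, counters, the stream writer and the block loop shown in the diff.
func sequencingPass(lastBatch uint64, handleBatchEndChecks func() (isUnwinding bool, err error)) (nextBatch uint64, err error) {
    // 1. seal the previous batch in the datastream if its batch-end entry is
    //    missing, unwinding first when the stream is behind the state db
    isUnwinding, err := handleBatchEndChecks()
    if err != nil || isUnwinding {
        return 0, err
    }
    // 2. always open a brand new batch; the "partially processed" resume path
    //    is gone, which is why the flag could be deleted everywhere below
    return lastBatch + 1, nil
}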
29 changes: 1 addition & 28 deletions zk/stages/stage_sequence_execute_batch.go
@@ -12,28 +12,7 @@ import (
     "github.com/ledgerwatch/log/v3"
 )
 
-func prepareBatchNumber(lastBatch uint64, isLastBatchPariallyProcessed bool) uint64 {
-    if isLastBatchPariallyProcessed {
-        return lastBatch
-    }
-
-    return lastBatch + 1
-}
-
-func prepareBatchCounters(batchContext *BatchContext, batchState *BatchState, isLastBatchPariallyProcessed bool) (*vm.BatchCounterCollector, error) {
-    var intermediateUsedCounters *vm.Counters
-    if isLastBatchPariallyProcessed {
-        intermediateCountersMap, found, err := batchContext.sdb.hermezDb.GetLatestBatchCounters(batchState.batchNumber)
-        if err != nil {
-            return nil, err
-        }
-        if found {
-            intermediateUsedCounters = vm.NewCountersFromUsedMap(intermediateCountersMap)
-        } else {
-            log.Warn("intermediate counters not found for batch, initialising with empty counters", "batch", batchState.batchNumber)
-        }
-    }
-
+func prepareBatchCounters(batchContext *BatchContext, batchState *BatchState, intermediateUsedCounters *vm.Counters) (*vm.BatchCounterCollector, error) {
     return vm.NewBatchCounterCollector(batchContext.sdb.smt.GetDepth(), uint16(batchState.forkId), batchContext.cfg.zk.VirtualCountersSmtReduction, batchContext.cfg.zk.ShouldCountersBeUnlimited(batchState.isL1Recovery()), intermediateUsedCounters), nil
 }
 
@@ -66,9 +45,6 @@ func doCheckForBadBatch(batchContext *BatchContext, batchState *BatchState, this
     if err = batchContext.sdb.hermezDb.WriteBatchCounters(currentBlock.NumberU64(), map[string]int{}); err != nil {
         return false, err
     }
-    if err = batchContext.sdb.hermezDb.DeleteIsBatchPartiallyProcessed(batchState.batchNumber); err != nil {
-        return false, err
-    }
     if err = stages.SaveStageProgress(batchContext.sdb.tx, stages.HighestSeenBatchNumber, batchState.batchNumber); err != nil {
         return false, err
     }
@@ -158,9 +134,6 @@ func runBatchLastSteps(
     if err = batchContext.sdb.hermezDb.WriteBatchCounters(blockNumber, counters.UsedAsMap()); err != nil {
         return err
     }
-    if err := batchContext.sdb.hermezDb.DeleteIsBatchPartiallyProcessed(thisBatch); err != nil {
-        return err
-    }
 
     // Local Exit Root (ler): read s/c storage every batch to store the LER for the highest block in the batch
     ler, err := utils.GetBatchLocalExitRootFromSCStorage(thisBatch, batchContext.sdb.hermezDb.HermezDbReader, batchContext.sdb.tx)
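prepareBatchCounters no longer decides for itself whether to resume intermediate counters; the caller passes them in. The two call patterns introduced by this commit, shown schematically with stand-in types (usedCounters and collector are illustrative substitutes for *vm.Counters and *vm.BatchCounterCollector):

package sequencing

// usedCounters stands in for *vm.Counters; collector for *vm.BatchCounterCollector.
type usedCounters struct{ used map[string]int }
type collector struct{ resumeFrom *usedCounters }

// newCollector mirrors the new prepareBatchCounters signature: nil means the
// batch starts from empty counters, non-nil resumes from persisted usage.
func newCollector(resumeFrom *usedCounters) *collector {
    return &collector{resumeFrom: resumeFrom}
}

func examples() {
    // SpawnSequencingStage: every batch now starts fresh.
    _ = newCollector(nil)

    // handleBatchEndChecks: sealing a leftover batch resumes from the counters
    // persisted for it (hermezDb.GetLatestBatchCounters + vm.NewCountersFromUsedMap).
    persisted := &usedCounters{used: map[string]int{"S": 42}}
    _ = newCollector(persisted)
}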
6 changes: 0 additions & 6 deletions zk/stages/stage_sequence_execute_blocks.go
@@ -247,12 +247,6 @@ func finaliseBlock(
         return nil, err
     }
 
-    // write partially processed
-    err = batchContext.sdb.hermezDb.WriteIsBatchPartiallyProcessed(batchState.batchNumber)
-    if err != nil {
-        return nil, err
-    }
-
     // this is actually account + storage indices stages
     quitCh := batchContext.ctx.Done()
     from := newNum.Uint64()
55 changes: 42 additions & 13 deletions zk/stages/stage_sequence_execute_data_stream.go
@@ -7,8 +7,10 @@ import (
     "github.com/ledgerwatch/erigon/core/rawdb"
     "github.com/ledgerwatch/erigon/zk/datastream/server"
     verifier "github.com/ledgerwatch/erigon/zk/legacy_executor_verifier"
-    "github.com/ledgerwatch/erigon/zk/utils"
     "github.com/ledgerwatch/log/v3"
+    "github.com/ledgerwatch/erigon/eth/stagedsync/stages"
+    "github.com/ledgerwatch/erigon/eth/stagedsync"
+    "github.com/ledgerwatch/erigon/core/vm"
 )
 
 type SequencerBatchStreamWriter struct {
@@ -62,6 +64,10 @@ func (sbc *SequencerBatchStreamWriter) writeBlockDetailsToDatastream(verifiedBun
             return checkedVerifierBundles, err
         }
 
+        if err = stages.SaveStageProgress(sbc.sdb.tx, stages.DataStream, block.NumberU64()); err != nil {
+            return checkedVerifierBundles, err
+        }
+
         // once we have handled the very first block we can update the last batch to be the current batch safely so that
         // we don't keep adding batch bookmarks in between blocks
         sbc.lastBatch = request.BatchNumber
@@ -78,29 +84,52 @@ func (sbc *SequencerBatchStreamWriter) writeBlockDetailsToDatastream(verifiedBun
     return checkedVerifierBundles, nil
 }
 
-func finalizeLastBatchInDatastreamIfNotFinalized(batchContext *BatchContext, batchState *BatchState, thisBlock uint64) error {
+func handleBatchEndChecks(batchContext *BatchContext, batchState *BatchState, thisBlock uint64, u stagedsync.Unwinder) (bool, error) {
     isLastEntryBatchEnd, err := batchContext.cfg.datastreamServer.IsLastEntryBatchEnd()
     if err != nil {
-        return err
+        return false, err
     }
 
     if isLastEntryBatchEnd {
-        return nil
+        return false, nil
     }
 
-    log.Warn(fmt.Sprintf("[%s] Last batch %d was not closed properly, closing it now...", batchContext.s.LogPrefix(), batchState.batchNumber))
-    ler, err := utils.GetBatchLocalExitRootFromSCStorage(batchState.batchNumber, batchContext.sdb.hermezDb.HermezDbReader, batchContext.sdb.tx)
+    lastBatch := batchState.batchNumber - 1
+
+    log.Warn(fmt.Sprintf("[%s] Last batch %d was not closed properly, closing it now...", batchContext.s.LogPrefix(), lastBatch))
+
+    rawCounters, _, err := batchContext.sdb.hermezDb.GetLatestBatchCounters(lastBatch)
     if err != nil {
-        return err
+        return false, err
     }
 
+    latestCounters := vm.NewCountersFromUsedMap(rawCounters)
+
+    endBatchCounters, err := prepareBatchCounters(batchContext, batchState, latestCounters)
+
+    if err = runBatchLastSteps(batchContext, lastBatch, thisBlock, endBatchCounters); err != nil {
+        return false, err
+    }
+
-    lastBlock, err := rawdb.ReadBlockByNumber(batchContext.sdb.tx, thisBlock)
+    // now check if there is a gap in the stream vs the state db
+    streamProgress, err := stages.GetStageProgress(batchContext.sdb.tx, stages.DataStream)
     if err != nil {
-        return err
+        return false, err
     }
-    root := lastBlock.Root()
-    if err = batchContext.cfg.datastreamServer.WriteBatchEnd(batchContext.sdb.hermezDb, batchState.batchNumber-1, &root, &ler); err != nil {
-        return err
+
+    unwinding := false
+    if streamProgress > 0 && streamProgress < thisBlock {
+        block, err := rawdb.ReadBlockByNumber(batchContext.sdb.tx, streamProgress)
+        if err != nil {
+            return true, err
+        }
+        log.Warn(fmt.Sprintf("[%s] Unwinding due to a datastream gap", batchContext.s.LogPrefix()),
+            "streamHeight", streamProgress,
+            "sequencerHeight", thisBlock,
+        )
+        u.UnwindTo(streamProgress, block.Hash())
+        unwinding = true
     }
-    return nil
+
+    return unwinding, nil
 }
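Two pieces above work together: writeBlockDetailsToDatastream now records stages.DataStream progress for every block it writes, and handleBatchEndChecks compares that progress against the sequencer's executed height on restart, unwinding when the stream is behind. A reduced sketch of that invariant with hypothetical helpers (the real code uses stages.SaveStageProgress / GetStageProgress and u.UnwindTo as shown in the hunk):

package sequencing

// progressStore stands in for the stage-progress table keyed by stage name.
type progressStore map[string]uint64

const dataStreamStage = "DataStream"

// recordStreamProgress is called after each block is successfully written to
// the datastream, so the stored height always matches the stream's tip.
func recordStreamProgress(db progressStore, blockNum uint64) {
    db[dataStreamStage] = blockNum
}

// checkStreamGap reports whether the node must unwind to the stream height:
// zero progress means a brand new stream being repopulated (no gap), while a
// value below the executed height means blocks were executed but never streamed.
func checkStreamGap(db progressStore, executedHeight uint64) (unwindTo uint64, shouldUnwind bool) {
    streamProgress := db[dataStreamStage]
    if streamProgress > 0 && streamProgress < executedHeight {
        return streamProgress, true
    }
    return 0, false
}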
3 changes: 1 addition & 2 deletions zk/stages/stage_sequence_execute_injected_batch.go
@@ -80,8 +80,7 @@ func processInjectedInitialBatch(
         return err
     }
 
-    // deleting the partially processed flag
-    return batchContext.sdb.hermezDb.DeleteIsBatchPartiallyProcessed(injectedBatchBatchNumber)
+    return err
 }
 
 func handleInjectedBatch(
9 changes: 0 additions & 9 deletions zk/stages/stage_sequence_execute_unwind.go
@@ -137,15 +137,6 @@ func UnwindSequenceExecutionStageDbWrites(ctx context.Context, u *stagedsync.Unw
     if err = hermezDb.DeleteBatchCounters(u.UnwindPoint+1, s.BlockNumber); err != nil {
         return fmt.Errorf("truncate block batches error: %v", err)
     }
-    // only seq
-    if err = hermezDb.TruncateIsBatchPartiallyProcessed(fromBatch, toBatch); err != nil {
-        return fmt.Errorf("truncate fork id error: %v", err)
-    }
-    if lastBatchToKeepBeforeFrom == fromBatch {
-        if err = hermezDb.WriteIsBatchPartiallyProcessed(lastBatchToKeepBeforeFrom); err != nil {
-            return fmt.Errorf("truncate fork id error: %v", err)
-        }
-    }
 
     return nil
 }
