diff --git a/eth/backend.go b/eth/backend.go
index 52b61ba7622..471a1ed9eeb 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -753,9 +753,6 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
 		latestHeader := backend.dataStream.GetHeader()
 		if latestHeader.TotalEntries == 0 {
 			log.Info("[dataStream] setting the stream progress to 0")
-			if err := stages.SaveStageProgress(tx, stages.DataStream, 0); err != nil {
-				return nil, err
-			}
 			backend.preStartTasks.WarmUpDataStream = true
 		}
 	}
diff --git a/zk/hermez_db/db.go b/zk/hermez_db/db.go
index 3130db81570..2adbf23cee7 100644
--- a/zk/hermez_db/db.go
+++ b/zk/hermez_db/db.go
@@ -1610,31 +1610,6 @@ func (db *HermezDbReader) GetInvalidBatch(batchNo uint64) (bool, error) {
 	return len(v) > 0, nil
 }
 
-func (db *HermezDb) WriteIsBatchPartiallyProcessed(batchNo uint64) error {
-	return db.tx.Put(BATCH_PARTIALLY_PROCESSED, Uint64ToBytes(batchNo), []byte{1})
-}
-
-func (db *HermezDb) DeleteIsBatchPartiallyProcessed(batchNo uint64) error {
-	return db.tx.Delete(BATCH_PARTIALLY_PROCESSED, Uint64ToBytes(batchNo))
-}
-
-func (db *HermezDbReader) GetIsBatchPartiallyProcessed(batchNo uint64) (bool, error) {
-	v, err := db.tx.GetOne(BATCH_PARTIALLY_PROCESSED, Uint64ToBytes(batchNo))
-	if err != nil {
-		return false, err
-	}
-	return len(v) > 0, nil
-}
-
-func (db *HermezDb) TruncateIsBatchPartiallyProcessed(fromBatch, toBatch uint64) error {
-	for batch := fromBatch; batch <= toBatch; batch++ {
-		if err := db.DeleteIsBatchPartiallyProcessed(batch); err != nil {
-			return err
-		}
-	}
-	return nil
-}
-
 func (db *HermezDb) WriteLocalExitRootForBatchNo(batchNo uint64, root common.Hash) error {
 	return db.tx.Put(LOCAL_EXIT_ROOTS, Uint64ToBytes(batchNo), root.Bytes())
 }
diff --git a/zk/stages/stage_dataStreamCatchup.go b/zk/stages/stage_dataStreamCatchup.go
index f3cae1b3ca3..199fe4d69d4 100644
--- a/zk/stages/stage_dataStreamCatchup.go
+++ b/zk/stages/stage_dataStreamCatchup.go
@@ -12,6 +12,7 @@ import (
 	"github.com/ledgerwatch/erigon/zk/datastream/server"
 	"github.com/ledgerwatch/erigon/zk/hermez_db"
 	"github.com/ledgerwatch/log/v3"
+	"github.com/ledgerwatch/erigon/zk/sequencer"
 )
 
 type DataStreamCatchupCfg struct {
@@ -80,12 +81,24 @@ func CatchupDatastream(ctx context.Context, logPrefix string, tx kv.RwTx, stream
 	srv := server.NewDataStreamServer(stream, chainId)
 	reader := hermez_db.NewHermezDbReader(tx)
 
-	finalBlockNumber, err := stages.GetStageProgress(tx, stages.Execution)
-	if err != nil {
-		return 0, err
+	var (
+		err              error
+		finalBlockNumber uint64
+	)
+
+	if sequencer.IsSequencer() {
+		finalBlockNumber, err = stages.GetStageProgress(tx, stages.DataStream)
+		if err != nil {
+			return 0, err
+		}
+	} else {
+		finalBlockNumber, err = stages.GetStageProgress(tx, stages.Execution)
+		if err != nil {
+			return 0, err
+		}
 	}
 
-	previousProgress, err := stages.GetStageProgress(tx, stages.DataStream)
+	previousProgress, err := srv.GetHighestBlockNumber()
 	if err != nil {
 		return 0, err
 	}
diff --git a/zk/stages/stage_sequence_execute.go b/zk/stages/stage_sequence_execute.go
index 2aa293410e1..8687593d66a 100644
--- a/zk/stages/stage_sequence_execute.go
+++ b/zk/stages/stage_sequence_execute.go
@@ -16,6 +16,7 @@ import (
 	"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
 	"github.com/ledgerwatch/erigon/zk"
 	"github.com/ledgerwatch/erigon/zk/utils"
+	"github.com/ledgerwatch/erigon/core/vm"
 )
 
 func SpawnSequencingStage(
@@ -46,11 +47,6 @@ func SpawnSequencingStage(
 		return err
 	}
 
-	isLastBatchPariallyProcessed, err := sdb.hermezDb.GetIsBatchPartiallyProcessed(lastBatch)
-	if err != nil {
-		return err
-	}
-
 	forkId, err := prepareForkId(lastBatch, executionAt, sdb.hermezDb)
 	if err != nil {
 		return err
 	}
@@ -66,7 +62,7 @@ func SpawnSequencingStage(
 	var block *types.Block
 	runLoopBlocks := true
 	batchContext := newBatchContext(ctx, &cfg, &historyCfg, s, sdb)
-	batchState := newBatchState(forkId, prepareBatchNumber(lastBatch, isLastBatchPariallyProcessed), !isLastBatchPariallyProcessed && cfg.zk.HasExecutors(), cfg.zk.L1SyncStartBlock > 0, cfg.txPool)
+	batchState := newBatchState(forkId, lastBatch+1, cfg.zk.HasExecutors(), cfg.zk.L1SyncStartBlock > 0, cfg.txPool)
 	blockDataSizeChecker := newBlockDataChecker()
 	streamWriter := newSequencerBatchStreamWriter(batchContext, batchState, lastBatch) // using lastBatch (rather than batchState.batchNumber) is not mistake
 
@@ -79,29 +75,69 @@ func SpawnSequencingStage(
 		if err = cfg.datastreamServer.WriteWholeBatchToStream(logPrefix, sdb.tx, sdb.hermezDb.HermezDbReader, lastBatch, injectedBatchBatchNumber); err != nil {
 			return err
 		}
+		if err = stages.SaveStageProgress(sdb.tx, stages.DataStream, 1); err != nil {
+			return err
+		}
 		return sdb.tx.Commit()
 	}
 
-	tryHaltSequencer(batchContext, batchState.batchNumber)
-
-	if err := utils.UpdateZkEVMBlockCfg(cfg.chainConfig, sdb.hermezDb, logPrefix); err != nil {
-		return err
+	if batchState.hasExecutorForThisBatch {
+		// identify a stream gap i.e. a sequencer restart without an ack back from an executor.
+		// in this case we need to unwind the state so that we match the datastream height
+		streamProgress, err := stages.GetStageProgress(sdb.tx, stages.DataStream)
+		if err != nil {
+			return err
+		}
+		if streamProgress > 0 && streamProgress < executionAt {
+			block, err := rawdb.ReadBlockByNumber(sdb.tx, streamProgress)
+			if err != nil {
+				return err
+			}
+			log.Warn(fmt.Sprintf("[%s] Unwinding due to a datastream gap", logPrefix),
+				"streamHeight", streamProgress,
+				"sequencerHeight", executionAt,
+			)
+			u.UnwindTo(streamProgress, block.Hash())
+			return nil
+		}
 	}
 
-	batchCounters, err := prepareBatchCounters(batchContext, batchState, isLastBatchPariallyProcessed)
+	lastBatchSealed, err := checkIfLastBatchIsSealed(batchContext)
 	if err != nil {
 		return err
 	}
 
-	if !isLastBatchPariallyProcessed {
-		// handle case where batch wasn't closed properly
-		// close it before starting a new one
-		// this occurs when sequencer was switched from syncer or sequencer datastream files were deleted
-		// and datastream was regenerated
-		if err = finalizeLastBatchInDatastreamIfNotFinalized(batchContext, batchState, executionAt); err != nil {
+	if !lastBatchSealed {
+		log.Warn(fmt.Sprintf("[%s] Closing batch early due to partial processing", logPrefix), "batch", lastBatch)
+
+		// we are in a state where the sequencer was perhaps restarted or unwound and the last batch
+		// that was partially processed needed to be closed, and we will have at least one block in it (because the last
+		// entry wasn't a batch end)
+		rawCounters, _, err := sdb.hermezDb.GetLatestBatchCounters(lastBatch)
+		if err != nil {
+			return err
+		}
+		latestCounters := vm.NewCountersFromUsedMap(rawCounters)
+
+		endBatchCounters, err := prepareBatchCounters(batchContext, batchState, latestCounters)
+
+		if err = runBatchLastSteps(batchContext, lastBatch, executionAt, endBatchCounters); err != nil {
 			return err
 		}
+
+		return sdb.tx.Commit()
+	}
+
+	tryHaltSequencer(batchContext, batchState.batchNumber)
+
+	if err := utils.UpdateZkEVMBlockCfg(cfg.chainConfig, sdb.hermezDb, logPrefix); err != nil {
+		return err
+	}
+
+	batchCounters, err := prepareBatchCounters(batchContext, batchState, nil)
+	if err != nil {
+		return err
 	}
 
 	if batchState.isL1Recovery() {
diff --git a/zk/stages/stage_sequence_execute_batch.go b/zk/stages/stage_sequence_execute_batch.go
index 6f7dc4275da..ec0fec521c6 100644
--- a/zk/stages/stage_sequence_execute_batch.go
+++ b/zk/stages/stage_sequence_execute_batch.go
@@ -12,28 +12,7 @@ import (
 	"github.com/ledgerwatch/log/v3"
 )
 
-func prepareBatchNumber(lastBatch uint64, isLastBatchPariallyProcessed bool) uint64 {
-	if isLastBatchPariallyProcessed {
-		return lastBatch
-	}
-
-	return lastBatch + 1
-}
-
-func prepareBatchCounters(batchContext *BatchContext, batchState *BatchState, isLastBatchPariallyProcessed bool) (*vm.BatchCounterCollector, error) {
-	var intermediateUsedCounters *vm.Counters
-	if isLastBatchPariallyProcessed {
-		intermediateCountersMap, found, err := batchContext.sdb.hermezDb.GetLatestBatchCounters(batchState.batchNumber)
-		if err != nil {
-			return nil, err
-		}
-		if found {
-			intermediateUsedCounters = vm.NewCountersFromUsedMap(intermediateCountersMap)
-		} else {
-			log.Warn("intermediate counters not found for batch, initialising with empty counters", "batch", batchState.batchNumber)
-		}
-	}
-
+func prepareBatchCounters(batchContext *BatchContext, batchState *BatchState, intermediateUsedCounters *vm.Counters) (*vm.BatchCounterCollector, error) {
 	return vm.NewBatchCounterCollector(batchContext.sdb.smt.GetDepth(), uint16(batchState.forkId), batchContext.cfg.zk.VirtualCountersSmtReduction, batchContext.cfg.zk.ShouldCountersBeUnlimited(batchState.isL1Recovery()), intermediateUsedCounters), nil
 }
 
@@ -66,9 +45,6 @@ func doCheckForBadBatch(batchContext *BatchContext, batchState *BatchState, this
 		if err = batchContext.sdb.hermezDb.WriteBatchCounters(currentBlock.NumberU64(), map[string]int{}); err != nil {
 			return false, err
 		}
-		if err = batchContext.sdb.hermezDb.DeleteIsBatchPartiallyProcessed(batchState.batchNumber); err != nil {
-			return false, err
-		}
 		if err = stages.SaveStageProgress(batchContext.sdb.tx, stages.HighestSeenBatchNumber, batchState.batchNumber); err != nil {
 			return false, err
 		}
@@ -158,9 +134,6 @@ func runBatchLastSteps(
 	if err = batchContext.sdb.hermezDb.WriteBatchCounters(blockNumber, counters.UsedAsMap()); err != nil {
 		return err
 	}
-	if err := batchContext.sdb.hermezDb.DeleteIsBatchPartiallyProcessed(thisBatch); err != nil {
-		return err
-	}
 
 	// Local Exit Root (ler): read s/c storage every batch to store the LER for the highest block in the batch
 	ler, err := utils.GetBatchLocalExitRootFromSCStorage(thisBatch, batchContext.sdb.hermezDb.HermezDbReader, batchContext.sdb.tx)
diff --git a/zk/stages/stage_sequence_execute_blocks.go b/zk/stages/stage_sequence_execute_blocks.go
index 495e9114846..060c753a26d 100644
--- a/zk/stages/stage_sequence_execute_blocks.go
+++ b/zk/stages/stage_sequence_execute_blocks.go
@@ -247,12 +247,6 @@ func finaliseBlock(
 		return nil, err
 	}
 
-	// write partially processed
-	err = batchContext.sdb.hermezDb.WriteIsBatchPartiallyProcessed(batchState.batchNumber)
-	if err != nil {
-		return nil, err
-	}
-
 	// this is actually account + storage indices stages
 	quitCh := batchContext.ctx.Done()
 	from := newNum.Uint64()
diff --git a/zk/stages/stage_sequence_execute_data_stream.go b/zk/stages/stage_sequence_execute_data_stream.go
index 373511a1212..23f8c480e1f 100644
--- a/zk/stages/stage_sequence_execute_data_stream.go
+++ b/zk/stages/stage_sequence_execute_data_stream.go
@@ -9,6 +9,7 @@ import (
 	verifier "github.com/ledgerwatch/erigon/zk/legacy_executor_verifier"
 	"github.com/ledgerwatch/erigon/zk/utils"
 	"github.com/ledgerwatch/log/v3"
+	"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
 )
 
 type SequencerBatchStreamWriter struct {
@@ -62,6 +63,10 @@ func (sbc *SequencerBatchStreamWriter) writeBlockDetailsToDatastream(verifiedBun
 			return checkedVerifierBundles, err
 		}
 
+		if err = stages.SaveStageProgress(sbc.sdb.tx, stages.DataStream, block.NumberU64()); err != nil {
+			return checkedVerifierBundles, err
+		}
+
 		// once we have handled the very first block we can update the last batch to be the current batch safely so that
 		// we don't keep adding batch bookmarks in between blocks
 		sbc.lastBatch = request.BatchNumber
@@ -78,29 +83,38 @@ func (sbc *SequencerBatchStreamWriter) writeBlockDetailsToDatastream(verifiedBun
 	return checkedVerifierBundles, nil
 }
 
-func finalizeLastBatchInDatastreamIfNotFinalized(batchContext *BatchContext, batchState *BatchState, thisBlock uint64) error {
+func checkIfLastBatchIsSealed(batchContext *BatchContext) (bool, error) {
+	isLastEntryBatchEnd, err := batchContext.cfg.datastreamServer.IsLastEntryBatchEnd()
+	if err != nil {
+		return false, err
+	}
+
+	return isLastEntryBatchEnd, nil
+}
+
+func finalizeLastBatchInDatastreamIfNotFinalized(batchContext *BatchContext, batchState *BatchState, thisBlock uint64) (bool, error) {
 	isLastEntryBatchEnd, err := batchContext.cfg.datastreamServer.IsLastEntryBatchEnd()
 	if err != nil {
-		return err
+		return false, err
 	}
 
 	if isLastEntryBatchEnd {
-		return nil
+		return false, nil
 	}
 
-	log.Warn(fmt.Sprintf("[%s] Last batch %d was not closed properly, closing it now...", batchContext.s.LogPrefix(), batchState.batchNumber))
-	ler, err := utils.GetBatchLocalExitRootFromSCStorage(batchState.batchNumber, batchContext.sdb.hermezDb.HermezDbReader, batchContext.sdb.tx)
+	log.Warn(fmt.Sprintf("[%s] Last batch %d was not closed properly, closing it now...", batchContext.s.LogPrefix(), batchState.batchNumber-1))
+	ler, err := utils.GetBatchLocalExitRootFromSCStorage(batchState.batchNumber-1, batchContext.sdb.hermezDb.HermezDbReader, batchContext.sdb.tx)
 	if err != nil {
-		return err
+		return true, err
 	}
 
 	lastBlock, err := rawdb.ReadBlockByNumber(batchContext.sdb.tx, thisBlock)
 	if err != nil {
-		return err
+		return true, err
 	}
 
 	root := lastBlock.Root()
 	if err = batchContext.cfg.datastreamServer.WriteBatchEnd(batchContext.sdb.hermezDb, batchState.batchNumber-1, &root, &ler); err != nil {
-		return err
+		return true, err
 	}
 
-	return nil
+	return true, nil
 }
diff --git a/zk/stages/stage_sequence_execute_injected_batch.go b/zk/stages/stage_sequence_execute_injected_batch.go
index 323b7a0f2f9..e1917b7748a 100644
--- a/zk/stages/stage_sequence_execute_injected_batch.go
+++ b/zk/stages/stage_sequence_execute_injected_batch.go
@@ -80,8 +80,7 @@ func processInjectedInitialBatch(
 		return err
 	}
 
-	// deleting the partially processed flag
-	return batchContext.sdb.hermezDb.DeleteIsBatchPartiallyProcessed(injectedBatchBatchNumber)
+	return err
 }
 
 func handleInjectedBatch(
diff --git a/zk/stages/stage_sequence_execute_unwind.go b/zk/stages/stage_sequence_execute_unwind.go
index 46c0a58846f..b8918aa33d7 100644
--- a/zk/stages/stage_sequence_execute_unwind.go
+++ b/zk/stages/stage_sequence_execute_unwind.go
@@ -137,15 +137,6 @@ func UnwindSequenceExecutionStageDbWrites(ctx context.Context, u *stagedsync.Unw
 	if err = hermezDb.DeleteBatchCounters(u.UnwindPoint+1, s.BlockNumber); err != nil {
 		return fmt.Errorf("truncate block batches error: %v", err)
 	}
-	// only seq
-	if err = hermezDb.TruncateIsBatchPartiallyProcessed(fromBatch, toBatch); err != nil {
-		return fmt.Errorf("truncate fork id error: %v", err)
-	}
-	if lastBatchToKeepBeforeFrom == fromBatch {
-		if err = hermezDb.WriteIsBatchPartiallyProcessed(lastBatchToKeepBeforeFrom); err != nil {
-			return fmt.Errorf("truncate fork id error: %v", err)
-		}
-	}
 
 	return nil
 }
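
Note for reviewers: a minimal sketch of the restart flow this patch introduces. If the last datastream entry is not a batch end, the previous batch was only partially processed, so it is sealed from its stored intermediate counters before work starts on lastBatch+1. The streamServer and counterDb interfaces and the resumeBatch helper below are simplified stand-ins invented for illustration, not the real erigon signatures; only IsLastEntryBatchEnd and GetLatestBatchCounters mirror calls that appear in the diff.

// Sketch only: simplified stand-ins for the erigon interfaces touched by
// this patch, not the real signatures.
package main

import (
	"errors"
	"fmt"
)

// streamServer stands in for cfg.datastreamServer.
type streamServer interface {
	IsLastEntryBatchEnd() (bool, error)
}

// counterDb stands in for sdb.hermezDb.
type counterDb interface {
	GetLatestBatchCounters(batch uint64) (map[string]int, bool, error)
}

// resumeBatch mirrors SpawnSequencingStage after this patch: when the last
// datastream entry is not a batch end, the previous batch was only partially
// processed, so it is sealed using its stored intermediate counters and the
// stage commits and returns; otherwise work starts on lastBatch+1.
func resumeBatch(srv streamServer, db counterDb, lastBatch uint64) (uint64, error) {
	sealed, err := srv.IsLastEntryBatchEnd()
	if err != nil {
		return 0, err
	}
	if sealed {
		return lastBatch + 1, nil
	}
	counters, found, err := db.GetLatestBatchCounters(lastBatch)
	if err != nil {
		return 0, err
	}
	if !found {
		return 0, errors.New("no counters stored for partially processed batch")
	}
	// the real code turns these counters into a vm.BatchCounterCollector via
	// prepareBatchCounters and calls runBatchLastSteps to write the batch end
	fmt.Println("sealing partially processed batch", lastBatch, "using", counters)
	return lastBatch, nil
}

func main() {} // wiring of concrete implementations omitted

The sketch checks every returned error; one spot worth double-checking in the stage_sequence_execute.go hunk is that the error from prepareBatchCounters is overwritten by the following runBatchLastSteps call without being inspected.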