From bd14081f4764ebf758705d134df6ccc74e494d77 Mon Sep 17 00:00:00 2001 From: Kamen Stoykov Date: Wed, 28 Aug 2024 12:51:46 +0000 Subject: [PATCH 1/2] handle-unwinds-on-seq-restart --- cmd/rpcdaemon/commands/zkevm_api.go | 2 +- zk/datastream/server/data_stream_server.go | 5 +- .../server/data_stream_server_utils.go | 2 +- zk/datastream/server/datastream_populate.go | 2 +- zk/hermez_db/db.go | 18 +---- zk/stages/stage_batches.go | 12 ---- zk/stages/stage_sequence_execute.go | 25 ++++++- zk/stages/stage_sequence_execute_batch.go | 61 ---------------- .../stage_sequence_execute_data_stream.go | 72 ++++++++++--------- zk/utils/utils.go | 28 +++----- 10 files changed, 78 insertions(+), 149 deletions(-) diff --git a/cmd/rpcdaemon/commands/zkevm_api.go b/cmd/rpcdaemon/commands/zkevm_api.go index 87383802c1a..c8d2a62dd99 100644 --- a/cmd/rpcdaemon/commands/zkevm_api.go +++ b/cmd/rpcdaemon/commands/zkevm_api.go @@ -646,7 +646,7 @@ func (api *ZkEvmAPIImpl) GetBatchByNumber(ctx context.Context, batchNumber rpc.B } // local exit root - localExitRoot, err := utils.GetBatchLocalExitRoot(batchNo, hermezDb, tx) + localExitRoot, err := utils.GetBatchLocalExitRootFromSCStorageForLatestBlock(batchNo, hermezDb, tx) if err != nil { return nil, err } diff --git a/zk/datastream/server/data_stream_server.go b/zk/datastream/server/data_stream_server.go index 0d3fd1129d5..bddd671e9b9 100644 --- a/zk/datastream/server/data_stream_server.go +++ b/zk/datastream/server/data_stream_server.go @@ -17,7 +17,6 @@ import ( type DbReader interface { GetL2BlockNosByBatch(batchNo uint64) ([]uint64, error) - GetLocalExitRootForBatchNo(batchNo uint64) (libcommon.Hash, error) GetBatchGlobalExitRootsProto(lastBatchNumber, batchNumber uint64) ([]types.GerUpdateProto, error) GetForkId(batchNumber uint64) (uint64, error) GetBlockGlobalExitRoot(blockNumber uint64) (libcommon.Hash, error) @@ -194,7 +193,7 @@ func createBlockWithBatchCheckStreamEntriesProto( } // the genesis we insert fully, so we would have to 
skip closing it if !shouldSkipBatchEndEntry { - localExitRoot, err := utils.GetBatchLocalExitRootFromSCStorage(batchNumber, reader, tx) + localExitRoot, err := utils.GetBatchLocalExitRootFromSCStorageForLatestBlock(batchNumber, reader, tx) if err != nil { return nil, err } @@ -383,7 +382,7 @@ func BuildWholeBatchStreamEntriesProto( } // the genesis we insert fully, so we would have to skip closing it - localExitRoot, err := utils.GetBatchLocalExitRootFromSCStorage(batchNumber, reader, tx) + localExitRoot, err := utils.GetBatchLocalExitRootFromSCStorageForLatestBlock(batchNumber, reader, tx) if err != nil { return nil, err } diff --git a/zk/datastream/server/data_stream_server_utils.go b/zk/datastream/server/data_stream_server_utils.go index 259608958ce..b25550ace7f 100644 --- a/zk/datastream/server/data_stream_server_utils.go +++ b/zk/datastream/server/data_stream_server_utils.go @@ -176,7 +176,7 @@ func createBatchStartEntriesProto( } // seal off the last batch - if localExitRoot, err = utils.GetBatchLocalExitRootFromSCStorage(workingBatch, reader, tx); err != nil { + if localExitRoot, err = utils.GetBatchLocalExitRootFromSCStorageForLatestBlock(workingBatch, reader, tx); err != nil { return nil, err } entries.Add(newBatchEndProto(localExitRoot, root, workingBatch)) diff --git a/zk/datastream/server/datastream_populate.go b/zk/datastream/server/datastream_populate.go index d78ea11cd17..157c750e7bd 100644 --- a/zk/datastream/server/datastream_populate.go +++ b/zk/datastream/server/datastream_populate.go @@ -386,7 +386,7 @@ func (srv *DataStreamServer) WriteGenesisToStream( l2Block := newL2BlockProto(genesis, genesis.Hash().Bytes(), batchNo, ger, 0, 0, common.Hash{}, 0, common.Hash{}) batchStart := newBatchStartProto(batchNo, srv.chainId, GenesisForkId, datastream.BatchType_BATCH_TYPE_REGULAR) - ler, err := utils.GetBatchLocalExitRoot(0, reader, tx) + ler, err := utils.GetBatchLocalExitRootFromSCStorageForLatestBlock(0, reader, tx) if err != nil { return err } diff 
--git a/zk/hermez_db/db.go b/zk/hermez_db/db.go index 2adbf23cee7..3d6fff913fd 100644 --- a/zk/hermez_db/db.go +++ b/zk/hermez_db/db.go @@ -38,7 +38,7 @@ const BLOCK_L1_BLOCK_HASHES = "block_l1_block_hashes" // block const L1_BLOCK_HASH_GER = "l1_block_hash_ger" // l1 block hash -> GER const INTERMEDIATE_TX_STATEROOTS = "hermez_intermediate_tx_stateRoots" // l2blockno -> stateRoot const BATCH_WITNESSES = "hermez_batch_witnesses" // batch number -> witness -const BATCH_COUNTERS = "hermez_batch_counters" // batch number -> counters +const BATCH_COUNTERS = "hermez_batch_counters" // block number -> counters const L1_BATCH_DATA = "l1_batch_data" // batch number -> l1 batch data from transaction call data const REUSED_L1_INFO_TREE_INDEX = "reused_l1_info_tree_index" // block number => const 1 const LATEST_USED_GER = "latest_used_ger" // batch number -> GER latest used GER @@ -47,8 +47,6 @@ const SMT_DEPTHS = "smt_depths" // block const L1_INFO_LEAVES = "l1_info_leaves" // l1 info tree index -> l1 info tree leaf const L1_INFO_ROOTS = "l1_info_roots" // root hash -> l1 info tree index const INVALID_BATCHES = "invalid_batches" // batch number -> true -const BATCH_PARTIALLY_PROCESSED = "batch_partially_processed" // batch number -> true -const LOCAL_EXIT_ROOTS = "local_exit_roots" // batch number -> local exit root const ROllUP_TYPES_FORKS = "rollup_types_forks" // rollup type id -> fork id const FORK_HISTORY = "fork_history" // index -> fork id + last verified batch const JUST_UNWOUND = "just_unwound" // batch number -> true @@ -86,8 +84,6 @@ var HermezDbTables = []string{ L1_INFO_LEAVES, L1_INFO_ROOTS, INVALID_BATCHES, - BATCH_PARTIALLY_PROCESSED, - LOCAL_EXIT_ROOTS, ROllUP_TYPES_FORKS, FORK_HISTORY, JUST_UNWOUND, @@ -1610,18 +1606,6 @@ func (db *HermezDbReader) GetInvalidBatch(batchNo uint64) (bool, error) { return len(v) > 0, nil } -func (db *HermezDb) WriteLocalExitRootForBatchNo(batchNo uint64, root common.Hash) error { - return db.tx.Put(LOCAL_EXIT_ROOTS, 
Uint64ToBytes(batchNo), root.Bytes()) -} - -func (db *HermezDbReader) GetLocalExitRootForBatchNo(batchNo uint64) (common.Hash, error) { - v, err := db.tx.GetOne(LOCAL_EXIT_ROOTS, Uint64ToBytes(batchNo)) - if err != nil { - return common.Hash{}, err - } - return common.BytesToHash(v), nil -} - func (db *HermezDb) WriteRollupType(rollupType, forkId uint64) error { return db.tx.Put(ROllUP_TYPES_FORKS, Uint64ToBytes(rollupType), Uint64ToBytes(forkId)) } diff --git a/zk/stages/stage_batches.go b/zk/stages/stage_batches.go index cbce1c28be9..71b3706cba9 100644 --- a/zk/stages/stage_batches.go +++ b/zk/stages/stage_batches.go @@ -70,7 +70,6 @@ type HermezDb interface { WriteBlockL1InfoTreeIndex(blockNumber uint64, l1Index uint64) error WriteBlockL1InfoTreeIndexProgress(blockNumber uint64, l1Index uint64) error WriteLatestUsedGer(blockNo uint64, ger common.Hash) error - WriteLocalExitRootForBatchNo(batchNo uint64, localExitRoot common.Hash) error } type DatastreamClient interface { @@ -256,9 +255,6 @@ LOOP: if entry.StateRoot != lastBlockRoot { log.Warn(fmt.Sprintf("[%s] batch end state root mismatches last block's: %x, expected: %x", logPrefix, entry.StateRoot, lastBlockRoot)) } - if err := writeBatchEnd(hermezDb, entry); err != nil { - return fmt.Errorf("write batch end error: %v", err) - } case *types.FullL2Block: if cfg.zkCfg.SyncLimit > 0 && entry.L2BlockNumber >= cfg.zkCfg.SyncLimit { // stop the node going into a crazy loop @@ -752,14 +748,6 @@ func PruneBatchesStage(s *stagedsync.PruneState, tx kv.RwTx, cfg BatchesCfg, ctx return nil } -func writeBatchEnd(hermezDb HermezDb, batchEnd *types.BatchEnd) (err error) { - // utils.CalculateAccInputHash(oldAccInputHash, batchStart., l1InfoRoot common.Hash, timestampLimit uint64, sequencerAddr common.Address, forcedBlockhashL1 common.Hash) - if batchEnd.LocalExitRoot != emptyHash { - err = hermezDb.WriteLocalExitRootForBatchNo(batchEnd.Number, batchEnd.LocalExitRoot) - } - return -} - // writeL2Block writes L2Block to 
ErigonDb and HermezDb // writes header, body, forkId and blockBatch func writeL2Block(eriDb ErigonDb, hermezDb HermezDb, l2Block *types.FullL2Block, highestL1InfoTreeIndex uint64) error { diff --git a/zk/stages/stage_sequence_execute.go b/zk/stages/stage_sequence_execute.go index d975075b0e0..3fa0c2dec3d 100644 --- a/zk/stages/stage_sequence_execute.go +++ b/zk/stages/stage_sequence_execute.go @@ -88,10 +88,13 @@ func SpawnSequencingStage( // if we identify any. During normal operation this function will simply check and move on without performing // any action. if !batchState.isAnyRecovery() { - isUnwinding, err := handleBatchEndChecks(batchContext, batchState, executionAt, u) - if err != nil || isUnwinding { + isUnwinding, err := alignExecutionToDatastream(batchContext, batchState, executionAt, u) + if err != nil { return err } + if isUnwinding { + return sdb.tx.Commit() + } } tryHaltSequencer(batchContext, batchState.batchNumber) @@ -375,9 +378,25 @@ func SpawnSequencingStage( return err } - if err = runBatchLastSteps(batchContext, batchState.batchNumber, block.NumberU64(), batchCounters); err != nil { + /* + if adding something below that line we must ensure + - it is also handled properly in processInjectedInitialBatch + - it is also handled properly in alignExecutionToDatastream + - it is also handled properly in doCheckForBadBatch + - it is unwound correctly + */ + + if err := finalizeLastBatchInDatastream(batchContext, batchState.batchNumber, block.NumberU64()); err != nil { return err } + // TODO: It is 99% sure that there is no need to write this in any of processInjectedInitialBatch, alignExecutionToDatastream, doCheckForBadBatch but it is worth double checking + // the unwind of this value is handled by UnwindExecutionStageDbWrites + if _, err := rawdb.IncrementStateVersionByBlockNumberIfNeeded(batchContext.sdb.tx, block.NumberU64()); err != nil { + return fmt.Errorf("writing plain state version: %w", err) + } + + log.Info(fmt.Sprintf("[%s] Finish batch 
%d...", batchContext.s.LogPrefix(), batchState.batchNumber)) + return sdb.tx.Commit() } diff --git a/zk/stages/stage_sequence_execute_batch.go b/zk/stages/stage_sequence_execute_batch.go index 23bb025ef01..981812b42d9 100644 --- a/zk/stages/stage_sequence_execute_batch.go +++ b/zk/stages/stage_sequence_execute_batch.go @@ -8,7 +8,6 @@ import ( "github.com/ledgerwatch/erigon/core/vm" "github.com/ledgerwatch/erigon/eth/stagedsync" "github.com/ledgerwatch/erigon/eth/stagedsync/stages" - "github.com/ledgerwatch/erigon/zk/utils" "github.com/ledgerwatch/log/v3" ) @@ -112,63 +111,3 @@ func updateStreamAndCheckRollback( return false, nil } - -func runBatchLastSteps( - batchContext *BatchContext, - thisBatch uint64, - blockNumber uint64, - batchCounters *vm.BatchCounterCollector, -) error { - l1InfoIndex, err := batchContext.sdb.hermezDb.GetBlockL1InfoTreeIndex(blockNumber) - if err != nil { - return err - } - - counters, err := batchCounters.CombineCollectors(l1InfoIndex != 0) - if err != nil { - return err - } - - log.Info(fmt.Sprintf("[%s] counters consumed", batchContext.s.LogPrefix()), "batch", thisBatch, "counts", counters.UsedAsString()) - - if err = batchContext.sdb.hermezDb.WriteBatchCounters(blockNumber, counters.UsedAsMap()); err != nil { - return err - } - - // Local Exit Root (ler): read s/c storage every batch to store the LER for the highest block in the batch - ler, err := utils.GetBatchLocalExitRootFromSCStorage(thisBatch, batchContext.sdb.hermezDb.HermezDbReader, batchContext.sdb.tx) - if err != nil { - return err - } - // write ler to hermezdb - if err = batchContext.sdb.hermezDb.WriteLocalExitRootForBatchNo(thisBatch, ler); err != nil { - return err - } - - // get the last block number written to batch - // we should match it's state root in batch end entry - // if we get the last block in DB errors may occur since we have DB unwinds AFTER we commit batch end to datastream - // the last block written to the datastream before batch end should be the 
correct one once we are here - // if it is not, we have a bigger problem - lastBlockNumber, err := batchContext.cfg.datastreamServer.GetHighestBlockNumber() - if err != nil { - return err - } - block, err := rawdb.ReadBlockByNumber(batchContext.sdb.tx, lastBlockNumber) - if err != nil { - return err - } - blockRoot := block.Root() - if err = batchContext.cfg.datastreamServer.WriteBatchEnd(batchContext.sdb.hermezDb, thisBatch, &blockRoot, &ler); err != nil { - return err - } - - // the unwind of this value is handed by UnwindExecutionStageDbWrites - if _, err = rawdb.IncrementStateVersionByBlockNumberIfNeeded(batchContext.sdb.tx, lastBlockNumber); err != nil { - return fmt.Errorf("writing plain state version: %w", err) - } - - log.Info(fmt.Sprintf("[%s] Finish batch %d...", batchContext.s.LogPrefix(), thisBatch)) - - return nil -} diff --git a/zk/stages/stage_sequence_execute_data_stream.go b/zk/stages/stage_sequence_execute_data_stream.go index c57e91b04d5..6c04ce7be8b 100644 --- a/zk/stages/stage_sequence_execute_data_stream.go +++ b/zk/stages/stage_sequence_execute_data_stream.go @@ -5,11 +5,11 @@ import ( "fmt" "github.com/ledgerwatch/erigon/core/rawdb" - "github.com/ledgerwatch/erigon/core/vm" "github.com/ledgerwatch/erigon/eth/stagedsync" "github.com/ledgerwatch/erigon/eth/stagedsync/stages" "github.com/ledgerwatch/erigon/zk/datastream/server" verifier "github.com/ledgerwatch/erigon/zk/legacy_executor_verifier" + "github.com/ledgerwatch/erigon/zk/utils" "github.com/ledgerwatch/log/v3" ) @@ -84,55 +84,61 @@ func (sbc *SequencerBatchStreamWriter) writeBlockDetailsToDatastream(verifiedBun return checkedVerifierBundles, nil } -func handleBatchEndChecks(batchContext *BatchContext, batchState *BatchState, thisBlock uint64, u stagedsync.Unwinder) (bool, error) { - isLastEntryBatchEnd, err := batchContext.cfg.datastreamServer.IsLastEntryBatchEnd() +func alignExecutionToDatastream(batchContext *BatchContext, batchState *BatchState, lastExecutedBlock uint64, u 
stagedsync.Unwinder) (bool, error) { + lastExecutedBatch := batchState.batchNumber - 1 + + lastDatastreamBatch, err := batchContext.cfg.datastreamServer.GetHighestBatchNumber() if err != nil { return false, err } - if isLastEntryBatchEnd { - return false, nil + lastDatastreamBlock, err := batchContext.cfg.datastreamServer.GetHighestBlockNumber() + if err != nil { + return false, err } - lastBatch := batchState.batchNumber - 1 + if lastExecutedBatch == lastDatastreamBatch && lastExecutedBlock == lastDatastreamBlock { + return false, nil + } - log.Warn(fmt.Sprintf("[%s] Last batch %d was not closed properly, closing it now...", batchContext.s.LogPrefix(), lastBatch)) + if err := finalizeLastBatchInDatastreamIfNotFinalized(batchContext, lastDatastreamBatch, lastDatastreamBlock); err != nil { + return false, err + } - rawCounters, _, err := batchContext.sdb.hermezDb.GetLatestBatchCounters(lastBatch) + block, err := rawdb.ReadBlockByNumber(batchContext.sdb.tx, lastDatastreamBlock) if err != nil { - return false, err + return true, err } - latestCounters := vm.NewCountersFromUsedMap(rawCounters) + log.Warn(fmt.Sprintf("[%s] Unwinding due to a datastream gap", batchContext.s.LogPrefix()), "streamHeight", lastDatastreamBlock, "sequencerHeight", lastExecutedBlock) + u.UnwindTo(lastDatastreamBlock, block.Hash()) + return true, nil +} - endBatchCounters, err := prepareBatchCounters(batchContext, batchState, latestCounters) +func finalizeLastBatchInDatastreamIfNotFinalized(batchContext *BatchContext, batchToClose, blockToCloseAt uint64) error { + isLastEntryBatchEnd, err := batchContext.cfg.datastreamServer.IsLastEntryBatchEnd() if err != nil { - return false, err + return err } - - if err = runBatchLastSteps(batchContext, lastBatch, thisBlock, endBatchCounters); err != nil { - return false, err + if isLastEntryBatchEnd { + return nil } + log.Warn(fmt.Sprintf("[%s] Last datastream's batch %d was not closed properly, closing it now...", batchContext.s.LogPrefix(), 
batchToClose)) + return finalizeLastBatchInDatastream(batchContext, batchToClose, blockToCloseAt) +} - // now check if there is a gap in the stream vs the state db - streamProgress, err := stages.GetStageProgress(batchContext.sdb.tx, stages.DataStream) +func finalizeLastBatchInDatastream(batchContext *BatchContext, batchToClose, blockToCloseAt uint64) error { + ler, err := utils.GetBatchLocalExitRootFromSCStorageByBlock(blockToCloseAt, batchContext.sdb.hermezDb.HermezDbReader, batchContext.sdb.tx) if err != nil { - return false, err + return err } - - unwinding := false - if streamProgress > 0 && streamProgress < thisBlock { - block, err := rawdb.ReadBlockByNumber(batchContext.sdb.tx, streamProgress) - if err != nil { - return true, err - } - log.Warn(fmt.Sprintf("[%s] Unwinding due to a datastream gap", batchContext.s.LogPrefix()), - "streamHeight", streamProgress, - "sequencerHeight", thisBlock, - ) - u.UnwindTo(streamProgress, block.Hash()) - unwinding = true + lastBlock, err := rawdb.ReadBlockByNumber(batchContext.sdb.tx, blockToCloseAt) + if err != nil { + return err } - - return unwinding, nil + root := lastBlock.Root() + if err = batchContext.cfg.datastreamServer.WriteBatchEnd(batchContext.sdb.hermezDb, batchToClose, &root, &ler); err != nil { + return err + } + return nil } diff --git a/zk/utils/utils.go b/zk/utils/utils.go index c799e15148c..6059a0780f2 100644 --- a/zk/utils/utils.go +++ b/zk/utils/utils.go @@ -82,7 +82,6 @@ type ForkConfigWriter interface { } type DbReader interface { - GetLocalExitRootForBatchNo(batchNo uint64) (libcommon.Hash, error) GetHighestBlockInBatch(batchNo uint64) (uint64, error) } @@ -132,28 +131,23 @@ func RecoverySetBlockConfigForks(blockNum uint64, forkId uint64, cfg ForkConfigW return nil } -func GetBatchLocalExitRoot(batchNo uint64, db DbReader, tx kv.Tx) (libcommon.Hash, error) { - // check db first - localExitRoot, err := db.GetLocalExitRootForBatchNo(batchNo) - if err != nil { - return libcommon.Hash{}, err - } - - if 
localExitRoot != (libcommon.Hash{}) { - return localExitRoot, nil - } - - return GetBatchLocalExitRootFromSCStorage(batchNo, db, tx) -} - -func GetBatchLocalExitRootFromSCStorage(batchNo uint64, db DbReader, tx kv.Tx) (libcommon.Hash, error) { +func GetBatchLocalExitRootFromSCStorageForLatestBlock(batchNo uint64, db DbReader, tx kv.Tx) (libcommon.Hash, error) { if batchNo > 0 { blockNo, err := db.GetHighestBlockInBatch(batchNo) if err != nil { return libcommon.Hash{}, err } - stateReader := state.NewPlainState(tx, blockNo+1, systemcontracts.SystemContractCodeLookup["hermez"]) + return GetBatchLocalExitRootFromSCStorageByBlock(blockNo, db, tx) + } + + return libcommon.Hash{}, nil + +} + +func GetBatchLocalExitRootFromSCStorageByBlock(blockNumber uint64, db DbReader, tx kv.Tx) (libcommon.Hash, error) { + if blockNumber > 0 { + stateReader := state.NewPlainState(tx, blockNumber+1, systemcontracts.SystemContractCodeLookup["hermez"]) defer stateReader.Close() rawLer, err := stateReader.ReadAccountStorage(state.GER_MANAGER_ADDRESS, 1, &state.GLOBAL_EXIT_ROOT_POS_1) if err != nil { From 0e0374dbd619ab33d0a29e7d298df215c357f513 Mon Sep 17 00:00:00 2001 From: Kamen Stoykov Date: Wed, 28 Aug 2024 14:04:13 +0000 Subject: [PATCH 2/2] add more check to DS correcness --- zk/debug_tools/datastream-correctness-check/main.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/zk/debug_tools/datastream-correctness-check/main.go b/zk/debug_tools/datastream-correctness-check/main.go index 4def0d775d7..e3dd18700c6 100644 --- a/zk/debug_tools/datastream-correctness-check/main.go +++ b/zk/debug_tools/datastream-correctness-check/main.go @@ -34,6 +34,8 @@ func main() { var lastBlockRoot common.Hash progressBatch := uint64(0) progressBlock := uint64(0) + lastSeenBatch := uint64(0) + lastSeenBlock := uint64(0) function := func(file *types.FileEntry) error { switch file.EntryType { @@ -65,6 +67,10 @@ func main() { if err != nil { return err } + if lastSeenBatch+1 != 
batchStart.Number { + return fmt.Errorf("unexpected batch %d, expected %d", batchStart.Number, lastSeenBatch+1) + } + lastSeenBatch = batchStart.Number progressBatch = batchStart.Number if previousFile != nil { if previousFile.EntryType != types.BookmarkEntryType { @@ -106,6 +112,10 @@ func main() { if err != nil { return err } + if l2Block.L2BlockNumber > 0 && lastSeenBlock+1 != l2Block.L2BlockNumber { + return fmt.Errorf("unexpected block %d, expected %d", l2Block.L2BlockNumber, lastSeenBlock+1) + } + lastSeenBlock = l2Block.L2BlockNumber progressBlock = l2Block.L2BlockNumber if previousFile != nil { if previousFile.EntryType != types.BookmarkEntryType && !previousFile.IsL2BlockEnd() {