Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

handle-unwinds-on-seq-restart #1050

Merged
merged 2 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/rpcdaemon/commands/zkevm_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,7 @@ func (api *ZkEvmAPIImpl) GetBatchByNumber(ctx context.Context, batchNumber rpc.B
}

// local exit root
localExitRoot, err := utils.GetBatchLocalExitRoot(batchNo, hermezDb, tx)
localExitRoot, err := utils.GetBatchLocalExitRootFromSCStorageForLatestBlock(batchNo, hermezDb, tx)
if err != nil {
return nil, err
}
Expand Down
5 changes: 2 additions & 3 deletions zk/datastream/server/data_stream_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (

type DbReader interface {
GetL2BlockNosByBatch(batchNo uint64) ([]uint64, error)
GetLocalExitRootForBatchNo(batchNo uint64) (libcommon.Hash, error)
GetBatchGlobalExitRootsProto(lastBatchNumber, batchNumber uint64) ([]types.GerUpdateProto, error)
GetForkId(batchNumber uint64) (uint64, error)
GetBlockGlobalExitRoot(blockNumber uint64) (libcommon.Hash, error)
Expand Down Expand Up @@ -194,7 +193,7 @@ func createBlockWithBatchCheckStreamEntriesProto(
}
// the genesis we insert fully, so we would have to skip closing it
if !shouldSkipBatchEndEntry {
localExitRoot, err := utils.GetBatchLocalExitRootFromSCStorage(batchNumber, reader, tx)
localExitRoot, err := utils.GetBatchLocalExitRootFromSCStorageForLatestBlock(batchNumber, reader, tx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -383,7 +382,7 @@ func BuildWholeBatchStreamEntriesProto(
}

// the genesis we insert fully, so we would have to skip closing it
localExitRoot, err := utils.GetBatchLocalExitRootFromSCStorage(batchNumber, reader, tx)
localExitRoot, err := utils.GetBatchLocalExitRootFromSCStorageForLatestBlock(batchNumber, reader, tx)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion zk/datastream/server/data_stream_server_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func createBatchStartEntriesProto(
}

// seal off the last batch
if localExitRoot, err = utils.GetBatchLocalExitRootFromSCStorage(workingBatch, reader, tx); err != nil {
if localExitRoot, err = utils.GetBatchLocalExitRootFromSCStorageForLatestBlock(workingBatch, reader, tx); err != nil {
return nil, err
}
entries.Add(newBatchEndProto(localExitRoot, root, workingBatch))
Expand Down
2 changes: 1 addition & 1 deletion zk/datastream/server/datastream_populate.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ func (srv *DataStreamServer) WriteGenesisToStream(
l2Block := newL2BlockProto(genesis, genesis.Hash().Bytes(), batchNo, ger, 0, 0, common.Hash{}, 0, common.Hash{})
batchStart := newBatchStartProto(batchNo, srv.chainId, GenesisForkId, datastream.BatchType_BATCH_TYPE_REGULAR)

ler, err := utils.GetBatchLocalExitRoot(0, reader, tx)
ler, err := utils.GetBatchLocalExitRootFromSCStorageForLatestBlock(0, reader, tx)
if err != nil {
return err
}
Expand Down
10 changes: 10 additions & 0 deletions zk/debug_tools/datastream-correctness-check/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ func main() {
var lastBlockRoot common.Hash
progressBatch := uint64(0)
progressBlock := uint64(0)
lastSeenBatch := uint64(0)
lastSeenBlock := uint64(0)

function := func(file *types.FileEntry) error {
switch file.EntryType {
Expand Down Expand Up @@ -65,6 +67,10 @@ func main() {
if err != nil {
return err
}
if lastSeenBatch+1 != batchStart.Number {
return fmt.Errorf("unexpected batch %d, expected %d", batchStart.Number, lastSeenBatch+1)
}
lastSeenBatch = batchStart.Number
progressBatch = batchStart.Number
if previousFile != nil {
if previousFile.EntryType != types.BookmarkEntryType {
Expand Down Expand Up @@ -106,6 +112,10 @@ func main() {
if err != nil {
return err
}
if l2Block.L2BlockNumber > 0 && lastSeenBlock+1 != l2Block.L2BlockNumber {
return fmt.Errorf("unexpected block %d, expected %d", l2Block.L2BlockNumber, lastSeenBlock+1)
}
lastSeenBlock = l2Block.L2BlockNumber
progressBlock = l2Block.L2BlockNumber
if previousFile != nil {
if previousFile.EntryType != types.BookmarkEntryType && !previousFile.IsL2BlockEnd() {
Expand Down
18 changes: 1 addition & 17 deletions zk/hermez_db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const BLOCK_L1_BLOCK_HASHES = "block_l1_block_hashes" // block
const L1_BLOCK_HASH_GER = "l1_block_hash_ger" // l1 block hash -> GER
const INTERMEDIATE_TX_STATEROOTS = "hermez_intermediate_tx_stateRoots" // l2blockno -> stateRoot
const BATCH_WITNESSES = "hermez_batch_witnesses" // batch number -> witness
const BATCH_COUNTERS = "hermez_batch_counters" // batch number -> counters
const BATCH_COUNTERS = "hermez_batch_counters" // block number -> counters
const L1_BATCH_DATA = "l1_batch_data" // batch number -> l1 batch data from transaction call data
const REUSED_L1_INFO_TREE_INDEX = "reused_l1_info_tree_index" // block number => const 1
const LATEST_USED_GER = "latest_used_ger" // batch number -> GER latest used GER
Expand All @@ -47,8 +47,6 @@ const SMT_DEPTHS = "smt_depths" // block
const L1_INFO_LEAVES = "l1_info_leaves" // l1 info tree index -> l1 info tree leaf
const L1_INFO_ROOTS = "l1_info_roots" // root hash -> l1 info tree index
const INVALID_BATCHES = "invalid_batches" // batch number -> true
const BATCH_PARTIALLY_PROCESSED = "batch_partially_processed" // batch number -> true
const LOCAL_EXIT_ROOTS = "local_exit_roots" // batch number -> local exit root
const ROllUP_TYPES_FORKS = "rollup_types_forks" // rollup type id -> fork id
const FORK_HISTORY = "fork_history" // index -> fork id + last verified batch
const JUST_UNWOUND = "just_unwound" // batch number -> true
Expand Down Expand Up @@ -86,8 +84,6 @@ var HermezDbTables = []string{
L1_INFO_LEAVES,
L1_INFO_ROOTS,
INVALID_BATCHES,
BATCH_PARTIALLY_PROCESSED,
LOCAL_EXIT_ROOTS,
ROllUP_TYPES_FORKS,
FORK_HISTORY,
JUST_UNWOUND,
Expand Down Expand Up @@ -1610,18 +1606,6 @@ func (db *HermezDbReader) GetInvalidBatch(batchNo uint64) (bool, error) {
return len(v) > 0, nil
}

func (db *HermezDb) WriteLocalExitRootForBatchNo(batchNo uint64, root common.Hash) error {
return db.tx.Put(LOCAL_EXIT_ROOTS, Uint64ToBytes(batchNo), root.Bytes())
}

func (db *HermezDbReader) GetLocalExitRootForBatchNo(batchNo uint64) (common.Hash, error) {
v, err := db.tx.GetOne(LOCAL_EXIT_ROOTS, Uint64ToBytes(batchNo))
if err != nil {
return common.Hash{}, err
}
return common.BytesToHash(v), nil
}

func (db *HermezDb) WriteRollupType(rollupType, forkId uint64) error {
return db.tx.Put(ROllUP_TYPES_FORKS, Uint64ToBytes(rollupType), Uint64ToBytes(forkId))
}
Expand Down
12 changes: 0 additions & 12 deletions zk/stages/stage_batches.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ type HermezDb interface {
WriteBlockL1InfoTreeIndex(blockNumber uint64, l1Index uint64) error
WriteBlockL1InfoTreeIndexProgress(blockNumber uint64, l1Index uint64) error
WriteLatestUsedGer(blockNo uint64, ger common.Hash) error
WriteLocalExitRootForBatchNo(batchNo uint64, localExitRoot common.Hash) error
}

type DatastreamClient interface {
Expand Down Expand Up @@ -256,9 +255,6 @@ LOOP:
if entry.StateRoot != lastBlockRoot {
log.Warn(fmt.Sprintf("[%s] batch end state root mismatches last block's: %x, expected: %x", logPrefix, entry.StateRoot, lastBlockRoot))
}
if err := writeBatchEnd(hermezDb, entry); err != nil {
return fmt.Errorf("write batch end error: %v", err)
}
case *types.FullL2Block:
if cfg.zkCfg.SyncLimit > 0 && entry.L2BlockNumber >= cfg.zkCfg.SyncLimit {
// stop the node going into a crazy loop
Expand Down Expand Up @@ -752,14 +748,6 @@ func PruneBatchesStage(s *stagedsync.PruneState, tx kv.RwTx, cfg BatchesCfg, ctx
return nil
}

func writeBatchEnd(hermezDb HermezDb, batchEnd *types.BatchEnd) (err error) {
// utils.CalculateAccInputHash(oldAccInputHash, batchStart., l1InfoRoot common.Hash, timestampLimit uint64, sequencerAddr common.Address, forcedBlockhashL1 common.Hash)
if batchEnd.LocalExitRoot != emptyHash {
err = hermezDb.WriteLocalExitRootForBatchNo(batchEnd.Number, batchEnd.LocalExitRoot)
}
return
}

// writeL2Block writes L2Block to ErigonDb and HermezDb
// writes header, body, forkId and blockBatch
func writeL2Block(eriDb ErigonDb, hermezDb HermezDb, l2Block *types.FullL2Block, highestL1InfoTreeIndex uint64) error {
Expand Down
25 changes: 22 additions & 3 deletions zk/stages/stage_sequence_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,13 @@ func SpawnSequencingStage(
// if we identify any. During normal operation this function will simply check and move on without performing
// any action.
if !batchState.isAnyRecovery() {
isUnwinding, err := handleBatchEndChecks(batchContext, batchState, executionAt, u)
if err != nil || isUnwinding {
isUnwinding, err := alignExecutionToDatastream(batchContext, batchState, executionAt, u)
if err != nil {
return err
}
if isUnwinding {
return sdb.tx.Commit()
}
}

tryHaltSequencer(batchContext, batchState.batchNumber)
Expand Down Expand Up @@ -375,9 +378,25 @@ func SpawnSequencingStage(
return err
}

if err = runBatchLastSteps(batchContext, batchState.batchNumber, block.NumberU64(), batchCounters); err != nil {
/*
if adding something below that line we must ensure
- it is also handled property in processInjectedInitialBatch
- it is also handled property in alignExecutionToDatastream
- it is also handled property in doCheckForBadBatch
- it is unwound correctly
*/

if err := finalizeLastBatchInDatastream(batchContext, batchState.batchNumber, block.NumberU64()); err != nil {
return err
}

// TODO: It is 99% sure that there is no need to write this in any of processInjectedInitialBatch, alignExecutionToDatastream, doCheckForBadBatch but it is worth double checknig
// the unwind of this value is handed by UnwindExecutionStageDbWrites
if _, err := rawdb.IncrementStateVersionByBlockNumberIfNeeded(batchContext.sdb.tx, block.NumberU64()); err != nil {
return fmt.Errorf("writing plain state version: %w", err)
}

log.Info(fmt.Sprintf("[%s] Finish batch %d...", batchContext.s.LogPrefix(), batchState.batchNumber))

return sdb.tx.Commit()
}
61 changes: 0 additions & 61 deletions zk/stages/stage_sequence_execute_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/ledgerwatch/erigon/core/vm"
"github.com/ledgerwatch/erigon/eth/stagedsync"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/erigon/zk/utils"
"github.com/ledgerwatch/log/v3"
)

Expand Down Expand Up @@ -112,63 +111,3 @@ func updateStreamAndCheckRollback(

return false, nil
}

func runBatchLastSteps(
batchContext *BatchContext,
thisBatch uint64,
blockNumber uint64,
batchCounters *vm.BatchCounterCollector,
) error {
l1InfoIndex, err := batchContext.sdb.hermezDb.GetBlockL1InfoTreeIndex(blockNumber)
if err != nil {
return err
}

counters, err := batchCounters.CombineCollectors(l1InfoIndex != 0)
if err != nil {
return err
}

log.Info(fmt.Sprintf("[%s] counters consumed", batchContext.s.LogPrefix()), "batch", thisBatch, "counts", counters.UsedAsString())

if err = batchContext.sdb.hermezDb.WriteBatchCounters(blockNumber, counters.UsedAsMap()); err != nil {
return err
}

// Local Exit Root (ler): read s/c storage every batch to store the LER for the highest block in the batch
ler, err := utils.GetBatchLocalExitRootFromSCStorage(thisBatch, batchContext.sdb.hermezDb.HermezDbReader, batchContext.sdb.tx)
if err != nil {
return err
}
// write ler to hermezdb
if err = batchContext.sdb.hermezDb.WriteLocalExitRootForBatchNo(thisBatch, ler); err != nil {
return err
}

// get the last block number written to batch
// we should match it's state root in batch end entry
// if we get the last block in DB errors may occur since we have DB unwinds AFTER we commit batch end to datastream
// the last block written to the datastream before batch end should be the correct one once we are here
// if it is not, we have a bigger problem
lastBlockNumber, err := batchContext.cfg.datastreamServer.GetHighestBlockNumber()
if err != nil {
return err
}
block, err := rawdb.ReadBlockByNumber(batchContext.sdb.tx, lastBlockNumber)
if err != nil {
return err
}
blockRoot := block.Root()
if err = batchContext.cfg.datastreamServer.WriteBatchEnd(batchContext.sdb.hermezDb, thisBatch, &blockRoot, &ler); err != nil {
return err
}

// the unwind of this value is handed by UnwindExecutionStageDbWrites
if _, err = rawdb.IncrementStateVersionByBlockNumberIfNeeded(batchContext.sdb.tx, lastBlockNumber); err != nil {
return fmt.Errorf("writing plain state version: %w", err)
}

log.Info(fmt.Sprintf("[%s] Finish batch %d...", batchContext.s.LogPrefix(), thisBatch))

return nil
}
72 changes: 39 additions & 33 deletions zk/stages/stage_sequence_execute_data_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ import (
"fmt"

"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/vm"
"github.com/ledgerwatch/erigon/eth/stagedsync"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/erigon/zk/datastream/server"
verifier "github.com/ledgerwatch/erigon/zk/legacy_executor_verifier"
"github.com/ledgerwatch/erigon/zk/utils"
"github.com/ledgerwatch/log/v3"
)

Expand Down Expand Up @@ -84,55 +84,61 @@ func (sbc *SequencerBatchStreamWriter) writeBlockDetailsToDatastream(verifiedBun
return checkedVerifierBundles, nil
}

func handleBatchEndChecks(batchContext *BatchContext, batchState *BatchState, thisBlock uint64, u stagedsync.Unwinder) (bool, error) {
isLastEntryBatchEnd, err := batchContext.cfg.datastreamServer.IsLastEntryBatchEnd()
func alignExecutionToDatastream(batchContext *BatchContext, batchState *BatchState, lastExecutedBlock uint64, u stagedsync.Unwinder) (bool, error) {
lastExecutedBatch := batchState.batchNumber - 1

lastDatastreamBatch, err := batchContext.cfg.datastreamServer.GetHighestBatchNumber()
if err != nil {
return false, err
}

if isLastEntryBatchEnd {
return false, nil
lastDatastreamBlock, err := batchContext.cfg.datastreamServer.GetHighestBlockNumber()
if err != nil {
return false, err
}

lastBatch := batchState.batchNumber - 1
if lastExecutedBatch == lastDatastreamBatch && lastExecutedBlock == lastDatastreamBlock {
return false, nil
}

log.Warn(fmt.Sprintf("[%s] Last batch %d was not closed properly, closing it now...", batchContext.s.LogPrefix(), lastBatch))
if err := finalizeLastBatchInDatastreamIfNotFinalized(batchContext, lastDatastreamBatch, lastDatastreamBlock); err != nil {
return false, err
}

rawCounters, _, err := batchContext.sdb.hermezDb.GetLatestBatchCounters(lastBatch)
block, err := rawdb.ReadBlockByNumber(batchContext.sdb.tx, lastDatastreamBlock)
if err != nil {
return false, err
return true, err
}

latestCounters := vm.NewCountersFromUsedMap(rawCounters)
log.Warn(fmt.Sprintf("[%s] Unwinding due to a datastream gap", batchContext.s.LogPrefix()), "streamHeight", lastDatastreamBlock, "sequencerHeight", lastExecutedBlock)
u.UnwindTo(lastDatastreamBlock, block.Hash())
return true, nil
}

endBatchCounters, err := prepareBatchCounters(batchContext, batchState, latestCounters)
func finalizeLastBatchInDatastreamIfNotFinalized(batchContext *BatchContext, batchToClose, blockToCloseAt uint64) error {
isLastEntryBatchEnd, err := batchContext.cfg.datastreamServer.IsLastEntryBatchEnd()
if err != nil {
return false, err
return err
}

if err = runBatchLastSteps(batchContext, lastBatch, thisBlock, endBatchCounters); err != nil {
return false, err
if isLastEntryBatchEnd {
return nil
}
log.Warn(fmt.Sprintf("[%s] Last datastream's batch %d was not closed properly, closing it now...", batchContext.s.LogPrefix(), batchToClose))
return finalizeLastBatchInDatastream(batchContext, batchToClose, blockToCloseAt)
}

// now check if there is a gap in the stream vs the state db
streamProgress, err := stages.GetStageProgress(batchContext.sdb.tx, stages.DataStream)
func finalizeLastBatchInDatastream(batchContext *BatchContext, batchToClose, blockToCloseAt uint64) error {
ler, err := utils.GetBatchLocalExitRootFromSCStorageByBlock(blockToCloseAt, batchContext.sdb.hermezDb.HermezDbReader, batchContext.sdb.tx)
if err != nil {
return false, err
return err
}

unwinding := false
if streamProgress > 0 && streamProgress < thisBlock {
block, err := rawdb.ReadBlockByNumber(batchContext.sdb.tx, streamProgress)
if err != nil {
return true, err
}
log.Warn(fmt.Sprintf("[%s] Unwinding due to a datastream gap", batchContext.s.LogPrefix()),
"streamHeight", streamProgress,
"sequencerHeight", thisBlock,
)
u.UnwindTo(streamProgress, block.Hash())
unwinding = true
lastBlock, err := rawdb.ReadBlockByNumber(batchContext.sdb.tx, blockToCloseAt)
if err != nil {
return err
}

return unwinding, nil
root := lastBlock.Root()
if err = batchContext.cfg.datastreamServer.WriteBatchEnd(batchContext.sdb.hermezDb, batchToClose, &root, &ler); err != nil {
return err
}
return nil
}
Loading
Loading