From 522523597f2c46ee2bfa5a001b26c6aead2addd9 Mon Sep 17 00:00:00 2001
From: Igor Mandrigin
Date: Thu, 25 Jul 2024 23:08:28 +0300
Subject: [PATCH] Revert "refactor datastream server (#837)"

This reverts commit fc8972b017e43566adb588792df26bdccd02b4df.
---
 eth/backend.go                                |   2 +-
 zk/datastream/client/stream_client.go         |  40 ---
 zk/datastream/server/data_stream_server.go    | 205 ++++---
 .../server/data_stream_server_utils.go        |  33 +-
 zk/datastream/server/datastream_populate.go   | 282 ++++++------------
 zk/debug_tools/datastream-bytes/main.go       |  39 +--
 .../datastream-correctness-check/main.go      | 123 --------
 .../legacy_executor_verifier.go               |  66 ++--
 zk/stages/stage_dataStreamCatchup.go          |   6 +-
 zk/stages/stage_sequence_execute.go           |  42 +--
 zk/stages/stage_sequencer_executor_verify.go  |   5 +-
 11 files changed, 244 insertions(+), 599 deletions(-)
 delete mode 100644 zk/debug_tools/datastream-correctness-check/main.go

diff --git a/eth/backend.go b/eth/backend.go
index 89a5ff5c7d1..3be28bef3c6 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -1078,7 +1078,7 @@ func (s *Ethereum) PreStart() error {
 	// so here we loop and take a brief pause waiting for it to be ready
 	attempts := 0
 	for {
-		_, err = zkStages.CatchupDatastream(s.sentryCtx, "stream-catchup", tx, s.dataStream, s.chainConfig.ChainID.Uint64(), s.config.DatastreamVersion, s.config.HasExecutors())
+		_, err = zkStages.CatchupDatastream("stream-catchup", tx, s.dataStream, s.chainConfig.ChainID.Uint64(), s.config.DatastreamVersion, s.config.HasExecutors())
 		if err != nil {
 			if errors.Is(err, datastreamer.ErrAtomicOpNotAllowed) {
 				attempts++
diff --git a/zk/datastream/client/stream_client.go b/zk/datastream/client/stream_client.go
index e97f2d33e56..becca1a5721 100644
--- a/zk/datastream/client/stream_client.go
+++ b/zk/datastream/client/stream_client.go
@@ -181,46 +181,6 @@ func (c *StreamClient) ReadEntries(bookmark *types.BookmarkProto, l2BlocksAmount
 	return fullL2Blocks, gerUpates, batchBookmarks, blockBookmarks, entriesRead, nil
 }
 
-func (c *StreamClient) ExecutePerFile(bookmark *types.BookmarkProto, function func(file *types.FileEntry) error) error {
-	// Get header from server
-	if err := c.GetHeader(); err != nil {
-		return fmt.Errorf("%s get header error: %v", c.id, err)
-	}
-
-	protoBookmark, err := bookmark.Marshal()
-	if err != nil {
-		return fmt.Errorf("failed to marshal bookmark: %v", err)
-	}
-
-	if err := c.initiateDownloadBookmark(protoBookmark); err != nil {
-		return err
-	}
-	count := uint64(0)
-	logTicker := time.NewTicker(10 * time.Second)
-
-	for {
-		select {
-		case <-logTicker.C:
-			fmt.Println("Entries read count: ", count)
-		default:
-		}
-		if c.Header.TotalEntries == count {
-			break
-		}
-		file, err := c.readFileEntry()
-		if err != nil {
-			return fmt.Errorf("error reading file entry: %v", err)
-		}
-		if err := function(file); err != nil {
-			return fmt.Errorf("error executing function: %v", err)
-
-		}
-		count++
-	}
-
-	return nil
-}
-
 // reads entries to the end of the stream
 // at end will wait for new entries to arrive
 func (c *StreamClient) ReadAllEntriesToChannel() error {
diff --git a/zk/datastream/server/data_stream_server.go b/zk/datastream/server/data_stream_server.go
index c2c174c594f..40ea2f3933e 100644
--- a/zk/datastream/server/data_stream_server.go
+++ b/zk/datastream/server/data_stream_server.go
@@ -1,23 +1,22 @@
 package server
 
 import (
-	"fmt"
-
 	"github.com/0xPolygonHermez/zkevm-data-streamer/datastreamer"
 	zktypes "github.com/ledgerwatch/erigon/zk/types"
 	"github.com/ledgerwatch/erigon/zk/utils"
+
"github.com/gateway-fm/cdk-erigon-lib/common" libcommon "github.com/gateway-fm/cdk-erigon-lib/common" "github.com/gateway-fm/cdk-erigon-lib/kv" - "github.com/ledgerwatch/erigon/core/rawdb" eritypes "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/zk/datastream/proto/github.com/0xPolygonHermez/zkevm-node/state/datastream" "github.com/ledgerwatch/erigon/zk/datastream/types" + + "github.com/ledgerwatch/erigon/zk/hermez_db" ) type DbReader interface { - GetL2BlockNosByBatch(batchNo uint64) ([]uint64, error) - GetLocalExitRootForBatchNo(batchNo uint64) (libcommon.Hash, error) + GetLocalExitRootForBatchNo(batchNo uint64) (common.Hash, error) GetBatchGlobalExitRootsProto(lastBatchNumber, batchNumber uint64) ([]types.GerUpdateProto, error) GetForkId(batchNumber uint64) (uint64, error) GetBlockGlobalExitRoot(blockNumber uint64) (libcommon.Hash, error) @@ -85,33 +84,10 @@ func (d *DataStreamEntries) AddMany(entries []DataStreamEntryProto) { } } -func (d *DataStreamEntries) Size() int { - if d == nil || d.entries == nil { - return 0 - } - return len(d.entries) -} - func (d *DataStreamEntries) Entries() []DataStreamEntryProto { - if d == nil || d.entries == nil { - return []DataStreamEntryProto{} - } return d.entries } -func (d *DataStreamEntries) Marshal() (result []byte, err error) { - var b []byte - for _, entry := range d.entries { - b, err = encodeEntryToBytesProto(entry) - if err != nil { - return nil, err - } - result = append(result, b...) - } - - return result, nil -} - func NewDataStreamEntries(size int) *DataStreamEntries { return &DataStreamEntries{ entries: make([]DataStreamEntryProto, size), @@ -139,82 +115,85 @@ func (srv *DataStreamServer) CommitEntriesToStreamProto(entries []DataStreamEntr } if latestBlockNum != nil { - a := *latestBlockNum - srv.highestBlockWritten = &a + srv.highestBlockWritten = latestBlockNum } if latestBatchNum != nil { - a := *latestBatchNum - srv.highestBatchWritten = &a + srv.highestBatchWritten = latestBatchNum } return nil } func createBlockWithBatchCheckStreamEntriesProto( + chainId uint64, reader DbReader, tx kv.Tx, block, lastBlock *eritypes.Block, batchNumber, - lastBatchNumber, - chainId, - forkId uint64, - shouldSkipBatchEndEntry bool, -) (*DataStreamEntries, error) { + lastBatchNumber uint64, + l1InfoTreeMinTimestamps map[uint64]uint64, + isBatchEnd bool, + transactionsToIncludeByIndex []int, // passing nil here will include all transactions in the blocks +) ([]DataStreamEntryProto, error) { var err error - var endEntriesProto []DataStreamEntryProto - var startEntriesProto, blockEntries *DataStreamEntries + var startEntriesProto, blockEntriesProto, endEntriesProto []DataStreamEntryProto + + gers, err := reader.GetBatchGlobalExitRootsProto(lastBatchNumber, batchNumber) + if err != nil { + return nil, err + } + // we might have a series of empty batches to account for, so we need to know the gap batchGap := batchNumber - lastBatchNumber isBatchStart := batchGap > 0 + // filter transactions by indexes that should be included + filteredTransactions := filterTransactionByIndexes(block.Transactions(), transactionsToIncludeByIndex) + + blockNum := block.NumberU64() // batch start // BATCH BOOKMARK if isBatchStart { - gers, err := reader.GetBatchGlobalExitRootsProto(lastBatchNumber, batchNumber) - if err != nil { - return nil, err - } - // the genesis we insert fully, so we would have to skip closing it - if !shouldSkipBatchEndEntry { - localExitRoot, err := utils.GetBatchLocalExitRootFromSCStorage(batchNumber, reader, tx) - if err != 
nil { - return nil, err - } - blockRoot := block.Root() - if endEntriesProto, err = addBatchEndEntriesProto(lastBatchNumber, &blockRoot, gers, &localExitRoot); err != nil { - return nil, err - } - } - if startEntriesProto, err = createBatchStartEntriesProto(reader, tx, batchNumber, lastBatchNumber, batchGap, chainId, block.Root(), gers); err != nil { return nil, err } } - blockNum := block.NumberU64() + forkId, err := reader.GetForkId(batchNumber) + if err != nil { + return nil, err + } - l1InfoTreeMinTimestamps := make(map[uint64]uint64) deltaTimestamp := block.Time() - lastBlock.Time() if blockNum == 1 { deltaTimestamp = block.Time() l1InfoTreeMinTimestamps[0] = 0 } - if blockEntries, err = createFullBlockStreamEntriesProto(reader, tx, block, block.Transactions(), forkId, deltaTimestamp, batchNumber, l1InfoTreeMinTimestamps); err != nil { + blockEntries, err := createFullBlockStreamEntriesProto(reader, tx, block, filteredTransactions, forkId, deltaTimestamp, batchNumber, l1InfoTreeMinTimestamps) + if err != nil { return nil, err } + blockEntriesProto = blockEntries.Entries() - if blockEntries.Size() == 0 { - return nil, fmt.Errorf("didn't create any entries for block %d", blockNum) + if isBatchEnd { + localExitRoot, err := utils.GetBatchLocalExitRootFromSCStorage(batchNumber, reader, tx) + if err != nil { + return nil, err + } + blockRoot := block.Root() + if endEntriesProto, err = addBatchEndEntriesProto(tx, batchNumber, lastBatchNumber, &blockRoot, gers, &localExitRoot); err != nil { + return nil, err + } } - entries := NewDataStreamEntries(len(endEntriesProto) + startEntriesProto.Size() + blockEntries.Size()) + entries := NewDataStreamEntries(len(startEntriesProto) + len(blockEntriesProto) + len(endEntriesProto)) + entries.AddMany(startEntriesProto) + entries.AddMany(blockEntriesProto) entries.AddMany(endEntriesProto) - entries.AddMany(startEntriesProto.Entries()) - entries.AddMany(blockEntries.Entries()) - return entries, nil + return entries.Entries(), nil } func createFullBlockStreamEntriesProto( @@ -304,102 +283,32 @@ func createTransactionEntryProto( return txProto, nil } -func BuildWholeBatchStreamEntriesProto( - tx kv.Tx, - reader DbReader, +func CreateAndBuildStreamEntryBytesProto( chainId uint64, - previousBatchNumber, + block *eritypes.Block, + reader *hermez_db.HermezDbReader, + tx kv.Tx, + lastBlock *eritypes.Block, batchNumber uint64, - blocks []eritypes.Block, - txsPerBlock map[uint64][]eritypes.Transaction, + lastBatchNumber uint64, l1InfoTreeMinTimestamps map[uint64]uint64, -) (allEntries *DataStreamEntries, err error) { - var batchEndEntries []DataStreamEntryProto - var batchStartEntries *DataStreamEntries - - forkId, err := reader.GetForkId(batchNumber) - if err != nil { - return nil, err - } - - gers, err := reader.GetBatchGlobalExitRootsProto(previousBatchNumber, batchNumber) - if err != nil { - return nil, err - } - - if batchStartEntries, err = createBatchStartEntriesProto(reader, tx, batchNumber, previousBatchNumber, batchNumber-previousBatchNumber, chainId, blocks[0].Root(), gers); err != nil { - return nil, err - } - - prevBatchLastBlock, err := rawdb.ReadBlockByNumber(tx, blocks[0].NumberU64()-1) + isBatchEnd bool, + transactionsToIncludeByIndex []int, // passing nil here will include all transactions in the blocks +) (result []byte, err error) { + entries, err := createBlockWithBatchCheckStreamEntriesProto(chainId, reader, tx, block, lastBlock, batchNumber, lastBatchNumber, l1InfoTreeMinTimestamps, isBatchEnd, transactionsToIncludeByIndex) if err != nil { 
return nil, err } - lastBlock := *prevBatchLastBlock - - blocksEntries := make([]DataStreamEntryProto, 0) - - for _, block := range blocks { - blockNum := block.NumberU64() - - deltaTimestamp := block.Time() - lastBlock.Time() - if blockNum == 1 { - deltaTimestamp = block.Time() - l1InfoTreeMinTimestamps[0] = 0 - } - - txForBlock, found := txsPerBlock[blockNum] - if !found { - return nil, fmt.Errorf("no transactions array found for block %d", blockNum) - } - - blockEntries, err := createFullBlockStreamEntriesProto(reader, tx, &block, txForBlock, forkId, deltaTimestamp, batchNumber, l1InfoTreeMinTimestamps) + for _, entry := range entries { + b, err := encodeEntryToBytesProto(entry) if err != nil { return nil, err } - blocksEntries = append(blocksEntries, blockEntries.Entries()...) - - lastBlock = block - } - - // the genesis we insert fully, so we would have to skip closing it - localExitRoot, err := utils.GetBatchLocalExitRootFromSCStorage(batchNumber, reader, tx) - if err != nil { - return nil, err - } - - blockRoot := lastBlock.Root() - - batchEndEntries, err = addBatchEndEntriesProto(batchNumber, &blockRoot, gers, &localExitRoot) - if err != nil { - return nil, err - } - - allEntries = NewDataStreamEntries(batchStartEntries.Size() + len(blocksEntries) + len(batchEndEntries)) - allEntries.AddMany(batchStartEntries.Entries()) - allEntries.AddMany(blocksEntries) - allEntries.AddMany(batchEndEntries) - - return allEntries, nil -} - -func (srv *DataStreamServer) IsLastEntryBatchEnd() (isBatchEnd bool, err error) { - header := srv.stream.GetHeader() - - if header.TotalEntries == 0 { - return false, nil - } - - //find end block entry to delete from it onward - entryNum := header.TotalEntries - 1 - var entry datastreamer.FileEntry - entry, err = srv.stream.GetEntry(entryNum) - if err != nil { - return false, err + result = append(result, b...) } - return uint32(entry.Type) == uint32(types.EntryTypeBatchEnd), nil + return result, nil } func (srv *DataStreamServer) GetHighestBlockNumber() (uint64, error) { diff --git a/zk/datastream/server/data_stream_server_utils.go b/zk/datastream/server/data_stream_server_utils.go index be1814ecb0d..71dcc6b7a03 100644 --- a/zk/datastream/server/data_stream_server_utils.go +++ b/zk/datastream/server/data_stream_server_utils.go @@ -134,11 +134,11 @@ func createBatchStartEntriesProto( batchNumber, lastBatchNumber, batchGap, chainId uint64, root libcommon.Hash, gers []types.GerUpdateProto, -) (*DataStreamEntries, error) { +) ([]DataStreamEntryProto, error) { var err error var batchStartEntries []DataStreamEntryProto - entries := NewDataStreamEntries(2 + int(3*(batchGap-1)) + len(gers)) + entries := make([]DataStreamEntryProto, 0, 2+int(3*(batchGap-1))+len(gers)) // if we have a gap of more than 1 batch then we need to write in the batch start and ends for these empty batches if batchGap > 1 { @@ -150,14 +150,14 @@ func createBatchStartEntriesProto( if batchStartEntries, err = addBatchStartEntries(reader, workingBatch, chainId); err != nil { return nil, err } - // entries = append(entries, batchStartEntries...) - entries.AddMany(batchStartEntries) + entries = append(entries, batchStartEntries...) 
// see if we have any gers to handle for _, ger := range gers { upd := ger.UpdateGER if upd.BatchNumber == workingBatch { - entries.Add( + entries = append( + entries, newGerUpdateProto(upd.BatchNumber, upd.Timestamp, libcommon.BytesToHash(upd.GlobalExitRoot), libcommon.BytesToAddress(upd.Coinbase), upd.ForkId, upd.ChainId, libcommon.BytesToHash(upd.StateRoot)), ) } @@ -167,7 +167,7 @@ func createBatchStartEntriesProto( if localExitRoot, err = utils.GetBatchLocalExitRootFromSCStorage(workingBatch, reader, tx); err != nil { return nil, err } - entries.Add(newBatchEndProto(localExitRoot, root, workingBatch)) + entries = append(entries, newBatchEndProto(localExitRoot, root, workingBatch)) } } @@ -175,12 +175,13 @@ func createBatchStartEntriesProto( if batchStartEntries, err = addBatchStartEntries(reader, batchNumber, chainId); err != nil { return nil, err } - entries.AddMany(batchStartEntries) + entries = append(entries, batchStartEntries...) return entries, nil } func addBatchEndEntriesProto( - batchNumber uint64, + tx kv.Tx, + batchNumber, lastBatchNumber uint64, root *libcommon.Hash, gers []types.GerUpdateProto, localExitRoot *libcommon.Hash, @@ -230,6 +231,22 @@ func addBatchStartEntries(reader DbReader, batchNum, chainId uint64) ([]DataStre return entries, nil } +func filterTransactionByIndexes( + filteredTransactions eritypes.Transactions, + transactionsToIncludeByIndex []int, +) eritypes.Transactions { + if transactionsToIncludeByIndex != nil { + filteredTransactionsBuilder := make(eritypes.Transactions, len(transactionsToIncludeByIndex)) + for i, txIndexInBlock := range transactionsToIncludeByIndex { + filteredTransactionsBuilder[i] = filteredTransactions[txIndexInBlock] + } + + filteredTransactions = filteredTransactionsBuilder + } + + return filteredTransactions +} + const ( PACKET_TYPE_DATA = 2 // NOOP_ENTRY_NUMBER is used because we don't care about the entry number when feeding an atrificial diff --git a/zk/datastream/server/datastream_populate.go b/zk/datastream/server/datastream_populate.go index 17777cc85be..fd6ed247691 100644 --- a/zk/datastream/server/datastream_populate.go +++ b/zk/datastream/server/datastream_populate.go @@ -1,7 +1,6 @@ package server import ( - "context" "fmt" "time" @@ -15,185 +14,71 @@ import ( "github.com/ledgerwatch/log/v3" ) -const ( - GenesisForkId = 0 // genesis fork is always 0 in the datastream - insertEntryCount = 100_000 - commitEntryCountLimit = 80_000 -) - -// gets the blocks for the said batch from the reader -// writes a bookmarks, batch start, blocks and batch end -// basically writes a whole standalone batch -// plus the GER updates if the batch gap is > 1 -// starts atomicOp and commits it internally -func (srv *DataStreamServer) WriteWholeBatchToStream( - logPrefix string, - tx kv.Tx, - reader DbReader, - prevBatchNum, - batchNum uint64, -) error { - var err error - if err = srv.stream.StartAtomicOp(); err != nil { - return err - } - - blocksForBatch, err := reader.GetL2BlockNosByBatch(batchNum) - if err != nil { - return err - } - - var fromBlockNum, toBlockNum uint64 - for _, blockNum := range blocksForBatch { - if fromBlockNum == 0 || blockNum < fromBlockNum { - fromBlockNum = blockNum - } - if blockNum > toBlockNum { - toBlockNum = blockNum - } - } - - if err = srv.UnwindIfNecessary(logPrefix, reader, fromBlockNum, prevBatchNum, batchNum); err != nil { - return err - } - - blocks := make([]eritypes.Block, 0) - txsPerBlock := make(map[uint64][]eritypes.Transaction) - for blockNumber := fromBlockNum; blockNumber <= toBlockNum; 
blockNumber++ { - block, err := rawdb.ReadBlockByNumber(tx, blockNumber) - if err != nil { - return err - } - - blocks = append(blocks, *block) - txsPerBlock[blockNumber] = block.Transactions() - } - - entries, err := BuildWholeBatchStreamEntriesProto(tx, reader, srv.GetChainId(), batchNum, batchNum, blocks, txsPerBlock, make(map[uint64]uint64)) - if err != nil { - return err - } - - if err = srv.CommitEntriesToStreamProto(entries.Entries(), &toBlockNum, &batchNum); err != nil { - return err - } - - if err = srv.stream.CommitAtomicOp(); err != nil { - return err - } +const GenesisForkId = 0 // genesis fork is always 0 in the datastream - return nil -} - -// writes consecutively blocks from-to -// checks for all batch related stuff in the meantime - batch start, batche end, etc -// starts atomicOp and commits it internally -func (srv *DataStreamServer) WriteBlocksToStreamConsecutively( - ctx context.Context, - logPrefix string, +func (srv *DataStreamServer) WriteBlocksToStream( tx kv.Tx, reader DbReader, from, to uint64, + logPrefix string, ) error { - var err error - - // logger stuff t := utils.StartTimer("write-stream", "writeblockstostream") defer t.LogTimer() - logTicker := time.NewTicker(10 * time.Second) - totalToWrite := to - (from - 1) - copyFrom := from - ////////// - latestbatchNum, err := reader.GetBatchNoByL2Block(from - 1) - if err != nil { - return err - } - - batchNum, err := reader.GetBatchNoByL2Block(from) - if err != nil { - return err - } + var err error + logTicker := time.NewTicker(10 * time.Second) + var lastBlock *eritypes.Block if err = srv.stream.StartAtomicOp(); err != nil { return err } - - if err = srv.UnwindIfNecessary(logPrefix, reader, from, latestbatchNum, batchNum); err != nil { - return err - } - - // check if a new batch starts and the old needs closing before that - // if it is already closed with a batch end, do not add a new batch end - // this is needed because we have to write a batch end when writing a new block from the next batch - // because at the current block we might not know if it is the last one in the batch - // but we know for certain if it is a 1st block from a new batch - islastEntrybatchEnd, err := srv.IsLastEntryBatchEnd() - if err != nil { - return err - } - - lastBlock, err := rawdb.ReadBlockByNumber(tx, from-1) - if err != nil { - return err - } - - entries := make([]DataStreamEntryProto, 0, insertEntryCount) - var forkId uint64 -LOOP: + totalToWrite := to - (from - 1) + insertEntryCount := 100_000 + entries := make([]DataStreamEntryProto, insertEntryCount) + index := 0 + copyFrom := from + var latestbatchNum uint64 for currentBlockNumber := from; currentBlockNumber <= to; currentBlockNumber++ { select { case <-logTicker.C: log.Info(fmt.Sprintf("[%s]: progress", logPrefix), "block", currentBlockNumber, "target", to, "%", float64(currentBlockNumber-copyFrom)/float64(totalToWrite)*100) - case <-ctx.Done(): - break LOOP default: } - block, err := rawdb.ReadBlockByNumber(tx, currentBlockNumber) - if err != nil { - return err - } - - batchNum, err := reader.GetBatchNoByL2Block(currentBlockNumber) - if err != nil { - return err - } - - // fork id changes only per batch so query it only once per batch - if batchNum != latestbatchNum { - forkId, err = reader.GetForkId(batchNum) + if lastBlock == nil { + lastBlock, err = rawdb.ReadBlockByNumber(tx, currentBlockNumber-1) if err != nil { return err } } - blockEntries, err := createBlockWithBatchCheckStreamEntriesProto(reader, tx, block, lastBlock, batchNum, latestbatchNum, srv.chainId, 
forkId, islastEntrybatchEnd) + block, blockEntries, batchNum, err := srv.createBlockStreamEntriesWithBatchCheck(logPrefix, tx, reader, lastBlock, currentBlockNumber) if err != nil { return err } - entries = append(entries, blockEntries.Entries()...) - latestbatchNum = batchNum - lastBlock = block - // the check is needed only before the first block - // after that - write batch end before each batch start - islastEntrybatchEnd = false + for _, entry := range blockEntries { + entries[index] = entry + index++ + } // basically commit once 80% of the entries array is filled - if len(entries) >= commitEntryCountLimit { + if index+1 >= insertEntryCount*4/5 { log.Info(fmt.Sprintf("[%s] Commit count reached, committing entries", logPrefix), "block", currentBlockNumber) - if err = srv.CommitEntriesToStreamProto(entries, ¤tBlockNumber, &batchNum); err != nil { + if err = srv.CommitEntriesToStreamProto(entries[:index], ¤tBlockNumber, &batchNum); err != nil { return err } - entries = make([]DataStreamEntryProto, 0, insertEntryCount) + entries = make([]DataStreamEntryProto, insertEntryCount) + index = 0 } + + lastBlock = block } - if err = srv.CommitEntriesToStreamProto(entries, &to, &latestbatchNum); err != nil { + if err = srv.CommitEntriesToStreamProto(entries[:index], &to, &latestbatchNum); err != nil { return err } @@ -204,62 +89,41 @@ LOOP: return nil } -// gets other needed data from the reader -// writes a batchBookmark and batch start (if needed), block bookmark, block and txs in it -// basically a full standalone block -func (srv *DataStreamServer) WriteBlockWithBatchStartToStream( +func (srv *DataStreamServer) WriteBlockToStream( logPrefix string, tx kv.Tx, reader DbReader, - forkId, - batchNum, prevBlockBatchNum uint64, - prevBlock, block eritypes.Block, -) (err error) { + batchNum, prevBatchNum, + blockNum uint64, +) error { t := utils.StartTimer("write-stream", "writeblockstostream") defer t.LogTimer() - if err = srv.stream.StartAtomicOp(); err != nil { - return err - } - - blockNum := block.NumberU64() + var err error - if err = srv.UnwindIfNecessary(logPrefix, reader, blockNum, prevBlockBatchNum, batchNum); err != nil { + if err = srv.UnwindIfNecessary(logPrefix, reader, blockNum, prevBatchNum, batchNum); err != nil { return err } - // if start of new batch add batch start entries - var batchStartEntries *DataStreamEntries - if prevBlockBatchNum != batchNum { - gers, err := reader.GetBatchGlobalExitRootsProto(prevBlockBatchNum, batchNum) - if err != nil { - return err - } - - if batchStartEntries, err = createBatchStartEntriesProto(reader, tx, batchNum, prevBlockBatchNum, batchNum-prevBlockBatchNum, srv.GetChainId(), block.Root(), gers); err != nil { - return err - } + if err = srv.stream.StartAtomicOp(); err != nil { + return err } - l1InfoTreeMinTimestamps := make(map[uint64]uint64) - deltaTimestamp := block.Time() - prevBlock.Time() - if blockNum == 1 { - deltaTimestamp = block.Time() - l1InfoTreeMinTimestamps[0] = 0 + lastBlock, err := rawdb.ReadBlockByNumber(tx, blockNum-1) + if err != nil { + return err } - - blockEntries, err := createFullBlockStreamEntriesProto(reader, tx, &block, block.Transactions(), forkId, deltaTimestamp, batchNum, make(map[uint64]uint64)) + block, err := rawdb.ReadBlockByNumber(tx, blockNum) if err != nil { return err } - if batchStartEntries != nil { - if err = srv.CommitEntriesToStreamProto(batchStartEntries.Entries(), &blockNum, &batchNum); err != nil { - return err - } + entries, err := createBlockWithBatchCheckStreamEntriesProto(srv.chainId, reader, 
tx, block, lastBlock, batchNum, prevBatchNum, make(map[uint64]uint64), false, nil) + if err != nil { + return err } - if err = srv.CommitEntriesToStreamProto(blockEntries.Entries(), &blockNum, &batchNum); err != nil { + if err = srv.CommitEntriesToStreamProto(entries, &blockNum, &batchNum); err != nil { return err } @@ -270,25 +134,21 @@ func (srv *DataStreamServer) WriteBlockWithBatchStartToStream( return nil } -// checks if the stream has blocks above the current one -// if there is something, try to unwind it -// in the unwind chek if the block is at batch start -// if it is - unwind to previous batch's end, so it deletes batch stat of current batch as well -func (srv *DataStreamServer) UnwindIfNecessary(logPrefix string, reader DbReader, blockNum, prevBlockBatchNum, batchNum uint64) error { +func (srv *DataStreamServer) UnwindIfNecessary(logPrefix string, reader DbReader, blockNum, prevBatchNum, batchNum uint64) error { // if from is higher than the last datastream block number - unwind the stream highestDatastreamBlock, err := srv.GetHighestBlockNumber() if err != nil { return err } - // if this is a new batch case, we must unwind to previous batch's batch end + //if this is a new batch case, we must unwind to previous batch's batch end // otherwise it would corrupt the datastream with batch bookmark after a batch start or something similar if highestDatastreamBlock >= blockNum { - if prevBlockBatchNum != batchNum { - log.Warn(fmt.Sprintf("[%s] Datastream must unwind to batch", logPrefix), "prevBlockBatchNum", prevBlockBatchNum, "batchNum", batchNum) + if prevBatchNum != batchNum { + log.Warn(fmt.Sprintf("[%s] Datastream must unwind to batch", logPrefix), "prevBatchNum", prevBatchNum, "batchNum", batchNum) //get latest block in prev batch - lastBlockInPrevbatch, err := reader.GetHighestBlockInBatch(prevBlockBatchNum) + lastBlockInPrevbatch, err := reader.GetHighestBlockInBatch(prevBatchNum) if err != nil { return err } @@ -296,7 +156,7 @@ func (srv *DataStreamServer) UnwindIfNecessary(logPrefix string, reader DbReader // this represents a case where the block we must unwind to is part of a previous batch // this should never happen since previous batch in this use must be already completed if lastBlockInPrevbatch != blockNum-1 { - return fmt.Errorf("datastream must unwind to prev batch, but it would corrupt the datastream: prevBlockBatchNum: %d, batchNum: %d, blockNum: %d", prevBlockBatchNum, batchNum, blockNum) + return fmt.Errorf("datastream must unwind to prev batch, but it would corrupt the datastream: prevBatchNum: %d, abtchNum: %d, blockNum: %d", prevBatchNum, batchNum, blockNum) } if err := srv.UnwindToBatchStart(batchNum); err != nil { @@ -313,6 +173,8 @@ func (srv *DataStreamServer) UnwindIfNecessary(logPrefix string, reader DbReader } func (srv *DataStreamServer) WriteBatchEnd( + logPrefix string, + tx kv.Tx, reader DbReader, batchNumber, lastBatchNumber uint64, @@ -328,7 +190,7 @@ func (srv *DataStreamServer) WriteBatchEnd( return err } - batchEndEntries, err := addBatchEndEntriesProto(batchNumber, stateRoot, gers, localExitRoot) + batchEndEntries, err := addBatchEndEntriesProto(tx, batchNumber, lastBatchNumber, stateRoot, gers, localExitRoot) if err != nil { return err } @@ -344,6 +206,48 @@ func (srv *DataStreamServer) WriteBatchEnd( return nil } +func (srv *DataStreamServer) createBlockStreamEntriesWithBatchCheck( + logPrefix string, + tx kv.Tx, + reader DbReader, + lastBlock *eritypes.Block, + blockNumber uint64, +) (*eritypes.Block, []DataStreamEntryProto, uint64, 
error) { + block, err := rawdb.ReadBlockByNumber(tx, blockNumber) + if err != nil { + return nil, nil, 0, err + } + + batchNum, err := reader.GetBatchNoByL2Block(blockNumber) + if err != nil { + return nil, nil, 0, err + } + + prevBatchNum, err := reader.GetBatchNoByL2Block(blockNumber - 1) + if err != nil { + return nil, nil, 0, err + } + + if err = srv.UnwindIfNecessary(logPrefix, reader, blockNumber, prevBatchNum, batchNum); err != nil { + return nil, nil, 0, err + } + + nextBatchNum, nextBatchExists, err := reader.CheckBatchNoByL2Block(blockNumber + 1) + if err != nil { + return nil, nil, 0, err + } + + // a 0 next batch num here would mean we don't know about the next batch so must be at the end of the batch + isBatchEnd := !nextBatchExists || nextBatchNum > batchNum + + entries, err := createBlockWithBatchCheckStreamEntriesProto(srv.chainId, reader, tx, block, lastBlock, batchNum, prevBatchNum, make(map[uint64]uint64), isBatchEnd, nil) + if err != nil { + return nil, nil, 0, err + } + + return block, entries, batchNum, nil +} + func (srv *DataStreamServer) WriteGenesisToStream( genesis *eritypes.Block, reader *hermez_db.HermezDbReader, diff --git a/zk/debug_tools/datastream-bytes/main.go b/zk/debug_tools/datastream-bytes/main.go index 83fba978f24..aecec678716 100644 --- a/zk/debug_tools/datastream-bytes/main.go +++ b/zk/debug_tools/datastream-bytes/main.go @@ -8,7 +8,6 @@ import ( "github.com/gateway-fm/cdk-erigon-lib/kv" "github.com/gateway-fm/cdk-erigon-lib/kv/mdbx" "github.com/ledgerwatch/erigon/core/rawdb" - "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/zk/datastream/server" "github.com/ledgerwatch/erigon/zk/hermez_db" ) @@ -31,35 +30,41 @@ func main() { err := db.View(context.Background(), func(tx kv.Tx) error { hermezDb := hermez_db.NewHermezDbReader(tx) - blockNumbers, err := hermezDb.GetL2BlockNosByBatch(uint64(batchNum)) + blocks, err := hermezDb.GetL2BlockNosByBatch(uint64(batchNum)) if err != nil { return err } - if len(blockNumbers) == 0 { + if len(blocks) == 0 { return fmt.Errorf("no blocks found for batch %d", batchNum) } + lastBlock, err := rawdb.ReadBlockByNumber(tx, blocks[0]-1) + if err != nil { + return err + } + previousBatch := batchNum - 1 - blocks := make([]types.Block, 0, len(blockNumbers)) - txsPerBlock := make(map[uint64][]types.Transaction) - for _, blockNumber := range blockNumbers { + for idx, blockNumber := range blocks { block, err := rawdb.ReadBlockByNumber(tx, blockNumber) if err != nil { return err } - blocks = append(blocks, *block) - txsPerBlock[blockNumber] = block.Transactions() - } - var l1InfoTreeMinTimestamps map[uint64]uint64 - entries, err := server.BuildWholeBatchStreamEntriesProto(tx, hermezDb, uint64(chainId), uint64(previousBatch), uint64(batchNum), blocks, txsPerBlock, l1InfoTreeMinTimestamps) - if err != nil { - return err - } - streamBytes, err = entries.Marshal() - if err != nil { - return err + + //gerUpdates := []dstypes.GerUpdate{} + var l1InfoTreeMinTimestamps map[uint64]uint64 + + isBatchEnd := idx == len(blocks)-1 + + sBytes, err := server.CreateAndBuildStreamEntryBytesProto(uint64(chainId), block, hermezDb, tx, lastBlock, uint64(batchNum), uint64(previousBatch), l1InfoTreeMinTimestamps, isBatchEnd, nil) + if err != nil { + return err + } + streamBytes = append(streamBytes, sBytes...) 
+ lastBlock = block + // we only put in the batch bookmark at the start of the stream data once + previousBatch = batchNum } return nil diff --git a/zk/debug_tools/datastream-correctness-check/main.go b/zk/debug_tools/datastream-correctness-check/main.go deleted file mode 100644 index 39674ea41cd..00000000000 --- a/zk/debug_tools/datastream-correctness-check/main.go +++ /dev/null @@ -1,123 +0,0 @@ -package main - -import ( - "context" - "fmt" - - "github.com/ledgerwatch/erigon/zk/datastream/client" - "github.com/ledgerwatch/erigon/zk/datastream/proto/github.com/0xPolygonHermez/zkevm-node/state/datastream" - "github.com/ledgerwatch/erigon/zk/datastream/types" - "github.com/ledgerwatch/erigon/zk/debug_tools" -) - -func main() { - ctx := context.Background() - cfg, err := debug_tools.GetConf() - if err != nil { - panic(fmt.Sprintf("RPGCOnfig: %s", err)) - } - - // Create client - client := client.NewClient(ctx, cfg.Datastream, 3, 500, 0) - - // Start client (connect to the server) - defer client.Stop() - if err := client.Start(); err != nil { - panic(err) - } - - // create bookmark - bookmark := types.NewBookmarkProto(1, datastream.BookmarkType_BOOKMARK_TYPE_L2_BLOCK) - - var previousFile *types.FileEntry - progressBatch := uint64(0) - progressBlock := uint64(0) - function := func(file *types.FileEntry) error { - switch file.EntryType { - case types.BookmarkEntryType: - bookmark, err := types.UnmarshalBookmark(file.Data) - if err != nil { - return err - } - if bookmark.BookmarkType() == datastream.BookmarkType_BOOKMARK_TYPE_BATCH { - progressBatch = bookmark.Value - if previousFile != nil && previousFile.EntryType != types.EntryTypeBatchEnd { - return fmt.Errorf("unexpected entry type before batch bookmark type: %v, bookmark batch number: %d", previousFile.EntryType, bookmark.Value) - } - } - if bookmark.BookmarkType() == datastream.BookmarkType_BOOKMARK_TYPE_L2_BLOCK { - progressBlock = bookmark.Value - if previousFile != nil && - previousFile.EntryType != types.EntryTypeBatchStart && - previousFile.EntryType != types.EntryTypeL2Tx && - previousFile.EntryType != types.EntryTypeL2Block { - return fmt.Errorf("unexpected entry type before block bookmark type: %v, bookmark block number: %d", previousFile.EntryType, bookmark.Value) - } - } - case types.EntryTypeBatchStart: - batchStart, err := types.UnmarshalBatchStart(file.Data) - if err != nil { - return err - } - progressBatch = batchStart.Number - if previousFile != nil { - if previousFile.EntryType != types.BookmarkEntryType { - return fmt.Errorf("unexpected entry type before batch start: %v, batchStart Batch number: %d", previousFile.EntryType, batchStart.Number) - } else { - bookmark, err := types.UnmarshalBookmark(previousFile.Data) - if err != nil { - return err - } - if bookmark.BookmarkType() != datastream.BookmarkType_BOOKMARK_TYPE_BATCH { - return fmt.Errorf("unexpected bookmark type before batch start: %v, batchStart Batch number: %d", bookmark.BookmarkType(), batchStart.Number) - } - } - } - case types.EntryTypeBatchEnd: - if previousFile != nil && - previousFile.EntryType != types.EntryTypeL2Tx && - previousFile.EntryType != types.EntryTypeL2Block && - previousFile.EntryType != types.EntryTypeBatchStart { - return fmt.Errorf("unexpected entry type before batch end: %v", previousFile.EntryType) - } - case types.EntryTypeL2Tx: - if previousFile != nil && previousFile.EntryType != types.EntryTypeL2Tx && previousFile.EntryType != types.EntryTypeL2Block { - return fmt.Errorf("unexpected entry type before l2 tx: %v", 
previousFile.EntryType) - } - case types.EntryTypeL2Block: - l2Block, err := types.UnmarshalL2Block(file.Data) - if err != nil { - return err - } - progressBlock = l2Block.L2BlockNumber - if previousFile != nil { - if previousFile.EntryType != types.BookmarkEntryType { - return fmt.Errorf("unexpected entry type before l2 block: %v, block number: %d", previousFile.EntryType, l2Block.L2BlockNumber) - } else { - bookmark, err := types.UnmarshalBookmark(previousFile.Data) - if err != nil { - return err - } - if bookmark.BookmarkType() != datastream.BookmarkType_BOOKMARK_TYPE_L2_BLOCK { - return fmt.Errorf("unexpected bookmark type before l2 block: %v, block number: %d", bookmark.BookmarkType(), l2Block.L2BlockNumber) - } - - } - } - case types.EntryTypeGerUpdate: - return nil - default: - return fmt.Errorf("unexpected entry type: %v", file.EntryType) - } - - previousFile = file - return nil - } - // send start command - err = client.ExecutePerFile(bookmark, function) - fmt.Println("progress block: ", progressBlock) - fmt.Println("progress batch: ", progressBatch) - if err != nil { - panic(fmt.Sprintf("found an error: %s", err)) - } -} diff --git a/zk/legacy_executor_verifier/legacy_executor_verifier.go b/zk/legacy_executor_verifier/legacy_executor_verifier.go index a0631c2bc25..fe9eba22e73 100644 --- a/zk/legacy_executor_verifier/legacy_executor_verifier.go +++ b/zk/legacy_executor_verifier/legacy_executor_verifier.go @@ -17,7 +17,6 @@ import ( "github.com/gateway-fm/cdk-erigon-lib/kv" "github.com/ledgerwatch/erigon/chain" "github.com/ledgerwatch/erigon/core/rawdb" - "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/eth/ethconfig" "github.com/ledgerwatch/erigon/zk/datastream/server" "github.com/ledgerwatch/erigon/zk/hermez_db" @@ -228,7 +227,7 @@ func (v *LegacyExecutorVerifier) AddRequestUnsafe(request *VerifierRequest, sequ hermezDb := hermez_db.NewHermezDbReader(tx) l1InfoTreeMinTimestamps := make(map[uint64]uint64) - streamBytes, err := v.GetWholeBatchStreamBytes(request.BatchNumber, tx, blocks, hermezDb, l1InfoTreeMinTimestamps, nil) + streamBytes, err := v.GetStreamBytes(request.BatchNumber, tx, blocks, hermezDb, l1InfoTreeMinTimestamps, nil) if err != nil { return verifierBundle, err } @@ -330,10 +329,10 @@ func (v *LegacyExecutorVerifier) checkAndWriteToStream(tx kv.Tx, hdb *hermez_db. 
// check if we have the next batch we're waiting for if latestBatch == newBatch-1 { + v.lowestWrittenBatch = newBatch if err := v.WriteBatchToStream(newBatch, hdb, tx); err != nil { return err } - v.lowestWrittenBatch = newBatch delete(v.responsesToWrite, newBatch) } } @@ -441,8 +440,12 @@ func (v *LegacyExecutorVerifier) IsRequestAddedUnsafe(batch uint64) bool { func (v *LegacyExecutorVerifier) WriteBatchToStream(batchNumber uint64, hdb *hermez_db.HermezDbReader, roTx kv.Tx) error { log.Info("[Verifier] Writing batch to stream", "batch", batchNumber) + blks, err := hdb.GetL2BlockNosByBatch(batchNumber) + if err != nil { + return err + } - if err := v.streamServer.WriteWholeBatchToStream("verifier", roTx, hdb, v.lowestWrittenBatch, batchNumber); err != nil { + if err := v.streamServer.WriteBlocksToStream(roTx, hdb, blks[0], blks[len(blks)-1], "verifier"); err != nil { return err } return nil @@ -495,58 +498,49 @@ func (v *LegacyExecutorVerifier) availableBlocksToProcess(innerCtx context.Conte return blocks, nil } -func (v *LegacyExecutorVerifier) GetWholeBatchStreamBytes( +func (v *LegacyExecutorVerifier) GetStreamBytes( batchNumber uint64, tx kv.Tx, - blockNumbers []uint64, + blocks []uint64, hermezDb *hermez_db.HermezDbReader, l1InfoTreeMinTimestamps map[uint64]uint64, transactionsToIncludeByIndex [][]int, // passing nil here will include all transactions in the blocks -) (streamBytes []byte, err error) { - blocks := make([]types.Block, 0, len(blockNumbers)) - txsPerBlock := make(map[uint64][]types.Transaction) +) ([]byte, error) { + lastBlock, err := rawdb.ReadBlockByNumber(tx, blocks[0]-1) + if err != nil { + return nil, err + } + var streamBytes []byte // as we only ever use the executor verifier for whole batches we can safely assume that the previous batch // will always be the request batch - 1 and that the first block in the batch will be at the batch // boundary so we will always add in the batch bookmark to the stream previousBatch := batchNumber - 1 - for idx, blockNumber := range blockNumbers { + for idx, blockNumber := range blocks { block, err := rawdb.ReadBlockByNumber(tx, blockNumber) if err != nil { return nil, err } - blocks = append(blocks, *block) - filteredTransactions := block.Transactions() - // filter transactions by indexes that should be included - if transactionsToIncludeByIndex != nil { - filteredTransactions = filterTransactionByIndexes(block.Transactions(), transactionsToIncludeByIndex[idx]) - } + var sBytes []byte - txsPerBlock[blockNumber] = filteredTransactions - } + isBatchEnd := idx == len(blocks)-1 - entries, err := server.BuildWholeBatchStreamEntriesProto(tx, hermezDb, v.streamServer.GetChainId(), batchNumber, previousBatch, blocks, txsPerBlock, l1InfoTreeMinTimestamps) - if err != nil { - return nil, err - } - - return entries.Marshal() -} - -func filterTransactionByIndexes( - filteredTransactions types.Transactions, - transactionsToIncludeByIndex []int, -) types.Transactions { - if transactionsToIncludeByIndex != nil { - filteredTransactionsBuilder := make(types.Transactions, len(transactionsToIncludeByIndex)) - for i, txIndexInBlock := range transactionsToIncludeByIndex { - filteredTransactionsBuilder[i] = filteredTransactions[txIndexInBlock] + var transactionsToIncludeByIndexInBlock []int = nil + if transactionsToIncludeByIndex != nil { + transactionsToIncludeByIndexInBlock = transactionsToIncludeByIndex[idx] + } + sBytes, err = server.CreateAndBuildStreamEntryBytesProto(v.streamServer.GetChainId(), block, hermezDb, tx, lastBlock, batchNumber, 
previousBatch, l1InfoTreeMinTimestamps, isBatchEnd, transactionsToIncludeByIndexInBlock) + if err != nil { + return nil, err } - filteredTransactions = filteredTransactionsBuilder + streamBytes = append(streamBytes, sBytes...) + lastBlock = block + // we only put in the batch bookmark at the start of the stream data once + previousBatch = batchNumber } - return filteredTransactions + return streamBytes, nil } diff --git a/zk/stages/stage_dataStreamCatchup.go b/zk/stages/stage_dataStreamCatchup.go index 20100319db3..c515534cc42 100644 --- a/zk/stages/stage_dataStreamCatchup.go +++ b/zk/stages/stage_dataStreamCatchup.go @@ -61,7 +61,7 @@ func SpawnStageDataStreamCatchup( createdTx = true } - finalBlockNumber, err := CatchupDatastream(ctx, logPrefix, tx, stream, cfg.chainId, cfg.streamVersion, cfg.hasExecutors) + finalBlockNumber, err := CatchupDatastream(logPrefix, tx, stream, cfg.chainId, cfg.streamVersion, cfg.hasExecutors) if err != nil { return err } @@ -77,7 +77,7 @@ func SpawnStageDataStreamCatchup( return err } -func CatchupDatastream(ctx context.Context, logPrefix string, tx kv.RwTx, stream *datastreamer.StreamServer, chainId uint64, streamVersion int, hasExecutors bool) (uint64, error) { +func CatchupDatastream(logPrefix string, tx kv.RwTx, stream *datastreamer.StreamServer, chainId uint64, streamVersion int, hasExecutors bool) (uint64, error) { srv := server.NewDataStreamServer(stream, chainId) reader := hermez_db.NewHermezDbReader(tx) @@ -143,7 +143,7 @@ func CatchupDatastream(ctx context.Context, logPrefix string, tx kv.RwTx, stream } } - if err = srv.WriteBlocksToStreamConsecutively(ctx, logPrefix, tx, reader, previousProgress+1, finalBlockNumber); err != nil { + if err = srv.WriteBlocksToStream(tx, reader, previousProgress+1, finalBlockNumber, logPrefix); err != nil { return 0, err } diff --git a/zk/stages/stage_sequence_execute.go b/zk/stages/stage_sequence_execute.go index 288fdd43a57..6919f3d9784 100644 --- a/zk/stages/stage_sequence_execute.go +++ b/zk/stages/stage_sequence_execute.go @@ -74,15 +74,6 @@ func SpawnSequencingStage( getHeader := func(hash common.Hash, number uint64) *types.Header { return rawdb.ReadHeader(sdb.tx, hash, number) } hasExecutorForThisBatch := !isLastBatchPariallyProcessed && cfg.zk.HasExecutors() - // handle case where batch wasn't closed properly - // close it before starting a new one - // this occurs when sequencer was switched from syncer or sequencer datastream files were deleted - // and datastream was regenerated - isLastEntryBatchEnd, err := cfg.datastreamServer.IsLastEntryBatchEnd() - if err != nil { - return err - } - // injected batch if executionAt == 0 { // set the block height for the fork we're running at to ensure contract interactions are correct @@ -102,7 +93,8 @@ func SpawnSequencingStage( return err } - if err = cfg.datastreamServer.WriteWholeBatchToStream(logPrefix, tx, sdb.hermezDb.HermezDbReader, lastBatch, injectedBatchNumber); err != nil { + // write the batch directly to the stream + if err = cfg.datastreamServer.WriteBlocksToStream(tx, sdb.hermezDb.HermezDbReader, injectedBatchBlockNumber, injectedBatchBlockNumber, logPrefix); err != nil { return err } @@ -115,23 +107,6 @@ func SpawnSequencingStage( return nil } - if !isLastBatchPariallyProcessed && !isLastEntryBatchEnd { - log.Warn(fmt.Sprintf("[%s] Last batch %d was not closed properly, closing it now...", logPrefix, lastBatch)) - ler, err := utils.GetBatchLocalExitRootFromSCStorage(lastBatch, sdb.hermezDb.HermezDbReader, tx) - if err != nil { - return err - } - - 
lastBlock, err := rawdb.ReadBlockByNumber(sdb.tx, executionAt) - if err != nil { - return err - } - root := lastBlock.Root() - if err = cfg.datastreamServer.WriteBatchEnd(sdb.hermezDb, lastBatch, lastBatch-1, &root, &ler); err != nil { - return err - } - } - if err := utils.UpdateZkEVMBlockCfg(cfg.chainConfig, sdb.hermezDb, logPrefix); err != nil { return err } @@ -526,22 +501,25 @@ func SpawnSequencingStage( // because it would be later added twice counters := batchCounters.CombineCollectorsNoChanges(l1InfoIndex != 0) - if err = sdb.hermezDb.WriteBatchCounters(thisBatch, counters.UsedAsMap()); err != nil { + err = sdb.hermezDb.WriteBatchCounters(thisBatch, counters.UsedAsMap()) + if err != nil { return err } - if err = sdb.hermezDb.WriteIsBatchPartiallyProcessed(thisBatch); err != nil { + err = sdb.hermezDb.WriteIsBatchPartiallyProcessed(thisBatch) + if err != nil { return err } - if err = cfg.datastreamServer.WriteBlockWithBatchStartToStream(logPrefix, tx, sdb.hermezDb, forkId, thisBatch, lastBatch, *parentBlock, *block); err != nil { + if err = cfg.datastreamServer.WriteBlockToStream(logPrefix, tx, sdb.hermezDb, thisBatch, lastBatch, blockNumber); err != nil { return err } if err = tx.Commit(); err != nil { return err } - if tx, err = cfg.db.BeginRw(ctx); err != nil { + tx, err = cfg.db.BeginRw(ctx) + if err != nil { return err } // TODO: This creates stacked up deferrals @@ -585,7 +563,7 @@ func SpawnSequencingStage( if !hasExecutorForThisBatch { blockRoot := block.Root() - if err = cfg.datastreamServer.WriteBatchEnd(sdb.hermezDb, thisBatch, lastBatch, &blockRoot, &ler); err != nil { + if err = cfg.datastreamServer.WriteBatchEnd(logPrefix, tx, sdb.hermezDb, thisBatch, lastBatch, &blockRoot, &ler); err != nil { return err } } diff --git a/zk/stages/stage_sequencer_executor_verify.go b/zk/stages/stage_sequencer_executor_verify.go index 68299035c7d..564bb6aa5cc 100644 --- a/zk/stages/stage_sequencer_executor_verify.go +++ b/zk/stages/stage_sequencer_executor_verify.go @@ -175,7 +175,8 @@ func SpawnSequencerExecutorVerifyStage( } l1InfoTreeMinTimestamps := make(map[uint64]uint64) - if _, err = cfg.verifier.GetWholeBatchStreamBytes(response.BatchNumber, tx, blockNumbers, hermezDbReader, l1InfoTreeMinTimestamps, nil); err != nil { + _, err = cfg.verifier.GetStreamBytes(response.BatchNumber, tx, blockNumbers, hermezDbReader, l1InfoTreeMinTimestamps, nil) + if err != nil { return err } @@ -216,7 +217,7 @@ func SpawnSequencerExecutorVerifyStage( senderMapKey := sender.Hex() blocksForStreamBytes, transactionsToIncludeByIndex := limboStreamBytesBuilderHelper.append(senderMapKey, blockNumber, i) - streamBytes, err := cfg.verifier.GetWholeBatchStreamBytes(response.BatchNumber, tx, blocksForStreamBytes, hermezDbReader, l1InfoTreeMinTimestamps, transactionsToIncludeByIndex) + streamBytes, err := cfg.verifier.GetStreamBytes(response.BatchNumber, tx, blocksForStreamBytes, hermezDbReader, l1InfoTreeMinTimestamps, transactionsToIncludeByIndex) if err != nil { return err }
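--
Appendix (illustrative, not part of the applied diff): the revert swaps the
whole-batch entry builder for the older per-block API, so callers such as
GetStreamBytes and zk/debug_tools/datastream-bytes/main.go drive serialization
in a loop that closes the batch only on its final block and emits the batch
bookmark only once. Below is a minimal, runnable Go sketch of that loop under
stated assumptions: the block type and the buildEntryBytes helper are
simplified stand-ins for eritypes.Block and
server.CreateAndBuildStreamEntryBytesProto, not the real cdk-erigon APIs
(which also carry lastBlock forward to compute delta timestamps).

package main

import "fmt"

// block is a stand-in for eritypes.Block; only the fields the loop needs.
type block struct {
	number uint64
	txs    []string
}

// buildEntryBytes is a stand-in for server.CreateAndBuildStreamEntryBytesProto:
// it emits a batch start when the batch number advances past the previous one,
// then the block entry itself, and a batch end only when isBatchEnd is set.
func buildEntryBytes(b block, batchNum, prevBatchNum uint64, isBatchEnd bool) []byte {
	var out []byte
	if batchNum != prevBatchNum {
		out = append(out, []byte(fmt.Sprintf("[batch start %d]", batchNum))...)
	}
	out = append(out, []byte(fmt.Sprintf("[block %d, %d txs]", b.number, len(b.txs)))...)
	if isBatchEnd {
		out = append(out, []byte(fmt.Sprintf("[batch end %d]", batchNum))...)
	}
	return out
}

func main() {
	batchNum := uint64(7)
	previousBatch := batchNum - 1 // differs on the first iteration, opening the batch
	blocks := []block{
		{number: 100, txs: []string{"tx0"}},
		{number: 101, txs: nil},
		{number: 102, txs: []string{"tx1", "tx2"}},
	}

	var streamBytes []byte
	for idx, b := range blocks {
		// only the last block of the batch writes the batch end entry
		isBatchEnd := idx == len(blocks)-1
		streamBytes = append(streamBytes, buildEntryBytes(b, batchNum, previousBatch, isBatchEnd)...)
		// the batch bookmark goes in only once, at the start of the stream data
		previousBatch = batchNum
	}
	fmt.Println(string(streamBytes))
}

In the restored API, passing nil for transactionsToIncludeByIndex includes
every transaction in the block; a non-nil index list selects and reorders
transactions, as implemented by the filterTransactionByIndexes helper the
revert moves back into data_stream_server_utils.go.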