refactor datastream server (#837)
* refactor datastream server

* optimization and nil checks

* added a datastream check tool

* fix rpc to sequencer switching in datastream

* move batch end check after injected
V-Staykov authored Jul 23, 2024
1 parent 9579348 commit fc8972b
Showing 11 changed files with 599 additions and 244 deletions.
2 changes: 1 addition & 1 deletion eth/backend.go
@@ -1078,7 +1078,7 @@ func (s *Ethereum) PreStart() error {
// so here we loop and take a brief pause waiting for it to be ready
attempts := 0
for {
-_, err = zkStages.CatchupDatastream("stream-catchup", tx, s.dataStream, s.chainConfig.ChainID.Uint64(), s.config.DatastreamVersion, s.config.HasExecutors())
+_, err = zkStages.CatchupDatastream(s.sentryCtx, "stream-catchup", tx, s.dataStream, s.chainConfig.ChainID.Uint64(), s.config.DatastreamVersion, s.config.HasExecutors())
if err != nil {
if errors.Is(err, datastreamer.ErrAtomicOpNotAllowed) {
attempts++
40 changes: 40 additions & 0 deletions zk/datastream/client/stream_client.go
@@ -181,6 +181,46 @@ func (c *StreamClient) ReadEntries(bookmark *types.BookmarkProto, l2BlocksAmount
return fullL2Blocks, gerUpates, batchBookmarks, blockBookmarks, entriesRead, nil
}

func (c *StreamClient) ExecutePerFile(bookmark *types.BookmarkProto, function func(file *types.FileEntry) error) error {
// Get header from server
if err := c.GetHeader(); err != nil {
return fmt.Errorf("%s get header error: %v", c.id, err)
}

protoBookmark, err := bookmark.Marshal()
if err != nil {
return fmt.Errorf("failed to marshal bookmark: %v", err)
}

if err := c.initiateDownloadBookmark(protoBookmark); err != nil {
return err
}
count := uint64(0)
logTicker := time.NewTicker(10 * time.Second)

for {
select {
case <-logTicker.C:
fmt.Println("Entries read count: ", count)
default:
}
if c.Header.TotalEntries == count {
break
}
file, err := c.readFileEntry()
if err != nil {
return fmt.Errorf("error reading file entry: %v", err)
}
if err := function(file); err != nil {
return fmt.Errorf("error executing function: %v", err)
}
count++
}

return nil
}
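
A minimal sketch (not part of this commit) of how the new ExecutePerFile helper could back the datastream check tool mentioned in the commit message: it tallies entries by type without decoding them. The helper name checkStream and the pre-built start bookmark are assumptions, and the EntryType field on types.FileEntry is taken from the surrounding types package; only ExecutePerFile itself comes from this diff.

// checkStream is a hypothetical caller of ExecutePerFile (placed in the
// client package): it counts file entries by type, which is enough for a
// cheap stream consistency check.
func checkStream(c *StreamClient, start *types.BookmarkProto) (map[types.EntryType]uint64, error) {
	counts := make(map[types.EntryType]uint64)
	err := c.ExecutePerFile(start, func(file *types.FileEntry) error {
		counts[file.EntryType]++ // no decoding needed just to tally
		return nil
	})
	return counts, err
}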

// reads entries to the end of the stream
// at end will wait for new entries to arrive
func (c *StreamClient) ReadAllEntriesToChannel() error {
205 changes: 148 additions & 57 deletions zk/datastream/server/data_stream_server.go
@@ -1,22 +1,23 @@
package server

import (
"fmt"

"github.com/0xPolygonHermez/zkevm-data-streamer/datastreamer"
zktypes "github.com/ledgerwatch/erigon/zk/types"
"github.com/ledgerwatch/erigon/zk/utils"

"github.com/gateway-fm/cdk-erigon-lib/common"
libcommon "github.com/gateway-fm/cdk-erigon-lib/common"
"github.com/gateway-fm/cdk-erigon-lib/kv"
"github.com/ledgerwatch/erigon/core/rawdb"
eritypes "github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/zk/datastream/proto/github.com/0xPolygonHermez/zkevm-node/state/datastream"
"github.com/ledgerwatch/erigon/zk/datastream/types"

"github.com/ledgerwatch/erigon/zk/hermez_db"
)

type DbReader interface {
-GetLocalExitRootForBatchNo(batchNo uint64) (common.Hash, error)
+GetL2BlockNosByBatch(batchNo uint64) ([]uint64, error)
+GetLocalExitRootForBatchNo(batchNo uint64) (libcommon.Hash, error)
GetBatchGlobalExitRootsProto(lastBatchNumber, batchNumber uint64) ([]types.GerUpdateProto, error)
GetForkId(batchNumber uint64) (uint64, error)
GetBlockGlobalExitRoot(blockNumber uint64) (libcommon.Hash, error)
@@ -84,10 +85,33 @@ func (d *DataStreamEntries) AddMany(entries []DataStreamEntryProto) {
}
}

func (d *DataStreamEntries) Size() int {
if d == nil || d.entries == nil {
return 0
}
return len(d.entries)
}

func (d *DataStreamEntries) Entries() []DataStreamEntryProto {
if d == nil || d.entries == nil {
return []DataStreamEntryProto{}
}
return d.entries
}

func (d *DataStreamEntries) Marshal() (result []byte, err error) {
var b []byte
for _, entry := range d.entries {
b, err = encodeEntryToBytesProto(entry)
if err != nil {
return nil, err
}
result = append(result, b...)
}

return result, nil
}
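
Because Size and Entries check for a nil receiver and a nil backing slice, a nil *DataStreamEntries behaves like an empty collection, so call sites need no guards of their own. Illustrative only, given the methods above:

// safe even though d is nil: both methods test the receiver first
var d *DataStreamEntries
_ = d.Size()            // returns 0 instead of panicking
for range d.Entries() { // iterates zero times
}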

func NewDataStreamEntries(size int) *DataStreamEntries {
return &DataStreamEntries{
entries: make([]DataStreamEntryProto, size),
@@ -115,85 +139,82 @@ func (srv *DataStreamServer) CommitEntriesToStreamProto(entries []DataStreamEntr
}

if latestBlockNum != nil {
-srv.highestBlockWritten = latestBlockNum
+a := *latestBlockNum
+srv.highestBlockWritten = &a
}

if latestBatchNum != nil {
-srv.highestBatchWritten = latestBatchNum
+a := *latestBatchNum
+srv.highestBatchWritten = &a
}
return nil
}
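
Copying the pointed-to values here, rather than retaining the caller's pointers, closes an aliasing hole: a caller that reuses its counter variable after the call can no longer mutate highestBlockWritten or highestBatchWritten behind the server's back. A sketch of the pitfall, assuming the surrounding signature takes latestBlockNum and latestBatchNum as *uint64 (variable names hypothetical):

num := uint64(100)
_ = srv.CommitEntriesToStreamProto(entries, &num, nil)
num = 200 // before this change srv.highestBlockWritten aliased num and
          // silently became 200; it now holds its own copy of 100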

func createBlockWithBatchCheckStreamEntriesProto(
-chainId uint64,
reader DbReader,
tx kv.Tx,
block,
lastBlock *eritypes.Block,
batchNumber,
-lastBatchNumber uint64,
-l1InfoTreeMinTimestamps map[uint64]uint64,
-isBatchEnd bool,
-transactionsToIncludeByIndex []int, // passing nil here will include all transactions in the blocks
-) ([]DataStreamEntryProto, error) {
+lastBatchNumber,
+chainId,
+forkId uint64,
+shouldSkipBatchEndEntry bool,
+) (*DataStreamEntries, error) {
var err error
-var startEntriesProto, blockEntriesProto, endEntriesProto []DataStreamEntryProto

-gers, err := reader.GetBatchGlobalExitRootsProto(lastBatchNumber, batchNumber)
-if err != nil {
-return nil, err
-}

+var endEntriesProto []DataStreamEntryProto
+var startEntriesProto, blockEntries *DataStreamEntries
// we might have a series of empty batches to account for, so we need to know the gap
batchGap := batchNumber - lastBatchNumber
isBatchStart := batchGap > 0

-// filter transactions by indexes that should be included
-filteredTransactions := filterTransactionByIndexes(block.Transactions(), transactionsToIncludeByIndex)

-blockNum := block.NumberU64()
// batch start
// BATCH BOOKMARK
if isBatchStart {
gers, err := reader.GetBatchGlobalExitRootsProto(lastBatchNumber, batchNumber)
if err != nil {
return nil, err
}
// the genesis we insert fully, so we would have to skip closing it
if !shouldSkipBatchEndEntry {
localExitRoot, err := utils.GetBatchLocalExitRootFromSCStorage(batchNumber, reader, tx)
if err != nil {
return nil, err
}
blockRoot := block.Root()
if endEntriesProto, err = addBatchEndEntriesProto(lastBatchNumber, &blockRoot, gers, &localExitRoot); err != nil {
return nil, err
}
}

if startEntriesProto, err = createBatchStartEntriesProto(reader, tx, batchNumber, lastBatchNumber, batchGap, chainId, block.Root(), gers); err != nil {
return nil, err
}
}

-forkId, err := reader.GetForkId(batchNumber)
-if err != nil {
-return nil, err
-}
+blockNum := block.NumberU64()

l1InfoTreeMinTimestamps := make(map[uint64]uint64)
deltaTimestamp := block.Time() - lastBlock.Time()
if blockNum == 1 {
deltaTimestamp = block.Time()
l1InfoTreeMinTimestamps[0] = 0
}

-blockEntries, err := createFullBlockStreamEntriesProto(reader, tx, block, filteredTransactions, forkId, deltaTimestamp, batchNumber, l1InfoTreeMinTimestamps)
-if err != nil {
+if blockEntries, err = createFullBlockStreamEntriesProto(reader, tx, block, block.Transactions(), forkId, deltaTimestamp, batchNumber, l1InfoTreeMinTimestamps); err != nil {
return nil, err
}
-blockEntriesProto = blockEntries.Entries()

-if isBatchEnd {
-localExitRoot, err := utils.GetBatchLocalExitRootFromSCStorage(batchNumber, reader, tx)
-if err != nil {
-return nil, err
-}
-blockRoot := block.Root()
-if endEntriesProto, err = addBatchEndEntriesProto(tx, batchNumber, lastBatchNumber, &blockRoot, gers, &localExitRoot); err != nil {
-return nil, err
-}
-}
+if blockEntries.Size() == 0 {
+return nil, fmt.Errorf("didn't create any entries for block %d", blockNum)
+}

-entries := NewDataStreamEntries(len(startEntriesProto) + len(blockEntriesProto) + len(endEntriesProto))
-entries.AddMany(startEntriesProto)
-entries.AddMany(blockEntriesProto)
+entries := NewDataStreamEntries(len(endEntriesProto) + startEntriesProto.Size() + blockEntries.Size())
entries.AddMany(endEntriesProto)
+entries.AddMany(startEntriesProto.Entries())
+entries.AddMany(blockEntries.Entries())

-return entries.Entries(), nil
+return entries, nil
}

func createFullBlockStreamEntriesProto(
@@ -283,32 +304,102 @@ func createTransactionEntryProto(
return txProto, nil
}

-func CreateAndBuildStreamEntryBytesProto(
-chainId uint64,
-block *eritypes.Block,
-reader *hermez_db.HermezDbReader,
+func BuildWholeBatchStreamEntriesProto(
tx kv.Tx,
-lastBlock *eritypes.Block,
+reader DbReader,
+chainId uint64,
+previousBatchNumber,
batchNumber uint64,
-lastBatchNumber uint64,
+blocks []eritypes.Block,
+txsPerBlock map[uint64][]eritypes.Transaction,
l1InfoTreeMinTimestamps map[uint64]uint64,
-isBatchEnd bool,
-transactionsToIncludeByIndex []int, // passing nil here will include all transactions in the blocks
-) (result []byte, err error) {
-entries, err := createBlockWithBatchCheckStreamEntriesProto(chainId, reader, tx, block, lastBlock, batchNumber, lastBatchNumber, l1InfoTreeMinTimestamps, isBatchEnd, transactionsToIncludeByIndex)
+) (allEntries *DataStreamEntries, err error) {
+var batchEndEntries []DataStreamEntryProto
+var batchStartEntries *DataStreamEntries

forkId, err := reader.GetForkId(batchNumber)
if err != nil {
return nil, err
}

-for _, entry := range entries {
-b, err := encodeEntryToBytesProto(entry)
+gers, err := reader.GetBatchGlobalExitRootsProto(previousBatchNumber, batchNumber)
if err != nil {
return nil, err
}

if batchStartEntries, err = createBatchStartEntriesProto(reader, tx, batchNumber, previousBatchNumber, batchNumber-previousBatchNumber, chainId, blocks[0].Root(), gers); err != nil {
return nil, err
}

prevBatchLastBlock, err := rawdb.ReadBlockByNumber(tx, blocks[0].NumberU64()-1)
if err != nil {
return nil, err
}

lastBlock := *prevBatchLastBlock

blocksEntries := make([]DataStreamEntryProto, 0)

for _, block := range blocks {
blockNum := block.NumberU64()

deltaTimestamp := block.Time() - lastBlock.Time()
if blockNum == 1 {
deltaTimestamp = block.Time()
l1InfoTreeMinTimestamps[0] = 0
}

txForBlock, found := txsPerBlock[blockNum]
if !found {
return nil, fmt.Errorf("no transactions array found for block %d", blockNum)
}

blockEntries, err := createFullBlockStreamEntriesProto(reader, tx, &block, txForBlock, forkId, deltaTimestamp, batchNumber, l1InfoTreeMinTimestamps)
if err != nil {
return nil, err
}
-result = append(result, b...)
+blocksEntries = append(blocksEntries, blockEntries.Entries()...)

lastBlock = block
}

-return result, nil
// the genesis we insert fully, so we would have to skip closing it
localExitRoot, err := utils.GetBatchLocalExitRootFromSCStorage(batchNumber, reader, tx)
if err != nil {
return nil, err
}

blockRoot := lastBlock.Root()

batchEndEntries, err = addBatchEndEntriesProto(batchNumber, &blockRoot, gers, &localExitRoot)
if err != nil {
return nil, err
}

allEntries = NewDataStreamEntries(batchStartEntries.Size() + len(blocksEntries) + len(batchEndEntries))
allEntries.AddMany(batchStartEntries.Entries())
allEntries.AddMany(blocksEntries)
allEntries.AddMany(batchEndEntries)

return allEntries, nil
}
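
BuildWholeBatchStreamEntriesProto now returns the assembled *DataStreamEntries instead of pre-encoded bytes, leaving it to the caller to marshal or commit. A hedged sketch of the expected call pattern, with the argument order as reconstructed from this diff and lastBlockNum as a hypothetical counter:

entries, err := BuildWholeBatchStreamEntriesProto(tx, reader, chainId,
	batchNumber-1, batchNumber, blocks, txsPerBlock, l1InfoTreeMinTimestamps)
if err != nil {
	return err
}
// commit to the live stream, recording the highest block/batch written
return srv.CommitEntriesToStreamProto(entries.Entries(), &lastBlockNum, &batchNumber)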

func (srv *DataStreamServer) IsLastEntryBatchEnd() (isBatchEnd bool, err error) {
header := srv.stream.GetHeader()

if header.TotalEntries == 0 {
return false, nil
}

//find end block entry to delete from it onward
entryNum := header.TotalEntries - 1
var entry datastreamer.FileEntry
entry, err = srv.stream.GetEntry(entryNum)
if err != nil {
return false, err
}

return uint32(entry.Type) == uint32(types.EntryTypeBatchEnd), nil
}
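
IsLastEntryBatchEnd gives a restarting server a cheap way to tell whether the stream was closed cleanly. A plausible, hypothetical use at startup (recovery logic elided):

complete, err := srv.IsLastEntryBatchEnd()
if err != nil {
	return err
}
if !complete {
	// the stream ends mid-batch: unwind or rewrite the open batch
	// before appending new entries
}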

func (srv *DataStreamServer) GetHighestBlockNumber() (uint64, error) {