Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

simplify datastream entry parsing #902

Merged
merged 8 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 39 additions & 91 deletions zk/datastream/client/stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,7 @@ type StreamClient struct {
progress atomic.Uint64

// Channels
batchStartChan chan types.BatchStart
batchEndChan chan types.BatchEnd
l2BlockChan chan types.FullL2Block
l2TxChan chan types.L2TransactionProto
gerUpdatesChan chan types.GerUpdate // NB: unused from etrog onwards (forkid 7)
entryChan chan interface{}

// keeps track of the latest fork from the stream to assign to l2 blocks
currentFork uint64
Expand All @@ -70,17 +66,14 @@ const (
// server must be in format "url:port"
func NewClient(ctx context.Context, server string, version int, checkTimeout time.Duration, latestDownloadedForkId uint16) *StreamClient {
c := &StreamClient{
ctx: ctx,
checkTimeout: checkTimeout,
server: server,
version: version,
streamType: StSequencer,
id: "",
batchStartChan: make(chan types.BatchStart, 100),
batchEndChan: make(chan types.BatchEnd, 100),
l2BlockChan: make(chan types.FullL2Block, 100000),
gerUpdatesChan: make(chan types.GerUpdate, 1000),
currentFork: uint64(latestDownloadedForkId),
ctx: ctx,
checkTimeout: checkTimeout,
server: server,
version: version,
streamType: StSequencer,
id: "",
entryChan: make(chan interface{}, 100000),
currentFork: uint64(latestDownloadedForkId),
}

return c
Expand All @@ -90,20 +83,8 @@ func (c *StreamClient) IsVersion3() bool {
return c.version >= versionAddedBlockEnd
}

func (c *StreamClient) GetBatchStartChan() chan types.BatchStart {
return c.batchStartChan
}
func (c *StreamClient) GetBatchEndChan() chan types.BatchEnd {
return c.batchEndChan
}
func (c *StreamClient) GetL2BlockChan() chan types.FullL2Block {
return c.l2BlockChan
}
func (c *StreamClient) GetL2TxChan() chan types.L2TransactionProto {
return c.l2TxChan
}
func (c *StreamClient) GetGerUpdatesChan() chan types.GerUpdate {
return c.gerUpdatesChan
func (c *StreamClient) GetEntryChan() chan interface{} {
return c.entryChan
}
func (c *StreamClient) GetLastWrittenTimeAtomic() *atomic.Int64 {
return &c.lastWrittenTime
Expand Down Expand Up @@ -132,8 +113,7 @@ func (c *StreamClient) Start() error {
func (c *StreamClient) Stop() {
c.conn.Close()

close(c.l2BlockChan)
close(c.gerUpdatesChan)
close(c.entryChan)
}

// Command header: Get status
Expand Down Expand Up @@ -321,38 +301,30 @@ LOOP:
c.conn.SetReadDeadline(time.Now().Add(c.checkTimeout))
}

fullBlock, batchStart, batchEnd, gerUpdate, batchBookmark, blockBookmark, localErr := c.readFullBlockProto()
parsedProto, localErr := c.readParsedProto()
if localErr != nil {
err = localErr
break
}
c.lastWrittenTime.Store(time.Now().UnixNano())

// skip over bookmarks (but only when fullblock is nil or will miss l2 blocks)
if batchBookmark != nil || blockBookmark != nil {
switch parsedProto := parsedProto.(type) {
case *types.BookmarkProto:
continue
}

// write batch starts to channel
if batchStart != nil {
c.currentFork = (*batchStart).ForkId
c.batchStartChan <- *batchStart
}

if gerUpdate != nil {
c.gerUpdatesChan <- *gerUpdate
}

if batchEnd != nil {
// this check was inside c.readFullBlockProto() but it is better to move it here
c.batchEndChan <- *batchEnd
}

// ensure the block is assigned the currently known fork
if fullBlock != nil {
fullBlock.ForkId = c.currentFork
log.Trace("writing block to channel", "blockNumber", fullBlock.L2BlockNumber, "batchNumber", fullBlock.BatchNumber)
c.l2BlockChan <- *fullBlock
case *types.BatchStart:
c.currentFork = parsedProto.ForkId
c.entryChan <- parsedProto
case *types.GerUpdateProto:
c.entryChan <- parsedProto
case *types.BatchEnd:
c.entryChan <- parsedProto
case *types.FullL2Block:
parsedProto.ForkId = c.currentFork
log.Trace("writing block to channel", "blockNumber", parsedProto.L2BlockNumber, "batchNumber", parsedProto.BatchNumber)
c.entryChan <- parsedProto
default:
err = fmt.Errorf("unexpected entry type: %v", parsedProto)
break LOOP
}
}

Expand All @@ -378,13 +350,8 @@ func (c *StreamClient) tryReConnect() error {
return err
}

func (c *StreamClient) readFullBlockProto() (
l2Block *types.FullL2Block,
batchStart *types.BatchStart,
batchEnd *types.BatchEnd,
gerUpdate *types.GerUpdate,
batchBookmark *types.BookmarkProto,
blockBookmark *types.BookmarkProto,
func (c *StreamClient) readParsedProto() (
parsedEntry interface{},
err error,
) {
file, err := c.readFileEntry()
Expand All @@ -395,34 +362,15 @@ func (c *StreamClient) readFullBlockProto() (

switch file.EntryType {
case types.BookmarkEntryType:
var bookmark *types.BookmarkProto
if bookmark, err = types.UnmarshalBookmark(file.Data); err != nil {
return
}
if bookmark.BookmarkType() == datastream.BookmarkType_BOOKMARK_TYPE_BATCH {
batchBookmark = bookmark
return
} else {
blockBookmark = bookmark
return
}
parsedEntry, err = types.UnmarshalBookmark(file.Data)
case types.EntryTypeGerUpdate:
if gerUpdate, err = types.DecodeGerUpdateProto(file.Data); err != nil {
return
}
log.Trace("ger update", "ger", gerUpdate)
return
parsedEntry, err = types.DecodeGerUpdateProto(file.Data)
case types.EntryTypeBatchStart:
if batchStart, err = types.UnmarshalBatchStart(file.Data); err != nil {
return
}
return
parsedEntry, err = types.UnmarshalBatchStart(file.Data)
case types.EntryTypeBatchEnd:
if batchEnd, err = types.UnmarshalBatchEnd(file.Data); err != nil {
return
}
return
parsedEntry, err = types.UnmarshalBatchEnd(file.Data)
case types.EntryTypeL2Block:
var l2Block *types.FullL2Block
if l2Block, err = types.UnmarshalL2Block(file.Data); err != nil {
return
}
Expand Down Expand Up @@ -464,7 +412,7 @@ func (c *StreamClient) readFullBlockProto() (
return
}
} else if innerFile.IsBatchEnd() {
if batchEnd, err = types.UnmarshalBatchEnd(file.Data); err != nil {
if _, err = types.UnmarshalBatchEnd(file.Data); err != nil {
return
}
break LOOP
Expand All @@ -475,14 +423,14 @@ func (c *StreamClient) readFullBlockProto() (
}

l2Block.L2Txs = txs
parsedEntry = l2Block
return
case types.EntryTypeL2Tx:
err = fmt.Errorf("unexpected l2Tx out of block")
return
default:
err = fmt.Errorf("unexpected entry type: %d", file.EntryType)
return
}
return
}

// reads file bytes from socket and tries to parse them
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"reflect"

"github.com/ledgerwatch/erigon/zk/datastream/client"
"github.com/ledgerwatch/erigon/zk/datastream/types"
"github.com/nsf/jsondiff"
)

Expand Down Expand Up @@ -80,13 +81,14 @@ func readFromClient(client *client.StreamClient, total int) ([]interface{}, erro

LOOP:
for {
select {
case d := <-client.GetL2BlockChan():
data = append(data, d)
count++
case d := <-client.GetGerUpdatesChan():
data = append(data, d)
entry := <-client.GetEntryChan()

switch entry.(type) {
case types.FullL2Block:
case types.GerUpdate:
data = append(data, entry)
count++
default:
}

if count == total {
Expand Down
2 changes: 1 addition & 1 deletion zk/hermez_db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -841,7 +841,7 @@ func (db *HermezDbReader) GetBlockL1BlockHashes(fromBlockNo, toBlockNo uint64) (
return l1BlockHashes, nil
}

func (db *HermezDb) WriteBatchGlobalExitRoot(batchNumber uint64, ger dstypes.GerUpdate) error {
func (db *HermezDb) WriteBatchGlobalExitRoot(batchNumber uint64, ger *dstypes.GerUpdate) error {
return db.tx.Put(GLOBAL_EXIT_ROOTS_BATCHES, Uint64ToBytes(batchNumber), ger.EncodeToBytes())
}

Expand Down
Loading
Loading