Skip to content

Commit

Permalink
fix: the pruning batch logic to avoid the previous batch removing (#436)
Browse files Browse the repository at this point in the history
  • Loading branch information
cool-develope authored May 29, 2024
1 parent 2f267b4 commit 3cfd715
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 20 deletions.
22 changes: 11 additions & 11 deletions network/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,13 +392,13 @@ func (c *Client) startBatchFetching() {
}
if batch.L1BlockNumber > PruningBlocks {
prunedBlockNumber := batch.L1BlockNumber - PruningBlocks
prefix := make([]byte, 8)
prefix := make([]byte, 12)
binary.BigEndian.PutUint64(prefix, prunedBlockNumber)
if err := c.db.Prune(prefix); err != nil {
logger.Errorf("failed to prune the database: %v", err)
}
}
time.Sleep(100 * time.Millisecond)
time.Sleep(10 * time.Millisecond)
}
}

Expand Down Expand Up @@ -500,7 +500,7 @@ func (c *Client) TryGetBatch() (*sequencerv2types.Batch, error) {
fromBlockNumber := batch.BatchHeader.FromBlockNumber()
toBlockNumber := batch.BatchHeader.ToBlockNumber()
c.openL1BlockNumber.Store(batch.L1BlockNumber())
logger.Infof("get the batch with L2 block number from %d to %d", fromBlockNumber, toBlockNumber)
logger.Infof("get the batch with L1 block number %d with L2 block number from %d to %d", batch.L1BlockNumber(), fromBlockNumber, toBlockNumber)

// verify the L1 block number
batchHeader, err := c.getBatchHeader(batch.L1BlockNumber(), fromBlockNumber)
Expand All @@ -509,36 +509,36 @@ func (c *Client) TryGetBatch() (*sequencerv2types.Batch, error) {
return batch, ErrBatchNotFound
}
if batch.L1BlockNumber() != batchHeader.L1BlockNumber {
return nil, fmt.Errorf("the batch L1 block number %d is not equal to the rpc L1 block number %d", res.Batch.L1BlockNumber(), batchHeader.L1BlockNumber)
return batch, fmt.Errorf("the batch L1 block number %d is not equal to the rpc L1 block number %d", res.Batch.L1BlockNumber(), batchHeader.L1BlockNumber)
}
if fromBlockNumber != batchHeader.FromBlockNumber() {
return nil, fmt.Errorf("the batch from block number %d is not equal to the rpc from block number %d", fromBlockNumber, batchHeader.FromBlockNumber())
return batch, fmt.Errorf("the batch from block number %d is not equal to the rpc from block number %d", fromBlockNumber, batchHeader.FromBlockNumber())
}
if toBlockNumber != batchHeader.ToBlockNumber() {
return nil, fmt.Errorf("the batch to block number %d is not equal to the rpc to block number %d", toBlockNumber, batchHeader.ToBlockNumber())
return batch, fmt.Errorf("the batch to block number %d is not equal to the rpc to block number %d", toBlockNumber, batchHeader.ToBlockNumber())
}

// verify the committee root
if err := c.verifyCommitteeRoot(batch); err != nil {
logger.Warnf("failed to verify the committee root: %v", err)
return nil, err
return batch, err
}

// verify if the batch hash is correct
batchHash := batch.BatchHeader.Hash()
bhHash := batchHeader.Hash()
if !bytes.Equal(batchHash, bhHash) {
return nil, fmt.Errorf("the batch hash %s is not equal to the batch header hash %s", batchHash, utils.Bytes2Hex(bhHash))
return batch, fmt.Errorf("the batch hash %s is not equal to the batch header hash %s", batchHash, utils.Bytes2Hex(bhHash))
}

// verify the proposer signature
if len(batch.ProposerPubKey) == 0 {
return nil, fmt.Errorf("the block %d proposer key is empty", batch.BatchNumber())
return batch, fmt.Errorf("the block %d proposer key is empty", batch.BatchNumber())
}
blsSigHash := batch.BlsSignature().Hash()
verified, err := c.blsScheme.VerifySignature(common.FromHex(batch.ProposerPubKey), blsSigHash, common.FromHex(batch.ProposerSignature))
if err != nil || !verified {
return nil, fmt.Errorf("failed to verify the proposer signature: %v", err)
return batch, fmt.Errorf("failed to verify the proposer signature: %v", err)
}

telemetry.SetGauge(float64(batch.BatchNumber()), "client", "current_batch_number")
Expand All @@ -565,7 +565,7 @@ func (c *Client) getCommitteeRoot(blockNumber uint64) (*committee.ILagrangeCommi

func (c *Client) verifyCommitteeRoot(batch *sequencerv2types.Batch) error {
blockNumber := batch.L1BlockNumber()
prevBatchL1Number := batch.L1BlockNumber()
prevBatchL1Number := uint64(0)
isGenesis := c.genesisBlockNumber == blockNumber
// verify the previous batch's next committee root
if !isGenesis {
Expand Down
6 changes: 3 additions & 3 deletions rpcclient/arbitrum/batch_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (f *Fetcher) Fetch(l1BeginBlockNumber uint64) error {
if err != nil {
return err
}
telemetry.MeasureSince(ti, "rpc_arbitrum", "l1_filter_logs")
telemetry.MeasureSince(ti, "rpc", "l1_filter_logs")

// sort the batches by L1 block number and L1 tx index
sort.Slice(batches, func(i, j int) bool {
Expand Down Expand Up @@ -183,7 +183,7 @@ func (f *Fetcher) Fetch(l1BeginBlockNumber uint64) error {
// The range is [start, end].
func (f *Fetcher) getL2BlockHashes(start, end uint64) ([]*sequencerv2types.BlockHeader, error) {
ti := time.Now()
defer telemetry.MeasureSince(ti, "rpc_arbitrum", "fetch_l2_block_hashes")
defer telemetry.MeasureSince(ti, "rpc", "fetch_l2_block_hashes")

g, ctx := errgroup.WithContext(context.Background())
g.SetLimit(f.concurrentFetcher)
Expand Down Expand Up @@ -300,7 +300,7 @@ func (f *Fetcher) fetchBlock(blockNumber uint64, txHash common.Hash) ([]byte, er
logger.Errorf("failed to get blobs: %v", err)
return nil, err
}
telemetry.MeasureSince(ti, "rpc_arbitrum", "fetch_beacon_blobs")
telemetry.MeasureSince(ti, "rpc", "fetch_beacon_blobs")
if len(blobs) != len(hashes) {
logger.Errorf("blobs length is not matched: %d, %d", len(blobs), len(hashes))
return nil, fmt.Errorf("blobs length is not matched: %d, %d", len(blobs), len(hashes))
Expand Down
9 changes: 9 additions & 0 deletions rpcclient/mock/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"errors"
"fmt"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
Expand All @@ -20,6 +21,7 @@ var _ types.RpcClient = (*Client)(nil)
type Client struct {
evmclient.Client

mtx sync.Mutex
fromL1BlockNumber uint64

chainID uint32
Expand Down Expand Up @@ -64,13 +66,17 @@ func (c *Client) GetFinalizedBlockNumber() (uint64, error) {

// SetBeginBlockNumber sets the begin L1 & L2 block number.
func (c *Client) SetBeginBlockNumber(l1BlockNumber uint64) bool {
c.mtx.Lock()
defer c.mtx.Unlock()
c.fromL1BlockNumber = l1BlockNumber
return true
}

// NextBatch returns the next batch after SetBeginBlockNumber.
func (c *Client) NextBatch() (*sequencerv2types.BatchHeader, error) {
c.mtx.Lock()
l2BlockNumber := c.fromL1BlockNumber
c.mtx.Unlock()
blockHeader, err := c.GetBlockHeaderByNumber(l2BlockNumber, common.Hash{})
if err != nil {
if errors.Is(err, types.ErrNoResult) {
Expand All @@ -91,7 +97,10 @@ func (c *Client) NextBatch() (*sequencerv2types.BatchHeader, error) {
}
}

c.mtx.Lock()
c.fromL1BlockNumber++
c.mtx.Unlock()

return &sequencerv2types.BatchHeader{
BatchNumber: blockHeader.Number.Uint64(),
ChainId: c.chainID,
Expand Down
6 changes: 3 additions & 3 deletions rpcclient/optimism/batch_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (f *Fetcher) Fetch(l1BeginBlockNumber uint64) error {
if err := g.Wait(); err != nil {
return err
}
telemetry.MeasureSince(ti, "rpc_optimism", "fetch_l1_blocks")
telemetry.MeasureSince(ti, "rpc", "fetch_l1_blocks")
framesRefs := make([]*FramesRef, 0)
m.Range(func(_, ref interface{}) bool {
framesRefs = append(framesRefs, ref.(*FramesRef))
Expand All @@ -222,7 +222,7 @@ func (f *Fetcher) Fetch(l1BeginBlockNumber uint64) error {
// The range is [start, end].
func (f *Fetcher) getL2BlockHashes(start, end uint64) ([]*sequencerv2types.BlockHeader, error) {
ti := time.Now()
defer telemetry.MeasureSince(ti, "rpc_arbitrum", "fetch_l2_block_hashes")
defer telemetry.MeasureSince(ti, "rpc", "fetch_l2_block_hashes")

g, ctx := errgroup.WithContext(context.Background())
g.SetLimit(f.concurrentFetcher)
Expand Down Expand Up @@ -363,7 +363,7 @@ func (f *Fetcher) fetchBlock(blockNumber uint64) ([]*FramesRef, error) {
logger.Errorf("failed to get blobs: %v", err)
return nil, err
}
telemetry.MeasureSince(ti, "rpc_optimism", "fetch_beacon_blobs")
telemetry.MeasureSince(ti, "rpc", "fetch_beacon_blobs")
if len(blobs) != len(hashes) {
logger.Errorf("blobs length is not matched: %d, %d", len(blobs), len(hashes))
return nil, fmt.Errorf("blobs length is not matched: %d, %d", len(blobs), len(hashes))
Expand Down
4 changes: 2 additions & 2 deletions store/goleveldb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ func (d *DB) Prune(prefix []byte) error {
defer iter.Release()

iter.Seek(prefix)
iter.Prev()
for ; iter.Valid(); iter.Prev() {
iter.Prev() // Skip the last key-value pair with the prefix.
for iter.Prev(); iter.Valid(); iter.Prev() {
key := iter.Key()

if err := d.db.Delete(key, &opt.WriteOptions{Sync: true}); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion store/goleveldb/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestGoLevelDB(t *testing.T) {
require.Equal(t, 4, count)

// prune
require.NoError(t, db.Prune([]byte("key1")))
require.NoError(t, db.Prune([]byte("key2")))
_, err = db.Get([]byte("key1"))
require.NoError(t, err)
_, err = db.Get([]byte("key"))
Expand Down

0 comments on commit 3cfd715

Please sign in to comment.