Skip to content

Commit

Permalink
fix(op-node): pre-fetching handle L1 reOrg (#115)
Browse files Browse the repository at this point in the history
Co-authored-by: Welkin <welkin.b@nodereal.com>
  • Loading branch information
welkin22 and Welkin authored Jan 26, 2024
1 parent 8f623e6 commit a4e7213
Show file tree
Hide file tree
Showing 3 changed files with 208 additions and 6 deletions.
3 changes: 2 additions & 1 deletion op-node/sources/caching/pre_fetch_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ func (v *PreFetchCache[V]) AddIfNotFull(key uint64, value V) (success bool, isFu
defer v.lock.Unlock()
v.lock.Lock()
if _, ok := v.inner[key]; ok {
return false, false
v.inner[key] = value
return true, false
}
if v.queue.Size() >= v.maxSize {
return false, true
Expand Down
44 changes: 39 additions & 5 deletions op-node/sources/l1_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ func L1ClientDefaultConfig(config *rollup.Config, trustRPC bool, kind RPCProvide
}
}

const sequencerConfDepth = 15

// L1Client provides typed bindings to retrieve L1 data from an RPC source,
// with optimized batch requests, cached results, and flag to not trust the RPC
// (i.e. to verify all returned contents against corresponding block hashes).
Expand Down Expand Up @@ -139,6 +141,7 @@ func (s *L1Client) GoOrUpdatePreFetchReceipts(ctx context.Context, l1Start uint6
s.log.Info("pre-fetching receipts start", "startBlock", l1Start)
go func() {
var currentL1Block uint64
var parentHash *common.Hash
for {
select {
case <-s.done:
Expand All @@ -147,6 +150,7 @@ func (s *L1Client) GoOrUpdatePreFetchReceipts(ctx context.Context, l1Start uint6
case currentL1Block = <-s.preFetchReceiptsStartBlockChan:
s.log.Debug("pre-fetching receipts currentL1Block changed", "block", currentL1Block)
s.receiptsCache.RemoveAll()
parentHash = nil
default:
blockRef, err := s.L1BlockRefByLabel(ctx, eth.Unsafe)
if err != nil {
Expand All @@ -169,6 +173,9 @@ func (s *L1Client) GoOrUpdatePreFetchReceipts(ctx context.Context, l1Start uint6
taskCount = int(blockRef.Number-currentL1Block) + 1
}

blockInfoChan := make(chan eth.L1BlockRef, taskCount)
oldestFetchBlockNumber := currentL1Block

var wg sync.WaitGroup
for i := 0; i < taskCount; i++ {
wg.Add(1)
Expand All @@ -179,15 +186,17 @@ func (s *L1Client) GoOrUpdatePreFetchReceipts(ctx context.Context, l1Start uint6
case <-s.done:
return
default:
if _, ok := s.receiptsCache.Get(blockNumber); ok {
return
}
pair, ok := s.receiptsCache.Get(blockNumber)
blockInfo, err := s.L1BlockRefByNumber(ctx, blockNumber)
if err != nil {
s.log.Debug("failed to fetch block ref", "err", err, "blockNumber", blockNumber)
time.Sleep(1 * time.Second)
continue
}
if ok && pair.blockHash == blockInfo.Hash {
return
}

isSuccess, err := s.PreFetchReceipts(ctx, blockInfo.Hash)
if err != nil {
s.log.Warn("failed to pre-fetch receipts", "err", err)
Expand All @@ -198,14 +207,39 @@ func (s *L1Client) GoOrUpdatePreFetchReceipts(ctx context.Context, l1Start uint6
time.Sleep(1 * time.Second)
continue
}
s.log.Debug("pre-fetching receipts done", "block", blockInfo.Number)
break
s.log.Debug("pre-fetching receipts done", "block", blockInfo.Number, "hash", blockInfo.Hash)
blockInfoChan <- blockInfo
return
}
}
}(ctx, currentL1Block)
currentL1Block = currentL1Block + 1
}
wg.Wait()
close(blockInfoChan)

//try to find out l1 reOrg and return to an earlier block height for re-prefetching
var latestBlockHash common.Hash
latestBlockNumber := uint64(0)
var oldestBlockParentHash common.Hash
for l1BlockInfo := range blockInfoChan {
if l1BlockInfo.Number > latestBlockNumber {
latestBlockHash = l1BlockInfo.Hash
latestBlockNumber = l1BlockInfo.Number
}
if l1BlockInfo.Number == oldestFetchBlockNumber {
oldestBlockParentHash = l1BlockInfo.ParentHash
}
}

s.log.Debug("pre-fetching receipts hash", "latestBlockHash", latestBlockHash, "latestBlockNumber", latestBlockNumber, "oldestBlockNumber", oldestFetchBlockNumber, "oldestBlockParentHash", oldestBlockParentHash)
if parentHash != nil && oldestBlockParentHash != (common.Hash{}) && oldestBlockParentHash != *parentHash && currentL1Block >= sequencerConfDepth+uint64(taskCount) {
currentL1Block = currentL1Block - sequencerConfDepth - uint64(taskCount)
s.log.Warn("pre-fetching receipts found l1 reOrg, return to an earlier block height for re-prefetching", "recordParentHash", *parentHash, "unsafeParentHash", oldestBlockParentHash, "number", oldestFetchBlockNumber, "backToNumber", currentL1Block)
parentHash = nil
continue
}
parentHash = &latestBlockHash
}
}
}()
Expand Down
167 changes: 167 additions & 0 deletions op-node/sources/l1_client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package sources

import (
"context"
"testing"
"time"

"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

func TestGoOrUpdatePreFetchReceipts(t *testing.T) {
t.Run("handleReOrg", func(t *testing.T) {
m := new(mockRPC)
ctx := context.Background()
clientLog := testlog.Logger(t, log.LvlDebug)
latestHead := &rpcHeader{
ParentHash: randHash(),
UncleHash: common.Hash{},
Coinbase: common.Address{},
Root: types.EmptyRootHash,
TxHash: types.EmptyTxsHash,
ReceiptHash: types.EmptyReceiptsHash,
Bloom: eth.Bytes256{},
Difficulty: hexutil.Big{},
Number: 100,
GasLimit: 0,
GasUsed: 0,
Time: 0,
Extra: nil,
MixDigest: common.Hash{},
Nonce: types.BlockNonce{},
BaseFee: nil,
WithdrawalsRoot: nil,
Hash: randHash(),
}
m.On("CallContext", ctx, new(*rpcHeader),
"eth_getBlockByNumber", []any{"latest", false}).Run(func(args mock.Arguments) {
*args[1].(**rpcHeader) = latestHead
}).Return([]error{nil})
for i := 81; i <= 90; i++ {
currentHead := &rpcHeader{
ParentHash: randHash(),
UncleHash: common.Hash{},
Coinbase: common.Address{},
Root: types.EmptyRootHash,
TxHash: types.EmptyTxsHash,
ReceiptHash: types.EmptyReceiptsHash,
Bloom: eth.Bytes256{},
Difficulty: hexutil.Big{},
Number: hexutil.Uint64(i),
GasLimit: 0,
GasUsed: 0,
Time: 0,
Extra: nil,
MixDigest: common.Hash{},
Nonce: types.BlockNonce{},
BaseFee: nil,
WithdrawalsRoot: nil,
Hash: randHash(),
}
currentBlock := &rpcBlock{
rpcHeader: *currentHead,
Transactions: []*types.Transaction{},
}
m.On("CallContext", ctx, new(*rpcHeader),
"eth_getBlockByNumber", []any{numberID(i).Arg(), false}).Once().Run(func(args mock.Arguments) {
*args[1].(**rpcHeader) = currentHead
}).Return([]error{nil})
m.On("CallContext", ctx, new(*rpcBlock),
"eth_getBlockByHash", []any{currentHead.Hash, true}).Once().Run(func(args mock.Arguments) {
*args[1].(**rpcBlock) = currentBlock
}).Return([]error{nil})
}
for i := 91; i <= 100; i++ {
currentHead := &rpcHeader{
ParentHash: randHash(),
UncleHash: common.Hash{},
Coinbase: common.Address{},
Root: types.EmptyRootHash,
TxHash: types.EmptyTxsHash,
ReceiptHash: types.EmptyReceiptsHash,
Bloom: eth.Bytes256{},
Difficulty: hexutil.Big{},
Number: hexutil.Uint64(i),
GasLimit: 0,
GasUsed: 0,
Time: 0,
Extra: nil,
MixDigest: common.Hash{},
Nonce: types.BlockNonce{},
BaseFee: nil,
WithdrawalsRoot: nil,
Hash: randHash(),
}
m.On("CallContext", ctx, new(*rpcHeader),
"eth_getBlockByNumber", []any{numberID(i).Arg(), false}).Once().Run(func(args mock.Arguments) {
*args[1].(**rpcHeader) = currentHead
}).Return([]error{nil})
currentBlock := &rpcBlock{
rpcHeader: *currentHead,
Transactions: []*types.Transaction{},
}
m.On("CallContext", ctx, new(*rpcBlock),
"eth_getBlockByHash", []any{currentHead.Hash, true}).Once().Run(func(args mock.Arguments) {
*args[1].(**rpcBlock) = currentBlock
}).Return([]error{nil})
}
var lastParentHeader common.Hash
var real100Hash common.Hash
for i := 76; i <= 100; i++ {
currentHead := &rpcHeader{
ParentHash: lastParentHeader,
UncleHash: common.Hash{},
Coinbase: common.Address{},
Root: types.EmptyRootHash,
TxHash: types.EmptyTxsHash,
ReceiptHash: types.EmptyReceiptsHash,
Bloom: eth.Bytes256{},
Difficulty: hexutil.Big{},
Number: hexutil.Uint64(i),
GasLimit: 0,
GasUsed: 0,
Time: 0,
Extra: nil,
MixDigest: common.Hash{},
Nonce: types.BlockNonce{},
BaseFee: nil,
WithdrawalsRoot: nil,
Hash: randHash(),
}
if i == 100 {
real100Hash = currentHead.Hash
}
lastParentHeader = currentHead.Hash
m.On("CallContext", ctx, new(*rpcHeader),
"eth_getBlockByNumber", []any{numberID(i).Arg(), false}).Once().Run(func(args mock.Arguments) {
*args[1].(**rpcHeader) = currentHead
}).Return([]error{nil})
currentBlock := &rpcBlock{
rpcHeader: *currentHead,
Transactions: []*types.Transaction{},
}
m.On("CallContext", ctx, new(*rpcBlock),
"eth_getBlockByHash", []any{currentHead.Hash, true}).Once().Run(func(args mock.Arguments) {
*args[1].(**rpcBlock) = currentBlock
}).Return([]error{nil})
}
s, err := NewL1Client(m, clientLog, nil, L1ClientDefaultConfig(&rollup.Config{SeqWindowSize: 1000}, true, RPCKindBasic))
require.NoError(t, err)
err2 := s.GoOrUpdatePreFetchReceipts(ctx, 81)
require.NoError(t, err2)
time.Sleep(1 * time.Second)
pair, ok := s.receiptsCache.Get(100)
require.True(t, ok, "100 cache miss")
require.Equal(t, real100Hash, pair.blockHash, "block 100 hash is different,want:%s,but:%s", real100Hash, pair.blockHash)
_, ok2 := s.receiptsCache.Get(76)
require.True(t, ok2, "76 cache miss")
})
}

0 comments on commit a4e7213

Please sign in to comment.