Skip to content

Commit

Permalink
feat(op-node): pre-fetch receipts concurrently round 2 (#104)
Browse files Browse the repository at this point in the history
* remove rateLimiter for prefetch and use waitGroup

* PreFetchReceipts logic

* add lock

* miss continue

* add preFetchCache

* fix unit test

* fix unit test

* fix unit test

* fix e2e case

* rollback lru cache

* rollback lru cache

* make code simple

* maxConcurrentRequests/2

---------

Co-authored-by: Welkin <welkin.b@nodereal.com>
  • Loading branch information
welkin22 and Welkin authored Jan 2, 2024
1 parent 387eb6c commit e8d466b
Show file tree
Hide file tree
Showing 14 changed files with 244 additions and 31 deletions.
2 changes: 2 additions & 0 deletions op-node/rollup/derive/attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ type L1ReceiptsFetcher interface {
InfoByHash(ctx context.Context, hash common.Hash) (eth.BlockInfo, error)
InfoAndTxsByHash(ctx context.Context, hash common.Hash) (eth.BlockInfo, types.Transactions, error)
FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Receipts, error)
PreFetchReceipts(ctx context.Context, blockHash common.Hash) (bool, error)
GoOrUpdatePreFetchReceipts(ctx context.Context, l1StartBlock uint64) error
ClearReceiptsCacheBefore(blockNumber uint64)
}

type SystemConfigL2Fetcher interface {
Expand Down
1 change: 1 addition & 0 deletions op-node/rollup/derive/engine_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,7 @@ func (eq *EngineQueue) postProcessSafeL2() {
eq.log.Debug("updated finality-data", "last_l1", last.L1Block, "last_l2", last.L2Block)
}
}
eq.l1Fetcher.ClearReceiptsCacheBefore(eq.safeHead.L1Origin.Number)
}

func (eq *EngineQueue) logSyncProgress(reason string) {
Expand Down
17 changes: 17 additions & 0 deletions op-node/rollup/derive/engine_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,12 +260,14 @@ func TestEngineQueue_Finalize(t *testing.T) {
eq.origin = refD
prev.origin = refD
eq.safeHead = refC1
l1F.ExpectClearReceiptsCacheBefore(refC1.L1Origin.Number)
eq.postProcessSafeL2()

// now say D0 was included in E and became the new safe head
eq.origin = refE
prev.origin = refE
eq.safeHead = refD0
l1F.ExpectClearReceiptsCacheBefore(refD0.L1Origin.Number)
eq.postProcessSafeL2()

// let's finalize D (current L1), from which we fully derived C1 (it was safe head), but not D0 (included in E)
Expand Down Expand Up @@ -703,16 +705,19 @@ func TestVerifyNewL1Origin(t *testing.T) {
newOrigin eth.L1BlockRef
expectReset bool
expectedFetchBlocks map[uint64]eth.L1BlockRef
verifyPass bool
}{
{
name: "L1OriginBeforeUnsafeOrigin",
newOrigin: refD,
expectReset: false,
verifyPass: true,
},
{
name: "Matching",
newOrigin: refF,
expectReset: false,
verifyPass: true,
},
{
name: "BlockNumberEqualDifferentHash",
Expand All @@ -723,11 +728,13 @@ func TestVerifyNewL1Origin(t *testing.T) {
Time: refF.Time,
},
expectReset: true,
verifyPass: false,
},
{
name: "UnsafeIsParent",
newOrigin: refG,
expectReset: false,
verifyPass: true,
},
{
name: "UnsafeIsParentNumberDifferentHash",
Expand All @@ -738,6 +745,7 @@ func TestVerifyNewL1Origin(t *testing.T) {
Time: refG.Time,
},
expectReset: true,
verifyPass: false,
},
{
name: "UnsafeIsOlderCanonical",
Expand All @@ -746,6 +754,7 @@ func TestVerifyNewL1Origin(t *testing.T) {
expectedFetchBlocks: map[uint64]eth.L1BlockRef{
refF.Number: refF,
},
verifyPass: true,
},
{
name: "UnsafeIsOlderNonCanonical",
Expand All @@ -765,6 +774,7 @@ func TestVerifyNewL1Origin(t *testing.T) {
Time: refF.Time,
},
},
verifyPass: false,
},
}
for _, test := range tests {
Expand Down Expand Up @@ -839,6 +849,9 @@ func TestVerifyNewL1Origin(t *testing.T) {
// L1 chain reorgs so new origin is at same slot as refF but on a different fork
prev.origin = test.newOrigin
eq.UnsafeL2Head()
if test.verifyPass {
l1F.ExpectClearReceiptsCacheBefore(refB.Number)
}
err = eq.Step(context.Background())
if test.expectReset {
require.ErrorIs(t, err, ErrReset, "should reset pipeline due to mismatched origin")
Expand Down Expand Up @@ -938,6 +951,7 @@ func TestBlockBuildingRace(t *testing.T) {

// Expect initial forkchoice update
eng.ExpectForkchoiceUpdate(preFc, nil, preFcRes, nil)
l1F.ExpectClearReceiptsCacheBefore(refA.Number)
require.NoError(t, eq.Step(context.Background()), "clean forkchoice state after reset")

// Expect initial building update, to process the attributes we queued up
Expand Down Expand Up @@ -1005,6 +1019,7 @@ func TestBlockBuildingRace(t *testing.T) {
}
eng.ExpectForkchoiceUpdate(postFc, nil, postFcRes, nil)

l1F.ExpectClearReceiptsCacheBefore(refA.Number)
// Now complete the job, as external user of the engine
_, _, err = eq.ConfirmPayload(context.Background())
require.NoError(t, err)
Expand Down Expand Up @@ -1093,6 +1108,7 @@ func TestResetLoop(t *testing.T) {
eq.engineSyncTarget = refA2
eq.safeHead = refA1
eq.finalized = refA0
l1F.ExpectClearReceiptsCacheBefore(refA.Number)

// Qeueue up the safe attributes
require.Nil(t, eq.safeAttributes)
Expand All @@ -1111,6 +1127,7 @@ func TestResetLoop(t *testing.T) {
eng.ExpectForkchoiceUpdate(preFc, nil, nil, nil)
require.NoError(t, eq.Step(context.Background()), "clean forkchoice state after reset")

l1F.ExpectClearReceiptsCacheBefore(refA.Number)
// Crux of the test. Should be in a valid state after the reset.
require.ErrorIs(t, eq.Step(context.Background()), NotEnoughData, "Should be able to step after a reset")

Expand Down
1 change: 1 addition & 0 deletions op-node/rollup/derive/l1_traversal.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
type L1BlockRefByNumberFetcher interface {
L1BlockRefByNumber(context.Context, uint64) (eth.L1BlockRef, error)
FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Receipts, error)
PreFetchReceipts(ctx context.Context, blockHash common.Hash) (bool, error)
}

type L1Traversal struct {
Expand Down
10 changes: 10 additions & 0 deletions op-node/rollup/driver/metered_l1fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ func (m *MeteredL1Fetcher) FetchReceipts(ctx context.Context, blockHash common.H
return m.inner.FetchReceipts(ctx, blockHash)
}

func (m *MeteredL1Fetcher) PreFetchReceipts(ctx context.Context, blockHash common.Hash) (bool, error) {
defer m.recordTime("PreFetchReceipts")()
return m.inner.PreFetchReceipts(ctx, blockHash)
}

var _ derive.L1Fetcher = (*MeteredL1Fetcher)(nil)

func (m *MeteredL1Fetcher) recordTime(method string) func() {
Expand All @@ -71,3 +76,8 @@ func (m *MeteredL1Fetcher) GoOrUpdatePreFetchReceipts(ctx context.Context, l1Sta
defer m.recordTime("GoOrUpdatePreFetchReceipts")()
return m.inner.GoOrUpdatePreFetchReceipts(ctx, l1StartBlock)
}

func (m *MeteredL1Fetcher) ClearReceiptsCacheBefore(blockNumber uint64) {
defer m.recordTime("ClearReceiptsCacheBefore")()
m.inner.ClearReceiptsCacheBefore(blockNumber)
}
1 change: 1 addition & 0 deletions op-node/rollup/driver/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
type Downloader interface {
InfoByHash(ctx context.Context, hash common.Hash) (eth.BlockInfo, error)
FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Receipts, error)
PreFetchReceipts(ctx context.Context, blockHash common.Hash) (bool, error)
}

type L1OriginSelectorIface interface {
Expand Down
90 changes: 90 additions & 0 deletions op-node/sources/caching/pre_fetch_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package caching

import (
"sync"

"github.com/ethereum/go-ethereum/common/prque"
)

type PreFetchCache[V any] struct {
m Metrics
label string
inner map[uint64]V
queue *prque.Prque[uint64, V]
lock sync.Mutex
maxSize int
}

func NewPreFetchCache[V any](m Metrics, label string, maxSize int) *PreFetchCache[V] {
return &PreFetchCache[V]{
m: m,
label: label,
inner: make(map[uint64]V),
queue: prque.New[uint64, V](nil),
maxSize: maxSize,
}
}

func (v *PreFetchCache[V]) Add(key uint64, value V) bool {
defer v.lock.Unlock()
v.lock.Lock()
if _, ok := v.inner[key]; ok {
return false
}
v.queue.Push(value, -key)
v.inner[key] = value
if v.m != nil {
v.m.CacheAdd(v.label, v.queue.Size(), false)
}
return true
}

func (v *PreFetchCache[V]) AddIfNotFull(key uint64, value V) (success bool, isFull bool) {
defer v.lock.Unlock()
v.lock.Lock()
if _, ok := v.inner[key]; ok {
return false, false
}
if v.queue.Size() >= v.maxSize {
return false, true
}
v.queue.Push(value, -key)
v.inner[key] = value
if v.m != nil {
v.m.CacheAdd(v.label, v.queue.Size(), false)
}
return true, false
}

func (v *PreFetchCache[V]) Get(key uint64) (V, bool) {
defer v.lock.Unlock()
v.lock.Lock()
value, ok := v.inner[key]
if v.m != nil {
v.m.CacheGet(v.label, ok)
}
return value, ok
}

func (v *PreFetchCache[V]) RemoveAll() {
defer v.lock.Unlock()
v.lock.Lock()
v.inner = make(map[uint64]V)
v.queue.Reset()
}

func (v *PreFetchCache[V]) RemoveLessThan(p uint64) (isRemoved bool) {
defer v.lock.Unlock()
v.lock.Lock()
for !v.queue.Empty() {
_, qKey := v.queue.Peek()
if -qKey < p {
v.queue.Pop()
delete(v.inner, -qKey)
isRemoved = true
continue
}
break
}
return
}
42 changes: 33 additions & 9 deletions op-node/sources/eth_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ type EthClient struct {

// cache receipts in bundles per block hash
// We cache the receipts fetching job to not lose progress when we have to retry the `Fetch` call
// common.Hash -> *receiptsFetchingJob
receiptsCache *caching.LRUCache
// common.Hash -> *receiptsFetchingJobPair
receiptsCache *caching.PreFetchCache[*receiptsFetchingJobPair]

// cache transactions in bundles per block hash
// common.Hash -> types.Transactions
Expand Down Expand Up @@ -172,7 +172,7 @@ func NewEthClient(client client.RPC, log log.Logger, metrics caching.Metrics, co
mustBePostMerge: config.MustBePostMerge,
provKind: config.RPCProviderKind,
log: log,
receiptsCache: caching.NewLRUCache(metrics, "receipts", config.ReceiptsCacheSize),
receiptsCache: caching.NewPreFetchCache[*receiptsFetchingJobPair](metrics, "receipts", config.ReceiptsCacheSize),
transactionsCache: caching.NewLRUCache(metrics, "txs", config.TransactionsCacheSize),
headersCache: caching.NewLRUCache(metrics, "headers", config.HeadersCacheSize),
payloadsCache: caching.NewLRUCache(metrics, "payloads", config.PayloadsCacheSize),
Expand Down Expand Up @@ -357,27 +357,51 @@ func (s *EthClient) PayloadByLabel(ctx context.Context, label eth.BlockLabel) (*
// It verifies the receipt hash in the block header against the receipt hash of the fetched receipts
// to ensure that the execution engine did not fail to return any receipts.
func (s *EthClient) FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Receipts, error) {
blockInfo, receipts, err, _ := s.fetchReceiptsInner(ctx, blockHash, false)
return blockInfo, receipts, err
}

func (s *EthClient) fetchReceiptsInner(ctx context.Context, blockHash common.Hash, isForPreFetch bool) (eth.BlockInfo, types.Receipts, error, bool) {
info, txs, err := s.InfoAndTxsByHash(ctx, blockHash)
if err != nil {
return nil, nil, err
return nil, nil, err, false
}
// Try to reuse the receipts fetcher because is caches the results of intermediate calls. This means
// that if just one of many calls fail, we only retry the failed call rather than all of the calls.
// The underlying fetcher uses the receipts hash to verify receipt integrity.
var job *receiptsFetchingJob
if v, ok := s.receiptsCache.Get(blockHash); ok {
job = v.(*receiptsFetchingJob)
var isFull bool
v, ok := s.receiptsCache.Get(info.NumberU64())
if ok && v.blockHash == blockHash {
job = v.job
} else {
txHashes := eth.TransactionsToHashes(txs)
job = NewReceiptsFetchingJob(s, s.client, s.maxBatchSize, eth.ToBlockID(info), info.ReceiptHash(), txHashes)
s.receiptsCache.Add(blockHash, job)
_, isFull = s.receiptsCache.AddIfNotFull(info.NumberU64(), &receiptsFetchingJobPair{
blockHash: blockHash,
job: job,
})
if isForPreFetch && isFull {
return nil, nil, nil, true
}
}
receipts, err := job.Fetch(ctx)
if err != nil {
return nil, nil, err
return nil, nil, err, isFull
}

return info, receipts, nil
return info, receipts, nil, isFull
}

func (s *EthClient) PreFetchReceipts(ctx context.Context, blockHash common.Hash) (bool, error) {
_, _, err, isFull := s.fetchReceiptsInner(ctx, blockHash, true)
if err != nil {
return false, err
}
if isFull {
return false, nil
}
return true, nil
}

// GetProof returns an account proof result, with any optional requested storage proofs.
Expand Down
Loading

0 comments on commit e8d466b

Please sign in to comment.