op-node: receipts and txs caches limited by volume
bendanzhentan committed Dec 25, 2023
1 parent 7947c25 commit 40dd44d
Showing 6 changed files with 154 additions and 53 deletions.
82 changes: 82 additions & 0 deletions op-node/sources/caching/blob_lru.go
@@ -0,0 +1,82 @@
package caching

import (
	"math"
	"sync"

	lru "github.com/hashicorp/golang-lru/v2"
)

type SizeFn func(value any) int

// SizeConstrainedCache is a cache whose capacity is measured in bytes (instead of item count).
// When the cache is at capacity and a new item is added, older items are evicted until the size
// constraint is met.
//
// OBS: This cache assumes that items are content-addressed: keys are unique per content.
// In other words: two Add(..) calls with the same key K will always carry the same value V.
type SizeConstrainedCache[K comparable, V any] struct {
	m       Metrics
	label   string
	size    int
	maxSize int
	sizeFn  SizeFn
	lru     *lru.Cache[K, V]
	lock    sync.Mutex
}

// NewSizeConstrainedCache creates a new size-constrained LRU cache.
func NewSizeConstrainedCache[K comparable, V any](m Metrics, label string, maxSize int, sizeFn SizeFn) *SizeConstrainedCache[K, V] {
	cache, _ := lru.New[K, V](math.MaxInt)
	return &SizeConstrainedCache[K, V]{
		m:       m,
		label:   label,
		size:    0,
		maxSize: maxSize,
		sizeFn:  sizeFn,
		lru:     cache,
	}
}

// Add adds a value to the cache. Returns true if an eviction occurred.
// OBS: This cache assumes that items are content-addressed: keys are unique per content.
// In other words: two Add(..) calls with the same key K will always carry the same value V.
// OBS: The value is _not_ copied on Add, so the caller must not modify it afterwards.
func (c *SizeConstrainedCache[K, V]) Add(key K, value V) (evicted bool) {
	c.lock.Lock()
	defer c.lock.Unlock()

	// Unless it is already present, we might need to evict something.
	// OBS: If it is present, we still call Add internally to bump the recentness.
	if !c.lru.Contains(key) {
		targetSize := c.size + c.sizeFn(value)
		for targetSize > c.maxSize {
			evicted = true
			_, v, ok := c.lru.RemoveOldest()
			if !ok {
				// The list is now empty, so stop evicting.
				break
			}
			targetSize -= c.sizeFn(v)
		}
		c.size = targetSize
	}

	c.lru.Add(key, value)
	if c.m != nil {
		c.m.CacheAdd(c.label, c.lru.Len(), evicted)
	}

	return evicted
}

// Get looks up a key's value from the cache.
func (c *SizeConstrainedCache[K, V]) Get(key K) (V, bool) {
	c.lock.Lock()
	value, ok := c.lru.Get(key)
	c.lock.Unlock()

	if c.m != nil {
		c.m.CacheGet(c.label, ok)
	}
	return value, ok
}
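For context, here is a minimal, self-contained usage sketch of the new SizeConstrainedCache; it is not part of this commit. It assumes the import path matches this repository's layout (github.com/ethereum-optimism/optimism/op-node/sources/caching) and that a nil Metrics is acceptable, as the nil checks above suggest; entries are sized by the length of their byte slices.

package main

import (
	"fmt"

	"github.com/ethereum-optimism/optimism/op-node/sources/caching" // assumed import path
)

func main() {
	// Size each entry by the length of its byte slice.
	sizeFn := func(value any) int {
		return len(value.([]byte))
	}

	// Cap the cache at 1 KiB of payload; a nil Metrics disables metric reporting.
	c := caching.NewSizeConstrainedCache[string, []byte](nil, "example", 1024, sizeFn)

	fmt.Println(c.Add("a", make([]byte, 600))) // false: 600 bytes fit under the 1024-byte cap
	fmt.Println(c.Add("b", make([]byte, 600))) // true: 1200 > 1024, so the oldest entry "a" is evicted

	_, ok := c.Get("a")
	fmt.Println(ok) // false: "a" was evicted to satisfy the size constraint
}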
47 changes: 33 additions & 14 deletions op-node/sources/eth_client.go
@@ -35,10 +35,10 @@ type EthClientConfig struct {

	// cache sizes

-	// Number of blocks worth of receipts to cache
-	ReceiptsCacheSize int
-	// Number of blocks worth of transactions to cache
-	TransactionsCacheSize int
+	// Maximum volume, in bytes, of receipts to cache
+	ReceiptsCacheVolumeByte int
+	// Maximum volume, in bytes, of transactions to cache
+	TransactionsCacheVolumeByte int
	// Number of block headers to cache
	HeadersCacheSize int
	// Number of payloads to cache
@@ -65,11 +65,11 @@ type EthClientConfig struct {
}

func (c *EthClientConfig) Check() error {
-	if c.ReceiptsCacheSize < 0 {
-		return fmt.Errorf("invalid receipts cache size: %d", c.ReceiptsCacheSize)
+	if c.ReceiptsCacheVolumeByte < 0 {
+		return fmt.Errorf("invalid receipts cache volume: %d", c.ReceiptsCacheVolumeByte)
	}
-	if c.TransactionsCacheSize < 0 {
-		return fmt.Errorf("invalid transactions cache size: %d", c.TransactionsCacheSize)
+	if c.TransactionsCacheVolumeByte < 0 {
+		return fmt.Errorf("invalid transactions cache volume: %d", c.TransactionsCacheVolumeByte)
	}
	if c.HeadersCacheSize < 0 {
		return fmt.Errorf("invalid headers cache size: %d", c.HeadersCacheSize)
@@ -106,11 +106,11 @@ type EthClient struct {
	// cache receipts in bundles per block hash
	// We cache the receipts fetching job to not lose progress when we have to retry the `Fetch` call
	// common.Hash -> *receiptsFetchingJob
-	receiptsCache *caching.LRUCache
+	receiptsCache *caching.SizeConstrainedCache[common.Hash, *receiptsFetchingJob]

	// cache transactions in bundles per block hash
	// common.Hash -> types.Transactions
-	transactionsCache *caching.LRUCache
+	transactionsCache *caching.SizeConstrainedCache[common.Hash, types.Transactions]

	// cache block headers of blocks by hash
	// common.Hash -> *HeaderInfo
@@ -165,15 +165,29 @@ func NewEthClient(client client.RPC, log log.Logger, metrics caching.Metrics, co
		return nil, fmt.Errorf("bad config, cannot create L1 source: %w", err)
	}
	client = LimitRPC(client, config.MaxConcurrentRequests)
+	receiptsFetchingJobSizeFn := func(value any) (size int) {
+		job := value.(*receiptsFetchingJob)
+		for _, rec := range job.result {
+			size += int(rec.Size())
+		}
+		return
+	}
+	transactionsSizeFn := func(value any) (size int) {
+		transactions := value.(types.Transactions)
+		for _, tx := range transactions {
+			size += int(tx.Size())
+		}
+		return
+	}
	return &EthClient{
		client:            client,
		maxBatchSize:      config.MaxRequestsPerBatch,
		trustRPC:          config.TrustRPC,
		mustBePostMerge:   config.MustBePostMerge,
		provKind:          config.RPCProviderKind,
		log:               log,
-		receiptsCache:     caching.NewLRUCache(metrics, "receipts", config.ReceiptsCacheSize),
-		transactionsCache: caching.NewLRUCache(metrics, "txs", config.TransactionsCacheSize),
+		receiptsCache:     caching.NewSizeConstrainedCache[common.Hash, *receiptsFetchingJob](metrics, "receipts", config.ReceiptsCacheVolumeByte, receiptsFetchingJobSizeFn),
+		transactionsCache: caching.NewSizeConstrainedCache[common.Hash, types.Transactions](metrics, "txs", config.TransactionsCacheVolumeByte, transactionsSizeFn),
		headersCache:      caching.NewLRUCache(metrics, "headers", config.HeadersCacheSize),
		payloadsCache:     caching.NewLRUCache(metrics, "payloads", config.PayloadsCacheSize),
		availableReceiptMethods: AvailableReceiptsFetchingMethods(config.RPCProviderKind),
@@ -318,7 +332,7 @@ func (s *EthClient) BSCInfoByLabel(ctx context.Context, label eth.BlockLabel) (e
func (s *EthClient) InfoAndTxsByHash(ctx context.Context, hash common.Hash) (eth.BlockInfo, types.Transactions, error) {
	if header, ok := s.headersCache.Get(hash); ok {
		if txs, ok := s.transactionsCache.Get(hash); ok {
-			return header.(eth.BlockInfo), txs.(types.Transactions), nil
+			return header.(eth.BlockInfo), txs, nil
		}
	}
	return s.blockCall(ctx, "eth_getBlockByHash", hashID(hash))
@@ -366,12 +380,17 @@ func (s *EthClient) FetchReceipts(ctx context.Context, blockHash common.Hash) (e
	// The underlying fetcher uses the receipts hash to verify receipt integrity.
	var job *receiptsFetchingJob
	if v, ok := s.receiptsCache.Get(blockHash); ok {
-		job = v.(*receiptsFetchingJob)
+		job = v
	} else {
		txHashes := eth.TransactionsToHashes(txs)
		job = NewReceiptsFetchingJob(s, s.client, s.maxBatchSize, eth.ToBlockID(info), info.ReceiptHash(), txHashes)
+		_, err := job.Fetch(ctx)
+		if err != nil {
+			return nil, nil, err
+		}
		s.receiptsCache.Add(blockHash, job)
	}
+
	receipts, err := job.Fetch(ctx)
	if err != nil {
		return nil, nil, err
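One detail worth noting in the FetchReceipts change above: the job is now fetched before it is added to the cache, because the receipts size function iterates the job's results and would measure an unfetched job as zero bytes. Below is a toy sketch of that accounting, using hypothetical stand-in types rather than the op-node ones.

package main

import "fmt"

// fakeJob stands in for receiptsFetchingJob; result is only populated once a fetch succeeds.
type fakeJob struct {
	result [][]byte
}

// jobSize mirrors the shape of the receipts size function: it sums the sizes of fetched receipts.
func jobSize(value any) (size int) {
	job := value.(*fakeJob)
	for _, rec := range job.result {
		size += len(rec)
	}
	return
}

func main() {
	job := &fakeJob{}
	fmt.Println(jobSize(job)) // 0: adding the job to a size-constrained cache now would under-count it

	job.result = [][]byte{make([]byte, 500), make([]byte, 700)} // simulate a completed fetch
	fmt.Println(jobSize(job)) // 1200: the volume the cache should actually account for
}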
18 changes: 9 additions & 9 deletions op-node/sources/eth_client_test.go
@@ -44,15 +44,15 @@ func (m *mockRPC) Close() {
var _ client.RPC = (*mockRPC)(nil)

var testEthClientConfig = &EthClientConfig{
-	ReceiptsCacheSize:     10,
-	TransactionsCacheSize: 10,
-	HeadersCacheSize:      10,
-	PayloadsCacheSize:     10,
-	MaxRequestsPerBatch:   20,
-	MaxConcurrentRequests: 10,
-	TrustRPC:              false,
-	MustBePostMerge:       false,
-	RPCProviderKind:       RPCKindBasic,
+	ReceiptsCacheVolumeByte:     10 * 1024 * 1024,
+	TransactionsCacheVolumeByte: 10 * 1024 * 1024,
+	HeadersCacheSize:            10,
+	PayloadsCacheSize:           10,
+	MaxRequestsPerBatch:         20,
+	MaxConcurrentRequests:       10,
+	TrustRPC:                    false,
+	MustBePostMerge:             false,
+	RPCProviderKind:             RPCKindBasic,
}

func randHash() (out common.Hash) {
20 changes: 10 additions & 10 deletions op-node/sources/l1_client.go
@@ -34,16 +34,16 @@ func L1ClientDefaultConfig(config *rollup.Config, trustRPC bool, kind RPCProvide
	return &L1ClientConfig{
		EthClientConfig: EthClientConfig{
			// receipts and transactions are cached per block
-			ReceiptsCacheSize:     span,
-			TransactionsCacheSize: span,
-			HeadersCacheSize:      span,
-			PayloadsCacheSize:     span,
-			MaxRequestsPerBatch:   20, // TODO: tune batch param
-			MaxConcurrentRequests: 20,
-			TrustRPC:              trustRPC,
-			MustBePostMerge:       false,
-			RPCProviderKind:       kind,
-			MethodResetDuration:   time.Minute,
+			ReceiptsCacheVolumeByte:     span * 1024 * 1024,
+			TransactionsCacheVolumeByte: span * 1024 * 1024,
+			HeadersCacheSize:            span,
+			PayloadsCacheSize:           span,
+			MaxRequestsPerBatch:         20, // TODO: tune batch param
+			MaxConcurrentRequests:       20,
+			TrustRPC:                    trustRPC,
+			MustBePostMerge:             false,
+			RPCProviderKind:             kind,
+			MethodResetDuration:         time.Minute,
		},
		// Not bounded by span, to cover find-sync-start range fully for speedy recovery after errors.
		L1BlockRefsCacheSize: fullSpan,
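With these defaults, the old per-block entry counts become rough per-block byte budgets: each of the receipts and transactions caches may now hold up to span MiB of data (the L2 defaults below follow the same pattern). A small illustration of the arithmetic, using a hypothetical span value rather than the real one derived from the rollup config:

package main

import "fmt"

func main() {
	span := 300 // hypothetical example; the real span comes from the rollup config

	// Each cache gets roughly 1 MiB of budget per block in the span.
	receiptsCacheVolumeByte := span * 1024 * 1024

	fmt.Println(receiptsCacheVolumeByte)                // 314572800 bytes
	fmt.Printf("%d MiB\n", receiptsCacheVolumeByte>>20) // 300 MiB
}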
20 changes: 10 additions & 10 deletions op-node/sources/l2_client.go
@@ -42,16 +42,16 @@ func L2ClientDefaultConfig(config *rollup.Config, trustRPC bool) *L2ClientConfig
	return &L2ClientConfig{
		EthClientConfig: EthClientConfig{
			// receipts and transactions are cached per block
-			ReceiptsCacheSize:     span,
-			TransactionsCacheSize: span,
-			HeadersCacheSize:      span,
-			PayloadsCacheSize:     span,
-			MaxRequestsPerBatch:   20, // TODO: tune batch param
-			MaxConcurrentRequests: 10,
-			TrustRPC:              trustRPC,
-			MustBePostMerge:       true,
-			RPCProviderKind:       RPCKindBasic,
-			MethodResetDuration:   time.Minute,
+			ReceiptsCacheVolumeByte:     span * 1024 * 1024,
+			TransactionsCacheVolumeByte: span * 1024 * 1024,
+			HeadersCacheSize:            span,
+			PayloadsCacheSize:           span,
+			MaxRequestsPerBatch:         20, // TODO: tune batch param
+			MaxConcurrentRequests:       10,
+			TrustRPC:                    trustRPC,
+			MustBePostMerge:             true,
+			RPCProviderKind:             RPCKindBasic,
+			MethodResetDuration:         time.Minute,
		},
		// Not bounded by span, to cover find-sync-start range fully for speedy recovery after errors.
		L2BlockRefsCacheSize: fullSpan,
20 changes: 10 additions & 10 deletions op-node/sources/receipts_test.go
@@ -147,16 +147,16 @@ func (tc *ReceiptsTestCase) Run(t *testing.T) {
	cl := rpc.DialInProc(srv)
	testCfg := &EthClientConfig{
		// receipts and transactions are cached per block
-		ReceiptsCacheSize:     1000,
-		TransactionsCacheSize: 1000,
-		HeadersCacheSize:      1000,
-		PayloadsCacheSize:     1000,
-		MaxRequestsPerBatch:   20,
-		MaxConcurrentRequests: 10,
-		TrustRPC:              false,
-		MustBePostMerge:       false,
-		RPCProviderKind:       tc.providerKind,
-		MethodResetDuration:   time.Minute,
+		ReceiptsCacheVolumeByte:     1000 * 1024 * 1024,
+		TransactionsCacheVolumeByte: 1000 * 1024 * 1024,
+		HeadersCacheSize:            1000,
+		PayloadsCacheSize:           1000,
+		MaxRequestsPerBatch:         20,
+		MaxConcurrentRequests:       10,
+		TrustRPC:                    false,
+		MustBePostMerge:             false,
+		RPCProviderKind:             tc.providerKind,
+		MethodResetDuration:         time.Minute,
	}
	if tc.staticMethod { // if static, instantly reset, for fast clock-independent testing
		testCfg.MethodResetDuration = 0
