From 4e5a699dd3113584e472073e781c7fbdbe228956 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 2 Aug 2022 19:20:53 +0200 Subject: [PATCH] eth/filters: add global logs cache This adds a cache for block logs which is shared by all filters. In order to share the cache reference, filters now need to be created through a 'filter system' object. --- accounts/abi/bind/backends/simulated.go | 31 +++++----- cmd/geth/config.go | 20 ++++++- cmd/utils/flags.go | 6 +- core/blockchain.go | 5 -- core/blockchain_reader.go | 17 ------ eth/api_backend.go | 4 +- eth/backend.go | 5 -- eth/filters/api.go | 20 +++---- eth/filters/bench_test.go | 16 +++-- eth/filters/filter.go | 61 ++++++++++++-------- eth/filters/filter_system.go | 46 ++++++++++++++- eth/filters/filter_system_test.go | 77 +++++++++++-------------- eth/filters/filter_test.go | 20 +++---- graphql/graphql.go | 7 ++- graphql/graphql_test.go | 9 ++- graphql/service.go | 9 +-- internal/ethapi/backend.go | 13 ++--- les/api_backend.go | 7 +-- les/client.go | 4 -- 19 files changed, 205 insertions(+), 172 deletions(-) diff --git a/accounts/abi/bind/backends/simulated.go b/accounts/abi/bind/backends/simulated.go index cd14afa14755..d03ea760483d 100644 --- a/accounts/abi/bind/backends/simulated.go +++ b/accounts/abi/bind/backends/simulated.go @@ -68,7 +68,8 @@ type SimulatedBackend struct { pendingState *state.StateDB // Currently pending state that will be the active on request pendingReceipts types.Receipts // Currently receipts for the pending block - events *filters.EventSystem // Event system for filtering log events live + events *filters.EventSystem // for filtering log events live + filterSystem *filters.FilterSystem // for filtering database logs config *params.ChainConfig } @@ -86,7 +87,11 @@ func NewSimulatedBackendWithDatabase(database ethdb.Database, alloc core.Genesis blockchain: blockchain, config: genesis.Config, } - backend.events = filters.NewEventSystem(&filterBackend{database, blockchain, backend}, false) + + filterBackend := &filterBackend{database, blockchain, backend} + backend.filterSystem = filters.NewFilterSystem(filterBackend, filters.Config{}) + backend.events = filters.NewEventSystem(backend.filterSystem, false) + backend.rollback(blockchain.CurrentBlock()) return backend } @@ -689,7 +694,7 @@ func (b *SimulatedBackend) FilterLogs(ctx context.Context, query ethereum.Filter var filter *filters.Filter if query.BlockHash != nil { // Block filter requested, construct a single-shot filter - filter = filters.NewBlockFilter(&filterBackend{b.database, b.blockchain, b}, *query.BlockHash, query.Addresses, query.Topics) + filter = b.filterSystem.NewBlockFilter(*query.BlockHash, query.Addresses, query.Topics) } else { // Initialize unset filter boundaries to run from genesis to chain head from := int64(0) @@ -701,7 +706,7 @@ func (b *SimulatedBackend) FilterLogs(ctx context.Context, query ethereum.Filter to = query.ToBlock.Int64() } // Construct the range filter - filter = filters.NewRangeFilter(&filterBackend{b.database, b.blockchain, b}, from, to, query.Addresses, query.Topics) + filter = b.filterSystem.NewRangeFilter(from, to, query.Addresses, query.Topics) } // Run the filter and return all the logs logs, err := filter.Logs(ctx) @@ -827,7 +832,8 @@ type filterBackend struct { backend *SimulatedBackend } -func (fb *filterBackend) ChainDb() ethdb.Database { return fb.db } +func (fb *filterBackend) ChainDb() ethdb.Database { return fb.db } + func (fb *filterBackend) EventMux() *event.TypeMux { panic("not supported") } func (fb *filterBackend) HeaderByNumber(ctx context.Context, block rpc.BlockNumber) (*types.Header, error) { @@ -853,19 +859,8 @@ func (fb *filterBackend) GetReceipts(ctx context.Context, hash common.Hash) (typ return rawdb.ReadReceipts(fb.db, hash, *number, fb.bc.Config()), nil } -func (fb *filterBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types.Log, error) { - number := rawdb.ReadHeaderNumber(fb.db, hash) - if number == nil { - return nil, nil - } - receipts := rawdb.ReadReceipts(fb.db, hash, *number, fb.bc.Config()) - if receipts == nil { - return nil, nil - } - logs := make([][]*types.Log, len(receipts)) - for i, receipt := range receipts { - logs[i] = receipt.Logs - } +func (fb *filterBackend) GetLogs(ctx context.Context, hash common.Hash, number uint64) ([][]*types.Log, error) { + logs := rawdb.ReadLogs(fb.db, hash, number, fb.bc.Config()) return logs, nil } diff --git a/cmd/geth/config.go b/cmd/geth/config.go index 2562de8ae9ea..2319a0e7972b 100644 --- a/cmd/geth/config.go +++ b/cmd/geth/config.go @@ -32,13 +32,16 @@ import ( "github.com/ethereum/go-ethereum/accounts/usbwallet" "github.com/ethereum/go-ethereum/cmd/utils" "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/eth/ethconfig" + "github.com/ethereum/go-ethereum/eth/filters" "github.com/ethereum/go-ethereum/internal/ethapi" "github.com/ethereum/go-ethereum/internal/flags" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/rpc" "github.com/naoina/toml" ) @@ -163,7 +166,9 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) { override := ctx.Bool(utils.OverrideTerminalTotalDifficultyPassed.Name) cfg.Eth.OverrideTerminalTotalDifficultyPassed = &override } + backend, eth := utils.RegisterEthService(stack, &cfg.Eth) + // Warn users to migrate if they have a legacy freezer format. if eth != nil && !ctx.IsSet(utils.IgnoreLegacyReceiptsFlag.Name) { firstIdx := uint64(0) @@ -181,10 +186,21 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) { utils.Fatalf("Database has receipts with a legacy format. Please run `geth db freezer-migrate`.") } } - // Configure GraphQL if requested + + // Enable filter RPC API. + filterBackend := backend.(filters.Backend) + filterSystem := filters.NewFilterSystem(filterBackend, filters.Config{}) + isLightClient := cfg.Eth.SyncMode == downloader.LightSync + stack.RegisterAPIs([]rpc.API{{ + Namespace: "eth", + Service: filters.NewFilterAPI(filterSystem, isLightClient), + }}) + + // Configure GraphQL if requested. if ctx.IsSet(utils.GraphQLEnabledFlag.Name) { - utils.RegisterGraphQLService(stack, backend, cfg.Node) + utils.RegisterGraphQLService(stack, backend, filterSystem, cfg.Node) } + // Add the Ethereum Stats daemon if requested. if cfg.Ethstats.URL != "" { utils.RegisterEthStatsService(stack, backend, cfg.Ethstats.URL) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index ff85e259aea3..1c101917783a 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -43,6 +43,7 @@ import ( ethcatalyst "github.com/ethereum/go-ethereum/eth/catalyst" "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/eth/ethconfig" + "github.com/ethereum/go-ethereum/eth/filters" "github.com/ethereum/go-ethereum/eth/gasprice" "github.com/ethereum/go-ethereum/eth/tracers" "github.com/ethereum/go-ethereum/ethdb" @@ -2014,8 +2015,9 @@ func RegisterEthStatsService(stack *node.Node, backend ethapi.Backend, url strin } // RegisterGraphQLService is a utility function to construct a new service and register it against a node. -func RegisterGraphQLService(stack *node.Node, backend ethapi.Backend, cfg node.Config) { - if err := graphql.New(stack, backend, cfg.GraphQLCors, cfg.GraphQLVirtualHosts); err != nil { +func RegisterGraphQLService(stack *node.Node, backend ethapi.Backend, filterSystem *filters.FilterSystem, cfg node.Config) { + err := graphql.New(stack, backend, filterSystem, cfg.GraphQLCors, cfg.GraphQLVirtualHosts) + if err != nil { Fatalf("Failed to register the GraphQL service: %v", err) } } diff --git a/core/blockchain.go b/core/blockchain.go index 22f6a97b1c11..506034b539a7 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -88,7 +88,6 @@ const ( bodyCacheLimit = 256 blockCacheLimit = 256 receiptsCacheLimit = 32 - logsCacheLimit = 32 txLookupCacheLimit = 1024 maxFutureBlocks = 256 maxTimeFutureBlocks = 30 @@ -199,7 +198,6 @@ type BlockChain struct { bodyCache *lru.Cache // Cache for the most recent block bodies bodyRLPCache *lru.Cache // Cache for the most recent block bodies in RLP encoded format receiptsCache *lru.Cache // Cache for the most recent receipts per block - logsCache *lru.Cache // Cache for the most recent logs per block blockCache *lru.Cache // Cache for the most recent entire blocks txLookupCache *lru.Cache // Cache for the most recent transaction lookup data. futureBlocks *lru.Cache // future blocks are blocks added for later processing @@ -227,7 +225,6 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par bodyCache, _ := lru.New(bodyCacheLimit) bodyRLPCache, _ := lru.New(bodyCacheLimit) receiptsCache, _ := lru.New(receiptsCacheLimit) - logsCache, _ := lru.New(logsCacheLimit) blockCache, _ := lru.New(blockCacheLimit) txLookupCache, _ := lru.New(txLookupCacheLimit) futureBlocks, _ := lru.New(maxFutureBlocks) @@ -247,7 +244,6 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par bodyCache: bodyCache, bodyRLPCache: bodyRLPCache, receiptsCache: receiptsCache, - logsCache: logsCache, blockCache: blockCache, txLookupCache: txLookupCache, futureBlocks: futureBlocks, @@ -685,7 +681,6 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, root common.Hash, repair bo bc.bodyCache.Purge() bc.bodyRLPCache.Purge() bc.receiptsCache.Purge() - bc.logsCache.Purge() bc.blockCache.Purge() bc.txLookupCache.Purge() bc.futureBlocks.Purge() diff --git a/core/blockchain_reader.go b/core/blockchain_reader.go index dc3fd5b22df8..96e9f80b6aac 100644 --- a/core/blockchain_reader.go +++ b/core/blockchain_reader.go @@ -222,23 +222,6 @@ func (bc *BlockChain) GetReceiptsByHash(hash common.Hash) types.Receipts { return receipts } -// GetLogsByHash retrieves the logs for all transactions in a given block. -func (bc *BlockChain) GetLogsByHash(hash common.Hash) [][]*types.Log { - if logs, ok := bc.logsCache.Get(hash); ok { - return logs.([][]*types.Log) - } - number := rawdb.ReadHeaderNumber(bc.db, hash) - if number == nil { - return nil - } - logs := rawdb.ReadLogs(bc.db, hash, *number, bc.chainConfig) - if logs == nil { - return nil - } - bc.logsCache.Add(hash, logs) - return logs -} - // GetUnclesInChain retrieves all the uncles from a given block backwards until // a specific distance is reached. func (bc *BlockChain) GetUnclesInChain(block *types.Block, length int) []*types.Header { diff --git a/eth/api_backend.go b/eth/api_backend.go index eb312e89d8c9..00ecacc31df7 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -201,8 +201,8 @@ func (b *EthAPIBackend) GetReceipts(ctx context.Context, hash common.Hash) (type return b.eth.blockchain.GetReceiptsByHash(hash), nil } -func (b *EthAPIBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types.Log, error) { - return b.eth.blockchain.GetLogsByHash(hash), nil +func (b *EthAPIBackend) GetLogs(ctx context.Context, hash common.Hash, number uint64) ([][]*types.Log, error) { + return rawdb.ReadLogs(b.eth.chainDb, hash, number, b.ChainConfig()), nil } func (b *EthAPIBackend) GetTd(ctx context.Context, hash common.Hash) *big.Int { diff --git a/eth/backend.go b/eth/backend.go index ebe7001c7994..778207636344 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -25,7 +25,6 @@ import ( "strings" "sync" "sync/atomic" - "time" "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/common" @@ -41,7 +40,6 @@ import ( "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/eth/ethconfig" - "github.com/ethereum/go-ethereum/eth/filters" "github.com/ethereum/go-ethereum/eth/gasprice" "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/eth/protocols/snap" @@ -315,9 +313,6 @@ func (s *Ethereum) APIs() []rpc.API { }, { Namespace: "eth", Service: downloader.NewDownloaderAPI(s.handler.downloader, s.eventMux), - }, { - Namespace: "eth", - Service: filters.NewFilterAPI(s.APIBackend, false, 5*time.Minute), }, { Namespace: "admin", Service: NewAdminAPI(s), diff --git a/eth/filters/api.go b/eth/filters/api.go index 07714791d263..98566d4a01bd 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -46,7 +46,7 @@ type filter struct { // FilterAPI offers support to create and manage filters. This will allow external clients to retrieve various // information related to the Ethereum protocol such als blocks, transactions and logs. type FilterAPI struct { - backend Backend + sys *FilterSystem events *EventSystem filtersMu sync.Mutex filters map[rpc.ID]*filter @@ -54,14 +54,14 @@ type FilterAPI struct { } // NewFilterAPI returns a new FilterAPI instance. -func NewFilterAPI(backend Backend, lightMode bool, timeout time.Duration) *FilterAPI { +func NewFilterAPI(system *FilterSystem, lightMode bool) *FilterAPI { api := &FilterAPI{ - backend: backend, - events: NewEventSystem(backend, lightMode), + sys: system, + events: NewEventSystem(system, lightMode), filters: make(map[rpc.ID]*filter), - timeout: timeout, + timeout: system.cfg.Timeout, } - go api.timeoutLoop(timeout) + go api.timeoutLoop(system.cfg.Timeout) return api } @@ -320,7 +320,7 @@ func (api *FilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]*type var filter *Filter if crit.BlockHash != nil { // Block filter requested, construct a single-shot filter - filter = NewBlockFilter(api.backend, *crit.BlockHash, crit.Addresses, crit.Topics) + filter = api.sys.NewBlockFilter(*crit.BlockHash, crit.Addresses, crit.Topics) } else { // Convert the RPC block numbers into internal representations begin := rpc.LatestBlockNumber.Int64() @@ -332,7 +332,7 @@ func (api *FilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]*type end = crit.ToBlock.Int64() } // Construct the range filter - filter = NewRangeFilter(api.backend, begin, end, crit.Addresses, crit.Topics) + filter = api.sys.NewRangeFilter(begin, end, crit.Addresses, crit.Topics) } // Run the filter and return all the logs logs, err := filter.Logs(ctx) @@ -371,7 +371,7 @@ func (api *FilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*types.Lo var filter *Filter if f.crit.BlockHash != nil { // Block filter requested, construct a single-shot filter - filter = NewBlockFilter(api.backend, *f.crit.BlockHash, f.crit.Addresses, f.crit.Topics) + filter = api.sys.NewBlockFilter(*f.crit.BlockHash, f.crit.Addresses, f.crit.Topics) } else { // Convert the RPC block numbers into internal representations begin := rpc.LatestBlockNumber.Int64() @@ -383,7 +383,7 @@ func (api *FilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*types.Lo end = f.crit.ToBlock.Int64() } // Construct the range filter - filter = NewRangeFilter(api.backend, begin, end, f.crit.Addresses, f.crit.Topics) + filter = api.sys.NewRangeFilter(begin, end, f.crit.Addresses, f.crit.Topics) } // Run the filter and return all the logs logs, err := filter.Logs(ctx) diff --git a/eth/filters/bench_test.go b/eth/filters/bench_test.go index 694d73735028..73b96b77af62 100644 --- a/eth/filters/bench_test.go +++ b/eth/filters/bench_test.go @@ -122,22 +122,27 @@ func benchmarkBloomBits(b *testing.B, sectionSize uint64) { b.Log("Running filter benchmarks...") start = time.Now() - var backend *testBackend + var ( + backend *testBackend + sys *FilterSystem + ) for i := 0; i < benchFilterCnt; i++ { if i%20 == 0 { db.Close() db, _ = rawdb.NewLevelDBDatabase(benchDataDir, 128, 1024, "", false) backend = &testBackend{db: db, sections: cnt} + sys = NewFilterSystem(backend, Config{}) } var addr common.Address addr[0] = byte(i) addr[1] = byte(i / 256) - filter := NewRangeFilter(backend, 0, int64(cnt*sectionSize-1), []common.Address{addr}, nil) + filter := sys.NewRangeFilter(0, int64(cnt*sectionSize-1), []common.Address{addr}, nil) if _, err := filter.Logs(context.Background()); err != nil { - b.Error("filter.Find error:", err) + b.Error("filter.Logs error:", err) } } + d = time.Since(start) b.Log("Finished running filter benchmarks") b.Log(" ", d, "total ", d/time.Duration(benchFilterCnt), "per address", d*time.Duration(1000000)/time.Duration(benchFilterCnt*cnt*sectionSize), "per million blocks") @@ -171,10 +176,11 @@ func BenchmarkNoBloomBits(b *testing.B) { clearBloomBits(db) + _, sys := newTestFilterSystem(b, db, Config{}) + b.Log("Running filter benchmarks...") start := time.Now() - backend := &testBackend{db: db} - filter := NewRangeFilter(backend, 0, int64(*headNum), []common.Address{{}}, nil) + filter := sys.NewRangeFilter(0, int64(*headNum), []common.Address{{}}, nil) filter.Logs(context.Background()) d := time.Since(start) b.Log("Finished running filter benchmarks") diff --git a/eth/filters/filter.go b/eth/filters/filter.go index b009b90b5b0f..53686f55e8aa 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -36,7 +36,7 @@ type Backend interface { HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error) HeaderByHash(ctx context.Context, blockHash common.Hash) (*types.Header, error) GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error) - GetLogs(ctx context.Context, blockHash common.Hash) ([][]*types.Log, error) + GetLogs(ctx context.Context, blockHash common.Hash, number uint64) ([][]*types.Log, error) PendingBlockAndReceipts() (*types.Block, types.Receipts) SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription @@ -49,9 +49,27 @@ type Backend interface { ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) } +// cachedGetLogs loads block logs from the backend and caches the result. +func (sys *FilterSystem) cachedGetLogs(ctx context.Context, blockHash common.Hash, number uint64) ([][]*types.Log, error) { + cached, ok := sys.logsCache.Get(blockHash) + if ok { + return cached.([][]*types.Log), nil + } + + logs, err := sys.backend.GetLogs(ctx, blockHash, number) + if err != nil { + return nil, err + } + if logs == nil { + return nil, fmt.Errorf("failed to get logs for block #%d (0x%s)", number, blockHash.TerminalString()) + } + sys.logsCache.Add(blockHash, logs) + return logs, nil +} + // Filter can be used to retrieve and filter logs. type Filter struct { - backend Backend + sys *FilterSystem db ethdb.Database addresses []common.Address @@ -65,7 +83,7 @@ type Filter struct { // NewRangeFilter creates a new filter which uses a bloom filter on blocks to // figure out whether a particular block is interesting or not. -func NewRangeFilter(backend Backend, begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter { +func (sys *FilterSystem) NewRangeFilter(begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter { // Flatten the address and topic filter clauses into a single bloombits filter // system. Since the bloombits are not positional, nil topics are permitted, // which get flattened into a nil byte slice. @@ -84,10 +102,10 @@ func NewRangeFilter(backend Backend, begin, end int64, addresses []common.Addres } filters = append(filters, filter) } - size, _ := backend.BloomStatus() + size, _ := sys.backend.BloomStatus() // Create a generic filter and convert it into a range filter - filter := newFilter(backend, addresses, topics) + filter := newFilter(sys, addresses, topics) filter.matcher = bloombits.NewMatcher(size, filters) filter.begin = begin @@ -98,21 +116,21 @@ func NewRangeFilter(backend Backend, begin, end int64, addresses []common.Addres // NewBlockFilter creates a new filter which directly inspects the contents of // a block to figure out whether it is interesting or not. -func NewBlockFilter(backend Backend, block common.Hash, addresses []common.Address, topics [][]common.Hash) *Filter { +func (sys *FilterSystem) NewBlockFilter(block common.Hash, addresses []common.Address, topics [][]common.Hash) *Filter { // Create a generic filter and convert it into a block filter - filter := newFilter(backend, addresses, topics) + filter := newFilter(sys, addresses, topics) filter.block = block return filter } // newFilter creates a generic filter that can either filter based on a block hash, // or based on range queries. The search criteria needs to be explicitly set. -func newFilter(backend Backend, addresses []common.Address, topics [][]common.Hash) *Filter { +func newFilter(sys *FilterSystem, addresses []common.Address, topics [][]common.Hash) *Filter { return &Filter{ - backend: backend, + sys: sys, addresses: addresses, topics: topics, - db: backend.ChainDb(), + db: sys.backend.ChainDb(), } } @@ -121,7 +139,7 @@ func newFilter(backend Backend, addresses []common.Address, topics [][]common.Ha func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) { // If we're doing singleton block filtering, execute and return if f.block != (common.Hash{}) { - header, err := f.backend.HeaderByHash(ctx, f.block) + header, err := f.sys.backend.HeaderByHash(ctx, f.block) if err != nil { return nil, err } @@ -138,7 +156,7 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) { return f.pendingLogs() } // Figure out the limits of the filter range - header, _ := f.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber) + header, _ := f.sys.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber) if header == nil { return nil, nil } @@ -157,7 +175,7 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) { var ( logs []*types.Log err error - size, sections = f.backend.BloomStatus() + size, sections = f.sys.backend.BloomStatus() ) if indexed := sections * size; indexed > uint64(f.begin) { if indexed > end { @@ -193,7 +211,7 @@ func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, err } defer session.Close() - f.backend.ServiceFilter(ctx, session) + f.sys.backend.ServiceFilter(ctx, session) // Iterate over the matches until exhausted or context closed var logs []*types.Log @@ -212,7 +230,7 @@ func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, err f.begin = int64(number) + 1 // Retrieve the suggested block and pull any truly matching logs - header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(number)) + header, err := f.sys.backend.HeaderByNumber(ctx, rpc.BlockNumber(number)) if header == nil || err != nil { return logs, err } @@ -234,7 +252,7 @@ func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, e var logs []*types.Log for ; f.begin <= int64(end); f.begin++ { - header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(f.begin)) + header, err := f.sys.backend.HeaderByNumber(ctx, rpc.BlockNumber(f.begin)) if header == nil || err != nil { return logs, err } @@ -262,14 +280,11 @@ func (f *Filter) blockLogs(ctx context.Context, header *types.Header) (logs []*t // checkMatches checks if the receipts belonging to the given header contain any log events that // match the filter criteria. This function is called when the bloom filter signals a potential match. func (f *Filter) checkMatches(ctx context.Context, header *types.Header) (logs []*types.Log, err error) { - // Get the logs of the block - logsList, err := f.backend.GetLogs(ctx, header.Hash()) + logsList, err := f.sys.cachedGetLogs(ctx, header.Hash(), header.Number.Uint64()) if err != nil { return nil, err } - if logsList == nil { - return nil, fmt.Errorf("failed to get logs for block #%d (0x%s)", header.Number.Uint64(), header.Hash().TerminalString()) - } + var unfiltered []*types.Log for _, logs := range logsList { unfiltered = append(unfiltered, logs...) @@ -278,7 +293,7 @@ func (f *Filter) checkMatches(ctx context.Context, header *types.Header) (logs [ if len(logs) > 0 { // We have matching logs, check if we need to resolve full logs via the light client if logs[0].TxHash == (common.Hash{}) { - receipts, err := f.backend.GetReceipts(ctx, header.Hash()) + receipts, err := f.sys.backend.GetReceipts(ctx, header.Hash()) if err != nil { return nil, err } @@ -295,7 +310,7 @@ func (f *Filter) checkMatches(ctx context.Context, header *types.Header) (logs [ // pendingLogs returns the logs matching the filter criteria within the pending block. func (f *Filter) pendingLogs() ([]*types.Log, error) { - block, receipts := f.backend.PendingBlockAndReceipts() + block, receipts := f.sys.backend.PendingBlockAndReceipts() if bloomFilter(block.Bloom(), f.addresses, f.topics) { var unfiltered []*types.Log for _, r := range receipts { diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index c1a1b408b7a7..bcfa87d2c6c5 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -32,8 +32,46 @@ import ( "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rpc" + lru "github.com/hashicorp/golang-lru" ) +// Config represents the configuration of the filter system. +type Config struct { + LogCacheSize int // maximum number of cached blocks (default: 32) + Timeout time.Duration // how long filters stay active (default: 5min) +} + +func (cfg Config) withDefaults() Config { + if cfg.Timeout == 0 { + cfg.Timeout = 5 * time.Minute + } + if cfg.LogCacheSize == 0 { + cfg.LogCacheSize = 32 + } + return cfg +} + +// FilterSystem holds resources shared by all filters. +type FilterSystem struct { + backend Backend + logsCache *lru.Cache + cfg *Config +} + +func NewFilterSystem(backend Backend, config Config) *FilterSystem { + config = config.withDefaults() + + cache, err := lru.New(config.LogCacheSize) + if err != nil { + panic(err) + } + return &FilterSystem{ + backend: backend, + logsCache: cache, + cfg: &config, + } +} + // Type determines the kind of filter and is used to put the filter in to // the correct bucket when added. type Type byte @@ -84,6 +122,7 @@ type subscription struct { // subscription which match the subscription criteria. type EventSystem struct { backend Backend + sys *FilterSystem lightMode bool lastHead *types.Header @@ -110,9 +149,10 @@ type EventSystem struct { // // The returned manager has a loop that needs to be stopped with the Stop function // or by stopping the given mux. -func NewEventSystem(backend Backend, lightMode bool) *EventSystem { +func NewEventSystem(sys *FilterSystem, lightMode bool) *EventSystem { m := &EventSystem{ - backend: backend, + sys: sys, + backend: sys.backend, lightMode: lightMode, install: make(chan *subscription), uninstall: make(chan *subscription), @@ -405,7 +445,7 @@ func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common. // Get the logs of the block ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() - logsList, err := es.backend.GetLogs(ctx, header.Hash()) + logsList, err := es.sys.cachedGetLogs(ctx, header.Hash(), header.Number.Uint64()) if err != nil { return nil } diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index c7fc4331b222..51bda29b4244 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -39,10 +39,6 @@ import ( "github.com/ethereum/go-ethereum/rpc" ) -var ( - deadline = 5 * time.Minute -) - type testBackend struct { db ethdb.Database sections uint64 @@ -91,17 +87,8 @@ func (b *testBackend) GetReceipts(ctx context.Context, hash common.Hash) (types. return nil, nil } -func (b *testBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types.Log, error) { - number := rawdb.ReadHeaderNumber(b.db, hash) - if number == nil { - return nil, nil - } - receipts := rawdb.ReadReceipts(b.db, hash, *number, params.TestChainConfig) - - logs := make([][]*types.Log, len(receipts)) - for i, receipt := range receipts { - logs[i] = receipt.Logs - } +func (b *testBackend) GetLogs(ctx context.Context, hash common.Hash, number uint64) ([][]*types.Log, error) { + logs := rawdb.ReadLogs(b.db, hash, number, params.TestChainConfig) return logs, nil } @@ -160,6 +147,12 @@ func (b *testBackend) ServiceFilter(ctx context.Context, session *bloombits.Matc }() } +func newTestFilterSystem(t testing.TB, db ethdb.Database, cfg Config) (*testBackend, *FilterSystem) { + backend := &testBackend{db: db} + sys := NewFilterSystem(backend, cfg) + return backend, sys +} + // TestBlockSubscription tests if a block subscription returns block hashes for posted chain events. // It creates multiple subscriptions: // - one at the start and should receive all posted chain events and a second (blockHashes) @@ -169,12 +162,12 @@ func TestBlockSubscription(t *testing.T) { t.Parallel() var ( - db = rawdb.NewMemoryDatabase() - backend = &testBackend{db: db} - api = NewFilterAPI(backend, false, deadline) - genesis = (&core.Genesis{BaseFee: big.NewInt(params.InitialBaseFee)}).MustCommit(db) - chain, _ = core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 10, func(i int, gen *core.BlockGen) {}) - chainEvents = []core.ChainEvent{} + db = rawdb.NewMemoryDatabase() + backend, sys = newTestFilterSystem(t, db, Config{}) + api = NewFilterAPI(sys, false) + genesis = (&core.Genesis{BaseFee: big.NewInt(params.InitialBaseFee)}).MustCommit(db) + chain, _ = core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 10, func(i int, gen *core.BlockGen) {}) + chainEvents = []core.ChainEvent{} ) for _, blk := range chain { @@ -221,9 +214,9 @@ func TestPendingTxFilter(t *testing.T) { t.Parallel() var ( - db = rawdb.NewMemoryDatabase() - backend = &testBackend{db: db} - api = NewFilterAPI(backend, false, deadline) + db = rawdb.NewMemoryDatabase() + backend, sys = newTestFilterSystem(t, db, Config{}) + api = NewFilterAPI(sys, false) transactions = []*types.Transaction{ types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil), @@ -276,9 +269,9 @@ func TestPendingTxFilter(t *testing.T) { // If not it must return an error. func TestLogFilterCreation(t *testing.T) { var ( - db = rawdb.NewMemoryDatabase() - backend = &testBackend{db: db} - api = NewFilterAPI(backend, false, deadline) + db = rawdb.NewMemoryDatabase() + _, sys = newTestFilterSystem(t, db, Config{}) + api = NewFilterAPI(sys, false) testCases = []struct { crit FilterCriteria @@ -323,9 +316,9 @@ func TestInvalidLogFilterCreation(t *testing.T) { t.Parallel() var ( - db = rawdb.NewMemoryDatabase() - backend = &testBackend{db: db} - api = NewFilterAPI(backend, false, deadline) + db = rawdb.NewMemoryDatabase() + _, sys = newTestFilterSystem(t, db, Config{}) + api = NewFilterAPI(sys, false) ) // different situations where log filter creation should fail. @@ -346,8 +339,8 @@ func TestInvalidLogFilterCreation(t *testing.T) { func TestInvalidGetLogsRequest(t *testing.T) { var ( db = rawdb.NewMemoryDatabase() - backend = &testBackend{db: db} - api = NewFilterAPI(backend, false, deadline) + _, sys = newTestFilterSystem(t, db, Config{}) + api = NewFilterAPI(sys, false) blockHash = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111") ) @@ -370,9 +363,9 @@ func TestLogFilter(t *testing.T) { t.Parallel() var ( - db = rawdb.NewMemoryDatabase() - backend = &testBackend{db: db} - api = NewFilterAPI(backend, false, deadline) + db = rawdb.NewMemoryDatabase() + backend, sys = newTestFilterSystem(t, db, Config{}) + api = NewFilterAPI(sys, false) firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111") secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222") @@ -484,9 +477,9 @@ func TestPendingLogsSubscription(t *testing.T) { t.Parallel() var ( - db = rawdb.NewMemoryDatabase() - backend = &testBackend{db: db} - api = NewFilterAPI(backend, false, deadline) + db = rawdb.NewMemoryDatabase() + backend, sys = newTestFilterSystem(t, db, Config{}) + api = NewFilterAPI(sys, false) firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111") secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222") @@ -668,10 +661,10 @@ func TestPendingTxFilterDeadlock(t *testing.T) { timeout := 100 * time.Millisecond var ( - db = rawdb.NewMemoryDatabase() - backend = &testBackend{db: db} - api = NewFilterAPI(backend, false, timeout) - done = make(chan struct{}) + db = rawdb.NewMemoryDatabase() + backend, sys = newTestFilterSystem(t, db, Config{Timeout: timeout}) + api = NewFilterAPI(sys, false) + done = make(chan struct{}) ) go func() { diff --git a/eth/filters/filter_test.go b/eth/filters/filter_test.go index ae4f069048f3..3860cae69723 100644 --- a/eth/filters/filter_test.go +++ b/eth/filters/filter_test.go @@ -44,7 +44,7 @@ func BenchmarkFilters(b *testing.B) { var ( db, _ = rawdb.NewLevelDBDatabase(dir, 0, 0, "", false) - backend = &testBackend{db: db} + _, sys = newTestFilterSystem(b, db, Config{}) key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") addr1 = crypto.PubkeyToAddress(key1.PublicKey) addr2 = common.BytesToAddress([]byte("jeff")) @@ -82,7 +82,7 @@ func BenchmarkFilters(b *testing.B) { } b.ResetTimer() - filter := NewRangeFilter(backend, 0, -1, []common.Address{addr1, addr2, addr3, addr4}, nil) + filter := sys.NewRangeFilter(0, -1, []common.Address{addr1, addr2, addr3, addr4}, nil) for i := 0; i < b.N; i++ { logs, _ := filter.Logs(context.Background()) @@ -97,7 +97,7 @@ func TestFilters(t *testing.T) { var ( db, _ = rawdb.NewLevelDBDatabase(dir, 0, 0, "", false) - backend = &testBackend{db: db} + _, sys = newTestFilterSystem(t, db, Config{}) key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") addr = crypto.PubkeyToAddress(key1.PublicKey) @@ -161,14 +161,14 @@ func TestFilters(t *testing.T) { rawdb.WriteReceipts(db, block.Hash(), block.NumberU64(), receipts[i]) } - filter := NewRangeFilter(backend, 0, -1, []common.Address{addr}, [][]common.Hash{{hash1, hash2, hash3, hash4}}) + filter := sys.NewRangeFilter(0, -1, []common.Address{addr}, [][]common.Hash{{hash1, hash2, hash3, hash4}}) logs, _ := filter.Logs(context.Background()) if len(logs) != 4 { t.Error("expected 4 log, got", len(logs)) } - filter = NewRangeFilter(backend, 900, 999, []common.Address{addr}, [][]common.Hash{{hash3}}) + filter = sys.NewRangeFilter(900, 999, []common.Address{addr}, [][]common.Hash{{hash3}}) logs, _ = filter.Logs(context.Background()) if len(logs) != 1 { t.Error("expected 1 log, got", len(logs)) @@ -177,7 +177,7 @@ func TestFilters(t *testing.T) { t.Errorf("expected log[0].Topics[0] to be %x, got %x", hash3, logs[0].Topics[0]) } - filter = NewRangeFilter(backend, 990, -1, []common.Address{addr}, [][]common.Hash{{hash3}}) + filter = sys.NewRangeFilter(990, -1, []common.Address{addr}, [][]common.Hash{{hash3}}) logs, _ = filter.Logs(context.Background()) if len(logs) != 1 { t.Error("expected 1 log, got", len(logs)) @@ -186,7 +186,7 @@ func TestFilters(t *testing.T) { t.Errorf("expected log[0].Topics[0] to be %x, got %x", hash3, logs[0].Topics[0]) } - filter = NewRangeFilter(backend, 1, 10, nil, [][]common.Hash{{hash1, hash2}}) + filter = sys.NewRangeFilter(1, 10, nil, [][]common.Hash{{hash1, hash2}}) logs, _ = filter.Logs(context.Background()) if len(logs) != 2 { @@ -194,7 +194,7 @@ func TestFilters(t *testing.T) { } failHash := common.BytesToHash([]byte("fail")) - filter = NewRangeFilter(backend, 0, -1, nil, [][]common.Hash{{failHash}}) + filter = sys.NewRangeFilter(0, -1, nil, [][]common.Hash{{failHash}}) logs, _ = filter.Logs(context.Background()) if len(logs) != 0 { @@ -202,14 +202,14 @@ func TestFilters(t *testing.T) { } failAddr := common.BytesToAddress([]byte("failmenow")) - filter = NewRangeFilter(backend, 0, -1, []common.Address{failAddr}, nil) + filter = sys.NewRangeFilter(0, -1, []common.Address{failAddr}, nil) logs, _ = filter.Logs(context.Background()) if len(logs) != 0 { t.Error("expected 0 log, got", len(logs)) } - filter = NewRangeFilter(backend, 0, -1, nil, [][]common.Hash{{failHash}, {hash1}}) + filter = sys.NewRangeFilter(0, -1, nil, [][]common.Hash{{failHash}, {hash1}}) logs, _ = filter.Logs(context.Background()) if len(logs) != 0 { diff --git a/graphql/graphql.go b/graphql/graphql.go index 0949c34803cf..04af530d0c3d 100644 --- a/graphql/graphql.go +++ b/graphql/graphql.go @@ -978,7 +978,7 @@ func (b *Block) Logs(ctx context.Context, args struct{ Filter BlockFilterCriteri hash = header.Hash() } // Construct the range filter - filter := filters.NewBlockFilter(b.r.backend, hash, addresses, topics) + filter := b.r.filterSystem.NewBlockFilter(hash, addresses, topics) // Run the filter and return all the logs return runFilter(ctx, b.r, filter) @@ -1137,7 +1137,8 @@ func (p *Pending) EstimateGas(ctx context.Context, args struct { // Resolver is the top-level object in the GraphQL hierarchy. type Resolver struct { - backend ethapi.Backend + backend ethapi.Backend + filterSystem *filters.FilterSystem } func (r *Resolver) Block(ctx context.Context, args struct { @@ -1284,7 +1285,7 @@ func (r *Resolver) Logs(ctx context.Context, args struct{ Filter FilterCriteria topics = *args.Filter.Topics } // Construct the range filter - filter := filters.NewRangeFilter(r.backend, begin, end, addresses, topics) + filter := r.filterSystem.NewRangeFilter(begin, end, addresses, topics) return runFilter(ctx, r, filter) } diff --git a/graphql/graphql_test.go b/graphql/graphql_test.go index 4b7f7bf96021..d55f4e063486 100644 --- a/graphql/graphql_test.go +++ b/graphql/graphql_test.go @@ -33,6 +33,7 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/eth/ethconfig" + "github.com/ethereum/go-ethereum/eth/filters" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/params" @@ -50,7 +51,7 @@ func TestBuildSchema(t *testing.T) { } defer stack.Close() // Make sure the schema can be parsed and matched up to the object model. - if err := newHandler(stack, nil, []string{}, []string{}); err != nil { + if err := newHandler(stack, nil, nil, []string{}, []string{}); err != nil { t.Errorf("Could not construct GraphQL handler: %v", err) } } @@ -263,7 +264,8 @@ func createGQLService(t *testing.T, stack *node.Node) { t.Fatalf("could not create import blocks: %v", err) } // create gql service - err = New(stack, ethBackend.APIBackend, []string{}, []string{}) + filterSystem := filters.NewFilterSystem(ethBackend.APIBackend, filters.Config{}) + err = New(stack, ethBackend.APIBackend, filterSystem, []string{}, []string{}) if err != nil { t.Fatalf("could not create graphql service: %v", err) } @@ -348,7 +350,8 @@ func createGQLServiceWithTransactions(t *testing.T, stack *node.Node) { t.Fatalf("could not create import blocks: %v", err) } // create gql service - err = New(stack, ethBackend.APIBackend, []string{}, []string{}) + filterSystem := filters.NewFilterSystem(ethBackend.APIBackend, filters.Config{}) + err = New(stack, ethBackend.APIBackend, filterSystem, []string{}, []string{}) if err != nil { t.Fatalf("could not create graphql service: %v", err) } diff --git a/graphql/service.go b/graphql/service.go index 1a2ffaa9469d..019026bc7ea7 100644 --- a/graphql/service.go +++ b/graphql/service.go @@ -20,6 +20,7 @@ import ( "encoding/json" "net/http" + "github.com/ethereum/go-ethereum/eth/filters" "github.com/ethereum/go-ethereum/internal/ethapi" "github.com/ethereum/go-ethereum/node" "github.com/graph-gophers/graphql-go" @@ -55,14 +56,14 @@ func (h handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } // New constructs a new GraphQL service instance. -func New(stack *node.Node, backend ethapi.Backend, cors, vhosts []string) error { - return newHandler(stack, backend, cors, vhosts) +func New(stack *node.Node, backend ethapi.Backend, filterSystem *filters.FilterSystem, cors, vhosts []string) error { + return newHandler(stack, backend, filterSystem, cors, vhosts) } // newHandler returns a new `http.Handler` that will answer GraphQL queries. // It additionally exports an interactive query browser on the / endpoint. -func newHandler(stack *node.Node, backend ethapi.Backend, cors, vhosts []string) error { - q := Resolver{backend} +func newHandler(stack *node.Node, backend ethapi.Backend, filterSystem *filters.FilterSystem, cors, vhosts []string) error { + q := Resolver{backend, filterSystem} s, err := graphql.ParseSchema(schema, &q) if err != nil { diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index d13547f234a3..fa42139dded6 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -27,7 +27,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/core" - "github.com/ethereum/go-ethereum/core/bloombits" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" @@ -84,16 +83,12 @@ type Backend interface { TxPoolContentFrom(addr common.Address) (types.Transactions, types.Transactions) SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription - // Filter API - BloomStatus() (uint64, uint64) - GetLogs(ctx context.Context, blockHash common.Hash) ([][]*types.Log, error) - ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) - SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription - SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription - SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription - ChainConfig() *params.ChainConfig Engine() consensus.Engine + + // eth/filters needs to be initialized from this backend type, so methods needed by + // it must also be included here. + // filters.Backend } func GetAPIs(apiBackend Backend) []rpc.API { diff --git a/les/api_backend.go b/les/api_backend.go index 11a9ca128aab..5b4213134b24 100644 --- a/les/api_backend.go +++ b/les/api_backend.go @@ -168,11 +168,8 @@ func (b *LesApiBackend) GetReceipts(ctx context.Context, hash common.Hash) (type return nil, nil } -func (b *LesApiBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types.Log, error) { - if number := rawdb.ReadHeaderNumber(b.eth.chainDb, hash); number != nil { - return light.GetBlockLogs(ctx, b.eth.odr, hash, *number) - } - return nil, nil +func (b *LesApiBackend) GetLogs(ctx context.Context, hash common.Hash, number uint64) ([][]*types.Log, error) { + return light.GetBlockLogs(ctx, b.eth.odr, hash, number) } func (b *LesApiBackend) GetTd(ctx context.Context, hash common.Hash) *big.Int { diff --git a/les/client.go b/les/client.go index 7caaf2c18a58..6504fe2af8f6 100644 --- a/les/client.go +++ b/les/client.go @@ -32,7 +32,6 @@ import ( "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth/ethconfig" - "github.com/ethereum/go-ethereum/eth/filters" "github.com/ethereum/go-ethereum/eth/gasprice" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/internal/ethapi" @@ -298,9 +297,6 @@ func (s *LightEthereum) APIs() []rpc.API { }, { Namespace: "eth", Service: downloader.NewDownloaderAPI(s.handler.downloader, s.eventMux), - }, { - Namespace: "eth", - Service: filters.NewFilterAPI(s.ApiBackend, true, 5*time.Minute), }, { Namespace: "net", Service: s.netRPCService,