From b1228c74655b757d5b3677fa5a23502f4146adeb Mon Sep 17 00:00:00 2001 From: JukLee0ira Date: Thu, 11 Jul 2024 19:52:13 +0800 Subject: [PATCH] all: add global block logs cache (25459) --- accounts/abi/bind/backends/simulated.go | 32 ++++---- cmd/XDC/main.go | 1 + cmd/utils/flags.go | 24 ++++++ cmd/utils/utils.go | 3 +- core/chain_makers.go | 9 +++ eth/api_backend.go | 13 +--- eth/backend.go | 3 +- eth/ethconfig/config.go | 16 ++-- eth/ethconfig/gen_config.go | 6 ++ eth/filters/api.go | 22 +++--- eth/filters/bench_test.go | 16 ++-- eth/filters/filter.go | 97 +++++++++++-------------- eth/filters/filter_system.go | 86 +++++++++++++++++++++- eth/filters/filter_system_test.go | 74 +++++++++---------- eth/filters/filter_test.go | 24 +++--- internal/ethapi/backend.go | 5 ++ les/api_backend.go | 4 +- les/backend.go | 2 +- node/node.go | 16 ++++ 19 files changed, 289 insertions(+), 164 deletions(-) diff --git a/accounts/abi/bind/backends/simulated.go b/accounts/abi/bind/backends/simulated.go index 622d917d151f..71b44b15c741 100644 --- a/accounts/abi/bind/backends/simulated.go +++ b/accounts/abi/bind/backends/simulated.go @@ -66,7 +66,8 @@ type SimulatedBackend struct { pendingState *state.StateDB // Currently pending state that will be the active on request pendingReceipts types.Receipts // Currently receipts for the pending block - events *filters.EventSystem // Event system for filtering log events live + events *filters.EventSystem // for filtering log events live + filterSystem *filters.FilterSystem // for filtering database logs config *params.ChainConfig } @@ -95,9 +96,7 @@ func SimulateWalletAddressAndSignFn() (common.Address, func(account accounts.Acc // XDC simulated backend for testing purpose. func NewXDCSimulatedBackend(alloc core.GenesisAlloc, gasLimit uint64, chainConfig *params.ChainConfig) *SimulatedBackend { - // database := ethdb.NewMemDatabase() database := rawdb.NewMemoryDatabase() - genesis := core.Genesis{ GasLimit: gasLimit, // need this big, support initial smart contract Config: chainConfig, @@ -128,7 +127,11 @@ func NewXDCSimulatedBackend(alloc core.GenesisAlloc, gasLimit uint64, chainConfi blockchain: blockchain, config: genesis.Config, } - backend.events = filters.NewEventSystem(&filterBackend{database, blockchain, backend}, false) + + filterBackend := &filterBackend{database, blockchain, backend} + backend.filterSystem = filters.NewFilterSystem(filterBackend, filters.Config{}) + backend.events = filters.NewEventSystem(backend.filterSystem, false) + blockchain.Client = backend backend.rollback() return backend @@ -148,7 +151,11 @@ func NewSimulatedBackend(alloc core.GenesisAlloc) *SimulatedBackend { blockchain: blockchain, config: genesis.Config, } - backend.events = filters.NewEventSystem(&filterBackend{database, blockchain, backend}, false) + + filterBackend := &filterBackend{database, blockchain, backend} + backend.filterSystem = filters.NewFilterSystem(filterBackend, filters.Config{}) + backend.events = filters.NewEventSystem(backend.filterSystem, false) + backend.rollback() return backend } @@ -422,7 +429,7 @@ func (b *SimulatedBackend) FilterLogs(ctx context.Context, query XDPoSChain.Filt var filter *filters.Filter if query.BlockHash != nil { // Block filter requested, construct a single-shot filter - filter = filters.NewBlockFilter(&filterBackend{b.database, b.blockchain, b}, *query.BlockHash, query.Addresses, query.Topics) + filter = b.filterSystem.NewBlockFilter(*query.BlockHash, query.Addresses, query.Topics) } else { // Initialize unset filter boundaried to run from genesis to chain head from := int64(0) @@ -434,7 +441,7 @@ func (b *SimulatedBackend) FilterLogs(ctx context.Context, query XDPoSChain.Filt to = query.ToBlock.Int64() } // Construct the range filter - filter = filters.NewRangeFilter(&filterBackend{b.database, b.blockchain, b}, from, to, query.Addresses, query.Topics) + filter = b.filterSystem.NewRangeFilter(from, to, query.Addresses, query.Topics) } // Run the filter and return all the logs logs, err := filter.Logs(ctx) @@ -551,15 +558,8 @@ func (fb *filterBackend) PendingBlockAndReceipts() (*types.Block, types.Receipts return fb.backend.pendingBlock, fb.backend.pendingReceipts } -func (fb *filterBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types.Log, error) { - receipts := core.GetBlockReceipts(fb.db, hash, core.GetBlockNumber(fb.db, hash)) - if receipts == nil { - return nil, nil - } - logs := make([][]*types.Log, len(receipts)) - for i, receipt := range receipts { - logs[i] = receipt.Logs - } +func (fb *filterBackend) GetLogs(ctx context.Context, hash common.Hash, number uint64) ([][]*types.Log, error) { + logs := rawdb.ReadLogs(fb.db, hash, number) return logs, nil } diff --git a/cmd/XDC/main.go b/cmd/XDC/main.go index 3a028fe180aa..ad8c4fd13a44 100644 --- a/cmd/XDC/main.go +++ b/cmd/XDC/main.go @@ -93,6 +93,7 @@ var ( //utils.CacheDatabaseFlag, //utils.CacheGCFlag, //utils.TrieCacheGenFlag, + utils.CacheLogSizeFlag, utils.ListenPortFlag, utils.MaxPeersFlag, utils.MaxPendingPeersFlag, diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index ae845a4469a4..589053f3ceb0 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -40,8 +40,10 @@ import ( "github.com/XinFinOrg/XDPoSChain/crypto" "github.com/XinFinOrg/XDPoSChain/eth/downloader" "github.com/XinFinOrg/XDPoSChain/eth/ethconfig" + "github.com/XinFinOrg/XDPoSChain/eth/filters" "github.com/XinFinOrg/XDPoSChain/eth/gasprice" "github.com/XinFinOrg/XDPoSChain/ethdb" + "github.com/XinFinOrg/XDPoSChain/internal/ethapi" "github.com/XinFinOrg/XDPoSChain/log" "github.com/XinFinOrg/XDPoSChain/metrics" "github.com/XinFinOrg/XDPoSChain/metrics/exp" @@ -52,6 +54,7 @@ import ( "github.com/XinFinOrg/XDPoSChain/p2p/nat" "github.com/XinFinOrg/XDPoSChain/p2p/netutil" "github.com/XinFinOrg/XDPoSChain/params" + "github.com/XinFinOrg/XDPoSChain/rpc" whisper "github.com/XinFinOrg/XDPoSChain/whisper/whisperv6" "gopkg.in/urfave/cli.v1" ) @@ -313,6 +316,11 @@ var ( Usage: "Percentage of cache memory allowance to use for trie pruning", Value: 25, } + CacheLogSizeFlag = &cli.IntFlag{ + Name: "cache.blocklogs", + Usage: "Size (in number of blocks) of the log cache for filtering", + Value: ethconfig.Defaults.FilterLogCacheSize, + } // Miner settings StakingEnabledFlag = cli.BoolFlag{ Name: "mine", @@ -1208,6 +1216,9 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) { if ctx.GlobalIsSet(GasPriceFlag.Name) { cfg.GasPrice = GlobalBig(ctx, GasPriceFlag.Name) } + if ctx.IsSet(CacheLogSizeFlag.Name) { + cfg.FilterLogCacheSize = ctx.Int(CacheLogSizeFlag.Name) + } if ctx.GlobalIsSet(VMEnableDebugFlag.Name) { // TODO(fjl): force-enable this in --dev mode cfg.EnablePreimageRecording = ctx.GlobalBool(VMEnableDebugFlag.Name) @@ -1407,6 +1418,19 @@ func WalkMatch(root, pattern string) ([]string, error) { return matches, nil } +// RegisterFilterAPI adds the eth log filtering RPC API to the node. +func RegisterFilterAPI(stack *node.Node, backend ethapi.Backend, ethcfg *ethconfig.Config) *filters.FilterSystem { + isLightClient := ethcfg.SyncMode == downloader.LightSync + filterSystem := filters.NewFilterSystem(backend, filters.Config{ + LogCacheSize: ethcfg.FilterLogCacheSize, + }) + stack.RegisterAPIs([]rpc.API{{ + Namespace: "eth", + Service: filters.NewFilterAPI(filterSystem, isLightClient), + }}) + return filterSystem +} + func SetupMetrics(ctx *cli.Context) { if metrics.Enabled { log.Info("Enabling metrics collection") diff --git a/cmd/utils/utils.go b/cmd/utils/utils.go index 406d15ebfc83..a329fc8c4d2e 100644 --- a/cmd/utils/utils.go +++ b/cmd/utils/utils.go @@ -47,8 +47,7 @@ func RegisterShhService(stack *node.Node, cfg *whisper.Config) { } } -// RegisterEthStatsService configures the Ethereum Stats daemon and adds it to -// th egiven node. +// RegisterEthStatsService configures the Ethereum Stats daemon and adds it to the node. func RegisterEthStatsService(stack *node.Node, url string) { if err := stack.Register(func(ctx *node.ServiceContext) (node.Service, error) { // Retrieve both eth and les services diff --git a/core/chain_makers.go b/core/chain_makers.go index 348d68d1687a..15e87c88378d 100644 --- a/core/chain_makers.go +++ b/core/chain_makers.go @@ -113,6 +113,15 @@ func (b *BlockGen) AddTxWithChain(bc *BlockChain, tx *types.Transaction) { } } +// AddUncheckedTx forcefully adds a transaction to the block without any +// validation. +// +// AddUncheckedTx will cause consensus failures when used during real +// chain processing. This is best used in conjunction with raw block insertion. +func (b *BlockGen) AddUncheckedTx(tx *types.Transaction) { + b.txs = append(b.txs, tx) +} + // Number returns the block number of the block being generated. func (b *BlockGen) Number() *big.Int { return new(big.Int).Set(b.header.Number) diff --git a/eth/api_backend.go b/eth/api_backend.go index 299c7a9f79bc..b2aaf2fbe8b0 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -234,17 +234,8 @@ func (b *EthApiBackend) GetReceipts(ctx context.Context, blockHash common.Hash) return core.GetBlockReceipts(b.eth.chainDb, blockHash, core.GetBlockNumber(b.eth.chainDb, blockHash)), nil } -func (b *EthApiBackend) GetLogs(ctx context.Context, blockHash common.Hash) ([][]*types.Log, error) { - db := b.eth.ChainDb() - number := rawdb.ReadHeaderNumber(db, blockHash) - if number == nil { - return nil, errors.New("failed to get block number from hash") - } - logs := rawdb.ReadLogs(db, blockHash, *number) - if logs == nil { - return nil, errors.New("failed to get logs for block") - } - return logs, nil +func (b *EthApiBackend) GetLogs(ctx context.Context, hash common.Hash, number uint64) ([][]*types.Log, error) { + return rawdb.ReadLogs(b.eth.chainDb, hash, number), nil } func (b *EthApiBackend) GetTd(blockHash common.Hash) *big.Int { diff --git a/eth/backend.go b/eth/backend.go index 9408ebe61de9..08c9ec33fa6e 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -24,7 +24,6 @@ import ( "runtime" "sync" "sync/atomic" - "time" "github.com/XinFinOrg/XDPoSChain/XDCx" "github.com/XinFinOrg/XDPoSChain/XDCxlending" @@ -401,7 +400,7 @@ func (s *Ethereum) APIs() []rpc.API { }, { Namespace: "eth", Version: "1.0", - Service: filters.NewFilterAPI(s.ApiBackend, true, 5*time.Minute), + Service: filters.NewFilterAPI(filters.NewFilterSystem(s.ApiBackend, filters.Config{LogCacheSize: s.config.FilterLogCacheSize}), true), Public: true, }, { Namespace: "admin", diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index b246a9b332a6..73cc4e7cddbe 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -60,12 +60,13 @@ var Defaults = Config{ DatasetsInMem: 1, DatasetsOnDisk: 2, }, - NetworkId: 88, - LightPeers: 100, - DatabaseCache: 768, - TrieCache: 256, - TrieTimeout: 5 * time.Minute, - GasPrice: big.NewInt(0.25 * params.Shannon), + NetworkId: 88, + LightPeers: 100, + DatabaseCache: 768, + TrieCache: 256, + TrieTimeout: 5 * time.Minute, + FilterLogCacheSize: 32, + GasPrice: big.NewInt(0.25 * params.Shannon), TxPool: core.DefaultTxPoolConfig, RPCGasCap: 25000000, @@ -111,6 +112,9 @@ type Config struct { TrieCache int TrieTimeout time.Duration + // This is the number of blocks for which logs will be cached in the filter system. + FilterLogCacheSize int + // Mining-related options Etherbase common.Address `toml:",omitempty"` MinerThreads int `toml:",omitempty"` diff --git a/eth/ethconfig/gen_config.go b/eth/ethconfig/gen_config.go index d24b3251c05b..6b27542f1933 100644 --- a/eth/ethconfig/gen_config.go +++ b/eth/ethconfig/gen_config.go @@ -31,6 +31,7 @@ func (c Config) MarshalTOML() (interface{}, error) { MinerThreads int `toml:",omitempty"` ExtraData []byte `toml:",omitempty"` GasPrice *big.Int + FilterLogCacheSize int Ethash ethash.Config TxPool core.TxPoolConfig GPO gasprice.Config @@ -55,6 +56,7 @@ func (c Config) MarshalTOML() (interface{}, error) { enc.MinerThreads = c.MinerThreads enc.ExtraData = c.ExtraData enc.GasPrice = c.GasPrice + enc.FilterLogCacheSize = c.FilterLogCacheSize enc.Ethash = c.Ethash enc.TxPool = c.TxPool enc.GPO = c.GPO @@ -83,6 +85,7 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { MinerThreads *int `toml:",omitempty"` ExtraData []byte `toml:",omitempty"` GasPrice *big.Int + FilterLogCacheSize *int Ethash *ethash.Config TxPool *core.TxPoolConfig GPO *gasprice.Config @@ -140,6 +143,9 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { if dec.GasPrice != nil { c.GasPrice = dec.GasPrice } + if dec.FilterLogCacheSize != nil { + c.FilterLogCacheSize = *dec.FilterLogCacheSize + } if dec.Ethash != nil { c.Ethash = *dec.Ethash } diff --git a/eth/filters/api.go b/eth/filters/api.go index 108feaf3e962..6196eb67cc18 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -54,7 +54,7 @@ type filter struct { // FilterAPI offers support to create and manage filters. This will allow external clients to retrieve various // information related to the Ethereum protocol such als blocks, transactions and logs. type FilterAPI struct { - backend Backend + sys *FilterSystem chainDb ethdb.Database events *EventSystem filtersMu sync.Mutex @@ -63,15 +63,15 @@ type FilterAPI struct { } // NewFilterAPI returns a new FilterAPI instance. -func NewFilterAPI(backend Backend, lightMode bool, timeout time.Duration) *FilterAPI { +func NewFilterAPI(system *FilterSystem, lightMode bool) *FilterAPI { api := &FilterAPI{ - backend: backend, - chainDb: backend.ChainDb(), - events: NewEventSystem(backend, lightMode), + sys: system, + chainDb: system.backend.ChainDb(), + events: NewEventSystem(system, lightMode), filters: make(map[rpc.ID]*filter), - timeout: timeout, + timeout: system.cfg.Timeout, } - go api.timeoutLoop(timeout) + go api.timeoutLoop(system.cfg.Timeout) return api } @@ -341,7 +341,7 @@ func (api *FilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]*type var filter *Filter if crit.BlockHash != nil { // Block filter requested, construct a single-shot filter - filter = NewBlockFilter(api.backend, *crit.BlockHash, crit.Addresses, crit.Topics) + filter = api.sys.NewBlockFilter(*crit.BlockHash, crit.Addresses, crit.Topics) } else { // Convert the RPC block numbers into internal representations begin := rpc.LatestBlockNumber.Int64() @@ -353,7 +353,7 @@ func (api *FilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]*type end = crit.ToBlock.Int64() } // Construct the range filter - filter = NewRangeFilter(api.backend, begin, end, crit.Addresses, crit.Topics) + filter = api.sys.NewRangeFilter(begin, end, crit.Addresses, crit.Topics) } // Run the filter and return all the logs logs, err := filter.Logs(ctx) @@ -396,7 +396,7 @@ func (api *FilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*types.Lo var filter *Filter if f.crit.BlockHash != nil { // Block filter requested, construct a single-shot filter - filter = NewBlockFilter(api.backend, *f.crit.BlockHash, f.crit.Addresses, f.crit.Topics) + filter = api.sys.NewBlockFilter(*f.crit.BlockHash, f.crit.Addresses, f.crit.Topics) } else { // Convert the RPC block numbers into internal representations begin := rpc.LatestBlockNumber.Int64() @@ -408,7 +408,7 @@ func (api *FilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*types.Lo end = f.crit.ToBlock.Int64() } // Construct the range filter - filter = NewRangeFilter(api.backend, begin, end, f.crit.Addresses, f.crit.Topics) + filter = api.sys.NewRangeFilter(begin, end, f.crit.Addresses, f.crit.Topics) } // Run the filter and return all the logs logs, err := filter.Logs(ctx) diff --git a/eth/filters/bench_test.go b/eth/filters/bench_test.go index 9c9aa5a80a3c..f465d67f7069 100644 --- a/eth/filters/bench_test.go +++ b/eth/filters/bench_test.go @@ -123,20 +123,25 @@ func benchmarkBloomBits(b *testing.B, sectionSize uint64) { b.Log("Running filter benchmarks...") start = time.Now() - var backend *testBackend + + var ( + backend *testBackend + sys *FilterSystem + ) for i := 0; i < benchFilterCnt; i++ { if i%20 == 0 { db.Close() db, _ = rawdb.NewLevelDBDatabase(benchDataDir, 128, 1024, "") backend = &testBackend{db: db, sections: cnt} + sys = NewFilterSystem(backend, Config{}) } var addr common.Address addr[0] = byte(i) addr[1] = byte(i / 256) - filter := NewRangeFilter(backend, 0, int64(cnt*sectionSize-1), []common.Address{addr}, nil) + filter := sys.NewRangeFilter(0, int64(cnt*sectionSize-1), []common.Address{addr}, nil) if _, err := filter.Logs(context.Background()); err != nil { - b.Error("filter.Find error:", err) + b.Error("filter.Logs error:", err) } } d = time.Since(start) @@ -186,10 +191,11 @@ func BenchmarkNoBloomBits(b *testing.B) { clearBloomBits(db) + _, sys := newTestFilterSystem(b, db, Config{}) + b.Log("Running filter benchmarks...") start := time.Now() - backend := &testBackend{db: db} - filter := NewRangeFilter(backend, 0, int64(headNum), []common.Address{{}}, nil) + filter := sys.NewRangeFilter(0, int64(headNum), []common.Address{{}}, nil) filter.Logs(context.Background()) d := time.Since(start) b.Log("Finished running filter benchmarks") diff --git a/eth/filters/filter.go b/eth/filters/filter.go index 52b5eedb6f2d..1fa21bb7e0d9 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -22,37 +22,15 @@ import ( "math/big" "github.com/XinFinOrg/XDPoSChain/common" - "github.com/XinFinOrg/XDPoSChain/core" "github.com/XinFinOrg/XDPoSChain/core/bloombits" "github.com/XinFinOrg/XDPoSChain/core/types" - "github.com/XinFinOrg/XDPoSChain/ethdb" - "github.com/XinFinOrg/XDPoSChain/event" "github.com/XinFinOrg/XDPoSChain/rpc" ) -type Backend interface { - ChainDb() ethdb.Database - HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error) - HeaderByHash(ctx context.Context, blockHash common.Hash) (*types.Header, error) - GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error) - GetLogs(ctx context.Context, blockHash common.Hash) ([][]*types.Log, error) - PendingBlockAndReceipts() (*types.Block, types.Receipts) - - SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription - SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription - SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription - SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription - SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription - - BloomStatus() (uint64, uint64) - ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) -} - // Filter can be used to retrieve and filter logs. type Filter struct { - backend Backend + sys *FilterSystem - db ethdb.Database addresses []common.Address topics [][]common.Hash @@ -64,7 +42,7 @@ type Filter struct { // NewRangeFilter creates a new filter which uses a bloom filter on blocks to // figure out whether a particular block is interesting or not. -func NewRangeFilter(backend Backend, begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter { +func (sys *FilterSystem) NewRangeFilter(begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter { // Flatten the address and topic filter clauses into a single bloombits filter // system. Since the bloombits are not positional, nil topics are permitted, // which get flattened into a nil byte slice. @@ -83,10 +61,10 @@ func NewRangeFilter(backend Backend, begin, end int64, addresses []common.Addres } filters = append(filters, filter) } - size, _ := backend.BloomStatus() + size, _ := sys.backend.BloomStatus() // Create a generic filter and convert it into a range filter - filter := newFilter(backend, addresses, topics) + filter := newFilter(sys, addresses, topics) filter.matcher = bloombits.NewMatcher(size, filters) filter.begin = begin @@ -97,21 +75,20 @@ func NewRangeFilter(backend Backend, begin, end int64, addresses []common.Addres // NewBlockFilter creates a new filter which directly inspects the contents of // a block to figure out whether it is interesting or not. -func NewBlockFilter(backend Backend, block common.Hash, addresses []common.Address, topics [][]common.Hash) *Filter { +func (sys *FilterSystem) NewBlockFilter(block common.Hash, addresses []common.Address, topics [][]common.Hash) *Filter { // Create a generic filter and convert it into a block filter - filter := newFilter(backend, addresses, topics) + filter := newFilter(sys, addresses, topics) filter.block = block return filter } // newFilter creates a generic filter that can either filter based on a block hash, // or based on range queries. The search criteria needs to be explicitly set. -func newFilter(backend Backend, addresses []common.Address, topics [][]common.Hash) *Filter { +func newFilter(sys *FilterSystem, addresses []common.Address, topics [][]common.Hash) *Filter { return &Filter{ - backend: backend, + sys: sys, addresses: addresses, topics: topics, - db: backend.ChainDb(), } } @@ -120,14 +97,14 @@ func newFilter(backend Backend, addresses []common.Address, topics [][]common.Ha func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) { // If we're doing singleton block filtering, execute and return if f.block != (common.Hash{}) { - header, err := f.backend.HeaderByHash(ctx, f.block) + header, err := f.sys.backend.HeaderByHash(ctx, f.block) if err != nil { return nil, err } if header == nil { return nil, errors.New("unknown block") } - return f.blockLogs(ctx, header) + return f.blockLogs(ctx, header, false) } // Short-cut if all we care about is pending logs if f.begin == rpc.PendingBlockNumber.Int64() { @@ -137,7 +114,7 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) { return f.pendingLogs() } // Figure out the limits of the filter range - header, _ := f.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber) + header, _ := f.sys.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber) if header == nil { return nil, nil } @@ -156,7 +133,7 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) { var ( logs []*types.Log err error - size, sections = f.backend.BloomStatus() + size, sections = f.sys.backend.BloomStatus() ) if indexed := sections * size; indexed > uint64(f.begin) { if indexed > end { @@ -192,7 +169,7 @@ func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, err } defer session.Close() - f.backend.ServiceFilter(ctx, session) + f.sys.backend.ServiceFilter(ctx, session) // Iterate over the matches until exhausted or context closed var logs []*types.Log @@ -211,11 +188,11 @@ func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, err f.begin = int64(number) + 1 // Retrieve the suggested block and pull any truly matching logs - header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(number)) + header, err := f.sys.backend.HeaderByNumber(ctx, rpc.BlockNumber(number)) if header == nil || err != nil { return logs, err } - found, err := f.checkMatches(ctx, header) + found, err := f.blockLogs(ctx, header, true) if err != nil { return logs, err } @@ -233,11 +210,11 @@ func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, e var logs []*types.Log for ; f.begin <= int64(end); f.begin++ { - header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(f.begin)) + header, err := f.sys.backend.HeaderByNumber(ctx, rpc.BlockNumber(f.begin)) if header == nil || err != nil { return logs, err } - found, err := f.blockLogs(ctx, header) + found, err := f.blockLogs(ctx, header, false) if err != nil { return logs, err } @@ -247,34 +224,34 @@ func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, e } // blockLogs returns the logs matching the filter criteria within a single block. -func (f *Filter) blockLogs(ctx context.Context, header *types.Header) (logs []*types.Log, err error) { - if bloomFilter(header.Bloom, f.addresses, f.topics) { - found, err := f.checkMatches(ctx, header) +func (f *Filter) blockLogs(ctx context.Context, header *types.Header, skipBloom bool) ([]*types.Log, error) { + // Fast track: no filtering criteria + if len(f.addresses) == 0 && len(f.topics) == 0 { + list, err := f.sys.cachedGetLogs(ctx, header.Hash(), header.Number.Uint64()) if err != nil { - return logs, err + return nil, err } - logs = append(logs, found...) + return flatten(list), nil + } else if skipBloom || bloomFilter(header.Bloom, f.addresses, f.topics) { + return f.checkMatches(ctx, header) } - return logs, nil + return nil, nil } // checkMatches checks if the receipts belonging to the given header contain any log events that // match the filter criteria. This function is called when the bloom filter signals a potential match. -func (f *Filter) checkMatches(ctx context.Context, header *types.Header) (logs []*types.Log, err error) { - // Get the logs of the block - logsList, err := f.backend.GetLogs(ctx, header.Hash()) +func (f *Filter) checkMatches(ctx context.Context, header *types.Header) ([]*types.Log, error) { + logsList, err := f.sys.cachedGetLogs(ctx, header.Hash(), header.Number.Uint64()) if err != nil { return nil, err } - var unfiltered []*types.Log - for _, logs := range logsList { - unfiltered = append(unfiltered, logs...) - } - logs = filterLogs(unfiltered, nil, nil, f.addresses, f.topics) + + unfiltered := flatten(logsList) + logs := filterLogs(unfiltered, nil, nil, f.addresses, f.topics) if len(logs) > 0 { // We have matching logs, check if we need to resolve full logs via the light client if logs[0].TxHash == (common.Hash{}) { - receipts, err := f.backend.GetReceipts(ctx, header.Hash()) + receipts, err := f.sys.backend.GetReceipts(ctx, header.Hash()) if err != nil { return nil, err } @@ -291,7 +268,7 @@ func (f *Filter) checkMatches(ctx context.Context, header *types.Header) (logs [ // pendingLogs returns the logs matching the filter criteria within the pending block. func (f *Filter) pendingLogs() ([]*types.Log, error) { - block, receipts := f.backend.PendingBlockAndReceipts() + block, receipts := f.sys.backend.PendingBlockAndReceipts() if bloomFilter(block.Bloom(), f.addresses, f.topics) { var unfiltered []*types.Log for _, r := range receipts { @@ -376,3 +353,11 @@ func bloomFilter(bloom types.Bloom, addresses []common.Address, topics [][]commo } return true } + +func flatten(list [][]*types.Log) []*types.Log { + var flat []*types.Log + for _, logs := range list { + flat = append(flat, logs...) + } + return flat +} diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index 921797acb6cf..78f77a89e116 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -21,18 +21,96 @@ package filters import ( "context" "errors" + "fmt" "sync" "time" ethereum "github.com/XinFinOrg/XDPoSChain" "github.com/XinFinOrg/XDPoSChain/common" "github.com/XinFinOrg/XDPoSChain/core" + "github.com/XinFinOrg/XDPoSChain/core/bloombits" "github.com/XinFinOrg/XDPoSChain/core/types" + "github.com/XinFinOrg/XDPoSChain/ethdb" "github.com/XinFinOrg/XDPoSChain/event" "github.com/XinFinOrg/XDPoSChain/log" "github.com/XinFinOrg/XDPoSChain/rpc" + lru "github.com/hashicorp/golang-lru" ) +// Config represents the configuration of the filter system. +type Config struct { + LogCacheSize int // maximum number of cached blocks (default: 32) + Timeout time.Duration // how long filters stay active (default: 5min) +} + +func (cfg Config) withDefaults() Config { + if cfg.Timeout == 0 { + cfg.Timeout = 5 * time.Minute + } + if cfg.LogCacheSize == 0 { + cfg.LogCacheSize = 32 + } + return cfg +} + +type Backend interface { + ChainDb() ethdb.Database + HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error) + HeaderByHash(ctx context.Context, blockHash common.Hash) (*types.Header, error) + GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error) + GetLogs(ctx context.Context, blockHash common.Hash, number uint64) ([][]*types.Log, error) + PendingBlockAndReceipts() (*types.Block, types.Receipts) + + SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription + SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription + SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription + SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription + SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription + + BloomStatus() (uint64, uint64) + ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) +} + +// FilterSystem holds resources shared by all filters. +type FilterSystem struct { + backend Backend + logsCache *lru.Cache + cfg *Config +} + +// NewFilterSystem creates a filter system. +func NewFilterSystem(backend Backend, config Config) *FilterSystem { + config = config.withDefaults() + + cache, err := lru.New(config.LogCacheSize) + if err != nil { + panic(err) + } + return &FilterSystem{ + backend: backend, + logsCache: cache, + cfg: &config, + } +} + +// cachedGetLogs loads block logs from the backend and caches the result. +func (sys *FilterSystem) cachedGetLogs(ctx context.Context, blockHash common.Hash, number uint64) ([][]*types.Log, error) { + cached, ok := sys.logsCache.Get(blockHash) + if ok { + return cached.([][]*types.Log), nil + } + + logs, err := sys.backend.GetLogs(ctx, blockHash, number) + if err != nil { + return nil, err + } + if logs == nil { + return nil, fmt.Errorf("failed to get logs for block #%d (0x%s)", number, blockHash.TerminalString()) + } + sys.logsCache.Add(blockHash, logs) + return logs, nil +} + // Type determines the kind of filter and is used to put the filter in to // the correct bucket when added. type Type byte @@ -83,6 +161,7 @@ type subscription struct { // subscription which match the subscription criteria. type EventSystem struct { backend Backend + sys *FilterSystem lightMode bool lastHead *types.Header @@ -109,9 +188,10 @@ type EventSystem struct { // // The returned manager has a loop that needs to be stopped with the Stop function // or by stopping the given mux. -func NewEventSystem(backend Backend, lightMode bool) *EventSystem { +func NewEventSystem(sys *FilterSystem, lightMode bool) *EventSystem { m := &EventSystem{ - backend: backend, + sys: sys, + backend: sys.backend, lightMode: lightMode, install: make(chan *subscription), uninstall: make(chan *subscription), @@ -405,7 +485,7 @@ func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common. // Get the logs of the block ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() - logsList, err := es.backend.GetLogs(ctx, header.Hash()) + logsList, err := es.sys.cachedGetLogs(ctx, header.Hash(), header.Number.Uint64()) if err != nil { return nil } diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index ebbf0ee1f44b..da36c01843b6 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -39,10 +39,6 @@ import ( "github.com/XinFinOrg/XDPoSChain/rpc" ) -var ( - deadline = 5 * time.Minute -) - type testBackend struct { mux *event.TypeMux db ethdb.Database @@ -81,14 +77,8 @@ func (b *testBackend) GetReceipts(ctx context.Context, blockHash common.Hash) (t return core.GetBlockReceipts(b.db, blockHash, number), nil } -func (b *testBackend) GetLogs(ctx context.Context, blockHash common.Hash) ([][]*types.Log, error) { - number := core.GetBlockNumber(b.db, blockHash) - receipts := core.GetBlockReceipts(b.db, blockHash, number) - - logs := make([][]*types.Log, len(receipts)) - for i, receipt := range receipts { - logs[i] = receipt.Logs - } +func (b *testBackend) GetLogs(ctx context.Context, hash common.Hash, number uint64) ([][]*types.Log, error) { + logs := rawdb.ReadLogs(b.db, hash, number) return logs, nil } @@ -147,6 +137,12 @@ func (b *testBackend) ServiceFilter(ctx context.Context, session *bloombits.Matc }() } +func newTestFilterSystem(t testing.TB, db ethdb.Database, cfg Config) (*testBackend, *FilterSystem) { + backend := &testBackend{db: db} + sys := NewFilterSystem(backend, cfg) + return backend, sys +} + // TestBlockSubscription tests if a block subscription returns block hashes for posted chain events. // It creates multiple subscriptions: // - one at the start and should receive all posted chain events and a second (blockHashes) @@ -156,12 +152,12 @@ func TestBlockSubscription(t *testing.T) { t.Parallel() var ( - db = rawdb.NewMemoryDatabase() - backend = &testBackend{db: db} - api = NewFilterAPI(backend, false, deadline) - genesis = new(core.Genesis).MustCommit(db) - chain, _ = core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 10, func(i int, gen *core.BlockGen) {}) - chainEvents = []core.ChainEvent{} + db = rawdb.NewMemoryDatabase() + backend, sys = newTestFilterSystem(t, db, Config{}) + api = NewFilterAPI(sys, false) + genesis = new(core.Genesis).MustCommit(db) + chain, _ = core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 10, func(i int, gen *core.BlockGen) {}) + chainEvents = []core.ChainEvent{} ) for _, blk := range chain { @@ -208,9 +204,9 @@ func TestPendingTxFilter(t *testing.T) { t.Parallel() var ( - db = rawdb.NewMemoryDatabase() - backend = &testBackend{db: db} - api = NewFilterAPI(backend, false, deadline) + db = rawdb.NewMemoryDatabase() + backend, sys = newTestFilterSystem(t, db, Config{}) + api = NewFilterAPI(sys, false) transactions = []*types.Transaction{ types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil), @@ -263,9 +259,9 @@ func TestPendingTxFilter(t *testing.T) { // If not it must return an error. func TestLogFilterCreation(t *testing.T) { var ( - db = rawdb.NewMemoryDatabase() - backend = &testBackend{db: db} - api = NewFilterAPI(backend, false, deadline) + db = rawdb.NewMemoryDatabase() + _, sys = newTestFilterSystem(t, db, Config{}) + api = NewFilterAPI(sys, false) testCases = []struct { crit FilterCriteria @@ -307,9 +303,9 @@ func TestInvalidLogFilterCreation(t *testing.T) { t.Parallel() var ( - db = rawdb.NewMemoryDatabase() - backend = &testBackend{db: db} - api = NewFilterAPI(backend, false, deadline) + db = rawdb.NewMemoryDatabase() + _, sys = newTestFilterSystem(t, db, Config{}) + api = NewFilterAPI(sys, false) ) // different situations where log filter creation should fail. @@ -330,8 +326,8 @@ func TestInvalidLogFilterCreation(t *testing.T) { func TestInvalidGetLogsRequest(t *testing.T) { var ( db = rawdb.NewMemoryDatabase() - backend = &testBackend{db: db} - api = NewFilterAPI(backend, false, deadline) + _, sys = newTestFilterSystem(t, db, Config{}) + api = NewFilterAPI(sys, false) blockHash = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111") ) @@ -354,9 +350,9 @@ func TestLogFilter(t *testing.T) { t.Parallel() var ( - db = rawdb.NewMemoryDatabase() - backend = &testBackend{db: db} - api = NewFilterAPI(backend, false, deadline) + db = rawdb.NewMemoryDatabase() + backend, sys = newTestFilterSystem(t, db, Config{}) + api = NewFilterAPI(sys, false) firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111") secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222") @@ -468,9 +464,9 @@ func TestPendingLogsSubscription(t *testing.T) { t.Parallel() var ( - db = rawdb.NewMemoryDatabase() - backend = &testBackend{db: db} - api = NewFilterAPI(backend, false, deadline) + db = rawdb.NewMemoryDatabase() + backend, sys = newTestFilterSystem(t, db, Config{}) + api = NewFilterAPI(sys, false) firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111") secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222") @@ -604,10 +600,10 @@ func TestPendingTxFilterDeadlock(t *testing.T) { timeout := 100 * time.Millisecond var ( - db = rawdb.NewMemoryDatabase() - backend = &testBackend{db: db} - api = NewFilterAPI(backend, false, timeout) - done = make(chan struct{}) + db = rawdb.NewMemoryDatabase() + backend, sys = newTestFilterSystem(t, db, Config{Timeout: timeout}) + api = NewFilterAPI(sys, false) + done = make(chan struct{}) ) go func() { diff --git a/eth/filters/filter_test.go b/eth/filters/filter_test.go index 98de7db846b6..0945c661c290 100644 --- a/eth/filters/filter_test.go +++ b/eth/filters/filter_test.go @@ -50,7 +50,7 @@ func BenchmarkFilters(b *testing.B) { var ( db, _ = rawdb.NewLevelDBDatabase(dir, 0, 0, "") - backend = &testBackend{db: db} + _, sys = newTestFilterSystem(b, db, Config{}) key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") addr1 = crypto.PubkeyToAddress(key1.PublicKey) addr2 = common.BytesToAddress([]byte("jeff")) @@ -91,7 +91,7 @@ func BenchmarkFilters(b *testing.B) { } b.ResetTimer() - filter := NewRangeFilter(backend, 0, -1, []common.Address{addr1, addr2, addr3, addr4}, nil) + filter := sys.NewRangeFilter(0, -1, []common.Address{addr1, addr2, addr3, addr4}, nil) for i := 0; i < b.N; i++ { logs, _ := filter.Logs(context.Background()) @@ -110,7 +110,7 @@ func TestFilters(t *testing.T) { var ( db, _ = rawdb.NewLevelDBDatabase(dir, 0, 0, "") - backend = &testBackend{db: db} + _, sys = newTestFilterSystem(t, db, Config{}) key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") addr = crypto.PubkeyToAddress(key1.PublicKey) @@ -133,6 +133,7 @@ func TestFilters(t *testing.T) { }, } gen.AddUncheckedReceipt(receipt) + gen.AddUncheckedTx(types.NewTransaction(1, common.HexToAddress("0x1"), big.NewInt(1), 1, big.NewInt(2100), nil)) case 2: receipt := types.NewReceipt(nil, false, 0) receipt.Logs = []*types.Log{ @@ -142,6 +143,7 @@ func TestFilters(t *testing.T) { }, } gen.AddUncheckedReceipt(receipt) + gen.AddUncheckedTx(types.NewTransaction(2, common.HexToAddress("0x2"), big.NewInt(2), 2, big.NewInt(2100), nil)) case 998: receipt := types.NewReceipt(nil, false, 0) receipt.Logs = []*types.Log{ @@ -151,6 +153,7 @@ func TestFilters(t *testing.T) { }, } gen.AddUncheckedReceipt(receipt) + gen.AddUncheckedTx(types.NewTransaction(998, common.HexToAddress("0x998"), big.NewInt(998), 998, big.NewInt(2100), nil)) case 999: receipt := types.NewReceipt(nil, false, 0) receipt.Logs = []*types.Log{ @@ -160,6 +163,7 @@ func TestFilters(t *testing.T) { }, } gen.AddUncheckedReceipt(receipt) + gen.AddUncheckedTx(types.NewTransaction(999, common.HexToAddress("0x999"), big.NewInt(999), 999, big.NewInt(2100), nil)) } }) for i, block := range chain { @@ -175,14 +179,14 @@ func TestFilters(t *testing.T) { } } - filter := NewRangeFilter(backend, 0, -1, []common.Address{addr}, [][]common.Hash{{hash1, hash2, hash3, hash4}}) + filter := sys.NewRangeFilter(0, -1, []common.Address{addr}, [][]common.Hash{{hash1, hash2, hash3, hash4}}) logs, _ := filter.Logs(context.Background()) if len(logs) != 4 { t.Error("expected 4 log, got", len(logs)) } - filter = NewRangeFilter(backend, 900, 999, []common.Address{addr}, [][]common.Hash{{hash3}}) + filter = sys.NewRangeFilter(900, 999, []common.Address{addr}, [][]common.Hash{{hash3}}) logs, _ = filter.Logs(context.Background()) if len(logs) != 1 { t.Error("expected 1 log, got", len(logs)) @@ -191,7 +195,7 @@ func TestFilters(t *testing.T) { t.Errorf("expected log[0].Topics[0] to be %x, got %x", hash3, logs[0].Topics[0]) } - filter = NewRangeFilter(backend, 990, -1, []common.Address{addr}, [][]common.Hash{{hash3}}) + filter = sys.NewRangeFilter(990, -1, []common.Address{addr}, [][]common.Hash{{hash3}}) logs, _ = filter.Logs(context.Background()) if len(logs) != 1 { t.Error("expected 1 log, got", len(logs)) @@ -200,7 +204,7 @@ func TestFilters(t *testing.T) { t.Errorf("expected log[0].Topics[0] to be %x, got %x", hash3, logs[0].Topics[0]) } - filter = NewRangeFilter(backend, 1, 10, nil, [][]common.Hash{{hash1, hash2}}) + filter = sys.NewRangeFilter(1, 10, nil, [][]common.Hash{{hash1, hash2}}) logs, _ = filter.Logs(context.Background()) if len(logs) != 2 { @@ -208,7 +212,7 @@ func TestFilters(t *testing.T) { } failHash := common.BytesToHash([]byte("fail")) - filter = NewRangeFilter(backend, 0, -1, nil, [][]common.Hash{{failHash}}) + filter = sys.NewRangeFilter(0, -1, nil, [][]common.Hash{{failHash}}) logs, _ = filter.Logs(context.Background()) if len(logs) != 0 { @@ -216,14 +220,14 @@ func TestFilters(t *testing.T) { } failAddr := common.BytesToAddress([]byte("failmenow")) - filter = NewRangeFilter(backend, 0, -1, []common.Address{failAddr}, nil) + filter = sys.NewRangeFilter(0, -1, []common.Address{failAddr}, nil) logs, _ = filter.Logs(context.Background()) if len(logs) != 0 { t.Error("expected 0 log, got", len(logs)) } - filter = NewRangeFilter(backend, 0, -1, nil, [][]common.Hash{{failHash}, {hash1}}) + filter = sys.NewRangeFilter(0, -1, nil, [][]common.Hash{{failHash}, {hash1}}) logs, _ = filter.Logs(context.Background()) if len(logs) != 0 { diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index 40854f90dc47..ad09ad95c00c 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -35,6 +35,7 @@ import ( "github.com/XinFinOrg/XDPoSChain/core/types" "github.com/XinFinOrg/XDPoSChain/core/vm" "github.com/XinFinOrg/XDPoSChain/eth/downloader" + "github.com/XinFinOrg/XDPoSChain/eth/filters" "github.com/XinFinOrg/XDPoSChain/ethdb" "github.com/XinFinOrg/XDPoSChain/event" "github.com/XinFinOrg/XDPoSChain/params" @@ -103,6 +104,10 @@ type Backend interface { GetBlocksHashCache(blockNr uint64) []common.Hash AreTwoBlockSamePath(newBlock common.Hash, oldBlock common.Hash) bool GetOrderNonce(address common.Hash) (uint64, error) + + // eth/filters needs to be initialized from this backend type, so methods needed by + // it must also be included here. + filters.Backend } func GetAPIs(apiBackend Backend, chainReader consensus.ChainReader) []rpc.API { diff --git a/les/api_backend.go b/les/api_backend.go index 98ef15d8508a..680d81f18ed4 100644 --- a/les/api_backend.go +++ b/les/api_backend.go @@ -167,8 +167,8 @@ func (b *LesApiBackend) GetReceipts(ctx context.Context, blockHash common.Hash) return light.GetBlockReceipts(ctx, b.eth.odr, blockHash, core.GetBlockNumber(b.eth.chainDb, blockHash)) } -func (b *LesApiBackend) GetLogs(ctx context.Context, blockHash common.Hash) ([][]*types.Log, error) { - return light.GetBlockLogs(ctx, b.eth.odr, blockHash, core.GetBlockNumber(b.eth.chainDb, blockHash)) +func (b *LesApiBackend) GetLogs(ctx context.Context, hash common.Hash, number uint64) ([][]*types.Log, error) { + return light.GetBlockLogs(ctx, b.eth.odr, hash, number) } func (b *LesApiBackend) GetTd(blockHash common.Hash) *big.Int { diff --git a/les/backend.go b/les/backend.go index d5c54a7feb71..fe3ffa80b5cb 100644 --- a/les/backend.go +++ b/les/backend.go @@ -190,7 +190,7 @@ func (s *LightEthereum) APIs() []rpc.API { }, { Namespace: "eth", Version: "1.0", - Service: filters.NewFilterAPI(s.ApiBackend, true, 5*time.Minute), + Service: filters.NewFilterAPI(filters.NewFilterSystem(s.ApiBackend, filters.Config{LogCacheSize: s.config.FilterLogCacheSize}), true), Public: true, }, { Namespace: "net", diff --git a/node/node.go b/node/node.go index c54328913742..a9f767bf28f1 100644 --- a/node/node.go +++ b/node/node.go @@ -48,6 +48,7 @@ type Node struct { serverConfig p2p.Config server *p2p.Server // Currently running P2P networking layer + state int // Tracks state of node lifecycle serviceFuncs []ServiceConstructor // Service constructors (in dependency order) services map[reflect.Type]Service // Currently running services @@ -74,6 +75,10 @@ type Node struct { log log.Logger } +const ( + initializingState = iota +) + // New creates a new P2P node, ready for protocol registration. func New(conf *Config) (*Node, error) { // Copy config and resolve the datadir so future changes to the current @@ -302,6 +307,17 @@ func (n *Node) stopInProc() { } } +// RegisterAPIs registers the APIs a service provides on the node. +func (n *Node) RegisterAPIs(apis []rpc.API) { + n.lock.Lock() + defer n.lock.Unlock() + + if n.state != initializingState { + panic("can't register APIs on running/stopped node") + } + n.rpcAPIs = append(n.rpcAPIs, apis...) +} + // startIPC initializes and starts the IPC RPC endpoint. func (n *Node) startIPC(apis []rpc.API) error { // Short circuit if the IPC endpoint isn't being exposed