Skip to content

Commit

Permalink
all: add global block logs cache (25459)
Browse files Browse the repository at this point in the history
  • Loading branch information
JukLee0ira committed Jul 22, 2024
1 parent a93c24b commit b1228c7
Show file tree
Hide file tree
Showing 19 changed files with 289 additions and 164 deletions.
32 changes: 16 additions & 16 deletions accounts/abi/bind/backends/simulated.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ type SimulatedBackend struct {
pendingState *state.StateDB // Currently pending state that will be the active on request
pendingReceipts types.Receipts // Currently receipts for the pending block

events *filters.EventSystem // Event system for filtering log events live
events *filters.EventSystem // for filtering log events live
filterSystem *filters.FilterSystem // for filtering database logs

config *params.ChainConfig
}
Expand Down Expand Up @@ -95,9 +96,7 @@ func SimulateWalletAddressAndSignFn() (common.Address, func(account accounts.Acc

// XDC simulated backend for testing purpose.
func NewXDCSimulatedBackend(alloc core.GenesisAlloc, gasLimit uint64, chainConfig *params.ChainConfig) *SimulatedBackend {
// database := ethdb.NewMemDatabase()
database := rawdb.NewMemoryDatabase()

genesis := core.Genesis{
GasLimit: gasLimit, // need this big, support initial smart contract
Config: chainConfig,
Expand Down Expand Up @@ -128,7 +127,11 @@ func NewXDCSimulatedBackend(alloc core.GenesisAlloc, gasLimit uint64, chainConfi
blockchain: blockchain,
config: genesis.Config,
}
backend.events = filters.NewEventSystem(&filterBackend{database, blockchain, backend}, false)

filterBackend := &filterBackend{database, blockchain, backend}
backend.filterSystem = filters.NewFilterSystem(filterBackend, filters.Config{})
backend.events = filters.NewEventSystem(backend.filterSystem, false)

blockchain.Client = backend
backend.rollback()
return backend
Expand All @@ -148,7 +151,11 @@ func NewSimulatedBackend(alloc core.GenesisAlloc) *SimulatedBackend {
blockchain: blockchain,
config: genesis.Config,
}
backend.events = filters.NewEventSystem(&filterBackend{database, blockchain, backend}, false)

filterBackend := &filterBackend{database, blockchain, backend}
backend.filterSystem = filters.NewFilterSystem(filterBackend, filters.Config{})
backend.events = filters.NewEventSystem(backend.filterSystem, false)

backend.rollback()
return backend
}
Expand Down Expand Up @@ -422,7 +429,7 @@ func (b *SimulatedBackend) FilterLogs(ctx context.Context, query XDPoSChain.Filt
var filter *filters.Filter
if query.BlockHash != nil {
// Block filter requested, construct a single-shot filter
filter = filters.NewBlockFilter(&filterBackend{b.database, b.blockchain, b}, *query.BlockHash, query.Addresses, query.Topics)
filter = b.filterSystem.NewBlockFilter(*query.BlockHash, query.Addresses, query.Topics)
} else {
// Initialize unset filter boundaried to run from genesis to chain head
from := int64(0)
Expand All @@ -434,7 +441,7 @@ func (b *SimulatedBackend) FilterLogs(ctx context.Context, query XDPoSChain.Filt
to = query.ToBlock.Int64()
}
// Construct the range filter
filter = filters.NewRangeFilter(&filterBackend{b.database, b.blockchain, b}, from, to, query.Addresses, query.Topics)
filter = b.filterSystem.NewRangeFilter(from, to, query.Addresses, query.Topics)
}
// Run the filter and return all the logs
logs, err := filter.Logs(ctx)
Expand Down Expand Up @@ -551,15 +558,8 @@ func (fb *filterBackend) PendingBlockAndReceipts() (*types.Block, types.Receipts
return fb.backend.pendingBlock, fb.backend.pendingReceipts
}

func (fb *filterBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types.Log, error) {
receipts := core.GetBlockReceipts(fb.db, hash, core.GetBlockNumber(fb.db, hash))
if receipts == nil {
return nil, nil
}
logs := make([][]*types.Log, len(receipts))
for i, receipt := range receipts {
logs[i] = receipt.Logs
}
func (fb *filterBackend) GetLogs(ctx context.Context, hash common.Hash, number uint64) ([][]*types.Log, error) {
logs := rawdb.ReadLogs(fb.db, hash, number)
return logs, nil
}

Expand Down
1 change: 1 addition & 0 deletions cmd/XDC/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ var (
//utils.CacheDatabaseFlag,
//utils.CacheGCFlag,
//utils.TrieCacheGenFlag,
utils.CacheLogSizeFlag,
utils.ListenPortFlag,
utils.MaxPeersFlag,
utils.MaxPendingPeersFlag,
Expand Down
24 changes: 24 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@ import (
"github.com/XinFinOrg/XDPoSChain/crypto"
"github.com/XinFinOrg/XDPoSChain/eth/downloader"
"github.com/XinFinOrg/XDPoSChain/eth/ethconfig"
"github.com/XinFinOrg/XDPoSChain/eth/filters"
"github.com/XinFinOrg/XDPoSChain/eth/gasprice"
"github.com/XinFinOrg/XDPoSChain/ethdb"
"github.com/XinFinOrg/XDPoSChain/internal/ethapi"
"github.com/XinFinOrg/XDPoSChain/log"
"github.com/XinFinOrg/XDPoSChain/metrics"
"github.com/XinFinOrg/XDPoSChain/metrics/exp"
Expand All @@ -52,6 +54,7 @@ import (
"github.com/XinFinOrg/XDPoSChain/p2p/nat"
"github.com/XinFinOrg/XDPoSChain/p2p/netutil"
"github.com/XinFinOrg/XDPoSChain/params"
"github.com/XinFinOrg/XDPoSChain/rpc"
whisper "github.com/XinFinOrg/XDPoSChain/whisper/whisperv6"
"gopkg.in/urfave/cli.v1"
)
Expand Down Expand Up @@ -313,6 +316,11 @@ var (
Usage: "Percentage of cache memory allowance to use for trie pruning",
Value: 25,
}
CacheLogSizeFlag = &cli.IntFlag{
Name: "cache.blocklogs",
Usage: "Size (in number of blocks) of the log cache for filtering",
Value: ethconfig.Defaults.FilterLogCacheSize,
}
// Miner settings
StakingEnabledFlag = cli.BoolFlag{
Name: "mine",
Expand Down Expand Up @@ -1208,6 +1216,9 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
if ctx.GlobalIsSet(GasPriceFlag.Name) {
cfg.GasPrice = GlobalBig(ctx, GasPriceFlag.Name)
}
if ctx.IsSet(CacheLogSizeFlag.Name) {
cfg.FilterLogCacheSize = ctx.Int(CacheLogSizeFlag.Name)
}
if ctx.GlobalIsSet(VMEnableDebugFlag.Name) {
// TODO(fjl): force-enable this in --dev mode
cfg.EnablePreimageRecording = ctx.GlobalBool(VMEnableDebugFlag.Name)
Expand Down Expand Up @@ -1407,6 +1418,19 @@ func WalkMatch(root, pattern string) ([]string, error) {
return matches, nil
}

// RegisterFilterAPI adds the eth log filtering RPC API to the node.
func RegisterFilterAPI(stack *node.Node, backend ethapi.Backend, ethcfg *ethconfig.Config) *filters.FilterSystem {
isLightClient := ethcfg.SyncMode == downloader.LightSync
filterSystem := filters.NewFilterSystem(backend, filters.Config{
LogCacheSize: ethcfg.FilterLogCacheSize,
})
stack.RegisterAPIs([]rpc.API{{
Namespace: "eth",
Service: filters.NewFilterAPI(filterSystem, isLightClient),
}})
return filterSystem
}

func SetupMetrics(ctx *cli.Context) {
if metrics.Enabled {
log.Info("Enabling metrics collection")
Expand Down
3 changes: 1 addition & 2 deletions cmd/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ func RegisterShhService(stack *node.Node, cfg *whisper.Config) {
}
}

// RegisterEthStatsService configures the Ethereum Stats daemon and adds it to
// th egiven node.
// RegisterEthStatsService configures the Ethereum Stats daemon and adds it to the node.
func RegisterEthStatsService(stack *node.Node, url string) {
if err := stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
// Retrieve both eth and les services
Expand Down
9 changes: 9 additions & 0 deletions core/chain_makers.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,15 @@ func (b *BlockGen) AddTxWithChain(bc *BlockChain, tx *types.Transaction) {
}
}

// AddUncheckedTx forcefully adds a transaction to the block without any
// validation.
//
// AddUncheckedTx will cause consensus failures when used during real
// chain processing. This is best used in conjunction with raw block insertion.
func (b *BlockGen) AddUncheckedTx(tx *types.Transaction) {
b.txs = append(b.txs, tx)
}

// Number returns the block number of the block being generated.
func (b *BlockGen) Number() *big.Int {
return new(big.Int).Set(b.header.Number)
Expand Down
13 changes: 2 additions & 11 deletions eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,17 +234,8 @@ func (b *EthApiBackend) GetReceipts(ctx context.Context, blockHash common.Hash)
return core.GetBlockReceipts(b.eth.chainDb, blockHash, core.GetBlockNumber(b.eth.chainDb, blockHash)), nil
}

func (b *EthApiBackend) GetLogs(ctx context.Context, blockHash common.Hash) ([][]*types.Log, error) {
db := b.eth.ChainDb()
number := rawdb.ReadHeaderNumber(db, blockHash)
if number == nil {
return nil, errors.New("failed to get block number from hash")
}
logs := rawdb.ReadLogs(db, blockHash, *number)
if logs == nil {
return nil, errors.New("failed to get logs for block")
}
return logs, nil
func (b *EthApiBackend) GetLogs(ctx context.Context, hash common.Hash, number uint64) ([][]*types.Log, error) {
return rawdb.ReadLogs(b.eth.chainDb, hash, number), nil
}

func (b *EthApiBackend) GetTd(blockHash common.Hash) *big.Int {
Expand Down
3 changes: 1 addition & 2 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"runtime"
"sync"
"sync/atomic"
"time"

"github.com/XinFinOrg/XDPoSChain/XDCx"
"github.com/XinFinOrg/XDPoSChain/XDCxlending"
Expand Down Expand Up @@ -401,7 +400,7 @@ func (s *Ethereum) APIs() []rpc.API {
}, {
Namespace: "eth",
Version: "1.0",
Service: filters.NewFilterAPI(s.ApiBackend, true, 5*time.Minute),
Service: filters.NewFilterAPI(filters.NewFilterSystem(s.ApiBackend, filters.Config{LogCacheSize: s.config.FilterLogCacheSize}), true),
Public: true,
}, {
Namespace: "admin",
Expand Down
16 changes: 10 additions & 6 deletions eth/ethconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,13 @@ var Defaults = Config{
DatasetsInMem: 1,
DatasetsOnDisk: 2,
},
NetworkId: 88,
LightPeers: 100,
DatabaseCache: 768,
TrieCache: 256,
TrieTimeout: 5 * time.Minute,
GasPrice: big.NewInt(0.25 * params.Shannon),
NetworkId: 88,
LightPeers: 100,
DatabaseCache: 768,
TrieCache: 256,
TrieTimeout: 5 * time.Minute,
FilterLogCacheSize: 32,
GasPrice: big.NewInt(0.25 * params.Shannon),

TxPool: core.DefaultTxPoolConfig,
RPCGasCap: 25000000,
Expand Down Expand Up @@ -111,6 +112,9 @@ type Config struct {
TrieCache int
TrieTimeout time.Duration

// This is the number of blocks for which logs will be cached in the filter system.
FilterLogCacheSize int

// Mining-related options
Etherbase common.Address `toml:",omitempty"`
MinerThreads int `toml:",omitempty"`
Expand Down
6 changes: 6 additions & 0 deletions eth/ethconfig/gen_config.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 11 additions & 11 deletions eth/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type filter struct {
// FilterAPI offers support to create and manage filters. This will allow external clients to retrieve various
// information related to the Ethereum protocol such als blocks, transactions and logs.
type FilterAPI struct {
backend Backend
sys *FilterSystem
chainDb ethdb.Database
events *EventSystem
filtersMu sync.Mutex
Expand All @@ -63,15 +63,15 @@ type FilterAPI struct {
}

// NewFilterAPI returns a new FilterAPI instance.
func NewFilterAPI(backend Backend, lightMode bool, timeout time.Duration) *FilterAPI {
func NewFilterAPI(system *FilterSystem, lightMode bool) *FilterAPI {
api := &FilterAPI{
backend: backend,
chainDb: backend.ChainDb(),
events: NewEventSystem(backend, lightMode),
sys: system,
chainDb: system.backend.ChainDb(),
events: NewEventSystem(system, lightMode),
filters: make(map[rpc.ID]*filter),
timeout: timeout,
timeout: system.cfg.Timeout,
}
go api.timeoutLoop(timeout)
go api.timeoutLoop(system.cfg.Timeout)

return api
}
Expand Down Expand Up @@ -341,7 +341,7 @@ func (api *FilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]*type
var filter *Filter
if crit.BlockHash != nil {
// Block filter requested, construct a single-shot filter
filter = NewBlockFilter(api.backend, *crit.BlockHash, crit.Addresses, crit.Topics)
filter = api.sys.NewBlockFilter(*crit.BlockHash, crit.Addresses, crit.Topics)
} else {
// Convert the RPC block numbers into internal representations
begin := rpc.LatestBlockNumber.Int64()
Expand All @@ -353,7 +353,7 @@ func (api *FilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]*type
end = crit.ToBlock.Int64()
}
// Construct the range filter
filter = NewRangeFilter(api.backend, begin, end, crit.Addresses, crit.Topics)
filter = api.sys.NewRangeFilter(begin, end, crit.Addresses, crit.Topics)
}
// Run the filter and return all the logs
logs, err := filter.Logs(ctx)
Expand Down Expand Up @@ -396,7 +396,7 @@ func (api *FilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*types.Lo
var filter *Filter
if f.crit.BlockHash != nil {
// Block filter requested, construct a single-shot filter
filter = NewBlockFilter(api.backend, *f.crit.BlockHash, f.crit.Addresses, f.crit.Topics)
filter = api.sys.NewBlockFilter(*f.crit.BlockHash, f.crit.Addresses, f.crit.Topics)
} else {
// Convert the RPC block numbers into internal representations
begin := rpc.LatestBlockNumber.Int64()
Expand All @@ -408,7 +408,7 @@ func (api *FilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*types.Lo
end = f.crit.ToBlock.Int64()
}
// Construct the range filter
filter = NewRangeFilter(api.backend, begin, end, f.crit.Addresses, f.crit.Topics)
filter = api.sys.NewRangeFilter(begin, end, f.crit.Addresses, f.crit.Topics)
}
// Run the filter and return all the logs
logs, err := filter.Logs(ctx)
Expand Down
16 changes: 11 additions & 5 deletions eth/filters/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,20 +123,25 @@ func benchmarkBloomBits(b *testing.B, sectionSize uint64) {

b.Log("Running filter benchmarks...")
start = time.Now()
var backend *testBackend

var (
backend *testBackend
sys *FilterSystem
)

for i := 0; i < benchFilterCnt; i++ {
if i%20 == 0 {
db.Close()
db, _ = rawdb.NewLevelDBDatabase(benchDataDir, 128, 1024, "")
backend = &testBackend{db: db, sections: cnt}
sys = NewFilterSystem(backend, Config{})
}
var addr common.Address
addr[0] = byte(i)
addr[1] = byte(i / 256)
filter := NewRangeFilter(backend, 0, int64(cnt*sectionSize-1), []common.Address{addr}, nil)
filter := sys.NewRangeFilter(0, int64(cnt*sectionSize-1), []common.Address{addr}, nil)
if _, err := filter.Logs(context.Background()); err != nil {
b.Error("filter.Find error:", err)
b.Error("filter.Logs error:", err)
}
}
d = time.Since(start)
Expand Down Expand Up @@ -186,10 +191,11 @@ func BenchmarkNoBloomBits(b *testing.B) {

clearBloomBits(db)

_, sys := newTestFilterSystem(b, db, Config{})

b.Log("Running filter benchmarks...")
start := time.Now()
backend := &testBackend{db: db}
filter := NewRangeFilter(backend, 0, int64(headNum), []common.Address{{}}, nil)
filter := sys.NewRangeFilter(0, int64(headNum), []common.Address{{}}, nil)
filter.Logs(context.Background())
d := time.Since(start)
b.Log("Finished running filter benchmarks")
Expand Down
Loading

0 comments on commit b1228c7

Please sign in to comment.