Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Txpool optimization: disable cache if --mine is not enabled #245

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions accounts/abi/bind/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ func TestWaitDeployed(t *testing.T) {

// Send and mine the transaction.
backend.Client().SendTransaction(ctx, tx)
time.Sleep(500 * time.Millisecond) //wait for the tx to be mined
backend.Commit()

select {
Expand Down Expand Up @@ -118,7 +117,6 @@ func TestWaitDeployedCornerCases(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
backend.Client().SendTransaction(ctx, tx)
time.Sleep(500 * time.Millisecond) //wait for the tx to be mined
backend.Commit()
notContractCreation := errors.New("tx is not contract creation")
if _, err := bind.WaitDeployed(ctx, backend.Client(), tx); err.Error() != notContractCreation.Error() {
Expand All @@ -137,6 +135,5 @@ func TestWaitDeployedCornerCases(t *testing.T) {
}()

backend.Client().SendTransaction(ctx, tx)
time.Sleep(500 * time.Millisecond) //wait for the tx to be mined
cancel()
}
4 changes: 4 additions & 0 deletions cmd/geth/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,10 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
cfg.Eth.OverrideVerkle = &v
}

if ctx.Bool(utils.MiningEnabledFlag.Name) {
cfg.Eth.TxPool.EnableCache = true
}

backend, eth := utils.RegisterEthService(stack, &cfg.Eth)

// Create gauge with geth system and build information
Expand Down
63 changes: 13 additions & 50 deletions core/txpool/legacypool/cache_for_miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,35 +5,35 @@ import (
"sync"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/metrics"
"github.com/holiman/uint256"
)

var (
pendingCacheGauge = metrics.NewRegisteredGauge("txpool/legacypool/pending/cache", nil)
localCacheGauge = metrics.NewRegisteredGauge("txpool/legacypool/local/cache", nil)
)

type pendingCache interface {
add(types.Transactions, types.Signer)
del(types.Transactions, types.Signer)
dump() map[common.Address]types.Transactions
markLocal(common.Address)
flattenLocals() []common.Address
}

// copy of pending transactions
type cacheForMiner struct {
txLock sync.Mutex
pending map[common.Address]map[*types.Transaction]struct{}
locals map[common.Address]bool
addrLock sync.Mutex

allCache map[common.Address][]*txpool.LazyTransaction
filteredCache map[common.Address][]*txpool.LazyTransaction
cacheLock sync.Mutex
}

func newCacheForMiner() *cacheForMiner {
return &cacheForMiner{
pending: make(map[common.Address]map[*types.Transaction]struct{}),
locals: make(map[common.Address]bool),
allCache: make(map[common.Address][]*txpool.LazyTransaction),
filteredCache: make(map[common.Address][]*txpool.LazyTransaction),
pending: make(map[common.Address]map[*types.Transaction]struct{}),
locals: make(map[common.Address]bool),
}
}

Expand Down Expand Up @@ -75,9 +75,8 @@ func (pc *cacheForMiner) del(txs types.Transactions, signer types.Signer) {
}
}

func (pc *cacheForMiner) sync2cache(pool txpool.LazyResolver, filter func(txs types.Transactions, addr common.Address) types.Transactions) {
func (pc *cacheForMiner) dump() map[common.Address]types.Transactions {
pending := make(map[common.Address]types.Transactions)

pc.txLock.Lock()
for addr, txlist := range pc.pending {
pending[addr] = make(types.Transactions, 0, len(txlist))
Expand All @@ -86,46 +85,10 @@ func (pc *cacheForMiner) sync2cache(pool txpool.LazyResolver, filter func(txs ty
}
}
pc.txLock.Unlock()

// convert pending to lazyTransactions
filteredLazy := make(map[common.Address][]*txpool.LazyTransaction)
allLazy := make(map[common.Address][]*txpool.LazyTransaction)
for addr, txs := range pending {
for _, txs := range pending {
// sorted by nonce
sort.Sort(types.TxByNonce(txs))
filterd := filter(txs, addr)
if len(txs) > 0 {
lazies := make([]*txpool.LazyTransaction, len(txs))
for i, tx := range txs {
lazies[i] = &txpool.LazyTransaction{
Pool: pool,
Hash: tx.Hash(),
Tx: tx,
Time: tx.Time(),
GasFeeCap: uint256.MustFromBig(tx.GasFeeCap()),
GasTipCap: uint256.MustFromBig(tx.GasTipCap()),
Gas: tx.Gas(),
BlobGas: tx.BlobGas(),
}
}
allLazy[addr] = lazies
filteredLazy[addr] = lazies[:len(filterd)]
}
}

pc.cacheLock.Lock()
pc.filteredCache = filteredLazy
pc.allCache = allLazy
pc.cacheLock.Unlock()
}

func (pc *cacheForMiner) dump(filtered bool) map[common.Address][]*txpool.LazyTransaction {
pc.cacheLock.Lock()
pending := pc.allCache
if filtered {
pending = pc.filteredCache
}
pc.cacheLock.Unlock()
return pending
}

Expand All @@ -136,7 +99,7 @@ func (pc *cacheForMiner) markLocal(addr common.Address) {
pc.locals[addr] = true
}

func (pc *cacheForMiner) IsLocal(addr common.Address) bool {
func (pc *cacheForMiner) isLocal(addr common.Address) bool {
pc.addrLock.Lock()
defer pc.addrLock.Unlock()
return pc.locals[addr]
Expand Down
91 changes: 49 additions & 42 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ type BlockChain interface {

// Config are the configuration parameters of the transaction pool.
type Config struct {
EnableCache bool // enable pending cache for mining. Set as true only --mine option is enabled

Locals []common.Address // Addresses that should be treated by default as local
NoLocals bool // Whether local transaction handling should be disabled
Journal string // Journal of local transactions to survive node restarts
Expand Down Expand Up @@ -236,6 +238,12 @@ func (config *Config) sanitize() Config {
log.Warn("Sanitizing invalid txpool reannounce time", "provided", conf.ReannounceTime, "updated", time.Minute)
conf.ReannounceTime = time.Minute
}
// log to inform user if the cache is enabled or not
if conf.EnableCache {
log.Info("legacytxpool Pending Cache is enabled")
} else {
log.Info("legacytxpool Pending Cache is disabled")
}
return conf
}

Expand Down Expand Up @@ -270,7 +278,7 @@ type LegacyPool struct {
all *lookup // All transactions to allow lookups
priced *pricedList // All transactions sorted by price

pendingCache *cacheForMiner //pending list cache for miner
pendingCache pendingCache //pending list cache for miner

reqResetCh chan *txpoolResetRequest
reqPromoteCh chan *accountSet
Expand Down Expand Up @@ -313,6 +321,9 @@ func New(config Config, chain BlockChain) *LegacyPool {
initDoneCh: make(chan struct{}),
pendingCache: newCacheForMiner(),
}
if !config.EnableCache {
pool.pendingCache = newNoneCacheForMiner(pool)
}
pool.locals = newAccountSet(pool.signer)
for _, addr := range config.Locals {
log.Info("Setting new local account", "address", addr)
Expand Down Expand Up @@ -349,9 +360,6 @@ func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserve txpool.A
// Set the basic pool parameters
pool.gasTip.Store(uint256.NewInt(gasTip))

// set dumper
pool.pendingCache.sync2cache(pool, pool.createFilter(pool.gasTip.Load().ToBig(), head.BaseFee))

// Initialize the state with head block, or fallback to empty one in
// case the head state is not available (might occur when node is not
// fully synced).
Expand Down Expand Up @@ -386,27 +394,9 @@ func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserve txpool.A
}
pool.wg.Add(1)
go pool.loop()
go pool.loopOfSync()
return nil
}

func (pool *LegacyPool) loopOfSync() {
ticker := time.NewTicker(400 * time.Millisecond)
for {
select {
case <-pool.reorgShutdownCh:
return
case <-ticker.C:
gasTip := pool.gasTip.Load()
currHead := pool.currentHead.Load()
if gasTip == nil || currHead == nil {
continue
}
pool.pendingCache.sync2cache(pool, pool.createFilter(gasTip.ToBig(), currHead.BaseFee))
}
}
}

// loop is the transaction pool's main event loop, waiting for and reacting to
// outside blockchain events as well as for various reporting and transaction
// eviction events.
Expand Down Expand Up @@ -645,35 +635,56 @@ func (pool *LegacyPool) ContentFrom(addr common.Address) ([]*types.Transaction,
// The transactions can also be pre-filtered by the dynamic fee components to
// reduce allocations and load on downstream subsystems.
func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address][]*txpool.LazyTransaction {
empty := txpool.PendingFilter{}
if filter == empty {
// return all pending transactions, no filtering
return pool.pendingCache.dump(false)
}
defer func(t0 time.Time) {
getPendingDurationTimer.Update(time.Since(t0))
}(time.Now())
// If only blob transactions are requested, this pool is unsuitable as it
// contains none, don't even bother.
if filter.OnlyBlobTxs {
return nil
}
defer func(t0 time.Time) {
getPendingDurationTimer.Update(time.Since(t0))
}(time.Now())
// It is a bit tricky here, we don't do the filtering here.
return pool.pendingCache.dump(true)
}

func (pool *LegacyPool) createFilter(gasPrice, baseFee *big.Int) func(txs types.Transactions, addr common.Address) types.Transactions {
return func(txs types.Transactions, addr common.Address) types.Transactions {
if !pool.pendingCache.IsLocal(addr) {
// Convert the new uint256.Int types to the old big.Int ones used by the legacy pool
var (
minTipBig *big.Int
baseFeeBig *big.Int
)
if filter.MinTip != nil {
minTipBig = filter.MinTip.ToBig()
}
if filter.BaseFee != nil {
baseFeeBig = filter.BaseFee.ToBig()
}
pending := make(map[common.Address][]*txpool.LazyTransaction, len(pool.pending))
for addr, txs := range pool.pendingCache.dump() {

// If the miner requests tip enforcement, cap the lists now
if minTipBig != nil && !pool.locals.contains(addr) {
for i, tx := range txs {
if tx.EffectiveGasTipIntCmp(gasPrice, baseFee) < 0 {
if tx.EffectiveGasTipIntCmp(minTipBig, baseFeeBig) < 0 {
txs = txs[:i]
break
}
}
}
return txs
if len(txs) > 0 {
lazies := make([]*txpool.LazyTransaction, len(txs))
for i := 0; i < len(txs); i++ {
lazies[i] = &txpool.LazyTransaction{
Pool: pool,
Hash: txs[i].Hash(),
Tx: txs[i],
Time: txs[i].Time(),
GasFeeCap: uint256.MustFromBig(txs[i].GasFeeCap()),
GasTipCap: uint256.MustFromBig(txs[i].GasTipCap()),
Gas: txs[i].Gas(),
BlobGas: txs[i].BlobGas(),
}
}
pending[addr] = lazies
}
}
return pending
}

// Locals retrieves the accounts currently considered local by the pool.
Expand Down Expand Up @@ -1469,10 +1480,6 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
pool.priced.SetBaseFee(pendingBaseFee)
}
}
gasTip, baseFee := pool.gasTip.Load(), pendingBaseFee
go func() {
pool.pendingCache.sync2cache(pool, pool.createFilter(gasTip.ToBig(), baseFee))
}()
// Update all accounts to the latest known pending nonce
nonces := make(map[common.Address]uint64, len(pool.pending))
for addr, list := range pool.pending {
Expand Down
26 changes: 25 additions & 1 deletion core/txpool/legacypool/legacypool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ var (
// sideeffects used during testing.
testTxPoolConfig Config

//
testTxPoolConfigEnableCache Config

enableCache bool

// eip1559Config is a chain config with EIP-1559 enabled at block 0.
eip1559Config *params.ChainConfig
)
Expand All @@ -55,6 +60,10 @@ func init() {
testTxPoolConfig = DefaultConfig
testTxPoolConfig.Journal = ""

testTxPoolConfigEnableCache = DefaultConfig
testTxPoolConfigEnableCache.Journal = ""
testTxPoolConfigEnableCache.EnableCache = true

cpy := *params.TestChainConfig
eip1559Config = &cpy
eip1559Config.BerlinBlock = common.Big0
Expand Down Expand Up @@ -163,7 +172,12 @@ func setupPoolWithConfig(config *params.ChainConfig) (*LegacyPool, *ecdsa.Privat
blockchain := newTestBlockChain(config, 10000000, statedb, new(event.Feed))

key, _ := crypto.GenerateKey()
pool := New(testTxPoolConfig, blockchain)
var pool *LegacyPool
if enableCache {
pool = New(testTxPoolConfigEnableCache, blockchain)
} else {
pool = New(testTxPoolConfig, blockchain)
}
if err := pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()); err != nil {
panic(err)
}
Expand Down Expand Up @@ -1534,12 +1548,22 @@ func TestMinGasPriceEnforced(t *testing.T) {
}
}

func TestRepricingDynamicFeeEnableCache(t *testing.T) {
enableCache = true
repricingDynamicFee(t)
enableCache = false
}

// Tests that setting the transaction pool gas price to a higher value correctly
// discards everything cheaper (legacy & dynamic fee) than that and moves any
// gapped transactions back from the pending pool to the queue.
//
// Note, local transactions are never allowed to be dropped.
func TestRepricingDynamicFee(t *testing.T) {
repricingDynamicFee(t)
}

func repricingDynamicFee(t *testing.T) {
t.Parallel()

// Create the pool to test the pricing enforcement with
Expand Down
Loading