Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Txpool optimization: filter out staled transactions of "nonce too low" when providing all pending list to miner #244

Open
wants to merge 8 commits into
base: develop
Choose a base branch
from
3 changes: 0 additions & 3 deletions accounts/abi/bind/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ func TestWaitDeployed(t *testing.T) {

// Send and mine the transaction.
backend.Client().SendTransaction(ctx, tx)
time.Sleep(500 * time.Millisecond) //wait for the tx to be mined
backend.Commit()

select {
Expand Down Expand Up @@ -118,7 +117,6 @@ func TestWaitDeployedCornerCases(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
backend.Client().SendTransaction(ctx, tx)
time.Sleep(500 * time.Millisecond) //wait for the tx to be mined
backend.Commit()
notContractCreation := errors.New("tx is not contract creation")
if _, err := bind.WaitDeployed(ctx, backend.Client(), tx); err.Error() != notContractCreation.Error() {
Expand All @@ -137,6 +135,5 @@ func TestWaitDeployedCornerCases(t *testing.T) {
}()

backend.Client().SendTransaction(ctx, tx)
time.Sleep(500 * time.Millisecond) //wait for the tx to be mined
cancel()
}
55 changes: 5 additions & 50 deletions core/txpool/legacypool/cache_for_miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@ import (
"sync"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/metrics"
"github.com/holiman/uint256"
)

var (
Expand All @@ -22,18 +20,12 @@ type cacheForMiner struct {
pending map[common.Address]map[*types.Transaction]struct{}
locals map[common.Address]bool
addrLock sync.Mutex

allCache map[common.Address][]*txpool.LazyTransaction
filteredCache map[common.Address][]*txpool.LazyTransaction
cacheLock sync.Mutex
}

func newCacheForMiner() *cacheForMiner {
return &cacheForMiner{
pending: make(map[common.Address]map[*types.Transaction]struct{}),
locals: make(map[common.Address]bool),
allCache: make(map[common.Address][]*txpool.LazyTransaction),
filteredCache: make(map[common.Address][]*txpool.LazyTransaction),
pending: make(map[common.Address]map[*types.Transaction]struct{}),
locals: make(map[common.Address]bool),
}
}

Expand Down Expand Up @@ -75,9 +67,8 @@ func (pc *cacheForMiner) del(txs types.Transactions, signer types.Signer) {
}
}

func (pc *cacheForMiner) sync2cache(pool txpool.LazyResolver, filter func(txs types.Transactions, addr common.Address) types.Transactions) {
func (pc *cacheForMiner) dump() map[common.Address]types.Transactions {
pending := make(map[common.Address]types.Transactions)

pc.txLock.Lock()
for addr, txlist := range pc.pending {
pending[addr] = make(types.Transactions, 0, len(txlist))
Expand All @@ -86,46 +77,10 @@ func (pc *cacheForMiner) sync2cache(pool txpool.LazyResolver, filter func(txs ty
}
}
pc.txLock.Unlock()

// convert pending to lazyTransactions
filteredLazy := make(map[common.Address][]*txpool.LazyTransaction)
allLazy := make(map[common.Address][]*txpool.LazyTransaction)
for addr, txs := range pending {
for _, txs := range pending {
// sorted by nonce
sort.Sort(types.TxByNonce(txs))
filterd := filter(txs, addr)
if len(txs) > 0 {
lazies := make([]*txpool.LazyTransaction, len(txs))
for i, tx := range txs {
lazies[i] = &txpool.LazyTransaction{
Pool: pool,
Hash: tx.Hash(),
Tx: tx,
Time: tx.Time(),
GasFeeCap: uint256.MustFromBig(tx.GasFeeCap()),
GasTipCap: uint256.MustFromBig(tx.GasTipCap()),
Gas: tx.Gas(),
BlobGas: tx.BlobGas(),
}
}
allLazy[addr] = lazies
filteredLazy[addr] = lazies[:len(filterd)]
}
}

pc.cacheLock.Lock()
pc.filteredCache = filteredLazy
pc.allCache = allLazy
pc.cacheLock.Unlock()
}

func (pc *cacheForMiner) dump(filtered bool) map[common.Address][]*txpool.LazyTransaction {
pc.cacheLock.Lock()
pending := pc.allCache
if filtered {
pending = pc.filteredCache
}
pc.cacheLock.Unlock()
return pending
}

Expand All @@ -136,7 +91,7 @@ func (pc *cacheForMiner) markLocal(addr common.Address) {
pc.locals[addr] = true
}

func (pc *cacheForMiner) IsLocal(addr common.Address) bool {
func (pc *cacheForMiner) isLocal(addr common.Address) bool {
pc.addrLock.Lock()
defer pc.addrLock.Unlock()
return pc.locals[addr]
Expand Down
112 changes: 71 additions & 41 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,11 @@ var (
reorgresetTimer = metrics.NewRegisteredTimer("txpool/reorgresettime", nil)
truncateTimer = metrics.NewRegisteredTimer("txpool/truncatetime", nil)
reorgresetNoblockingTimer = metrics.NewRegisteredTimer("txpool/noblocking/reorgresettime", nil)

//nonce too low
nonceTooLowHeaderTimer = metrics.NewRegisteredTimer("txpool/nonce/too/low/header/duration", nil)
nonceTooLowBlockTimer = metrics.NewRegisteredTimer("txpool/nonce/too/low/block/duration", nil)
nonceTooLowTxMeter = metrics.NewRegisteredMeter("txpool/nonce/too/low/tx", nil)
)

// BlockChain defines the minimal set of methods needed to back a tx pool with
Expand Down Expand Up @@ -349,9 +354,6 @@ func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserve txpool.A
// Set the basic pool parameters
pool.gasTip.Store(uint256.NewInt(gasTip))

// set dumper
pool.pendingCache.sync2cache(pool, pool.createFilter(pool.gasTip.Load().ToBig(), head.BaseFee))

// Initialize the state with head block, or fallback to empty one in
// case the head state is not available (might occur when node is not
// fully synced).
Expand Down Expand Up @@ -386,27 +388,9 @@ func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserve txpool.A
}
pool.wg.Add(1)
go pool.loop()
go pool.loopOfSync()
return nil
}

func (pool *LegacyPool) loopOfSync() {
ticker := time.NewTicker(400 * time.Millisecond)
for {
select {
case <-pool.reorgShutdownCh:
return
case <-ticker.C:
gasTip := pool.gasTip.Load()
currHead := pool.currentHead.Load()
if gasTip == nil || currHead == nil {
continue
}
pool.pendingCache.sync2cache(pool, pool.createFilter(gasTip.ToBig(), currHead.BaseFee))
}
}
}

// loop is the transaction pool's main event loop, waiting for and reacting to
// outside blockchain events as well as for various reporting and transaction
// eviction events.
Expand Down Expand Up @@ -645,35 +629,85 @@ func (pool *LegacyPool) ContentFrom(addr common.Address) ([]*types.Transaction,
// The transactions can also be pre-filtered by the dynamic fee components to
// reduce allocations and load on downstream subsystems.
func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address][]*txpool.LazyTransaction {
empty := txpool.PendingFilter{}
if filter == empty {
// return all pending transactions, no filtering
return pool.pendingCache.dump(false)
}
defer func(t0 time.Time) {
getPendingDurationTimer.Update(time.Since(t0))
}(time.Now())
// If only blob transactions are requested, this pool is unsuitable as it
// contains none, don't even bother.
if filter.OnlyBlobTxs {
return nil
}
defer func(t0 time.Time) {
getPendingDurationTimer.Update(time.Since(t0))
}(time.Now())
// It is a bit tricky here, we don't do the filtering here.
return pool.pendingCache.dump(true)
}

func (pool *LegacyPool) createFilter(gasPrice, baseFee *big.Int) func(txs types.Transactions, addr common.Address) types.Transactions {
return func(txs types.Transactions, addr common.Address) types.Transactions {
if !pool.pendingCache.IsLocal(addr) {
// Convert the new uint256.Int types to the old big.Int ones used by the legacy pool
var (
minTipBig *big.Int
baseFeeBig *big.Int

blockNumber uint64 = 0
blockHash common.Hash = common.Hash{}
nonceTooLowCount = 0
staled = make(map[common.Hash]struct{})
)
defer func() {
log.Debug("perf-trace txpool-trace Pending() nonce too low", "blockNumber", blockNumber, "blockHash", blockHash, "nonceTooLowCount", nonceTooLowCount, "staled", len(staled))
}()
if filter.MinTip != nil {
minTipBig = filter.MinTip.ToBig()
}
if filter.BaseFee != nil {
baseFeeBig = filter.BaseFee.ToBig()
}
pending := make(map[common.Address][]*txpool.LazyTransaction, len(pool.pending))
if currHeader := pool.chain.CurrentBlock(); currHeader != nil {
blockNumber = currHeader.Number.Uint64()
blockHash = currHeader.Hash()
currBlock := pool.chain.GetBlock(blockHash, currHeader.Number.Uint64())
staled = make(map[common.Hash]struct{}, len(currBlock.Transactions()))
for _, tx := range currBlock.Transactions() {
staled[tx.Hash()] = struct{}{}
}
}
for addr, txs := range pool.pendingCache.dump() {
// remove nonce too low transactions
if len(staled) > 0 {
noncetoolow := -1
for i, tx := range txs {
if tx.EffectiveGasTipIntCmp(gasPrice, baseFee) < 0 {
if _, hit := staled[tx.Hash()]; !hit {
break
}
noncetoolow = i
}
nonceTooLowCount += noncetoolow + 1
txs = txs[noncetoolow+1:]
}

// If the miner requests tip enforcement, cap the lists now
if minTipBig != nil && !pool.locals.contains(addr) {
for i, tx := range txs {
if tx.EffectiveGasTipIntCmp(minTipBig, baseFeeBig) < 0 {
txs = txs[:i]
break
}
}
}
return txs
if len(txs) > 0 {
lazies := make([]*txpool.LazyTransaction, len(txs))
for i := 0; i < len(txs); i++ {
lazies[i] = &txpool.LazyTransaction{
Pool: pool,
Hash: txs[i].Hash(),
Tx: txs[i],
Time: txs[i].Time(),
GasFeeCap: uint256.MustFromBig(txs[i].GasFeeCap()),
GasTipCap: uint256.MustFromBig(txs[i].GasTipCap()),
Gas: txs[i].Gas(),
BlobGas: txs[i].BlobGas(),
}
}
pending[addr] = lazies
}
}
return pending
}

// Locals retrieves the accounts currently considered local by the pool.
Expand Down Expand Up @@ -1469,10 +1503,6 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
pool.priced.SetBaseFee(pendingBaseFee)
}
}
gasTip, baseFee := pool.gasTip.Load(), pendingBaseFee
go func() {
pool.pendingCache.sync2cache(pool, pool.createFilter(gasTip.ToBig(), baseFee))
}()
// Update all accounts to the latest known pending nonce
nonces := make(map[common.Address]uint64, len(pool.pending))
for addr, list := range pool.pending {
Expand Down
2 changes: 0 additions & 2 deletions ethclient/simulated/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,6 @@ func (n *Backend) Close() error {

// Commit seals a block and moves the chain forward to a new empty block.
func (n *Backend) Commit() common.Hash {
// wait for the transactions to be sync into cache
time.Sleep(350 * time.Millisecond)
return n.beacon.Commit()
}

Expand Down
1 change: 0 additions & 1 deletion ethclient/simulated/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,6 @@ func TestForkResendTx(t *testing.T) {
t.Fatalf("could not create transaction: %v", err)
}
client.SendTransaction(ctx, tx)
time.Sleep(1 * time.Second)
sim.Commit()

// 3.
Expand Down
2 changes: 1 addition & 1 deletion miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1296,7 +1296,7 @@ func (w *worker) fillTransactions(interrupt *atomic.Int32, env *environment) err
pendingBlobTxs := w.eth.TxPool().Pending(filter)

packFromTxpoolTimer.UpdateSince(start)
log.Debug("packFromTxpoolTimer", "duration", common.PrettyDuration(time.Since(start)), "hash", env.header.Hash(), "txs", len(pendingPlainTxs))
log.Debug("packFromTxpoolTimer", "duration", common.PrettyDuration(time.Since(start)), "hash", env.header.Hash())
andyzhang2023 marked this conversation as resolved.
Show resolved Hide resolved

// Split the pending transactions into locals and remotes.
localPlainTxs, remotePlainTxs := make(map[common.Address][]*txpool.LazyTransaction), pendingPlainTxs
Expand Down
9 changes: 5 additions & 4 deletions miner/worker_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@ package miner

import (
"errors"
mapset "github.com/deckarep/golang-set/v2"
"github.com/ethereum/go-ethereum/consensus/misc/eip4844"
"github.com/holiman/uint256"
"math/big"
"slices"
"sync"
"sync/atomic"
"time"

mapset "github.com/deckarep/golang-set/v2"
"github.com/ethereum/go-ethereum/consensus/misc/eip4844"
"github.com/holiman/uint256"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/state"
Expand Down Expand Up @@ -78,7 +79,7 @@ func (w *worker) fillTransactionsAndBundles(interrupt *atomic.Int32, env *enviro
pendingBlobTxs := w.eth.TxPool().Pending(filter)

packFromTxpoolTimer.UpdateSince(start)
log.Debug("packFromTxpoolTimer", "duration", common.PrettyDuration(time.Since(start)), "hash", env.header.Hash())
log.Debug("packFromTxpoolTimer", "duration", common.PrettyDuration(time.Since(start)), "hash", env.header.Hash(), "txs", len(pendingPlainTxs))

// Split the pending transactions into locals and remotes.
localPlainTxs, remotePlainTxs := make(map[common.Address][]*txpool.LazyTransaction), pendingPlainTxs
Expand Down