From 0d452bdebe18f4edad320b73129b06a1a855c59a Mon Sep 17 00:00:00 2001 From: andyzhang2023 Date: Wed, 25 Dec 2024 08:55:03 +0800 Subject: [PATCH 1/6] Revert "optimization: txpool pending cache improvement (#177)" This reverts commit 4ccfc358d58fc35a349f5221ee63925f2bc81a74. --- accounts/abi/bind/util_test.go | 3 - core/txpool/legacypool/cache_for_miner.go | 55 ++-------------- core/txpool/legacypool/legacypool.go | 79 +++++++++++------------ ethclient/simulated/backend.go | 2 - ethclient/simulated/backend_test.go | 1 - miner/worker.go | 2 +- 6 files changed, 44 insertions(+), 98 deletions(-) diff --git a/accounts/abi/bind/util_test.go b/accounts/abi/bind/util_test.go index 87917d43fa..592465f2ac 100644 --- a/accounts/abi/bind/util_test.go +++ b/accounts/abi/bind/util_test.go @@ -83,7 +83,6 @@ func TestWaitDeployed(t *testing.T) { // Send and mine the transaction. backend.Client().SendTransaction(ctx, tx) - time.Sleep(500 * time.Millisecond) //wait for the tx to be mined backend.Commit() select { @@ -118,7 +117,6 @@ func TestWaitDeployedCornerCases(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() backend.Client().SendTransaction(ctx, tx) - time.Sleep(500 * time.Millisecond) //wait for the tx to be mined backend.Commit() notContractCreation := errors.New("tx is not contract creation") if _, err := bind.WaitDeployed(ctx, backend.Client(), tx); err.Error() != notContractCreation.Error() { @@ -137,6 +135,5 @@ func TestWaitDeployedCornerCases(t *testing.T) { }() backend.Client().SendTransaction(ctx, tx) - time.Sleep(500 * time.Millisecond) //wait for the tx to be mined cancel() } diff --git a/core/txpool/legacypool/cache_for_miner.go b/core/txpool/legacypool/cache_for_miner.go index e8cedaa902..4d1ed2628d 100644 --- a/core/txpool/legacypool/cache_for_miner.go +++ b/core/txpool/legacypool/cache_for_miner.go @@ -5,10 +5,8 @@ import ( "sync" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/metrics" - "github.com/holiman/uint256" ) var ( @@ -22,18 +20,12 @@ type cacheForMiner struct { pending map[common.Address]map[*types.Transaction]struct{} locals map[common.Address]bool addrLock sync.Mutex - - allCache map[common.Address][]*txpool.LazyTransaction - filteredCache map[common.Address][]*txpool.LazyTransaction - cacheLock sync.Mutex } func newCacheForMiner() *cacheForMiner { return &cacheForMiner{ - pending: make(map[common.Address]map[*types.Transaction]struct{}), - locals: make(map[common.Address]bool), - allCache: make(map[common.Address][]*txpool.LazyTransaction), - filteredCache: make(map[common.Address][]*txpool.LazyTransaction), + pending: make(map[common.Address]map[*types.Transaction]struct{}), + locals: make(map[common.Address]bool), } } @@ -75,9 +67,8 @@ func (pc *cacheForMiner) del(txs types.Transactions, signer types.Signer) { } } -func (pc *cacheForMiner) sync2cache(pool txpool.LazyResolver, filter func(txs types.Transactions, addr common.Address) types.Transactions) { +func (pc *cacheForMiner) dump() map[common.Address]types.Transactions { pending := make(map[common.Address]types.Transactions) - pc.txLock.Lock() for addr, txlist := range pc.pending { pending[addr] = make(types.Transactions, 0, len(txlist)) @@ -86,46 +77,10 @@ func (pc *cacheForMiner) sync2cache(pool txpool.LazyResolver, filter func(txs ty } } pc.txLock.Unlock() - - // convert pending to lazyTransactions - filteredLazy := make(map[common.Address][]*txpool.LazyTransaction) - allLazy := make(map[common.Address][]*txpool.LazyTransaction) - for addr, txs := range pending { + for _, txs := range pending { // sorted by nonce sort.Sort(types.TxByNonce(txs)) - filterd := filter(txs, addr) - if len(txs) > 0 { - lazies := make([]*txpool.LazyTransaction, len(txs)) - for i, tx := range txs { - lazies[i] = &txpool.LazyTransaction{ - Pool: pool, - Hash: tx.Hash(), - Tx: tx, - Time: tx.Time(), - GasFeeCap: uint256.MustFromBig(tx.GasFeeCap()), - GasTipCap: uint256.MustFromBig(tx.GasTipCap()), - Gas: tx.Gas(), - BlobGas: tx.BlobGas(), - } - } - allLazy[addr] = lazies - filteredLazy[addr] = lazies[:len(filterd)] - } - } - - pc.cacheLock.Lock() - pc.filteredCache = filteredLazy - pc.allCache = allLazy - pc.cacheLock.Unlock() -} - -func (pc *cacheForMiner) dump(filtered bool) map[common.Address][]*txpool.LazyTransaction { - pc.cacheLock.Lock() - pending := pc.allCache - if filtered { - pending = pc.filteredCache } - pc.cacheLock.Unlock() return pending } @@ -136,7 +91,7 @@ func (pc *cacheForMiner) markLocal(addr common.Address) { pc.locals[addr] = true } -func (pc *cacheForMiner) IsLocal(addr common.Address) bool { +func (pc *cacheForMiner) isLocal(addr common.Address) bool { pc.addrLock.Lock() defer pc.addrLock.Unlock() return pc.locals[addr] diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index 202f495dc6..8198a31213 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -349,9 +349,6 @@ func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserve txpool.A // Set the basic pool parameters pool.gasTip.Store(uint256.NewInt(gasTip)) - // set dumper - pool.pendingCache.sync2cache(pool, pool.createFilter(pool.gasTip.Load().ToBig(), head.BaseFee)) - // Initialize the state with head block, or fallback to empty one in // case the head state is not available (might occur when node is not // fully synced). @@ -386,27 +383,9 @@ func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserve txpool.A } pool.wg.Add(1) go pool.loop() - go pool.loopOfSync() return nil } -func (pool *LegacyPool) loopOfSync() { - ticker := time.NewTicker(400 * time.Millisecond) - for { - select { - case <-pool.reorgShutdownCh: - return - case <-ticker.C: - gasTip := pool.gasTip.Load() - currHead := pool.currentHead.Load() - if gasTip == nil || currHead == nil { - continue - } - pool.pendingCache.sync2cache(pool, pool.createFilter(gasTip.ToBig(), currHead.BaseFee)) - } - } -} - // loop is the transaction pool's main event loop, waiting for and reacting to // outside blockchain events as well as for various reporting and transaction // eviction events. @@ -645,35 +624,57 @@ func (pool *LegacyPool) ContentFrom(addr common.Address) ([]*types.Transaction, // The transactions can also be pre-filtered by the dynamic fee components to // reduce allocations and load on downstream subsystems. func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address][]*txpool.LazyTransaction { - empty := txpool.PendingFilter{} - if filter == empty { - // return all pending transactions, no filtering - return pool.pendingCache.dump(false) - } + // TODO need to confirm + defer func(t0 time.Time) { + getPendingDurationTimer.Update(time.Since(t0)) + }(time.Now()) // If only blob transactions are requested, this pool is unsuitable as it // contains none, don't even bother. if filter.OnlyBlobTxs { return nil } - defer func(t0 time.Time) { - getPendingDurationTimer.Update(time.Since(t0)) - }(time.Now()) - // It is a bit tricky here, we don't do the filtering here. - return pool.pendingCache.dump(true) -} -func (pool *LegacyPool) createFilter(gasPrice, baseFee *big.Int) func(txs types.Transactions, addr common.Address) types.Transactions { - return func(txs types.Transactions, addr common.Address) types.Transactions { - if !pool.pendingCache.IsLocal(addr) { + // Convert the new uint256.Int types to the old big.Int ones used by the legacy pool + var ( + minTipBig *big.Int + baseFeeBig *big.Int + ) + if filter.MinTip != nil { + minTipBig = filter.MinTip.ToBig() + } + if filter.BaseFee != nil { + baseFeeBig = filter.BaseFee.ToBig() + } + pending := make(map[common.Address][]*txpool.LazyTransaction, len(pool.pending)) + for addr, txs := range pool.pendingCache.dump() { + + // If the miner requests tip enforcement, cap the lists now + if minTipBig != nil && !pool.locals.contains(addr) { for i, tx := range txs { - if tx.EffectiveGasTipIntCmp(gasPrice, baseFee) < 0 { + if tx.EffectiveGasTipIntCmp(minTipBig, baseFeeBig) < 0 { txs = txs[:i] break } } } - return txs + if len(txs) > 0 { + lazies := make([]*txpool.LazyTransaction, len(txs)) + for i := 0; i < len(txs); i++ { + lazies[i] = &txpool.LazyTransaction{ + Pool: pool, + Hash: txs[i].Hash(), + Tx: txs[i], + Time: txs[i].Time(), + GasFeeCap: uint256.MustFromBig(txs[i].GasFeeCap()), + GasTipCap: uint256.MustFromBig(txs[i].GasTipCap()), + Gas: txs[i].Gas(), + BlobGas: txs[i].BlobGas(), + } + } + pending[addr] = lazies + } } + return pending } // Locals retrieves the accounts currently considered local by the pool. @@ -1469,10 +1470,6 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, pool.priced.SetBaseFee(pendingBaseFee) } } - gasTip, baseFee := pool.gasTip.Load(), pendingBaseFee - go func() { - pool.pendingCache.sync2cache(pool, pool.createFilter(gasTip.ToBig(), baseFee)) - }() // Update all accounts to the latest known pending nonce nonces := make(map[common.Address]uint64, len(pool.pending)) for addr, list := range pool.pending { diff --git a/ethclient/simulated/backend.go b/ethclient/simulated/backend.go index 5c137d4079..1df0a73150 100644 --- a/ethclient/simulated/backend.go +++ b/ethclient/simulated/backend.go @@ -178,8 +178,6 @@ func (n *Backend) Close() error { // Commit seals a block and moves the chain forward to a new empty block. func (n *Backend) Commit() common.Hash { - // wait for the transactions to be sync into cache - time.Sleep(350 * time.Millisecond) return n.beacon.Commit() } diff --git a/ethclient/simulated/backend_test.go b/ethclient/simulated/backend_test.go index 9307e2105a..a8fd7913c3 100644 --- a/ethclient/simulated/backend_test.go +++ b/ethclient/simulated/backend_test.go @@ -214,7 +214,6 @@ func TestForkResendTx(t *testing.T) { t.Fatalf("could not create transaction: %v", err) } client.SendTransaction(ctx, tx) - time.Sleep(1 * time.Second) sim.Commit() // 3. diff --git a/miner/worker.go b/miner/worker.go index 5cf742bcd5..3ea2bfc76b 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -1296,7 +1296,7 @@ func (w *worker) fillTransactions(interrupt *atomic.Int32, env *environment) err pendingBlobTxs := w.eth.TxPool().Pending(filter) packFromTxpoolTimer.UpdateSince(start) - log.Debug("packFromTxpoolTimer", "duration", common.PrettyDuration(time.Since(start)), "hash", env.header.Hash(), "txs", len(pendingPlainTxs)) + log.Debug("packFromTxpoolTimer", "duration", common.PrettyDuration(time.Since(start)), "hash", env.header.Hash()) // Split the pending transactions into locals and remotes. localPlainTxs, remotePlainTxs := make(map[common.Address][]*txpool.LazyTransaction), pendingPlainTxs From 4b1406272c12eeb4ffda54e34402a499da838f27 Mon Sep 17 00:00:00 2001 From: andyzhang2023 Date: Wed, 6 Nov 2024 14:34:20 +0800 Subject: [PATCH 2/6] to avoid 'nonce too low' transactions in procedure of building current block --- core/txpool/legacypool/legacypool.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index 8198a31213..38c975da2b 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -646,7 +646,26 @@ func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address] baseFeeBig = filter.BaseFee.ToBig() } pending := make(map[common.Address][]*txpool.LazyTransaction, len(pool.pending)) + var staled map[common.Hash]struct{} + if currHeader := pool.chain.CurrentBlock(); currHeader != nil { + currBlock := pool.chain.GetBlock(currHeader.Hash(), currHeader.Number.Uint64()) + staled = make(map[common.Hash]struct{}, len(currBlock.Transactions())) + for _, tx := range currBlock.Transactions() { + staled[tx.Hash()] = struct{}{} + } + } for addr, txs := range pool.pendingCache.dump() { + // remove nonce too low transactions + if len(staled) > 0 { + noncetoolow := 0 + for i, tx := range txs { + if _, hit := staled[tx.Hash()]; !hit { + break + } + noncetoolow = i + } + txs = txs[noncetoolow:] + } // If the miner requests tip enforcement, cap the lists now if minTipBig != nil && !pool.locals.contains(addr) { From bc100b0dcb7c55943af4a631f5b1cfb4b4e01807 Mon Sep 17 00:00:00 2001 From: andyzhang2023 Date: Thu, 7 Nov 2024 11:17:17 +0800 Subject: [PATCH 3/6] fix: 'nonce too low' list should be shift with a initial index of -1, but not 0 add more logs and metrics --- core/txpool/legacypool/legacypool.go | 30 ++++++++++++++++++++++++---- 1 file changed, 26 insertions(+), 4 deletions(-) diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index 38c975da2b..0fdf722ee5 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -133,6 +133,11 @@ var ( reorgresetTimer = metrics.NewRegisteredTimer("txpool/reorgresettime", nil) truncateTimer = metrics.NewRegisteredTimer("txpool/truncatetime", nil) reorgresetNoblockingTimer = metrics.NewRegisteredTimer("txpool/noblocking/reorgresettime", nil) + + //nonce too low + nonceTooLowHeaderTimer = metrics.NewRegisteredTimer("txpool/nonce/too/low/header/duration", nil) + nonceTooLowBlockTimer = metrics.NewRegisteredTimer("txpool/nonce/too/low/block/duration", nil) + nonceTooLowTxMeter = metrics.NewRegisteredMeter("txpool/nonce/too/low/tx", nil) ) // BlockChain defines the minimal set of methods needed to back a tx pool with @@ -638,7 +643,18 @@ func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address] var ( minTipBig *big.Int baseFeeBig *big.Int + + blockNumber uint64 = 0 + blockHash common.Hash = common.Hash{} + nonceTooLowCount = 0 + currBlockDuration time.Duration = 0 + currHeaderDuration time.Duration = 0 + txHashesDuration time.Duration = 0 + staled = make(map[common.Hash]struct{}) ) + defer func() { + log.Info("perf-trace Pending() nonce too low", "blockNumber", blockNumber, "blockHash", blockHash, "nonceTooLowCount", nonceTooLowCount, "staled", len(staled), "currHeaderDuration", currHeaderDuration, "currBlockDuration", currBlockDuration, "txHashesDuration", txHashesDuration) + }() if filter.MinTip != nil { minTipBig = filter.MinTip.ToBig() } @@ -646,25 +662,31 @@ func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address] baseFeeBig = filter.BaseFee.ToBig() } pending := make(map[common.Address][]*txpool.LazyTransaction, len(pool.pending)) - var staled map[common.Hash]struct{} + t0 := time.Now() if currHeader := pool.chain.CurrentBlock(); currHeader != nil { - currBlock := pool.chain.GetBlock(currHeader.Hash(), currHeader.Number.Uint64()) + currHeaderDuration = time.Since(t0) + blockNumber = currHeader.Number.Uint64() + blockHash = currHeader.Hash() + currBlock := pool.chain.GetBlock(blockHash, currHeader.Number.Uint64()) + currBlockDuration = time.Since(t0) - currHeaderDuration staled = make(map[common.Hash]struct{}, len(currBlock.Transactions())) for _, tx := range currBlock.Transactions() { staled[tx.Hash()] = struct{}{} } + txHashesDuration = time.Since(t0) - currBlockDuration - currHeaderDuration } for addr, txs := range pool.pendingCache.dump() { // remove nonce too low transactions if len(staled) > 0 { - noncetoolow := 0 + noncetoolow := -1 for i, tx := range txs { if _, hit := staled[tx.Hash()]; !hit { break } noncetoolow = i } - txs = txs[noncetoolow:] + nonceTooLowCount += noncetoolow + 1 + txs = txs[noncetoolow+1:] } // If the miner requests tip enforcement, cap the lists now From ad629baaa126d5d809b06d4ba9fdcfc364dab908 Mon Sep 17 00:00:00 2001 From: andyzhang2023 Date: Mon, 6 Jan 2025 10:57:29 +0800 Subject: [PATCH 4/6] fix: need transactions number in debug log --- miner/worker_builder.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/miner/worker_builder.go b/miner/worker_builder.go index 61a5672c2f..2055f67cc8 100644 --- a/miner/worker_builder.go +++ b/miner/worker_builder.go @@ -2,15 +2,16 @@ package miner import ( "errors" - mapset "github.com/deckarep/golang-set/v2" - "github.com/ethereum/go-ethereum/consensus/misc/eip4844" - "github.com/holiman/uint256" "math/big" "slices" "sync" "sync/atomic" "time" + mapset "github.com/deckarep/golang-set/v2" + "github.com/ethereum/go-ethereum/consensus/misc/eip4844" + "github.com/holiman/uint256" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/state" @@ -78,7 +79,7 @@ func (w *worker) fillTransactionsAndBundles(interrupt *atomic.Int32, env *enviro pendingBlobTxs := w.eth.TxPool().Pending(filter) packFromTxpoolTimer.UpdateSince(start) - log.Debug("packFromTxpoolTimer", "duration", common.PrettyDuration(time.Since(start)), "hash", env.header.Hash()) + log.Debug("packFromTxpoolTimer", "duration", common.PrettyDuration(time.Since(start)), "hash", env.header.Hash(), "txs", len(pendingPlainTxs)) // Split the pending transactions into locals and remotes. localPlainTxs, remotePlainTxs := make(map[common.Address][]*txpool.LazyTransaction), pendingPlainTxs From 1eaf6ba6e2e76d4386bb943fb37c845ec4dc7939 Mon Sep 17 00:00:00 2001 From: andyzhang2023 Date: Mon, 6 Jan 2025 11:02:02 +0800 Subject: [PATCH 5/6] remove unnecessary performance debug info --- core/txpool/legacypool/legacypool.go | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index 0fdf722ee5..cdda076371 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -629,7 +629,6 @@ func (pool *LegacyPool) ContentFrom(addr common.Address) ([]*types.Transaction, // The transactions can also be pre-filtered by the dynamic fee components to // reduce allocations and load on downstream subsystems. func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address][]*txpool.LazyTransaction { - // TODO need to confirm defer func(t0 time.Time) { getPendingDurationTimer.Update(time.Since(t0)) }(time.Now()) @@ -644,16 +643,13 @@ func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address] minTipBig *big.Int baseFeeBig *big.Int - blockNumber uint64 = 0 - blockHash common.Hash = common.Hash{} - nonceTooLowCount = 0 - currBlockDuration time.Duration = 0 - currHeaderDuration time.Duration = 0 - txHashesDuration time.Duration = 0 - staled = make(map[common.Hash]struct{}) + blockNumber uint64 = 0 + blockHash common.Hash = common.Hash{} + nonceTooLowCount = 0 + staled = make(map[common.Hash]struct{}) ) defer func() { - log.Info("perf-trace Pending() nonce too low", "blockNumber", blockNumber, "blockHash", blockHash, "nonceTooLowCount", nonceTooLowCount, "staled", len(staled), "currHeaderDuration", currHeaderDuration, "currBlockDuration", currBlockDuration, "txHashesDuration", txHashesDuration) + log.Debug("perf-trace txpool-trace Pending() nonce too low", "blockNumber", blockNumber, "blockHash", blockHash, "nonceTooLowCount", nonceTooLowCount, "staled", len(staled)) }() if filter.MinTip != nil { minTipBig = filter.MinTip.ToBig() @@ -662,18 +658,14 @@ func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address] baseFeeBig = filter.BaseFee.ToBig() } pending := make(map[common.Address][]*txpool.LazyTransaction, len(pool.pending)) - t0 := time.Now() if currHeader := pool.chain.CurrentBlock(); currHeader != nil { - currHeaderDuration = time.Since(t0) blockNumber = currHeader.Number.Uint64() blockHash = currHeader.Hash() currBlock := pool.chain.GetBlock(blockHash, currHeader.Number.Uint64()) - currBlockDuration = time.Since(t0) - currHeaderDuration staled = make(map[common.Hash]struct{}, len(currBlock.Transactions())) for _, tx := range currBlock.Transactions() { staled[tx.Hash()] = struct{}{} } - txHashesDuration = time.Since(t0) - currBlockDuration - currHeaderDuration } for addr, txs := range pool.pendingCache.dump() { // remove nonce too low transactions From c4a0606baed4bee14bf14a7eefa3d2ec995454d2 Mon Sep 17 00:00:00 2001 From: andyzhang2023 Date: Tue, 7 Jan 2025 18:21:11 +0800 Subject: [PATCH 6/6] remove useless metrics --- core/txpool/legacypool/legacypool.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index fad23b3c4b..b68332bfa6 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -133,11 +133,6 @@ var ( reorgresetTimer = metrics.NewRegisteredTimer("txpool/reorgresettime", nil) truncateTimer = metrics.NewRegisteredTimer("txpool/truncatetime", nil) reorgresetNoblockingTimer = metrics.NewRegisteredTimer("txpool/noblocking/reorgresettime", nil) - - //nonce too low - nonceTooLowHeaderTimer = metrics.NewRegisteredTimer("txpool/nonce/too/low/header/duration", nil) - nonceTooLowBlockTimer = metrics.NewRegisteredTimer("txpool/nonce/too/low/block/duration", nil) - nonceTooLowTxMeter = metrics.NewRegisteredMeter("txpool/nonce/too/low/tx", nil) ) // BlockChain defines the minimal set of methods needed to back a tx pool with