From 842250e656c2b16255250ce27822b209502bf802 Mon Sep 17 00:00:00 2001 From: andyzhang2023 Date: Mon, 23 Dec 2024 15:50:46 +0800 Subject: [PATCH 01/11] add async priced list add async priced list --- core/txpool/legacypool/async_priced_list.go | 174 ++++++++++++++++++++ 1 file changed, 174 insertions(+) create mode 100644 core/txpool/legacypool/async_priced_list.go diff --git a/core/txpool/legacypool/async_priced_list.go b/core/txpool/legacypool/async_priced_list.go new file mode 100644 index 000000000..34170a326 --- /dev/null +++ b/core/txpool/legacypool/async_priced_list.go @@ -0,0 +1,174 @@ +package legacypool + +import ( + "math/big" + "sync" + "sync/atomic" + + "github.com/ethereum/go-ethereum/core/types" +) + +var _ pricedListInterface = &asyncPricedList{} + +type addEvent struct { + tx *types.Transaction + local bool +} + +type discardEvent struct { + slots int + force bool + done chan *discardResult +} +type discardResult struct { + discardTxs types.Transactions + succ bool +} + +type asyncPricedList struct { + priced *pricedList + floatingLowest atomic.Value + urgentLowest atomic.Value + mu sync.Mutex + + // events + quit chan struct{} + reheap chan struct{} + add chan *addEvent + remove chan int + discard chan *discardEvent + setBaseFee chan *big.Int +} + +func newAsyncPricedList(all *lookup) *asyncPricedList { + a := &asyncPricedList{ + priced: newPricedList(all), + quit: make(chan struct{}), + reheap: make(chan struct{}), + add: make(chan *addEvent), + remove: make(chan int), + discard: make(chan *discardEvent), + setBaseFee: make(chan *big.Int), + } + go a.run() + return a +} + +// run is a loop that handles async operations: +// - reheap: reheap the whole priced list, to get the lowest gas price +// - put: add a transaction to the priced list +// - remove: remove transactions from the priced list +// - discard: remove transactions to make room for new ones +func (a *asyncPricedList) run() { + var reheap bool + var newOnes []*types.Transaction + var toRemove int = 0 + // current loop state + var currentDone chan struct{} = nil + var baseFee *big.Int = nil + for { + if currentDone == nil { + currentDone = make(chan struct{}) + go func(reheap bool, newOnes []*types.Transaction, toRemove int, baseFee *big.Int) { + a.handle(reheap, newOnes, toRemove, baseFee, currentDone) + <-currentDone + close(currentDone) + currentDone = nil + }(reheap, newOnes, toRemove, baseFee) + + reheap, newOnes, toRemove, baseFee = false, nil, 0, nil + } + select { + case <-a.reheap: + reheap = true + + case add := <-a.add: + newOnes = append(newOnes, add.tx) + + case remove := <-a.remove: + toRemove += remove + + case baseFee = <-a.setBaseFee: + + case <-a.quit: + return + } + } +} + +func (a *asyncPricedList) handle(reheap bool, newOnes []*types.Transaction, toRemove int, baseFee *big.Int, finished chan struct{}) { + defer close(finished) + a.mu.Lock() + defer a.mu.Unlock() + // add new transactions to the priced list + for _, tx := range newOnes { + a.priced.Put(tx, false) + } + // remove staled transactions from the priced list + a.priced.Removed(toRemove) + // reheap if needed + if reheap { + a.priced.Reheap() + // set the lowest priced transaction when reheap is done + if len(a.priced.floating.list) > 0 { + a.floatingLowest.Store(a.priced.floating.list[0]) + } else { + a.floatingLowest.Store(nil) + } + if len(a.priced.urgent.list) > 0 { + a.urgentLowest.Store(a.priced.urgent.list[0]) + } else { + a.urgentLowest.Store(nil) + } + } + if baseFee != nil { + a.priced.SetBaseFee(baseFee) + } +} + +func (a *asyncPricedList) Put(tx *types.Transaction, local bool) { + a.add <- &addEvent{tx, local} +} + +func (a *asyncPricedList) Removed(count int) { + a.remove <- count +} + +func (a *asyncPricedList) Underpriced(tx *types.Transaction) bool { + urgentLowest, floatingLowest := a.urgentLowest.Load(), a.floatingLowest.Load() + return (urgentLowest == nil || a.priced.urgent.cmp(urgentLowest.(*types.Transaction), tx) >= 0) && + (floatingLowest == nil || a.priced.floating.cmp(floatingLowest.(*types.Transaction), tx) >= 0) && + (floatingLowest != nil && urgentLowest != nil) +} + +// Disacard cleans staled transactions to make room for new ones +func (a *asyncPricedList) Discard(slots int, force bool) (types.Transactions, bool) { + a.mu.Lock() + defer a.mu.Unlock() + return a.priced.Discard(slots, force) +} + +func (a *asyncPricedList) NeedReheap(currHead *types.Header) bool { + return false +} + +func (a *asyncPricedList) Reheap() { + a.reheap <- struct{}{} +} + +func (a *asyncPricedList) SetBaseFee(baseFee *big.Int) { + a.setBaseFee <- baseFee + a.reheap <- struct{}{} +} + +func (a *asyncPricedList) SetHead(currHead *types.Header) { + //do nothing +} + +func (a *asyncPricedList) GetBaseFee() *big.Int { + return a.priced.floating.baseFee +} + +func (a *asyncPricedList) Stop() { + close(a.quit) +} From 40e41d8beabb890bf45fdf8e1e5cdee4eb88924d Mon Sep 17 00:00:00 2001 From: andyzhang2023 Date: Mon, 23 Dec 2024 16:47:49 +0800 Subject: [PATCH 02/11] fix bug: 1. panic when storing nil pointer into atomic.Value; 2. failed to check Underpriced() when floating or urgent list is empty --- core/txpool/legacypool/async_priced_list.go | 23 ++++++++++++++------- core/txpool/legacypool/legacypool_test.go | 2 +- 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/core/txpool/legacypool/async_priced_list.go b/core/txpool/legacypool/async_priced_list.go index 34170a326..685984ac3 100644 --- a/core/txpool/legacypool/async_priced_list.go +++ b/core/txpool/legacypool/async_priced_list.go @@ -72,7 +72,6 @@ func (a *asyncPricedList) run() { go func(reheap bool, newOnes []*types.Transaction, toRemove int, baseFee *big.Int) { a.handle(reheap, newOnes, toRemove, baseFee, currentDone) <-currentDone - close(currentDone) currentDone = nil }(reheap, newOnes, toRemove, baseFee) @@ -110,15 +109,16 @@ func (a *asyncPricedList) handle(reheap bool, newOnes []*types.Transaction, toRe if reheap { a.priced.Reheap() // set the lowest priced transaction when reheap is done + var emptyTx *types.Transaction = nil if len(a.priced.floating.list) > 0 { a.floatingLowest.Store(a.priced.floating.list[0]) } else { - a.floatingLowest.Store(nil) + a.floatingLowest.Store(emptyTx) } if len(a.priced.urgent.list) > 0 { a.urgentLowest.Store(a.priced.urgent.list[0]) } else { - a.urgentLowest.Store(nil) + a.urgentLowest.Store(emptyTx) } } if baseFee != nil { @@ -135,10 +135,19 @@ func (a *asyncPricedList) Removed(count int) { } func (a *asyncPricedList) Underpriced(tx *types.Transaction) bool { - urgentLowest, floatingLowest := a.urgentLowest.Load(), a.floatingLowest.Load() - return (urgentLowest == nil || a.priced.urgent.cmp(urgentLowest.(*types.Transaction), tx) >= 0) && - (floatingLowest == nil || a.priced.floating.cmp(floatingLowest.(*types.Transaction), tx) >= 0) && - (floatingLowest != nil && urgentLowest != nil) + var urgentLowest, floatingLowest *types.Transaction = nil, nil + ul, fl := a.urgentLowest.Load(), a.floatingLowest.Load() + if ul != nil { + // be careful that ul might be nil + urgentLowest = ul.(*types.Transaction) + } + if fl != nil { + // be careful that fl might be nil + floatingLowest = fl.(*types.Transaction) + } + return (urgentLowest == nil || a.priced.urgent.cmp(urgentLowest, tx) >= 0) && + (floatingLowest == nil || a.priced.floating.cmp(floatingLowest, tx) >= 0) && + (floatingLowest != nil || urgentLowest != nil) } // Disacard cleans staled transactions to make room for new ones diff --git a/core/txpool/legacypool/legacypool_test.go b/core/txpool/legacypool/legacypool_test.go index 6b900ed2c..0ed32c654 100644 --- a/core/txpool/legacypool/legacypool_test.go +++ b/core/txpool/legacypool/legacypool_test.go @@ -1852,7 +1852,7 @@ func TestUnderpricing(t *testing.T) { // Tests that more expensive transactions push out cheap ones from the pool, but // without producing instability by creating gaps that start jumping transactions // back and forth between queued/pending. -func TestStableUnderpricing(t *testing.T) { +func TestStableUnderpricingForAsyncPriced(t *testing.T) { t.Parallel() // Create the pool to test the pricing enforcement with From b254b06b156eec798e98c095f027d6a4a4dbd8d8 Mon Sep 17 00:00:00 2001 From: andyzhang2023 Date: Mon, 23 Dec 2024 16:10:29 +0800 Subject: [PATCH 03/11] add option for enabling async priced --- cmd/geth/main.go | 1 + cmd/utils/flags.go | 9 +++++++++ core/txpool/legacypool/legacypool.go | 8 +++++++- 3 files changed, 17 insertions(+), 1 deletion(-) diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 784be6d7f..e45efc9e2 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -79,6 +79,7 @@ var ( utils.TxPoolRejournalFlag, utils.TxPoolPriceLimitFlag, utils.TxPoolPriceBumpFlag, + utils.TxPoolEnableAsyncPricedFlag, utils.TxPoolAccountSlotsFlag, utils.TxPoolGlobalSlotsFlag, utils.TxPoolAccountQueueFlag, diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index ac46716bc..ef009ac3b 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -410,6 +410,12 @@ var ( Value: ethconfig.Defaults.TxPool.PriceBump, Category: flags.TxPoolCategory, } + TxPoolEnableAsyncPricedFlag = &cli.BoolFlag{ + Name: "txpool.asyncpriced", + Usage: "enable async-priced-sorted list for txpool", + Value: false, + Category: flags.TxPoolCategory, + } TxPoolAccountSlotsFlag = &cli.Uint64Flag{ Name: "txpool.accountslots", Usage: "Minimum number of executable transaction slots guaranteed per account", @@ -1723,6 +1729,9 @@ func setTxPool(ctx *cli.Context, cfg *legacypool.Config) { if ctx.IsSet(TxPoolPriceBumpFlag.Name) { cfg.PriceBump = ctx.Uint64(TxPoolPriceBumpFlag.Name) } + if ctx.IsSet(TxPoolEnableAsyncPricedFlag.Name) { + cfg.EnableAsyncPriced = ctx.Bool(TxPoolEnableAsyncPricedFlag.Name) + } if ctx.IsSet(TxPoolAccountSlotsFlag.Name) { cfg.AccountSlots = ctx.Uint64(TxPoolAccountSlotsFlag.Name) } diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index 202f495dc..3a282b441 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -153,6 +153,8 @@ type BlockChain interface { // Config are the configuration parameters of the transaction pool. type Config struct { + EnableAsyncPriced bool // enable async pricedlist. Set as true only --txpool.enableasyncpriced option is enabled + Locals []common.Address // Addresses that should be treated by default as local NoLocals bool // Whether local transaction handling should be disabled Journal string // Journal of local transactions to survive node restarts @@ -319,7 +321,11 @@ func New(config Config, chain BlockChain) *LegacyPool { pool.locals.add(addr) pool.pendingCache.markLocal(addr) } - pool.priced = newPricedList(pool.all) + if config.EnableAsyncPriced { + pool.priced = newAsyncPricedList(pool.all) + } else { + pool.priced = newPricedList(pool.all) + } if (!config.NoLocals || config.JournalRemote) && config.Journal != "" { pool.journal = newTxJournal(config.Journal) From 428556d933eda900b46cf9ed5b39a5ced107cdef Mon Sep 17 00:00:00 2001 From: andyzhang2023 Date: Thu, 26 Dec 2024 16:18:42 +0800 Subject: [PATCH 04/11] fix ut of underpriced --- core/txpool/legacypool/async_priced_list.go | 8 + core/txpool/legacypool/legacypool.go | 7 +- core/txpool/legacypool/legacypool_test.go | 235 +++++++++++++++++++- core/txpool/legacypool/list.go | 28 +++ 4 files changed, 272 insertions(+), 6 deletions(-) diff --git a/core/txpool/legacypool/async_priced_list.go b/core/txpool/legacypool/async_priced_list.go index 685984ac3..e988ad922 100644 --- a/core/txpool/legacypool/async_priced_list.go +++ b/core/txpool/legacypool/async_priced_list.go @@ -126,6 +126,10 @@ func (a *asyncPricedList) handle(reheap bool, newOnes []*types.Transaction, toRe } } +func (a *asyncPricedList) Staled() int { + return a.priced.Staled() +} + func (a *asyncPricedList) Put(tx *types.Transaction, local bool) { a.add <- &addEvent{tx, local} } @@ -181,3 +185,7 @@ func (a *asyncPricedList) GetBaseFee() *big.Int { func (a *asyncPricedList) Stop() { close(a.quit) } + +func (a *asyncPricedList) TxCount() int { + return a.priced.TxCount() +} diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index 3a282b441..4381a85bc 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -270,7 +270,7 @@ type LegacyPool struct { queue map[common.Address]*list // Queued but non-processable transactions beats map[common.Address]time.Time // Last heartbeat from each known account all *lookup // All transactions to allow lookups - priced *pricedList // All transactions sorted by price + priced pricedListInterface // All transactions sorted by price pendingCache *cacheForMiner //pending list cache for miner @@ -446,7 +446,7 @@ func (pool *LegacyPool) loop() { pool.mu.RLock() pending, queued := pool.stats() pool.mu.RUnlock() - stales := int(pool.priced.stales.Load()) + stales := pool.priced.Staled() if pending != prevPending || queued != prevQueued || stales != prevStales { log.Debug("Transaction pool status report", "executable", pending, "queued", queued, "stales", stales) @@ -852,7 +852,6 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e pool.priced.SetBaseFee(baseFee) } pool.priced.Reheap() - pool.priced.currHead = currHead } // If the new transaction is underpriced, don't accept it @@ -1468,7 +1467,7 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, if reset != nil { pool.demoteUnexecutables(demoteAddrs) demoteTimer.UpdateSince(t0) - var pendingBaseFee = pool.priced.urgent.baseFee + var pendingBaseFee = pool.priced.GetBaseFee() if reset.newHead != nil { if pool.chainconfig.IsLondon(new(big.Int).Add(reset.newHead.Number, big.NewInt(1))) { pendingBaseFee = eip1559.CalcBaseFee(pool.chainconfig, reset.newHead, reset.newHead.Time+1) diff --git a/core/txpool/legacypool/legacypool_test.go b/core/txpool/legacypool/legacypool_test.go index 0ed32c654..7388ecca9 100644 --- a/core/txpool/legacypool/legacypool_test.go +++ b/core/txpool/legacypool/legacypool_test.go @@ -183,7 +183,11 @@ func validatePoolInternals(pool *LegacyPool) error { return fmt.Errorf("total transaction count %d != %d pending + %d queued", total, pending, queued) } pool.priced.Reheap() - priced, remote := pool.priced.urgent.Len()+pool.priced.floating.Len(), pool.all.RemoteCount() + if pool.config.EnableAsyncPriced { + // sleep a bit to wait for the reheap to finish + time.Sleep(50 * time.Millisecond) + } + priced, remote := pool.priced.TxCount(), pool.all.RemoteCount() if priced != remote { return fmt.Errorf("total priced transaction count %d != %d", priced, remote) } @@ -1849,10 +1853,126 @@ func TestUnderpricing(t *testing.T) { } } +func TestAsyncUnderpricing(t *testing.T) { + t.Parallel() + + // Create the pool to test the pricing enforcement with + statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(rawdb.NewMemoryDatabase()), nil) + blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed)) + + config := testTxPoolConfig + config.GlobalSlots = 2 + config.GlobalQueue = 2 + config.EnableAsyncPriced = true + + pool := New(config, blockchain) + pool.Init(config.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()) + defer pool.Close() + + // Keep track of transaction events to ensure all executables get announced + events := make(chan core.NewTxsEvent, 32) + sub := pool.txFeed.Subscribe(events) + defer sub.Unsubscribe() + + // Create a number of test accounts and fund them + keys := make([]*ecdsa.PrivateKey, 5) + for i := 0; i < len(keys); i++ { + keys[i], _ = crypto.GenerateKey() + testAddBalance(pool, crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000)) + } + // Generate and queue a batch of transactions, both pending and queued + txs := types.Transactions{} + + txs = append(txs, pricedTransaction(0, 100000, big.NewInt(1), keys[0])) + txs = append(txs, pricedTransaction(1, 100000, big.NewInt(2), keys[0])) + + txs = append(txs, pricedTransaction(1, 100000, big.NewInt(1), keys[1])) + + ltx := pricedTransaction(0, 100000, big.NewInt(1), keys[2]) + + // Import the batch and that both pending and queued transactions match up + pool.addRemotes(txs) + pool.addLocal(ltx) + + // sleep a bit to wait for priced transactions to be processed in parallel + time.Sleep(50 * time.Millisecond) + + pending, queued := pool.Stats() + if pending != 3 { + t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 3) + } + if queued != 1 { + t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1) + } + if err := validateEvents(events, 3); err != nil { + t.Fatalf("original event firing failed: %v", err) + } + if err := validatePoolInternals(pool); err != nil { + t.Fatalf("pool internal state corrupted: %v", err) + } + // Ensure that adding an underpriced transaction on block limit fails + if err := pool.addRemoteSync(pricedTransaction(0, 100000, big.NewInt(1), keys[1])); !errors.Is(err, txpool.ErrUnderpriced) { + t.Fatalf("adding underpriced pending transaction error mismatch: have %v, want %v", err, txpool.ErrUnderpriced) + } + // Replace a future transaction with a future transaction + if err := pool.addRemoteSync(pricedTransaction(1, 100000, big.NewInt(2), keys[1])); err != nil { // +K1:1 => -K1:1 => Pend K0:0, K0:1, K2:0; Que K1:1 + t.Fatalf("failed to add well priced transaction: %v", err) + } + // Ensure that adding high priced transactions drops cheap ones, but not own + if err := pool.addRemoteSync(pricedTransaction(0, 100000, big.NewInt(3), keys[1])); err != nil { // +K1:0 => -K1:1 => Pend K0:0, K0:1, K1:0, K2:0; Que - + t.Fatalf("failed to add well priced transaction: %v", err) + } + if err := pool.addRemoteSync(pricedTransaction(2, 100000, big.NewInt(4), keys[1])); err != nil { // +K1:2 => -K0:0 => Pend K1:0, K2:0; Que K0:1 K1:2 + t.Fatalf("failed to add well priced transaction: %v", err) + } + if err := pool.addRemote(pricedTransaction(3, 100000, big.NewInt(5), keys[1])); err != nil { // +K1:3 => -K0:1 => Pend K1:0, K2:0; Que K1:2 K1:3 + t.Fatalf("failed to add well priced transaction: %v", err) + } + // Ensure that replacing a pending transaction with a future transaction fails + if err := pool.addRemote(pricedTransaction(5, 100000, big.NewInt(6), keys[1])); err != txpool.ErrFutureReplacePending { + t.Fatalf("adding future replace transaction error mismatch: have %v, want %v", err, txpool.ErrFutureReplacePending) + } + pending, queued = pool.Stats() + if pending != 2 { + t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2) + } + if queued != 2 { + t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2) + } + if err := validateEvents(events, 2); err != nil { + t.Fatalf("additional event firing failed: %v", err) + } + if err := validatePoolInternals(pool); err != nil { + t.Fatalf("pool internal state corrupted: %v", err) + } + // Ensure that adding local transactions can push out even higher priced ones + ltx = pricedTransaction(1, 100000, big.NewInt(0), keys[2]) + if err := pool.addLocal(ltx); err != nil { + t.Fatalf("failed to append underpriced local transaction: %v", err) + } + ltx = pricedTransaction(0, 100000, big.NewInt(0), keys[3]) + if err := pool.addLocal(ltx); err != nil { + t.Fatalf("failed to add new underpriced local transaction: %v", err) + } + pending, queued = pool.Stats() + if pending != 3 { + t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 3) + } + if queued != 1 { + t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1) + } + if err := validateEvents(events, 2); err != nil { + t.Fatalf("local event firing failed: %v", err) + } + if err := validatePoolInternals(pool); err != nil { + t.Fatalf("pool internal state corrupted: %v", err) + } +} + // Tests that more expensive transactions push out cheap ones from the pool, but // without producing instability by creating gaps that start jumping transactions // back and forth between queued/pending. -func TestStableUnderpricingForAsyncPriced(t *testing.T) { +func TestAsyncStableUnderpricing(t *testing.T) { t.Parallel() // Create the pool to test the pricing enforcement with @@ -1917,6 +2037,117 @@ func TestStableUnderpricingForAsyncPriced(t *testing.T) { } } +// Tests that when the pool reaches its global transaction limit, underpriced +// transactions (legacy & dynamic fee) are gradually shifted out for more +// expensive ones and any gapped pending transactions are moved into the queue. +// +// Note, local transactions are never allowed to be dropped. +func TestAsyncUnderpricingDynamicFee(t *testing.T) { + t.Parallel() + + pool, _ := setupPoolWithConfig(eip1559Config) + defer pool.Close() + + pool.config.GlobalSlots = 2 + pool.config.GlobalQueue = 2 + pool.config.EnableAsyncPriced = true + + // Keep track of transaction events to ensure all executables get announced + events := make(chan core.NewTxsEvent, 32) + sub := pool.txFeed.Subscribe(events) + defer sub.Unsubscribe() + + // Create a number of test accounts and fund them + keys := make([]*ecdsa.PrivateKey, 4) + for i := 0; i < len(keys); i++ { + keys[i], _ = crypto.GenerateKey() + testAddBalance(pool, crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000)) + } + + // Generate and queue a batch of transactions, both pending and queued + txs := types.Transactions{} + + txs = append(txs, dynamicFeeTx(0, 100000, big.NewInt(3), big.NewInt(2), keys[0])) + txs = append(txs, pricedTransaction(1, 100000, big.NewInt(2), keys[0])) + txs = append(txs, dynamicFeeTx(1, 100000, big.NewInt(2), big.NewInt(1), keys[1])) + + ltx := dynamicFeeTx(0, 100000, big.NewInt(2), big.NewInt(1), keys[2]) + + // Import the batch and that both pending and queued transactions match up + pool.addRemotes(txs) // Pend K0:0, K0:1; Que K1:1 + pool.addLocal(ltx) // +K2:0 => Pend K0:0, K0:1, K2:0; Que K1:1 + + pending, queued := pool.Stats() + if pending != 3 { + t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 3) + } + if queued != 1 { + t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1) + } + if err := validateEvents(events, 3); err != nil { + t.Fatalf("original event firing failed: %v", err) + } + if err := validatePoolInternals(pool); err != nil { + t.Fatalf("pool internal state corrupted: %v", err) + } + + // Ensure that adding an underpriced transaction fails + tx := dynamicFeeTx(0, 100000, big.NewInt(2), big.NewInt(1), keys[1]) + if err := pool.addRemote(tx); !errors.Is(err, txpool.ErrUnderpriced) { // Pend K0:0, K0:1, K2:0; Que K1:1 + t.Fatalf("adding underpriced pending transaction error mismatch: have %v, want %v", err, txpool.ErrUnderpriced) + } + + // Ensure that adding high priced transactions drops cheap ones, but not own + tx = pricedTransaction(0, 100000, big.NewInt(2), keys[1]) + if err := pool.addRemote(tx); err != nil { // +K1:0, -K1:1 => Pend K0:0, K0:1, K1:0, K2:0; Que - + t.Fatalf("failed to add well priced transaction: %v", err) + } + + tx = pricedTransaction(1, 100000, big.NewInt(3), keys[1]) + if err := pool.addRemoteSync(tx); err != nil { // +K1:2, -K0:1 => Pend K0:0 K1:0, K2:0; Que K1:2 + t.Fatalf("failed to add well priced transaction: %v", err) + } + tx = dynamicFeeTx(2, 100000, big.NewInt(4), big.NewInt(1), keys[1]) + if err := pool.addRemoteSync(tx); err != nil { // +K1:3, -K1:0 => Pend K0:0 K2:0; Que K1:2 K1:3 + t.Fatalf("failed to add well priced transaction: %v", err) + } + pending, queued = pool.Stats() + if pending != 2 { + t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2) + } + if queued != 2 { + t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2) + } + if err := validateEvents(events, 2); err != nil { + t.Fatalf("additional event firing failed: %v", err) + } + if err := validatePoolInternals(pool); err != nil { + t.Fatalf("pool internal state corrupted: %v", err) + } + // Ensure that adding local transactions can push out even higher priced ones + ltx = dynamicFeeTx(1, 100000, big.NewInt(0), big.NewInt(0), keys[2]) + if err := pool.addLocal(ltx); err != nil { + t.Fatalf("failed to append underpriced local transaction: %v", err) + } + ltx = dynamicFeeTx(0, 100000, big.NewInt(0), big.NewInt(0), keys[3]) + if err := pool.addLocal(ltx); err != nil { + t.Fatalf("failed to add new underpriced local transaction: %v", err) + } + pending, queued = pool.Stats() + if pending != 3 { + t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 3) + } + if queued != 1 { + t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1) + } + if err := validateEvents(events, 2); err != nil { + t.Fatalf("local event firing failed: %v", err) + } + if err := validatePoolInternals(pool); err != nil { + t.Fatalf("pool internal state corrupted: %v", err) + } +} + // Tests that when the pool reaches its global transaction limit, underpriced // transactions (legacy & dynamic fee) are gradually shifted out for more // expensive ones and any gapped pending transactions are moved into the queue. diff --git a/core/txpool/legacypool/list.go b/core/txpool/legacypool/list.go index 6b823a4a7..80e32edea 100644 --- a/core/txpool/legacypool/list.go +++ b/core/txpool/legacypool/list.go @@ -537,6 +537,21 @@ func (h *priceHeap) Pop() interface{} { return x } +var _ pricedListInterface = (*pricedList)(nil) + +type pricedListInterface interface { + Put(tx *types.Transaction, local bool) + Removed(count int) + Underpriced(tx *types.Transaction) bool + Discard(slots int, force bool) (types.Transactions, bool) + NeedReheap(currHead *types.Header) bool + Reheap() + SetBaseFee(baseFee *big.Int) + GetBaseFee() *big.Int + Staled() int + TxCount() int +} + // pricedList is a price-sorted heap to allow operating on transactions pool // contents in a price-incrementing way. It's built upon the all transactions // in txpool but only interested in the remote part. It means only remote transactions @@ -571,6 +586,10 @@ func newPricedList(all *lookup) *pricedList { } } +func (l *pricedList) Staled() int { + return int(l.stales.Load()) +} + // Put inserts a new transaction into the heap. func (l *pricedList) Put(tx *types.Transaction, local bool) { if local { @@ -704,3 +723,12 @@ func (l *pricedList) Reheap() { func (l *pricedList) SetBaseFee(baseFee *big.Int) { l.urgent.baseFee = baseFee } + +// GetBaseFee returns the current base fee used for sorting the urgent heap. +func (l *pricedList) GetBaseFee() *big.Int { + return l.urgent.baseFee +} + +func (l *pricedList) TxCount() int { + return len(l.urgent.list) + len(l.floating.list) +} From 77ece0b4b2c373b7733d440e3f9f015da8a44802 Mon Sep 17 00:00:00 2001 From: andyzhang2023 Date: Thu, 26 Dec 2024 16:32:32 +0800 Subject: [PATCH 05/11] revert opt of reheap frequency; close async pricedlist when txpool is shutdown --- core/txpool/legacypool/async_priced_list.go | 2 +- core/txpool/legacypool/legacypool.go | 12 +++--------- core/txpool/legacypool/legacypool_test.go | 1 - core/txpool/legacypool/list.go | 12 ++++++------ 4 files changed, 10 insertions(+), 17 deletions(-) diff --git a/core/txpool/legacypool/async_priced_list.go b/core/txpool/legacypool/async_priced_list.go index e988ad922..613a25cfa 100644 --- a/core/txpool/legacypool/async_priced_list.go +++ b/core/txpool/legacypool/async_priced_list.go @@ -182,7 +182,7 @@ func (a *asyncPricedList) GetBaseFee() *big.Int { return a.priced.floating.baseFee } -func (a *asyncPricedList) Stop() { +func (a *asyncPricedList) Close() { close(a.quit) } diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index 4381a85bc..3e84c93ad 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -432,6 +432,7 @@ func (pool *LegacyPool) loop() { defer evict.Stop() defer journal.Stop() defer reannounce.Stop() + defer pool.priced.Close() // Notify tests that the init phase is done close(pool.initDoneCh) @@ -845,15 +846,6 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e } // If the transaction pool is full, discard underpriced transactions if uint64(pool.all.Slots()+numSlots(tx)) > pool.config.GlobalSlots+pool.config.GlobalQueue { - currHead := pool.currentHead.Load() - if currHead != nil && currHead.BaseFee != nil && pool.priced.NeedReheap(currHead) { - if pool.chainconfig.IsLondon(new(big.Int).Add(currHead.Number, big.NewInt(1))) { - baseFee := eip1559.CalcBaseFee(pool.chainconfig, currHead, currHead.Time+1) - pool.priced.SetBaseFee(baseFee) - } - pool.priced.Reheap() - } - // If the new transaction is underpriced, don't accept it if !isLocal && pool.priced.Underpriced(tx) { log.Trace("Discarding underpriced transaction", "hash", hash, "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap()) @@ -1472,6 +1464,8 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, if pool.chainconfig.IsLondon(new(big.Int).Add(reset.newHead.Number, big.NewInt(1))) { pendingBaseFee = eip1559.CalcBaseFee(pool.chainconfig, reset.newHead, reset.newHead.Time+1) pool.priced.SetBaseFee(pendingBaseFee) + } else { + pool.priced.Reheap() } } gasTip, baseFee := pool.gasTip.Load(), pendingBaseFee diff --git a/core/txpool/legacypool/legacypool_test.go b/core/txpool/legacypool/legacypool_test.go index 7388ecca9..41dcb64fe 100644 --- a/core/txpool/legacypool/legacypool_test.go +++ b/core/txpool/legacypool/legacypool_test.go @@ -2304,7 +2304,6 @@ func TestDualHeapEviction(t *testing.T) { add(false) for baseFee = 0; baseFee <= 1000; baseFee += 100 { pool.priced.SetBaseFee(big.NewInt(int64(baseFee))) - pool.priced.Reheap() add(true) check(highCap, "fee cap") add(false) diff --git a/core/txpool/legacypool/list.go b/core/txpool/legacypool/list.go index 80e32edea..181fadb47 100644 --- a/core/txpool/legacypool/list.go +++ b/core/txpool/legacypool/list.go @@ -544,12 +544,12 @@ type pricedListInterface interface { Removed(count int) Underpriced(tx *types.Transaction) bool Discard(slots int, force bool) (types.Transactions, bool) - NeedReheap(currHead *types.Header) bool Reheap() SetBaseFee(baseFee *big.Int) GetBaseFee() *big.Int Staled() int TxCount() int + Close() } // pricedList is a price-sorted heap to allow operating on transactions pool @@ -564,7 +564,6 @@ type pricedListInterface interface { // better candidates for inclusion while in other cases (at the top of the baseFee peak) // the floating heap is better. When baseFee is decreasing they behave similarly. type pricedList struct { - currHead *types.Header // Current block header for effective tip calculation // Number of stale price points to (re-heap trigger). stales atomic.Int64 @@ -687,10 +686,6 @@ func (l *pricedList) Discard(slots int, force bool) (types.Transactions, bool) { return drop, true } -func (l *pricedList) NeedReheap(currHead *types.Header) bool { - return l.currHead == nil || currHead == nil || currHead.Hash().Cmp(l.currHead.Hash()) != 0 -} - // Reheap forcibly rebuilds the heap based on the current remote transaction set. func (l *pricedList) Reheap() { l.reheapMu.Lock() @@ -722,6 +717,7 @@ func (l *pricedList) Reheap() { // necessary to call right before SetBaseFee when processing a new block. func (l *pricedList) SetBaseFee(baseFee *big.Int) { l.urgent.baseFee = baseFee + l.Reheap() } // GetBaseFee returns the current base fee used for sorting the urgent heap. @@ -732,3 +728,7 @@ func (l *pricedList) GetBaseFee() *big.Int { func (l *pricedList) TxCount() int { return len(l.urgent.list) + len(l.floating.list) } + +func (l *pricedList) Close() { + //do nothing +} From db9d024b10a5c1fb616b29ca72698d22d7e54fb2 Mon Sep 17 00:00:00 2001 From: andyzhang2023 Date: Thu, 26 Dec 2024 16:50:57 +0800 Subject: [PATCH 06/11] add ut of StableUnderpricing --- core/txpool/legacypool/legacypool_test.go | 69 +++++++++++++++++++++++ 1 file changed, 69 insertions(+) diff --git a/core/txpool/legacypool/legacypool_test.go b/core/txpool/legacypool/legacypool_test.go index 41dcb64fe..1e9c82a68 100644 --- a/core/txpool/legacypool/legacypool_test.go +++ b/core/txpool/legacypool/legacypool_test.go @@ -1969,6 +1969,74 @@ func TestAsyncUnderpricing(t *testing.T) { } } +// Tests that more expensive transactions push out cheap ones from the pool, but +// without producing instability by creating gaps that start jumping transactions +// back and forth between queued/pending. +func TestStableUnderpricing(t *testing.T) { + t.Parallel() + + // Create the pool to test the pricing enforcement with + statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(rawdb.NewMemoryDatabase()), nil) + blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed)) + + config := testTxPoolConfig + config.GlobalSlots = 128 + config.GlobalQueue = 0 + + pool := New(config, blockchain) + pool.Init(config.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()) + defer pool.Close() + + // Keep track of transaction events to ensure all executables get announced + events := make(chan core.NewTxsEvent, 32) + sub := pool.txFeed.Subscribe(events) + defer sub.Unsubscribe() + + // Create a number of test accounts and fund them + keys := make([]*ecdsa.PrivateKey, 2) + for i := 0; i < len(keys); i++ { + keys[i], _ = crypto.GenerateKey() + testAddBalance(pool, crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000)) + } + // Fill up the entire queue with the same transaction price points + txs := types.Transactions{} + for i := uint64(0); i < config.GlobalSlots; i++ { + txs = append(txs, pricedTransaction(i, 100000, big.NewInt(1), keys[0])) + } + pool.addRemotesSync(txs) + + pending, queued := pool.Stats() + if pending != int(config.GlobalSlots) { + t.Fatalf("pending transactions mismatched: have %d, want %d", pending, config.GlobalSlots) + } + if queued != 0 { + t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0) + } + if err := validateEvents(events, int(config.GlobalSlots)); err != nil { + t.Fatalf("original event firing failed: %v", err) + } + if err := validatePoolInternals(pool); err != nil { + t.Fatalf("pool internal state corrupted: %v", err) + } + // Ensure that adding high priced transactions drops a cheap, but doesn't produce a gap + if err := pool.addRemoteSync(pricedTransaction(0, 100000, big.NewInt(3), keys[1])); err != nil { + t.Fatalf("failed to add well priced transaction: %v", err) + } + pending, queued = pool.Stats() + if pending != int(config.GlobalSlots) { + t.Fatalf("pending transactions mismatched: have %d, want %d", pending, config.GlobalSlots) + } + if queued != 0 { + t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0) + } + if err := validateEvents(events, 1); err != nil { + t.Fatalf("additional event firing failed: %v", err) + } + if err := validatePoolInternals(pool); err != nil { + t.Fatalf("pool internal state corrupted: %v", err) + } +} + // Tests that more expensive transactions push out cheap ones from the pool, but // without producing instability by creating gaps that start jumping transactions // back and forth between queued/pending. @@ -1982,6 +2050,7 @@ func TestAsyncStableUnderpricing(t *testing.T) { config := testTxPoolConfig config.GlobalSlots = 128 config.GlobalQueue = 0 + config.EnableAsyncPriced = true pool := New(config, blockchain) pool.Init(config.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()) From ce70be62f50e46bbc59d42374c31410121a247d5 Mon Sep 17 00:00:00 2001 From: andyzhang2023 Date: Tue, 7 Jan 2025 15:58:14 +0800 Subject: [PATCH 07/11] add mu to make Underpriced() thread-safe --- core/txpool/legacypool/async_priced_list.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/txpool/legacypool/async_priced_list.go b/core/txpool/legacypool/async_priced_list.go index 613a25cfa..839eb52df 100644 --- a/core/txpool/legacypool/async_priced_list.go +++ b/core/txpool/legacypool/async_priced_list.go @@ -149,6 +149,8 @@ func (a *asyncPricedList) Underpriced(tx *types.Transaction) bool { // be careful that fl might be nil floatingLowest = fl.(*types.Transaction) } + a.mu.Lock() + defer a.mu.Unlock() return (urgentLowest == nil || a.priced.urgent.cmp(urgentLowest, tx) >= 0) && (floatingLowest == nil || a.priced.floating.cmp(floatingLowest, tx) >= 0) && (floatingLowest != nil || urgentLowest != nil) From 2e1780bac2596fbab4669e4742f35a8c8ff9e3f2 Mon Sep 17 00:00:00 2001 From: andyzhang2023 Date: Tue, 7 Jan 2025 16:01:43 +0800 Subject: [PATCH 08/11] add enable log --- core/txpool/legacypool/legacypool.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index 3e84c93ad..f0990dc9a 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -238,6 +238,9 @@ func (config *Config) sanitize() Config { log.Warn("Sanitizing invalid txpool reannounce time", "provided", conf.ReannounceTime, "updated", time.Minute) conf.ReannounceTime = time.Minute } + if config.EnableAsyncPriced { + log.Info("Enabling async pricedlist") + } return conf } From d269be14286f1c76d3db39ebf4322a6186ebd2ab Mon Sep 17 00:00:00 2001 From: andyzhang2023 Date: Tue, 7 Jan 2025 16:57:20 +0800 Subject: [PATCH 09/11] add back the 'txs' debug info --- miner/worker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/miner/worker.go b/miner/worker.go index 3ea2bfc76..5cf742bcd 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -1296,7 +1296,7 @@ func (w *worker) fillTransactions(interrupt *atomic.Int32, env *environment) err pendingBlobTxs := w.eth.TxPool().Pending(filter) packFromTxpoolTimer.UpdateSince(start) - log.Debug("packFromTxpoolTimer", "duration", common.PrettyDuration(time.Since(start)), "hash", env.header.Hash()) + log.Debug("packFromTxpoolTimer", "duration", common.PrettyDuration(time.Since(start)), "hash", env.header.Hash(), "txs", len(pendingPlainTxs)) // Split the pending transactions into locals and remotes. localPlainTxs, remotePlainTxs := make(map[common.Address][]*txpool.LazyTransaction), pendingPlainTxs From 99d362e092f2fdf6806868f38e12e3dbe38f5eff Mon Sep 17 00:00:00 2001 From: andyzhang2023 Date: Wed, 8 Jan 2025 09:25:12 +0800 Subject: [PATCH 10/11] resolve some potential thread-safe risk --- core/txpool/legacypool/async_priced_list.go | 25 +++++++++++++++------ 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/core/txpool/legacypool/async_priced_list.go b/core/txpool/legacypool/async_priced_list.go index 839eb52df..64ab63ed4 100644 --- a/core/txpool/legacypool/async_priced_list.go +++ b/core/txpool/legacypool/async_priced_list.go @@ -29,6 +29,7 @@ type asyncPricedList struct { priced *pricedList floatingLowest atomic.Value urgentLowest atomic.Value + baseFee atomic.Value mu sync.Mutex // events @@ -69,12 +70,7 @@ func (a *asyncPricedList) run() { for { if currentDone == nil { currentDone = make(chan struct{}) - go func(reheap bool, newOnes []*types.Transaction, toRemove int, baseFee *big.Int) { - a.handle(reheap, newOnes, toRemove, baseFee, currentDone) - <-currentDone - currentDone = nil - }(reheap, newOnes, toRemove, baseFee) - + go a.handle(reheap, newOnes, toRemove, baseFee, currentDone) reheap, newOnes, toRemove, baseFee = false, nil, 0, nil } select { @@ -89,7 +85,14 @@ func (a *asyncPricedList) run() { case baseFee = <-a.setBaseFee: + case <-currentDone: + currentDone = nil + case <-a.quit: + // Wait for current run to finish. + if currentDone != nil { + <-currentDone + } return } } @@ -122,11 +125,13 @@ func (a *asyncPricedList) handle(reheap bool, newOnes []*types.Transaction, toRe } } if baseFee != nil { + a.baseFee.Store(baseFee) a.priced.SetBaseFee(baseFee) } } func (a *asyncPricedList) Staled() int { + // the Staled() of pricedList is thread-safe, so we don't need to lock here return a.priced.Staled() } @@ -181,7 +186,11 @@ func (a *asyncPricedList) SetHead(currHead *types.Header) { } func (a *asyncPricedList) GetBaseFee() *big.Int { - return a.priced.floating.baseFee + baseFee := a.baseFee.Load() + if baseFee == nil { + return big.NewInt(0) + } + return baseFee.(*big.Int) } func (a *asyncPricedList) Close() { @@ -189,5 +198,7 @@ func (a *asyncPricedList) Close() { } func (a *asyncPricedList) TxCount() int { + a.mu.Lock() + defer a.mu.Unlock() return a.priced.TxCount() } From 21ca7bc5097f8209abe22704333033f0abae5311 Mon Sep 17 00:00:00 2001 From: andyzhang2023 Date: Wed, 8 Jan 2025 09:27:26 +0800 Subject: [PATCH 11/11] remove useless data structs --- core/txpool/legacypool/async_priced_list.go | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/core/txpool/legacypool/async_priced_list.go b/core/txpool/legacypool/async_priced_list.go index 64ab63ed4..9ac5c3124 100644 --- a/core/txpool/legacypool/async_priced_list.go +++ b/core/txpool/legacypool/async_priced_list.go @@ -15,16 +15,6 @@ type addEvent struct { local bool } -type discardEvent struct { - slots int - force bool - done chan *discardResult -} -type discardResult struct { - discardTxs types.Transactions - succ bool -} - type asyncPricedList struct { priced *pricedList floatingLowest atomic.Value @@ -37,7 +27,6 @@ type asyncPricedList struct { reheap chan struct{} add chan *addEvent remove chan int - discard chan *discardEvent setBaseFee chan *big.Int } @@ -48,7 +37,6 @@ func newAsyncPricedList(all *lookup) *asyncPricedList { reheap: make(chan struct{}), add: make(chan *addEvent), remove: make(chan int), - discard: make(chan *discardEvent), setBaseFee: make(chan *big.Int), } go a.run()