Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Txpool opt async priced #246

Merged
1 change: 1 addition & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ var (
utils.TxPoolRejournalFlag,
utils.TxPoolPriceLimitFlag,
utils.TxPoolPriceBumpFlag,
utils.TxPoolEnableAsyncPricedFlag,
utils.TxPoolAccountSlotsFlag,
utils.TxPoolGlobalSlotsFlag,
utils.TxPoolAccountQueueFlag,
Expand Down
9 changes: 9 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,12 @@ var (
Value: ethconfig.Defaults.TxPool.PriceBump,
Category: flags.TxPoolCategory,
}
TxPoolEnableAsyncPricedFlag = &cli.BoolFlag{
Name: "txpool.asyncpriced",
Usage: "enable async-priced-sorted list for txpool",
Value: false,
Category: flags.TxPoolCategory,
}
TxPoolAccountSlotsFlag = &cli.Uint64Flag{
Name: "txpool.accountslots",
Usage: "Minimum number of executable transaction slots guaranteed per account",
Expand Down Expand Up @@ -1723,6 +1729,9 @@ func setTxPool(ctx *cli.Context, cfg *legacypool.Config) {
if ctx.IsSet(TxPoolPriceBumpFlag.Name) {
cfg.PriceBump = ctx.Uint64(TxPoolPriceBumpFlag.Name)
}
if ctx.IsSet(TxPoolEnableAsyncPricedFlag.Name) {
cfg.EnableAsyncPriced = ctx.Bool(TxPoolEnableAsyncPricedFlag.Name)
}
if ctx.IsSet(TxPoolAccountSlotsFlag.Name) {
cfg.AccountSlots = ctx.Uint64(TxPoolAccountSlotsFlag.Name)
}
Expand Down
191 changes: 191 additions & 0 deletions core/txpool/legacypool/async_priced_list.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
package legacypool

import (
"math/big"
"sync"
"sync/atomic"

"github.com/ethereum/go-ethereum/core/types"
)

var _ pricedListInterface = &asyncPricedList{}

type addEvent struct {
tx *types.Transaction
local bool
andyzhang2023 marked this conversation as resolved.
Show resolved Hide resolved
}

type discardEvent struct {
slots int
force bool
done chan *discardResult
}
type discardResult struct {
discardTxs types.Transactions
succ bool
}

type asyncPricedList struct {
priced *pricedList
floatingLowest atomic.Value
urgentLowest atomic.Value
mu sync.Mutex

// events
quit chan struct{}
reheap chan struct{}
add chan *addEvent
remove chan int
discard chan *discardEvent
setBaseFee chan *big.Int
}

func newAsyncPricedList(all *lookup) *asyncPricedList {
a := &asyncPricedList{
priced: newPricedList(all),
quit: make(chan struct{}),
reheap: make(chan struct{}),
add: make(chan *addEvent),
remove: make(chan int),
discard: make(chan *discardEvent),
setBaseFee: make(chan *big.Int),
}
go a.run()
return a
}

// run is a loop that handles async operations:
// - reheap: reheap the whole priced list, to get the lowest gas price
// - put: add a transaction to the priced list
// - remove: remove transactions from the priced list
// - discard: remove transactions to make room for new ones
func (a *asyncPricedList) run() {
var reheap bool
var newOnes []*types.Transaction
var toRemove int = 0
// current loop state
var currentDone chan struct{} = nil
var baseFee *big.Int = nil
for {
if currentDone == nil {
currentDone = make(chan struct{})
go func(reheap bool, newOnes []*types.Transaction, toRemove int, baseFee *big.Int) {
a.handle(reheap, newOnes, toRemove, baseFee, currentDone)
<-currentDone
currentDone = nil
}(reheap, newOnes, toRemove, baseFee)

reheap, newOnes, toRemove, baseFee = false, nil, 0, nil
}
select {
case <-a.reheap:
reheap = true

case add := <-a.add:
newOnes = append(newOnes, add.tx)

case remove := <-a.remove:
toRemove += remove

case baseFee = <-a.setBaseFee:

case <-a.quit:
return
}
}
}

func (a *asyncPricedList) handle(reheap bool, newOnes []*types.Transaction, toRemove int, baseFee *big.Int, finished chan struct{}) {
defer close(finished)
a.mu.Lock()
defer a.mu.Unlock()
// add new transactions to the priced list
for _, tx := range newOnes {
a.priced.Put(tx, false)
andyzhang2023 marked this conversation as resolved.
Show resolved Hide resolved
}
// remove staled transactions from the priced list
a.priced.Removed(toRemove)
// reheap if needed
if reheap {
a.priced.Reheap()
// set the lowest priced transaction when reheap is done
var emptyTx *types.Transaction = nil
if len(a.priced.floating.list) > 0 {
a.floatingLowest.Store(a.priced.floating.list[0])
} else {
a.floatingLowest.Store(emptyTx)
}
if len(a.priced.urgent.list) > 0 {
a.urgentLowest.Store(a.priced.urgent.list[0])
} else {
a.urgentLowest.Store(emptyTx)
}
}
if baseFee != nil {
a.priced.SetBaseFee(baseFee)
}
}

func (a *asyncPricedList) Staled() int {
return a.priced.Staled()
}

func (a *asyncPricedList) Put(tx *types.Transaction, local bool) {
a.add <- &addEvent{tx, local}
}

func (a *asyncPricedList) Removed(count int) {
a.remove <- count
}

func (a *asyncPricedList) Underpriced(tx *types.Transaction) bool {
andyzhang2023 marked this conversation as resolved.
Show resolved Hide resolved
var urgentLowest, floatingLowest *types.Transaction = nil, nil
ul, fl := a.urgentLowest.Load(), a.floatingLowest.Load()
if ul != nil {
// be careful that ul might be nil
urgentLowest = ul.(*types.Transaction)
}
if fl != nil {
// be careful that fl might be nil
floatingLowest = fl.(*types.Transaction)
}
return (urgentLowest == nil || a.priced.urgent.cmp(urgentLowest, tx) >= 0) &&
(floatingLowest == nil || a.priced.floating.cmp(floatingLowest, tx) >= 0) &&
(floatingLowest != nil || urgentLowest != nil)
}

// Disacard cleans staled transactions to make room for new ones
func (a *asyncPricedList) Discard(slots int, force bool) (types.Transactions, bool) {
a.mu.Lock()
defer a.mu.Unlock()
return a.priced.Discard(slots, force)
}

func (a *asyncPricedList) NeedReheap(currHead *types.Header) bool {
return false
}

func (a *asyncPricedList) Reheap() {
a.reheap <- struct{}{}
}

func (a *asyncPricedList) SetBaseFee(baseFee *big.Int) {
a.setBaseFee <- baseFee
a.reheap <- struct{}{}
}

func (a *asyncPricedList) SetHead(currHead *types.Header) {
//do nothing
}

func (a *asyncPricedList) GetBaseFee() *big.Int {
return a.priced.floating.baseFee
}

func (a *asyncPricedList) Close() {
close(a.quit)
}

func (a *asyncPricedList) TxCount() int {
return a.priced.TxCount()
}
27 changes: 13 additions & 14 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ type BlockChain interface {

// Config are the configuration parameters of the transaction pool.
type Config struct {
EnableAsyncPriced bool // enable async pricedlist. Set as true only --txpool.enableasyncpriced option is enabled

Locals []common.Address // Addresses that should be treated by default as local
NoLocals bool // Whether local transaction handling should be disabled
Journal string // Journal of local transactions to survive node restarts
Expand Down Expand Up @@ -268,7 +270,7 @@ type LegacyPool struct {
queue map[common.Address]*list // Queued but non-processable transactions
beats map[common.Address]time.Time // Last heartbeat from each known account
all *lookup // All transactions to allow lookups
priced *pricedList // All transactions sorted by price
priced pricedListInterface // All transactions sorted by price

pendingCache *cacheForMiner //pending list cache for miner

Expand Down Expand Up @@ -319,7 +321,11 @@ func New(config Config, chain BlockChain) *LegacyPool {
pool.locals.add(addr)
pool.pendingCache.markLocal(addr)
}
pool.priced = newPricedList(pool.all)
if config.EnableAsyncPriced {
pool.priced = newAsyncPricedList(pool.all)
} else {
pool.priced = newPricedList(pool.all)
}

if (!config.NoLocals || config.JournalRemote) && config.Journal != "" {
pool.journal = newTxJournal(config.Journal)
Expand Down Expand Up @@ -426,6 +432,7 @@ func (pool *LegacyPool) loop() {
defer evict.Stop()
defer journal.Stop()
defer reannounce.Stop()
defer pool.priced.Close()

// Notify tests that the init phase is done
close(pool.initDoneCh)
Expand All @@ -440,7 +447,7 @@ func (pool *LegacyPool) loop() {
pool.mu.RLock()
pending, queued := pool.stats()
pool.mu.RUnlock()
stales := int(pool.priced.stales.Load())
stales := pool.priced.Staled()

if pending != prevPending || queued != prevQueued || stales != prevStales {
log.Debug("Transaction pool status report", "executable", pending, "queued", queued, "stales", stales)
Expand Down Expand Up @@ -839,16 +846,6 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
}
// If the transaction pool is full, discard underpriced transactions
if uint64(pool.all.Slots()+numSlots(tx)) > pool.config.GlobalSlots+pool.config.GlobalQueue {
currHead := pool.currentHead.Load()
if currHead != nil && currHead.BaseFee != nil && pool.priced.NeedReheap(currHead) {
if pool.chainconfig.IsLondon(new(big.Int).Add(currHead.Number, big.NewInt(1))) {
baseFee := eip1559.CalcBaseFee(pool.chainconfig, currHead, currHead.Time+1)
pool.priced.SetBaseFee(baseFee)
}
pool.priced.Reheap()
pool.priced.currHead = currHead
}

// If the new transaction is underpriced, don't accept it
if !isLocal && pool.priced.Underpriced(tx) {
log.Trace("Discarding underpriced transaction", "hash", hash, "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap())
Expand Down Expand Up @@ -1462,11 +1459,13 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
if reset != nil {
pool.demoteUnexecutables(demoteAddrs)
demoteTimer.UpdateSince(t0)
var pendingBaseFee = pool.priced.urgent.baseFee
var pendingBaseFee = pool.priced.GetBaseFee()
if reset.newHead != nil {
if pool.chainconfig.IsLondon(new(big.Int).Add(reset.newHead.Number, big.NewInt(1))) {
pendingBaseFee = eip1559.CalcBaseFee(pool.chainconfig, reset.newHead, reset.newHead.Time+1)
pool.priced.SetBaseFee(pendingBaseFee)
} else {
pool.priced.Reheap()
andyzhang2023 marked this conversation as resolved.
Show resolved Hide resolved
}
}
gasTip, baseFee := pool.gasTip.Load(), pendingBaseFee
Expand Down
Loading