From ec059a8bd44bce34312957d8e069fe64313c6a98 Mon Sep 17 00:00:00 2001 From: Kevin Jue Date: Mon, 25 Mar 2019 18:10:05 -0700 Subject: [PATCH] Kevjue/tx pool worker multicurrency fee support (#127) * tx_pool and worker changes to support multiple fee currencies * cleanup and commenting of some code * Add support C$ transaction Add support for non-native currency. Those transactions are processed if contract addresses are available and passed via command-line. * modified mobiles/types.go:NewTransaction to accept parameter gasCurrency as type *Address * fixed newTxLookup to correctly initialize it's txCurrCount field * made a few cosmetic changes based on PR feedback * tx_pool and worker changes to support multiple fee currencies * cleanup and commenting of some code * Add support C$ transaction Add support for non-native currency. Those transactions are processed if contract addresses are available and passed via command-line. * fixed newTxLookup to correctly initialize it's txCurrCount field * made a few cosmetic changes based on PR feedback * fixed geth tests so that a core.PriceComparator object is passed into core.NewTxPool * fixed lint issues. specifically ran go fmt on files that needed it * 1) Removed the miner.currencyaddresses flag 2) listed some files 3) fixed bug with parsing and storing the utils.TxPoolCurrencyAddressesFlag flag * changes based on feedback from PR --- cmd/geth/main.go | 1 + cmd/geth/usage.go | 1 + cmd/utils/flags.go | 28 ++++++++ core/currency.go | 77 ++++++++++++++++++++ core/tx_list.go | 125 ++++++++++++++++++++++++++------- core/tx_pool.go | 62 +++++++++++----- core/tx_pool_test.go | 53 +++++++++----- core/types/transaction.go | 59 ++++++++++------ core/types/transaction_test.go | 6 +- eth/backend.go | 5 +- les/handler_test.go | 3 +- miner/miner.go | 4 +- miner/worker.go | 14 ++-- miner/worker_test.go | 6 +- 14 files changed, 347 insertions(+), 97 deletions(-) create mode 100644 core/currency.go diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 494afd1fb414..0f579af08b6b 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -82,6 +82,7 @@ var ( utils.TxPoolAccountQueueFlag, utils.TxPoolGlobalQueueFlag, utils.TxPoolLifetimeFlag, + utils.TxPoolCurrencyAddressesFlag, utils.SyncModeFlag, utils.GCModeFlag, utils.LightServFlag, diff --git a/cmd/geth/usage.go b/cmd/geth/usage.go index d936620b1c09..327eb61343f9 100644 --- a/cmd/geth/usage.go +++ b/cmd/geth/usage.go @@ -126,6 +126,7 @@ var AppHelpFlagGroups = []flagGroup{ utils.TxPoolAccountQueueFlag, utils.TxPoolGlobalQueueFlag, utils.TxPoolLifetimeFlag, + utils.TxPoolCurrencyAddressesFlag, }, }, { diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index b81dab1db088..d6b0aea0edd5 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -32,6 +32,7 @@ import ( "github.com/ethereum/go-ethereum/accounts/keystore" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/fdlimit" + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/consensus/clique" "github.com/ethereum/go-ethereum/consensus/ethash" @@ -298,6 +299,11 @@ var ( Usage: "Maximum amount of time non-executable transaction are queued", Value: eth.DefaultConfig.TxPool.Lifetime, } + TxPoolCurrencyAddressesFlag = cli.StringFlag{ + Name: "txpool.gascurrencyaddresses", + Usage: "Comma separated list of contract addresses of the currency accepted by the tx pool, 0x1234...,0xf4ee... etc. All addresses should start with 0x and followed by 40 hex character", + Value: "", + } // Performance tuning settings CacheFlag = cli.IntFlag{ Name: "cache", @@ -1099,6 +1105,28 @@ func setTxPool(ctx *cli.Context, cfg *core.TxPoolConfig) { if ctx.GlobalIsSet(TxPoolLifetimeFlag.Name) { cfg.Lifetime = ctx.GlobalDuration(TxPoolLifetimeFlag.Name) } + if ctx.GlobalIsSet(TxPoolCurrencyAddressesFlag.Name) { + currencyAddresses := make([]common.Address, 0) + // 0x1234,0x123443,... + currencies := ctx.GlobalString(TxPoolCurrencyAddressesFlag.Name) + currencyAddressesAsString := strings.Split(currencies, ",") + // Validation + for i := range currencyAddressesAsString { + currencyAddress := currencyAddressesAsString[i] + + // MustDecode will check for a "0x" prefix and all the remaining characters are valid for an address + addressBytes := hexutil.MustDecode(currencyAddress) + + // "0x" followed by 40 hex characters. + if len(currencyAddress[2:]) != common.AddressLength*2 { + panic(fmt.Sprintf("Incorrect currency code, it does not has 40 characters: \"%s\"", currencyAddress)) + } + + currencyAddresses = append(currencyAddresses, common.BytesToAddress(addressBytes)) + } + cfg.CurrencyAddresses = ¤cyAddresses + log.Debug("Currencies parsed", "currencyAddresses", currencyAddressesAsString) + } } func setEthash(ctx *cli.Context, cfg *eth.Config) { diff --git a/core/currency.go b/core/currency.go new file mode 100644 index 000000000000..77694dde063c --- /dev/null +++ b/core/currency.go @@ -0,0 +1,77 @@ +// Copyright 2017 The Celo Authors +// This file is part of the celo library. +// +// The celo library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The celo library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the celo library. If not, see . + +package core + +import ( + "math/big" + + "github.com/ethereum/go-ethereum/common" +) + +var ( + cgExchangeRateNum = big.NewInt(1) + cgExchangeRateDen = big.NewInt(1) +) + +type exchangeRate struct { + Numerator *big.Int + Denominator *big.Int +} + +type PriceComparator struct { + exchangeRates map[common.Address]*exchangeRate // indexedCurrency:CeloGold exchange rate +} + +func (pc *PriceComparator) getNumDenom(currency *common.Address) (*big.Int, *big.Int) { + if currency == nil { + return cgExchangeRateNum, cgExchangeRateDen + } else { + exchangeRate := pc.exchangeRates[*currency] + return exchangeRate.Numerator, exchangeRate.Denominator + } +} + +func (pc *PriceComparator) Cmp(val1 *big.Int, currency1 *common.Address, val2 *big.Int, currency2 *common.Address) int { + if currency1 == currency2 { + return val1.Cmp(val2) + } + + exchangeRate1Num, exchangeRate1Den := pc.getNumDenom(currency1) + exchangeRate2Num, exchangeRate2Den := pc.getNumDenom(currency2) + + // Below code block is basically evaluating this comparison: + // val1 * exchangeRate1Num/exchangeRate1Den < val2 * exchangeRate2Num/exchangeRate2Den + // It will transform that comparison to this, to remove having to deal with fractional values. + // val1 * exchangeRate1Num * exchangeRate2Den < val2 * exchangeRate2Num * exchangeRate1Den + leftSide := new(big.Int).Mul(val1, new(big.Int).Mul(exchangeRate1Num, exchangeRate2Den)) + rightSide := new(big.Int).Mul(val2, new(big.Int).Mul(exchangeRate2Num, exchangeRate1Den)) + return leftSide.Cmp(rightSide) +} + +func NewPriceComparator() *PriceComparator { + // TODO(kevjue): Integrate implementation of issue https://github.com/celo-org/celo-monorepo/issues/2706, so that the + // exchange rate is retrieved from the smart contract. + // For now, hard coding in some exchange rates. Will modify this to retrieve the + // exchange rates from the Celo's exchange smart contract. + // C$ will have a 2:1 exchange rate with CG + exchangeRates := make(map[common.Address]*exchangeRate) + exchangeRates[common.HexToAddress("0x0000000000000000000000000000000ce10d011a")] = &exchangeRate{Numerator: big.NewInt(2), Denominator: big.NewInt(1)} + + return &PriceComparator{ + exchangeRates: exchangeRates, + } +} diff --git a/core/tx_list.go b/core/tx_list.go index e3f8088a3faf..390aee7a0ec1 100644 --- a/core/tx_list.go +++ b/core/tx_list.go @@ -407,22 +407,34 @@ func (h *priceHeap) Pop() interface{} { // txPricedList is a price-sorted heap to allow operating on transactions pool // contents in a price-incrementing way. type txPricedList struct { - all *txLookup // Pointer to the map of all transactions - items *priceHeap // Heap of prices of all the stored transactions - stales int // Number of stale price points to (re-heap trigger) + all *txLookup // Pointer to the map of all transactions + heaps map[common.Address]*priceHeap // Heap of prices of all the stored transactions + stales int // Number of stale price points to (re-heap trigger) + pc *PriceComparator // Comparator object used to compare prices that are using different currencies } // newTxPricedList creates a new price-sorted transaction heap. -func newTxPricedList(all *txLookup) *txPricedList { +func newTxPricedList(all *txLookup, pc *PriceComparator) *txPricedList { return &txPricedList{ all: all, - items: new(priceHeap), + heaps: make(map[common.Address]*priceHeap), + pc: pc, } } +// Gets the price heap for the given currency +func (l *txPricedList) getPriceHeap(tx *types.Transaction) *priceHeap { + gasCurrency := *(tx.NonNilGasCurrency()) + if _, ok := l.heaps[gasCurrency]; !ok { + l.heaps[gasCurrency] = new(priceHeap) + } + return l.heaps[gasCurrency] +} + // Put inserts a new transaction into the heap. func (l *txPricedList) Put(tx *types.Transaction) { - heap.Push(l.items, tx) + pHeap := l.getPriceHeap(tx) + heap.Push(pHeap, tx) } // Removed notifies the prices transaction list that an old transaction dropped @@ -431,38 +443,47 @@ func (l *txPricedList) Put(tx *types.Transaction) { func (l *txPricedList) Removed() { // Bump the stale counter, but exit if still too low (< 25%) l.stales++ - if l.stales <= len(*l.items)/4 { + if l.stales <= l.Len()/4 { return } // Seems we've reached a critical number of stale transactions, reheap - reheap := make(priceHeap, 0, l.all.Count()) + reheapMap := make(map[common.Address]*priceHeap) + for gasCurrency, count := range l.all.txCurrCount { + reheap := make(priceHeap, 0, count) + reheapMap[gasCurrency] = &reheap + } - l.stales, l.items = 0, &reheap + l.stales, l.heaps = 0, reheapMap l.all.Range(func(hash common.Hash, tx *types.Transaction) bool { - *l.items = append(*l.items, tx) + pHeap := l.getPriceHeap(tx) + *pHeap = append(*pHeap, tx) return true }) - heap.Init(l.items) + + for _, h := range l.heaps { + heap.Init(h) + } } -// Cap finds all the transactions below the given price threshold, drops them +// Cap finds all the transactions below the given celo gold price threshold, drops them // from the priced list and returns them for further removal from the entire pool. -func (l *txPricedList) Cap(threshold *big.Int, local *accountSet) types.Transactions { +func (l *txPricedList) Cap(cgThreshold *big.Int, local *accountSet) types.Transactions { drop := make(types.Transactions, 0, 128) // Remote underpriced transactions to drop save := make(types.Transactions, 0, 64) // Local underpriced transactions to keep - for len(*l.items) > 0 { + for l.Len() > 0 { // Discard stale transactions if found during cleanup - tx := heap.Pop(l.items).(*types.Transaction) + tx := l.pop() if l.all.Get(tx.Hash()) == nil { l.stales-- continue } - // Stop the discards if we've reached the threshold - if tx.GasPrice().Cmp(threshold) >= 0 { + + if l.pc.Cmp(tx.GasPrice(), tx.GasCurrency(), cgThreshold, nil) >= 0 { save = append(save, tx) break } + // Non stale transaction found, discard unless local if local.containsTx(tx) { save = append(save, tx) @@ -471,7 +492,7 @@ func (l *txPricedList) Cap(threshold *big.Int, local *accountSet) types.Transact } } for _, tx := range save { - heap.Push(l.items, tx) + l.Put(tx) } return drop } @@ -484,22 +505,23 @@ func (l *txPricedList) Underpriced(tx *types.Transaction, local *accountSet) boo return false } // Discard stale price points if found at the heap start - for len(*l.items) > 0 { - head := []*types.Transaction(*l.items)[0] + for l.Len() > 0 { + head := l.getMinPricedTx() if l.all.Get(head.Hash()) == nil { l.stales-- - heap.Pop(l.items) + l.pop() continue } break } // Check if the transaction is underpriced or not - if len(*l.items) == 0 { + if l.Len() == 0 { log.Error("Pricing query for empty pool") // This cannot happen, print to catch programming errors return false } - cheapest := []*types.Transaction(*l.items)[0] - return cheapest.GasPrice().Cmp(tx.GasPrice()) >= 0 + + cheapest := l.getMinPricedTx() + return l.pc.Cmp(cheapest.GasPrice(), cheapest.GasCurrency(), tx.GasPrice(), tx.GasCurrency()) >= 0 } // Discard finds a number of most underpriced transactions, removes them from the @@ -508,9 +530,9 @@ func (l *txPricedList) Discard(count int, local *accountSet) types.Transactions drop := make(types.Transactions, 0, count) // Remote underpriced transactions to drop save := make(types.Transactions, 0, 64) // Local underpriced transactions to keep - for len(*l.items) > 0 && count > 0 { + for l.Len() > 0 && count > 0 { // Discard stale transactions if found during cleanup - tx := heap.Pop(l.items).(*types.Transaction) + tx := l.pop() if l.all.Get(tx.Hash()) == nil { l.stales-- continue @@ -524,7 +546,56 @@ func (l *txPricedList) Discard(count int, local *accountSet) types.Transactions } } for _, tx := range save { - heap.Push(l.items, tx) + l.Put(tx) } return drop } + +// Retrieves the heap with the lowest normalized price at it's head +func (l *txPricedList) getHeapWithMinHead() (*priceHeap, *types.Transaction) { + var cheapestHeap *priceHeap = nil + var cheapestTxn *types.Transaction = nil + for _, priceHeap := range l.heaps { + if len(*priceHeap) > 0 { + if cheapestHeap == nil { + cheapestHeap = priceHeap + cheapestTxn = []*types.Transaction(*cheapestHeap)[0] + } else { + txn := []*types.Transaction(*priceHeap)[0] + if l.pc.Cmp(cheapestTxn.GasPrice(), cheapestTxn.GasCurrency(), txn.GasPrice(), txn.GasCurrency()) < 0 { + cheapestHeap = priceHeap + } + } + } + } + + return cheapestHeap, cheapestTxn +} + +// Retrieves the tx with the lowest normalized price among all the heaps +func (l *txPricedList) getMinPricedTx() *types.Transaction { + _, minTx := l.getHeapWithMinHead() + + return minTx +} + +// Retrieves the total number of txns within the priced list +func (l *txPricedList) Len() int { + totalLen := 0 + for _, h := range l.heaps { + totalLen += len(*h) + } + + return totalLen +} + +// Pops the tx with the lowest normalized price. +func (l *txPricedList) pop() *types.Transaction { + cheapestHeap, _ := l.getHeapWithMinHead() + + if cheapestHeap != nil { + return heap.Pop(cheapestHeap).(*types.Transaction) + } else { + return nil + } +} diff --git a/core/tx_pool.go b/core/tx_pool.go index f062cae126bd..65655bb5c5ac 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -76,6 +76,9 @@ var ( // than some meaningful limit a user might use. This is not a consensus error // making the transaction invalid, rather a DOS protection. ErrOversizedData = errors.New("oversized data") + + // ErrUnregisteredGasCurrency is returned if the txn gas currency is not registered + ErrUnregisteredGasCurrency = errors.New("unregistered gas currency") ) var ( @@ -137,6 +140,8 @@ type TxPoolConfig struct { GlobalQueue uint64 // Maximum number of non-executable transaction slots for all accounts Lifetime time.Duration // Maximum amount of time non-executable transaction are queued + + CurrencyAddresses *[]common.Address // The addresses of all the currencies that are accepted by the node } // DefaultTxPoolConfig contains the default configurations for the transaction @@ -225,38 +230,50 @@ type TxPool struct { queue map[common.Address]*txList // Queued but non-processable transactions beats map[common.Address]time.Time // Last heartbeat from each known account all *txLookup // All transactions to allow lookups - priced *txPricedList // All transactions sorted by price + priced *txPricedList // All transactions sorted by price. One heap per gas currency. wg sync.WaitGroup // for shutdown sync homestead bool + + pc *PriceComparator + currencyAddresses map[common.Address]bool } // NewTxPool creates a new transaction pool to gather, sort and filter inbound // transactions from the network. -func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain) *TxPool { +func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain, pc *PriceComparator) *TxPool { // Sanitize the input to ensure no vulnerable gas prices are set config = (&config).sanitize() // Create the transaction pool with its initial settings pool := &TxPool{ - config: config, - chainconfig: chainconfig, - chain: chain, - signer: types.NewEIP155Signer(chainconfig.ChainID), - pending: make(map[common.Address]*txList), - queue: make(map[common.Address]*txList), - beats: make(map[common.Address]time.Time), - all: newTxLookup(), - chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize), - gasPrice: new(big.Int).SetUint64(config.PriceLimit), + config: config, + chainconfig: chainconfig, + chain: chain, + signer: types.NewEIP155Signer(chainconfig.ChainID), + pending: make(map[common.Address]*txList), + queue: make(map[common.Address]*txList), + beats: make(map[common.Address]time.Time), + all: newTxLookup(), + chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize), + gasPrice: new(big.Int).SetUint64(config.PriceLimit), + pc: pc, + currencyAddresses: make(map[common.Address]bool), } pool.locals = newAccountSet(pool.signer) for _, addr := range config.Locals { log.Info("Setting new local account", "address", addr) pool.locals.add(addr) } - pool.priced = newTxPricedList(pool.all) + pool.priced = newTxPricedList(pool.all, pool.pc) + + if config.CurrencyAddresses != nil { + for _, address := range *config.CurrencyAddresses { + pool.currencyAddresses[address] = true + } + } + pool.reset(nil, chain.CurrentBlock().Header()) // If local transactions and journaling is enabled, load from disk @@ -605,9 +622,15 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { if err != nil { return ErrInvalidSender } + + if tx.GasCurrency() != nil && // Non native gas in the tx + !pool.currencyAddresses[*tx.GasCurrency()] { // The tx currency is not in the user specified list + return ErrUnregisteredGasCurrency + } + // Drop non-local transactions under our own minimal accepted gas price local = local || pool.locals.contains(from) // account may be local even if the transaction arrived from the network - if !local && pool.gasPrice.Cmp(tx.GasPrice()) > 0 { + if !local && pool.pc.Cmp(pool.gasPrice, nil, tx.GasPrice(), tx.GasCurrency()) > 0 { return ErrUnderpriced } // Ensure the transaction adheres to nonce ordering @@ -1238,14 +1261,16 @@ func (as *accountSet) flatten() []common.Address { // peeking into the pool in TxPool.Get without having to acquire the widely scoped // TxPool.mu mutex. type txLookup struct { - all map[common.Hash]*types.Transaction - lock sync.RWMutex + all map[common.Hash]*types.Transaction + txCurrCount map[common.Address]uint64 + lock sync.RWMutex } // newTxLookup returns a new txLookup structure. func newTxLookup() *txLookup { return &txLookup{ - all: make(map[common.Hash]*types.Transaction), + all: make(map[common.Hash]*types.Transaction), + txCurrCount: make(map[common.Address]uint64), } } @@ -1282,6 +1307,7 @@ func (t *txLookup) Add(tx *types.Transaction) { t.lock.Lock() defer t.lock.Unlock() + t.txCurrCount[*(tx.NonNilGasCurrency())]++ t.all[tx.Hash()] = tx } @@ -1290,5 +1316,7 @@ func (t *txLookup) Remove(hash common.Hash) { t.lock.Lock() defer t.lock.Unlock() + t.txCurrCount[*(t.all[hash].NonNilGasCurrency())]-- + delete(t.all, hash) } diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index eb7f7fab628b..9d329ab3a8ea 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -82,7 +82,8 @@ func setupTxPool() (*TxPool, *ecdsa.PrivateKey) { blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)} key, _ := crypto.GenerateKey() - pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) + pc := NewPriceComparator() + pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain, pc) return pool, key } @@ -97,7 +98,7 @@ func validateTxPoolInternals(pool *TxPool) error { if total := pool.all.Count(); total != pending+queued { return fmt.Errorf("total transaction count %d != %d pending + %d queued", total, pending, queued) } - if priced := pool.priced.items.Len() - pool.priced.stales; priced != pending+queued { + if priced := pool.priced.Len() - pool.priced.stales; priced != pending+queued { return fmt.Errorf("total priced transaction count %d != %d pending + %d queued", priced, pending, queued) } // Ensure the next nonce to assign is the correct one @@ -192,7 +193,8 @@ func TestStateChangeDuringTransactionPoolReset(t *testing.T) { tx0 := transaction(0, 100000, key) tx1 := transaction(1, 100000, key) - pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) + pc := NewPriceComparator() + pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain, pc) defer pool.Stop() nonce := pool.State().GetNonce(address) @@ -557,7 +559,8 @@ func TestTransactionPostponing(t *testing.T) { statedb, _ := state.New(common.Hash{}, state.NewDatabase(ethdb.NewMemDatabase())) blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)} - pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) + pc := NewPriceComparator() + pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain, pc) defer pool.Stop() // Create two test accounts to produce different gap profiles with @@ -776,7 +779,8 @@ func testTransactionQueueGlobalLimiting(t *testing.T, nolocals bool) { config.NoLocals = nolocals config.GlobalQueue = config.AccountQueue*3 - 1 // reduce the queue limits to shorten test time (-1 to make it non divisible) - pool := NewTxPool(config, params.TestChainConfig, blockchain) + pc := NewPriceComparator() + pool := NewTxPool(config, params.TestChainConfig, blockchain, pc) defer pool.Stop() // Create a number of test accounts and fund them (last one will be the local) @@ -864,7 +868,8 @@ func testTransactionQueueTimeLimiting(t *testing.T, nolocals bool) { config.Lifetime = time.Second config.NoLocals = nolocals - pool := NewTxPool(config, params.TestChainConfig, blockchain) + pc := NewPriceComparator() + pool := NewTxPool(config, params.TestChainConfig, blockchain, pc) defer pool.Stop() // Create two test accounts to ensure remotes expire but locals do not @@ -1017,7 +1022,8 @@ func TestTransactionPendingGlobalLimiting(t *testing.T) { config := testTxPoolConfig config.GlobalSlots = config.AccountSlots * 10 - pool := NewTxPool(config, params.TestChainConfig, blockchain) + pc := NewPriceComparator() + pool := NewTxPool(config, params.TestChainConfig, blockchain, pc) defer pool.Stop() // Create a number of test accounts and fund them @@ -1065,7 +1071,8 @@ func TestTransactionCapClearsFromAll(t *testing.T) { config.AccountQueue = 2 config.GlobalSlots = 8 - pool := NewTxPool(config, params.TestChainConfig, blockchain) + pc := NewPriceComparator() + pool := NewTxPool(config, params.TestChainConfig, blockchain, pc) defer pool.Stop() // Create a number of test accounts and fund them @@ -1097,7 +1104,8 @@ func TestTransactionPendingMinimumAllowance(t *testing.T) { config := testTxPoolConfig config.GlobalSlots = 1 - pool := NewTxPool(config, params.TestChainConfig, blockchain) + pc := NewPriceComparator() + pool := NewTxPool(config, params.TestChainConfig, blockchain, pc) defer pool.Stop() // Create a number of test accounts and fund them @@ -1142,7 +1150,8 @@ func TestTransactionPoolRepricing(t *testing.T) { statedb, _ := state.New(common.Hash{}, state.NewDatabase(ethdb.NewMemDatabase())) blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)} - pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) + pc := NewPriceComparator() + pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain, pc) defer pool.Stop() // Keep track of transaction events to ensure all executables get announced @@ -1263,7 +1272,8 @@ func TestTransactionPoolRepricingKeepsLocals(t *testing.T) { statedb, _ := state.New(common.Hash{}, state.NewDatabase(ethdb.NewMemDatabase())) blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)} - pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) + pc := NewPriceComparator() + pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain, pc) defer pool.Stop() // Create a number of test accounts and fund them @@ -1329,7 +1339,8 @@ func TestTransactionPoolUnderpricing(t *testing.T) { config.GlobalSlots = 2 config.GlobalQueue = 2 - pool := NewTxPool(config, params.TestChainConfig, blockchain) + pc := NewPriceComparator() + pool := NewTxPool(config, params.TestChainConfig, blockchain, pc) defer pool.Stop() // Keep track of transaction events to ensure all executables get announced @@ -1435,7 +1446,8 @@ func TestTransactionPoolStableUnderpricing(t *testing.T) { config.GlobalSlots = 128 config.GlobalQueue = 0 - pool := NewTxPool(config, params.TestChainConfig, blockchain) + pc := NewPriceComparator() + pool := NewTxPool(config, params.TestChainConfig, blockchain, pc) defer pool.Stop() // Keep track of transaction events to ensure all executables get announced @@ -1497,7 +1509,8 @@ func TestTransactionReplacement(t *testing.T) { statedb, _ := state.New(common.Hash{}, state.NewDatabase(ethdb.NewMemDatabase())) blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)} - pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) + pc := NewPriceComparator() + pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain, pc) defer pool.Stop() // Keep track of transaction events to ensure all executables get announced @@ -1596,7 +1609,8 @@ func testTransactionJournaling(t *testing.T, nolocals bool) { config.Journal = journal config.Rejournal = time.Second - pool := NewTxPool(config, params.TestChainConfig, blockchain) + pc := NewPriceComparator() + pool := NewTxPool(config, params.TestChainConfig, blockchain, pc) // Create two test accounts to ensure remotes expire but locals do not local, _ := crypto.GenerateKey() @@ -1633,7 +1647,8 @@ func testTransactionJournaling(t *testing.T, nolocals bool) { statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1) blockchain = &testBlockChain{statedb, 1000000, new(event.Feed)} - pool = NewTxPool(config, params.TestChainConfig, blockchain) + pc = NewPriceComparator() + pool = NewTxPool(config, params.TestChainConfig, blockchain, pc) pending, queued = pool.Stats() if queued != 0 { @@ -1659,7 +1674,8 @@ func testTransactionJournaling(t *testing.T, nolocals bool) { statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1) blockchain = &testBlockChain{statedb, 1000000, new(event.Feed)} - pool = NewTxPool(config, params.TestChainConfig, blockchain) + pc = NewPriceComparator() + pool = NewTxPool(config, params.TestChainConfig, blockchain, pc) pending, queued = pool.Stats() if pending != 0 { @@ -1689,7 +1705,8 @@ func TestTransactionStatusCheck(t *testing.T) { statedb, _ := state.New(common.Hash{}, state.NewDatabase(ethdb.NewMemDatabase())) blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)} - pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) + pc := NewPriceComparator() + pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain, pc) defer pool.Stop() // Create the test accounts to check various transaction statuses with diff --git a/core/types/transaction.go b/core/types/transaction.go index 9d851a8d876e..49e6bb340a63 100644 --- a/core/types/transaction.go +++ b/core/types/transaction.go @@ -26,6 +26,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" ) @@ -175,15 +176,15 @@ func (tx *Transaction) UnmarshalJSON(input []byte) error { return nil } -func (tx *Transaction) Data() []byte { return common.CopyBytes(tx.data.Payload) } -func (tx *Transaction) Gas() uint64 { return tx.data.GasLimit } -func (tx *Transaction) GasPrice() *big.Int { return new(big.Int).Set(tx.data.Price) } -func (tx *Transaction) GasCurrency() *common.Address { +func (tx *Transaction) Data() []byte { return common.CopyBytes(tx.data.Payload) } +func (tx *Transaction) Gas() uint64 { return tx.data.GasLimit } +func (tx *Transaction) GasPrice() *big.Int { return new(big.Int).Set(tx.data.Price) } +func (tx *Transaction) GasCurrency() *common.Address { return tx.data.GasCurrency } +func (tx *Transaction) NonNilGasCurrency() *common.Address { if tx.data.GasCurrency == nil { - return nil + return ¶ms.AuthorizedTransferAddress } else { - address := common.BytesToAddress(tx.data.GasCurrency.Bytes()) - return &address + return tx.data.GasCurrency } } func (tx *Transaction) Value() *big.Int { return new(big.Int).Set(tx.data.Amount) } @@ -312,24 +313,35 @@ func (s TxByNonce) Swap(i, j int) { s[i], s[j] = s[j], s[i] } // TxByPrice implements both the sort and the heap interface, making it useful // for all at once sorting as well as individually adding and removing elements. -type TxByPrice Transactions +type TxByPrice struct { + txs Transactions + txCmpFunc func(tx1, tx2 *Transaction) int +} -func (s TxByPrice) Len() int { return len(s) } -func (s TxByPrice) Less(i, j int) bool { return s[i].data.Price.Cmp(s[j].data.Price) > 0 } -func (s TxByPrice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } +func (s TxByPrice) Len() int { return len(s.txs) } +func (s TxByPrice) Less(i, j int) bool { return s.txCmpFunc(s.txs[i], s.txs[j]) > 0 } +func (s TxByPrice) Swap(i, j int) { s.txs[i], s.txs[j] = s.txs[j], s.txs[i] } func (s *TxByPrice) Push(x interface{}) { - *s = append(*s, x.(*Transaction)) + s.txs = append(s.txs, x.(*Transaction)) } func (s *TxByPrice) Pop() interface{} { - old := *s + old := s.txs n := len(old) x := old[n-1] - *s = old[0 : n-1] + s.txs = old[0 : n-1] return x } +func (s *TxByPrice) Peek() *Transaction { + return s.txs[0] +} + +func (s *TxByPrice) Add(tx *Transaction) { + s.txs[0] = tx +} + // TransactionsByPriceAndNonce represents a set of transactions that can return // transactions in a profit-maximizing sorted order, while supporting removing // entire batches of transactions for non-executable accounts. @@ -344,11 +356,14 @@ type TransactionsByPriceAndNonce struct { // // Note, the input map is reowned so the caller should not interact any more with // if after providing it to the constructor. -func NewTransactionsByPriceAndNonce(signer Signer, txs map[common.Address]Transactions) *TransactionsByPriceAndNonce { +func NewTransactionsByPriceAndNonce(signer Signer, txs map[common.Address]Transactions, txCmpFunc func(tx1, tx2 *Transaction) int) *TransactionsByPriceAndNonce { // Initialize a price based heap with the head transactions - heads := make(TxByPrice, 0, len(txs)) + heads := TxByPrice{ + txs: make(Transactions, 0, len(txs)), + txCmpFunc: txCmpFunc, + } for from, accTxs := range txs { - heads = append(heads, accTxs[0]) + heads.Push(accTxs[0]) // Ensure the sender address is from the signer acc, _ := Sender(signer, accTxs[0]) txs[acc] = accTxs[1:] @@ -368,17 +383,19 @@ func NewTransactionsByPriceAndNonce(signer Signer, txs map[common.Address]Transa // Peek returns the next transaction by price. func (t *TransactionsByPriceAndNonce) Peek() *Transaction { - if len(t.heads) == 0 { + if t.heads.Len() == 0 { return nil } - return t.heads[0] + return t.heads.Peek() } // Shift replaces the current best head with the next one from the same account. func (t *TransactionsByPriceAndNonce) Shift() { - acc, _ := Sender(t.signer, t.heads[0]) + acc, _ := Sender(t.signer, t.heads.Peek()) if txs, ok := t.txs[acc]; ok && len(txs) > 0 { - t.heads[0], t.txs[acc] = txs[0], txs[1:] + next := txs[0] + t.txs[acc] = txs[1:] + t.heads.Add(next) heap.Fix(&t.heads, 0) } else { heap.Pop(&t.heads) diff --git a/core/types/transaction_test.go b/core/types/transaction_test.go index b6d303a8a962..848e97e1fecf 100644 --- a/core/types/transaction_test.go +++ b/core/types/transaction_test.go @@ -49,7 +49,7 @@ var ( common.FromHex("5544"), ).WithSignature( HomesteadSigner{}, - common.Hex2Bytes("693987a6a1238f28b63fd27c43b7e96bc2611ade7ef18d5ddaab55fc9eca0d3d1c740fb1a05c5fbccc00e7b0116b5b7e69a93e435c5c4f3bbe96c5909f910e1e00"), + common.Hex2Bytes("98ff921201554726367d2be8c804a7ff89ccf285ebc57dff8ae4c44b9c19ac4a8887321be575c8095f789dd4c743dfe42c1820f9231f98a962b210e3ac2452a301"), ) ) @@ -68,7 +68,7 @@ func TestTransactionEncode(t *testing.T) { if err != nil { t.Fatalf("encode error: %v", err) } - should := common.FromHex("f86203018207d08094b94f5374fce5edbc8e2a8697c15331677e6ebf0b0a8255441ba0693987a6a1238f28b63fd27c43b7e96bc2611ade7ef18d5ddaab55fc9eca0d3da01c740fb1a05c5fbccc00e7b0116b5b7e69a93e435c5c4f3bbe96c5909f910e1e") + should := common.FromHex("f86203018207d08094b94f5374fce5edbc8e2a8697c15331677e6ebf0b0a8255441ca098ff921201554726367d2be8c804a7ff89ccf285ebc57dff8ae4c44b9c19ac4aa08887321be575c8095f789dd4c743dfe42c1820f9231f98a962b210e3ac2452a3") if !bytes.Equal(txb, should) { t.Errorf("encoded RLP mismatch, got %x", txb) } @@ -146,7 +146,7 @@ func TestTransactionPriceNonceSort(t *testing.T) { } } // Sort the transactions and cross check the nonce ordering - txset := NewTransactionsByPriceAndNonce(signer, groups) + txset := NewTransactionsByPriceAndNonce(signer, groups, func(tx1, tx2 *Transaction) int { return tx1.GasPrice().Cmp(tx2.GasPrice()) }) txs := Transactions{} for tx := txset.Peek(); tx != nil; tx = txset.Peek() { diff --git a/eth/backend.go b/eth/backend.go index f3ea38f4cee2..53681a082249 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -173,12 +173,13 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { if config.TxPool.Journal != "" { config.TxPool.Journal = ctx.ResolvePath(config.TxPool.Journal) } - eth.txPool = core.NewTxPool(config.TxPool, eth.chainConfig, eth.blockchain) + pc := core.NewPriceComparator() + eth.txPool = core.NewTxPool(config.TxPool, eth.chainConfig, eth.blockchain, pc) if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb, config.Whitelist); err != nil { return nil, err } - eth.miner = miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine, config.MinerRecommit, config.MinerGasFloor, config.MinerGasCeil, eth.isLocalBlock, config.MinerVerificationServiceUrl, config.MinerVerificationRewards) + eth.miner = miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine, config.MinerRecommit, config.MinerGasFloor, config.MinerGasCeil, eth.isLocalBlock, config.MinerVerificationServiceUrl, config.MinerVerificationRewards, pc) eth.miner.SetExtra(makeExtraData(config.MinerExtraData)) eth.APIBackend = &EthAPIBackend{eth, nil} diff --git a/les/handler_test.go b/les/handler_test.go index f4810299d6f1..200e013c0cb0 100644 --- a/les/handler_test.go +++ b/les/handler_test.go @@ -498,7 +498,8 @@ func TestTransactionStatusLes2(t *testing.T) { chain := pm.blockchain.(*core.BlockChain) config := core.DefaultTxPoolConfig config.Journal = "" - txpool := core.NewTxPool(config, params.TestChainConfig, chain) + pc := core.NewPriceComparator() + txpool := core.NewTxPool(config, params.TestChainConfig, chain, pc) pm.txpool = txpool peer, _ := newTestPeer(t, "peer", 2, pm, true) defer peer.close() diff --git a/miner/miner.go b/miner/miner.go index 763a3eb42e04..c7b1a47d482a 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -54,13 +54,13 @@ type Miner struct { shouldStart int32 // should start indicates whether we should start after sync } -func New(eth Backend, config *params.ChainConfig, mux *event.TypeMux, engine consensus.Engine, recommit time.Duration, gasFloor, gasCeil uint64, isLocalBlock func(block *types.Block) bool, verificationService string, verificationRewards common.Address) *Miner { +func New(eth Backend, config *params.ChainConfig, mux *event.TypeMux, engine consensus.Engine, recommit time.Duration, gasFloor, gasCeil uint64, isLocalBlock func(block *types.Block) bool, verificationService string, verificationRewards common.Address, pc *core.PriceComparator) *Miner { miner := &Miner{ eth: eth, mux: mux, engine: engine, exitCh: make(chan struct{}), - worker: newWorker(config, engine, eth, mux, recommit, gasFloor, gasCeil, isLocalBlock, verificationService, verificationRewards), + worker: newWorker(config, engine, eth, mux, recommit, gasFloor, gasCeil, isLocalBlock, verificationService, verificationRewards, pc), canStart: 1, } go miner.update() diff --git a/miner/worker.go b/miner/worker.go index 1ea533cfddbf..d2399cf0f393 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -182,9 +182,10 @@ type worker struct { // Verification Service verificationService string verificationRewards common.Address + pc *core.PriceComparator } -func newWorker(config *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, recommit time.Duration, gasFloor, gasCeil uint64, isLocalBlock func(*types.Block) bool, verificationService string, verificationRewards common.Address) *worker { +func newWorker(config *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, recommit time.Duration, gasFloor, gasCeil uint64, isLocalBlock func(*types.Block) bool, verificationService string, verificationRewards common.Address, pc *core.PriceComparator) *worker { worker := &worker{ config: config, engine: engine, @@ -210,6 +211,7 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, eth Backend, startCh: make(chan struct{}, 1), resubmitIntervalCh: make(chan time.Duration), resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize), + pc: pc, } // Subscribe NewTxsEvent for tx pool worker.txsSub = eth.TxPool().SubscribeNewTxsEvent(worker.txsCh) @@ -294,6 +296,10 @@ func (w *worker) close() { close(w.exitCh) } +func (w *worker) txCmp(tx1 *types.Transaction, tx2 *types.Transaction) int { + return w.pc.Cmp(tx1.GasPrice(), tx1.GasCurrency(), tx2.GasPrice(), tx2.GasCurrency()) +} + // newWorkLoop is a standalone goroutine to submit new mining work upon received events. func (w *worker) newWorkLoop(recommit time.Duration) { var ( @@ -472,7 +478,7 @@ func (w *worker) mainLoop() { acc, _ := types.Sender(w.current.signer, tx) txs[acc] = append(txs[acc], tx) } - txset := types.NewTransactionsByPriceAndNonce(w.current.signer, txs) + txset := types.NewTransactionsByPriceAndNonce(w.current.signer, txs, w.txCmp) w.commitTransactions(txset, coinbase, nil) w.updateSnapshot() } else { @@ -949,13 +955,13 @@ func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64) } } if len(localTxs) > 0 { - txs := types.NewTransactionsByPriceAndNonce(w.current.signer, localTxs) + txs := types.NewTransactionsByPriceAndNonce(w.current.signer, localTxs, w.txCmp) if w.commitTransactions(txs, w.coinbase, interrupt) { return } } if len(remoteTxs) > 0 { - txs := types.NewTransactionsByPriceAndNonce(w.current.signer, remoteTxs) + txs := types.NewTransactionsByPriceAndNonce(w.current.signer, remoteTxs, w.txCmp) if w.commitTransactions(txs, w.coinbase, interrupt) { return } diff --git a/miner/worker_test.go b/miner/worker_test.go index 52bc6113fcd1..2e77bcfa47e1 100644 --- a/miner/worker_test.go +++ b/miner/worker_test.go @@ -104,7 +104,8 @@ func newTestWorkerBackend(t *testing.T, chainConfig *params.ChainConfig, engine genesis := gspec.MustCommit(db) chain, _ := core.NewBlockChain(db, nil, gspec.Config, engine, vm.Config{}, nil) - txpool := core.NewTxPool(testTxPoolConfig, chainConfig, chain) + pc := core.NewPriceComparator() + txpool := core.NewTxPool(testTxPoolConfig, chainConfig, chain, pc) // Generate a small n-block chain and an uncle block for it if n > 0 { @@ -144,7 +145,8 @@ func (b *testWorkerBackend) PostChainEvents(events []interface{}) { func newTestWorker(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine, blocks int) (*worker, *testWorkerBackend) { backend := newTestWorkerBackend(t, chainConfig, engine, blocks) backend.txPool.AddLocals(pendingTxs) - w := newWorker(chainConfig, engine, backend, new(event.TypeMux), time.Second, params.GenesisGasLimit, params.GenesisGasLimit, nil, testVerificationService, testVerificationRewardsAddress) + pc := core.NewPriceComparator() + w := newWorker(chainConfig, engine, backend, new(event.TypeMux), time.Second, params.GenesisGasLimit, params.GenesisGasLimit, nil, testVerificationService, testVerificationRewardsAddress, pc) w.setEtherbase(testBankAddress) return w, backend }