Skip to content

Commit

Permalink
Kevjue/tx pool worker multicurrency fee support (ethereum#127)
Browse files Browse the repository at this point in the history
* tx_pool and worker changes to support multiple fee currencies

* cleanup and commenting of some code

* Add support C$ transaction

Add support for non-native currency. Those transactions are processed if
contract addresses are available and passed via command-line.

* modified mobiles/types.go:NewTransaction to accept parameter gasCurrency as type *Address

* fixed newTxLookup to correctly initialize it's txCurrCount field

* made a few cosmetic changes based on PR feedback

* tx_pool and worker changes to support multiple fee currencies

* cleanup and commenting of some code

* Add support C$ transaction

Add support for non-native currency. Those transactions are processed if
contract addresses are available and passed via command-line.

* fixed newTxLookup to correctly initialize it's txCurrCount field

* made a few cosmetic changes based on PR feedback

* fixed geth tests so that a core.PriceComparator object is passed into core.NewTxPool

* fixed lint issues.  specifically ran go fmt on files that needed it

* 1) Removed the miner.currencyaddresses flag  2) listed some files   3) fixed bug with parsing and storing the utils.TxPoolCurrencyAddressesFlag flag

* changes based on feedback from PR
  • Loading branch information
kevjue authored Mar 26, 2019
1 parent 0c824e1 commit ec059a8
Show file tree
Hide file tree
Showing 14 changed files with 347 additions and 97 deletions.
1 change: 1 addition & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ var (
utils.TxPoolAccountQueueFlag,
utils.TxPoolGlobalQueueFlag,
utils.TxPoolLifetimeFlag,
utils.TxPoolCurrencyAddressesFlag,
utils.SyncModeFlag,
utils.GCModeFlag,
utils.LightServFlag,
Expand Down
1 change: 1 addition & 0 deletions cmd/geth/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ var AppHelpFlagGroups = []flagGroup{
utils.TxPoolAccountQueueFlag,
utils.TxPoolGlobalQueueFlag,
utils.TxPoolLifetimeFlag,
utils.TxPoolCurrencyAddressesFlag,
},
},
{
Expand Down
28 changes: 28 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/ethereum/go-ethereum/accounts/keystore"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/fdlimit"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/consensus/clique"
"github.com/ethereum/go-ethereum/consensus/ethash"
Expand Down Expand Up @@ -298,6 +299,11 @@ var (
Usage: "Maximum amount of time non-executable transaction are queued",
Value: eth.DefaultConfig.TxPool.Lifetime,
}
TxPoolCurrencyAddressesFlag = cli.StringFlag{
Name: "txpool.gascurrencyaddresses",
Usage: "Comma separated list of contract addresses of the currency accepted by the tx pool, 0x1234...,0xf4ee... etc. All addresses should start with 0x and followed by 40 hex character",
Value: "",
}
// Performance tuning settings
CacheFlag = cli.IntFlag{
Name: "cache",
Expand Down Expand Up @@ -1099,6 +1105,28 @@ func setTxPool(ctx *cli.Context, cfg *core.TxPoolConfig) {
if ctx.GlobalIsSet(TxPoolLifetimeFlag.Name) {
cfg.Lifetime = ctx.GlobalDuration(TxPoolLifetimeFlag.Name)
}
if ctx.GlobalIsSet(TxPoolCurrencyAddressesFlag.Name) {
currencyAddresses := make([]common.Address, 0)
// 0x1234,0x123443,...
currencies := ctx.GlobalString(TxPoolCurrencyAddressesFlag.Name)
currencyAddressesAsString := strings.Split(currencies, ",")
// Validation
for i := range currencyAddressesAsString {
currencyAddress := currencyAddressesAsString[i]

// MustDecode will check for a "0x" prefix and all the remaining characters are valid for an address
addressBytes := hexutil.MustDecode(currencyAddress)

// "0x" followed by 40 hex characters.
if len(currencyAddress[2:]) != common.AddressLength*2 {
panic(fmt.Sprintf("Incorrect currency code, it does not has 40 characters: \"%s\"", currencyAddress))
}

currencyAddresses = append(currencyAddresses, common.BytesToAddress(addressBytes))
}
cfg.CurrencyAddresses = &currencyAddresses
log.Debug("Currencies parsed", "currencyAddresses", currencyAddressesAsString)
}
}

func setEthash(ctx *cli.Context, cfg *eth.Config) {
Expand Down
77 changes: 77 additions & 0 deletions core/currency.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright 2017 The Celo Authors
// This file is part of the celo library.
//
// The celo library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The celo library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the celo library. If not, see <http://www.gnu.org/licenses/>.

package core

import (
"math/big"

"github.com/ethereum/go-ethereum/common"
)

var (
cgExchangeRateNum = big.NewInt(1)
cgExchangeRateDen = big.NewInt(1)
)

type exchangeRate struct {
Numerator *big.Int
Denominator *big.Int
}

type PriceComparator struct {
exchangeRates map[common.Address]*exchangeRate // indexedCurrency:CeloGold exchange rate
}

func (pc *PriceComparator) getNumDenom(currency *common.Address) (*big.Int, *big.Int) {
if currency == nil {
return cgExchangeRateNum, cgExchangeRateDen
} else {
exchangeRate := pc.exchangeRates[*currency]
return exchangeRate.Numerator, exchangeRate.Denominator
}
}

func (pc *PriceComparator) Cmp(val1 *big.Int, currency1 *common.Address, val2 *big.Int, currency2 *common.Address) int {
if currency1 == currency2 {
return val1.Cmp(val2)
}

exchangeRate1Num, exchangeRate1Den := pc.getNumDenom(currency1)
exchangeRate2Num, exchangeRate2Den := pc.getNumDenom(currency2)

// Below code block is basically evaluating this comparison:
// val1 * exchangeRate1Num/exchangeRate1Den < val2 * exchangeRate2Num/exchangeRate2Den
// It will transform that comparison to this, to remove having to deal with fractional values.
// val1 * exchangeRate1Num * exchangeRate2Den < val2 * exchangeRate2Num * exchangeRate1Den
leftSide := new(big.Int).Mul(val1, new(big.Int).Mul(exchangeRate1Num, exchangeRate2Den))
rightSide := new(big.Int).Mul(val2, new(big.Int).Mul(exchangeRate2Num, exchangeRate1Den))
return leftSide.Cmp(rightSide)
}

func NewPriceComparator() *PriceComparator {
// TODO(kevjue): Integrate implementation of issue https://github.com/celo-org/celo-monorepo/issues/2706, so that the
// exchange rate is retrieved from the smart contract.
// For now, hard coding in some exchange rates. Will modify this to retrieve the
// exchange rates from the Celo's exchange smart contract.
// C$ will have a 2:1 exchange rate with CG
exchangeRates := make(map[common.Address]*exchangeRate)
exchangeRates[common.HexToAddress("0x0000000000000000000000000000000ce10d011a")] = &exchangeRate{Numerator: big.NewInt(2), Denominator: big.NewInt(1)}

return &PriceComparator{
exchangeRates: exchangeRates,
}
}
125 changes: 98 additions & 27 deletions core/tx_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,22 +407,34 @@ func (h *priceHeap) Pop() interface{} {
// txPricedList is a price-sorted heap to allow operating on transactions pool
// contents in a price-incrementing way.
type txPricedList struct {
all *txLookup // Pointer to the map of all transactions
items *priceHeap // Heap of prices of all the stored transactions
stales int // Number of stale price points to (re-heap trigger)
all *txLookup // Pointer to the map of all transactions
heaps map[common.Address]*priceHeap // Heap of prices of all the stored transactions
stales int // Number of stale price points to (re-heap trigger)
pc *PriceComparator // Comparator object used to compare prices that are using different currencies
}

// newTxPricedList creates a new price-sorted transaction heap.
func newTxPricedList(all *txLookup) *txPricedList {
func newTxPricedList(all *txLookup, pc *PriceComparator) *txPricedList {
return &txPricedList{
all: all,
items: new(priceHeap),
heaps: make(map[common.Address]*priceHeap),
pc: pc,
}
}

// Gets the price heap for the given currency
func (l *txPricedList) getPriceHeap(tx *types.Transaction) *priceHeap {
gasCurrency := *(tx.NonNilGasCurrency())
if _, ok := l.heaps[gasCurrency]; !ok {
l.heaps[gasCurrency] = new(priceHeap)
}
return l.heaps[gasCurrency]
}

// Put inserts a new transaction into the heap.
func (l *txPricedList) Put(tx *types.Transaction) {
heap.Push(l.items, tx)
pHeap := l.getPriceHeap(tx)
heap.Push(pHeap, tx)
}

// Removed notifies the prices transaction list that an old transaction dropped
Expand All @@ -431,38 +443,47 @@ func (l *txPricedList) Put(tx *types.Transaction) {
func (l *txPricedList) Removed() {
// Bump the stale counter, but exit if still too low (< 25%)
l.stales++
if l.stales <= len(*l.items)/4 {
if l.stales <= l.Len()/4 {
return
}
// Seems we've reached a critical number of stale transactions, reheap
reheap := make(priceHeap, 0, l.all.Count())
reheapMap := make(map[common.Address]*priceHeap)
for gasCurrency, count := range l.all.txCurrCount {
reheap := make(priceHeap, 0, count)
reheapMap[gasCurrency] = &reheap
}

l.stales, l.items = 0, &reheap
l.stales, l.heaps = 0, reheapMap
l.all.Range(func(hash common.Hash, tx *types.Transaction) bool {
*l.items = append(*l.items, tx)
pHeap := l.getPriceHeap(tx)
*pHeap = append(*pHeap, tx)
return true
})
heap.Init(l.items)

for _, h := range l.heaps {
heap.Init(h)
}
}

// Cap finds all the transactions below the given price threshold, drops them
// Cap finds all the transactions below the given celo gold price threshold, drops them
// from the priced list and returns them for further removal from the entire pool.
func (l *txPricedList) Cap(threshold *big.Int, local *accountSet) types.Transactions {
func (l *txPricedList) Cap(cgThreshold *big.Int, local *accountSet) types.Transactions {
drop := make(types.Transactions, 0, 128) // Remote underpriced transactions to drop
save := make(types.Transactions, 0, 64) // Local underpriced transactions to keep

for len(*l.items) > 0 {
for l.Len() > 0 {
// Discard stale transactions if found during cleanup
tx := heap.Pop(l.items).(*types.Transaction)
tx := l.pop()
if l.all.Get(tx.Hash()) == nil {
l.stales--
continue
}
// Stop the discards if we've reached the threshold
if tx.GasPrice().Cmp(threshold) >= 0 {

if l.pc.Cmp(tx.GasPrice(), tx.GasCurrency(), cgThreshold, nil) >= 0 {
save = append(save, tx)
break
}

// Non stale transaction found, discard unless local
if local.containsTx(tx) {
save = append(save, tx)
Expand All @@ -471,7 +492,7 @@ func (l *txPricedList) Cap(threshold *big.Int, local *accountSet) types.Transact
}
}
for _, tx := range save {
heap.Push(l.items, tx)
l.Put(tx)
}
return drop
}
Expand All @@ -484,22 +505,23 @@ func (l *txPricedList) Underpriced(tx *types.Transaction, local *accountSet) boo
return false
}
// Discard stale price points if found at the heap start
for len(*l.items) > 0 {
head := []*types.Transaction(*l.items)[0]
for l.Len() > 0 {
head := l.getMinPricedTx()
if l.all.Get(head.Hash()) == nil {
l.stales--
heap.Pop(l.items)
l.pop()
continue
}
break
}
// Check if the transaction is underpriced or not
if len(*l.items) == 0 {
if l.Len() == 0 {
log.Error("Pricing query for empty pool") // This cannot happen, print to catch programming errors
return false
}
cheapest := []*types.Transaction(*l.items)[0]
return cheapest.GasPrice().Cmp(tx.GasPrice()) >= 0

cheapest := l.getMinPricedTx()
return l.pc.Cmp(cheapest.GasPrice(), cheapest.GasCurrency(), tx.GasPrice(), tx.GasCurrency()) >= 0
}

// Discard finds a number of most underpriced transactions, removes them from the
Expand All @@ -508,9 +530,9 @@ func (l *txPricedList) Discard(count int, local *accountSet) types.Transactions
drop := make(types.Transactions, 0, count) // Remote underpriced transactions to drop
save := make(types.Transactions, 0, 64) // Local underpriced transactions to keep

for len(*l.items) > 0 && count > 0 {
for l.Len() > 0 && count > 0 {
// Discard stale transactions if found during cleanup
tx := heap.Pop(l.items).(*types.Transaction)
tx := l.pop()
if l.all.Get(tx.Hash()) == nil {
l.stales--
continue
Expand All @@ -524,7 +546,56 @@ func (l *txPricedList) Discard(count int, local *accountSet) types.Transactions
}
}
for _, tx := range save {
heap.Push(l.items, tx)
l.Put(tx)
}
return drop
}

// Retrieves the heap with the lowest normalized price at it's head
func (l *txPricedList) getHeapWithMinHead() (*priceHeap, *types.Transaction) {
var cheapestHeap *priceHeap = nil
var cheapestTxn *types.Transaction = nil
for _, priceHeap := range l.heaps {
if len(*priceHeap) > 0 {
if cheapestHeap == nil {
cheapestHeap = priceHeap
cheapestTxn = []*types.Transaction(*cheapestHeap)[0]
} else {
txn := []*types.Transaction(*priceHeap)[0]
if l.pc.Cmp(cheapestTxn.GasPrice(), cheapestTxn.GasCurrency(), txn.GasPrice(), txn.GasCurrency()) < 0 {
cheapestHeap = priceHeap
}
}
}
}

return cheapestHeap, cheapestTxn
}

// Retrieves the tx with the lowest normalized price among all the heaps
func (l *txPricedList) getMinPricedTx() *types.Transaction {
_, minTx := l.getHeapWithMinHead()

return minTx
}

// Retrieves the total number of txns within the priced list
func (l *txPricedList) Len() int {
totalLen := 0
for _, h := range l.heaps {
totalLen += len(*h)
}

return totalLen
}

// Pops the tx with the lowest normalized price.
func (l *txPricedList) pop() *types.Transaction {
cheapestHeap, _ := l.getHeapWithMinHead()

if cheapestHeap != nil {
return heap.Pop(cheapestHeap).(*types.Transaction)
} else {
return nil
}
}
Loading

0 comments on commit ec059a8

Please sign in to comment.