Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

upgrade tx pool #528

Merged
merged 23 commits into from
May 10, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
702e0ca
core: txpool stable underprice drop order, perf fixes (#16494)
gzliudan Apr 30, 2024
2060ff7
core: Ensure that local transactions aren't discarded as underpriced …
gzliudan May 7, 2024
88695ca
core: use a wrapped map w/ sync.RWMutex for TxPool.all to remove cont…
gzliudan Apr 30, 2024
fb89a54
core: fix transaction event asynchronicity (#16843)
gzliudan May 7, 2024
4f0f6e0
core: concurrent background transaction sender ecrecover (#16882)
gzliudan May 7, 2024
859308c
core: change comment to match code more closely (#16963)
gzliudan May 8, 2024
5b883de
core/tx_pool: reduce judgement levels (#16980)
gzliudan May 8, 2024
241201c
core: fixed typo (#17214)
gzliudan May 8, 2024
41b29a8
Fixed typo addresssByHeartbeat -> addressesByHeartbeat (#17243)
gzliudan May 8, 2024
6c657ef
core: priority mining (#17472)
gzliudan May 8, 2024
ec50ca3
core, eth, trie: use common/prque (#17508)
gzliudan May 9, 2024
a666465
core: fix a typo (#17733)
gzliudan May 9, 2024
6b87c07
core: fix comment typo (#18144)
gzliudan May 9, 2024
676c4e8
core: sanitize more TxPoolConfig fields (#17210)
gzliudan May 9, 2024
c5b22fb
core: make txpool handle reorg due to setHead (#19308)
gzliudan May 9, 2024
65baaaa
core: cache tx signature before obtaining lock (#19351)
gzliudan May 10, 2024
ddbf5d2
core: expose various counter metrics for grafana (#19692)
gzliudan May 10, 2024
74c7236
core: move TxPool reorg and events to background goroutine (#19705)
gzliudan May 10, 2024
6338a41
core: kill off managed state, use own tiny noncer for txpool (#19810)
gzliudan May 10, 2024
edaed4f
core: fix write concurrency in txpool (#19835)
gzliudan May 10, 2024
b708614
core, les: fix les unit tests (#19823)
gzliudan May 10, 2024
67825d8
core, light, params: implement eip2028 (#19931)
gzliudan May 10, 2024
742a7f9
core, metrics: switch some invalid counters to gauges (#20047)
gzliudan May 10, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions XDCx/XDCx.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,13 @@ import (

"github.com/XinFinOrg/XDPoSChain/XDCx/tradingstate"
"github.com/XinFinOrg/XDPoSChain/XDCxDAO"
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/p2p"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"

"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/common/prque"
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/core/state"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/log"
"github.com/XinFinOrg/XDPoSChain/p2p"
"github.com/XinFinOrg/XDPoSChain/rpc"
lru "github.com/hashicorp/golang-lru"
"golang.org/x/sync/syncmap"
Expand Down Expand Up @@ -105,7 +104,7 @@ func New(cfg *Config) *XDCX {
}
XDCX := &XDCX{
orderNonce: make(map[common.Address]*big.Int),
Triegc: prque.New(),
Triegc: prque.New(nil),
tokenDecimalCache: tokenDecimalCache,
orderCache: orderCache,
}
Expand Down
11 changes: 5 additions & 6 deletions XDCxlending/XDCxlending.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,13 @@ import (
"github.com/XinFinOrg/XDPoSChain/XDCx/tradingstate"
"github.com/XinFinOrg/XDPoSChain/XDCxDAO"
"github.com/XinFinOrg/XDPoSChain/XDCxlending/lendingstate"
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/p2p"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"

"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/common/prque"
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/core/state"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/log"
"github.com/XinFinOrg/XDPoSChain/p2p"
"github.com/XinFinOrg/XDPoSChain/rpc"
lru "github.com/hashicorp/golang-lru"
)
Expand Down Expand Up @@ -67,7 +66,7 @@ func New(XDCx *XDCx.XDCX) *Lending {
lendingTradeCache, _ := lru.New(defaultCacheLimit)
lending := &Lending{
orderNonce: make(map[common.Address]*big.Int),
Triegc: prque.New(),
Triegc: prque.New(nil),
lendingItemHistory: itemCache,
lendingTradeHistory: lendingTradeCache,
}
Expand Down
2 changes: 1 addition & 1 deletion consensus/XDPoS/utils/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ import (
"github.com/XinFinOrg/XDPoSChain/XDCx/tradingstate"
"github.com/XinFinOrg/XDPoSChain/XDCxlending/lendingstate"
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/common/prque"
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/consensus/clique"
"github.com/XinFinOrg/XDPoSChain/core/state"
"github.com/XinFinOrg/XDPoSChain/core/types"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
)

type Masternode struct {
Expand Down
21 changes: 13 additions & 8 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,12 @@ import (
"sync/atomic"
"time"

"github.com/XinFinOrg/XDPoSChain/XDCxlending/lendingstate"

"github.com/XinFinOrg/XDPoSChain/XDCx/tradingstate"
"github.com/XinFinOrg/XDPoSChain/XDCxlending/lendingstate"
"github.com/XinFinOrg/XDPoSChain/accounts/abi/bind"

"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/common/mclock"
"github.com/XinFinOrg/XDPoSChain/common/prque"
"github.com/XinFinOrg/XDPoSChain/common/sort"
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/consensus/XDPoS"
Expand All @@ -53,7 +52,6 @@ import (
"github.com/XinFinOrg/XDPoSChain/rlp"
"github.com/XinFinOrg/XDPoSChain/trie"
lru "github.com/hashicorp/golang-lru"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
)

var (
Expand Down Expand Up @@ -201,7 +199,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
chainConfig: chainConfig,
cacheConfig: cacheConfig,
db: db,
triegc: prque.New(),
triegc: prque.New(nil),
stateCache: state.NewDatabase(db),
quit: make(chan struct{}),
bodyCache: bodyCache,
Expand Down Expand Up @@ -1268,18 +1266,18 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.
} else {
// Full but not archive node, do proper garbage collection
triedb.Reference(root, common.Hash{}) // metadata reference to keep trie alive
bc.triegc.Push(root, -float32(block.NumberU64()))
bc.triegc.Push(root, -int64(block.NumberU64()))
if tradingTrieDb != nil {
tradingTrieDb.Reference(tradingRoot, common.Hash{})
}
if tradingService != nil {
tradingService.GetTriegc().Push(tradingRoot, -float32(block.NumberU64()))
tradingService.GetTriegc().Push(tradingRoot, -int64(block.NumberU64()))
}
if lendingTrieDb != nil {
lendingTrieDb.Reference(lendingRoot, common.Hash{})
}
if lendingService != nil {
lendingService.GetTriegc().Push(lendingRoot, -float32(block.NumberU64()))
lendingService.GetTriegc().Push(lendingRoot, -int64(block.NumberU64()))
}
if current := block.NumberU64(); current > triesInMemory {
// Find the next state trie we need to commit
Expand Down Expand Up @@ -1450,6 +1448,10 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
// only reason this method exists as a separate one is to make locking cleaner
// with deferred statements.
func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []interface{}, []*types.Log, error) {
// Sanity check that we have something meaningful to import
if len(chain) == 0 {
return 0, nil, nil, nil
}
engine, _ := bc.Engine().(*XDPoS.XDPoS)

// Do a sanity check that the provided chain is actually ordered and linked
Expand Down Expand Up @@ -1491,6 +1493,9 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
abort, results := bc.engine.VerifyHeaders(bc, headers, seals)
defer close(abort)

// Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss)
senderCacher.recoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number()), chain)

// Iterate over the blocks and insert when the verifier permits
for i, block := range chain {
// If the chain is terminating, stop processing blocks
Expand Down
14 changes: 6 additions & 8 deletions core/lending_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,16 @@ import (
"sync"
"time"

"github.com/XinFinOrg/XDPoSChain/consensus/XDPoS"

"github.com/XinFinOrg/XDPoSChain/XDCxlending/lendingstate"
"github.com/XinFinOrg/XDPoSChain/consensus"

"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/common/prque"
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/consensus/XDPoS"
"github.com/XinFinOrg/XDPoSChain/core/state"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/event"
"github.com/XinFinOrg/XDPoSChain/log"
"github.com/XinFinOrg/XDPoSChain/params"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
)

var (
Expand Down Expand Up @@ -998,11 +996,11 @@ func (pool *LendingPool) promoteExecutables(accounts []common.Address) {
if pending > pool.config.GlobalSlots {
pendingBeforeCap := pending
// Assemble a spam order to penalize large transactors first
spammers := prque.New()
spammers := prque.New(nil)
for addr, list := range pool.pending {
// Only evict transactions from high rollers
if !pool.locals.contains(addr) && uint64(list.Len()) > pool.config.AccountSlots {
spammers.Push(addr, float32(list.Len()))
spammers.Push(addr, int64(list.Len()))
}
}
// Gradually drop transactions from offenders
Expand Down Expand Up @@ -1066,7 +1064,7 @@ func (pool *LendingPool) promoteExecutables(accounts []common.Address) {
}
if queued > pool.config.GlobalQueue {
// Sort all accounts with queued transactions by heartbeat
addresses := make(addresssByHeartbeat, 0, len(pool.queue))
addresses := make(addressesByHeartbeat, 0, len(pool.queue))
for addr := range pool.queue {
if !pool.locals.contains(addr) { // don't drop locals
addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]})
Expand Down
11 changes: 5 additions & 6 deletions core/order_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,15 @@ import (
"time"

"github.com/XinFinOrg/XDPoSChain/XDCx/tradingstate"
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/common/prque"
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/consensus/XDPoS"

"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/core/state"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/event"
"github.com/XinFinOrg/XDPoSChain/log"
"github.com/XinFinOrg/XDPoSChain/params"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
)

var (
Expand Down Expand Up @@ -914,11 +913,11 @@ func (pool *OrderPool) promoteExecutables(accounts []common.Address) {
if pending > pool.config.GlobalSlots {
pendingBeforeCap := pending
// Assemble a spam order to penalize large transactors first
spammers := prque.New()
spammers := prque.New(nil)
for addr, list := range pool.pending {
// Only evict transactions from high rollers
if !pool.locals.contains(addr) && uint64(list.Len()) > pool.config.AccountSlots {
spammers.Push(addr, float32(list.Len()))
spammers.Push(addr, int64(list.Len()))
}
}
// Gradually drop transactions from offenders
Expand Down Expand Up @@ -982,7 +981,7 @@ func (pool *OrderPool) promoteExecutables(accounts []common.Address) {
}
if queued > pool.config.GlobalQueue {
// Sort all accounts with queued transactions by heartbeat
addresses := make(addresssByHeartbeat, 0, len(pool.queue))
addresses := make(addressesByHeartbeat, 0, len(pool.queue))
for addr := range pool.queue {
if !pool.locals.contains(addr) { // don't drop locals
addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]})
Expand Down
105 changes: 105 additions & 0 deletions core/tx_cacher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package core

import (
"runtime"

"github.com/XinFinOrg/XDPoSChain/core/types"
)

// senderCacher is a concurrent tranaction sender recoverer anc cacher.
var senderCacher = newTxSenderCacher(runtime.NumCPU())

// txSenderCacherRequest is a request for recovering transaction senders with a
// specific signature scheme and caching it into the transactions themselves.
//
// The inc field defines the number of transactions to skip after each recovery,
// which is used to feed the same underlying input array to different threads but
// ensure they process the early transactions fast.
type txSenderCacherRequest struct {
signer types.Signer
txs []*types.Transaction
inc int
}

// txSenderCacher is a helper structure to concurrently ecrecover transaction
// senders from digital signatures on background threads.
type txSenderCacher struct {
threads int
tasks chan *txSenderCacherRequest
}

// newTxSenderCacher creates a new transaction sender background cacher and starts
// as many procesing goroutines as allowed by the GOMAXPROCS on construction.
func newTxSenderCacher(threads int) *txSenderCacher {
cacher := &txSenderCacher{
tasks: make(chan *txSenderCacherRequest, threads),
threads: threads,
}
for i := 0; i < threads; i++ {
go cacher.cache()
}
return cacher
}

// cache is an infinite loop, caching transaction senders from various forms of
// data structures.
func (cacher *txSenderCacher) cache() {
for task := range cacher.tasks {
for i := 0; i < len(task.txs); i += task.inc {
types.Sender(task.signer, task.txs[i])
}
}
}

// recover recovers the senders from a batch of transactions and caches them
// back into the same data structures. There is no validation being done, nor
// any reaction to invalid signatures. That is up to calling code later.
func (cacher *txSenderCacher) recover(signer types.Signer, txs []*types.Transaction) {
// If there's nothing to recover, abort
if len(txs) == 0 {
return
}
// Ensure we have meaningful task sizes and schedule the recoveries
tasks := cacher.threads
if len(txs) < tasks*4 {
tasks = (len(txs) + 3) / 4
}
for i := 0; i < tasks; i++ {
cacher.tasks <- &txSenderCacherRequest{
signer: signer,
txs: txs[i:],
inc: tasks,
}
}
}

// recoverFromBlocks recovers the senders from a batch of blocks and caches them
// back into the same data structures. There is no validation being done, nor
// any reaction to invalid signatures. That is up to calling code later.
func (cacher *txSenderCacher) recoverFromBlocks(signer types.Signer, blocks []*types.Block) {
count := 0
for _, block := range blocks {
count += len(block.Transactions())
}
txs := make([]*types.Transaction, 0, count)
for _, block := range blocks {
txs = append(txs, block.Transactions()...)
}
cacher.recover(signer, txs)
}
Loading