Skip to content

Commit 67aeba3

Browse files
committed
perf(miner):add mining prefetcher
1 parent 859186f commit 67aeba3

File tree

6 files changed

+205
-0
lines changed

6 files changed

+205
-0
lines changed

core/gaspool.go

+9
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,15 @@ import (
2525
// in a block. The zero value is a pool with zero gas available.
2626
type GasPool uint64
2727

28+
// SetGas set an initial value for gaspool
29+
func (gp *GasPool) SetGas(amount uint64) *GasPool {
30+
if amount > math.MaxUint64 {
31+
panic("gas pool pushed above uint64")
32+
}
33+
*(*uint64)(gp) = amount
34+
return gp
35+
}
36+
2837
// AddGas makes gas available for execution.
2938
func (gp *GasPool) AddGas(amount uint64) *GasPool {
3039
if uint64(*gp) > math.MaxUint64-amount {

core/state_prefetcher.go

+57
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,63 @@ func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, c
8888
}
8989
}
9090

91+
// PrefetchMining processes the state changes according to the Ethereum rules by running
92+
// the transaction messages using the statedb, but any changes are discarded. The
93+
// only goal is to pre-cache transaction signatures and snapshot clean state. Only used for mining stage
94+
func (p *statePrefetcher) PrefetchMining(txs *types.TransactionsByPriceAndNonce, header *types.Header, gasLimit uint64, statedb *state.StateDB, cfg vm.Config, interruptCh <-chan struct{}, txCurr **types.Transaction) {
95+
var signer = types.MakeSigner(p.config, header.Number)
96+
97+
txCh := make(chan *types.Transaction, 2*prefetchThread)
98+
for i := 0; i < prefetchThread; i++ {
99+
go func(txCh <-chan *types.Transaction, stopCh <-chan struct{}) {
100+
idx := 0
101+
newStatedb := statedb.Copy()
102+
gaspool := new(GasPool).AddGas(gasLimit)
103+
blockContext := NewEVMBlockContext(header, p.bc, nil)
104+
evm := vm.NewEVM(blockContext, vm.TxContext{}, statedb, p.config, cfg)
105+
// Iterate over and process the individual transactions
106+
for {
107+
select {
108+
case tx := <-txCh:
109+
// Convert the transaction into an executable message and pre-cache its sender
110+
msg, err := tx.AsMessage(signer)
111+
if err != nil {
112+
return // Also invalid block, bail out
113+
}
114+
idx++
115+
newStatedb.Prepare(tx.Hash(), header.Hash(), idx)
116+
precacheTransaction(msg, p.config, gaspool, newStatedb, header, evm)
117+
gaspool.SetGas(gasLimit)
118+
case <-stopCh:
119+
return
120+
}
121+
}
122+
}(txCh, interruptCh)
123+
}
124+
go func(txs *types.TransactionsByPriceAndNonce) {
125+
count := 0
126+
for {
127+
tx := txs.Peek()
128+
if tx == nil {
129+
return
130+
}
131+
select {
132+
case <-interruptCh:
133+
return
134+
default:
135+
}
136+
if count++; count%10 == 0 {
137+
if *txCurr == nil {
138+
return
139+
}
140+
txs.Forward(*txCurr)
141+
}
142+
txCh <- tx
143+
txs.Shift()
144+
}
145+
}(txs)
146+
}
147+
91148
// precacheTransaction attempts to apply a transaction to the given state database
92149
// and uses the input parameters for its environment. The goal is not to execute
93150
// the transaction successfully, rather to warm up touched data slots.

core/types.go

+2
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ type Prefetcher interface {
4040
// the transaction messages using the statedb, but any changes are discarded. The
4141
// only goal is to pre-cache transaction signatures and state trie nodes.
4242
Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interrupt *uint32)
43+
// PrefetchMining used for pre-caching transaction signatures and state trie nodes. Only used for mining stage.
44+
PrefetchMining(txs *types.TransactionsByPriceAndNonce, header *types.Header, gasLimit uint64, statedb *state.StateDB, cfg vm.Config, interruptCh <-chan struct{}, txCurr **types.Transaction)
4345
}
4446

4547
// Processor is an interface for processing blocks using a given initial state.

core/types/transaction.go

+56
Original file line numberDiff line numberDiff line change
@@ -458,6 +458,21 @@ func NewTransactionsByPriceAndNonce(signer Signer, txs map[common.Address]Transa
458458
}
459459
}
460460

461+
// Copy copy a new TransactionsPriceAndNonce with the same *transaction
462+
func (t *TransactionsByPriceAndNonce) Copy() *TransactionsByPriceAndNonce {
463+
heads := make([]*Transaction, len(t.heads))
464+
copy(heads, t.heads)
465+
txs := make(map[common.Address]Transactions, len(t.txs))
466+
for acc, txsTmp := range t.txs {
467+
txs[acc] = txsTmp
468+
}
469+
return &TransactionsByPriceAndNonce{
470+
heads: heads,
471+
txs: txs,
472+
signer: t.signer,
473+
}
474+
}
475+
461476
// Peek returns the next transaction by price.
462477
func (t *TransactionsByPriceAndNonce) Peek() *Transaction {
463478
if len(t.heads) == 0 {
@@ -488,6 +503,47 @@ func (t *TransactionsByPriceAndNonce) CurrentSize() int {
488503
return len(t.heads)
489504
}
490505

506+
//Forward move t to be one index behind tx, tx cant be nil
507+
func (t *TransactionsByPriceAndNonce) Forward(tx *Transaction) {
508+
if tx == nil {
509+
txTmp := t.Peek()
510+
for txTmp != nil {
511+
t.Shift()
512+
txTmp = t.Peek()
513+
}
514+
return
515+
}
516+
517+
l := len(t.heads)
518+
acc, _ := Sender(t.signer, tx)
519+
for i := 0; i < l; i++ {
520+
accTmp, _ := Sender(t.signer, t.heads[i])
521+
if acc == accTmp {
522+
if tx == t.heads[i] {
523+
txTmp := t.Peek()
524+
for txTmp != tx {
525+
t.Shift()
526+
txTmp = t.Peek()
527+
}
528+
t.Shift()
529+
return
530+
}
531+
for _, txTmp := range t.txs[accTmp] {
532+
if txTmp == tx {
533+
txTmp = t.Peek()
534+
for txTmp != tx {
535+
t.Shift()
536+
txTmp = t.Peek()
537+
}
538+
t.Shift()
539+
return
540+
}
541+
}
542+
return
543+
}
544+
}
545+
}
546+
491547
// Message is a fully derived transaction and implements core.Message
492548
//
493549
// NOTE: In a future PR this will be removed.

core/types/transaction_test.go

+69
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,75 @@ func TestTransactionTimeSort(t *testing.T) {
358358
}
359359
}
360360

361+
func TestTransactionForward(t *testing.T) {
362+
// Generate a batch of accounts to start with
363+
keys := make([]*ecdsa.PrivateKey, 5)
364+
for i := 0; i < len(keys); i++ {
365+
keys[i], _ = crypto.GenerateKey()
366+
}
367+
signer := HomesteadSigner{}
368+
369+
// Generate a batch of transactions with overlapping prices, but different creation times
370+
groups := map[common.Address]Transactions{}
371+
for start, key := range keys {
372+
addr := crypto.PubkeyToAddress(key.PublicKey)
373+
374+
tx, _ := SignTx(NewTransaction(0, common.Address{}, big.NewInt(100), 100, big.NewInt(1), nil), signer, key)
375+
tx2, _ := SignTx(NewTransaction(1, common.Address{}, big.NewInt(100), 100, big.NewInt(1), nil), signer, key)
376+
377+
tx.time = time.Unix(0, int64(len(keys)-start))
378+
tx2.time = time.Unix(1, int64(len(keys)-start))
379+
380+
groups[addr] = append(groups[addr], tx)
381+
groups[addr] = append(groups[addr], tx2)
382+
383+
}
384+
// Sort the transactions
385+
txset := NewTransactionsByPriceAndNonce(signer, groups)
386+
txsetCpy := txset.Copy()
387+
388+
txs := Transactions{}
389+
for tx := txsetCpy.Peek(); tx != nil; tx = txsetCpy.Peek() {
390+
txs = append(txs, tx)
391+
txsetCpy.Shift()
392+
}
393+
394+
tmp := txset.Copy()
395+
for j := 0; j < 10; j++ {
396+
txset = tmp.Copy()
397+
txsetCpy = tmp.Copy()
398+
i := 0
399+
for ; i < j; i++ {
400+
txset.Shift()
401+
}
402+
tx := txset.Peek()
403+
txsetCpy.Forward(tx)
404+
txCpy := txsetCpy.Peek()
405+
if txCpy == nil {
406+
if tx == nil {
407+
continue
408+
}
409+
txset.Shift()
410+
if txset.Peek() != nil {
411+
t.Errorf("forward got an incorrect result, got %v, want %v", txCpy, tx)
412+
} else {
413+
continue
414+
}
415+
}
416+
txset.Shift()
417+
for ; i < len(txs)-1; i++ {
418+
tx = txset.Peek()
419+
txCpy = txsetCpy.Peek()
420+
if txCpy != tx {
421+
t.Errorf("forward got an incorrect result, got %v, want %v", txCpy, tx)
422+
}
423+
txsetCpy.Shift()
424+
txset.Shift()
425+
}
426+
427+
}
428+
}
429+
361430
// TestTransactionCoding tests serializing/de-serializing to/from rlp and JSON.
362431
func TestTransactionCoding(t *testing.T) {
363432
key, err := crypto.GenerateKey()

miner/worker.go

+12
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ type intervalAdjust struct {
130130
// worker is the main object which takes care of submitting new work to consensus engine
131131
// and gathering the sealing result.
132132
type worker struct {
133+
prefetcher core.Prefetcher
133134
config *Config
134135
chainConfig *params.ChainConfig
135136
engine consensus.Engine
@@ -196,6 +197,7 @@ type worker struct {
196197

197198
func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(*types.Block) bool, init bool) *worker {
198199
worker := &worker{
200+
prefetcher: core.NewStatePrefetcher(chainConfig, eth.BlockChain(), engine),
199201
config: config,
200202
chainConfig: chainConfig,
201203
engine: engine,
@@ -778,6 +780,13 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin
778780
}
779781
bloomProcessors := core.NewAsyncReceiptBloomGenerator(processorCapacity)
780782

783+
// var interruptPrefetch uint32
784+
interruptCh := make(chan struct{})
785+
var txCurr **types.Transaction
786+
//prefetch txs from all pending txs
787+
txsPrefetch := txs.Copy()
788+
w.prefetcher.PrefetchMining(txsPrefetch, w.current.header, w.current.gasPool.Gas(), w.current.state.Copy(), *w.chain.GetVMConfig(), interruptCh, txCurr)
789+
781790
LOOP:
782791
for {
783792
// In the following three cases, we will interrupt the execution of the transaction.
@@ -798,6 +807,7 @@ LOOP:
798807
inc: true,
799808
}
800809
}
810+
close(interruptCh)
801811
return atomic.LoadInt32(interrupt) == commitInterruptNewHead
802812
}
803813
// If we don't have enough gas for any further transactions then we're done
@@ -815,6 +825,7 @@ LOOP:
815825
}
816826
// Retrieve the next transaction and abort if all done
817827
tx := txs.Peek()
828+
txCurr = &tx
818829
if tx == nil {
819830
break
820831
}
@@ -868,6 +879,7 @@ LOOP:
868879
txs.Shift()
869880
}
870881
}
882+
close(interruptCh)
871883
bloomProcessors.Close()
872884

873885
if !w.isRunning() && len(coalescedLogs) > 0 {

0 commit comments

Comments
 (0)