Skip to content

Commit

Permalink
Add Flashbots profit switching to miner
Browse files Browse the repository at this point in the history
  • Loading branch information
jparyani committed Feb 19, 2021
1 parent c2b5b40 commit aa5840d
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 25 deletions.
10 changes: 5 additions & 5 deletions miner/miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type Config struct {
// Miner creates blocks and searches for proof-of-work values.
type Miner struct {
mux *event.TypeMux
worker *worker
worker *multiWorker
coinbase common.Address
eth Backend
engine consensus.Engine
Expand All @@ -72,7 +72,7 @@ func New(eth Backend, config *Config, chainConfig *params.ChainConfig, mux *even
exitCh: make(chan struct{}),
startCh: make(chan common.Address),
stopCh: make(chan struct{}),
worker: newWorker(config, chainConfig, engine, eth, mux, isLocalBlock, true),
worker: newMultiWorker(config, chainConfig, engine, eth, mux, isLocalBlock, true),
}
go miner.update()

Expand Down Expand Up @@ -181,7 +181,7 @@ func (miner *Miner) SetRecommitInterval(interval time.Duration) {

// Pending returns the currently pending block and associated state.
func (miner *Miner) Pending() (*types.Block, *state.StateDB) {
return miner.worker.pending()
return miner.worker.regularWorker.pending()
}

// PendingBlock returns the currently pending block.
Expand All @@ -190,7 +190,7 @@ func (miner *Miner) Pending() (*types.Block, *state.StateDB) {
// simultaneously, please use Pending(), as the pending state can
// change between multiple method calls
func (miner *Miner) PendingBlock() *types.Block {
return miner.worker.pendingBlock()
return miner.worker.regularWorker.pendingBlock()
}

func (miner *Miner) SetEtherbase(addr common.Address) {
Expand Down Expand Up @@ -218,5 +218,5 @@ func (miner *Miner) DisablePreseal() {
// SubscribePendingLogs starts delivering logs from pending transactions
// to the given channel.
func (miner *Miner) SubscribePendingLogs(ch chan<- []*types.Log) event.Subscription {
return miner.worker.pendingLogsFeed.Subscribe(ch)
return miner.worker.regularWorker.pendingLogsFeed.Subscribe(ch)
}
80 changes: 80 additions & 0 deletions miner/multi_worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package miner

import (
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/params"
)

type multiWorker struct {
regularWorker *worker
flashbotsWorker *worker
}

func (w *multiWorker) stop() {
w.regularWorker.stop()
w.flashbotsWorker.stop()
}

func (w *multiWorker) start() {
w.regularWorker.start()
w.flashbotsWorker.start()
}

func (w *multiWorker) close() {
w.regularWorker.close()
w.flashbotsWorker.close()
}

func (w *multiWorker) isRunning() bool {
return w.regularWorker.isRunning() || w.flashbotsWorker.isRunning()
}

func (w *multiWorker) setExtra(extra []byte) {
w.regularWorker.setExtra(extra)
w.flashbotsWorker.setExtra(extra)
}

func (w *multiWorker) setRecommitInterval(interval time.Duration) {
w.regularWorker.setRecommitInterval(interval)
w.flashbotsWorker.setRecommitInterval(interval)
}

func (w *multiWorker) setEtherbase(addr common.Address) {
w.regularWorker.setEtherbase(addr)
w.flashbotsWorker.setEtherbase(addr)
}

func (w *multiWorker) enablePreseal() {
w.regularWorker.enablePreseal()
w.flashbotsWorker.enablePreseal()
}

func (w *multiWorker) disablePreseal() {
w.regularWorker.disablePreseal()
w.flashbotsWorker.disablePreseal()
}

func newMultiWorker(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(*types.Block) bool, init bool) *multiWorker {
queue := make(chan *task)

return &multiWorker{
regularWorker: newWorker(config, chainConfig, engine, eth, mux, isLocalBlock, init, &flashbotsData{
isFlashbots: false,
queue: queue,
}),
flashbotsWorker: newWorker(config, chainConfig, engine, eth, mux, isLocalBlock, init, &flashbotsData{
isFlashbots: true,
queue: queue,
}),
}
}

type flashbotsData struct {
isFlashbots bool
queue chan *task
}
75 changes: 56 additions & 19 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ type task struct {
block *types.Block
createdAt time.Time

profit *big.Int
profit *big.Int
isFlashbots bool
}

const (
Expand Down Expand Up @@ -183,14 +184,39 @@ type worker struct {
// External functions
isLocalBlock func(block *types.Block) bool // Function used to determine whether the specified block is mined by local miner.

flashbots *flashbotsData

// Test hooks
newTaskHook func(*task) // Method to call upon receiving a new sealing task.
skipSealHook func(*task) bool // Method to decide whether skipping the sealing.
fullTaskHook func() // Method to call before pushing the full sealing task.
resubmitHook func(time.Duration, time.Duration) // Method to call upon updating resubmitting interval.
}

func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(*types.Block) bool, init bool) *worker {
func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(*types.Block) bool, init bool, flashbots *flashbotsData) *worker {
exitCh := make(chan struct{})
taskCh := make(chan *task)
if flashbots.isFlashbots {
// publish to the flashbots queue
taskCh = flashbots.queue
} else {
// read from the flashbots queue
go func() {
for {
select {
case flashbotsTask := <-flashbots.queue:
select {
case taskCh <- flashbotsTask:
case <-exitCh:
return
}
case <-exitCh:
return
}
}
}()
}

worker := &worker{
config: config,
chainConfig: chainConfig,
Expand All @@ -207,12 +233,13 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus
chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize),
chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize),
newWorkCh: make(chan *newWorkReq),
taskCh: make(chan *task),
taskCh: taskCh,
resultCh: make(chan *types.Block, resultQueueSize),
exitCh: make(chan struct{}),
exitCh: exitCh,
startCh: make(chan struct{}, 1),
resubmitIntervalCh: make(chan time.Duration),
resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize),
flashbots: flashbots,
}
// Subscribe NewTxsEvent for tx pool
worker.txsSub = eth.TxPool().SubscribeNewTxsEvent(worker.txsCh)
Expand All @@ -229,8 +256,11 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus

go worker.mainLoop()
go worker.newWorkLoop(recommit)
go worker.resultLoop()
go worker.taskLoop()
if !flashbots.isFlashbots {
// only mine if not flashbots
go worker.resultLoop()
go worker.taskLoop()
}

// Submit first work to initialize pending state.
if init {
Expand Down Expand Up @@ -570,7 +600,7 @@ func (w *worker) taskLoop() {
// Interrupt previous sealing operation
interrupt()
stopCh, prev = make(chan struct{}), sealHash
log.Info("Proposed miner block", "blockNumber", prevNumber, "profit", prevProfit, "sealhash", sealHash)
log.Info("Proposed miner block", "blockNumber", prevNumber, "profit", prevProfit, "isFlashbots", task.isFlashbots, "sealhash", sealHash)
if w.skipSealHook != nil && w.skipSealHook(task) {
continue
}
Expand Down Expand Up @@ -1087,7 +1117,11 @@ func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64)
// Short circuit if there is no available pending transactions or bundles.
// But if we disable empty precommit already, ignore it. Since
// empty block is necessary to keep the liveness of the network.
if len(pending) == 0 && atomic.LoadUint32(&w.noempty) == 0 && len(w.eth.TxPool().AllMevBundles()) == 0 {
noBundles := true
if w.flashbots.isFlashbots && len(w.eth.TxPool().AllMevBundles()) > 0 {
noBundles = false
}
if len(pending) == 0 && atomic.LoadUint32(&w.noempty) == 0 && noBundles {
w.updateSnapshot()
return
}
Expand All @@ -1099,15 +1133,17 @@ func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64)
localTxs[account] = txs
}
}
bundles, err := w.eth.TxPool().MevBundles(header.Number, header.Time)
if err != nil {
log.Error("Failed to fetch pending transactions", "err", err)
return
}
maxBundle, bundlePrice, ethToCoinbase, gasUsed := w.findMostProfitableBundle(bundles, w.coinbase, parent, header)
log.Info("Flashbots bundle", "ethToCoinbase", ethToCoinbase, "gasUsed", gasUsed, "bundlePrice", bundlePrice, "bundleLength", len(maxBundle))
if w.commitBundle(maxBundle, w.coinbase, interrupt) {
return
if w.flashbots.isFlashbots {
bundles, err := w.eth.TxPool().MevBundles(header.Number, header.Time)
if err != nil {
log.Error("Failed to fetch pending transactions", "err", err)
return
}
maxBundle, bundlePrice, ethToCoinbase, gasUsed := w.findMostProfitableBundle(bundles, w.coinbase, parent, header)
log.Info("Flashbots bundle", "ethToCoinbase", ethToCoinbase, "gasUsed", gasUsed, "bundlePrice", bundlePrice, "bundleLength", len(maxBundle))
if w.commitBundle(maxBundle, w.coinbase, interrupt) {
return
}
}
if len(localTxs) > 0 {
txs := types.NewTransactionsByPriceAndNonce(w.current.signer, localTxs)
Expand Down Expand Up @@ -1139,12 +1175,13 @@ func (w *worker) commit(uncles []*types.Header, interval func(), update bool, st
interval()
}
select {
case w.taskCh <- &task{receipts: receipts, state: s, block: block, createdAt: time.Now(), profit: w.current.profit}:
case w.taskCh <- &task{receipts: receipts, state: s, block: block, createdAt: time.Now(), profit: w.current.profit, isFlashbots: w.flashbots.isFlashbots}:
w.unconfirmed.Shift(block.NumberU64() - 1)
log.Info("Commit new mining work", "number", block.Number(), "sealhash", w.engine.SealHash(block.Header()),
"uncles", len(uncles), "txs", w.current.tcount,
"gas", block.GasUsed(), "fees", totalFees(block, receipts),
"elapsed", common.PrettyDuration(time.Since(start)))
"elapsed", common.PrettyDuration(time.Since(start)),
"isFlashbots", w.flashbots.isFlashbots)

case <-w.exitCh:
log.Info("Worker has exited")
Expand Down
5 changes: 4 additions & 1 deletion miner/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,10 @@ func (b *testWorkerBackend) newRandomTx(creation bool) *types.Transaction {
func newTestWorker(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine, db ethdb.Database, blocks int) (*worker, *testWorkerBackend) {
backend := newTestWorkerBackend(t, chainConfig, engine, db, blocks)
backend.txPool.AddLocals(pendingTxs)
w := newWorker(testConfig, chainConfig, engine, backend, new(event.TypeMux), nil, false)
w := newWorker(testConfig, chainConfig, engine, backend, new(event.TypeMux), nil, false, &flashbotsData{
isFlashbots: false,
queue: nil,
})
w.setEtherbase(testBankAddress)
return w, backend
}
Expand Down

0 comments on commit aa5840d

Please sign in to comment.