From 2d334d739cb9abae00eab7b0f6d3ad6134350da7 Mon Sep 17 00:00:00 2001 From: Mateusz Morusiewicz <11313015+Ruteri@users.noreply.github.com> Date: Mon, 21 Feb 2022 11:06:38 +0100 Subject: [PATCH] Flashbots changes v0.4 to v0.5 * fix issue with geth not shutting down (#97) * Add eth_callBundle rpc method (#14) * flashbots: add eth_estimateGasBundle (#102) * feat(ethash): flashbots_getWork RPC with profit (#106) * Calculate megabundle as soon as it's received (#112) * Add v0.5 specification link (#118) --- cmd/evm/internal/t8ntool/block.go | 2 +- consensus/beacon/consensus.go | 4 +- consensus/clique/clique.go | 2 +- consensus/consensus.go | 2 +- consensus/ethash/api.go | 7 +- consensus/ethash/ethash.go | 6 + consensus/ethash/ethash_test.go | 6 +- consensus/ethash/flashbots_api.go | 38 ++++ consensus/ethash/sealer.go | 21 +- consensus/ethash/sealer_test.go | 10 +- core/state_processor.go | 56 ++++++ core/txpool/txpool.go | 9 +- eth/backend.go | 2 +- infra/Dockerfile.node | 2 +- infra/Dockerfile.updater | 2 +- infra/start-mev-geth-node.sh | 5 +- infra/start-mev-geth-updater.sh | 2 + internal/ethapi/api.go | 308 ++++++++++++++++++++++++++++++ internal/ethapi/backend.go | 5 +- les/client.go | 2 +- miner/multi_worker.go | 29 ++- miner/worker.go | 47 +++-- 22 files changed, 518 insertions(+), 49 deletions(-) create mode 100644 consensus/ethash/flashbots_api.go diff --git a/cmd/evm/internal/t8ntool/block.go b/cmd/evm/internal/t8ntool/block.go index 1140daa2c27f..ed72229cb3a5 100644 --- a/cmd/evm/internal/t8ntool/block.go +++ b/cmd/evm/internal/t8ntool/block.go @@ -191,7 +191,7 @@ func (i *bbInput) sealEthash(block *types.Block) (*types.Block, error) { // If the testmode is used, the sealer will return quickly, and complain // "Sealing result is not read by miner" if it cannot write the result. results := make(chan *types.Block, 1) - if err := engine.Seal(nil, block, results, nil); err != nil { + if err := engine.Seal(nil, block, nil, results, nil); err != nil { panic(fmt.Sprintf("failed to seal block: %v", err)) } found := <-results diff --git a/consensus/beacon/consensus.go b/consensus/beacon/consensus.go index eb5aa58ca887..625864cf8676 100644 --- a/consensus/beacon/consensus.go +++ b/consensus/beacon/consensus.go @@ -375,9 +375,9 @@ func (beacon *Beacon) FinalizeAndAssemble(chain consensus.ChainHeaderReader, hea // // Note, the method returns immediately and will send the result async. More // than one result may also be returned depending on the consensus algorithm. -func (beacon *Beacon) Seal(chain consensus.ChainHeaderReader, block *types.Block, results chan<- *types.Block, stop <-chan struct{}) error { +func (beacon *Beacon) Seal(chain consensus.ChainHeaderReader, block *types.Block, profit *big.Int, results chan<- *types.Block, stop <-chan struct{}) error { if !beacon.IsPoSHeader(block.Header()) { - return beacon.ethone.Seal(chain, block, results, stop) + return beacon.ethone.Seal(chain, block, profit, results, stop) } // The seal verification is done by the external consensus engine, // return directly without pushing any block back. In another word diff --git a/consensus/clique/clique.go b/consensus/clique/clique.go index 4706bbac1ca9..e87c63c5579f 100644 --- a/consensus/clique/clique.go +++ b/consensus/clique/clique.go @@ -599,7 +599,7 @@ func (c *Clique) Authorize(signer common.Address, signFn SignerFn) { // Seal implements consensus.Engine, attempting to create a sealed block using // the local signing credentials. -func (c *Clique) Seal(chain consensus.ChainHeaderReader, block *types.Block, results chan<- *types.Block, stop <-chan struct{}) error { +func (c *Clique) Seal(chain consensus.ChainHeaderReader, block *types.Block, profit *big.Int, results chan<- *types.Block, stop <-chan struct{}) error { header := block.Header() // Sealing the genesis block is not supported diff --git a/consensus/consensus.go b/consensus/consensus.go index 190d5ae12c8a..012ec3b7f576 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -105,7 +105,7 @@ type Engine interface { // // Note, the method returns immediately and will send the result async. More // than one result may also be returned depending on the consensus algorithm. - Seal(chain ChainHeaderReader, block *types.Block, results chan<- *types.Block, stop <-chan struct{}) error + Seal(chain ChainHeaderReader, block *types.Block, profit *big.Int, results chan<- *types.Block, stop <-chan struct{}) error // SealHash returns the hash of a block prior to it being sealed. SealHash(header *types.Header) common.Hash diff --git a/consensus/ethash/api.go b/consensus/ethash/api.go index da3b0751be54..6f0bfcd7d396 100644 --- a/consensus/ethash/api.go +++ b/consensus/ethash/api.go @@ -45,7 +45,7 @@ func (api *API) GetWork() ([4]string, error) { } var ( - workCh = make(chan [4]string, 1) + workCh = make(chan [5]string, 1) errc = make(chan error, 1) ) select { @@ -54,7 +54,10 @@ func (api *API) GetWork() ([4]string, error) { return [4]string{}, errEthashStopped } select { - case work := <-workCh: + case fullWork := <-workCh: + var work [4]string + copy(work[:], fullWork[:4]) + return work, nil case err := <-errc: return [4]string{}, err diff --git a/consensus/ethash/ethash.go b/consensus/ethash/ethash.go index 6cb312482795..f26115865358 100644 --- a/consensus/ethash/ethash.go +++ b/consensus/ethash/ethash.go @@ -690,6 +690,12 @@ func (ethash *Ethash) APIs(chain consensus.ChainHeaderReader) []rpc.API { Namespace: "ethash", Service: &API{ethash}, }, + { + Namespace: "flashbots", + Version: "1.0", + Service: &FlashbotsAPI{ethash}, + Public: true, + }, } } diff --git a/consensus/ethash/ethash_test.go b/consensus/ethash/ethash_test.go index eb6bad96226b..6e3f3bb59fe7 100644 --- a/consensus/ethash/ethash_test.go +++ b/consensus/ethash/ethash_test.go @@ -37,7 +37,7 @@ func TestTestMode(t *testing.T) { defer ethash.Close() results := make(chan *types.Block) - err := ethash.Seal(nil, types.NewBlockWithHeader(header), results, nil) + err := ethash.Seal(nil, types.NewBlockWithHeader(header), nil, results, nil) if err != nil { t.Fatalf("failed to seal block: %v", err) } @@ -112,7 +112,7 @@ func TestRemoteSealer(t *testing.T) { // Push new work. results := make(chan *types.Block) - ethash.Seal(nil, block, results, nil) + ethash.Seal(nil, block, nil, results, nil) var ( work [4]string @@ -129,7 +129,7 @@ func TestRemoteSealer(t *testing.T) { header = &types.Header{Number: big.NewInt(1), Difficulty: big.NewInt(1000)} block = types.NewBlockWithHeader(header) sealhash = ethash.SealHash(header) - ethash.Seal(nil, block, results, nil) + ethash.Seal(nil, block, nil, results, nil) if work, err = api.GetWork(); err != nil || work[0] != sealhash.Hex() { t.Error("expect to return the latest pushed work") diff --git a/consensus/ethash/flashbots_api.go b/consensus/ethash/flashbots_api.go new file mode 100644 index 000000000000..527d2a44352e --- /dev/null +++ b/consensus/ethash/flashbots_api.go @@ -0,0 +1,38 @@ +package ethash + +import "errors" + +// FlashbotsAPI exposes Flashbots related methods for the RPC interface. +type FlashbotsAPI struct { + ethash *Ethash +} + +// GetWork returns a work package for external miner. +// +// The work package consists of 5 strings: +// result[0] - 32 bytes hex encoded current block header pow-hash +// result[1] - 32 bytes hex encoded seed hash used for DAG +// result[2] - 32 bytes hex encoded boundary condition ("target"), 2^256/difficulty +// result[3] - hex encoded block number +// result[4] - hex encoded profit generated from this block +func (api *FlashbotsAPI) GetWork() ([5]string, error) { + if api.ethash.remote == nil { + return [5]string{}, errors.New("not supported") + } + + var ( + workCh = make(chan [5]string, 1) + errc = make(chan error, 1) + ) + select { + case api.ethash.remote.fetchWorkCh <- &sealWork{errc: errc, res: workCh}: + case <-api.ethash.remote.exitCh: + return [5]string{}, errEthashStopped + } + select { + case work := <-workCh: + return work, nil + case err := <-errc: + return [5]string{}, err + } +} diff --git a/consensus/ethash/sealer.go b/consensus/ethash/sealer.go index 340ad440f380..e577307edc74 100644 --- a/consensus/ethash/sealer.go +++ b/consensus/ethash/sealer.go @@ -48,7 +48,7 @@ var ( // Seal implements consensus.Engine, attempting to find a nonce that satisfies // the block's difficulty requirements. -func (ethash *Ethash) Seal(chain consensus.ChainHeaderReader, block *types.Block, results chan<- *types.Block, stop <-chan struct{}) error { +func (ethash *Ethash) Seal(chain consensus.ChainHeaderReader, block *types.Block, profit *big.Int, results chan<- *types.Block, stop <-chan struct{}) error { // If we're running a fake PoW, simply return a 0 nonce immediately if ethash.config.PowMode == ModeFake || ethash.config.PowMode == ModeFullFake { header := block.Header() @@ -62,7 +62,7 @@ func (ethash *Ethash) Seal(chain consensus.ChainHeaderReader, block *types.Block } // If we're running a shared PoW, delegate sealing to it if ethash.shared != nil { - return ethash.shared.Seal(chain, block, results, stop) + return ethash.shared.Seal(chain, block, profit, results, stop) } // Create a runner and the multiple search threads it directs abort := make(chan struct{}) @@ -86,7 +86,7 @@ func (ethash *Ethash) Seal(chain consensus.ChainHeaderReader, block *types.Block } // Push new work to remote sealer if ethash.remote != nil { - ethash.remote.workCh <- &sealTask{block: block, results: results} + ethash.remote.workCh <- &sealTask{block: block, profit: profit, results: results} } var ( pend sync.WaitGroup @@ -117,7 +117,7 @@ func (ethash *Ethash) Seal(chain consensus.ChainHeaderReader, block *types.Block case <-ethash.update: // Thread count was changed on user request, restart close(abort) - if err := ethash.Seal(chain, block, results, stop); err != nil { + if err := ethash.Seal(chain, block, profit, results, stop); err != nil { ethash.config.Log.Error("Failed to restart sealing after update", "err", err) } } @@ -194,7 +194,7 @@ type remoteSealer struct { works map[common.Hash]*types.Block rates map[common.Hash]hashrate currentBlock *types.Block - currentWork [4]string + currentWork [5]string notifyCtx context.Context cancelNotify context.CancelFunc // cancels all notification requests reqWG sync.WaitGroup // tracks notification request goroutines @@ -215,6 +215,7 @@ type remoteSealer struct { // sealTask wraps a seal block with relative result channel for remote sealer thread. type sealTask struct { block *types.Block + profit *big.Int results chan<- *types.Block } @@ -239,7 +240,7 @@ type hashrate struct { // sealWork wraps a seal work package for remote sealer. type sealWork struct { errc chan error - res chan [4]string + res chan [5]string } func startRemoteSealer(ethash *Ethash, urls []string, noverify bool) *remoteSealer { @@ -281,7 +282,7 @@ func (s *remoteSealer) loop() { // Update current work with new received block. // Note same work can be past twice, happens when changing CPU threads. s.results = work.results - s.makeWork(work.block) + s.makeWork(work.block, work.profit) s.notifyWork() case work := <-s.fetchWorkCh: @@ -351,6 +352,10 @@ func (s *remoteSealer) makeWork(block *types.Block) { s.currentWork[2] = common.BytesToHash(new(big.Int).Div(two256, block.Difficulty()).Bytes()).Hex() s.currentWork[3] = hexutil.EncodeBig(block.Number()) + if profit != nil { + s.currentWork[4] = hexutil.EncodeBig(profit) + } + // Trace the seal work fetched by remote sealer. s.currentBlock = block s.works[hash] = block @@ -376,7 +381,7 @@ func (s *remoteSealer) notifyWork() { } } -func (s *remoteSealer) sendNotification(ctx context.Context, url string, json []byte, work [4]string) { +func (s *remoteSealer) sendNotification(ctx context.Context, url string, json []byte, work [5]string) { defer s.reqWG.Done() req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(json)) diff --git a/consensus/ethash/sealer_test.go b/consensus/ethash/sealer_test.go index e338f7529065..4957788f215f 100644 --- a/consensus/ethash/sealer_test.go +++ b/consensus/ethash/sealer_test.go @@ -57,7 +57,7 @@ func TestRemoteNotify(t *testing.T) { header := &types.Header{Number: big.NewInt(1), Difficulty: big.NewInt(100)} block := types.NewBlockWithHeader(header) - ethash.Seal(nil, block, nil, nil) + ethash.Seal(nil, block, nil, nil, nil) select { case work := <-sink: if want := ethash.SealHash(header).Hex(); work[0] != want { @@ -105,7 +105,7 @@ func TestRemoteNotifyFull(t *testing.T) { header := &types.Header{Number: big.NewInt(1), Difficulty: big.NewInt(100)} block := types.NewBlockWithHeader(header) - ethash.Seal(nil, block, nil, nil) + ethash.Seal(nil, block, nil, nil, nil) select { case work := <-sink: if want := "0x" + strconv.FormatUint(header.Number.Uint64(), 16); work["number"] != want { @@ -151,7 +151,7 @@ func TestRemoteMultiNotify(t *testing.T) { for i := 0; i < cap(sink); i++ { header := &types.Header{Number: big.NewInt(int64(i)), Difficulty: big.NewInt(100)} block := types.NewBlockWithHeader(header) - ethash.Seal(nil, block, results, nil) + ethash.Seal(nil, block, nil, results, nil) } for i := 0; i < cap(sink); i++ { @@ -200,7 +200,7 @@ func TestRemoteMultiNotifyFull(t *testing.T) { for i := 0; i < cap(sink); i++ { header := &types.Header{Number: big.NewInt(int64(i)), Difficulty: big.NewInt(100)} block := types.NewBlockWithHeader(header) - ethash.Seal(nil, block, results, nil) + ethash.Seal(nil, block, nil, results, nil) } for i := 0; i < cap(sink); i++ { @@ -266,7 +266,7 @@ func TestStaleSubmission(t *testing.T) { for id, c := range testcases { for _, h := range c.headers { - ethash.Seal(nil, types.NewBlockWithHeader(h), results, nil) + ethash.Seal(nil, types.NewBlockWithHeader(h), nil, results, nil) } if res := api.SubmitWork(fakeNonce, ethash.SealHash(c.headers[c.submitIndex]), fakeDigest); res != c.submitRes { t.Errorf("case %d submit result mismatch, want %t, get %t", id+1, c.submitRes, res) diff --git a/core/state_processor.go b/core/state_processor.go index 163ea0a0200a..a79443ebeec9 100644 --- a/core/state_processor.go +++ b/core/state_processor.go @@ -142,6 +142,51 @@ func applyTransaction(msg types.Message, config *params.ChainConfig, gp *GasPool return receipt, err } +func applyTransactionWithResult(msg types.Message, config *params.ChainConfig, bc ChainContext, author *common.Address, gp *GasPool, statedb *state.StateDB, header *types.Header, tx *types.Transaction, usedGas *uint64, evm *vm.EVM) (*types.Receipt, *ExecutionResult, error) { + // Create a new context to be used in the EVM environment. + txContext := NewEVMTxContext(msg) + evm.Reset(txContext, statedb) + + // Apply the transaction to the current state (included in the env). + result, err := ApplyMessage(evm, msg, gp) + if err != nil { + return nil, nil, err + } + + // Update the state with pending changes. + var root []byte + if config.IsByzantium(header.Number) { + statedb.Finalise(true) + } else { + root = statedb.IntermediateRoot(config.IsEIP158(header.Number)).Bytes() + } + *usedGas += result.UsedGas + + // Create a new receipt for the transaction, storing the intermediate root and gas used + // by the tx. + receipt := &types.Receipt{Type: tx.Type(), PostState: root, CumulativeGasUsed: *usedGas} + if result.Failed() { + receipt.Status = types.ReceiptStatusFailed + } else { + receipt.Status = types.ReceiptStatusSuccessful + } + receipt.TxHash = tx.Hash() + receipt.GasUsed = result.UsedGas + + // If the transaction created a contract, store the creation address in the receipt. + if msg.To() == nil { + receipt.ContractAddress = crypto.CreateAddress(evm.TxContext.Origin, tx.Nonce()) + } + + // Set the receipt logs and create the bloom filter. + receipt.Logs = statedb.GetLogs(tx.Hash(), header.Hash()) + receipt.Bloom = types.CreateBloom(types.Receipts{receipt}) + receipt.BlockHash = header.Hash() + receipt.BlockNumber = header.Number + receipt.TransactionIndex = uint(statedb.TxIndex()) + return receipt, result, err +} + // ApplyTransaction attempts to apply a transaction to the given state database // and uses the input parameters for its environment. It returns the receipt // for the transaction, gas used and an error if the transaction failed, @@ -156,3 +201,14 @@ func ApplyTransaction(config *params.ChainConfig, bc ChainContext, author *commo vmenv := vm.NewEVM(blockContext, vm.TxContext{}, statedb, config, cfg) return applyTransaction(msg, config, gp, statedb, header.Number, header.Hash(), tx, usedGas, vmenv) } + +func ApplyTransactionWithResult(config *params.ChainConfig, bc ChainContext, author *common.Address, gp *GasPool, statedb *state.StateDB, header *types.Header, tx *types.Transaction, usedGas *uint64, cfg vm.Config) (*types.Receipt, *ExecutionResult, error) { + msg, err := tx.AsMessage(types.MakeSigner(config, header.Number), header.BaseFee) + if err != nil { + return nil, nil, err + } + // Create a new context to be used in the EVM environment + blockContext := NewEVMBlockContext(header, bc, author) + vmenv := vm.NewEVM(blockContext, vm.TxContext{}, statedb, config, cfg) + return applyTransactionWithResult(msg, config, bc, author, gp, statedb, header, tx, usedGas, vmenv) +} diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index 079d652f7fb0..8f9f8608ac3d 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -631,13 +631,20 @@ func (pool *TxPool) AddMegabundle(relayAddr common.Address, txs types.Transactio return errors.New("megabundle from non-trusted address") } - pool.megabundles[relayAddr] = types.MevBundle{ + megabundle := types.MevBundle{ Txs: txs, BlockNumber: blockNumber, MinTimestamp: minTimestamp, MaxTimestamp: maxTimestamp, RevertingTxHashes: revertingTxHashes, } + + pool.megabundles[relayAddr] = megabundle + + for _, hook := range pool.NewMegabundleHooks { + go hook(relayAddr, &megabundle) + } + return nil } diff --git a/eth/backend.go b/eth/backend.go index 6368c0e03c56..4a14e16536da 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -288,7 +288,7 @@ func makeExtraData(extra []byte) []byte { // APIs return the collection of RPC services the ethereum package offers. // NOTE, some of these services probably need to be moved to somewhere else. func (s *Ethereum) APIs() []rpc.API { - apis := ethapi.GetAPIs(s.APIBackend) + apis := ethapi.GetAPIs(s.APIBackend, s.BlockChain()) // Append any APIs exposed explicitly by the consensus engine apis = append(apis, s.engine.APIs(s.BlockChain())...) diff --git a/infra/Dockerfile.node b/infra/Dockerfile.node index db8e99ac937e..7868453eba7f 100644 --- a/infra/Dockerfile.node +++ b/infra/Dockerfile.node @@ -4,7 +4,7 @@ FROM golang:1.15-alpine as builder RUN apk add --no-cache make gcc musl-dev linux-headers git ADD . /go-ethereum -RUN cd /go-ethereum && make geth +RUN cd /go-ethereum && GO111MODULE=on go run build/ci.go install ./cmd/geth # Pull Geth into a second stage deploy alpine container FROM alpine:latest diff --git a/infra/Dockerfile.updater b/infra/Dockerfile.updater index d3099d19ce1a..808f55aa2b9c 100644 --- a/infra/Dockerfile.updater +++ b/infra/Dockerfile.updater @@ -4,7 +4,7 @@ FROM golang:1.15-alpine as builder RUN apk add --no-cache make gcc musl-dev linux-headers git ADD . /go-ethereum -RUN cd /go-ethereum && make geth +RUN cd /go-ethereum && GO111MODULE=on go run build/ci.go install ./cmd/geth # Pull Geth into a second stage deploy alpine container FROM alpine:latest diff --git a/infra/start-mev-geth-node.sh b/infra/start-mev-geth-node.sh index 05ad50c61003..45ef0c519734 100755 --- a/infra/start-mev-geth-node.sh +++ b/infra/start-mev-geth-node.sh @@ -33,6 +33,7 @@ start_node() { --ws.api eth,net,web3 \ --ws.origins '*' \ --syncmode $syncmode \ + --gcmode archive \ --cache 4096 \ --maxpeers $connections \ --goerli @@ -41,7 +42,7 @@ start_node() { echo "Node failed to start; exiting." exit 1 fi - else + else geth \ --port $netport \ --http \ @@ -59,7 +60,9 @@ start_node() { --ws.api eth,net,web3 \ --ws.origins '*' \ --syncmode $syncmode \ + --gcmode archive \ --cache 4096 \ + --snapshot=false \ --maxpeers $connections if [ $? -ne 0 ] then diff --git a/infra/start-mev-geth-updater.sh b/infra/start-mev-geth-updater.sh index 11a6a533aa14..abad72fab9bb 100755 --- a/infra/start-mev-geth-updater.sh +++ b/infra/start-mev-geth-updater.sh @@ -18,6 +18,7 @@ start_node() { --port $netport \ --syncmode $syncmode \ --cache 4096 \ + --gcmode archive \ --maxpeers $connections \ --goerli & if [ $? -ne 0 ] @@ -30,6 +31,7 @@ start_node() { --port $netport \ --syncmode $syncmode \ --cache 4096 \ + --gcmode archive \ --maxpeers $connections & if [ $? -ne 0 ] then diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index 2c57cd6a789e..cd16b5a3dd5a 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -47,6 +47,7 @@ import ( "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" "github.com/tyler-smith/go-bip39" + "golang.org/x/crypto/sha3" ) // EthereumAPI provides an API to access Ethereum related information. @@ -2229,3 +2230,310 @@ func (s *PrivateTxBundleAPI) SendMegabundle(ctx context.Context, args SendMegabu } return s.b.SendMegabundle(ctx, txs, rpc.BlockNumber(args.BlockNumber), minTimestamp, maxTimestamp, args.RevertingTxHashes, relayAddr) } + +// BundleAPI offers an API for accepting bundled transactions +type BundleAPI struct { + b Backend + chain *core.BlockChain +} + +// NewBundleAPI creates a new Tx Bundle API instance. +func NewBundleAPI(b Backend, chain *core.BlockChain) *BundleAPI { + return &BundleAPI{b, chain} +} + +// CallBundleArgs represents the arguments for a call. +type CallBundleArgs struct { + Txs []hexutil.Bytes `json:"txs"` + BlockNumber rpc.BlockNumber `json:"blockNumber"` + StateBlockNumberOrHash rpc.BlockNumberOrHash `json:"stateBlockNumber"` + Coinbase *string `json:"coinbase"` + Timestamp *uint64 `json:"timestamp"` + Timeout *int64 `json:"timeout"` + GasLimit *uint64 `json:"gasLimit"` + Difficulty *big.Int `json:"difficulty"` + BaseFee *big.Int `json:"baseFee"` +} + +// CallBundle will simulate a bundle of transactions at the top of a given block +// number with the state of another (or the same) block. This can be used to +// simulate future blocks with the current state, or it can be used to simulate +// a past block. +// The sender is responsible for signing the transactions and using the correct +// nonce and ensuring validity +func (s *BundleAPI) CallBundle(ctx context.Context, args CallBundleArgs) (map[string]interface{}, error) { + if len(args.Txs) == 0 { + return nil, errors.New("bundle missing txs") + } + if args.BlockNumber == 0 { + return nil, errors.New("bundle missing blockNumber") + } + + var txs types.Transactions + + for _, encodedTx := range args.Txs { + tx := new(types.Transaction) + if err := tx.UnmarshalBinary(encodedTx); err != nil { + return nil, err + } + txs = append(txs, tx) + } + defer func(start time.Time) { log.Debug("Executing EVM call finished", "runtime", time.Since(start)) }(time.Now()) + + timeoutMilliSeconds := int64(5000) + if args.Timeout != nil { + timeoutMilliSeconds = *args.Timeout + } + timeout := time.Millisecond * time.Duration(timeoutMilliSeconds) + state, parent, err := s.b.StateAndHeaderByNumberOrHash(ctx, args.StateBlockNumberOrHash) + if state == nil || err != nil { + return nil, err + } + blockNumber := big.NewInt(int64(args.BlockNumber)) + + timestamp := parent.Time + 1 + if args.Timestamp != nil { + timestamp = *args.Timestamp + } + coinbase := parent.Coinbase + if args.Coinbase != nil { + coinbase = common.HexToAddress(*args.Coinbase) + } + difficulty := parent.Difficulty + if args.Difficulty != nil { + difficulty = args.Difficulty + } + gasLimit := parent.GasLimit + if args.GasLimit != nil { + gasLimit = *args.GasLimit + } + var baseFee *big.Int + if args.BaseFee != nil { + baseFee = args.BaseFee + } else if s.b.ChainConfig().IsLondon(big.NewInt(args.BlockNumber.Int64())) { + baseFee = misc.CalcBaseFee(s.b.ChainConfig(), parent) + } + header := &types.Header{ + ParentHash: parent.Hash(), + Number: blockNumber, + GasLimit: gasLimit, + Time: timestamp, + Difficulty: difficulty, + Coinbase: coinbase, + BaseFee: baseFee, + } + + // Setup context so it may be cancelled the call has completed + // or, in case of unmetered gas, setup a context with a timeout. + var cancel context.CancelFunc + if timeout > 0 { + ctx, cancel = context.WithTimeout(ctx, timeout) + } else { + ctx, cancel = context.WithCancel(ctx) + } + // Make sure the context is cancelled when the call has completed + // this makes sure resources are cleaned up. + defer cancel() + + vmconfig := vm.Config{} + + // Setup the gas pool (also for unmetered requests) + // and apply the message. + gp := new(core.GasPool).AddGas(math.MaxUint64) + + results := []map[string]interface{}{} + coinbaseBalanceBefore := state.GetBalance(coinbase) + + bundleHash := sha3.NewLegacyKeccak256() + signer := types.MakeSigner(s.b.ChainConfig(), blockNumber) + var totalGasUsed uint64 + gasFees := new(big.Int) + for i, tx := range txs { + coinbaseBalanceBeforeTx := state.GetBalance(coinbase) + state.Prepare(tx.Hash(), i) + + receipt, result, err := core.ApplyTransactionWithResult(s.b.ChainConfig(), s.chain, &coinbase, gp, state, header, tx, &header.GasUsed, vmconfig) + if err != nil { + return nil, fmt.Errorf("err: %w; txhash %s", err, tx.Hash()) + } + + txHash := tx.Hash().String() + from, err := types.Sender(signer, tx) + if err != nil { + return nil, fmt.Errorf("err: %w; txhash %s", err, tx.Hash()) + } + to := "0x" + if tx.To() != nil { + to = tx.To().String() + } + jsonResult := map[string]interface{}{ + "txHash": txHash, + "gasUsed": receipt.GasUsed, + "fromAddress": from.String(), + "toAddress": to, + } + totalGasUsed += receipt.GasUsed + gasPrice, err := tx.EffectiveGasTip(header.BaseFee) + if err != nil { + return nil, fmt.Errorf("err: %w; txhash %s", err, tx.Hash()) + } + gasFeesTx := new(big.Int).Mul(big.NewInt(int64(receipt.GasUsed)), gasPrice) + gasFees.Add(gasFees, gasFeesTx) + bundleHash.Write(tx.Hash().Bytes()) + if result.Err != nil { + jsonResult["error"] = result.Err.Error() + revert := result.Revert() + if len(revert) > 0 { + jsonResult["revert"] = string(revert) + } + } else { + dst := make([]byte, hex.EncodedLen(len(result.Return()))) + hex.Encode(dst, result.Return()) + jsonResult["value"] = "0x" + string(dst) + } + coinbaseDiffTx := new(big.Int).Sub(state.GetBalance(coinbase), coinbaseBalanceBeforeTx) + jsonResult["coinbaseDiff"] = coinbaseDiffTx.String() + jsonResult["gasFees"] = gasFeesTx.String() + jsonResult["ethSentToCoinbase"] = new(big.Int).Sub(coinbaseDiffTx, gasFeesTx).String() + jsonResult["gasPrice"] = new(big.Int).Div(coinbaseDiffTx, big.NewInt(int64(receipt.GasUsed))).String() + jsonResult["gasUsed"] = receipt.GasUsed + results = append(results, jsonResult) + } + + ret := map[string]interface{}{} + ret["results"] = results + coinbaseDiff := new(big.Int).Sub(state.GetBalance(coinbase), coinbaseBalanceBefore) + ret["coinbaseDiff"] = coinbaseDiff.String() + ret["gasFees"] = gasFees.String() + ret["ethSentToCoinbase"] = new(big.Int).Sub(coinbaseDiff, gasFees).String() + ret["bundleGasPrice"] = new(big.Int).Div(coinbaseDiff, big.NewInt(int64(totalGasUsed))).String() + ret["totalGasUsed"] = totalGasUsed + ret["stateBlockNumber"] = parent.Number.Int64() + + ret["bundleHash"] = "0x" + common.Bytes2Hex(bundleHash.Sum(nil)) + return ret, nil +} + +// EstimateGasBundleArgs represents the arguments for a call +type EstimateGasBundleArgs struct { + Txs []TransactionArgs `json:"txs"` + BlockNumber rpc.BlockNumber `json:"blockNumber"` + StateBlockNumberOrHash rpc.BlockNumberOrHash `json:"stateBlockNumber"` + Coinbase *string `json:"coinbase"` + Timestamp *uint64 `json:"timestamp"` + Timeout *int64 `json:"timeout"` +} + +func (s *BundleAPI) EstimateGasBundle(ctx context.Context, args EstimateGasBundleArgs) (map[string]interface{}, error) { + if len(args.Txs) == 0 { + return nil, errors.New("bundle missing txs") + } + if args.BlockNumber == 0 { + return nil, errors.New("bundle missing blockNumber") + } + + timeoutMS := int64(5000) + if args.Timeout != nil { + timeoutMS = *args.Timeout + } + timeout := time.Millisecond * time.Duration(timeoutMS) + + state, parent, err := s.b.StateAndHeaderByNumberOrHash(ctx, args.StateBlockNumberOrHash) + if state == nil || err != nil { + return nil, err + } + blockNumber := big.NewInt(int64(args.BlockNumber)) + timestamp := parent.Time + 1 + if args.Timestamp != nil { + timestamp = *args.Timestamp + } + coinbase := parent.Coinbase + if args.Coinbase != nil { + coinbase = common.HexToAddress(*args.Coinbase) + } + + header := &types.Header{ + ParentHash: parent.Hash(), + Number: blockNumber, + GasLimit: parent.GasLimit, + Time: timestamp, + Difficulty: parent.Difficulty, + Coinbase: coinbase, + BaseFee: parent.BaseFee, + } + + // Setup context so it may be cancelled when the call + // has completed or, in case of unmetered gas, setup + // a context with a timeout + var cancel context.CancelFunc + if timeout > 0 { + ctx, cancel = context.WithTimeout(ctx, timeout) + } else { + ctx, cancel = context.WithCancel(ctx) + } + + // Make sure the context is cancelled when the call has completed + // This makes sure resources are cleaned up + defer cancel() + + // RPC Call gas cap + globalGasCap := s.b.RPCGasCap() + + // Results + results := []map[string]interface{}{} + + // Copy the original db so we don't modify it + statedb := state.Copy() + + // Gas pool + gp := new(core.GasPool).AddGas(math.MaxUint64) + + // Block context + blockContext := core.NewEVMBlockContext(header, s.chain, &coinbase) + + // Feed each of the transactions into the VM ctx + // And try and estimate the gas used + for i, txArgs := range args.Txs { + // Since its a txCall we'll just prepare the + // state with a random hash + var randomHash common.Hash + rand.Read(randomHash[:]) + + // New random hash since its a call + statedb.Prepare(randomHash, i) + + // Convert tx args to msg to apply state transition + msg, err := txArgs.ToMessage(globalGasCap, header.BaseFee) + if err != nil { + return nil, err + } + + // Prepare the hashes + txContext := core.NewEVMTxContext(msg) + + // Get EVM Environment + vmenv := vm.NewEVM(blockContext, txContext, statedb, s.b.ChainConfig(), vm.Config{NoBaseFee: true}) + + // Apply state transition + result, err := core.ApplyMessage(vmenv, msg, gp) + if err != nil { + return nil, err + } + + // Modifications are committed to the state + // Only delete empty objects if EIP158/161 (a.k.a Spurious Dragon) is in effect + statedb.Finalise(vmenv.ChainConfig().IsEIP158(blockNumber)) + + // Append result + jsonResult := map[string]interface{}{ + "gasUsed": result.UsedGas, + } + results = append(results, jsonResult) + } + + // Return results + ret := map[string]interface{}{} + ret["results"] = results + + return ret, nil +} diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index 3dd472c98992..875af27d55fa 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -100,7 +100,7 @@ type Backend interface { ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) } -func GetAPIs(apiBackend Backend) []rpc.API { +func GetAPIs(apiBackend Backend, chain *core.BlockChain) []rpc.API { nonceLock := new(AddrLocker) return []rpc.API{ { @@ -127,6 +127,9 @@ func GetAPIs(apiBackend Backend) []rpc.API { }, { Namespace: "eth", Service: NewPrivateTxBundleAPI(apiBackend), + }, { + Namespace: "eth", + Service: NewBundleAPI(apiBackend, chain), }, } } diff --git a/les/client.go b/les/client.go index 9ac85ecdac6f..3ab1c53e2d83 100644 --- a/les/client.go +++ b/les/client.go @@ -297,7 +297,7 @@ func (s *LightDummyAPI) Mining() bool { // APIs returns the collection of RPC services the ethereum package offers. // NOTE, some of these services probably need to be moved to somewhere else. func (s *LightEthereum) APIs() []rpc.API { - apis := ethapi.GetAPIs(s.ApiBackend) + apis := ethapi.GetAPIs(s.ApiBackend, nil) apis = append(apis, s.engine.APIs(s.BlockChain().HeaderChain())...) return append(apis, []rpc.API{ { diff --git a/miner/multi_worker.go b/miner/multi_worker.go index 050ea38af4e5..da1471fa3e75 100644 --- a/miner/multi_worker.go +++ b/miner/multi_worker.go @@ -105,16 +105,31 @@ func newMultiWorker(config *Config, chainConfig *params.ChainConfig, engine cons })) } + relayWorkerMap := make(map[common.Address]*worker) + for i := 0; i < len(config.TrustedRelays); i++ { - workers = append(workers, - newWorker(config, chainConfig, engine, eth, mux, isLocalBlock, init, &flashbotsData{ - isFlashbots: true, - isMegabundleWorker: true, - queue: queue, - relayAddr: config.TrustedRelays[i], - })) + relayWorker := newWorker(config, chainConfig, engine, eth, mux, isLocalBlock, init, &flashbotsData{ + isFlashbots: true, + isMegabundleWorker: true, + queue: queue, + relayAddr: config.TrustedRelays[i], + }) + workers = append(workers, relayWorker) + relayWorkerMap[config.TrustedRelays[i]] = relayWorker } + eth.TxPool().NewMegabundleHooks = append(eth.TxPool().NewMegabundleHooks, func(relayAddr common.Address, megabundle *types.MevBundle) { + worker, found := relayWorkerMap[relayAddr] + if !found { + return + } + + select { + case worker.newMegabundleCh <- megabundle: + default: + } + }) + log.Info("creating multi worker", "config.MaxMergedBundles", config.MaxMergedBundles, "config.TrustedRelays", config.TrustedRelays, "worker", len(workers)) return &multiWorker{ regularWorker: regularWorker, diff --git a/miner/worker.go b/miner/worker.go index 0edf3caaa405..4d2b42ecdba9 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -233,6 +233,7 @@ type worker struct { exitCh chan struct{} resubmitIntervalCh chan time.Duration resubmitAdjustCh chan *intervalAdjust + newMegabundleCh chan *types.MevBundle wg sync.WaitGroup @@ -348,7 +349,7 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus txsCh: make(chan core.NewTxsEvent, txChanSize), chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize), chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize), - newWorkCh: make(chan *newWorkReq), + newWorkCh: make(chan *newWorkReq, 1), getWorkCh: make(chan *getWorkReq), taskCh: taskCh, resultCh: make(chan *types.Block, resultQueueSize), @@ -359,6 +360,7 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus coinbase: builderCoinbase, flashbots: flashbots, } + // Subscribe NewTxsEvent for tx pool worker.txsSub = eth.TxPool().SubscribeNewTxsEvent(worker.txsCh) // Subscribe events for blockchain @@ -523,26 +525,38 @@ func recalcRecommit(minRecommit, prev time.Duration, target float64, inc bool) t func (w *worker) newWorkLoop(recommit time.Duration) { defer w.wg.Done() var ( - interrupt *int32 - minRecommit = recommit // minimal resubmit interval specified by user. - timestamp int64 // timestamp for each round of sealing. + runningInterrupt *int32 // Running task interrupt + queuedInterrupt *int32 // Queued task interrupt + minRecommit = recommit // minimal resubmit interval specified by user. + timestamp int64 // timestamp for each round of sealing. ) timer := time.NewTimer(0) defer timer.Stop() <-timer.C // discard the initial tick - // commit aborts in-flight transaction execution with given signal and resubmits a new one. + // commit aborts in-flight transaction execution with highest seen signal and resubmits a new one commit := func(noempty bool, s int32) { - if interrupt != nil { - atomic.StoreInt32(interrupt, s) - } - interrupt = new(int32) select { - case w.newWorkCh <- &newWorkReq{interrupt: interrupt, noempty: noempty, timestamp: timestamp}: case <-w.exitCh: return + case queuedRequest := <-w.newWorkCh: + // Previously queued request wasn't started yet, update the request and resubmit + queuedRequest.noempty = queuedRequest.noempty || noempty + queuedRequest.timestamp = timestamp + w.newWorkCh <- queuedRequest // guaranteed to be nonblocking + default: + // Previously queued request has already started, cycle interrupt pointer and submit new work + runningInterrupt = queuedInterrupt + queuedInterrupt = new(int32) + + w.newWorkCh <- &newWorkReq{interrupt: queuedInterrupt, noempty: noempty, timestamp: timestamp} // guaranteed to be nonblocking + } + + if runningInterrupt != nil && s > atomic.LoadInt32(runningInterrupt) { + atomic.StoreInt32(runningInterrupt, s) } + timer.Reset(recommit) atomic.StoreInt32(&w.newTxs, 0) } @@ -569,6 +583,11 @@ func (w *worker) newWorkLoop(recommit time.Duration) { timestamp = time.Now().Unix() commit(false, commitInterruptNewHead) + case <-w.newMegabundleCh: + if w.isRunning() { + commit(true, commitInterruptNone) + } + case <-timer.C: // If sealing is running resubmit a new work cycle periodically to pull in // higher priced transactions. Disable this overhead for pending blocks. @@ -637,7 +656,10 @@ func (w *worker) mainLoop() { for { select { case req := <-w.newWorkCh: - w.commitWork(req.interrupt, req.noempty, req.timestamp) + // Don't start if the work has already been interrupted + if req.interrupt == nil || atomic.LoadInt32(req.interrupt) == commitInterruptNone { + w.commitWork(req.interrupt, req.noempty, req.timestamp) + } case req := <-w.getWorkCh: block, fees, err := w.generateWork(req.params) @@ -782,7 +804,7 @@ func (w *worker) taskLoop() { w.pendingTasks[sealHash] = task w.pendingMu.Unlock() - if err := w.engine.Seal(w.chain, task.block, w.resultCh, stopCh); err != nil { + if err := w.engine.Seal(w.chain, task.block, task.profit, w.resultCh, stopCh); err != nil { log.Warn("Block sealing failed", "err", err) w.pendingMu.Lock() delete(w.pendingTasks, sealHash) @@ -1309,6 +1331,7 @@ func (w *worker) fillTransactions(interrupt *int32, env *environment, validatorC if err != nil { return err // no valid megabundle for this relay, nothing to do } + // Flashbots bundle merging duplicates work by simulating TXes and then committing them once more. // Megabundles API focuses on speed and runs everything in one cycle. coinbaseBalanceBefore := env.state.GetBalance(env.coinbase)