diff --git a/builder/block_submission_rate_limiter.go b/builder/block_submission_rate_limiter.go deleted file mode 100644 index ec83cc90b3a8..000000000000 --- a/builder/block_submission_rate_limiter.go +++ /dev/null @@ -1,92 +0,0 @@ -package builder - -import ( - "context" - "sync/atomic" - "time" - - "github.com/ethereum/go-ethereum/core/types" -) - -type blockRateLimitSubmission struct { - resultCh chan bool - block *types.Block -} - -type BlockSubmissionRateLimiter struct { - submissionsCh chan blockRateLimitSubmission - started uint32 - ctx context.Context - cancel context.CancelFunc -} - -func NewBlockSubmissionRateLimiter() *BlockSubmissionRateLimiter { - ctx, cancel := context.WithCancel(context.Background()) - r := &BlockSubmissionRateLimiter{ - submissionsCh: make(chan blockRateLimitSubmission), - started: uint32(0), - ctx: ctx, - cancel: cancel, - } - - return r -} -func (r *BlockSubmissionRateLimiter) Limit(block *types.Block) chan bool { - resultCh := make(chan bool, 1) - if atomic.LoadUint32(&r.started) != 1 { - resultCh <- true - return resultCh - } - - select { - case r.submissionsCh <- blockRateLimitSubmission{ - resultCh: resultCh, - block: block, - }: - case <-r.ctx.Done(): - resultCh <- true - } - return resultCh -} - -func (r *BlockSubmissionRateLimiter) Start() { - if !atomic.CompareAndSwapUint32(&r.started, 0, 1) { - return - } - - go r.rateLimit() -} - -func (r *BlockSubmissionRateLimiter) rateLimit() { - for r.ctx.Err() == nil { - // Beginning of the rate limit bucket - bestSubmission := <-r.submissionsCh - - bucketCutoffCh := time.After(100 * time.Millisecond) - - bucketClosed := false - for !bucketClosed { - select { - case <-r.ctx.Done(): - bucketClosed = true - break - case <-bucketCutoffCh: - bucketClosed = true - break - case newSubmission := <-r.submissionsCh: - if bestSubmission.block.Profit.Cmp(newSubmission.block.Profit) < 0 { - bestSubmission.resultCh <- false - bestSubmission = newSubmission - } else { - newSubmission.resultCh <- false - } - } - } - - bestSubmission.resultCh <- true - } -} - -func (r *BlockSubmissionRateLimiter) Stop() { - r.cancel() -} diff --git a/builder/block_submission_rate_limiter_test.go b/builder/block_submission_rate_limiter_test.go deleted file mode 100644 index 4d30e75912cc..000000000000 --- a/builder/block_submission_rate_limiter_test.go +++ /dev/null @@ -1,71 +0,0 @@ -package builder - -import ( - "math/big" - "testing" - "time" - - "github.com/ethereum/go-ethereum/core/types" - "github.com/stretchr/testify/require" -) - -func TestLimit(t *testing.T) { - rl := NewBlockSubmissionRateLimiter() - - // Check that before starting requests are passed through - ch1 := rl.Limit(&types.Block{Profit: new(big.Int)}) - ch2 := rl.Limit(&types.Block{Profit: new(big.Int)}) - ch3 := rl.Limit(&types.Block{Profit: new(big.Int)}) - - time.Sleep(200 * time.Millisecond) - - for _, ch := range []chan bool{ch1, ch2, ch3} { - select { - case shouldSubmit := <-ch: - require.True(t, shouldSubmit) - default: - t.Error("chan was not ready") - } - } - - // Check that after starting requests are rate limited - rl.Start() - - // Check that before starting requests are passed through - ch1 = rl.Limit(&types.Block{Profit: new(big.Int)}) - ch2 = rl.Limit(&types.Block{Profit: new(big.Int)}) - ch3 = rl.Limit(&types.Block{Profit: big.NewInt(1)}) - - time.Sleep(200 * time.Millisecond) - - for _, ch := range []chan bool{ch1, ch2, ch3} { - select { - case shouldSubmit := <-ch: - if ch == ch3 { - require.True(t, shouldSubmit) - } else { - require.False(t, shouldSubmit) - } - default: - t.Error("chan was not ready") - } - } - - // Check that after stopping requests are passed through - rl.Stop() - - ch1 = rl.Limit(&types.Block{Profit: new(big.Int)}) - ch2 = rl.Limit(&types.Block{Profit: new(big.Int)}) - ch3 = rl.Limit(&types.Block{Profit: new(big.Int)}) - - time.Sleep(200 * time.Millisecond) - - for _, ch := range []chan bool{ch1, ch2, ch3} { - select { - case shouldSubmit := <-ch: - require.True(t, shouldSubmit) - default: - t.Error("chan was not ready") - } - } -} diff --git a/builder/builder.go b/builder/builder.go index 582482316907..ceb5732a5e16 100644 --- a/builder/builder.go +++ b/builder/builder.go @@ -1,7 +1,9 @@ package builder import ( + "context" "errors" + "golang.org/x/time/rate" "math/big" _ "os" "sync" @@ -44,66 +46,53 @@ type IBuilder interface { } type Builder struct { - ds flashbotsextra.IDatabaseService - beaconClient IBeaconClient - relay IRelay - eth IEthereumService - resubmitter Resubmitter - blockSubmissionRateLimiter *BlockSubmissionRateLimiter - builderSecretKey *bls.SecretKey - builderPublicKey boostTypes.PublicKey - builderSigningDomain boostTypes.Domain - - bestMu sync.Mutex - bestAttrs BuilderPayloadAttributes - bestBlockProfit *big.Int + ds flashbotsextra.IDatabaseService + relay IRelay + eth IEthereumService + builderSecretKey *bls.SecretKey + builderPublicKey boostTypes.PublicKey + builderSigningDomain boostTypes.Domain + + limiter *rate.Limiter + + slotMu sync.Mutex + slot uint64 + slotAttrs []BuilderPayloadAttributes + slotCtx context.Context + slotCtxCancel context.CancelFunc } -func NewBuilder(sk *bls.SecretKey, ds flashbotsextra.IDatabaseService, bc IBeaconClient, relay IRelay, builderSigningDomain boostTypes.Domain, eth IEthereumService) *Builder { +func NewBuilder(sk *bls.SecretKey, ds flashbotsextra.IDatabaseService, relay IRelay, builderSigningDomain boostTypes.Domain, eth IEthereumService) *Builder { pkBytes := bls.PublicKeyFromSecretKey(sk).Compress() pk := boostTypes.PublicKey{} pk.FromSlice(pkBytes) + slotCtx, slotCtxCancel := context.WithCancel(context.Background()) return &Builder{ - ds: ds, - beaconClient: bc, - relay: relay, - eth: eth, - resubmitter: Resubmitter{}, - blockSubmissionRateLimiter: NewBlockSubmissionRateLimiter(), - builderSecretKey: sk, - builderPublicKey: pk, - + ds: ds, + relay: relay, + eth: eth, + builderSecretKey: sk, + builderPublicKey: pk, builderSigningDomain: builderSigningDomain, - bestBlockProfit: big.NewInt(0), + + limiter: rate.NewLimiter(rate.Every(time.Second), 1), + slot: 0, + slotCtx: slotCtx, + slotCtxCancel: slotCtxCancel, } } func (b *Builder) Start() error { - b.blockSubmissionRateLimiter.Start() return nil } func (b *Builder) Stop() error { - b.blockSubmissionRateLimiter.Stop() return nil } func (b *Builder) onSealedBlock(block *types.Block, bundles []types.SimulatedBundle, proposerPubkey boostTypes.PublicKey, proposerFeeRecipient boostTypes.Address, attrs *BuilderPayloadAttributes) error { - b.bestMu.Lock() - defer b.bestMu.Unlock() - - // Do not submit blocks that don't improve the profit - if b.bestAttrs != *attrs { - b.bestAttrs = *attrs - b.bestBlockProfit.SetInt64(0) - } else { - if block.Profit.Cmp(b.bestBlockProfit) <= 0 { - log.Info("Ignoring block that is not improving the profit") - return nil - } - } - + start := time.Now() executableData := beacon.BlockToExecutableData(block) payload, err := executableDataToExecutionPayload(executableData) if err != nil { @@ -152,9 +141,8 @@ func (b *Builder) onSealedBlock(block *types.Block, bundles []types.SimulatedBun log.Info("could submit block", "bundles", len(bundles)) } - log.Info("submitted block", "header", block.Header(), "bid", blockBidMsg) + log.Info("submitted block", "header", block.Header(), "bid", blockBidMsg, "time", time.Since(start)) - b.bestBlockProfit.Set(block.Profit) return nil } @@ -188,29 +176,100 @@ func (b *Builder) OnPayloadAttribute(attrs *BuilderPayloadAttributes) error { return errors.New("parent block not found in blocktree") } - blockHook := func(block *types.Block, bundles []types.SimulatedBundle) { - select { - case shouldSubmit := <-b.blockSubmissionRateLimiter.Limit(block): - if !shouldSubmit { - log.Info("Block rate limited", "blochHash", block.Hash()) - return + b.slotMu.Lock() + defer b.slotMu.Unlock() + + if b.slot != attrs.Slot { + if b.slotCtxCancel != nil { + b.slotCtxCancel() + } + + slotCtx, slotCtxCancel := context.WithTimeout(context.Background(), 12*time.Second) + b.slot = attrs.Slot + b.slotAttrs = nil + b.slotCtx = slotCtx + b.slotCtxCancel = slotCtxCancel + } + + for _, currentAttrs := range b.slotAttrs { + if *attrs == currentAttrs { + log.Debug("ignoring known payload attribute", "slot", attrs.Slot, "hash", attrs.HeadHash) + return nil + } + } + b.slotAttrs = append(b.slotAttrs, *attrs) + + go b.runBuildingJob(b.slotCtx, proposerPubkey, vd.FeeRecipient, attrs) + return nil +} + +func (b *Builder) runBuildingJob(slotCtx context.Context, proposerPubkey boostTypes.PublicKey, feeRecipient boostTypes.Address, attrs *BuilderPayloadAttributes) { + ctx, cancel := context.WithTimeout(slotCtx, 12*time.Second) + defer cancel() + + // Submission queue for the given payload attributes + // multiple jobs can run for different attributes fot the given slot + // 1. When new block is ready we check if its profit is higher than profit of last best block + // if it is we set queueBest* to values of the new block and notify queueSignal channel. + // 2. Submission goroutine waits for queueSignal and submits queueBest* if its more valuable than + // queueLastSubmittedProfit keeping queueLastSubmittedProfit to be the profit of the last submission. + // Submission goroutine is globally rate limited to have fixed rate of submissions for all jobs. + var ( + queueSignal = make(chan struct{}, 1) + + queueMu sync.Mutex + queueLastSubmittedProfit = new(big.Int) + queueBestProfit = new(big.Int) + queueBestBlock *types.Block + queueBestBundles []types.SimulatedBundle + ) + + log.Debug("runBuildingJob", "slot", attrs.Slot, "parent", attrs.HeadHash) + + submitBestBlock := func() { + queueMu.Lock() + if queueLastSubmittedProfit.Cmp(queueBestProfit) < 0 { + err := b.onSealedBlock(queueBestBlock, queueBestBundles, proposerPubkey, feeRecipient, attrs) + if err != nil { + log.Error("could not run sealed block hook", "err", err) + } else { + queueLastSubmittedProfit.Set(queueBestProfit) } - case <-time.After(200 * time.Millisecond): - log.Info("Block rate limit timeout, submitting the block anyway") } + queueMu.Unlock() + } - err := b.onSealedBlock(block, bundles, proposerPubkey, vd.FeeRecipient, attrs) - if err != nil { - log.Error("could not run sealed block hook", "err", err) + // Empties queue, submits the best block for current job with rate limit (global for all jobs) + go runResubmitLoop(ctx, b.limiter, queueSignal, submitBestBlock) + + // Populates queue with submissions that increase block profit + blockHook := func(block *types.Block, bundles []types.SimulatedBundle) { + if ctx.Err() != nil { + return + } + + queueMu.Lock() + defer queueMu.Unlock() + if block.Profit.Cmp(queueBestProfit) > 0 { + queueBestBlock = block + queueBestBundles = bundles + queueBestProfit.Set(block.Profit) + + select { + case queueSignal <- struct{}{}: + default: + } } } - firstBlockResult := b.resubmitter.newTask(12*time.Second, time.Second, func() error { - log.Info("Resubmitting build job") - return b.eth.BuildBlock(attrs, blockHook) + // resubmits block builder requests every second + runRetryLoop(ctx, time.Second, func() { + log.Debug("retrying BuildBlock", "slot", attrs.Slot, "parent", attrs.HeadHash) + err := b.eth.BuildBlock(attrs, blockHook) + if err != nil { + log.Warn("Failed to build block", "err", err) + } }) - - return firstBlockResult } func executableDataToExecutionPayload(data *beacon.ExecutableDataV1) (*boostTypes.ExecutionPayload, error) { diff --git a/builder/builder_test.go b/builder/builder_test.go index b3a3f815d926..85afcaba316c 100644 --- a/builder/builder_test.go +++ b/builder/builder_test.go @@ -75,12 +75,13 @@ func TestOnPayloadAttributes(t *testing.T) { testEthService := &testEthereumService{synced: true, testExecutableData: testExecutableData, testBlock: testBlock} - builder := NewBuilder(sk, flashbotsextra.NilDbService{}, &testBeacon, &testRelay, bDomain, testEthService) + builder := NewBuilder(sk, flashbotsextra.NilDbService{}, &testRelay, bDomain, testEthService) builder.Start() defer builder.Stop() err = builder.OnPayloadAttribute(testPayloadAttributes) require.NoError(t, err) + time.Sleep(time.Second * 3) require.NotNil(t, testRelay.submittedMsg) expectedProposerPubkey, err := boostTypes.HexToPubkey(testBeacon.validator.Pk.String()) @@ -128,11 +129,11 @@ func TestOnPayloadAttributes(t *testing.T) { // Clear the submitted message and check that the job will be ran again and but a new message will not be submitted since the profit is the same testRelay.submittedMsg = nil - time.Sleep(1200 * time.Millisecond) + time.Sleep(2200 * time.Millisecond) require.Nil(t, testRelay.submittedMsg) // Up the profit, expect to get the block testEthService.testBlock.Profit.SetInt64(11) - time.Sleep(1200 * time.Millisecond) + time.Sleep(2200 * time.Millisecond) require.NotNil(t, testRelay.submittedMsg) } diff --git a/builder/local_relay_test.go b/builder/local_relay_test.go index c30779a3fa46..6ee0bfcc90a0 100644 --- a/builder/local_relay_test.go +++ b/builder/local_relay_test.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/json" "fmt" + "golang.org/x/time/rate" "math/big" "net/http" "net/http/httptest" @@ -30,9 +31,11 @@ func newTestBackend(t *testing.T, forkchoiceData *beacon.ExecutableDataV1, block beaconClient := &testBeaconClient{validator: validator} localRelay := NewLocalRelay(sk, beaconClient, bDomain, cDomain, ForkData{}, true) ethService := &testEthereumService{synced: true, testExecutableData: forkchoiceData, testBlock: block} - backend := NewBuilder(sk, flashbotsextra.NilDbService{}, beaconClient, localRelay, bDomain, ethService) + backend := NewBuilder(sk, flashbotsextra.NilDbService{}, localRelay, bDomain, ethService) // service := NewService("127.0.0.1:31545", backend) + backend.limiter = rate.NewLimiter(rate.Inf, 0) + return backend, localRelay, validator } @@ -143,6 +146,7 @@ func TestGetHeader(t *testing.T) { require.Equal(t, 204, rr.Code) backend.OnPayloadAttribute(&BuilderPayloadAttributes{}) + time.Sleep(2 * time.Second) path = fmt.Sprintf("/eth/v1/builder/header/%d/%s/%s", 0, forkchoiceData.ParentHash.Hex(), validator.Pk.String()) rr = testRequest(t, relay, "GET", path, nil) @@ -190,6 +194,7 @@ func TestGetPayload(t *testing.T) { registerValidator(t, validator, relay) backend.OnPayloadAttribute(&BuilderPayloadAttributes{}) + time.Sleep(2 * time.Second) path := fmt.Sprintf("/eth/v1/builder/header/%d/%s/%s", 0, forkchoiceData.ParentHash.Hex(), validator.Pk.String()) rr := testRequest(t, relay, "GET", path, nil) diff --git a/builder/resubmit_utils.go b/builder/resubmit_utils.go new file mode 100644 index 000000000000..749bbc2e65fe --- /dev/null +++ b/builder/resubmit_utils.go @@ -0,0 +1,64 @@ +package builder + +import ( + "context" + "github.com/ethereum/go-ethereum/log" + "golang.org/x/time/rate" + "time" +) + +// runResubmitLoop checks for update signal and calls submit respecting provided rate limiter and context +func runResubmitLoop(ctx context.Context, limiter *rate.Limiter, updateSignal chan struct{}, submit func()) { + for { + select { + case <-ctx.Done(): + return + case <-updateSignal: + res := limiter.Reserve() + if !res.OK() { + log.Warn("resubmit loop failed to make limiter reservation") + return + } + + // check if we could make submission before context ctxDeadline + if ctxDeadline, ok := ctx.Deadline(); ok { + delayDeadline := time.Now().Add(res.Delay()) + if delayDeadline.After(ctxDeadline) { + res.Cancel() + return + } + } + + delay := res.Delay() + if delay == 0 { + submit() + continue + } + + t := time.NewTimer(delay) + select { + case <-t.C: + submit() + continue + case <-ctx.Done(): + res.Cancel() + t.Stop() + return + } + } + } +} + +// runRetryLoop calls retry periodically with the provided interval respecting context cancellation +func runRetryLoop(ctx context.Context, interval time.Duration, retry func()) { + t := time.NewTicker(interval) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return + case <-t.C: + retry() + } + } +} diff --git a/builder/resubmit_utils_test.go b/builder/resubmit_utils_test.go new file mode 100644 index 000000000000..d451097fd545 --- /dev/null +++ b/builder/resubmit_utils_test.go @@ -0,0 +1,75 @@ +package builder + +import ( + "context" + "golang.org/x/time/rate" + "math/rand" + "sort" + "sync" + "testing" + "time" +) + +type submission struct { + t time.Time + v int +} + +func TestResubmitUtils(t *testing.T) { + const ( + totalTime = time.Second + rateLimitTime = 100 * time.Millisecond + resubmitInterval = 10 * time.Millisecond + ) + + ctx, cancel := context.WithTimeout(context.Background(), totalTime) + defer cancel() + limiter := rate.NewLimiter(rate.Every(rateLimitTime), 1) + + var ( + signal = make(chan struct{}, 1) + subMu sync.Mutex + subLast int + subBest int + subAll []submission + ) + + go runResubmitLoop(ctx, limiter, signal, func() { + subMu.Lock() + defer subMu.Unlock() + + if subBest > subLast { + subAll = append(subAll, submission{time.Now(), subBest}) + subLast = subBest + } + }) + + runRetryLoop(ctx, resubmitInterval, func() { + subMu.Lock() + defer subMu.Unlock() + + value := rand.Int() + if value > subBest { + subBest = value + + select { + case signal <- struct{}{}: + default: + } + } + }) + + sorted := sort.SliceIsSorted(subAll, func(i, j int) bool { + return subAll[i].v < subAll[j].v + }) + if !sorted { + t.Error("submissions are not sorted") + } + + for i := 0; i < len(subAll)-1; i++ { + interval := subAll[i+1].t.Sub(subAll[i].t) + if interval+10*time.Millisecond < rateLimitTime { + t.Errorf("submissions are not rate limited: interval %s, limit %s", interval, rateLimitTime) + } + } +} diff --git a/builder/resubmitter.go b/builder/resubmitter.go deleted file mode 100644 index e167badbab20..000000000000 --- a/builder/resubmitter.go +++ /dev/null @@ -1,42 +0,0 @@ -package builder - -import ( - "context" - "sync" - "time" -) - -type Resubmitter struct { - mu sync.Mutex - cancel context.CancelFunc -} - -func (r *Resubmitter) newTask(repeatFor time.Duration, interval time.Duration, fn func() error) error { - repeatUntilCh := time.After(repeatFor) - - r.mu.Lock() - if r.cancel != nil { - r.cancel() - } - ctx, cancel := context.WithCancel(context.Background()) - r.cancel = cancel - r.mu.Unlock() - - firstRunErr := fn() - - go func() { - for ctx.Err() == nil { - select { - case <-ctx.Done(): - return - case <-repeatUntilCh: - cancel() - return - case <-time.After(interval): - fn() - } - } - }() - - return firstRunErr -} diff --git a/builder/resubmitter_test.go b/builder/resubmitter_test.go deleted file mode 100644 index e60ff3ef7711..000000000000 --- a/builder/resubmitter_test.go +++ /dev/null @@ -1,55 +0,0 @@ -package builder - -import ( - "errors" - "testing" - "time" - - "github.com/stretchr/testify/require" -) - -func TestResubmitter(t *testing.T) { - resubmitter := Resubmitter{} - - pingCh := make(chan error) - go func() { - res := resubmitter.newTask(time.Second, 100*time.Millisecond, func() error { - return <-pingCh - }) - require.ErrorContains(t, res, "xx") - }() - - select { - case pingCh <- errors.New("xx"): - case <-time.After(time.Second): - t.Error("timeout waiting for the function") - } - - select { - case pingCh <- nil: - t.Error("function restarted too soon") - default: - } - - time.Sleep(200 * time.Millisecond) - - select { - case pingCh <- nil: - default: - t.Error("function restarted too late") - } - - time.Sleep(800 * time.Millisecond) - - select { - case pingCh <- nil: - default: - t.Error("function restarted too late") - } - - select { - case pingCh <- nil: - t.Error("function restarted after deadline") - case <-time.After(200 * time.Millisecond): - } -} diff --git a/builder/service.go b/builder/service.go index f4b99413f306..6a1ace06d67d 100644 --- a/builder/service.go +++ b/builder/service.go @@ -193,7 +193,7 @@ func Register(stack *node.Node, backend *eth.Ethereum, cfg *BuilderConfig) error go bundleFetcher.Run() ethereumService := NewEthereumService(backend) - builderBackend := NewBuilder(builderSk, ds, beaconClient, relay, builderSigningDomain, ethereumService) + builderBackend := NewBuilder(builderSk, ds, relay, builderSigningDomain, ethereumService) builderService := NewService(cfg.ListenAddr, localRelay, builderBackend) stack.RegisterAPIs([]rpc.API{ diff --git a/eth/block-validation/api_test.go b/eth/block-validation/api_test.go index e93092467e03..f7945399d69c 100644 --- a/eth/block-validation/api_test.go +++ b/eth/block-validation/api_test.go @@ -142,7 +142,7 @@ func TestValidateBuilderSubmissionV1(t *testing.T) { invalidPayload.LogsBloom = boostTypes.Bloom{} copy(invalidPayload.ReceiptsRoot[:], hexutil.MustDecode("0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421")[:32]) blockRequest.ExecutionPayload = invalidPayload - copy(blockRequest.Message.BlockHash[:], hexutil.MustDecode("0x08eaca07137419b4b7f8770530b2db870f6e1cc3eabd355b8f8301cec9fca5f3")[:32]) + copy(blockRequest.Message.BlockHash[:], hexutil.MustDecode("0x65cded68b85277f489f22497731d8cece9e42f0429a250a5022a9417408f3998")[:32]) require.ErrorContains(t, api.ValidateBuilderSubmissionV1(blockRequest), "could not apply tx 3", "insufficient funds for gas * price + value") } diff --git a/miner/bundle_cache.go b/miner/bundle_cache.go new file mode 100644 index 000000000000..42a810bd21bd --- /dev/null +++ b/miner/bundle_cache.go @@ -0,0 +1,82 @@ +package miner + +import ( + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "sync" +) + +const ( + maxHeaders = 3 +) + +type BundleCache struct { + mu sync.Mutex + entries []*BundleCacheEntry +} + +func NewBundleCache() *BundleCache { + return &BundleCache{ + entries: make([]*BundleCacheEntry, maxHeaders), + } +} + +func (b *BundleCache) GetBundleCache(header common.Hash) *BundleCacheEntry { + b.mu.Lock() + defer b.mu.Unlock() + + for _, entry := range b.entries { + if entry != nil && entry.headerHash == header { + return entry + } + } + newEntry := newCacheEntry(header) + b.entries = b.entries[1:] + b.entries = append(b.entries, newEntry) + + return newEntry +} + +type BundleCacheEntry struct { + mu sync.Mutex + headerHash common.Hash + successfulBundles map[common.Hash]*simulatedBundle + failedBundles map[common.Hash]struct{} +} + +func newCacheEntry(header common.Hash) *BundleCacheEntry { + return &BundleCacheEntry{ + headerHash: header, + successfulBundles: make(map[common.Hash]*simulatedBundle), + failedBundles: make(map[common.Hash]struct{}), + } +} + +func (c *BundleCacheEntry) GetSimulatedBundle(bundle common.Hash) (*types.SimulatedBundle, bool) { + c.mu.Lock() + defer c.mu.Unlock() + + if simmed, ok := c.successfulBundles[bundle]; ok { + return simmed, true + } + + if _, ok := c.failedBundles[bundle]; ok { + return nil, true + } + + return nil, false +} + +func (c *BundleCacheEntry) UpdateSimulatedBundles(result []*types.SimulatedBundle, bundles []types.MevBundle) { + c.mu.Lock() + defer c.mu.Unlock() + + for i, simBundle := range result { + bundleHash := bundles[i].Hash + if simBundle != nil { + c.successfulBundles[bundleHash] = simBundle + } else { + c.failedBundles[bundleHash] = struct{}{} + } + } +} diff --git a/miner/bundle_cache_test.go b/miner/bundle_cache_test.go new file mode 100644 index 000000000000..71fcf376f6d4 --- /dev/null +++ b/miner/bundle_cache_test.go @@ -0,0 +1,68 @@ +package miner + +import ( + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "testing" +) + +func TestBundleCacheEntry(t *testing.T) { + entry := newCacheEntry(common.HexToHash("0x01")) + + failingBundle := common.HexToHash("0xff") + successBundle := common.HexToHash("0xaa") + + sim, found := entry.GetSimulatedBundle(failingBundle) + if sim != nil || found { + t.Errorf("found bundle in empty cache: %s", failingBundle) + } + sim, found = entry.GetSimulatedBundle(successBundle) + if sim != nil || found { + t.Errorf("found bundle in empty cache: %s", successBundle) + } + + bundles := []types.MevBundle{types.MevBundle{Hash: failingBundle}, types.MevBundle{Hash: successBundle}} + simResult := []*types.SimulatedBundle{nil, &types.SimulatedBundle{OriginalBundle: bundles[1]}} + entry.UpdateSimulatedBundles(simResult, bundles) + + sim, found = entry.GetSimulatedBundle(failingBundle) + if sim != nil || !found { + t.Error("incorrect failing bundle result") + } + sim, found = entry.GetSimulatedBundle(successBundle) + if sim != simResult[1] || !found { + t.Error("incorrect successful bundle result") + } +} + +func TestBundleCache(t *testing.T) { + cache := NewBundleCache() + + header1 := common.HexToHash("0x01") + header2 := common.HexToHash("0x02") + header3 := common.HexToHash("0x03") + header4 := common.HexToHash("0x04") + + cache1 := cache.GetBundleCache(header1) + if cache1.headerHash != header1 { + t.Error("incorrect header cache") + } + + cache2 := cache.GetBundleCache(header2) + if cache2.headerHash != header2 { + t.Error("incorrect header cache") + } + + cache2Again := cache.GetBundleCache(header2) + if cache2 != cache2Again { + t.Error("header cache is not reused") + } + + cache.GetBundleCache(header3) + cache.GetBundleCache(header4) + + cache1Again := cache.GetBundleCache(header1) + if cache1 == cache1Again { + t.Error("cache1 should be removed after insertions") + } +} diff --git a/miner/multi_worker.go b/miner/multi_worker.go index e47ea12fca8d..41ef0ea81313 100644 --- a/miner/multi_worker.go +++ b/miner/multi_worker.go @@ -156,6 +156,7 @@ func newMultiWorkerGreedy(config *Config, chainConfig *params.ChainConfig, engin queue: queue, algoType: ALGO_GREEDY, maxMergedBundles: config.MaxMergedBundles, + bundleCache: NewBundleCache(), }) log.Info("creating new greedy worker") @@ -168,11 +169,14 @@ func newMultiWorkerGreedy(config *Config, chainConfig *params.ChainConfig, engin func newMultiWorkerMevGeth(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(header *types.Header) bool, init bool) *multiWorker { queue := make(chan *task) + bundleCache := NewBundleCache() + regularWorker := newWorker(config, chainConfig, engine, eth, mux, isLocalBlock, init, &flashbotsData{ isFlashbots: false, queue: queue, algoType: ALGO_MEV_GETH, maxMergedBundles: config.MaxMergedBundles, + bundleCache: bundleCache, }) workers := []*worker{regularWorker} @@ -184,6 +188,7 @@ func newMultiWorkerMevGeth(config *Config, chainConfig *params.ChainConfig, engi queue: queue, algoType: ALGO_MEV_GETH, maxMergedBundles: i, + bundleCache: bundleCache, })) } } @@ -200,4 +205,5 @@ type flashbotsData struct { queue chan *task maxMergedBundles int algoType AlgoType + bundleCache *BundleCache } diff --git a/miner/worker.go b/miner/worker.go index 1706cb616e2b..81caf3d8f4a2 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -1732,37 +1732,16 @@ func (w *worker) mergeBundles(env *environment, bundles []simulatedBundle, pendi }, mergedBundles, count, nil } -var ( - g_bundleCacheMu sync.Mutex - g_bundleCacheHeaderHash common.Hash - g_bundleCacheSuccess map[common.Hash]simulatedBundle = make(map[common.Hash]simulatedBundle) - g_bundleCacheFailed map[common.Hash]struct{} = make(map[common.Hash]struct{}) -) - func (w *worker) simulateBundles(env *environment, bundles []types.MevBundle, pendingTxs map[common.Address]types.Transactions) ([]simulatedBundle, error) { - g_bundleCacheMu.Lock() - defer g_bundleCacheMu.Unlock() - - var cacheSuccess map[common.Hash]simulatedBundle - var cacheFailed map[common.Hash]struct{} - if g_bundleCacheHeaderHash == env.header.Hash() { - cacheSuccess = g_bundleCacheSuccess - cacheFailed = g_bundleCacheFailed - } else { - cacheSuccess = make(map[common.Hash]simulatedBundle) - cacheFailed = make(map[common.Hash]struct{}) - } + headerHash := env.header.Hash() + simCache := w.flashbots.bundleCache.GetBundleCache(headerHash) simResult := make([]*simulatedBundle, len(bundles)) var wg sync.WaitGroup for i, bundle := range bundles { - if simmed, ok := cacheSuccess[bundle.Hash]; ok { - simResult[i] = &simmed - continue - } - - if _, ok := cacheFailed[bundle.Hash]; ok { + if simmed, ok := simCache.GetSimulatedBundle(bundle.Hash); ok { + simResult[i] = simmed continue } @@ -1785,20 +1764,15 @@ func (w *worker) simulateBundles(env *environment, bundles []types.MevBundle, pe wg.Wait() + simCache.UpdateSimulatedBundles(simResult, bundles) + simulatedBundles := make([]simulatedBundle, 0, len(bundles)) - for i, bundle := range simResult { + for _, bundle := range simResult { if bundle != nil { simulatedBundles = append(simulatedBundles, *bundle) - cacheSuccess[bundle.OriginalBundle.Hash] = *bundle - } else { - cacheFailed[bundles[i].Hash] = struct{}{} } } - g_bundleCacheHeaderHash = env.header.Hash() - g_bundleCacheSuccess = cacheSuccess - g_bundleCacheFailed = cacheFailed - return simulatedBundles, nil } diff --git a/miner/worker_test.go b/miner/worker_test.go index 628cc82df2be..b07371914eed 100644 --- a/miner/worker_test.go +++ b/miner/worker_test.go @@ -206,6 +206,7 @@ func newTestWorker(t *testing.T, chainConfig *params.ChainConfig, engine consens w := newWorker(testConfig, chainConfig, engine, backend, new(event.TypeMux), nil, false, &flashbotsData{ isFlashbots: false, queue: nil, + bundleCache: NewBundleCache(), }) w.setEtherbase(testBankAddress) return w, backend