Skip to content

Commit

Permalink
Build on multiple tips (ethereum#35)
Browse files Browse the repository at this point in the history
* remove unused beacon client from the builder

* build on multiple tips

* worker async generateWork

* rework bundle cache

* fix validate payload test in main
  • Loading branch information
dvush authored and avalonche committed Mar 9, 2023
1 parent 9ac9562 commit f94c8e3
Show file tree
Hide file tree
Showing 16 changed files with 432 additions and 357 deletions.
92 changes: 0 additions & 92 deletions builder/block_submission_rate_limiter.go

This file was deleted.

71 changes: 0 additions & 71 deletions builder/block_submission_rate_limiter_test.go

This file was deleted.

175 changes: 117 additions & 58 deletions builder/builder.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package builder

import (
"context"
"errors"
"golang.org/x/time/rate"
"math/big"
_ "os"
"sync"
Expand Down Expand Up @@ -44,66 +46,53 @@ type IBuilder interface {
}

type Builder struct {
ds flashbotsextra.IDatabaseService
beaconClient IBeaconClient
relay IRelay
eth IEthereumService
resubmitter Resubmitter
blockSubmissionRateLimiter *BlockSubmissionRateLimiter
builderSecretKey *bls.SecretKey
builderPublicKey boostTypes.PublicKey
builderSigningDomain boostTypes.Domain

bestMu sync.Mutex
bestAttrs BuilderPayloadAttributes
bestBlockProfit *big.Int
ds flashbotsextra.IDatabaseService
relay IRelay
eth IEthereumService
builderSecretKey *bls.SecretKey
builderPublicKey boostTypes.PublicKey
builderSigningDomain boostTypes.Domain

limiter *rate.Limiter

slotMu sync.Mutex
slot uint64
slotAttrs []BuilderPayloadAttributes
slotCtx context.Context
slotCtxCancel context.CancelFunc
}

func NewBuilder(sk *bls.SecretKey, ds flashbotsextra.IDatabaseService, bc IBeaconClient, relay IRelay, builderSigningDomain boostTypes.Domain, eth IEthereumService) *Builder {
func NewBuilder(sk *bls.SecretKey, ds flashbotsextra.IDatabaseService, relay IRelay, builderSigningDomain boostTypes.Domain, eth IEthereumService) *Builder {
pkBytes := bls.PublicKeyFromSecretKey(sk).Compress()
pk := boostTypes.PublicKey{}
pk.FromSlice(pkBytes)

slotCtx, slotCtxCancel := context.WithCancel(context.Background())
return &Builder{
ds: ds,
beaconClient: bc,
relay: relay,
eth: eth,
resubmitter: Resubmitter{},
blockSubmissionRateLimiter: NewBlockSubmissionRateLimiter(),
builderSecretKey: sk,
builderPublicKey: pk,

ds: ds,
relay: relay,
eth: eth,
builderSecretKey: sk,
builderPublicKey: pk,
builderSigningDomain: builderSigningDomain,
bestBlockProfit: big.NewInt(0),

limiter: rate.NewLimiter(rate.Every(time.Second), 1),
slot: 0,
slotCtx: slotCtx,
slotCtxCancel: slotCtxCancel,
}
}

func (b *Builder) Start() error {
b.blockSubmissionRateLimiter.Start()
return nil
}

func (b *Builder) Stop() error {
b.blockSubmissionRateLimiter.Stop()
return nil
}

func (b *Builder) onSealedBlock(block *types.Block, bundles []types.SimulatedBundle, proposerPubkey boostTypes.PublicKey, proposerFeeRecipient boostTypes.Address, attrs *BuilderPayloadAttributes) error {
b.bestMu.Lock()
defer b.bestMu.Unlock()

// Do not submit blocks that don't improve the profit
if b.bestAttrs != *attrs {
b.bestAttrs = *attrs
b.bestBlockProfit.SetInt64(0)
} else {
if block.Profit.Cmp(b.bestBlockProfit) <= 0 {
log.Info("Ignoring block that is not improving the profit")
return nil
}
}

start := time.Now()
executableData := beacon.BlockToExecutableData(block)
payload, err := executableDataToExecutionPayload(executableData)
if err != nil {
Expand Down Expand Up @@ -152,9 +141,8 @@ func (b *Builder) onSealedBlock(block *types.Block, bundles []types.SimulatedBun
log.Info("could submit block", "bundles", len(bundles))
}

log.Info("submitted block", "header", block.Header(), "bid", blockBidMsg)
log.Info("submitted block", "header", block.Header(), "bid", blockBidMsg, "time", time.Since(start))

b.bestBlockProfit.Set(block.Profit)
return nil
}

Expand Down Expand Up @@ -188,29 +176,100 @@ func (b *Builder) OnPayloadAttribute(attrs *BuilderPayloadAttributes) error {
return errors.New("parent block not found in blocktree")
}

blockHook := func(block *types.Block, bundles []types.SimulatedBundle) {
select {
case shouldSubmit := <-b.blockSubmissionRateLimiter.Limit(block):
if !shouldSubmit {
log.Info("Block rate limited", "blochHash", block.Hash())
return
b.slotMu.Lock()
defer b.slotMu.Unlock()

if b.slot != attrs.Slot {
if b.slotCtxCancel != nil {
b.slotCtxCancel()
}

slotCtx, slotCtxCancel := context.WithTimeout(context.Background(), 12*time.Second)
b.slot = attrs.Slot
b.slotAttrs = nil
b.slotCtx = slotCtx
b.slotCtxCancel = slotCtxCancel
}

for _, currentAttrs := range b.slotAttrs {
if *attrs == currentAttrs {
log.Debug("ignoring known payload attribute", "slot", attrs.Slot, "hash", attrs.HeadHash)
return nil
}
}
b.slotAttrs = append(b.slotAttrs, *attrs)

go b.runBuildingJob(b.slotCtx, proposerPubkey, vd.FeeRecipient, attrs)
return nil
}

func (b *Builder) runBuildingJob(slotCtx context.Context, proposerPubkey boostTypes.PublicKey, feeRecipient boostTypes.Address, attrs *BuilderPayloadAttributes) {
ctx, cancel := context.WithTimeout(slotCtx, 12*time.Second)
defer cancel()

// Submission queue for the given payload attributes
// multiple jobs can run for different attributes fot the given slot
// 1. When new block is ready we check if its profit is higher than profit of last best block
// if it is we set queueBest* to values of the new block and notify queueSignal channel.
// 2. Submission goroutine waits for queueSignal and submits queueBest* if its more valuable than
// queueLastSubmittedProfit keeping queueLastSubmittedProfit to be the profit of the last submission.
// Submission goroutine is globally rate limited to have fixed rate of submissions for all jobs.
var (
queueSignal = make(chan struct{}, 1)

queueMu sync.Mutex
queueLastSubmittedProfit = new(big.Int)
queueBestProfit = new(big.Int)
queueBestBlock *types.Block
queueBestBundles []types.SimulatedBundle
)

log.Debug("runBuildingJob", "slot", attrs.Slot, "parent", attrs.HeadHash)

submitBestBlock := func() {
queueMu.Lock()
if queueLastSubmittedProfit.Cmp(queueBestProfit) < 0 {
err := b.onSealedBlock(queueBestBlock, queueBestBundles, proposerPubkey, feeRecipient, attrs)
if err != nil {
log.Error("could not run sealed block hook", "err", err)
} else {
queueLastSubmittedProfit.Set(queueBestProfit)
}
case <-time.After(200 * time.Millisecond):
log.Info("Block rate limit timeout, submitting the block anyway")
}
queueMu.Unlock()
}

err := b.onSealedBlock(block, bundles, proposerPubkey, vd.FeeRecipient, attrs)
if err != nil {
log.Error("could not run sealed block hook", "err", err)
// Empties queue, submits the best block for current job with rate limit (global for all jobs)
go runResubmitLoop(ctx, b.limiter, queueSignal, submitBestBlock)

// Populates queue with submissions that increase block profit
blockHook := func(block *types.Block, bundles []types.SimulatedBundle) {
if ctx.Err() != nil {
return
}

queueMu.Lock()
defer queueMu.Unlock()
if block.Profit.Cmp(queueBestProfit) > 0 {
queueBestBlock = block
queueBestBundles = bundles
queueBestProfit.Set(block.Profit)

select {
case queueSignal <- struct{}{}:
default:
}
}
}

firstBlockResult := b.resubmitter.newTask(12*time.Second, time.Second, func() error {
log.Info("Resubmitting build job")
return b.eth.BuildBlock(attrs, blockHook)
// resubmits block builder requests every second
runRetryLoop(ctx, time.Second, func() {
log.Debug("retrying BuildBlock", "slot", attrs.Slot, "parent", attrs.HeadHash)
err := b.eth.BuildBlock(attrs, blockHook)
if err != nil {
log.Warn("Failed to build block", "err", err)
}
})

return firstBlockResult
}

func executableDataToExecutionPayload(data *beacon.ExecutableDataV1) (*boostTypes.ExecutionPayload, error) {
Expand Down
Loading

0 comments on commit f94c8e3

Please sign in to comment.