Skip to content

Commit

Permalink
op-batcher: Implement dynamic blob/calldata selection
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastianst committed Jul 24, 2024
1 parent b7f8188 commit 9c988d2
Show file tree
Hide file tree
Showing 14 changed files with 327 additions and 54 deletions.
5 changes: 3 additions & 2 deletions op-batcher/batcher/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,14 +160,15 @@ func (s *channel) ID() derive.ChannelID {
// NextTxData should only be called after HasTxData returned true.
func (s *channel) NextTxData() txData {
nf := s.cfg.MaxFramesPerTx()
txdata := txData{frames: make([]frameData, 0, nf)}
// TODO: consider changing MultiFrameTxs to UseBlobs, as we use it synonymously now
txdata := txData{frames: make([]frameData, 0, nf), asBlob: s.cfg.MultiFrameTxs}
for i := 0; i < nf && s.channelBuilder.HasFrame(); i++ {
frame := s.channelBuilder.NextFrame()
txdata.frames = append(txdata.frames, frame)
}

id := txdata.ID().String()
s.log.Debug("returning next tx data", "id", id, "num_frames", len(txdata.frames))
s.log.Debug("returning next tx data", "id", id, "num_frames", len(txdata.frames), "as_blob", txdata.asBlob)
s.pendingTransactions[id] = txdata

return txdata
Expand Down
14 changes: 14 additions & 0 deletions op-batcher/batcher/channel_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ type ChannelConfig struct {
MultiFrameTxs bool
}

// ChannelConfig returns a copy of itself. This makes a ChannelConfig a static
// ChannelConfigProvider of itself.
func (cc ChannelConfig) ChannelConfig() ChannelConfig {
return cc
}

// InitCompressorConfig (re)initializes the channel configuration's compressor
// configuration using the given values. The TargetOutputSize will be set to a
// value consistent with cc.TargetNumFrames and cc.MaxFrameSize.
Expand Down Expand Up @@ -75,6 +81,14 @@ func (cc *ChannelConfig) InitNoneCompressor() {
cc.InitCompressorConfig(0, compressor.NoneKind, derive.Zlib)
}

func (cc *ChannelConfig) ReinitCompressorConfig() {
cc.InitCompressorConfig(
cc.CompressorConfig.ApproxComprRatio,
cc.CompressorConfig.Kind,
cc.CompressorConfig.CompressionAlgo,
)
}

func (cc *ChannelConfig) MaxFramesPerTx() int {
if !cc.MultiFrameTxs {
return 1
Expand Down
101 changes: 101 additions & 0 deletions op-batcher/batcher/channel_config_provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package batcher

import (
"context"
"math/big"
"time"

"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
)

const randomByteCalldataGas = params.TxDataNonZeroGasEIP2028

type (
ChannelConfigProvider interface {
ChannelConfig() ChannelConfig
}

GasPricer interface {
SuggestGasPriceCaps(ctx context.Context) (tipCap *big.Int, baseFee *big.Int, blobBaseFee *big.Int, err error)
}

DynamicEthChannelConfig struct {
log log.Logger
timeout time.Duration // query timeout
gasPricer GasPricer

blobConfig ChannelConfig
calldataConfig ChannelConfig
lastConfig *ChannelConfig
}
)

func NewDynamicEthChannelConfig(lgr log.Logger,
reqTimeout time.Duration, gasPricer GasPricer,
blobConfig ChannelConfig, calldataConfig ChannelConfig,
) *DynamicEthChannelConfig {
// Copy blobConfig and statically configure fallback calldata config.
// In the future, we might want to make the calldata config configurable.
// cdCfg := blobConfig
// cdCfg.TargetNumFrames = 1
// cdCfg.MaxFrameSize = 120_000
// cdCfg.MultiFrameTxs = false

dec := &DynamicEthChannelConfig{
log: lgr,
timeout: reqTimeout,
gasPricer: gasPricer,
blobConfig: blobConfig,
calldataConfig: calldataConfig,
}
// start with blob config
dec.lastConfig = &dec.blobConfig
return dec
}

func (dec *DynamicEthChannelConfig) ChannelConfig() ChannelConfig {
ctx, cancel := context.WithTimeout(context.Background(), dec.timeout)
defer cancel()
tipCap, baseFee, blobBaseFee, err := dec.gasPricer.SuggestGasPriceCaps(ctx)
if err != nil {
dec.log.Warn("Error querying gas prices, returning last config", "err", err)
return *dec.lastConfig
}

// We estimate the gas costs of a calldata and blob tx under the assumption that we'd fill
// a frame fully and compressed random channel data has few zeros, so they can be
// ignored in the calldata gas price estimation.
// It is also assumed that a calldata tx would contain exactly one full frame
// and a blob tx would contain target-num-frames many blobs.

// It would be nicer to use core.IntrinsicGas, but we don't have the actual data at hand
calldataBytes := dec.calldataConfig.MaxFrameSize + 1 // + 1 version byte
calldataGas := big.NewInt(int64(calldataBytes*randomByteCalldataGas + params.TxGas))
calldataPrice := new(big.Int).Add(baseFee, tipCap)
calldataCost := new(big.Int).Mul(calldataGas, calldataPrice)

blobGas := big.NewInt(eth.BlobSize * int64(dec.blobConfig.TargetNumFrames))
blobCost := new(big.Int).Mul(blobGas, blobBaseFee)
// blobs still have intrinsic calldata costs
blobCalldataCost := new(big.Int).Mul(big.NewInt(int64(params.TxGas)), calldataPrice)
blobCost = blobCost.Add(blobCost, blobCalldataCost)

blobDataBytes := big.NewInt(eth.MaxBlobDataSize * int64(dec.blobConfig.TargetNumFrames))
lgr := dec.log.New("base_fee", baseFee, "blob_base_fee", blobBaseFee, "tip_cap", tipCap,
"calldata_bytes", calldataBytes, "calldata_cost", calldataCost,
"blob_data_bytes", blobDataBytes, "blob_cost", blobCost)

// Now we compare the prices normalized to the number of bytes that can be
// submitted for that price.
if new(big.Int).Mul(blobCost, big.NewInt(int64(calldataBytes))).
Cmp(new(big.Int).Mul(calldataCost, blobDataBytes)) == 1 {
lgr.Info("Using calldata channel config")
dec.lastConfig = &dec.calldataConfig
return dec.calldataConfig
}
lgr.Info("Using blob channel config")
dec.lastConfig = &dec.blobConfig
return dec.blobConfig
}
126 changes: 126 additions & 0 deletions op-batcher/batcher/channel_config_provider_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package batcher

import (
"context"
"errors"
"math/big"
"testing"
"time"

"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slog"
)

type mockGasPricer struct {
err error
tipCap int64
baseFee int64
blobBaseFee int64
}

func (gp *mockGasPricer) SuggestGasPriceCaps(context.Context) (tipCap *big.Int, baseFee *big.Int, blobBaseFee *big.Int, err error) {
if gp.err != nil {
return nil, nil, nil, gp.err
}
return big.NewInt(gp.tipCap), big.NewInt(gp.baseFee), big.NewInt(gp.blobBaseFee), nil
}

func TestDynamicEthChannelConfig_ChannelConfig(t *testing.T) {
calldataCfg := ChannelConfig{
MaxFrameSize: 120_000,
TargetNumFrames: 1,
}
blobCfg := ChannelConfig{
MaxFrameSize: eth.MaxBlobDataSize - 1,
TargetNumFrames: 6,
MultiFrameTxs: true,
}

tests := []struct {
name string
tipCap int64
baseFee int64
blobBaseFee int64
wantCalldata bool
}{
{
name: "much-cheaper-blobs",
tipCap: 1e3,
baseFee: 1e6,
blobBaseFee: 1,
},
{
name: "close-cheaper-blobs",
tipCap: 1e3,
baseFee: 1e6,
blobBaseFee: 16e6,
},
{
name: "close-cheaper-calldata",
tipCap: 1e3,
baseFee: 1e6,
blobBaseFee: 17e6,
wantCalldata: true,
},
{
name: "much-cheaper-calldata",
tipCap: 1e3,
baseFee: 1e6,
blobBaseFee: 1e9,
wantCalldata: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
lgr, ch := testlog.CaptureLogger(t, slog.LevelInfo)
gp := &mockGasPricer{
tipCap: tt.tipCap,
baseFee: tt.baseFee,
blobBaseFee: tt.blobBaseFee,
}
dec := NewDynamicEthChannelConfig(lgr, 1*time.Second, gp, blobCfg, calldataCfg)
cc := dec.ChannelConfig()
if tt.wantCalldata {
require.Equal(t, cc, calldataCfg)
require.NotNil(t, ch.FindLog(testlog.NewMessageContainsFilter("calldata")))
require.Same(t, &dec.calldataConfig, dec.lastConfig)
} else {
require.Equal(t, cc, blobCfg)
require.NotNil(t, ch.FindLog(testlog.NewMessageContainsFilter("blob")))
require.Same(t, &dec.blobConfig, dec.lastConfig)
}
})
}

t.Run("error-latest", func(t *testing.T) {
lgr, ch := testlog.CaptureLogger(t, slog.LevelInfo)
gp := &mockGasPricer{
tipCap: 1,
baseFee: 1e3,
blobBaseFee: 1e6, // should return calldata cfg without error
err: errors.New("gp-error"),
}
dec := NewDynamicEthChannelConfig(lgr, 1*time.Second, gp, blobCfg, calldataCfg)
require.Equal(t, dec.ChannelConfig(), blobCfg)
require.NotNil(t, ch.FindLog(
testlog.NewLevelFilter(slog.LevelWarn),
testlog.NewMessageContainsFilter("returning last config"),
))

gp.err = nil
require.Equal(t, dec.ChannelConfig(), calldataCfg)
require.NotNil(t, ch.FindLog(
testlog.NewLevelFilter(slog.LevelInfo),
testlog.NewMessageContainsFilter("calldata"),
))

gp.err = errors.New("gp-error-2")
require.Equal(t, dec.ChannelConfig(), calldataCfg)
require.NotNil(t, ch.FindLog(
testlog.NewLevelFilter(slog.LevelWarn),
testlog.NewMessageContainsFilter("returning last config"),
))
})
}
34 changes: 18 additions & 16 deletions op-batcher/batcher/channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ var ErrReorg = errors.New("block does not extend existing chain")
// channel.
// Public functions on channelManager are safe for concurrent access.
type channelManager struct {
mu sync.Mutex
log log.Logger
metr metrics.Metricer
cfg ChannelConfig
rollupCfg *rollup.Config
mu sync.Mutex
log log.Logger
metr metrics.Metricer
cfgProvider ChannelConfigProvider
rollupCfg *rollup.Config

// All blocks since the last request for new tx data.
blocks []*types.Block
Expand All @@ -49,13 +49,13 @@ type channelManager struct {
closed bool
}

func NewChannelManager(log log.Logger, metr metrics.Metricer, cfg ChannelConfig, rollupCfg *rollup.Config) *channelManager {
func NewChannelManager(log log.Logger, metr metrics.Metricer, cfgProvider ChannelConfigProvider, rollupCfg *rollup.Config) *channelManager {
return &channelManager{
log: log,
metr: metr,
cfg: cfg,
rollupCfg: rollupCfg,
txChannels: make(map[string]*channel),
log: log,
metr: metr,
cfgProvider: cfgProvider,
rollupCfg: rollupCfg,
txChannels: make(map[string]*channel),
}
}

Expand Down Expand Up @@ -203,7 +203,8 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error {
return nil
}

pc, err := newChannel(s.log, s.metr, s.cfg, s.rollupCfg, s.l1OriginLastClosedChannel.Number)
cfg := s.cfgProvider.ChannelConfig()
pc, err := newChannel(s.log, s.metr, cfg, s.rollupCfg, s.l1OriginLastClosedChannel.Number)
if err != nil {
return fmt.Errorf("creating new channel: %w", err)
}
Expand All @@ -216,10 +217,11 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error {
"l1Head", l1Head,
"l1OriginLastClosedChannel", s.l1OriginLastClosedChannel,
"blocks_pending", len(s.blocks),
"batch_type", s.cfg.BatchType,
"compression_algo", s.cfg.CompressorConfig.CompressionAlgo,
"target_num_frames", s.cfg.TargetNumFrames,
"max_frame_size", s.cfg.MaxFrameSize,
"batch_type", cfg.BatchType,
"compression_algo", cfg.CompressorConfig.CompressionAlgo,
"target_num_frames", cfg.TargetNumFrames,
"max_frame_size", cfg.MaxFrameSize,
"use_blobs", cfg.MultiFrameTxs,
)
s.metr.RecordChannelOpened(pc.ID(), len(s.blocks))

Expand Down
8 changes: 8 additions & 0 deletions op-batcher/batcher/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ type CLIConfig struct {
// the data availability type to use for posting batches, e.g. blobs vs calldata.
DataAvailabilityType flags.DataAvailabilityType

// If DynamicEthDA is true, the batcher will switch dynamically to calldata
// transactions if blob transactions become more expensive.
DynamicEthDA bool

// TestUseMaxTxSizeForBlobs allows to set the blob size with MaxL1TxSize.
// Should only be used for testing purposes.
TestUseMaxTxSizeForBlobs bool
Expand Down Expand Up @@ -140,6 +144,9 @@ func (c *CLIConfig) Check() error {
if c.DataAvailabilityType == flags.BlobsType && c.TargetNumFrames > 6 {
return errors.New("too many frames for blob transactions, max 6")
}
if c.DynamicEthDA && c.DataAvailabilityType != flags.BlobsType {
return errors.New("dynamic eth DA is only available for blobs")
}
if !flags.ValidDataAvailabilityType(c.DataAvailabilityType) {
return fmt.Errorf("unknown data availability type: %q", c.DataAvailabilityType)
}
Expand Down Expand Up @@ -181,6 +188,7 @@ func NewConfig(ctx *cli.Context) *CLIConfig {
CheckRecentTxsDepth: ctx.Int(flags.CheckRecentTxsDepthFlag.Name),
BatchType: ctx.Uint(flags.BatchTypeFlag.Name),
DataAvailabilityType: flags.DataAvailabilityType(ctx.String(flags.DataAvailabilityTypeFlag.Name)),
DynamicEthDA: ctx.Bool(flags.DynamicEthDAFlag.Name),
ActiveSequencerCheckDuration: ctx.Duration(flags.ActiveSequencerCheckDurationFlag.Name),
TxMgrConfig: txmgr.ReadCLIConfig(ctx),
LogConfig: oplog.ReadCLIConfig(ctx),
Expand Down
Loading

0 comments on commit 9c988d2

Please sign in to comment.