From 011dd3bc239e9b6befb8fcf7a39f4f676601177e Mon Sep 17 00:00:00 2001 From: bnoieh <135800952+bnoieh@users.noreply.github.com> Date: Tue, 11 Jun 2024 16:25:16 +0800 Subject: [PATCH 01/12] feat: op-batcher auto switch to economic DA type --- op-batcher/batcher/channel_manager.go | 16 +++ op-batcher/batcher/config.go | 2 +- op-batcher/batcher/driver.go | 149 ++++++++++++++++++++++++++ op-batcher/batcher/service.go | 8 +- op-batcher/flags/types.go | 2 + op-service/txmgr/cli.go | 6 +- op-service/txmgr/txmgr.go | 2 +- 7 files changed, 177 insertions(+), 8 deletions(-) diff --git a/op-batcher/batcher/channel_manager.go b/op-batcher/batcher/channel_manager.go index 491837182a..0cf13c8212 100644 --- a/op-batcher/batcher/channel_manager.go +++ b/op-batcher/batcher/channel_manager.go @@ -6,6 +6,7 @@ import ( "io" "sync" + "github.com/ethereum-optimism/optimism/op-batcher/flags" "github.com/ethereum-optimism/optimism/op-batcher/metrics" "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup/derive" @@ -225,6 +226,21 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error { return nil } +func (s *channelManager) SwitchDAType(targetDAType flags.DataAvailabilityType) { + s.mu.Lock() + defer s.mu.Unlock() + switch targetDAType { + case flags.BlobsType: + s.cfg.MaxFrameSize = eth.MaxBlobDataSize - 1 + s.cfg.MultiFrameTxs = true + case flags.CalldataType: + s.cfg.MaxFrameSize = CallDataMaxTxSize - 1 + s.cfg.MultiFrameTxs = false + default: + s.log.Crit("channel manager switch to a invalid DA type", "targetDAType", targetDAType.String()) + } +} + // registerL1Block registers the given block at the pending channel. 
func (s *channelManager) registerL1Block(l1Head eth.BlockID) { s.currentChannel.CheckTimeout(l1Head.Number) diff --git a/op-batcher/batcher/config.go b/op-batcher/batcher/config.go index 7450e83fb5..662de33a37 100644 --- a/op-batcher/batcher/config.go +++ b/op-batcher/batcher/config.go @@ -118,7 +118,7 @@ func (c *CLIConfig) Check() error { if c.BatchType > 1 { return fmt.Errorf("unknown batch type: %v", c.BatchType) } - if c.DataAvailabilityType == flags.BlobsType && c.TargetNumFrames > 6 { + if (c.DataAvailabilityType == flags.BlobsType || c.DataAvailabilityType == flags.AutoType) && c.TargetNumFrames > 6 { return errors.New("too many frames for blob transactions, max 6") } if !flags.ValidDataAvailabilityType(c.DataAvailabilityType) { diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go index 50bc25142e..16d1f7dabe 100644 --- a/op-batcher/batcher/driver.go +++ b/op-batcher/batcher/driver.go @@ -7,9 +7,12 @@ import ( "io" "math/big" _ "net/http/pprof" + "strings" "sync" + "sync/atomic" "time" + "github.com/ethereum-optimism/optimism/op-batcher/flags" "github.com/ethereum-optimism/optimism/op-batcher/metrics" "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup/derive" @@ -17,15 +20,23 @@ import ( "github.com/ethereum-optimism/optimism/op-service/dial" "github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/txmgr" + "github.com/ethereum/go-ethereum/consensus/misc/eip4844" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/params" ) +// Auto DA params +const DATypeSwitchThrehold int = 5 +const CallDataMaxTxSize uint64 = 120000 +const ApproximateGasPerCallDataTx int64 = 1934892 + var ErrBatcherNotRunning = errors.New("batcher is not running") type L1Client interface { HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, 
error) + SuggestGasTipCap(ctx context.Context) (*big.Int, error) } type L2Client interface { @@ -47,6 +58,7 @@ type DriverSetup struct { EndpointProvider dial.L2EndpointProvider ChannelConfig ChannelConfig PlasmaDA *plasma.DAClient + AutoSwitchDA bool } // BatchSubmitter encapsulates a service responsible for submitting L2 tx @@ -68,6 +80,9 @@ type BatchSubmitter struct { lastStoredBlock eth.BlockID lastL1Tip eth.L1BlockRef + // addressReservedError is recorded from L1 txpool, which may occur when switch DA type + addressReservedError atomic.Bool + state *channelManager } @@ -272,6 +287,65 @@ func (l *BatchSubmitter) loop() { } }() + economicDATypeCh := make(chan flags.DataAvailabilityType) + if l.AutoSwitchDA { + // start auto choose economic DA type processing loop + economicDALoopDone := make(chan struct{}) + defer close(economicDALoopDone) // shut down auto DA loop + go func() { + economicDAType := flags.BlobsType + switchCount := 0 + economicDATicker := time.NewTicker(time.Minute) + defer economicDATicker.Stop() + addressReservedErrorTicker := time.NewTicker(time.Second) + defer addressReservedErrorTicker.Stop() + for { + select { + case <-economicDATicker.C: + newEconomicDAType, err := l.getEconomicDAType(l.shutdownCtx) + if err != nil { + l.Log.Error("getEconomicDAType failed: %w", err) + continue + } + if newEconomicDAType != economicDAType { + switchCount++ + } else { + switchCount = 0 + } + if switchCount >= DATypeSwitchThrehold { + l.Log.Info("start economic switch", "from type", economicDAType.String(), "to type", newEconomicDAType.String()) + start := time.Now() + economicDAType = newEconomicDAType + switchCount = 0 + economicDATypeCh <- economicDAType + l.Log.Info("finish economic switch", "duration", time.Since(start)) + } + case <-addressReservedErrorTicker.C: + if l.addressReservedError.Load() { + if economicDAType == flags.BlobsType { + economicDAType = flags.CalldataType + l.Log.Info("start resolve addressReservedError switch", "from type", 
flags.BlobsType.String(), "to type", flags.CalldataType.String()) + } else if economicDAType == flags.CalldataType { + economicDAType = flags.BlobsType + l.Log.Info("start resolve addressReservedError switch", "from type", flags.CalldataType.String(), "to type", flags.BlobsType.String()) + } else { + l.Log.Crit("invalid DA type in economic switch loop", "invalid type", economicDAType.String()) + } + switchCount = 0 + start := time.Now() + economicDATypeCh <- economicDAType + l.Log.Info("finish resolve addressReservedError switch", "duration", time.Since(start)) + time.Sleep(5 * time.Minute) // stop economic type switching to let addressRservedError resolved first + l.addressReservedError.Store(false) + } + case <-economicDALoopDone: + l.Log.Info("auto DA processing loop done") + return + } + } + }() + } + ticker := time.NewTicker(l.Config.PollInterval) defer ticker.Stop() @@ -302,6 +376,23 @@ func (l *BatchSubmitter) loop() { continue } l.publishStateToL1(queue, receiptsCh) + case targetDAType := <-economicDATypeCh: + // close current state to prepare for switch + err := l.state.Close() + if err != nil { + if errors.Is(err, ErrPendingAfterClose) { + l.Log.Warn("Closed channel manager to handle DA type switch with pending channel(s) remaining - submitting") + } else { + l.Log.Error("Error closing the channel manager to handle a DA type switch", "err", err) + } + } + // on DA type switch we want to publish all pending state then wait until each result clears before resetting + // the state. 
+ publishAndWait() + l.clearState(l.shutdownCtx) + // switch action after clear state + l.switchDAType(targetDAType) + continue case <-l.shutdownCtx.Done(): if l.Txmgr.IsClosed() { l.Log.Info("Txmgr is closed, remaining channel data won't be sent") @@ -324,6 +415,52 @@ func (l *BatchSubmitter) loop() { } } +func (l *BatchSubmitter) getEconomicDAType(ctx context.Context) (flags.DataAvailabilityType, error) { + sCtx, sCancel := context.WithTimeout(ctx, l.Config.NetworkTimeout) + defer sCancel() + gasPrice, err := l.L1Client.SuggestGasTipCap(sCtx) + if err != nil { + return "", fmt.Errorf("getEconomicDAType failed to fetch the suggested gas tip cap: %w", err) + } + calldataCost := big.NewInt(0).Mul(big.NewInt(6*ApproximateGasPerCallDataTx), gasPrice) + + hCtx, hCancel := context.WithTimeout(ctx, l.Config.NetworkTimeout) + defer hCancel() + header, err := l.L1Client.HeaderByNumber(hCtx, nil) + if err != nil { + return "", fmt.Errorf("getEconomicDAType failed to fetch the latest header: %w", err) + } + if header.ExcessBlobGas == nil { + return "", fmt.Errorf("getEconomicDAType fetched header with nil ExcessBlobGas: %v", header) + } + blobGasPrice := eip4844.CalcBlobFee(*header.ExcessBlobGas) + blobCost := big.NewInt(0).Add(big.NewInt(0).Mul(big.NewInt(21000), gasPrice), big.NewInt(0).Mul(big.NewInt(params.MaxBlobGasPerBlock), blobGasPrice)) + + if calldataCost.Cmp(blobCost) < 0 { + l.Log.Info("Economic DA type is calldata", "gas price", gasPrice, "calldata cost", calldataCost, "blob gas price", blobGasPrice, "blob cost", blobCost) + return flags.CalldataType, nil + } + l.Log.Info("Economic DA type is blobs", "gas price", gasPrice, "calldata cost", calldataCost, "blob gas price", blobGasPrice, "blob cost", blobCost) + return flags.BlobsType, nil +} + +func (l *BatchSubmitter) switchDAType(targetDAType flags.DataAvailabilityType) { + switch targetDAType { + case flags.BlobsType: + l.Config.UseBlobs = true + l.ChannelConfig.MaxFrameSize = eth.MaxBlobDataSize - 1 + 
l.ChannelConfig.MultiFrameTxs = true + l.state.SwitchDAType(targetDAType) + case flags.CalldataType: + l.Config.UseBlobs = false + l.ChannelConfig.MaxFrameSize = CallDataMaxTxSize - 1 + l.ChannelConfig.MultiFrameTxs = false + l.state.SwitchDAType(targetDAType) + default: + l.Log.Crit("batch submitter switch to a invalid DA type", "targetDAType", targetDAType.String()) + } +} + // publishStateToL1 queues up all pending TxData to be published to the L1, returning when there is // no more data to queue for publishing or if there was an error queing the data. func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txID], receiptsCh chan txmgr.TxReceipt[txID]) { @@ -525,6 +662,9 @@ func (l *BatchSubmitter) recordL1Tip(l1tip eth.L1BlockRef) { func (l *BatchSubmitter) recordFailedTx(id txID, err error) { l.Log.Warn("Transaction failed to send", logFields(id, err)...) l.state.TxFailed(id) + if errStringMatch(err, txmgr.ErrAlreadyReserved) && l.AutoSwitchDA { + l.addressReservedError.Store(true) + } } func (l *BatchSubmitter) recordConfirmedTx(id txID, receipt *types.Receipt) { @@ -560,3 +700,12 @@ func logFields(xs ...any) (fs []any) { } return fs } + +func errStringMatch(err, target error) bool { + if err == nil && target == nil { + return true + } else if err == nil || target == nil { + return false + } + return strings.Contains(err.Error(), target.Error()) +} diff --git a/op-batcher/batcher/service.go b/op-batcher/batcher/service.go index 0b3a3552d2..c1a19065e9 100644 --- a/op-batcher/batcher/service.go +++ b/op-batcher/batcher/service.go @@ -120,7 +120,7 @@ func (bs *BatcherService) initFromCLIConfig(ctx context.Context, version string, if err := bs.initPlasmaDA(cfg); err != nil { return fmt.Errorf("failed to init plasma DA: %w", err) } - bs.initDriver() + bs.initDriver(cfg) if err := bs.initRPCServer(cfg); err != nil { return fmt.Errorf("failed to start RPC server: %w", err) } @@ -198,7 +198,7 @@ func (bs *BatcherService) initChannelConfig(cfg *CLIConfig) error { 
} switch cfg.DataAvailabilityType { - case flags.BlobsType: + case flags.BlobsType, flags.AutoType: if !cfg.TestUseMaxTxSizeForBlobs { // account for version byte prefix cc.MaxFrameSize = eth.MaxBlobDataSize - 1 @@ -228,6 +228,7 @@ func (bs *BatcherService) initChannelConfig(cfg *CLIConfig) error { return fmt.Errorf("invalid channel configuration: %w", err) } bs.Log.Info("Initialized channel-config", + "da_type", cfg.DataAvailabilityType.String(), "use_blobs", bs.UseBlobs, "use_plasma", bs.UsePlasma, "max_frame_size", cc.MaxFrameSize, @@ -286,7 +287,7 @@ func (bs *BatcherService) initMetricsServer(cfg *CLIConfig) error { return nil } -func (bs *BatcherService) initDriver() { +func (bs *BatcherService) initDriver(cfg *CLIConfig) { bs.driver = NewBatchSubmitter(DriverSetup{ Log: bs.Log, Metr: bs.Metrics, @@ -297,6 +298,7 @@ func (bs *BatcherService) initDriver() { EndpointProvider: bs.EndpointProvider, ChannelConfig: bs.ChannelConfig, PlasmaDA: bs.PlasmaDA, + AutoSwitchDA: cfg.DataAvailabilityType == flags.AutoType, }) } diff --git a/op-batcher/flags/types.go b/op-batcher/flags/types.go index 0db97cdad2..775c916d02 100644 --- a/op-batcher/flags/types.go +++ b/op-batcher/flags/types.go @@ -8,11 +8,13 @@ const ( // data availability types CalldataType DataAvailabilityType = "calldata" BlobsType DataAvailabilityType = "blobs" + AutoType DataAvailabilityType = "auto" ) var DataAvailabilityTypes = []DataAvailabilityType{ CalldataType, BlobsType, + AutoType, } func (kind DataAvailabilityType) String() string { diff --git a/op-service/txmgr/cli.go b/op-service/txmgr/cli.go index 591aa0df7d..533961adaf 100644 --- a/op-service/txmgr/cli.go +++ b/op-service/txmgr/cli.go @@ -77,7 +77,7 @@ var ( SafeAbortNonceTooLowCount: uint64(3), FeeLimitMultiplier: uint64(5), FeeLimitThresholdGwei: 100.0, - BlobGasPriceLimitGwei: 100.0, + BlobGasPriceLimitGwei: 0, MinTipCapGwei: 1.0, MinBaseFeeGwei: 1.0, ResubmissionTimeout: 48 * time.Second, @@ -91,7 +91,7 @@ var ( 
SafeAbortNonceTooLowCount: uint64(3), FeeLimitMultiplier: uint64(5), FeeLimitThresholdGwei: 100.0, - BlobGasPriceLimitGwei: 100.0, + BlobGasPriceLimitGwei: 0, MinTipCapGwei: 1.0, MinBaseFeeGwei: 1.0, ResubmissionTimeout: 24 * time.Second, @@ -152,7 +152,7 @@ func CLIFlagsWithDefaults(envPrefix string, defaults DefaultFlagValues) []cli.Fl }, &cli.Float64Flag{ Name: BlobGasPriceLimitFlagName, - Usage: "The maximum limit (in GWei) of blob gas price, above which will stop submit and wait for the price go down", + Usage: "The maximum limit (in GWei) of blob gas price, above which will stop submit and wait for the price go down. Default value is 0(disabled)", Value: defaults.BlobGasPriceLimitGwei, EnvVars: prefixEnvVars("TXMGR_BLOB_GAS_PRICE_LIMIT"), }, diff --git a/op-service/txmgr/txmgr.go b/op-service/txmgr/txmgr.go index 5e00b1a9f6..e409de3211 100644 --- a/op-service/txmgr/txmgr.go +++ b/op-service/txmgr/txmgr.go @@ -798,7 +798,7 @@ func (m *SimpleTxManager) checkLimits(tip, baseFee, bumpedTip, bumpedFee *big.In func (m *SimpleTxManager) checkBlobFeeLimits(blobBaseFee, bumpedBlobFee *big.Int) error { // If above limit, do not send transaction - if limit := m.cfg.BlobGasPriceLimit; limit != nil && limit.Cmp(bumpedBlobFee) == -1 { + if limit := m.cfg.BlobGasPriceLimit; limit != nil && limit.Cmp(big.NewInt(0)) == 1 && limit.Cmp(bumpedBlobFee) == -1 { return fmt.Errorf( "bumped blob fee %v is over blob gas price limit value: %v", bumpedBlobFee, m.cfg.BlobGasPriceLimit) From 24d30f66cceb627faa3c9386f54cc713fb434273 Mon Sep 17 00:00:00 2001 From: bnoieh <135800952+bnoieh@users.noreply.github.com> Date: Thu, 13 Jun 2024 11:40:22 +0800 Subject: [PATCH 02/12] chore: add metrics for auto switch --- op-batcher/batcher/driver.go | 9 +++++ op-batcher/metrics/metrics.go | 76 +++++++++++++++++++++++++++++++++++ op-batcher/metrics/noop.go | 10 +++++ 3 files changed, 95 insertions(+) diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go index 16d1f7dabe..14058d4861 
100644 --- a/op-batcher/batcher/driver.go +++ b/op-batcher/batcher/driver.go @@ -294,6 +294,7 @@ func (l *BatchSubmitter) loop() { defer close(economicDALoopDone) // shut down auto DA loop go func() { economicDAType := flags.BlobsType + l.Metr.RecordAutoChoosedDAType(economicDAType) switchCount := 0 economicDATicker := time.NewTicker(time.Minute) defer economicDATicker.Stop() @@ -319,6 +320,9 @@ func (l *BatchSubmitter) loop() { switchCount = 0 economicDATypeCh <- economicDAType l.Log.Info("finish economic switch", "duration", time.Since(start)) + l.Metr.RecordAutoChoosedDAType(economicDAType) + l.Metr.RecordEconomicAutoSwitchCount() + l.Metr.RecordAutoSwitchTimeDuration(time.Since(start)) } case <-addressReservedErrorTicker.C: if l.addressReservedError.Load() { @@ -335,6 +339,9 @@ func (l *BatchSubmitter) loop() { start := time.Now() economicDATypeCh <- economicDAType l.Log.Info("finish resolve addressReservedError switch", "duration", time.Since(start)) + l.Metr.RecordAutoChoosedDAType(economicDAType) + l.Metr.RecordReservedErrorSwitchCount() + l.Metr.RecordAutoSwitchTimeDuration(time.Since(start)) time.Sleep(5 * time.Minute) // stop economic type switching to let addressRservedError resolved first l.addressReservedError.Store(false) } @@ -436,6 +443,8 @@ func (l *BatchSubmitter) getEconomicDAType(ctx context.Context) (flags.DataAvail blobGasPrice := eip4844.CalcBlobFee(*header.ExcessBlobGas) blobCost := big.NewInt(0).Add(big.NewInt(0).Mul(big.NewInt(21000), gasPrice), big.NewInt(0).Mul(big.NewInt(params.MaxBlobGasPerBlock), blobGasPrice)) + l.Metr.RecordEstimatedCalldataTypeFee(calldataCost) + l.Metr.RecordEstimatedBlobTypeFee(blobCost) if calldataCost.Cmp(blobCost) < 0 { l.Log.Info("Economic DA type is calldata", "gas price", gasPrice, "calldata cost", calldataCost, "blob gas price", blobGasPrice, "blob cost", blobCost) return flags.CalldataType, nil diff --git a/op-batcher/metrics/metrics.go b/op-batcher/metrics/metrics.go index 89a3cc2528..37646586ee 100644 
--- a/op-batcher/metrics/metrics.go +++ b/op-batcher/metrics/metrics.go @@ -2,6 +2,8 @@ package metrics import ( "io" + "math/big" + "time" "github.com/prometheus/client_golang/prometheus" @@ -9,7 +11,9 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/params" + "github.com/ethereum-optimism/optimism/op-batcher/flags" "github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-service/eth" opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics" @@ -47,6 +51,13 @@ type Metricer interface { RecordBlobUsedBytes(num int) + RecordAutoChoosedDAType(daType flags.DataAvailabilityType) + RecordEconomicAutoSwitchCount() + RecordReservedErrorSwitchCount() + RecordAutoSwitchTimeDuration(duration time.Duration) + RecordEstimatedCalldataTypeFee(fee *big.Int) + RecordEstimatedBlobTypeFee(fee *big.Int) + Document() []opmetrics.DocumentedMetric } @@ -81,6 +92,13 @@ type Metrics struct { batcherTxEvs opmetrics.EventVec blobUsedBytes prometheus.Histogram + + autoChoosedDAType prometheus.Gauge + economicAutoSwitchCount prometheus.Counter + reservedErrorSwitchCount prometheus.Counter + autoSwitchTimeDuration prometheus.Gauge + estimatedCalldataTypeFee prometheus.Gauge + estimatedBlobTypeFee prometheus.Gauge } var _ Metricer = (*Metrics)(nil) @@ -188,6 +206,36 @@ func NewMetrics(procName string) *Metrics { Help: "Blob size in bytes (of last blob only for multi-blob txs).", Buckets: prometheus.LinearBuckets(0.0, eth.MaxBlobDataSize/13, 14), }), + autoChoosedDAType: factory.NewGauge(prometheus.GaugeOpts{ + Namespace: ns, + Name: "auto_choosed_da_type", + Help: "Current DA type choosed by auto switch", + }), + economicAutoSwitchCount: factory.NewCounter(prometheus.CounterOpts{ + Namespace: ns, + Name: "economic_auto_switch_count", + Help: "Total number of switch action caused by economic calculation", + }), + 
reservedErrorSwitchCount: factory.NewCounter(prometheus.CounterOpts{ + Namespace: ns, + Name: "reserved_error_switch_count", + Help: "Total number of switch action caused by txpool addressReservedError", + }), + autoSwitchTimeDuration: factory.NewGauge(prometheus.GaugeOpts{ + Namespace: ns, + Name: "auto_switch_time_duration", + Help: "Time duration in milliseconds of auto switch action", + }), + estimatedCalldataTypeFee: factory.NewGauge(prometheus.GaugeOpts{ + Namespace: ns, + Name: "estimated_calldata_type_fee", + Help: "Current estimated fee in gwei of calldata type(6 txes) by auto switch routine", + }), + estimatedBlobTypeFee: factory.NewGauge(prometheus.GaugeOpts{ + Namespace: ns, + Name: "estimated_blob_type_fee", + Help: "Current estimated fee in gwei of blob type(1 tx with 6 blobs) by auto switch routine", + }), batcherTxEvs: opmetrics.NewEventVec(factory, ns, "", "batcher_tx", "BatcherTx", []string{"stage"}), } @@ -315,6 +363,34 @@ func (m *Metrics) RecordBlobUsedBytes(num int) { m.blobUsedBytes.Observe(float64(num)) } +func (m *Metrics) RecordAutoChoosedDAType(daType flags.DataAvailabilityType) { + if daType == flags.CalldataType { + m.autoChoosedDAType.Set(0) + } else if daType == flags.BlobsType { + m.autoChoosedDAType.Set(1) + } +} + +func (m *Metrics) RecordEconomicAutoSwitchCount() { + m.economicAutoSwitchCount.Inc() +} + +func (m *Metrics) RecordReservedErrorSwitchCount() { + m.reservedErrorSwitchCount.Inc() +} + +func (m *Metrics) RecordAutoSwitchTimeDuration(duration time.Duration) { + m.autoSwitchTimeDuration.Set(float64(duration.Milliseconds())) +} + +func (m *Metrics) RecordEstimatedCalldataTypeFee(fee *big.Int) { + m.estimatedCalldataTypeFee.Set(float64(fee.Uint64() / params.GWei)) +} + +func (m *Metrics) RecordEstimatedBlobTypeFee(fee *big.Int) { + m.estimatedBlobTypeFee.Set(float64(fee.Uint64() / params.GWei)) +} + // estimateBatchSize estimates the size of the batch func estimateBatchSize(block *types.Block) uint64 { size := uint64(70) // 
estimated overhead of batch metadata diff --git a/op-batcher/metrics/noop.go b/op-batcher/metrics/noop.go index c7de3496e4..97fcb144be 100644 --- a/op-batcher/metrics/noop.go +++ b/op-batcher/metrics/noop.go @@ -2,12 +2,15 @@ package metrics import ( "io" + "math/big" + "time" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum-optimism/optimism/op-batcher/flags" "github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-service/eth" opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics" @@ -47,5 +50,12 @@ func (*noopMetrics) StartBalanceMetrics(log.Logger, ethereum.ChainStateReader, c } func (*noopMetrics) RecordBlobsNumber(number int) {} +func (*noopMetrics) RecordAutoChoosedDAType(daType flags.DataAvailabilityType) {} +func (*noopMetrics) RecordEconomicAutoSwitchCount() {} +func (*noopMetrics) RecordReservedErrorSwitchCount() {} +func (*noopMetrics) RecordAutoSwitchTimeDuration(duration time.Duration) {} +func (*noopMetrics) RecordEstimatedCalldataTypeFee(fee *big.Int) {} +func (*noopMetrics) RecordEstimatedBlobTypeFee(fee *big.Int) {} + func (m *noopMetrics) RecordL1UrlSwitchEvt(url string) { } From 628490cb67b496beecc7e465417259b7c03422cf Mon Sep 17 00:00:00 2001 From: bnoieh <135800952+bnoieh@users.noreply.github.com> Date: Thu, 13 Jun 2024 11:44:21 +0800 Subject: [PATCH 03/12] chore: fix fmt --- op-batcher/metrics/noop.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/op-batcher/metrics/noop.go b/op-batcher/metrics/noop.go index 97fcb144be..08dc44106f 100644 --- a/op-batcher/metrics/noop.go +++ b/op-batcher/metrics/noop.go @@ -51,11 +51,11 @@ func (*noopMetrics) StartBalanceMetrics(log.Logger, ethereum.ChainStateReader, c func (*noopMetrics) RecordBlobsNumber(number int) {} func (*noopMetrics) RecordAutoChoosedDAType(daType 
flags.DataAvailabilityType) {} -func (*noopMetrics) RecordEconomicAutoSwitchCount() {} -func (*noopMetrics) RecordReservedErrorSwitchCount() {} -func (*noopMetrics) RecordAutoSwitchTimeDuration(duration time.Duration) {} -func (*noopMetrics) RecordEstimatedCalldataTypeFee(fee *big.Int) {} -func (*noopMetrics) RecordEstimatedBlobTypeFee(fee *big.Int) {} +func (*noopMetrics) RecordEconomicAutoSwitchCount() {} +func (*noopMetrics) RecordReservedErrorSwitchCount() {} +func (*noopMetrics) RecordAutoSwitchTimeDuration(duration time.Duration) {} +func (*noopMetrics) RecordEstimatedCalldataTypeFee(fee *big.Int) {} +func (*noopMetrics) RecordEstimatedBlobTypeFee(fee *big.Int) {} func (m *noopMetrics) RecordL1UrlSwitchEvt(url string) { } From ebbbfcbb59e21954851e6c32e0e6b733c95435db Mon Sep 17 00:00:00 2001 From: bnoieh <135800952+bnoieh@users.noreply.github.com> Date: Thu, 13 Jun 2024 17:43:37 +0800 Subject: [PATCH 04/12] fix(op-batcher): reset l.lastStoredBlock when auto switch da type --- op-batcher/batcher/driver.go | 1 + 1 file changed, 1 insertion(+) diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go index 14058d4861..c66d06542d 100644 --- a/op-batcher/batcher/driver.go +++ b/op-batcher/batcher/driver.go @@ -384,6 +384,7 @@ func (l *BatchSubmitter) loop() { } l.publishStateToL1(queue, receiptsCh) case targetDAType := <-economicDATypeCh: + l.lastStoredBlock = eth.BlockID{} // close current state to prepare for switch err := l.state.Close() if err != nil { From 04befd778e81f253abbaa518e1d2a1b3a061b5e6 Mon Sep 17 00:00:00 2001 From: bnoieh <135800952+bnoieh@users.noreply.github.com> Date: Thu, 13 Jun 2024 19:20:18 +0800 Subject: [PATCH 05/12] optimize(batcher): limit the max load blocks one time --- op-batcher/batcher/driver.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go index c66d06542d..59fc5eb106 100644 --- a/op-batcher/batcher/driver.go +++ 
b/op-batcher/batcher/driver.go @@ -27,6 +27,8 @@ import ( "github.com/ethereum/go-ethereum/params" ) +const LimitLoadBlocksOneTime uint64 = 300 + // Auto DA params const DATypeSwitchThrehold int = 5 const CallDataMaxTxSize uint64 = 120000 @@ -170,10 +172,15 @@ func (l *BatchSubmitter) loadBlocksIntoState(ctx context.Context) error { } else if start.Number >= end.Number { return errors.New("start number is >= end number") } + // Limit the max loaded blocks one time + endNumber := end.Number + if endNumber-start.Number > LimitLoadBlocksOneTime { + endNumber = start.Number + LimitLoadBlocksOneTime + } var latestBlock *types.Block // Add all blocks to "state" - for i := start.Number + 1; i < end.Number+1; i++ { + for i := start.Number + 1; i < endNumber+1; i++ { block, err := l.loadBlockIntoState(ctx, i) if errors.Is(err, ErrReorg) { l.Log.Warn("Found L2 reorg", "block_number", i) From af2af4bbe3b6fa0cd0c95ab60f1e4bd15ba30d72 Mon Sep 17 00:00:00 2001 From: bnoieh <135800952+bnoieh@users.noreply.github.com> Date: Fri, 14 Jun 2024 10:24:33 +0800 Subject: [PATCH 06/12] chore: opt params --- op-batcher/batcher/driver.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go index 59fc5eb106..0558d8db5c 100644 --- a/op-batcher/batcher/driver.go +++ b/op-batcher/batcher/driver.go @@ -33,6 +33,7 @@ const LimitLoadBlocksOneTime uint64 = 300 const DATypeSwitchThrehold int = 5 const CallDataMaxTxSize uint64 = 120000 const ApproximateGasPerCallDataTx int64 = 1934892 +const MaxBlobsNumberPerTx int64 = 6 var ErrBatcherNotRunning = errors.New("batcher is not running") @@ -303,7 +304,7 @@ func (l *BatchSubmitter) loop() { economicDAType := flags.BlobsType l.Metr.RecordAutoChoosedDAType(economicDAType) switchCount := 0 - economicDATicker := time.NewTicker(time.Minute) + economicDATicker := time.NewTicker(30 * time.Second) defer economicDATicker.Stop() addressReservedErrorTicker := time.NewTicker(time.Second) 
defer addressReservedErrorTicker.Stop() @@ -437,7 +438,7 @@ func (l *BatchSubmitter) getEconomicDAType(ctx context.Context) (flags.DataAvail if err != nil { return "", fmt.Errorf("getEconomicDAType failed to fetch the suggested gas tip cap: %w", err) } - calldataCost := big.NewInt(0).Mul(big.NewInt(6*ApproximateGasPerCallDataTx), gasPrice) + calldataCost := big.NewInt(0).Mul(big.NewInt(MaxBlobsNumberPerTx*ApproximateGasPerCallDataTx), gasPrice) hCtx, hCancel := context.WithTimeout(ctx, l.Config.NetworkTimeout) defer hCancel() @@ -449,7 +450,7 @@ func (l *BatchSubmitter) getEconomicDAType(ctx context.Context) (flags.DataAvail return "", fmt.Errorf("getEconomicDAType fetched header with nil ExcessBlobGas: %v", header) } blobGasPrice := eip4844.CalcBlobFee(*header.ExcessBlobGas) - blobCost := big.NewInt(0).Add(big.NewInt(0).Mul(big.NewInt(21000), gasPrice), big.NewInt(0).Mul(big.NewInt(params.MaxBlobGasPerBlock), blobGasPrice)) + blobCost := big.NewInt(0).Add(big.NewInt(0).Mul(big.NewInt(int64(params.TxGas)), gasPrice), big.NewInt(0).Mul(big.NewInt(params.MaxBlobGasPerBlock), blobGasPrice)) l.Metr.RecordEstimatedCalldataTypeFee(calldataCost) l.Metr.RecordEstimatedBlobTypeFee(blobCost) From f61d62e5e5918e686d0f1f0e93787c04db3f92ff Mon Sep 17 00:00:00 2001 From: bnoieh <135800952+bnoieh@users.noreply.github.com> Date: Mon, 17 Jun 2024 10:52:35 +0800 Subject: [PATCH 07/12] op-batcher: tune auto switch params --- op-batcher/batcher/driver.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go index 0558d8db5c..451df0865c 100644 --- a/op-batcher/batcher/driver.go +++ b/op-batcher/batcher/driver.go @@ -27,10 +27,10 @@ import ( "github.com/ethereum/go-ethereum/params" ) -const LimitLoadBlocksOneTime uint64 = 300 +const LimitLoadBlocksOneTime uint64 = 30 // Auto DA params -const DATypeSwitchThrehold int = 5 +const DATypeSwitchThrehold int = 2 const CallDataMaxTxSize uint64 = 120000 const 
ApproximateGasPerCallDataTx int64 = 1934892 const MaxBlobsNumberPerTx int64 = 6 @@ -304,7 +304,7 @@ func (l *BatchSubmitter) loop() { economicDAType := flags.BlobsType l.Metr.RecordAutoChoosedDAType(economicDAType) switchCount := 0 - economicDATicker := time.NewTicker(30 * time.Second) + economicDATicker := time.NewTicker(5 * time.Second) defer economicDATicker.Stop() addressReservedErrorTicker := time.NewTicker(time.Second) defer addressReservedErrorTicker.Stop() From 2400897fac2b5786baca7fcb824a1444ec397e9a Mon Sep 17 00:00:00 2001 From: bnoieh <135800952+bnoieh@users.noreply.github.com> Date: Mon, 17 Jun 2024 11:52:05 +0800 Subject: [PATCH 08/12] chore(batcher): opt auto swtich metrics --- op-batcher/metrics/metrics.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/op-batcher/metrics/metrics.go b/op-batcher/metrics/metrics.go index 37646586ee..3791ca89cc 100644 --- a/op-batcher/metrics/metrics.go +++ b/op-batcher/metrics/metrics.go @@ -384,11 +384,11 @@ func (m *Metrics) RecordAutoSwitchTimeDuration(duration time.Duration) { } func (m *Metrics) RecordEstimatedCalldataTypeFee(fee *big.Int) { - m.estimatedCalldataTypeFee.Set(float64(fee.Uint64() / params.GWei)) + m.estimatedCalldataTypeFee.Set(float64(fee.Uint64()) / params.GWei) } func (m *Metrics) RecordEstimatedBlobTypeFee(fee *big.Int) { - m.estimatedBlobTypeFee.Set(float64(fee.Uint64() / params.GWei)) + m.estimatedBlobTypeFee.Set(float64(fee.Uint64()) / params.GWei) } // estimateBatchSize estimates the size of the batch From 3dd433a509d711b8e84a98b0c0fea51e2b73fd45 Mon Sep 17 00:00:00 2001 From: bnoieh <135800952+bnoieh@users.noreply.github.com> Date: Mon, 17 Jun 2024 12:18:08 +0800 Subject: [PATCH 09/12] op-batcher: tune addressReservedError handle params --- op-batcher/batcher/driver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go index 451df0865c..2105584781 100644 --- a/op-batcher/batcher/driver.go +++ 
b/op-batcher/batcher/driver.go @@ -350,7 +350,7 @@ func (l *BatchSubmitter) loop() { l.Metr.RecordAutoChoosedDAType(economicDAType) l.Metr.RecordReservedErrorSwitchCount() l.Metr.RecordAutoSwitchTimeDuration(time.Since(start)) - time.Sleep(5 * time.Minute) // stop economic type switching to let addressRservedError resolved first + time.Sleep(time.Second) // stop to let last addressRservedError handled first l.addressReservedError.Store(false) } case <-economicDALoopDone: From a9841cfa31d91e98545014e81a315a7b4e603d74 Mon Sep 17 00:00:00 2001 From: bnoieh <135800952+bnoieh@users.noreply.github.com> Date: Mon, 17 Jun 2024 15:45:49 +0800 Subject: [PATCH 10/12] op-batcher: tune auto switch params --- op-batcher/batcher/driver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go index 2105584781..cdd7de28c3 100644 --- a/op-batcher/batcher/driver.go +++ b/op-batcher/batcher/driver.go @@ -30,7 +30,7 @@ import ( const LimitLoadBlocksOneTime uint64 = 30 // Auto DA params -const DATypeSwitchThrehold int = 2 +const DATypeSwitchThrehold int = 3 const CallDataMaxTxSize uint64 = 120000 const ApproximateGasPerCallDataTx int64 = 1934892 const MaxBlobsNumberPerTx int64 = 6 From 88ff78a4a64d9e0623fd9116841816e0976466bb Mon Sep 17 00:00:00 2001 From: bnoieh <135800952+bnoieh@users.noreply.github.com> Date: Tue, 18 Jun 2024 16:48:05 +0800 Subject: [PATCH 11/12] batcher: optimize auto switch strategy --- op-batcher/batcher/driver.go | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go index cdd7de28c3..5ce5cf0b62 100644 --- a/op-batcher/batcher/driver.go +++ b/op-batcher/batcher/driver.go @@ -30,7 +30,7 @@ import ( const LimitLoadBlocksOneTime uint64 = 30 // Auto DA params -const DATypeSwitchThrehold int = 3 +const DATypeSwitchThrehold int = 5 const CallDataMaxTxSize uint64 = 120000 const ApproximateGasPerCallDataTx int64 = 
1934892 const MaxBlobsNumberPerTx int64 = 6 @@ -296,6 +296,7 @@ func (l *BatchSubmitter) loop() { }() economicDATypeCh := make(chan flags.DataAvailabilityType) + waitSwitchDACh := make(chan struct{}) if l.AutoSwitchDA { // start auto choose economic DA type processing loop economicDALoopDone := make(chan struct{}) @@ -321,12 +322,17 @@ func (l *BatchSubmitter) loop() { } else { switchCount = 0 } - if switchCount >= DATypeSwitchThrehold { + threhold := DATypeSwitchThrehold + if economicDAType == flags.CalldataType { + threhold = 20 * DATypeSwitchThrehold + } + if switchCount >= threhold { l.Log.Info("start economic switch", "from type", economicDAType.String(), "to type", newEconomicDAType.String()) start := time.Now() economicDAType = newEconomicDAType switchCount = 0 economicDATypeCh <- economicDAType + <-waitSwitchDACh l.Log.Info("finish economic switch", "duration", time.Since(start)) l.Metr.RecordAutoChoosedDAType(economicDAType) l.Metr.RecordEconomicAutoSwitchCount() @@ -346,11 +352,11 @@ func (l *BatchSubmitter) loop() { switchCount = 0 start := time.Now() economicDATypeCh <- economicDAType + <-waitSwitchDACh l.Log.Info("finish resolve addressReservedError switch", "duration", time.Since(start)) l.Metr.RecordAutoChoosedDAType(economicDAType) l.Metr.RecordReservedErrorSwitchCount() l.Metr.RecordAutoSwitchTimeDuration(time.Since(start)) - time.Sleep(time.Second) // stop to let last addressRservedError handled first l.addressReservedError.Store(false) } case <-economicDALoopDone: @@ -408,6 +414,8 @@ func (l *BatchSubmitter) loop() { l.clearState(l.shutdownCtx) // switch action after clear state l.switchDAType(targetDAType) + time.Sleep(time.Minute) // wait op-node derivate published DA data + waitSwitchDACh <- struct{}{} continue case <-l.shutdownCtx.Done(): if l.Txmgr.IsClosed() { @@ -681,6 +689,7 @@ func (l *BatchSubmitter) recordFailedTx(id txID, err error) { l.Log.Warn("Transaction failed to send", logFields(id, err)...) 
l.state.TxFailed(id) if errStringMatch(err, txmgr.ErrAlreadyReserved) && l.AutoSwitchDA { + l.Log.Warn("Encounter ErrAlreadyReserved", "id", id.String()) l.addressReservedError.Store(true) } } From 5d77a97590295eb3e5a736de42aa5c7fcf3886c8 Mon Sep 17 00:00:00 2001 From: bnoieh <135800952+bnoieh@users.noreply.github.com> Date: Tue, 18 Jun 2024 18:23:08 +0800 Subject: [PATCH 12/12] chore: fix batcher lint --- op-batcher/batcher/driver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go index 5ce5cf0b62..f91147685b 100644 --- a/op-batcher/batcher/driver.go +++ b/op-batcher/batcher/driver.go @@ -414,7 +414,7 @@ func (l *BatchSubmitter) loop() { l.clearState(l.shutdownCtx) // switch action after clear state l.switchDAType(targetDAType) - time.Sleep(time.Minute) // wait op-node derivate published DA data + time.Sleep(time.Minute) // wait for op-node to derive the published DA data waitSwitchDACh <- struct{}{} continue case <-l.shutdownCtx.Done():