Skip to content

Commit

Permalink
feat(op-batcher/op-proposer): add InstrumentedClient (#109)
Browse files Browse the repository at this point in the history
* feat(op-batcher/op-proposer): add InstrumentedClient for batcher/proposer/txmgr

* fix

* fix noopMetric npe

---------

Co-authored-by: Welkin <welkin.b@nodereal.com>
  • Loading branch information
welkin22 and Welkin authored Jan 9, 2024
1 parent 635e668 commit ffec6c8
Show file tree
Hide file tree
Showing 13 changed files with 385 additions and 17 deletions.
3 changes: 1 addition & 2 deletions op-batcher/batcher/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

"github.com/ethereum-optimism/optimism/op-service/client"

"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
"github.com/urfave/cli"

Expand All @@ -25,7 +24,7 @@ type Config struct {
log log.Logger
metr metrics.Metricer
L1Client client.EthClient
L2Client *ethclient.Client
L2Client client.EthClient
RollupNode *sources.RollupClient
TxManager txmgr.TxManager

Expand Down
2 changes: 2 additions & 0 deletions op-batcher/batcher/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,13 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metri
if err != nil {
return nil, err
}
l1Client = opclient.NewInstrumentedClient(l1Client, m)

l2Client, err := opclient.DialEthClientWithTimeout(ctx, cfg.L2EthRpc, opclient.DefaultDialTimeout)
if err != nil {
return nil, err
}
l2Client = opclient.NewInstrumentedClient(l2Client, m)

rollupClient, err := opclient.DialRollupClientWithTimeout(ctx, cfg.RollupRpc, opclient.DefaultDialTimeout)
if err != nil {
Expand Down
70 changes: 70 additions & 0 deletions op-batcher/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@ package metrics

import (
"context"
"errors"
"fmt"

"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/rpc"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
Expand All @@ -17,6 +21,7 @@ import (
)

const Namespace = "op_batcher"
const RPCClientSubsystem = "rpc_client"

type Metricer interface {
RecordInfo(version string)
Expand All @@ -43,6 +48,7 @@ type Metricer interface {
RecordBatchTxFailed()

Document() []opmetrics.DocumentedMetric
client.Metricer
}

type Metrics struct {
Expand Down Expand Up @@ -74,6 +80,10 @@ type Metrics struct {
channelOutputBytesTotal prometheus.Counter

batcherTxEvs opmetrics.EventVec

RPCClientRequestsTotal *prometheus.CounterVec
RPCClientRequestDurationSeconds *prometheus.HistogramVec
RPCClientResponsesTotal *prometheus.CounterVec
}

var _ Metricer = (*Metrics)(nil)
Expand Down Expand Up @@ -174,6 +184,33 @@ func NewMetrics(procName string) *Metrics {
}),

batcherTxEvs: opmetrics.NewEventVec(factory, ns, "", "batcher_tx", "BatcherTx", []string{"stage"}),

RPCClientRequestsTotal: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: ns,
Subsystem: RPCClientSubsystem,
Name: "requests_total",
Help: "Total RPC requests initiated by the op-batcher's RPC client",
}, []string{
"method",
}),
RPCClientRequestDurationSeconds: factory.NewHistogramVec(prometheus.HistogramOpts{
Namespace: ns,
Subsystem: RPCClientSubsystem,
Name: "request_duration_seconds",
Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10},
Help: "Histogram of RPC client request durations",
}, []string{
"method",
}),
RPCClientResponsesTotal: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: ns,
Subsystem: RPCClientSubsystem,
Name: "responses_total",
Help: "Total RPC request responses received by the op-batcher's RPC client",
}, []string{
"method",
"error",
}),
}
}

Expand Down Expand Up @@ -296,6 +333,39 @@ func (m *Metrics) RecordBatchTxFailed() {
m.batcherTxEvs.Record(TxStageFailed)
}

func (m *Metrics) RecordRPCClientRequest(method string) func(err error) {
m.RPCClientRequestsTotal.WithLabelValues(method).Inc()
timer := prometheus.NewTimer(m.RPCClientRequestDurationSeconds.WithLabelValues(method))
return func(err error) {
m.RecordRPCClientResponse(method, err)
timer.ObserveDuration()
}
}

// RecordRPCClientResponse records an RPC response. It will
// convert the passed-in error into something metrics friendly.
// Nil errors get converted into <nil>, RPC errors are converted
// into rpc_<error code>, HTTP errors are converted into
// http_<status code>, and everything else is converted into
// <unknown>.
func (m *Metrics) RecordRPCClientResponse(method string, err error) {
var errStr string
var rpcErr rpc.Error
var httpErr rpc.HTTPError
if err == nil {
errStr = "<nil>"
} else if errors.As(err, &rpcErr) {
errStr = fmt.Sprintf("rpc_%d", rpcErr.ErrorCode())
} else if errors.As(err, &httpErr) {
errStr = fmt.Sprintf("http_%d", httpErr.StatusCode)
} else if errors.Is(err, ethereum.NotFound) {
errStr = "<not found>"
} else {
errStr = "<unknown>"
}
m.RPCClientResponsesTotal.WithLabelValues(method, errStr).Inc()
}

// estimateBatchSize estimates the size of the batch
func estimateBatchSize(block *types.Block) uint64 {
size := uint64(70) // estimated overhead of batch metadata
Expand Down
5 changes: 5 additions & 0 deletions op-batcher/metrics/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,8 @@ func (*noopMetrics) RecordBatchTxFailed() {}

func (m *noopMetrics) RecordL1UrlSwitchEvt(url string) {
}

func (m *noopMetrics) RecordRPCClientRequest(method string) func(err error) {
return func(err error) {
}
}
69 changes: 69 additions & 0 deletions op-proposer/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@ package metrics

import (
"context"
"errors"
"fmt"

"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/rpc"

"github.com/ethereum-optimism/optimism/op-node/eth"

Expand All @@ -16,6 +20,7 @@ import (
)

const Namespace = "op_proposer"
const RPCClientSubsystem = "rpc_client"

type Metricer interface {
RecordInfo(version string)
Expand All @@ -28,6 +33,7 @@ type Metricer interface {
txmetrics.TxMetricer

RecordL2BlocksProposed(l2ref eth.L2BlockRef)
client.Metricer
}

type Metrics struct {
Expand All @@ -40,6 +46,10 @@ type Metrics struct {

info prometheus.GaugeVec
up prometheus.Gauge

RPCClientRequestsTotal *prometheus.CounterVec
RPCClientRequestDurationSeconds *prometheus.HistogramVec
RPCClientResponsesTotal *prometheus.CounterVec
}

var _ Metricer = (*Metrics)(nil)
Expand Down Expand Up @@ -73,6 +83,32 @@ func NewMetrics(procName string) *Metrics {
Name: "up",
Help: "1 if the op-proposer has finished starting up",
}),
RPCClientRequestsTotal: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: ns,
Subsystem: RPCClientSubsystem,
Name: "requests_total",
Help: "Total RPC requests initiated by the op-proposer's RPC client",
}, []string{
"method",
}),
RPCClientRequestDurationSeconds: factory.NewHistogramVec(prometheus.HistogramOpts{
Namespace: ns,
Subsystem: RPCClientSubsystem,
Name: "request_duration_seconds",
Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10},
Help: "Histogram of RPC client request durations",
}, []string{
"method",
}),
RPCClientResponsesTotal: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: ns,
Subsystem: RPCClientSubsystem,
Name: "responses_total",
Help: "Total RPC request responses received by the op-proposer's RPC client",
}, []string{
"method",
"error",
}),
}
}

Expand Down Expand Up @@ -109,3 +145,36 @@ func (m *Metrics) RecordL2BlocksProposed(l2ref eth.L2BlockRef) {
func (m *Metrics) Document() []opmetrics.DocumentedMetric {
return m.factory.Document()
}

func (m *Metrics) RecordRPCClientRequest(method string) func(err error) {
m.RPCClientRequestsTotal.WithLabelValues(method).Inc()
timer := prometheus.NewTimer(m.RPCClientRequestDurationSeconds.WithLabelValues(method))
return func(err error) {
m.RecordRPCClientResponse(method, err)
timer.ObserveDuration()
}
}

// RecordRPCClientResponse records an RPC response. It will
// convert the passed-in error into something metrics friendly.
// Nil errors get converted into <nil>, RPC errors are converted
// into rpc_<error code>, HTTP errors are converted into
// http_<status code>, and everything else is converted into
// <unknown>.
func (m *Metrics) RecordRPCClientResponse(method string, err error) {
var errStr string
var rpcErr rpc.Error
var httpErr rpc.HTTPError
if err == nil {
errStr = "<nil>"
} else if errors.As(err, &rpcErr) {
errStr = fmt.Sprintf("rpc_%d", rpcErr.ErrorCode())
} else if errors.As(err, &httpErr) {
errStr = fmt.Sprintf("http_%d", httpErr.StatusCode)
} else if errors.Is(err, ethereum.NotFound) {
errStr = "<not found>"
} else {
errStr = "<unknown>"
}
m.RPCClientResponsesTotal.WithLabelValues(method, errStr).Inc()
}
5 changes: 5 additions & 0 deletions op-proposer/metrics/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,8 @@ func (*noopMetrics) RecordL2BlocksProposed(l2ref eth.L2BlockRef) {}

func (m *noopMetrics) RecordL1UrlSwitchEvt(url string) {
}

func (m *noopMetrics) RecordRPCClientRequest(method string) func(err error) {
return func(err error) {
}
}
1 change: 1 addition & 0 deletions op-proposer/proposer/l2_output_submitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ func NewL2OutputSubmitterConfigFromCLIConfig(cfg CLIConfig, l log.Logger, m metr
if err != nil {
return nil, err
}
l1Client = opclient.NewInstrumentedClient(l1Client, m)

rollupClient, err := opclient.DialRollupClientWithTimeout(ctx, cfg.RollupRpc, opclient.DefaultDialTimeout)
if err != nil {
Expand Down
8 changes: 5 additions & 3 deletions op-service/client/ethclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,21 @@ package client

import (
"context"
"math/big"
"time"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"math/big"
"time"

"github.com/ethereum/go-ethereum/ethclient"
)

// DialEthClientWithTimeout attempts to dial the L1 provider using the provided
// URL. If the dial doesn't complete within defaultDialTimeout seconds, this
// method will return an error.
func DialEthClientWithTimeout(ctx context.Context, url string, timeout time.Duration) (*ethclient.Client, error) {
func DialEthClientWithTimeout(ctx context.Context, url string, timeout time.Duration) (EthClient, error) {
ctxt, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

Expand Down Expand Up @@ -66,5 +67,6 @@ type EthClient interface {
PendingNonceAt(ctx context.Context, account common.Address) (uint64, error)
EstimateGas(ctx context.Context, msg ethereum.CallMsg) (uint64, error)
CallContract(ctx context.Context, call ethereum.CallMsg, blockNumber *big.Int) ([]byte, error)
BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error)
Close()
}
8 changes: 8 additions & 0 deletions op-service/client/fallback_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,14 @@ func (l *FallbackClient) CallContract(ctx context.Context, call ethereum.CallMsg
return contract, err
}

func (l *FallbackClient) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) {
block, err := (*l.currentClient.Load()).BlockByNumber(ctx, number)
if err != nil {
l.handleErr(err, "BlockByNumber")
}
return block, err
}

func (l *FallbackClient) Close() {
l.mx.Lock()
defer l.mx.Unlock()
Expand Down
Loading

0 comments on commit ffec6c8

Please sign in to comment.