Skip to content

Commit

Permalink
Merge branch 'develop' into BCFR-912-CR-DW-querying-index-override
Browse files Browse the repository at this point in the history
  • Loading branch information
ilija42 authored Oct 8, 2024
2 parents e4d9b90 + ad29b19 commit 0fa2db6
Show file tree
Hide file tree
Showing 9 changed files with 93 additions and 4 deletions.
5 changes: 5 additions & 0 deletions .changeset/giant-pillows-sort.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

#added Add prometheus metrics exposing health of telemetry client
5 changes: 5 additions & 0 deletions .changeset/wise-pandas-join.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

Fix BHE PriceMax bug #bugfix
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
/core/services/directrequest @smartcontractkit/foundations
/core/services/feeds @smartcontractkit/op-core @eutopian @yevshev
/core/services/synchronization/telem @smartcontractkit/realtime
/core/capabilities/ @smartcontractkit/keystone
/core/capabilities/ @smartcontractkit/keystone @smartcontractkit/capabilities-team
/core/capabilities/ccip @smartcontractkit/ccip-offchain

# To be deprecated in Chainlink V3
Expand Down
3 changes: 2 additions & 1 deletion core/chains/evm/gas/block_history_estimator.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,8 @@ func (b *BlockHistoryEstimator) GetDynamicFee(_ context.Context, maxGasPriceWei
"Using Evm.GasEstimator.TipCapDefault as fallback.", "blocks", b.getBlockHistoryNumbers())
tipCap = b.eConfig.TipCapDefault()
}
maxGasPrice := getMaxGasPrice(maxGasPriceWei, b.eConfig.PriceMax())
maxGasPrice := assets.WeiMin(maxGasPriceWei, b.eConfig.PriceMax())
tipCap = assets.WeiMin(tipCap, maxGasPrice)
if b.eConfig.BumpThreshold() == 0 {
// just use the max gas price if gas bumping is disabled
feeCap = maxGasPrice
Expand Down
2 changes: 1 addition & 1 deletion core/chains/evm/gas/block_history_estimator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2197,7 +2197,7 @@ func TestBlockHistoryEstimator_GetDynamicFee(t *testing.T) {
fee, err := bhe.GetDynamicFee(tests.Context(t), assets.NewWeiI(100))
require.NoError(t, err)

assert.Equal(t, gas.DynamicFee{FeeCap: assets.NewWeiI(100), TipCap: assets.NewWeiI(6000)}, fee)
assert.Equal(t, gas.DynamicFee{FeeCap: assets.NewWeiI(100), TipCap: assets.NewWeiI(100)}, fee)
})

h = testutils.Head(1)
Expand Down
33 changes: 33 additions & 0 deletions core/services/synchronization/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package synchronization

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

// Prometheus metrics describing the health of the telemetry ingress client.
// All of them are registered with the default registry via promauto.
var (
	// TelemetryClientConnectionStatus is set per endpoint by the client's
	// health monitor: 1 when the connection state is Ready, 0 otherwise.
	TelemetryClientConnectionStatus = promauto.NewGaugeVec(prometheus.GaugeOpts{
		Name: "telemetry_client_connection_status",
		Help: "Status of the connection to the telemetry ingress server",
	}, []string{"endpoint"})

	// TelemetryClientMessagesSent is incremented by a batch worker after a
	// send to the ingress server completes without error.
	TelemetryClientMessagesSent = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "telemetry_client_messages_sent",
		Help: "Number of telemetry messages sent to the telemetry ingress server",
	}, []string{"endpoint", "telemetry_type"})

	// TelemetryClientMessagesSendErrors is incremented by a batch worker when
	// a send to the ingress server returns an error.
	TelemetryClientMessagesSendErrors = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "telemetry_client_messages_send_errors",
		Help: "Number of telemetry messages that failed to send to the telemetry ingress server",
	}, []string{"endpoint", "telemetry_type"})

	// TelemetryClientMessagesDropped is incremented each time a batch worker
	// drops a message because its buffer is full.
	TelemetryClientMessagesDropped = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "telemetry_client_messages_dropped",
		Help: "Number of telemetry messages dropped",
	}, []string{"endpoint", "telemetry_type"})

	// TelemetryClientWorkers tracks how many batch workers exist per endpoint
	// and telemetry type. It is a gauge rather than a counter because it
	// reports a current quantity, not a monotonically increasing event count;
	// the client increments it whenever it creates a new worker.
	TelemetryClientWorkers = promauto.NewGaugeVec(prometheus.GaugeOpts{
		Name: "telemetry_client_workers",
		Help: "Number of telemetry workers",
	}, []string{"endpoint", "telemetry_type"})
)
37 changes: 36 additions & 1 deletion core/services/synchronization/telemetry_ingress_batch_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/smartcontractkit/wsrpc"
"github.com/smartcontractkit/wsrpc/examples/simple/keys"
"google.golang.org/grpc/connectivity"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
Expand Down Expand Up @@ -57,9 +58,11 @@ type telemetryIngressBatchClient struct {
telemSendTimeout time.Duration

workers map[string]*telemetryIngressBatchWorker
workersMutex sync.Mutex
workersMutex sync.RWMutex

useUniConn bool

healthMonitorCancel context.CancelFunc
}

// NewTelemetryIngressBatchClient returns a client backed by wsrpc that
Expand Down Expand Up @@ -127,14 +130,43 @@ func (tc *telemetryIngressBatchClient) start(ctx context.Context) error {
}
tc.telemClient = telemPb.NewTelemClient(conn)
tc.closeFn = func() error { conn.Close(); return nil }
tc.startHealthMonitoring(ctx, conn)
}
}

return nil
}

// startHealthMonitoring starts a goroutine that samples the connection state
// every 5 seconds and publishes it through TelemetryClientConnectionStatus
// (1 when the connection is Ready, 0 otherwise), labelled with the client URL.
//
// The goroutine exits when either the engine's context is done or
// tc.healthMonitorCancel is invoked, whichever happens first.
func (tc *telemetryIngressBatchClient) startHealthMonitoring(ctx context.Context, conn *wsrpc.ClientConn) {
	// Keep the derived context: discarding it would leave healthMonitorCancel
	// with nothing to cancel, so calling it could never stop the goroutine.
	//
	// NOTE(review): the monitor context is rooted in context.Background()
	// rather than ctx so that cancellation of the start context cannot end
	// monitoring early — presumably ctx is only valid while start runs; confirm
	// against the caller and switch the parent to ctx if it is long-lived.
	monitorCtx, cancel := context.WithCancel(context.Background())
	tc.healthMonitorCancel = cancel

	tc.eng.Go(func(engCtx context.Context) {
		// Release the monitor context even when the engine stops us first.
		// Calling a CancelFunc more than once is safe.
		defer cancel()

		ticker := time.NewTicker(5 * time.Second)
		defer ticker.Stop()

		for {
			select {
			case <-ticker.C:
				// Check the connection state
				connected := float64(0)
				if conn.GetState() == connectivity.Ready {
					connected = float64(1)
				}
				TelemetryClientConnectionStatus.WithLabelValues(tc.url.String()).Set(connected)
			case <-monitorCtx.Done():
				return
			case <-engCtx.Done():
				return
			}
		}
	})
}

// close disconnects the wsrpc client from the ingress server and waits for all workers to exit
func (tc *telemetryIngressBatchClient) close() error {
if tc.healthMonitorCancel != nil {
tc.healthMonitorCancel()
}
if (tc.useUniConn && tc.connected.Load()) || !tc.useUniConn {
return tc.closeFn()
}
Expand Down Expand Up @@ -197,11 +229,14 @@ func (tc *telemetryIngressBatchClient) findOrCreateWorker(payload TelemPayload)
payload.TelemType,
tc.eng,
tc.logging,
tc.url.String(),
)
tc.eng.GoTick(timeutil.NewTicker(func() time.Duration {
return tc.telemSendInterval
}), worker.Send)
tc.workers[workerKey] = worker

TelemetryClientWorkers.WithLabelValues(tc.url.String(), string(payload.TelemType)).Inc()
}

return worker
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ type telemetryIngressBatchWorker struct {
logging bool
lggr logger.Logger
dropMessageCount atomic.Uint32

// endpointURL is used for reporting metrics
endpointURL string
}

// NewTelemetryIngressBatchWorker returns a worker for a given contractID that can send
Expand All @@ -38,6 +41,7 @@ func NewTelemetryIngressBatchWorker(
telemType TelemetryType,
lggr logger.Logger,
logging bool,
endpointURL string,
) *telemetryIngressBatchWorker {
return &telemetryIngressBatchWorker{
telemSendTimeout: telemSendTimeout,
Expand All @@ -48,6 +52,7 @@ func NewTelemetryIngressBatchWorker(
telemType: telemType,
logging: logging,
lggr: logger.Named(lggr, "TelemetryIngressBatchWorker"),
endpointURL: endpointURL,
}
}

Expand All @@ -65,8 +70,10 @@ func (tw *telemetryIngressBatchWorker) Send(ctx context.Context) {

if err != nil {
tw.lggr.Warnf("Could not send telemetry: %v", err)
TelemetryClientMessagesSendErrors.WithLabelValues(tw.endpointURL, string(tw.telemType)).Inc()
return
}
TelemetryClientMessagesSent.WithLabelValues(tw.endpointURL, string(tw.telemType)).Inc()
if tw.logging {
tw.lggr.Debugw("Successfully sent telemetry to ingress server", "contractID", telemBatchReq.ContractId, "telemType", telemBatchReq.TelemetryType, "telemetry", telemBatchReq.Telemetry)
}
Expand All @@ -86,6 +93,8 @@ func (tw *telemetryIngressBatchWorker) Send(ctx context.Context) {
// etc...
func (tw *telemetryIngressBatchWorker) logBufferFullWithExpBackoff(payload TelemPayload) {
count := tw.dropMessageCount.Add(1)
TelemetryClientMessagesDropped.WithLabelValues(tw.endpointURL, string(tw.telemType)).Inc()

if count > 0 && (count%100 == 0 || count&(count-1) == 0) {
tw.lggr.Warnw("telemetry ingress client buffer full, dropping message", "telemetry", payload.Telemetry, "droppedCount", count)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func TestTelemetryIngressWorker_BuildTelemBatchReq(t *testing.T) {
synchronization.OCR,
logger.TestLogger(t),
false,
"test-endpoint",
)

chTelemetry <- telemPayload
Expand Down

0 comments on commit 0fa2db6

Please sign in to comment.