Non-blocking telemetry #8684

Merged: 11 commits, Mar 15, 2023
core/services/ocr/delegate.go: 3 additions & 0 deletions
@@ -315,6 +315,9 @@ func (d *Delegate) ServicesForSpec(jb job.Job) (services []job.ServiceCtx, err e
oracleCtx := job.NewServiceAdapter(oracle)
services = append(services, oracleCtx)

if !jb.OCROracleSpec.CaptureEATelemetry {
lggr.Infof("Enhanced EA telemetry is disabled for job %s", jb.Name.ValueOrZero())
}
// RunResultSaver needs to be started first so it's available
// to read db writes. It is stopped last after the Oracle is shut down
// so no further runs are enqueued and we can drain the queue.
core/services/ocr2/plugins/median/plugin.go: 3 additions & 0 deletions
@@ -64,6 +64,9 @@ func NewMedianServices(jb job.Job,
if err != nil {
return nil, err
}
if !jb.OCR2OracleSpec.CaptureEATelemetry {
lggr.Infof("Enhanced EA telemetry is disabled for job %s", jb.Name.ValueOrZero())
}
return []job.ServiceCtx{ocr2Provider, ocrcommon.NewResultRunSaver(
runResults,
pipelineRunner,
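Both hunks above add the same guard: when a job spec has enhanced EA telemetry switched off, the service setup logs a single informational line instead of silently skipping collection. A minimal, self-contained sketch of that pattern, using hypothetical stand-in types rather than the real Chainlink job structs:

```go
package main

import "log"

// oracleSpec and jobSpec are illustrative stand-ins, not the actual
// job.OCROracleSpec / job.Job types from the diff above.
type oracleSpec struct {
	CaptureEATelemetry bool
}

type jobSpec struct {
	Name string
	Spec oracleSpec
}

func startServices(jb jobSpec) {
	// Log once at startup when telemetry is disabled, so operators can tell
	// why no enhanced EA telemetry is flowing for this job.
	if !jb.Spec.CaptureEATelemetry {
		log.Printf("Enhanced EA telemetry is disabled for job %s", jb.Name)
	}
	// ... the rest of the service wiring continues unchanged ...
}

func main() {
	startServices(jobSpec{Name: "eth-usd", Spec: oracleSpec{CaptureEATelemetry: false}})
}
```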
core/services/ocrcommon/telemetry.go: 36 additions & 21 deletions
@@ -88,28 +88,46 @@ func getJsonParsedValue(trr pipeline.TaskRunResult, trrs *pipeline.TaskRunResult
return nil
}

// getObservation checks pipeline.FinalResult and extracts the observation
func getObservation(ds *inMemoryDataSource, finalResult *pipeline.FinalResult) int64 {
singularResult, err := finalResult.SingularResult()
if err != nil {
ds.lggr.Warnf("cannot get singular result, job %d", ds.jb.ID)
return 0
}

finalResultDecimal, err := utils.ToDecimal(singularResult.Value)
if err != nil {
ds.lggr.Warnf("cannot parse singular result from bridge task, job %d", ds.jb.ID)
return 0
}

return finalResultDecimal.BigInt().Int64()
}

func getParsedValue(ds *inMemoryDataSource, trrs *pipeline.TaskRunResults, trr pipeline.TaskRunResult) int64 {
parsedValue := getJsonParsedValue(trr, trrs)
if parsedValue == nil {
ds.lggr.Warnf("cannot get json parse value, job %d, id %s", ds.jb.ID, trr.Task.DotID())
return 0
}
return parsedValue.Int64()
}

// collectEATelemetry checks if EA telemetry should be collected, gathers the information and sends it for ingestion
func collectEATelemetry(ds *inMemoryDataSource, trrs *pipeline.TaskRunResults, finalResult *pipeline.FinalResult) {
if !shouldCollectTelemetry(&ds.jb) {
if !shouldCollectTelemetry(&ds.jb) || ds.monitoringEndpoint == nil {
return
}

go collectAndSend(ds, trrs, finalResult)
}

func collectAndSend(ds *inMemoryDataSource, trrs *pipeline.TaskRunResults, finalResult *pipeline.FinalResult) {
chainID := getChainID(&ds.jb)
contract := getContract(&ds.jb)

observation := int64(0)
if finalResult != nil {
singularResult, err := finalResult.SingularResult()
if err != nil {
ds.lggr.Warnf("cannot get singular result, job %d, id %d", ds.jb.ID)
}

finalResultDecimal, err := utils.ToDecimal(singularResult.Value)
if err != nil {
ds.lggr.Warnf("cannot parse singular result from bridge task, job %d", ds.jb.ID)
}
observation = finalResultDecimal.BigInt().Int64()
}
observation := getObservation(ds, finalResult)

for _, trr := range *trrs {
if trr.Task.Type() != pipeline.TaskTypeBridge {
@@ -118,18 +136,14 @@ func collectEATelemetry(ds *inMemoryDataSource, trrs *pipeline.TaskRunResults, f

bridgeRawResponse, ok := trr.Result.Value.(string)
if !ok {
ds.lggr.Warnf("cannot get bridge response from bridge task, job %d, id %d", ds.jb.ID, trr.Task.DotID())
ds.lggr.Warnf("cannot get bridge response from bridge task, job %d, id %s", ds.jb.ID, trr.Task.DotID())
continue
}
eaTelemetry, err := parseEATelemetry([]byte(bridgeRawResponse))
if err != nil {
ds.lggr.Warnf("cannot parse EA telemetry, job %d, id %d", ds.jb.ID, trr.Task.DotID())
}
parsedValue := getJsonParsedValue(trr, trrs)
if parsedValue == nil {
ds.lggr.Warnf("cannot get json parse value, job %d, id %d", ds.jb.ID, trr.Task.DotID())
ds.lggr.Warnf("cannot parse EA telemetry, job %d, id %s", ds.jb.ID, trr.Task.DotID())
}
value := parsedValue.Int64()
value := getParsedValue(ds, trrs, trr)

t := &telem.TelemEnhancedEA{
DataSource: eaTelemetry.DataSource,
@@ -145,6 +159,7 @@ func collectEATelemetry(ds *inMemoryDataSource, trrs *pipeline.TaskRunResults, f
Feed: contract,
ChainId: chainID,
Observation: observation,
ConfigDigest: "",
Round: 0,
Epoch: 0,
}
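The core of the change in telemetry.go: collectEATelemetry now returns as soon as it has checked shouldCollectTelemetry and confirmed the monitoring endpoint is non-nil, and all parsing and sending runs in a goroutine via collectAndSend. A simplified sketch of that fire-and-forget shape, assuming made-up sink and data-source types rather than the real ocrcommon ones:

```go
package main

import (
	"fmt"
	"time"
)

// telemetrySink and dataSource are illustrative stand-ins, not the real
// ocrcommon types; the point is the shape of the hot path, not the API.
type telemetrySink interface {
	SendLog(payload []byte)
}

type stdoutSink struct{}

func (stdoutSink) SendLog(payload []byte) { fmt.Printf("telemetry: %s\n", payload) }

type dataSource struct {
	captureEATelemetry bool
	endpoint           telemetrySink
}

// collect returns immediately; parsing and sending happen off the critical path.
func (ds *dataSource) collect(observation int64) {
	// Guard on both the job flag and a nil endpoint so the goroutine can
	// never dereference a missing monitoring endpoint later.
	if !ds.captureEATelemetry || ds.endpoint == nil {
		return
	}
	go func() {
		payload := []byte(fmt.Sprintf(`{"observation":%d}`, observation))
		ds.endpoint.SendLog(payload)
	}()
}

func main() {
	ds := &dataSource{captureEATelemetry: true, endpoint: stdoutSink{}}
	ds.collect(123456)
	time.Sleep(50 * time.Millisecond) // crude wait so the toy goroutine gets to run
}
```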
core/services/ocrcommon/telemetry_test.go: 175 additions & 19 deletions
@@ -2,12 +2,15 @@ package ocrcommon

import (
"math/big"
"sync"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"

"github.com/smartcontractkit/chainlink/core/logger"
"github.com/smartcontractkit/chainlink/core/services/job"
"github.com/smartcontractkit/chainlink/core/services/keystore/keys/ethkey"
"github.com/smartcontractkit/chainlink/core/services/pipeline"
@@ -30,6 +33,57 @@ const bridgeResponse = `{
}
}`

var trrs = pipeline.TaskRunResults{
pipeline.TaskRunResult{
Task: &pipeline.BridgeTask{
BaseTask: pipeline.NewBaseTask(0, "ds1", nil, nil, 0),
},
Result: pipeline.Result{
Value: bridgeResponse,
},
},
pipeline.TaskRunResult{
Task: &pipeline.JSONParseTask{
BaseTask: pipeline.NewBaseTask(1, "ds1_parse", nil, nil, 1),
},
Result: pipeline.Result{
Value: "123456",
},
},
pipeline.TaskRunResult{
Task: &pipeline.BridgeTask{
BaseTask: pipeline.NewBaseTask(0, "ds2", nil, nil, 0),
},
Result: pipeline.Result{
Value: bridgeResponse,
},
},
pipeline.TaskRunResult{
Task: &pipeline.JSONParseTask{
BaseTask: pipeline.NewBaseTask(1, "ds2_parse", nil, nil, 1),
},
Result: pipeline.Result{
Value: "12345678",
},
},
pipeline.TaskRunResult{
Task: &pipeline.BridgeTask{
BaseTask: pipeline.NewBaseTask(0, "ds3", nil, nil, 0),
},
Result: pipeline.Result{
Value: bridgeResponse,
},
},
pipeline.TaskRunResult{
Task: &pipeline.JSONParseTask{
BaseTask: pipeline.NewBaseTask(1, "ds3_parse", nil, nil, 1),
},
Result: pipeline.Result{
Value: "1234567890",
},
},
}

func TestShouldCollectTelemetry(t *testing.T) {
j := job.Job{
OCROracleSpec: &job.OCROracleSpec{CaptureEATelemetry: true},
@@ -104,27 +158,9 @@ func TestParseEATelemetry(t *testing.T) {
}

func TestGetJsonParsedValue(t *testing.T) {
trrs := pipeline.TaskRunResults{
pipeline.TaskRunResult{
Task: &pipeline.BridgeTask{
BaseTask: pipeline.NewBaseTask(0, "ds1", nil, nil, 0),
},
Result: pipeline.Result{
Value: bridgeResponse,
},
},
pipeline.TaskRunResult{
Task: &pipeline.JSONParseTask{
BaseTask: pipeline.NewBaseTask(1, "ds1_parse", nil, nil, 1),
},
Result: pipeline.Result{
Value: "1234567890",
},
},
}

resp := getJsonParsedValue(trrs[0], &trrs)
assert.Equal(t, "1234567890", resp.String())
assert.Equal(t, "123456", resp.String())

trrs[1].Result.Value = nil
resp = getJsonParsedValue(trrs[0], &trrs)
@@ -136,13 +172,15 @@ func TestGetJsonParsedValue(t *testing.T) {
}

func TestSendEATelemetry(t *testing.T) {
wg := sync.WaitGroup{}
ingressClient := mocks.NewTelemetryIngressClient(t)
ingressAgent := telemetry.NewIngressAgentWrapper(ingressClient)
monitoringEndpoint := ingressAgent.GenMonitoringEndpoint("0xa", synchronization.EnhancedEA)

var sentMessage []byte
ingressClient.On("Send", mock.AnythingOfType("synchronization.TelemPayload")).Return().Run(func(args mock.Arguments) {
sentMessage = args[0].(synchronization.TelemPayload).Telemetry
wg.Done()
})

feedAddress := utils.RandomAddress()
@@ -183,6 +221,7 @@ func TestSendEATelemetry(t *testing.T) {
FatalErrors: []error{nil},
}

wg.Add(1)
collectEATelemetry(&ds, &trrs, &fr)

expectedTelemetry := telem.TelemEnhancedEA{
@@ -204,5 +243,122 @@ func TestSendEATelemetry(t *testing.T) {
}

expectedMessage, _ := proto.Marshal(&expectedTelemetry)
wg.Wait()
assert.Equal(t, expectedMessage, sentMessage)
}

func TestGetObservation(t *testing.T) {
lggr, logs := logger.TestLoggerObserved(t, zap.WarnLevel)
ds := &inMemoryDataSource{
jb: job.Job{
ID: 1234567890,
Type: job.Type(pipeline.OffchainReportingJobType),
OCROracleSpec: &job.OCROracleSpec{
CaptureEATelemetry: true,
},
},
lggr: lggr,
}

obs := getObservation(ds, &pipeline.FinalResult{})
assert.Equal(t, obs, int64(0))
assert.Equal(t, logs.Len(), 1)
assert.Contains(t, logs.All()[0].Message, "cannot get singular result")

finalResult := &pipeline.FinalResult{
Values: []interface{}{"123456"},
AllErrors: nil,
FatalErrors: []error{nil},
}
obs = getObservation(ds, finalResult)
assert.Equal(t, obs, int64(123456))
}

func TestCollectAndSend(t *testing.T) {
ingressClient := mocks.NewTelemetryIngressClient(t)
ingressAgent := telemetry.NewIngressAgentWrapper(ingressClient)
monitoringEndpoint := ingressAgent.GenMonitoringEndpoint("0xa", synchronization.EnhancedEA)
ingressClient.On("Send", mock.AnythingOfType("synchronization.TelemPayload")).Return()

lggr, logs := logger.TestLoggerObserved(t, zap.WarnLevel)
ds := &inMemoryDataSource{
jb: job.Job{
ID: 1234567890,
Type: job.Type(pipeline.OffchainReportingJobType),
OCROracleSpec: &job.OCROracleSpec{
CaptureEATelemetry: true,
},
},
lggr: lggr,
monitoringEndpoint: monitoringEndpoint,
}

finalResult := &pipeline.FinalResult{
Values: []interface{}{"123456"},
AllErrors: nil,
FatalErrors: []error{nil},
}

badTrrs := &pipeline.TaskRunResults{
pipeline.TaskRunResult{
Task: &pipeline.BridgeTask{
BaseTask: pipeline.NewBaseTask(0, "ds1", nil, nil, 0),
},
Result: pipeline.Result{
Value: nil,
},
}}

collectAndSend(ds, badTrrs, finalResult)
assert.Contains(t, logs.All()[0].Message, "cannot get bridge response from bridge task")

badTrrs = &pipeline.TaskRunResults{
pipeline.TaskRunResult{
Task: &pipeline.BridgeTask{
BaseTask: pipeline.NewBaseTask(0, "ds1", nil, nil, 0),
},
Result: pipeline.Result{
Value: "[]",
},
}}
collectAndSend(ds, badTrrs, finalResult)
assert.Equal(t, logs.Len(), 3)
assert.Contains(t, logs.All()[1].Message, "cannot parse EA telemetry")
assert.Contains(t, logs.All()[2].Message, "cannot get json parse value")
}

func BenchmarkCollectEATelemetry(b *testing.B) {
wg := sync.WaitGroup{}
ingressClient := mocks.NewTelemetryIngressClient(b)
ingressAgent := telemetry.NewIngressAgentWrapper(ingressClient)
monitoringEndpoint := ingressAgent.GenMonitoringEndpoint("0xa", synchronization.EnhancedEA)

ingressClient.On("Send", mock.AnythingOfType("synchronization.TelemPayload")).Return().Run(func(args mock.Arguments) {
wg.Done()
})

ds := inMemoryDataSource{
jb: job.Job{
Type: job.Type(pipeline.OffchainReportingJobType),
OCROracleSpec: &job.OCROracleSpec{
ContractAddress: ethkey.EIP55AddressFromAddress(utils.RandomAddress()),
CaptureEATelemetry: true,
EVMChainID: (*utils.Big)(big.NewInt(9)),
},
},
lggr: nil,
monitoringEndpoint: monitoringEndpoint,
}
finalResult := pipeline.FinalResult{
Values: []interface{}{"123456"},
AllErrors: nil,
FatalErrors: []error{nil},
}
b.ResetTimer()

for n := 0; n < b.N; n++ {
wg.Add(1)
collectEATelemetry(&ds, &trrs, &finalResult)
}
wg.Wait()
}
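Because the send now happens on a separate goroutine, the updated test and benchmark synchronize with a sync.WaitGroup: wg.Add(1) before triggering collection, wg.Done() inside the mocked Send callback, and wg.Wait() before asserting on the captured payload. A small stand-alone sketch of that idiom, where the async sender is a hypothetical stand-in for the mocked TelemetryIngressClient:

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// asyncSender is a hypothetical stand-in for the mocked ingress client:
// it delivers payloads on a separate goroutine, like the real telemetry path.
type asyncSender struct {
	onSend func(payload []byte)
}

func (s *asyncSender) Send(payload []byte) {
	go s.onSend(payload)
}

func main() {
	var wg sync.WaitGroup
	var got []byte

	sender := &asyncSender{onSend: func(payload []byte) {
		got = payload
		wg.Done() // signal that the asynchronous send has completed
	}}

	wg.Add(1)                                 // arm the wait before triggering the send
	sender.Send([]byte("expected-telemetry")) // returns immediately
	wg.Wait()                                 // block until the callback has stored the payload

	if bytes.Equal(got, []byte("expected-telemetry")) {
		fmt.Println("payload received:", string(got))
	} else {
		fmt.Println("payload mismatch")
	}
}
```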