diff --git a/pkg/capabilities/consensus/ocr3/aggregators/identical.go b/pkg/capabilities/consensus/ocr3/aggregators/identical.go new file mode 100644 index 000000000..a50448bc3 --- /dev/null +++ b/pkg/capabilities/consensus/ocr3/aggregators/identical.go @@ -0,0 +1,125 @@ +package aggregators + +import ( + "crypto/sha256" + "fmt" + + "google.golang.org/protobuf/proto" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities/consensus/ocr3/types" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/values" + + ocrcommon "github.com/smartcontractkit/libocr/commontypes" +) + +type identicalAggregator struct { + config aggregatorConfig + lggr logger.Logger +} + +type aggregatorConfig struct { + // Length of the list of observations that each node is expected to provide. + // Aggregator's output (i.e. EncodableOutcome) will be a values.Map with the same + // number of elements and keyed by indices 0,1,2,... (unless KeyOverrides are provided). + // Defaults to 1. + ExpectedObservationsLen int + // If non-empty, the keys in the outcome map will be replaced with these values. + // If non-empty, must be of length ExpectedObservationsLen. + KeyOverrides []string +} + +type counter struct { + fullObservation values.Value + count int +} + +var _ types.Aggregator = (*identicalAggregator)(nil) + +func (a *identicalAggregator) Aggregate(_ *types.AggregationOutcome, observations map[ocrcommon.OracleID][]values.Value, f int) (*types.AggregationOutcome, error) { + counters := []map[[32]byte]*counter{} + for i := 0; i < a.config.ExpectedObservationsLen; i++ { + counters = append(counters, map[[32]byte]*counter{}) + } + for nodeID, nodeObservations := range observations { + if len(nodeObservations) == 0 || nodeObservations[0] == nil { + a.lggr.Warnf("node %d contributed with empty observations", nodeID) + continue + } + if len(nodeObservations) != a.config.ExpectedObservationsLen { + a.lggr.Warnf("node %d contributed with an incorrect number of observations %d - ignoring them", nodeID, len(nodeObservations)) + continue + } + for idx, observation := range nodeObservations { + marshalled, err := proto.MarshalOptions{Deterministic: true}.Marshal(values.Proto(observation)) + if err != nil { + return nil, err + } + sha := sha256.Sum256(marshalled) + elem, ok := counters[idx][sha] + if !ok { + counters[idx][sha] = &counter{ + fullObservation: observation, + count: 1, + } + } else { + elem.count++ + } + } + } + return a.collectHighestCounts(counters, f) +} + +func (a *identicalAggregator) collectHighestCounts(counters []map[[32]byte]*counter, f int) (*types.AggregationOutcome, error) { + useOverrides := len(a.config.KeyOverrides) == len(counters) + outcome := make(map[string]any) + for idx, shaToCounter := range counters { + highestCount := 0 + var highestObservation values.Value + for _, counter := range shaToCounter { + if counter.count > highestCount { + highestCount = counter.count + highestObservation = counter.fullObservation + } + } + if highestCount < 2*f+1 { + return nil, fmt.Errorf("can't reach consensus on observations with index %d", idx) + } + if useOverrides { + outcome[a.config.KeyOverrides[idx]] = highestObservation + } else { + outcome[fmt.Sprintf("%d", idx)] = highestObservation + } + } + valMap, err := values.NewMap(outcome) + if err != nil { + return nil, err + } + return &types.AggregationOutcome{ + EncodableOutcome: values.ProtoMap(valMap), + Metadata: nil, + ShouldReport: true, + }, nil +} + +func NewIdenticalAggregator(config values.Map, lggr logger.Logger) (*identicalAggregator, error) { + parsedConfig, err := ParseConfig(config) + if err != nil { + return nil, fmt.Errorf("failed to parse config (%+v): %w", config, err) + } + return &identicalAggregator{ + config: parsedConfig, + lggr: lggr, + }, nil +} + +func ParseConfig(config values.Map) (aggregatorConfig, error) { + parsedConfig := aggregatorConfig{} + if err := config.UnwrapTo(&parsedConfig); err != nil { + return aggregatorConfig{}, err + } + if parsedConfig.ExpectedObservationsLen == 0 { + parsedConfig.ExpectedObservationsLen = 1 + } + return parsedConfig, nil +} diff --git a/pkg/capabilities/consensus/ocr3/aggregators/identical_test.go b/pkg/capabilities/consensus/ocr3/aggregators/identical_test.go new file mode 100644 index 000000000..aaa146b44 --- /dev/null +++ b/pkg/capabilities/consensus/ocr3/aggregators/identical_test.go @@ -0,0 +1,93 @@ +package aggregators_test + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/libocr/commontypes" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities/consensus/ocr3/aggregators" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/values" +) + +func TestDataFeedsAggregator_Aggregate(t *testing.T) { + config := getConfig(t, nil) + agg, err := aggregators.NewIdenticalAggregator(*config, logger.Nop()) + require.NoError(t, err) + + observations := map[commontypes.OracleID][]values.Value{ + 0: {values.NewString("a")}, + 1: {values.NewString("a")}, + 2: {values.NewString("a")}, + 3: {values.NewString("a")}, + } + outcome, err := agg.Aggregate(nil, observations, 1) + require.NoError(t, err) + require.True(t, outcome.ShouldReport) + require.Equal(t, "", outcome.EncoderName) + require.Nil(t, outcome.EncoderConfig) + + m, err := values.FromMapValueProto(outcome.EncodableOutcome) + require.NoError(t, err) + + require.Len(t, m.Underlying, 1) + require.Equal(t, m.Underlying["0"], values.NewString("a")) +} + +func TestDataFeedsAggregator_Aggregate_OverrideWithKeys(t *testing.T) { + config := getConfig(t, []string{"outcome"}) + agg, err := aggregators.NewIdenticalAggregator(*config, logger.Nop()) + require.NoError(t, err) + + observations := map[commontypes.OracleID][]values.Value{ + 0: {values.NewString("a")}, + 1: {values.NewString("a")}, + 2: {values.NewString("a")}, + 3: {values.NewString("a")}, + } + outcome, err := agg.Aggregate(nil, observations, 1) + require.NoError(t, err) + require.True(t, outcome.ShouldReport) + require.Equal(t, "", outcome.EncoderName) + require.Nil(t, outcome.EncoderConfig) + + m, err := values.FromMapValueProto(outcome.EncodableOutcome) + require.NoError(t, err) + + require.Len(t, m.Underlying, 1) + require.Equal(t, m.Underlying["outcome"], values.NewString("a")) +} + +func TestDataFeedsAggregator_Aggregate_NoConsensus(t *testing.T) { + config := getConfig(t, []string{"outcome"}) + agg, err := aggregators.NewIdenticalAggregator(*config, logger.Nop()) + require.NoError(t, err) + + encoderStr := "evm" + encoderName := values.NewString(encoderStr) + encoderCfg, err := values.NewMap(map[string]any{"foo": "bar"}) + require.NoError(t, err) + + observations := map[commontypes.OracleID][]values.Value{ + 0: {values.NewString("a"), encoderName, encoderCfg}, + 1: {values.NewString("b"), encoderName, encoderCfg}, + 2: {values.NewString("b"), encoderName, encoderCfg}, + 3: {values.NewString("a"), encoderName, encoderCfg}, + } + outcome, err := agg.Aggregate(nil, observations, 1) + require.Nil(t, outcome) + require.ErrorContains(t, err, "can't reach consensus on observations with index 0") +} + +func getConfig(t *testing.T, overrideKeys []string) *values.Map { + unwrappedConfig := map[string]any{ + "expectedObservationsLen": len(overrideKeys), + "keyOverrides": overrideKeys, + } + + config, err := values.NewMap(unwrappedConfig) + require.NoError(t, err) + return config +} diff --git a/pkg/capabilities/consensus/ocr3/capability.go b/pkg/capabilities/consensus/ocr3/capability.go index 3255fb19c..6c4c8f1a8 100644 --- a/pkg/capabilities/consensus/ocr3/capability.go +++ b/pkg/capabilities/consensus/ocr3/capability.go @@ -282,6 +282,8 @@ func (o *capability) queueRequestForProcessing( WorkflowDonID: metadata.WorkflowDonID, WorkflowDonConfigVersion: metadata.WorkflowDonConfigVersion, Observations: i.Observations, + OverriddenEncoderName: i.EncoderName, + OverriddenEncoderConfig: i.EncoderConfig, KeyID: c.KeyID, ExpiresAt: o.clock.Now().Add(requestTimeout), } diff --git a/pkg/capabilities/consensus/ocr3/models.go b/pkg/capabilities/consensus/ocr3/models.go index e03a68a72..86662dc31 100644 --- a/pkg/capabilities/consensus/ocr3/models.go +++ b/pkg/capabilities/consensus/ocr3/models.go @@ -16,5 +16,7 @@ type config struct { } type inputs struct { - Observations *values.List `json:"observations"` + Observations *values.List `json:"observations" jsonschema:""` + EncoderName string `mapstructure:"encoder" json:"encoder,omitempty"` + EncoderConfig *values.Map `mapstructure:"encoder_config" json:"encoder_config,omitempty"` } diff --git a/pkg/capabilities/consensus/ocr3/ocr3cap/identical_consensus.go b/pkg/capabilities/consensus/ocr3/ocr3cap/identical_consensus.go index dfa7ffe48..12913126c 100644 --- a/pkg/capabilities/consensus/ocr3/ocr3cap/identical_consensus.go +++ b/pkg/capabilities/consensus/ocr3/ocr3cap/identical_consensus.go @@ -32,17 +32,17 @@ func (c IdenticalConsensusConfig[T]) New(w *sdk.WorkflowSpecFactory, ref string, } type IdenticalConsensusInput[T any] struct { - Observations sdk.CapDefinition[T] -} - -type IdenticalConsensusMergedInput[T any] struct { - Observations []T + Observation sdk.CapDefinition[T] + Encoder Encoder + EncoderConfig EncoderConfig } func (input IdenticalConsensusInput[T]) ToSteps() sdk.StepInputs { return sdk.StepInputs{ Mapping: map[string]any{ - "observations": input.Observations.Ref(), + "observations": sdk.ListOf(input.Observation).Ref(), + "encoder": input.Encoder, + "encoderConfig": input.EncoderConfig, }, } } diff --git a/pkg/capabilities/consensus/ocr3/ocr3cap/identical_consensus_test.go b/pkg/capabilities/consensus/ocr3/ocr3cap/identical_consensus_test.go index 89dc0df73..2affbeab7 100644 --- a/pkg/capabilities/consensus/ocr3/ocr3cap/identical_consensus_test.go +++ b/pkg/capabilities/consensus/ocr3/ocr3cap/identical_consensus_test.go @@ -26,7 +26,11 @@ func TestIdenticalConsensus(t *testing.T) { Encoder: ocr3.EncoderEVM, EncoderConfig: ocr3.EncoderConfig{}, ReportID: "0001", - }.New(workflow, "consensus", ocr3.IdenticalConsensusInput[basictrigger.TriggerOutputs]{Observations: trigger}) + }.New(workflow, "consensus", ocr3.IdenticalConsensusInput[basictrigger.TriggerOutputs]{ + Observation: trigger, + Encoder: "evm", + EncoderConfig: ocr3.EncoderConfig(map[string]any{"foo": "bar"}), + }) chainwriter.TargetConfig{ Address: "0x1235", @@ -55,9 +59,13 @@ func TestIdenticalConsensus(t *testing.T) { Actions: []sdk.StepDefinition{}, Consensus: []sdk.StepDefinition{ { - ID: "offchain_reporting@1.0.0", - Ref: "consensus", - Inputs: sdk.StepInputs{Mapping: map[string]any{"observations": "$(trigger.outputs)"}}, + ID: "offchain_reporting@1.0.0", + Ref: "consensus", + Inputs: sdk.StepInputs{Mapping: map[string]any{ + "observations": []any{"$(trigger.outputs)"}, + "encoder": "evm", + "encoderConfig": map[string]any{"foo": "bar"}, + }}, Config: map[string]any{ "encoder": "EVM", "encoder_config": map[string]any{}, diff --git a/pkg/capabilities/consensus/ocr3/ocr3cap/ocr3captest/identical_consensus.go b/pkg/capabilities/consensus/ocr3/ocr3cap/ocr3captest/identical_consensus.go index 2c89e1bad..b5a3420c6 100644 --- a/pkg/capabilities/consensus/ocr3/ocr3cap/ocr3captest/identical_consensus.go +++ b/pkg/capabilities/consensus/ocr3/ocr3cap/ocr3captest/identical_consensus.go @@ -1,10 +1,9 @@ package ocr3captest import ( - "errors" - "google.golang.org/protobuf/proto" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/capabilities/consensus/ocr3/ocr3cap" "github.com/smartcontractkit/chainlink-common/pkg/values" "github.com/smartcontractkit/chainlink-common/pkg/values/pb" @@ -32,12 +31,6 @@ func IdenticalConsensusForStep[T any](runner *testutils.Runner, step string) *Id } func identicalConsensus[T any](inputs ConsensusInput[T]) (ocr3cap.SignedReport, error) { - if len(inputs.Observations) == 0 { - return ocr3cap.SignedReport{}, errors.New("no observations were made") - } else if len(inputs.Observations) > 1 { - return ocr3cap.SignedReport{}, errors.New("more than one observation was made, but this mock isn't set up to support that") - } - wrapped, err := values.Wrap(inputs.Observations[0]) if err != nil { return ocr3cap.SignedReport{}, err @@ -63,16 +56,7 @@ type IdenticalConsensusMock[T any] struct { *testutils.Mock[ConsensusInput[T], ocr3cap.SignedReport] } -var _ testutils.ConsensusMock = &IdenticalConsensusMock[struct{}]{} - -func (c *IdenticalConsensusMock[T]) SingleToManyObservations(input values.Value) (*values.Map, error) { - tmp := singleConsensusInput[T]{} - if err := input.UnwrapTo(&tmp); err != nil { - return nil, err - } - - return values.CreateMapFromStruct(ConsensusInput[T]{Observations: []T{tmp.Observations}}) -} +var _ capabilities.ConsensusCapability = &IdenticalConsensusMock[struct{}]{} func (c *IdenticalConsensusMock[T]) GetStepDecoded(ref string) testutils.StepResults[ConsensusInput[T], T] { step := c.GetStep(ref) diff --git a/pkg/capabilities/consensus/ocr3/ocr3cap/ocr3captest/observations.go b/pkg/capabilities/consensus/ocr3/ocr3cap/ocr3captest/observations.go index a106b5a44..a136d9cd9 100644 --- a/pkg/capabilities/consensus/ocr3/ocr3cap/ocr3captest/observations.go +++ b/pkg/capabilities/consensus/ocr3/ocr3cap/ocr3captest/observations.go @@ -3,7 +3,3 @@ package ocr3captest type ConsensusInput[T any] struct { Observations []T } - -type singleConsensusInput[T any] struct { - Observations T -} diff --git a/pkg/capabilities/consensus/ocr3/reporting_plugin.go b/pkg/capabilities/consensus/ocr3/reporting_plugin.go index 60b900fad..6010575de 100644 --- a/pkg/capabilities/consensus/ocr3/reporting_plugin.go +++ b/pkg/capabilities/consensus/ocr3/reporting_plugin.go @@ -4,6 +4,7 @@ import ( "context" "crypto/sha256" "encoding/hex" + "fmt" "slices" "time" @@ -22,6 +23,7 @@ import ( pbtypes "github.com/smartcontractkit/chainlink-common/pkg/capabilities/consensus/ocr3/types" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/values" + "github.com/smartcontractkit/chainlink-common/pkg/values/pb" ) var _ ocr3types.ReportingPlugin[[]byte] = (*reportingPlugin)(nil) @@ -126,6 +128,13 @@ func (r *reportingPlugin) Observation(ctx context.Context, outctx ocr3types.Outc lggr.Errorw("observations are not a list") continue } + + var cfgProto *pb.Map + if rq.OverriddenEncoderConfig != nil { + cp := values.Proto(rq.OverriddenEncoderConfig).GetMapValue() + cfgProto = cp + } + newOb := &pbtypes.Observation{ Observations: listProto, Id: &pbtypes.Id{ @@ -138,6 +147,8 @@ func (r *reportingPlugin) Observation(ctx context.Context, outctx ocr3types.Outc ReportId: rq.ReportID, KeyId: rq.KeyID, }, + OverriddenEncoderName: rq.OverriddenEncoderName, + OverriddenEncoderConfig: cfgProto, } obs.Observations = append(obs.Observations, newOb) @@ -158,12 +169,39 @@ func (r *reportingPlugin) ObservationQuorum(ctx context.Context, outctx ocr3type return quorumhelper.ObservationCountReachesObservationQuorum(quorumhelper.QuorumTwoFPlusOne, r.config.N, r.config.F, aos), nil } +func shaForOverriddenEncoder(obs *pbtypes.Observation) (string, error) { + hash := sha256.New() + _, err := hash.Write([]byte(obs.OverriddenEncoderName)) + if err != nil { + return "", fmt.Errorf("could not write encoder name to hash: %w", err) + } + + marshalled, err := proto.MarshalOptions{Deterministic: true}.Marshal(obs.OverriddenEncoderConfig) + if err != nil { + return "", fmt.Errorf("could not marshal overridden encoder: %w", err) + } + + _, err = hash.Write(marshalled) + if err != nil { + return "", fmt.Errorf("could not write encoder config to hash: %w", err) + } + + return string(hash.Sum([]byte{})), nil +} + +type encoderConfig struct { + name string + config *pb.Map +} + func (r *reportingPlugin) Outcome(ctx context.Context, outctx ocr3types.OutcomeContext, query types.Query, attributedObservations []types.AttributedObservation) (ocr3types.Outcome, error) { // execution ID -> oracle ID -> list of observations execIDToOracleObservations := map[string]map[ocrcommon.OracleID][]values.Value{} seenWorkflowIDs := map[string]int{} var sortedTimestamps []*timestamppb.Timestamp var finalTimestamp *timestamppb.Timestamp + execIDToEncoderShaToCount := map[string]map[string]int{} + shaToEncoder := map[string]encoderConfig{} for _, attributedObservation := range attributedObservations { obs := &pbtypes.Observations{} err := proto.Unmarshal(attributedObservation.Observation, obs) @@ -210,6 +248,21 @@ func (r *reportingPlugin) Outcome(ctx context.Context, outctx ocr3types.OutcomeC execIDToOracleObservations[weid] = make(map[ocrcommon.OracleID][]values.Value) } execIDToOracleObservations[weid][attributedObservation.Observer] = obsList.Underlying + + sha, err := shaForOverriddenEncoder(request) + if err != nil { + r.lggr.Errorw("could not calculate sha for overridden encoder", "error", err, "observation", obs) + continue + } + + shaToEncoder[sha] = encoderConfig{ + name: request.OverriddenEncoderName, + config: request.OverriddenEncoderConfig, + } + if _, ok := execIDToEncoderShaToCount[weid]; !ok { + execIDToEncoderShaToCount[weid] = map[string]int{} + } + execIDToEncoderShaToCount[weid][sha]++ } } @@ -298,6 +351,35 @@ func (r *reportingPlugin) Outcome(ctx context.Context, outctx ocr3types.OutcomeC outcome.Timestamp = finalTimestamp + shaToCount, ok := execIDToEncoderShaToCount[weid.WorkflowExecutionId] + if !ok { + lggr.Debugw("could not find any encoder shas matching weid requested in the query") + continue + } + + // Note: no need to check the observation count here, + // we've checked this above when we checked the observations count. + var encCfg *encoderConfig + for sha, count := range shaToCount { + if count >= 2*r.config.F+1 { + encoderCfg, ok := shaToEncoder[sha] + if !ok { + lggr.Debugw("could not find encoder matching sha") + continue + } + + lggr.Debugw("consensus reached on overridden encoder", "encoderName", encoderCfg.name) + encCfg = &encoderCfg + break + } + } + + if encCfg != nil { + lggr.Debugw("overridden encoder set", "name", encCfg.name, "cfg", encCfg.config) + outcome.EncoderName = encCfg.name + outcome.EncoderConfig = encCfg.config + } + report := &pbtypes.Report{ Outcome: outcome, Id: weid, diff --git a/pkg/capabilities/consensus/ocr3/reporting_plugin_test.go b/pkg/capabilities/consensus/ocr3/reporting_plugin_test.go index 04be58609..9b54d61db 100644 --- a/pkg/capabilities/consensus/ocr3/reporting_plugin_test.go +++ b/pkg/capabilities/consensus/ocr3/reporting_plugin_test.go @@ -784,11 +784,11 @@ func TestReportPlugin_Outcome_ShouldReturnMedianTimestamp(t *testing.T) { }, { Observation: rawObs2, - Observer: commontypes.OracleID(1), + Observer: commontypes.OracleID(2), }, { Observation: rawObs3, - Observer: commontypes.OracleID(1), + Observer: commontypes.OracleID(3), }, } @@ -800,3 +800,160 @@ func TestReportPlugin_Outcome_ShouldReturnMedianTimestamp(t *testing.T) { assert.Equal(t, timestamppb.New(time2), opb1.Outcomes[workflowTestID].Timestamp) } + +func TestReportPlugin_Outcome_ShouldReturnOverriddenEncoder(t *testing.T) { + lggr := logger.Test(t) + s := requests.NewStore() + mcap := &mockCapability{ + aggregator: &aggregator{}, + encoder: &enc{}, + registeredWorkflows: map[string]bool{ + workflowTestID: true, + workflowTestID2: true, + }, + } + rp, err := newReportingPlugin(s, mcap, defaultBatchSize, ocr3types.ReportingPluginConfig{F: 1}, defaultOutcomePruningThreshold, lggr) + require.NoError(t, err) + + wowner := uuid.New().String() + id := &pbtypes.Id{ + WorkflowExecutionId: uuid.New().String(), + WorkflowId: workflowTestID, + WorkflowOwner: wowner, + WorkflowName: workflowTestName, + ReportId: reportTestID, + } + id2 := &pbtypes.Id{ + WorkflowExecutionId: uuid.New().String(), + WorkflowId: workflowTestID2, + WorkflowOwner: wowner, + WorkflowName: workflowTestName, + ReportId: reportTestID, + } + id3 := &pbtypes.Id{ + WorkflowExecutionId: uuid.New().String(), + WorkflowId: workflowTestID3, + WorkflowOwner: wowner, + WorkflowName: workflowTestName, + ReportId: reportTestID, + } + q := &pbtypes.Query{ + Ids: []*pbtypes.Id{id, id2, id3}, + } + qb, err := proto.Marshal(q) + require.NoError(t, err) + o, err := values.NewList([]any{"hello"}) + require.NoError(t, err) + time1 := time.Now().Add(time.Second * 1) + time2 := time.Now().Add(time.Second * 2) + time3 := time.Now().Add(time.Second * 3) + m, err := values.NewMap(map[string]any{"foo": "bar"}) + require.NoError(t, err) + mc := values.ProtoMap(m) + obs := &pbtypes.Observations{ + Observations: []*pbtypes.Observation{ + { + Id: id, + Observations: values.Proto(o).GetListValue(), + OverriddenEncoderName: "evm", + OverriddenEncoderConfig: mc, + }, + { + Id: id2, + Observations: values.Proto(o).GetListValue(), + OverriddenEncoderName: "evm", + OverriddenEncoderConfig: mc, + }, + { + Id: id3, + Observations: values.Proto(o).GetListValue(), + }, + }, + RegisteredWorkflowIds: []string{workflowTestID, workflowTestID2}, + Timestamp: timestamppb.New(time1), + } + obs2 := &pbtypes.Observations{ + Observations: []*pbtypes.Observation{ + { + Id: id, + Observations: values.Proto(o).GetListValue(), + OverriddenEncoderName: "evm", + OverriddenEncoderConfig: mc, + }, + { + Id: id2, + Observations: values.Proto(o).GetListValue(), + OverriddenEncoderName: "evm", + OverriddenEncoderConfig: mc, + }, + { + Id: id3, + Observations: values.Proto(o).GetListValue(), + }, + }, + RegisteredWorkflowIds: []string{workflowTestID}, + Timestamp: timestamppb.New(time2), + } + obs3 := &pbtypes.Observations{ + Observations: []*pbtypes.Observation{ + { + Id: id, + Observations: values.Proto(o).GetListValue(), + OverriddenEncoderName: "evm", + OverriddenEncoderConfig: mc, + }, + { + Id: id2, + Observations: values.Proto(o).GetListValue(), + OverriddenEncoderName: "solana", + OverriddenEncoderConfig: mc, + }, + { + Id: id3, + Observations: values.Proto(o).GetListValue(), + }, + }, + RegisteredWorkflowIds: []string{workflowTestID}, + Timestamp: timestamppb.New(time3), + } + + rawObs, err := proto.Marshal(obs) + require.NoError(t, err) + rawObs2, err := proto.Marshal(obs2) + require.NoError(t, err) + rawObs3, err := proto.Marshal(obs3) + require.NoError(t, err) + aos := []types.AttributedObservation{ + { + Observation: rawObs, + Observer: commontypes.OracleID(1), + }, + { + Observation: rawObs2, + Observer: commontypes.OracleID(2), + }, + { + Observation: rawObs3, + Observer: commontypes.OracleID(3), + }, + } + + outcome, err := rp.Outcome(tests.Context(t), ocr3types.OutcomeContext{SeqNr: 100}, qb, aos) + require.NoError(t, err) + opb1 := &pbtypes.Outcome{} + err = proto.Unmarshal(outcome, opb1) + require.NoError(t, err) + + assert.Equal(t, opb1.Outcomes[workflowTestID].EncoderName, "evm") + ec, err := values.FromMapValueProto(opb1.Outcomes[workflowTestID].EncoderConfig) + require.NoError(t, err) + assert.Equal(t, ec, m) + + // No consensus on outcome 2 + assert.Equal(t, opb1.Outcomes[workflowTestID2].EncoderName, "") + assert.Nil(t, opb1.Outcomes[workflowTestID2].EncoderConfig) + + // Outcome 3 doesn't set the encoder + assert.Equal(t, opb1.Outcomes[workflowTestID3].EncoderName, "") + assert.Nil(t, opb1.Outcomes[workflowTestID3].EncoderConfig) +} diff --git a/pkg/capabilities/consensus/ocr3/requests/request.go b/pkg/capabilities/consensus/ocr3/requests/request.go index dc99d25e7..4d427a038 100644 --- a/pkg/capabilities/consensus/ocr3/requests/request.go +++ b/pkg/capabilities/consensus/ocr3/requests/request.go @@ -8,8 +8,10 @@ import ( ) type Request struct { - Observations *values.List `mapstructure:"-"` - ExpiresAt time.Time + Observations *values.List `mapstructure:"-"` + OverriddenEncoderName string + OverriddenEncoderConfig *values.Map + ExpiresAt time.Time // CallbackCh is a channel to send a response back to the requester // after the request has been processed or timed out. @@ -29,9 +31,11 @@ type Request struct { func (r *Request) Copy() *Request { return &Request{ - Observations: r.Observations.CopyList(), + Observations: r.Observations.CopyList(), + OverriddenEncoderConfig: r.OverriddenEncoderConfig.CopyMap(), // No need to copy these, they're value types. + OverriddenEncoderName: r.OverriddenEncoderName, ExpiresAt: r.ExpiresAt, WorkflowExecutionID: r.WorkflowExecutionID, WorkflowID: r.WorkflowID, diff --git a/pkg/capabilities/consensus/ocr3/testdata/fixtures/capability/schema.json b/pkg/capabilities/consensus/ocr3/testdata/fixtures/capability/schema.json index 843dcc58c..ebdabb38d 100644 --- a/pkg/capabilities/consensus/ocr3/testdata/fixtures/capability/schema.json +++ b/pkg/capabilities/consensus/ocr3/testdata/fixtures/capability/schema.json @@ -74,6 +74,21 @@ "required": [ "Underlying" ] + }, + "encoder": { + "type": "string" + }, + "encoder_config": { + "properties": { + "Underlying": { + "type": "object" + } + }, + "additionalProperties": false, + "type": "object", + "required": [ + "Underlying" + ] } }, "additionalProperties": false, diff --git a/pkg/capabilities/consensus/ocr3/types/ocr3_types.pb.go b/pkg/capabilities/consensus/ocr3/types/ocr3_types.pb.go index 27ffb7ed4..05ad85bf7 100644 --- a/pkg/capabilities/consensus/ocr3/types/ocr3_types.pb.go +++ b/pkg/capabilities/consensus/ocr3/types/ocr3_types.pb.go @@ -276,7 +276,9 @@ type Observation struct { Id *Id `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` // list of observations defined in inputs.observations - Observations *pb.List `protobuf:"bytes,4,opt,name=observations,proto3" json:"observations,omitempty"` + Observations *pb.List `protobuf:"bytes,4,opt,name=observations,proto3" json:"observations,omitempty"` + OverriddenEncoderName string `protobuf:"bytes,5,opt,name=overriddenEncoderName,proto3" json:"overriddenEncoderName,omitempty"` + OverriddenEncoderConfig *pb.Map `protobuf:"bytes,6,opt,name=overriddenEncoderConfig,proto3" json:"overriddenEncoderConfig,omitempty"` } func (x *Observation) Reset() { @@ -325,6 +327,20 @@ func (x *Observation) GetObservations() *pb.List { return nil } +func (x *Observation) GetOverriddenEncoderName() string { + if x != nil { + return x.OverriddenEncoderName + } + return "" +} + +func (x *Observation) GetOverriddenEncoderConfig() *pb.Map { + if x != nil { + return x.OverriddenEncoderConfig + } + return nil +} + type Observations struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -609,53 +625,61 @@ var file_capabilities_consensus_ocr3_types_ocr3_types_proto_rawDesc = []byte{ 0x66, 0x6c, 0x6f, 0x77, 0x44, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x14, 0x0a, 0x05, 0x6b, 0x65, 0x79, 0x49, 0x64, 0x18, 0x09, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x6b, 0x65, 0x79, 0x49, 0x64, 0x4a, 0x04, 0x08, 0x05, 0x10, 0x06, - 0x22, 0x5f, 0x0a, 0x0b, 0x4f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, + 0x22, 0xdc, 0x01, 0x0a, 0x0b, 0x4f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, + 0x12, 0x1e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x6f, + 0x63, 0x72, 0x33, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x73, 0x2e, 0x49, 0x64, 0x52, 0x02, 0x69, 0x64, + 0x12, 0x30, 0x0a, 0x0c, 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, + 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0c, 0x2e, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x2e, + 0x4c, 0x69, 0x73, 0x74, 0x52, 0x0c, 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, + 0x6e, 0x73, 0x12, 0x34, 0x0a, 0x15, 0x6f, 0x76, 0x65, 0x72, 0x72, 0x69, 0x64, 0x64, 0x65, 0x6e, + 0x45, 0x6e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x15, 0x6f, 0x76, 0x65, 0x72, 0x72, 0x69, 0x64, 0x64, 0x65, 0x6e, 0x45, 0x6e, 0x63, + 0x6f, 0x64, 0x65, 0x72, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x45, 0x0a, 0x17, 0x6f, 0x76, 0x65, 0x72, + 0x72, 0x69, 0x64, 0x64, 0x65, 0x6e, 0x45, 0x6e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x43, 0x6f, 0x6e, + 0x66, 0x69, 0x67, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0b, 0x2e, 0x76, 0x61, 0x6c, 0x75, + 0x65, 0x73, 0x2e, 0x4d, 0x61, 0x70, 0x52, 0x17, 0x6f, 0x76, 0x65, 0x72, 0x72, 0x69, 0x64, 0x64, + 0x65, 0x6e, 0x45, 0x6e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x22, + 0xbb, 0x01, 0x0a, 0x0c, 0x4f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, + 0x12, 0x3b, 0x0a, 0x0c, 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, + 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x6f, 0x63, 0x72, 0x33, 0x5f, 0x74, 0x79, + 0x70, 0x65, 0x73, 0x2e, 0x4f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, + 0x0c, 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x34, 0x0a, + 0x15, 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x65, 0x64, 0x57, 0x6f, 0x72, 0x6b, 0x66, + 0x6c, 0x6f, 0x77, 0x49, 0x64, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x15, 0x72, 0x65, + 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x65, 0x64, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, + 0x49, 0x64, 0x73, 0x12, 0x38, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, + 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, + 0x6d, 0x70, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x22, 0x62, 0x0a, + 0x06, 0x52, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x12, 0x1e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x6f, 0x63, 0x72, 0x33, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x73, + 0x2e, 0x49, 0x64, 0x52, 0x02, 0x69, 0x64, 0x12, 0x38, 0x0a, 0x07, 0x6f, 0x75, 0x74, 0x63, 0x6f, + 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1e, 0x2e, 0x6f, 0x63, 0x72, 0x33, 0x5f, + 0x74, 0x79, 0x70, 0x65, 0x73, 0x2e, 0x41, 0x67, 0x67, 0x72, 0x65, 0x67, 0x61, 0x74, 0x69, 0x6f, + 0x6e, 0x4f, 0x75, 0x74, 0x63, 0x6f, 0x6d, 0x65, 0x52, 0x07, 0x6f, 0x75, 0x74, 0x63, 0x6f, 0x6d, + 0x65, 0x22, 0x51, 0x0a, 0x0a, 0x52, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x1e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x6f, 0x63, 0x72, 0x33, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x73, 0x2e, 0x49, 0x64, 0x52, 0x02, 0x69, 0x64, 0x12, - 0x30, 0x0a, 0x0c, 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, - 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0c, 0x2e, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x2e, 0x4c, - 0x69, 0x73, 0x74, 0x52, 0x0c, 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, - 0x73, 0x22, 0xbb, 0x01, 0x0a, 0x0c, 0x4f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, - 0x6e, 0x73, 0x12, 0x3b, 0x0a, 0x0c, 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, - 0x6e, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x6f, 0x63, 0x72, 0x33, 0x5f, - 0x74, 0x79, 0x70, 0x65, 0x73, 0x2e, 0x4f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, - 0x6e, 0x52, 0x0c, 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, - 0x34, 0x0a, 0x15, 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x65, 0x64, 0x57, 0x6f, 0x72, - 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x49, 0x64, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x15, - 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x65, 0x64, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, - 0x6f, 0x77, 0x49, 0x64, 0x73, 0x12, 0x38, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, - 0x6d, 0x70, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, - 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, - 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x22, - 0x62, 0x0a, 0x06, 0x52, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x12, 0x1e, 0x0a, 0x02, 0x69, 0x64, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x6f, 0x63, 0x72, 0x33, 0x5f, 0x74, 0x79, 0x70, - 0x65, 0x73, 0x2e, 0x49, 0x64, 0x52, 0x02, 0x69, 0x64, 0x12, 0x38, 0x0a, 0x07, 0x6f, 0x75, 0x74, - 0x63, 0x6f, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1e, 0x2e, 0x6f, 0x63, 0x72, - 0x33, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x73, 0x2e, 0x41, 0x67, 0x67, 0x72, 0x65, 0x67, 0x61, 0x74, - 0x69, 0x6f, 0x6e, 0x4f, 0x75, 0x74, 0x63, 0x6f, 0x6d, 0x65, 0x52, 0x07, 0x6f, 0x75, 0x74, 0x63, - 0x6f, 0x6d, 0x65, 0x22, 0x51, 0x0a, 0x0a, 0x52, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x49, 0x6e, 0x66, - 0x6f, 0x12, 0x1e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0e, 0x2e, - 0x6f, 0x63, 0x72, 0x33, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x73, 0x2e, 0x49, 0x64, 0x52, 0x02, 0x69, - 0x64, 0x12, 0x23, 0x0a, 0x0d, 0x73, 0x68, 0x6f, 0x75, 0x6c, 0x64, 0x5f, 0x72, 0x65, 0x70, 0x6f, - 0x72, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0c, 0x73, 0x68, 0x6f, 0x75, 0x6c, 0x64, - 0x52, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x22, 0xe2, 0x01, 0x0a, 0x07, 0x4f, 0x75, 0x74, 0x63, 0x6f, - 0x6d, 0x65, 0x12, 0x3d, 0x0a, 0x08, 0x6f, 0x75, 0x74, 0x63, 0x6f, 0x6d, 0x65, 0x73, 0x18, 0x01, - 0x20, 0x03, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x6f, 0x63, 0x72, 0x33, 0x5f, 0x74, 0x79, 0x70, 0x65, - 0x73, 0x2e, 0x4f, 0x75, 0x74, 0x63, 0x6f, 0x6d, 0x65, 0x2e, 0x4f, 0x75, 0x74, 0x63, 0x6f, 0x6d, - 0x65, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x08, 0x6f, 0x75, 0x74, 0x63, 0x6f, 0x6d, 0x65, - 0x73, 0x12, 0x3b, 0x0a, 0x0f, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x72, 0x65, 0x70, - 0x6f, 0x72, 0x74, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x6f, 0x63, 0x72, - 0x33, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x73, 0x2e, 0x52, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x52, 0x0e, - 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x73, 0x1a, 0x5b, - 0x0a, 0x0d, 0x4f, 0x75, 0x74, 0x63, 0x6f, 0x6d, 0x65, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, - 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, - 0x79, 0x12, 0x34, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, - 0x32, 0x1e, 0x2e, 0x6f, 0x63, 0x72, 0x33, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x73, 0x2e, 0x41, 0x67, - 0x67, 0x72, 0x65, 0x67, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4f, 0x75, 0x74, 0x63, 0x6f, 0x6d, 0x65, - 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x42, 0x23, 0x5a, 0x21, 0x63, - 0x61, 0x70, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x69, 0x65, 0x73, 0x2f, 0x63, 0x6f, 0x6e, 0x73, - 0x65, 0x6e, 0x73, 0x75, 0x73, 0x2f, 0x6f, 0x63, 0x72, 0x33, 0x2f, 0x74, 0x79, 0x70, 0x65, 0x73, - 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x23, 0x0a, 0x0d, 0x73, 0x68, 0x6f, 0x75, 0x6c, 0x64, 0x5f, 0x72, 0x65, 0x70, 0x6f, 0x72, 0x74, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0c, 0x73, 0x68, 0x6f, 0x75, 0x6c, 0x64, 0x52, 0x65, + 0x70, 0x6f, 0x72, 0x74, 0x22, 0xe2, 0x01, 0x0a, 0x07, 0x4f, 0x75, 0x74, 0x63, 0x6f, 0x6d, 0x65, + 0x12, 0x3d, 0x0a, 0x08, 0x6f, 0x75, 0x74, 0x63, 0x6f, 0x6d, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, + 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x6f, 0x63, 0x72, 0x33, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x73, 0x2e, + 0x4f, 0x75, 0x74, 0x63, 0x6f, 0x6d, 0x65, 0x2e, 0x4f, 0x75, 0x74, 0x63, 0x6f, 0x6d, 0x65, 0x73, + 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x08, 0x6f, 0x75, 0x74, 0x63, 0x6f, 0x6d, 0x65, 0x73, 0x12, + 0x3b, 0x0a, 0x0f, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x72, 0x65, 0x70, 0x6f, 0x72, + 0x74, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x6f, 0x63, 0x72, 0x33, 0x5f, + 0x74, 0x79, 0x70, 0x65, 0x73, 0x2e, 0x52, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x52, 0x0e, 0x63, 0x75, + 0x72, 0x72, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x73, 0x1a, 0x5b, 0x0a, 0x0d, + 0x4f, 0x75, 0x74, 0x63, 0x6f, 0x6d, 0x65, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, + 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, + 0x34, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1e, + 0x2e, 0x6f, 0x63, 0x72, 0x33, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x73, 0x2e, 0x41, 0x67, 0x67, 0x72, + 0x65, 0x67, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4f, 0x75, 0x74, 0x63, 0x6f, 0x6d, 0x65, 0x52, 0x05, + 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x42, 0x23, 0x5a, 0x21, 0x63, 0x61, 0x70, + 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x69, 0x65, 0x73, 0x2f, 0x63, 0x6f, 0x6e, 0x73, 0x65, 0x6e, + 0x73, 0x75, 0x73, 0x2f, 0x6f, 0x63, 0x72, 0x33, 0x2f, 0x74, 0x79, 0x70, 0x65, 0x73, 0x62, 0x06, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -692,19 +716,20 @@ var file_capabilities_consensus_ocr3_types_ocr3_types_proto_depIdxs = []int32{ 2, // 3: ocr3_types.Query.ids:type_name -> ocr3_types.Id 2, // 4: ocr3_types.Observation.id:type_name -> ocr3_types.Id 11, // 5: ocr3_types.Observation.observations:type_name -> values.List - 3, // 6: ocr3_types.Observations.observations:type_name -> ocr3_types.Observation - 10, // 7: ocr3_types.Observations.timestamp:type_name -> google.protobuf.Timestamp - 2, // 8: ocr3_types.Report.id:type_name -> ocr3_types.Id - 0, // 9: ocr3_types.Report.outcome:type_name -> ocr3_types.AggregationOutcome - 2, // 10: ocr3_types.ReportInfo.id:type_name -> ocr3_types.Id - 8, // 11: ocr3_types.Outcome.outcomes:type_name -> ocr3_types.Outcome.OutcomesEntry - 5, // 12: ocr3_types.Outcome.current_reports:type_name -> ocr3_types.Report - 0, // 13: ocr3_types.Outcome.OutcomesEntry.value:type_name -> ocr3_types.AggregationOutcome - 14, // [14:14] is the sub-list for method output_type - 14, // [14:14] is the sub-list for method input_type - 14, // [14:14] is the sub-list for extension type_name - 14, // [14:14] is the sub-list for extension extendee - 0, // [0:14] is the sub-list for field type_name + 9, // 6: ocr3_types.Observation.overriddenEncoderConfig:type_name -> values.Map + 3, // 7: ocr3_types.Observations.observations:type_name -> ocr3_types.Observation + 10, // 8: ocr3_types.Observations.timestamp:type_name -> google.protobuf.Timestamp + 2, // 9: ocr3_types.Report.id:type_name -> ocr3_types.Id + 0, // 10: ocr3_types.Report.outcome:type_name -> ocr3_types.AggregationOutcome + 2, // 11: ocr3_types.ReportInfo.id:type_name -> ocr3_types.Id + 8, // 12: ocr3_types.Outcome.outcomes:type_name -> ocr3_types.Outcome.OutcomesEntry + 5, // 13: ocr3_types.Outcome.current_reports:type_name -> ocr3_types.Report + 0, // 14: ocr3_types.Outcome.OutcomesEntry.value:type_name -> ocr3_types.AggregationOutcome + 15, // [15:15] is the sub-list for method output_type + 15, // [15:15] is the sub-list for method input_type + 15, // [15:15] is the sub-list for extension type_name + 15, // [15:15] is the sub-list for extension extendee + 0, // [0:15] is the sub-list for field type_name } func init() { file_capabilities_consensus_ocr3_types_ocr3_types_proto_init() } diff --git a/pkg/capabilities/consensus/ocr3/types/ocr3_types.proto b/pkg/capabilities/consensus/ocr3/types/ocr3_types.proto index 265b86552..4ce555895 100644 --- a/pkg/capabilities/consensus/ocr3/types/ocr3_types.proto +++ b/pkg/capabilities/consensus/ocr3/types/ocr3_types.proto @@ -39,6 +39,8 @@ message Observation { Id id = 1; // list of observations defined in inputs.observations values.List observations = 4; + string overriddenEncoderName = 5; + values.Map overriddenEncoderConfig = 6; } message Observations { diff --git a/pkg/workflows/sdk/testutils/runner.go b/pkg/workflows/sdk/testutils/runner.go index 125b666bb..aa3c7c35a 100644 --- a/pkg/workflows/sdk/testutils/runner.go +++ b/pkg/workflows/sdk/testutils/runner.go @@ -25,11 +25,6 @@ func NewRunner(ctx context.Context) *Runner { } } -type ConsensusMock interface { - capabilities.ConsensusCapability - SingleToManyObservations(value values.Value) (*values.Map, error) -} - type Runner struct { RawConfig []byte // Context is held in this runner because it's for testing and capability calls are made by it. @@ -173,12 +168,6 @@ func (r *Runner) walk(spec sdk.WorkflowSpec, ref string) error { return err } - if c, ok := mock.(ConsensusMock); ok { - if request.Inputs, err = c.SingleToManyObservations(request.Inputs); err != nil { - return err - } - } - results, err := mock.Execute(r.ctx, request) if err != nil { return err diff --git a/pkg/workflows/sdk/testutils/runner_test.go b/pkg/workflows/sdk/testutils/runner_test.go index 61f7a99a9..555e1f5c7 100644 --- a/pkg/workflows/sdk/testutils/runner_test.go +++ b/pkg/workflows/sdk/testutils/runner_test.go @@ -54,7 +54,7 @@ func TestRunner(t *testing.T) { assert.True(t, helper.transformTriggerCalled) consensus := consensusMock.GetStepDecoded("consensus") assert.Equal(t, "it was true", consensus.Output.AdaptedThing) - require.Len(t, consensus.Input.Observations, 1) + require.NotNil(t, consensus.Input.Observations[0]) rawConsensus := consensusMock.GetStep("consensus") target := targetMock.GetAllWrites() @@ -84,7 +84,7 @@ func TestRunner(t *testing.T) { consensus := ocr3.IdenticalConsensusConfig[basicaction.ActionOutputs]{ Encoder: "Test", EncoderConfig: ocr3.EncoderConfig{}, - }.New(workflow, "consensus", ocr3.IdenticalConsensusInput[basicaction.ActionOutputs]{Observations: action}) + }.New(workflow, "consensus", ocr3.IdenticalConsensusInput[basicaction.ActionOutputs]{Observation: action}) chainwriter.TargetConfig{ Address: "0x123", @@ -350,7 +350,7 @@ func createBasicTestWorkflow(actionTransform actionTransform) *sdk.WorkflowSpecF consensus := ocr3.IdenticalConsensusConfig[basicaction.ActionOutputs]{ Encoder: "Test", EncoderConfig: ocr3.EncoderConfig{}, - }.New(workflow, "consensus", ocr3.IdenticalConsensusInput[basicaction.ActionOutputs]{Observations: action}) + }.New(workflow, "consensus", ocr3.IdenticalConsensusInput[basicaction.ActionOutputs]{Observation: action}) chainwriter.TargetConfig{ Address: "0x123",