Skip to content

Commit

Permalink
[CM-380] Identical Aggregator (#771)
Browse files Browse the repository at this point in the history
* [CM-380] Identical Aggregator

* [CAPPL-60] Dynamic encoder selection in OCR consensus aggregator

* extract encoder name and config

* Add more tests

* add limit to seq num range (#781)

* [chore] Handle aliases in slices (#784)

* [chore] Handle aliases in slices

* More aliasing tests

* Lint fix

* Fix test

---------

Co-authored-by: Sri Kidambi <1702865+kidambisrinivas@users.noreply.github.com>

* feat(observability-lib): legendoptions + improvement on node general dashboard (#785)

* [CAPPL-58] Correctly stub out clock_time_get and poll_oneoff (#778)

* [CAPPL-58] Further cleanup

* [CAPPL-58] Add support for compression

* More alias handling in Unwrap functionality of Value  (#792)

* Generic case to handle both pointer type and raw type and simplify int unwrap

* Handling interface and default

* Small test fix

---------

Co-authored-by: Cedric Cordenier <cedric.cordenier@smartcontract.com>

* Fix alias typing and tests (#788)

* Fix alias typing and tests

* Fix ints

* errors.new instead of fmt

* Add array support to slice (#789)

* Replace fmt.Errorf with errors.New where possible (#795)

* chore(workflows): adds unit test to utils (#782)

* Have the mock runner register with capabilites (#783)

* Add binary + config to custom compute (#794)

* Add binary + config to custom compute

* Add binary + config to custom compute

* fix lint issues (#786)

* execution factory constructor updated to take two providers, chainIDs, and source token address (#641)

* execution factory constructor updated to take two providers and chain IDs

(cherry picked from commit 6ad1f08)

* Adding source token address to execution factory constructor

* Support passing in a values.Value to the chainreader GetLatestValue method (#779)

* add support for passing in a values.Value type to the contract readers GetLatestValue and QueryKey methods

---------

Co-authored-by: Sri Kidambi <1702865+kidambisrinivas@users.noreply.github.com>
Co-authored-by: Cedric Cordenier <cedric.cordenier@smartcontract.com>

* [CAPPL-31] feat(values): adds support for time.Time as value (#787)

* feat(values): adds support for time.Time as value

* chore(deps): updates .tool-versions

* refactor(values): uses primitive type in protos

* feat(values): support float64 values (#799)

* confidence level from string (#802)

* Float32/Float64 wrapping (#804)

* feat: implement sdk logger (#762)

* Add MustEmbed Constraint to Contract Reader (#801)

Reintroducing the must embed constraint to `ContractReader` implementations to
ensure that all implementations of `ContractReader` embed the `UnimplementedContractReader`.

If an implementation contains the unemplemented struct, changes to the interface
will flow down to all implementations without introducing breaking changes.

* Updated TestStruct to enable advanced querying (#798)

* Updated TestStruct to enable advanced querying

* linting fixes

* Update pkg/codec/encodings/type_codec_test.go

Co-authored-by: Clement <clement.erena78@gmail.com>

* Update pkg/codec/encodings/type_codec_test.go

Co-authored-by: Clement <clement.erena78@gmail.com>

* Fixed codec tests

---------

Co-authored-by: Clement <clement.erena78@gmail.com>

* Properly support the range of uint64 and allow big int to unwrap into smaller integer types (#810)

* Extract expirable cache abstraction for reuse (#807)

* expirable_cache

* remove cache (#812)

* CCIP-3555 Attestation encoder interfaces (#813)

* Attestation encoder interfaces

* Attestation encoder interfaces

* Attestation encoder interfaces

* Comment

* [BCF-3392]  - ContractReaderByIDs Wrapper (#797)

* WIP

* Update ContractReaderByIDs interface method names

* Unexpose types.ContractReader in contractReaderByIDs

* Add multiple contract address support to fakeContractReader for tests

* Add GetLatestValue unit test for contractReaderByIDs

* Add GetLatestValue unit test for QueryKey

* Add BatchGetLatestValues unit test for CR by custom IdDs wrapper

* Rm ContractReaderByIDs interface and export the struct

* Change ContractReaderByIDs wrapper Unbind handling

* Improve ContractReaderByIDs wrapper err handling

* Remove mockery usage from ContractReaderByIDs tests

* lint

* pkg/types/ccipocr3: add DestExecData to RampTokenAmount (#817)

* pkg/types/ccipocr3: add DestExecData to RampTokenAmount

* fix test

* Allow the creation of maps from string to capbility outputs. (#815)

* Add the FeeValueJuels field to ccipocr3.Message (#819)

* feat(observability-lib): improve alerts rule (#803)

* feat(observability-lib): improve alerts rule

* chore(observability-lib): README + folder structure (#806)

* chore(observability-lib): README + folder structure

* feat(observability-lib): variable add current + includeAll options (#808)

* chore(README): small corrections

* chore(README): example improved

* chore(README): add references to dashboards examples

* feat(observability-lib): refactor exportable func + link to godoc

* fix(observability-lib): cmd errors returns

* enable errorf check (#826)

* Make overridding the encoder first-class

* Update mocks

* Mock updates

* Adjust tests

* Fix mock

* Fix mock

* Update mock

* Linting

---------

Co-authored-by: Cedric Cordenier <cedric.cordenier@smartcontract.com>
Co-authored-by: dimitris <dimitrios.kouveris@smartcontract.com>
Co-authored-by: Sri Kidambi <1702865+kidambisrinivas@users.noreply.github.com>
Co-authored-by: Clement <clement.erena78@gmail.com>
Co-authored-by: Ryan Tinianov <tinianov@live.com>
Co-authored-by: Street <5597260+MStreet3@users.noreply.github.com>
Co-authored-by: Jordan Krage <jmank88@gmail.com>
Co-authored-by: Patrick <patrick.huie@smartcontract.com>
Co-authored-by: Matthew Pendrey <matthew.pendrey@gmail.com>
Co-authored-by: Gabriel Paradiso <gaboparadiso@gmail.com>
Co-authored-by: Awbrey Hughlett <athughlett@gmail.com>
Co-authored-by: Silas Lenihan <32529249+silaslenihan@users.noreply.github.com>
Co-authored-by: Mateusz Sekara <mateusz.sekara@gmail.com>
Co-authored-by: ilija42 <57732589+ilija42@users.noreply.github.com>
Co-authored-by: Makram <makramkd@users.noreply.github.com>
Co-authored-by: Ryan Stout <rstout610@gmail.com>
  • Loading branch information
17 people authored Oct 10, 2024
1 parent 8166e65 commit 4637084
Show file tree
Hide file tree
Showing 16 changed files with 595 additions and 111 deletions.
125 changes: 125 additions & 0 deletions pkg/capabilities/consensus/ocr3/aggregators/identical.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package aggregators

import (
"crypto/sha256"
"fmt"

"google.golang.org/protobuf/proto"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities/consensus/ocr3/types"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/values"

ocrcommon "github.com/smartcontractkit/libocr/commontypes"
)

type identicalAggregator struct {
config aggregatorConfig
lggr logger.Logger
}

type aggregatorConfig struct {
// Length of the list of observations that each node is expected to provide.
// Aggregator's output (i.e. EncodableOutcome) will be a values.Map with the same
// number of elements and keyed by indices 0,1,2,... (unless KeyOverrides are provided).
// Defaults to 1.
ExpectedObservationsLen int
// If non-empty, the keys in the outcome map will be replaced with these values.
// If non-empty, must be of length ExpectedObservationsLen.
KeyOverrides []string
}

type counter struct {
fullObservation values.Value
count int
}

var _ types.Aggregator = (*identicalAggregator)(nil)

func (a *identicalAggregator) Aggregate(_ *types.AggregationOutcome, observations map[ocrcommon.OracleID][]values.Value, f int) (*types.AggregationOutcome, error) {
counters := []map[[32]byte]*counter{}
for i := 0; i < a.config.ExpectedObservationsLen; i++ {
counters = append(counters, map[[32]byte]*counter{})
}
for nodeID, nodeObservations := range observations {
if len(nodeObservations) == 0 || nodeObservations[0] == nil {
a.lggr.Warnf("node %d contributed with empty observations", nodeID)
continue
}
if len(nodeObservations) != a.config.ExpectedObservationsLen {
a.lggr.Warnf("node %d contributed with an incorrect number of observations %d - ignoring them", nodeID, len(nodeObservations))
continue
}
for idx, observation := range nodeObservations {
marshalled, err := proto.MarshalOptions{Deterministic: true}.Marshal(values.Proto(observation))
if err != nil {
return nil, err
}
sha := sha256.Sum256(marshalled)
elem, ok := counters[idx][sha]
if !ok {
counters[idx][sha] = &counter{
fullObservation: observation,
count: 1,
}
} else {
elem.count++
}
}
}
return a.collectHighestCounts(counters, f)
}

func (a *identicalAggregator) collectHighestCounts(counters []map[[32]byte]*counter, f int) (*types.AggregationOutcome, error) {
useOverrides := len(a.config.KeyOverrides) == len(counters)
outcome := make(map[string]any)
for idx, shaToCounter := range counters {
highestCount := 0
var highestObservation values.Value
for _, counter := range shaToCounter {
if counter.count > highestCount {
highestCount = counter.count
highestObservation = counter.fullObservation
}
}
if highestCount < 2*f+1 {
return nil, fmt.Errorf("can't reach consensus on observations with index %d", idx)
}
if useOverrides {
outcome[a.config.KeyOverrides[idx]] = highestObservation
} else {
outcome[fmt.Sprintf("%d", idx)] = highestObservation
}
}
valMap, err := values.NewMap(outcome)
if err != nil {
return nil, err
}
return &types.AggregationOutcome{
EncodableOutcome: values.ProtoMap(valMap),
Metadata: nil,
ShouldReport: true,
}, nil
}

func NewIdenticalAggregator(config values.Map, lggr logger.Logger) (*identicalAggregator, error) {
parsedConfig, err := ParseConfig(config)
if err != nil {
return nil, fmt.Errorf("failed to parse config (%+v): %w", config, err)
}
return &identicalAggregator{
config: parsedConfig,
lggr: lggr,
}, nil
}

func ParseConfig(config values.Map) (aggregatorConfig, error) {
parsedConfig := aggregatorConfig{}
if err := config.UnwrapTo(&parsedConfig); err != nil {
return aggregatorConfig{}, err
}
if parsedConfig.ExpectedObservationsLen == 0 {
parsedConfig.ExpectedObservationsLen = 1
}
return parsedConfig, nil
}
93 changes: 93 additions & 0 deletions pkg/capabilities/consensus/ocr3/aggregators/identical_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package aggregators_test

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/smartcontractkit/libocr/commontypes"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities/consensus/ocr3/aggregators"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/values"
)

func TestDataFeedsAggregator_Aggregate(t *testing.T) {
config := getConfig(t, nil)
agg, err := aggregators.NewIdenticalAggregator(*config, logger.Nop())
require.NoError(t, err)

observations := map[commontypes.OracleID][]values.Value{
0: {values.NewString("a")},
1: {values.NewString("a")},
2: {values.NewString("a")},
3: {values.NewString("a")},
}
outcome, err := agg.Aggregate(nil, observations, 1)
require.NoError(t, err)
require.True(t, outcome.ShouldReport)
require.Equal(t, "", outcome.EncoderName)
require.Nil(t, outcome.EncoderConfig)

m, err := values.FromMapValueProto(outcome.EncodableOutcome)
require.NoError(t, err)

require.Len(t, m.Underlying, 1)
require.Equal(t, m.Underlying["0"], values.NewString("a"))
}

func TestDataFeedsAggregator_Aggregate_OverrideWithKeys(t *testing.T) {
config := getConfig(t, []string{"outcome"})
agg, err := aggregators.NewIdenticalAggregator(*config, logger.Nop())
require.NoError(t, err)

observations := map[commontypes.OracleID][]values.Value{
0: {values.NewString("a")},
1: {values.NewString("a")},
2: {values.NewString("a")},
3: {values.NewString("a")},
}
outcome, err := agg.Aggregate(nil, observations, 1)
require.NoError(t, err)
require.True(t, outcome.ShouldReport)
require.Equal(t, "", outcome.EncoderName)
require.Nil(t, outcome.EncoderConfig)

m, err := values.FromMapValueProto(outcome.EncodableOutcome)
require.NoError(t, err)

require.Len(t, m.Underlying, 1)
require.Equal(t, m.Underlying["outcome"], values.NewString("a"))
}

func TestDataFeedsAggregator_Aggregate_NoConsensus(t *testing.T) {
config := getConfig(t, []string{"outcome"})
agg, err := aggregators.NewIdenticalAggregator(*config, logger.Nop())
require.NoError(t, err)

encoderStr := "evm"
encoderName := values.NewString(encoderStr)
encoderCfg, err := values.NewMap(map[string]any{"foo": "bar"})
require.NoError(t, err)

observations := map[commontypes.OracleID][]values.Value{
0: {values.NewString("a"), encoderName, encoderCfg},
1: {values.NewString("b"), encoderName, encoderCfg},
2: {values.NewString("b"), encoderName, encoderCfg},
3: {values.NewString("a"), encoderName, encoderCfg},
}
outcome, err := agg.Aggregate(nil, observations, 1)
require.Nil(t, outcome)
require.ErrorContains(t, err, "can't reach consensus on observations with index 0")
}

func getConfig(t *testing.T, overrideKeys []string) *values.Map {
unwrappedConfig := map[string]any{
"expectedObservationsLen": len(overrideKeys),
"keyOverrides": overrideKeys,
}

config, err := values.NewMap(unwrappedConfig)
require.NoError(t, err)
return config
}
2 changes: 2 additions & 0 deletions pkg/capabilities/consensus/ocr3/capability.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,8 @@ func (o *capability) queueRequestForProcessing(
WorkflowDonID: metadata.WorkflowDonID,
WorkflowDonConfigVersion: metadata.WorkflowDonConfigVersion,
Observations: i.Observations,
OverriddenEncoderName: i.EncoderName,
OverriddenEncoderConfig: i.EncoderConfig,
KeyID: c.KeyID,
ExpiresAt: o.clock.Now().Add(requestTimeout),
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/capabilities/consensus/ocr3/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,7 @@ type config struct {
}

type inputs struct {
Observations *values.List `json:"observations"`
Observations *values.List `json:"observations" jsonschema:""`
EncoderName string `mapstructure:"encoder" json:"encoder,omitempty"`
EncoderConfig *values.Map `mapstructure:"encoder_config" json:"encoder_config,omitempty"`
}
12 changes: 6 additions & 6 deletions pkg/capabilities/consensus/ocr3/ocr3cap/identical_consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,17 @@ func (c IdenticalConsensusConfig[T]) New(w *sdk.WorkflowSpecFactory, ref string,
}

type IdenticalConsensusInput[T any] struct {
Observations sdk.CapDefinition[T]
}

type IdenticalConsensusMergedInput[T any] struct {
Observations []T
Observation sdk.CapDefinition[T]
Encoder Encoder
EncoderConfig EncoderConfig
}

func (input IdenticalConsensusInput[T]) ToSteps() sdk.StepInputs {
return sdk.StepInputs{
Mapping: map[string]any{
"observations": input.Observations.Ref(),
"observations": sdk.ListOf(input.Observation).Ref(),
"encoder": input.Encoder,
"encoderConfig": input.EncoderConfig,
},
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ func TestIdenticalConsensus(t *testing.T) {
Encoder: ocr3.EncoderEVM,
EncoderConfig: ocr3.EncoderConfig{},
ReportID: "0001",
}.New(workflow, "consensus", ocr3.IdenticalConsensusInput[basictrigger.TriggerOutputs]{Observations: trigger})
}.New(workflow, "consensus", ocr3.IdenticalConsensusInput[basictrigger.TriggerOutputs]{
Observation: trigger,
Encoder: "evm",
EncoderConfig: ocr3.EncoderConfig(map[string]any{"foo": "bar"}),
})

chainwriter.TargetConfig{
Address: "0x1235",
Expand Down Expand Up @@ -55,9 +59,13 @@ func TestIdenticalConsensus(t *testing.T) {
Actions: []sdk.StepDefinition{},
Consensus: []sdk.StepDefinition{
{
ID: "offchain_reporting@1.0.0",
Ref: "consensus",
Inputs: sdk.StepInputs{Mapping: map[string]any{"observations": "$(trigger.outputs)"}},
ID: "offchain_reporting@1.0.0",
Ref: "consensus",
Inputs: sdk.StepInputs{Mapping: map[string]any{
"observations": []any{"$(trigger.outputs)"},
"encoder": "evm",
"encoderConfig": map[string]any{"foo": "bar"},
}},
Config: map[string]any{
"encoder": "EVM",
"encoder_config": map[string]any{},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package ocr3captest

import (
"errors"

"google.golang.org/protobuf/proto"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/consensus/ocr3/ocr3cap"
"github.com/smartcontractkit/chainlink-common/pkg/values"
"github.com/smartcontractkit/chainlink-common/pkg/values/pb"
Expand Down Expand Up @@ -32,12 +31,6 @@ func IdenticalConsensusForStep[T any](runner *testutils.Runner, step string) *Id
}

func identicalConsensus[T any](inputs ConsensusInput[T]) (ocr3cap.SignedReport, error) {
if len(inputs.Observations) == 0 {
return ocr3cap.SignedReport{}, errors.New("no observations were made")
} else if len(inputs.Observations) > 1 {
return ocr3cap.SignedReport{}, errors.New("more than one observation was made, but this mock isn't set up to support that")
}

wrapped, err := values.Wrap(inputs.Observations[0])
if err != nil {
return ocr3cap.SignedReport{}, err
Expand All @@ -63,16 +56,7 @@ type IdenticalConsensusMock[T any] struct {
*testutils.Mock[ConsensusInput[T], ocr3cap.SignedReport]
}

var _ testutils.ConsensusMock = &IdenticalConsensusMock[struct{}]{}

func (c *IdenticalConsensusMock[T]) SingleToManyObservations(input values.Value) (*values.Map, error) {
tmp := singleConsensusInput[T]{}
if err := input.UnwrapTo(&tmp); err != nil {
return nil, err
}

return values.CreateMapFromStruct(ConsensusInput[T]{Observations: []T{tmp.Observations}})
}
var _ capabilities.ConsensusCapability = &IdenticalConsensusMock[struct{}]{}

func (c *IdenticalConsensusMock[T]) GetStepDecoded(ref string) testutils.StepResults[ConsensusInput[T], T] {
step := c.GetStep(ref)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,3 @@ package ocr3captest
type ConsensusInput[T any] struct {
Observations []T
}

type singleConsensusInput[T any] struct {
Observations T
}
Loading

0 comments on commit 4637084

Please sign in to comment.