Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CM-380] Identical Aggregator #771

Merged
merged 44 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from 40 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
7b72471
[CM-380] Identical Aggregator
bolekk Sep 16, 2024
9c58d5c
[CAPPL-60] Dynamic encoder selection in OCR consensus aggregator
bolekk Sep 18, 2024
2080f7b
extract encoder name and config
bolekk Sep 19, 2024
332728e
Add more tests
cedric-cordenier Oct 4, 2024
86e8dfc
add limit to seq num range (#781)
dimkouv Sep 19, 2024
ee30401
[chore] Handle aliases in slices (#784)
cedric-cordenier Sep 20, 2024
fdf417c
feat(observability-lib): legendoptions + improvement on node general …
Atrax1 Sep 23, 2024
f88a9b7
[CAPPL-58] Correctly stub out clock_time_get and poll_oneoff (#778)
cedric-cordenier Sep 23, 2024
0554950
More alias handling in Unwrap functionality of Value (#792)
kidambisrinivas Sep 23, 2024
1c89fdb
Fix alias typing and tests (#788)
nolag Sep 23, 2024
dc37362
Replace fmt.Errorf with errors.New where possible (#795)
nolag Sep 23, 2024
0c7cade
chore(workflows): adds unit test to utils (#782)
MStreet3 Sep 23, 2024
920340b
Have the mock runner register with capabilites (#783)
nolag Sep 23, 2024
a156b08
Add binary + config to custom compute (#794)
cedric-cordenier Sep 23, 2024
4fa1cca
fix lint issues (#786)
jmank88 Sep 24, 2024
bfc79d1
execution factory constructor updated to take two providers, chainIDs…
patrickhuie19 Sep 24, 2024
f758a45
Support passing in a values.Value to the chainreader GetLatestValue m…
ettec Sep 25, 2024
f56aed3
[CAPPL-31] feat(values): adds support for time.Time as value (#787)
MStreet3 Sep 25, 2024
0612595
feat(values): support float64 values (#799)
MStreet3 Sep 25, 2024
f6a9aa6
confidence level from string (#802)
ettec Sep 26, 2024
75f3f48
Float32/Float64 wrapping (#804)
cedric-cordenier Sep 26, 2024
ba2d2f2
feat: implement sdk logger (#762)
agparadiso Sep 26, 2024
2ed5fc2
Add MustEmbed Constraint to Contract Reader (#801)
EasterTheBunny Sep 26, 2024
490e6df
Updated TestStruct to enable advanced querying (#798)
silaslenihan Sep 26, 2024
0e2d430
Properly support the range of uint64 and allow big int to unwrap into…
nolag Sep 27, 2024
fbc5b07
Extract expirable cache abstraction for reuse (#807)
ettec Sep 30, 2024
4aa0b5f
remove cache (#812)
ettec Sep 30, 2024
62e7bca
CCIP-3555 Attestation encoder interfaces (#813)
mateusz-sekara Sep 30, 2024
a1f8c4c
[BCF-3392] - ContractReaderByIDs Wrapper (#797)
ilija42 Sep 30, 2024
a6558d6
pkg/types/ccipocr3: add DestExecData to RampTokenAmount (#817)
makramkd Oct 1, 2024
d946cbc
Allow the creation of maps from string to capbility outputs. (#815)
nolag Oct 1, 2024
5150f85
Add the FeeValueJuels field to ccipocr3.Message (#819)
rstout Oct 1, 2024
3244162
feat(observability-lib): improve alerts rule (#803)
Atrax1 Oct 2, 2024
369d2e7
enable errorf check (#826)
jmank88 Oct 2, 2024
76c6706
Merge branch 'main' into feature/CM-380-identical-aggregator
cedric-cordenier Oct 4, 2024
0e91007
Make overridding the encoder first-class
cedric-cordenier Oct 9, 2024
bfdb8a6
Merge branch 'main' into feature/CM-380-identical-aggregator
cedric-cordenier Oct 9, 2024
9fc17a6
Update mocks
cedric-cordenier Oct 9, 2024
b0608fc
Mock updates
cedric-cordenier Oct 9, 2024
7abd311
Adjust tests
cedric-cordenier Oct 9, 2024
449d79f
Fix mock
cedric-cordenier Oct 9, 2024
13de65e
Fix mock
cedric-cordenier Oct 9, 2024
ef71d0f
Update mock
cedric-cordenier Oct 9, 2024
3cf5d9f
Linting
cedric-cordenier Oct 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 125 additions & 0 deletions pkg/capabilities/consensus/ocr3/aggregators/identical.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package aggregators

import (
"crypto/sha256"
"fmt"

"google.golang.org/protobuf/proto"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities/consensus/ocr3/types"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/values"

ocrcommon "github.com/smartcontractkit/libocr/commontypes"
)

type identicalAggregator struct {
config aggregatorConfig
lggr logger.Logger
}

type aggregatorConfig struct {
cedric-cordenier marked this conversation as resolved.
Show resolved Hide resolved
// Length of the list of observations that each node is expected to provide.
// Aggregator's output (i.e. EncodableOutcome) will be a values.Map with the same
// number of elements and keyed by indices 0,1,2,... (unless KeyOverrides are provided).
// Defaults to 1.
ExpectedObservationsLen int
// If non-empty, the keys in the outcome map will be replaced with these values.
// If non-empty, must be of length ExpectedObservationsLen.
KeyOverrides []string
}

type counter struct {
fullObservation values.Value
count int
}

var _ types.Aggregator = (*identicalAggregator)(nil)

func (a *identicalAggregator) Aggregate(_ *types.AggregationOutcome, observations map[ocrcommon.OracleID][]values.Value, f int) (*types.AggregationOutcome, error) {
counters := []map[[32]byte]*counter{}
for i := 0; i < a.config.ExpectedObservationsLen; i++ {
counters = append(counters, map[[32]byte]*counter{})
}
for nodeID, nodeObservations := range observations {
if len(nodeObservations) == 0 || nodeObservations[0] == nil {
a.lggr.Warnf("node %d contributed with empty observations", nodeID)
continue
}
if len(nodeObservations) != a.config.ExpectedObservationsLen {
a.lggr.Warnf("node %d contributed with an incorrect number of observations %d - ignoring them", nodeID, len(nodeObservations))
continue
}
for idx, observation := range nodeObservations {
marshalled, err := proto.MarshalOptions{Deterministic: true}.Marshal(values.Proto(observation))
if err != nil {
return nil, err
}
sha := sha256.Sum256(marshalled)
elem, ok := counters[idx][sha]
if !ok {
counters[idx][sha] = &counter{
fullObservation: observation,
count: 1,
}
} else {
elem.count++
}
}
}
return a.collectHighestCounts(counters, f)
}

func (a *identicalAggregator) collectHighestCounts(counters []map[[32]byte]*counter, f int) (*types.AggregationOutcome, error) {
useOverrides := len(a.config.KeyOverrides) == len(counters)
outcome := make(map[string]any)
for idx, shaToCounter := range counters {
highestCount := 0
var highestObservation values.Value
for _, counter := range shaToCounter {
if counter.count > highestCount {
highestCount = counter.count
highestObservation = counter.fullObservation
}
}
if highestCount < 2*f+1 {
return nil, fmt.Errorf("can't reach consensus on observations with index %d", idx)
}
if useOverrides {
outcome[a.config.KeyOverrides[idx]] = highestObservation
} else {
outcome[fmt.Sprintf("%d", idx)] = highestObservation
}
}
valMap, err := values.NewMap(outcome)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like there needs to be a special case for a single observation to be just the object that's not keyed. This is our use case and I think most will be that way.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about it too but unfortunately the return type of outcome is a Map, not a Value... hence the key overrides.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right, but if there's only one observation, the single observation's value should be a value.Value. That way the one observation can be unwrapped.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I meant is that EncodableOutcome's type is values.Map so I can't assign Value to a Map... It needs at least one key.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I was thinking is that the outcome gets stored in a fixed struct:

struct EncoderOutcome {
   Value any
}

Then you send the Value to the encoder.

if err != nil {
return nil, err
}
return &types.AggregationOutcome{
EncodableOutcome: values.ProtoMap(valMap),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively, you can make this a value.Value. There's no reason someone can't encode a number, a list of numbers etc.

Copy link
Contributor

@cedric-cordenier cedric-cordenier Oct 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We actually assume that it needs to be a map since we add metadata to it. Any objection to leaving this as is and revisiting at a future date?

Metadata: nil,
ShouldReport: true,
}, nil
}

func NewIdenticalAggregator(config values.Map, lggr logger.Logger) (*identicalAggregator, error) {
parsedConfig, err := ParseConfig(config)
if err != nil {
return nil, fmt.Errorf("failed to parse config (%+v): %w", config, err)
}
return &identicalAggregator{
config: parsedConfig,
lggr: lggr,
}, nil
}

func ParseConfig(config values.Map) (aggregatorConfig, error) {
parsedConfig := aggregatorConfig{}
if err := config.UnwrapTo(&parsedConfig); err != nil {
return aggregatorConfig{}, err
}
if parsedConfig.ExpectedObservationsLen == 0 {
parsedConfig.ExpectedObservationsLen = 1
}
return parsedConfig, nil
}
93 changes: 93 additions & 0 deletions pkg/capabilities/consensus/ocr3/aggregators/identical_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package aggregators_test

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/smartcontractkit/libocr/commontypes"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities/consensus/ocr3/aggregators"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/values"
)

func TestDataFeedsAggregator_Aggregate(t *testing.T) {
config := getConfig(t, nil)
agg, err := aggregators.NewIdenticalAggregator(*config, logger.Nop())
require.NoError(t, err)

observations := map[commontypes.OracleID][]values.Value{
0: {values.NewString("a")},
1: {values.NewString("a")},
2: {values.NewString("a")},
3: {values.NewString("a")},
}
outcome, err := agg.Aggregate(nil, observations, 1)
require.NoError(t, err)
require.True(t, outcome.ShouldReport)
require.Equal(t, "", outcome.EncoderName)
require.Nil(t, outcome.EncoderConfig)

m, err := values.FromMapValueProto(outcome.EncodableOutcome)
require.NoError(t, err)

require.Len(t, m.Underlying, 1)
require.Equal(t, m.Underlying["0"], values.NewString("a"))
}

func TestDataFeedsAggregator_Aggregate_OverrideWithKeys(t *testing.T) {
config := getConfig(t, []string{"outcome"})
agg, err := aggregators.NewIdenticalAggregator(*config, logger.Nop())
require.NoError(t, err)

observations := map[commontypes.OracleID][]values.Value{
0: {values.NewString("a")},
1: {values.NewString("a")},
2: {values.NewString("a")},
3: {values.NewString("a")},
}
outcome, err := agg.Aggregate(nil, observations, 1)
require.NoError(t, err)
require.True(t, outcome.ShouldReport)
require.Equal(t, "", outcome.EncoderName)
require.Nil(t, outcome.EncoderConfig)

m, err := values.FromMapValueProto(outcome.EncodableOutcome)
require.NoError(t, err)

require.Len(t, m.Underlying, 1)
require.Equal(t, m.Underlying["outcome"], values.NewString("a"))
}

func TestDataFeedsAggregator_Aggregate_NoConsensus(t *testing.T) {
config := getConfig(t, []string{"outcome"})
agg, err := aggregators.NewIdenticalAggregator(*config, logger.Nop())
require.NoError(t, err)

encoderStr := "evm"
encoderName := values.NewString(encoderStr)
encoderCfg, err := values.NewMap(map[string]any{"foo": "bar"})
require.NoError(t, err)

observations := map[commontypes.OracleID][]values.Value{
0: {values.NewString("a"), encoderName, encoderCfg},
1: {values.NewString("b"), encoderName, encoderCfg},
2: {values.NewString("b"), encoderName, encoderCfg},
3: {values.NewString("a"), encoderName, encoderCfg},
}
outcome, err := agg.Aggregate(nil, observations, 1)
require.Nil(t, outcome)
require.ErrorContains(t, err, "can't reach consensus on observations with index 0")
}

func getConfig(t *testing.T, overrideKeys []string) *values.Map {
unwrappedConfig := map[string]any{
"expectedObservationsLen": len(overrideKeys),
"keyOverrides": overrideKeys,
}

config, err := values.NewMap(unwrappedConfig)
require.NoError(t, err)
return config
}
2 changes: 2 additions & 0 deletions pkg/capabilities/consensus/ocr3/capability.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,8 @@ func (o *capability) queueRequestForProcessing(
WorkflowDonID: metadata.WorkflowDonID,
WorkflowDonConfigVersion: metadata.WorkflowDonConfigVersion,
Observations: i.Observations,
OverriddenEncoderName: i.EncoderName,
OverriddenEncoderConfig: i.EncoderConfig,
KeyID: c.KeyID,
ExpiresAt: o.clock.Now().Add(requestTimeout),
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/capabilities/consensus/ocr3/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,7 @@ type config struct {
}

type inputs struct {
Observations *values.List `json:"observations"`
Observations *values.List `json:"observations" jsonschema:""`
EncoderName string `mapstructure:"encoder" json:"encoder,omitempty"`
EncoderConfig *values.Map `mapstructure:"encoder_config" json:"encoder_config,omitempty"`
}
12 changes: 6 additions & 6 deletions pkg/capabilities/consensus/ocr3/ocr3cap/identical_consensus.go
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the mock in ocr3cap mock might need to change for this to work with it. Should we expose another wrapper to get multiple consensus or just hide at that point? I'm good either way.

Also, if we're only returning one of the signed reports, but there's actually an array, then line 30 should be of type []SignedReport and you would need to wrap the capability in sdk.ToListDefinition and on line 31 use step.Index(0) instead of step.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, if we're only returning one of the signed reports, but there's actually an array, then line 30 should be of type []SignedReport and you would need to wrap the capability in sdk.ToListDefinition and on line 31 use step.Index(0) instead of step.

We only output one report atm, unless I'm misunderstanding the question?

Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,17 @@ func (c IdenticalConsensusConfig[T]) New(w *sdk.WorkflowSpecFactory, ref string,
}

type IdenticalConsensusInput[T any] struct {
Observations sdk.CapDefinition[T]
}

type IdenticalConsensusMergedInput[T any] struct {
Observations []T
Observation sdk.CapDefinition[T]
Encoder Encoder
EncoderConfig EncoderConfig
}

func (input IdenticalConsensusInput[T]) ToSteps() sdk.StepInputs {
return sdk.StepInputs{
Mapping: map[string]any{
"observations": input.Observations.Ref(),
"observations": sdk.ListOf(input.Observation).Ref(),
"encoder": input.Encoder,
"encoderConfig": input.EncoderConfig,
},
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ func TestIdenticalConsensus(t *testing.T) {
Encoder: ocr3.EncoderEVM,
EncoderConfig: ocr3.EncoderConfig{},
ReportID: "0001",
}.New(workflow, "consensus", ocr3.IdenticalConsensusInput[basictrigger.TriggerOutputs]{Observations: trigger})
}.New(workflow, "consensus", ocr3.IdenticalConsensusInput[basictrigger.TriggerOutputs]{
Observation: trigger,
Encoder: "evm",
EncoderConfig: ocr3.EncoderConfig(map[string]any{"foo": "bar"}),
})

chainwriter.TargetConfig{
Address: "0x1235",
Expand Down Expand Up @@ -55,9 +59,13 @@ func TestIdenticalConsensus(t *testing.T) {
Actions: []sdk.StepDefinition{},
Consensus: []sdk.StepDefinition{
{
ID: "offchain_reporting@1.0.0",
Ref: "consensus",
Inputs: sdk.StepInputs{Mapping: map[string]any{"observations": "$(trigger.outputs)"}},
ID: "offchain_reporting@1.0.0",
Ref: "consensus",
Inputs: sdk.StepInputs{Mapping: map[string]any{
"observations": []any{"$(trigger.outputs)"},
"encoder": "evm",
"encoderConfig": map[string]any{"foo": "bar"},
}},
Config: map[string]any{
"encoder": "EVM",
"encoder_config": map[string]any{},
Expand Down
Loading
Loading