Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CM-380] Identical Aggregator #771

Merged
merged 44 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
7b72471
[CM-380] Identical Aggregator
bolekk Sep 16, 2024
9c58d5c
[CAPPL-60] Dynamic encoder selection in OCR consensus aggregator
bolekk Sep 18, 2024
2080f7b
extract encoder name and config
bolekk Sep 19, 2024
332728e
Add more tests
cedric-cordenier Oct 4, 2024
86e8dfc
add limit to seq num range (#781)
dimkouv Sep 19, 2024
ee30401
[chore] Handle aliases in slices (#784)
cedric-cordenier Sep 20, 2024
fdf417c
feat(observability-lib): legendoptions + improvement on node general …
Atrax1 Sep 23, 2024
f88a9b7
[CAPPL-58] Correctly stub out clock_time_get and poll_oneoff (#778)
cedric-cordenier Sep 23, 2024
0554950
More alias handling in Unwrap functionality of Value (#792)
kidambisrinivas Sep 23, 2024
1c89fdb
Fix alias typing and tests (#788)
nolag Sep 23, 2024
dc37362
Replace fmt.Errorf with errors.New where possible (#795)
nolag Sep 23, 2024
0c7cade
chore(workflows): adds unit test to utils (#782)
MStreet3 Sep 23, 2024
920340b
Have the mock runner register with capabilites (#783)
nolag Sep 23, 2024
a156b08
Add binary + config to custom compute (#794)
cedric-cordenier Sep 23, 2024
4fa1cca
fix lint issues (#786)
jmank88 Sep 24, 2024
bfc79d1
execution factory constructor updated to take two providers, chainIDs…
patrickhuie19 Sep 24, 2024
f758a45
Support passing in a values.Value to the chainreader GetLatestValue m…
ettec Sep 25, 2024
f56aed3
[CAPPL-31] feat(values): adds support for time.Time as value (#787)
MStreet3 Sep 25, 2024
0612595
feat(values): support float64 values (#799)
MStreet3 Sep 25, 2024
f6a9aa6
confidence level from string (#802)
ettec Sep 26, 2024
75f3f48
Float32/Float64 wrapping (#804)
cedric-cordenier Sep 26, 2024
ba2d2f2
feat: implement sdk logger (#762)
agparadiso Sep 26, 2024
2ed5fc2
Add MustEmbed Constraint to Contract Reader (#801)
EasterTheBunny Sep 26, 2024
490e6df
Updated TestStruct to enable advanced querying (#798)
silaslenihan Sep 26, 2024
0e2d430
Properly support the range of uint64 and allow big int to unwrap into…
nolag Sep 27, 2024
fbc5b07
Extract expirable cache abstraction for reuse (#807)
ettec Sep 30, 2024
4aa0b5f
remove cache (#812)
ettec Sep 30, 2024
62e7bca
CCIP-3555 Attestation encoder interfaces (#813)
mateusz-sekara Sep 30, 2024
a1f8c4c
[BCF-3392] - ContractReaderByIDs Wrapper (#797)
ilija42 Sep 30, 2024
a6558d6
pkg/types/ccipocr3: add DestExecData to RampTokenAmount (#817)
makramkd Oct 1, 2024
d946cbc
Allow the creation of maps from string to capbility outputs. (#815)
nolag Oct 1, 2024
5150f85
Add the FeeValueJuels field to ccipocr3.Message (#819)
rstout Oct 1, 2024
3244162
feat(observability-lib): improve alerts rule (#803)
Atrax1 Oct 2, 2024
369d2e7
enable errorf check (#826)
jmank88 Oct 2, 2024
76c6706
Merge branch 'main' into feature/CM-380-identical-aggregator
cedric-cordenier Oct 4, 2024
0e91007
Make overridding the encoder first-class
cedric-cordenier Oct 9, 2024
bfdb8a6
Merge branch 'main' into feature/CM-380-identical-aggregator
cedric-cordenier Oct 9, 2024
9fc17a6
Update mocks
cedric-cordenier Oct 9, 2024
b0608fc
Mock updates
cedric-cordenier Oct 9, 2024
7abd311
Adjust tests
cedric-cordenier Oct 9, 2024
449d79f
Fix mock
cedric-cordenier Oct 9, 2024
13de65e
Fix mock
cedric-cordenier Oct 9, 2024
ef71d0f
Update mock
cedric-cordenier Oct 9, 2024
3cf5d9f
Linting
cedric-cordenier Oct 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 152 additions & 0 deletions pkg/capabilities/consensus/ocr3/aggregators/identical.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package aggregators

import (
"crypto/sha256"
"fmt"

"google.golang.org/protobuf/proto"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities/consensus/ocr3/types"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/values"
"github.com/smartcontractkit/chainlink-common/pkg/values/pb"

ocrcommon "github.com/smartcontractkit/libocr/commontypes"
)

type identicalAggregator struct {
config aggregatorConfig
lggr logger.Logger
}

type aggregatorConfig struct {
cedric-cordenier marked this conversation as resolved.
Show resolved Hide resolved
// Length of the list of observations that each node is expected to provide.
// Aggregator's output (i.e. EncodableOutcome) will be a values.Map with the same
// number of elements and keyed by indices 0,1,2,... (unless KeyOverrides are provided).
// Defaults to 1.
ExpectedObservationsLen int
// If non-empty, the keys in the outcome map will be replaced with these values.
// If non-empty, must be of length ExpectedObservationsLen.
KeyOverrides []string
// If true, the encoder name and config will be overridden with the values
// provided under configured indices of the observations array (as long as consensus is reached).
Copy link
Contributor Author

@bolekk bolekk Sep 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not too happy with this approach... suggestions welcome.

@cedric-cordenier this is slightly different to what we discussed but I don't see how we can make a generic aggregator with only a single observation.

OverrideEncoder bool
EncoderNameObservationIndex int
EncoderConfigObservationIndex int
}

type counter struct {
fullObservation values.Value
count int
}

var _ types.Aggregator = (*identicalAggregator)(nil)

func (a *identicalAggregator) Aggregate(_ *types.AggregationOutcome, observations map[ocrcommon.OracleID][]values.Value, f int) (*types.AggregationOutcome, error) {
counters := []map[[32]byte]*counter{}
for i := 0; i < a.config.ExpectedObservationsLen; i++ {
counters = append(counters, map[[32]byte]*counter{})
}
for nodeID, nodeObservations := range observations {
if len(nodeObservations) == 0 || nodeObservations[0] == nil {
a.lggr.Warnf("node %d contributed with empty observations", nodeID)
continue
}
if len(nodeObservations) != a.config.ExpectedObservationsLen {
a.lggr.Warnf("node %d contributed with an incorrect number of observations %d - ignoring them", nodeID, len(nodeObservations))
continue
}
for idx, observation := range nodeObservations {
marshalled, err := proto.MarshalOptions{Deterministic: true}.Marshal(values.Proto(observation))
if err != nil {
return nil, err
}
sha := sha256.Sum256(marshalled)
elem, ok := counters[idx][sha]
if !ok {
counters[idx][sha] = &counter{
fullObservation: observation,
count: 1,
}
} else {
elem.count++
}
}
}
return a.collectHighestCounts(counters, f)
}

func (a *identicalAggregator) collectHighestCounts(counters []map[[32]byte]*counter, f int) (*types.AggregationOutcome, error) {
useOverrides := len(a.config.KeyOverrides) == len(counters)
outcome := make(map[string]any)
var encoderName string
var encoderConfig *pb.Map
for idx, shaToCounter := range counters {
highestCount := 0
var highestObservation values.Value
for _, counter := range shaToCounter {
if counter.count > highestCount {
highestCount = counter.count
highestObservation = counter.fullObservation
}
}
if highestCount < 2*f+1 {
return nil, fmt.Errorf("can't reach consensus on observations with index %d", idx)
}
if useOverrides {
outcome[a.config.KeyOverrides[idx]] = highestObservation
} else {
outcome[fmt.Sprintf("%d", idx)] = highestObservation
}
if a.config.OverrideEncoder {
if idx == a.config.EncoderNameObservationIndex {
// extract encoder name from observation
name, ok := highestObservation.(*values.String)
if !ok {
return nil, fmt.Errorf("expected encoder name under index %d to be a String", idx)
}
encoderName = name.Underlying
} else if idx == a.config.EncoderConfigObservationIndex {
// extract encoder config from observation
conf, ok := highestObservation.(*values.Map)
if !ok {
return nil, fmt.Errorf("expected encoder config under index %d to be a Map", idx)
}
encoderConfig = values.ProtoMap(conf)
}
}
}
valMap, err := values.NewMap(outcome)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like there needs to be a special case for a single observation to be just the object that's not keyed. This is our use case and I think most will be that way.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about it too but unfortunately the return type of outcome is a Map, not a Value... hence the key overrides.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right, but if there's only one observation, the single observation's value should be a value.Value. That way the one observation can be unwrapped.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I meant is that EncodableOutcome's type is values.Map so I can't assign Value to a Map... It needs at least one key.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I was thinking is that the outcome gets stored in a fixed struct:

struct EncoderOutcome {
   Value any
}

Then you send the Value to the encoder.

if err != nil {
return nil, err
}
return &types.AggregationOutcome{
EncodableOutcome: values.ProtoMap(valMap),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively, you can make this a value.Value. There's no reason someone can't encode a number, a list of numbers etc.

Copy link
Contributor

@cedric-cordenier cedric-cordenier Oct 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We actually assume that it needs to be a map since we add metadata to it. Any objection to leaving this as is and revisiting at a future date?

Metadata: nil,
ShouldReport: true,
EncoderName: encoderName,
EncoderConfig: encoderConfig,
}, nil
}

func NewIdenticalAggregator(config values.Map, lggr logger.Logger) (*identicalAggregator, error) {
parsedConfig, err := ParseConfig(config)
if err != nil {
return nil, fmt.Errorf("failed to parse config (%+v): %w", config, err)
}
return &identicalAggregator{
config: parsedConfig,
lggr: lggr,
}, nil
}

func ParseConfig(config values.Map) (aggregatorConfig, error) {
parsedConfig := aggregatorConfig{}
if err := config.UnwrapTo(&parsedConfig); err != nil {
return aggregatorConfig{}, err
}
if parsedConfig.ExpectedObservationsLen == 0 {
parsedConfig.ExpectedObservationsLen = 1
}
return parsedConfig, nil
}
38 changes: 38 additions & 0 deletions pkg/capabilities/consensus/ocr3/aggregators/identical_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package aggregators_test

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/smartcontractkit/libocr/commontypes"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities/consensus/ocr3/aggregators"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/values"
)

func TestDataFeedsAggregator_Aggregate_TwoRounds(t *testing.T) {
config := getConfig(t)
agg, err := aggregators.NewIdenticalAggregator(*config, logger.Nop())
require.NoError(t, err)

observations := map[commontypes.OracleID][]values.Value{
0: {values.NewString("a")},
1: {values.NewString("a")},
2: {values.NewString("a")},
3: {values.NewString("a")},
}
outcome, err := agg.Aggregate(nil, observations, 1)
require.NoError(t, err)
require.True(t, outcome.ShouldReport)
}

func getConfig(t *testing.T) *values.Map {
unwrappedConfig := map[string]any{
"expectedObservationsLen": 1,
}
config, err := values.NewMap(unwrappedConfig)
require.NoError(t, err)
return config
}
8 changes: 6 additions & 2 deletions pkg/capabilities/consensus/ocr3/capability.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (o *capability) RegisterToWorkflow(ctx context.Context, request capabilitie
}
o.aggregators[request.Metadata.WorkflowID] = agg

encoder, err := o.encoderFactory(c.EncoderConfig)
encoder, err := o.encoderFactory(c.Encoder, c.EncoderConfig, o.lggr)
if err != nil {
return err
}
Expand All @@ -143,7 +143,7 @@ func (o *capability) getAggregator(workflowID string) (types.Aggregator, error)
return agg, nil
}

func (o *capability) getEncoder(workflowID string) (types.Encoder, error) {
func (o *capability) getEncoderByWorkflowID(workflowID string) (types.Encoder, error) {
enc, ok := o.encoders[workflowID]
if !ok {
return nil, fmt.Errorf("no aggregator found for workflowID %s", workflowID)
Expand All @@ -152,6 +152,10 @@ func (o *capability) getEncoder(workflowID string) (types.Encoder, error) {
return enc, nil
}

func (o *capability) getEncoderByName(encoderName string, config *values.Map) (types.Encoder, error) {
return o.encoderFactory(encoderName, config, o.lggr)
}

func (o *capability) getRegisteredWorkflowsIDs() []string {
o.mu.RLock()
defer o.mu.RUnlock()
Expand Down
2 changes: 1 addition & 1 deletion pkg/capabilities/consensus/ocr3/capability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type encoder struct {
types.Encoder
}

func mockEncoderFactory(_ *values.Map) (types.Encoder, error) {
func mockEncoderFactory(_ string, _ *values.Map, _ logger.Logger) (types.Encoder, error) {
return &encoder{}, nil
}

Expand Down
33 changes: 25 additions & 8 deletions pkg/capabilities/consensus/ocr3/reporting_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ var _ ocr3types.ReportingPlugin[[]byte] = (*reportingPlugin)(nil)

type capabilityIface interface {
getAggregator(workflowID string) (pbtypes.Aggregator, error)
getEncoder(workflowID string) (pbtypes.Encoder, error)
getEncoderByWorkflowID(workflowID string) (pbtypes.Encoder, error)
getEncoderByName(encoderName string, config *values.Map) (pbtypes.Encoder, error)
getRegisteredWorkflowsIDs() []string
unregisterWorkflowID(workflowID string)
}
Expand Down Expand Up @@ -390,7 +391,7 @@ func (r *reportingPlugin) Reports(seqNr uint64, outcome ocr3types.Outcome) ([]oc
ShouldReport: outcome.ShouldReport,
}

var report []byte
var rawReport []byte
if info.ShouldReport {
meta := &pbtypes.Metadata{
Version: 1,
Expand All @@ -409,10 +410,26 @@ func (r *reportingPlugin) Reports(seqNr uint64, outcome ocr3types.Outcome) ([]oc
continue
}

enc, err := r.r.getEncoder(id.WorkflowId)
if err != nil {
lggr.Errorw("could not retrieve encoder for workflow", "error", err)
continue
var encoder pbtypes.Encoder
if newOutcome.EncoderName != "" {
lggr.Debugw("using encoder from outcome", "encoderName", newOutcome.EncoderName, "executionID", report.Id.WorkflowExecutionId)
encoderConfig, err2 := values.FromMapValueProto(newOutcome.EncoderConfig)
if err2 != nil {
lggr.Errorw("could not convert desired encoder config to values.Map", "error", err2, "executionID", report.Id.WorkflowExecutionId)
} else {
encoder, err2 = r.r.getEncoderByName(newOutcome.EncoderName, encoderConfig)
if err2 != nil {
lggr.Errorw("could not retrieve desired encoder, will use per-workflow default", "error", err2, "executionID", report.Id.WorkflowExecutionId)
}
}
}

if encoder == nil {
encoder, err = r.r.getEncoderByWorkflowID(id.WorkflowId)
if err != nil {
lggr.Errorw("could not retrieve encoder for workflow", "error", err)
continue
}
}

mv, err := values.FromMapValueProto(newOutcome.EncodableOutcome)
Expand All @@ -421,7 +438,7 @@ func (r *reportingPlugin) Reports(seqNr uint64, outcome ocr3types.Outcome) ([]oc
continue
}

report, err = enc.Encode(context.Background(), *mv)
rawReport, err = encoder.Encode(context.Background(), *mv)
if err != nil {
r.lggr.Errorw("could not encode report for workflow", "error", err)
continue
Expand All @@ -436,7 +453,7 @@ func (r *reportingPlugin) Reports(seqNr uint64, outcome ocr3types.Outcome) ([]oc

// Append every report, even if shouldReport = false, to let the transmitter mark the step as complete.
reports = append(reports, ocr3types.ReportWithInfo[[]byte]{
Report: report,
Report: rawReport,
Info: infob,
})
}
Expand Down
17 changes: 14 additions & 3 deletions pkg/capabilities/consensus/ocr3/reporting_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,11 @@ func TestReportingPlugin_Query(t *testing.T) {
}

type mockCapability struct {
t *testing.T
aggregator *aggregator
encoder *enc
registeredWorkflows map[string]bool
expectedEncoderName string
}

type aggregator struct {
Expand Down Expand Up @@ -114,7 +116,12 @@ func (mc *mockCapability) getAggregator(workflowID string) (pbtypes.Aggregator,
return mc.aggregator, nil
}

func (mc *mockCapability) getEncoder(workflowID string) (pbtypes.Encoder, error) {
func (mc *mockCapability) getEncoderByWorkflowID(workflowID string) (pbtypes.Encoder, error) {
return mc.encoder, nil
}

func (mc *mockCapability) getEncoderByName(encoderName string, config *values.Map) (pbtypes.Encoder, error) {
require.Equal(mc.t, mc.expectedEncoderName, encoderName)
return mc.encoder, nil
}

Expand Down Expand Up @@ -460,10 +467,13 @@ func TestReportingPlugin_Reports_NilDerefs(t *testing.T) {

func TestReportingPlugin_Reports_ShouldReportTrue(t *testing.T) {
lggr := logger.Test(t)
dynamicEncoderName := "special_encoder"
s := requests.NewStore()
mcap := &mockCapability{
aggregator: &aggregator{},
encoder: &enc{},
t: t,
aggregator: &aggregator{},
encoder: &enc{},
expectedEncoderName: dynamicEncoderName,
}
rp, err := newReportingPlugin(s, mcap, defaultBatchSize, ocr3types.ReportingPluginConfig{}, defaultOutcomePruningThreshold, lggr)
require.NoError(t, err)
Expand Down Expand Up @@ -494,6 +504,7 @@ func TestReportingPlugin_Reports_ShouldReportTrue(t *testing.T) {
Outcome: &pbtypes.AggregationOutcome{
EncodableOutcome: nmp,
ShouldReport: true,
EncoderName: dynamicEncoderName,
},
},
},
Expand Down
8 changes: 6 additions & 2 deletions pkg/capabilities/consensus/ocr3/transmitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ func TestTransmitter(t *testing.T) {
clockwork.NewFakeClock(),
10*time.Second,
mockAggregatorFactory,
func(config *values.Map) (pbtypes.Encoder, error) { return &encoder{}, nil },
func(_ string, _ *values.Map, _ logger.Logger) (pbtypes.Encoder, error) {
return &encoder{}, nil
},
lggr,
10,
)
Expand Down Expand Up @@ -127,7 +129,9 @@ func TestTransmitter_ShouldReportFalse(t *testing.T) {
clockwork.NewFakeClock(),
10*time.Second,
mockAggregatorFactory,
func(config *values.Map) (pbtypes.Encoder, error) { return &encoder{}, nil },
func(_ string, _ *values.Map, _ logger.Logger) (pbtypes.Encoder, error) {
return &encoder{}, nil
},
lggr,
10,
)
Expand Down
3 changes: 2 additions & 1 deletion pkg/capabilities/consensus/ocr3/types/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package types
import (
"context"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/values"
)

type Encoder interface {
Encode(ctx context.Context, input values.Map) ([]byte, error)
}

type EncoderFactory func(config *values.Map) (Encoder, error)
type EncoderFactory func(name string, config *values.Map, lggr logger.Logger) (Encoder, error)

type SignedReport struct {
Report []byte
Expand Down
Loading
Loading