diff --git a/pkg/capabilities/triggers/mercury_remote_aggregator.go b/pkg/capabilities/triggers/mercury_remote_aggregator.go index 479c09bdc..5cc3f75dd 100644 --- a/pkg/capabilities/triggers/mercury_remote_aggregator.go +++ b/pkg/capabilities/triggers/mercury_remote_aggregator.go @@ -15,13 +15,14 @@ type mercuryRemoteAggregator struct { allowedSigners [][]byte minRequiredSignatures int previousLatestReports map[datastreams.FeedID]datastreams.FeedReport + capID string lggr logger.Logger } // This aggregator is used by TriggerSubscriber to aggregate trigger events from multiple remote nodes. // NOTE: Once Mercury supports parallel composition (and thus guarantee identical sets of reports), // this will be replaced by the default MODE aggregator. -func NewMercuryRemoteAggregator(codec datastreams.ReportCodec, allowedSigners [][]byte, minRequiredSignatures int, lggr logger.Logger) *mercuryRemoteAggregator { +func NewMercuryRemoteAggregator(codec datastreams.ReportCodec, allowedSigners [][]byte, minRequiredSignatures int, capID string, lggr logger.Logger) *mercuryRemoteAggregator { if allowedSigners == nil { allowedSigners = [][]byte{} } @@ -30,6 +31,7 @@ func NewMercuryRemoteAggregator(codec datastreams.ReportCodec, allowedSigners [] allowedSigners: allowedSigners, minRequiredSignatures: minRequiredSignatures, previousLatestReports: make(map[datastreams.FeedID]datastreams.FeedReport), + capID: capID, lggr: lggr, } } @@ -91,5 +93,5 @@ func (a *mercuryRemoteAggregator) Aggregate(triggerEventID string, responses [][ Signers: a.allowedSigners, MinRequiredSignatures: a.minRequiredSignatures, } - return wrapReports(reportList, triggerEventID, latestGlobalTs, meta) + return wrapReports(reportList, triggerEventID, latestGlobalTs, meta, a.capID) } diff --git a/pkg/capabilities/triggers/mercury_remote_aggregator_test.go b/pkg/capabilities/triggers/mercury_remote_aggregator_test.go index a659393f5..914a5ba38 100644 --- a/pkg/capabilities/triggers/mercury_remote_aggregator_test.go +++ b/pkg/capabilities/triggers/mercury_remote_aggregator_test.go @@ -16,6 +16,7 @@ const ( eventID = "ev_id_1" rawReport1 = "abcd" rawReport2 = "efgh" + capID = "streams-trigger@3.2.1" ) type testMercuryCodec struct { @@ -36,7 +37,7 @@ func (c testMercuryCodec) Wrap(reports []datastreams.FeedReport) (values.Value, } func TestMercuryRemoteAggregator(t *testing.T) { - agg := NewMercuryRemoteAggregator(testMercuryCodec{}, nil, 0, logger.Nop()) + agg := NewMercuryRemoteAggregator(testMercuryCodec{}, nil, 0, capID, logger.Nop()) signatures := [][]byte{{1, 2, 3}} feed1Old := datastreams.FeedReport{ @@ -99,7 +100,7 @@ func TestMercuryRemoteAggregator(t *testing.T) { } func getRawResponse(t *testing.T, reports []datastreams.FeedReport, timestamp int64) []byte { - resp, err := wrapReports(reports, eventID, timestamp, datastreams.Metadata{}) + resp, err := wrapReports(reports, eventID, timestamp, datastreams.Metadata{}, capID) require.NoError(t, err) rawResp, err := pb.MarshalTriggerResponse(resp) require.NoError(t, err) diff --git a/pkg/capabilities/triggers/mercury_trigger.go b/pkg/capabilities/triggers/mercury_trigger.go index 3e9ab1efe..6c521e3e5 100644 --- a/pkg/capabilities/triggers/mercury_trigger.go +++ b/pkg/capabilities/triggers/mercury_trigger.go @@ -16,19 +16,14 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/values" ) -const triggerID = "streams-trigger@1.0.0" - -var capInfo = capabilities.MustNewCapabilityInfo( - triggerID, - capabilities.CapabilityTypeTrigger, - "Streams Trigger", +const ( + defaultCapabilityName = "streams-trigger" + defaultCapabilityVersion = "1.0.0" + defaultTickerResolutionMs = 1000 + // TODO pending capabilities configuration implementation - this should be configurable with a sensible default + defaultSendChannelBufferSize = 1000 ) -const defaultTickerResolutionMs = 1000 - -// TODO pending capabilities configuration implementation - this should be configurable with a sensible default -const defaultSendChannelBufferSize = 1000 - // This Trigger Service allows for the registration and deregistration of triggers. You can also send reports to the service. type MercuryTriggerService struct { capabilities.CapabilityInfo @@ -54,17 +49,31 @@ type subscriber struct { // Mercury Trigger will send events to each subscriber every MaxFrequencyMs (configurable per subscriber). // Event generation happens whenever local unix time is a multiple of tickerResolutionMs. Therefore, // all subscribers' MaxFrequencyMs values need to be a multiple of tickerResolutionMs. -func NewMercuryTriggerService(tickerResolutionMs int64, lggr logger.Logger) *MercuryTriggerService { +func NewMercuryTriggerService(tickerResolutionMs int64, capName string, capVersion string, lggr logger.Logger) (*MercuryTriggerService, error) { if tickerResolutionMs == 0 { tickerResolutionMs = defaultTickerResolutionMs } + if capName == "" { + capName = defaultCapabilityName + } + if capVersion == "" { + capVersion = defaultCapabilityVersion + } + capInfo, err := capabilities.NewCapabilityInfo( + capName+"@"+capVersion, + capabilities.CapabilityTypeTrigger, + "Streams Trigger", + ) + if err != nil { + return nil, err + } return &MercuryTriggerService{ CapabilityInfo: capInfo, tickerResolutionMs: tickerResolutionMs, subscribers: make(map[string]*subscriber), latestReports: make(map[datastreams.FeedID]datastreams.FeedReport), stopCh: make(services.StopChan), - lggr: logger.Named(lggr, "MercuryTriggerService")} + lggr: logger.Named(lggr, "MercuryTriggerService")}, nil } func (o *MercuryTriggerService) SetMetaOverride(meta datastreams.Metadata) { @@ -95,7 +104,7 @@ func (o *MercuryTriggerService) RegisterTrigger(ctx context.Context, req capabil // If triggerId is already registered, return an error if _, ok := o.subscribers[req.TriggerID]; ok { - return nil, fmt.Errorf("triggerId %s already registered", triggerID) + return nil, fmt.Errorf("triggerId %s already registered", o.ID) } if int64(config.MaxFrequencyMs)%o.tickerResolutionMs != 0 { @@ -133,7 +142,7 @@ func (o *MercuryTriggerService) UnregisterTrigger(ctx context.Context, req capab subscriber, ok := o.subscribers[req.TriggerID] if !ok { - return fmt.Errorf("triggerId %s not registered", triggerID) + return fmt.Errorf("triggerId %s not registered", o.ID) } close(subscriber.ch) delete(o.subscribers, req.TriggerID) @@ -186,7 +195,7 @@ func (o *MercuryTriggerService) process(timestamp int64) { // use 32-byte-padded timestamp as EventID (human-readable) eventID := fmt.Sprintf("streams_%024s", strconv.FormatInt(timestamp, 10)) - capabilityResponse, err := wrapReports(reportList, eventID, timestamp, o.metaOverride) + capabilityResponse, err := wrapReports(reportList, eventID, timestamp, o.metaOverride, o.ID) if err != nil { o.lggr.Errorw("error wrapping reports", "err", err) continue @@ -202,7 +211,7 @@ func (o *MercuryTriggerService) process(timestamp int64) { } } -func wrapReports(reportList []datastreams.FeedReport, eventID string, timestamp int64, meta datastreams.Metadata) (capabilities.TriggerResponse, error) { +func wrapReports(reportList []datastreams.FeedReport, eventID string, timestamp int64, meta datastreams.Metadata, capID string) (capabilities.TriggerResponse, error) { out := datastreams.StreamsTriggerEvent{ Payload: reportList, Metadata: meta, @@ -216,7 +225,7 @@ func wrapReports(reportList []datastreams.FeedReport, eventID string, timestamp // Create a new TriggerRegistrationResponse with the MercuryTriggerEvent return capabilities.TriggerResponse{ Event: capabilities.TriggerEvent{ - TriggerType: triggerID, + TriggerType: capID, ID: eventID, Outputs: outputsv, }, diff --git a/pkg/capabilities/triggers/mercury_trigger_test.go b/pkg/capabilities/triggers/mercury_trigger_test.go index a3c404728..80ea04940 100644 --- a/pkg/capabilities/triggers/mercury_trigger_test.go +++ b/pkg/capabilities/triggers/mercury_trigger_test.go @@ -51,7 +51,8 @@ func registerTrigger( return triggerEventsCh, registerRequest } -var ( +const ( + triggerID = "streams-trigger@4.5.6" feedOne = "0x1111111111111111111100000000000000000000000000000000000000000000" feedTwo = "0x2222222222222222222200000000000000000000000000000000000000000000" feedThree = "0x3333333333333333333300000000000000000000000000000000000000000000" @@ -60,9 +61,10 @@ var ( ) func TestMercuryTrigger(t *testing.T) { - ts := NewMercuryTriggerService(100, logger.Nop()) + ts, err := NewMercuryTriggerService(100, "", "4.5.6", logger.Nop()) + require.NoError(t, err) ctx := tests.Context(t) - err := ts.Start(ctx) + err = ts.Start(ctx) require.NoError(t, err) // use registerTriggerHelper to register a trigger callback, registerUnregisterRequest := registerTrigger( @@ -100,9 +102,10 @@ func TestMercuryTrigger(t *testing.T) { } func TestMultipleMercuryTriggers(t *testing.T) { - ts := NewMercuryTriggerService(100, logger.Nop()) + ts, err := NewMercuryTriggerService(100, "", "4.5.6", logger.Nop()) + require.NoError(t, err) ctx := tests.Context(t) - err := ts.Start(ctx) + err = ts.Start(ctx) require.NoError(t, err) callback1, cr1 := registerTrigger( ctx, @@ -214,7 +217,8 @@ func TestMultipleMercuryTriggers(t *testing.T) { } func TestMercuryTrigger_RegisterTriggerErrors(t *testing.T) { - ts := NewMercuryTriggerService(100, logger.Nop()) + ts, err := NewMercuryTriggerService(100, "", "4.5.6", logger.Nop()) + require.NoError(t, err) ctx := tests.Context(t) require.NoError(t, ts.Start(ctx)) @@ -293,7 +297,8 @@ func TestMercuryTrigger_ConfigValidation(t *testing.T) { return newConfig(t, []string{feedID}, 1000) } - ts := NewMercuryTriggerService(1000, logger.Nop()) + ts, err := NewMercuryTriggerService(1000, "", "4.5.6", logger.Nop()) + require.NoError(t, err) rawConf := newConfigSingleFeed(t, "012345678901234567890123456789012345678901234567890123456789000000") conf, err := ts.ValidateConfig(rawConf) require.Error(t, err) @@ -355,7 +360,7 @@ func TestMercuryTrigger_WrapReports(t *testing.T) { ObservationTimestamp: 876543, }) } - wrapped, err := wrapReports(reportList, "event_id", 1234, meta) + wrapped, err := wrapReports(reportList, "event_id", 1234, meta, triggerID) require.NoError(t, err) require.NotNil(t, wrapped.Event) require.Len(t, wrapped.Event.Outputs.Underlying["Payload"].(*values.List).Underlying, P)