Skip to content

Commit

Permalink
[KS-507] Make Streams Trigger ID (name+version) configurable (#925)
Browse files Browse the repository at this point in the history
  • Loading branch information
bolekk authored Nov 8, 2024
1 parent 1531008 commit 44ef01d
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 30 deletions.
6 changes: 4 additions & 2 deletions pkg/capabilities/triggers/mercury_remote_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@ type mercuryRemoteAggregator struct {
allowedSigners [][]byte
minRequiredSignatures int
previousLatestReports map[datastreams.FeedID]datastreams.FeedReport
capID string
lggr logger.Logger
}

// This aggregator is used by TriggerSubscriber to aggregate trigger events from multiple remote nodes.
// NOTE: Once Mercury supports parallel composition (and thus guarantee identical sets of reports),
// this will be replaced by the default MODE aggregator.
func NewMercuryRemoteAggregator(codec datastreams.ReportCodec, allowedSigners [][]byte, minRequiredSignatures int, lggr logger.Logger) *mercuryRemoteAggregator {
func NewMercuryRemoteAggregator(codec datastreams.ReportCodec, allowedSigners [][]byte, minRequiredSignatures int, capID string, lggr logger.Logger) *mercuryRemoteAggregator {
if allowedSigners == nil {
allowedSigners = [][]byte{}
}
Expand All @@ -30,6 +31,7 @@ func NewMercuryRemoteAggregator(codec datastreams.ReportCodec, allowedSigners []
allowedSigners: allowedSigners,
minRequiredSignatures: minRequiredSignatures,
previousLatestReports: make(map[datastreams.FeedID]datastreams.FeedReport),
capID: capID,
lggr: lggr,
}
}
Expand Down Expand Up @@ -91,5 +93,5 @@ func (a *mercuryRemoteAggregator) Aggregate(triggerEventID string, responses [][
Signers: a.allowedSigners,
MinRequiredSignatures: a.minRequiredSignatures,
}
return wrapReports(reportList, triggerEventID, latestGlobalTs, meta)
return wrapReports(reportList, triggerEventID, latestGlobalTs, meta, a.capID)
}
5 changes: 3 additions & 2 deletions pkg/capabilities/triggers/mercury_remote_aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const (
eventID = "ev_id_1"
rawReport1 = "abcd"
rawReport2 = "efgh"
capID = "streams-trigger@3.2.1"
)

type testMercuryCodec struct {
Expand All @@ -36,7 +37,7 @@ func (c testMercuryCodec) Wrap(reports []datastreams.FeedReport) (values.Value,
}

func TestMercuryRemoteAggregator(t *testing.T) {
agg := NewMercuryRemoteAggregator(testMercuryCodec{}, nil, 0, logger.Nop())
agg := NewMercuryRemoteAggregator(testMercuryCodec{}, nil, 0, capID, logger.Nop())
signatures := [][]byte{{1, 2, 3}}

feed1Old := datastreams.FeedReport{
Expand Down Expand Up @@ -99,7 +100,7 @@ func TestMercuryRemoteAggregator(t *testing.T) {
}

func getRawResponse(t *testing.T, reports []datastreams.FeedReport, timestamp int64) []byte {
resp, err := wrapReports(reports, eventID, timestamp, datastreams.Metadata{})
resp, err := wrapReports(reports, eventID, timestamp, datastreams.Metadata{}, capID)
require.NoError(t, err)
rawResp, err := pb.MarshalTriggerResponse(resp)
require.NoError(t, err)
Expand Down
45 changes: 27 additions & 18 deletions pkg/capabilities/triggers/mercury_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,14 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/values"
)

const triggerID = "streams-trigger@1.0.0"

var capInfo = capabilities.MustNewCapabilityInfo(
triggerID,
capabilities.CapabilityTypeTrigger,
"Streams Trigger",
const (
defaultCapabilityName = "streams-trigger"
defaultCapabilityVersion = "1.0.0"
defaultTickerResolutionMs = 1000
// TODO pending capabilities configuration implementation - this should be configurable with a sensible default
defaultSendChannelBufferSize = 1000
)

const defaultTickerResolutionMs = 1000

// TODO pending capabilities configuration implementation - this should be configurable with a sensible default
const defaultSendChannelBufferSize = 1000

// This Trigger Service allows for the registration and deregistration of triggers. You can also send reports to the service.
type MercuryTriggerService struct {
capabilities.CapabilityInfo
Expand All @@ -54,17 +49,31 @@ type subscriber struct {
// Mercury Trigger will send events to each subscriber every MaxFrequencyMs (configurable per subscriber).
// Event generation happens whenever local unix time is a multiple of tickerResolutionMs. Therefore,
// all subscribers' MaxFrequencyMs values need to be a multiple of tickerResolutionMs.
func NewMercuryTriggerService(tickerResolutionMs int64, lggr logger.Logger) *MercuryTriggerService {
func NewMercuryTriggerService(tickerResolutionMs int64, capName string, capVersion string, lggr logger.Logger) (*MercuryTriggerService, error) {
if tickerResolutionMs == 0 {
tickerResolutionMs = defaultTickerResolutionMs
}
if capName == "" {
capName = defaultCapabilityName
}
if capVersion == "" {
capVersion = defaultCapabilityVersion
}
capInfo, err := capabilities.NewCapabilityInfo(
capName+"@"+capVersion,
capabilities.CapabilityTypeTrigger,
"Streams Trigger",
)
if err != nil {
return nil, err
}
return &MercuryTriggerService{
CapabilityInfo: capInfo,
tickerResolutionMs: tickerResolutionMs,
subscribers: make(map[string]*subscriber),
latestReports: make(map[datastreams.FeedID]datastreams.FeedReport),
stopCh: make(services.StopChan),
lggr: logger.Named(lggr, "MercuryTriggerService")}
lggr: logger.Named(lggr, "MercuryTriggerService")}, nil
}

func (o *MercuryTriggerService) SetMetaOverride(meta datastreams.Metadata) {
Expand Down Expand Up @@ -95,7 +104,7 @@ func (o *MercuryTriggerService) RegisterTrigger(ctx context.Context, req capabil

// If triggerId is already registered, return an error
if _, ok := o.subscribers[req.TriggerID]; ok {
return nil, fmt.Errorf("triggerId %s already registered", triggerID)
return nil, fmt.Errorf("triggerId %s already registered", o.ID)
}

if int64(config.MaxFrequencyMs)%o.tickerResolutionMs != 0 {
Expand Down Expand Up @@ -133,7 +142,7 @@ func (o *MercuryTriggerService) UnregisterTrigger(ctx context.Context, req capab

subscriber, ok := o.subscribers[req.TriggerID]
if !ok {
return fmt.Errorf("triggerId %s not registered", triggerID)
return fmt.Errorf("triggerId %s not registered", o.ID)
}
close(subscriber.ch)
delete(o.subscribers, req.TriggerID)
Expand Down Expand Up @@ -186,7 +195,7 @@ func (o *MercuryTriggerService) process(timestamp int64) {

// use 32-byte-padded timestamp as EventID (human-readable)
eventID := fmt.Sprintf("streams_%024s", strconv.FormatInt(timestamp, 10))
capabilityResponse, err := wrapReports(reportList, eventID, timestamp, o.metaOverride)
capabilityResponse, err := wrapReports(reportList, eventID, timestamp, o.metaOverride, o.ID)
if err != nil {
o.lggr.Errorw("error wrapping reports", "err", err)
continue
Expand All @@ -202,7 +211,7 @@ func (o *MercuryTriggerService) process(timestamp int64) {
}
}

func wrapReports(reportList []datastreams.FeedReport, eventID string, timestamp int64, meta datastreams.Metadata) (capabilities.TriggerResponse, error) {
func wrapReports(reportList []datastreams.FeedReport, eventID string, timestamp int64, meta datastreams.Metadata, capID string) (capabilities.TriggerResponse, error) {
out := datastreams.StreamsTriggerEvent{
Payload: reportList,
Metadata: meta,
Expand All @@ -216,7 +225,7 @@ func wrapReports(reportList []datastreams.FeedReport, eventID string, timestamp
// Create a new TriggerRegistrationResponse with the MercuryTriggerEvent
return capabilities.TriggerResponse{
Event: capabilities.TriggerEvent{
TriggerType: triggerID,
TriggerType: capID,
ID: eventID,
Outputs: outputsv,
},
Expand Down
21 changes: 13 additions & 8 deletions pkg/capabilities/triggers/mercury_trigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ func registerTrigger(
return triggerEventsCh, registerRequest
}

var (
const (
triggerID = "streams-trigger@4.5.6"
feedOne = "0x1111111111111111111100000000000000000000000000000000000000000000"
feedTwo = "0x2222222222222222222200000000000000000000000000000000000000000000"
feedThree = "0x3333333333333333333300000000000000000000000000000000000000000000"
Expand All @@ -60,9 +61,10 @@ var (
)

func TestMercuryTrigger(t *testing.T) {
ts := NewMercuryTriggerService(100, logger.Nop())
ts, err := NewMercuryTriggerService(100, "", "4.5.6", logger.Nop())
require.NoError(t, err)
ctx := tests.Context(t)
err := ts.Start(ctx)
err = ts.Start(ctx)
require.NoError(t, err)
// use registerTriggerHelper to register a trigger
callback, registerUnregisterRequest := registerTrigger(
Expand Down Expand Up @@ -100,9 +102,10 @@ func TestMercuryTrigger(t *testing.T) {
}

func TestMultipleMercuryTriggers(t *testing.T) {
ts := NewMercuryTriggerService(100, logger.Nop())
ts, err := NewMercuryTriggerService(100, "", "4.5.6", logger.Nop())
require.NoError(t, err)
ctx := tests.Context(t)
err := ts.Start(ctx)
err = ts.Start(ctx)
require.NoError(t, err)
callback1, cr1 := registerTrigger(
ctx,
Expand Down Expand Up @@ -214,7 +217,8 @@ func TestMultipleMercuryTriggers(t *testing.T) {
}

func TestMercuryTrigger_RegisterTriggerErrors(t *testing.T) {
ts := NewMercuryTriggerService(100, logger.Nop())
ts, err := NewMercuryTriggerService(100, "", "4.5.6", logger.Nop())
require.NoError(t, err)
ctx := tests.Context(t)
require.NoError(t, ts.Start(ctx))

Expand Down Expand Up @@ -293,7 +297,8 @@ func TestMercuryTrigger_ConfigValidation(t *testing.T) {
return newConfig(t, []string{feedID}, 1000)
}

ts := NewMercuryTriggerService(1000, logger.Nop())
ts, err := NewMercuryTriggerService(1000, "", "4.5.6", logger.Nop())
require.NoError(t, err)
rawConf := newConfigSingleFeed(t, "012345678901234567890123456789012345678901234567890123456789000000")
conf, err := ts.ValidateConfig(rawConf)
require.Error(t, err)
Expand Down Expand Up @@ -355,7 +360,7 @@ func TestMercuryTrigger_WrapReports(t *testing.T) {
ObservationTimestamp: 876543,
})
}
wrapped, err := wrapReports(reportList, "event_id", 1234, meta)
wrapped, err := wrapReports(reportList, "event_id", 1234, meta, triggerID)
require.NoError(t, err)
require.NotNil(t, wrapped.Event)
require.Len(t, wrapped.Event.Outputs.Underlying["Payload"].(*values.List).Underlying, P)
Expand Down

0 comments on commit 44ef01d

Please sign in to comment.