Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log Event Trigger Capability Development: Part 1 #14308

Merged
merged 45 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
a93f9e9
Log Event Trigger Capability
kidambisrinivas Sep 2, 2024
072214d
Minor refactoring
kidambisrinivas Sep 2, 2024
833c0c1
Moved main script to plugins/cmd
kidambisrinivas Sep 2, 2024
73644e6
Added initial implementation for UnregisterTrigger
kidambisrinivas Sep 2, 2024
0ff783b
Merge branch 'develop' into CM-378-log-event-trigger
kidambisrinivas Sep 5, 2024
1b617a6
Create NewContractReader in RegisterTrigger flow of the trigger capab…
kidambisrinivas Sep 9, 2024
b09646b
Refactoring to integrate with ChainReader QueryKey API
kidambisrinivas Sep 10, 2024
d90d860
Merge branch 'develop' into CM-378-log-event-trigger
kidambisrinivas Sep 10, 2024
16a87f4
Merge branch 'develop' into CM-378-log-event-trigger
kidambisrinivas Sep 13, 2024
a455565
Merge branch 'develop' into CM-378-log-event-trigger
kidambisrinivas Sep 16, 2024
4e49c39
Integrate with ChainReader QueryKey interface
kidambisrinivas Sep 17, 2024
d89020e
Minor changes
kidambisrinivas Sep 17, 2024
2967430
Send cursor in QueryKey in subsequent calls
kidambisrinivas Sep 18, 2024
5e3dec0
Merge branch 'develop' into CM-378-log-event-trigger
kidambisrinivas Sep 25, 2024
75904af
Test utils for LOOP capability
kidambisrinivas Sep 25, 2024
c26d024
Happy path test for log event trigger capability
kidambisrinivas Sep 26, 2024
3880a5c
Float64 fix in values
kidambisrinivas Sep 26, 2024
60976f7
Happy path integration test for Log Event Trigger Capability
kidambisrinivas Sep 26, 2024
a4a7f0e
Fix code lint annotations
kidambisrinivas Sep 26, 2024
1cc9ce0
Addressed PR comments
kidambisrinivas Sep 27, 2024
d4c5efc
Merge branch 'develop' into CM-378-log-event-trigger
kidambisrinivas Sep 30, 2024
4a56bb5
Added changeset
kidambisrinivas Sep 30, 2024
344f6eb
Addressed Lint errors
kidambisrinivas Sep 30, 2024
3c629e4
Addressed PR comments
kidambisrinivas Sep 30, 2024
842da5a
Addressed more lint issues
kidambisrinivas Sep 30, 2024
17b6c1a
Simplified trigger ctx creation and cancel flows
kidambisrinivas Sep 30, 2024
2786cef
Added comment
kidambisrinivas Sep 30, 2024
31954ea
Addressed PR comments
kidambisrinivas Sep 30, 2024
c48b80d
Implemented Start/Close pattern in logEventTrigger and used stopChan …
kidambisrinivas Sep 30, 2024
33bed46
Addressed more PR comments
kidambisrinivas Sep 30, 2024
48c6728
Handled errors from Info and Close methods
kidambisrinivas Sep 30, 2024
3be3488
Fixed lint errors and pass ctx to Info
kidambisrinivas Sep 30, 2024
de4485c
Handle race conditions in log event trigger service
kidambisrinivas Sep 30, 2024
ecf40eb
Fixed lint errors
kidambisrinivas Sep 30, 2024
6e452f7
Merge branch 'develop' into CM-378-log-event-trigger
kidambisrinivas Sep 30, 2024
e23f0d1
Minor change
kidambisrinivas Sep 30, 2024
017c1f7
Test fix and lint fixes
kidambisrinivas Sep 30, 2024
d70aff4
Move EVM specific tests out of chain-agnostic capability
kidambisrinivas Sep 30, 2024
97e18b8
Set block time
kidambisrinivas Sep 30, 2024
7f316b0
Merge branch 'develop' into CM-378-log-event-trigger
kidambisrinivas Sep 30, 2024
723b872
Check existence of trigger in slow path
kidambisrinivas Sep 30, 2024
9b6d66b
Complete usage of services.Service with StartOnce and StopOnce with t…
kidambisrinivas Sep 30, 2024
ecd045e
Wait for all goroutines to exit in test
kidambisrinivas Sep 30, 2024
7562bc1
Merge branch 'develop' into CM-378-log-event-trigger
kidambisrinivas Sep 30, 2024
cd33c86
Cleanup logpoller and headtracker after test
kidambisrinivas Sep 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions core/capabilities/log_event_trigger/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package log_event_trigger

import (
"fmt"
"sync"
)

type NewCapabilityFn[T any, Resp any] func() (T, chan Resp)

// Interface of the capabilities store
type CapabilitiesStore[T any, Resp any] interface {
Read(capabilityID string) (value T, ok bool)
ReadAll() (values map[string]T)
Write(capabilityID string, value T)
InsertIfNotExists(capabilityID string, fn NewCapabilityFn[T, Resp]) (chan Resp, error)
Delete(capabilityID string)
}

// Implementation for the CapabilitiesStore interface
type capabilitiesStore[T any, Resp any] struct {
mu sync.RWMutex
capabilities map[string]T
}

var _ CapabilitiesStore[string, string] = (CapabilitiesStore[string, string])(nil)

// Constructor for capabilitiesStore struct implementing CapabilitiesStore interface
func NewCapabilitiesStore[T any, Resp any]() CapabilitiesStore[T, Resp] {
return &capabilitiesStore[T, Resp]{
capabilities: map[string]T{},
}
}

func (cs *capabilitiesStore[T, Resp]) Read(capabilityID string) (value T, ok bool) {
cs.mu.RLock()
defer cs.mu.RUnlock()
trigger, ok := cs.capabilities[capabilityID]
return trigger, ok
}

func (cs *capabilitiesStore[T, Resp]) ReadAll() (values map[string]T) {
cs.mu.RLock()
defer cs.mu.RUnlock()
return cs.capabilities
}

func (cs *capabilitiesStore[T, Resp]) Write(capabilityID string, value T) {
cs.mu.Lock()
defer cs.mu.Unlock()
cs.capabilities[capabilityID] = value
}

func (cs *capabilitiesStore[T, Resp]) InsertIfNotExists(capabilityID string, fn NewCapabilityFn[T, Resp]) (chan Resp, error) {
cs.mu.Lock()
defer cs.mu.Unlock()
if _, ok := cs.capabilities[capabilityID]; ok {
return nil, fmt.Errorf("capabilityID %v already exists", capabilityID)
}
value, respCh := fn()
cs.capabilities[capabilityID] = value
return respCh, nil
}

func (cs *capabilitiesStore[T, Resp]) Delete(capabilityID string) {
cs.mu.Lock()
defer cs.mu.Unlock()
delete(cs.capabilities, capabilityID)
}
132 changes: 132 additions & 0 deletions core/capabilities/log_event_trigger/trigger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package log_event_trigger

import (
"context"
"errors"
"fmt"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
)

const ID = "cron-trigger@1.0.0"
kidambisrinivas marked this conversation as resolved.
Show resolved Hide resolved

const defaultSendChannelBufferSize = 1000

var logEventTriggerInfo = capabilities.MustNewCapabilityInfo(
ID,
capabilities.CapabilityTypeTrigger,
"A trigger that listens for specific contract log events and starts a workflow run.",
)

// Log Event Trigger Capability Config
type Config struct {
}

// Log Event Trigger Capability Payload
type Payload struct {
// Time that Log Event Trigger's task execution occurred (RFC3339Nano formatted)
ActualExecutionTime string
}

// Log Event Trigger Capability Response
type Response struct {
capabilities.TriggerEvent
Metadata struct{}
Payload Payload
}

type logEventTrigger struct {
ch chan<- capabilities.TriggerResponse
}

// Log Event Trigger Capabilities Manager
// Manages different log event triggers using an underlying triggerStore
type LogEventTriggerService struct {
capabilities.CapabilityInfo
capabilities.Validator[Config, struct{}, capabilities.TriggerResponse]
lggr logger.Logger
triggers CapabilitiesStore[logEventTrigger, capabilities.TriggerResponse]
}

type Params struct {
Logger logger.Logger
}

var _ capabilities.TriggerCapability = (*LogEventTriggerService)(nil)
var _ services.Service = &LogEventTriggerService{}

// Creates a new Cron Trigger Service.
// Scheduling will commence on calling .Start()
func NewLogEventTriggerService(p Params) *LogEventTriggerService {
l := logger.Named(p.Logger, "Log Event Trigger Capability Service")

logEventStore := NewCapabilitiesStore[logEventTrigger, capabilities.TriggerResponse]()

return &LogEventTriggerService{
CapabilityInfo: logEventTriggerInfo,
lggr: l,
triggers: logEventStore,
}
}

// Register a new trigger
// Can register triggers before the service is actively scheduling
func (s *LogEventTriggerService) RegisterTrigger(ctx context.Context, req capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) {
if req.Config == nil {
return nil, errors.New("config is required to register a log event trigger")
}
_, err := s.ValidateConfig(req.Config)
if err != nil {
return nil, err
}
respCh, err := s.triggers.InsertIfNotExists(req.TriggerID, func() (logEventTrigger, chan capabilities.TriggerResponse) {
callbackCh := make(chan capabilities.TriggerResponse, defaultSendChannelBufferSize)
return logEventTrigger{
ch: callbackCh,
}, callbackCh
})
if err != nil {
return nil, fmt.Errorf("log_event_trigger %v", err)
}
s.lggr.Debugw("log_event_trigger::RegisterTrigger", "triggerId", req.TriggerID)
kidambisrinivas marked this conversation as resolved.
Show resolved Hide resolved
return respCh, nil
}

func (s *LogEventTriggerService) UnregisterTrigger(ctx context.Context, req capabilities.TriggerRegistrationRequest) error {
trigger, ok := s.triggers.Read(req.TriggerID)
if !ok {
return fmt.Errorf("triggerId %s not found", req.TriggerID)
}
// Close callback channel
close(trigger.ch)
// Remove from triggers context
s.triggers.Delete(req.TriggerID)
s.lggr.Debugw("log_event_trigger::UnregisterTrigger", "triggerId", req.TriggerID)
return nil
}

// Start the service.
func (s *LogEventTriggerService) Start(ctx context.Context) error {
return nil
}

// Close stops the Service.
// After this call the Service cannot be started again,
// The service will need to be re-built to start scheduling again.
func (s *LogEventTriggerService) Close() error {
return nil
}

func (s *LogEventTriggerService) Ready() error {
return nil
}

func (s *LogEventTriggerService) HealthReport() map[string]error {
return map[string]error{s.Name(): nil}
}

func (s *LogEventTriggerService) Name() string {
return "Service"
}
99 changes: 99 additions & 0 deletions plugins/cmd/capabilities/log-event-trigger/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package main

import (
"context"
"fmt"

"github.com/hashicorp/go-plugin"

"github.com/smartcontractkit/chainlink/v2/core/capabilities/log_event_trigger"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/loop"
"github.com/smartcontractkit/chainlink-common/pkg/types/core"
)

const (
serviceName = "LogEventTriggerCapability"
)

type LogEventTriggerGRPCService struct {
trigger capabilities.TriggerCapability
s *loop.Server
}

func main() {
s := loop.MustNewStartedServer(serviceName)
defer s.Stop()

s.Logger.Infof("Starting %s", serviceName)

stopCh := make(chan struct{})
defer close(stopCh)

plugin.Serve(&plugin.ServeConfig{
HandshakeConfig: loop.StandardCapabilitiesHandshakeConfig(),
Plugins: map[string]plugin.Plugin{
loop.PluginStandardCapabilitiesName: &loop.StandardCapabilitiesLoop{
PluginServer: &LogEventTriggerGRPCService{
s: s,
},
BrokerConfig: loop.BrokerConfig{Logger: s.Logger, StopCh: stopCh, GRPCOpts: s.GRPCOpts},
},
},
GRPCServer: s.GRPCOpts.NewServer,
})
}

func (cs *LogEventTriggerGRPCService) Start(ctx context.Context) error {
return nil
}

func (cs *LogEventTriggerGRPCService) Close() error {
return nil
}

func (cs *LogEventTriggerGRPCService) Ready() error {
return nil
}

func (cs *LogEventTriggerGRPCService) HealthReport() map[string]error {
return nil
}

func (cs *LogEventTriggerGRPCService) Name() string {
return serviceName
}

func (cs *LogEventTriggerGRPCService) Infos(ctx context.Context) ([]capabilities.CapabilityInfo, error) {
triggerInfo, err := cs.trigger.Info(ctx)
if err != nil {
return nil, err
}

return []capabilities.CapabilityInfo{
triggerInfo,
}, nil
}

func (cs *LogEventTriggerGRPCService) Initialise(
ctx context.Context,
config string,
telemetryService core.TelemetryService,
store core.KeyValueStore,
capabilityRegistry core.CapabilitiesRegistry,
errorLog core.ErrorLog,
pipelineRunner core.PipelineRunnerService,
relayerSet core.RelayerSet,
) error {
cs.s.Logger.Debugf("Initialising %s", serviceName)
cs.trigger = log_event_trigger.NewLogEventTriggerService(log_event_trigger.Params{
Logger: cs.s.Logger,
})

if err := capabilityRegistry.Add(ctx, cs.trigger); err != nil {
return fmt.Errorf("error when adding cron trigger to the registry: %w", err)
}

return nil
}
Loading