From db7e8af4842030f0e9ec7269d53c1cf41174ab3f Mon Sep 17 00:00:00 2001 From: Jordan Krage Date: Mon, 30 Oct 2023 12:12:52 -0500 Subject: [PATCH] pkg/loop: plugins report health to host --- pkg/loop/internal/median.go | 1 + pkg/loop/internal/relayer.go | 12 ++--- pkg/loop/internal/reporting_plugin_service.go | 1 + pkg/loop/internal/service.go | 5 ++- pkg/loop/internal/test/config.go | 13 +++--- pkg/loop/internal/test/median.go | 45 +++++++++++++++---- pkg/loop/internal/test/plugin_provider.go | 10 +++-- pkg/loop/internal/test/relayer.go | 34 ++++++++++++-- pkg/loop/internal/test/reporting_plugin.go | 45 ++++++++++++++++++- pkg/loop/internal/types.go | 2 + pkg/loop/median_service_test.go | 5 +-- pkg/loop/process_test.go | 9 ++-- pkg/loop/relayer_service_test.go | 6 ++- pkg/loop/reportingplugins/grpc.go | 28 ++++++++---- pkg/types/provider_median.go | 5 ++- pkg/types/reporting_plugin_service.go | 4 ++ 16 files changed, 177 insertions(+), 48 deletions(-) diff --git a/pkg/loop/internal/median.go b/pkg/loop/internal/median.go index 5924027b94..621758c9df 100644 --- a/pkg/loop/internal/median.go +++ b/pkg/loop/internal/median.go @@ -107,6 +107,7 @@ type pluginMedianServer struct { } func RegisterPluginMedianServer(server *grpc.Server, broker Broker, brokerCfg BrokerConfig, impl types.PluginMedian) error { + pb.RegisterServiceServer(server, &serviceServer{srv: impl}) pb.RegisterPluginMedianServer(server, newPluginMedianServer(&brokerExt{broker, brokerCfg}, impl)) return nil } diff --git a/pkg/loop/internal/relayer.go b/pkg/loop/internal/relayer.go index 86824d5596..3fb6d690be 100644 --- a/pkg/loop/internal/relayer.go +++ b/pkg/loop/internal/relayer.go @@ -19,14 +19,15 @@ var _ PluginRelayer = (*PluginRelayerClient)(nil) type PluginRelayerClient struct { *pluginClient + *serviceClient - grpc pb.PluginRelayerClient + pluginRelayer pb.PluginRelayerClient } func NewPluginRelayerClient(broker Broker, brokerCfg BrokerConfig, conn *grpc.ClientConn) *PluginRelayerClient { brokerCfg.Logger = logger.Named(brokerCfg.Logger, "PluginRelayerClient") pc := newPluginClient(broker, brokerCfg, conn) - return &PluginRelayerClient{pluginClient: pc, grpc: pb.NewPluginRelayerClient(pc)} + return &PluginRelayerClient{pluginClient: pc, pluginRelayer: pb.NewPluginRelayerClient(pc), serviceClient: newServiceClient(pc.brokerExt, pc)} } func (p *PluginRelayerClient) NewRelayer(ctx context.Context, config string, keystore types.Keystore) (Relayer, error) { @@ -40,7 +41,7 @@ func (p *PluginRelayerClient) NewRelayer(ctx context.Context, config string, key } deps.Add(ksRes) - reply, err := p.grpc.NewRelayer(ctx, &pb.NewRelayerRequest{ + reply, err := p.pluginRelayer.NewRelayer(ctx, &pb.NewRelayerRequest{ Config: config, KeystoreID: id, }) @@ -61,6 +62,7 @@ type pluginRelayerServer struct { } func RegisterPluginRelayerServer(server *grpc.Server, broker Broker, brokerCfg BrokerConfig, impl PluginRelayer) error { + pb.RegisterServiceServer(server, &serviceServer{srv: impl}) pb.RegisterPluginRelayerServer(server, newPluginRelayerServer(broker, brokerCfg, impl)) return nil } @@ -436,14 +438,14 @@ func (r *relayerServer) Transact(ctx context.Context, request *pb.TransactionReq return &emptypb.Empty{}, r.impl.Transact(ctx, request.From, request.To, request.Amount.Int(), request.BalanceCheck) } -func healthReport(s map[string]string) (hr map[string]error) { +func healthReport(prefix string, s map[string]string) (hr map[string]error) { hr = make(map[string]error, len(s)) for n, e := range s { var err error if e != "" { err = errors.New(e) } - hr[n] = err + hr[prefix+"."+n] = err } return hr } diff --git a/pkg/loop/internal/reporting_plugin_service.go b/pkg/loop/internal/reporting_plugin_service.go index 388b41403d..dc754aaf80 100644 --- a/pkg/loop/internal/reporting_plugin_service.go +++ b/pkg/loop/internal/reporting_plugin_service.go @@ -79,6 +79,7 @@ type reportingPluginServiceServer struct { } func RegisterReportingPluginServiceServer(server *grpc.Server, broker Broker, brokerCfg BrokerConfig, impl types.ReportingPluginClient) error { + pb.RegisterServiceServer(server, &serviceServer{srv: impl}) pb.RegisterReportingPluginServiceServer(server, newReportingPluginServiceServer(&brokerExt{broker, brokerCfg}, impl)) return nil } diff --git a/pkg/loop/internal/service.go b/pkg/loop/internal/service.go index fbbb047c50..78d4975029 100644 --- a/pkg/loop/internal/service.go +++ b/pkg/loop/internal/service.go @@ -56,11 +56,12 @@ func (s *serviceClient) HealthReport() map[string]error { ctx, cancel = context.WithTimeout(ctx, time.Second) defer cancel() + name := s.Name() reply, err := s.grpc.HealthReport(ctx, &emptypb.Empty{}) if err != nil { - return map[string]error{s.b.Logger.Name(): err} + return map[string]error{name: err} } - hr := healthReport(reply.HealthReport) + hr := healthReport(name, reply.HealthReport) hr[s.b.Logger.Name()] = nil return hr } diff --git a/pkg/loop/internal/test/config.go b/pkg/loop/internal/test/config.go index fa01e8446d..b2340a4b6d 100644 --- a/pkg/loop/internal/test/config.go +++ b/pkg/loop/internal/test/config.go @@ -10,18 +10,21 @@ import ( "github.com/stretchr/testify/assert" ) -type staticConfigProvider struct{} +type staticConfigProvider struct { + name string +} -// TODO validate start/Close calls? func (s staticConfigProvider) Start(ctx context.Context) error { return nil } func (s staticConfigProvider) Close() error { return nil } -func (s staticConfigProvider) Ready() error { panic("unimplemented") } +func (s staticConfigProvider) Ready() error { return nil } -func (s staticConfigProvider) Name() string { panic("unimplemented") } +func (s staticConfigProvider) Name() string { return s.name + ".staticConfigProvider" } -func (s staticConfigProvider) HealthReport() map[string]error { panic("unimplemented") } +func (s staticConfigProvider) HealthReport() map[string]error { + return map[string]error{s.Name(): s.Ready()} +} func (s staticConfigProvider) OffchainConfigDigester() libocr.OffchainConfigDigester { return staticOffchainConfigDigester{} diff --git a/pkg/loop/internal/test/median.go b/pkg/loop/internal/test/median.go index 925309a2c3..7d50e3295c 100644 --- a/pkg/loop/internal/test/median.go +++ b/pkg/loop/internal/test/median.go @@ -15,6 +15,8 @@ import ( "github.com/smartcontractkit/libocr/offchainreporting2/reportingplugin/median" libocr "github.com/smartcontractkit/libocr/offchainreporting2plus/types" + "github.com/smartcontractkit/chainlink-relay/pkg/services" + "github.com/smartcontractkit/chainlink-relay/pkg/services/srvctest" "github.com/smartcontractkit/chainlink-relay/pkg/types" "github.com/smartcontractkit/chainlink-relay/pkg/utils/tests" ) @@ -34,6 +36,11 @@ func (m PluginMedianTest) TestPluginMedian(t *testing.T, p types.PluginMedian) { require.NoError(t, err) TestReportingPluginFactory(t, factory) + srvctest.AssertHealthReportNames(t, p.HealthReport(), + "PluginMedianClient", + "PluginMedianClient.StaticPluginMedian", + "PluginMedianClient.StaticPluginMedian.staticPluginFactory", + ) }) } @@ -68,6 +75,20 @@ func TestReportingPluginFactory(t *testing.T, factory types.ReportingPluginFacto type StaticPluginMedian struct{} +func (s StaticPluginMedian) Name() string { return "StaticPluginMedian" } + +func (s StaticPluginMedian) Start(ctx context.Context) error { return nil } + +func (s StaticPluginMedian) Close() error { return nil } + +func (s StaticPluginMedian) Ready() error { return nil } + +func (s StaticPluginMedian) HealthReport() map[string]error { + hp := map[string]error{s.Name(): s.Ready()} + services.CopyHealth(hp, staticPluginFactory{s.Name()}.HealthReport()) + return hp +} + func (s StaticPluginMedian) NewMedianFactory(ctx context.Context, provider types.MedianProvider, dataSource, juelsPerFeeCoinDataSource median.DataSource, errorLog types.ErrorLog) (types.ReportingPluginFactory, error) { ocd := provider.OffchainConfigDigester() gotDigestPrefix, err := ocd.ConfigDigestPrefix() @@ -218,20 +239,24 @@ func (s StaticPluginMedian) NewMedianFactory(ctx context.Context, provider types if err := errorLog.SaveError(ctx, errMsg); err != nil { return nil, fmt.Errorf("failed to save error: %w", err) } - return staticPluginFactory{}, nil + return staticPluginFactory{s.Name()}, nil } -type staticPluginFactory struct{} +type staticPluginFactory struct { + name string +} -func (s staticPluginFactory) Name() string { panic("implement me") } +func (s staticPluginFactory) Name() string { return s.name + ".staticPluginFactory" } func (s staticPluginFactory) Start(ctx context.Context) error { return nil } func (s staticPluginFactory) Close() error { return nil } -func (s staticPluginFactory) Ready() error { panic("implement me") } +func (s staticPluginFactory) Ready() error { return nil } -func (s staticPluginFactory) HealthReport() map[string]error { panic("implement me") } +func (s staticPluginFactory) HealthReport() map[string]error { + return map[string]error{s.Name(): s.Ready()} +} func (s staticPluginFactory) NewReportingPlugin(config libocr.ReportingPluginConfig) (libocr.ReportingPlugin, libocr.ReportingPluginInfo, error) { if config.ConfigDigest != reportingPluginConfig.ConfigDigest { @@ -273,7 +298,9 @@ func (s staticPluginFactory) NewReportingPlugin(config libocr.ReportingPluginCon return staticReportingPlugin{}, rpi, nil } -type StaticMedianProvider struct{} +type StaticMedianProvider struct { + name string +} func (s StaticMedianProvider) Start(ctx context.Context) error { return nil } @@ -281,9 +308,11 @@ func (s StaticMedianProvider) Close() error { return nil } func (s StaticMedianProvider) Ready() error { panic("unimplemented") } -func (s StaticMedianProvider) Name() string { panic("unimplemented") } +func (s StaticMedianProvider) Name() string { return s.name + ".StaticMedianProvider" } -func (s StaticMedianProvider) HealthReport() map[string]error { panic("unimplemented") } +func (s StaticMedianProvider) HealthReport() map[string]error { + return map[string]error{s.Name(): s.Ready()} +} func (s StaticMedianProvider) OffchainConfigDigester() libocr.OffchainConfigDigester { return staticOffchainConfigDigester{} diff --git a/pkg/loop/internal/test/plugin_provider.go b/pkg/loop/internal/test/plugin_provider.go index c9878832f3..bd0d97a686 100644 --- a/pkg/loop/internal/test/plugin_provider.go +++ b/pkg/loop/internal/test/plugin_provider.go @@ -6,7 +6,9 @@ import ( libocr "github.com/smartcontractkit/libocr/offchainreporting2plus/types" ) -type StaticPluginProvider struct{} +type StaticPluginProvider struct { + name string +} func (s StaticPluginProvider) Start(ctx context.Context) error { return nil } @@ -14,9 +16,11 @@ func (s StaticPluginProvider) Close() error { return nil } func (s StaticPluginProvider) Ready() error { panic("unimplemented") } -func (s StaticPluginProvider) Name() string { panic("unimplemented") } +func (s StaticPluginProvider) Name() string { return s.name + ".StaticPluginProvider" } -func (s StaticPluginProvider) HealthReport() map[string]error { panic("unimplemented") } +func (s StaticPluginProvider) HealthReport() map[string]error { + return map[string]error{s.Name(): s.Ready()} +} func (s StaticPluginProvider) OffchainConfigDigester() libocr.OffchainConfigDigester { return staticOffchainConfigDigester{} diff --git a/pkg/loop/internal/test/relayer.go b/pkg/loop/internal/test/relayer.go index b97278723a..b0db45b43a 100644 --- a/pkg/loop/internal/test/relayer.go +++ b/pkg/loop/internal/test/relayer.go @@ -13,6 +13,8 @@ import ( "github.com/stretchr/testify/require" "github.com/smartcontractkit/chainlink-relay/pkg/loop/internal" + "github.com/smartcontractkit/chainlink-relay/pkg/services" + "github.com/smartcontractkit/chainlink-relay/pkg/services/srvctest" "github.com/smartcontractkit/chainlink-relay/pkg/types" "github.com/smartcontractkit/chainlink-relay/pkg/utils/tests" ) @@ -35,6 +37,20 @@ func (s StaticKeystore) Sign(ctx context.Context, id string, data []byte) ([]byt type StaticPluginRelayer struct{} +func (s StaticPluginRelayer) Name() string { return "StaticPluginRelayer" } + +func (s StaticPluginRelayer) Start(ctx context.Context) error { return nil } + +func (s StaticPluginRelayer) Close() error { return nil } + +func (s StaticPluginRelayer) Ready() error { return nil } + +func (s StaticPluginRelayer) HealthReport() map[string]error { + hp := map[string]error{s.Name(): s.Ready()} + services.CopyHealth(hp, staticRelayer{s.Name()}.HealthReport()) + return hp +} + func (s StaticPluginRelayer) NewRelayer(ctx context.Context, config string, keystore types.Keystore) (internal.Relayer, error) { if config != ConfigTOML { return nil, fmt.Errorf("expected config %q but got %q", ConfigTOML, config) @@ -53,10 +69,12 @@ func (s StaticPluginRelayer) NewRelayer(ctx context.Context, config string, keys if !bytes.Equal(signed, gotSigned) { return nil, fmt.Errorf("expected signed bytes %x but got %x", signed, gotSigned) } - return staticRelayer{}, nil + return staticRelayer{s.Name()}, nil } -type staticRelayer struct{} +type staticRelayer struct { + name string +} func (s staticRelayer) Start(ctx context.Context) error { return nil } @@ -64,10 +82,12 @@ func (s staticRelayer) Close() error { return nil } func (s staticRelayer) Ready() error { return nil } -func (s staticRelayer) Name() string { return "staticRelayer" } +func (s staticRelayer) Name() string { return s.name + ".staticRelayer" } func (s staticRelayer) HealthReport() map[string]error { - return map[string]error{s.Name(): s.Ready()} + hp := map[string]error{s.Name(): s.Ready()} + services.CopyHealth(hp, staticConfigProvider{s.Name()}.HealthReport()) + return hp } func (s staticRelayer) NewConfigProvider(ctx context.Context, r types.RelayArgs) (types.ConfigProvider, error) { if !equalRelayArgs(r, RelayArgs) { @@ -158,6 +178,12 @@ func TestPluginRelayer(t *testing.T, p internal.PluginRelayer) { require.NoError(t, relayer.Start(ctx)) t.Cleanup(func() { assert.NoError(t, relayer.Close()) }) TestRelayer(t, relayer) + srvctest.AssertHealthReportNames(t, p.HealthReport(), + "PluginRelayerClient", + "PluginRelayerClient.StaticPluginRelayer", + "PluginRelayerClient.StaticPluginRelayer.staticRelayer", + "PluginRelayerClient.StaticPluginRelayer.staticRelayer.staticConfigProvider", + ) }) } diff --git a/pkg/loop/internal/test/reporting_plugin.go b/pkg/loop/internal/test/reporting_plugin.go index 8d5f13e32e..f68ecab6a8 100644 --- a/pkg/loop/internal/test/reporting_plugin.go +++ b/pkg/loop/internal/test/reporting_plugin.go @@ -9,6 +9,7 @@ import ( "google.golang.org/grpc" "github.com/smartcontractkit/chainlink-relay/pkg/loop/internal" + "github.com/smartcontractkit/chainlink-relay/pkg/services" "github.com/smartcontractkit/chainlink-relay/pkg/types" ) @@ -19,6 +20,24 @@ type MockConn struct { const ReportingPluginWithMedianProviderName = "reporting-plugin-with-median-provider" type StaticReportingPluginWithMedianProvider struct { + name string +} + +func (s StaticReportingPluginWithMedianProvider) Start(ctx context.Context) error { return nil } + +func (s StaticReportingPluginWithMedianProvider) Close() error { return nil } + +func (s StaticReportingPluginWithMedianProvider) Ready() error { return nil } + +func (s StaticReportingPluginWithMedianProvider) HealthReport() map[string]error { + hp := map[string]error{s.Name(): s.Ready()} + services.CopyHealth(hp, StaticMedianProvider{s.Name()}.HealthReport()) + services.CopyHealth(hp, staticPluginFactory{s.Name()}.HealthReport()) + return hp +} + +func (s StaticReportingPluginWithMedianProvider) Name() string { + return s.name + ".StaticReportingPluginWithMedianProvider" } func (s StaticReportingPluginWithMedianProvider) ConnToProvider(conn grpc.ClientConnInterface, broker internal.Broker, brokerConfig internal.BrokerConfig) types.MedianProvider { @@ -26,6 +45,7 @@ func (s StaticReportingPluginWithMedianProvider) ConnToProvider(conn grpc.Client } func (s StaticReportingPluginWithMedianProvider) NewReportingPluginFactory(ctx context.Context, config types.ReportingPluginServiceConfig, provider types.MedianProvider, pipelineRunner types.PipelineRunnerService, errorLog types.ErrorLog) (types.ReportingPluginFactory, error) { + //TODO validate config ocd := provider.OffchainConfigDigester() gotDigestPrefix, err := ocd.ConfigDigestPrefix() if err != nil { @@ -158,13 +178,34 @@ func (s StaticReportingPluginWithMedianProvider) NewReportingPluginFactory(ctx c if !reflect.DeepEqual(gotDecoded, onchainConfig) { return nil, fmt.Errorf("expected OnchainConfig %s but got %s", onchainConfig, gotDecoded) } + + //TODO validate pipelineRunner + if err := errorLog.SaveError(ctx, errMsg); err != nil { return nil, fmt.Errorf("failed to save error: %w", err) } - return staticPluginFactory{}, nil + return staticPluginFactory{s.Name()}, nil } type StaticReportingPluginWithPluginProvider struct { + name string +} + +func (s StaticReportingPluginWithPluginProvider) Start(ctx context.Context) error { return nil } + +func (s StaticReportingPluginWithPluginProvider) Close() error { return nil } + +func (s StaticReportingPluginWithPluginProvider) Ready() error { return nil } + +func (s StaticReportingPluginWithPluginProvider) HealthReport() map[string]error { + hp := map[string]error{s.Name(): s.Ready()} + services.CopyHealth(hp, StaticPluginProvider{s.Name()}.HealthReport()) + services.CopyHealth(hp, staticPluginFactory{s.Name()}.HealthReport()) + return hp +} + +func (s StaticReportingPluginWithPluginProvider) Name() string { + return s.name + ".StaticReportingPluginWithPluginProvider" } func (s StaticReportingPluginWithPluginProvider) ConnToProvider(conn grpc.ClientConnInterface, broker internal.Broker, brokerConfig internal.BrokerConfig) types.PluginProvider { @@ -244,5 +285,5 @@ func (s StaticReportingPluginWithPluginProvider) NewReportingPluginFactory(ctx c if !reflect.DeepEqual(tr, taskResults) { return nil, fmt.Errorf("expected TaskResults %+v but got %+v", taskResults, tr) } - return staticPluginFactory{}, nil + return staticPluginFactory{s.Name()}, nil } diff --git a/pkg/loop/internal/types.go b/pkg/loop/internal/types.go index f476484724..97540fd7ad 100644 --- a/pkg/loop/internal/types.go +++ b/pkg/loop/internal/types.go @@ -3,10 +3,12 @@ package internal import ( "context" + "github.com/smartcontractkit/chainlink-relay/pkg/services" "github.com/smartcontractkit/chainlink-relay/pkg/types" ) type PluginRelayer interface { + services.Service NewRelayer(ctx context.Context, config string, keystore types.Keystore) (Relayer, error) } diff --git a/pkg/loop/median_service_test.go b/pkg/loop/median_service_test.go index 961a4786d8..763be121e6 100644 --- a/pkg/loop/median_service_test.go +++ b/pkg/loop/median_service_test.go @@ -52,11 +52,10 @@ func TestMedianService_recovery(t *testing.T) { t.Parallel() var limit atomic.Int32 median := loop.NewMedianService(logger.Test(t), loop.GRPCOpts{}, func() *exec.Cmd { - h := HelperProcessCommand{ + return HelperProcessCommand{ Command: loop.PluginMedianName, Limit: int(limit.Add(1)), - } - return h.New() + }.New() }, test.StaticMedianProvider{}, test.StaticDataSource(), test.StaticJuelsPerFeeCoinDataSource(), &test.StaticErrorLog{}) require.NoError(t, median.Start(tests.Context(t))) t.Cleanup(func() { assert.NoError(t, median.Close()) }) diff --git a/pkg/loop/process_test.go b/pkg/loop/process_test.go index cf009fd993..b0c1d3ec41 100644 --- a/pkg/loop/process_test.go +++ b/pkg/loop/process_test.go @@ -8,14 +8,13 @@ import ( type HelperProcessCommand test.HelperProcessCommand -func (h *HelperProcessCommand) New() *exec.Cmd { +func (h HelperProcessCommand) New() *exec.Cmd { h.CommandLocation = "./internal/test/cmd/main.go" - return (test.HelperProcessCommand)(*h).New() + return (test.HelperProcessCommand)(h).New() } func NewHelperProcessCommand(command string) *exec.Cmd { - h := HelperProcessCommand{ + return HelperProcessCommand{ Command: command, - } - return h.New() + }.New() } diff --git a/pkg/loop/relayer_service_test.go b/pkg/loop/relayer_service_test.go index 69b40d1a9c..8185a9dbef 100644 --- a/pkg/loop/relayer_service_test.go +++ b/pkg/loop/relayer_service_test.go @@ -68,7 +68,7 @@ func TestRelayerService_recovery(t *testing.T) { func TestRelayerService_HealthReport(t *testing.T) { lggr := logger.Named(logger.Test(t), "Foo") s := loop.NewRelayerService(lggr, loop.GRPCOpts{}, func() *exec.Cmd { - return helperProcess(loop.PluginRelayerName) + return HelperProcessCommand{Command: loop.PluginRelayerName}.New() }, test.ConfigTOML, test.StaticKeystore{}) srvctest.AssertHealthReportNames(t, s.HealthReport(), @@ -82,5 +82,7 @@ func TestRelayerService_HealthReport(t *testing.T) { srvctest.AssertHealthReportNames(t, s.HealthReport(), "Foo.RelayerService", "Foo.RelayerService.PluginRelayerClient.ChainRelayerClient", - "staticRelayer") + "Foo.RelayerService.PluginRelayerClient.ChainRelayerClient.StaticPluginRelayer.staticRelayer", + "Foo.RelayerService.PluginRelayerClient.ChainRelayerClient.StaticPluginRelayer.staticRelayer.staticConfigProvider", + ) } diff --git a/pkg/loop/reportingplugins/grpc.go b/pkg/loop/reportingplugins/grpc.go index e192a7ef6c..bb4bce90dd 100644 --- a/pkg/loop/reportingplugins/grpc.go +++ b/pkg/loop/reportingplugins/grpc.go @@ -8,6 +8,7 @@ import ( "github.com/smartcontractkit/chainlink-relay/pkg/loop" "github.com/smartcontractkit/chainlink-relay/pkg/loop/internal" + "github.com/smartcontractkit/chainlink-relay/pkg/services" "github.com/smartcontractkit/chainlink-relay/pkg/types" ) @@ -39,18 +40,29 @@ type GRPCService[T types.PluginProvider] struct { pluginClient *internal.ReportingPluginServiceClient } -type serverAdapter func(context.Context, types.ReportingPluginServiceConfig, grpc.ClientConnInterface, types.PipelineRunnerService, types.ErrorLog) (types.ReportingPluginFactory, error) +type serverAdapter[T types.PluginProvider] struct { + services.StateMachine + BrokerConfig loop.BrokerConfig + ProviderServer ProviderServer[T] + GRPCBroker *plugin.GRPCBroker +} + +func (s *serverAdapter[T]) Start(ctx context.Context) error { return s.ProviderServer.Start(ctx) } + +func (s *serverAdapter[T]) Close() error { return s.ProviderServer.Close() } -func (s serverAdapter) NewReportingPluginFactory(ctx context.Context, config types.ReportingPluginServiceConfig, conn grpc.ClientConnInterface, pr types.PipelineRunnerService, errorLog types.ErrorLog) (types.ReportingPluginFactory, error) { - return s(ctx, config, conn, pr, errorLog) +func (s *serverAdapter[T]) HealthReport() map[string]error { return s.ProviderServer.HealthReport() } + +func (s *serverAdapter[T]) Name() string { return s.ProviderServer.Name() } + +func (s *serverAdapter[T]) NewReportingPluginFactory(ctx context.Context, cfg types.ReportingPluginServiceConfig, conn grpc.ClientConnInterface, pr types.PipelineRunnerService, el types.ErrorLog) (types.ReportingPluginFactory, error) { + provider := s.ProviderServer.ConnToProvider(conn, s.GRPCBroker, s.BrokerConfig) + return s.ProviderServer.NewReportingPluginFactory(ctx, cfg, provider, pr, el) } func (g *GRPCService[T]) GRPCServer(broker *plugin.GRPCBroker, server *grpc.Server) error { - adapter := func(ctx context.Context, cfg types.ReportingPluginServiceConfig, conn grpc.ClientConnInterface, pr types.PipelineRunnerService, el types.ErrorLog) (types.ReportingPluginFactory, error) { - provider := g.PluginServer.ConnToProvider(conn, broker, g.BrokerConfig) - return g.PluginServer.NewReportingPluginFactory(ctx, cfg, provider, pr, el) - } - return internal.RegisterReportingPluginServiceServer(server, broker, g.BrokerConfig, serverAdapter(adapter)) + impl := &serverAdapter[T]{BrokerConfig: g.BrokerConfig, ProviderServer: g.PluginServer, GRPCBroker: broker} + return internal.RegisterReportingPluginServiceServer(server, broker, g.BrokerConfig, impl) } // GRPCClient implements [plugin.GRPCPlugin] and returns the pluginClient [types.PluginClient], updated with the new broker and conn. diff --git a/pkg/types/provider_median.go b/pkg/types/provider_median.go index 29010d82d7..df42a3cb7d 100644 --- a/pkg/types/provider_median.go +++ b/pkg/types/provider_median.go @@ -5,6 +5,8 @@ import ( "github.com/smartcontractkit/libocr/offchainreporting2/reportingplugin/median" libocr "github.com/smartcontractkit/libocr/offchainreporting2plus/types" + + "github.com/smartcontractkit/chainlink-relay/pkg/services" ) // MedianProvider provides all components needed for a median OCR2 plugin. @@ -16,11 +18,12 @@ type MedianProvider interface { } type PluginMedian interface { + services.Service // NewMedianFactory returns a new ReportingPluginFactory. If provider implements GRPCClientConn, it can be forwarded efficiently via proxy. NewMedianFactory(ctx context.Context, provider MedianProvider, dataSource, juelsPerFeeCoin median.DataSource, errorLog ErrorLog) (ReportingPluginFactory, error) } type ReportingPluginFactory interface { - Service + services.Service libocr.ReportingPluginFactory } diff --git a/pkg/types/reporting_plugin_service.go b/pkg/types/reporting_plugin_service.go index dd4efad018..d35525bda5 100644 --- a/pkg/types/reporting_plugin_service.go +++ b/pkg/types/reporting_plugin_service.go @@ -4,6 +4,8 @@ import ( "context" "google.golang.org/grpc" + + "github.com/smartcontractkit/chainlink-relay/pkg/services" ) type ReportingPluginServiceConfig struct { @@ -16,6 +18,7 @@ type ReportingPluginServiceConfig struct { // ReportingPluginClient is the client interface to a plugin running // as a generic job (job type = GenericPlugin) inside the core node. type ReportingPluginClient interface { + services.Service NewReportingPluginFactory(ctx context.Context, config ReportingPluginServiceConfig, grpcProvider grpc.ClientConnInterface, pipelineRunner PipelineRunnerService, errorLog ErrorLog) (ReportingPluginFactory, error) } @@ -24,5 +27,6 @@ type ReportingPluginClient interface { // with the passthrough provider connection converted to the provider // expected by the plugin. type ReportingPluginServer[T PluginProvider] interface { + services.Service NewReportingPluginFactory(ctx context.Context, config ReportingPluginServiceConfig, provider T, pipelineRunner PipelineRunnerService, errorLog ErrorLog) (ReportingPluginFactory, error) }