From 86072cdfd843e30f17d35aeb17202d38395601b9 Mon Sep 17 00:00:00 2001 From: Jordan Krage Date: Mon, 30 Oct 2023 12:12:52 -0500 Subject: [PATCH] pkg/loop: plugins report health to host --- pkg/logger/logger.go | 7 + pkg/loop/internal/median.go | 5 + pkg/loop/internal/plugin_service.go | 28 +- pkg/loop/internal/relayer.go | 14 +- pkg/loop/internal/reporting_plugin_service.go | 5 + pkg/loop/internal/service.go | 6 +- pkg/loop/internal/test/cmd/main.go | 8 +- pkg/loop/internal/test/config.go | 18 +- pkg/loop/internal/test/median.go | 74 ++-- pkg/loop/internal/test/plugin_provider.go | 27 +- pkg/loop/internal/test/relayer.go | 315 +++++++++--------- pkg/loop/internal/test/reporting_plugin.go | 48 ++- pkg/loop/internal/test/test.go | 28 ++ pkg/loop/internal/types.go | 2 + pkg/loop/median_service.go | 12 +- pkg/loop/median_service_test.go | 16 +- pkg/loop/plugin_median.go | 1 + pkg/loop/plugin_median_test.go | 7 +- pkg/loop/plugin_relayer.go | 1 + pkg/loop/plugin_relayer_test.go | 3 +- pkg/loop/process_test.go | 9 +- pkg/loop/relayer_service.go | 13 +- pkg/loop/relayer_service_test.go | 69 +++- pkg/loop/reportingplugins/grpc.go | 45 ++- pkg/loop/reportingplugins/grpc_test.go | 4 +- pkg/loop/reportingplugins/loopp_service.go | 12 +- pkg/loop/standalone_provider_test.go | 3 +- pkg/services/servicetest/health.go | 4 +- pkg/services/servicetest/run.go | 4 +- pkg/types/provider_median.go | 5 +- pkg/types/reporting_plugin_service.go | 4 + 31 files changed, 491 insertions(+), 306 deletions(-) diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go index 5c57c702e..1ee7e2507 100644 --- a/pkg/logger/logger.go +++ b/pkg/logger/logger.go @@ -142,6 +142,13 @@ func With(l Logger, keyvals ...interface{}) Logger { // Named returns a logger with name 'n', if 'l' has a method `Named(string) L`, where L implements Logger, otherwise it returns l. func Named(l Logger, n string) Logger { + l = named(l, n) + if testing.Testing() { + l.Debugf("New logger: %s", n) + } + return l +} +func named(l Logger, n string) Logger { switch t := l.(type) { case *logger: return t.named(n) diff --git a/pkg/loop/internal/median.go b/pkg/loop/internal/median.go index 474c303ab..e1a84a8ec 100644 --- a/pkg/loop/internal/median.go +++ b/pkg/loop/internal/median.go @@ -108,6 +108,7 @@ type pluginMedianServer struct { } func RegisterPluginMedianServer(server *grpc.Server, broker Broker, brokerCfg BrokerConfig, impl types.PluginMedian) error { + pb.RegisterServiceServer(server, &serviceServer{srv: impl}) pb.RegisterPluginMedianServer(server, newPluginMedianServer(&brokerExt{broker, brokerCfg}, impl)) return nil } @@ -153,6 +154,10 @@ func (m *pluginMedianServer) NewMedianFactory(ctx context.Context, request *pb.N m.closeAll(dsRes, juelsRes, providerRes, errorLogRes) return nil, err } + if err = factory.Start(ctx); err != nil { + m.closeAll(dsRes, juelsRes, providerRes, errorLogRes) + return nil, err + } id, _, err := m.serveNew("ReportingPluginProvider", func(s *grpc.Server) { pb.RegisterServiceServer(s, &serviceServer{srv: factory}) diff --git a/pkg/loop/internal/plugin_service.go b/pkg/loop/internal/plugin_service.go index 09de253bf..5a53fdcf6 100644 --- a/pkg/loop/internal/plugin_service.go +++ b/pkg/loop/internal/plugin_service.go @@ -43,20 +43,30 @@ type PluginService[P grpcPlugin, S services.Service] struct { client *plugin.Client clientProtocol plugin.ClientProtocol - newService func(context.Context, any) (S, error) + newService func(context.Context, any) (S, services.HealthReporter, error) serviceCh chan struct{} // closed when service is available Service S + Health services.HealthReporter //TODO may or may not be the same as Service? testInterrupt chan func(*PluginService[P, S]) // tests only (via TestHook) to enable access to internals without racing } -func (s *PluginService[P, S]) Init(pluginName string, p P, newService func(context.Context, any) (S, error), lggr logger.Logger, cmd func() *exec.Cmd, stopCh chan struct{}) { +type NewService[S any] func(context.Context, any) (S, services.HealthReporter, error) + +func (s *PluginService[P, S]) Init( + pluginName string, + grpcPlug P, + newService NewService[S], + lggr logger.Logger, + cmd func() *exec.Cmd, + stopCh chan struct{}, +) { s.pluginName = pluginName s.lggr = lggr s.cmd = cmd s.stopCh = stopCh - s.grpcPlug = p + s.grpcPlug = grpcPlug s.newService = newService s.serviceCh = make(chan struct{}) } @@ -140,7 +150,7 @@ func (s *PluginService[P, S]) launch() (*plugin.Client, plugin.ClientProtocol, e case <-s.serviceCh: // s.service already set default: - s.Service, err = s.newService(ctx, i) + s.Service, s.Health, err = s.newService(ctx, i) if err != nil { abort() return nil, nil, fmt.Errorf("failed to create service: %w", err) @@ -173,7 +183,7 @@ func (s *PluginService[P, S]) HealthReport() map[string]error { select { case <-s.serviceCh: hr := map[string]error{s.Name(): s.Healthy()} - services.CopyHealth(hr, s.Service.HealthReport()) + services.CopyHealth(hr, s.Health.HealthReport()) return hr default: return map[string]error{s.Name(): ErrPluginUnavailable} @@ -187,7 +197,7 @@ func (s *PluginService[P, S]) Close() error { select { case <-s.serviceCh: - if cerr := s.Service.Close(); !errors.Is(cerr, context.Canceled) && status.Code(cerr) != codes.Canceled { + if cerr := s.Service.Close(); !isCanceled(cerr) { err = errors.Join(err, cerr) } default: @@ -199,7 +209,7 @@ func (s *PluginService[P, S]) Close() error { func (s *PluginService[P, S]) closeClient() (err error) { if s.clientProtocol != nil { - if cerr := s.clientProtocol.Close(); !errors.Is(cerr, context.Canceled) { + if cerr := s.clientProtocol.Close(); !isCanceled(cerr) { err = cerr } } @@ -256,3 +266,7 @@ func (ch TestPluginService[P, S]) Reset() { } <-done } + +func isCanceled(err error) bool { + return errors.Is(err, context.Canceled) || status.Code(err) == codes.Canceled +} diff --git a/pkg/loop/internal/relayer.go b/pkg/loop/internal/relayer.go index 5920b371a..c30fae13b 100644 --- a/pkg/loop/internal/relayer.go +++ b/pkg/loop/internal/relayer.go @@ -21,14 +21,15 @@ var _ PluginRelayer = (*PluginRelayerClient)(nil) type PluginRelayerClient struct { *pluginClient + *serviceClient - grpc pb.PluginRelayerClient + pluginRelayer pb.PluginRelayerClient } func NewPluginRelayerClient(broker Broker, brokerCfg BrokerConfig, conn *grpc.ClientConn) *PluginRelayerClient { brokerCfg.Logger = logger.Named(brokerCfg.Logger, "PluginRelayerClient") pc := newPluginClient(broker, brokerCfg, conn) - return &PluginRelayerClient{pluginClient: pc, grpc: pb.NewPluginRelayerClient(pc)} + return &PluginRelayerClient{pluginClient: pc, pluginRelayer: pb.NewPluginRelayerClient(pc), serviceClient: newServiceClient(pc.brokerExt, pc)} } func (p *PluginRelayerClient) NewRelayer(ctx context.Context, config string, keystore types.Keystore) (Relayer, error) { @@ -42,7 +43,7 @@ func (p *PluginRelayerClient) NewRelayer(ctx context.Context, config string, key } deps.Add(ksRes) - reply, err := p.grpc.NewRelayer(ctx, &pb.NewRelayerRequest{ + reply, err := p.pluginRelayer.NewRelayer(ctx, &pb.NewRelayerRequest{ Config: config, KeystoreID: id, }) @@ -63,6 +64,7 @@ type pluginRelayerServer struct { } func RegisterPluginRelayerServer(server *grpc.Server, broker Broker, brokerCfg BrokerConfig, impl PluginRelayer) error { + pb.RegisterServiceServer(server, &serviceServer{srv: impl}) pb.RegisterPluginRelayerServer(server, newPluginRelayerServer(broker, brokerCfg, impl)) return nil } @@ -163,7 +165,7 @@ type relayerClient struct { } func newRelayerClient(b *brokerExt, conn grpc.ClientConnInterface) *relayerClient { - b = b.withName("ChainRelayerClient") + b = b.withName("RelayerClient") return &relayerClient{b, newServiceClient(b, conn), pb.NewRelayerClient(conn)} } @@ -439,14 +441,14 @@ func (r *relayerServer) Transact(ctx context.Context, request *pb.TransactionReq return &emptypb.Empty{}, r.impl.Transact(ctx, request.From, request.To, request.Amount.Int(), request.BalanceCheck) } -func healthReport(s map[string]string) (hr map[string]error) { +func healthReport(prefix string, s map[string]string) (hr map[string]error) { hr = make(map[string]error, len(s)) for n, e := range s { var err error if e != "" { err = errors.New(e) } - hr[n] = err + hr[prefix+"."+n] = err } return hr } diff --git a/pkg/loop/internal/reporting_plugin_service.go b/pkg/loop/internal/reporting_plugin_service.go index 953d97dc6..0fa5ea424 100644 --- a/pkg/loop/internal/reporting_plugin_service.go +++ b/pkg/loop/internal/reporting_plugin_service.go @@ -96,6 +96,7 @@ type reportingPluginServiceServer struct { } func RegisterReportingPluginServiceServer(server *grpc.Server, broker Broker, brokerCfg BrokerConfig, impl types.ReportingPluginClient) error { + pb.RegisterServiceServer(server, &serviceServer{srv: impl}) pb.RegisterReportingPluginServiceServer(server, newReportingPluginServiceServer(&brokerExt{broker, brokerCfg}, impl)) return nil } @@ -148,6 +149,10 @@ func (m *reportingPluginServiceServer) NewReportingPluginFactory(ctx context.Con m.closeAll(providerRes, errorLogRes, pipelineRunnerRes, telemetryRes) return nil, err } + if err = factory.Start(ctx); err != nil { + m.closeAll(providerRes, errorLogRes, pipelineRunnerRes, telemetryRes) + return nil, err + } id, _, err := m.serveNew("ReportingPluginProvider", func(s *grpc.Server) { pb.RegisterServiceServer(s, &serviceServer{srv: factory}) diff --git a/pkg/loop/internal/service.go b/pkg/loop/internal/service.go index 6240f8ffd..09af732cc 100644 --- a/pkg/loop/internal/service.go +++ b/pkg/loop/internal/service.go @@ -27,6 +27,7 @@ func newServiceClient(b *brokerExt, cc grpc.ClientConnInterface) *serviceClient } func (s *serviceClient) Start(ctx context.Context) error { + //TODO reconsider? return nil // no-op: server side starts automatically } @@ -56,11 +57,12 @@ func (s *serviceClient) HealthReport() map[string]error { ctx, cancel = context.WithTimeout(ctx, time.Second) defer cancel() + name := s.Name() reply, err := s.grpc.HealthReport(ctx, &emptypb.Empty{}) if err != nil { - return map[string]error{s.b.Logger.Name(): err} + return map[string]error{name: err} } - hr := healthReport(reply.HealthReport) + hr := healthReport(name, reply.HealthReport) hr[s.b.Logger.Name()] = nil return hr } diff --git a/pkg/loop/internal/test/cmd/main.go b/pkg/loop/internal/test/cmd/main.go index 4b6466456..37ad1e89a 100644 --- a/pkg/loop/internal/test/cmd/main.go +++ b/pkg/loop/internal/test/cmd/main.go @@ -58,7 +58,7 @@ func main() { plugin.Serve(&plugin.ServeConfig{ HandshakeConfig: loop.PluginRelayerHandshakeConfig(), Plugins: map[string]plugin.Plugin{ - loop.PluginRelayerName: &loop.GRPCPluginRelayer{PluginServer: test.StaticPluginRelayer{}, BrokerConfig: loop.BrokerConfig{Logger: lggr, StopCh: stopCh}}, + loop.PluginRelayerName: &loop.GRPCPluginRelayer{PluginServer: test.NewStaticPluginRelayer(lggr), BrokerConfig: loop.BrokerConfig{Logger: lggr, StopCh: stopCh}}, }, GRPCServer: grpcServer, }) @@ -68,7 +68,7 @@ func main() { plugin.Serve(&plugin.ServeConfig{ HandshakeConfig: loop.PluginMedianHandshakeConfig(), Plugins: map[string]plugin.Plugin{ - loop.PluginMedianName: &loop.GRPCPluginMedian{PluginServer: test.StaticPluginMedian{}, BrokerConfig: loop.BrokerConfig{Logger: lggr, StopCh: stopCh}}, + loop.PluginMedianName: &loop.GRPCPluginMedian{PluginServer: test.NewStaticPluginMedian(lggr), BrokerConfig: loop.BrokerConfig{Logger: lggr, StopCh: stopCh}}, }, GRPCServer: grpcServer, }) @@ -89,7 +89,7 @@ func main() { HandshakeConfig: reportingplugins.ReportingPluginHandshakeConfig(), Plugins: map[string]plugin.Plugin{ reportingplugins.PluginServiceName: &reportingplugins.GRPCService[types.PluginProvider]{ - PluginServer: test.StaticReportingPluginWithPluginProvider{}, + PluginServer: test.NewStaticReportingPluginWithPluginProvider(lggr), BrokerConfig: loop.BrokerConfig{ Logger: lggr, StopCh: stopCh, @@ -105,7 +105,7 @@ func main() { HandshakeConfig: reportingplugins.ReportingPluginHandshakeConfig(), Plugins: map[string]plugin.Plugin{ reportingplugins.PluginServiceName: &reportingplugins.GRPCService[types.MedianProvider]{ - PluginServer: test.StaticReportingPluginWithMedianProvider{}, + PluginServer: test.NewStaticReportingPluginWithMedianProvider(lggr), BrokerConfig: loop.BrokerConfig{ Logger: lggr, StopCh: stopCh, diff --git a/pkg/loop/internal/test/config.go b/pkg/loop/internal/test/config.go index 635121c96..5bb9f1cbb 100644 --- a/pkg/loop/internal/test/config.go +++ b/pkg/loop/internal/test/config.go @@ -10,21 +10,17 @@ import ( "github.com/stretchr/testify/assert" + "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/types" ) -type staticConfigProvider struct{} - -// TODO validate start/Close calls? -func (s staticConfigProvider) Start(ctx context.Context) error { return nil } - -func (s staticConfigProvider) Close() error { return nil } - -func (s staticConfigProvider) Ready() error { panic("unimplemented") } - -func (s staticConfigProvider) Name() string { panic("unimplemented") } +type staticConfigProvider struct { + staticService +} -func (s staticConfigProvider) HealthReport() map[string]error { panic("unimplemented") } +func newStaticConfigProvider(lggr logger.Logger) staticConfigProvider { + return staticConfigProvider{staticService{lggr: logger.Named(lggr, "staticConfigProvider")}} +} func (s staticConfigProvider) OffchainConfigDigester() libocr.OffchainConfigDigester { return staticOffchainConfigDigester{} diff --git a/pkg/loop/internal/test/median.go b/pkg/loop/internal/test/median.go index 395770ada..00afc4e78 100644 --- a/pkg/loop/internal/test/median.go +++ b/pkg/loop/internal/test/median.go @@ -15,12 +15,15 @@ import ( "github.com/smartcontractkit/libocr/offchainreporting2/reportingplugin/median" libocr "github.com/smartcontractkit/libocr/offchainreporting2plus/types" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" "github.com/smartcontractkit/chainlink-common/pkg/types" "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" ) func PluginMedian(t *testing.T, p types.PluginMedian) { - PluginMedianTest{&StaticMedianProvider{}}.TestPluginMedian(t, p) + PluginMedianTest{&staticMedianProvider{}}.TestPluginMedian(t, p) } type PluginMedianTest struct { @@ -34,6 +37,11 @@ func (m PluginMedianTest) TestPluginMedian(t *testing.T, p types.PluginMedian) { require.NoError(t, err) ReportingPluginFactory(t, factory) + servicetest.AssertHealthReportNames(t, p.HealthReport(), + "PluginMedianClient", + "PluginMedianClient.staticPluginMedian", + "PluginMedianClient.staticPluginMedian.staticPluginFactory", + ) }) } @@ -65,9 +73,21 @@ func ReportingPluginFactory(t *testing.T, factory types.ReportingPluginFactory) }) } -type StaticPluginMedian struct{} +type staticPluginMedian struct { + staticService +} + +func NewStaticPluginMedian(lggr logger.Logger) staticPluginMedian { + return staticPluginMedian{staticService{lggr: logger.Named(lggr, "staticPluginMedian")}} +} + +func (s staticPluginMedian) HealthReport() map[string]error { + hp := s.staticService.HealthReport() + services.CopyHealth(hp, newStaticPluginFactory(s.lggr).HealthReport()) + return hp +} -func (s StaticPluginMedian) NewMedianFactory(ctx context.Context, provider types.MedianProvider, dataSource, juelsPerFeeCoinDataSource median.DataSource, errorLog types.ErrorLog) (types.ReportingPluginFactory, error) { +func (s staticPluginMedian) NewMedianFactory(ctx context.Context, provider types.MedianProvider, dataSource, juelsPerFeeCoinDataSource median.DataSource, errorLog types.ErrorLog) (types.ReportingPluginFactory, error) { cr := provider.ChainReader() var gotLatestValue map[string]int @@ -229,20 +249,16 @@ func (s StaticPluginMedian) NewMedianFactory(ctx context.Context, provider types if err := errorLog.SaveError(ctx, errMsg); err != nil { return nil, fmt.Errorf("failed to save error: %w", err) } - return staticPluginFactory{}, nil + return newStaticPluginFactory(s.lggr), nil } -type staticPluginFactory struct{} - -func (s staticPluginFactory) Name() string { panic("implement me") } - -func (s staticPluginFactory) Start(ctx context.Context) error { return nil } - -func (s staticPluginFactory) Close() error { return nil } - -func (s staticPluginFactory) Ready() error { panic("implement me") } +type staticPluginFactory struct { + staticService +} -func (s staticPluginFactory) HealthReport() map[string]error { panic("implement me") } +func newStaticPluginFactory(lggr logger.Logger) staticPluginFactory { + return staticPluginFactory{staticService{lggr: logger.Named(lggr, "staticPluginFactory")}} +} func (s staticPluginFactory) NewReportingPlugin(config libocr.ReportingPluginConfig) (libocr.ReportingPlugin, libocr.ReportingPluginInfo, error) { if config.ConfigDigest != reportingPluginConfig.ConfigDigest { @@ -284,39 +300,35 @@ func (s staticPluginFactory) NewReportingPlugin(config libocr.ReportingPluginCon return staticReportingPlugin{}, rpi, nil } -type StaticMedianProvider struct{} - -func (s StaticMedianProvider) Start(ctx context.Context) error { return nil } - -func (s StaticMedianProvider) Close() error { return nil } - -func (s StaticMedianProvider) Ready() error { panic("unimplemented") } - -func (s StaticMedianProvider) Name() string { panic("unimplemented") } +type staticMedianProvider struct { + staticService +} -func (s StaticMedianProvider) HealthReport() map[string]error { panic("unimplemented") } +func NewStaticMedianProvider(lggr logger.Logger) staticMedianProvider { + return staticMedianProvider{staticService{lggr: logger.Named(lggr, "staticMedianProvider")}} +} -func (s StaticMedianProvider) OffchainConfigDigester() libocr.OffchainConfigDigester { +func (s staticMedianProvider) OffchainConfigDigester() libocr.OffchainConfigDigester { return staticOffchainConfigDigester{} } -func (s StaticMedianProvider) ContractConfigTracker() libocr.ContractConfigTracker { +func (s staticMedianProvider) ContractConfigTracker() libocr.ContractConfigTracker { return staticContractConfigTracker{} } -func (s StaticMedianProvider) ContractTransmitter() libocr.ContractTransmitter { +func (s staticMedianProvider) ContractTransmitter() libocr.ContractTransmitter { return staticContractTransmitter{} } -func (s StaticMedianProvider) ReportCodec() median.ReportCodec { return staticReportCodec{} } +func (s staticMedianProvider) ReportCodec() median.ReportCodec { return staticReportCodec{} } -func (s StaticMedianProvider) MedianContract() median.MedianContract { return staticMedianContract{} } +func (s staticMedianProvider) MedianContract() median.MedianContract { return staticMedianContract{} } -func (s StaticMedianProvider) OnchainConfigCodec() median.OnchainConfigCodec { +func (s staticMedianProvider) OnchainConfigCodec() median.OnchainConfigCodec { return staticOnchainConfigCodec{} } -func (s StaticMedianProvider) ChainReader() types.ChainReader { +func (s staticMedianProvider) ChainReader() types.ChainReader { return staticChainReader{} } diff --git a/pkg/loop/internal/test/plugin_provider.go b/pkg/loop/internal/test/plugin_provider.go index 956be7039..2e7d8f34a 100644 --- a/pkg/loop/internal/test/plugin_provider.go +++ b/pkg/loop/internal/test/plugin_provider.go @@ -1,37 +1,32 @@ package test import ( - "context" - libocr "github.com/smartcontractkit/libocr/offchainreporting2plus/types" + "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/types" ) -type StaticPluginProvider struct{} - -func (s StaticPluginProvider) Start(ctx context.Context) error { return nil } - -func (s StaticPluginProvider) Close() error { return nil } - -func (s StaticPluginProvider) Ready() error { panic("unimplemented") } - -func (s StaticPluginProvider) Name() string { panic("unimplemented") } +type staticPluginProvider struct { + staticService +} -func (s StaticPluginProvider) HealthReport() map[string]error { panic("unimplemented") } +func NewStaticPluginProvider(lggr logger.Logger) staticPluginProvider { + return staticPluginProvider{staticService{lggr: logger.Named(lggr, "staticPluginProvider")}} +} -func (s StaticPluginProvider) OffchainConfigDigester() libocr.OffchainConfigDigester { +func (s staticPluginProvider) OffchainConfigDigester() libocr.OffchainConfigDigester { return staticOffchainConfigDigester{} } -func (s StaticPluginProvider) ContractConfigTracker() libocr.ContractConfigTracker { +func (s staticPluginProvider) ContractConfigTracker() libocr.ContractConfigTracker { return staticContractConfigTracker{} } -func (s StaticPluginProvider) ContractTransmitter() libocr.ContractTransmitter { +func (s staticPluginProvider) ContractTransmitter() libocr.ContractTransmitter { return staticContractTransmitter{} } -func (s StaticPluginProvider) ChainReader() types.ChainReader { +func (s staticPluginProvider) ChainReader() types.ChainReader { return staticChainReader{} } diff --git a/pkg/loop/internal/test/relayer.go b/pkg/loop/internal/test/relayer.go index ca3bb68b6..eb8600ecb 100644 --- a/pkg/loop/internal/test/relayer.go +++ b/pkg/loop/internal/test/relayer.go @@ -12,7 +12,10 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/loop/internal" + "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" "github.com/smartcontractkit/chainlink-common/pkg/types" "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" ) @@ -33,9 +36,21 @@ func (s StaticKeystore) Sign(ctx context.Context, id string, data []byte) ([]byt return signed, nil } -type StaticPluginRelayer struct{} +type staticPluginRelayer struct { + staticService +} + +func NewStaticPluginRelayer(lggr logger.Logger) staticPluginRelayer { + return staticPluginRelayer{staticService{lggr: logger.Named(lggr, "staticPluginRelayer")}} +} -func (s StaticPluginRelayer) NewRelayer(ctx context.Context, config string, keystore types.Keystore) (internal.Relayer, error) { +func (s staticPluginRelayer) HealthReport() map[string]error { + hp := s.staticService.HealthReport() + services.CopyHealth(hp, newStaticRelayer(s.lggr).HealthReport()) + return hp +} + +func (s staticPluginRelayer) NewRelayer(ctx context.Context, config string, keystore types.Keystore) (internal.Relayer, error) { if config != ConfigTOML { return nil, fmt.Errorf("expected config %q but got %q", ConfigTOML, config) } @@ -53,27 +68,30 @@ func (s StaticPluginRelayer) NewRelayer(ctx context.Context, config string, keys if !bytes.Equal(signed, gotSigned) { return nil, fmt.Errorf("expected signed bytes %x but got %x", signed, gotSigned) } - return staticRelayer{}, nil + return newStaticRelayer(s.lggr), nil } -type staticRelayer struct{} - -func (s staticRelayer) Start(ctx context.Context) error { return nil } - -func (s staticRelayer) Close() error { return nil } - -func (s staticRelayer) Ready() error { return nil } +type staticRelayer struct { + staticService +} -func (s staticRelayer) Name() string { return "staticRelayer" } +func newStaticRelayer(lggr logger.Logger) staticRelayer { + return staticRelayer{staticService{lggr: logger.Named(lggr, "staticRelayer")}} +} func (s staticRelayer) HealthReport() map[string]error { - return map[string]error{s.Name(): s.Ready()} + hp := s.staticService.HealthReport() + services.CopyHealth(hp, newStaticConfigProvider(s.lggr).HealthReport()) + services.CopyHealth(hp, NewStaticMedianProvider(s.lggr).HealthReport()) + services.CopyHealth(hp, NewStaticPluginProvider(s.lggr).HealthReport()) + return hp } + func (s staticRelayer) NewConfigProvider(ctx context.Context, r types.RelayArgs) (types.ConfigProvider, error) { if !equalRelayArgs(r, RelayArgs) { return nil, fmt.Errorf("expected relay args:\n\t%v\nbut got:\n\t%v", RelayArgs, r) } - return staticConfigProvider{}, nil + return newStaticConfigProvider(s.lggr), nil } func (s staticRelayer) NewMedianProvider(ctx context.Context, r types.RelayArgs, p types.PluginArgs) (types.MedianProvider, error) { @@ -84,7 +102,7 @@ func (s staticRelayer) NewMedianProvider(ctx context.Context, r types.RelayArgs, if !reflect.DeepEqual(PluginArgs, p) { return nil, fmt.Errorf("expected plugin args %v but got %v", PluginArgs, p) } - return StaticMedianProvider{}, nil + return NewStaticMedianProvider(s.lggr), nil } func (s staticRelayer) NewPluginProvider(ctx context.Context, r types.RelayArgs, p types.PluginArgs) (types.PluginProvider, error) { @@ -95,7 +113,7 @@ func (s staticRelayer) NewPluginProvider(ctx context.Context, r types.RelayArgs, if !reflect.DeepEqual(PluginArgs, p) { return nil, fmt.Errorf("expected plugin args %v but got %v", PluginArgs, p) } - return StaticPluginProvider{}, nil + return NewStaticPluginProvider(s.lggr), nil } func (s staticRelayer) GetChainStatus(ctx context.Context) (types.ChainStatus, error) { @@ -149,25 +167,34 @@ func newRelayArgsWithProviderType(_type types.OCR2PluginType) types.RelayArgs { func RunPluginRelayer(t *testing.T, p internal.PluginRelayer) { ctx := tests.Context(t) + servicetest.Run(t, p) t.Run("Relayer", func(t *testing.T) { relayer, err := p.NewRelayer(ctx, ConfigTOML, StaticKeystore{}) require.NoError(t, err) - require.NoError(t, relayer.Start(ctx)) - t.Cleanup(func() { assert.NoError(t, relayer.Close()) }) + servicetest.Run(t, relayer) RunRelayer(t, relayer) + servicetest.AssertHealthReportNames(t, relayer.HealthReport(), + "PluginRelayerClient", //TODO missing + "PluginRelayerClient.RelayerClient", + "PluginRelayerClient.RelayerClient.staticPluginRelayer", //TODO missing + "PluginRelayerClient.RelayerClient.staticPluginRelayer.staticRelayer", + "PluginRelayerClient.RelayerClient.staticPluginRelayer.staticRelayer.staticMedianProvider", + "PluginRelayerClient.RelayerClient.staticPluginRelayer.staticRelayer.staticPluginProvider", + "PluginRelayerClient.RelayerClient.staticPluginRelayer.staticRelayer.staticConfigProvider", + ) }) } func RunRelayer(t *testing.T, relayer internal.Relayer) { ctx := tests.Context(t) + configProvider, err0 := relayer.NewConfigProvider(ctx, RelayArgs) + require.NoError(t, err0) + servicetest.Run(t, configProvider) + t.Run("ConfigProvider", func(t *testing.T) { t.Parallel() - configProvider, err := relayer.NewConfigProvider(ctx, RelayArgs) - require.NoError(t, err) - require.NoError(t, configProvider.Start(ctx)) - t.Cleanup(func() { assert.NoError(t, configProvider.Close()) }) t.Run("OffchainConfigDigester", func(t *testing.T) { t.Parallel() @@ -195,145 +222,135 @@ func RunRelayer(t *testing.T, relayer internal.Relayer) { }) }) + pp, err0 := relayer.NewPluginProvider(ctx, newRelayArgsWithProviderType(types.Median), PluginArgs) + require.NoError(t, err0) + provider := pp.(types.MedianProvider) + servicetest.Run(t, provider) + t.Run("MedianProvider", func(t *testing.T) { t.Parallel() - ra := newRelayArgsWithProviderType(types.Median) - p, err := relayer.NewPluginProvider(ctx, ra, PluginArgs) - provider := p.(types.MedianProvider) - require.NoError(t, err) - require.NoError(t, provider.Start(ctx)) - t.Cleanup(func() { assert.NoError(t, provider.Close()) }) - t.Run("ReportingPluginProvider", func(t *testing.T) { + t.Run("OffchainConfigDigester", func(t *testing.T) { t.Parallel() - - t.Run("OffchainConfigDigester", func(t *testing.T) { - t.Parallel() - ocd := provider.OffchainConfigDigester() - gotConfigDigestPrefix, err := ocd.ConfigDigestPrefix() - require.NoError(t, err) - assert.Equal(t, configDigestPrefix, gotConfigDigestPrefix) - gotConfigDigest, err := ocd.ConfigDigest(contractConfig) - require.NoError(t, err) - assert.Equal(t, configDigest, gotConfigDigest) - }) - t.Run("ContractConfigTracker", func(t *testing.T) { - t.Parallel() - cct := provider.ContractConfigTracker() - gotBlockHeight, err := cct.LatestBlockHeight(ctx) - require.NoError(t, err) - assert.Equal(t, blockHeight, gotBlockHeight) - gotChangedInBlock, gotConfigDigest, err := cct.LatestConfigDetails(ctx) - require.NoError(t, err) - assert.Equal(t, changedInBlock, gotChangedInBlock) - assert.Equal(t, configDigest, gotConfigDigest) - gotContractConfig, err := cct.LatestConfig(ctx, changedInBlock) - require.NoError(t, err) - assert.Equal(t, contractConfig, gotContractConfig) - }) - t.Run("ContractTransmitter", func(t *testing.T) { - t.Parallel() - ct := provider.ContractTransmitter() - gotAccount, err := ct.FromAccount() - require.NoError(t, err) - assert.Equal(t, account, gotAccount) - gotConfigDigest, gotEpoch, err := ct.LatestConfigDigestAndEpoch(ctx) - require.NoError(t, err) - assert.Equal(t, configDigest, gotConfigDigest) - assert.Equal(t, epoch, gotEpoch) - err = ct.Transmit(ctx, reportContext, report, sigs) - require.NoError(t, err) - }) - t.Run("ReportCodec", func(t *testing.T) { - t.Parallel() - rc := provider.ReportCodec() - gotReport, err := rc.BuildReport(pobs) - require.NoError(t, err) - assert.Equal(t, report, gotReport) - gotMedianValue, err := rc.MedianFromReport(report) - require.NoError(t, err) - assert.Equal(t, medianValue, gotMedianValue) - gotMax, err := rc.MaxReportLength(n) - require.NoError(t, err) - assert.Equal(t, max, gotMax) - }) - t.Run("MedianContract", func(t *testing.T) { - t.Parallel() - mc := provider.MedianContract() - gotConfigDigest, gotEpoch, gotRound, err := mc.LatestRoundRequested(ctx, lookbackDuration) - require.NoError(t, err) - assert.Equal(t, configDigest, gotConfigDigest) - assert.Equal(t, epoch, gotEpoch) - assert.Equal(t, round, gotRound) - gotConfigDigest, gotEpoch, gotRound, gotLatestAnswer, gotLatestTimestamp, err := mc.LatestTransmissionDetails(ctx) - require.NoError(t, err) - assert.Equal(t, configDigest, gotConfigDigest) - assert.Equal(t, epoch, gotEpoch) - assert.Equal(t, round, gotRound) - assert.Equal(t, latestAnswer, gotLatestAnswer) - assert.WithinDuration(t, latestTimestamp, gotLatestTimestamp, time.Second) - }) - t.Run("OnchainConfigCodec", func(t *testing.T) { - t.Parallel() - occ := provider.OnchainConfigCodec() - gotEncoded, err := occ.Encode(onchainConfig) - require.NoError(t, err) - assert.Equal(t, encoded, gotEncoded) - gotDecoded, err := occ.Decode(encoded) - require.NoError(t, err) - assert.Equal(t, onchainConfig, gotDecoded) - }) + ocd := provider.OffchainConfigDigester() + gotConfigDigestPrefix, err := ocd.ConfigDigestPrefix() + require.NoError(t, err) + assert.Equal(t, configDigestPrefix, gotConfigDigestPrefix) + gotConfigDigest, err := ocd.ConfigDigest(contractConfig) + require.NoError(t, err) + assert.Equal(t, configDigest, gotConfigDigest) + }) + t.Run("ContractConfigTracker", func(t *testing.T) { + t.Parallel() + cct := provider.ContractConfigTracker() + gotBlockHeight, err := cct.LatestBlockHeight(ctx) + require.NoError(t, err) + assert.Equal(t, blockHeight, gotBlockHeight) + gotChangedInBlock, gotConfigDigest, err := cct.LatestConfigDetails(ctx) + require.NoError(t, err) + assert.Equal(t, changedInBlock, gotChangedInBlock) + assert.Equal(t, configDigest, gotConfigDigest) + gotContractConfig, err := cct.LatestConfig(ctx, changedInBlock) + require.NoError(t, err) + assert.Equal(t, contractConfig, gotContractConfig) + }) + t.Run("ContractTransmitter", func(t *testing.T) { + t.Parallel() + ct := provider.ContractTransmitter() + gotAccount, err := ct.FromAccount() + require.NoError(t, err) + assert.Equal(t, account, gotAccount) + gotConfigDigest, gotEpoch, err := ct.LatestConfigDigestAndEpoch(ctx) + require.NoError(t, err) + assert.Equal(t, configDigest, gotConfigDigest) + assert.Equal(t, epoch, gotEpoch) + err = ct.Transmit(ctx, reportContext, report, sigs) + require.NoError(t, err) + }) + t.Run("ReportCodec", func(t *testing.T) { + t.Parallel() + rc := provider.ReportCodec() + gotReport, err := rc.BuildReport(pobs) + require.NoError(t, err) + assert.Equal(t, report, gotReport) + gotMedianValue, err := rc.MedianFromReport(report) + require.NoError(t, err) + assert.Equal(t, medianValue, gotMedianValue) + gotMax, err := rc.MaxReportLength(n) + require.NoError(t, err) + assert.Equal(t, max, gotMax) + }) + t.Run("MedianContract", func(t *testing.T) { + t.Parallel() + mc := provider.MedianContract() + gotConfigDigest, gotEpoch, gotRound, err := mc.LatestRoundRequested(ctx, lookbackDuration) + require.NoError(t, err) + assert.Equal(t, configDigest, gotConfigDigest) + assert.Equal(t, epoch, gotEpoch) + assert.Equal(t, round, gotRound) + gotConfigDigest, gotEpoch, gotRound, gotLatestAnswer, gotLatestTimestamp, err := mc.LatestTransmissionDetails(ctx) + require.NoError(t, err) + assert.Equal(t, configDigest, gotConfigDigest) + assert.Equal(t, epoch, gotEpoch) + assert.Equal(t, round, gotRound) + assert.Equal(t, latestAnswer, gotLatestAnswer) + assert.WithinDuration(t, latestTimestamp, gotLatestTimestamp, time.Second) + }) + t.Run("OnchainConfigCodec", func(t *testing.T) { + t.Parallel() + occ := provider.OnchainConfigCodec() + gotEncoded, err := occ.Encode(onchainConfig) + require.NoError(t, err) + assert.Equal(t, encoded, gotEncoded) + gotDecoded, err := occ.Decode(encoded) + require.NoError(t, err) + assert.Equal(t, onchainConfig, gotDecoded) }) }) + pp, err0 = relayer.NewPluginProvider(ctx, newRelayArgsWithProviderType(types.GenericPlugin), PluginArgs) + require.NoError(t, err0) + servicetest.Run(t, pp) + t.Run("PluginProvider", func(t *testing.T) { t.Parallel() - ra := newRelayArgsWithProviderType(types.GenericPlugin) - provider, err := relayer.NewPluginProvider(ctx, ra, PluginArgs) - require.NoError(t, err) - require.NoError(t, provider.Start(ctx)) - t.Cleanup(func() { assert.NoError(t, provider.Close()) }) - t.Run("ReportingPluginProvider", func(t *testing.T) { + t.Run("OffchainConfigDigester", func(t *testing.T) { t.Parallel() - - t.Run("OffchainConfigDigester", func(t *testing.T) { - t.Parallel() - ocd := provider.OffchainConfigDigester() - gotConfigDigestPrefix, err := ocd.ConfigDigestPrefix() - require.NoError(t, err) - assert.Equal(t, configDigestPrefix, gotConfigDigestPrefix) - gotConfigDigest, err := ocd.ConfigDigest(contractConfig) - require.NoError(t, err) - assert.Equal(t, configDigest, gotConfigDigest) - }) - t.Run("ContractConfigTracker", func(t *testing.T) { - t.Parallel() - cct := provider.ContractConfigTracker() - gotBlockHeight, err := cct.LatestBlockHeight(ctx) - require.NoError(t, err) - assert.Equal(t, blockHeight, gotBlockHeight) - gotChangedInBlock, gotConfigDigest, err := cct.LatestConfigDetails(ctx) - require.NoError(t, err) - assert.Equal(t, changedInBlock, gotChangedInBlock) - assert.Equal(t, configDigest, gotConfigDigest) - gotContractConfig, err := cct.LatestConfig(ctx, changedInBlock) - require.NoError(t, err) - assert.Equal(t, contractConfig, gotContractConfig) - }) - t.Run("ContractTransmitter", func(t *testing.T) { - t.Parallel() - ct := provider.ContractTransmitter() - gotAccount, err := ct.FromAccount() - require.NoError(t, err) - assert.Equal(t, account, gotAccount) - gotConfigDigest, gotEpoch, err := ct.LatestConfigDigestAndEpoch(ctx) - require.NoError(t, err) - assert.Equal(t, configDigest, gotConfigDigest) - assert.Equal(t, epoch, gotEpoch) - err = ct.Transmit(ctx, reportContext, report, sigs) - require.NoError(t, err) - }) + ocd := pp.OffchainConfigDigester() + gotConfigDigestPrefix, err := ocd.ConfigDigestPrefix() + require.NoError(t, err) + assert.Equal(t, configDigestPrefix, gotConfigDigestPrefix) + gotConfigDigest, err := ocd.ConfigDigest(contractConfig) + require.NoError(t, err) + assert.Equal(t, configDigest, gotConfigDigest) + }) + t.Run("ContractConfigTracker", func(t *testing.T) { + t.Parallel() + cct := pp.ContractConfigTracker() + gotBlockHeight, err := cct.LatestBlockHeight(ctx) + require.NoError(t, err) + assert.Equal(t, blockHeight, gotBlockHeight) + gotChangedInBlock, gotConfigDigest, err := cct.LatestConfigDetails(ctx) + require.NoError(t, err) + assert.Equal(t, changedInBlock, gotChangedInBlock) + assert.Equal(t, configDigest, gotConfigDigest) + gotContractConfig, err := cct.LatestConfig(ctx, changedInBlock) + require.NoError(t, err) + assert.Equal(t, contractConfig, gotContractConfig) + }) + t.Run("ContractTransmitter", func(t *testing.T) { + t.Parallel() + ct := pp.ContractTransmitter() + gotAccount, err := ct.FromAccount() + require.NoError(t, err) + assert.Equal(t, account, gotAccount) + gotConfigDigest, gotEpoch, err := ct.LatestConfigDigestAndEpoch(ctx) + require.NoError(t, err) + assert.Equal(t, configDigest, gotConfigDigest) + assert.Equal(t, epoch, gotEpoch) + err = ct.Transmit(ctx, reportContext, report, sigs) + require.NoError(t, err) }) }) diff --git a/pkg/loop/internal/test/reporting_plugin.go b/pkg/loop/internal/test/reporting_plugin.go index 20aa4a0d4..438009064 100644 --- a/pkg/loop/internal/test/reporting_plugin.go +++ b/pkg/loop/internal/test/reporting_plugin.go @@ -8,7 +8,9 @@ import ( "google.golang.org/grpc" + "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/loop/internal" + "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/types" ) @@ -18,14 +20,27 @@ type MockConn struct { const ReportingPluginWithMedianProviderName = "reporting-plugin-with-median-provider" -type StaticReportingPluginWithMedianProvider struct { +type staticReportingPluginWithMedianProvider struct { + staticService } -func (s StaticReportingPluginWithMedianProvider) ConnToProvider(conn grpc.ClientConnInterface, broker internal.Broker, brokerConfig internal.BrokerConfig) types.MedianProvider { - return StaticMedianProvider{} +func NewStaticReportingPluginWithMedianProvider(lggr logger.Logger) staticReportingPluginWithMedianProvider { + return staticReportingPluginWithMedianProvider{staticService{lggr: logger.Named(lggr, "staticReportingPluginWithMedianProvider")}} } -func (s StaticReportingPluginWithMedianProvider) NewReportingPluginFactory(ctx context.Context, config types.ReportingPluginServiceConfig, provider types.MedianProvider, pipelineRunner types.PipelineRunnerService, telemetry types.TelemetryClient, errorLog types.ErrorLog) (types.ReportingPluginFactory, error) { +func (s staticReportingPluginWithMedianProvider) HealthReport() map[string]error { + hp := s.staticService.HealthReport() + services.CopyHealth(hp, NewStaticMedianProvider(s.lggr).HealthReport()) + services.CopyHealth(hp, newStaticPluginFactory(s.lggr).HealthReport()) + return hp +} + +func (s staticReportingPluginWithMedianProvider) ConnToProvider(conn grpc.ClientConnInterface, broker internal.Broker, brokerConfig internal.BrokerConfig) types.MedianProvider { + return NewStaticMedianProvider(s.lggr) +} + +func (s staticReportingPluginWithMedianProvider) NewReportingPluginFactory(ctx context.Context, config types.ReportingPluginServiceConfig, provider types.MedianProvider, pipelineRunner types.PipelineRunnerService, telemetry types.TelemetryClient, errorLog types.ErrorLog) (types.ReportingPluginFactory, error) { + //TODO validate config ocd := provider.OffchainConfigDigester() gotDigestPrefix, err := ocd.ConfigDigestPrefix() if err != nil { @@ -158,6 +173,7 @@ func (s StaticReportingPluginWithMedianProvider) NewReportingPluginFactory(ctx c if !reflect.DeepEqual(gotDecoded, onchainConfig) { return nil, fmt.Errorf("expected OnchainConfig %s but got %s", onchainConfig, gotDecoded) } + //TODO validate pipelineRunner if err2 := errorLog.SaveError(ctx, errMsg); err2 != nil { return nil, fmt.Errorf("failed to save error: %w", err2) } @@ -169,17 +185,29 @@ func (s StaticReportingPluginWithMedianProvider) NewReportingPluginFactory(ctx c if err != nil { return nil, fmt.Errorf("failed to send log: %w", err) } - return staticPluginFactory{}, nil + return newStaticPluginFactory(s.lggr), nil +} + +type staticReportingPluginWithPluginProvider struct { + staticService +} + +func NewStaticReportingPluginWithPluginProvider(lggr logger.Logger) staticReportingPluginWithPluginProvider { + return staticReportingPluginWithPluginProvider{staticService{lggr: logger.Named(lggr, "staticReportingPluginWithPluginProvider")}} } -type StaticReportingPluginWithPluginProvider struct { +func (s staticReportingPluginWithPluginProvider) HealthReport() map[string]error { + hp := s.staticService.HealthReport() + services.CopyHealth(hp, NewStaticPluginProvider(s.lggr).HealthReport()) + services.CopyHealth(hp, newStaticPluginFactory(s.lggr).HealthReport()) + return hp } -func (s StaticReportingPluginWithPluginProvider) ConnToProvider(conn grpc.ClientConnInterface, broker internal.Broker, brokerConfig internal.BrokerConfig) types.PluginProvider { - return StaticPluginProvider{} +func (s staticReportingPluginWithPluginProvider) ConnToProvider(conn grpc.ClientConnInterface, broker internal.Broker, brokerConfig internal.BrokerConfig) types.PluginProvider { + return NewStaticPluginProvider(s.lggr) } -func (s StaticReportingPluginWithPluginProvider) NewReportingPluginFactory(ctx context.Context, config types.ReportingPluginServiceConfig, provider types.PluginProvider, pipelineRunner types.PipelineRunnerService, telemetry types.TelemetryClient, errorLog types.ErrorLog) (types.ReportingPluginFactory, error) { +func (s staticReportingPluginWithPluginProvider) NewReportingPluginFactory(ctx context.Context, config types.ReportingPluginServiceConfig, provider types.PluginProvider, pipelineRunner types.PipelineRunnerService, telemetry types.TelemetryClient, errorLog types.ErrorLog) (types.ReportingPluginFactory, error) { ocd := provider.OffchainConfigDigester() gotDigestPrefix, err := ocd.ConfigDigestPrefix() if err != nil { @@ -260,5 +288,5 @@ func (s StaticReportingPluginWithPluginProvider) NewReportingPluginFactory(ctx c if err != nil { return nil, fmt.Errorf("failed to send log: %w", err) } - return staticPluginFactory{}, nil + return newStaticPluginFactory(s.lggr), nil } diff --git a/pkg/loop/internal/test/test.go b/pkg/loop/internal/test/test.go index 995e50cab..85da5fa1e 100644 --- a/pkg/loop/internal/test/test.go +++ b/pkg/loop/internal/test/test.go @@ -1,6 +1,7 @@ package test import ( + "context" "math/big" "time" @@ -10,6 +11,7 @@ import ( "github.com/smartcontractkit/libocr/offchainreporting2/reportingplugin/median" libocr "github.com/smartcontractkit/libocr/offchainreporting2plus/types" + "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/types" ) @@ -157,3 +159,29 @@ URL = 'https://test.url' } latestValue = map[string]int{"ret1": 1, "ret2": 2} ) + +type staticService struct { + lggr logger.Logger +} + +func (s staticService) Name() string { return s.lggr.Name() } + +func (s staticService) Start(ctx context.Context) error { + s.lggr.Info("Started") + return nil +} + +func (s staticService) Close() error { + s.lggr.Info("Closed") + return nil +} + +func (s staticService) Ready() error { + s.lggr.Info("Ready") + return nil +} + +// HealthReport reports only for this single service. Override to include sub-services. +func (s staticService) HealthReport() map[string]error { + return map[string]error{s.Name(): s.Ready()} +} diff --git a/pkg/loop/internal/types.go b/pkg/loop/internal/types.go index 9cb289bc2..d127307df 100644 --- a/pkg/loop/internal/types.go +++ b/pkg/loop/internal/types.go @@ -3,10 +3,12 @@ package internal import ( "context" + "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/types" ) type PluginRelayer interface { + services.Service NewRelayer(ctx context.Context, config string, keystore types.Keystore) (Relayer, error) } diff --git a/pkg/loop/median_service.go b/pkg/loop/median_service.go index bdeecb936..d34f65967 100644 --- a/pkg/loop/median_service.go +++ b/pkg/loop/median_service.go @@ -10,6 +10,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/loop/internal" + "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/types" ) @@ -23,12 +24,17 @@ type MedianService struct { // NewMedianService returns a new [*MedianService]. // cmd must return a new exec.Cmd each time it is called. func NewMedianService(lggr logger.Logger, grpcOpts GRPCOpts, cmd func() *exec.Cmd, provider types.MedianProvider, dataSource, juelsPerFeeCoin median.DataSource, errorLog types.ErrorLog) *MedianService { - newService := func(ctx context.Context, instance any) (types.ReportingPluginFactory, error) { + newService := func(ctx context.Context, instance any) (types.ReportingPluginFactory, services.HealthReporter, error) { plug, ok := instance.(types.PluginMedian) if !ok { - return nil, fmt.Errorf("expected PluginMedian but got %T", instance) + return nil, nil, fmt.Errorf("expected PluginMedian but got %T", instance) } - return plug.NewMedianFactory(ctx, provider, dataSource, juelsPerFeeCoin, errorLog) + //TODO plug.Start(ctx)? (how to close?) + factory, err := plug.NewMedianFactory(ctx, provider, dataSource, juelsPerFeeCoin, errorLog) + if err != nil { + return nil, nil, err + } + return factory, plug, nil } stopCh := make(chan struct{}) lggr = logger.Named(lggr, "MedianService") diff --git a/pkg/loop/median_service_test.go b/pkg/loop/median_service_test.go index f2bd8adbb..76dee6e7f 100644 --- a/pkg/loop/median_service_test.go +++ b/pkg/loop/median_service_test.go @@ -15,10 +15,10 @@ import ( func TestMedianService(t *testing.T) { t.Parallel() - - median := loop.NewMedianService(logger.Test(t), loop.GRPCOpts{}, func() *exec.Cmd { + lggr := logger.Test(t) + median := loop.NewMedianService(lggr, loop.GRPCOpts{}, func() *exec.Cmd { return NewHelperProcessCommand(loop.PluginMedianName) - }, test.StaticMedianProvider{}, test.StaticDataSource(), test.StaticJuelsPerFeeCoinDataSource(), &test.StaticErrorLog{}) + }, test.NewStaticMedianProvider(lggr), test.StaticDataSource(), test.StaticJuelsPerFeeCoinDataSource(), &test.StaticErrorLog{}) hook := median.PluginService.XXXTestHook() servicetest.Run(t, median) @@ -47,14 +47,14 @@ func TestMedianService(t *testing.T) { func TestMedianService_recovery(t *testing.T) { t.Parallel() + lggr := logger.Test(t) var limit atomic.Int32 - median := loop.NewMedianService(logger.Test(t), loop.GRPCOpts{}, func() *exec.Cmd { - h := HelperProcessCommand{ + median := loop.NewMedianService(lggr, loop.GRPCOpts{}, func() *exec.Cmd { + return HelperProcessCommand{ Command: loop.PluginMedianName, Limit: int(limit.Add(1)), - } - return h.New() - }, test.StaticMedianProvider{}, test.StaticDataSource(), test.StaticJuelsPerFeeCoinDataSource(), &test.StaticErrorLog{}) + }.New() + }, test.NewStaticMedianProvider(lggr), test.StaticDataSource(), test.StaticJuelsPerFeeCoinDataSource(), &test.StaticErrorLog{}) servicetest.Run(t, median) test.ReportingPluginFactory(t, median) diff --git a/pkg/loop/plugin_median.go b/pkg/loop/plugin_median.go index 2855fcbf2..910ed7433 100644 --- a/pkg/loop/plugin_median.go +++ b/pkg/loop/plugin_median.go @@ -40,6 +40,7 @@ type GRPCPluginMedian struct { } func (p *GRPCPluginMedian) GRPCServer(broker *plugin.GRPCBroker, server *grpc.Server) error { + //TODO when to start PluginServer return internal.RegisterPluginMedianServer(server, broker, p.BrokerConfig, p.PluginServer) } diff --git a/pkg/loop/plugin_median_test.go b/pkg/loop/plugin_median_test.go index 3755ea1e0..a915e3492 100644 --- a/pkg/loop/plugin_median_test.go +++ b/pkg/loop/plugin_median_test.go @@ -18,14 +18,15 @@ import ( func TestPluginMedian(t *testing.T) { t.Parallel() + lggr := logger.Test(t) stopCh := newStopCh(t) - test.PluginTest(t, loop.PluginMedianName, &loop.GRPCPluginMedian{PluginServer: test.StaticPluginMedian{}, BrokerConfig: loop.BrokerConfig{Logger: logger.Test(t), StopCh: stopCh}}, test.PluginMedian) + test.PluginTest(t, loop.PluginMedianName, &loop.GRPCPluginMedian{PluginServer: test.NewStaticPluginMedian(lggr), BrokerConfig: loop.BrokerConfig{Logger: logger.Test(t), StopCh: stopCh}}, test.PluginMedian) t.Run("proxy", func(t *testing.T) { - test.PluginTest(t, loop.PluginRelayerName, &loop.GRPCPluginRelayer{PluginServer: test.StaticPluginRelayer{}, BrokerConfig: loop.BrokerConfig{Logger: logger.Test(t), StopCh: stopCh}}, func(t *testing.T, pr loop.PluginRelayer) { + test.PluginTest(t, loop.PluginRelayerName, &loop.GRPCPluginRelayer{PluginServer: test.NewStaticPluginRelayer(lggr), BrokerConfig: loop.BrokerConfig{Logger: logger.Test(t), StopCh: stopCh}}, func(t *testing.T, pr loop.PluginRelayer) { p := newMedianProvider(t, pr) pm := test.PluginMedianTest{MedianProvider: p} - test.PluginTest(t, loop.PluginMedianName, &loop.GRPCPluginMedian{PluginServer: test.StaticPluginMedian{}, BrokerConfig: loop.BrokerConfig{Logger: logger.Test(t), StopCh: stopCh}}, pm.TestPluginMedian) + test.PluginTest(t, loop.PluginMedianName, &loop.GRPCPluginMedian{PluginServer: test.NewStaticPluginMedian(lggr), BrokerConfig: loop.BrokerConfig{Logger: logger.Test(t), StopCh: stopCh}}, pm.TestPluginMedian) }) }) } diff --git a/pkg/loop/plugin_relayer.go b/pkg/loop/plugin_relayer.go index d5d99ecd0..db1f8bff8 100644 --- a/pkg/loop/plugin_relayer.go +++ b/pkg/loop/plugin_relayer.go @@ -43,6 +43,7 @@ type GRPCPluginRelayer struct { } func (p *GRPCPluginRelayer) GRPCServer(broker *plugin.GRPCBroker, server *grpc.Server) error { + //TODO when to start... Auto? Or explicit call from other side? return internal.RegisterPluginRelayerServer(server, broker, p.BrokerConfig, p.PluginServer) } diff --git a/pkg/loop/plugin_relayer_test.go b/pkg/loop/plugin_relayer_test.go index 8d4097804..153bd9b78 100644 --- a/pkg/loop/plugin_relayer_test.go +++ b/pkg/loop/plugin_relayer_test.go @@ -14,8 +14,9 @@ import ( func TestPluginRelayer(t *testing.T) { t.Parallel() + lggr := logger.Test(t) stopCh := newStopCh(t) - test.PluginTest(t, loop.PluginRelayerName, &loop.GRPCPluginRelayer{PluginServer: test.StaticPluginRelayer{}, BrokerConfig: loop.BrokerConfig{Logger: logger.Test(t), StopCh: stopCh}}, test.RunPluginRelayer) + test.PluginTest(t, loop.PluginRelayerName, &loop.GRPCPluginRelayer{PluginServer: test.NewStaticPluginRelayer(lggr), BrokerConfig: loop.BrokerConfig{Logger: logger.Test(t), StopCh: stopCh}}, test.RunPluginRelayer) } func TestPluginRelayerExec(t *testing.T) { diff --git a/pkg/loop/process_test.go b/pkg/loop/process_test.go index 94ae50022..e5f4d22d4 100644 --- a/pkg/loop/process_test.go +++ b/pkg/loop/process_test.go @@ -8,14 +8,13 @@ import ( type HelperProcessCommand test.HelperProcessCommand -func (h *HelperProcessCommand) New() *exec.Cmd { +func (h HelperProcessCommand) New() *exec.Cmd { h.CommandLocation = "./internal/test/cmd/main.go" - return (test.HelperProcessCommand)(*h).New() + return (test.HelperProcessCommand)(h).New() } func NewHelperProcessCommand(command string) *exec.Cmd { - h := HelperProcessCommand{ + return HelperProcessCommand{ Command: command, - } - return h.New() + }.New() } diff --git a/pkg/loop/relayer_service.go b/pkg/loop/relayer_service.go index ef5ff52f2..b6b839d10 100644 --- a/pkg/loop/relayer_service.go +++ b/pkg/loop/relayer_service.go @@ -8,6 +8,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/loop/internal" + "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/types" ) @@ -21,16 +22,20 @@ type RelayerService struct { // NewRelayerService returns a new [*RelayerService]. // cmd must return a new exec.Cmd each time it is called. func NewRelayerService(lggr logger.Logger, grpcOpts GRPCOpts, cmd func() *exec.Cmd, config string, keystore types.Keystore) *RelayerService { - newService := func(ctx context.Context, instance any) (Relayer, error) { + newService := func(ctx context.Context, instance any) (Relayer, services.HealthReporter, error) { plug, ok := instance.(PluginRelayer) if !ok { - return nil, fmt.Errorf("expected PluginRelayer but got %T", instance) + return nil, nil, fmt.Errorf("expected PluginRelayer but got %T", instance) + } + //TODo plug.Start(ctx)? (how to close?) + if err := plug.Start(ctx); err != nil { + return nil, nil, fmt.Errorf("failed to start PluginRelayer: %w", err) } r, err := plug.NewRelayer(ctx, config, keystore) if err != nil { - return nil, fmt.Errorf("failed to create Relayer: %w", err) + return nil, nil, fmt.Errorf("failed to create Relayer: %w", err) } - return r, nil + return r, plug, nil } stopCh := make(chan struct{}) lggr = logger.Named(lggr, "RelayerService") diff --git a/pkg/loop/relayer_service_test.go b/pkg/loop/relayer_service_test.go index ba870b761..463036447 100644 --- a/pkg/loop/relayer_service_test.go +++ b/pkg/loop/relayer_service_test.go @@ -2,11 +2,16 @@ package loop_test import ( "os/exec" + "strings" "sync/atomic" "testing" "time" "github.com/stretchr/testify/require" + "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest/observer" + "golang.org/x/exp/maps" + "golang.org/x/exp/slices" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/loop" @@ -16,8 +21,19 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" ) +var relayerServiceNames = []string{ + "RelayerService", + "RelayerService.PluginRelayerClient", + "RelayerService.PluginRelayerClient.staticPluginRelayer", + "RelayerService.PluginRelayerClient.staticPluginRelayer.staticRelayer", + "RelayerService.PluginRelayerClient.staticPluginRelayer.staticRelayer.staticMedianProvider", + "RelayerService.PluginRelayerClient.staticPluginRelayer.staticRelayer.staticPluginProvider", + "RelayerService.PluginRelayerClient.staticPluginRelayer.staticRelayer.staticConfigProvider", +} + func TestRelayerService(t *testing.T) { t.Parallel() + relayer := loop.NewRelayerService(logger.Test(t), loop.GRPCOpts{}, func() *exec.Cmd { return NewHelperProcessCommand(loop.PluginRelayerName) }, test.ConfigTOML, test.StaticKeystore{}) @@ -26,6 +42,7 @@ func TestRelayerService(t *testing.T) { t.Run("control", func(t *testing.T) { test.RunRelayer(t, relayer) + servicetest.AssertHealthReportNames(t, relayer.HealthReport(), relayerServiceNames...) }) t.Run("Kill", func(t *testing.T) { @@ -35,6 +52,7 @@ func TestRelayerService(t *testing.T) { time.Sleep(2 * internal.KeepAliveTickDuration) test.RunRelayer(t, relayer) + servicetest.AssertHealthReportNames(t, relayer.HealthReport(), relayerServiceNames...) }) t.Run("Reset", func(t *testing.T) { @@ -44,6 +62,7 @@ func TestRelayerService(t *testing.T) { time.Sleep(2 * internal.KeepAliveTickDuration) test.RunRelayer(t, relayer) + servicetest.AssertHealthReportNames(t, relayer.HealthReport(), relayerServiceNames...) }) } @@ -51,33 +70,57 @@ func TestRelayerService_recovery(t *testing.T) { t.Parallel() var limit atomic.Int32 relayer := loop.NewRelayerService(logger.Test(t), loop.GRPCOpts{}, func() *exec.Cmd { - h := HelperProcessCommand{ + return HelperProcessCommand{ Command: loop.PluginRelayerName, Limit: int(limit.Add(1)), - } - return h.New() + }.New() }, test.ConfigTOML, test.StaticKeystore{}) servicetest.Run(t, relayer) test.RunRelayer(t, relayer) + + servicetest.AssertHealthReportNames(t, relayer.HealthReport(), relayerServiceNames[:2]...) } func TestRelayerService_HealthReport(t *testing.T) { - lggr := logger.Named(logger.Test(t), "Foo") + t.Parallel() + + lggr, obsLogs := logger.TestObserved(t, zapcore.DebugLevel) + t.Cleanup(AssertLogsObserved(t, obsLogs, relayerServiceNames)) //TODO but not all logging? Or with extra names in-between? s := loop.NewRelayerService(lggr, loop.GRPCOpts{}, func() *exec.Cmd { - return test.HelperProcessCommand{Command: loop.PluginRelayerName}.New() + return NewHelperProcessCommand(loop.PluginRelayerName) }, test.ConfigTOML, test.StaticKeystore{}) - servicetest.AssertHealthReportNames(t, s.HealthReport(), - "Foo.RelayerService") + servicetest.AssertHealthReportNames(t, s.HealthReport(), relayerServiceNames[0]) - require.NoError(t, s.Start(tests.Context(t))) - t.Cleanup(func() { require.NoError(t, s.Close()) }) + servicetest.Run(t, s) require.Eventually(t, func() bool { return s.Ready() == nil }, tests.WaitTimeout(t)/2, time.Second, s.Ready()) - servicetest.AssertHealthReportNames(t, s.HealthReport(), - "Foo.RelayerService", - "Foo.RelayerService.PluginRelayerClient.ChainRelayerClient", - "staticRelayer") + servicetest.AssertHealthReportNames(t, s.HealthReport(), relayerServiceNames...) + +} + +// TODO observed only or at least? +func AssertLogsObserved(t *testing.T, obsLogs *observer.ObservedLogs, names []string) func() { + return func() { + t.Helper() + + obsNames := map[string]struct{}{} + for _, l := range obsLogs.All() { + obsNames[l.LoggerName] = struct{}{} + } + var failed bool + for _, n := range names { + if _, ok := obsNames[n]; !ok { + t.Errorf("No logs observed for service: %s", n) + failed = true + } + } + if failed { + keys := maps.Keys(obsNames) + slices.Sort(keys) + t.Logf("Loggers observed:\n%s\n", strings.Join(keys, "\n")) + } + } } diff --git a/pkg/loop/reportingplugins/grpc.go b/pkg/loop/reportingplugins/grpc.go index c379343fa..6eb8903c1 100644 --- a/pkg/loop/reportingplugins/grpc.go +++ b/pkg/loop/reportingplugins/grpc.go @@ -8,6 +8,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/loop" "github.com/smartcontractkit/chainlink-common/pkg/loop/internal" + "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/types" ) @@ -39,16 +40,22 @@ type GRPCService[T types.PluginProvider] struct { pluginClient *internal.ReportingPluginServiceClient } -type serverAdapter func( - context.Context, - types.ReportingPluginServiceConfig, - grpc.ClientConnInterface, - types.PipelineRunnerService, - types.TelemetryService, - types.ErrorLog, -) (types.ReportingPluginFactory, error) +type serverAdapter[T types.PluginProvider] struct { + services.StateMachine + BrokerConfig loop.BrokerConfig + ProviderServer ProviderServer[T] + GRPCBroker *plugin.GRPCBroker +} + +func (s *serverAdapter[T]) Start(ctx context.Context) error { return s.ProviderServer.Start(ctx) } + +func (s *serverAdapter[T]) Close() error { return s.ProviderServer.Close() } -func (s serverAdapter) NewReportingPluginFactory( +func (s *serverAdapter[T]) HealthReport() map[string]error { return s.ProviderServer.HealthReport() } + +func (s *serverAdapter[T]) Name() string { return s.ProviderServer.Name() } + +func (s *serverAdapter[T]) NewReportingPluginFactory( ctx context.Context, config types.ReportingPluginServiceConfig, conn grpc.ClientConnInterface, @@ -56,23 +63,15 @@ func (s serverAdapter) NewReportingPluginFactory( ts types.TelemetryService, errorLog types.ErrorLog, ) (types.ReportingPluginFactory, error) { - return s(ctx, config, conn, pr, ts, errorLog) + provider := s.ProviderServer.ConnToProvider(conn, s.GRPCBroker, s.BrokerConfig) + tc := internal.NewTelemetryClient(ts) + return s.ProviderServer.NewReportingPluginFactory(ctx, config, provider, pr, tc, errorLog) } func (g *GRPCService[T]) GRPCServer(broker *plugin.GRPCBroker, server *grpc.Server) error { - adapter := func( - ctx context.Context, - cfg types.ReportingPluginServiceConfig, - conn grpc.ClientConnInterface, - pr types.PipelineRunnerService, - ts types.TelemetryService, - el types.ErrorLog, - ) (types.ReportingPluginFactory, error) { - provider := g.PluginServer.ConnToProvider(conn, broker, g.BrokerConfig) - tc := internal.NewTelemetryClient(ts) - return g.PluginServer.NewReportingPluginFactory(ctx, cfg, provider, pr, tc, el) - } - return internal.RegisterReportingPluginServiceServer(server, broker, g.BrokerConfig, serverAdapter(adapter)) + impl := &serverAdapter[T]{BrokerConfig: g.BrokerConfig, ProviderServer: g.PluginServer, GRPCBroker: broker} + //TODO when to start + return internal.RegisterReportingPluginServiceServer(server, broker, g.BrokerConfig, impl) } // GRPCClient implements [plugin.GRPCPlugin] and returns the pluginClient [types.PluginClient], updated with the new broker and conn. diff --git a/pkg/loop/reportingplugins/grpc_test.go b/pkg/loop/reportingplugins/grpc_test.go index 7a3745159..ec8e83a67 100644 --- a/pkg/loop/reportingplugins/grpc_test.go +++ b/pkg/loop/reportingplugins/grpc_test.go @@ -40,7 +40,7 @@ func TestGRPCService_MedianProvider(t *testing.T) { t, test.ReportingPluginWithMedianProviderName, &reportingplugins.GRPCService[types.MedianProvider]{ - PluginServer: test.StaticReportingPluginWithMedianProvider{}, + PluginServer: test.NewStaticReportingPluginWithMedianProvider(logger.Test(t)), BrokerConfig: loop.BrokerConfig{ Logger: logger.Test(t), StopCh: stopCh, @@ -58,7 +58,7 @@ func TestGRPCService_PluginProvider(t *testing.T) { t, reportingplugins.PluginServiceName, &reportingplugins.GRPCService[types.PluginProvider]{ - PluginServer: test.StaticReportingPluginWithPluginProvider{}, + PluginServer: test.NewStaticReportingPluginWithPluginProvider(logger.Test(t)), BrokerConfig: loop.BrokerConfig{ Logger: logger.Test(t), StopCh: stopCh, diff --git a/pkg/loop/reportingplugins/loopp_service.go b/pkg/loop/reportingplugins/loopp_service.go index 2dc00f759..681e2dac9 100644 --- a/pkg/loop/reportingplugins/loopp_service.go +++ b/pkg/loop/reportingplugins/loopp_service.go @@ -11,6 +11,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/loop" "github.com/smartcontractkit/chainlink-common/pkg/loop/internal" + "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/types" ) @@ -34,12 +35,17 @@ func NewLOOPPService( telemetryService types.TelemetryService, errorLog types.ErrorLog, ) *LOOPPService { - newService := func(ctx context.Context, instance any) (types.ReportingPluginFactory, error) { + newService := func(ctx context.Context, instance any) (types.ReportingPluginFactory, services.HealthReporter, error) { plug, ok := instance.(types.ReportingPluginClient) if !ok { - return nil, fmt.Errorf("expected GenericPluginClient but got %T", instance) + return nil, nil, fmt.Errorf("expected GenericPluginClient but got %T", instance) } - return plug.NewReportingPluginFactory(ctx, config, providerConn, pipelineRunner, telemetryService, errorLog) + //TODO plug.Start(ctx)? (how to close?) + factory, err := plug.NewReportingPluginFactory(ctx, config, providerConn, pipelineRunner, telemetryService, errorLog) + if err != nil { + return nil, nil, err + } + return factory, plug, nil } stopCh := make(chan struct{}) lggr = logger.Named(lggr, "GenericService") diff --git a/pkg/loop/standalone_provider_test.go b/pkg/loop/standalone_provider_test.go index 3f69fae77..9f0148038 100644 --- a/pkg/loop/standalone_provider_test.go +++ b/pkg/loop/standalone_provider_test.go @@ -6,6 +6,7 @@ import ( "github.com/stretchr/testify/require" "google.golang.org/grpc" + "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/loop" "github.com/smartcontractkit/chainlink-common/pkg/loop/internal/test" ) @@ -13,7 +14,7 @@ import ( func TestRegisterStandAloneProvider(t *testing.T) { s := grpc.NewServer() - p := test.StaticPluginProvider{} + p := test.NewStaticPluginProvider(logger.Test(t)) err := loop.RegisterStandAloneProvider(s, p, "some-type-we-do-not-support") require.ErrorContains(t, err, "stand alone provider only supports median") diff --git a/pkg/services/servicetest/health.go b/pkg/services/servicetest/health.go index b43e0fe6e..fd84a1877 100644 --- a/pkg/services/servicetest/health.go +++ b/pkg/services/servicetest/health.go @@ -1,6 +1,7 @@ package servicetest import ( + "strings" "testing" "github.com/stretchr/testify/assert" @@ -13,5 +14,6 @@ func AssertHealthReportNames(t *testing.T, hp map[string]error, names ...string) keys := maps.Keys(hp) slices.Sort(keys) slices.Sort(names) - assert.EqualValues(t, names, keys) + join := func(s []string) string { return strings.Join(s, "\n") } + assert.Equal(t, join(names), join(keys)) } diff --git a/pkg/services/servicetest/run.go b/pkg/services/servicetest/run.go index 8a1046e3c..4086682bb 100644 --- a/pkg/services/servicetest/run.go +++ b/pkg/services/servicetest/run.go @@ -27,8 +27,8 @@ type TestingT interface { // Run fails tb if the service fails to start or close. func Run[R Runnable](tb TestingT, r R) R { tb.Helper() - require.NoError(tb, r.Start(tests.Context(tb)), "service failed to start") - tb.Cleanup(func() { assert.NoError(tb, r.Close(), "error closing service") }) + require.NoError(tb, r.Start(tests.Context(tb)), "service failed to start: %T", r) + tb.Cleanup(func() { assert.NoError(tb, r.Close(), "error closing service: %T", r) }) return r } diff --git a/pkg/types/provider_median.go b/pkg/types/provider_median.go index 29010d82d..5fdda2d20 100644 --- a/pkg/types/provider_median.go +++ b/pkg/types/provider_median.go @@ -5,6 +5,8 @@ import ( "github.com/smartcontractkit/libocr/offchainreporting2/reportingplugin/median" libocr "github.com/smartcontractkit/libocr/offchainreporting2plus/types" + + "github.com/smartcontractkit/chainlink-common/pkg/services" ) // MedianProvider provides all components needed for a median OCR2 plugin. @@ -16,11 +18,12 @@ type MedianProvider interface { } type PluginMedian interface { + services.Service // NewMedianFactory returns a new ReportingPluginFactory. If provider implements GRPCClientConn, it can be forwarded efficiently via proxy. NewMedianFactory(ctx context.Context, provider MedianProvider, dataSource, juelsPerFeeCoin median.DataSource, errorLog ErrorLog) (ReportingPluginFactory, error) } type ReportingPluginFactory interface { - Service + services.Service libocr.ReportingPluginFactory } diff --git a/pkg/types/reporting_plugin_service.go b/pkg/types/reporting_plugin_service.go index 0d4535d89..3f0f18db2 100644 --- a/pkg/types/reporting_plugin_service.go +++ b/pkg/types/reporting_plugin_service.go @@ -4,6 +4,8 @@ import ( "context" "google.golang.org/grpc" + + "github.com/smartcontractkit/chainlink-common/pkg/services" ) type ReportingPluginServiceConfig struct { @@ -17,6 +19,7 @@ type ReportingPluginServiceConfig struct { // ReportingPluginClient is the client interface to a plugin running // as a generic job (job type = GenericPlugin) inside the core node. type ReportingPluginClient interface { + services.Service NewReportingPluginFactory(ctx context.Context, config ReportingPluginServiceConfig, grpcProvider grpc.ClientConnInterface, pipelineRunner PipelineRunnerService, telemetry TelemetryService, errorLog ErrorLog) (ReportingPluginFactory, error) } @@ -25,5 +28,6 @@ type ReportingPluginClient interface { // with the passthrough provider connection converted to the provider // expected by the plugin. type ReportingPluginServer[T PluginProvider] interface { + services.Service NewReportingPluginFactory(ctx context.Context, config ReportingPluginServiceConfig, provider T, pipelineRunner PipelineRunnerService, telemetry TelemetryClient, errorLog ErrorLog) (ReportingPluginFactory, error) }