From c2c52093a0c61f8538bb15f4bfaf6d925f21d645 Mon Sep 17 00:00:00 2001 From: Jordan Krage Date: Mon, 30 Oct 2023 12:12:52 -0500 Subject: [PATCH] pkg/loop: plugins report health to host --- pkg/logger/logger.go | 7 + pkg/loop/internal/median.go | 5 + pkg/loop/internal/relayer.go | 14 +- pkg/loop/internal/reporting_plugin_service.go | 5 + pkg/loop/internal/service.go | 5 +- pkg/loop/internal/test/cmd/main.go | 8 +- pkg/loop/internal/test/config.go | 19 +- pkg/loop/internal/test/median.go | 73 ++-- pkg/loop/internal/test/plugin_provider.go | 26 +- pkg/loop/internal/test/relayer.go | 312 +++++++++--------- pkg/loop/internal/test/reporting_plugin.go | 48 ++- pkg/loop/internal/test/test.go | 29 ++ pkg/loop/internal/types.go | 2 + pkg/loop/median_service_test.go | 15 +- pkg/loop/plugin_median.go | 1 + pkg/loop/plugin_median_test.go | 7 +- pkg/loop/plugin_relayer.go | 1 + pkg/loop/plugin_relayer_test.go | 3 +- pkg/loop/process_test.go | 9 +- pkg/loop/relayer_service_test.go | 90 ++++- pkg/loop/reportingplugins/grpc.go | 48 ++- pkg/loop/reportingplugins/grpc_test.go | 4 +- pkg/services/srvctest/health.go | 10 + pkg/types/provider_median.go | 5 +- pkg/types/reporting_plugin_service.go | 4 + 25 files changed, 464 insertions(+), 286 deletions(-) diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go index b4af751ee9..3b8ec95432 100644 --- a/pkg/logger/logger.go +++ b/pkg/logger/logger.go @@ -137,6 +137,13 @@ func With(l Logger, keyvals ...interface{}) Logger { // Named returns a logger with name 'n', if 'l' has a method `Named(string) L`, where L implements Logger, otherwise it returns l. func Named(l Logger, n string) Logger { + l = named(l, n) + if testing.Testing() { + l.Debugf("New logger: %s", n) + } + return l +} +func named(l Logger, n string) Logger { switch t := l.(type) { case *logger: return t.named(n) diff --git a/pkg/loop/internal/median.go b/pkg/loop/internal/median.go index 692535127a..35b5724f22 100644 --- a/pkg/loop/internal/median.go +++ b/pkg/loop/internal/median.go @@ -107,6 +107,7 @@ type pluginMedianServer struct { } func RegisterPluginMedianServer(server *grpc.Server, broker Broker, brokerCfg BrokerConfig, impl types.PluginMedian) error { + pb.RegisterServiceServer(server, &serviceServer{srv: impl}) pb.RegisterPluginMedianServer(server, newPluginMedianServer(&brokerExt{broker, brokerCfg}, impl)) return nil } @@ -152,6 +153,10 @@ func (m *pluginMedianServer) NewMedianFactory(ctx context.Context, request *pb.N m.closeAll(dsRes, juelsRes, providerRes, errorLogRes) return nil, err } + if err = factory.Start(ctx); err != nil { + m.closeAll(dsRes, juelsRes, providerRes, errorLogRes) + return nil, err + } id, _, err := m.serveNew("ReportingPluginProvider", func(s *grpc.Server) { pb.RegisterServiceServer(s, &serviceServer{srv: factory}) diff --git a/pkg/loop/internal/relayer.go b/pkg/loop/internal/relayer.go index e0bec15742..c33913251a 100644 --- a/pkg/loop/internal/relayer.go +++ b/pkg/loop/internal/relayer.go @@ -19,14 +19,15 @@ var _ PluginRelayer = (*PluginRelayerClient)(nil) type PluginRelayerClient struct { *pluginClient + *serviceClient - grpc pb.PluginRelayerClient + pluginRelayer pb.PluginRelayerClient } func NewPluginRelayerClient(broker Broker, brokerCfg BrokerConfig, conn *grpc.ClientConn) *PluginRelayerClient { brokerCfg.Logger = logger.Named(brokerCfg.Logger, "PluginRelayerClient") pc := newPluginClient(broker, brokerCfg, conn) - return &PluginRelayerClient{pluginClient: pc, grpc: pb.NewPluginRelayerClient(pc)} + return &PluginRelayerClient{pluginClient: pc, pluginRelayer: pb.NewPluginRelayerClient(pc), serviceClient: newServiceClient(pc.brokerExt, pc)} } func (p *PluginRelayerClient) NewRelayer(ctx context.Context, config string, keystore types.Keystore) (Relayer, error) { @@ -40,7 +41,7 @@ func (p *PluginRelayerClient) NewRelayer(ctx context.Context, config string, key } deps.Add(ksRes) - reply, err := p.grpc.NewRelayer(ctx, &pb.NewRelayerRequest{ + reply, err := p.pluginRelayer.NewRelayer(ctx, &pb.NewRelayerRequest{ Config: config, KeystoreID: id, }) @@ -61,6 +62,7 @@ type pluginRelayerServer struct { } func RegisterPluginRelayerServer(server *grpc.Server, broker Broker, brokerCfg BrokerConfig, impl PluginRelayer) error { + pb.RegisterServiceServer(server, &serviceServer{srv: impl}) pb.RegisterPluginRelayerServer(server, newPluginRelayerServer(broker, brokerCfg, impl)) return nil } @@ -161,7 +163,7 @@ type relayerClient struct { } func newRelayerClient(b *brokerExt, conn grpc.ClientConnInterface) *relayerClient { - b = b.withName("ChainRelayerClient") + b = b.withName("RelayerClient") return &relayerClient{b, newServiceClient(b, conn), pb.NewRelayerClient(conn)} } @@ -436,14 +438,14 @@ func (r *relayerServer) Transact(ctx context.Context, request *pb.TransactionReq return &emptypb.Empty{}, r.impl.Transact(ctx, request.From, request.To, request.Amount.Int(), request.BalanceCheck) } -func healthReport(s map[string]string) (hr map[string]error) { +func healthReport(prefix string, s map[string]string) (hr map[string]error) { hr = make(map[string]error, len(s)) for n, e := range s { var err error if e != "" { err = errors.New(e) } - hr[n] = err + hr[prefix+"."+n] = err } return hr } diff --git a/pkg/loop/internal/reporting_plugin_service.go b/pkg/loop/internal/reporting_plugin_service.go index 953d97dc61..0fa5ea4243 100644 --- a/pkg/loop/internal/reporting_plugin_service.go +++ b/pkg/loop/internal/reporting_plugin_service.go @@ -96,6 +96,7 @@ type reportingPluginServiceServer struct { } func RegisterReportingPluginServiceServer(server *grpc.Server, broker Broker, brokerCfg BrokerConfig, impl types.ReportingPluginClient) error { + pb.RegisterServiceServer(server, &serviceServer{srv: impl}) pb.RegisterReportingPluginServiceServer(server, newReportingPluginServiceServer(&brokerExt{broker, brokerCfg}, impl)) return nil } @@ -148,6 +149,10 @@ func (m *reportingPluginServiceServer) NewReportingPluginFactory(ctx context.Con m.closeAll(providerRes, errorLogRes, pipelineRunnerRes, telemetryRes) return nil, err } + if err = factory.Start(ctx); err != nil { + m.closeAll(providerRes, errorLogRes, pipelineRunnerRes, telemetryRes) + return nil, err + } id, _, err := m.serveNew("ReportingPluginProvider", func(s *grpc.Server) { pb.RegisterServiceServer(s, &serviceServer{srv: factory}) diff --git a/pkg/loop/internal/service.go b/pkg/loop/internal/service.go index 6240f8ffd6..b948926f4f 100644 --- a/pkg/loop/internal/service.go +++ b/pkg/loop/internal/service.go @@ -56,11 +56,12 @@ func (s *serviceClient) HealthReport() map[string]error { ctx, cancel = context.WithTimeout(ctx, time.Second) defer cancel() + name := s.Name() reply, err := s.grpc.HealthReport(ctx, &emptypb.Empty{}) if err != nil { - return map[string]error{s.b.Logger.Name(): err} + return map[string]error{name: err} } - hr := healthReport(reply.HealthReport) + hr := healthReport(name, reply.HealthReport) hr[s.b.Logger.Name()] = nil return hr } diff --git a/pkg/loop/internal/test/cmd/main.go b/pkg/loop/internal/test/cmd/main.go index 4b64664568..37ad1e89a8 100644 --- a/pkg/loop/internal/test/cmd/main.go +++ b/pkg/loop/internal/test/cmd/main.go @@ -58,7 +58,7 @@ func main() { plugin.Serve(&plugin.ServeConfig{ HandshakeConfig: loop.PluginRelayerHandshakeConfig(), Plugins: map[string]plugin.Plugin{ - loop.PluginRelayerName: &loop.GRPCPluginRelayer{PluginServer: test.StaticPluginRelayer{}, BrokerConfig: loop.BrokerConfig{Logger: lggr, StopCh: stopCh}}, + loop.PluginRelayerName: &loop.GRPCPluginRelayer{PluginServer: test.NewStaticPluginRelayer(lggr), BrokerConfig: loop.BrokerConfig{Logger: lggr, StopCh: stopCh}}, }, GRPCServer: grpcServer, }) @@ -68,7 +68,7 @@ func main() { plugin.Serve(&plugin.ServeConfig{ HandshakeConfig: loop.PluginMedianHandshakeConfig(), Plugins: map[string]plugin.Plugin{ - loop.PluginMedianName: &loop.GRPCPluginMedian{PluginServer: test.StaticPluginMedian{}, BrokerConfig: loop.BrokerConfig{Logger: lggr, StopCh: stopCh}}, + loop.PluginMedianName: &loop.GRPCPluginMedian{PluginServer: test.NewStaticPluginMedian(lggr), BrokerConfig: loop.BrokerConfig{Logger: lggr, StopCh: stopCh}}, }, GRPCServer: grpcServer, }) @@ -89,7 +89,7 @@ func main() { HandshakeConfig: reportingplugins.ReportingPluginHandshakeConfig(), Plugins: map[string]plugin.Plugin{ reportingplugins.PluginServiceName: &reportingplugins.GRPCService[types.PluginProvider]{ - PluginServer: test.StaticReportingPluginWithPluginProvider{}, + PluginServer: test.NewStaticReportingPluginWithPluginProvider(lggr), BrokerConfig: loop.BrokerConfig{ Logger: lggr, StopCh: stopCh, @@ -105,7 +105,7 @@ func main() { HandshakeConfig: reportingplugins.ReportingPluginHandshakeConfig(), Plugins: map[string]plugin.Plugin{ reportingplugins.PluginServiceName: &reportingplugins.GRPCService[types.MedianProvider]{ - PluginServer: test.StaticReportingPluginWithMedianProvider{}, + PluginServer: test.NewStaticReportingPluginWithMedianProvider(lggr), BrokerConfig: loop.BrokerConfig{ Logger: lggr, StopCh: stopCh, diff --git a/pkg/loop/internal/test/config.go b/pkg/loop/internal/test/config.go index fa01e8446d..dfffa0227a 100644 --- a/pkg/loop/internal/test/config.go +++ b/pkg/loop/internal/test/config.go @@ -8,20 +8,17 @@ import ( libocr "github.com/smartcontractkit/libocr/offchainreporting2plus/types" "github.com/stretchr/testify/assert" -) - -type staticConfigProvider struct{} - -// TODO validate start/Close calls? -func (s staticConfigProvider) Start(ctx context.Context) error { return nil } - -func (s staticConfigProvider) Close() error { return nil } -func (s staticConfigProvider) Ready() error { panic("unimplemented") } + "github.com/smartcontractkit/chainlink-common/pkg/logger" +) -func (s staticConfigProvider) Name() string { panic("unimplemented") } +type staticConfigProvider struct { + staticService +} -func (s staticConfigProvider) HealthReport() map[string]error { panic("unimplemented") } +func newStaticConfigProvider(lggr logger.Logger) staticConfigProvider { + return staticConfigProvider{staticService{lggr: logger.Named(lggr, "staticConfigProvider")}} +} func (s staticConfigProvider) OffchainConfigDigester() libocr.OffchainConfigDigester { return staticOffchainConfigDigester{} diff --git a/pkg/loop/internal/test/median.go b/pkg/loop/internal/test/median.go index fda3c96e35..4b2ccb7789 100644 --- a/pkg/loop/internal/test/median.go +++ b/pkg/loop/internal/test/median.go @@ -15,12 +15,16 @@ import ( "github.com/smartcontractkit/libocr/offchainreporting2/reportingplugin/median" libocr "github.com/smartcontractkit/libocr/offchainreporting2plus/types" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/services/srvctest" "github.com/smartcontractkit/chainlink-common/pkg/types" "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" ) func PluginMedian(t *testing.T, p types.PluginMedian) { - PluginMedianTest{&StaticMedianProvider{}}.TestPluginMedian(t, p) + PluginMedianTest{&staticMedianProvider{}}.TestPluginMedian(t, p) } type PluginMedianTest struct { @@ -34,6 +38,11 @@ func (m PluginMedianTest) TestPluginMedian(t *testing.T, p types.PluginMedian) { require.NoError(t, err) ReportingPluginFactory(t, factory) + srvctest.AssertHealthReportNames(t, p.HealthReport(), + "PluginMedianClient", + "PluginMedianClient.staticPluginMedian", + "PluginMedianClient.staticPluginMedian.staticPluginFactory", + ) }) } @@ -65,9 +74,21 @@ func ReportingPluginFactory(t *testing.T, factory types.ReportingPluginFactory) }) } -type StaticPluginMedian struct{} +type staticPluginMedian struct { + staticService +} -func (s StaticPluginMedian) NewMedianFactory(ctx context.Context, provider types.MedianProvider, dataSource, juelsPerFeeCoinDataSource median.DataSource, errorLog types.ErrorLog) (types.ReportingPluginFactory, error) { +func NewStaticPluginMedian(lggr logger.Logger) staticPluginMedian { + return staticPluginMedian{staticService{lggr: logger.Named(lggr, "staticPluginMedian")}} +} + +func (s staticPluginMedian) HealthReport() map[string]error { + hp := s.staticService.HealthReport() + services.CopyHealth(hp, newStaticPluginFactory(s.lggr).HealthReport()) + return hp +} + +func (s staticPluginMedian) NewMedianFactory(ctx context.Context, provider types.MedianProvider, dataSource, juelsPerFeeCoinDataSource median.DataSource, errorLog types.ErrorLog) (types.ReportingPluginFactory, error) { ocd := provider.OffchainConfigDigester() gotDigestPrefix, err := ocd.ConfigDigestPrefix() if err != nil { @@ -217,20 +238,16 @@ func (s StaticPluginMedian) NewMedianFactory(ctx context.Context, provider types if err := errorLog.SaveError(ctx, errMsg); err != nil { return nil, fmt.Errorf("failed to save error: %w", err) } - return staticPluginFactory{}, nil + return newStaticPluginFactory(s.lggr), nil } -type staticPluginFactory struct{} - -func (s staticPluginFactory) Name() string { panic("implement me") } - -func (s staticPluginFactory) Start(ctx context.Context) error { return nil } - -func (s staticPluginFactory) Close() error { return nil } - -func (s staticPluginFactory) Ready() error { panic("implement me") } +type staticPluginFactory struct { + staticService +} -func (s staticPluginFactory) HealthReport() map[string]error { panic("implement me") } +func newStaticPluginFactory(lggr logger.Logger) staticPluginFactory { + return staticPluginFactory{staticService{lggr: logger.Named(lggr, "staticPluginFactory")}} +} func (s staticPluginFactory) NewReportingPlugin(config libocr.ReportingPluginConfig) (libocr.ReportingPlugin, libocr.ReportingPluginInfo, error) { if config.ConfigDigest != reportingPluginConfig.ConfigDigest { @@ -272,35 +289,31 @@ func (s staticPluginFactory) NewReportingPlugin(config libocr.ReportingPluginCon return staticReportingPlugin{}, rpi, nil } -type StaticMedianProvider struct{} - -func (s StaticMedianProvider) Start(ctx context.Context) error { return nil } - -func (s StaticMedianProvider) Close() error { return nil } - -func (s StaticMedianProvider) Ready() error { panic("unimplemented") } - -func (s StaticMedianProvider) Name() string { panic("unimplemented") } +type staticMedianProvider struct { + staticService +} -func (s StaticMedianProvider) HealthReport() map[string]error { panic("unimplemented") } +func NewStaticMedianProvider(lggr logger.Logger) staticMedianProvider { + return staticMedianProvider{staticService{lggr: logger.Named(lggr, "staticMedianProvider")}} +} -func (s StaticMedianProvider) OffchainConfigDigester() libocr.OffchainConfigDigester { +func (s staticMedianProvider) OffchainConfigDigester() libocr.OffchainConfigDigester { return staticOffchainConfigDigester{} } -func (s StaticMedianProvider) ContractConfigTracker() libocr.ContractConfigTracker { +func (s staticMedianProvider) ContractConfigTracker() libocr.ContractConfigTracker { return staticContractConfigTracker{} } -func (s StaticMedianProvider) ContractTransmitter() libocr.ContractTransmitter { +func (s staticMedianProvider) ContractTransmitter() libocr.ContractTransmitter { return staticContractTransmitter{} } -func (s StaticMedianProvider) ReportCodec() median.ReportCodec { return staticReportCodec{} } +func (s staticMedianProvider) ReportCodec() median.ReportCodec { return staticReportCodec{} } -func (s StaticMedianProvider) MedianContract() median.MedianContract { return staticMedianContract{} } +func (s staticMedianProvider) MedianContract() median.MedianContract { return staticMedianContract{} } -func (s StaticMedianProvider) OnchainConfigCodec() median.OnchainConfigCodec { +func (s staticMedianProvider) OnchainConfigCodec() median.OnchainConfigCodec { return staticOnchainConfigCodec{} } diff --git a/pkg/loop/internal/test/plugin_provider.go b/pkg/loop/internal/test/plugin_provider.go index c9878832f3..fb67fc190f 100644 --- a/pkg/loop/internal/test/plugin_provider.go +++ b/pkg/loop/internal/test/plugin_provider.go @@ -1,31 +1,27 @@ package test import ( - "context" - libocr "github.com/smartcontractkit/libocr/offchainreporting2plus/types" -) - -type StaticPluginProvider struct{} - -func (s StaticPluginProvider) Start(ctx context.Context) error { return nil } -func (s StaticPluginProvider) Close() error { return nil } - -func (s StaticPluginProvider) Ready() error { panic("unimplemented") } + "github.com/smartcontractkit/chainlink-common/pkg/logger" +) -func (s StaticPluginProvider) Name() string { panic("unimplemented") } +type staticPluginProvider struct { + staticService +} -func (s StaticPluginProvider) HealthReport() map[string]error { panic("unimplemented") } +func NewStaticPluginProvider(lggr logger.Logger) staticPluginProvider { + return staticPluginProvider{staticService{lggr: logger.Named(lggr, "staticPluginProvider")}} +} -func (s StaticPluginProvider) OffchainConfigDigester() libocr.OffchainConfigDigester { +func (s staticPluginProvider) OffchainConfigDigester() libocr.OffchainConfigDigester { return staticOffchainConfigDigester{} } -func (s StaticPluginProvider) ContractConfigTracker() libocr.ContractConfigTracker { +func (s staticPluginProvider) ContractConfigTracker() libocr.ContractConfigTracker { return staticContractConfigTracker{} } -func (s StaticPluginProvider) ContractTransmitter() libocr.ContractTransmitter { +func (s staticPluginProvider) ContractTransmitter() libocr.ContractTransmitter { return staticContractTransmitter{} } diff --git a/pkg/loop/internal/test/relayer.go b/pkg/loop/internal/test/relayer.go index ca3bb68b64..02310755f3 100644 --- a/pkg/loop/internal/test/relayer.go +++ b/pkg/loop/internal/test/relayer.go @@ -12,7 +12,10 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/loop/internal" + "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/services/srvctest" "github.com/smartcontractkit/chainlink-common/pkg/types" "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" ) @@ -33,9 +36,21 @@ func (s StaticKeystore) Sign(ctx context.Context, id string, data []byte) ([]byt return signed, nil } -type StaticPluginRelayer struct{} +type staticPluginRelayer struct { + staticService +} + +func NewStaticPluginRelayer(lggr logger.Logger) staticPluginRelayer { + return staticPluginRelayer{staticService{lggr: logger.Named(lggr, "staticPluginRelayer")}} +} -func (s StaticPluginRelayer) NewRelayer(ctx context.Context, config string, keystore types.Keystore) (internal.Relayer, error) { +func (s staticPluginRelayer) HealthReport() map[string]error { + hp := s.staticService.HealthReport() + services.CopyHealth(hp, newStaticRelayer(s.lggr).HealthReport()) + return hp +} + +func (s staticPluginRelayer) NewRelayer(ctx context.Context, config string, keystore types.Keystore) (internal.Relayer, error) { if config != ConfigTOML { return nil, fmt.Errorf("expected config %q but got %q", ConfigTOML, config) } @@ -53,27 +68,29 @@ func (s StaticPluginRelayer) NewRelayer(ctx context.Context, config string, keys if !bytes.Equal(signed, gotSigned) { return nil, fmt.Errorf("expected signed bytes %x but got %x", signed, gotSigned) } - return staticRelayer{}, nil + return newStaticRelayer(s.lggr), nil } -type staticRelayer struct{} - -func (s staticRelayer) Start(ctx context.Context) error { return nil } - -func (s staticRelayer) Close() error { return nil } - -func (s staticRelayer) Ready() error { return nil } +type staticRelayer struct { + staticService +} -func (s staticRelayer) Name() string { return "staticRelayer" } +func newStaticRelayer(lggr logger.Logger) staticRelayer { + return staticRelayer{staticService{lggr: logger.Named(lggr, "staticRelayer")}} +} func (s staticRelayer) HealthReport() map[string]error { - return map[string]error{s.Name(): s.Ready()} + hp := s.staticService.HealthReport() + services.CopyHealth(hp, newStaticConfigProvider(s.lggr).HealthReport()) + services.CopyHealth(hp, NewStaticMedianProvider(s.lggr).HealthReport()) + services.CopyHealth(hp, NewStaticPluginProvider(s.lggr).HealthReport()) + return hp } func (s staticRelayer) NewConfigProvider(ctx context.Context, r types.RelayArgs) (types.ConfigProvider, error) { if !equalRelayArgs(r, RelayArgs) { return nil, fmt.Errorf("expected relay args:\n\t%v\nbut got:\n\t%v", RelayArgs, r) } - return staticConfigProvider{}, nil + return newStaticConfigProvider(s.lggr), nil } func (s staticRelayer) NewMedianProvider(ctx context.Context, r types.RelayArgs, p types.PluginArgs) (types.MedianProvider, error) { @@ -84,7 +101,7 @@ func (s staticRelayer) NewMedianProvider(ctx context.Context, r types.RelayArgs, if !reflect.DeepEqual(PluginArgs, p) { return nil, fmt.Errorf("expected plugin args %v but got %v", PluginArgs, p) } - return StaticMedianProvider{}, nil + return NewStaticMedianProvider(s.lggr), nil } func (s staticRelayer) NewPluginProvider(ctx context.Context, r types.RelayArgs, p types.PluginArgs) (types.PluginProvider, error) { @@ -95,7 +112,7 @@ func (s staticRelayer) NewPluginProvider(ctx context.Context, r types.RelayArgs, if !reflect.DeepEqual(PluginArgs, p) { return nil, fmt.Errorf("expected plugin args %v but got %v", PluginArgs, p) } - return StaticPluginProvider{}, nil + return NewStaticPluginProvider(s.lggr), nil } func (s staticRelayer) GetChainStatus(ctx context.Context) (types.ChainStatus, error) { @@ -149,25 +166,32 @@ func newRelayArgsWithProviderType(_type types.OCR2PluginType) types.RelayArgs { func RunPluginRelayer(t *testing.T, p internal.PluginRelayer) { ctx := tests.Context(t) + srvctest.Run(t, p, nil) t.Run("Relayer", func(t *testing.T) { relayer, err := p.NewRelayer(ctx, ConfigTOML, StaticKeystore{}) - require.NoError(t, err) - require.NoError(t, relayer.Start(ctx)) - t.Cleanup(func() { assert.NoError(t, relayer.Close()) }) + srvctest.Run(t, relayer, err) RunRelayer(t, relayer) + srvctest.AssertHealthReportNames(t, relayer.HealthReport(), + "PluginRelayerClient", + "PluginRelayerClient.RelayerClient", + "PluginRelayerClient.RelayerClient.staticPluginRelayer", + "PluginRelayerClient.RelayerClient.staticPluginRelayer.staticRelayer", + "PluginRelayerClient.RelayerClient.staticPluginRelayer.staticRelayer.staticMedianProvider", + "PluginRelayerClient.RelayerClient.staticPluginRelayer.staticRelayer.staticPluginProvider", + "PluginRelayerClient.RelayerClient.staticPluginRelayer.staticRelayer.staticConfigProvider", + ) }) } func RunRelayer(t *testing.T, relayer internal.Relayer) { ctx := tests.Context(t) + configProvider, err := relayer.NewConfigProvider(ctx, RelayArgs) + srvctest.Run(t, configProvider, err) + t.Run("ConfigProvider", func(t *testing.T) { t.Parallel() - configProvider, err := relayer.NewConfigProvider(ctx, RelayArgs) - require.NoError(t, err) - require.NoError(t, configProvider.Start(ctx)) - t.Cleanup(func() { assert.NoError(t, configProvider.Close()) }) t.Run("OffchainConfigDigester", func(t *testing.T) { t.Parallel() @@ -195,145 +219,133 @@ func RunRelayer(t *testing.T, relayer internal.Relayer) { }) }) + pp, err := relayer.NewPluginProvider(ctx, newRelayArgsWithProviderType(types.Median), PluginArgs) + provider := pp.(types.MedianProvider) + srvctest.Run(t, provider, err) + t.Run("MedianProvider", func(t *testing.T) { t.Parallel() - ra := newRelayArgsWithProviderType(types.Median) - p, err := relayer.NewPluginProvider(ctx, ra, PluginArgs) - provider := p.(types.MedianProvider) - require.NoError(t, err) - require.NoError(t, provider.Start(ctx)) - t.Cleanup(func() { assert.NoError(t, provider.Close()) }) - t.Run("ReportingPluginProvider", func(t *testing.T) { + t.Run("OffchainConfigDigester", func(t *testing.T) { t.Parallel() - - t.Run("OffchainConfigDigester", func(t *testing.T) { - t.Parallel() - ocd := provider.OffchainConfigDigester() - gotConfigDigestPrefix, err := ocd.ConfigDigestPrefix() - require.NoError(t, err) - assert.Equal(t, configDigestPrefix, gotConfigDigestPrefix) - gotConfigDigest, err := ocd.ConfigDigest(contractConfig) - require.NoError(t, err) - assert.Equal(t, configDigest, gotConfigDigest) - }) - t.Run("ContractConfigTracker", func(t *testing.T) { - t.Parallel() - cct := provider.ContractConfigTracker() - gotBlockHeight, err := cct.LatestBlockHeight(ctx) - require.NoError(t, err) - assert.Equal(t, blockHeight, gotBlockHeight) - gotChangedInBlock, gotConfigDigest, err := cct.LatestConfigDetails(ctx) - require.NoError(t, err) - assert.Equal(t, changedInBlock, gotChangedInBlock) - assert.Equal(t, configDigest, gotConfigDigest) - gotContractConfig, err := cct.LatestConfig(ctx, changedInBlock) - require.NoError(t, err) - assert.Equal(t, contractConfig, gotContractConfig) - }) - t.Run("ContractTransmitter", func(t *testing.T) { - t.Parallel() - ct := provider.ContractTransmitter() - gotAccount, err := ct.FromAccount() - require.NoError(t, err) - assert.Equal(t, account, gotAccount) - gotConfigDigest, gotEpoch, err := ct.LatestConfigDigestAndEpoch(ctx) - require.NoError(t, err) - assert.Equal(t, configDigest, gotConfigDigest) - assert.Equal(t, epoch, gotEpoch) - err = ct.Transmit(ctx, reportContext, report, sigs) - require.NoError(t, err) - }) - t.Run("ReportCodec", func(t *testing.T) { - t.Parallel() - rc := provider.ReportCodec() - gotReport, err := rc.BuildReport(pobs) - require.NoError(t, err) - assert.Equal(t, report, gotReport) - gotMedianValue, err := rc.MedianFromReport(report) - require.NoError(t, err) - assert.Equal(t, medianValue, gotMedianValue) - gotMax, err := rc.MaxReportLength(n) - require.NoError(t, err) - assert.Equal(t, max, gotMax) - }) - t.Run("MedianContract", func(t *testing.T) { - t.Parallel() - mc := provider.MedianContract() - gotConfigDigest, gotEpoch, gotRound, err := mc.LatestRoundRequested(ctx, lookbackDuration) - require.NoError(t, err) - assert.Equal(t, configDigest, gotConfigDigest) - assert.Equal(t, epoch, gotEpoch) - assert.Equal(t, round, gotRound) - gotConfigDigest, gotEpoch, gotRound, gotLatestAnswer, gotLatestTimestamp, err := mc.LatestTransmissionDetails(ctx) - require.NoError(t, err) - assert.Equal(t, configDigest, gotConfigDigest) - assert.Equal(t, epoch, gotEpoch) - assert.Equal(t, round, gotRound) - assert.Equal(t, latestAnswer, gotLatestAnswer) - assert.WithinDuration(t, latestTimestamp, gotLatestTimestamp, time.Second) - }) - t.Run("OnchainConfigCodec", func(t *testing.T) { - t.Parallel() - occ := provider.OnchainConfigCodec() - gotEncoded, err := occ.Encode(onchainConfig) - require.NoError(t, err) - assert.Equal(t, encoded, gotEncoded) - gotDecoded, err := occ.Decode(encoded) - require.NoError(t, err) - assert.Equal(t, onchainConfig, gotDecoded) - }) + ocd := provider.OffchainConfigDigester() + gotConfigDigestPrefix, err := ocd.ConfigDigestPrefix() + require.NoError(t, err) + assert.Equal(t, configDigestPrefix, gotConfigDigestPrefix) + gotConfigDigest, err := ocd.ConfigDigest(contractConfig) + require.NoError(t, err) + assert.Equal(t, configDigest, gotConfigDigest) + }) + t.Run("ContractConfigTracker", func(t *testing.T) { + t.Parallel() + cct := provider.ContractConfigTracker() + gotBlockHeight, err := cct.LatestBlockHeight(ctx) + require.NoError(t, err) + assert.Equal(t, blockHeight, gotBlockHeight) + gotChangedInBlock, gotConfigDigest, err := cct.LatestConfigDetails(ctx) + require.NoError(t, err) + assert.Equal(t, changedInBlock, gotChangedInBlock) + assert.Equal(t, configDigest, gotConfigDigest) + gotContractConfig, err := cct.LatestConfig(ctx, changedInBlock) + require.NoError(t, err) + assert.Equal(t, contractConfig, gotContractConfig) + }) + t.Run("ContractTransmitter", func(t *testing.T) { + t.Parallel() + ct := provider.ContractTransmitter() + gotAccount, err := ct.FromAccount() + require.NoError(t, err) + assert.Equal(t, account, gotAccount) + gotConfigDigest, gotEpoch, err := ct.LatestConfigDigestAndEpoch(ctx) + require.NoError(t, err) + assert.Equal(t, configDigest, gotConfigDigest) + assert.Equal(t, epoch, gotEpoch) + err = ct.Transmit(ctx, reportContext, report, sigs) + require.NoError(t, err) + }) + t.Run("ReportCodec", func(t *testing.T) { + t.Parallel() + rc := provider.ReportCodec() + gotReport, err := rc.BuildReport(pobs) + require.NoError(t, err) + assert.Equal(t, report, gotReport) + gotMedianValue, err := rc.MedianFromReport(report) + require.NoError(t, err) + assert.Equal(t, medianValue, gotMedianValue) + gotMax, err := rc.MaxReportLength(n) + require.NoError(t, err) + assert.Equal(t, max, gotMax) + }) + t.Run("MedianContract", func(t *testing.T) { + t.Parallel() + mc := provider.MedianContract() + gotConfigDigest, gotEpoch, gotRound, err := mc.LatestRoundRequested(ctx, lookbackDuration) + require.NoError(t, err) + assert.Equal(t, configDigest, gotConfigDigest) + assert.Equal(t, epoch, gotEpoch) + assert.Equal(t, round, gotRound) + gotConfigDigest, gotEpoch, gotRound, gotLatestAnswer, gotLatestTimestamp, err := mc.LatestTransmissionDetails(ctx) + require.NoError(t, err) + assert.Equal(t, configDigest, gotConfigDigest) + assert.Equal(t, epoch, gotEpoch) + assert.Equal(t, round, gotRound) + assert.Equal(t, latestAnswer, gotLatestAnswer) + assert.WithinDuration(t, latestTimestamp, gotLatestTimestamp, time.Second) + }) + t.Run("OnchainConfigCodec", func(t *testing.T) { + t.Parallel() + occ := provider.OnchainConfigCodec() + gotEncoded, err := occ.Encode(onchainConfig) + require.NoError(t, err) + assert.Equal(t, encoded, gotEncoded) + gotDecoded, err := occ.Decode(encoded) + require.NoError(t, err) + assert.Equal(t, onchainConfig, gotDecoded) }) }) + pp, err = relayer.NewPluginProvider(ctx, newRelayArgsWithProviderType(types.GenericPlugin), PluginArgs) + srvctest.Run(t, pp, err) + t.Run("PluginProvider", func(t *testing.T) { t.Parallel() - ra := newRelayArgsWithProviderType(types.GenericPlugin) - provider, err := relayer.NewPluginProvider(ctx, ra, PluginArgs) - require.NoError(t, err) - require.NoError(t, provider.Start(ctx)) - t.Cleanup(func() { assert.NoError(t, provider.Close()) }) - t.Run("ReportingPluginProvider", func(t *testing.T) { + t.Run("OffchainConfigDigester", func(t *testing.T) { t.Parallel() - - t.Run("OffchainConfigDigester", func(t *testing.T) { - t.Parallel() - ocd := provider.OffchainConfigDigester() - gotConfigDigestPrefix, err := ocd.ConfigDigestPrefix() - require.NoError(t, err) - assert.Equal(t, configDigestPrefix, gotConfigDigestPrefix) - gotConfigDigest, err := ocd.ConfigDigest(contractConfig) - require.NoError(t, err) - assert.Equal(t, configDigest, gotConfigDigest) - }) - t.Run("ContractConfigTracker", func(t *testing.T) { - t.Parallel() - cct := provider.ContractConfigTracker() - gotBlockHeight, err := cct.LatestBlockHeight(ctx) - require.NoError(t, err) - assert.Equal(t, blockHeight, gotBlockHeight) - gotChangedInBlock, gotConfigDigest, err := cct.LatestConfigDetails(ctx) - require.NoError(t, err) - assert.Equal(t, changedInBlock, gotChangedInBlock) - assert.Equal(t, configDigest, gotConfigDigest) - gotContractConfig, err := cct.LatestConfig(ctx, changedInBlock) - require.NoError(t, err) - assert.Equal(t, contractConfig, gotContractConfig) - }) - t.Run("ContractTransmitter", func(t *testing.T) { - t.Parallel() - ct := provider.ContractTransmitter() - gotAccount, err := ct.FromAccount() - require.NoError(t, err) - assert.Equal(t, account, gotAccount) - gotConfigDigest, gotEpoch, err := ct.LatestConfigDigestAndEpoch(ctx) - require.NoError(t, err) - assert.Equal(t, configDigest, gotConfigDigest) - assert.Equal(t, epoch, gotEpoch) - err = ct.Transmit(ctx, reportContext, report, sigs) - require.NoError(t, err) - }) + ocd := pp.OffchainConfigDigester() + gotConfigDigestPrefix, err := ocd.ConfigDigestPrefix() + require.NoError(t, err) + assert.Equal(t, configDigestPrefix, gotConfigDigestPrefix) + gotConfigDigest, err := ocd.ConfigDigest(contractConfig) + require.NoError(t, err) + assert.Equal(t, configDigest, gotConfigDigest) + }) + t.Run("ContractConfigTracker", func(t *testing.T) { + t.Parallel() + cct := pp.ContractConfigTracker() + gotBlockHeight, err := cct.LatestBlockHeight(ctx) + require.NoError(t, err) + assert.Equal(t, blockHeight, gotBlockHeight) + gotChangedInBlock, gotConfigDigest, err := cct.LatestConfigDetails(ctx) + require.NoError(t, err) + assert.Equal(t, changedInBlock, gotChangedInBlock) + assert.Equal(t, configDigest, gotConfigDigest) + gotContractConfig, err := cct.LatestConfig(ctx, changedInBlock) + require.NoError(t, err) + assert.Equal(t, contractConfig, gotContractConfig) + }) + t.Run("ContractTransmitter", func(t *testing.T) { + t.Parallel() + ct := pp.ContractTransmitter() + gotAccount, err := ct.FromAccount() + require.NoError(t, err) + assert.Equal(t, account, gotAccount) + gotConfigDigest, gotEpoch, err := ct.LatestConfigDigestAndEpoch(ctx) + require.NoError(t, err) + assert.Equal(t, configDigest, gotConfigDigest) + assert.Equal(t, epoch, gotEpoch) + err = ct.Transmit(ctx, reportContext, report, sigs) + require.NoError(t, err) }) }) diff --git a/pkg/loop/internal/test/reporting_plugin.go b/pkg/loop/internal/test/reporting_plugin.go index 20aa4a0d4e..438009064c 100644 --- a/pkg/loop/internal/test/reporting_plugin.go +++ b/pkg/loop/internal/test/reporting_plugin.go @@ -8,7 +8,9 @@ import ( "google.golang.org/grpc" + "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/loop/internal" + "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/types" ) @@ -18,14 +20,27 @@ type MockConn struct { const ReportingPluginWithMedianProviderName = "reporting-plugin-with-median-provider" -type StaticReportingPluginWithMedianProvider struct { +type staticReportingPluginWithMedianProvider struct { + staticService } -func (s StaticReportingPluginWithMedianProvider) ConnToProvider(conn grpc.ClientConnInterface, broker internal.Broker, brokerConfig internal.BrokerConfig) types.MedianProvider { - return StaticMedianProvider{} +func NewStaticReportingPluginWithMedianProvider(lggr logger.Logger) staticReportingPluginWithMedianProvider { + return staticReportingPluginWithMedianProvider{staticService{lggr: logger.Named(lggr, "staticReportingPluginWithMedianProvider")}} } -func (s StaticReportingPluginWithMedianProvider) NewReportingPluginFactory(ctx context.Context, config types.ReportingPluginServiceConfig, provider types.MedianProvider, pipelineRunner types.PipelineRunnerService, telemetry types.TelemetryClient, errorLog types.ErrorLog) (types.ReportingPluginFactory, error) { +func (s staticReportingPluginWithMedianProvider) HealthReport() map[string]error { + hp := s.staticService.HealthReport() + services.CopyHealth(hp, NewStaticMedianProvider(s.lggr).HealthReport()) + services.CopyHealth(hp, newStaticPluginFactory(s.lggr).HealthReport()) + return hp +} + +func (s staticReportingPluginWithMedianProvider) ConnToProvider(conn grpc.ClientConnInterface, broker internal.Broker, brokerConfig internal.BrokerConfig) types.MedianProvider { + return NewStaticMedianProvider(s.lggr) +} + +func (s staticReportingPluginWithMedianProvider) NewReportingPluginFactory(ctx context.Context, config types.ReportingPluginServiceConfig, provider types.MedianProvider, pipelineRunner types.PipelineRunnerService, telemetry types.TelemetryClient, errorLog types.ErrorLog) (types.ReportingPluginFactory, error) { + //TODO validate config ocd := provider.OffchainConfigDigester() gotDigestPrefix, err := ocd.ConfigDigestPrefix() if err != nil { @@ -158,6 +173,7 @@ func (s StaticReportingPluginWithMedianProvider) NewReportingPluginFactory(ctx c if !reflect.DeepEqual(gotDecoded, onchainConfig) { return nil, fmt.Errorf("expected OnchainConfig %s but got %s", onchainConfig, gotDecoded) } + //TODO validate pipelineRunner if err2 := errorLog.SaveError(ctx, errMsg); err2 != nil { return nil, fmt.Errorf("failed to save error: %w", err2) } @@ -169,17 +185,29 @@ func (s StaticReportingPluginWithMedianProvider) NewReportingPluginFactory(ctx c if err != nil { return nil, fmt.Errorf("failed to send log: %w", err) } - return staticPluginFactory{}, nil + return newStaticPluginFactory(s.lggr), nil +} + +type staticReportingPluginWithPluginProvider struct { + staticService +} + +func NewStaticReportingPluginWithPluginProvider(lggr logger.Logger) staticReportingPluginWithPluginProvider { + return staticReportingPluginWithPluginProvider{staticService{lggr: logger.Named(lggr, "staticReportingPluginWithPluginProvider")}} } -type StaticReportingPluginWithPluginProvider struct { +func (s staticReportingPluginWithPluginProvider) HealthReport() map[string]error { + hp := s.staticService.HealthReport() + services.CopyHealth(hp, NewStaticPluginProvider(s.lggr).HealthReport()) + services.CopyHealth(hp, newStaticPluginFactory(s.lggr).HealthReport()) + return hp } -func (s StaticReportingPluginWithPluginProvider) ConnToProvider(conn grpc.ClientConnInterface, broker internal.Broker, brokerConfig internal.BrokerConfig) types.PluginProvider { - return StaticPluginProvider{} +func (s staticReportingPluginWithPluginProvider) ConnToProvider(conn grpc.ClientConnInterface, broker internal.Broker, brokerConfig internal.BrokerConfig) types.PluginProvider { + return NewStaticPluginProvider(s.lggr) } -func (s StaticReportingPluginWithPluginProvider) NewReportingPluginFactory(ctx context.Context, config types.ReportingPluginServiceConfig, provider types.PluginProvider, pipelineRunner types.PipelineRunnerService, telemetry types.TelemetryClient, errorLog types.ErrorLog) (types.ReportingPluginFactory, error) { +func (s staticReportingPluginWithPluginProvider) NewReportingPluginFactory(ctx context.Context, config types.ReportingPluginServiceConfig, provider types.PluginProvider, pipelineRunner types.PipelineRunnerService, telemetry types.TelemetryClient, errorLog types.ErrorLog) (types.ReportingPluginFactory, error) { ocd := provider.OffchainConfigDigester() gotDigestPrefix, err := ocd.ConfigDigestPrefix() if err != nil { @@ -260,5 +288,5 @@ func (s StaticReportingPluginWithPluginProvider) NewReportingPluginFactory(ctx c if err != nil { return nil, fmt.Errorf("failed to send log: %w", err) } - return staticPluginFactory{}, nil + return newStaticPluginFactory(s.lggr), nil } diff --git a/pkg/loop/internal/test/test.go b/pkg/loop/internal/test/test.go index b7b064d53e..131d8905c1 100644 --- a/pkg/loop/internal/test/test.go +++ b/pkg/loop/internal/test/test.go @@ -1,6 +1,7 @@ package test import ( + "context" "math/big" "time" @@ -10,6 +11,7 @@ import ( "github.com/smartcontractkit/libocr/offchainreporting2/reportingplugin/median" libocr "github.com/smartcontractkit/libocr/offchainreporting2plus/types" + "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/types" ) @@ -149,3 +151,30 @@ URL = 'https://test.url' }) payload = []byte("oops") ) + +// TODO could have a default healthReport too +type staticService struct { + lggr logger.Logger +} + +func (s staticService) Name() string { return s.lggr.Name() } + +func (s staticService) Start(ctx context.Context) error { + s.lggr.Info("Started") + return nil +} + +func (s staticService) Close() error { + s.lggr.Info("Closed") + return nil +} + +func (s staticService) Ready() error { + s.lggr.Info("Ready") + return nil +} + +// HealthReport reports only for this single service. Override to include sub-services. +func (s staticService) HealthReport() map[string]error { + return map[string]error{s.Name(): s.Ready()} +} diff --git a/pkg/loop/internal/types.go b/pkg/loop/internal/types.go index 1d508a0572..d291a7a8f6 100644 --- a/pkg/loop/internal/types.go +++ b/pkg/loop/internal/types.go @@ -3,10 +3,12 @@ package internal import ( "context" + "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/types" ) type PluginRelayer interface { + services.Service NewRelayer(ctx context.Context, config string, keystore types.Keystore) (Relayer, error) } diff --git a/pkg/loop/median_service_test.go b/pkg/loop/median_service_test.go index 0794a6d691..9842621be0 100644 --- a/pkg/loop/median_service_test.go +++ b/pkg/loop/median_service_test.go @@ -18,9 +18,10 @@ import ( func TestMedianService(t *testing.T) { t.Parallel() - median := loop.NewMedianService(logger.Test(t), loop.GRPCOpts{}, func() *exec.Cmd { + lggr := logger.Test(t) + median := loop.NewMedianService(lggr, loop.GRPCOpts{}, func() *exec.Cmd { return NewHelperProcessCommand(loop.PluginMedianName) - }, test.StaticMedianProvider{}, test.StaticDataSource(), test.StaticJuelsPerFeeCoinDataSource(), &test.StaticErrorLog{}) + }, test.NewStaticMedianProvider(lggr), test.StaticDataSource(), test.StaticJuelsPerFeeCoinDataSource(), &test.StaticErrorLog{}) hook := median.PluginService.XXXTestHook() require.NoError(t, median.Start(tests.Context(t))) t.Cleanup(func() { assert.NoError(t, median.Close()) }) @@ -50,14 +51,14 @@ func TestMedianService(t *testing.T) { func TestMedianService_recovery(t *testing.T) { t.Parallel() + lggr := logger.Test(t) var limit atomic.Int32 - median := loop.NewMedianService(logger.Test(t), loop.GRPCOpts{}, func() *exec.Cmd { - h := HelperProcessCommand{ + median := loop.NewMedianService(lggr, loop.GRPCOpts{}, func() *exec.Cmd { + return HelperProcessCommand{ Command: loop.PluginMedianName, Limit: int(limit.Add(1)), - } - return h.New() - }, test.StaticMedianProvider{}, test.StaticDataSource(), test.StaticJuelsPerFeeCoinDataSource(), &test.StaticErrorLog{}) + }.New() + }, test.NewStaticMedianProvider(lggr), test.StaticDataSource(), test.StaticJuelsPerFeeCoinDataSource(), &test.StaticErrorLog{}) require.NoError(t, median.Start(tests.Context(t))) t.Cleanup(func() { assert.NoError(t, median.Close()) }) diff --git a/pkg/loop/plugin_median.go b/pkg/loop/plugin_median.go index 2855fcbf25..910ed74330 100644 --- a/pkg/loop/plugin_median.go +++ b/pkg/loop/plugin_median.go @@ -40,6 +40,7 @@ type GRPCPluginMedian struct { } func (p *GRPCPluginMedian) GRPCServer(broker *plugin.GRPCBroker, server *grpc.Server) error { + //TODO when to start PluginServer return internal.RegisterPluginMedianServer(server, broker, p.BrokerConfig, p.PluginServer) } diff --git a/pkg/loop/plugin_median_test.go b/pkg/loop/plugin_median_test.go index 9c624eda58..0a27ecf1fd 100644 --- a/pkg/loop/plugin_median_test.go +++ b/pkg/loop/plugin_median_test.go @@ -18,14 +18,15 @@ import ( func TestPluginMedian(t *testing.T) { t.Parallel() + lggr := logger.Test(t) stopCh := newStopCh(t) - test.PluginTest(t, loop.PluginMedianName, &loop.GRPCPluginMedian{PluginServer: test.StaticPluginMedian{}, BrokerConfig: loop.BrokerConfig{Logger: logger.Test(t), StopCh: stopCh}}, test.PluginMedian) + test.PluginTest(t, loop.PluginMedianName, &loop.GRPCPluginMedian{PluginServer: test.NewStaticPluginMedian(lggr), BrokerConfig: loop.BrokerConfig{Logger: logger.Test(t), StopCh: stopCh}}, test.PluginMedian) t.Run("proxy", func(t *testing.T) { - test.PluginTest(t, loop.PluginRelayerName, &loop.GRPCPluginRelayer{PluginServer: test.StaticPluginRelayer{}, BrokerConfig: loop.BrokerConfig{Logger: logger.Test(t), StopCh: stopCh}}, func(t *testing.T, pr loop.PluginRelayer) { + test.PluginTest(t, loop.PluginRelayerName, &loop.GRPCPluginRelayer{PluginServer: test.NewStaticPluginRelayer(lggr), BrokerConfig: loop.BrokerConfig{Logger: logger.Test(t), StopCh: stopCh}}, func(t *testing.T, pr loop.PluginRelayer) { p := newMedianProvider(t, pr) pm := test.PluginMedianTest{MedianProvider: p} - test.PluginTest(t, loop.PluginMedianName, &loop.GRPCPluginMedian{PluginServer: test.StaticPluginMedian{}, BrokerConfig: loop.BrokerConfig{Logger: logger.Test(t), StopCh: stopCh}}, pm.TestPluginMedian) + test.PluginTest(t, loop.PluginMedianName, &loop.GRPCPluginMedian{PluginServer: test.NewStaticPluginMedian(lggr), BrokerConfig: loop.BrokerConfig{Logger: logger.Test(t), StopCh: stopCh}}, pm.TestPluginMedian) }) }) } diff --git a/pkg/loop/plugin_relayer.go b/pkg/loop/plugin_relayer.go index d5d99ecd0f..1151d70342 100644 --- a/pkg/loop/plugin_relayer.go +++ b/pkg/loop/plugin_relayer.go @@ -43,6 +43,7 @@ type GRPCPluginRelayer struct { } func (p *GRPCPluginRelayer) GRPCServer(broker *plugin.GRPCBroker, server *grpc.Server) error { + //TODO when to start return internal.RegisterPluginRelayerServer(server, broker, p.BrokerConfig, p.PluginServer) } diff --git a/pkg/loop/plugin_relayer_test.go b/pkg/loop/plugin_relayer_test.go index 8d40978042..153bd9b78b 100644 --- a/pkg/loop/plugin_relayer_test.go +++ b/pkg/loop/plugin_relayer_test.go @@ -14,8 +14,9 @@ import ( func TestPluginRelayer(t *testing.T) { t.Parallel() + lggr := logger.Test(t) stopCh := newStopCh(t) - test.PluginTest(t, loop.PluginRelayerName, &loop.GRPCPluginRelayer{PluginServer: test.StaticPluginRelayer{}, BrokerConfig: loop.BrokerConfig{Logger: logger.Test(t), StopCh: stopCh}}, test.RunPluginRelayer) + test.PluginTest(t, loop.PluginRelayerName, &loop.GRPCPluginRelayer{PluginServer: test.NewStaticPluginRelayer(lggr), BrokerConfig: loop.BrokerConfig{Logger: logger.Test(t), StopCh: stopCh}}, test.RunPluginRelayer) } func TestPluginRelayerExec(t *testing.T) { diff --git a/pkg/loop/process_test.go b/pkg/loop/process_test.go index 94ae500227..e5f4d22d41 100644 --- a/pkg/loop/process_test.go +++ b/pkg/loop/process_test.go @@ -8,14 +8,13 @@ import ( type HelperProcessCommand test.HelperProcessCommand -func (h *HelperProcessCommand) New() *exec.Cmd { +func (h HelperProcessCommand) New() *exec.Cmd { h.CommandLocation = "./internal/test/cmd/main.go" - return (test.HelperProcessCommand)(*h).New() + return (test.HelperProcessCommand)(h).New() } func NewHelperProcessCommand(command string) *exec.Cmd { - h := HelperProcessCommand{ + return HelperProcessCommand{ Command: command, - } - return h.New() + }.New() } diff --git a/pkg/loop/relayer_service_test.go b/pkg/loop/relayer_service_test.go index c273ac0bda..47af977fb8 100644 --- a/pkg/loop/relayer_service_test.go +++ b/pkg/loop/relayer_service_test.go @@ -2,12 +2,16 @@ package loop_test import ( "os/exec" + "strings" "sync/atomic" "testing" "time" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest/observer" + "golang.org/x/exp/maps" + "golang.org/x/exp/slices" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/loop" @@ -19,15 +23,26 @@ import ( func TestRelayerService(t *testing.T) { t.Parallel() + serviceNames := []string{ + "RelayerService", + //TODO why not? "RelayerService.PluginRelayerClient", + "RelayerService.PluginRelayerClient.RelayerClient", + //TODO why not? "RelayerService.PluginRelayerClient.RelayerClient.staticPluginRelayer", + "RelayerService.PluginRelayerClient.RelayerClient.staticPluginRelayer.staticRelayer", + "RelayerService.PluginRelayerClient.RelayerClient.staticPluginRelayer.staticRelayer.staticMedianProvider", + "RelayerService.PluginRelayerClient.RelayerClient.staticPluginRelayer.staticRelayer.staticPluginProvider", + "RelayerService.PluginRelayerClient.RelayerClient.staticPluginRelayer.staticRelayer.staticConfigProvider", + } + relayer := loop.NewRelayerService(logger.Test(t), loop.GRPCOpts{}, func() *exec.Cmd { return NewHelperProcessCommand(loop.PluginRelayerName) }, test.ConfigTOML, test.StaticKeystore{}) hook := relayer.XXXTestHook() - require.NoError(t, relayer.Start(tests.Context(t))) - t.Cleanup(func() { assert.NoError(t, relayer.Close()) }) + srvctest.Run(t, relayer, nil) t.Run("control", func(t *testing.T) { test.RunRelayer(t, relayer) + srvctest.AssertHealthReportNames(t, relayer.HealthReport(), serviceNames...) }) t.Run("Kill", func(t *testing.T) { @@ -37,6 +52,7 @@ func TestRelayerService(t *testing.T) { time.Sleep(2 * internal.KeepAliveTickDuration) test.RunRelayer(t, relayer) + srvctest.AssertHealthReportNames(t, relayer.HealthReport(), serviceNames...) }) t.Run("Reset", func(t *testing.T) { @@ -46,7 +62,10 @@ func TestRelayerService(t *testing.T) { time.Sleep(2 * internal.KeepAliveTickDuration) test.RunRelayer(t, relayer) + srvctest.AssertHealthReportNames(t, relayer.HealthReport(), serviceNames...) }) + //TODO check for a subset of names? + srvctest.AssertHealthReportNames(t, relayer.HealthReport(), serviceNames[:4]...) } func TestRelayerService_recovery(t *testing.T) { @@ -59,28 +78,71 @@ func TestRelayerService_recovery(t *testing.T) { } return h.New() }, test.ConfigTOML, test.StaticKeystore{}) - require.NoError(t, relayer.Start(tests.Context(t))) - t.Cleanup(func() { assert.NoError(t, relayer.Close()) }) + srvctest.Run(t, relayer, nil) test.RunRelayer(t, relayer) + + srvctest.AssertHealthReportNames(t, relayer.HealthReport(), + "RelayerService", + "RelayerService.PluginRelayerClient.RelayerClient", + //TODO why no more?! + ) } func TestRelayerService_HealthReport(t *testing.T) { - lggr := logger.Named(logger.Test(t), "Foo") + //TODO dedupe these? + serviceNames := []string{ + "RelayerService", + //TODO logs "RelayerService.PluginRelayerClient", + //TODO logs "RelayerService.PluginRelayerClient.Relayer", + "RelayerService.PluginRelayerClient.RelayerClient", + //TODO logs "Root.RelayerService.RelayerPluginServer", + //TODO logs "RelayerService.staticPluginRelayer.staticRelayer", + //TODO an three more variants.... + "RelayerService.PluginRelayerClient.RelayerClient.staticPluginRelayer.staticRelayer", + "RelayerService.PluginRelayerClient.RelayerClient.staticPluginRelayer.staticRelayer.staticMedianProvider", + "RelayerService.PluginRelayerClient.RelayerClient.staticPluginRelayer.staticRelayer.staticPluginProvider", + "RelayerService.PluginRelayerClient.RelayerClient.staticPluginRelayer.staticRelayer.staticConfigProvider"} + + lggr, obsLogs := logger.TestObserved(t, zapcore.DebugLevel) + t.Cleanup(AssertLogsObserved(t, obsLogs, serviceNames)) s := loop.NewRelayerService(lggr, loop.GRPCOpts{}, func() *exec.Cmd { return test.HelperProcessCommand{Command: loop.PluginRelayerName}.New() }, test.ConfigTOML, test.StaticKeystore{}) - srvctest.AssertHealthReportNames(t, s.HealthReport(), - "Foo.RelayerService") + srvctest.AssertHealthReportNames(t, s.HealthReport(), serviceNames[0]) - require.NoError(t, s.Start(tests.Context(t))) - t.Cleanup(func() { require.NoError(t, s.Close()) }) + srvctest.Run(t, s, nil) require.Eventually(t, func() bool { return s.Ready() == nil }, tests.WaitTimeout(t)/2, time.Second, s.Ready()) - srvctest.AssertHealthReportNames(t, s.HealthReport(), - "Foo.RelayerService", - "Foo.RelayerService.PluginRelayerClient.ChainRelayerClient", - "staticRelayer") + srvctest.AssertHealthReportNames(t, s.HealthReport(), serviceNames...) + +} + +// TODO observed only or at least? +func AssertLogsObserved(t *testing.T, obsLogs *observer.ObservedLogs, names []string) func() { + return func() { + t.Helper() + + obsNames := map[string]struct{}{} + for _, l := range obsLogs.All() { + obsNames[l.LoggerName] = struct{}{} + } + var failed bool + for _, n := range names { + if _, ok := obsNames[n]; !ok { + t.Errorf("No logs observed for service: %s", n) + failed = true + } + } + if failed { + keys := maps.Keys(obsNames) + slices.Sort(keys) + t.Logf("Loggers observed:\n%s\n", strings.Join(keys, "\n")) + for _, l := range obsLogs.All() { + t.Log(l) + } + } + } } diff --git a/pkg/loop/reportingplugins/grpc.go b/pkg/loop/reportingplugins/grpc.go index c379343fa7..f72a7da656 100644 --- a/pkg/loop/reportingplugins/grpc.go +++ b/pkg/loop/reportingplugins/grpc.go @@ -8,6 +8,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/loop" "github.com/smartcontractkit/chainlink-common/pkg/loop/internal" + "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/types" ) @@ -39,40 +40,37 @@ type GRPCService[T types.PluginProvider] struct { pluginClient *internal.ReportingPluginServiceClient } -type serverAdapter func( - context.Context, - types.ReportingPluginServiceConfig, - grpc.ClientConnInterface, - types.PipelineRunnerService, - types.TelemetryService, - types.ErrorLog, -) (types.ReportingPluginFactory, error) +type serverAdapter[T types.PluginProvider] struct { + services.StateMachine + BrokerConfig loop.BrokerConfig + ProviderServer ProviderServer[T] + GRPCBroker *plugin.GRPCBroker +} + +func (s *serverAdapter[T]) Start(ctx context.Context) error { return s.ProviderServer.Start(ctx) } + +func (s *serverAdapter[T]) Close() error { return s.ProviderServer.Close() } -func (s serverAdapter) NewReportingPluginFactory( +func (s *serverAdapter[T]) HealthReport() map[string]error { return s.ProviderServer.HealthReport() } + +func (s *serverAdapter[T]) Name() string { return s.ProviderServer.Name() } + +func (s *serverAdapter[T]) NewReportingPluginFactory( ctx context.Context, - config types.ReportingPluginServiceConfig, + cfg types.ReportingPluginServiceConfig, conn grpc.ClientConnInterface, pr types.PipelineRunnerService, ts types.TelemetryService, - errorLog types.ErrorLog, + el types.ErrorLog, ) (types.ReportingPluginFactory, error) { - return s(ctx, config, conn, pr, ts, errorLog) + provider := s.ProviderServer.ConnToProvider(conn, s.GRPCBroker, s.BrokerConfig) + return s.ProviderServer.NewReportingPluginFactory(ctx, cfg, provider, pr, ts, el) } func (g *GRPCService[T]) GRPCServer(broker *plugin.GRPCBroker, server *grpc.Server) error { - adapter := func( - ctx context.Context, - cfg types.ReportingPluginServiceConfig, - conn grpc.ClientConnInterface, - pr types.PipelineRunnerService, - ts types.TelemetryService, - el types.ErrorLog, - ) (types.ReportingPluginFactory, error) { - provider := g.PluginServer.ConnToProvider(conn, broker, g.BrokerConfig) - tc := internal.NewTelemetryClient(ts) - return g.PluginServer.NewReportingPluginFactory(ctx, cfg, provider, pr, tc, el) - } - return internal.RegisterReportingPluginServiceServer(server, broker, g.BrokerConfig, serverAdapter(adapter)) + impl := &serverAdapter[T]{BrokerConfig: g.BrokerConfig, ProviderServer: g.PluginServer, GRPCBroker: broker} + //TODO when to start + return internal.RegisterReportingPluginServiceServer(server, broker, g.BrokerConfig, impl) } // GRPCClient implements [plugin.GRPCPlugin] and returns the pluginClient [types.PluginClient], updated with the new broker and conn. diff --git a/pkg/loop/reportingplugins/grpc_test.go b/pkg/loop/reportingplugins/grpc_test.go index 7a3745159d..ec8e83a674 100644 --- a/pkg/loop/reportingplugins/grpc_test.go +++ b/pkg/loop/reportingplugins/grpc_test.go @@ -40,7 +40,7 @@ func TestGRPCService_MedianProvider(t *testing.T) { t, test.ReportingPluginWithMedianProviderName, &reportingplugins.GRPCService[types.MedianProvider]{ - PluginServer: test.StaticReportingPluginWithMedianProvider{}, + PluginServer: test.NewStaticReportingPluginWithMedianProvider(logger.Test(t)), BrokerConfig: loop.BrokerConfig{ Logger: logger.Test(t), StopCh: stopCh, @@ -58,7 +58,7 @@ func TestGRPCService_PluginProvider(t *testing.T) { t, reportingplugins.PluginServiceName, &reportingplugins.GRPCService[types.PluginProvider]{ - PluginServer: test.StaticReportingPluginWithPluginProvider{}, + PluginServer: test.NewStaticReportingPluginWithPluginProvider(logger.Test(t)), BrokerConfig: loop.BrokerConfig{ Logger: logger.Test(t), StopCh: stopCh, diff --git a/pkg/services/srvctest/health.go b/pkg/services/srvctest/health.go index f2ba2268b5..52674a8f37 100644 --- a/pkg/services/srvctest/health.go +++ b/pkg/services/srvctest/health.go @@ -4,10 +4,20 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "golang.org/x/exp/maps" "golang.org/x/exp/slices" + + "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" ) +func Run(t *testing.T, s services.Service, err error) { + require.NoError(t, err) + require.NoError(t, s.Start(tests.Context(t))) + t.Cleanup(func() { assert.NoError(t, s.Close()) }) +} + func AssertHealthReportNames(t *testing.T, hp map[string]error, names ...string) { t.Helper() keys := maps.Keys(hp) diff --git a/pkg/types/provider_median.go b/pkg/types/provider_median.go index 29010d82d7..5fdda2d20e 100644 --- a/pkg/types/provider_median.go +++ b/pkg/types/provider_median.go @@ -5,6 +5,8 @@ import ( "github.com/smartcontractkit/libocr/offchainreporting2/reportingplugin/median" libocr "github.com/smartcontractkit/libocr/offchainreporting2plus/types" + + "github.com/smartcontractkit/chainlink-common/pkg/services" ) // MedianProvider provides all components needed for a median OCR2 plugin. @@ -16,11 +18,12 @@ type MedianProvider interface { } type PluginMedian interface { + services.Service // NewMedianFactory returns a new ReportingPluginFactory. If provider implements GRPCClientConn, it can be forwarded efficiently via proxy. NewMedianFactory(ctx context.Context, provider MedianProvider, dataSource, juelsPerFeeCoin median.DataSource, errorLog ErrorLog) (ReportingPluginFactory, error) } type ReportingPluginFactory interface { - Service + services.Service libocr.ReportingPluginFactory } diff --git a/pkg/types/reporting_plugin_service.go b/pkg/types/reporting_plugin_service.go index 0d4535d89b..3f0f18db2b 100644 --- a/pkg/types/reporting_plugin_service.go +++ b/pkg/types/reporting_plugin_service.go @@ -4,6 +4,8 @@ import ( "context" "google.golang.org/grpc" + + "github.com/smartcontractkit/chainlink-common/pkg/services" ) type ReportingPluginServiceConfig struct { @@ -17,6 +19,7 @@ type ReportingPluginServiceConfig struct { // ReportingPluginClient is the client interface to a plugin running // as a generic job (job type = GenericPlugin) inside the core node. type ReportingPluginClient interface { + services.Service NewReportingPluginFactory(ctx context.Context, config ReportingPluginServiceConfig, grpcProvider grpc.ClientConnInterface, pipelineRunner PipelineRunnerService, telemetry TelemetryService, errorLog ErrorLog) (ReportingPluginFactory, error) } @@ -25,5 +28,6 @@ type ReportingPluginClient interface { // with the passthrough provider connection converted to the provider // expected by the plugin. type ReportingPluginServer[T PluginProvider] interface { + services.Service NewReportingPluginFactory(ctx context.Context, config ReportingPluginServiceConfig, provider T, pipelineRunner PipelineRunnerService, telemetry TelemetryClient, errorLog ErrorLog) (ReportingPluginFactory, error) }