From 6ea97859abe70f89a31185320c4d3e8a4bdd9365 Mon Sep 17 00:00:00 2001 From: Cedric Cordenier Date: Mon, 16 Oct 2023 13:49:25 +0100 Subject: [PATCH] WIP --- pkg/loop/median_service.go | 11 +- pkg/loop/median_service_test.go | 7 +- pkg/loop/plugin_median_test.go | 9 +- pkg/loop/plugin_relayer_test.go | 5 +- pkg/loop/process_test.go | 23 +++ pkg/loop/relayer_service.go | 24 +-- pkg/loop/relayer_service_test.go | 7 +- .../grpc.go} | 11 +- .../grpc_test.go} | 17 ++- .../loopp_service.go} | 26 ++-- .../loopp_service_test.go} | 37 ++++- pkg/loop/service.go | 74 +++++++--- pkg/loop/service_test.go | 37 ----- .../{plugin_test.go => testutils/cmd/main.go} | 137 ++++-------------- pkg/loop/testutils/service.go | 60 ++++++++ 15 files changed, 257 insertions(+), 228 deletions(-) create mode 100644 pkg/loop/process_test.go rename pkg/loop/{grpc_plugin_service.go => reporting_plugins/grpc.go} (92%) rename pkg/loop/{grpc_plugin_service_test.go => reporting_plugins/grpc_test.go} (55%) rename pkg/loop/{plugin_service.go => reporting_plugins/loopp_service.go} (54%) rename pkg/loop/{plugin_service_test.go => reporting_plugins/loopp_service_test.go} (60%) delete mode 100644 pkg/loop/service_test.go rename pkg/loop/{plugin_test.go => testutils/cmd/main.go} (53%) create mode 100644 pkg/loop/testutils/service.go diff --git a/pkg/loop/median_service.go b/pkg/loop/median_service.go index 08e2b1a8b9..598b3ebac2 100644 --- a/pkg/loop/median_service.go +++ b/pkg/loop/median_service.go @@ -10,14 +10,13 @@ import ( "github.com/smartcontractkit/chainlink-relay/pkg/logger" "github.com/smartcontractkit/chainlink-relay/pkg/types" - "github.com/smartcontractkit/chainlink-relay/pkg/utils" ) var _ ocrtypes.ReportingPluginFactory = (*MedianService)(nil) // MedianService is a [types.Service] that maintains an internal [types.PluginMedian]. type MedianService struct { - pluginService[*GRPCPluginMedian, types.ReportingPluginFactory] + PluginService[*GRPCPluginMedian, types.ReportingPluginFactory] } // NewMedianService returns a new [*MedianService]. @@ -34,15 +33,13 @@ func NewMedianService(lggr logger.Logger, grpcOpts GRPCOpts, cmd func() *exec.Cm lggr = logger.Named(lggr, "MedianService") var ms MedianService broker := BrokerConfig{StopCh: stopCh, Logger: lggr, GRPCOpts: grpcOpts} - ms.init(PluginMedianName, &GRPCPluginMedian{BrokerConfig: broker}, newService, lggr, cmd, stopCh) + ms.Init(PluginMedianName, &GRPCPluginMedian{BrokerConfig: broker}, newService, lggr, cmd, stopCh) return &ms } func (m *MedianService) NewReportingPlugin(config ocrtypes.ReportingPluginConfig) (ocrtypes.ReportingPlugin, ocrtypes.ReportingPluginInfo, error) { - ctx, cancel := utils.ContextFromChan(m.pluginService.stopCh) - defer cancel() - if err := m.wait(ctx); err != nil { + if err := m.Wait(context.Background()); err != nil { return nil, ocrtypes.ReportingPluginInfo{}, err } - return m.service.NewReportingPlugin(config) + return m.Service.NewReportingPlugin(config) } diff --git a/pkg/loop/median_service_test.go b/pkg/loop/median_service_test.go index 2774212f95..cd5cd701d5 100644 --- a/pkg/loop/median_service_test.go +++ b/pkg/loop/median_service_test.go @@ -2,7 +2,6 @@ package loop_test import ( "os/exec" - "strconv" "sync/atomic" "testing" "time" @@ -19,9 +18,9 @@ import ( func TestMedianService(t *testing.T) { t.Parallel() median := loop.NewMedianService(logger.Test(t), loop.GRPCOpts{}, func() *exec.Cmd { - return helperProcess(loop.PluginMedianName) + return HelperProcess(loop.PluginMedianName, HelperProcessOptions{}) }, test.StaticMedianProvider{}, test.StaticDataSource(), test.StaticJuelsPerFeeCoinDataSource(), &test.StaticErrorLog{}) - hook := median.TestHook() + hook := median.PluginService.XXXTestHook() require.NoError(t, median.Start(utils.Context(t))) t.Cleanup(func() { assert.NoError(t, median.Close()) }) @@ -52,7 +51,7 @@ func TestMedianService_recovery(t *testing.T) { t.Parallel() var limit atomic.Int32 median := loop.NewMedianService(logger.Test(t), loop.GRPCOpts{}, func() *exec.Cmd { - return helperProcess(loop.PluginMedianName, strconv.Itoa(int(limit.Add(1)))) + return HelperProcess(loop.PluginMedianName, HelperProcessOptions{Limit: int(limit.Add(1))}) }, test.StaticMedianProvider{}, test.StaticDataSource(), test.StaticJuelsPerFeeCoinDataSource(), &test.StaticErrorLog{}) require.NoError(t, median.Start(utils.Context(t))) t.Cleanup(func() { assert.NoError(t, median.Close()) }) diff --git a/pkg/loop/plugin_median_test.go b/pkg/loop/plugin_median_test.go index dfb75cfff7..28393cf7c3 100644 --- a/pkg/loop/plugin_median_test.go +++ b/pkg/loop/plugin_median_test.go @@ -12,6 +12,7 @@ import ( "github.com/smartcontractkit/chainlink-relay/pkg/logger" "github.com/smartcontractkit/chainlink-relay/pkg/loop" "github.com/smartcontractkit/chainlink-relay/pkg/loop/internal/test" + "github.com/smartcontractkit/chainlink-relay/pkg/loop/testutils" "github.com/smartcontractkit/chainlink-relay/pkg/types" ) @@ -19,13 +20,13 @@ func TestPluginMedian(t *testing.T) { t.Parallel() stopCh := newStopCh(t) - testPlugin(t, loop.PluginMedianName, &loop.GRPCPluginMedian{PluginServer: test.StaticPluginMedian{}, BrokerConfig: loop.BrokerConfig{Logger: logger.Test(t), StopCh: stopCh}}, test.TestPluginMedian) + testutils.PluginTest(t, loop.PluginMedianName, &loop.GRPCPluginMedian{PluginServer: test.StaticPluginMedian{}, BrokerConfig: loop.BrokerConfig{Logger: logger.Test(t), StopCh: stopCh}}, test.TestPluginMedian) t.Run("proxy", func(t *testing.T) { - testPlugin(t, loop.PluginRelayerName, &loop.GRPCPluginRelayer{PluginServer: test.StaticPluginRelayer{}, BrokerConfig: loop.BrokerConfig{Logger: logger.Test(t), StopCh: stopCh}}, func(t *testing.T, pr loop.PluginRelayer) { + testutils.PluginTest(t, loop.PluginRelayerName, &loop.GRPCPluginRelayer{PluginServer: test.StaticPluginRelayer{}, BrokerConfig: loop.BrokerConfig{Logger: logger.Test(t), StopCh: stopCh}}, func(t *testing.T, pr loop.PluginRelayer) { p := newMedianProvider(t, pr) pm := test.PluginMedianTest{MedianProvider: p} - testPlugin(t, loop.PluginMedianName, &loop.GRPCPluginMedian{PluginServer: test.StaticPluginMedian{}, BrokerConfig: loop.BrokerConfig{Logger: logger.Test(t), StopCh: stopCh}}, pm.TestPluginMedian) + testutils.PluginTest(t, loop.PluginMedianName, &loop.GRPCPluginMedian{PluginServer: test.StaticPluginMedian{}, BrokerConfig: loop.BrokerConfig{Logger: logger.Test(t), StopCh: stopCh}}, pm.TestPluginMedian) }) }) } @@ -35,7 +36,7 @@ func TestPluginMedianExec(t *testing.T) { stopCh := newStopCh(t) median := loop.GRPCPluginMedian{BrokerConfig: loop.BrokerConfig{Logger: logger.Test(t), StopCh: stopCh}} cc := median.ClientConfig() - cc.Cmd = helperProcess(loop.PluginMedianName) + cc.Cmd = HelperProcess(loop.PluginMedianName, HelperProcessOptions{}) c := plugin.NewClient(cc) t.Cleanup(c.Kill) client, err := c.Client() diff --git a/pkg/loop/plugin_relayer_test.go b/pkg/loop/plugin_relayer_test.go index 4e444ad694..37d39f1126 100644 --- a/pkg/loop/plugin_relayer_test.go +++ b/pkg/loop/plugin_relayer_test.go @@ -9,13 +9,14 @@ import ( "github.com/smartcontractkit/chainlink-relay/pkg/logger" "github.com/smartcontractkit/chainlink-relay/pkg/loop" "github.com/smartcontractkit/chainlink-relay/pkg/loop/internal/test" + "github.com/smartcontractkit/chainlink-relay/pkg/loop/testutils" ) func TestPluginRelayer(t *testing.T) { t.Parallel() stopCh := newStopCh(t) - testPlugin(t, loop.PluginRelayerName, &loop.GRPCPluginRelayer{PluginServer: test.StaticPluginRelayer{}, BrokerConfig: loop.BrokerConfig{Logger: logger.Test(t), StopCh: stopCh}}, test.TestPluginRelayer) + testutils.PluginTest(t, loop.PluginRelayerName, &loop.GRPCPluginRelayer{PluginServer: test.StaticPluginRelayer{}, BrokerConfig: loop.BrokerConfig{Logger: logger.Test(t), StopCh: stopCh}}, test.TestPluginRelayer) } func TestPluginRelayerExec(t *testing.T) { @@ -30,7 +31,7 @@ func TestPluginRelayerExec(t *testing.T) { func newPluginRelayerExec(t *testing.T, stopCh <-chan struct{}) loop.PluginRelayer { relayer := loop.GRPCPluginRelayer{BrokerConfig: loop.BrokerConfig{Logger: logger.Test(t), StopCh: stopCh}} cc := relayer.ClientConfig() - cc.Cmd = helperProcess(loop.PluginRelayerName) + cc.Cmd = HelperProcess(loop.PluginRelayerName, HelperProcessOptions{}) c := plugin.NewClient(cc) t.Cleanup(c.Kill) client, err := c.Client() diff --git a/pkg/loop/process_test.go b/pkg/loop/process_test.go new file mode 100644 index 0000000000..f66e2bc71e --- /dev/null +++ b/pkg/loop/process_test.go @@ -0,0 +1,23 @@ +package loop_test + +import ( + "fmt" + "os" + "os/exec" +) + +type HelperProcessOptions struct { + Limit int +} + +func HelperProcess(command string, opts HelperProcessOptions) *exec.Cmd { + cmdArgs := []string{ + "go", "run", "./testutils/cmd/main.go", fmt.Sprintf("-cmd=%s", command), + } + if opts.Limit != 0 { + cmdArgs = append(cmdArgs, fmt.Sprintf("-limit=%d", opts.Limit)) + } + cmd := exec.Command(cmdArgs[0], cmdArgs[1:]...) + cmd.Env = append(os.Environ()) + return cmd +} diff --git a/pkg/loop/relayer_service.go b/pkg/loop/relayer_service.go index fd650cba7d..66e749739f 100644 --- a/pkg/loop/relayer_service.go +++ b/pkg/loop/relayer_service.go @@ -17,7 +17,7 @@ var _ Relayer = (*RelayerService)(nil) // RelayerService is a [types.Service] that maintains an internal [Relayer]. type RelayerService struct { - pluginService[*GRPCPluginRelayer, Relayer] + PluginService[*GRPCPluginRelayer, Relayer] } // NewRelayerService returns a new [*RelayerService]. @@ -38,41 +38,41 @@ func NewRelayerService(lggr logger.Logger, grpcOpts GRPCOpts, cmd func() *exec.C lggr = logger.Named(lggr, "RelayerService") var rs RelayerService broker := BrokerConfig{StopCh: stopCh, Logger: lggr, GRPCOpts: grpcOpts} - rs.init(PluginRelayerName, &GRPCPluginRelayer{BrokerConfig: broker}, newService, lggr, cmd, stopCh) + rs.Init(PluginRelayerName, &GRPCPluginRelayer{BrokerConfig: broker}, newService, lggr, cmd, stopCh) return &rs } func (r *RelayerService) NewConfigProvider(ctx context.Context, args types.RelayArgs) (types.ConfigProvider, error) { - if err := r.wait(ctx); err != nil { + if err := r.Wait(ctx); err != nil { return nil, err } - return r.service.NewConfigProvider(ctx, args) + return r.Service.NewConfigProvider(ctx, args) } func (r *RelayerService) NewPluginProvider(ctx context.Context, rargs types.RelayArgs, pargs types.PluginArgs) (types.PluginProvider, error) { - if err := r.wait(ctx); err != nil { + if err := r.Wait(ctx); err != nil { return nil, err } - return r.service.NewPluginProvider(ctx, rargs, pargs) + return r.Service.NewPluginProvider(ctx, rargs, pargs) } func (r *RelayerService) GetChainStatus(ctx context.Context) (types.ChainStatus, error) { - if err := r.wait(ctx); err != nil { + if err := r.Wait(ctx); err != nil { return types.ChainStatus{}, err } - return r.service.GetChainStatus(ctx) + return r.Service.GetChainStatus(ctx) } func (r *RelayerService) ListNodeStatuses(ctx context.Context, pageSize int32, pageToken string) (nodes []types.NodeStatus, nextPageToken string, total int, err error) { - if err := r.wait(ctx); err != nil { + if err := r.Wait(ctx); err != nil { return nil, "", -1, err } - return r.service.ListNodeStatuses(ctx, pageSize, pageToken) + return r.Service.ListNodeStatuses(ctx, pageSize, pageToken) } func (r *RelayerService) Transact(ctx context.Context, from, to string, amount *big.Int, balanceCheck bool) error { - if err := r.wait(ctx); err != nil { + if err := r.Wait(ctx); err != nil { return err } - return r.service.Transact(ctx, from, to, amount, balanceCheck) + return r.Service.Transact(ctx, from, to, amount, balanceCheck) } diff --git a/pkg/loop/relayer_service_test.go b/pkg/loop/relayer_service_test.go index 63289b674b..d47d2ece8b 100644 --- a/pkg/loop/relayer_service_test.go +++ b/pkg/loop/relayer_service_test.go @@ -2,7 +2,6 @@ package loop_test import ( "os/exec" - "strconv" "sync/atomic" "testing" "time" @@ -19,9 +18,9 @@ import ( func TestRelayerService(t *testing.T) { t.Parallel() relayer := loop.NewRelayerService(logger.Test(t), loop.GRPCOpts{}, func() *exec.Cmd { - return helperProcess(loop.PluginRelayerName) + return HelperProcess(loop.PluginRelayerName, HelperProcessOptions{}) }, test.ConfigTOML, test.StaticKeystore{}) - hook := relayer.TestHook() + hook := relayer.XXXTestHook() require.NoError(t, relayer.Start(utils.Context(t))) t.Cleanup(func() { assert.NoError(t, relayer.Close()) }) @@ -52,7 +51,7 @@ func TestRelayerService_recovery(t *testing.T) { t.Parallel() var limit atomic.Int32 relayer := loop.NewRelayerService(logger.Test(t), loop.GRPCOpts{}, func() *exec.Cmd { - return helperProcess(loop.PluginRelayerName, strconv.Itoa(int(limit.Add(1)))) + return HelperProcess(loop.PluginRelayerName, HelperProcessOptions{Limit: int(limit.Add(1))}) }, test.ConfigTOML, test.StaticKeystore{}) require.NoError(t, relayer.Start(utils.Context(t))) t.Cleanup(func() { assert.NoError(t, relayer.Close()) }) diff --git a/pkg/loop/grpc_plugin_service.go b/pkg/loop/reporting_plugins/grpc.go similarity index 92% rename from pkg/loop/grpc_plugin_service.go rename to pkg/loop/reporting_plugins/grpc.go index 1c980447df..dff5134424 100644 --- a/pkg/loop/grpc_plugin_service.go +++ b/pkg/loop/reporting_plugins/grpc.go @@ -1,4 +1,4 @@ -package loop +package reporting_plugins import ( "context" @@ -6,6 +6,7 @@ import ( "github.com/hashicorp/go-plugin" "google.golang.org/grpc" + "github.com/smartcontractkit/chainlink-relay/pkg/loop" "github.com/smartcontractkit/chainlink-relay/pkg/loop/internal" "github.com/smartcontractkit/chainlink-relay/pkg/types" ) @@ -22,7 +23,7 @@ func PluginGenericHandshakeConfig() plugin.HandshakeConfig { type ProviderServer[T types.PluginProvider] interface { types.PluginServer[T] - ConnToProvider(conn grpc.ClientConnInterface, broker internal.Broker, brokerConfig BrokerConfig) T + ConnToProvider(conn grpc.ClientConnInterface, broker internal.Broker, brokerConfig loop.BrokerConfig) T } // GRPCPluginService is the loopp interface for a plugin that can @@ -31,7 +32,7 @@ type ProviderServer[T types.PluginProvider] interface { type GRPCPluginService[T types.PluginProvider] struct { plugin.NetRPCUnsupportedPlugin - BrokerConfig + loop.BrokerConfig PluginServer ProviderServer[T] @@ -68,8 +69,8 @@ func (p *GRPCPluginService[T]) ClientConfig() *plugin.ClientConfig { HandshakeConfig: PluginGenericHandshakeConfig(), Plugins: map[string]plugin.Plugin{PluginServiceName: p}, AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC}, - GRPCDialOptions: p.DialOpts, - Logger: HCLogLogger(p.Logger), + GRPCDialOptions: p.BrokerConfig.DialOpts, + Logger: loop.HCLogLogger(p.BrokerConfig.Logger), } } diff --git a/pkg/loop/grpc_plugin_service_test.go b/pkg/loop/reporting_plugins/grpc_test.go similarity index 55% rename from pkg/loop/grpc_plugin_service_test.go rename to pkg/loop/reporting_plugins/grpc_test.go index 47058ba1c2..8ae9344dfa 100644 --- a/pkg/loop/grpc_plugin_service_test.go +++ b/pkg/loop/reporting_plugins/grpc_test.go @@ -1,22 +1,33 @@ -package loop_test +package reporting_plugins_test import ( "testing" + "time" "github.com/smartcontractkit/chainlink-relay/pkg/logger" "github.com/smartcontractkit/chainlink-relay/pkg/loop" "github.com/smartcontractkit/chainlink-relay/pkg/loop/internal/test" + "github.com/smartcontractkit/chainlink-relay/pkg/loop/reporting_plugins" + "github.com/smartcontractkit/chainlink-relay/pkg/loop/testutils" "github.com/smartcontractkit/chainlink-relay/pkg/types" ) +func newStopCh(t *testing.T) <-chan struct{} { + stopCh := make(chan struct{}) + if d, ok := t.Deadline(); ok { + time.AfterFunc(time.Until(d), func() { close(stopCh) }) + } + return stopCh +} + func TestGRPCPluginService(t *testing.T) { t.Parallel() stopCh := newStopCh(t) - testPlugin( + testutils.PluginTest( t, PluginGenericMedian, - &loop.GRPCPluginService[types.MedianProvider]{ + &reporting_plugins.GRPCPluginService[types.MedianProvider]{ PluginServer: test.StaticGenericPluginMedianProvider{}, BrokerConfig: loop.BrokerConfig{ Logger: logger.Test(t), diff --git a/pkg/loop/plugin_service.go b/pkg/loop/reporting_plugins/loopp_service.go similarity index 54% rename from pkg/loop/plugin_service.go rename to pkg/loop/reporting_plugins/loopp_service.go index 9838081123..117fd720f2 100644 --- a/pkg/loop/plugin_service.go +++ b/pkg/loop/reporting_plugins/loopp_service.go @@ -1,4 +1,4 @@ -package loop +package reporting_plugins import ( "context" @@ -9,21 +9,21 @@ import ( "google.golang.org/grpc" "github.com/smartcontractkit/chainlink-relay/pkg/logger" + "github.com/smartcontractkit/chainlink-relay/pkg/loop" "github.com/smartcontractkit/chainlink-relay/pkg/types" - "github.com/smartcontractkit/chainlink-relay/pkg/utils" ) -var _ ocrtypes.ReportingPluginFactory = (*PluginService)(nil) +var _ ocrtypes.ReportingPluginFactory = (*LOOPPService)(nil) // PluginService is a [types.Service] that maintains an internal [types.PluginClient]. -type PluginService struct { - pluginService[*GRPCPluginService[types.PluginProvider], types.ReportingPluginFactory] +type LOOPPService struct { + loop.PluginService[*GRPCPluginService[types.PluginProvider], types.ReportingPluginFactory] } // NewPluginService returns a new [*PluginService]. // cmd must return a new exec.Cmd each time it is called. // We use a `conn` here rather than a provider so that we can enforce proxy providers being passed in. -func NewPluginService(lggr logger.Logger, grpcOpts GRPCOpts, cmd func() *exec.Cmd, config types.PluginServiceConfig, providerConn grpc.ClientConnInterface, errorLog types.ErrorLog) *PluginService { +func NewLOOPPService(lggr logger.Logger, grpcOpts loop.GRPCOpts, cmd func() *exec.Cmd, config types.PluginServiceConfig, providerConn grpc.ClientConnInterface, errorLog types.ErrorLog) *LOOPPService { newService := func(ctx context.Context, instance any) (types.ReportingPluginFactory, error) { plug, ok := instance.(types.PluginClient) if !ok { @@ -33,17 +33,15 @@ func NewPluginService(lggr logger.Logger, grpcOpts GRPCOpts, cmd func() *exec.Cm } stopCh := make(chan struct{}) lggr = logger.Named(lggr, "GenericService") - var ps PluginService - broker := BrokerConfig{StopCh: stopCh, Logger: lggr, GRPCOpts: grpcOpts} - ps.init(PluginServiceName, &GRPCPluginService[types.PluginProvider]{BrokerConfig: broker}, newService, lggr, cmd, stopCh) + var ps LOOPPService + broker := loop.BrokerConfig{StopCh: stopCh, Logger: lggr, GRPCOpts: grpcOpts} + ps.Init(PluginServiceName, &GRPCPluginService[types.PluginProvider]{BrokerConfig: broker}, newService, lggr, cmd, stopCh) return &ps } -func (g *PluginService) NewReportingPlugin(config ocrtypes.ReportingPluginConfig) (ocrtypes.ReportingPlugin, ocrtypes.ReportingPluginInfo, error) { - ctx, cancel := utils.ContextFromChan(g.pluginService.stopCh) - defer cancel() - if err := g.wait(ctx); err != nil { +func (g *LOOPPService) NewReportingPlugin(config ocrtypes.ReportingPluginConfig) (ocrtypes.ReportingPlugin, ocrtypes.ReportingPluginInfo, error) { + if err := g.Wait(context.Background()); err != nil { return nil, ocrtypes.ReportingPluginInfo{}, err } - return g.service.NewReportingPlugin(config) + return g.Service.NewReportingPlugin(config) } diff --git a/pkg/loop/plugin_service_test.go b/pkg/loop/reporting_plugins/loopp_service_test.go similarity index 60% rename from pkg/loop/plugin_service_test.go rename to pkg/loop/reporting_plugins/loopp_service_test.go index 5714114e30..2dd6012dce 100644 --- a/pkg/loop/plugin_service_test.go +++ b/pkg/loop/reporting_plugins/loopp_service_test.go @@ -1,8 +1,9 @@ -package loop_test +package reporting_plugins_test import ( + "fmt" + "os" "os/exec" - "strconv" "sync/atomic" "testing" "time" @@ -13,10 +14,30 @@ import ( "github.com/smartcontractkit/chainlink-relay/pkg/logger" "github.com/smartcontractkit/chainlink-relay/pkg/loop" "github.com/smartcontractkit/chainlink-relay/pkg/loop/internal/test" + "github.com/smartcontractkit/chainlink-relay/pkg/loop/reporting_plugins" "github.com/smartcontractkit/chainlink-relay/pkg/types" "github.com/smartcontractkit/chainlink-relay/pkg/utils" ) +type HelperProcessOptions struct { + Limit int +} + +func HelperProcess(command string, opts HelperProcessOptions) *exec.Cmd { + cmdArgs := []string{ + "go", "run", "../testutils/cmd/main.go", fmt.Sprintf("-cmd=%s", command), + } + if opts.Limit != 0 { + cmdArgs = append(cmdArgs, fmt.Sprintf("-limit=%d", opts.Limit)) + } + cmd := exec.Command(cmdArgs[0], cmdArgs[1:]...) + cmd.Env = append(os.Environ()) + return cmd +} + +// PluginGenericMedian is a generic plugin which wants a median provider +const PluginGenericMedian = "generic-median" + func TestGenericService(t *testing.T) { t.Parallel() @@ -26,13 +47,13 @@ func TestGenericService(t *testing.T) { // A generic plugin with a median provider {Plugin: PluginGenericMedian}, // A generic plugin with a plugin provider - {Plugin: loop.PluginServiceName}, + {Plugin: reporting_plugins.PluginServiceName}, } for _, ts := range tests { - genericSvc := loop.NewPluginService(logger.Test(t), loop.GRPCOpts{}, func() *exec.Cmd { - return helperProcess(ts.Plugin) + genericSvc := reporting_plugins.NewLOOPPService(logger.Test(t), loop.GRPCOpts{}, func() *exec.Cmd { + return HelperProcess(ts.Plugin, HelperProcessOptions{}) }, types.PluginServiceConfig{}, test.MockConn{}, &test.StaticErrorLog{}) - hook := genericSvc.TestHook() + hook := genericSvc.XXXTestHook() require.NoError(t, genericSvc.Start(utils.Context(t))) t.Cleanup(func() { assert.NoError(t, genericSvc.Close()) }) @@ -63,8 +84,8 @@ func TestGenericService(t *testing.T) { func TestGenericService_recovery(t *testing.T) { t.Parallel() var limit atomic.Int32 - genericSvc := loop.NewPluginService(logger.Test(t), loop.GRPCOpts{}, func() *exec.Cmd { - return helperProcess(PluginGenericMedian, strconv.Itoa(int(limit.Add(1)))) + genericSvc := reporting_plugins.NewLOOPPService(logger.Test(t), loop.GRPCOpts{}, func() *exec.Cmd { + return HelperProcess(PluginGenericMedian, HelperProcessOptions{Limit: int(limit.Add(1))}) }, types.PluginServiceConfig{}, test.MockConn{}, &test.StaticErrorLog{}) require.NoError(t, genericSvc.Start(utils.Context(t))) t.Cleanup(func() { assert.NoError(t, genericSvc.Close()) }) diff --git a/pkg/loop/service.go b/pkg/loop/service.go index 7a40fa2ec6..0a8c4bd6a7 100644 --- a/pkg/loop/service.go +++ b/pkg/loop/service.go @@ -19,7 +19,7 @@ import ( "github.com/smartcontractkit/chainlink-relay/pkg/utils" ) -const keepAliveTickDuration = 5 * time.Second //TODO from config +const KeepAliveTickDuration = 5 * time.Second //TODO from config type BrokerConfig = internal.BrokerConfig @@ -31,7 +31,7 @@ type grpcPlugin interface { // pluginService is a [types.Service] wrapper that maintains an internal [types.Service] created from a [grpcPlugin] // client instance by launching and re-launching as necessary. -type pluginService[P grpcPlugin, S types.Service] struct { +type PluginService[P grpcPlugin, S types.Service] struct { utils.StartStopOnce pluginName string @@ -50,12 +50,12 @@ type pluginService[P grpcPlugin, S types.Service] struct { newService func(context.Context, any) (S, error) serviceCh chan struct{} // closed when service is available - service S + Service S - testInterrupt chan func(*pluginService[P, S]) // tests only (via TestHook) to enable access to internals without racing + testInterrupt chan func(*PluginService[P, S]) // tests only (via TestHook) to enable access to internals without racing } -func (s *pluginService[P, S]) init(pluginName string, p P, newService func(context.Context, any) (S, error), lggr logger.Logger, cmd func() *exec.Cmd, stopCh chan struct{}) { +func (s *PluginService[P, S]) Init(pluginName string, p P, newService func(context.Context, any) (S, error), lggr logger.Logger, cmd func() *exec.Cmd, stopCh chan struct{}) { s.pluginName = pluginName s.lggr = lggr s.cmd = cmd @@ -65,12 +65,12 @@ func (s *pluginService[P, S]) init(pluginName string, p P, newService func(conte s.serviceCh = make(chan struct{}) } -func (s *pluginService[P, S]) keepAlive() { +func (s *PluginService[P, S]) keepAlive() { defer s.wg.Done() - s.lggr.Debugw("Staring keepAlive", "tick", keepAliveTickDuration) + s.lggr.Debugw("Staring keepAlive", "tick", KeepAliveTickDuration) - t := time.NewTicker(keepAliveTickDuration) + t := time.NewTicker(KeepAliveTickDuration) defer t.Stop() for { select { @@ -96,7 +96,7 @@ func (s *pluginService[P, S]) keepAlive() { } } -func (s *pluginService[P, S]) tryLaunch(old plugin.ClientProtocol) (err error) { +func (s *PluginService[P, S]) tryLaunch(old plugin.ClientProtocol) (err error) { if old != nil && s.clientProtocol != old { // already replaced by another routine return nil @@ -108,7 +108,7 @@ func (s *pluginService[P, S]) tryLaunch(old plugin.ClientProtocol) (err error) { return } -func (s *pluginService[P, S]) launch() (*plugin.Client, plugin.ClientProtocol, error) { +func (s *PluginService[P, S]) launch() (*plugin.Client, plugin.ClientProtocol, error) { ctx, cancelFn := utils.ContextFromChan(s.stopCh) defer cancelFn() @@ -138,7 +138,7 @@ func (s *pluginService[P, S]) launch() (*plugin.Client, plugin.ClientProtocol, e case <-s.serviceCh: // s.service already set default: - s.service, err = s.newService(ctx, i) + s.Service, err = s.newService(ctx, i) if err != nil { abort() return nil, nil, fmt.Errorf("failed to create service: %w", err) @@ -148,7 +148,7 @@ func (s *pluginService[P, S]) launch() (*plugin.Client, plugin.ClientProtocol, e return client, cp, nil } -func (s *pluginService[P, S]) Start(context.Context) error { +func (s *PluginService[P, S]) Start(context.Context) error { return s.StartOnce("PluginService", func() error { s.wg.Add(1) go s.keepAlive() @@ -156,36 +156,36 @@ func (s *pluginService[P, S]) Start(context.Context) error { }) } -func (s *pluginService[P, S]) Ready() error { +func (s *PluginService[P, S]) Ready() error { select { case <-s.serviceCh: - return s.service.Ready() + return s.Service.Ready() default: return ErrPluginUnavailable } } -func (s *pluginService[P, S]) Name() string { return s.lggr.Name() } +func (s *PluginService[P, S]) Name() string { return s.lggr.Name() } -func (s *pluginService[P, S]) HealthReport() map[string]error { +func (s *PluginService[P, S]) HealthReport() map[string]error { select { case <-s.serviceCh: hr := map[string]error{s.Name(): s.Healthy()} - maps.Copy(hr, s.service.HealthReport()) + maps.Copy(hr, s.Service.HealthReport()) return hr default: return map[string]error{s.Name(): ErrPluginUnavailable} } } -func (s *pluginService[P, S]) Close() error { +func (s *PluginService[P, S]) Close() error { return s.StopOnce("PluginService", func() (err error) { close(s.stopCh) s.wg.Wait() select { case <-s.serviceCh: - if cerr := s.service.Close(); !errors.Is(cerr, context.Canceled) && status.Code(cerr) != codes.Canceled { + if cerr := s.Service.Close(); !errors.Is(cerr, context.Canceled) && status.Code(cerr) != codes.Canceled { err = errors.Join(err, cerr) } default: @@ -195,7 +195,7 @@ func (s *pluginService[P, S]) Close() error { }) } -func (s *pluginService[P, S]) closeClient() (err error) { +func (s *PluginService[P, S]) closeClient() (err error) { if s.clientProtocol != nil { if cerr := s.clientProtocol.Close(); !errors.Is(cerr, context.Canceled) { err = cerr @@ -207,11 +207,43 @@ func (s *pluginService[P, S]) closeClient() (err error) { return } -func (s *pluginService[P, S]) wait(ctx context.Context) error { +func (s *PluginService[P, S]) Wait(ctx context.Context) error { select { case <-ctx.Done(): return context.Cause(ctx) case <-s.serviceCh: return nil + case <-s.stopCh: + return nil + } +} + +// XXXTestHook returns a TestPluginService. +// It must only be called once, and before Start. +func (s *PluginService[P, S]) XXXTestHook() TestPluginService[P, S] { + s.testInterrupt = make(chan func(*PluginService[P, S])) + return s.testInterrupt +} + +// TestPluginService supports Killing & Resetting a running *pluginService. +type TestPluginService[P grpcPlugin, S types.Service] chan<- func(*PluginService[P, S]) + +func (ch TestPluginService[P, S]) Kill() { + done := make(chan struct{}) + ch <- func(s *PluginService[P, S]) { + defer close(done) + _ = s.closeClient() + } + <-done +} + +func (ch TestPluginService[P, S]) Reset() { + done := make(chan struct{}) + ch <- func(r *PluginService[P, S]) { + defer close(done) + _ = r.closeClient() + r.client = nil + r.clientProtocol = nil } + <-done } diff --git a/pkg/loop/service_test.go b/pkg/loop/service_test.go deleted file mode 100644 index d5d62b9f1d..0000000000 --- a/pkg/loop/service_test.go +++ /dev/null @@ -1,37 +0,0 @@ -package loop - -import ( - "github.com/smartcontractkit/chainlink-relay/pkg/types" -) - -const KeepAliveTickDuration = keepAliveTickDuration - -// TestHook returns a TestPluginService. -// It must only be called once, and before Start. -func (s *pluginService[P, S]) TestHook() TestPluginService[P, S] { - s.testInterrupt = make(chan func(*pluginService[P, S])) - return s.testInterrupt -} - -// TestPluginService supports Killing & Resetting a running *pluginService. -type TestPluginService[P grpcPlugin, S types.Service] chan<- func(*pluginService[P, S]) - -func (ch TestPluginService[P, S]) Kill() { - done := make(chan struct{}) - ch <- func(s *pluginService[P, S]) { - defer close(done) - _ = s.closeClient() - } - <-done -} - -func (ch TestPluginService[P, S]) Reset() { - done := make(chan struct{}) - ch <- func(r *pluginService[P, S]) { - defer close(done) - _ = r.closeClient() - r.client = nil - r.clientProtocol = nil - } - <-done -} diff --git a/pkg/loop/plugin_test.go b/pkg/loop/testutils/cmd/main.go similarity index 53% rename from pkg/loop/plugin_test.go rename to pkg/loop/testutils/cmd/main.go index fe673dd3fb..d8f5069d62 100644 --- a/pkg/loop/plugin_test.go +++ b/pkg/loop/testutils/cmd/main.go @@ -1,122 +1,39 @@ -package loop_test +package main import ( "context" + "flag" "fmt" "os" - "os/exec" - "strconv" - "testing" - "time" "github.com/hashicorp/go-plugin" - "github.com/stretchr/testify/require" "google.golang.org/grpc" "github.com/smartcontractkit/chainlink-relay/pkg/logger" "github.com/smartcontractkit/chainlink-relay/pkg/loop" "github.com/smartcontractkit/chainlink-relay/pkg/loop/internal/test" + "github.com/smartcontractkit/chainlink-relay/pkg/loop/reporting_plugins" "github.com/smartcontractkit/chainlink-relay/pkg/types" - "github.com/smartcontractkit/chainlink-relay/pkg/utils" ) -// PluginGenericMedian is a generic plugin which wants a median provider -const PluginGenericMedian = "generic-median" - -func testPlugin[I any](t *testing.T, name string, p plugin.Plugin, testFn func(*testing.T, I)) { - ctx, cancel := context.WithCancel(utils.Context(t)) - defer cancel() - - ch := make(chan *plugin.ReattachConfig, 1) - closeCh := make(chan struct{}) - go plugin.Serve(&plugin.ServeConfig{ - Test: &plugin.ServeTestConfig{ - Context: ctx, - ReattachConfigCh: ch, - CloseCh: closeCh, - }, - GRPCServer: plugin.DefaultGRPCServer, - Plugins: map[string]plugin.Plugin{name: p}, - }) - - // We should get a config - var config *plugin.ReattachConfig - select { - case config = <-ch: - case <-time.After(5 * time.Second): - t.Fatal("should've received reattach") - } - require.NotNil(t, config) - - c := plugin.NewClient(&plugin.ClientConfig{ - Reattach: config, - Plugins: map[string]plugin.Plugin{name: p}, - }) - t.Cleanup(c.Kill) - clientProtocol, err := c.Client() - require.NoError(t, err) - defer clientProtocol.Close() - i, err := clientProtocol.Dispense(name) - require.NoError(t, err) - - testFn(t, i.(I)) - - // stop plugin - cancel() - select { - case <-closeCh: - case <-time.After(5 * time.Second): - t.Fatal("should've stopped") - } - require.Error(t, clientProtocol.Ping()) -} +func main() { + cmdS := "" + flag.StringVar(&cmdS, "cmd", "", "the name of the service to run") -func helperProcess(s ...string) *exec.Cmd { - cs := []string{"-test.run=TestHelperProcess", "--"} - cs = append(cs, s...) - env := []string{ - "GO_WANT_HELPER_PROCESS=1", - } - - cmd := exec.Command(os.Args[0], cs...) - cmd.Env = append(env, os.Environ()...) - return cmd -} - -// This is not a real test. This is just a helper process kicked off by -// tests. -func TestHelperProcess(t *testing.T) { - if os.Getenv("GO_WANT_HELPER_PROCESS") != "1" { - return - } + limitI := 0 + flag.IntVar(&limitI, "limit", 0, "the number of services to run") + flag.Parse() defer os.Exit(0) - args := os.Args - for len(args) > 0 { - if args[0] == "--" { - args = args[1:] - break - } - - args = args[1:] - } - - if len(args) == 0 { + if cmdS == "" { fmt.Fprintf(os.Stderr, "No command\n") os.Exit(2) } - cmd, args := args[0], args[1:] - limit := -1 - if len(args) > 0 { - var err error - limit, err = strconv.Atoi(args[0]) - if err != nil { - fmt.Fprintf(os.Stderr, "Failed to parse integer limit: %s\n", err) - os.Exit(2) - } + if limitI != 0 { + limit = limitI } grpcServer := func(opts []grpc.ServerOption) *grpc.Server { return grpc.NewServer(opts...) } @@ -128,14 +45,20 @@ func TestHelperProcess(t *testing.T) { } } + lggr, err := logger.New() + if err != nil { + fmt.Fprintf(os.Stderr, "Failed to instantiate logger: %s\n", err) + os.Exit(2) + } + stopCh := make(chan struct{}) defer close(stopCh) - switch cmd { + switch cmdS { case loop.PluginRelayerName: plugin.Serve(&plugin.ServeConfig{ HandshakeConfig: loop.PluginRelayerHandshakeConfig(), Plugins: map[string]plugin.Plugin{ - loop.PluginRelayerName: &loop.GRPCPluginRelayer{PluginServer: test.StaticPluginRelayer{}, BrokerConfig: loop.BrokerConfig{Logger: logger.Test(t), StopCh: stopCh}}, + loop.PluginRelayerName: &loop.GRPCPluginRelayer{PluginServer: test.StaticPluginRelayer{}, BrokerConfig: loop.BrokerConfig{Logger: lggr, StopCh: stopCh}}, }, GRPCServer: grpcServer, }) @@ -145,20 +68,20 @@ func TestHelperProcess(t *testing.T) { plugin.Serve(&plugin.ServeConfig{ HandshakeConfig: loop.PluginMedianHandshakeConfig(), Plugins: map[string]plugin.Plugin{ - loop.PluginRelayerName: &loop.GRPCPluginMedian{PluginServer: test.StaticPluginMedian{}, BrokerConfig: loop.BrokerConfig{Logger: logger.Test(t), StopCh: stopCh}}, + loop.PluginRelayerName: &loop.GRPCPluginMedian{PluginServer: test.StaticPluginMedian{}, BrokerConfig: loop.BrokerConfig{Logger: lggr, StopCh: stopCh}}, }, GRPCServer: grpcServer, }) os.Exit(0) - case loop.PluginServiceName: + case "plugin-service": plugin.Serve(&plugin.ServeConfig{ - HandshakeConfig: loop.PluginGenericHandshakeConfig(), + HandshakeConfig: reporting_plugins.PluginGenericHandshakeConfig(), Plugins: map[string]plugin.Plugin{ - loop.PluginServiceName: &loop.GRPCPluginService[types.PluginProvider]{ + "plugin-service": &reporting_plugins.GRPCPluginService[types.PluginProvider]{ PluginServer: test.StaticGenericPlugin{}, BrokerConfig: loop.BrokerConfig{ - Logger: logger.Test(t), + Logger: lggr, StopCh: stopCh, }, }, @@ -167,14 +90,14 @@ func TestHelperProcess(t *testing.T) { }) os.Exit(0) - case PluginGenericMedian: + case "generic-median": plugin.Serve(&plugin.ServeConfig{ - HandshakeConfig: loop.PluginGenericHandshakeConfig(), + HandshakeConfig: reporting_plugins.PluginGenericHandshakeConfig(), Plugins: map[string]plugin.Plugin{ - loop.PluginServiceName: &loop.GRPCPluginService[types.MedianProvider]{ + "plugin-service": &reporting_plugins.GRPCPluginService[types.MedianProvider]{ PluginServer: test.StaticGenericPluginMedianProvider{}, BrokerConfig: loop.BrokerConfig{ - Logger: logger.Test(t), + Logger: lggr, StopCh: stopCh, }, }, @@ -184,7 +107,7 @@ func TestHelperProcess(t *testing.T) { os.Exit(0) default: - fmt.Fprintf(os.Stderr, "Unknown command: %q\n", cmd) + fmt.Fprintf(os.Stderr, "Unknown command: %q\n", cmdS) os.Exit(2) } } diff --git a/pkg/loop/testutils/service.go b/pkg/loop/testutils/service.go new file mode 100644 index 0000000000..1dd298d481 --- /dev/null +++ b/pkg/loop/testutils/service.go @@ -0,0 +1,60 @@ +package testutils + +import ( + "context" + "testing" + "time" + + "github.com/hashicorp/go-plugin" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-relay/pkg/utils" +) + +func PluginTest[I any](t *testing.T, name string, p plugin.Plugin, testFn func(*testing.T, I)) { + ctx, cancel := context.WithCancel(utils.Context(t)) + defer cancel() + + ch := make(chan *plugin.ReattachConfig, 1) + closeCh := make(chan struct{}) + go plugin.Serve(&plugin.ServeConfig{ + Test: &plugin.ServeTestConfig{ + Context: ctx, + ReattachConfigCh: ch, + CloseCh: closeCh, + }, + GRPCServer: plugin.DefaultGRPCServer, + Plugins: map[string]plugin.Plugin{name: p}, + }) + + // We should get a config + var config *plugin.ReattachConfig + select { + case config = <-ch: + case <-time.After(5 * time.Second): + t.Fatal("should've received reattach") + } + require.NotNil(t, config) + + c := plugin.NewClient(&plugin.ClientConfig{ + Reattach: config, + Plugins: map[string]plugin.Plugin{name: p}, + }) + t.Cleanup(c.Kill) + clientProtocol, err := c.Client() + require.NoError(t, err) + defer clientProtocol.Close() + i, err := clientProtocol.Dispense(name) + require.NoError(t, err) + + testFn(t, i.(I)) + + // stop plugin + cancel() + select { + case <-closeCh: + case <-time.After(5 * time.Second): + t.Fatal("should've stopped") + } + require.Error(t, clientProtocol.Ping()) +}