Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
cedric-cordenier committed Oct 16, 2023
1 parent 66bf221 commit 6ea9785
Show file tree
Hide file tree
Showing 15 changed files with 257 additions and 228 deletions.
11 changes: 4 additions & 7 deletions pkg/loop/median_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,13 @@ import (

"github.com/smartcontractkit/chainlink-relay/pkg/logger"
"github.com/smartcontractkit/chainlink-relay/pkg/types"
"github.com/smartcontractkit/chainlink-relay/pkg/utils"
)

var _ ocrtypes.ReportingPluginFactory = (*MedianService)(nil)

// MedianService is a [types.Service] that maintains an internal [types.PluginMedian].
type MedianService struct {
pluginService[*GRPCPluginMedian, types.ReportingPluginFactory]
PluginService[*GRPCPluginMedian, types.ReportingPluginFactory]
}

// NewMedianService returns a new [*MedianService].
Expand All @@ -34,15 +33,13 @@ func NewMedianService(lggr logger.Logger, grpcOpts GRPCOpts, cmd func() *exec.Cm
lggr = logger.Named(lggr, "MedianService")
var ms MedianService
broker := BrokerConfig{StopCh: stopCh, Logger: lggr, GRPCOpts: grpcOpts}
ms.init(PluginMedianName, &GRPCPluginMedian{BrokerConfig: broker}, newService, lggr, cmd, stopCh)
ms.Init(PluginMedianName, &GRPCPluginMedian{BrokerConfig: broker}, newService, lggr, cmd, stopCh)
return &ms
}

func (m *MedianService) NewReportingPlugin(config ocrtypes.ReportingPluginConfig) (ocrtypes.ReportingPlugin, ocrtypes.ReportingPluginInfo, error) {
ctx, cancel := utils.ContextFromChan(m.pluginService.stopCh)
defer cancel()
if err := m.wait(ctx); err != nil {
if err := m.Wait(context.Background()); err != nil {
return nil, ocrtypes.ReportingPluginInfo{}, err
}
return m.service.NewReportingPlugin(config)
return m.Service.NewReportingPlugin(config)
}
7 changes: 3 additions & 4 deletions pkg/loop/median_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package loop_test

import (
"os/exec"
"strconv"
"sync/atomic"
"testing"
"time"
Expand All @@ -19,9 +18,9 @@ import (
func TestMedianService(t *testing.T) {
t.Parallel()
median := loop.NewMedianService(logger.Test(t), loop.GRPCOpts{}, func() *exec.Cmd {
return helperProcess(loop.PluginMedianName)
return HelperProcess(loop.PluginMedianName, HelperProcessOptions{})
}, test.StaticMedianProvider{}, test.StaticDataSource(), test.StaticJuelsPerFeeCoinDataSource(), &test.StaticErrorLog{})
hook := median.TestHook()
hook := median.PluginService.XXXTestHook()
require.NoError(t, median.Start(utils.Context(t)))
t.Cleanup(func() { assert.NoError(t, median.Close()) })

Expand Down Expand Up @@ -52,7 +51,7 @@ func TestMedianService_recovery(t *testing.T) {
t.Parallel()
var limit atomic.Int32
median := loop.NewMedianService(logger.Test(t), loop.GRPCOpts{}, func() *exec.Cmd {
return helperProcess(loop.PluginMedianName, strconv.Itoa(int(limit.Add(1))))
return HelperProcess(loop.PluginMedianName, HelperProcessOptions{Limit: int(limit.Add(1))})
}, test.StaticMedianProvider{}, test.StaticDataSource(), test.StaticJuelsPerFeeCoinDataSource(), &test.StaticErrorLog{})
require.NoError(t, median.Start(utils.Context(t)))
t.Cleanup(func() { assert.NoError(t, median.Close()) })
Expand Down
9 changes: 5 additions & 4 deletions pkg/loop/plugin_median_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,21 @@ import (
"github.com/smartcontractkit/chainlink-relay/pkg/logger"
"github.com/smartcontractkit/chainlink-relay/pkg/loop"
"github.com/smartcontractkit/chainlink-relay/pkg/loop/internal/test"
"github.com/smartcontractkit/chainlink-relay/pkg/loop/testutils"
"github.com/smartcontractkit/chainlink-relay/pkg/types"
)

func TestPluginMedian(t *testing.T) {
t.Parallel()

stopCh := newStopCh(t)
testPlugin(t, loop.PluginMedianName, &loop.GRPCPluginMedian{PluginServer: test.StaticPluginMedian{}, BrokerConfig: loop.BrokerConfig{Logger: logger.Test(t), StopCh: stopCh}}, test.TestPluginMedian)
testutils.PluginTest(t, loop.PluginMedianName, &loop.GRPCPluginMedian{PluginServer: test.StaticPluginMedian{}, BrokerConfig: loop.BrokerConfig{Logger: logger.Test(t), StopCh: stopCh}}, test.TestPluginMedian)

t.Run("proxy", func(t *testing.T) {
testPlugin(t, loop.PluginRelayerName, &loop.GRPCPluginRelayer{PluginServer: test.StaticPluginRelayer{}, BrokerConfig: loop.BrokerConfig{Logger: logger.Test(t), StopCh: stopCh}}, func(t *testing.T, pr loop.PluginRelayer) {
testutils.PluginTest(t, loop.PluginRelayerName, &loop.GRPCPluginRelayer{PluginServer: test.StaticPluginRelayer{}, BrokerConfig: loop.BrokerConfig{Logger: logger.Test(t), StopCh: stopCh}}, func(t *testing.T, pr loop.PluginRelayer) {
p := newMedianProvider(t, pr)
pm := test.PluginMedianTest{MedianProvider: p}
testPlugin(t, loop.PluginMedianName, &loop.GRPCPluginMedian{PluginServer: test.StaticPluginMedian{}, BrokerConfig: loop.BrokerConfig{Logger: logger.Test(t), StopCh: stopCh}}, pm.TestPluginMedian)
testutils.PluginTest(t, loop.PluginMedianName, &loop.GRPCPluginMedian{PluginServer: test.StaticPluginMedian{}, BrokerConfig: loop.BrokerConfig{Logger: logger.Test(t), StopCh: stopCh}}, pm.TestPluginMedian)
})
})
}
Expand All @@ -35,7 +36,7 @@ func TestPluginMedianExec(t *testing.T) {
stopCh := newStopCh(t)
median := loop.GRPCPluginMedian{BrokerConfig: loop.BrokerConfig{Logger: logger.Test(t), StopCh: stopCh}}
cc := median.ClientConfig()
cc.Cmd = helperProcess(loop.PluginMedianName)
cc.Cmd = HelperProcess(loop.PluginMedianName, HelperProcessOptions{})
c := plugin.NewClient(cc)
t.Cleanup(c.Kill)
client, err := c.Client()
Expand Down
5 changes: 3 additions & 2 deletions pkg/loop/plugin_relayer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ import (
"github.com/smartcontractkit/chainlink-relay/pkg/logger"
"github.com/smartcontractkit/chainlink-relay/pkg/loop"
"github.com/smartcontractkit/chainlink-relay/pkg/loop/internal/test"
"github.com/smartcontractkit/chainlink-relay/pkg/loop/testutils"
)

func TestPluginRelayer(t *testing.T) {
t.Parallel()

stopCh := newStopCh(t)
testPlugin(t, loop.PluginRelayerName, &loop.GRPCPluginRelayer{PluginServer: test.StaticPluginRelayer{}, BrokerConfig: loop.BrokerConfig{Logger: logger.Test(t), StopCh: stopCh}}, test.TestPluginRelayer)
testutils.PluginTest(t, loop.PluginRelayerName, &loop.GRPCPluginRelayer{PluginServer: test.StaticPluginRelayer{}, BrokerConfig: loop.BrokerConfig{Logger: logger.Test(t), StopCh: stopCh}}, test.TestPluginRelayer)
}

func TestPluginRelayerExec(t *testing.T) {
Expand All @@ -30,7 +31,7 @@ func TestPluginRelayerExec(t *testing.T) {
func newPluginRelayerExec(t *testing.T, stopCh <-chan struct{}) loop.PluginRelayer {
relayer := loop.GRPCPluginRelayer{BrokerConfig: loop.BrokerConfig{Logger: logger.Test(t), StopCh: stopCh}}
cc := relayer.ClientConfig()
cc.Cmd = helperProcess(loop.PluginRelayerName)
cc.Cmd = HelperProcess(loop.PluginRelayerName, HelperProcessOptions{})
c := plugin.NewClient(cc)
t.Cleanup(c.Kill)
client, err := c.Client()
Expand Down
23 changes: 23 additions & 0 deletions pkg/loop/process_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package loop_test

import (
"fmt"
"os"
"os/exec"
)

type HelperProcessOptions struct {
Limit int
}

func HelperProcess(command string, opts HelperProcessOptions) *exec.Cmd {
cmdArgs := []string{
"go", "run", "./testutils/cmd/main.go", fmt.Sprintf("-cmd=%s", command),
}
if opts.Limit != 0 {
cmdArgs = append(cmdArgs, fmt.Sprintf("-limit=%d", opts.Limit))
}
cmd := exec.Command(cmdArgs[0], cmdArgs[1:]...)
cmd.Env = append(os.Environ())
return cmd
}
24 changes: 12 additions & 12 deletions pkg/loop/relayer_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ var _ Relayer = (*RelayerService)(nil)

// RelayerService is a [types.Service] that maintains an internal [Relayer].
type RelayerService struct {
pluginService[*GRPCPluginRelayer, Relayer]
PluginService[*GRPCPluginRelayer, Relayer]
}

// NewRelayerService returns a new [*RelayerService].
Expand All @@ -38,41 +38,41 @@ func NewRelayerService(lggr logger.Logger, grpcOpts GRPCOpts, cmd func() *exec.C
lggr = logger.Named(lggr, "RelayerService")
var rs RelayerService
broker := BrokerConfig{StopCh: stopCh, Logger: lggr, GRPCOpts: grpcOpts}
rs.init(PluginRelayerName, &GRPCPluginRelayer{BrokerConfig: broker}, newService, lggr, cmd, stopCh)
rs.Init(PluginRelayerName, &GRPCPluginRelayer{BrokerConfig: broker}, newService, lggr, cmd, stopCh)
return &rs
}

func (r *RelayerService) NewConfigProvider(ctx context.Context, args types.RelayArgs) (types.ConfigProvider, error) {
if err := r.wait(ctx); err != nil {
if err := r.Wait(ctx); err != nil {
return nil, err
}
return r.service.NewConfigProvider(ctx, args)
return r.Service.NewConfigProvider(ctx, args)
}

func (r *RelayerService) NewPluginProvider(ctx context.Context, rargs types.RelayArgs, pargs types.PluginArgs) (types.PluginProvider, error) {
if err := r.wait(ctx); err != nil {
if err := r.Wait(ctx); err != nil {
return nil, err
}
return r.service.NewPluginProvider(ctx, rargs, pargs)
return r.Service.NewPluginProvider(ctx, rargs, pargs)
}

func (r *RelayerService) GetChainStatus(ctx context.Context) (types.ChainStatus, error) {
if err := r.wait(ctx); err != nil {
if err := r.Wait(ctx); err != nil {
return types.ChainStatus{}, err
}
return r.service.GetChainStatus(ctx)
return r.Service.GetChainStatus(ctx)
}

func (r *RelayerService) ListNodeStatuses(ctx context.Context, pageSize int32, pageToken string) (nodes []types.NodeStatus, nextPageToken string, total int, err error) {
if err := r.wait(ctx); err != nil {
if err := r.Wait(ctx); err != nil {
return nil, "", -1, err
}
return r.service.ListNodeStatuses(ctx, pageSize, pageToken)
return r.Service.ListNodeStatuses(ctx, pageSize, pageToken)
}

func (r *RelayerService) Transact(ctx context.Context, from, to string, amount *big.Int, balanceCheck bool) error {
if err := r.wait(ctx); err != nil {
if err := r.Wait(ctx); err != nil {
return err
}
return r.service.Transact(ctx, from, to, amount, balanceCheck)
return r.Service.Transact(ctx, from, to, amount, balanceCheck)
}
7 changes: 3 additions & 4 deletions pkg/loop/relayer_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package loop_test

import (
"os/exec"
"strconv"
"sync/atomic"
"testing"
"time"
Expand All @@ -19,9 +18,9 @@ import (
func TestRelayerService(t *testing.T) {
t.Parallel()
relayer := loop.NewRelayerService(logger.Test(t), loop.GRPCOpts{}, func() *exec.Cmd {
return helperProcess(loop.PluginRelayerName)
return HelperProcess(loop.PluginRelayerName, HelperProcessOptions{})
}, test.ConfigTOML, test.StaticKeystore{})
hook := relayer.TestHook()
hook := relayer.XXXTestHook()
require.NoError(t, relayer.Start(utils.Context(t)))
t.Cleanup(func() { assert.NoError(t, relayer.Close()) })

Expand Down Expand Up @@ -52,7 +51,7 @@ func TestRelayerService_recovery(t *testing.T) {
t.Parallel()
var limit atomic.Int32
relayer := loop.NewRelayerService(logger.Test(t), loop.GRPCOpts{}, func() *exec.Cmd {
return helperProcess(loop.PluginRelayerName, strconv.Itoa(int(limit.Add(1))))
return HelperProcess(loop.PluginRelayerName, HelperProcessOptions{Limit: int(limit.Add(1))})
}, test.ConfigTOML, test.StaticKeystore{})
require.NoError(t, relayer.Start(utils.Context(t)))
t.Cleanup(func() { assert.NoError(t, relayer.Close()) })
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package loop
package reporting_plugins

import (
"context"

"github.com/hashicorp/go-plugin"
"google.golang.org/grpc"

"github.com/smartcontractkit/chainlink-relay/pkg/loop"
"github.com/smartcontractkit/chainlink-relay/pkg/loop/internal"
"github.com/smartcontractkit/chainlink-relay/pkg/types"
)
Expand All @@ -22,7 +23,7 @@ func PluginGenericHandshakeConfig() plugin.HandshakeConfig {

type ProviderServer[T types.PluginProvider] interface {
types.PluginServer[T]
ConnToProvider(conn grpc.ClientConnInterface, broker internal.Broker, brokerConfig BrokerConfig) T
ConnToProvider(conn grpc.ClientConnInterface, broker internal.Broker, brokerConfig loop.BrokerConfig) T
}

// GRPCPluginService is the loopp interface for a plugin that can
Expand All @@ -31,7 +32,7 @@ type ProviderServer[T types.PluginProvider] interface {
type GRPCPluginService[T types.PluginProvider] struct {
plugin.NetRPCUnsupportedPlugin

BrokerConfig
loop.BrokerConfig

PluginServer ProviderServer[T]

Expand Down Expand Up @@ -68,8 +69,8 @@ func (p *GRPCPluginService[T]) ClientConfig() *plugin.ClientConfig {
HandshakeConfig: PluginGenericHandshakeConfig(),
Plugins: map[string]plugin.Plugin{PluginServiceName: p},
AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC},
GRPCDialOptions: p.DialOpts,
Logger: HCLogLogger(p.Logger),
GRPCDialOptions: p.BrokerConfig.DialOpts,
Logger: loop.HCLogLogger(p.BrokerConfig.Logger),
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,33 @@
package loop_test
package reporting_plugins_test

import (
"testing"
"time"

"github.com/smartcontractkit/chainlink-relay/pkg/logger"
"github.com/smartcontractkit/chainlink-relay/pkg/loop"
"github.com/smartcontractkit/chainlink-relay/pkg/loop/internal/test"
"github.com/smartcontractkit/chainlink-relay/pkg/loop/reporting_plugins"
"github.com/smartcontractkit/chainlink-relay/pkg/loop/testutils"
"github.com/smartcontractkit/chainlink-relay/pkg/types"
)

func newStopCh(t *testing.T) <-chan struct{} {
stopCh := make(chan struct{})
if d, ok := t.Deadline(); ok {
time.AfterFunc(time.Until(d), func() { close(stopCh) })
}
return stopCh
}

func TestGRPCPluginService(t *testing.T) {
t.Parallel()

stopCh := newStopCh(t)
testPlugin(
testutils.PluginTest(
t,
PluginGenericMedian,
&loop.GRPCPluginService[types.MedianProvider]{
&reporting_plugins.GRPCPluginService[types.MedianProvider]{
PluginServer: test.StaticGenericPluginMedianProvider{},
BrokerConfig: loop.BrokerConfig{
Logger: logger.Test(t),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package loop
package reporting_plugins

import (
"context"
Expand All @@ -9,21 +9,21 @@ import (
"google.golang.org/grpc"

"github.com/smartcontractkit/chainlink-relay/pkg/logger"
"github.com/smartcontractkit/chainlink-relay/pkg/loop"
"github.com/smartcontractkit/chainlink-relay/pkg/types"
"github.com/smartcontractkit/chainlink-relay/pkg/utils"
)

var _ ocrtypes.ReportingPluginFactory = (*PluginService)(nil)
var _ ocrtypes.ReportingPluginFactory = (*LOOPPService)(nil)

// PluginService is a [types.Service] that maintains an internal [types.PluginClient].
type PluginService struct {
pluginService[*GRPCPluginService[types.PluginProvider], types.ReportingPluginFactory]
type LOOPPService struct {
loop.PluginService[*GRPCPluginService[types.PluginProvider], types.ReportingPluginFactory]
}

// NewPluginService returns a new [*PluginService].
// cmd must return a new exec.Cmd each time it is called.
// We use a `conn` here rather than a provider so that we can enforce proxy providers being passed in.
func NewPluginService(lggr logger.Logger, grpcOpts GRPCOpts, cmd func() *exec.Cmd, config types.PluginServiceConfig, providerConn grpc.ClientConnInterface, errorLog types.ErrorLog) *PluginService {
func NewLOOPPService(lggr logger.Logger, grpcOpts loop.GRPCOpts, cmd func() *exec.Cmd, config types.PluginServiceConfig, providerConn grpc.ClientConnInterface, errorLog types.ErrorLog) *LOOPPService {
newService := func(ctx context.Context, instance any) (types.ReportingPluginFactory, error) {
plug, ok := instance.(types.PluginClient)
if !ok {
Expand All @@ -33,17 +33,15 @@ func NewPluginService(lggr logger.Logger, grpcOpts GRPCOpts, cmd func() *exec.Cm
}
stopCh := make(chan struct{})
lggr = logger.Named(lggr, "GenericService")
var ps PluginService
broker := BrokerConfig{StopCh: stopCh, Logger: lggr, GRPCOpts: grpcOpts}
ps.init(PluginServiceName, &GRPCPluginService[types.PluginProvider]{BrokerConfig: broker}, newService, lggr, cmd, stopCh)
var ps LOOPPService
broker := loop.BrokerConfig{StopCh: stopCh, Logger: lggr, GRPCOpts: grpcOpts}
ps.Init(PluginServiceName, &GRPCPluginService[types.PluginProvider]{BrokerConfig: broker}, newService, lggr, cmd, stopCh)
return &ps
}

func (g *PluginService) NewReportingPlugin(config ocrtypes.ReportingPluginConfig) (ocrtypes.ReportingPlugin, ocrtypes.ReportingPluginInfo, error) {
ctx, cancel := utils.ContextFromChan(g.pluginService.stopCh)
defer cancel()
if err := g.wait(ctx); err != nil {
func (g *LOOPPService) NewReportingPlugin(config ocrtypes.ReportingPluginConfig) (ocrtypes.ReportingPlugin, ocrtypes.ReportingPluginInfo, error) {
if err := g.Wait(context.Background()); err != nil {
return nil, ocrtypes.ReportingPluginInfo{}, err
}
return g.service.NewReportingPlugin(config)
return g.Service.NewReportingPlugin(config)
}
Loading

0 comments on commit 6ea9785

Please sign in to comment.