Skip to content

Commit

Permalink
pkg/loop: plugins report health to host
Browse files Browse the repository at this point in the history
  • Loading branch information
jmank88 committed Oct 31, 2023
1 parent 0781c3c commit db7e8af
Show file tree
Hide file tree
Showing 16 changed files with 177 additions and 48 deletions.
1 change: 1 addition & 0 deletions pkg/loop/internal/median.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ type pluginMedianServer struct {
}

func RegisterPluginMedianServer(server *grpc.Server, broker Broker, brokerCfg BrokerConfig, impl types.PluginMedian) error {
pb.RegisterServiceServer(server, &serviceServer{srv: impl})
pb.RegisterPluginMedianServer(server, newPluginMedianServer(&brokerExt{broker, brokerCfg}, impl))
return nil
}
Expand Down
12 changes: 7 additions & 5 deletions pkg/loop/internal/relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@ var _ PluginRelayer = (*PluginRelayerClient)(nil)

type PluginRelayerClient struct {
*pluginClient
*serviceClient

grpc pb.PluginRelayerClient
pluginRelayer pb.PluginRelayerClient
}

func NewPluginRelayerClient(broker Broker, brokerCfg BrokerConfig, conn *grpc.ClientConn) *PluginRelayerClient {
brokerCfg.Logger = logger.Named(brokerCfg.Logger, "PluginRelayerClient")
pc := newPluginClient(broker, brokerCfg, conn)
return &PluginRelayerClient{pluginClient: pc, grpc: pb.NewPluginRelayerClient(pc)}
return &PluginRelayerClient{pluginClient: pc, pluginRelayer: pb.NewPluginRelayerClient(pc), serviceClient: newServiceClient(pc.brokerExt, pc)}
}

func (p *PluginRelayerClient) NewRelayer(ctx context.Context, config string, keystore types.Keystore) (Relayer, error) {
Expand All @@ -40,7 +41,7 @@ func (p *PluginRelayerClient) NewRelayer(ctx context.Context, config string, key
}
deps.Add(ksRes)

reply, err := p.grpc.NewRelayer(ctx, &pb.NewRelayerRequest{
reply, err := p.pluginRelayer.NewRelayer(ctx, &pb.NewRelayerRequest{
Config: config,
KeystoreID: id,
})
Expand All @@ -61,6 +62,7 @@ type pluginRelayerServer struct {
}

func RegisterPluginRelayerServer(server *grpc.Server, broker Broker, brokerCfg BrokerConfig, impl PluginRelayer) error {
pb.RegisterServiceServer(server, &serviceServer{srv: impl})
pb.RegisterPluginRelayerServer(server, newPluginRelayerServer(broker, brokerCfg, impl))
return nil
}
Expand Down Expand Up @@ -436,14 +438,14 @@ func (r *relayerServer) Transact(ctx context.Context, request *pb.TransactionReq
return &emptypb.Empty{}, r.impl.Transact(ctx, request.From, request.To, request.Amount.Int(), request.BalanceCheck)
}

func healthReport(s map[string]string) (hr map[string]error) {
func healthReport(prefix string, s map[string]string) (hr map[string]error) {
hr = make(map[string]error, len(s))
for n, e := range s {
var err error
if e != "" {
err = errors.New(e)
}
hr[n] = err
hr[prefix+"."+n] = err
}
return hr
}
1 change: 1 addition & 0 deletions pkg/loop/internal/reporting_plugin_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ type reportingPluginServiceServer struct {
}

func RegisterReportingPluginServiceServer(server *grpc.Server, broker Broker, brokerCfg BrokerConfig, impl types.ReportingPluginClient) error {
pb.RegisterServiceServer(server, &serviceServer{srv: impl})
pb.RegisterReportingPluginServiceServer(server, newReportingPluginServiceServer(&brokerExt{broker, brokerCfg}, impl))
return nil
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/loop/internal/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,12 @@ func (s *serviceClient) HealthReport() map[string]error {
ctx, cancel = context.WithTimeout(ctx, time.Second)
defer cancel()

name := s.Name()
reply, err := s.grpc.HealthReport(ctx, &emptypb.Empty{})
if err != nil {
return map[string]error{s.b.Logger.Name(): err}
return map[string]error{name: err}
}
hr := healthReport(reply.HealthReport)
hr := healthReport(name, reply.HealthReport)
hr[s.b.Logger.Name()] = nil
return hr
}
Expand Down
13 changes: 8 additions & 5 deletions pkg/loop/internal/test/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,21 @@ import (
"github.com/stretchr/testify/assert"
)

type staticConfigProvider struct{}
type staticConfigProvider struct {
name string
}

// TODO validate start/Close calls?
func (s staticConfigProvider) Start(ctx context.Context) error { return nil }

func (s staticConfigProvider) Close() error { return nil }

func (s staticConfigProvider) Ready() error { panic("unimplemented") }
func (s staticConfigProvider) Ready() error { return nil }

func (s staticConfigProvider) Name() string { panic("unimplemented") }
func (s staticConfigProvider) Name() string { return s.name + ".staticConfigProvider" }

func (s staticConfigProvider) HealthReport() map[string]error { panic("unimplemented") }
func (s staticConfigProvider) HealthReport() map[string]error {
return map[string]error{s.Name(): s.Ready()}
}

func (s staticConfigProvider) OffchainConfigDigester() libocr.OffchainConfigDigester {
return staticOffchainConfigDigester{}
Expand Down
45 changes: 37 additions & 8 deletions pkg/loop/internal/test/median.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"github.com/smartcontractkit/libocr/offchainreporting2/reportingplugin/median"
libocr "github.com/smartcontractkit/libocr/offchainreporting2plus/types"

"github.com/smartcontractkit/chainlink-relay/pkg/services"
"github.com/smartcontractkit/chainlink-relay/pkg/services/srvctest"
"github.com/smartcontractkit/chainlink-relay/pkg/types"
"github.com/smartcontractkit/chainlink-relay/pkg/utils/tests"
)
Expand All @@ -34,6 +36,11 @@ func (m PluginMedianTest) TestPluginMedian(t *testing.T, p types.PluginMedian) {
require.NoError(t, err)

TestReportingPluginFactory(t, factory)
srvctest.AssertHealthReportNames(t, p.HealthReport(),
"PluginMedianClient",
"PluginMedianClient.StaticPluginMedian",
"PluginMedianClient.StaticPluginMedian.staticPluginFactory",
)
})
}

Expand Down Expand Up @@ -68,6 +75,20 @@ func TestReportingPluginFactory(t *testing.T, factory types.ReportingPluginFacto

type StaticPluginMedian struct{}

func (s StaticPluginMedian) Name() string { return "StaticPluginMedian" }

func (s StaticPluginMedian) Start(ctx context.Context) error { return nil }

func (s StaticPluginMedian) Close() error { return nil }

func (s StaticPluginMedian) Ready() error { return nil }

func (s StaticPluginMedian) HealthReport() map[string]error {
hp := map[string]error{s.Name(): s.Ready()}
services.CopyHealth(hp, staticPluginFactory{s.Name()}.HealthReport())
return hp
}

func (s StaticPluginMedian) NewMedianFactory(ctx context.Context, provider types.MedianProvider, dataSource, juelsPerFeeCoinDataSource median.DataSource, errorLog types.ErrorLog) (types.ReportingPluginFactory, error) {
ocd := provider.OffchainConfigDigester()
gotDigestPrefix, err := ocd.ConfigDigestPrefix()
Expand Down Expand Up @@ -218,20 +239,24 @@ func (s StaticPluginMedian) NewMedianFactory(ctx context.Context, provider types
if err := errorLog.SaveError(ctx, errMsg); err != nil {
return nil, fmt.Errorf("failed to save error: %w", err)
}
return staticPluginFactory{}, nil
return staticPluginFactory{s.Name()}, nil
}

type staticPluginFactory struct{}
type staticPluginFactory struct {
name string
}

func (s staticPluginFactory) Name() string { panic("implement me") }
func (s staticPluginFactory) Name() string { return s.name + ".staticPluginFactory" }

func (s staticPluginFactory) Start(ctx context.Context) error { return nil }

func (s staticPluginFactory) Close() error { return nil }

func (s staticPluginFactory) Ready() error { panic("implement me") }
func (s staticPluginFactory) Ready() error { return nil }

func (s staticPluginFactory) HealthReport() map[string]error { panic("implement me") }
func (s staticPluginFactory) HealthReport() map[string]error {
return map[string]error{s.Name(): s.Ready()}
}

func (s staticPluginFactory) NewReportingPlugin(config libocr.ReportingPluginConfig) (libocr.ReportingPlugin, libocr.ReportingPluginInfo, error) {
if config.ConfigDigest != reportingPluginConfig.ConfigDigest {
Expand Down Expand Up @@ -273,17 +298,21 @@ func (s staticPluginFactory) NewReportingPlugin(config libocr.ReportingPluginCon
return staticReportingPlugin{}, rpi, nil
}

type StaticMedianProvider struct{}
type StaticMedianProvider struct {
name string
}

func (s StaticMedianProvider) Start(ctx context.Context) error { return nil }

func (s StaticMedianProvider) Close() error { return nil }

func (s StaticMedianProvider) Ready() error { panic("unimplemented") }

func (s StaticMedianProvider) Name() string { panic("unimplemented") }
func (s StaticMedianProvider) Name() string { return s.name + ".StaticMedianProvider" }

func (s StaticMedianProvider) HealthReport() map[string]error { panic("unimplemented") }
func (s StaticMedianProvider) HealthReport() map[string]error {
return map[string]error{s.Name(): s.Ready()}
}

func (s StaticMedianProvider) OffchainConfigDigester() libocr.OffchainConfigDigester {
return staticOffchainConfigDigester{}
Expand Down
10 changes: 7 additions & 3 deletions pkg/loop/internal/test/plugin_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,21 @@ import (
libocr "github.com/smartcontractkit/libocr/offchainreporting2plus/types"
)

type StaticPluginProvider struct{}
type StaticPluginProvider struct {
name string
}

func (s StaticPluginProvider) Start(ctx context.Context) error { return nil }

func (s StaticPluginProvider) Close() error { return nil }

func (s StaticPluginProvider) Ready() error { panic("unimplemented") }

func (s StaticPluginProvider) Name() string { panic("unimplemented") }
func (s StaticPluginProvider) Name() string { return s.name + ".StaticPluginProvider" }

func (s StaticPluginProvider) HealthReport() map[string]error { panic("unimplemented") }
func (s StaticPluginProvider) HealthReport() map[string]error {
return map[string]error{s.Name(): s.Ready()}
}

func (s StaticPluginProvider) OffchainConfigDigester() libocr.OffchainConfigDigester {
return staticOffchainConfigDigester{}
Expand Down
34 changes: 30 additions & 4 deletions pkg/loop/internal/test/relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-relay/pkg/loop/internal"
"github.com/smartcontractkit/chainlink-relay/pkg/services"
"github.com/smartcontractkit/chainlink-relay/pkg/services/srvctest"
"github.com/smartcontractkit/chainlink-relay/pkg/types"
"github.com/smartcontractkit/chainlink-relay/pkg/utils/tests"
)
Expand All @@ -35,6 +37,20 @@ func (s StaticKeystore) Sign(ctx context.Context, id string, data []byte) ([]byt

type StaticPluginRelayer struct{}

func (s StaticPluginRelayer) Name() string { return "StaticPluginRelayer" }

func (s StaticPluginRelayer) Start(ctx context.Context) error { return nil }

func (s StaticPluginRelayer) Close() error { return nil }

func (s StaticPluginRelayer) Ready() error { return nil }

func (s StaticPluginRelayer) HealthReport() map[string]error {
hp := map[string]error{s.Name(): s.Ready()}
services.CopyHealth(hp, staticRelayer{s.Name()}.HealthReport())
return hp
}

func (s StaticPluginRelayer) NewRelayer(ctx context.Context, config string, keystore types.Keystore) (internal.Relayer, error) {
if config != ConfigTOML {
return nil, fmt.Errorf("expected config %q but got %q", ConfigTOML, config)
Expand All @@ -53,21 +69,25 @@ func (s StaticPluginRelayer) NewRelayer(ctx context.Context, config string, keys
if !bytes.Equal(signed, gotSigned) {
return nil, fmt.Errorf("expected signed bytes %x but got %x", signed, gotSigned)
}
return staticRelayer{}, nil
return staticRelayer{s.Name()}, nil
}

type staticRelayer struct{}
type staticRelayer struct {
name string
}

func (s staticRelayer) Start(ctx context.Context) error { return nil }

func (s staticRelayer) Close() error { return nil }

func (s staticRelayer) Ready() error { return nil }

func (s staticRelayer) Name() string { return "staticRelayer" }
func (s staticRelayer) Name() string { return s.name + ".staticRelayer" }

func (s staticRelayer) HealthReport() map[string]error {
return map[string]error{s.Name(): s.Ready()}
hp := map[string]error{s.Name(): s.Ready()}
services.CopyHealth(hp, staticConfigProvider{s.Name()}.HealthReport())
return hp
}
func (s staticRelayer) NewConfigProvider(ctx context.Context, r types.RelayArgs) (types.ConfigProvider, error) {
if !equalRelayArgs(r, RelayArgs) {
Expand Down Expand Up @@ -158,6 +178,12 @@ func TestPluginRelayer(t *testing.T, p internal.PluginRelayer) {
require.NoError(t, relayer.Start(ctx))
t.Cleanup(func() { assert.NoError(t, relayer.Close()) })
TestRelayer(t, relayer)
srvctest.AssertHealthReportNames(t, p.HealthReport(),
"PluginRelayerClient",
"PluginRelayerClient.StaticPluginRelayer",
"PluginRelayerClient.StaticPluginRelayer.staticRelayer",
"PluginRelayerClient.StaticPluginRelayer.staticRelayer.staticConfigProvider",
)
})
}

Expand Down
45 changes: 43 additions & 2 deletions pkg/loop/internal/test/reporting_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"google.golang.org/grpc"

"github.com/smartcontractkit/chainlink-relay/pkg/loop/internal"
"github.com/smartcontractkit/chainlink-relay/pkg/services"
"github.com/smartcontractkit/chainlink-relay/pkg/types"
)

Expand All @@ -19,13 +20,32 @@ type MockConn struct {
const ReportingPluginWithMedianProviderName = "reporting-plugin-with-median-provider"

type StaticReportingPluginWithMedianProvider struct {
name string
}

func (s StaticReportingPluginWithMedianProvider) Start(ctx context.Context) error { return nil }

func (s StaticReportingPluginWithMedianProvider) Close() error { return nil }

func (s StaticReportingPluginWithMedianProvider) Ready() error { return nil }

func (s StaticReportingPluginWithMedianProvider) HealthReport() map[string]error {
hp := map[string]error{s.Name(): s.Ready()}
services.CopyHealth(hp, StaticMedianProvider{s.Name()}.HealthReport())
services.CopyHealth(hp, staticPluginFactory{s.Name()}.HealthReport())
return hp
}

func (s StaticReportingPluginWithMedianProvider) Name() string {
return s.name + ".StaticReportingPluginWithMedianProvider"
}

func (s StaticReportingPluginWithMedianProvider) ConnToProvider(conn grpc.ClientConnInterface, broker internal.Broker, brokerConfig internal.BrokerConfig) types.MedianProvider {
return StaticMedianProvider{}
}

func (s StaticReportingPluginWithMedianProvider) NewReportingPluginFactory(ctx context.Context, config types.ReportingPluginServiceConfig, provider types.MedianProvider, pipelineRunner types.PipelineRunnerService, errorLog types.ErrorLog) (types.ReportingPluginFactory, error) {
//TODO validate config
ocd := provider.OffchainConfigDigester()
gotDigestPrefix, err := ocd.ConfigDigestPrefix()
if err != nil {
Expand Down Expand Up @@ -158,13 +178,34 @@ func (s StaticReportingPluginWithMedianProvider) NewReportingPluginFactory(ctx c
if !reflect.DeepEqual(gotDecoded, onchainConfig) {
return nil, fmt.Errorf("expected OnchainConfig %s but got %s", onchainConfig, gotDecoded)
}

//TODO validate pipelineRunner

if err := errorLog.SaveError(ctx, errMsg); err != nil {
return nil, fmt.Errorf("failed to save error: %w", err)
}
return staticPluginFactory{}, nil
return staticPluginFactory{s.Name()}, nil
}

type StaticReportingPluginWithPluginProvider struct {
name string
}

func (s StaticReportingPluginWithPluginProvider) Start(ctx context.Context) error { return nil }

func (s StaticReportingPluginWithPluginProvider) Close() error { return nil }

func (s StaticReportingPluginWithPluginProvider) Ready() error { return nil }

func (s StaticReportingPluginWithPluginProvider) HealthReport() map[string]error {
hp := map[string]error{s.Name(): s.Ready()}
services.CopyHealth(hp, StaticPluginProvider{s.Name()}.HealthReport())
services.CopyHealth(hp, staticPluginFactory{s.Name()}.HealthReport())
return hp
}

func (s StaticReportingPluginWithPluginProvider) Name() string {
return s.name + ".StaticReportingPluginWithPluginProvider"
}

func (s StaticReportingPluginWithPluginProvider) ConnToProvider(conn grpc.ClientConnInterface, broker internal.Broker, brokerConfig internal.BrokerConfig) types.PluginProvider {
Expand Down Expand Up @@ -244,5 +285,5 @@ func (s StaticReportingPluginWithPluginProvider) NewReportingPluginFactory(ctx c
if !reflect.DeepEqual(tr, taskResults) {
return nil, fmt.Errorf("expected TaskResults %+v but got %+v", taskResults, tr)
}
return staticPluginFactory{}, nil
return staticPluginFactory{s.Name()}, nil
}
Loading

0 comments on commit db7e8af

Please sign in to comment.