Skip to content

Commit

Permalink
pkg/loop: plugins report health to host
Browse files Browse the repository at this point in the history
  • Loading branch information
jmank88 committed Nov 27, 2023
1 parent b2682a5 commit c2c5209
Show file tree
Hide file tree
Showing 25 changed files with 464 additions and 286 deletions.
7 changes: 7 additions & 0 deletions pkg/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,13 @@ func With(l Logger, keyvals ...interface{}) Logger {

// Named returns a logger with name 'n', if 'l' has a method `Named(string) L`, where L implements Logger, otherwise it returns l.
func Named(l Logger, n string) Logger {
l = named(l, n)
if testing.Testing() {
l.Debugf("New logger: %s", n)
}
return l
}
func named(l Logger, n string) Logger {
switch t := l.(type) {
case *logger:
return t.named(n)
Expand Down
5 changes: 5 additions & 0 deletions pkg/loop/internal/median.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ type pluginMedianServer struct {
}

func RegisterPluginMedianServer(server *grpc.Server, broker Broker, brokerCfg BrokerConfig, impl types.PluginMedian) error {
pb.RegisterServiceServer(server, &serviceServer{srv: impl})
pb.RegisterPluginMedianServer(server, newPluginMedianServer(&brokerExt{broker, brokerCfg}, impl))
return nil
}
Expand Down Expand Up @@ -152,6 +153,10 @@ func (m *pluginMedianServer) NewMedianFactory(ctx context.Context, request *pb.N
m.closeAll(dsRes, juelsRes, providerRes, errorLogRes)
return nil, err
}
if err = factory.Start(ctx); err != nil {
m.closeAll(dsRes, juelsRes, providerRes, errorLogRes)
return nil, err
}

id, _, err := m.serveNew("ReportingPluginProvider", func(s *grpc.Server) {
pb.RegisterServiceServer(s, &serviceServer{srv: factory})
Expand Down
14 changes: 8 additions & 6 deletions pkg/loop/internal/relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@ var _ PluginRelayer = (*PluginRelayerClient)(nil)

type PluginRelayerClient struct {
*pluginClient
*serviceClient

grpc pb.PluginRelayerClient
pluginRelayer pb.PluginRelayerClient
}

func NewPluginRelayerClient(broker Broker, brokerCfg BrokerConfig, conn *grpc.ClientConn) *PluginRelayerClient {
brokerCfg.Logger = logger.Named(brokerCfg.Logger, "PluginRelayerClient")
pc := newPluginClient(broker, brokerCfg, conn)
return &PluginRelayerClient{pluginClient: pc, grpc: pb.NewPluginRelayerClient(pc)}
return &PluginRelayerClient{pluginClient: pc, pluginRelayer: pb.NewPluginRelayerClient(pc), serviceClient: newServiceClient(pc.brokerExt, pc)}
}

func (p *PluginRelayerClient) NewRelayer(ctx context.Context, config string, keystore types.Keystore) (Relayer, error) {
Expand All @@ -40,7 +41,7 @@ func (p *PluginRelayerClient) NewRelayer(ctx context.Context, config string, key
}
deps.Add(ksRes)

reply, err := p.grpc.NewRelayer(ctx, &pb.NewRelayerRequest{
reply, err := p.pluginRelayer.NewRelayer(ctx, &pb.NewRelayerRequest{
Config: config,
KeystoreID: id,
})
Expand All @@ -61,6 +62,7 @@ type pluginRelayerServer struct {
}

func RegisterPluginRelayerServer(server *grpc.Server, broker Broker, brokerCfg BrokerConfig, impl PluginRelayer) error {
pb.RegisterServiceServer(server, &serviceServer{srv: impl})
pb.RegisterPluginRelayerServer(server, newPluginRelayerServer(broker, brokerCfg, impl))
return nil
}
Expand Down Expand Up @@ -161,7 +163,7 @@ type relayerClient struct {
}

func newRelayerClient(b *brokerExt, conn grpc.ClientConnInterface) *relayerClient {
b = b.withName("ChainRelayerClient")
b = b.withName("RelayerClient")
return &relayerClient{b, newServiceClient(b, conn), pb.NewRelayerClient(conn)}
}

Expand Down Expand Up @@ -436,14 +438,14 @@ func (r *relayerServer) Transact(ctx context.Context, request *pb.TransactionReq
return &emptypb.Empty{}, r.impl.Transact(ctx, request.From, request.To, request.Amount.Int(), request.BalanceCheck)
}

func healthReport(s map[string]string) (hr map[string]error) {
func healthReport(prefix string, s map[string]string) (hr map[string]error) {
hr = make(map[string]error, len(s))
for n, e := range s {
var err error
if e != "" {
err = errors.New(e)
}
hr[n] = err
hr[prefix+"."+n] = err
}
return hr
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/loop/internal/reporting_plugin_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ type reportingPluginServiceServer struct {
}

func RegisterReportingPluginServiceServer(server *grpc.Server, broker Broker, brokerCfg BrokerConfig, impl types.ReportingPluginClient) error {
pb.RegisterServiceServer(server, &serviceServer{srv: impl})
pb.RegisterReportingPluginServiceServer(server, newReportingPluginServiceServer(&brokerExt{broker, brokerCfg}, impl))
return nil
}
Expand Down Expand Up @@ -148,6 +149,10 @@ func (m *reportingPluginServiceServer) NewReportingPluginFactory(ctx context.Con
m.closeAll(providerRes, errorLogRes, pipelineRunnerRes, telemetryRes)
return nil, err
}
if err = factory.Start(ctx); err != nil {
m.closeAll(providerRes, errorLogRes, pipelineRunnerRes, telemetryRes)
return nil, err
}

id, _, err := m.serveNew("ReportingPluginProvider", func(s *grpc.Server) {
pb.RegisterServiceServer(s, &serviceServer{srv: factory})
Expand Down
5 changes: 3 additions & 2 deletions pkg/loop/internal/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,12 @@ func (s *serviceClient) HealthReport() map[string]error {
ctx, cancel = context.WithTimeout(ctx, time.Second)
defer cancel()

name := s.Name()
reply, err := s.grpc.HealthReport(ctx, &emptypb.Empty{})
if err != nil {
return map[string]error{s.b.Logger.Name(): err}
return map[string]error{name: err}
}
hr := healthReport(reply.HealthReport)
hr := healthReport(name, reply.HealthReport)
hr[s.b.Logger.Name()] = nil
return hr
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/loop/internal/test/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func main() {
plugin.Serve(&plugin.ServeConfig{
HandshakeConfig: loop.PluginRelayerHandshakeConfig(),
Plugins: map[string]plugin.Plugin{
loop.PluginRelayerName: &loop.GRPCPluginRelayer{PluginServer: test.StaticPluginRelayer{}, BrokerConfig: loop.BrokerConfig{Logger: lggr, StopCh: stopCh}},
loop.PluginRelayerName: &loop.GRPCPluginRelayer{PluginServer: test.NewStaticPluginRelayer(lggr), BrokerConfig: loop.BrokerConfig{Logger: lggr, StopCh: stopCh}},
},
GRPCServer: grpcServer,
})
Expand All @@ -68,7 +68,7 @@ func main() {
plugin.Serve(&plugin.ServeConfig{
HandshakeConfig: loop.PluginMedianHandshakeConfig(),
Plugins: map[string]plugin.Plugin{
loop.PluginMedianName: &loop.GRPCPluginMedian{PluginServer: test.StaticPluginMedian{}, BrokerConfig: loop.BrokerConfig{Logger: lggr, StopCh: stopCh}},
loop.PluginMedianName: &loop.GRPCPluginMedian{PluginServer: test.NewStaticPluginMedian(lggr), BrokerConfig: loop.BrokerConfig{Logger: lggr, StopCh: stopCh}},
},
GRPCServer: grpcServer,
})
Expand All @@ -89,7 +89,7 @@ func main() {
HandshakeConfig: reportingplugins.ReportingPluginHandshakeConfig(),
Plugins: map[string]plugin.Plugin{
reportingplugins.PluginServiceName: &reportingplugins.GRPCService[types.PluginProvider]{
PluginServer: test.StaticReportingPluginWithPluginProvider{},
PluginServer: test.NewStaticReportingPluginWithPluginProvider(lggr),
BrokerConfig: loop.BrokerConfig{
Logger: lggr,
StopCh: stopCh,
Expand All @@ -105,7 +105,7 @@ func main() {
HandshakeConfig: reportingplugins.ReportingPluginHandshakeConfig(),
Plugins: map[string]plugin.Plugin{
reportingplugins.PluginServiceName: &reportingplugins.GRPCService[types.MedianProvider]{
PluginServer: test.StaticReportingPluginWithMedianProvider{},
PluginServer: test.NewStaticReportingPluginWithMedianProvider(lggr),
BrokerConfig: loop.BrokerConfig{
Logger: lggr,
StopCh: stopCh,
Expand Down
19 changes: 8 additions & 11 deletions pkg/loop/internal/test/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,17 @@ import (
libocr "github.com/smartcontractkit/libocr/offchainreporting2plus/types"

"github.com/stretchr/testify/assert"
)

type staticConfigProvider struct{}

// TODO validate start/Close calls?
func (s staticConfigProvider) Start(ctx context.Context) error { return nil }

func (s staticConfigProvider) Close() error { return nil }

func (s staticConfigProvider) Ready() error { panic("unimplemented") }
"github.com/smartcontractkit/chainlink-common/pkg/logger"
)

func (s staticConfigProvider) Name() string { panic("unimplemented") }
type staticConfigProvider struct {
staticService
}

func (s staticConfigProvider) HealthReport() map[string]error { panic("unimplemented") }
func newStaticConfigProvider(lggr logger.Logger) staticConfigProvider {
return staticConfigProvider{staticService{lggr: logger.Named(lggr, "staticConfigProvider")}}
}

func (s staticConfigProvider) OffchainConfigDigester() libocr.OffchainConfigDigester {
return staticOffchainConfigDigester{}
Expand Down
73 changes: 43 additions & 30 deletions pkg/loop/internal/test/median.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,16 @@ import (
"github.com/smartcontractkit/libocr/offchainreporting2/reportingplugin/median"
libocr "github.com/smartcontractkit/libocr/offchainreporting2plus/types"


"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/services/srvctest"
"github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
)

func PluginMedian(t *testing.T, p types.PluginMedian) {
PluginMedianTest{&StaticMedianProvider{}}.TestPluginMedian(t, p)
PluginMedianTest{&staticMedianProvider{}}.TestPluginMedian(t, p)
}

type PluginMedianTest struct {
Expand All @@ -34,6 +38,11 @@ func (m PluginMedianTest) TestPluginMedian(t *testing.T, p types.PluginMedian) {
require.NoError(t, err)

ReportingPluginFactory(t, factory)
srvctest.AssertHealthReportNames(t, p.HealthReport(),
"PluginMedianClient",
"PluginMedianClient.staticPluginMedian",
"PluginMedianClient.staticPluginMedian.staticPluginFactory",
)
})
}

Expand Down Expand Up @@ -65,9 +74,21 @@ func ReportingPluginFactory(t *testing.T, factory types.ReportingPluginFactory)
})
}

type StaticPluginMedian struct{}
type staticPluginMedian struct {
staticService
}

func (s StaticPluginMedian) NewMedianFactory(ctx context.Context, provider types.MedianProvider, dataSource, juelsPerFeeCoinDataSource median.DataSource, errorLog types.ErrorLog) (types.ReportingPluginFactory, error) {
func NewStaticPluginMedian(lggr logger.Logger) staticPluginMedian {
return staticPluginMedian{staticService{lggr: logger.Named(lggr, "staticPluginMedian")}}
}

func (s staticPluginMedian) HealthReport() map[string]error {
hp := s.staticService.HealthReport()
services.CopyHealth(hp, newStaticPluginFactory(s.lggr).HealthReport())
return hp
}

func (s staticPluginMedian) NewMedianFactory(ctx context.Context, provider types.MedianProvider, dataSource, juelsPerFeeCoinDataSource median.DataSource, errorLog types.ErrorLog) (types.ReportingPluginFactory, error) {
ocd := provider.OffchainConfigDigester()
gotDigestPrefix, err := ocd.ConfigDigestPrefix()
if err != nil {
Expand Down Expand Up @@ -217,20 +238,16 @@ func (s StaticPluginMedian) NewMedianFactory(ctx context.Context, provider types
if err := errorLog.SaveError(ctx, errMsg); err != nil {
return nil, fmt.Errorf("failed to save error: %w", err)
}
return staticPluginFactory{}, nil
return newStaticPluginFactory(s.lggr), nil
}

type staticPluginFactory struct{}

func (s staticPluginFactory) Name() string { panic("implement me") }

func (s staticPluginFactory) Start(ctx context.Context) error { return nil }

func (s staticPluginFactory) Close() error { return nil }

func (s staticPluginFactory) Ready() error { panic("implement me") }
type staticPluginFactory struct {
staticService
}

func (s staticPluginFactory) HealthReport() map[string]error { panic("implement me") }
func newStaticPluginFactory(lggr logger.Logger) staticPluginFactory {
return staticPluginFactory{staticService{lggr: logger.Named(lggr, "staticPluginFactory")}}
}

func (s staticPluginFactory) NewReportingPlugin(config libocr.ReportingPluginConfig) (libocr.ReportingPlugin, libocr.ReportingPluginInfo, error) {
if config.ConfigDigest != reportingPluginConfig.ConfigDigest {
Expand Down Expand Up @@ -272,35 +289,31 @@ func (s staticPluginFactory) NewReportingPlugin(config libocr.ReportingPluginCon
return staticReportingPlugin{}, rpi, nil
}

type StaticMedianProvider struct{}

func (s StaticMedianProvider) Start(ctx context.Context) error { return nil }

func (s StaticMedianProvider) Close() error { return nil }

func (s StaticMedianProvider) Ready() error { panic("unimplemented") }

func (s StaticMedianProvider) Name() string { panic("unimplemented") }
type staticMedianProvider struct {
staticService
}

func (s StaticMedianProvider) HealthReport() map[string]error { panic("unimplemented") }
func NewStaticMedianProvider(lggr logger.Logger) staticMedianProvider {
return staticMedianProvider{staticService{lggr: logger.Named(lggr, "staticMedianProvider")}}
}

func (s StaticMedianProvider) OffchainConfigDigester() libocr.OffchainConfigDigester {
func (s staticMedianProvider) OffchainConfigDigester() libocr.OffchainConfigDigester {
return staticOffchainConfigDigester{}
}

func (s StaticMedianProvider) ContractConfigTracker() libocr.ContractConfigTracker {
func (s staticMedianProvider) ContractConfigTracker() libocr.ContractConfigTracker {
return staticContractConfigTracker{}
}

func (s StaticMedianProvider) ContractTransmitter() libocr.ContractTransmitter {
func (s staticMedianProvider) ContractTransmitter() libocr.ContractTransmitter {
return staticContractTransmitter{}
}

func (s StaticMedianProvider) ReportCodec() median.ReportCodec { return staticReportCodec{} }
func (s staticMedianProvider) ReportCodec() median.ReportCodec { return staticReportCodec{} }

func (s StaticMedianProvider) MedianContract() median.MedianContract { return staticMedianContract{} }
func (s staticMedianProvider) MedianContract() median.MedianContract { return staticMedianContract{} }

func (s StaticMedianProvider) OnchainConfigCodec() median.OnchainConfigCodec {
func (s staticMedianProvider) OnchainConfigCodec() median.OnchainConfigCodec {
return staticOnchainConfigCodec{}
}

Expand Down
26 changes: 11 additions & 15 deletions pkg/loop/internal/test/plugin_provider.go
Original file line number Diff line number Diff line change
@@ -1,31 +1,27 @@
package test

import (
"context"

libocr "github.com/smartcontractkit/libocr/offchainreporting2plus/types"
)

type StaticPluginProvider struct{}

func (s StaticPluginProvider) Start(ctx context.Context) error { return nil }

func (s StaticPluginProvider) Close() error { return nil }

func (s StaticPluginProvider) Ready() error { panic("unimplemented") }
"github.com/smartcontractkit/chainlink-common/pkg/logger"
)

func (s StaticPluginProvider) Name() string { panic("unimplemented") }
type staticPluginProvider struct {
staticService
}

func (s StaticPluginProvider) HealthReport() map[string]error { panic("unimplemented") }
func NewStaticPluginProvider(lggr logger.Logger) staticPluginProvider {
return staticPluginProvider{staticService{lggr: logger.Named(lggr, "staticPluginProvider")}}
}

func (s StaticPluginProvider) OffchainConfigDigester() libocr.OffchainConfigDigester {
func (s staticPluginProvider) OffchainConfigDigester() libocr.OffchainConfigDigester {
return staticOffchainConfigDigester{}
}

func (s StaticPluginProvider) ContractConfigTracker() libocr.ContractConfigTracker {
func (s staticPluginProvider) ContractConfigTracker() libocr.ContractConfigTracker {
return staticContractConfigTracker{}
}

func (s StaticPluginProvider) ContractTransmitter() libocr.ContractTransmitter {
func (s staticPluginProvider) ContractTransmitter() libocr.ContractTransmitter {
return staticContractTransmitter{}
}
Loading

0 comments on commit c2c5209

Please sign in to comment.