diff --git a/pkg/loop/internal/pb/generate.go b/pkg/loop/internal/pb/generate.go index 4c56e4efc..14705b7fd 100644 --- a/pkg/loop/internal/pb/generate.go +++ b/pkg/loop/internal/pb/generate.go @@ -1,4 +1,5 @@ //go:generate protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative relayer.proto //go:generate protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative reporting.proto //go:generate protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative median.proto +//go:generate protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative telemetry.proto package pb diff --git a/pkg/loop/internal/pb/telemetry.pb.go b/pkg/loop/internal/pb/telemetry.pb.go new file mode 100644 index 000000000..63cb6f402 --- /dev/null +++ b/pkg/loop/internal/pb/telemetry.pb.go @@ -0,0 +1,260 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.31.0 +// protoc v4.24.3 +// source: telemetry.proto + +package pb + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + emptypb "google.golang.org/protobuf/types/known/emptypb" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type RelayID struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Network string `protobuf:"bytes,1,opt,name=network,proto3" json:"network,omitempty"` + ChainId string `protobuf:"bytes,2,opt,name=chainId,proto3" json:"chainId,omitempty"` +} + +func (x *RelayID) Reset() { + *x = RelayID{} + if protoimpl.UnsafeEnabled { + mi := &file_telemetry_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *RelayID) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RelayID) ProtoMessage() {} + +func (x *RelayID) ProtoReflect() protoreflect.Message { + mi := &file_telemetry_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RelayID.ProtoReflect.Descriptor instead. +func (*RelayID) Descriptor() ([]byte, []int) { + return file_telemetry_proto_rawDescGZIP(), []int{0} +} + +func (x *RelayID) GetNetwork() string { + if x != nil { + return x.Network + } + return "" +} + +func (x *RelayID) GetChainId() string { + if x != nil { + return x.ChainId + } + return "" +} + +type TelemetryMessage struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + RelayID *RelayID `protobuf:"bytes,1,opt,name=relayID,proto3" json:"relayID,omitempty"` + ContractID string `protobuf:"bytes,2,opt,name=contractID,proto3" json:"contractID,omitempty"` + TelemetryType string `protobuf:"bytes,3,opt,name=telemetryType,proto3" json:"telemetryType,omitempty"` + Payload []byte `protobuf:"bytes,4,opt,name=payload,proto3" json:"payload,omitempty"` +} + +func (x *TelemetryMessage) Reset() { + *x = TelemetryMessage{} + if protoimpl.UnsafeEnabled { + mi := &file_telemetry_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *TelemetryMessage) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*TelemetryMessage) ProtoMessage() {} + +func (x *TelemetryMessage) ProtoReflect() protoreflect.Message { + mi := &file_telemetry_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use TelemetryMessage.ProtoReflect.Descriptor instead. +func (*TelemetryMessage) Descriptor() ([]byte, []int) { + return file_telemetry_proto_rawDescGZIP(), []int{1} +} + +func (x *TelemetryMessage) GetRelayID() *RelayID { + if x != nil { + return x.RelayID + } + return nil +} + +func (x *TelemetryMessage) GetContractID() string { + if x != nil { + return x.ContractID + } + return "" +} + +func (x *TelemetryMessage) GetTelemetryType() string { + if x != nil { + return x.TelemetryType + } + return "" +} + +func (x *TelemetryMessage) GetPayload() []byte { + if x != nil { + return x.Payload + } + return nil +} + +var File_telemetry_proto protoreflect.FileDescriptor + +var file_telemetry_proto_rawDesc = []byte{ + 0x0a, 0x0f, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x74, 0x72, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x12, 0x04, 0x6c, 0x6f, 0x6f, 0x70, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x3d, 0x0a, 0x07, 0x52, 0x65, 0x6c, 0x61, 0x79, 0x49, 0x44, 0x12, + 0x18, 0x0a, 0x07, 0x6e, 0x65, 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x07, 0x6e, 0x65, 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x68, 0x61, + 0x69, 0x6e, 0x49, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x63, 0x68, 0x61, 0x69, + 0x6e, 0x49, 0x64, 0x22, 0x9b, 0x01, 0x0a, 0x10, 0x54, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x74, 0x72, + 0x79, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x27, 0x0a, 0x07, 0x72, 0x65, 0x6c, 0x61, + 0x79, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0d, 0x2e, 0x6c, 0x6f, 0x6f, 0x70, + 0x2e, 0x52, 0x65, 0x6c, 0x61, 0x79, 0x49, 0x44, 0x52, 0x07, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x49, + 0x44, 0x12, 0x1e, 0x0a, 0x0a, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x61, 0x63, 0x74, 0x49, 0x44, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x61, 0x63, 0x74, 0x49, + 0x44, 0x12, 0x24, 0x0a, 0x0d, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x74, 0x72, 0x79, 0x54, 0x79, + 0x70, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x65, + 0x74, 0x72, 0x79, 0x54, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, + 0x61, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, + 0x64, 0x32, 0x43, 0x0a, 0x09, 0x54, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x74, 0x72, 0x79, 0x12, 0x36, + 0x0a, 0x04, 0x53, 0x65, 0x6e, 0x64, 0x12, 0x16, 0x2e, 0x6c, 0x6f, 0x6f, 0x70, 0x2e, 0x54, 0x65, + 0x6c, 0x65, 0x6d, 0x65, 0x74, 0x72, 0x79, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x16, + 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, + 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x42, 0x42, 0x5a, 0x40, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, + 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, 0x6d, 0x61, 0x72, 0x74, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x61, + 0x63, 0x74, 0x6b, 0x69, 0x74, 0x2f, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x6c, 0x69, 0x6e, 0x6b, 0x2d, + 0x72, 0x65, 0x6c, 0x61, 0x79, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x6c, 0x6f, 0x6f, 0x70, 0x2f, 0x69, + 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, +} + +var ( + file_telemetry_proto_rawDescOnce sync.Once + file_telemetry_proto_rawDescData = file_telemetry_proto_rawDesc +) + +func file_telemetry_proto_rawDescGZIP() []byte { + file_telemetry_proto_rawDescOnce.Do(func() { + file_telemetry_proto_rawDescData = protoimpl.X.CompressGZIP(file_telemetry_proto_rawDescData) + }) + return file_telemetry_proto_rawDescData +} + +var file_telemetry_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_telemetry_proto_goTypes = []interface{}{ + (*RelayID)(nil), // 0: loop.RelayID + (*TelemetryMessage)(nil), // 1: loop.TelemetryMessage + (*emptypb.Empty)(nil), // 2: google.protobuf.Empty +} +var file_telemetry_proto_depIdxs = []int32{ + 0, // 0: loop.TelemetryMessage.relayID:type_name -> loop.RelayID + 1, // 1: loop.Telemetry.Send:input_type -> loop.TelemetryMessage + 2, // 2: loop.Telemetry.Send:output_type -> google.protobuf.Empty + 2, // [2:3] is the sub-list for method output_type + 1, // [1:2] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_telemetry_proto_init() } +func file_telemetry_proto_init() { + if File_telemetry_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_telemetry_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*RelayID); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_telemetry_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*TelemetryMessage); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_telemetry_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_telemetry_proto_goTypes, + DependencyIndexes: file_telemetry_proto_depIdxs, + MessageInfos: file_telemetry_proto_msgTypes, + }.Build() + File_telemetry_proto = out.File + file_telemetry_proto_rawDesc = nil + file_telemetry_proto_goTypes = nil + file_telemetry_proto_depIdxs = nil +} diff --git a/pkg/loop/internal/pb/telemetry.proto b/pkg/loop/internal/pb/telemetry.proto new file mode 100644 index 000000000..7cbfc560d --- /dev/null +++ b/pkg/loop/internal/pb/telemetry.proto @@ -0,0 +1,23 @@ +syntax = "proto3"; + +option go_package = "github.com/smartcontractkit/chainlink-relay/pkg/loop/internal/pb"; + +package loop; + +import "google/protobuf/empty.proto"; + +message RelayID { + string network = 1; + string chainId = 2; +} + +message TelemetryMessage { + RelayID relayID = 1; + string contractID = 2; + string telemetryType = 3; + bytes payload = 4; +} + +service Telemetry { + rpc Send(TelemetryMessage) returns (google.protobuf.Empty); +} diff --git a/pkg/loop/internal/pb/telemetry_grpc.pb.go b/pkg/loop/internal/pb/telemetry_grpc.pb.go new file mode 100644 index 000000000..166f55e69 --- /dev/null +++ b/pkg/loop/internal/pb/telemetry_grpc.pb.go @@ -0,0 +1,110 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.3.0 +// - protoc v4.24.3 +// source: telemetry.proto + +package pb + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" + emptypb "google.golang.org/protobuf/types/known/emptypb" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +const ( + Telemetry_Send_FullMethodName = "/loop.Telemetry/Send" +) + +// TelemetryClient is the client API for Telemetry service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type TelemetryClient interface { + Send(ctx context.Context, in *TelemetryMessage, opts ...grpc.CallOption) (*emptypb.Empty, error) +} + +type telemetryClient struct { + cc grpc.ClientConnInterface +} + +func NewTelemetryClient(cc grpc.ClientConnInterface) TelemetryClient { + return &telemetryClient{cc} +} + +func (c *telemetryClient) Send(ctx context.Context, in *TelemetryMessage, opts ...grpc.CallOption) (*emptypb.Empty, error) { + out := new(emptypb.Empty) + err := c.cc.Invoke(ctx, Telemetry_Send_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// TelemetryServer is the server API for Telemetry service. +// All implementations must embed UnimplementedTelemetryServer +// for forward compatibility +type TelemetryServer interface { + Send(context.Context, *TelemetryMessage) (*emptypb.Empty, error) + mustEmbedUnimplementedTelemetryServer() +} + +// UnimplementedTelemetryServer must be embedded to have forward compatible implementations. +type UnimplementedTelemetryServer struct { +} + +func (UnimplementedTelemetryServer) Send(context.Context, *TelemetryMessage) (*emptypb.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method Send not implemented") +} +func (UnimplementedTelemetryServer) mustEmbedUnimplementedTelemetryServer() {} + +// UnsafeTelemetryServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to TelemetryServer will +// result in compilation errors. +type UnsafeTelemetryServer interface { + mustEmbedUnimplementedTelemetryServer() +} + +func RegisterTelemetryServer(s grpc.ServiceRegistrar, srv TelemetryServer) { + s.RegisterService(&Telemetry_ServiceDesc, srv) +} + +func _Telemetry_Send_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(TelemetryMessage) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(TelemetryServer).Send(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: Telemetry_Send_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(TelemetryServer).Send(ctx, req.(*TelemetryMessage)) + } + return interceptor(ctx, in, info, handler) +} + +// Telemetry_ServiceDesc is the grpc.ServiceDesc for Telemetry service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var Telemetry_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "loop.Telemetry", + HandlerType: (*TelemetryServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Send", + Handler: _Telemetry_Send_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "telemetry.proto", +} diff --git a/pkg/loop/internal/telemetry.go b/pkg/loop/internal/telemetry.go new file mode 100644 index 000000000..c57713dbe --- /dev/null +++ b/pkg/loop/internal/telemetry.go @@ -0,0 +1,144 @@ +package internal + +import ( + "context" + "errors" + "fmt" + + "github.com/smartcontractkit/libocr/commontypes" + "google.golang.org/protobuf/types/known/emptypb" + + "github.com/smartcontractkit/chainlink-relay/pkg/loop/internal/pb" + "github.com/smartcontractkit/chainlink-relay/pkg/types" +) + +var _ types.Telemetry = (*telemetryClient)(nil) +var _ types.MonitoringEndpointGenerator = (*telemetryClient)(nil) +var _ commontypes.MonitoringEndpoint = (*telemetryEndpoint)(nil) + +type TelemetryClient struct { + *telemetryClient + + grpc pb.TelemetryClient +} + +type telemetryClient struct { + *brokerExt + grpc pb.TelemetryClient + + contractID string + telemetryType string + network string + chainID string +} + +type telemetryEndpoint struct { + grpc pb.TelemetryClient + relayID pb.RelayID + contractID string + telemetryType string +} + +func (t *telemetryEndpoint) SendLog(log []byte) { + _, err := t.grpc.Send(context.Background(), &pb.TelemetryMessage{ + RelayID: &t.relayID, + ContractID: t.contractID, + TelemetryType: t.telemetryType, + Payload: log, + }) + if err != nil { + //log error? + } +} + +func (t *telemetryClient) GenMonitoringEndpoint(contractID string, telemType string, network string, chainID string) commontypes.MonitoringEndpoint { + return &telemetryEndpoint{ + grpc: t.grpc, + relayID: pb.RelayID{ + Network: network, + ChainId: chainID, + }, + contractID: contractID, + telemetryType: telemType, + } +} + +func (t *telemetryClient) Send(ctx context.Context, contractID string, telemetryType string, network string, chainID string, payload []byte) error { + if contractID == "" { + return errors.New("contractID cannot be empty") + } + if telemetryType == "" { + return errors.New("telemetryType cannot be empty") + } + if network == "" { + return errors.New("network cannot be empty") + } + if chainID == "" { + return errors.New("chainId cannot be empty") + } + if len(payload) == 0 { + return errors.New("payload cannot be empty") + } + _, err := t.grpc.Send(ctx, &pb.TelemetryMessage{ + RelayID: &pb.RelayID{ + Network: network, + ChainId: chainID, + }, + ContractID: contractID, + TelemetryType: telemetryType, + Payload: payload, + }) + if err != nil { + return err + } + return nil +} + +var _ pb.TelemetryServer = (*telemetryServer)(nil) + +type telemetryServer struct { + pb.UnimplementedTelemetryServer + *brokerExt + + impl types.MonitoringEndpointGenerator + endpoints map[string]commontypes.MonitoringEndpoint +} + +func (t *telemetryServer) Send(ctx context.Context, message *pb.TelemetryMessage) (*emptypb.Empty, error) { + e, err := t.getOrCreateEndpoint(message) + if err != nil { + return nil, err + } + e.SendLog(message.Payload) + + return nil, nil +} + +func (t *telemetryServer) getOrCreateEndpoint(m *pb.TelemetryMessage) (commontypes.MonitoringEndpoint, error) { + if m.ContractID == "" { + return nil, errors.New("contractID cannot be empty") + } + if m.TelemetryType == "" { + return nil, errors.New("TelemetryType cannot be empty") + } + if m.RelayID == nil { + return nil, errors.New("RelayID cannot be nil") + } + if m.RelayID.Network == "" { + return nil, errors.New("RelayID.Network cannot be empty") + } + if m.RelayID.ChainId == "" { + return nil, errors.New("RelayID.ChainId cannot be empty") + } + + key := makeKey(m) + e, ok := t.endpoints[key] + if !ok { + e = t.impl.GenMonitoringEndpoint(m.ContractID, m.TelemetryType, m.RelayID.Network, m.RelayID.ChainId) + } + return e, nil +} + +func makeKey(m *pb.TelemetryMessage) string { + return fmt.Sprintf("%s_%s_%s_%s", m.RelayID.Network, m.RelayID.ChainId, m.ContractID, m.TelemetryType) +} diff --git a/pkg/types/telemetry.go b/pkg/types/telemetry.go new file mode 100644 index 000000000..cf1792cf0 --- /dev/null +++ b/pkg/types/telemetry.go @@ -0,0 +1,17 @@ +package types + +import ( + "context" + + "github.com/smartcontractkit/libocr/commontypes" +) + +type Telemetry interface { + Send(ctx context.Context, contractID string, telemetryType string, network string, chainID string, payload []byte) error + GenMonitoringEndpoint(contractID string, telemType string, network string, chainID string) commontypes.MonitoringEndpoint +} + +// MonitoringEndpointGenerator almost identical to synchronization.MonitoringEndpointGenerator except for the telemetry type being string after https://github.com/smartcontractkit/chainlink/pull/10623 gets merged +type MonitoringEndpointGenerator interface { + GenMonitoringEndpoint(contractID string, telemType string, network string, chainID string) commontypes.MonitoringEndpoint +}