diff --git a/internal/otlp/pb_decoder.go b/internal/otlp/pb_decoder.go index 07a8cc9bdf2..be33f7a1a82 100644 --- a/internal/otlp/pb_decoder.go +++ b/internal/otlp/pb_decoder.go @@ -15,6 +15,7 @@ package otlp import ( + "go.opentelemetry.io/collector/internal" otlpcollectorlogs "go.opentelemetry.io/collector/internal/data/protogen/collector/logs/v1" otlpcollectormetrics "go.opentelemetry.io/collector/internal/data/protogen/collector/metrics/v1" otlpcollectortrace "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1" @@ -41,5 +42,8 @@ func (d *pbDecoder) DecodeMetrics(buf []byte) (interface{}, error) { func (d *pbDecoder) DecodeTraces(buf []byte) (interface{}, error) { td := &otlpcollectortrace.ExportTraceServiceRequest{} err := td.Unmarshal(buf) + if err == nil { + internal.TracesCompatibilityChanges(td) + } return td, err } diff --git a/internal/pdatagrpc/traces.go b/internal/pdatagrpc/traces.go index 9bbd9a23f22..38feab7c18d 100644 --- a/internal/pdatagrpc/traces.go +++ b/internal/pdatagrpc/traces.go @@ -72,6 +72,7 @@ type rawTracesServer struct { } func (s rawTracesServer) Export(ctx context.Context, request *otlpcollectortraces.ExportTraceServiceRequest) (*otlpcollectortraces.ExportTraceServiceResponse, error) { + internal.TracesCompatibilityChanges(request) _, err := s.srv.Export(ctx, pdata.TracesFromInternalRep(internal.TracesFromOtlp(request))) return &otlpcollectortraces.ExportTraceServiceResponse{}, err } diff --git a/receiver/otlpreceiver/internal/logs/otlp.go b/receiver/otlpreceiver/internal/logs/otlp.go index e677afa1f89..97740b9c054 100644 --- a/receiver/otlpreceiver/internal/logs/otlp.go +++ b/receiver/otlpreceiver/internal/logs/otlp.go @@ -21,8 +21,6 @@ import ( "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/pdata" - "go.opentelemetry.io/collector/internal" - collectorlog "go.opentelemetry.io/collector/internal/data/protogen/collector/logs/v1" "go.opentelemetry.io/collector/obsreport" ) @@ -55,17 +53,15 @@ const ( var receiverID = config.NewIDWithName("otlp", "log") // Export implements the service Export logs func. -func (r *Receiver) Export(ctx context.Context, req *collectorlog.ExportLogsServiceRequest) (*collectorlog.ExportLogsServiceResponse, error) { +func (r *Receiver) Export(ctx context.Context, ld pdata.Logs) (interface{}, error) { // We need to ensure that it propagates the receiver name as a tag ctxWithReceiverName := obsreport.ReceiverContext(ctx, r.id, receiverTransport) - - ld := pdata.LogsFromInternalRep(internal.LogsFromOtlp(req)) err := r.sendToNextConsumer(ctxWithReceiverName, ld) if err != nil { return nil, err } - return &collectorlog.ExportLogsServiceResponse{}, nil + return nil, nil } func (r *Receiver) sendToNextConsumer(ctx context.Context, ld pdata.Logs) error { diff --git a/receiver/otlpreceiver/internal/logs/otlp_test.go b/receiver/otlpreceiver/internal/logs/otlp_test.go index 811f427bb61..b85054f8917 100644 --- a/receiver/otlpreceiver/internal/logs/otlp_test.go +++ b/receiver/otlpreceiver/internal/logs/otlp_test.go @@ -27,7 +27,6 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/consumer/pdata" - collectorlog "go.opentelemetry.io/collector/internal/data/protogen/collector/logs/v1" "go.opentelemetry.io/collector/internal/pdatagrpc" "go.opentelemetry.io/collector/internal/testdata" ) @@ -116,7 +115,7 @@ func otlpReceiverOnGRPCServer(t *testing.T, tc consumer.Logs) (net.Addr, func()) // Now run it as a gRPC server srv := grpc.NewServer() - collectorlog.RegisterLogsServiceServer(srv, r) + pdatagrpc.RegisterLogsServer(srv, r) go func() { _ = srv.Serve(ln) }() diff --git a/receiver/otlpreceiver/internal/metrics/otlp.go b/receiver/otlpreceiver/internal/metrics/otlp.go index 23f5aa95b71..f40597fd91f 100644 --- a/receiver/otlpreceiver/internal/metrics/otlp.go +++ b/receiver/otlpreceiver/internal/metrics/otlp.go @@ -21,8 +21,6 @@ import ( "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/pdata" - "go.opentelemetry.io/collector/internal" - collectormetrics "go.opentelemetry.io/collector/internal/data/protogen/collector/metrics/v1" "go.opentelemetry.io/collector/obsreport" ) @@ -54,17 +52,14 @@ const ( var receiverID = config.NewIDWithName("otlp", "metrics") // Export implements the service Export metrics func. -func (r *Receiver) Export(ctx context.Context, req *collectormetrics.ExportMetricsServiceRequest) (*collectormetrics.ExportMetricsServiceResponse, error) { +func (r *Receiver) Export(ctx context.Context, md pdata.Metrics) (interface{}, error) { receiverCtx := obsreport.ReceiverContext(ctx, r.id, receiverTransport) - - md := pdata.MetricsFromInternalRep(internal.MetricsFromOtlp(req)) - err := r.sendToNextConsumer(receiverCtx, md) if err != nil { return nil, err } - return &collectormetrics.ExportMetricsServiceResponse{}, nil + return nil, nil } func (r *Receiver) sendToNextConsumer(ctx context.Context, md pdata.Metrics) error { diff --git a/receiver/otlpreceiver/internal/metrics/otlp_test.go b/receiver/otlpreceiver/internal/metrics/otlp_test.go index 59247c95065..defabab9fa1 100644 --- a/receiver/otlpreceiver/internal/metrics/otlp_test.go +++ b/receiver/otlpreceiver/internal/metrics/otlp_test.go @@ -27,7 +27,6 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/consumer/pdata" - collectormetrics "go.opentelemetry.io/collector/internal/data/protogen/collector/metrics/v1" "go.opentelemetry.io/collector/internal/pdatagrpc" "go.opentelemetry.io/collector/internal/testdata" ) @@ -123,7 +122,7 @@ func otlpReceiverOnGRPCServer(t *testing.T, mc consumer.Metrics) (net.Addr, func r := New(receiverID, mc) // Now run it as a gRPC server srv := grpc.NewServer() - collectormetrics.RegisterMetricsServiceServer(srv, r) + pdatagrpc.RegisterMetricsServer(srv, r) go func() { _ = srv.Serve(ln) }() diff --git a/receiver/otlpreceiver/internal/trace/otlp.go b/receiver/otlpreceiver/internal/trace/otlp.go index f5ec0455acd..57ec6c40121 100644 --- a/receiver/otlpreceiver/internal/trace/otlp.go +++ b/receiver/otlpreceiver/internal/trace/otlp.go @@ -21,8 +21,6 @@ import ( "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/pdata" - "go.opentelemetry.io/collector/internal" - collectortrace "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1" "go.opentelemetry.io/collector/obsreport" ) @@ -55,17 +53,15 @@ const ( var receiverID = config.NewIDWithName("otlp", "trace") // Export implements the service Export traces func. -func (r *Receiver) Export(ctx context.Context, req *collectortrace.ExportTraceServiceRequest) (*collectortrace.ExportTraceServiceResponse, error) { +func (r *Receiver) Export(ctx context.Context, td pdata.Traces) (interface{}, error) { // We need to ensure that it propagates the receiver name as a tag ctxWithReceiverName := obsreport.ReceiverContext(ctx, r.id, receiverTransport) - internal.TracesCompatibilityChanges(req) - td := pdata.TracesFromInternalRep(internal.TracesFromOtlp(req)) err := r.sendToNextConsumer(ctxWithReceiverName, td) if err != nil { return nil, err } - return &collectortrace.ExportTraceServiceResponse{}, nil + return nil, nil } func (r *Receiver) sendToNextConsumer(ctx context.Context, td pdata.Traces) error { diff --git a/receiver/otlpreceiver/internal/trace/otlp_test.go b/receiver/otlpreceiver/internal/trace/otlp_test.go index ba2590d5b02..ba854c21ec5 100644 --- a/receiver/otlpreceiver/internal/trace/otlp_test.go +++ b/receiver/otlpreceiver/internal/trace/otlp_test.go @@ -27,7 +27,6 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/consumer/pdata" - collectortrace "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1" "go.opentelemetry.io/collector/internal/pdatagrpc" "go.opentelemetry.io/collector/internal/testdata" ) @@ -118,7 +117,7 @@ func otlpReceiverOnGRPCServer(t *testing.T, tc consumer.Traces) (net.Addr, func( // Now run it as a gRPC server srv := grpc.NewServer() - collectortrace.RegisterTraceServiceServer(srv, r) + pdatagrpc.RegisterTracesServer(srv, r) go func() { _ = srv.Serve(ln) }() diff --git a/receiver/otlpreceiver/otlp.go b/receiver/otlpreceiver/otlp.go index 4375118f7d9..d3cade21fe1 100644 --- a/receiver/otlpreceiver/otlp.go +++ b/receiver/otlpreceiver/otlp.go @@ -29,10 +29,8 @@ import ( "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/consumer" - collectorlog "go.opentelemetry.io/collector/internal/data/protogen/collector/logs/v1" - collectormetrics "go.opentelemetry.io/collector/internal/data/protogen/collector/metrics/v1" - collectortrace "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1" "go.opentelemetry.io/collector/internal/otlp" + "go.opentelemetry.io/collector/internal/pdatagrpc" "go.opentelemetry.io/collector/receiver/otlpreceiver/internal/logs" "go.opentelemetry.io/collector/receiver/otlpreceiver/internal/metrics" "go.opentelemetry.io/collector/receiver/otlpreceiver/internal/trace" @@ -120,15 +118,15 @@ func (r *otlpReceiver) startProtocolServers(host component.Host) error { r.serverGRPC = grpc.NewServer(opts...) if r.traceReceiver != nil { - collectortrace.RegisterTraceServiceServer(r.serverGRPC, r.traceReceiver) + pdatagrpc.RegisterTracesServer(r.serverGRPC, r.traceReceiver) } if r.metricsReceiver != nil { - collectormetrics.RegisterMetricsServiceServer(r.serverGRPC, r.metricsReceiver) + pdatagrpc.RegisterMetricsServer(r.serverGRPC, r.metricsReceiver) } if r.logReceiver != nil { - collectorlog.RegisterLogsServiceServer(r.serverGRPC, r.logReceiver) + pdatagrpc.RegisterLogsServer(r.serverGRPC, r.logReceiver) } err = r.startGRPCServer(r.cfg.GRPC, host) diff --git a/receiver/otlpreceiver/otlphttp.go b/receiver/otlpreceiver/otlphttp.go index c8339d318af..911c6af3f96 100644 --- a/receiver/otlpreceiver/otlphttp.go +++ b/receiver/otlpreceiver/otlphttp.go @@ -21,10 +21,10 @@ import ( "github.com/gogo/protobuf/jsonpb" "github.com/gogo/protobuf/proto" + "github.com/gogo/protobuf/types" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "go.opentelemetry.io/collector/internal" "go.opentelemetry.io/collector/internal/model" "go.opentelemetry.io/collector/receiver/otlpreceiver/internal/logs" "go.opentelemetry.io/collector/receiver/otlpreceiver/internal/metrics" @@ -50,13 +50,14 @@ func handleTraces( return } - rsp, err := tracesReceiver.Export(req.Context(), internal.TracesToOtlp(td.InternalRep())) + _, err = tracesReceiver.Export(req.Context(), td) if err != nil { writeError(resp, contentType, err, http.StatusInternalServerError) return } - writeResponse(resp, contentType, http.StatusOK, rsp) + // TODO: Pass response from grpc handler when pdatagrpc returns concrete type. + writeResponse(resp, contentType, http.StatusOK, &types.Empty{}) } func handleMetrics( @@ -76,13 +77,14 @@ func handleMetrics( return } - rsp, err := metricsReceiver.Export(req.Context(), internal.MetricsToOtlp(md.InternalRep())) + _, err = metricsReceiver.Export(req.Context(), md) if err != nil { writeError(resp, contentType, err, http.StatusInternalServerError) return } - writeResponse(resp, contentType, http.StatusOK, rsp) + // TODO: Pass response from grpc handler when pdatagrpc returns concrete type. + writeResponse(resp, contentType, http.StatusOK, &types.Empty{}) } func handleLogs( @@ -102,13 +104,14 @@ func handleLogs( return } - rsp, err := logsReceiver.Export(req.Context(), internal.LogsToOtlp(ld.InternalRep())) + _, err = logsReceiver.Export(req.Context(), ld) if err != nil { writeError(resp, contentType, err, http.StatusInternalServerError) return } - writeResponse(resp, contentType, http.StatusOK, rsp) + // TODO: Pass response from grpc handler when pdatagrpc returns concrete type. + writeResponse(resp, contentType, http.StatusOK, &types.Empty{}) } func readAndCloseBody(resp http.ResponseWriter, req *http.Request, contentType string) ([]byte, bool) {