Skip to content

Commit

Permalink
Change oltpreceiver to use the new pdatagrpc, avoid proto dependency
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu committed Jun 14, 2021
1 parent cda019f commit 4da96f2
Show file tree
Hide file tree
Showing 9 changed files with 24 additions and 38 deletions.
1 change: 1 addition & 0 deletions internal/pdatagrpc/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type rawTracesServer struct {
}

func (s rawTracesServer) Export(ctx context.Context, request *otlpcollectortraces.ExportTraceServiceRequest) (*otlpcollectortraces.ExportTraceServiceResponse, error) {
internal.TracesCompatibilityChanges(request)
_, err := s.srv.Export(ctx, pdata.TracesFromInternalRep(internal.TracesFromOtlp(request)))
return &otlpcollectortraces.ExportTraceServiceResponse{}, err
}
8 changes: 2 additions & 6 deletions receiver/otlpreceiver/internal/logs/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import (
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal"
collectorlog "go.opentelemetry.io/collector/internal/data/protogen/collector/logs/v1"
"go.opentelemetry.io/collector/obsreport"
)

Expand Down Expand Up @@ -55,17 +53,15 @@ const (
var receiverID = config.NewIDWithName("otlp", "log")

// Export implements the service Export logs func.
func (r *Receiver) Export(ctx context.Context, req *collectorlog.ExportLogsServiceRequest) (*collectorlog.ExportLogsServiceResponse, error) {
func (r *Receiver) Export(ctx context.Context, ld pdata.Logs) (interface{}, error) {
// We need to ensure that it propagates the receiver name as a tag
ctxWithReceiverName := obsreport.ReceiverContext(ctx, r.id, receiverTransport)

ld := pdata.LogsFromInternalRep(internal.LogsFromOtlp(req))
err := r.sendToNextConsumer(ctxWithReceiverName, ld)
if err != nil {
return nil, err
}

return &collectorlog.ExportLogsServiceResponse{}, nil
return nil, nil
}

func (r *Receiver) sendToNextConsumer(ctx context.Context, ld pdata.Logs) error {
Expand Down
3 changes: 1 addition & 2 deletions receiver/otlpreceiver/internal/logs/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/consumer/pdata"
collectorlog "go.opentelemetry.io/collector/internal/data/protogen/collector/logs/v1"
"go.opentelemetry.io/collector/internal/pdatagrpc"
"go.opentelemetry.io/collector/internal/testdata"
)
Expand Down Expand Up @@ -116,7 +115,7 @@ func otlpReceiverOnGRPCServer(t *testing.T, tc consumer.Logs) (net.Addr, func())

// Now run it as a gRPC server
srv := grpc.NewServer()
collectorlog.RegisterLogsServiceServer(srv, r)
pdatagrpc.RegisterLogsServer(srv, r)
go func() {
_ = srv.Serve(ln)
}()
Expand Down
9 changes: 2 additions & 7 deletions receiver/otlpreceiver/internal/metrics/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import (
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal"
collectormetrics "go.opentelemetry.io/collector/internal/data/protogen/collector/metrics/v1"
"go.opentelemetry.io/collector/obsreport"
)

Expand Down Expand Up @@ -54,17 +52,14 @@ const (
var receiverID = config.NewIDWithName("otlp", "metrics")

// Export implements the service Export metrics func.
func (r *Receiver) Export(ctx context.Context, req *collectormetrics.ExportMetricsServiceRequest) (*collectormetrics.ExportMetricsServiceResponse, error) {
func (r *Receiver) Export(ctx context.Context, md pdata.Metrics) (interface{}, error) {
receiverCtx := obsreport.ReceiverContext(ctx, r.id, receiverTransport)

md := pdata.MetricsFromInternalRep(internal.MetricsFromOtlp(req))

err := r.sendToNextConsumer(receiverCtx, md)
if err != nil {
return nil, err
}

return &collectormetrics.ExportMetricsServiceResponse{}, nil
return nil, nil
}

func (r *Receiver) sendToNextConsumer(ctx context.Context, md pdata.Metrics) error {
Expand Down
3 changes: 1 addition & 2 deletions receiver/otlpreceiver/internal/metrics/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/consumer/pdata"
collectormetrics "go.opentelemetry.io/collector/internal/data/protogen/collector/metrics/v1"
"go.opentelemetry.io/collector/internal/pdatagrpc"
"go.opentelemetry.io/collector/internal/testdata"
)
Expand Down Expand Up @@ -123,7 +122,7 @@ func otlpReceiverOnGRPCServer(t *testing.T, mc consumer.Metrics) (net.Addr, func
r := New(receiverID, mc)
// Now run it as a gRPC server
srv := grpc.NewServer()
collectormetrics.RegisterMetricsServiceServer(srv, r)
pdatagrpc.RegisterMetricsServer(srv, r)
go func() {
_ = srv.Serve(ln)
}()
Expand Down
8 changes: 2 additions & 6 deletions receiver/otlpreceiver/internal/trace/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import (
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal"
collectortrace "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1"
"go.opentelemetry.io/collector/obsreport"
)

Expand Down Expand Up @@ -55,17 +53,15 @@ const (
var receiverID = config.NewIDWithName("otlp", "trace")

// Export implements the service Export traces func.
func (r *Receiver) Export(ctx context.Context, req *collectortrace.ExportTraceServiceRequest) (*collectortrace.ExportTraceServiceResponse, error) {
func (r *Receiver) Export(ctx context.Context, td pdata.Traces) (interface{}, error) {
// We need to ensure that it propagates the receiver name as a tag
ctxWithReceiverName := obsreport.ReceiverContext(ctx, r.id, receiverTransport)
internal.TracesCompatibilityChanges(req)
td := pdata.TracesFromInternalRep(internal.TracesFromOtlp(req))
err := r.sendToNextConsumer(ctxWithReceiverName, td)
if err != nil {
return nil, err
}

return &collectortrace.ExportTraceServiceResponse{}, nil
return nil, nil
}

func (r *Receiver) sendToNextConsumer(ctx context.Context, td pdata.Traces) error {
Expand Down
3 changes: 1 addition & 2 deletions receiver/otlpreceiver/internal/trace/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/consumer/pdata"
collectortrace "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1"
"go.opentelemetry.io/collector/internal/pdatagrpc"
"go.opentelemetry.io/collector/internal/testdata"
)
Expand Down Expand Up @@ -118,7 +117,7 @@ func otlpReceiverOnGRPCServer(t *testing.T, tc consumer.Traces) (net.Addr, func(

// Now run it as a gRPC server
srv := grpc.NewServer()
collectortrace.RegisterTraceServiceServer(srv, r)
pdatagrpc.RegisterTracesServer(srv, r)
go func() {
_ = srv.Serve(ln)
}()
Expand Down
10 changes: 4 additions & 6 deletions receiver/otlpreceiver/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,8 @@ import (
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/consumer"
collectorlog "go.opentelemetry.io/collector/internal/data/protogen/collector/logs/v1"
collectormetrics "go.opentelemetry.io/collector/internal/data/protogen/collector/metrics/v1"
collectortrace "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1"
"go.opentelemetry.io/collector/internal/otlp"
"go.opentelemetry.io/collector/internal/pdatagrpc"
"go.opentelemetry.io/collector/receiver/otlpreceiver/internal/logs"
"go.opentelemetry.io/collector/receiver/otlpreceiver/internal/metrics"
"go.opentelemetry.io/collector/receiver/otlpreceiver/internal/trace"
Expand Down Expand Up @@ -120,15 +118,15 @@ func (r *otlpReceiver) startProtocolServers(host component.Host) error {
r.serverGRPC = grpc.NewServer(opts...)

if r.traceReceiver != nil {
collectortrace.RegisterTraceServiceServer(r.serverGRPC, r.traceReceiver)
pdatagrpc.RegisterTracesServer(r.serverGRPC, r.traceReceiver)
}

if r.metricsReceiver != nil {
collectormetrics.RegisterMetricsServiceServer(r.serverGRPC, r.metricsReceiver)
pdatagrpc.RegisterMetricsServer(r.serverGRPC, r.metricsReceiver)
}

if r.logReceiver != nil {
collectorlog.RegisterLogsServiceServer(r.serverGRPC, r.logReceiver)
pdatagrpc.RegisterLogsServer(r.serverGRPC, r.logReceiver)
}

err = r.startGRPCServer(r.cfg.GRPC, host)
Expand Down
17 changes: 10 additions & 7 deletions receiver/otlpreceiver/otlphttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ import (

"github.com/gogo/protobuf/jsonpb"
"github.com/gogo/protobuf/proto"
"github.com/gogo/protobuf/types"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"go.opentelemetry.io/collector/internal"
"go.opentelemetry.io/collector/internal/model"
"go.opentelemetry.io/collector/receiver/otlpreceiver/internal/logs"
"go.opentelemetry.io/collector/receiver/otlpreceiver/internal/metrics"
Expand All @@ -50,13 +50,14 @@ func handleTraces(
return
}

rsp, err := tracesReceiver.Export(req.Context(), internal.TracesToOtlp(td.InternalRep()))
_, err = tracesReceiver.Export(req.Context(), td)
if err != nil {
writeError(resp, contentType, err, http.StatusInternalServerError)
return
}

writeResponse(resp, contentType, http.StatusOK, rsp)
// TODO: Pass response from grpc handler when pdatagrpc returns concrete type.
writeResponse(resp, contentType, http.StatusOK, &types.Empty{})
}

func handleMetrics(
Expand All @@ -76,13 +77,14 @@ func handleMetrics(
return
}

rsp, err := metricsReceiver.Export(req.Context(), internal.MetricsToOtlp(md.InternalRep()))
_, err = metricsReceiver.Export(req.Context(), md)
if err != nil {
writeError(resp, contentType, err, http.StatusInternalServerError)
return
}

writeResponse(resp, contentType, http.StatusOK, rsp)
// TODO: Pass response from grpc handler when pdatagrpc returns concrete type.
writeResponse(resp, contentType, http.StatusOK, &types.Empty{})
}

func handleLogs(
Expand All @@ -102,13 +104,14 @@ func handleLogs(
return
}

rsp, err := logsReceiver.Export(req.Context(), internal.LogsToOtlp(ld.InternalRep()))
_, err = logsReceiver.Export(req.Context(), ld)
if err != nil {
writeError(resp, contentType, err, http.StatusInternalServerError)
return
}

writeResponse(resp, contentType, http.StatusOK, rsp)
// TODO: Pass response from grpc handler when pdatagrpc returns concrete type.
writeResponse(resp, contentType, http.StatusOK, &types.Empty{})
}

func readAndCloseBody(resp http.ResponseWriter, req *http.Request, contentType string) ([]byte, bool) {
Expand Down

0 comments on commit 4da96f2

Please sign in to comment.