Skip to content

Commit

Permalink
Change oltpreceiver to use the new unmarshaler, avoid grpc-gateway de…
Browse files Browse the repository at this point in the history
…pendency

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu committed Jun 9, 2021
1 parent ce31fce commit edd6eda
Show file tree
Hide file tree
Showing 11 changed files with 198 additions and 81 deletions.
1 change: 1 addition & 0 deletions internal/pdatagrpc/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type rawTracesServer struct {
}

func (s rawTracesServer) Export(ctx context.Context, request *otlpcollectortraces.ExportTraceServiceRequest) (*otlpcollectortraces.ExportTraceServiceResponse, error) {
internal.TracesCompatibilityChanges(request)
_, err := s.srv.Export(ctx, pdata.TracesFromInternalRep(internal.TracesFromOtlp(request)))
return &otlpcollectortraces.ExportTraceServiceResponse{}, err
}
12 changes: 6 additions & 6 deletions receiver/otlpreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func createDefaultConfig() config.Receiver {

// CreateTracesReceiver creates a trace receiver based on provided config.
func createTracesReceiver(
ctx context.Context,
_ context.Context,
set component.ReceiverCreateSettings,
cfg config.Receiver,
nextConsumer consumer.Traces,
Expand All @@ -76,15 +76,15 @@ func createTracesReceiver(
return newOtlpReceiver(cfg.(*Config), set.Logger)
})

if err := r.Unwrap().(*otlpReceiver).registerTraceConsumer(ctx, nextConsumer); err != nil {
if err := r.Unwrap().(*otlpReceiver).registerTraceConsumer(nextConsumer); err != nil {
return nil, err
}
return r, nil
}

// CreateMetricsReceiver creates a metrics receiver based on provided config.
func createMetricsReceiver(
ctx context.Context,
_ context.Context,
set component.ReceiverCreateSettings,
cfg config.Receiver,
consumer consumer.Metrics,
Expand All @@ -93,15 +93,15 @@ func createMetricsReceiver(
return newOtlpReceiver(cfg.(*Config), set.Logger)
})

if err := r.Unwrap().(*otlpReceiver).registerMetricsConsumer(ctx, consumer); err != nil {
if err := r.Unwrap().(*otlpReceiver).registerMetricsConsumer(consumer); err != nil {
return nil, err
}
return r, nil
}

// CreateLogReceiver creates a log receiver based on provided config.
func createLogReceiver(
ctx context.Context,
_ context.Context,
set component.ReceiverCreateSettings,
cfg config.Receiver,
consumer consumer.Logs,
Expand All @@ -110,7 +110,7 @@ func createLogReceiver(
return newOtlpReceiver(cfg.(*Config), set.Logger)
})

if err := r.Unwrap().(*otlpReceiver).registerLogsConsumer(ctx, consumer); err != nil {
if err := r.Unwrap().(*otlpReceiver).registerLogsConsumer(consumer); err != nil {
return nil, err
}
return r, nil
Expand Down
8 changes: 2 additions & 6 deletions receiver/otlpreceiver/internal/logs/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import (
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal"
collectorlog "go.opentelemetry.io/collector/internal/data/protogen/collector/logs/v1"
"go.opentelemetry.io/collector/obsreport"
)

Expand Down Expand Up @@ -55,17 +53,15 @@ const (
var receiverID = config.NewIDWithName("otlp", "log")

// Export implements the service Export logs func.
func (r *Receiver) Export(ctx context.Context, req *collectorlog.ExportLogsServiceRequest) (*collectorlog.ExportLogsServiceResponse, error) {
func (r *Receiver) Export(ctx context.Context, ld pdata.Logs) (interface{}, error) {
// We need to ensure that it propagates the receiver name as a tag
ctxWithReceiverName := obsreport.ReceiverContext(ctx, r.id, receiverTransport)

ld := pdata.LogsFromInternalRep(internal.LogsFromOtlp(req))
err := r.sendToNextConsumer(ctxWithReceiverName, ld)
if err != nil {
return nil, err
}

return &collectorlog.ExportLogsServiceResponse{}, nil
return nil, nil
}

func (r *Receiver) sendToNextConsumer(ctx context.Context, ld pdata.Logs) error {
Expand Down
3 changes: 1 addition & 2 deletions receiver/otlpreceiver/internal/logs/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/consumer/pdata"
collectorlog "go.opentelemetry.io/collector/internal/data/protogen/collector/logs/v1"
"go.opentelemetry.io/collector/internal/pdatagrpc"
"go.opentelemetry.io/collector/internal/testdata"
)
Expand Down Expand Up @@ -116,7 +115,7 @@ func otlpReceiverOnGRPCServer(t *testing.T, tc consumer.Logs) (net.Addr, func())

// Now run it as a gRPC server
srv := grpc.NewServer()
collectorlog.RegisterLogsServiceServer(srv, r)
pdatagrpc.RegisterLogsServer(srv, r)
go func() {
_ = srv.Serve(ln)
}()
Expand Down
9 changes: 2 additions & 7 deletions receiver/otlpreceiver/internal/metrics/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import (
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal"
collectormetrics "go.opentelemetry.io/collector/internal/data/protogen/collector/metrics/v1"
"go.opentelemetry.io/collector/obsreport"
)

Expand Down Expand Up @@ -54,17 +52,14 @@ const (
var receiverID = config.NewIDWithName("otlp", "metrics")

// Export implements the service Export metrics func.
func (r *Receiver) Export(ctx context.Context, req *collectormetrics.ExportMetricsServiceRequest) (*collectormetrics.ExportMetricsServiceResponse, error) {
func (r *Receiver) Export(ctx context.Context, md pdata.Metrics) (interface{}, error) {
receiverCtx := obsreport.ReceiverContext(ctx, r.id, receiverTransport)

md := pdata.MetricsFromInternalRep(internal.MetricsFromOtlp(req))

err := r.sendToNextConsumer(receiverCtx, md)
if err != nil {
return nil, err
}

return &collectormetrics.ExportMetricsServiceResponse{}, nil
return nil, nil
}

func (r *Receiver) sendToNextConsumer(ctx context.Context, md pdata.Metrics) error {
Expand Down
3 changes: 1 addition & 2 deletions receiver/otlpreceiver/internal/metrics/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/consumer/pdata"
collectormetrics "go.opentelemetry.io/collector/internal/data/protogen/collector/metrics/v1"
"go.opentelemetry.io/collector/internal/pdatagrpc"
"go.opentelemetry.io/collector/internal/testdata"
)
Expand Down Expand Up @@ -123,7 +122,7 @@ func otlpReceiverOnGRPCServer(t *testing.T, mc consumer.Metrics) (net.Addr, func
r := New(receiverID, mc)
// Now run it as a gRPC server
srv := grpc.NewServer()
collectormetrics.RegisterMetricsServiceServer(srv, r)
pdatagrpc.RegisterMetricsServer(srv, r)
go func() {
_ = srv.Serve(ln)
}()
Expand Down
8 changes: 2 additions & 6 deletions receiver/otlpreceiver/internal/trace/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import (
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal"
collectortrace "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1"
"go.opentelemetry.io/collector/obsreport"
)

Expand Down Expand Up @@ -55,17 +53,15 @@ const (
var receiverID = config.NewIDWithName("otlp", "trace")

// Export implements the service Export traces func.
func (r *Receiver) Export(ctx context.Context, req *collectortrace.ExportTraceServiceRequest) (*collectortrace.ExportTraceServiceResponse, error) {
func (r *Receiver) Export(ctx context.Context, td pdata.Traces) (interface{}, error) {
// We need to ensure that it propagates the receiver name as a tag
ctxWithReceiverName := obsreport.ReceiverContext(ctx, r.id, receiverTransport)
internal.TracesCompatibilityChanges(req)
td := pdata.TracesFromInternalRep(internal.TracesFromOtlp(req))
err := r.sendToNextConsumer(ctxWithReceiverName, td)
if err != nil {
return nil, err
}

return &collectortrace.ExportTraceServiceResponse{}, nil
return nil, nil
}

func (r *Receiver) sendToNextConsumer(ctx context.Context, td pdata.Traces) error {
Expand Down
3 changes: 1 addition & 2 deletions receiver/otlpreceiver/internal/trace/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/consumer/pdata"
collectortrace "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1"
"go.opentelemetry.io/collector/internal/pdatagrpc"
"go.opentelemetry.io/collector/internal/testdata"
)
Expand Down Expand Up @@ -118,7 +117,7 @@ func otlpReceiverOnGRPCServer(t *testing.T, tc consumer.Traces) (net.Addr, func(

// Now run it as a gRPC server
srv := grpc.NewServer()
collectortrace.RegisterTraceServiceServer(srv, r)
pdatagrpc.RegisterTracesServer(srv, r)
go func() {
_ = srv.Serve(ln)
}()
Expand Down
Loading

0 comments on commit edd6eda

Please sign in to comment.