diff --git a/internal/pdatagrpc/traces.go b/internal/pdatagrpc/traces.go index 9bbd9a23f22..38feab7c18d 100644 --- a/internal/pdatagrpc/traces.go +++ b/internal/pdatagrpc/traces.go @@ -72,6 +72,7 @@ type rawTracesServer struct { } func (s rawTracesServer) Export(ctx context.Context, request *otlpcollectortraces.ExportTraceServiceRequest) (*otlpcollectortraces.ExportTraceServiceResponse, error) { + internal.TracesCompatibilityChanges(request) _, err := s.srv.Export(ctx, pdata.TracesFromInternalRep(internal.TracesFromOtlp(request))) return &otlpcollectortraces.ExportTraceServiceResponse{}, err } diff --git a/receiver/otlpreceiver/factory.go b/receiver/otlpreceiver/factory.go index 16445572236..24cb581e6e1 100644 --- a/receiver/otlpreceiver/factory.go +++ b/receiver/otlpreceiver/factory.go @@ -67,7 +67,7 @@ func createDefaultConfig() config.Receiver { // CreateTracesReceiver creates a trace receiver based on provided config. func createTracesReceiver( - ctx context.Context, + _ context.Context, set component.ReceiverCreateSettings, cfg config.Receiver, nextConsumer consumer.Traces, @@ -76,7 +76,7 @@ func createTracesReceiver( return newOtlpReceiver(cfg.(*Config), set.Logger) }) - if err := r.Unwrap().(*otlpReceiver).registerTraceConsumer(ctx, nextConsumer); err != nil { + if err := r.Unwrap().(*otlpReceiver).registerTraceConsumer(nextConsumer); err != nil { return nil, err } return r, nil @@ -84,7 +84,7 @@ func createTracesReceiver( // CreateMetricsReceiver creates a metrics receiver based on provided config. func createMetricsReceiver( - ctx context.Context, + _ context.Context, set component.ReceiverCreateSettings, cfg config.Receiver, consumer consumer.Metrics, @@ -93,7 +93,7 @@ func createMetricsReceiver( return newOtlpReceiver(cfg.(*Config), set.Logger) }) - if err := r.Unwrap().(*otlpReceiver).registerMetricsConsumer(ctx, consumer); err != nil { + if err := r.Unwrap().(*otlpReceiver).registerMetricsConsumer(consumer); err != nil { return nil, err } return r, nil @@ -101,7 +101,7 @@ func createMetricsReceiver( // CreateLogReceiver creates a log receiver based on provided config. func createLogReceiver( - ctx context.Context, + _ context.Context, set component.ReceiverCreateSettings, cfg config.Receiver, consumer consumer.Logs, @@ -110,7 +110,7 @@ func createLogReceiver( return newOtlpReceiver(cfg.(*Config), set.Logger) }) - if err := r.Unwrap().(*otlpReceiver).registerLogsConsumer(ctx, consumer); err != nil { + if err := r.Unwrap().(*otlpReceiver).registerLogsConsumer(consumer); err != nil { return nil, err } return r, nil diff --git a/receiver/otlpreceiver/internal/logs/otlp.go b/receiver/otlpreceiver/internal/logs/otlp.go index e677afa1f89..97740b9c054 100644 --- a/receiver/otlpreceiver/internal/logs/otlp.go +++ b/receiver/otlpreceiver/internal/logs/otlp.go @@ -21,8 +21,6 @@ import ( "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/pdata" - "go.opentelemetry.io/collector/internal" - collectorlog "go.opentelemetry.io/collector/internal/data/protogen/collector/logs/v1" "go.opentelemetry.io/collector/obsreport" ) @@ -55,17 +53,15 @@ const ( var receiverID = config.NewIDWithName("otlp", "log") // Export implements the service Export logs func. -func (r *Receiver) Export(ctx context.Context, req *collectorlog.ExportLogsServiceRequest) (*collectorlog.ExportLogsServiceResponse, error) { +func (r *Receiver) Export(ctx context.Context, ld pdata.Logs) (interface{}, error) { // We need to ensure that it propagates the receiver name as a tag ctxWithReceiverName := obsreport.ReceiverContext(ctx, r.id, receiverTransport) - - ld := pdata.LogsFromInternalRep(internal.LogsFromOtlp(req)) err := r.sendToNextConsumer(ctxWithReceiverName, ld) if err != nil { return nil, err } - return &collectorlog.ExportLogsServiceResponse{}, nil + return nil, nil } func (r *Receiver) sendToNextConsumer(ctx context.Context, ld pdata.Logs) error { diff --git a/receiver/otlpreceiver/internal/logs/otlp_test.go b/receiver/otlpreceiver/internal/logs/otlp_test.go index 811f427bb61..b85054f8917 100644 --- a/receiver/otlpreceiver/internal/logs/otlp_test.go +++ b/receiver/otlpreceiver/internal/logs/otlp_test.go @@ -27,7 +27,6 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/consumer/pdata" - collectorlog "go.opentelemetry.io/collector/internal/data/protogen/collector/logs/v1" "go.opentelemetry.io/collector/internal/pdatagrpc" "go.opentelemetry.io/collector/internal/testdata" ) @@ -116,7 +115,7 @@ func otlpReceiverOnGRPCServer(t *testing.T, tc consumer.Logs) (net.Addr, func()) // Now run it as a gRPC server srv := grpc.NewServer() - collectorlog.RegisterLogsServiceServer(srv, r) + pdatagrpc.RegisterLogsServer(srv, r) go func() { _ = srv.Serve(ln) }() diff --git a/receiver/otlpreceiver/internal/metrics/otlp.go b/receiver/otlpreceiver/internal/metrics/otlp.go index 23f5aa95b71..f40597fd91f 100644 --- a/receiver/otlpreceiver/internal/metrics/otlp.go +++ b/receiver/otlpreceiver/internal/metrics/otlp.go @@ -21,8 +21,6 @@ import ( "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/pdata" - "go.opentelemetry.io/collector/internal" - collectormetrics "go.opentelemetry.io/collector/internal/data/protogen/collector/metrics/v1" "go.opentelemetry.io/collector/obsreport" ) @@ -54,17 +52,14 @@ const ( var receiverID = config.NewIDWithName("otlp", "metrics") // Export implements the service Export metrics func. -func (r *Receiver) Export(ctx context.Context, req *collectormetrics.ExportMetricsServiceRequest) (*collectormetrics.ExportMetricsServiceResponse, error) { +func (r *Receiver) Export(ctx context.Context, md pdata.Metrics) (interface{}, error) { receiverCtx := obsreport.ReceiverContext(ctx, r.id, receiverTransport) - - md := pdata.MetricsFromInternalRep(internal.MetricsFromOtlp(req)) - err := r.sendToNextConsumer(receiverCtx, md) if err != nil { return nil, err } - return &collectormetrics.ExportMetricsServiceResponse{}, nil + return nil, nil } func (r *Receiver) sendToNextConsumer(ctx context.Context, md pdata.Metrics) error { diff --git a/receiver/otlpreceiver/internal/metrics/otlp_test.go b/receiver/otlpreceiver/internal/metrics/otlp_test.go index 59247c95065..defabab9fa1 100644 --- a/receiver/otlpreceiver/internal/metrics/otlp_test.go +++ b/receiver/otlpreceiver/internal/metrics/otlp_test.go @@ -27,7 +27,6 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/consumer/pdata" - collectormetrics "go.opentelemetry.io/collector/internal/data/protogen/collector/metrics/v1" "go.opentelemetry.io/collector/internal/pdatagrpc" "go.opentelemetry.io/collector/internal/testdata" ) @@ -123,7 +122,7 @@ func otlpReceiverOnGRPCServer(t *testing.T, mc consumer.Metrics) (net.Addr, func r := New(receiverID, mc) // Now run it as a gRPC server srv := grpc.NewServer() - collectormetrics.RegisterMetricsServiceServer(srv, r) + pdatagrpc.RegisterMetricsServer(srv, r) go func() { _ = srv.Serve(ln) }() diff --git a/receiver/otlpreceiver/internal/trace/otlp.go b/receiver/otlpreceiver/internal/trace/otlp.go index f5ec0455acd..57ec6c40121 100644 --- a/receiver/otlpreceiver/internal/trace/otlp.go +++ b/receiver/otlpreceiver/internal/trace/otlp.go @@ -21,8 +21,6 @@ import ( "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/pdata" - "go.opentelemetry.io/collector/internal" - collectortrace "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1" "go.opentelemetry.io/collector/obsreport" ) @@ -55,17 +53,15 @@ const ( var receiverID = config.NewIDWithName("otlp", "trace") // Export implements the service Export traces func. -func (r *Receiver) Export(ctx context.Context, req *collectortrace.ExportTraceServiceRequest) (*collectortrace.ExportTraceServiceResponse, error) { +func (r *Receiver) Export(ctx context.Context, td pdata.Traces) (interface{}, error) { // We need to ensure that it propagates the receiver name as a tag ctxWithReceiverName := obsreport.ReceiverContext(ctx, r.id, receiverTransport) - internal.TracesCompatibilityChanges(req) - td := pdata.TracesFromInternalRep(internal.TracesFromOtlp(req)) err := r.sendToNextConsumer(ctxWithReceiverName, td) if err != nil { return nil, err } - return &collectortrace.ExportTraceServiceResponse{}, nil + return nil, nil } func (r *Receiver) sendToNextConsumer(ctx context.Context, td pdata.Traces) error { diff --git a/receiver/otlpreceiver/internal/trace/otlp_test.go b/receiver/otlpreceiver/internal/trace/otlp_test.go index ba2590d5b02..ba854c21ec5 100644 --- a/receiver/otlpreceiver/internal/trace/otlp_test.go +++ b/receiver/otlpreceiver/internal/trace/otlp_test.go @@ -27,7 +27,6 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/consumer/pdata" - collectortrace "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1" "go.opentelemetry.io/collector/internal/pdatagrpc" "go.opentelemetry.io/collector/internal/testdata" ) @@ -118,7 +117,7 @@ func otlpReceiverOnGRPCServer(t *testing.T, tc consumer.Traces) (net.Addr, func( // Now run it as a gRPC server srv := grpc.NewServer() - collectortrace.RegisterTraceServiceServer(srv, r) + pdatagrpc.RegisterTracesServer(srv, r) go func() { _ = srv.Serve(ln) }() diff --git a/receiver/otlpreceiver/otlp.go b/receiver/otlpreceiver/otlp.go index 51b851a70eb..4f071b018eb 100644 --- a/receiver/otlpreceiver/otlp.go +++ b/receiver/otlpreceiver/otlp.go @@ -16,11 +16,12 @@ package otlpreceiver import ( "context" + "io/ioutil" "net" "net/http" "sync" - gatewayruntime "github.com/grpc-ecosystem/grpc-gateway/runtime" + "github.com/gorilla/mux" "go.uber.org/zap" "google.golang.org/grpc" @@ -29,20 +30,24 @@ import ( "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/consumer" - collectorlog "go.opentelemetry.io/collector/internal/data/protogen/collector/logs/v1" - collectormetrics "go.opentelemetry.io/collector/internal/data/protogen/collector/metrics/v1" - collectortrace "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1" - "go.opentelemetry.io/collector/receiver/otlpreceiver/internal" + "go.opentelemetry.io/collector/internal/model" + "go.opentelemetry.io/collector/internal/otlp" + "go.opentelemetry.io/collector/internal/pdatagrpc" "go.opentelemetry.io/collector/receiver/otlpreceiver/internal/logs" "go.opentelemetry.io/collector/receiver/otlpreceiver/internal/metrics" "go.opentelemetry.io/collector/receiver/otlpreceiver/internal/trace" ) +const ( + pbContentType = "application/x-protobuf" + jsonContentType = "application/json" +) + // otlpReceiver is the type that exposes Trace and Metrics reception. type otlpReceiver struct { cfg *Config serverGRPC *grpc.Server - gatewayMux *gatewayruntime.ServeMux + httpMux *mux.Router serverHTTP *http.Server traceReceiver *trace.Receiver @@ -62,19 +67,7 @@ func newOtlpReceiver(cfg *Config, logger *zap.Logger) *otlpReceiver { logger: logger, } if cfg.HTTP != nil { - // Use our custom JSON marshaler instead of default Protobuf JSON marshaler. - // This is needed because OTLP spec defines encoding for trace and span id - // and it is only possible to do using Gogoproto-compatible JSONPb marshaler. - jsonpb := &internal.JSONPb{ - EmitDefaults: true, - Indent: " ", - OrigName: true, - } - r.gatewayMux = gatewayruntime.NewServeMux( - gatewayruntime.WithProtoErrorHandler(gatewayruntime.DefaultHTTPProtoErrorHandler), - gatewayruntime.WithMarshalerOption("application/x-protobuf", &xProtobufMarshaler{}), - gatewayruntime.WithMarshalerOption(gatewayruntime.MIMEWildcard, jsonpb), - ) + r.httpMux = mux.NewRouter() } return r @@ -127,15 +120,15 @@ func (r *otlpReceiver) startProtocolServers(host component.Host) error { r.serverGRPC = grpc.NewServer(opts...) if r.traceReceiver != nil { - collectortrace.RegisterTraceServiceServer(r.serverGRPC, r.traceReceiver) + pdatagrpc.RegisterTracesServer(r.serverGRPC, r.traceReceiver) } if r.metricsReceiver != nil { - collectormetrics.RegisterMetricsServiceServer(r.serverGRPC, r.metricsReceiver) + pdatagrpc.RegisterMetricsServer(r.serverGRPC, r.metricsReceiver) } if r.logReceiver != nil { - collectorlog.RegisterLogsServiceServer(r.serverGRPC, r.logReceiver) + pdatagrpc.RegisterLogsServer(r.serverGRPC, r.logReceiver) } err = r.startGRPCServer(r.cfg.GRPC, host) @@ -157,7 +150,7 @@ func (r *otlpReceiver) startProtocolServers(host component.Host) error { } if r.cfg.HTTP != nil { r.serverHTTP = r.cfg.HTTP.ToServer( - r.gatewayMux, + r.httpMux, confighttp.WithErrorHandler(errorHandler), ) err = r.startHTTPServer(r.cfg.HTTP, host) @@ -191,40 +184,163 @@ func (r *otlpReceiver) Shutdown(ctx context.Context) error { return err } -func (r *otlpReceiver) registerTraceConsumer(ctx context.Context, tc consumer.Traces) error { +var tracesPbUnmarshaler = otlp.NewProtobufTracesUnmarshaler() +var tracesJSONUnmarshaler = otlp.NewJSONTracesUnmarshaler() + +func (r *otlpReceiver) registerTraceConsumer(tc consumer.Traces) error { if tc == nil { return componenterror.ErrNilNextConsumer } r.traceReceiver = trace.New(r.cfg.ID(), tc) - if r.gatewayMux != nil { - err := collectortrace.RegisterTraceServiceHandlerServer(ctx, r.gatewayMux, r.traceReceiver) - if err != nil { - return err - } - // Also register an alias handler. This fixes bug https://github.com/open-telemetry/opentelemetry-collector/issues/1968 - return collectortrace.RegisterTraceServiceHandlerServerAlias(ctx, r.gatewayMux, r.traceReceiver) + if r.httpMux != nil { + r.httpMux.HandleFunc("/v1/traces", func(resp http.ResponseWriter, req *http.Request) { + handleHttpTraces(resp, req, pbContentType, r.traceReceiver, tracesPbUnmarshaler) + }).Methods(http.MethodPost).Headers("Content-Type", pbContentType) + // For backwards compatibility see https://github.com/open-telemetry/opentelemetry-collector/issues/1968 + r.httpMux.HandleFunc("/v1/trace", func(resp http.ResponseWriter, req *http.Request) { + handleHttpTraces(resp, req, pbContentType, r.traceReceiver, tracesPbUnmarshaler) + }).Methods(http.MethodPost).Headers("Content-Type", pbContentType) + r.httpMux.HandleFunc("/v1/traces", func(resp http.ResponseWriter, req *http.Request) { + handleHttpTraces(resp, req, jsonContentType, r.traceReceiver, tracesJSONUnmarshaler) + }).Methods(http.MethodPost).Headers("Content-Type", jsonContentType) + // For backwards compatibility see https://github.com/open-telemetry/opentelemetry-collector/issues/1968 + r.httpMux.HandleFunc("/v1/trace", func(resp http.ResponseWriter, req *http.Request) { + handleHttpTraces(resp, req, jsonContentType, r.traceReceiver, tracesJSONUnmarshaler) + }).Methods(http.MethodPost).Headers("Content-Type", jsonContentType) } return nil } -func (r *otlpReceiver) registerMetricsConsumer(ctx context.Context, mc consumer.Metrics) error { +var metricsPbUnmarshaler = otlp.NewProtobufMetricsUnmarshaler() +var metricsJSONUnmarshaler = otlp.NewJSONMetricsUnmarshaler() + +func (r *otlpReceiver) registerMetricsConsumer(mc consumer.Metrics) error { if mc == nil { return componenterror.ErrNilNextConsumer } r.metricsReceiver = metrics.New(r.cfg.ID(), mc) - if r.gatewayMux != nil { - return collectormetrics.RegisterMetricsServiceHandlerServer(ctx, r.gatewayMux, r.metricsReceiver) + if r.httpMux != nil { + r.httpMux.HandleFunc("/v1/metrics", func(resp http.ResponseWriter, req *http.Request) { + handleHttpMetrics(resp, req, pbContentType, r.metricsReceiver, metricsPbUnmarshaler) + }).Methods(http.MethodPost).Headers("Content-Type", pbContentType) + r.httpMux.HandleFunc("/v1/metrics", func(resp http.ResponseWriter, req *http.Request) { + handleHttpMetrics(resp, req, jsonContentType, r.metricsReceiver, metricsJSONUnmarshaler) + }).Methods(http.MethodPost).Headers("Content-Type", jsonContentType) } return nil } -func (r *otlpReceiver) registerLogsConsumer(ctx context.Context, tc consumer.Logs) error { - if tc == nil { +var logsPbUnmarshaler = otlp.NewProtobufLogsUnmarshaler() +var logsJSONUnmarshaler = otlp.NewJSONLogsUnmarshaler() + +func (r *otlpReceiver) registerLogsConsumer(lc consumer.Logs) error { + if lc == nil { return componenterror.ErrNilNextConsumer } - r.logReceiver = logs.New(r.cfg.ID(), tc) - if r.gatewayMux != nil { - return collectorlog.RegisterLogsServiceHandlerServer(ctx, r.gatewayMux, r.logReceiver) + r.logReceiver = logs.New(r.cfg.ID(), lc) + if r.httpMux != nil { + r.httpMux.HandleFunc("/v1/logs", func(w http.ResponseWriter, req *http.Request) { + handleHttpLogs(w, req, pbContentType, r.logReceiver, logsPbUnmarshaler) + }).Methods(http.MethodPost).Headers("Content-Type", pbContentType) + r.httpMux.HandleFunc("/v1/logs", func(w http.ResponseWriter, req *http.Request) { + handleHttpLogs(w, req, jsonContentType, r.logReceiver, logsJSONUnmarshaler) + }).Methods(http.MethodPost).Headers("Content-Type", jsonContentType) } return nil } + +func handleHttpTraces( + resp http.ResponseWriter, + req *http.Request, + contentType string, + tracesReceiver *trace.Receiver, + tracesUnmarshaler model.TracesUnmarshaler) { + body, err := ioutil.ReadAll(req.Body) + if err != nil { + writeError(resp, req, err, http.StatusBadRequest) + return + } + if err = req.Body.Close(); err != nil { + writeError(resp, req, err, http.StatusBadRequest) + return + } + + td, err := tracesUnmarshaler.Unmarshal(body) + if err != nil { + writeError(resp, req, err, http.StatusBadRequest) + return + } + + _, err = tracesReceiver.Export(req.Context(), td) + if err != nil { + writeError(resp, req, err, http.StatusInternalServerError) + return + } + + resp.Header().Set("Content-Type", contentType) + // TODO: Encode response and write it to the response. +} + +func handleHttpMetrics( + resp http.ResponseWriter, + req *http.Request, + contentType string, + metricsReceiver *metrics.Receiver, + metricsUnmarshaler model.MetricsUnmarshaler) { + body, err := ioutil.ReadAll(req.Body) + if err != nil { + writeError(resp, req, err, http.StatusBadRequest) + return + } + if err = req.Body.Close(); err != nil { + writeError(resp, req, err, http.StatusBadRequest) + return + } + + md, err := metricsUnmarshaler.Unmarshal(body) + if err != nil { + writeError(resp, req, err, http.StatusBadRequest) + return + } + + _, err = metricsReceiver.Export(req.Context(), md) + if err != nil { + writeError(resp, req, err, http.StatusInternalServerError) + return + } + + resp.Header().Set("Content-Type", contentType) + // TODO: Encode response and write it to the response. +} + +func handleHttpLogs( + resp http.ResponseWriter, + req *http.Request, + contentType string, + logsReceiver *logs.Receiver, + logsUnmarshaler model.LogsUnmarshaler) { + body, err := ioutil.ReadAll(req.Body) + if err != nil { + writeError(resp, req, err, http.StatusBadRequest) + return + } + if err = req.Body.Close(); err != nil { + writeError(resp, req, err, http.StatusBadRequest) + return + } + + ld, err := logsUnmarshaler.Unmarshal(body) + if err != nil { + writeError(resp, req, err, http.StatusBadRequest) + return + } + + _, err = logsReceiver.Export(req.Context(), ld) + if err != nil { + writeError(resp, req, err, http.StatusInternalServerError) + return + } + + resp.Header().Set("Content-Type", contentType) + // TODO: Encode response and write it to the response. +} diff --git a/receiver/otlpreceiver/otlp_test.go b/receiver/otlpreceiver/otlp_test.go index 66f70b244a0..ce16da21bfa 100644 --- a/receiver/otlpreceiver/otlp_test.go +++ b/receiver/otlpreceiver/otlp_test.go @@ -413,7 +413,7 @@ func testHTTPProtobufRequest( require.NoError(t, err, "Error reading response from trace grpc-gateway") require.NoError(t, resp.Body.Close(), "Error closing response body") - require.Equal(t, "application/x-protobuf", resp.Header.Get("Content-Type"), "Unexpected response Content-Type") + assert.Equal(t, "application/x-protobuf", resp.Header.Get("Content-Type"), "Unexpected response Content-Type") allTraces := tSink.AllTraces() diff --git a/receiver/otlpreceiver/otlphttp.go b/receiver/otlpreceiver/otlphttp.go index 9da35519f64..fbe36fd1555 100644 --- a/receiver/otlpreceiver/otlphttp.go +++ b/receiver/otlpreceiver/otlphttp.go @@ -38,24 +38,40 @@ func (*xProtobufMarshaler) ContentType() string { var jsonMarshaler = &jsonpb.Marshaler{} +// writeError encodes the HTTP error message inside a rpc.Status message as required +// by the OTLP protocol. +func writeError(w http.ResponseWriter, r *http.Request, err error, statusCode int) { + s, ok := status.FromError(err) + if ok { + writeStatus(w, r, s, statusCode) + } else { + errorHandler(w, r, err.Error(), statusCode) + } +} + // errorHandler encodes the HTTP error message inside a rpc.Status message as required // by the OTLP protocol. func errorHandler(w http.ResponseWriter, r *http.Request, errMsg string, statusCode int) { - var ( - msg []byte - s *status.Status - err error - ) - // Pre-computed status with code=Internal to be used in case of a marshaling error. - fallbackMsg := []byte(`{"code": 13, "message": "failed to marshal error message"}`) - fallbackContentType := "application/json" - + var s *status.Status if statusCode == http.StatusBadRequest { s = status.New(codes.InvalidArgument, errMsg) } else { - s = status.New(codes.Internal, errMsg) + s = status.New(codes.Unknown, errMsg) } + writeStatus(w, r, s, statusCode) +} + +// Pre-computed status with code=Internal to be used in case of a marshaling error. +var fallbackMsg = []byte(`{"code": 13, "message": "failed to marshal error message"}`) + +const fallbackContentType = "application/json" + +// writeError encodes the HTTP error message inside a rpc.Status message as required +// by the OTLP protocol. +func writeStatus(w http.ResponseWriter, r *http.Request, s *status.Status, statusCode int) { + var err error + var msg []byte contentType := r.Header.Get("Content-Type") if contentType == "application/json" { buf := new(bytes.Buffer)