From bf568ec262dd3ab44fb09f246fa0db249f2121ed Mon Sep 17 00:00:00 2001 From: Evan Bradley Date: Tue, 17 Jan 2023 11:05:00 -0500 Subject: [PATCH 1/5] Handle partial success responses in the OTLP HTTP exporter --- .chloggen/otlphttp-partial-success.yaml | 16 ++ exporter/otlphttpexporter/otlp.go | 90 +++++++- exporter/otlphttpexporter/otlp_test.go | 284 ++++++++++++++++++++---- 3 files changed, 340 insertions(+), 50 deletions(-) create mode 100755 .chloggen/otlphttp-partial-success.yaml diff --git a/.chloggen/otlphttp-partial-success.yaml b/.chloggen/otlphttp-partial-success.yaml new file mode 100755 index 00000000000..78635f3b421 --- /dev/null +++ b/.chloggen/otlphttp-partial-success.yaml @@ -0,0 +1,16 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: otlphttpexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Treat partial success responses as errors + +# One or more tracking issues or pull requests related to the change +issues: [6686] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: diff --git a/exporter/otlphttpexporter/otlp.go b/exporter/otlphttpexporter/otlp.go index 3ced7ef4cec..9790c932380 100644 --- a/exporter/otlphttpexporter/otlp.go +++ b/exporter/otlphttpexporter/otlp.go @@ -90,7 +90,7 @@ func (e *baseExporter) pushTraces(ctx context.Context, td ptrace.Traces) error { return consumererror.NewPermanent(err) } - return e.export(ctx, e.tracesURL, request) + return e.export(ctx, e.tracesURL, request, tracesPartialSuccessHandler) } func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) error { @@ -99,7 +99,7 @@ func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) erro if err != nil { return consumererror.NewPermanent(err) } - return e.export(ctx, e.metricsURL, request) + return e.export(ctx, e.metricsURL, request, metricsPartialSuccessHandler) } func (e *baseExporter) pushLogs(ctx context.Context, ld plog.Logs) error { @@ -109,10 +109,10 @@ func (e *baseExporter) pushLogs(ctx context.Context, ld plog.Logs) error { return consumererror.NewPermanent(err) } - return e.export(ctx, e.logsURL, request) + return e.export(ctx, e.logsURL, request, logsPartialSuccessHandler) } -func (e *baseExporter) export(ctx context.Context, url string, request []byte) error { +func (e *baseExporter) export(ctx context.Context, url string, request []byte, partialSuccessHandler partialSuccessHandler) error { e.logger.Debug("Preparing to make HTTP request", zap.String("url", url)) req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(request)) if err != nil { @@ -133,6 +133,10 @@ func (e *baseExporter) export(ctx context.Context, url string, request []byte) e }() if resp.StatusCode >= 200 && resp.StatusCode <= 299 { + if err := handlePartialSuccessResponse(resp, partialSuccessHandler); err != nil { + return err + } + // Request is successful. return nil } @@ -214,3 +218,81 @@ func readResponse(resp *http.Response) *status.Status { return respStatus } + +func handlePartialSuccessResponse(resp *http.Response, partialSuccessHandler partialSuccessHandler) error { + if resp.ContentLength == 0 { + return nil + } + + maxRead := resp.ContentLength + needsResize := false + if maxRead == -1 || maxRead > maxHTTPResponseReadBytes { + maxRead = maxHTTPResponseReadBytes + needsResize = true + } + protoBytes := make([]byte, maxRead) + n, err := io.ReadFull(resp.Body, protoBytes) + + // No bytes read and an EOF error indicates there is no body to read. + if n == 0 && (err == nil || errors.Is(err, io.EOF)) { + return nil + } + + // io.ReadFull will return io.ErrorUnexpectedEOF if the Content-Length header + // wasn't set, since we will try to read past the length of the body. If this + // is the case, the body will still have the full message in it, so we want to + // ignore the error and parse the message. + if err != nil && !errors.Is(err, io.ErrUnexpectedEOF) { + return err + } + + // The pdata unmarshaling methods check for the length of the slice + // when unmarshaling it, so we have to trim down the length to the + // actual size of the data. + if needsResize { + protoBytes = protoBytes[:n] + } + + return partialSuccessHandler(protoBytes) +} + +type partialSuccessHandler func(protoBytes []byte) error + +func tracesPartialSuccessHandler(protoBytes []byte) error { + exportResponse := ptraceotlp.NewExportResponse() + err := exportResponse.UnmarshalProto(protoBytes) + if err != nil { + return err + } + partialSuccess := exportResponse.PartialSuccess() + if !(partialSuccess.ErrorMessage() == "" && partialSuccess.RejectedSpans() == 0) { + return consumererror.NewPermanent(fmt.Errorf("OTLP partial success: %s (%d rejected)", partialSuccess.ErrorMessage(), partialSuccess.RejectedSpans())) + } + return nil +} + +func metricsPartialSuccessHandler(protoBytes []byte) error { + exportResponse := pmetricotlp.NewExportResponse() + err := exportResponse.UnmarshalProto(protoBytes) + if err != nil { + return err + } + partialSuccess := exportResponse.PartialSuccess() + if !(partialSuccess.ErrorMessage() == "" && partialSuccess.RejectedDataPoints() == 0) { + return consumererror.NewPermanent(fmt.Errorf("OTLP partial success: %s (%d rejected)", partialSuccess.ErrorMessage(), partialSuccess.RejectedDataPoints())) + } + return nil +} + +func logsPartialSuccessHandler(protoBytes []byte) error { + exportResponse := plogotlp.NewExportResponse() + err := exportResponse.UnmarshalProto(protoBytes) + if err != nil { + return err + } + partialSuccess := exportResponse.PartialSuccess() + if !(partialSuccess.ErrorMessage() == "" && partialSuccess.RejectedLogRecords() == 0) { + return consumererror.NewPermanent(fmt.Errorf("OTLP partial success: %s (%d rejected)", partialSuccess.ErrorMessage(), partialSuccess.RejectedLogRecords())) + } + return nil +} diff --git a/exporter/otlphttpexporter/otlp_test.go b/exporter/otlphttpexporter/otlp_test.go index 255968d1beb..b8268568b29 100644 --- a/exporter/otlphttpexporter/otlp_test.go +++ b/exporter/otlphttpexporter/otlp_test.go @@ -37,7 +37,9 @@ import ( "go.opentelemetry.io/collector/internal/testdata" "go.opentelemetry.io/collector/internal/testutil" "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/plog/plogotlp" "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" "go.opentelemetry.io/collector/receiver/otlpreceiver" @@ -475,8 +477,7 @@ func TestErrorResponses(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - mux := http.NewServeMux() - mux.HandleFunc("/v1/traces", func(writer http.ResponseWriter, request *http.Request) { + srv, err := createBackend(addr, "/v1/traces", func(writer http.ResponseWriter, request *http.Request) { for k, v := range test.headers { writer.Header().Add(k, v) } @@ -488,15 +489,8 @@ func TestErrorResponses(t *testing.T) { require.NoError(t, err) } }) - srv := http.Server{ - Addr: addr, - Handler: mux, - } - ln, err := net.Listen("tcp", addr) require.NoError(t, err) - go func() { - _ = srv.Serve(ln) - }() + defer srv.Close() cfg := &Config{ TracesEndpoint: fmt.Sprintf("http://%s/v1/traces", addr), @@ -523,8 +517,6 @@ func TestErrorResponses(t *testing.T) { } else { assert.EqualValues(t, test.err, err) } - - srv.Close() }) } } @@ -559,20 +551,12 @@ func TestUserAgent(t *testing.T) { t.Run("traces", func(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - mux := http.NewServeMux() - mux.HandleFunc("/v1/traces", func(writer http.ResponseWriter, request *http.Request) { + srv, err := createBackend(addr, "/v1/traces", func(writer http.ResponseWriter, request *http.Request) { assert.Contains(t, request.Header.Get("user-agent"), test.expectedUA) writer.WriteHeader(200) }) - srv := http.Server{ - Addr: addr, - Handler: mux, - } - ln, err := net.Listen("tcp", addr) require.NoError(t, err) - go func() { - _ = srv.Serve(ln) - }() + defer srv.Close() cfg := &Config{ TracesEndpoint: fmt.Sprintf("http://%s/v1/traces", addr), @@ -594,8 +578,6 @@ func TestUserAgent(t *testing.T) { traces := ptrace.NewTraces() err = exp.ConsumeTraces(context.Background(), traces) require.NoError(t, err) - - srv.Close() }) } }) @@ -603,20 +585,12 @@ func TestUserAgent(t *testing.T) { t.Run("metrics", func(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - mux := http.NewServeMux() - mux.HandleFunc("/v1/metrics", func(writer http.ResponseWriter, request *http.Request) { + srv, err := createBackend(addr, "/v1/metrics", func(writer http.ResponseWriter, request *http.Request) { assert.Contains(t, request.Header.Get("user-agent"), test.expectedUA) writer.WriteHeader(200) }) - srv := http.Server{ - Addr: addr, - Handler: mux, - } - ln, err := net.Listen("tcp", addr) require.NoError(t, err) - go func() { - _ = srv.Serve(ln) - }() + defer srv.Close() cfg := &Config{ MetricsEndpoint: fmt.Sprintf("http://%s/v1/metrics", addr), @@ -638,8 +612,6 @@ func TestUserAgent(t *testing.T) { metrics := pmetric.NewMetrics() err = exp.ConsumeMetrics(context.Background(), metrics) require.NoError(t, err) - - srv.Close() }) } }) @@ -647,20 +619,12 @@ func TestUserAgent(t *testing.T) { t.Run("logs", func(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - mux := http.NewServeMux() - mux.HandleFunc("/v1/logs", func(writer http.ResponseWriter, request *http.Request) { + srv, err := createBackend(addr, "/v1/logs", func(writer http.ResponseWriter, request *http.Request) { assert.Contains(t, request.Header.Get("user-agent"), test.expectedUA) writer.WriteHeader(200) }) - srv := http.Server{ - Addr: addr, - Handler: mux, - } - ln, err := net.Listen("tcp", addr) require.NoError(t, err) - go func() { - _ = srv.Serve(ln) - }() + defer srv.Close() cfg := &Config{ LogsEndpoint: fmt.Sprintf("http://%s/v1/logs", addr), @@ -688,3 +652,231 @@ func TestUserAgent(t *testing.T) { } }) } + +func TestPartialSuccess(t *testing.T) { + addr := testutil.GetAvailableLocalAddress(t) + set := exportertest.NewNopCreateSettings() + + t.Run("traces", func(t *testing.T) { + srv, err := createBackend(addr, "/v1/traces", func(writer http.ResponseWriter, request *http.Request) { + response := ptraceotlp.NewExportResponse() + partial := response.PartialSuccess() + partial.SetErrorMessage("hello") + partial.SetRejectedSpans(1) + bytes, err := response.MarshalProto() + require.NoError(t, err) + _, err = writer.Write(bytes) + require.NoError(t, err) + }) + require.NoError(t, err) + defer srv.Close() + + cfg := &Config{ + TracesEndpoint: fmt.Sprintf("http://%s/v1/traces", addr), + HTTPClientSettings: confighttp.HTTPClientSettings{}, + } + exp, err := createTracesExporter(context.Background(), set, cfg) + require.NoError(t, err) + + // start the exporter + err = exp.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, exp.Shutdown(context.Background())) + }) + + // generate data + traces := ptrace.NewTraces() + err = exp.ConsumeTraces(context.Background(), traces) + require.Error(t, err) + }) + + t.Run("metrics", func(t *testing.T) { + srv, err := createBackend(addr, "/v1/metrics", func(writer http.ResponseWriter, request *http.Request) { + response := pmetricotlp.NewExportResponse() + partial := response.PartialSuccess() + partial.SetErrorMessage("hello") + partial.SetRejectedDataPoints(1) + bytes, err := response.MarshalProto() + require.NoError(t, err) + _, err = writer.Write(bytes) + require.NoError(t, err) + }) + require.NoError(t, err) + defer srv.Close() + + cfg := &Config{ + MetricsEndpoint: fmt.Sprintf("http://%s/v1/metrics", addr), + HTTPClientSettings: confighttp.HTTPClientSettings{}, + } + exp, err := createMetricsExporter(context.Background(), set, cfg) + require.NoError(t, err) + + // start the exporter + err = exp.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, exp.Shutdown(context.Background())) + }) + + // generate data + metrics := pmetric.NewMetrics() + err = exp.ConsumeMetrics(context.Background(), metrics) + require.Error(t, err) + }) + + t.Run("logs", func(t *testing.T) { + srv, err := createBackend(addr, "/v1/logs", func(writer http.ResponseWriter, request *http.Request) { + response := plogotlp.NewExportResponse() + partial := response.PartialSuccess() + partial.SetErrorMessage("hello") + partial.SetRejectedLogRecords(1) + bytes, err := response.MarshalProto() + require.NoError(t, err) + _, err = writer.Write(bytes) + require.NoError(t, err) + }) + require.NoError(t, err) + defer srv.Close() + + cfg := &Config{ + LogsEndpoint: fmt.Sprintf("http://%s/v1/logs", addr), + HTTPClientSettings: confighttp.HTTPClientSettings{}, + } + exp, err := createLogsExporter(context.Background(), set, cfg) + require.NoError(t, err) + + // start the exporter + err = exp.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, exp.Shutdown(context.Background())) + }) + + // generate data + logs := plog.NewLogs() + err = exp.ConsumeLogs(context.Background(), logs) + require.Error(t, err) + }) + + t.Run("Response is missing a Content-Length header but includes a partial success object", func(t *testing.T) { + response := ptraceotlp.NewExportResponse() + partial := response.PartialSuccess() + partial.SetErrorMessage("hello") + partial.SetRejectedSpans(1) + data, err := response.MarshalProto() + require.NoError(t, err) + resp := &http.Response{ + // `-1` indicates a missing Content-Length header in the Go http standard library + ContentLength: -1, + Body: io.NopCloser(bytes.NewReader(data)), + } + err = handlePartialSuccessResponse(resp, tracesPartialSuccessHandler) + assert.True(t, consumererror.IsPermanent(err)) + }) + + t.Run("Response is missing a Content-Length header and a body", func(t *testing.T) { + resp := &http.Response{ + // `-1` indicates a missing Content-Length header in the Go http standard library + ContentLength: -1, + Body: io.NopCloser(bytes.NewReader([]byte{})), + } + err := handlePartialSuccessResponse(resp, tracesPartialSuccessHandler) + assert.Nil(t, err) + }) + + t.Run("Reading the response body returns an error other than ErrUnexpectedEOF", func(t *testing.T) { + resp := &http.Response{ + // `-1` indicates a missing Content-Length header in the Go http standard library + ContentLength: -1, + Body: io.NopCloser(badReader{}), + } + err := handlePartialSuccessResponse(resp, tracesPartialSuccessHandler) + assert.Error(t, err) + }) + + t.Run("Response has short Content-Length header", func(t *testing.T) { + response := ptraceotlp.NewExportResponse() + partial := response.PartialSuccess() + partial.SetErrorMessage("hello") + partial.SetRejectedSpans(1) + data, err := response.MarshalProto() + require.NoError(t, err) + resp := &http.Response{ + ContentLength: 3, + Body: io.NopCloser(bytes.NewReader(data)), + } + err = handlePartialSuccessResponse(resp, tracesPartialSuccessHandler) + assert.Error(t, err) + }) + + t.Run("Response has long Content-Length header", func(t *testing.T) { + response := ptraceotlp.NewExportResponse() + partial := response.PartialSuccess() + partial.SetErrorMessage("hello") + partial.SetRejectedSpans(1) + data, err := response.MarshalProto() + require.NoError(t, err) + resp := &http.Response{ + ContentLength: 4096, + Body: io.NopCloser(bytes.NewReader(data)), + } + err = handlePartialSuccessResponse(resp, tracesPartialSuccessHandler) + assert.Error(t, err) + }) + + invalidBodyCases := []struct { + telemetryType string + handler partialSuccessHandler + }{ + { + telemetryType: "traces", + handler: tracesPartialSuccessHandler, + }, + { + telemetryType: "metrics", + handler: metricsPartialSuccessHandler, + }, + { + telemetryType: "logs", + handler: logsPartialSuccessHandler, + }, + } + for _, tt := range invalidBodyCases { + t.Run("Invalid response body: "+tt.telemetryType, func(t *testing.T) { + str := "invalid proto" + body := bytes.NewBufferString(str) + resp := &http.Response{ + ContentLength: int64(len(str)), + Body: io.NopCloser(body), + } + err := handlePartialSuccessResponse(resp, tt.handler) + assert.Error(t, err) + }) + } + +} + +func createBackend(addr string, endpoint string, handler func(writer http.ResponseWriter, request *http.Request)) (*http.Server, error) { + mux := http.NewServeMux() + mux.HandleFunc(endpoint, handler) + srv := http.Server{ + Addr: addr, + Handler: mux, + } + ln, err := net.Listen("tcp", addr) + if err != nil { + return &http.Server{}, err + } + go func() { + _ = srv.Serve(ln) + }() + + return &srv, nil +} + +type badReader struct{} + +func (b badReader) Read([]byte) (int, error) { + return 0, errors.New("Bad read") +} From 269f4f562fa5502213cfb236fa81df64125b50e4 Mon Sep 17 00:00:00 2001 From: Evan Bradley Date: Mon, 27 Mar 2023 08:56:28 -0400 Subject: [PATCH 2/5] Address PR feedback --- exporter/otlphttpexporter/otlp.go | 83 ++++--- exporter/otlphttpexporter/otlp_test.go | 304 ++++++++++--------------- 2 files changed, 158 insertions(+), 229 deletions(-) diff --git a/exporter/otlphttpexporter/otlp.go b/exporter/otlphttpexporter/otlp.go index 9790c932380..b6041d419eb 100644 --- a/exporter/otlphttpexporter/otlp.go +++ b/exporter/otlphttpexporter/otlp.go @@ -141,7 +141,7 @@ func (e *baseExporter) export(ctx context.Context, url string, request []byte, p return nil } - respStatus := readResponse(resp) + respStatus := readResponseStatus(resp) // Format the error message. Use the status if it is present in the response. var formattedErr error @@ -192,27 +192,49 @@ func isRetryableStatusCode(code int) bool { } } +func readResponseBody(body io.ReadCloser) ([]byte, error) { + // Read the maximum number of bytes allowed in a request. This avoids + // issues with missing or invalid Content-Length headers. + protoBytes := make([]byte, maxHTTPResponseReadBytes) + n, err := io.ReadFull(body, protoBytes) + + // No bytes read and an EOF error indicates there is no body to read. + if n == 0 && (err == nil || errors.Is(err, io.EOF)) { + return nil, nil + } + + // io.ReadFull will return io.ErrorUnexpectedEOF in most cases since there + // will usually be a mismatch between the length of the byte slice and the + // size of the body, so we ignore that error. + if err != nil && !errors.Is(err, io.ErrUnexpectedEOF) { + return nil, err + } + + // The pdata unmarshaling methods check for the length of the slice + // when unmarshaling it, so we have to trim down the length to the + // actual size of the data. + return protoBytes[:n], nil +} + // Read the response and decode the status.Status from the body. // Returns nil if the response is empty or cannot be decoded. -func readResponse(resp *http.Response) *status.Status { +func readResponseStatus(resp *http.Response) *status.Status { var respStatus *status.Status if resp.StatusCode >= 400 && resp.StatusCode <= 599 { // Request failed. Read the body. OTLP spec says: // "Response body for all HTTP 4xx and HTTP 5xx responses MUST be a // Protobuf-encoded Status message that describes the problem." - maxRead := resp.ContentLength - if maxRead == -1 || maxRead > maxHTTPResponseReadBytes { - maxRead = maxHTTPResponseReadBytes + respBytes, err := readResponseBody(resp.Body) + + if err != nil { + return nil } - respBytes := make([]byte, maxRead) - n, err := io.ReadFull(resp.Body, respBytes) - if err == nil && n > 0 { - // Decode it as Status struct. See https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/otlp.md#failures - respStatus = &status.Status{} - err = proto.Unmarshal(respBytes, respStatus) - if err != nil { - respStatus = nil - } + + // Decode it as Status struct. See https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/otlp.md#failures + respStatus = &status.Status{} + err = proto.Unmarshal(respBytes, respStatus) + if err != nil { + return nil } } @@ -220,40 +242,13 @@ func readResponse(resp *http.Response) *status.Status { } func handlePartialSuccessResponse(resp *http.Response, partialSuccessHandler partialSuccessHandler) error { - if resp.ContentLength == 0 { - return nil - } - - maxRead := resp.ContentLength - needsResize := false - if maxRead == -1 || maxRead > maxHTTPResponseReadBytes { - maxRead = maxHTTPResponseReadBytes - needsResize = true - } - protoBytes := make([]byte, maxRead) - n, err := io.ReadFull(resp.Body, protoBytes) + bodyBytes, err := readResponseBody(resp.Body) - // No bytes read and an EOF error indicates there is no body to read. - if n == 0 && (err == nil || errors.Is(err, io.EOF)) { - return nil - } - - // io.ReadFull will return io.ErrorUnexpectedEOF if the Content-Length header - // wasn't set, since we will try to read past the length of the body. If this - // is the case, the body will still have the full message in it, so we want to - // ignore the error and parse the message. - if err != nil && !errors.Is(err, io.ErrUnexpectedEOF) { + if err != nil { return err } - // The pdata unmarshaling methods check for the length of the slice - // when unmarshaling it, so we have to trim down the length to the - // actual size of the data. - if needsResize { - protoBytes = protoBytes[:n] - } - - return partialSuccessHandler(protoBytes) + return partialSuccessHandler(bodyBytes) } type partialSuccessHandler func(protoBytes []byte) error diff --git a/exporter/otlphttpexporter/otlp_test.go b/exporter/otlphttpexporter/otlp_test.go index b8268568b29..3dab1bde58d 100644 --- a/exporter/otlphttpexporter/otlp_test.go +++ b/exporter/otlphttpexporter/otlp_test.go @@ -12,7 +12,6 @@ import ( "errors" "fmt" "io" - "net" "net/http" "net/http/httptest" "testing" @@ -373,14 +372,15 @@ func startAndCleanup(t *testing.T, cmp component.Component) { } func TestErrorResponses(t *testing.T) { - addr := testutil.GetAvailableLocalAddress(t) - errMsgPrefix := fmt.Sprintf("error exporting items, request to http://%s/v1/traces responded with HTTP Status Code ", addr) + errMsgPrefix := func(srv *httptest.Server) string { + return fmt.Sprintf("error exporting items, request to %s/v1/traces responded with HTTP Status Code ", srv.URL) + } tests := []struct { name string responseStatus int responseBody *status.Status - err error + err func(srv *httptest.Server) error isPermErr bool headers map[string]string }{ @@ -430,9 +430,11 @@ func TestErrorResponses(t *testing.T) { name: "419", responseStatus: http.StatusTooManyRequests, responseBody: status.New(codes.InvalidArgument, "Quota exceeded"), - err: exporterhelper.NewThrottleRetry( - errors.New(errMsgPrefix+"429, Message=Quota exceeded, Details=[]"), - time.Duration(0)*time.Second), + err: func(srv *httptest.Server) error { + return exporterhelper.NewThrottleRetry( + errors.New(errMsgPrefix(srv)+"429, Message=Quota exceeded, Details=[]"), + time.Duration(0)*time.Second) + }, }, { name: "500", @@ -452,18 +454,22 @@ func TestErrorResponses(t *testing.T) { name: "503", responseStatus: http.StatusServiceUnavailable, responseBody: status.New(codes.InvalidArgument, "Server overloaded"), - err: exporterhelper.NewThrottleRetry( - errors.New(errMsgPrefix+"503, Message=Server overloaded, Details=[]"), - time.Duration(0)*time.Second), + err: func(srv *httptest.Server) error { + return exporterhelper.NewThrottleRetry( + errors.New(errMsgPrefix(srv)+"503, Message=Server overloaded, Details=[]"), + time.Duration(0)*time.Second) + }, }, { name: "503-Retry-After", responseStatus: http.StatusServiceUnavailable, responseBody: status.New(codes.InvalidArgument, "Server overloaded"), headers: map[string]string{"Retry-After": "30"}, - err: exporterhelper.NewThrottleRetry( - errors.New(errMsgPrefix+"503, Message=Server overloaded, Details=[]"), - time.Duration(30)*time.Second), + err: func(srv *httptest.Server) error { + return exporterhelper.NewThrottleRetry( + errors.New(errMsgPrefix(srv)+"503, Message=Server overloaded, Details=[]"), + time.Duration(30)*time.Second) + }, }, { name: "504", @@ -477,7 +483,7 @@ func TestErrorResponses(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - srv, err := createBackend(addr, "/v1/traces", func(writer http.ResponseWriter, request *http.Request) { + srv, err := createBackend("/v1/traces", func(writer http.ResponseWriter, request *http.Request) { for k, v := range test.headers { writer.Header().Add(k, v) } @@ -493,7 +499,7 @@ func TestErrorResponses(t *testing.T) { defer srv.Close() cfg := &Config{ - TracesEndpoint: fmt.Sprintf("http://%s/v1/traces", addr), + TracesEndpoint: fmt.Sprintf("%s/v1/traces", srv.URL), // Create without QueueSettings and RetrySettings so that ConsumeTraces // returns the errors that we want to check immediately. } @@ -515,14 +521,13 @@ func TestErrorResponses(t *testing.T) { if test.isPermErr { assert.True(t, consumererror.IsPermanent(err)) } else { - assert.EqualValues(t, test.err, err) + assert.EqualValues(t, test.err(srv), err) } }) } } func TestUserAgent(t *testing.T) { - addr := testutil.GetAvailableLocalAddress(t) set := exportertest.NewNopCreateSettings() set.BuildInfo.Description = "Collector" set.BuildInfo.Version = "1.2.3test" @@ -551,7 +556,7 @@ func TestUserAgent(t *testing.T) { t.Run("traces", func(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - srv, err := createBackend(addr, "/v1/traces", func(writer http.ResponseWriter, request *http.Request) { + srv, err := createBackend("/v1/traces", func(writer http.ResponseWriter, request *http.Request) { assert.Contains(t, request.Header.Get("user-agent"), test.expectedUA) writer.WriteHeader(200) }) @@ -559,7 +564,7 @@ func TestUserAgent(t *testing.T) { defer srv.Close() cfg := &Config{ - TracesEndpoint: fmt.Sprintf("http://%s/v1/traces", addr), + TracesEndpoint: fmt.Sprintf("%s/v1/traces", srv.URL), HTTPClientSettings: confighttp.HTTPClientSettings{ Headers: test.headers, }, @@ -585,7 +590,7 @@ func TestUserAgent(t *testing.T) { t.Run("metrics", func(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - srv, err := createBackend(addr, "/v1/metrics", func(writer http.ResponseWriter, request *http.Request) { + srv, err := createBackend("/v1/metrics", func(writer http.ResponseWriter, request *http.Request) { assert.Contains(t, request.Header.Get("user-agent"), test.expectedUA) writer.WriteHeader(200) }) @@ -593,7 +598,7 @@ func TestUserAgent(t *testing.T) { defer srv.Close() cfg := &Config{ - MetricsEndpoint: fmt.Sprintf("http://%s/v1/metrics", addr), + MetricsEndpoint: fmt.Sprintf("%s/v1/metrics", srv.URL), HTTPClientSettings: confighttp.HTTPClientSettings{ Headers: test.headers, }, @@ -619,7 +624,7 @@ func TestUserAgent(t *testing.T) { t.Run("logs", func(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - srv, err := createBackend(addr, "/v1/logs", func(writer http.ResponseWriter, request *http.Request) { + srv, err := createBackend("/v1/logs", func(writer http.ResponseWriter, request *http.Request) { assert.Contains(t, request.Header.Get("user-agent"), test.expectedUA) writer.WriteHeader(200) }) @@ -627,7 +632,7 @@ func TestUserAgent(t *testing.T) { defer srv.Close() cfg := &Config{ - LogsEndpoint: fmt.Sprintf("http://%s/v1/logs", addr), + LogsEndpoint: fmt.Sprintf("%s/v1/logs", srv.URL), HTTPClientSettings: confighttp.HTTPClientSettings{ Headers: test.headers, }, @@ -653,178 +658,117 @@ func TestUserAgent(t *testing.T) { }) } -func TestPartialSuccess(t *testing.T) { - addr := testutil.GetAvailableLocalAddress(t) - set := exportertest.NewNopCreateSettings() - - t.Run("traces", func(t *testing.T) { - srv, err := createBackend(addr, "/v1/traces", func(writer http.ResponseWriter, request *http.Request) { - response := ptraceotlp.NewExportResponse() - partial := response.PartialSuccess() - partial.SetErrorMessage("hello") - partial.SetRejectedSpans(1) - bytes, err := response.MarshalProto() - require.NoError(t, err) - _, err = writer.Write(bytes) - require.NoError(t, err) - }) +func TestPartialSuccess_traces(t *testing.T) { + srv, err := createBackend("/v1/traces", func(writer http.ResponseWriter, request *http.Request) { + response := ptraceotlp.NewExportResponse() + partial := response.PartialSuccess() + partial.SetErrorMessage("hello") + partial.SetRejectedSpans(1) + bytes, err := response.MarshalProto() require.NoError(t, err) - defer srv.Close() - - cfg := &Config{ - TracesEndpoint: fmt.Sprintf("http://%s/v1/traces", addr), - HTTPClientSettings: confighttp.HTTPClientSettings{}, - } - exp, err := createTracesExporter(context.Background(), set, cfg) + _, err = writer.Write(bytes) require.NoError(t, err) + }) + require.NoError(t, err) + defer srv.Close() - // start the exporter - err = exp.Start(context.Background(), componenttest.NewNopHost()) - require.NoError(t, err) - t.Cleanup(func() { - require.NoError(t, exp.Shutdown(context.Background())) - }) + cfg := &Config{ + TracesEndpoint: fmt.Sprintf("%s/v1/traces", srv.URL), + HTTPClientSettings: confighttp.HTTPClientSettings{}, + } + exp, err := createTracesExporter(context.Background(), exportertest.NewNopCreateSettings(), cfg) + require.NoError(t, err) - // generate data - traces := ptrace.NewTraces() - err = exp.ConsumeTraces(context.Background(), traces) - require.Error(t, err) + // start the exporter + err = exp.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, exp.Shutdown(context.Background())) }) - t.Run("metrics", func(t *testing.T) { - srv, err := createBackend(addr, "/v1/metrics", func(writer http.ResponseWriter, request *http.Request) { - response := pmetricotlp.NewExportResponse() - partial := response.PartialSuccess() - partial.SetErrorMessage("hello") - partial.SetRejectedDataPoints(1) - bytes, err := response.MarshalProto() - require.NoError(t, err) - _, err = writer.Write(bytes) - require.NoError(t, err) - }) - require.NoError(t, err) - defer srv.Close() + // generate data + traces := ptrace.NewTraces() + err = exp.ConsumeTraces(context.Background(), traces) + require.Error(t, err) +} - cfg := &Config{ - MetricsEndpoint: fmt.Sprintf("http://%s/v1/metrics", addr), - HTTPClientSettings: confighttp.HTTPClientSettings{}, - } - exp, err := createMetricsExporter(context.Background(), set, cfg) +func TestPartialSuccess_metrics(t *testing.T) { + srv, err := createBackend("/v1/metrics", func(writer http.ResponseWriter, request *http.Request) { + response := pmetricotlp.NewExportResponse() + partial := response.PartialSuccess() + partial.SetErrorMessage("hello") + partial.SetRejectedDataPoints(1) + bytes, err := response.MarshalProto() require.NoError(t, err) - - // start the exporter - err = exp.Start(context.Background(), componenttest.NewNopHost()) + _, err = writer.Write(bytes) require.NoError(t, err) - t.Cleanup(func() { - require.NoError(t, exp.Shutdown(context.Background())) - }) - - // generate data - metrics := pmetric.NewMetrics() - err = exp.ConsumeMetrics(context.Background(), metrics) - require.Error(t, err) }) + require.NoError(t, err) + defer srv.Close() - t.Run("logs", func(t *testing.T) { - srv, err := createBackend(addr, "/v1/logs", func(writer http.ResponseWriter, request *http.Request) { - response := plogotlp.NewExportResponse() - partial := response.PartialSuccess() - partial.SetErrorMessage("hello") - partial.SetRejectedLogRecords(1) - bytes, err := response.MarshalProto() - require.NoError(t, err) - _, err = writer.Write(bytes) - require.NoError(t, err) - }) - require.NoError(t, err) - defer srv.Close() - - cfg := &Config{ - LogsEndpoint: fmt.Sprintf("http://%s/v1/logs", addr), - HTTPClientSettings: confighttp.HTTPClientSettings{}, - } - exp, err := createLogsExporter(context.Background(), set, cfg) - require.NoError(t, err) - - // start the exporter - err = exp.Start(context.Background(), componenttest.NewNopHost()) - require.NoError(t, err) - t.Cleanup(func() { - require.NoError(t, exp.Shutdown(context.Background())) - }) + cfg := &Config{ + MetricsEndpoint: fmt.Sprintf("%s/v1/metrics", srv.URL), + HTTPClientSettings: confighttp.HTTPClientSettings{}, + } + exp, err := createMetricsExporter(context.Background(), exportertest.NewNopCreateSettings(), cfg) + require.NoError(t, err) - // generate data - logs := plog.NewLogs() - err = exp.ConsumeLogs(context.Background(), logs) - require.Error(t, err) + // start the exporter + err = exp.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, exp.Shutdown(context.Background())) }) - t.Run("Response is missing a Content-Length header but includes a partial success object", func(t *testing.T) { - response := ptraceotlp.NewExportResponse() + // generate data + metrics := pmetric.NewMetrics() + err = exp.ConsumeMetrics(context.Background(), metrics) + require.Error(t, err) +} + +func TestPartialSuccess_logs(t *testing.T) { + srv, err := createBackend("/v1/logs", func(writer http.ResponseWriter, request *http.Request) { + response := plogotlp.NewExportResponse() partial := response.PartialSuccess() partial.SetErrorMessage("hello") - partial.SetRejectedSpans(1) - data, err := response.MarshalProto() + partial.SetRejectedLogRecords(1) + bytes, err := response.MarshalProto() + require.NoError(t, err) + _, err = writer.Write(bytes) require.NoError(t, err) - resp := &http.Response{ - // `-1` indicates a missing Content-Length header in the Go http standard library - ContentLength: -1, - Body: io.NopCloser(bytes.NewReader(data)), - } - err = handlePartialSuccessResponse(resp, tracesPartialSuccessHandler) - assert.True(t, consumererror.IsPermanent(err)) }) + require.NoError(t, err) + defer srv.Close() - t.Run("Response is missing a Content-Length header and a body", func(t *testing.T) { - resp := &http.Response{ - // `-1` indicates a missing Content-Length header in the Go http standard library - ContentLength: -1, - Body: io.NopCloser(bytes.NewReader([]byte{})), - } - err := handlePartialSuccessResponse(resp, tracesPartialSuccessHandler) - assert.Nil(t, err) - }) + cfg := &Config{ + LogsEndpoint: fmt.Sprintf("%s/v1/logs", srv.URL), + HTTPClientSettings: confighttp.HTTPClientSettings{}, + } + exp, err := createLogsExporter(context.Background(), exportertest.NewNopCreateSettings(), cfg) + require.NoError(t, err) - t.Run("Reading the response body returns an error other than ErrUnexpectedEOF", func(t *testing.T) { - resp := &http.Response{ - // `-1` indicates a missing Content-Length header in the Go http standard library - ContentLength: -1, - Body: io.NopCloser(badReader{}), - } - err := handlePartialSuccessResponse(resp, tracesPartialSuccessHandler) - assert.Error(t, err) + // start the exporter + err = exp.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, exp.Shutdown(context.Background())) }) - t.Run("Response has short Content-Length header", func(t *testing.T) { - response := ptraceotlp.NewExportResponse() - partial := response.PartialSuccess() - partial.SetErrorMessage("hello") - partial.SetRejectedSpans(1) - data, err := response.MarshalProto() - require.NoError(t, err) - resp := &http.Response{ - ContentLength: 3, - Body: io.NopCloser(bytes.NewReader(data)), - } - err = handlePartialSuccessResponse(resp, tracesPartialSuccessHandler) - assert.Error(t, err) - }) + // generate data + logs := plog.NewLogs() + err = exp.ConsumeLogs(context.Background(), logs) + require.Error(t, err) +} - t.Run("Response has long Content-Length header", func(t *testing.T) { - response := ptraceotlp.NewExportResponse() - partial := response.PartialSuccess() - partial.SetErrorMessage("hello") - partial.SetRejectedSpans(1) - data, err := response.MarshalProto() - require.NoError(t, err) - resp := &http.Response{ - ContentLength: 4096, - Body: io.NopCloser(bytes.NewReader(data)), - } - err = handlePartialSuccessResponse(resp, tracesPartialSuccessHandler) - assert.Error(t, err) - }) +func TestPartialSuccessInvalidResponseBody(t *testing.T) { + resp := &http.Response{ + Body: io.NopCloser(badReader{}), + } + err := handlePartialSuccessResponse(resp, tracesPartialSuccessHandler) + assert.Error(t, err) +} +func TestPartialSuccessInvalidBody(t *testing.T) { invalidBodyCases := []struct { telemetryType string handler partialSuccessHandler @@ -854,25 +798,15 @@ func TestPartialSuccess(t *testing.T) { assert.Error(t, err) }) } - } -func createBackend(addr string, endpoint string, handler func(writer http.ResponseWriter, request *http.Request)) (*http.Server, error) { +func createBackend(endpoint string, handler func(writer http.ResponseWriter, request *http.Request)) (*httptest.Server, error) { mux := http.NewServeMux() mux.HandleFunc(endpoint, handler) - srv := http.Server{ - Addr: addr, - Handler: mux, - } - ln, err := net.Listen("tcp", addr) - if err != nil { - return &http.Server{}, err - } - go func() { - _ = srv.Serve(ln) - }() - return &srv, nil + srv := httptest.NewServer(mux) + + return srv, nil } type badReader struct{} From c8a656080fff45ca24a1c7fcb05a1b1206e94b47 Mon Sep 17 00:00:00 2001 From: Evan Bradley Date: Tue, 28 Mar 2023 15:30:37 -0400 Subject: [PATCH 3/5] Fix up tests --- exporter/otlphttpexporter/otlp_test.go | 71 +++++++++++++++----------- 1 file changed, 41 insertions(+), 30 deletions(-) diff --git a/exporter/otlphttpexporter/otlp_test.go b/exporter/otlphttpexporter/otlp_test.go index 3dab1bde58d..8850bbb0535 100644 --- a/exporter/otlphttpexporter/otlp_test.go +++ b/exporter/otlphttpexporter/otlp_test.go @@ -14,6 +14,7 @@ import ( "io" "net/http" "net/http/httptest" + "strings" "testing" "time" @@ -446,9 +447,11 @@ func TestErrorResponses(t *testing.T) { name: "502", responseStatus: http.StatusBadGateway, responseBody: status.New(codes.InvalidArgument, "Bad gateway"), - err: exporterhelper.NewThrottleRetry( - errors.New(errMsgPrefix+"502, Message=Bad gateway, Details=[]"), - time.Duration(0)*time.Second), + err: func(srv *httptest.Server) error { + return exporterhelper.NewThrottleRetry( + errors.New(errMsgPrefix(srv)+"502, Message=Bad gateway, Details=[]"), + time.Duration(0)*time.Second) + }, }, { name: "503", @@ -475,15 +478,27 @@ func TestErrorResponses(t *testing.T) { name: "504", responseStatus: http.StatusGatewayTimeout, responseBody: status.New(codes.InvalidArgument, "Gateway timeout"), - err: exporterhelper.NewThrottleRetry( - errors.New(errMsgPrefix+"504, Message=Gateway timeout, Details=[]"), - time.Duration(0)*time.Second), + err: func(srv *httptest.Server) error { + return exporterhelper.NewThrottleRetry( + errors.New(errMsgPrefix(srv)+"504, Message=Gateway timeout, Details=[]"), + time.Duration(0)*time.Second) + }, + }, + { + name: "Bad response payload", + responseStatus: http.StatusServiceUnavailable, + responseBody: status.New(codes.InvalidArgument, strings.Repeat("a", maxHTTPResponseReadBytes+1)), + err: func(srv *httptest.Server) error { + return exporterhelper.NewThrottleRetry( + errors.New(errMsgPrefix(srv)+"503"), + time.Duration(0)*time.Second) + }, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - srv, err := createBackend("/v1/traces", func(writer http.ResponseWriter, request *http.Request) { + srv := createBackend("/v1/traces", func(writer http.ResponseWriter, request *http.Request) { for k, v := range test.headers { writer.Header().Add(k, v) } @@ -495,7 +510,6 @@ func TestErrorResponses(t *testing.T) { require.NoError(t, err) } }) - require.NoError(t, err) defer srv.Close() cfg := &Config{ @@ -527,6 +541,15 @@ func TestErrorResponses(t *testing.T) { } } +func TestErrorResponseInvalidResponseBody(t *testing.T) { + resp := &http.Response{ + StatusCode: 400, + Body: io.NopCloser(badReader{}), + } + status := readResponseStatus(resp) + assert.Nil(t, status) +} + func TestUserAgent(t *testing.T) { set := exportertest.NewNopCreateSettings() set.BuildInfo.Description = "Collector" @@ -556,11 +579,10 @@ func TestUserAgent(t *testing.T) { t.Run("traces", func(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - srv, err := createBackend("/v1/traces", func(writer http.ResponseWriter, request *http.Request) { + srv := createBackend("/v1/traces", func(writer http.ResponseWriter, request *http.Request) { assert.Contains(t, request.Header.Get("user-agent"), test.expectedUA) writer.WriteHeader(200) }) - require.NoError(t, err) defer srv.Close() cfg := &Config{ @@ -590,11 +612,10 @@ func TestUserAgent(t *testing.T) { t.Run("metrics", func(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - srv, err := createBackend("/v1/metrics", func(writer http.ResponseWriter, request *http.Request) { + srv := createBackend("/v1/metrics", func(writer http.ResponseWriter, request *http.Request) { assert.Contains(t, request.Header.Get("user-agent"), test.expectedUA) writer.WriteHeader(200) }) - require.NoError(t, err) defer srv.Close() cfg := &Config{ @@ -624,11 +645,10 @@ func TestUserAgent(t *testing.T) { t.Run("logs", func(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - srv, err := createBackend("/v1/logs", func(writer http.ResponseWriter, request *http.Request) { + srv := createBackend("/v1/logs", func(writer http.ResponseWriter, request *http.Request) { assert.Contains(t, request.Header.Get("user-agent"), test.expectedUA) writer.WriteHeader(200) }) - require.NoError(t, err) defer srv.Close() cfg := &Config{ @@ -659,7 +679,7 @@ func TestUserAgent(t *testing.T) { } func TestPartialSuccess_traces(t *testing.T) { - srv, err := createBackend("/v1/traces", func(writer http.ResponseWriter, request *http.Request) { + srv := createBackend("/v1/traces", func(writer http.ResponseWriter, request *http.Request) { response := ptraceotlp.NewExportResponse() partial := response.PartialSuccess() partial.SetErrorMessage("hello") @@ -669,7 +689,6 @@ func TestPartialSuccess_traces(t *testing.T) { _, err = writer.Write(bytes) require.NoError(t, err) }) - require.NoError(t, err) defer srv.Close() cfg := &Config{ @@ -693,7 +712,7 @@ func TestPartialSuccess_traces(t *testing.T) { } func TestPartialSuccess_metrics(t *testing.T) { - srv, err := createBackend("/v1/metrics", func(writer http.ResponseWriter, request *http.Request) { + srv := createBackend("/v1/metrics", func(writer http.ResponseWriter, request *http.Request) { response := pmetricotlp.NewExportResponse() partial := response.PartialSuccess() partial.SetErrorMessage("hello") @@ -703,7 +722,6 @@ func TestPartialSuccess_metrics(t *testing.T) { _, err = writer.Write(bytes) require.NoError(t, err) }) - require.NoError(t, err) defer srv.Close() cfg := &Config{ @@ -727,7 +745,7 @@ func TestPartialSuccess_metrics(t *testing.T) { } func TestPartialSuccess_logs(t *testing.T) { - srv, err := createBackend("/v1/logs", func(writer http.ResponseWriter, request *http.Request) { + srv := createBackend("/v1/logs", func(writer http.ResponseWriter, request *http.Request) { response := plogotlp.NewExportResponse() partial := response.PartialSuccess() partial.SetErrorMessage("hello") @@ -737,7 +755,6 @@ func TestPartialSuccess_logs(t *testing.T) { _, err = writer.Write(bytes) require.NoError(t, err) }) - require.NoError(t, err) defer srv.Close() cfg := &Config{ @@ -787,26 +804,20 @@ func TestPartialSuccessInvalidBody(t *testing.T) { }, } for _, tt := range invalidBodyCases { - t.Run("Invalid response body: "+tt.telemetryType, func(t *testing.T) { - str := "invalid proto" - body := bytes.NewBufferString(str) - resp := &http.Response{ - ContentLength: int64(len(str)), - Body: io.NopCloser(body), - } - err := handlePartialSuccessResponse(resp, tt.handler) + t.Run("Invalid response body_"+tt.telemetryType, func(t *testing.T) { + err := tt.handler([]byte{1}) assert.Error(t, err) }) } } -func createBackend(endpoint string, handler func(writer http.ResponseWriter, request *http.Request)) (*httptest.Server, error) { +func createBackend(endpoint string, handler func(writer http.ResponseWriter, request *http.Request)) *httptest.Server { mux := http.NewServeMux() mux.HandleFunc(endpoint, handler) srv := httptest.NewServer(mux) - return srv, nil + return srv } type badReader struct{} From 978f348227d6de19ef9eb914cc20828ceca8a391 Mon Sep 17 00:00:00 2001 From: Evan Bradley Date: Mon, 12 Jun 2023 11:01:02 -0400 Subject: [PATCH 4/5] Restore Content-Length header checks --- exporter/otlphttpexporter/otlp.go | 35 +++++++----- exporter/otlphttpexporter/otlp_test.go | 74 ++++++++++++++++++++++++-- 2 files changed, 93 insertions(+), 16 deletions(-) diff --git a/exporter/otlphttpexporter/otlp.go b/exporter/otlphttpexporter/otlp.go index b6041d419eb..a5976ad57c1 100644 --- a/exporter/otlphttpexporter/otlp.go +++ b/exporter/otlphttpexporter/otlp.go @@ -192,27 +192,36 @@ func isRetryableStatusCode(code int) bool { } } -func readResponseBody(body io.ReadCloser) ([]byte, error) { - // Read the maximum number of bytes allowed in a request. This avoids - // issues with missing or invalid Content-Length headers. - protoBytes := make([]byte, maxHTTPResponseReadBytes) - n, err := io.ReadFull(body, protoBytes) +func readResponseBody(resp *http.Response) ([]byte, error) { + if resp.ContentLength == 0 { + return nil, nil + } + + maxRead := resp.ContentLength + + // if maxRead == -1, the ContentLength header has not been sent, so read up to + // the maximum permitted body size. If it is larger than the permitted body + // size, still try to read from the body in case the value is an error. If the + // body is larger than the maximum size, proto unmarshaling will likely fail. + if maxRead == -1 || maxRead > maxHTTPResponseReadBytes { + maxRead = maxHTTPResponseReadBytes + } + protoBytes := make([]byte, maxRead) + n, err := io.ReadFull(resp.Body, protoBytes) // No bytes read and an EOF error indicates there is no body to read. if n == 0 && (err == nil || errors.Is(err, io.EOF)) { return nil, nil } - // io.ReadFull will return io.ErrorUnexpectedEOF in most cases since there - // will usually be a mismatch between the length of the byte slice and the - // size of the body, so we ignore that error. + // io.ReadFull will return io.ErrorUnexpectedEOF if the Content-Length header + // wasn't set, since we will try to read past the length of the body. If this + // is the case, the body will still have the full message in it, so we want to + // ignore the error and parse the message. if err != nil && !errors.Is(err, io.ErrUnexpectedEOF) { return nil, err } - // The pdata unmarshaling methods check for the length of the slice - // when unmarshaling it, so we have to trim down the length to the - // actual size of the data. return protoBytes[:n], nil } @@ -224,7 +233,7 @@ func readResponseStatus(resp *http.Response) *status.Status { // Request failed. Read the body. OTLP spec says: // "Response body for all HTTP 4xx and HTTP 5xx responses MUST be a // Protobuf-encoded Status message that describes the problem." - respBytes, err := readResponseBody(resp.Body) + respBytes, err := readResponseBody(resp) if err != nil { return nil @@ -242,7 +251,7 @@ func readResponseStatus(resp *http.Response) *status.Status { } func handlePartialSuccessResponse(resp *http.Response, partialSuccessHandler partialSuccessHandler) error { - bodyBytes, err := readResponseBody(resp.Body) + bodyBytes, err := readResponseBody(resp) if err != nil { return err diff --git a/exporter/otlphttpexporter/otlp_test.go b/exporter/otlphttpexporter/otlp_test.go index 8850bbb0535..64d9223e3f5 100644 --- a/exporter/otlphttpexporter/otlp_test.go +++ b/exporter/otlphttpexporter/otlp_test.go @@ -543,8 +543,9 @@ func TestErrorResponses(t *testing.T) { func TestErrorResponseInvalidResponseBody(t *testing.T) { resp := &http.Response{ - StatusCode: 400, - Body: io.NopCloser(badReader{}), + StatusCode: 400, + Body: io.NopCloser(badReader{}), + ContentLength: 100, } status := readResponseStatus(resp) assert.Nil(t, status) @@ -777,9 +778,76 @@ func TestPartialSuccess_logs(t *testing.T) { require.Error(t, err) } +func TestPartialResponse_missingHeaderButHasBody(t *testing.T) { + response := ptraceotlp.NewExportResponse() + partial := response.PartialSuccess() + partial.SetErrorMessage("hello") + partial.SetRejectedSpans(1) + data, err := response.MarshalProto() + require.NoError(t, err) + resp := &http.Response{ + // `-1` indicates a missing Content-Length header in the Go http standard library + ContentLength: -1, + Body: io.NopCloser(bytes.NewReader(data)), + } + err = handlePartialSuccessResponse(resp, tracesPartialSuccessHandler) + assert.True(t, consumererror.IsPermanent(err)) +} + +func TestPartialResponse_missingHeaderAndBody(t *testing.T) { + resp := &http.Response{ + // `-1` indicates a missing Content-Length header in the Go http standard library + ContentLength: -1, + Body: io.NopCloser(bytes.NewReader([]byte{})), + } + err := handlePartialSuccessResponse(resp, tracesPartialSuccessHandler) + assert.Nil(t, err) +} + +func TestPartialResponse_nonErrUnexpectedEOFError(t *testing.T) { + resp := &http.Response{ + // `-1` indicates a missing Content-Length header in the Go http standard library + ContentLength: -1, + Body: io.NopCloser(badReader{}), + } + err := handlePartialSuccessResponse(resp, tracesPartialSuccessHandler) + assert.Error(t, err) +} + +func TestPartialSuccess_shortContentLengthHeader(t *testing.T) { + response := ptraceotlp.NewExportResponse() + partial := response.PartialSuccess() + partial.SetErrorMessage("hello") + partial.SetRejectedSpans(1) + data, err := response.MarshalProto() + require.NoError(t, err) + resp := &http.Response{ + ContentLength: 3, + Body: io.NopCloser(bytes.NewReader(data)), + } + err = handlePartialSuccessResponse(resp, tracesPartialSuccessHandler) + assert.Error(t, err) +} + +func TestPartialSuccess_longContentLengthHeader(t *testing.T) { + response := ptraceotlp.NewExportResponse() + partial := response.PartialSuccess() + partial.SetErrorMessage("hello") + partial.SetRejectedSpans(1) + data, err := response.MarshalProto() + require.NoError(t, err) + resp := &http.Response{ + ContentLength: 4096, + Body: io.NopCloser(bytes.NewReader(data)), + } + err = handlePartialSuccessResponse(resp, tracesPartialSuccessHandler) + assert.Error(t, err) +} + func TestPartialSuccessInvalidResponseBody(t *testing.T) { resp := &http.Response{ - Body: io.NopCloser(badReader{}), + Body: io.NopCloser(badReader{}), + ContentLength: 100, } err := handlePartialSuccessResponse(resp, tracesPartialSuccessHandler) assert.Error(t, err) From 58e860579ef183a64efcf1bf89f7acca43079931 Mon Sep 17 00:00:00 2001 From: Dmitrii Anoshin Date: Mon, 26 Jun 2023 21:41:16 -0700 Subject: [PATCH 5/5] Update exporter/otlphttpexporter/otlp.go --- exporter/otlphttpexporter/otlp.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/exporter/otlphttpexporter/otlp.go b/exporter/otlphttpexporter/otlp.go index a5976ad57c1..9e2e92a34d5 100644 --- a/exporter/otlphttpexporter/otlp.go +++ b/exporter/otlphttpexporter/otlp.go @@ -133,12 +133,7 @@ func (e *baseExporter) export(ctx context.Context, url string, request []byte, p }() if resp.StatusCode >= 200 && resp.StatusCode <= 299 { - if err := handlePartialSuccessResponse(resp, partialSuccessHandler); err != nil { - return err - } - - // Request is successful. - return nil + return handlePartialSuccessResponse(resp, partialSuccessHandler) } respStatus := readResponseStatus(resp)