Skip to content

Commit

Permalink
Add correct handling of server error responses in OTLP/HTTP exporter (o…
Browse files Browse the repository at this point in the history
  • Loading branch information
tigrannajaryan authored Oct 27, 2020
1 parent 22fdd41 commit 14618bd
Show file tree
Hide file tree
Showing 2 changed files with 189 additions and 10 deletions.
93 changes: 86 additions & 7 deletions exporter/otlphttpexporter/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,20 @@ import (
"context"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"strconv"
"time"

"google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/protobuf/proto"

"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/exporter/exporterhelper"
)

type exporterImp struct {
Expand All @@ -36,6 +44,11 @@ type exporterImp struct {
logsURL string
}

const (
headerRetryAfter = "Retry-After"
maxHTTPResponseReadBytes = 64 * 1024
)

// Crete new exporter.
func newExporter(cfg configmodels.Exporter) (*exporterImp, error) {
oCfg := cfg.(*Config)
Expand Down Expand Up @@ -109,15 +122,81 @@ func (e *exporterImp) export(ctx context.Context, url string, request []byte) er

resp, err := e.client.Do(req)
if err != nil {
return fmt.Errorf("failed to push trace data via OTLP exporter: %w", err)
return fmt.Errorf("failed to make an HTTP request: %w", err)
}

_ = resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode > 299 {
// TODO: Parse status and decide if can or not retry.
// TODO: Parse status and decide throttling.
return fmt.Errorf("failed the request with status code %d", resp.StatusCode)
defer func() {
// Discard any remaining response body when we are done reading.
io.CopyN(ioutil.Discard, resp.Body, maxHTTPResponseReadBytes)
resp.Body.Close()
}()

if resp.StatusCode >= 200 && resp.StatusCode <= 299 {
// Request is successful.
return nil
}

respStatus := readResponse(resp)

// Format the error message. Use the status if it is present in the response.
var formattedErr error
if respStatus != nil {
formattedErr = fmt.Errorf(
"error exporting items, server responded with HTTP Status Code %d, Message=%s, Details=%v",
resp.StatusCode, respStatus.Message, respStatus.Details)
} else {
formattedErr = fmt.Errorf(
"error exporting items, server responded with HTTP Status Code %d",
resp.StatusCode)
}

// Check if the server is overwhelmed.
// See spec https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/protocol/otlp.md#throttling-1
if resp.StatusCode == http.StatusTooManyRequests || resp.StatusCode == http.StatusServiceUnavailable {
// Fallback to 0 if the Retry-After header is not present. This will trigger the
// default backoff policy by our caller (retry handler).
retryAfter := 0
if val := resp.Header.Get(headerRetryAfter); val != "" {
if seconds, err2 := strconv.Atoi(val); err2 == nil {
retryAfter = seconds
}
}
// Indicate to our caller to pause for the specified number of seconds.
return exporterhelper.NewThrottleRetry(formattedErr, time.Duration(retryAfter)*time.Second)
}

if resp.StatusCode == http.StatusBadRequest {
// Report the failure as permanent if the server thinks the request is malformed.
return consumererror.Permanent(formattedErr)
}

// All other errors are retryable, so don't wrap them in consumererror.Permanent().
return formattedErr
}

// Read the response and decode the status.Status from the body.
// Returns nil if the response is empty or cannot be decoded.
func readResponse(resp *http.Response) *status.Status {
var respStatus *status.Status
if resp.StatusCode >= 400 && resp.StatusCode <= 599 {
// Request failed. Read the body. OTLP spec says:
// "Response body for all HTTP 4xx and HTTP 5xx responses MUST be a
// Protobuf-encoded Status message that describes the problem."
maxRead := resp.ContentLength
if maxRead == -1 || maxRead > maxHTTPResponseReadBytes {
maxRead = maxHTTPResponseReadBytes
}
respBytes := make([]byte, maxRead)
n, err := io.ReadFull(resp.Body, respBytes)
if err == nil && n > 0 {
// Decode it as Status struct. See https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/protocol/otlp.md#failures
respStatus = &status.Status{}
err = proto.Unmarshal(respBytes, respStatus)
if err != nil {
respStatus = nil
}
}
}

return nil
return respStatus
}
106 changes: 103 additions & 3 deletions exporter/otlphttpexporter/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,32 @@ import (
"context"
"errors"
"fmt"
"net"
"net/http"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/internal/data/testdata"
"go.opentelemetry.io/collector/receiver/otlpreceiver"
"go.opentelemetry.io/collector/testutil"
)

// TODO: add tests for retryable/permanent errors logic.
// TODO: add tests for throttling logic.

func TestInvalidConfig(t *testing.T) {
config := &Config{
HTTPClientSettings: confighttp.HTTPClientSettings{
Expand Down Expand Up @@ -312,3 +317,98 @@ func startAndCleanup(t *testing.T, cmp component.Component) {
require.NoError(t, cmp.Shutdown(context.Background()))
})
}

func TestErrorResponses(t *testing.T) {
tests := []struct {
name string
responseStatus int
responseBody *status.Status
err error
isPermErr bool
headers map[string]string
}{
{
name: "400",
responseStatus: http.StatusBadRequest,
responseBody: status.New(codes.InvalidArgument, "Bad field"),
isPermErr: true,
},
{
name: "404",
responseStatus: http.StatusNotFound,
err: fmt.Errorf("error exporting items, server responded with HTTP Status Code 404"),
},
{
name: "419",
responseStatus: http.StatusTooManyRequests,
responseBody: status.New(codes.InvalidArgument, "Quota exceeded"),
err: exporterhelper.NewThrottleRetry(
fmt.Errorf("error exporting items, server responded with HTTP Status Code 429, Message=Quota exceeded, Details=[]"),
time.Duration(0)*time.Second),
},
{
name: "503",
responseStatus: http.StatusServiceUnavailable,
responseBody: status.New(codes.InvalidArgument, "Server overloaded"),
err: exporterhelper.NewThrottleRetry(
fmt.Errorf("error exporting items, server responded with HTTP Status Code 503, Message=Server overloaded, Details=[]"),
time.Duration(0)*time.Second),
},
{
name: "503",
responseStatus: http.StatusServiceUnavailable,
responseBody: status.New(codes.InvalidArgument, "Server overloaded"),
headers: map[string]string{"Retry-After": "30"},
err: exporterhelper.NewThrottleRetry(
fmt.Errorf("error exporting items, server responded with HTTP Status Code 503, Message=Server overloaded, Details=[]"),
time.Duration(30)*time.Second),
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
addr := testutil.GetAvailableLocalAddress(t)
mux := http.NewServeMux()
mux.HandleFunc("/v1/traces", func(writer http.ResponseWriter, request *http.Request) {
for k, v := range test.headers {
writer.Header().Add(k, v)
}
writer.WriteHeader(test.responseStatus)
if test.responseBody != nil {
msg, err := proto.Marshal(test.responseBody.Proto())
require.NoError(t, err)
writer.Write(msg)
}
})
srv := http.Server{
Addr: addr,
Handler: mux,
}
ln, err := net.Listen("tcp", addr)
require.NoError(t, err)
go func() {
_ = srv.Serve(ln)
}()

cfg := &Config{
TracesEndpoint: fmt.Sprintf("http://%s/v1/traces", addr),
// Create without QueueSettings and RetrySettings so that ConsumeTraces
// returns the errors that we want to check immediately.
}
exp, err := createTraceExporter(context.Background(), component.ExporterCreateParams{}, cfg)
require.NoError(t, err)

traces := pdata.NewTraces()
err = exp.ConsumeTraces(context.Background(), traces)
assert.Error(t, err)

if test.isPermErr {
assert.True(t, consumererror.IsPermanent(err))
} else {
assert.EqualValues(t, test.err, err)
}

srv.Close()
})
}
}

0 comments on commit 14618bd

Please sign in to comment.