Skip to content

Commit

Permalink
[exporter/datadog] Mark unrecoverable errors as permanent (#17386)
Browse files Browse the repository at this point in the history
* [exporter/datadog] Mark unrecoverable errors as permanent

* Make DoWithRetries return retryNum
  • Loading branch information
songy23 authored Jan 11, 2023
1 parent 481e524 commit c3bd1e6
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 33 deletions.
4 changes: 2 additions & 2 deletions exporter/datadogexporter/internal/clientutil/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func CreateAPIClient(buildInfo component.BuildInfo, endpoint string, settings ex
func ValidateAPIKey(ctx context.Context, apiKey string, logger *zap.Logger, apiClient *datadog.APIClient) error {
logger.Info("Validating API key.")
authAPI := datadogV1.NewAuthenticationApi(apiClient)
resp, _, err := authAPI.Validate(GetRequestContext(ctx, apiKey))
resp, httpresp, err := authAPI.Validate(GetRequestContext(ctx, apiKey))
if err == nil && resp.Valid != nil && *resp.Valid {
logger.Info("API key validation successful.")
return nil
Expand All @@ -56,7 +56,7 @@ func ValidateAPIKey(ctx context.Context, apiKey string, logger *zap.Logger, apiC
return nil
}
logger.Warn(ErrInvalidAPI.Error())
return ErrInvalidAPI
return WrapError(ErrInvalidAPI, httpresp)
}

// GetRequestContext creates a new context with API key for DatadogV2 requests
Expand Down
33 changes: 33 additions & 0 deletions exporter/datadogexporter/internal/clientutil/error_converter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package clientutil // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/clientutil"

import (
"net/http"

"go.opentelemetry.io/collector/consumer/consumererror"
)

// WrapError wraps an error to a permanent consumer error that won't be retried if the http response code is non-retriable.
func WrapError(err error, resp *http.Response) error {
if err == nil || !isNonRetriable(resp) {
return err
}
return consumererror.NewPermanent(err)
}

func isNonRetriable(resp *http.Response) bool {
return resp.StatusCode == 400 || resp.StatusCode == 404 || resp.StatusCode == 413 || resp.StatusCode == 403
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package clientutil // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/clientutil"

import (
"fmt"
"net/http"
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/consumer/consumererror"
)

func TestWrapError(t *testing.T) {
respOK := http.Response{StatusCode: 200}
respRetriable := http.Response{StatusCode: 402}
respNonRetriable := http.Response{StatusCode: 404}
err := fmt.Errorf("Test error")
assert.False(t, consumererror.IsPermanent(WrapError(err, &respOK)))
assert.False(t, consumererror.IsPermanent(WrapError(err, &respRetriable)))
assert.True(t, consumererror.IsPermanent(WrapError(err, &respNonRetriable)))
assert.False(t, consumererror.IsPermanent(WrapError(nil, &respNonRetriable)))
}
12 changes: 6 additions & 6 deletions exporter/datadogexporter/internal/clientutil/retrier.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ func NewRetrier(logger *zap.Logger, settings exporterhelper.RetrySettings, scrub

// DoWithRetries does a function with retries. This is a condensed version of the code on
// the exporterhelper, which we reuse here since we want custom retry logic.
func (r *Retrier) DoWithRetries(ctx context.Context, fn func(context.Context) error) error {
func (r *Retrier) DoWithRetries(ctx context.Context, fn func(context.Context) error) (int64, error) {
if !r.cfg.Enabled {
return fn(ctx)
return 0, fn(ctx)
}

// Do not use NewExponentialBackOff since it calls Reset and the code here must
Expand All @@ -64,19 +64,19 @@ func (r *Retrier) DoWithRetries(ctx context.Context, fn func(context.Context) er
for {
err := fn(ctx)
if err == nil {
return nil
return retryNum, nil
}

err = r.scrubber.Scrub(err)

if consumererror.IsPermanent(err) {
return err
return retryNum, err
}

backoffDelay := expBackoff.NextBackOff()
if backoffDelay == backoff.Stop {
err = fmt.Errorf("max elapsed time expired %w", err)
return err
return retryNum, err
}

backoffDelayStr := backoffDelay.String()
Expand All @@ -90,7 +90,7 @@ func (r *Retrier) DoWithRetries(ctx context.Context, fn func(context.Context) er
// back-off, but get interrupted when shutting down or request is cancelled or timed out.
select {
case <-ctx.Done():
return fmt.Errorf("request is cancelled or timed out %w", err)
return retryNum, fmt.Errorf("request is cancelled or timed out %w", err)
case <-time.After(backoffDelay):
}
}
Expand Down
22 changes: 20 additions & 2 deletions exporter/datadogexporter/internal/clientutil/retrier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@ package clientutil // import "github.com/open-telemetry/opentelemetry-collector-
import (
"context"
"errors"
"fmt"
"net/http"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.uber.org/zap"
Expand All @@ -32,8 +35,9 @@ func TestDoWithRetries(t *testing.T) {
retrier := NewRetrier(zap.NewNop(), exporterhelper.NewDefaultRetrySettings(), scrubber)
ctx := context.Background()

err := retrier.DoWithRetries(ctx, func(context.Context) error { return nil })
retryNum, err := retrier.DoWithRetries(ctx, func(context.Context) error { return nil })
require.NoError(t, err)
assert.Equal(t, retryNum, int64(0))

retrier = NewRetrier(zap.NewNop(),
exporterhelper.RetrySettings{
Expand All @@ -44,6 +48,20 @@ func TestDoWithRetries(t *testing.T) {
},
scrubber,
)
err = retrier.DoWithRetries(ctx, func(context.Context) error { return errors.New("action failed") })
retryNum, err = retrier.DoWithRetries(ctx, func(context.Context) error { return errors.New("action failed") })
require.Error(t, err)
assert.Greater(t, retryNum, int64(0))
}

func TestNoRetriesOnPermanentError(t *testing.T) {
scrubber := scrub.NewScrubber()
retrier := NewRetrier(zap.NewNop(), exporterhelper.NewDefaultRetrySettings(), scrubber)
ctx := context.Background()
respNonRetriable := http.Response{StatusCode: 404}

retryNum, err := retrier.DoWithRetries(ctx, func(context.Context) error {
return WrapError(fmt.Errorf("test"), &respNonRetriable)
})
require.Error(t, err)
assert.Equal(t, retryNum, int64(0))
}
2 changes: 1 addition & 1 deletion exporter/datadogexporter/internal/metadata/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func pushMetadata(pcfg PusherConfig, params exporter.CreateSettings, metadata *H
func pushMetadataWithRetry(retrier *clientutil.Retrier, params exporter.CreateSettings, pcfg PusherConfig, hostMetadata *HostMetadata) {
params.Logger.Debug("Sending host metadata payload", zap.Any("payload", hostMetadata))

err := retrier.DoWithRetries(context.Background(), func(context.Context) error {
_, err := retrier.DoWithRetries(context.Background(), func(context.Context) error {
return pushMetadata(pcfg, params, hostMetadata)
})

Expand Down
38 changes: 16 additions & 22 deletions exporter/datadogexporter/metrics_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,12 @@ func (exp *metricsExporter) pushSketches(ctx context.Context, sl sketches.Sketch
}

if err != nil {
return fmt.Errorf("failed to do sketches HTTP request: %w", err)
return clientutil.WrapError(fmt.Errorf("failed to do sketches HTTP request: %w", err), resp)
}
defer resp.Body.Close()

if resp.StatusCode >= 400 {
return fmt.Errorf("error when sending payload to %s: %s", sketches.SketchSeriesEndpoint, resp.Status)
return clientutil.WrapError(fmt.Errorf("error when sending payload to %s: %s", sketches.SketchSeriesEndpoint, resp.Status), resp)
}
return nil
}
Expand Down Expand Up @@ -223,14 +223,12 @@ func (exp *metricsExporter) PushMetricsData(ctx context.Context, md pmetric.Metr
err = nil
if len(ms) > 0 {
exp.params.Logger.Debug("exporting native Datadog payload", zap.Any("metric", ms))
err = multierr.Append(
err,
exp.retrier.DoWithRetries(ctx, func(context.Context) error {
ctx = clientutil.GetRequestContext(ctx, string(exp.cfg.API.Key))
_, _, merr := exp.metricsAPI.SubmitMetrics(ctx, datadogV2.MetricPayload{Series: ms})
return merr
}),
)
_, experr := exp.retrier.DoWithRetries(ctx, func(context.Context) error {
ctx = clientutil.GetRequestContext(ctx, string(exp.cfg.API.Key))
_, httpresp, merr := exp.metricsAPI.SubmitMetrics(ctx, datadogV2.MetricPayload{Series: ms})
return clientutil.WrapError(merr, httpresp)
})
err = multierr.Append(err, experr)
}
} else {
var ms []zorkian.Metric
Expand All @@ -240,23 +238,19 @@ func (exp *metricsExporter) PushMetricsData(ctx context.Context, md pmetric.Metr
err = nil
if len(ms) > 0 {
exp.params.Logger.Debug("exporting Zorkian Datadog payload", zap.Any("metric", ms))
err = multierr.Append(
err,
exp.retrier.DoWithRetries(ctx, func(context.Context) error {
return exp.client.PostMetrics(ms)
}),
)
_, experr := exp.retrier.DoWithRetries(ctx, func(context.Context) error {
return exp.client.PostMetrics(ms)
})
err = multierr.Append(err, experr)
}
}

if len(sl) > 0 {
exp.params.Logger.Debug("exporting sketches payload", zap.Any("sketches", sl))
err = multierr.Append(
err,
exp.retrier.DoWithRetries(ctx, func(ctx context.Context) error {
return exp.pushSketches(ctx, sl)
}),
)
_, experr := exp.retrier.DoWithRetries(ctx, func(ctx context.Context) error {
return exp.pushSketches(ctx, sl)
})
err = multierr.Append(err, experr)
}

if len(sp) > 0 {
Expand Down

0 comments on commit c3bd1e6

Please sign in to comment.