Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporter/prometheusremotewrite] Fix: Don't drop batch in case of failure to translate metrics #29729

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
ac29886
Fix: Don't return error for metric translation in the prw exporter
rapphil Dec 11, 2023
883bd39
Merge branch 'main' into rapphil-prw-fix-metric-translation
rapphil Dec 12, 2023
f2a846b
Merge remote-tracking branch 'origin/main' into rapphil-prw-fix-metri…
rapphil Jan 11, 2024
baa04c6
Feat: Add telemetry for metric translation in the PRWE
rapphil Jan 12, 2024
691a7ec
Merge branch 'main' into rapphil-prw-fix-metric-translation
rapphil Jan 12, 2024
f897551
Merge branch 'main' into rapphil-prw-fix-metric-translation
rapphil Jan 12, 2024
b3a6ce6
Fix and simplify log line in case of failure to translated
rapphil Jan 12, 2024
a128f16
Simplify tests
rapphil Jan 12, 2024
84af853
Fix: fix based on code review
rapphil Jan 12, 2024
bffdd8c
Chore: Update metric name
rapphil Jan 12, 2024
faf0fdd
Fix: Fix go.mod
rapphil Jan 12, 2024
8bf5987
Merge remote-tracking branch 'origin/main' into rapphil-prw-fix-metri…
rapphil Jan 12, 2024
f60d68e
Chore: fix go.mod
rapphil Jan 12, 2024
5e953f5
Merge remote-tracking branch 'origin/main' into rapphil-prw-fix-metri…
rapphil Jan 30, 2024
0fad48b
Fix go.mod
rapphil Jan 30, 2024
bfdb63a
Merge branch 'main' into rapphil-prw-fix-metric-translation
rapphil Jan 30, 2024
c178c3b
Merge remote-tracking branch 'origin/main' into rapphil-prw-fix-metri…
rapphil Mar 1, 2024
2bc92ca
Fix merge conflicts
rapphil Mar 1, 2024
9e7ac06
Merge branch 'main' into rapphil-prw-fix-metric-translation
bryan-aguilar Mar 1, 2024
315996a
Merge branch 'main' into rapphil-prw-fix-metric-translation
bryan-aguilar Mar 13, 2024
e8ea846
make tidy
bryan-aguilar Mar 13, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/prw_failure_translate.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: prometheusremotewriteexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Publish telemetry about translation of metrics from Otel to Prometheus. Don't drop all data points if some fail translation.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [29729]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
62 changes: 60 additions & 2 deletions exporter/prometheusremotewriteexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,35 @@ import (
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.uber.org/multierr"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusremotewriteexporter/internal/metadata"
prometheustranslator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheusremotewrite"
)

type prwTelemetry interface {
recordTranslationFailure(ctx context.Context)
recordTranslatedTimeSeries(ctx context.Context, numTS int)
}

type prwTelemetryOtel struct {
failedTranslations metric.Int64Counter
translatedTimeSeries metric.Int64Counter
otelAttrs []attribute.KeyValue
}

func (p *prwTelemetryOtel) recordTranslationFailure(ctx context.Context) {
p.failedTranslations.Add(ctx, 1, metric.WithAttributes(p.otelAttrs...))
}

func (p *prwTelemetryOtel) recordTranslatedTimeSeries(ctx context.Context, numTS int) {
p.translatedTimeSeries.Add(ctx, int64(numTS), metric.WithAttributes(p.otelAttrs...))
}

// prwExporter converts OTLP metrics to Prometheus remote write TimeSeries and sends them to a remote endpoint.
type prwExporter struct {
endpointURL *url.URL
Expand All @@ -45,6 +68,31 @@ type prwExporter struct {
retrySettings configretry.BackOffConfig
wal *prweWAL
exporterSettings prometheusremotewrite.Settings
telemetry prwTelemetry
}

func newPRWTelemetry(set exporter.CreateSettings) (prwTelemetry, error) {

meter := metadata.Meter(set.TelemetrySettings)
// TODO: create helper functions similar to the processor helper: BuildCustomMetricName
prefix := "exporter/" + metadata.Type.String() + "/"
failedTranslations, errFailedTranslation := meter.Int64Counter(prefix+"failed_translations",
metric.WithDescription("Number of translation operations that failed to translate metrics from Otel to Prometheus"),
metric.WithUnit("1"),
)

translatedTimeSeries, errTranslatedMetrics := meter.Int64Counter(prefix+"translated_time_series",
metric.WithDescription("Number of Prometheus time series that were translated from OTel metrics"),
metric.WithUnit("1"),
)

return &prwTelemetryOtel{
failedTranslations: failedTranslations,
translatedTimeSeries: translatedTimeSeries,
otelAttrs: []attribute.KeyValue{
attribute.String("exporter", set.ID.String()),
},
}, errors.Join(errFailedTranslation, errTranslatedMetrics)
}

// newPRWExporter initializes a new prwExporter instance and sets fields accordingly.
Expand All @@ -59,6 +107,11 @@ func newPRWExporter(cfg *Config, set exporter.CreateSettings) (*prwExporter, err
return nil, errors.New("invalid endpoint")
}

prwTelemetry, err := newPRWTelemetry(set)
if err != nil {
return nil, err
}

userAgentHeader := fmt.Sprintf("%s/%s", strings.ReplaceAll(strings.ToLower(set.BuildInfo.Description), " ", "-"), set.BuildInfo.Version)

prwe := &prwExporter{
Expand All @@ -79,6 +132,7 @@ func newPRWExporter(cfg *Config, set exporter.CreateSettings) (*prwExporter, err
AddMetricSuffixes: cfg.AddMetricSuffixes,
SendMetadata: cfg.SendMetadata,
},
telemetry: prwTelemetry,
}

prwe.wal = newWAL(cfg.WAL, prwe.export)
Expand Down Expand Up @@ -128,15 +182,19 @@ func (prwe *prwExporter) PushMetrics(ctx context.Context, md pmetric.Metrics) er

tsMap, err := prometheusremotewrite.FromMetrics(md, prwe.exporterSettings)
if err != nil {
err = consumererror.NewPermanent(err)
prwe.telemetry.recordTranslationFailure(ctx)
prwe.settings.Logger.Debug("failed to translate metrics, exporting remaining metrics", zap.Error(err), zap.Int("translated", len(tsMap)))
}

prwe.telemetry.recordTranslatedTimeSeries(ctx, len(tsMap))

var m []*prompb.MetricMetadata
if prwe.exporterSettings.SendMetadata {
m = prometheusremotewrite.OtelMetricsToMetadata(md, prwe.exporterSettings.AddMetricSuffixes)
}

// Call export even if a conversion error, since there may be points that were successfully converted.
return multierr.Combine(err, prwe.handleExport(ctx, tsMap, m))
return prwe.handleExport(ctx, tsMap, m)
}
}

Expand Down
100 changes: 68 additions & 32 deletions exporter/prometheusremotewriteexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,19 @@ func runExportPipeline(ts *prompb.TimeSeries, endpoint *url.URL) error {
return prwe.handleExport(context.Background(), testmap, nil)
}

type mockPRWTelemetry struct {
failedTranslations int
translatedTimeSeries int
}

func (m *mockPRWTelemetry) recordTranslationFailure(_ context.Context) {
m.failedTranslations++
}

func (m *mockPRWTelemetry) recordTranslatedTimeSeries(_ context.Context, numTs int) {
m.translatedTimeSeries += numTs
}

// Test_PushMetrics checks the number of TimeSeries received by server and the number of metrics dropped is the same as
// expected
func Test_PushMetrics(t *testing.T) {
Expand Down Expand Up @@ -420,6 +433,11 @@ func Test_PushMetrics(t *testing.T) {

emptySummaryBatch := getMetricsFromMetricList(invalidMetrics[emptySummary])

// partial success (or partial failure) cases

partialSuccess1 := getMetricsFromMetricList(validMetrics1[validSum], validMetrics2[validSum],
validMetrics1[validIntGauge], validMetrics2[validIntGauge], invalidMetrics[emptyGauge])

// staleNaN cases
staleNaNHistogramBatch := getMetricsFromMetricList(staleNaNMetrics[staleNaNHistogram])
staleNaNEmptyHistogramBatch := getMetricsFromMetricList(staleNaNMetrics[staleNaNEmptyHistogram])
Expand Down Expand Up @@ -457,20 +475,23 @@ func Test_PushMetrics(t *testing.T) {
}

tests := []struct {
name string
metrics pmetric.Metrics
reqTestFunc func(t *testing.T, r *http.Request, expected int, isStaleMarker bool)
expectedTimeSeries int
httpResponseCode int
returnErr bool
isStaleMarker bool
skipForWAL bool
name string
metrics pmetric.Metrics
reqTestFunc func(t *testing.T, r *http.Request, expected int, isStaleMarker bool)
expectedTimeSeries int
httpResponseCode int
returnErr bool
isStaleMarker bool
skipForWAL bool
expectedFailedTranslations int
}{
{
name: "invalid_type_case",
metrics: invalidTypeBatch,
httpResponseCode: http.StatusAccepted,
returnErr: true,
name: "invalid_type_case",
metrics: invalidTypeBatch,
httpResponseCode: http.StatusAccepted,
reqTestFunc: checkFunc,
expectedTimeSeries: 1, // the resource target metric.
expectedFailedTranslations: 1,
},
{
name: "intSum_case",
Expand Down Expand Up @@ -567,32 +588,40 @@ func Test_PushMetrics(t *testing.T) {
skipForWAL: true,
},
{
name: "emptyGauge_case",
metrics: emptyDoubleGaugeBatch,
reqTestFunc: checkFunc,
httpResponseCode: http.StatusAccepted,
returnErr: true,
name: "emptyGauge_case",
metrics: emptyDoubleGaugeBatch,
reqTestFunc: checkFunc,
httpResponseCode: http.StatusAccepted,
expectedFailedTranslations: 1,
},
{
name: "emptyCumulativeSum_case",
metrics: emptyCumulativeSumBatch,
reqTestFunc: checkFunc,
httpResponseCode: http.StatusAccepted,
expectedFailedTranslations: 1,
},
{
name: "emptyCumulativeSum_case",
metrics: emptyCumulativeSumBatch,
reqTestFunc: checkFunc,
httpResponseCode: http.StatusAccepted,
returnErr: true,
name: "emptyCumulativeHistogram_case",
metrics: emptyCumulativeHistogramBatch,
reqTestFunc: checkFunc,
httpResponseCode: http.StatusAccepted,
expectedFailedTranslations: 1,
},
{
name: "emptyCumulativeHistogram_case",
metrics: emptyCumulativeHistogramBatch,
reqTestFunc: checkFunc,
httpResponseCode: http.StatusAccepted,
returnErr: true,
name: "emptySummary_case",
metrics: emptySummaryBatch,
reqTestFunc: checkFunc,
httpResponseCode: http.StatusAccepted,
expectedFailedTranslations: 1,
},
{
name: "emptySummary_case",
metrics: emptySummaryBatch,
reqTestFunc: checkFunc,
httpResponseCode: http.StatusAccepted,
returnErr: true,
name: "partialSuccess_case",
metrics: partialSuccess1,
reqTestFunc: checkFunc,
httpResponseCode: http.StatusAccepted,
expectedTimeSeries: 4,
expectedFailedTranslations: 1,
},
{
name: "staleNaNIntGauge_case",
Expand Down Expand Up @@ -668,6 +697,7 @@ func Test_PushMetrics(t *testing.T) {
}
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
mockTelemetry := &mockPRWTelemetry{}
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if tt.reqTestFunc != nil {
tt.reqTestFunc(t, r, tt.expectedTimeSeries, tt.isStaleMarker)
Expand Down Expand Up @@ -716,7 +746,10 @@ func Test_PushMetrics(t *testing.T) {
}
set := exportertest.NewNopCreateSettings()
set.BuildInfo = buildInfo

prwe, nErr := newPRWExporter(cfg, set)
prwe.telemetry = mockTelemetry

require.NoError(t, nErr)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand All @@ -729,6 +762,9 @@ func Test_PushMetrics(t *testing.T) {
assert.Error(t, err)
return
}

assert.Equal(t, tt.expectedFailedTranslations, mockTelemetry.failedTranslations)
assert.Equal(t, tt.expectedTimeSeries, mockTelemetry.translatedTimeSeries)
assert.NoError(t, err)
})
}
Expand Down
2 changes: 1 addition & 1 deletion exporter/prometheusremotewriteexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
go.opentelemetry.io/collector/consumer v0.96.1-0.20240306115632-b2693620eff6
go.opentelemetry.io/collector/exporter v0.96.1-0.20240306115632-b2693620eff6
go.opentelemetry.io/collector/pdata v1.3.1-0.20240306115632-b2693620eff6
go.opentelemetry.io/otel v1.24.0
go.opentelemetry.io/otel/metric v1.24.0
go.opentelemetry.io/otel/trace v1.24.0
go.uber.org/goleak v1.3.0
Expand Down Expand Up @@ -70,7 +71,6 @@ require (
go.opentelemetry.io/collector/receiver v0.96.1-0.20240306115632-b2693620eff6 // indirect
go.opentelemetry.io/collector/semconv v0.96.1-0.20240306115632-b2693620eff6 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect
go.opentelemetry.io/otel v1.24.0 // indirect
go.opentelemetry.io/otel/exporters/prometheus v0.46.0 // indirect
go.opentelemetry.io/otel/sdk v1.24.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.24.0 // indirect
Expand Down
Loading