Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporter/prometheusremotewrite] Fix: Don't drop batch in case of failure to translate metrics #29729

Merged
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
ac29886
Fix: Don't return error for metric translation in the prw exporter
rapphil Dec 11, 2023
883bd39
Merge branch 'main' into rapphil-prw-fix-metric-translation
rapphil Dec 12, 2023
f2a846b
Merge remote-tracking branch 'origin/main' into rapphil-prw-fix-metri…
rapphil Jan 11, 2024
baa04c6
Feat: Add telemetry for metric translation in the PRWE
rapphil Jan 12, 2024
691a7ec
Merge branch 'main' into rapphil-prw-fix-metric-translation
rapphil Jan 12, 2024
f897551
Merge branch 'main' into rapphil-prw-fix-metric-translation
rapphil Jan 12, 2024
b3a6ce6
Fix and simplify log line in case of failure to translated
rapphil Jan 12, 2024
a128f16
Simplify tests
rapphil Jan 12, 2024
84af853
Fix: fix based on code review
rapphil Jan 12, 2024
bffdd8c
Chore: Update metric name
rapphil Jan 12, 2024
faf0fdd
Fix: Fix go.mod
rapphil Jan 12, 2024
8bf5987
Merge remote-tracking branch 'origin/main' into rapphil-prw-fix-metri…
rapphil Jan 12, 2024
f60d68e
Chore: fix go.mod
rapphil Jan 12, 2024
5e953f5
Merge remote-tracking branch 'origin/main' into rapphil-prw-fix-metri…
rapphil Jan 30, 2024
0fad48b
Fix go.mod
rapphil Jan 30, 2024
bfdb63a
Merge branch 'main' into rapphil-prw-fix-metric-translation
rapphil Jan 30, 2024
c178c3b
Merge remote-tracking branch 'origin/main' into rapphil-prw-fix-metri…
rapphil Mar 1, 2024
2bc92ca
Fix merge conflicts
rapphil Mar 1, 2024
9e7ac06
Merge branch 'main' into rapphil-prw-fix-metric-translation
bryan-aguilar Mar 1, 2024
315996a
Merge branch 'main' into rapphil-prw-fix-metric-translation
bryan-aguilar Mar 13, 2024
e8ea846
make tidy
bryan-aguilar Mar 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/prw_failure_translate.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: prometheusremotewriteexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Publish telemetry about translation of metrics from Otel to Prometheus. Don't drop all data points if some fail translation.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [29729]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
56 changes: 53 additions & 3 deletions exporter/prometheusremotewriteexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,33 @@ import (
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/otel/metric"
"go.uber.org/multierr"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusremotewriteexporter/internal/metadata"
prometheustranslator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheusremotewrite"
)

type prwTelemetry interface {
recordTranslationFailure(ctx context.Context)
recordTranslatedTimeSeries(ctx context.Context, numTS int)
}

type prwTelemetryOtel struct {
failedTranslations metric.Int64Counter
translatedTimeSeries metric.Int64Counter
}

func (p *prwTelemetryOtel) recordTranslationFailure(ctx context.Context) {
p.failedTranslations.Add(ctx, 1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should these include the component ID as an attribute? It can be important to identify which component is failing when multiple exporters of the same type are configured.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point. done!

}

func (p *prwTelemetryOtel) recordTranslatedTimeSeries(ctx context.Context, numTS int) {
p.translatedTimeSeries.Add(ctx, int64(numTS))
}

// prwExporter converts OTLP metrics to Prometheus remote write TimeSeries and sends them to a remote endpoint.
type prwExporter struct {
endpointURL *url.URL
Expand All @@ -45,10 +66,33 @@ type prwExporter struct {
retrySettings configretry.BackOffConfig
wal *prweWAL
exporterSettings prometheusremotewrite.Settings
telemetry prwTelemetry
}

func newPRWTelemetry(set exporter.CreateSettings) prwTelemetry {

meter := metadata.Meter(set.TelemetrySettings)
// TODO: create helper functions similar to the processor helper: BuildCustomMetricName
prefix := "exporter/" + metadata.Type + "/"

failedTranslations, _ := meter.Int64Counter(prefix+"failed_translations",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't ignore instrument creation errors. These should be propagated up the call stack if they can't be handled here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know why I thought it was ok to do this. fixed.

metric.WithDescription("Number of translation operations that failed to translate metrics from OTEL to Prometheus"),
metric.WithUnit("1"),
)

translatedTimeSeries, _ := meter.Int64Counter(prefix+"translated_metrics",
metric.WithDescription("Number of Prometheus time series that were translated from OTEL metrics"),
metric.WithUnit("1"),
)
bryan-aguilar marked this conversation as resolved.
Show resolved Hide resolved

return &prwTelemetryOtel{
failedTranslations: failedTranslations,
translatedTimeSeries: translatedTimeSeries,
}
}

// newPRWExporter initializes a new prwExporter instance and sets fields accordingly.
func newPRWExporter(cfg *Config, set exporter.CreateSettings) (*prwExporter, error) {
func newPRWExporter(cfg *Config, set exporter.CreateSettings, telemetry prwTelemetry) (*prwExporter, error) {
sanitizedLabels, err := validateAndSanitizeExternalLabels(cfg)
if err != nil {
return nil, err
Expand Down Expand Up @@ -79,6 +123,7 @@ func newPRWExporter(cfg *Config, set exporter.CreateSettings) (*prwExporter, err
AddMetricSuffixes: cfg.AddMetricSuffixes,
SendMetadata: cfg.SendMetadata,
},
telemetry: telemetry,
}
if cfg.WAL == nil {
return prwe, nil
Expand Down Expand Up @@ -134,15 +179,20 @@ func (prwe *prwExporter) PushMetrics(ctx context.Context, md pmetric.Metrics) er

tsMap, err := prometheusremotewrite.FromMetrics(md, prwe.exporterSettings)
if err != nil {
err = consumererror.NewPermanent(err)
prwe.telemetry.recordTranslationFailure(ctx)
prwe.settings.Logger.Debug("failed to translate metrics %s", zap.Error(err))
prwe.settings.Logger.Debug("exporting remaining %s metrics", zap.Int("translated", len(tsMap)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
prwe.settings.Logger.Debug("failed to translate metrics %s", zap.Error(err))
prwe.settings.Logger.Debug("exporting remaining %s metrics", zap.Int("translated", len(tsMap)))
prwe.settings.Logger.Debug("failed to translate metrics, zap.Error(err))
prwe.settings.Logger.Debug("exporting remaining metrics", zap.Int("count", len(tsMap)))

The %s is for formatted strings. Zap uses structured logging instead.

I also wondering if we want to adjust these log lines to be more clear. I think they may confuse users. I think the message intent is

  1. Some metrics failed to translate. This was not a catastrophic failure.
  2. N metrics were successfully translated. The PRWE will attempt to export these.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
prwe.settings.Logger.Debug("failed to translate metrics %s", zap.Error(err))
prwe.settings.Logger.Debug("exporting remaining %s metrics", zap.Int("translated", len(tsMap)))
prwe.settings.Logger.Debug("failed to translate metrics, exporting remaining metrics", zap.Error(err), zap.Int("translated", len(tsMap)))

If these are to be logged at the same level might as well make it a single logging statement. Is the number of metrics that failed translation available to record?

Copy link
Contributor Author

@rapphil rapphil Jan 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like your idea of using a single line.

Is the number of metrics that failed translation available to record?

No, this is hidden in the FromMetrics function implementation. In theory we could use num otel metrics - len(tsMap) but the FromMetrics also include other time series, such as the target_info. This behaviour vary based on the flags passed to the function .... 😒

We could also look into the number of errors embedded in the error returned in the FromMetrics function, but this would also rely on assumptions about the FromMetrics in my opinion.

}

prwe.telemetry.recordTranslatedTimeSeries(ctx, len(tsMap))

var m []*prompb.MetricMetadata
if prwe.exporterSettings.SendMetadata {
m = prometheusremotewrite.OtelMetricsToMetadata(md, prwe.exporterSettings.AddMetricSuffixes)
}

// Call export even if a conversion error, since there may be points that were successfully converted.
return multierr.Combine(err, prwe.handleExport(ctx, tsMap, m))
return prwe.handleExport(ctx, tsMap, m)
}
}

Expand Down
115 changes: 77 additions & 38 deletions exporter/prometheusremotewriteexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func Test_NewPRWExporter(t *testing.T) {
cfg.ExternalLabels = tt.externalLabels
cfg.Namespace = tt.namespace
cfg.RemoteWriteQueue.NumConsumers = 1
prwe, err := newPRWExporter(cfg, tt.set)
prwe, err := newPRWExporter(cfg, tt.set, newPRWTelemetry(set))

if tt.returnErrorOnCreate {
assert.Error(t, err)
Expand Down Expand Up @@ -200,7 +200,7 @@ func Test_Start(t *testing.T) {
cfg.RemoteWriteQueue.NumConsumers = 1
cfg.HTTPClientSettings = tt.clientSettings

prwe, err := newPRWExporter(cfg, tt.set)
prwe, err := newPRWExporter(cfg, tt.set, newPRWTelemetry(tt.set))
assert.NoError(t, err)
assert.NotNil(t, prwe)

Expand Down Expand Up @@ -360,7 +360,7 @@ func runExportPipeline(ts *prompb.TimeSeries, endpoint *url.URL) error {
set := exportertest.NewNopCreateSettings()
set.BuildInfo = buildInfo
// after this, instantiate a CortexExporter with the current HTTP client and endpoint set to passed in endpoint
prwe, err := newPRWExporter(cfg, set)
prwe, err := newPRWExporter(cfg, set, newPRWTelemetry(set))
if err != nil {
return err
}
Expand All @@ -372,6 +372,19 @@ func runExportPipeline(ts *prompb.TimeSeries, endpoint *url.URL) error {
return prwe.handleExport(context.Background(), testmap, nil)
}

type mockPRWTelemetry struct {
failedTranslations int
translatedTimeSeries int
}

func (m *mockPRWTelemetry) recordTranslationFailure(_ context.Context) {
m.failedTranslations++
}

func (m *mockPRWTelemetry) recordTranslatedTimeSeries(_ context.Context, numTs int) {
m.translatedTimeSeries += numTs
}

// Test_PushMetrics checks the number of TimeSeries received by server and the number of metrics dropped is the same as
// expected
func Test_PushMetrics(t *testing.T) {
Expand Down Expand Up @@ -420,6 +433,11 @@ func Test_PushMetrics(t *testing.T) {

emptySummaryBatch := getMetricsFromMetricList(invalidMetrics[emptySummary])

// partial success (or partial failure) cases

partialSuccess1 := getMetricsFromMetricList(validMetrics1[validSum], validMetrics2[validSum],
validMetrics1[validIntGauge], validMetrics2[validIntGauge], invalidMetrics[emptyGauge])

// staleNaN cases
staleNaNHistogramBatch := getMetricsFromMetricList(staleNaNMetrics[staleNaNHistogram])
staleNaNEmptyHistogramBatch := getMetricsFromMetricList(staleNaNMetrics[staleNaNEmptyHistogram])
Expand Down Expand Up @@ -457,20 +475,23 @@ func Test_PushMetrics(t *testing.T) {
}

tests := []struct {
name string
metrics pmetric.Metrics
reqTestFunc func(t *testing.T, r *http.Request, expected int, isStaleMarker bool)
expectedTimeSeries int
httpResponseCode int
returnErr bool
isStaleMarker bool
skipForWAL bool
name string
metrics pmetric.Metrics
reqTestFunc func(t *testing.T, r *http.Request, expected int, isStaleMarker bool)
expectedTimeSeries int
httpResponseCode int
returnErr bool
isStaleMarker bool
skipForWAL bool
hasFailedTranslation bool
rapphil marked this conversation as resolved.
Show resolved Hide resolved
}{
{
name: "invalid_type_case",
metrics: invalidTypeBatch,
httpResponseCode: http.StatusAccepted,
returnErr: true,
name: "invalid_type_case",
metrics: invalidTypeBatch,
httpResponseCode: http.StatusAccepted,
reqTestFunc: checkFunc,
expectedTimeSeries: 1, // the resource target metric.
hasFailedTranslation: true,
},
{
name: "intSum_case",
Expand Down Expand Up @@ -567,32 +588,40 @@ func Test_PushMetrics(t *testing.T) {
skipForWAL: true,
},
{
name: "emptyGauge_case",
metrics: emptyDoubleGaugeBatch,
reqTestFunc: checkFunc,
httpResponseCode: http.StatusAccepted,
returnErr: true,
name: "emptyGauge_case",
metrics: emptyDoubleGaugeBatch,
reqTestFunc: checkFunc,
httpResponseCode: http.StatusAccepted,
hasFailedTranslation: true,
},
{
name: "emptyCumulativeSum_case",
metrics: emptyCumulativeSumBatch,
reqTestFunc: checkFunc,
httpResponseCode: http.StatusAccepted,
hasFailedTranslation: true,
},
{
name: "emptyCumulativeSum_case",
metrics: emptyCumulativeSumBatch,
reqTestFunc: checkFunc,
httpResponseCode: http.StatusAccepted,
returnErr: true,
name: "emptyCumulativeHistogram_case",
metrics: emptyCumulativeHistogramBatch,
reqTestFunc: checkFunc,
httpResponseCode: http.StatusAccepted,
hasFailedTranslation: true,
},
{
name: "emptyCumulativeHistogram_case",
metrics: emptyCumulativeHistogramBatch,
reqTestFunc: checkFunc,
httpResponseCode: http.StatusAccepted,
returnErr: true,
name: "emptySummary_case",
metrics: emptySummaryBatch,
reqTestFunc: checkFunc,
httpResponseCode: http.StatusAccepted,
hasFailedTranslation: true,
},
{
name: "emptySummary_case",
metrics: emptySummaryBatch,
reqTestFunc: checkFunc,
httpResponseCode: http.StatusAccepted,
returnErr: true,
name: "partialSuccess_case",
metrics: partialSuccess1,
reqTestFunc: checkFunc,
httpResponseCode: http.StatusAccepted,
expectedTimeSeries: 4,
hasFailedTranslation: true,
},
{
name: "staleNaNIntGauge_case",
Expand Down Expand Up @@ -668,6 +697,7 @@ func Test_PushMetrics(t *testing.T) {
}
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
mockTelemetry := &mockPRWTelemetry{}
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if tt.reqTestFunc != nil {
tt.reqTestFunc(t, r, tt.expectedTimeSeries, tt.isStaleMarker)
Expand Down Expand Up @@ -716,7 +746,9 @@ func Test_PushMetrics(t *testing.T) {
}
set := exportertest.NewNopCreateSettings()
set.BuildInfo = buildInfo
prwe, nErr := newPRWExporter(cfg, set)

prwe, nErr := newPRWExporter(cfg, set, mockTelemetry)

require.NoError(t, nErr)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand All @@ -729,6 +761,13 @@ func Test_PushMetrics(t *testing.T) {
assert.Error(t, err)
return
}

if tt.hasFailedTranslation {
assert.Equal(t, 1, mockTelemetry.failedTranslations)
} else {
assert.Equal(t, 0, mockTelemetry.failedTranslations)
}
rapphil marked this conversation as resolved.
Show resolved Hide resolved
assert.Equal(t, tt.expectedTimeSeries, mockTelemetry.translatedTimeSeries)
assert.NoError(t, err)
})
}
Expand Down Expand Up @@ -894,7 +933,7 @@ func TestWALOnExporterRoundTrip(t *testing.T) {
Version: "1.0",
}

prwe, perr := newPRWExporter(cfg, set)
prwe, perr := newPRWExporter(cfg, set, newPRWTelemetry(set))
assert.NoError(t, perr)

nopHost := componenttest.NewNopHost()
Expand Down Expand Up @@ -972,7 +1011,7 @@ func TestWALOnExporterRoundTrip(t *testing.T) {
// 4. Finally, ensure that the bytes that were uploaded to the
// Prometheus Remote Write endpoint are exactly as were saved in the WAL.
// Read from that same WAL, export to the RWExporter server.
prwe2, err := newPRWExporter(cfg, set)
prwe2, err := newPRWExporter(cfg, set, newPRWTelemetry(set))
assert.NoError(t, err)
require.NoError(t, prwe2.Start(ctx, nopHost))
t.Cleanup(func() {
Expand Down
3 changes: 2 additions & 1 deletion exporter/prometheusremotewriteexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ func createMetricsExporter(ctx context.Context, set exporter.CreateSettings,
return nil, errors.New("invalid configuration")
}

prwe, err := newPRWExporter(prwCfg, set)
telemetry := newPRWTelemetry(set)
prwe, err := newPRWExporter(prwCfg, set, telemetry)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be done inside newPRWExporter()? It's already receiving the settings object newPRWTelemetry() needs and the telemetry object isn't used outside of the new exporter function.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was done because of the tests. I think it is ok to create a real telemetry and then switch for a mock one. I updated the code.

if err != nil {
return nil, err
}
Expand Down