diff --git a/.chloggen/prometheusreceiver-append-native-histogram.yaml b/.chloggen/prometheusreceiver-append-native-histogram.yaml new file mode 100644 index 000000000000..34fb9ea34dec --- /dev/null +++ b/.chloggen/prometheusreceiver-append-native-histogram.yaml @@ -0,0 +1,36 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: prometheusreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Allows receiving prometheus native histograms + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [26555] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + - Native histograms are compatible with OTEL exponential histograms. + - The feature can be enabled via the feature gate `receiver.prometheusreceiver.EnableNativeHistograms`. + Run the collector with the command line option `--feature-gates=receiver.prometheusreceiver.EnableNativeHistograms`. + - Currently the feature also requires that targets are scraped via the ProtoBuf format. + To start scraping native histograms, set + `config.global.scrape_protocols` to `[ PrometheusProto, OpenMetricsText1.0.0, OpenMetricsText0.0.1, PrometheusText0.0.4 ]` in the + receiver configuration. This requirement will be lifted once Prometheus can scrape native histograms over text formats. + - For more up to date information see the README.md file of the receiver at + https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/receiver/prometheusreceiver/README.md#prometheus-native-histograms. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/receiver/prometheusreceiver/README.md b/receiver/prometheusreceiver/README.md index 7eb086fb4c9e..9b2f1f844eab 100644 --- a/receiver/prometheusreceiver/README.md +++ b/receiver/prometheusreceiver/README.md @@ -67,6 +67,12 @@ prometheus --config.file=prom.yaml "--feature-gates=receiver.prometheusreceiver.UseCreatedMetric" ``` +- `receiver.prometheusreceiver.EnableNativeHistograms`: process and turn native histogram metrics into OpenTelemetry exponential histograms. For more details consult the [Prometheus native histograms](#prometheus-native-histograms) section. + +```shell +"--feature-gates=receiver.prometheusreceiver.EnableNativeHistograms" +``` + - `report_extra_scrape_metrics`: Extra Prometheus scrape metrics can be reported by setting this parameter to `true` You can copy and paste that same configuration under: @@ -123,7 +129,22 @@ receivers: - targets: ['0.0.0.0:8888'] ``` -## OpenTelemetry Operator +## Prometheus native histograms + +Native histograms are an experimental [feature](https://prometheus.io/docs/prometheus/latest/feature_flags/#native-histograms) of Prometheus. + +To start scraping native histograms, set `config.global.scrape_protocols` to `[ PrometheusProto, OpenMetricsText1.0.0, OpenMetricsText0.0.1, PrometheusText0.0.4 ]` +in the receiver configuration. This requirement will be lifted once Prometheus can scrape native histograms over text formats. + +To enable converting native histograms to OpenTelemetry exponential histograms, enable the feature gate `receiver.prometheusreceiver.EnableNativeHistograms`. +The feature is considered experimental. + +This feature applies to the most common integer counter histograms, gauge histograms are dropped. +In case a metric has both the conventional (aka classic) buckets and also native histogram buckets, only the native histogram buckets will be +taken into account to create the corresponding exponential histogram. To scrape the classic buckets instead use the +[scrape option](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#scrape_config) `scrape_classic_histograms`. + +## OpenTelemetry Operator Additional to this static job definitions this receiver allows to query a list of jobs from the OpenTelemetryOperators TargetAllocator or a compatible endpoint. diff --git a/receiver/prometheusreceiver/factory.go b/receiver/prometheusreceiver/factory.go index 91eba1919a9b..e3717e15aa3c 100644 --- a/receiver/prometheusreceiver/factory.go +++ b/receiver/prometheusreceiver/factory.go @@ -25,6 +25,14 @@ var useCreatedMetricGate = featuregate.GlobalRegistry().MustRegister( " retrieve the start time for Summary, Histogram and Sum metrics from _created metric"), ) +var enableNativeHistogramsGate = featuregate.GlobalRegistry().MustRegister( + "receiver.prometheusreceiver.EnableNativeHistograms", + featuregate.StageAlpha, + featuregate.WithRegisterDescription("When enabled, the Prometheus receiver will convert"+ + " Prometheus native histograms to OTEL exponential histograms and ignore"+ + " those Prometheus classic histograms that have a native histogram alternative"), +) + // NewFactory creates a new Prometheus receiver factory. func NewFactory() receiver.Factory { return receiver.NewFactory( diff --git a/receiver/prometheusreceiver/internal/appendable.go b/receiver/prometheusreceiver/internal/appendable.go index 33acfa4608cc..2be6e408a8de 100644 --- a/receiver/prometheusreceiver/internal/appendable.go +++ b/receiver/prometheusreceiver/internal/appendable.go @@ -17,12 +17,13 @@ import ( // appendable translates Prometheus scraping diffs into OpenTelemetry format. type appendable struct { - sink consumer.Metrics - metricAdjuster MetricsAdjuster - useStartTimeMetric bool - trimSuffixes bool - startTimeMetricRegex *regexp.Regexp - externalLabels labels.Labels + sink consumer.Metrics + metricAdjuster MetricsAdjuster + useStartTimeMetric bool + enableNativeHistograms bool + trimSuffixes bool + startTimeMetricRegex *regexp.Regexp + externalLabels labels.Labels settings receiver.CreateSettings obsrecv *receiverhelper.ObsReport @@ -36,6 +37,7 @@ func NewAppendable( useStartTimeMetric bool, startTimeMetricRegex *regexp.Regexp, useCreatedMetric bool, + enableNativeHistograms bool, externalLabels labels.Labels, trimSuffixes bool) (storage.Appendable, error) { var metricAdjuster MetricsAdjuster @@ -51,17 +53,18 @@ func NewAppendable( } return &appendable{ - sink: sink, - settings: set, - metricAdjuster: metricAdjuster, - useStartTimeMetric: useStartTimeMetric, - startTimeMetricRegex: startTimeMetricRegex, - externalLabels: externalLabels, - obsrecv: obsrecv, - trimSuffixes: trimSuffixes, + sink: sink, + settings: set, + metricAdjuster: metricAdjuster, + useStartTimeMetric: useStartTimeMetric, + enableNativeHistograms: enableNativeHistograms, + startTimeMetricRegex: startTimeMetricRegex, + externalLabels: externalLabels, + obsrecv: obsrecv, + trimSuffixes: trimSuffixes, }, nil } func (o *appendable) Appender(ctx context.Context) storage.Appender { - return newTransaction(ctx, o.metricAdjuster, o.sink, o.externalLabels, o.settings, o.obsrecv, o.trimSuffixes) + return newTransaction(ctx, o.metricAdjuster, o.sink, o.externalLabels, o.settings, o.obsrecv, o.trimSuffixes, o.enableNativeHistograms) } diff --git a/receiver/prometheusreceiver/internal/metricfamily.go b/receiver/prometheusreceiver/internal/metricfamily.go index 63ff895d4e26..ba87700d0442 100644 --- a/receiver/prometheusreceiver/internal/metricfamily.go +++ b/receiver/prometheusreceiver/internal/metricfamily.go @@ -11,6 +11,7 @@ import ( "strings" "github.com/prometheus/prometheus/model/exemplar" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/value" "github.com/prometheus/prometheus/scrape" @@ -49,6 +50,8 @@ type metricGroup struct { hasSum bool created float64 value float64 + hValue *histogram.Histogram + fhValue *histogram.FloatHistogram complexValue []*dataPoint exemplars pmetric.ExemplarSlice } @@ -156,6 +159,118 @@ func (mg *metricGroup) toDistributionPoint(dest pmetric.HistogramDataPointSlice) mg.setExemplars(point.Exemplars()) } +// toExponentialHistogramDataPoints is based on +// https://opentelemetry.io/docs/specs/otel/compatibility/prometheus_and_openmetrics/#exponential-histograms +func (mg *metricGroup) toExponentialHistogramDataPoints(dest pmetric.ExponentialHistogramDataPointSlice) { + if !mg.hasCount { + return + } + point := dest.AppendEmpty() + point.SetTimestamp(timestampFromMs(mg.ts)) + + // We do not set Min or Max as native histograms don't have that information. + switch { + case mg.fhValue != nil: + fh := mg.fhValue + + if value.IsStaleNaN(fh.Sum) { + point.SetFlags(pmetric.DefaultDataPointFlags.WithNoRecordedValue(true)) + // The count and sum are initialized to 0, so we don't need to set them. + } else { + point.SetScale(fh.Schema) + // Input is a float native histogram. This conversion will lose + // precision,but we don't actually expect float histograms in scrape, + // since these are typically the result of operations on integer + // native histograms in the database. + point.SetCount(uint64(fh.Count)) + point.SetSum(fh.Sum) + point.SetZeroThreshold(fh.ZeroThreshold) + point.SetZeroCount(uint64(fh.ZeroCount)) + + if len(fh.PositiveSpans) > 0 { + point.Positive().SetOffset(fh.PositiveSpans[0].Offset - 1) // -1 because OTEL offset are for the lower bound, not the upper bound + convertAbsoluteBuckets(fh.PositiveSpans, fh.PositiveBuckets, point.Positive().BucketCounts()) + } + if len(fh.NegativeSpans) > 0 { + point.Negative().SetOffset(fh.NegativeSpans[0].Offset - 1) // -1 because OTEL offset are for the lower bound, not the upper bound + convertAbsoluteBuckets(fh.NegativeSpans, fh.NegativeBuckets, point.Negative().BucketCounts()) + } + } + + case mg.hValue != nil: + h := mg.hValue + + if value.IsStaleNaN(h.Sum) { + point.SetFlags(pmetric.DefaultDataPointFlags.WithNoRecordedValue(true)) + // The count and sum are initialized to 0, so we don't need to set them. + } else { + point.SetScale(h.Schema) + point.SetCount(h.Count) + point.SetSum(h.Sum) + point.SetZeroThreshold(h.ZeroThreshold) + point.SetZeroCount(h.ZeroCount) + + if len(h.PositiveSpans) > 0 { + point.Positive().SetOffset(h.PositiveSpans[0].Offset - 1) // -1 because OTEL offset are for the lower bound, not the upper bound + convertDeltaBuckets(h.PositiveSpans, h.PositiveBuckets, point.Positive().BucketCounts()) + } + if len(h.NegativeSpans) > 0 { + point.Negative().SetOffset(h.NegativeSpans[0].Offset - 1) // -1 because OTEL offset are for the lower bound, not the upper bound + convertDeltaBuckets(h.NegativeSpans, h.NegativeBuckets, point.Negative().BucketCounts()) + } + } + + default: + // This should never happen. + return + } + + tsNanos := timestampFromMs(mg.ts) + if mg.created != 0 { + point.SetStartTimestamp(timestampFromFloat64(mg.created)) + } else { + // metrics_adjuster adjusts the startTimestamp to the initial scrape timestamp + point.SetStartTimestamp(tsNanos) + } + point.SetTimestamp(tsNanos) + populateAttributes(pmetric.MetricTypeHistogram, mg.ls, point.Attributes()) + mg.setExemplars(point.Exemplars()) +} + +func convertDeltaBuckets(spans []histogram.Span, deltas []int64, buckets pcommon.UInt64Slice) { + buckets.EnsureCapacity(len(deltas)) + bucketIdx := 0 + bucketCount := int64(0) + for spanIdx, span := range spans { + if spanIdx > 0 { + for i := int32(0); i < span.Offset; i++ { + buckets.Append(uint64(0)) + } + } + for i := uint32(0); i < span.Length; i++ { + bucketCount += deltas[bucketIdx] + bucketIdx++ + buckets.Append(uint64(bucketCount)) + } + } +} + +func convertAbsoluteBuckets(spans []histogram.Span, counts []float64, buckets pcommon.UInt64Slice) { + buckets.EnsureCapacity(len(counts)) + bucketIdx := 0 + for spanIdx, span := range spans { + if spanIdx > 0 { + for i := int32(0); i < span.Offset; i++ { + buckets.Append(uint64(0)) + } + } + for i := uint32(0); i < span.Length; i++ { + buckets.Append(uint64(counts[bucketIdx])) + bucketIdx++ + } + } +} + func (mg *metricGroup) setExemplars(exemplars pmetric.ExemplarSlice) { if mg == nil { return @@ -296,13 +411,17 @@ func (mf *metricFamily) addSeries(seriesRef uint64, metricName string, ls labels } mg.complexValue = append(mg.complexValue, &dataPoint{value: v, boundary: boundary}) } + case pmetric.MetricTypeExponentialHistogram: + if metricName == mf.metadata.Metric+metricSuffixCreated { + mg.created = v + } case pmetric.MetricTypeSum: if metricName == mf.metadata.Metric+metricSuffixCreated { mg.created = v } else { mg.value = v } - case pmetric.MetricTypeEmpty, pmetric.MetricTypeGauge, pmetric.MetricTypeExponentialHistogram: + case pmetric.MetricTypeEmpty, pmetric.MetricTypeGauge: fallthrough default: mg.value = v @@ -311,6 +430,37 @@ func (mf *metricFamily) addSeries(seriesRef uint64, metricName string, ls labels return nil } +func (mf *metricFamily) addExponentialHistogramSeries(seriesRef uint64, metricName string, ls labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) error { + mg := mf.loadMetricGroupOrCreate(seriesRef, ls, t) + if mg.ts != t { + return fmt.Errorf("inconsistent timestamps on metric points for metric %v", metricName) + } + if mg.mtype != pmetric.MetricTypeExponentialHistogram { + return fmt.Errorf("metric type mismatch for exponential histogram metric %v type %s", metricName, mg.mtype.String()) + } + switch { + case fh != nil: + if mg.hValue != nil { + return fmt.Errorf("exponential histogram %v already has float counts", metricName) + } + mg.count = fh.Count + mg.sum = fh.Sum + mg.hasCount = true + mg.hasSum = true + mg.fhValue = fh + case h != nil: + if mg.fhValue != nil { + return fmt.Errorf("exponential histogram %v already has integer counts", metricName) + } + mg.count = float64(h.Count) + mg.sum = h.Sum + mg.hasCount = true + mg.hasSum = true + mg.hValue = h + } + return nil +} + func (mf *metricFamily) appendMetric(metrics pmetric.MetricSlice, trimSuffixes bool) { metric := pmetric.NewMetric() // Trims type and unit suffixes from metric name @@ -352,7 +502,16 @@ func (mf *metricFamily) appendMetric(metrics pmetric.MetricSlice, trimSuffixes b } pointCount = sdpL.Len() - case pmetric.MetricTypeEmpty, pmetric.MetricTypeGauge, pmetric.MetricTypeExponentialHistogram: + case pmetric.MetricTypeExponentialHistogram: + histogram := metric.SetEmptyExponentialHistogram() + histogram.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + hdpL := histogram.DataPoints() + for _, mg := range mf.groupOrders { + mg.toExponentialHistogramDataPoints(hdpL) + } + pointCount = hdpL.Len() + + case pmetric.MetricTypeEmpty, pmetric.MetricTypeGauge: fallthrough default: // Everything else should be set to a Gauge. gauge := metric.SetEmptyGauge() diff --git a/receiver/prometheusreceiver/internal/metricfamily_test.go b/receiver/prometheusreceiver/internal/metricfamily_test.go index 7b37d1c6c51b..586be1992912 100644 --- a/receiver/prometheusreceiver/internal/metricfamily_test.go +++ b/receiver/prometheusreceiver/internal/metricfamily_test.go @@ -9,6 +9,7 @@ import ( "time" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/value" "github.com/prometheus/prometheus/scrape" @@ -293,6 +294,199 @@ func TestMetricGroupData_toDistributionUnitTest(t *testing.T) { } } +func TestMetricGroupData_toExponentialDistributionUnitTest(t *testing.T) { + type scrape struct { + at int64 + metric string + extraLabel labels.Label + + // Only one kind of value should be set. + value float64 + integerHistogram *histogram.Histogram + floatHistogram *histogram.FloatHistogram // TODO: add tests for float histograms. + } + tests := []struct { + name string + metricName string + labels labels.Labels + scrapes []*scrape + want func() pmetric.ExponentialHistogramDataPoint + wantErr bool + intervalStartTimeMs int64 + }{ + { + name: "integer histogram with startTimestamp", + metricName: "request_duration_seconds", + intervalStartTimeMs: 11, + labels: labels.FromMap(map[string]string{"a": "A", "b": "B"}), + scrapes: []*scrape{ + { + at: 11, + metric: "request_duration_seconds", + integerHistogram: &histogram.Histogram{ + CounterResetHint: histogram.UnknownCounterReset, + Schema: 1, + ZeroThreshold: 0.42, + ZeroCount: 1, + Count: 66, + Sum: 1004.78, + PositiveSpans: []histogram.Span{{Offset: 1, Length: 2}, {Offset: 3, Length: 1}}, + PositiveBuckets: []int64{33, -30, 26}, // Delta encoded counts: 33, 3=(33-30), 30=(3+27) -> 65 + NegativeSpans: []histogram.Span{{Offset: 0, Length: 1}}, + NegativeBuckets: []int64{1}, // Delta encoded counts: 1 + }, + }, + }, + want: func() pmetric.ExponentialHistogramDataPoint { + point := pmetric.NewExponentialHistogramDataPoint() + point.SetCount(66) + point.SetSum(1004.78) + point.SetTimestamp(pcommon.Timestamp(11 * time.Millisecond)) // the time in milliseconds -> nanoseconds. + point.SetStartTimestamp(pcommon.Timestamp(11 * time.Millisecond)) // the time in milliseconds -> nanoseconds. + point.SetScale(1) + point.SetZeroThreshold(0.42) + point.SetZeroCount(1) + point.Positive().SetOffset(0) + point.Positive().BucketCounts().FromRaw([]uint64{33, 3, 0, 0, 0, 29}) + point.Negative().SetOffset(-1) + point.Negative().BucketCounts().FromRaw([]uint64{1}) + attributes := point.Attributes() + attributes.PutStr("a", "A") + attributes.PutStr("b", "B") + return point + }, + }, + { + name: "integer histogram with startTimestamp from _created", + metricName: "request_duration_seconds", + intervalStartTimeMs: 11, + labels: labels.FromMap(map[string]string{"a": "A"}), + scrapes: []*scrape{ + { + at: 11, + metric: "request_duration_seconds", + integerHistogram: &histogram.Histogram{ + CounterResetHint: histogram.UnknownCounterReset, + Schema: 1, + ZeroThreshold: 0.42, + ZeroCount: 1, + Count: 66, + Sum: 1004.78, + PositiveSpans: []histogram.Span{{Offset: 1, Length: 2}, {Offset: 3, Length: 1}}, + PositiveBuckets: []int64{33, -30, 26}, // Delta encoded counts: 33, 3=(33-30), 30=(3+27) -> 65 + NegativeSpans: []histogram.Span{{Offset: 0, Length: 1}}, + NegativeBuckets: []int64{1}, // Delta encoded counts: 1 + }, + }, + { + at: 11, + metric: "request_duration_seconds_created", + value: 600.78, + }, + }, + want: func() pmetric.ExponentialHistogramDataPoint { + point := pmetric.NewExponentialHistogramDataPoint() + point.SetCount(66) + point.SetSum(1004.78) + point.SetTimestamp(pcommon.Timestamp(11 * time.Millisecond)) // the time in milliseconds -> nanoseconds. + point.SetStartTimestamp(timestampFromFloat64(600.78)) // the time in milliseconds -> nanoseconds. + point.SetScale(1) + point.SetZeroThreshold(0.42) + point.SetZeroCount(1) + point.Positive().SetOffset(0) + point.Positive().BucketCounts().FromRaw([]uint64{33, 3, 0, 0, 0, 29}) + point.Negative().SetOffset(-1) + point.Negative().BucketCounts().FromRaw([]uint64{1}) + attributes := point.Attributes() + attributes.PutStr("a", "A") + return point + }, + }, + { + name: "integer histogram that is stale", + metricName: "request_duration_seconds", + intervalStartTimeMs: 11, + labels: labels.FromMap(map[string]string{"a": "A", "b": "B"}), + scrapes: []*scrape{ + { + at: 11, + metric: "request_duration_seconds", + integerHistogram: &histogram.Histogram{ + Sum: math.Float64frombits(value.StaleNaN), + }, + }, + }, + want: func() pmetric.ExponentialHistogramDataPoint { + point := pmetric.NewExponentialHistogramDataPoint() + point.SetTimestamp(pcommon.Timestamp(11 * time.Millisecond)) // the time in milliseconds -> nanoseconds. + point.SetFlags(pmetric.DefaultDataPointFlags.WithNoRecordedValue(true)) + point.SetStartTimestamp(pcommon.Timestamp(11 * time.Millisecond)) // the time in milliseconds -> nanoseconds. + attributes := point.Attributes() + attributes.PutStr("a", "A") + attributes.PutStr("b", "B") + return point + }, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + mp := newMetricFamily(tt.metricName, mc, zap.NewNop()) + for i, tv := range tt.scrapes { + var lbls labels.Labels + if tv.extraLabel.Name != "" { + lbls = labels.NewBuilder(tt.labels).Set(tv.extraLabel.Name, tv.extraLabel.Value).Labels() + } else { + lbls = tt.labels.Copy() + } + + var err error + switch { + case tv.integerHistogram != nil: + mp.mtype = pmetric.MetricTypeExponentialHistogram + sRef, _ := getSeriesRef(nil, lbls, mp.mtype) + err = mp.addExponentialHistogramSeries(sRef, tv.metric, lbls, tv.at, tv.integerHistogram, nil) + case tv.floatHistogram != nil: + mp.mtype = pmetric.MetricTypeExponentialHistogram + sRef, _ := getSeriesRef(nil, lbls, mp.mtype) + err = mp.addExponentialHistogramSeries(sRef, tv.metric, lbls, tv.at, nil, tv.floatHistogram) + default: + sRef, _ := getSeriesRef(nil, lbls, mp.mtype) + err = mp.addSeries(sRef, tv.metric, lbls, tv.at, tv.value) + } + if tt.wantErr { + if i != 0 { + require.Error(t, err) + } + } else { + require.NoError(t, err) + } + } + if tt.wantErr { + // Don't check the result if we got an error + return + } + + require.Len(t, mp.groups, 1) + + sl := pmetric.NewMetricSlice() + mp.appendMetric(sl, false) + + require.Equal(t, 1, sl.Len(), "Exactly one metric expected") + metric := sl.At(0) + require.Equal(t, mc[tt.metricName].Help, metric.Description(), "Expected help metadata in metric description") + require.Equal(t, mc[tt.metricName].Unit, metric.Unit(), "Expected unit metadata in metric") + + hdpL := metric.ExponentialHistogram().DataPoints() + require.Equal(t, 1, hdpL.Len(), "Exactly one point expected") + got := hdpL.At(0) + want := tt.want() + require.Equal(t, want, got, "Expected the points to be equal") + }) + } +} + func TestMetricGroupData_toSummaryUnitTest(t *testing.T) { type scrape struct { at int64 diff --git a/receiver/prometheusreceiver/internal/metrics_adjuster.go b/receiver/prometheusreceiver/internal/metrics_adjuster.go index ab03810b57fd..26825fd6ae54 100644 --- a/receiver/prometheusreceiver/internal/metrics_adjuster.go +++ b/receiver/prometheusreceiver/internal/metrics_adjuster.go @@ -102,11 +102,17 @@ func (tsm *timeseriesMap) get(metric pmetric.Metric, kv pcommon.Map) (*timeserie name: name, attributes: getAttributesSignature(kv), } - if metric.Type() == pmetric.MetricTypeHistogram { + switch metric.Type() { + case pmetric.MetricTypeHistogram: // There are 2 types of Histograms whose aggregation temporality needs distinguishing: // * CumulativeHistogram // * GaugeHistogram key.aggTemporality = metric.Histogram().AggregationTemporality() + case pmetric.MetricTypeExponentialHistogram: + // There are 2 types of ExponentialHistograms whose aggregation temporality needs distinguishing: + // * CumulativeHistogram + // * GaugeHistogram + key.aggTemporality = metric.ExponentialHistogram().AggregationTemporality() } tsm.mark = true @@ -285,7 +291,10 @@ func (a *initialPointAdjuster) AdjustMetrics(metrics pmetric.Metrics) error { case pmetric.MetricTypeSum: a.adjustMetricSum(tsm, metric) - case pmetric.MetricTypeEmpty, pmetric.MetricTypeExponentialHistogram: + case pmetric.MetricTypeExponentialHistogram: + a.adjustMetricExponentialHistogram(tsm, metric) + + case pmetric.MetricTypeEmpty: fallthrough default: @@ -346,6 +355,54 @@ func (a *initialPointAdjuster) adjustMetricHistogram(tsm *timeseriesMap, current } } +func (a *initialPointAdjuster) adjustMetricExponentialHistogram(tsm *timeseriesMap, current pmetric.Metric) { + histogram := current.ExponentialHistogram() + if histogram.AggregationTemporality() != pmetric.AggregationTemporalityCumulative { + // Only dealing with CumulativeDistributions. + return + } + + currentPoints := histogram.DataPoints() + for i := 0; i < currentPoints.Len(); i++ { + currentDist := currentPoints.At(i) + + // start timestamp was set from _created + if a.useCreatedMetric && + !currentDist.Flags().NoRecordedValue() && + currentDist.StartTimestamp() < currentDist.Timestamp() { + continue + } + + tsi, found := tsm.get(current, currentDist.Attributes()) + if !found { + // initialize everything. + tsi.histogram.startTime = currentDist.StartTimestamp() + tsi.histogram.previousCount = currentDist.Count() + tsi.histogram.previousSum = currentDist.Sum() + continue + } + + if currentDist.Flags().NoRecordedValue() { + // TODO: Investigate why this does not reset. + currentDist.SetStartTimestamp(tsi.histogram.startTime) + continue + } + + if currentDist.Count() < tsi.histogram.previousCount || currentDist.Sum() < tsi.histogram.previousSum { + // reset re-initialize everything. + tsi.histogram.startTime = currentDist.StartTimestamp() + tsi.histogram.previousCount = currentDist.Count() + tsi.histogram.previousSum = currentDist.Sum() + continue + } + + // Update only previous values. + tsi.histogram.previousCount = currentDist.Count() + tsi.histogram.previousSum = currentDist.Sum() + currentDist.SetStartTimestamp(tsi.histogram.startTime) + } +} + func (a *initialPointAdjuster) adjustMetricSum(tsm *timeseriesMap, current pmetric.Metric) { currentPoints := current.Sum().DataPoints() for i := 0; i < currentPoints.Len(); i++ { diff --git a/receiver/prometheusreceiver/internal/metrics_adjuster_test.go b/receiver/prometheusreceiver/internal/metrics_adjuster_test.go index df38dea9e968..d80dcf512ac1 100644 --- a/receiver/prometheusreceiver/internal/metrics_adjuster_test.go +++ b/receiver/prometheusreceiver/internal/metrics_adjuster_test.go @@ -25,10 +25,11 @@ var ( bounds0 = []float64{1, 2, 4} percent0 = []float64{10, 50, 90} - sum1 = "sum1" - gauge1 = "gauge1" - histogram1 = "histogram1" - summary1 = "summary1" + sum1 = "sum1" + gauge1 = "gauge1" + histogram1 = "histogram1" + summary1 = "summary1" + exponentialHistogram1 = "exponentialHistogram1" k1v1k2v2 = []*kv{ {"k1", "v1"}, @@ -246,6 +247,67 @@ func TestHistogramFlagNoRecordedValueFirstObservation(t *testing.T) { runScript(t, NewInitialPointAdjuster(zap.NewNop(), time.Minute, true), "job", "0", script) } +// In TestExponentHistogram we exclude negative buckets on purpose as they are +// not considered the main use case - response times that are most commonly +// observed are never negative. Negative buckets would make the Sum() non +// monotonic and cause unexpected resets. +func TestExponentialHistogram(t *testing.T) { + script := []*metricsAdjusterTest{ + { + description: "Exponential Histogram: round 1 - initial instance, start time is established", + metrics: metrics(exponentialHistogramMetric(exponentialHistogram1, exponentialHistogramPoint(k1v1k2v2, t1, t1, 3, 1, 0, []uint64{}, -2, []uint64{4, 2, 3, 7}))), + adjusted: metrics(exponentialHistogramMetric(exponentialHistogram1, exponentialHistogramPoint(k1v1k2v2, t1, t1, 3, 1, 0, []uint64{}, -2, []uint64{4, 2, 3, 7}))), + }, { + description: "Exponential Histogram: round 2 - instance adjusted based on round 1", + metrics: metrics(exponentialHistogramMetric(exponentialHistogram1, exponentialHistogramPoint(k1v1k2v2, t2, t2, 3, 1, 0, []uint64{}, -2, []uint64{6, 2, 3, 7}))), + adjusted: metrics(exponentialHistogramMetric(exponentialHistogram1, exponentialHistogramPoint(k1v1k2v2, t1, t2, 3, 1, 0, []uint64{}, -2, []uint64{6, 2, 3, 7}))), + }, { + description: "Exponential Histogram: round 3 - instance reset (value less than previous value), start time is reset", + metrics: metrics(exponentialHistogramMetric(histogram1, exponentialHistogramPoint(k1v1k2v2, t3, t3, 3, 1, 0, []uint64{}, -2, []uint64{5, 3, 2, 7}))), + adjusted: metrics(exponentialHistogramMetric(histogram1, exponentialHistogramPoint(k1v1k2v2, t3, t3, 3, 1, 0, []uint64{}, -2, []uint64{5, 3, 2, 7}))), + }, { + description: "Exponential Histogram: round 4 - instance adjusted based on round 3", + metrics: metrics(exponentialHistogramMetric(histogram1, exponentialHistogramPoint(k1v1k2v2, t4, t4, 3, 1, 0, []uint64{}, -2, []uint64{7, 4, 2, 12}))), + adjusted: metrics(exponentialHistogramMetric(histogram1, exponentialHistogramPoint(k1v1k2v2, t3, t4, 3, 1, 0, []uint64{}, -2, []uint64{7, 4, 2, 12}))), + }, + } + runScript(t, NewInitialPointAdjuster(zap.NewNop(), time.Minute, true), "job", "0", script) +} + +func TestExponentialHistogramFlagNoRecordedValue(t *testing.T) { + script := []*metricsAdjusterTest{ + { + description: "Histogram: round 1 - initial instance, start time is established", + metrics: metrics(exponentialHistogramMetric(histogram1, exponentialHistogramPoint(k1v1k2v2, t1, t1, 0, 2, 2, []uint64{7, 4, 2, 12}, 3, []uint64{}))), + adjusted: metrics(exponentialHistogramMetric(histogram1, exponentialHistogramPoint(k1v1k2v2, t1, t1, 0, 2, 2, []uint64{7, 4, 2, 12}, 3, []uint64{}))), + }, + { + description: "Histogram: round 2 - instance adjusted based on round 1", + metrics: metrics(exponentialHistogramMetric(histogram1, exponentialHistogramPointNoValue(k1v1k2v2, tUnknown, t2))), + adjusted: metrics(exponentialHistogramMetric(histogram1, exponentialHistogramPointNoValue(k1v1k2v2, t1, t2))), + }, + } + + runScript(t, NewInitialPointAdjuster(zap.NewNop(), time.Minute, true), "job", "0", script) +} + +func TestExponentialHistogramFlagNoRecordedValueFirstObservation(t *testing.T) { + script := []*metricsAdjusterTest{ + { + description: "Histogram: round 1 - initial instance, start time is unknown", + metrics: metrics(exponentialHistogramMetric(histogram1, exponentialHistogramPointNoValue(k1v1k2v2, tUnknown, t1))), + adjusted: metrics(exponentialHistogramMetric(histogram1, exponentialHistogramPointNoValue(k1v1k2v2, tUnknown, t1))), + }, + { + description: "Histogram: round 2 - instance unchanged", + metrics: metrics(exponentialHistogramMetric(histogram1, exponentialHistogramPointNoValue(k1v1k2v2, tUnknown, t2))), + adjusted: metrics(exponentialHistogramMetric(histogram1, exponentialHistogramPointNoValue(k1v1k2v2, tUnknown, t2))), + }, + } + + runScript(t, NewInitialPointAdjuster(zap.NewNop(), time.Minute, true), "job", "0", script) +} + func TestSummaryFlagNoRecordedValueFirstObservation(t *testing.T) { script := []*metricsAdjusterTest{ { diff --git a/receiver/prometheusreceiver/internal/metricsutil_test.go b/receiver/prometheusreceiver/internal/metricsutil_test.go index 4ba25cfe846e..8a0670a1d7ea 100644 --- a/receiver/prometheusreceiver/internal/metricsutil_test.go +++ b/receiver/prometheusreceiver/internal/metricsutil_test.go @@ -78,6 +78,99 @@ func histogramMetric(name string, points ...pmetric.HistogramDataPoint) pmetric. return metric } +func exponentialHistogramMetric(name string, points ...pmetric.ExponentialHistogramDataPoint) pmetric.Metric { + metric := pmetric.NewMetric() + metric.SetName(name) + histogram := metric.SetEmptyExponentialHistogram() + histogram.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + + destPointL := histogram.DataPoints() + // By default the AggregationTemporality is Cumulative until it'll be changed by the caller. + for _, point := range points { + destPoint := destPointL.AppendEmpty() + point.CopyTo(destPoint) + } + + return metric +} + +func exponentialHistogramPointRaw(attributes []*kv, startTimestamp, timestamp pcommon.Timestamp) pmetric.ExponentialHistogramDataPoint { + hdp := pmetric.NewExponentialHistogramDataPoint() + hdp.SetStartTimestamp(startTimestamp) + hdp.SetTimestamp(timestamp) + + attrs := hdp.Attributes() + for _, kv := range attributes { + attrs.PutStr(kv.Key, kv.Value) + } + + return hdp +} + +func exponentialHistogramPoint(attributes []*kv, startTimestamp, timestamp pcommon.Timestamp, scale int32, zeroCount uint64, negativeOffset int32, negativeBuckets []uint64, positiveOffset int32, positiveBuckets []uint64) pmetric.ExponentialHistogramDataPoint { + hdp := exponentialHistogramPointRaw(attributes, startTimestamp, timestamp) + hdp.SetScale(scale) + hdp.SetZeroCount(zeroCount) + hdp.Negative().SetOffset(negativeOffset) + hdp.Negative().BucketCounts().FromRaw(negativeBuckets) + hdp.Positive().SetOffset(positiveOffset) + hdp.Positive().BucketCounts().FromRaw(positiveBuckets) + + count := uint64(0) + sum := float64(0) + for i, bCount := range positiveBuckets { + count += bCount + sum += float64(bCount) * float64(i) + } + for i, bCount := range negativeBuckets { + count += bCount + sum -= float64(bCount) * float64(i) + } + hdp.SetCount(count) + hdp.SetSum(sum) + return hdp +} + +func exponentialHistogramPointNoValue(attributes []*kv, startTimestamp, timestamp pcommon.Timestamp) pmetric.ExponentialHistogramDataPoint { + hdp := exponentialHistogramPointRaw(attributes, startTimestamp, timestamp) + hdp.SetFlags(pmetric.DefaultDataPointFlags.WithNoRecordedValue(true)) + + return hdp +} + +// exponentialHistogramPointSimplified let's you define an exponential +// histogram with just a few parameters. +// Scale and ZeroCount are set to the provided values. +// Positive and negative buckets are generated using the offset and bucketCount +// parameters by adding buckets from offset in both positive and negative +// directions. Bucket counts start from 1 and increase by 1 for each bucket. +// Sum and Count will be proportional to the bucket count. +func exponentialHistogramPointSimplified(attributes []*kv, startTimestamp, timestamp pcommon.Timestamp, scale int32, zeroCount uint64, offset int32, bucketCount int) pmetric.ExponentialHistogramDataPoint { + hdp := exponentialHistogramPointRaw(attributes, startTimestamp, timestamp) + hdp.SetScale(scale) + hdp.SetZeroCount(zeroCount) + + positive := hdp.Positive() + positive.SetOffset(offset) + positive.BucketCounts().EnsureCapacity(bucketCount) + negative := hdp.Negative() + negative.SetOffset(offset) + negative.BucketCounts().EnsureCapacity(bucketCount) + + var sum float64 + var count uint64 + for i := 0; i < bucketCount; i++ { + positive.BucketCounts().Append(uint64(i + 1)) + negative.BucketCounts().Append(uint64(i + 1)) + count += uint64(i+1) + uint64(i+1) + sum += float64(i+1)*10 + float64(i+1)*10.0 + } + hdp.SetCount(count) + hdp.SetSum(sum) + + return hdp +} + func doublePointRaw(attributes []*kv, startTimestamp, timestamp pcommon.Timestamp) pmetric.NumberDataPoint { ndp := pmetric.NewNumberDataPoint() ndp.SetStartTimestamp(startTimestamp) diff --git a/receiver/prometheusreceiver/internal/starttimemetricadjuster.go b/receiver/prometheusreceiver/internal/starttimemetricadjuster.go index 9195136e7841..ca7ae2a29171 100644 --- a/receiver/prometheusreceiver/internal/starttimemetricadjuster.go +++ b/receiver/prometheusreceiver/internal/starttimemetricadjuster.go @@ -68,7 +68,14 @@ func (stma *startTimeMetricAdjuster) AdjustMetrics(metrics pmetric.Metrics) erro dp.SetStartTimestamp(startTimeTs) } - case pmetric.MetricTypeEmpty, pmetric.MetricTypeExponentialHistogram: + case pmetric.MetricTypeExponentialHistogram: + dataPoints := metric.ExponentialHistogram().DataPoints() + for l := 0; l < dataPoints.Len(); l++ { + dp := dataPoints.At(l) + dp.SetStartTimestamp(startTimeTs) + } + + case pmetric.MetricTypeEmpty: fallthrough default: diff --git a/receiver/prometheusreceiver/internal/starttimemetricadjuster_test.go b/receiver/prometheusreceiver/internal/starttimemetricadjuster_test.go index 89e4b10f8e5f..84bdc2756ed5 100644 --- a/receiver/prometheusreceiver/internal/starttimemetricadjuster_test.go +++ b/receiver/prometheusreceiver/internal/starttimemetricadjuster_test.go @@ -33,6 +33,7 @@ func TestStartTimeMetricMatch(t *testing.T) { summaryMetric("test_summary_metric", summaryPoint(nil, startTime, currentTime, 10, 100, []float64{10, 50, 90}, []float64{9, 15, 48})), sumMetric("example_process_start_time_seconds", doublePoint(nil, startTime, currentTime, matchBuilderStartTime)), sumMetric("process_start_time_seconds", doublePoint(nil, startTime, currentTime, matchBuilderStartTime+1)), + exponentialHistogramMetric("test_exponential_histogram_metric", exponentialHistogramPointSimplified(nil, startTime, currentTime, 3, 1, -5, 3)), ), startTimeMetricRegex: regexp.MustCompile("^.*_process_start_time_seconds$"), expectedStartTime: timestampFromFloat64(matchBuilderStartTime), @@ -45,6 +46,7 @@ func TestStartTimeMetricMatch(t *testing.T) { summaryMetric("test_summary_metric", summaryPoint(nil, startTime, currentTime, 10, 100, []float64{10, 50, 90}, []float64{9, 15, 48})), sumMetric("example_process_start_time_seconds", doublePoint(nil, startTime, currentTime, matchBuilderStartTime)), sumMetric("process_start_time_seconds", doublePoint(nil, startTime, currentTime, matchBuilderStartTime+1)), + exponentialHistogramMetric("test_exponential_histogram_metric", exponentialHistogramPointSimplified(nil, startTime, currentTime, 3, 1, -5, 3)), ), expectedStartTime: timestampFromFloat64(matchBuilderStartTime + 1), }, @@ -139,7 +141,12 @@ func TestStartTimeMetricMatch(t *testing.T) { for l := 0; l < dps.Len(); l++ { assert.Equal(t, tt.expectedStartTime, dps.At(l).StartTimestamp()) } - case pmetric.MetricTypeEmpty, pmetric.MetricTypeGauge, pmetric.MetricTypeExponentialHistogram: + case pmetric.MetricTypeExponentialHistogram: + dps := metric.ExponentialHistogram().DataPoints() + for l := 0; l < dps.Len(); l++ { + assert.Equal(t, tt.expectedStartTime, dps.At(l).StartTimestamp()) + } + case pmetric.MetricTypeEmpty, pmetric.MetricTypeGauge: } } } diff --git a/receiver/prometheusreceiver/internal/transaction.go b/receiver/prometheusreceiver/internal/transaction.go index 563642bc61bb..dc1bf78dba9e 100644 --- a/receiver/prometheusreceiver/internal/transaction.go +++ b/receiver/prometheusreceiver/internal/transaction.go @@ -7,6 +7,7 @@ import ( "context" "errors" "fmt" + "math" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/exemplar" @@ -34,19 +35,20 @@ const ( ) type transaction struct { - isNew bool - trimSuffixes bool - ctx context.Context - families map[scopeID]map[string]*metricFamily - mc scrape.MetricMetadataStore - sink consumer.Metrics - externalLabels labels.Labels - nodeResource pcommon.Resource - scopeAttributes map[scopeID]pcommon.Map - logger *zap.Logger - buildInfo component.BuildInfo - metricAdjuster MetricsAdjuster - obsrecv *receiverhelper.ObsReport + isNew bool + trimSuffixes bool + enableNativeHistograms bool + ctx context.Context + families map[scopeID]map[string]*metricFamily + mc scrape.MetricMetadataStore + sink consumer.Metrics + externalLabels labels.Labels + nodeResource pcommon.Resource + scopeAttributes map[scopeID]pcommon.Map + logger *zap.Logger + buildInfo component.BuildInfo + metricAdjuster MetricsAdjuster + obsrecv *receiverhelper.ObsReport // Used as buffer to calculate series ref hash. bufBytes []byte } @@ -65,20 +67,22 @@ func newTransaction( externalLabels labels.Labels, settings receiver.CreateSettings, obsrecv *receiverhelper.ObsReport, - trimSuffixes bool) *transaction { + trimSuffixes bool, + enableNativeHistograms bool) *transaction { return &transaction{ - ctx: ctx, - families: make(map[scopeID]map[string]*metricFamily), - isNew: true, - trimSuffixes: trimSuffixes, - sink: sink, - metricAdjuster: metricAdjuster, - externalLabels: externalLabels, - logger: settings.Logger, - buildInfo: settings.BuildInfo, - obsrecv: obsrecv, - bufBytes: make([]byte, 0, 1024), - scopeAttributes: make(map[scopeID]pcommon.Map), + ctx: ctx, + families: make(map[scopeID]map[string]*metricFamily), + isNew: true, + trimSuffixes: trimSuffixes, + enableNativeHistograms: enableNativeHistograms, + sink: sink, + metricAdjuster: metricAdjuster, + externalLabels: externalLabels, + logger: settings.Logger, + buildInfo: settings.BuildInfo, + obsrecv: obsrecv, + bufBytes: make([]byte, 0, 1024), + scopeAttributes: make(map[scopeID]pcommon.Map), } } @@ -145,16 +149,42 @@ func (t *transaction) Append(_ storage.SeriesRef, ls labels.Labels, atMs int64, return 0, nil } - curMF := t.getOrCreateMetricFamily(getScopeID(ls), metricName) - err := curMF.addSeries(t.getSeriesRef(ls, curMF.mtype), metricName, ls, atMs, val) + curMF, existing := t.getOrCreateMetricFamily(getScopeID(ls), metricName) + + if t.enableNativeHistograms && curMF.mtype == pmetric.MetricTypeExponentialHistogram { + // If a histogram has both classic and native version, the native histogram is scraped + // first. Getting a float sample for the same series means that `scrape_classic_histogram` + // is set to true in the scrape config. In this case, we should ignore the native histogram. + curMF.mtype = pmetric.MetricTypeHistogram + } + + seriesRef := t.getSeriesRef(ls, curMF.mtype) + err := curMF.addSeries(seriesRef, metricName, ls, atMs, val) if err != nil { - t.logger.Warn("failed to add datapoint", zap.Error(err), zap.String("metric_name", metricName), zap.Any("labels", ls)) + // Handle special case of float sample indicating staleness of native + // histogram. This is similar to how Prometheus handles it, but we + // don't have access to the previous value so we're applying some + // heuristics to figure out if this is native histogram or not. + // The metric type will indicate histogram, but presumably there will be no + // _bucket, _count, _sum suffix or `le` label, which makes addSeries fail + // with errEmptyLeLabel. + if t.enableNativeHistograms && errors.Is(err, errEmptyLeLabel) && !existing && value.IsStaleNaN(val) && curMF.mtype == pmetric.MetricTypeHistogram { + mg := curMF.loadMetricGroupOrCreate(seriesRef, ls, atMs) + curMF.mtype = pmetric.MetricTypeExponentialHistogram + mg.mtype = pmetric.MetricTypeExponentialHistogram + _ = curMF.addExponentialHistogramSeries(seriesRef, metricName, ls, atMs, &histogram.Histogram{Sum: math.Float64frombits(value.StaleNaN)}, nil) + // ignore errors here, this is best effort. + } else { + t.logger.Warn("failed to add datapoint", zap.Error(err), zap.String("metric_name", metricName), zap.Any("labels", ls)) + } } return 0, nil // never return errors, as that fails the whole scrape } -func (t *transaction) getOrCreateMetricFamily(scope scopeID, mn string) *metricFamily { +// getOrCreateMetricFamily returns the metric family for the given metric name and scope, +// and true if an existing family was found. +func (t *transaction) getOrCreateMetricFamily(scope scopeID, mn string) (*metricFamily, bool) { _, ok := t.families[scope] if !ok { t.families[scope] = make(map[string]*metricFamily) @@ -170,9 +200,10 @@ func (t *transaction) getOrCreateMetricFamily(scope scopeID, mn string) *metricF } else { curMf = newMetricFamily(mn, t.mc, t.logger) t.families[scope][curMf.name] = curMf + return curMf, false } } - return curMf + return curMf, true } func (t *transaction) AppendExemplar(_ storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { @@ -199,15 +230,71 @@ func (t *transaction) AppendExemplar(_ storage.SeriesRef, l labels.Labels, e exe return 0, errMetricNameNotFound } - mf := t.getOrCreateMetricFamily(getScopeID(l), mn) + mf, _ := t.getOrCreateMetricFamily(getScopeID(l), mn) mf.addExemplar(t.getSeriesRef(l, mf.mtype), e) return 0, nil } -func (t *transaction) AppendHistogram(_ storage.SeriesRef, _ labels.Labels, _ int64, _ *histogram.Histogram, _ *histogram.FloatHistogram) (storage.SeriesRef, error) { - //TODO: implement this func - return 0, nil +func (t *transaction) AppendHistogram(_ storage.SeriesRef, ls labels.Labels, atMs int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { + if !t.enableNativeHistograms { + return 0, nil + } + + select { + case <-t.ctx.Done(): + return 0, errTransactionAborted + default: + } + + if t.externalLabels.Len() != 0 { + b := labels.NewBuilder(ls) + t.externalLabels.Range(func(l labels.Label) { + b.Set(l.Name, l.Value) + }) + ls = b.Labels() + } + + if t.isNew { + if err := t.initTransaction(ls); err != nil { + return 0, err + } + } + + // Any datapoint with duplicate labels MUST be rejected per: + // * https://github.com/open-telemetry/wg-prometheus/issues/44 + // * https://github.com/open-telemetry/opentelemetry-collector/issues/3407 + // as Prometheus rejects such too as of version 2.16.0, released on 2020-02-13. + if dupLabel, hasDup := ls.HasDuplicateLabelNames(); hasDup { + return 0, fmt.Errorf("invalid sample: non-unique label names: %q", dupLabel) + } + + metricName := ls.Get(model.MetricNameLabel) + if metricName == "" { + return 0, errMetricNameNotFound + } + + // The `up`, `target_info`, `otel_scope_info` metrics should never generate native histograms, + // thus we don't check for them here as opposed to the Append function. + + curMF, existing := t.getOrCreateMetricFamily(getScopeID(ls), metricName) + if !existing { + curMF.mtype = pmetric.MetricTypeExponentialHistogram + } else if curMF.mtype != pmetric.MetricTypeExponentialHistogram { + // Already scraped as classic histogram. + return 0, nil + } + + if h != nil && h.CounterResetHint == histogram.GaugeType || fh != nil && fh.CounterResetHint == histogram.GaugeType { + t.logger.Warn("dropping unsupported gauge histogram datapoint", zap.String("metric_name", metricName), zap.Any("labels", ls)) + } + + err := curMF.addExponentialHistogramSeries(t.getSeriesRef(ls, curMF.mtype), metricName, ls, atMs, h, fh) + if err != nil { + t.logger.Warn("failed to add histogram datapoint", zap.Error(err), zap.String("metric_name", metricName), zap.Any("labels", ls)) + } + + return 0, nil // never return errors, as that fails the whole scrape } func (t *transaction) AppendCTZeroSample(_ storage.SeriesRef, _ labels.Labels, _, _ int64) (storage.SeriesRef, error) { diff --git a/receiver/prometheusreceiver/internal/transaction_test.go b/receiver/prometheusreceiver/internal/transaction_test.go index fde7c6a07000..2956ec2e3b12 100644 --- a/receiver/prometheusreceiver/internal/transaction_test.go +++ b/receiver/prometheusreceiver/internal/transaction_test.go @@ -6,14 +6,17 @@ package internal import ( "context" "errors" + "fmt" "testing" "time" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/exemplar" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/metadata" "github.com/prometheus/prometheus/scrape" + "github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" @@ -53,42 +56,89 @@ var ( ) func TestTransactionCommitWithoutAdding(t *testing.T) { - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false) + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testTransactionCommitWithoutAdding(t, enableNativeHistograms) + }) + } +} + +func testTransactionCommitWithoutAdding(t *testing.T, enableNativeHistograms bool) { + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false, enableNativeHistograms) assert.NoError(t, tr.Commit()) } func TestTransactionRollbackDoesNothing(t *testing.T) { - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false) + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testTransactionRollbackDoesNothing(t, enableNativeHistograms) + }) + } +} + +func testTransactionRollbackDoesNothing(t *testing.T, enableNativeHistograms bool) { + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false, enableNativeHistograms) assert.NoError(t, tr.Rollback()) } func TestTransactionUpdateMetadataDoesNothing(t *testing.T) { - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false) + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testTransactionUpdateMetadataDoesNothing(t, enableNativeHistograms) + }) + } +} + +func testTransactionUpdateMetadataDoesNothing(t *testing.T, enableNativeHistograms bool) { + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false, enableNativeHistograms) _, err := tr.UpdateMetadata(0, labels.New(), metadata.Metadata{}) assert.NoError(t, err) } func TestTransactionAppendNoTarget(t *testing.T) { + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testTransactionAppendNoTarget(t, enableNativeHistograms) + }) + } +} + +func testTransactionAppendNoTarget(t *testing.T, enableNativeHistograms bool) { badLabels := labels.FromStrings(model.MetricNameLabel, "counter_test") - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false) + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false, enableNativeHistograms) _, err := tr.Append(0, badLabels, time.Now().Unix()*1000, 1.0) assert.Error(t, err) } func TestTransactionAppendNoMetricName(t *testing.T) { + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testTransactionAppendNoMetricName(t, enableNativeHistograms) + }) + } +} + +func testTransactionAppendNoMetricName(t *testing.T, enableNativeHistograms bool) { jobNotFoundLb := labels.FromMap(map[string]string{ model.InstanceLabel: "localhost:8080", model.JobLabel: "test2", }) - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false) + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false, enableNativeHistograms) _, err := tr.Append(0, jobNotFoundLb, time.Now().Unix()*1000, 1.0) assert.ErrorIs(t, err, errMetricNameNotFound) - assert.ErrorIs(t, tr.Commit(), errNoDataToBuild) } func TestTransactionAppendEmptyMetricName(t *testing.T) { - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false) + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testTransactionAppendEmptyMetricName(t, enableNativeHistograms) + }) + } +} + +func testTransactionAppendEmptyMetricName(t *testing.T, enableNativeHistograms bool) { + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false, enableNativeHistograms) _, err := tr.Append(0, labels.FromMap(map[string]string{ model.InstanceLabel: "localhost:8080", model.JobLabel: "test2", @@ -98,8 +148,16 @@ func TestTransactionAppendEmptyMetricName(t *testing.T) { } func TestTransactionAppendResource(t *testing.T) { + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testTransactionAppendResource(t, enableNativeHistograms) + }) + } +} + +func testTransactionAppendResource(t *testing.T, enableNativeHistograms bool) { sink := new(consumertest.MetricsSink) - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false) + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false, enableNativeHistograms) _, err := tr.Append(0, labels.FromMap(map[string]string{ model.InstanceLabel: "localhost:8080", model.JobLabel: "test", @@ -121,8 +179,16 @@ func TestTransactionAppendResource(t *testing.T) { } func TestReceiverVersionAndNameAreAttached(t *testing.T) { + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testReceiverVersionAndNameAreAttached(t, enableNativeHistograms) + }) + } +} + +func testReceiverVersionAndNameAreAttached(t *testing.T, enableNativeHistograms bool) { sink := new(consumertest.MetricsSink) - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false) + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false, enableNativeHistograms) _, err := tr.Append(0, labels.FromMap(map[string]string{ model.InstanceLabel: "localhost:8080", model.JobLabel: "test", @@ -143,6 +209,14 @@ func TestReceiverVersionAndNameAreAttached(t *testing.T) { } func TestTransactionCommitErrorWhenAdjusterError(t *testing.T) { + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testTransactionCommitErrorWhenAdjusterError(t, enableNativeHistograms) + }) + } +} + +func testTransactionCommitErrorWhenAdjusterError(t *testing.T, enableNativeHistograms bool) { goodLabels := labels.FromMap(map[string]string{ model.InstanceLabel: "localhost:8080", model.JobLabel: "test", @@ -150,7 +224,7 @@ func TestTransactionCommitErrorWhenAdjusterError(t *testing.T) { }) sink := new(consumertest.MetricsSink) adjusterErr := errors.New("adjuster error") - tr := newTransaction(scrapeCtx, &errorAdjuster{err: adjusterErr}, sink, labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false) + tr := newTransaction(scrapeCtx, &errorAdjuster{err: adjusterErr}, sink, labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false, enableNativeHistograms) _, err := tr.Append(0, goodLabels, time.Now().Unix()*1000, 1.0) assert.NoError(t, err) assert.ErrorIs(t, tr.Commit(), adjusterErr) @@ -158,8 +232,16 @@ func TestTransactionCommitErrorWhenAdjusterError(t *testing.T) { // Ensure that we reject duplicate label keys. See https://github.com/open-telemetry/wg-prometheus/issues/44. func TestTransactionAppendDuplicateLabels(t *testing.T) { + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testTransactionAppendDuplicateLabels(t, enableNativeHistograms) + }) + } +} + +func testTransactionAppendDuplicateLabels(t *testing.T, enableNativeHistograms bool) { sink := new(consumertest.MetricsSink) - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false) + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false, enableNativeHistograms) dupLabels := labels.FromStrings( model.InstanceLabel, "0.0.0.0:8855", @@ -176,6 +258,14 @@ func TestTransactionAppendDuplicateLabels(t *testing.T) { } func TestTransactionAppendHistogramNoLe(t *testing.T) { + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testTransactionAppendHistogramNoLe(t, enableNativeHistograms) + }) + } +} + +func testTransactionAppendHistogramNoLe(t *testing.T, enableNativeHistograms bool) { sink := new(consumertest.MetricsSink) receiverSettings := receivertest.NewNopCreateSettings() core, observedLogs := observer.New(zap.InfoLevel) @@ -188,6 +278,7 @@ func TestTransactionAppendHistogramNoLe(t *testing.T) { receiverSettings, nopObsRecv(t), false, + enableNativeHistograms, ) goodLabels := labels.FromStrings( @@ -206,6 +297,14 @@ func TestTransactionAppendHistogramNoLe(t *testing.T) { } func TestTransactionAppendSummaryNoQuantile(t *testing.T) { + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testTransactionAppendSummaryNoQuantile(t, enableNativeHistograms) + }) + } +} + +func testTransactionAppendSummaryNoQuantile(t *testing.T, enableNativeHistograms bool) { sink := new(consumertest.MetricsSink) receiverSettings := receivertest.NewNopCreateSettings() core, observedLogs := observer.New(zap.InfoLevel) @@ -218,6 +317,7 @@ func TestTransactionAppendSummaryNoQuantile(t *testing.T) { receiverSettings, nopObsRecv(t), false, + enableNativeHistograms, ) goodLabels := labels.FromStrings( @@ -236,6 +336,14 @@ func TestTransactionAppendSummaryNoQuantile(t *testing.T) { } func TestTransactionAppendValidAndInvalid(t *testing.T) { + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testTransactionAppendValidAndInvalid(t, enableNativeHistograms) + }) + } +} + +func testTransactionAppendValidAndInvalid(t *testing.T, enableNativeHistograms bool) { sink := new(consumertest.MetricsSink) receiverSettings := receivertest.NewNopCreateSettings() core, observedLogs := observer.New(zap.InfoLevel) @@ -248,6 +356,7 @@ func TestTransactionAppendValidAndInvalid(t *testing.T) { receiverSettings, nopObsRecv(t), false, + enableNativeHistograms, ) // a valid counter @@ -281,8 +390,16 @@ func TestTransactionAppendValidAndInvalid(t *testing.T) { } func TestAppendExemplarWithNoMetricName(t *testing.T) { + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testAppendExemplarWithNoMetricName(t, enableNativeHistograms) + }) + } +} + +func testAppendExemplarWithNoMetricName(t *testing.T, enableNativeHistograms bool) { sink := new(consumertest.MetricsSink) - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false) + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false, enableNativeHistograms) labels := labels.FromStrings( model.InstanceLabel, "0.0.0.0:8855", @@ -294,8 +411,16 @@ func TestAppendExemplarWithNoMetricName(t *testing.T) { } func TestAppendExemplarWithEmptyMetricName(t *testing.T) { + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testAppendExemplarWithEmptyMetricName(t, enableNativeHistograms) + }) + } +} + +func testAppendExemplarWithEmptyMetricName(t *testing.T, enableNativeHistograms bool) { sink := new(consumertest.MetricsSink) - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false) + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false, enableNativeHistograms) labels := labels.FromStrings( model.InstanceLabel, "0.0.0.0:8855", @@ -307,8 +432,16 @@ func TestAppendExemplarWithEmptyMetricName(t *testing.T) { } func TestAppendExemplarWithDuplicateLabels(t *testing.T) { + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testAppendExemplarWithDuplicateLabels(t, enableNativeHistograms) + }) + } +} + +func testAppendExemplarWithDuplicateLabels(t *testing.T, enableNativeHistograms bool) { sink := new(consumertest.MetricsSink) - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false) + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false, enableNativeHistograms) labels := labels.FromStrings( model.InstanceLabel, "0.0.0.0:8855", @@ -323,8 +456,16 @@ func TestAppendExemplarWithDuplicateLabels(t *testing.T) { } func TestAppendExemplarWithoutAddingMetric(t *testing.T) { + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testAppendExemplarWithoutAddingMetric(t, enableNativeHistograms) + }) + } +} + +func testAppendExemplarWithoutAddingMetric(t *testing.T, enableNativeHistograms bool) { sink := new(consumertest.MetricsSink) - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false) + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false, enableNativeHistograms) labels := labels.FromStrings( model.InstanceLabel, "0.0.0.0:8855", @@ -337,16 +478,32 @@ func TestAppendExemplarWithoutAddingMetric(t *testing.T) { } func TestAppendExemplarWithNoLabels(t *testing.T) { + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testAppendExemplarWithNoLabels(t, enableNativeHistograms) + }) + } +} + +func testAppendExemplarWithNoLabels(t *testing.T, enableNativeHistograms bool) { sink := new(consumertest.MetricsSink) - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false) + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false, enableNativeHistograms) _, err := tr.AppendExemplar(0, labels.EmptyLabels(), exemplar.Exemplar{Value: 0}) assert.Equal(t, errNoJobInstance, err) } func TestAppendExemplarWithEmptyLabelArray(t *testing.T) { + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testAppendExemplarWithEmptyLabelArray(t, enableNativeHistograms) + }) + } +} + +func testAppendExemplarWithEmptyLabelArray(t *testing.T, enableNativeHistograms bool) { sink := new(consumertest.MetricsSink) - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false) + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false, enableNativeHistograms) _, err := tr.AppendExemplar(0, labels.FromStrings(), exemplar.Exemplar{Value: 0}) assert.Equal(t, errNoJobInstance, err) @@ -575,9 +732,11 @@ func TestMetricBuilderCounters(t *testing.T) { } for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - tt.run(t) - }) + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("%s/enableNativeHistograms=%v", tt.name, enableNativeHistograms), func(t *testing.T) { + tt.run(t, enableNativeHistograms) + }) + } } } @@ -798,9 +957,11 @@ func TestMetricBuilderGauges(t *testing.T) { } for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - tt.run(t) - }) + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("%s/enableNativeHistograms=%v", tt.name, enableNativeHistograms), func(t *testing.T) { + tt.run(t, enableNativeHistograms) + }) + } } } @@ -894,9 +1055,11 @@ func TestMetricBuilderUntyped(t *testing.T) { } for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - tt.run(t) - }) + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("%s/enableNativeHistograms=%v", tt.name, enableNativeHistograms), func(t *testing.T) { + tt.run(t, enableNativeHistograms) + }) + } } } @@ -1312,9 +1475,12 @@ func TestMetricBuilderHistogram(t *testing.T) { } for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - tt.run(t) - }) + for _, enableNativeHistograms := range []bool{true, false} { + // None of the histograms above have native histogram versions, so enabling native hisotgrams has no effect. + t.Run(fmt.Sprintf("%s/enableNativeHistograms=%v", tt.name, enableNativeHistograms), func(t *testing.T) { + tt.run(t, enableNativeHistograms) + }) + } } } @@ -1454,30 +1620,137 @@ func TestMetricBuilderSummary(t *testing.T) { } for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - tt.run(t) - }) + for _, enableNativeHistograms := range []bool{false, true} { + t.Run(fmt.Sprintf("%s/enableNativeHistograms=%v", tt.name, enableNativeHistograms), func(t *testing.T) { + tt.run(t, enableNativeHistograms) + }) + } } } +func TestMetricBuilderNativeHistogram(t *testing.T) { + for _, enableNativeHistograms := range []bool{false, true} { + emptyH := &histogram.Histogram{ + Schema: 1, + Count: 0, + Sum: 0, + ZeroThreshold: 0.001, + ZeroCount: 0, + } + h0 := tsdbutil.GenerateTestHistogram(0) + + tests := []buildTestData{ + { + name: "empty integer histogram", + inputs: []*testScrapedPage{ + { + pts: []*testDataPoint{ + createHistogramDataPoint("hist_test", emptyH, nil, nil, "foo", "bar"), + }, + }, + }, + wants: func() []pmetric.Metrics { + md0 := pmetric.NewMetrics() + if !enableNativeHistograms { + return []pmetric.Metrics{md0} + } + mL0 := md0.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics() + m0 := mL0.AppendEmpty() + m0.SetName("hist_test") + m0.SetEmptyExponentialHistogram() + m0.ExponentialHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + pt0 := m0.ExponentialHistogram().DataPoints().AppendEmpty() + pt0.Attributes().PutStr("foo", "bar") + pt0.SetStartTimestamp(startTimestamp) + pt0.SetTimestamp(tsNanos) + pt0.SetCount(0) + pt0.SetSum(0) + pt0.SetZeroThreshold(0.001) + pt0.SetScale(1) + + return []pmetric.Metrics{md0} + }, + }, + { + name: "integer histogram", + inputs: []*testScrapedPage{ + { + pts: []*testDataPoint{ + createHistogramDataPoint("hist_test", h0, nil, nil, "foo", "bar"), + }, + }, + }, + wants: func() []pmetric.Metrics { + md0 := pmetric.NewMetrics() + if !enableNativeHistograms { + return []pmetric.Metrics{md0} + } + mL0 := md0.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics() + m0 := mL0.AppendEmpty() + m0.SetName("hist_test") + m0.SetEmptyExponentialHistogram() + m0.ExponentialHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + pt0 := m0.ExponentialHistogram().DataPoints().AppendEmpty() + pt0.Attributes().PutStr("foo", "bar") + pt0.SetStartTimestamp(startTimestamp) + pt0.SetTimestamp(tsNanos) + pt0.SetCount(12) + pt0.SetSum(18.4) + pt0.SetScale(1) + pt0.SetZeroThreshold(0.001) + pt0.SetZeroCount(2) + pt0.Positive().SetOffset(-1) + pt0.Positive().BucketCounts().Append(1) + pt0.Positive().BucketCounts().Append(2) + pt0.Positive().BucketCounts().Append(0) + pt0.Positive().BucketCounts().Append(1) + pt0.Positive().BucketCounts().Append(1) + pt0.Negative().SetOffset(-1) + pt0.Negative().BucketCounts().Append(1) + pt0.Negative().BucketCounts().Append(2) + pt0.Negative().BucketCounts().Append(0) + pt0.Negative().BucketCounts().Append(1) + pt0.Negative().BucketCounts().Append(1) + + return []pmetric.Metrics{md0} + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.run(t, enableNativeHistograms) + }) + } + } +} + type buildTestData struct { name string inputs []*testScrapedPage wants func() []pmetric.Metrics } -func (tt buildTestData) run(t *testing.T) { +func (tt buildTestData) run(t *testing.T, enableNativeHistograms bool) { wants := tt.wants() assert.EqualValues(t, len(wants), len(tt.inputs)) st := ts for i, page := range tt.inputs { sink := new(consumertest.MetricsSink) - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false) + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false, enableNativeHistograms) for _, pt := range page.pts { // set ts for testing pt.t = st - _, err := tr.Append(0, pt.lb, pt.t, pt.v) + var err error + switch { + case pt.fh != nil: + _, err = tr.AppendHistogram(0, pt.lb, pt.t, nil, pt.fh) + case pt.h != nil: + _, err = tr.AppendHistogram(0, pt.lb, pt.t, pt.h, nil) + default: + _, err = tr.Append(0, pt.lb, pt.t, pt.v) + } assert.NoError(t, err) for _, e := range pt.exemplars { @@ -1534,7 +1807,12 @@ func (s *startTimeAdjuster) AdjustMetrics(metrics pmetric.Metrics) error { for l := 0; l < dps.Len(); l++ { dps.At(l).SetStartTimestamp(s.startTime) } - case pmetric.MetricTypeEmpty, pmetric.MetricTypeGauge, pmetric.MetricTypeExponentialHistogram: + case pmetric.MetricTypeExponentialHistogram: + dps := metric.ExponentialHistogram().DataPoints() + for l := 0; l < dps.Len(); l++ { + dps.At(l).SetStartTimestamp(s.startTime) + } + case pmetric.MetricTypeEmpty, pmetric.MetricTypeGauge: } } } @@ -1546,6 +1824,8 @@ type testDataPoint struct { lb labels.Labels t int64 v float64 + h *histogram.Histogram + fh *histogram.FloatHistogram exemplars []exemplar.Exemplar } @@ -1568,6 +1848,13 @@ func createDataPoint(mname string, value float64, es []exemplar.Exemplar, tagPai } } +func createHistogramDataPoint(mname string, h *histogram.Histogram, fh *histogram.FloatHistogram, es []exemplar.Exemplar, tagPairs ...string) *testDataPoint { + dataPoint := createDataPoint(mname, 0, es, tagPairs...) + dataPoint.h = h + dataPoint.fh = fh + return dataPoint +} + func assertEquivalentMetrics(t *testing.T, want, got pmetric.Metrics) { require.Equal(t, want.ResourceMetrics().Len(), got.ResourceMetrics().Len()) if want.ResourceMetrics().Len() == 0 { diff --git a/receiver/prometheusreceiver/metrics_receiver.go b/receiver/prometheusreceiver/metrics_receiver.go index bcd2192a55fe..771e0dfaca9d 100644 --- a/receiver/prometheusreceiver/metrics_receiver.go +++ b/receiver/prometheusreceiver/metrics_receiver.go @@ -233,6 +233,13 @@ func (r *pReceiver) getScrapeConfigsResponse(baseURL string) (map[string]*config } func (r *pReceiver) applyCfg(cfg *PromConfig) error { + if !enableNativeHistogramsGate.IsEnabled() { + // Enforce scraping classic histograms to avoid dropping them. + for _, scrapeConfig := range cfg.ScrapeConfigs { + scrapeConfig.ScrapeClassicHistograms = true + } + } + if err := r.scrapeManager.ApplyConfig((*config.Config)(cfg)); err != nil { return err } @@ -284,6 +291,7 @@ func (r *pReceiver) initPrometheusComponents(ctx context.Context, logger log.Log r.cfg.UseStartTimeMetric, startTimeMetricRegex, useCreatedMetricGate.IsEnabled(), + enableNativeHistogramsGate.IsEnabled(), r.cfg.PrometheusConfig.GlobalConfig.ExternalLabels, r.cfg.TrimMetricSuffixes, ) diff --git a/receiver/prometheusreceiver/metrics_receiver_helper_test.go b/receiver/prometheusreceiver/metrics_receiver_helper_test.go index 332183ee0fa8..41ed19989a0a 100644 --- a/receiver/prometheusreceiver/metrics_receiver_helper_test.go +++ b/receiver/prometheusreceiver/metrics_receiver_helper_test.go @@ -283,6 +283,12 @@ func isFirstFailedScrape(metrics []pmetric.Metric, normalizedNames bool) bool { return false } } + case pmetric.MetricTypeExponentialHistogram: + for i := 0; i < m.ExponentialHistogram().DataPoints().Len(); i++ { + if !m.ExponentialHistogram().DataPoints().At(i).Flags().NoRecordedValue() { + return false + } + } case pmetric.MetricTypeHistogram: for i := 0; i < m.Histogram().DataPoints().Len(); i++ { if !m.Histogram().DataPoints().At(i).Flags().NoRecordedValue() { @@ -295,7 +301,7 @@ func isFirstFailedScrape(metrics []pmetric.Metric, normalizedNames bool) bool { return false } } - case pmetric.MetricTypeEmpty, pmetric.MetricTypeExponentialHistogram: + case pmetric.MetricTypeEmpty: } } return true @@ -362,11 +368,13 @@ type metricTypeComparator func(*testing.T, pmetric.Metric) type numberPointComparator func(*testing.T, pmetric.NumberDataPoint) type histogramPointComparator func(*testing.T, pmetric.HistogramDataPoint) type summaryPointComparator func(*testing.T, pmetric.SummaryDataPoint) +type exponentialHistogramComparator func(*testing.T, pmetric.ExponentialHistogramDataPoint) type dataPointExpectation struct { - numberPointComparator []numberPointComparator - histogramPointComparator []histogramPointComparator - summaryPointComparator []summaryPointComparator + numberPointComparator []numberPointComparator + histogramPointComparator []histogramPointComparator + summaryPointComparator []summaryPointComparator + exponentialHistogramComparator []exponentialHistogramComparator } type testExpectation func(*testing.T, pmetric.ResourceMetrics) @@ -426,7 +434,12 @@ func assertMetricPresent(name string, metricTypeExpectations metricTypeComparato require.Equal(t, m.Summary().DataPoints().Len(), len(dataPointExpectations), "Expected number of data-points in Summary metric '%s' does not match to testdata", name) spc(t, m.Summary().DataPoints().At(i)) } - case pmetric.MetricTypeEmpty, pmetric.MetricTypeExponentialHistogram: + case pmetric.MetricTypeExponentialHistogram: + for _, ehc := range de.exponentialHistogramComparator { + require.Equal(t, m.ExponentialHistogram().DataPoints().Len(), len(dataPointExpectations), "Expected number of data-points in Exponential Histogram metric '%s' does not match to testdata", name) + ehc(t, m.ExponentialHistogram().DataPoints().At(i)) + } + case pmetric.MetricTypeEmpty: } } } @@ -521,6 +534,13 @@ func assertHistogramPointFlagNoRecordedValue() histogramPointComparator { } } +func assertExponentialHistogramPointFlagNoRecordedValue() exponentialHistogramComparator { + return func(t *testing.T, histogramDataPoint pmetric.ExponentialHistogramDataPoint) { + assert.True(t, histogramDataPoint.Flags().NoRecordedValue(), + "Datapoint flag for staleness marker not found as expected") + } +} + func assertSummaryPointFlagNoRecordedValue() summaryPointComparator { return func(t *testing.T, summaryDataPoint pmetric.SummaryDataPoint) { assert.True(t, summaryDataPoint.Flags().NoRecordedValue(), @@ -586,6 +606,19 @@ func compareHistogram(count uint64, sum float64, upperBounds []float64, buckets } } +func compareExponentialHistogram(scale int32, count uint64, sum float64, zeroCount uint64, negativeOffset int32, negativeBuckets []uint64, positiveOffset int32, positiveBuckets []uint64) exponentialHistogramComparator { + return func(t *testing.T, exponentialHistogramDataPoint pmetric.ExponentialHistogramDataPoint) { + assert.Equal(t, scale, exponentialHistogramDataPoint.Scale(), "Exponential Histogram scale value does not match") + assert.Equal(t, count, exponentialHistogramDataPoint.Count(), "Exponential Histogram count value does not match") + assert.Equal(t, sum, exponentialHistogramDataPoint.Sum(), "Exponential Histogram sum value does not match") + assert.Equal(t, zeroCount, exponentialHistogramDataPoint.ZeroCount(), "Exponential Histogram zero count value does not match") + assert.Equal(t, negativeOffset, exponentialHistogramDataPoint.Negative().Offset(), "Exponential Histogram negative offset value does not match") + assert.Equal(t, negativeBuckets, exponentialHistogramDataPoint.Negative().BucketCounts().AsRaw(), "Exponential Histogram negative bucket count values do not match") + assert.Equal(t, positiveOffset, exponentialHistogramDataPoint.Positive().Offset(), "Exponential Histogram positive offset value does not match") + assert.Equal(t, positiveBuckets, exponentialHistogramDataPoint.Positive().BucketCounts().AsRaw(), "Exponential Histogram positive bucket count values do not match") + } +} + func compareSummary(count uint64, sum float64, quantiles [][]float64) summaryPointComparator { return func(t *testing.T, summaryDataPoint pmetric.SummaryDataPoint) { assert.Equal(t, count, summaryDataPoint.Count(), "Summary count value does not match") diff --git a/receiver/prometheusreceiver/metrics_receiver_protobuf_test.go b/receiver/prometheusreceiver/metrics_receiver_protobuf_test.go index 5aa5ce15afd8..0b16a10c1c1a 100644 --- a/receiver/prometheusreceiver/metrics_receiver_protobuf_test.go +++ b/receiver/prometheusreceiver/metrics_receiver_protobuf_test.go @@ -9,6 +9,8 @@ import ( "github.com/prometheus/prometheus/config" dto "github.com/prometheus/prometheus/prompb/io/prometheus/client" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/featuregate" "go.opentelemetry.io/collector/pdata/pmetric" ) @@ -157,3 +159,455 @@ func TestScrapeViaProtobuf(t *testing.T) { c.PrometheusConfig.GlobalConfig.ScrapeProtocols = []config.ScrapeProtocol{config.PrometheusProto} }) } + +func TestNativeVsClassicHistogramScrapeViaProtobuf(t *testing.T) { + classicHistogram := &dto.MetricFamily{ + Name: "test_classic_histogram", + Type: dto.MetricType_HISTOGRAM, + Metric: []dto.Metric{ + { + Histogram: &dto.Histogram{ + SampleCount: 1213, + SampleSum: 456, + Bucket: []dto.Bucket{ + { + UpperBound: 0.5, + CumulativeCount: 789, + }, + { + UpperBound: 10, + CumulativeCount: 1011, + }, + { + UpperBound: math.Inf(1), + CumulativeCount: 1213, + }, + }, + }, + }, + }, + } + buffer := prometheusMetricFamilyToProtoBuf(t, nil, classicHistogram) + + mixedHistogram := &dto.MetricFamily{ + Name: "test_mixed_histogram", + Type: dto.MetricType_HISTOGRAM, + Metric: []dto.Metric{ + { + Histogram: &dto.Histogram{ + SampleCount: 1213, + SampleSum: 456, + Bucket: []dto.Bucket{ + { + UpperBound: 0.5, + CumulativeCount: 789, + }, + { + UpperBound: 10, + CumulativeCount: 1011, + }, + { + UpperBound: math.Inf(1), + CumulativeCount: 1213, + }, + }, + // Integer counter histogram definition + Schema: 3, + ZeroThreshold: 0.001, + ZeroCount: 2, + NegativeSpan: []dto.BucketSpan{ + {Offset: 0, Length: 1}, + {Offset: 1, Length: 1}, + }, + NegativeDelta: []int64{1, 1}, + PositiveSpan: []dto.BucketSpan{ + {Offset: -2, Length: 1}, + {Offset: 1, Length: 1}, + }, + PositiveDelta: []int64{1, 0}, + }, + }, + }, + } + prometheusMetricFamilyToProtoBuf(t, buffer, mixedHistogram) + + nativeHistogram := &dto.MetricFamily{ + Name: "test_native_histogram", + Type: dto.MetricType_HISTOGRAM, + Metric: []dto.Metric{ + { + Histogram: &dto.Histogram{ + SampleCount: 1214, + SampleSum: 3456, + // Integer counter histogram definition + Schema: 3, + ZeroThreshold: 0.001, + ZeroCount: 5, + NegativeSpan: []dto.BucketSpan{ + {Offset: -2, Length: 1}, + {Offset: 1, Length: 1}, + }, + NegativeDelta: []int64{1, 1}, + PositiveSpan: []dto.BucketSpan{ + {Offset: 3, Length: 1}, + {Offset: 2, Length: 1}, + }, + PositiveDelta: []int64{1, 0}, + }, + }, + }, + } + prometheusMetricFamilyToProtoBuf(t, buffer, nativeHistogram) + + testCases := map[string]struct { + mutCfg func(*PromConfig) + enableNativeHistograms bool + expected []testExpectation + }{ + "feature enabled scrape classic off": { + enableNativeHistograms: true, + expected: []testExpectation{ + assertMetricPresent( // Scrape classic only histograms as is. + "test_classic_histogram", + compareMetricType(pmetric.MetricTypeHistogram), + compareMetricUnit(""), + []dataPointExpectation{{ + histogramPointComparator: []histogramPointComparator{ + compareHistogram(1213, 456, []float64{0.5, 10}, []uint64{789, 222, 202}), + }, + }}, + ), + assertMetricPresent( // Only scrape native buckets from mixed histograms. + "test_mixed_histogram", + compareMetricType(pmetric.MetricTypeExponentialHistogram), + compareMetricUnit(""), + []dataPointExpectation{{ + exponentialHistogramComparator: []exponentialHistogramComparator{ + compareExponentialHistogram(3, 1213, 456, 2, -1, []uint64{1, 0, 2}, -3, []uint64{1, 0, 1}), + }, + }}, + ), + assertMetricPresent( // Scrape native only histograms as is. + "test_native_histogram", + compareMetricType(pmetric.MetricTypeExponentialHistogram), + compareMetricUnit(""), + []dataPointExpectation{{ + exponentialHistogramComparator: []exponentialHistogramComparator{ + compareExponentialHistogram(3, 1214, 3456, 5, -3, []uint64{1, 0, 2}, 2, []uint64{1, 0, 0, 1}), + }, + }}, + ), + }, + }, + "feature disabled scrape classic off": { + enableNativeHistograms: false, + expected: []testExpectation{ + assertMetricPresent( // Scrape classic only histograms as is. + "test_classic_histogram", + compareMetricType(pmetric.MetricTypeHistogram), + compareMetricUnit(""), + []dataPointExpectation{{ + histogramPointComparator: []histogramPointComparator{ + compareHistogram(1213, 456, []float64{0.5, 10}, []uint64{789, 222, 202}), + }, + }}, + ), + assertMetricPresent( // Fallback to scraping classic histograms if feature is off. + "test_mixed_histogram", + compareMetricType(pmetric.MetricTypeHistogram), + compareMetricUnit(""), + []dataPointExpectation{{ + histogramPointComparator: []histogramPointComparator{ + compareHistogram(1213, 456, []float64{0.5, 10}, []uint64{789, 222, 202}), + }, + }}, + ), + // When the native histograms feature is off, no native histograms are scraped. + assertMetricAbsent("test_native_histogram"), + }, + }, + "feature enabled scrape classic on": { + mutCfg: func(cfg *PromConfig) { + for _, scrapeConfig := range cfg.ScrapeConfigs { + scrapeConfig.ScrapeClassicHistograms = true + } + }, + enableNativeHistograms: true, + expected: []testExpectation{ + assertMetricPresent( // Scrape classic only histograms as is. + "test_classic_histogram", + compareMetricType(pmetric.MetricTypeHistogram), + compareMetricUnit(""), + []dataPointExpectation{{ + histogramPointComparator: []histogramPointComparator{ + compareHistogram(1213, 456, []float64{0.5, 10}, []uint64{789, 222, 202}), + }, + }}, + ), + assertMetricPresent( // Only scrape classic buckets from mixed histograms. + "test_mixed_histogram", + compareMetricType(pmetric.MetricTypeHistogram), + compareMetricUnit(""), + []dataPointExpectation{{ + histogramPointComparator: []histogramPointComparator{ + compareHistogram(1213, 456, []float64{0.5, 10}, []uint64{789, 222, 202}), + }, + }}, + ), + assertMetricPresent( // Scrape native only histograms as is. + "test_native_histogram", + compareMetricType(pmetric.MetricTypeExponentialHistogram), + compareMetricUnit(""), + []dataPointExpectation{{ + exponentialHistogramComparator: []exponentialHistogramComparator{ + compareExponentialHistogram(3, 1214, 3456, 5, -3, []uint64{1, 0, 2}, 2, []uint64{1, 0, 0, 1}), + }, + }}, + ), + }, + }, + "feature disabled scrape classic on": { + mutCfg: func(cfg *PromConfig) { + for _, scrapeConfig := range cfg.ScrapeConfigs { + scrapeConfig.ScrapeClassicHistograms = true + } + }, + enableNativeHistograms: false, + expected: []testExpectation{ + assertMetricPresent( // Scrape classic only histograms as is. + "test_classic_histogram", + compareMetricType(pmetric.MetricTypeHistogram), + compareMetricUnit(""), + []dataPointExpectation{{ + histogramPointComparator: []histogramPointComparator{ + compareHistogram(1213, 456, []float64{0.5, 10}, []uint64{789, 222, 202}), + }, + }}, + ), + assertMetricPresent( // Only scrape classic buckets from mixed histograms. + "test_mixed_histogram", + compareMetricType(pmetric.MetricTypeHistogram), + compareMetricUnit(""), + []dataPointExpectation{{ + histogramPointComparator: []histogramPointComparator{ + compareHistogram(1213, 456, []float64{0.5, 10}, []uint64{789, 222, 202}), + }, + }}, + ), + // When the native histograms feature is off, no native histograms are scraped. + assertMetricAbsent("test_native_histogram"), + }, + }, + } + + defer func() { + _ = featuregate.GlobalRegistry().Set("receiver.prometheusreceiver.EnableNativeHistograms", false) + }() + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + err := featuregate.GlobalRegistry().Set("receiver.prometheusreceiver.EnableNativeHistograms", tc.enableNativeHistograms) + require.NoError(t, err) + + targets := []*testData{ + { + name: "target1", + pages: []mockPrometheusResponse{ + {code: 200, useProtoBuf: true, buf: buffer.Bytes()}, + }, + validateFunc: func(t *testing.T, td *testData, result []pmetric.ResourceMetrics) { + verifyNumValidScrapeResults(t, td, result) + doCompare(t, "target1", td.attributes, result[0], tc.expected) + }, + }, + } + mutCfg := tc.mutCfg + if mutCfg == nil { + mutCfg = func(*PromConfig) {} + } + testComponent(t, targets, func(c *Config) { + c.PrometheusConfig.GlobalConfig.ScrapeProtocols = []config.ScrapeProtocol{config.PrometheusProto} + }, mutCfg) + }) + } +} + +func TestStaleExponentialHistogram(t *testing.T) { + mf := &dto.MetricFamily{ + Name: "test_counter", + Type: dto.MetricType_COUNTER, + Metric: []dto.Metric{ + { + Label: []dto.LabelPair{ + { + Name: "foo", + Value: "bar", + }, + }, + Counter: &dto.Counter{ + Value: 1234, + }, + }, + }, + } + buffer1 := prometheusMetricFamilyToProtoBuf(t, nil, mf) + nativeHistogram := &dto.MetricFamily{ + Name: "test_native_histogram", + Type: dto.MetricType_HISTOGRAM, + Metric: []dto.Metric{ + { + Histogram: &dto.Histogram{ + SampleCount: 1213, + SampleSum: 456, + // Integer counter histogram definition + Schema: 3, + ZeroThreshold: 0.001, + ZeroCount: 2, + NegativeSpan: []dto.BucketSpan{ + {Offset: 0, Length: 1}, + {Offset: 1, Length: 1}, + }, + NegativeDelta: []int64{1, 1}, + PositiveSpan: []dto.BucketSpan{ + {Offset: -2, Length: 1}, + {Offset: 2, Length: 1}, + }, + PositiveDelta: []int64{1, 0}, + }, + }, + }, + } + prometheusMetricFamilyToProtoBuf(t, buffer1, nativeHistogram) + + mf = &dto.MetricFamily{ + Name: "test_counter", + Type: dto.MetricType_COUNTER, + Metric: []dto.Metric{ + { + Label: []dto.LabelPair{ + { + Name: "foo", + Value: "bar", + }, + }, + Counter: &dto.Counter{ + Value: 2222, + }, + }, + }, + } + buffer2 := prometheusMetricFamilyToProtoBuf(t, nil, mf) + + expectations1 := []testExpectation{ + assertMetricPresent( + "test_native_histogram", + compareMetricType(pmetric.MetricTypeExponentialHistogram), + compareMetricUnit(""), + []dataPointExpectation{{ + exponentialHistogramComparator: []exponentialHistogramComparator{ + compareExponentialHistogram(3, 1213, 456, 2, -1, []uint64{1, 0, 2}, -3, []uint64{1, 0, 0, 1}), + }, + }}, + ), + } + + expectations2 := []testExpectation{ + assertMetricPresent( + "test_native_histogram", + compareMetricType(pmetric.MetricTypeExponentialHistogram), + compareMetricUnit(""), + []dataPointExpectation{{ + exponentialHistogramComparator: []exponentialHistogramComparator{ + assertExponentialHistogramPointFlagNoRecordedValue(), + }, + }}, + ), + } + + targets := []*testData{ + { + name: "target1", + pages: []mockPrometheusResponse{ + {code: 200, useProtoBuf: true, buf: buffer1.Bytes()}, + {code: 200, useProtoBuf: true, buf: buffer2.Bytes()}, + }, + validateFunc: func(t *testing.T, td *testData, result []pmetric.ResourceMetrics) { + verifyNumValidScrapeResults(t, td, result) + doCompare(t, "target11", td.attributes, result[0], expectations1) + doCompare(t, "target12", td.attributes, result[1], expectations2) + }, + }, + } + err := featuregate.GlobalRegistry().Set("receiver.prometheusreceiver.EnableNativeHistograms", true) + require.NoError(t, err) + defer func() { + _ = featuregate.GlobalRegistry().Set("receiver.prometheusreceiver.EnableNativeHistograms", false) + }() + testComponent(t, targets, func(c *Config) { + c.PrometheusConfig.GlobalConfig.ScrapeProtocols = []config.ScrapeProtocol{config.PrometheusProto} + }) +} + +func TestFloatCounterHistogram(t *testing.T) { + nativeHistogram := &dto.MetricFamily{ + Name: "test_native_histogram", + Type: dto.MetricType_HISTOGRAM, + Metric: []dto.Metric{ + { + Histogram: &dto.Histogram{ + SampleCountFloat: 1213.0, + SampleSum: 456, + // Float counter histogram definition + Schema: -1, + ZeroThreshold: 0.001, + ZeroCountFloat: 2.0, + NegativeSpan: []dto.BucketSpan{ + {Offset: 0, Length: 1}, + {Offset: 1, Length: 1}, + }, + NegativeCount: []float64{1.5, 2.5}, + PositiveSpan: []dto.BucketSpan{ + {Offset: -2, Length: 1}, + {Offset: 2, Length: 1}, + }, + PositiveCount: []float64{1.0, 3.0}, + }, + }, + }, + } + buffer := prometheusMetricFamilyToProtoBuf(t, nil, nativeHistogram) + + expectations1 := []testExpectation{ + assertMetricPresent( + "test_native_histogram", + compareMetricType(pmetric.MetricTypeExponentialHistogram), + compareMetricUnit(""), + []dataPointExpectation{{ + exponentialHistogramComparator: []exponentialHistogramComparator{ + compareExponentialHistogram(-1, 1213, 456, 2, -1, []uint64{1, 0, 2}, -3, []uint64{1, 0, 0, 3}), + }, + }}, + ), + } + + targets := []*testData{ + { + name: "target1", + pages: []mockPrometheusResponse{ + {code: 200, useProtoBuf: true, buf: buffer.Bytes()}, + }, + validateFunc: func(t *testing.T, td *testData, result []pmetric.ResourceMetrics) { + verifyNumValidScrapeResults(t, td, result) + doCompare(t, "target1", td.attributes, result[0], expectations1) + }, + }, + } + err := featuregate.GlobalRegistry().Set("receiver.prometheusreceiver.EnableNativeHistograms", true) + require.NoError(t, err) + defer func() { + _ = featuregate.GlobalRegistry().Set("receiver.prometheusreceiver.EnableNativeHistograms", false) + }() + testComponent(t, targets, func(c *Config) { + c.PrometheusConfig.GlobalConfig.ScrapeProtocols = []config.ScrapeProtocol{config.PrometheusProto} + }) +}