From 13fca794d57a15e68b41b95ea8df64ff7564f04d Mon Sep 17 00:00:00 2001
From: George Krajcsovits
Date: Tue, 9 Apr 2024 14:59:30 +0200
Subject: [PATCH] [receiver/prometheusreceiver] implement append native histogram (#28663)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

**Description:**
Implement native histogram append MVP. Very similar to appending a float sample.

Limitations:
- Only integer counter histograms are fully supported.
- In case a histogram has both classic and native buckets, we only store one of them.
  This is governed by the scrape_classic_histograms scrape option. The reason is that
  in the OTEL model the metric family is identified by the normalized name (without
  _count, _sum, _bucket suffixes for the classic histograms), meaning that the classic
  and native histograms would map to the same metric family in the OTEL model, but that
  cannot have both Histogram and ExponentialHistogram types at the same time.
- Gauge histograms are dropped with a warning as that temporality is unsupported, see
  https://github.com/open-telemetry/opentelemetry-specification/issues/2714
- The NoRecordedValue attribute might be unreliable. Prometheus scrape marks all series
  with float NaN values when stale, but transactions in prometheusreceiver are stateless,
  meaning that we have to use heuristics to figure out if we need to add a NoRecordedValue
  data point to an Exponential Histogram metric. (Needs work in Prometheus.)

Additionally:
- Created timestamp is supported.
- Float counter histograms are not fully tested and lose precision, but we don't expect
  instrumentation to expose these anyway.

**Link to tracking Issue:**
Fixes: #26555

**Testing:**
Added unit tests and e2e tests.

**Documentation:**
TBD: will have to call out protobuf negotiation while there is no text format support. #27030

---------

Signed-off-by: György Krajcsovits
Co-authored-by: David Ashpole
---
 ...theusreceiver-append-native-histogram.yaml |  36 ++
 receiver/prometheusreceiver/README.md         |  23 +-
 receiver/prometheusreceiver/factory.go        |   8 +
 .../prometheusreceiver/internal/appendable.go |  33 +-
 .../internal/metricfamily.go                  | 163 ++++++-
 .../internal/metricfamily_test.go             | 194 ++++++++
 .../internal/metrics_adjuster.go              |  61 ++-
 .../internal/metrics_adjuster_test.go         |  70 ++-
 .../internal/metricsutil_test.go              |  93 ++++
 .../internal/starttimemetricadjuster.go       |   9 +-
 .../internal/starttimemetricadjuster_test.go  |   9 +-
 .../internal/transaction.go                   | 157 ++++--
 .../internal/transaction_test.go              | 359 ++++++++++++--
 .../prometheusreceiver/metrics_receiver.go    |   8 +
 .../metrics_receiver_helper_test.go           |  43 +-
 .../metrics_receiver_protobuf_test.go         | 454 ++++++++++++++++++
 16 files changed, 1618 insertions(+), 102 deletions(-)
 create mode 100644 .chloggen/prometheusreceiver-append-native-histogram.yaml

diff --git a/.chloggen/prometheusreceiver-append-native-histogram.yaml b/.chloggen/prometheusreceiver-append-native-histogram.yaml
new file mode 100644
index 000000000000..34fb9ea34dec
--- /dev/null
+++ b/.chloggen/prometheusreceiver-append-native-histogram.yaml
@@ -0,0 +1,36 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: prometheusreceiver
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Allows receiving Prometheus native histograms

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [26555]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
  - Native histograms are compatible with OTEL exponential histograms.
  - The feature can be enabled via the feature gate `receiver.prometheusreceiver.EnableNativeHistograms`.
    Run the collector with the command line option `--feature-gates=receiver.prometheusreceiver.EnableNativeHistograms`.
  - Currently the feature also requires that targets are scraped via the ProtoBuf format.
    To start scraping native histograms, set
    `config.global.scrape_protocols` to `[ PrometheusProto, OpenMetricsText1.0.0, OpenMetricsText0.0.1, PrometheusText0.0.4 ]` in the
    receiver configuration. This requirement will be lifted once Prometheus can scrape native histograms over text formats.
  - For more up to date information see the README.md file of the receiver at
    https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/receiver/prometheusreceiver/README.md#prometheus-native-histograms.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
diff --git a/receiver/prometheusreceiver/README.md b/receiver/prometheusreceiver/README.md
index 7eb086fb4c9e..9b2f1f844eab 100644
--- a/receiver/prometheusreceiver/README.md
+++ b/receiver/prometheusreceiver/README.md
@@ -67,6 +67,12 @@ prometheus --config.file=prom.yaml
 "--feature-gates=receiver.prometheusreceiver.UseCreatedMetric"
 ```
 
+- `receiver.prometheusreceiver.EnableNativeHistograms`: process native histogram metrics and turn them into OpenTelemetry exponential histograms. For more details, consult the [Prometheus native histograms](#prometheus-native-histograms) section.
+
+```shell
+"--feature-gates=receiver.prometheusreceiver.EnableNativeHistograms"
+```
+
 - `report_extra_scrape_metrics`: Extra Prometheus scrape metrics can be reported by setting this parameter to `true`
 
 You can copy and paste that same configuration under:
@@ -123,7 +129,22 @@ receivers:
           - targets: ['0.0.0.0:8888']
 ```
 
-## OpenTelemetry Operator
+## Prometheus native histograms
+
+Native histograms are an experimental [feature](https://prometheus.io/docs/prometheus/latest/feature_flags/#native-histograms) of Prometheus.
+
+To start scraping native histograms, set `config.global.scrape_protocols` to `[ PrometheusProto, OpenMetricsText1.0.0, OpenMetricsText0.0.1, PrometheusText0.0.4 ]`
+in the receiver configuration. This requirement will be lifted once Prometheus can scrape native histograms over text formats.
+
+To enable converting native histograms to OpenTelemetry exponential histograms, enable the feature gate `receiver.prometheusreceiver.EnableNativeHistograms`.
+The feature is considered experimental.
+
+This feature applies to the most common integer counter histograms; gauge histograms are dropped.
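As an illustration of the `scrape_protocols` setting described above, a minimal receiver configuration could look like the following sketch. The job name, scrape interval, and target are placeholders, and the `receiver.prometheusreceiver.EnableNativeHistograms` feature gate still has to be enabled on the collector command line:

```yaml
receivers:
  prometheus:
    config:
      global:
        # PrometheusProto is listed first so the protobuf format is negotiated,
        # which is currently required for scraping native histograms.
        scrape_protocols: [ PrometheusProto, OpenMetricsText1.0.0, OpenMetricsText0.0.1, PrometheusText0.0.4 ]
      scrape_configs:
        - job_name: 'native-histogram-example' # placeholder
          scrape_interval: 10s                 # placeholder
          static_configs:
            - targets: ['0.0.0.0:8888']        # placeholder
```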
+In case a metric has both the conventional (aka classic) buckets and also native histogram buckets, only the native histogram buckets will be +taken into account to create the corresponding exponential histogram. To scrape the classic buckets instead use the +[scrape option](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#scrape_config) `scrape_classic_histograms`. + +## OpenTelemetry Operator Additional to this static job definitions this receiver allows to query a list of jobs from the OpenTelemetryOperators TargetAllocator or a compatible endpoint. diff --git a/receiver/prometheusreceiver/factory.go b/receiver/prometheusreceiver/factory.go index 91eba1919a9b..e3717e15aa3c 100644 --- a/receiver/prometheusreceiver/factory.go +++ b/receiver/prometheusreceiver/factory.go @@ -25,6 +25,14 @@ var useCreatedMetricGate = featuregate.GlobalRegistry().MustRegister( " retrieve the start time for Summary, Histogram and Sum metrics from _created metric"), ) +var enableNativeHistogramsGate = featuregate.GlobalRegistry().MustRegister( + "receiver.prometheusreceiver.EnableNativeHistograms", + featuregate.StageAlpha, + featuregate.WithRegisterDescription("When enabled, the Prometheus receiver will convert"+ + " Prometheus native histograms to OTEL exponential histograms and ignore"+ + " those Prometheus classic histograms that have a native histogram alternative"), +) + // NewFactory creates a new Prometheus receiver factory. func NewFactory() receiver.Factory { return receiver.NewFactory( diff --git a/receiver/prometheusreceiver/internal/appendable.go b/receiver/prometheusreceiver/internal/appendable.go index 33acfa4608cc..2be6e408a8de 100644 --- a/receiver/prometheusreceiver/internal/appendable.go +++ b/receiver/prometheusreceiver/internal/appendable.go @@ -17,12 +17,13 @@ import ( // appendable translates Prometheus scraping diffs into OpenTelemetry format. 
type appendable struct { - sink consumer.Metrics - metricAdjuster MetricsAdjuster - useStartTimeMetric bool - trimSuffixes bool - startTimeMetricRegex *regexp.Regexp - externalLabels labels.Labels + sink consumer.Metrics + metricAdjuster MetricsAdjuster + useStartTimeMetric bool + enableNativeHistograms bool + trimSuffixes bool + startTimeMetricRegex *regexp.Regexp + externalLabels labels.Labels settings receiver.CreateSettings obsrecv *receiverhelper.ObsReport @@ -36,6 +37,7 @@ func NewAppendable( useStartTimeMetric bool, startTimeMetricRegex *regexp.Regexp, useCreatedMetric bool, + enableNativeHistograms bool, externalLabels labels.Labels, trimSuffixes bool) (storage.Appendable, error) { var metricAdjuster MetricsAdjuster @@ -51,17 +53,18 @@ func NewAppendable( } return &appendable{ - sink: sink, - settings: set, - metricAdjuster: metricAdjuster, - useStartTimeMetric: useStartTimeMetric, - startTimeMetricRegex: startTimeMetricRegex, - externalLabels: externalLabels, - obsrecv: obsrecv, - trimSuffixes: trimSuffixes, + sink: sink, + settings: set, + metricAdjuster: metricAdjuster, + useStartTimeMetric: useStartTimeMetric, + enableNativeHistograms: enableNativeHistograms, + startTimeMetricRegex: startTimeMetricRegex, + externalLabels: externalLabels, + obsrecv: obsrecv, + trimSuffixes: trimSuffixes, }, nil } func (o *appendable) Appender(ctx context.Context) storage.Appender { - return newTransaction(ctx, o.metricAdjuster, o.sink, o.externalLabels, o.settings, o.obsrecv, o.trimSuffixes) + return newTransaction(ctx, o.metricAdjuster, o.sink, o.externalLabels, o.settings, o.obsrecv, o.trimSuffixes, o.enableNativeHistograms) } diff --git a/receiver/prometheusreceiver/internal/metricfamily.go b/receiver/prometheusreceiver/internal/metricfamily.go index 63ff895d4e26..ba87700d0442 100644 --- a/receiver/prometheusreceiver/internal/metricfamily.go +++ b/receiver/prometheusreceiver/internal/metricfamily.go @@ -11,6 +11,7 @@ import ( "strings" "github.com/prometheus/prometheus/model/exemplar" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/value" "github.com/prometheus/prometheus/scrape" @@ -49,6 +50,8 @@ type metricGroup struct { hasSum bool created float64 value float64 + hValue *histogram.Histogram + fhValue *histogram.FloatHistogram complexValue []*dataPoint exemplars pmetric.ExemplarSlice } @@ -156,6 +159,118 @@ func (mg *metricGroup) toDistributionPoint(dest pmetric.HistogramDataPointSlice) mg.setExemplars(point.Exemplars()) } +// toExponentialHistogramDataPoints is based on +// https://opentelemetry.io/docs/specs/otel/compatibility/prometheus_and_openmetrics/#exponential-histograms +func (mg *metricGroup) toExponentialHistogramDataPoints(dest pmetric.ExponentialHistogramDataPointSlice) { + if !mg.hasCount { + return + } + point := dest.AppendEmpty() + point.SetTimestamp(timestampFromMs(mg.ts)) + + // We do not set Min or Max as native histograms don't have that information. + switch { + case mg.fhValue != nil: + fh := mg.fhValue + + if value.IsStaleNaN(fh.Sum) { + point.SetFlags(pmetric.DefaultDataPointFlags.WithNoRecordedValue(true)) + // The count and sum are initialized to 0, so we don't need to set them. + } else { + point.SetScale(fh.Schema) + // Input is a float native histogram. This conversion will lose + // precision,but we don't actually expect float histograms in scrape, + // since these are typically the result of operations on integer + // native histograms in the database. 
+ point.SetCount(uint64(fh.Count)) + point.SetSum(fh.Sum) + point.SetZeroThreshold(fh.ZeroThreshold) + point.SetZeroCount(uint64(fh.ZeroCount)) + + if len(fh.PositiveSpans) > 0 { + point.Positive().SetOffset(fh.PositiveSpans[0].Offset - 1) // -1 because OTEL offset are for the lower bound, not the upper bound + convertAbsoluteBuckets(fh.PositiveSpans, fh.PositiveBuckets, point.Positive().BucketCounts()) + } + if len(fh.NegativeSpans) > 0 { + point.Negative().SetOffset(fh.NegativeSpans[0].Offset - 1) // -1 because OTEL offset are for the lower bound, not the upper bound + convertAbsoluteBuckets(fh.NegativeSpans, fh.NegativeBuckets, point.Negative().BucketCounts()) + } + } + + case mg.hValue != nil: + h := mg.hValue + + if value.IsStaleNaN(h.Sum) { + point.SetFlags(pmetric.DefaultDataPointFlags.WithNoRecordedValue(true)) + // The count and sum are initialized to 0, so we don't need to set them. + } else { + point.SetScale(h.Schema) + point.SetCount(h.Count) + point.SetSum(h.Sum) + point.SetZeroThreshold(h.ZeroThreshold) + point.SetZeroCount(h.ZeroCount) + + if len(h.PositiveSpans) > 0 { + point.Positive().SetOffset(h.PositiveSpans[0].Offset - 1) // -1 because OTEL offset are for the lower bound, not the upper bound + convertDeltaBuckets(h.PositiveSpans, h.PositiveBuckets, point.Positive().BucketCounts()) + } + if len(h.NegativeSpans) > 0 { + point.Negative().SetOffset(h.NegativeSpans[0].Offset - 1) // -1 because OTEL offset are for the lower bound, not the upper bound + convertDeltaBuckets(h.NegativeSpans, h.NegativeBuckets, point.Negative().BucketCounts()) + } + } + + default: + // This should never happen. + return + } + + tsNanos := timestampFromMs(mg.ts) + if mg.created != 0 { + point.SetStartTimestamp(timestampFromFloat64(mg.created)) + } else { + // metrics_adjuster adjusts the startTimestamp to the initial scrape timestamp + point.SetStartTimestamp(tsNanos) + } + point.SetTimestamp(tsNanos) + populateAttributes(pmetric.MetricTypeHistogram, mg.ls, point.Attributes()) + mg.setExemplars(point.Exemplars()) +} + +func convertDeltaBuckets(spans []histogram.Span, deltas []int64, buckets pcommon.UInt64Slice) { + buckets.EnsureCapacity(len(deltas)) + bucketIdx := 0 + bucketCount := int64(0) + for spanIdx, span := range spans { + if spanIdx > 0 { + for i := int32(0); i < span.Offset; i++ { + buckets.Append(uint64(0)) + } + } + for i := uint32(0); i < span.Length; i++ { + bucketCount += deltas[bucketIdx] + bucketIdx++ + buckets.Append(uint64(bucketCount)) + } + } +} + +func convertAbsoluteBuckets(spans []histogram.Span, counts []float64, buckets pcommon.UInt64Slice) { + buckets.EnsureCapacity(len(counts)) + bucketIdx := 0 + for spanIdx, span := range spans { + if spanIdx > 0 { + for i := int32(0); i < span.Offset; i++ { + buckets.Append(uint64(0)) + } + } + for i := uint32(0); i < span.Length; i++ { + buckets.Append(uint64(counts[bucketIdx])) + bucketIdx++ + } + } +} + func (mg *metricGroup) setExemplars(exemplars pmetric.ExemplarSlice) { if mg == nil { return @@ -296,13 +411,17 @@ func (mf *metricFamily) addSeries(seriesRef uint64, metricName string, ls labels } mg.complexValue = append(mg.complexValue, &dataPoint{value: v, boundary: boundary}) } + case pmetric.MetricTypeExponentialHistogram: + if metricName == mf.metadata.Metric+metricSuffixCreated { + mg.created = v + } case pmetric.MetricTypeSum: if metricName == mf.metadata.Metric+metricSuffixCreated { mg.created = v } else { mg.value = v } - case pmetric.MetricTypeEmpty, pmetric.MetricTypeGauge, pmetric.MetricTypeExponentialHistogram: + 
case pmetric.MetricTypeEmpty, pmetric.MetricTypeGauge: fallthrough default: mg.value = v @@ -311,6 +430,37 @@ func (mf *metricFamily) addSeries(seriesRef uint64, metricName string, ls labels return nil } +func (mf *metricFamily) addExponentialHistogramSeries(seriesRef uint64, metricName string, ls labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) error { + mg := mf.loadMetricGroupOrCreate(seriesRef, ls, t) + if mg.ts != t { + return fmt.Errorf("inconsistent timestamps on metric points for metric %v", metricName) + } + if mg.mtype != pmetric.MetricTypeExponentialHistogram { + return fmt.Errorf("metric type mismatch for exponential histogram metric %v type %s", metricName, mg.mtype.String()) + } + switch { + case fh != nil: + if mg.hValue != nil { + return fmt.Errorf("exponential histogram %v already has float counts", metricName) + } + mg.count = fh.Count + mg.sum = fh.Sum + mg.hasCount = true + mg.hasSum = true + mg.fhValue = fh + case h != nil: + if mg.fhValue != nil { + return fmt.Errorf("exponential histogram %v already has integer counts", metricName) + } + mg.count = float64(h.Count) + mg.sum = h.Sum + mg.hasCount = true + mg.hasSum = true + mg.hValue = h + } + return nil +} + func (mf *metricFamily) appendMetric(metrics pmetric.MetricSlice, trimSuffixes bool) { metric := pmetric.NewMetric() // Trims type and unit suffixes from metric name @@ -352,7 +502,16 @@ func (mf *metricFamily) appendMetric(metrics pmetric.MetricSlice, trimSuffixes b } pointCount = sdpL.Len() - case pmetric.MetricTypeEmpty, pmetric.MetricTypeGauge, pmetric.MetricTypeExponentialHistogram: + case pmetric.MetricTypeExponentialHistogram: + histogram := metric.SetEmptyExponentialHistogram() + histogram.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + hdpL := histogram.DataPoints() + for _, mg := range mf.groupOrders { + mg.toExponentialHistogramDataPoints(hdpL) + } + pointCount = hdpL.Len() + + case pmetric.MetricTypeEmpty, pmetric.MetricTypeGauge: fallthrough default: // Everything else should be set to a Gauge. gauge := metric.SetEmptyGauge() diff --git a/receiver/prometheusreceiver/internal/metricfamily_test.go b/receiver/prometheusreceiver/internal/metricfamily_test.go index 7b37d1c6c51b..586be1992912 100644 --- a/receiver/prometheusreceiver/internal/metricfamily_test.go +++ b/receiver/prometheusreceiver/internal/metricfamily_test.go @@ -9,6 +9,7 @@ import ( "time" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/value" "github.com/prometheus/prometheus/scrape" @@ -293,6 +294,199 @@ func TestMetricGroupData_toDistributionUnitTest(t *testing.T) { } } +func TestMetricGroupData_toExponentialDistributionUnitTest(t *testing.T) { + type scrape struct { + at int64 + metric string + extraLabel labels.Label + + // Only one kind of value should be set. + value float64 + integerHistogram *histogram.Histogram + floatHistogram *histogram.FloatHistogram // TODO: add tests for float histograms. 
+	}
+	tests := []struct {
+		name                string
+		metricName          string
+		labels              labels.Labels
+		scrapes             []*scrape
+		want                func() pmetric.ExponentialHistogramDataPoint
+		wantErr             bool
+		intervalStartTimeMs int64
+	}{
+		{
+			name:                "integer histogram with startTimestamp",
+			metricName:          "request_duration_seconds",
+			intervalStartTimeMs: 11,
+			labels:              labels.FromMap(map[string]string{"a": "A", "b": "B"}),
+			scrapes: []*scrape{
+				{
+					at:     11,
+					metric: "request_duration_seconds",
+					integerHistogram: &histogram.Histogram{
+						CounterResetHint: histogram.UnknownCounterReset,
+						Schema:           1,
+						ZeroThreshold:    0.42,
+						ZeroCount:        1,
+						Count:            66,
+						Sum:              1004.78,
+						PositiveSpans:    []histogram.Span{{Offset: 1, Length: 2}, {Offset: 3, Length: 1}},
+						PositiveBuckets:  []int64{33, -30, 26}, // Delta encoded counts: 33, 3=(33-30), 29=(3+26) -> 65
+						NegativeSpans:    []histogram.Span{{Offset: 0, Length: 1}},
+						NegativeBuckets:  []int64{1}, // Delta encoded counts: 1
+					},
+				},
+			},
+			want: func() pmetric.ExponentialHistogramDataPoint {
+				point := pmetric.NewExponentialHistogramDataPoint()
+				point.SetCount(66)
+				point.SetSum(1004.78)
+				point.SetTimestamp(pcommon.Timestamp(11 * time.Millisecond))      // the time in milliseconds -> nanoseconds.
+				point.SetStartTimestamp(pcommon.Timestamp(11 * time.Millisecond)) // the time in milliseconds -> nanoseconds.
+				point.SetScale(1)
+				point.SetZeroThreshold(0.42)
+				point.SetZeroCount(1)
+				point.Positive().SetOffset(0)
+				point.Positive().BucketCounts().FromRaw([]uint64{33, 3, 0, 0, 0, 29})
+				point.Negative().SetOffset(-1)
+				point.Negative().BucketCounts().FromRaw([]uint64{1})
+				attributes := point.Attributes()
+				attributes.PutStr("a", "A")
+				attributes.PutStr("b", "B")
+				return point
+			},
+		},
+		{
+			name:                "integer histogram with startTimestamp from _created",
+			metricName:          "request_duration_seconds",
+			intervalStartTimeMs: 11,
+			labels:              labels.FromMap(map[string]string{"a": "A"}),
+			scrapes: []*scrape{
+				{
+					at:     11,
+					metric: "request_duration_seconds",
+					integerHistogram: &histogram.Histogram{
+						CounterResetHint: histogram.UnknownCounterReset,
+						Schema:           1,
+						ZeroThreshold:    0.42,
+						ZeroCount:        1,
+						Count:            66,
+						Sum:              1004.78,
+						PositiveSpans:    []histogram.Span{{Offset: 1, Length: 2}, {Offset: 3, Length: 1}},
+						PositiveBuckets:  []int64{33, -30, 26}, // Delta encoded counts: 33, 3=(33-30), 29=(3+26) -> 65
+						NegativeSpans:    []histogram.Span{{Offset: 0, Length: 1}},
+						NegativeBuckets:  []int64{1}, // Delta encoded counts: 1
+					},
+				},
+				{
+					at:     11,
+					metric: "request_duration_seconds_created",
+					value:  600.78,
+				},
+			},
+			want: func() pmetric.ExponentialHistogramDataPoint {
+				point := pmetric.NewExponentialHistogramDataPoint()
+				point.SetCount(66)
+				point.SetSum(1004.78)
+				point.SetTimestamp(pcommon.Timestamp(11 * time.Millisecond)) // the time in milliseconds -> nanoseconds.
+				point.SetStartTimestamp(timestampFromFloat64(600.78))        // the _created time in seconds -> nanoseconds.
+ point.SetScale(1) + point.SetZeroThreshold(0.42) + point.SetZeroCount(1) + point.Positive().SetOffset(0) + point.Positive().BucketCounts().FromRaw([]uint64{33, 3, 0, 0, 0, 29}) + point.Negative().SetOffset(-1) + point.Negative().BucketCounts().FromRaw([]uint64{1}) + attributes := point.Attributes() + attributes.PutStr("a", "A") + return point + }, + }, + { + name: "integer histogram that is stale", + metricName: "request_duration_seconds", + intervalStartTimeMs: 11, + labels: labels.FromMap(map[string]string{"a": "A", "b": "B"}), + scrapes: []*scrape{ + { + at: 11, + metric: "request_duration_seconds", + integerHistogram: &histogram.Histogram{ + Sum: math.Float64frombits(value.StaleNaN), + }, + }, + }, + want: func() pmetric.ExponentialHistogramDataPoint { + point := pmetric.NewExponentialHistogramDataPoint() + point.SetTimestamp(pcommon.Timestamp(11 * time.Millisecond)) // the time in milliseconds -> nanoseconds. + point.SetFlags(pmetric.DefaultDataPointFlags.WithNoRecordedValue(true)) + point.SetStartTimestamp(pcommon.Timestamp(11 * time.Millisecond)) // the time in milliseconds -> nanoseconds. + attributes := point.Attributes() + attributes.PutStr("a", "A") + attributes.PutStr("b", "B") + return point + }, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + mp := newMetricFamily(tt.metricName, mc, zap.NewNop()) + for i, tv := range tt.scrapes { + var lbls labels.Labels + if tv.extraLabel.Name != "" { + lbls = labels.NewBuilder(tt.labels).Set(tv.extraLabel.Name, tv.extraLabel.Value).Labels() + } else { + lbls = tt.labels.Copy() + } + + var err error + switch { + case tv.integerHistogram != nil: + mp.mtype = pmetric.MetricTypeExponentialHistogram + sRef, _ := getSeriesRef(nil, lbls, mp.mtype) + err = mp.addExponentialHistogramSeries(sRef, tv.metric, lbls, tv.at, tv.integerHistogram, nil) + case tv.floatHistogram != nil: + mp.mtype = pmetric.MetricTypeExponentialHistogram + sRef, _ := getSeriesRef(nil, lbls, mp.mtype) + err = mp.addExponentialHistogramSeries(sRef, tv.metric, lbls, tv.at, nil, tv.floatHistogram) + default: + sRef, _ := getSeriesRef(nil, lbls, mp.mtype) + err = mp.addSeries(sRef, tv.metric, lbls, tv.at, tv.value) + } + if tt.wantErr { + if i != 0 { + require.Error(t, err) + } + } else { + require.NoError(t, err) + } + } + if tt.wantErr { + // Don't check the result if we got an error + return + } + + require.Len(t, mp.groups, 1) + + sl := pmetric.NewMetricSlice() + mp.appendMetric(sl, false) + + require.Equal(t, 1, sl.Len(), "Exactly one metric expected") + metric := sl.At(0) + require.Equal(t, mc[tt.metricName].Help, metric.Description(), "Expected help metadata in metric description") + require.Equal(t, mc[tt.metricName].Unit, metric.Unit(), "Expected unit metadata in metric") + + hdpL := metric.ExponentialHistogram().DataPoints() + require.Equal(t, 1, hdpL.Len(), "Exactly one point expected") + got := hdpL.At(0) + want := tt.want() + require.Equal(t, want, got, "Expected the points to be equal") + }) + } +} + func TestMetricGroupData_toSummaryUnitTest(t *testing.T) { type scrape struct { at int64 diff --git a/receiver/prometheusreceiver/internal/metrics_adjuster.go b/receiver/prometheusreceiver/internal/metrics_adjuster.go index ab03810b57fd..26825fd6ae54 100644 --- a/receiver/prometheusreceiver/internal/metrics_adjuster.go +++ b/receiver/prometheusreceiver/internal/metrics_adjuster.go @@ -102,11 +102,17 @@ func (tsm *timeseriesMap) get(metric pmetric.Metric, kv pcommon.Map) (*timeserie name: name, attributes: 
getAttributesSignature(kv), } - if metric.Type() == pmetric.MetricTypeHistogram { + switch metric.Type() { + case pmetric.MetricTypeHistogram: // There are 2 types of Histograms whose aggregation temporality needs distinguishing: // * CumulativeHistogram // * GaugeHistogram key.aggTemporality = metric.Histogram().AggregationTemporality() + case pmetric.MetricTypeExponentialHistogram: + // There are 2 types of ExponentialHistograms whose aggregation temporality needs distinguishing: + // * CumulativeHistogram + // * GaugeHistogram + key.aggTemporality = metric.ExponentialHistogram().AggregationTemporality() } tsm.mark = true @@ -285,7 +291,10 @@ func (a *initialPointAdjuster) AdjustMetrics(metrics pmetric.Metrics) error { case pmetric.MetricTypeSum: a.adjustMetricSum(tsm, metric) - case pmetric.MetricTypeEmpty, pmetric.MetricTypeExponentialHistogram: + case pmetric.MetricTypeExponentialHistogram: + a.adjustMetricExponentialHistogram(tsm, metric) + + case pmetric.MetricTypeEmpty: fallthrough default: @@ -346,6 +355,54 @@ func (a *initialPointAdjuster) adjustMetricHistogram(tsm *timeseriesMap, current } } +func (a *initialPointAdjuster) adjustMetricExponentialHistogram(tsm *timeseriesMap, current pmetric.Metric) { + histogram := current.ExponentialHistogram() + if histogram.AggregationTemporality() != pmetric.AggregationTemporalityCumulative { + // Only dealing with CumulativeDistributions. + return + } + + currentPoints := histogram.DataPoints() + for i := 0; i < currentPoints.Len(); i++ { + currentDist := currentPoints.At(i) + + // start timestamp was set from _created + if a.useCreatedMetric && + !currentDist.Flags().NoRecordedValue() && + currentDist.StartTimestamp() < currentDist.Timestamp() { + continue + } + + tsi, found := tsm.get(current, currentDist.Attributes()) + if !found { + // initialize everything. + tsi.histogram.startTime = currentDist.StartTimestamp() + tsi.histogram.previousCount = currentDist.Count() + tsi.histogram.previousSum = currentDist.Sum() + continue + } + + if currentDist.Flags().NoRecordedValue() { + // TODO: Investigate why this does not reset. + currentDist.SetStartTimestamp(tsi.histogram.startTime) + continue + } + + if currentDist.Count() < tsi.histogram.previousCount || currentDist.Sum() < tsi.histogram.previousSum { + // reset re-initialize everything. + tsi.histogram.startTime = currentDist.StartTimestamp() + tsi.histogram.previousCount = currentDist.Count() + tsi.histogram.previousSum = currentDist.Sum() + continue + } + + // Update only previous values. 
+ tsi.histogram.previousCount = currentDist.Count() + tsi.histogram.previousSum = currentDist.Sum() + currentDist.SetStartTimestamp(tsi.histogram.startTime) + } +} + func (a *initialPointAdjuster) adjustMetricSum(tsm *timeseriesMap, current pmetric.Metric) { currentPoints := current.Sum().DataPoints() for i := 0; i < currentPoints.Len(); i++ { diff --git a/receiver/prometheusreceiver/internal/metrics_adjuster_test.go b/receiver/prometheusreceiver/internal/metrics_adjuster_test.go index df38dea9e968..d80dcf512ac1 100644 --- a/receiver/prometheusreceiver/internal/metrics_adjuster_test.go +++ b/receiver/prometheusreceiver/internal/metrics_adjuster_test.go @@ -25,10 +25,11 @@ var ( bounds0 = []float64{1, 2, 4} percent0 = []float64{10, 50, 90} - sum1 = "sum1" - gauge1 = "gauge1" - histogram1 = "histogram1" - summary1 = "summary1" + sum1 = "sum1" + gauge1 = "gauge1" + histogram1 = "histogram1" + summary1 = "summary1" + exponentialHistogram1 = "exponentialHistogram1" k1v1k2v2 = []*kv{ {"k1", "v1"}, @@ -246,6 +247,67 @@ func TestHistogramFlagNoRecordedValueFirstObservation(t *testing.T) { runScript(t, NewInitialPointAdjuster(zap.NewNop(), time.Minute, true), "job", "0", script) } +// In TestExponentHistogram we exclude negative buckets on purpose as they are +// not considered the main use case - response times that are most commonly +// observed are never negative. Negative buckets would make the Sum() non +// monotonic and cause unexpected resets. +func TestExponentialHistogram(t *testing.T) { + script := []*metricsAdjusterTest{ + { + description: "Exponential Histogram: round 1 - initial instance, start time is established", + metrics: metrics(exponentialHistogramMetric(exponentialHistogram1, exponentialHistogramPoint(k1v1k2v2, t1, t1, 3, 1, 0, []uint64{}, -2, []uint64{4, 2, 3, 7}))), + adjusted: metrics(exponentialHistogramMetric(exponentialHistogram1, exponentialHistogramPoint(k1v1k2v2, t1, t1, 3, 1, 0, []uint64{}, -2, []uint64{4, 2, 3, 7}))), + }, { + description: "Exponential Histogram: round 2 - instance adjusted based on round 1", + metrics: metrics(exponentialHistogramMetric(exponentialHistogram1, exponentialHistogramPoint(k1v1k2v2, t2, t2, 3, 1, 0, []uint64{}, -2, []uint64{6, 2, 3, 7}))), + adjusted: metrics(exponentialHistogramMetric(exponentialHistogram1, exponentialHistogramPoint(k1v1k2v2, t1, t2, 3, 1, 0, []uint64{}, -2, []uint64{6, 2, 3, 7}))), + }, { + description: "Exponential Histogram: round 3 - instance reset (value less than previous value), start time is reset", + metrics: metrics(exponentialHistogramMetric(histogram1, exponentialHistogramPoint(k1v1k2v2, t3, t3, 3, 1, 0, []uint64{}, -2, []uint64{5, 3, 2, 7}))), + adjusted: metrics(exponentialHistogramMetric(histogram1, exponentialHistogramPoint(k1v1k2v2, t3, t3, 3, 1, 0, []uint64{}, -2, []uint64{5, 3, 2, 7}))), + }, { + description: "Exponential Histogram: round 4 - instance adjusted based on round 3", + metrics: metrics(exponentialHistogramMetric(histogram1, exponentialHistogramPoint(k1v1k2v2, t4, t4, 3, 1, 0, []uint64{}, -2, []uint64{7, 4, 2, 12}))), + adjusted: metrics(exponentialHistogramMetric(histogram1, exponentialHistogramPoint(k1v1k2v2, t3, t4, 3, 1, 0, []uint64{}, -2, []uint64{7, 4, 2, 12}))), + }, + } + runScript(t, NewInitialPointAdjuster(zap.NewNop(), time.Minute, true), "job", "0", script) +} + +func TestExponentialHistogramFlagNoRecordedValue(t *testing.T) { + script := []*metricsAdjusterTest{ + { + description: "Histogram: round 1 - initial instance, start time is established", + metrics: 
metrics(exponentialHistogramMetric(histogram1, exponentialHistogramPoint(k1v1k2v2, t1, t1, 0, 2, 2, []uint64{7, 4, 2, 12}, 3, []uint64{}))), + adjusted: metrics(exponentialHistogramMetric(histogram1, exponentialHistogramPoint(k1v1k2v2, t1, t1, 0, 2, 2, []uint64{7, 4, 2, 12}, 3, []uint64{}))), + }, + { + description: "Histogram: round 2 - instance adjusted based on round 1", + metrics: metrics(exponentialHistogramMetric(histogram1, exponentialHistogramPointNoValue(k1v1k2v2, tUnknown, t2))), + adjusted: metrics(exponentialHistogramMetric(histogram1, exponentialHistogramPointNoValue(k1v1k2v2, t1, t2))), + }, + } + + runScript(t, NewInitialPointAdjuster(zap.NewNop(), time.Minute, true), "job", "0", script) +} + +func TestExponentialHistogramFlagNoRecordedValueFirstObservation(t *testing.T) { + script := []*metricsAdjusterTest{ + { + description: "Histogram: round 1 - initial instance, start time is unknown", + metrics: metrics(exponentialHistogramMetric(histogram1, exponentialHistogramPointNoValue(k1v1k2v2, tUnknown, t1))), + adjusted: metrics(exponentialHistogramMetric(histogram1, exponentialHistogramPointNoValue(k1v1k2v2, tUnknown, t1))), + }, + { + description: "Histogram: round 2 - instance unchanged", + metrics: metrics(exponentialHistogramMetric(histogram1, exponentialHistogramPointNoValue(k1v1k2v2, tUnknown, t2))), + adjusted: metrics(exponentialHistogramMetric(histogram1, exponentialHistogramPointNoValue(k1v1k2v2, tUnknown, t2))), + }, + } + + runScript(t, NewInitialPointAdjuster(zap.NewNop(), time.Minute, true), "job", "0", script) +} + func TestSummaryFlagNoRecordedValueFirstObservation(t *testing.T) { script := []*metricsAdjusterTest{ { diff --git a/receiver/prometheusreceiver/internal/metricsutil_test.go b/receiver/prometheusreceiver/internal/metricsutil_test.go index 4ba25cfe846e..8a0670a1d7ea 100644 --- a/receiver/prometheusreceiver/internal/metricsutil_test.go +++ b/receiver/prometheusreceiver/internal/metricsutil_test.go @@ -78,6 +78,99 @@ func histogramMetric(name string, points ...pmetric.HistogramDataPoint) pmetric. return metric } +func exponentialHistogramMetric(name string, points ...pmetric.ExponentialHistogramDataPoint) pmetric.Metric { + metric := pmetric.NewMetric() + metric.SetName(name) + histogram := metric.SetEmptyExponentialHistogram() + histogram.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + + destPointL := histogram.DataPoints() + // By default the AggregationTemporality is Cumulative until it'll be changed by the caller. 
+ for _, point := range points { + destPoint := destPointL.AppendEmpty() + point.CopyTo(destPoint) + } + + return metric +} + +func exponentialHistogramPointRaw(attributes []*kv, startTimestamp, timestamp pcommon.Timestamp) pmetric.ExponentialHistogramDataPoint { + hdp := pmetric.NewExponentialHistogramDataPoint() + hdp.SetStartTimestamp(startTimestamp) + hdp.SetTimestamp(timestamp) + + attrs := hdp.Attributes() + for _, kv := range attributes { + attrs.PutStr(kv.Key, kv.Value) + } + + return hdp +} + +func exponentialHistogramPoint(attributes []*kv, startTimestamp, timestamp pcommon.Timestamp, scale int32, zeroCount uint64, negativeOffset int32, negativeBuckets []uint64, positiveOffset int32, positiveBuckets []uint64) pmetric.ExponentialHistogramDataPoint { + hdp := exponentialHistogramPointRaw(attributes, startTimestamp, timestamp) + hdp.SetScale(scale) + hdp.SetZeroCount(zeroCount) + hdp.Negative().SetOffset(negativeOffset) + hdp.Negative().BucketCounts().FromRaw(negativeBuckets) + hdp.Positive().SetOffset(positiveOffset) + hdp.Positive().BucketCounts().FromRaw(positiveBuckets) + + count := uint64(0) + sum := float64(0) + for i, bCount := range positiveBuckets { + count += bCount + sum += float64(bCount) * float64(i) + } + for i, bCount := range negativeBuckets { + count += bCount + sum -= float64(bCount) * float64(i) + } + hdp.SetCount(count) + hdp.SetSum(sum) + return hdp +} + +func exponentialHistogramPointNoValue(attributes []*kv, startTimestamp, timestamp pcommon.Timestamp) pmetric.ExponentialHistogramDataPoint { + hdp := exponentialHistogramPointRaw(attributes, startTimestamp, timestamp) + hdp.SetFlags(pmetric.DefaultDataPointFlags.WithNoRecordedValue(true)) + + return hdp +} + +// exponentialHistogramPointSimplified let's you define an exponential +// histogram with just a few parameters. +// Scale and ZeroCount are set to the provided values. +// Positive and negative buckets are generated using the offset and bucketCount +// parameters by adding buckets from offset in both positive and negative +// directions. Bucket counts start from 1 and increase by 1 for each bucket. +// Sum and Count will be proportional to the bucket count. 
+func exponentialHistogramPointSimplified(attributes []*kv, startTimestamp, timestamp pcommon.Timestamp, scale int32, zeroCount uint64, offset int32, bucketCount int) pmetric.ExponentialHistogramDataPoint { + hdp := exponentialHistogramPointRaw(attributes, startTimestamp, timestamp) + hdp.SetScale(scale) + hdp.SetZeroCount(zeroCount) + + positive := hdp.Positive() + positive.SetOffset(offset) + positive.BucketCounts().EnsureCapacity(bucketCount) + negative := hdp.Negative() + negative.SetOffset(offset) + negative.BucketCounts().EnsureCapacity(bucketCount) + + var sum float64 + var count uint64 + for i := 0; i < bucketCount; i++ { + positive.BucketCounts().Append(uint64(i + 1)) + negative.BucketCounts().Append(uint64(i + 1)) + count += uint64(i+1) + uint64(i+1) + sum += float64(i+1)*10 + float64(i+1)*10.0 + } + hdp.SetCount(count) + hdp.SetSum(sum) + + return hdp +} + func doublePointRaw(attributes []*kv, startTimestamp, timestamp pcommon.Timestamp) pmetric.NumberDataPoint { ndp := pmetric.NewNumberDataPoint() ndp.SetStartTimestamp(startTimestamp) diff --git a/receiver/prometheusreceiver/internal/starttimemetricadjuster.go b/receiver/prometheusreceiver/internal/starttimemetricadjuster.go index 9195136e7841..ca7ae2a29171 100644 --- a/receiver/prometheusreceiver/internal/starttimemetricadjuster.go +++ b/receiver/prometheusreceiver/internal/starttimemetricadjuster.go @@ -68,7 +68,14 @@ func (stma *startTimeMetricAdjuster) AdjustMetrics(metrics pmetric.Metrics) erro dp.SetStartTimestamp(startTimeTs) } - case pmetric.MetricTypeEmpty, pmetric.MetricTypeExponentialHistogram: + case pmetric.MetricTypeExponentialHistogram: + dataPoints := metric.ExponentialHistogram().DataPoints() + for l := 0; l < dataPoints.Len(); l++ { + dp := dataPoints.At(l) + dp.SetStartTimestamp(startTimeTs) + } + + case pmetric.MetricTypeEmpty: fallthrough default: diff --git a/receiver/prometheusreceiver/internal/starttimemetricadjuster_test.go b/receiver/prometheusreceiver/internal/starttimemetricadjuster_test.go index 89e4b10f8e5f..84bdc2756ed5 100644 --- a/receiver/prometheusreceiver/internal/starttimemetricadjuster_test.go +++ b/receiver/prometheusreceiver/internal/starttimemetricadjuster_test.go @@ -33,6 +33,7 @@ func TestStartTimeMetricMatch(t *testing.T) { summaryMetric("test_summary_metric", summaryPoint(nil, startTime, currentTime, 10, 100, []float64{10, 50, 90}, []float64{9, 15, 48})), sumMetric("example_process_start_time_seconds", doublePoint(nil, startTime, currentTime, matchBuilderStartTime)), sumMetric("process_start_time_seconds", doublePoint(nil, startTime, currentTime, matchBuilderStartTime+1)), + exponentialHistogramMetric("test_exponential_histogram_metric", exponentialHistogramPointSimplified(nil, startTime, currentTime, 3, 1, -5, 3)), ), startTimeMetricRegex: regexp.MustCompile("^.*_process_start_time_seconds$"), expectedStartTime: timestampFromFloat64(matchBuilderStartTime), @@ -45,6 +46,7 @@ func TestStartTimeMetricMatch(t *testing.T) { summaryMetric("test_summary_metric", summaryPoint(nil, startTime, currentTime, 10, 100, []float64{10, 50, 90}, []float64{9, 15, 48})), sumMetric("example_process_start_time_seconds", doublePoint(nil, startTime, currentTime, matchBuilderStartTime)), sumMetric("process_start_time_seconds", doublePoint(nil, startTime, currentTime, matchBuilderStartTime+1)), + exponentialHistogramMetric("test_exponential_histogram_metric", exponentialHistogramPointSimplified(nil, startTime, currentTime, 3, 1, -5, 3)), ), expectedStartTime: timestampFromFloat64(matchBuilderStartTime + 1), 
}, @@ -139,7 +141,12 @@ func TestStartTimeMetricMatch(t *testing.T) { for l := 0; l < dps.Len(); l++ { assert.Equal(t, tt.expectedStartTime, dps.At(l).StartTimestamp()) } - case pmetric.MetricTypeEmpty, pmetric.MetricTypeGauge, pmetric.MetricTypeExponentialHistogram: + case pmetric.MetricTypeExponentialHistogram: + dps := metric.ExponentialHistogram().DataPoints() + for l := 0; l < dps.Len(); l++ { + assert.Equal(t, tt.expectedStartTime, dps.At(l).StartTimestamp()) + } + case pmetric.MetricTypeEmpty, pmetric.MetricTypeGauge: } } } diff --git a/receiver/prometheusreceiver/internal/transaction.go b/receiver/prometheusreceiver/internal/transaction.go index 563642bc61bb..dc1bf78dba9e 100644 --- a/receiver/prometheusreceiver/internal/transaction.go +++ b/receiver/prometheusreceiver/internal/transaction.go @@ -7,6 +7,7 @@ import ( "context" "errors" "fmt" + "math" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/exemplar" @@ -34,19 +35,20 @@ const ( ) type transaction struct { - isNew bool - trimSuffixes bool - ctx context.Context - families map[scopeID]map[string]*metricFamily - mc scrape.MetricMetadataStore - sink consumer.Metrics - externalLabels labels.Labels - nodeResource pcommon.Resource - scopeAttributes map[scopeID]pcommon.Map - logger *zap.Logger - buildInfo component.BuildInfo - metricAdjuster MetricsAdjuster - obsrecv *receiverhelper.ObsReport + isNew bool + trimSuffixes bool + enableNativeHistograms bool + ctx context.Context + families map[scopeID]map[string]*metricFamily + mc scrape.MetricMetadataStore + sink consumer.Metrics + externalLabels labels.Labels + nodeResource pcommon.Resource + scopeAttributes map[scopeID]pcommon.Map + logger *zap.Logger + buildInfo component.BuildInfo + metricAdjuster MetricsAdjuster + obsrecv *receiverhelper.ObsReport // Used as buffer to calculate series ref hash. bufBytes []byte } @@ -65,20 +67,22 @@ func newTransaction( externalLabels labels.Labels, settings receiver.CreateSettings, obsrecv *receiverhelper.ObsReport, - trimSuffixes bool) *transaction { + trimSuffixes bool, + enableNativeHistograms bool) *transaction { return &transaction{ - ctx: ctx, - families: make(map[scopeID]map[string]*metricFamily), - isNew: true, - trimSuffixes: trimSuffixes, - sink: sink, - metricAdjuster: metricAdjuster, - externalLabels: externalLabels, - logger: settings.Logger, - buildInfo: settings.BuildInfo, - obsrecv: obsrecv, - bufBytes: make([]byte, 0, 1024), - scopeAttributes: make(map[scopeID]pcommon.Map), + ctx: ctx, + families: make(map[scopeID]map[string]*metricFamily), + isNew: true, + trimSuffixes: trimSuffixes, + enableNativeHistograms: enableNativeHistograms, + sink: sink, + metricAdjuster: metricAdjuster, + externalLabels: externalLabels, + logger: settings.Logger, + buildInfo: settings.BuildInfo, + obsrecv: obsrecv, + bufBytes: make([]byte, 0, 1024), + scopeAttributes: make(map[scopeID]pcommon.Map), } } @@ -145,16 +149,42 @@ func (t *transaction) Append(_ storage.SeriesRef, ls labels.Labels, atMs int64, return 0, nil } - curMF := t.getOrCreateMetricFamily(getScopeID(ls), metricName) - err := curMF.addSeries(t.getSeriesRef(ls, curMF.mtype), metricName, ls, atMs, val) + curMF, existing := t.getOrCreateMetricFamily(getScopeID(ls), metricName) + + if t.enableNativeHistograms && curMF.mtype == pmetric.MetricTypeExponentialHistogram { + // If a histogram has both classic and native version, the native histogram is scraped + // first. 
Getting a float sample for the same series means that `scrape_classic_histograms`
+		// is set to true in the scrape config. In this case, we should ignore the native histogram.
+		curMF.mtype = pmetric.MetricTypeHistogram
+	}
+
+	seriesRef := t.getSeriesRef(ls, curMF.mtype)
+	err := curMF.addSeries(seriesRef, metricName, ls, atMs, val)
 	if err != nil {
-		t.logger.Warn("failed to add datapoint", zap.Error(err), zap.String("metric_name", metricName), zap.Any("labels", ls))
+		// Handle the special case of a float sample indicating staleness of a
+		// native histogram. This is similar to how Prometheus handles it, but we
+		// don't have access to the previous value, so we're applying some
+		// heuristics to figure out if this is a native histogram or not.
+		// The metric type will indicate histogram, but presumably there will be no
+		// _bucket, _count, _sum suffix or `le` label, which makes addSeries fail
+		// with errEmptyLeLabel.
+		if t.enableNativeHistograms && errors.Is(err, errEmptyLeLabel) && !existing && value.IsStaleNaN(val) && curMF.mtype == pmetric.MetricTypeHistogram {
+			mg := curMF.loadMetricGroupOrCreate(seriesRef, ls, atMs)
+			curMF.mtype = pmetric.MetricTypeExponentialHistogram
+			mg.mtype = pmetric.MetricTypeExponentialHistogram
+			_ = curMF.addExponentialHistogramSeries(seriesRef, metricName, ls, atMs, &histogram.Histogram{Sum: math.Float64frombits(value.StaleNaN)}, nil)
+			// Ignore errors here; this is best effort.
+		} else {
+			t.logger.Warn("failed to add datapoint", zap.Error(err), zap.String("metric_name", metricName), zap.Any("labels", ls))
+		}
 	}
 
 	return 0, nil // never return errors, as that fails the whole scrape
 }
 
-func (t *transaction) getOrCreateMetricFamily(scope scopeID, mn string) *metricFamily {
+// getOrCreateMetricFamily returns the metric family for the given metric name and scope,
+// and true if an existing family was found.
+func (t *transaction) getOrCreateMetricFamily(scope scopeID, mn string) (*metricFamily, bool) { _, ok := t.families[scope] if !ok { t.families[scope] = make(map[string]*metricFamily) @@ -170,9 +200,10 @@ func (t *transaction) getOrCreateMetricFamily(scope scopeID, mn string) *metricF } else { curMf = newMetricFamily(mn, t.mc, t.logger) t.families[scope][curMf.name] = curMf + return curMf, false } } - return curMf + return curMf, true } func (t *transaction) AppendExemplar(_ storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { @@ -199,15 +230,71 @@ func (t *transaction) AppendExemplar(_ storage.SeriesRef, l labels.Labels, e exe return 0, errMetricNameNotFound } - mf := t.getOrCreateMetricFamily(getScopeID(l), mn) + mf, _ := t.getOrCreateMetricFamily(getScopeID(l), mn) mf.addExemplar(t.getSeriesRef(l, mf.mtype), e) return 0, nil } -func (t *transaction) AppendHistogram(_ storage.SeriesRef, _ labels.Labels, _ int64, _ *histogram.Histogram, _ *histogram.FloatHistogram) (storage.SeriesRef, error) { - //TODO: implement this func - return 0, nil +func (t *transaction) AppendHistogram(_ storage.SeriesRef, ls labels.Labels, atMs int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { + if !t.enableNativeHistograms { + return 0, nil + } + + select { + case <-t.ctx.Done(): + return 0, errTransactionAborted + default: + } + + if t.externalLabels.Len() != 0 { + b := labels.NewBuilder(ls) + t.externalLabels.Range(func(l labels.Label) { + b.Set(l.Name, l.Value) + }) + ls = b.Labels() + } + + if t.isNew { + if err := t.initTransaction(ls); err != nil { + return 0, err + } + } + + // Any datapoint with duplicate labels MUST be rejected per: + // * https://github.com/open-telemetry/wg-prometheus/issues/44 + // * https://github.com/open-telemetry/opentelemetry-collector/issues/3407 + // as Prometheus rejects such too as of version 2.16.0, released on 2020-02-13. + if dupLabel, hasDup := ls.HasDuplicateLabelNames(); hasDup { + return 0, fmt.Errorf("invalid sample: non-unique label names: %q", dupLabel) + } + + metricName := ls.Get(model.MetricNameLabel) + if metricName == "" { + return 0, errMetricNameNotFound + } + + // The `up`, `target_info`, `otel_scope_info` metrics should never generate native histograms, + // thus we don't check for them here as opposed to the Append function. + + curMF, existing := t.getOrCreateMetricFamily(getScopeID(ls), metricName) + if !existing { + curMF.mtype = pmetric.MetricTypeExponentialHistogram + } else if curMF.mtype != pmetric.MetricTypeExponentialHistogram { + // Already scraped as classic histogram. 
+ return 0, nil + } + + if h != nil && h.CounterResetHint == histogram.GaugeType || fh != nil && fh.CounterResetHint == histogram.GaugeType { + t.logger.Warn("dropping unsupported gauge histogram datapoint", zap.String("metric_name", metricName), zap.Any("labels", ls)) + } + + err := curMF.addExponentialHistogramSeries(t.getSeriesRef(ls, curMF.mtype), metricName, ls, atMs, h, fh) + if err != nil { + t.logger.Warn("failed to add histogram datapoint", zap.Error(err), zap.String("metric_name", metricName), zap.Any("labels", ls)) + } + + return 0, nil // never return errors, as that fails the whole scrape } func (t *transaction) AppendCTZeroSample(_ storage.SeriesRef, _ labels.Labels, _, _ int64) (storage.SeriesRef, error) { diff --git a/receiver/prometheusreceiver/internal/transaction_test.go b/receiver/prometheusreceiver/internal/transaction_test.go index fde7c6a07000..2956ec2e3b12 100644 --- a/receiver/prometheusreceiver/internal/transaction_test.go +++ b/receiver/prometheusreceiver/internal/transaction_test.go @@ -6,14 +6,17 @@ package internal import ( "context" "errors" + "fmt" "testing" "time" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/exemplar" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/metadata" "github.com/prometheus/prometheus/scrape" + "github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" @@ -53,42 +56,89 @@ var ( ) func TestTransactionCommitWithoutAdding(t *testing.T) { - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false) + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testTransactionCommitWithoutAdding(t, enableNativeHistograms) + }) + } +} + +func testTransactionCommitWithoutAdding(t *testing.T, enableNativeHistograms bool) { + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false, enableNativeHistograms) assert.NoError(t, tr.Commit()) } func TestTransactionRollbackDoesNothing(t *testing.T) { - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false) + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testTransactionRollbackDoesNothing(t, enableNativeHistograms) + }) + } +} + +func testTransactionRollbackDoesNothing(t *testing.T, enableNativeHistograms bool) { + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false, enableNativeHistograms) assert.NoError(t, tr.Rollback()) } func TestTransactionUpdateMetadataDoesNothing(t *testing.T) { - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false) + for _, enableNativeHistograms := range []bool{true, false} { + 
t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testTransactionUpdateMetadataDoesNothing(t, enableNativeHistograms) + }) + } +} + +func testTransactionUpdateMetadataDoesNothing(t *testing.T, enableNativeHistograms bool) { + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false, enableNativeHistograms) _, err := tr.UpdateMetadata(0, labels.New(), metadata.Metadata{}) assert.NoError(t, err) } func TestTransactionAppendNoTarget(t *testing.T) { + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testTransactionAppendNoTarget(t, enableNativeHistograms) + }) + } +} + +func testTransactionAppendNoTarget(t *testing.T, enableNativeHistograms bool) { badLabels := labels.FromStrings(model.MetricNameLabel, "counter_test") - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false) + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false, enableNativeHistograms) _, err := tr.Append(0, badLabels, time.Now().Unix()*1000, 1.0) assert.Error(t, err) } func TestTransactionAppendNoMetricName(t *testing.T) { + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testTransactionAppendNoMetricName(t, enableNativeHistograms) + }) + } +} + +func testTransactionAppendNoMetricName(t *testing.T, enableNativeHistograms bool) { jobNotFoundLb := labels.FromMap(map[string]string{ model.InstanceLabel: "localhost:8080", model.JobLabel: "test2", }) - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false) + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false, enableNativeHistograms) _, err := tr.Append(0, jobNotFoundLb, time.Now().Unix()*1000, 1.0) assert.ErrorIs(t, err, errMetricNameNotFound) - assert.ErrorIs(t, tr.Commit(), errNoDataToBuild) } func TestTransactionAppendEmptyMetricName(t *testing.T) { - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false) + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testTransactionAppendEmptyMetricName(t, enableNativeHistograms) + }) + } +} + +func testTransactionAppendEmptyMetricName(t *testing.T, enableNativeHistograms bool) { + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false, enableNativeHistograms) _, err := tr.Append(0, labels.FromMap(map[string]string{ model.InstanceLabel: "localhost:8080", model.JobLabel: "test2", @@ -98,8 +148,16 @@ func TestTransactionAppendEmptyMetricName(t *testing.T) { } func TestTransactionAppendResource(t 
*testing.T) { + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testTransactionAppendResource(t, enableNativeHistograms) + }) + } +} + +func testTransactionAppendResource(t *testing.T, enableNativeHistograms bool) { sink := new(consumertest.MetricsSink) - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false) + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false, enableNativeHistograms) _, err := tr.Append(0, labels.FromMap(map[string]string{ model.InstanceLabel: "localhost:8080", model.JobLabel: "test", @@ -121,8 +179,16 @@ func TestTransactionAppendResource(t *testing.T) { } func TestReceiverVersionAndNameAreAttached(t *testing.T) { + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testReceiverVersionAndNameAreAttached(t, enableNativeHistograms) + }) + } +} + +func testReceiverVersionAndNameAreAttached(t *testing.T, enableNativeHistograms bool) { sink := new(consumertest.MetricsSink) - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false) + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false, enableNativeHistograms) _, err := tr.Append(0, labels.FromMap(map[string]string{ model.InstanceLabel: "localhost:8080", model.JobLabel: "test", @@ -143,6 +209,14 @@ func TestReceiverVersionAndNameAreAttached(t *testing.T) { } func TestTransactionCommitErrorWhenAdjusterError(t *testing.T) { + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testTransactionCommitErrorWhenAdjusterError(t, enableNativeHistograms) + }) + } +} + +func testTransactionCommitErrorWhenAdjusterError(t *testing.T, enableNativeHistograms bool) { goodLabels := labels.FromMap(map[string]string{ model.InstanceLabel: "localhost:8080", model.JobLabel: "test", @@ -150,7 +224,7 @@ func TestTransactionCommitErrorWhenAdjusterError(t *testing.T) { }) sink := new(consumertest.MetricsSink) adjusterErr := errors.New("adjuster error") - tr := newTransaction(scrapeCtx, &errorAdjuster{err: adjusterErr}, sink, labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false) + tr := newTransaction(scrapeCtx, &errorAdjuster{err: adjusterErr}, sink, labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false, enableNativeHistograms) _, err := tr.Append(0, goodLabels, time.Now().Unix()*1000, 1.0) assert.NoError(t, err) assert.ErrorIs(t, tr.Commit(), adjusterErr) @@ -158,8 +232,16 @@ func TestTransactionCommitErrorWhenAdjusterError(t *testing.T) { // Ensure that we reject duplicate label keys. See https://github.com/open-telemetry/wg-prometheus/issues/44. 
func TestTransactionAppendDuplicateLabels(t *testing.T) { + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testTransactionAppendDuplicateLabels(t, enableNativeHistograms) + }) + } +} + +func testTransactionAppendDuplicateLabels(t *testing.T, enableNativeHistograms bool) { sink := new(consumertest.MetricsSink) - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false) + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false, enableNativeHistograms) dupLabels := labels.FromStrings( model.InstanceLabel, "0.0.0.0:8855", @@ -176,6 +258,14 @@ func TestTransactionAppendDuplicateLabels(t *testing.T) { } func TestTransactionAppendHistogramNoLe(t *testing.T) { + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testTransactionAppendHistogramNoLe(t, enableNativeHistograms) + }) + } +} + +func testTransactionAppendHistogramNoLe(t *testing.T, enableNativeHistograms bool) { sink := new(consumertest.MetricsSink) receiverSettings := receivertest.NewNopCreateSettings() core, observedLogs := observer.New(zap.InfoLevel) @@ -188,6 +278,7 @@ func TestTransactionAppendHistogramNoLe(t *testing.T) { receiverSettings, nopObsRecv(t), false, + enableNativeHistograms, ) goodLabels := labels.FromStrings( @@ -206,6 +297,14 @@ func TestTransactionAppendHistogramNoLe(t *testing.T) { } func TestTransactionAppendSummaryNoQuantile(t *testing.T) { + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testTransactionAppendSummaryNoQuantile(t, enableNativeHistograms) + }) + } +} + +func testTransactionAppendSummaryNoQuantile(t *testing.T, enableNativeHistograms bool) { sink := new(consumertest.MetricsSink) receiverSettings := receivertest.NewNopCreateSettings() core, observedLogs := observer.New(zap.InfoLevel) @@ -218,6 +317,7 @@ func TestTransactionAppendSummaryNoQuantile(t *testing.T) { receiverSettings, nopObsRecv(t), false, + enableNativeHistograms, ) goodLabels := labels.FromStrings( @@ -236,6 +336,14 @@ func TestTransactionAppendSummaryNoQuantile(t *testing.T) { } func TestTransactionAppendValidAndInvalid(t *testing.T) { + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testTransactionAppendValidAndInvalid(t, enableNativeHistograms) + }) + } +} + +func testTransactionAppendValidAndInvalid(t *testing.T, enableNativeHistograms bool) { sink := new(consumertest.MetricsSink) receiverSettings := receivertest.NewNopCreateSettings() core, observedLogs := observer.New(zap.InfoLevel) @@ -248,6 +356,7 @@ func TestTransactionAppendValidAndInvalid(t *testing.T) { receiverSettings, nopObsRecv(t), false, + enableNativeHistograms, ) // a valid counter @@ -281,8 +390,16 @@ func TestTransactionAppendValidAndInvalid(t *testing.T) { } func TestAppendExemplarWithNoMetricName(t *testing.T) { + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testAppendExemplarWithNoMetricName(t, 
enableNativeHistograms) + }) + } +} + +func testAppendExemplarWithNoMetricName(t *testing.T, enableNativeHistograms bool) { sink := new(consumertest.MetricsSink) - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false) + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false, enableNativeHistograms) labels := labels.FromStrings( model.InstanceLabel, "0.0.0.0:8855", @@ -294,8 +411,16 @@ func TestAppendExemplarWithNoMetricName(t *testing.T) { } func TestAppendExemplarWithEmptyMetricName(t *testing.T) { + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testAppendExemplarWithEmptyMetricName(t, enableNativeHistograms) + }) + } +} + +func testAppendExemplarWithEmptyMetricName(t *testing.T, enableNativeHistograms bool) { sink := new(consumertest.MetricsSink) - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false) + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false, enableNativeHistograms) labels := labels.FromStrings( model.InstanceLabel, "0.0.0.0:8855", @@ -307,8 +432,16 @@ func TestAppendExemplarWithEmptyMetricName(t *testing.T) { } func TestAppendExemplarWithDuplicateLabels(t *testing.T) { + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testAppendExemplarWithDuplicateLabels(t, enableNativeHistograms) + }) + } +} + +func testAppendExemplarWithDuplicateLabels(t *testing.T, enableNativeHistograms bool) { sink := new(consumertest.MetricsSink) - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false) + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false, enableNativeHistograms) labels := labels.FromStrings( model.InstanceLabel, "0.0.0.0:8855", @@ -323,8 +456,16 @@ func TestAppendExemplarWithDuplicateLabels(t *testing.T) { } func TestAppendExemplarWithoutAddingMetric(t *testing.T) { + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testAppendExemplarWithoutAddingMetric(t, enableNativeHistograms) + }) + } +} + +func testAppendExemplarWithoutAddingMetric(t *testing.T, enableNativeHistograms bool) { sink := new(consumertest.MetricsSink) - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false) + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false, enableNativeHistograms) labels := labels.FromStrings( model.InstanceLabel, "0.0.0.0:8855", @@ -337,16 +478,32 @@ func TestAppendExemplarWithoutAddingMetric(t *testing.T) { } func TestAppendExemplarWithNoLabels(t *testing.T) { + for _, enableNativeHistograms 
:= range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testAppendExemplarWithNoLabels(t, enableNativeHistograms) + }) + } +} + +func testAppendExemplarWithNoLabels(t *testing.T, enableNativeHistograms bool) { sink := new(consumertest.MetricsSink) - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false) + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false, enableNativeHistograms) _, err := tr.AppendExemplar(0, labels.EmptyLabels(), exemplar.Exemplar{Value: 0}) assert.Equal(t, errNoJobInstance, err) } func TestAppendExemplarWithEmptyLabelArray(t *testing.T) { + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testAppendExemplarWithEmptyLabelArray(t, enableNativeHistograms) + }) + } +} + +func testAppendExemplarWithEmptyLabelArray(t *testing.T, enableNativeHistograms bool) { sink := new(consumertest.MetricsSink) - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false) + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false, enableNativeHistograms) _, err := tr.AppendExemplar(0, labels.FromStrings(), exemplar.Exemplar{Value: 0}) assert.Equal(t, errNoJobInstance, err) @@ -575,9 +732,11 @@ func TestMetricBuilderCounters(t *testing.T) { } for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - tt.run(t) - }) + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("%s/enableNativeHistograms=%v", tt.name, enableNativeHistograms), func(t *testing.T) { + tt.run(t, enableNativeHistograms) + }) + } } } @@ -798,9 +957,11 @@ func TestMetricBuilderGauges(t *testing.T) { } for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - tt.run(t) - }) + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("%s/enableNativeHistograms=%v", tt.name, enableNativeHistograms), func(t *testing.T) { + tt.run(t, enableNativeHistograms) + }) + } } } @@ -894,9 +1055,11 @@ func TestMetricBuilderUntyped(t *testing.T) { } for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - tt.run(t) - }) + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("%s/enableNativeHistograms=%v", tt.name, enableNativeHistograms), func(t *testing.T) { + tt.run(t, enableNativeHistograms) + }) + } } } @@ -1312,9 +1475,12 @@ func TestMetricBuilderHistogram(t *testing.T) { } for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - tt.run(t) - }) + for _, enableNativeHistograms := range []bool{true, false} { + // None of the histograms above have native histogram versions, so enabling native histograms has no effect.
+ t.Run(fmt.Sprintf("%s/enableNativeHistograms=%v", tt.name, enableNativeHistograms), func(t *testing.T) { + tt.run(t, enableNativeHistograms) + }) + } } } @@ -1454,30 +1620,137 @@ func TestMetricBuilderSummary(t *testing.T) { } for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - tt.run(t) - }) + for _, enableNativeHistograms := range []bool{false, true} { + t.Run(fmt.Sprintf("%s/enableNativeHistograms=%v", tt.name, enableNativeHistograms), func(t *testing.T) { + tt.run(t, enableNativeHistograms) + }) + } } } +func TestMetricBuilderNativeHistogram(t *testing.T) { + for _, enableNativeHistograms := range []bool{false, true} { + emptyH := &histogram.Histogram{ + Schema: 1, + Count: 0, + Sum: 0, + ZeroThreshold: 0.001, + ZeroCount: 0, + } + h0 := tsdbutil.GenerateTestHistogram(0) + + tests := []buildTestData{ + { + name: "empty integer histogram", + inputs: []*testScrapedPage{ + { + pts: []*testDataPoint{ + createHistogramDataPoint("hist_test", emptyH, nil, nil, "foo", "bar"), + }, + }, + }, + wants: func() []pmetric.Metrics { + md0 := pmetric.NewMetrics() + if !enableNativeHistograms { + return []pmetric.Metrics{md0} + } + mL0 := md0.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics() + m0 := mL0.AppendEmpty() + m0.SetName("hist_test") + m0.SetEmptyExponentialHistogram() + m0.ExponentialHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + pt0 := m0.ExponentialHistogram().DataPoints().AppendEmpty() + pt0.Attributes().PutStr("foo", "bar") + pt0.SetStartTimestamp(startTimestamp) + pt0.SetTimestamp(tsNanos) + pt0.SetCount(0) + pt0.SetSum(0) + pt0.SetZeroThreshold(0.001) + pt0.SetScale(1) + + return []pmetric.Metrics{md0} + }, + }, + { + name: "integer histogram", + inputs: []*testScrapedPage{ + { + pts: []*testDataPoint{ + createHistogramDataPoint("hist_test", h0, nil, nil, "foo", "bar"), + }, + }, + }, + wants: func() []pmetric.Metrics { + md0 := pmetric.NewMetrics() + if !enableNativeHistograms { + return []pmetric.Metrics{md0} + } + mL0 := md0.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics() + m0 := mL0.AppendEmpty() + m0.SetName("hist_test") + m0.SetEmptyExponentialHistogram() + m0.ExponentialHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + pt0 := m0.ExponentialHistogram().DataPoints().AppendEmpty() + pt0.Attributes().PutStr("foo", "bar") + pt0.SetStartTimestamp(startTimestamp) + pt0.SetTimestamp(tsNanos) + pt0.SetCount(12) + pt0.SetSum(18.4) + pt0.SetScale(1) + pt0.SetZeroThreshold(0.001) + pt0.SetZeroCount(2) + pt0.Positive().SetOffset(-1) + pt0.Positive().BucketCounts().Append(1) + pt0.Positive().BucketCounts().Append(2) + pt0.Positive().BucketCounts().Append(0) + pt0.Positive().BucketCounts().Append(1) + pt0.Positive().BucketCounts().Append(1) + pt0.Negative().SetOffset(-1) + pt0.Negative().BucketCounts().Append(1) + pt0.Negative().BucketCounts().Append(2) + pt0.Negative().BucketCounts().Append(0) + pt0.Negative().BucketCounts().Append(1) + pt0.Negative().BucketCounts().Append(1) + + return []pmetric.Metrics{md0} + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.run(t, enableNativeHistograms) + }) + } + } +} + type buildTestData struct { name string inputs []*testScrapedPage wants func() []pmetric.Metrics } -func (tt buildTestData) run(t *testing.T) { +func (tt buildTestData) run(t *testing.T, enableNativeHistograms bool) { wants := tt.wants() assert.EqualValues(t, len(wants), len(tt.inputs)) st := ts for i, page := range tt.inputs { 
sink := new(consumertest.MetricsSink) - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false) + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false, enableNativeHistograms) for _, pt := range page.pts { // set ts for testing pt.t = st - _, err := tr.Append(0, pt.lb, pt.t, pt.v) + var err error + switch { + case pt.fh != nil: + _, err = tr.AppendHistogram(0, pt.lb, pt.t, nil, pt.fh) + case pt.h != nil: + _, err = tr.AppendHistogram(0, pt.lb, pt.t, pt.h, nil) + default: + _, err = tr.Append(0, pt.lb, pt.t, pt.v) + } assert.NoError(t, err) for _, e := range pt.exemplars { @@ -1534,7 +1807,12 @@ func (s *startTimeAdjuster) AdjustMetrics(metrics pmetric.Metrics) error { for l := 0; l < dps.Len(); l++ { dps.At(l).SetStartTimestamp(s.startTime) } - case pmetric.MetricTypeEmpty, pmetric.MetricTypeGauge, pmetric.MetricTypeExponentialHistogram: + case pmetric.MetricTypeExponentialHistogram: + dps := metric.ExponentialHistogram().DataPoints() + for l := 0; l < dps.Len(); l++ { + dps.At(l).SetStartTimestamp(s.startTime) + } + case pmetric.MetricTypeEmpty, pmetric.MetricTypeGauge: } } } @@ -1546,6 +1824,8 @@ type testDataPoint struct { lb labels.Labels t int64 v float64 + h *histogram.Histogram + fh *histogram.FloatHistogram exemplars []exemplar.Exemplar } @@ -1568,6 +1848,13 @@ func createDataPoint(mname string, value float64, es []exemplar.Exemplar, tagPai } } +func createHistogramDataPoint(mname string, h *histogram.Histogram, fh *histogram.FloatHistogram, es []exemplar.Exemplar, tagPairs ...string) *testDataPoint { + dataPoint := createDataPoint(mname, 0, es, tagPairs...) + dataPoint.h = h + dataPoint.fh = fh + return dataPoint +} + func assertEquivalentMetrics(t *testing.T, want, got pmetric.Metrics) { require.Equal(t, want.ResourceMetrics().Len(), got.ResourceMetrics().Len()) if want.ResourceMetrics().Len() == 0 { diff --git a/receiver/prometheusreceiver/metrics_receiver.go b/receiver/prometheusreceiver/metrics_receiver.go index bcd2192a55fe..771e0dfaca9d 100644 --- a/receiver/prometheusreceiver/metrics_receiver.go +++ b/receiver/prometheusreceiver/metrics_receiver.go @@ -233,6 +233,13 @@ func (r *pReceiver) getScrapeConfigsResponse(baseURL string) (map[string]*config } func (r *pReceiver) applyCfg(cfg *PromConfig) error { + if !enableNativeHistogramsGate.IsEnabled() { + // Enforce scraping classic histograms to avoid dropping them. 
+ for _, scrapeConfig := range cfg.ScrapeConfigs { + scrapeConfig.ScrapeClassicHistograms = true + } + } + if err := r.scrapeManager.ApplyConfig((*config.Config)(cfg)); err != nil { return err } @@ -284,6 +291,7 @@ func (r *pReceiver) initPrometheusComponents(ctx context.Context, logger log.Log r.cfg.UseStartTimeMetric, startTimeMetricRegex, useCreatedMetricGate.IsEnabled(), + enableNativeHistogramsGate.IsEnabled(), r.cfg.PrometheusConfig.GlobalConfig.ExternalLabels, r.cfg.TrimMetricSuffixes, ) diff --git a/receiver/prometheusreceiver/metrics_receiver_helper_test.go b/receiver/prometheusreceiver/metrics_receiver_helper_test.go index 332183ee0fa8..41ed19989a0a 100644 --- a/receiver/prometheusreceiver/metrics_receiver_helper_test.go +++ b/receiver/prometheusreceiver/metrics_receiver_helper_test.go @@ -283,6 +283,12 @@ func isFirstFailedScrape(metrics []pmetric.Metric, normalizedNames bool) bool { return false } } + case pmetric.MetricTypeExponentialHistogram: + for i := 0; i < m.ExponentialHistogram().DataPoints().Len(); i++ { + if !m.ExponentialHistogram().DataPoints().At(i).Flags().NoRecordedValue() { + return false + } + } case pmetric.MetricTypeHistogram: for i := 0; i < m.Histogram().DataPoints().Len(); i++ { if !m.Histogram().DataPoints().At(i).Flags().NoRecordedValue() { @@ -295,7 +301,7 @@ func isFirstFailedScrape(metrics []pmetric.Metric, normalizedNames bool) bool { return false } } - case pmetric.MetricTypeEmpty, pmetric.MetricTypeExponentialHistogram: + case pmetric.MetricTypeEmpty: } } return true @@ -362,11 +368,13 @@ type metricTypeComparator func(*testing.T, pmetric.Metric) type numberPointComparator func(*testing.T, pmetric.NumberDataPoint) type histogramPointComparator func(*testing.T, pmetric.HistogramDataPoint) type summaryPointComparator func(*testing.T, pmetric.SummaryDataPoint) +type exponentialHistogramComparator func(*testing.T, pmetric.ExponentialHistogramDataPoint) type dataPointExpectation struct { - numberPointComparator []numberPointComparator - histogramPointComparator []histogramPointComparator - summaryPointComparator []summaryPointComparator + numberPointComparator []numberPointComparator + histogramPointComparator []histogramPointComparator + summaryPointComparator []summaryPointComparator + exponentialHistogramComparator []exponentialHistogramComparator } type testExpectation func(*testing.T, pmetric.ResourceMetrics) @@ -426,7 +434,12 @@ func assertMetricPresent(name string, metricTypeExpectations metricTypeComparato require.Equal(t, m.Summary().DataPoints().Len(), len(dataPointExpectations), "Expected number of data-points in Summary metric '%s' does not match to testdata", name) spc(t, m.Summary().DataPoints().At(i)) } - case pmetric.MetricTypeEmpty, pmetric.MetricTypeExponentialHistogram: + case pmetric.MetricTypeExponentialHistogram: + for _, ehc := range de.exponentialHistogramComparator { + require.Equal(t, m.ExponentialHistogram().DataPoints().Len(), len(dataPointExpectations), "Expected number of data-points in Exponential Histogram metric '%s' does not match to testdata", name) + ehc(t, m.ExponentialHistogram().DataPoints().At(i)) + } + case pmetric.MetricTypeEmpty: } } } @@ -521,6 +534,13 @@ func assertHistogramPointFlagNoRecordedValue() histogramPointComparator { } } +func assertExponentialHistogramPointFlagNoRecordedValue() exponentialHistogramComparator { + return func(t *testing.T, histogramDataPoint pmetric.ExponentialHistogramDataPoint) { + assert.True(t, histogramDataPoint.Flags().NoRecordedValue(), + "Datapoint flag for staleness marker 
not found as expected") + } +} + func assertSummaryPointFlagNoRecordedValue() summaryPointComparator { return func(t *testing.T, summaryDataPoint pmetric.SummaryDataPoint) { assert.True(t, summaryDataPoint.Flags().NoRecordedValue(), @@ -586,6 +606,19 @@ func compareHistogram(count uint64, sum float64, upperBounds []float64, buckets } } +func compareExponentialHistogram(scale int32, count uint64, sum float64, zeroCount uint64, negativeOffset int32, negativeBuckets []uint64, positiveOffset int32, positiveBuckets []uint64) exponentialHistogramComparator { + return func(t *testing.T, exponentialHistogramDataPoint pmetric.ExponentialHistogramDataPoint) { + assert.Equal(t, scale, exponentialHistogramDataPoint.Scale(), "Exponential Histogram scale value does not match") + assert.Equal(t, count, exponentialHistogramDataPoint.Count(), "Exponential Histogram count value does not match") + assert.Equal(t, sum, exponentialHistogramDataPoint.Sum(), "Exponential Histogram sum value does not match") + assert.Equal(t, zeroCount, exponentialHistogramDataPoint.ZeroCount(), "Exponential Histogram zero count value does not match") + assert.Equal(t, negativeOffset, exponentialHistogramDataPoint.Negative().Offset(), "Exponential Histogram negative offset value does not match") + assert.Equal(t, negativeBuckets, exponentialHistogramDataPoint.Negative().BucketCounts().AsRaw(), "Exponential Histogram negative bucket count values do not match") + assert.Equal(t, positiveOffset, exponentialHistogramDataPoint.Positive().Offset(), "Exponential Histogram positive offset value does not match") + assert.Equal(t, positiveBuckets, exponentialHistogramDataPoint.Positive().BucketCounts().AsRaw(), "Exponential Histogram positive bucket count values do not match") + } +} + func compareSummary(count uint64, sum float64, quantiles [][]float64) summaryPointComparator { return func(t *testing.T, summaryDataPoint pmetric.SummaryDataPoint) { assert.Equal(t, count, summaryDataPoint.Count(), "Summary count value does not match") diff --git a/receiver/prometheusreceiver/metrics_receiver_protobuf_test.go b/receiver/prometheusreceiver/metrics_receiver_protobuf_test.go index 5aa5ce15afd8..0b16a10c1c1a 100644 --- a/receiver/prometheusreceiver/metrics_receiver_protobuf_test.go +++ b/receiver/prometheusreceiver/metrics_receiver_protobuf_test.go @@ -9,6 +9,8 @@ import ( "github.com/prometheus/prometheus/config" dto "github.com/prometheus/prometheus/prompb/io/prometheus/client" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/featuregate" "go.opentelemetry.io/collector/pdata/pmetric" ) @@ -157,3 +159,455 @@ func TestScrapeViaProtobuf(t *testing.T) { c.PrometheusConfig.GlobalConfig.ScrapeProtocols = []config.ScrapeProtocol{config.PrometheusProto} }) } + +func TestNativeVsClassicHistogramScrapeViaProtobuf(t *testing.T) { + classicHistogram := &dto.MetricFamily{ + Name: "test_classic_histogram", + Type: dto.MetricType_HISTOGRAM, + Metric: []dto.Metric{ + { + Histogram: &dto.Histogram{ + SampleCount: 1213, + SampleSum: 456, + Bucket: []dto.Bucket{ + { + UpperBound: 0.5, + CumulativeCount: 789, + }, + { + UpperBound: 10, + CumulativeCount: 1011, + }, + { + UpperBound: math.Inf(1), + CumulativeCount: 1213, + }, + }, + }, + }, + }, + } + buffer := prometheusMetricFamilyToProtoBuf(t, nil, classicHistogram) + + mixedHistogram := &dto.MetricFamily{ + Name: "test_mixed_histogram", + Type: dto.MetricType_HISTOGRAM, + Metric: []dto.Metric{ + { + Histogram: &dto.Histogram{ + SampleCount: 1213, + SampleSum: 456, + Bucket: 
[]dto.Bucket{ + { + UpperBound: 0.5, + CumulativeCount: 789, + }, + { + UpperBound: 10, + CumulativeCount: 1011, + }, + { + UpperBound: math.Inf(1), + CumulativeCount: 1213, + }, + }, + // Integer counter histogram definition + Schema: 3, + ZeroThreshold: 0.001, + ZeroCount: 2, + NegativeSpan: []dto.BucketSpan{ + {Offset: 0, Length: 1}, + {Offset: 1, Length: 1}, + }, + NegativeDelta: []int64{1, 1}, + PositiveSpan: []dto.BucketSpan{ + {Offset: -2, Length: 1}, + {Offset: 1, Length: 1}, + }, + PositiveDelta: []int64{1, 0}, + }, + }, + }, + } + prometheusMetricFamilyToProtoBuf(t, buffer, mixedHistogram) + + nativeHistogram := &dto.MetricFamily{ + Name: "test_native_histogram", + Type: dto.MetricType_HISTOGRAM, + Metric: []dto.Metric{ + { + Histogram: &dto.Histogram{ + SampleCount: 1214, + SampleSum: 3456, + // Integer counter histogram definition + Schema: 3, + ZeroThreshold: 0.001, + ZeroCount: 5, + NegativeSpan: []dto.BucketSpan{ + {Offset: -2, Length: 1}, + {Offset: 1, Length: 1}, + }, + NegativeDelta: []int64{1, 1}, + PositiveSpan: []dto.BucketSpan{ + {Offset: 3, Length: 1}, + {Offset: 2, Length: 1}, + }, + PositiveDelta: []int64{1, 0}, + }, + }, + }, + } + prometheusMetricFamilyToProtoBuf(t, buffer, nativeHistogram) + + testCases := map[string]struct { + mutCfg func(*PromConfig) + enableNativeHistograms bool + expected []testExpectation + }{ + "feature enabled scrape classic off": { + enableNativeHistograms: true, + expected: []testExpectation{ + assertMetricPresent( // Scrape classic only histograms as is. + "test_classic_histogram", + compareMetricType(pmetric.MetricTypeHistogram), + compareMetricUnit(""), + []dataPointExpectation{{ + histogramPointComparator: []histogramPointComparator{ + compareHistogram(1213, 456, []float64{0.5, 10}, []uint64{789, 222, 202}), + }, + }}, + ), + assertMetricPresent( // Only scrape native buckets from mixed histograms. + "test_mixed_histogram", + compareMetricType(pmetric.MetricTypeExponentialHistogram), + compareMetricUnit(""), + []dataPointExpectation{{ + exponentialHistogramComparator: []exponentialHistogramComparator{ + compareExponentialHistogram(3, 1213, 456, 2, -1, []uint64{1, 0, 2}, -3, []uint64{1, 0, 1}), + }, + }}, + ), + assertMetricPresent( // Scrape native only histograms as is. + "test_native_histogram", + compareMetricType(pmetric.MetricTypeExponentialHistogram), + compareMetricUnit(""), + []dataPointExpectation{{ + exponentialHistogramComparator: []exponentialHistogramComparator{ + compareExponentialHistogram(3, 1214, 3456, 5, -3, []uint64{1, 0, 2}, 2, []uint64{1, 0, 0, 1}), + }, + }}, + ), + }, + }, + "feature disabled scrape classic off": { + enableNativeHistograms: false, + expected: []testExpectation{ + assertMetricPresent( // Scrape classic only histograms as is. + "test_classic_histogram", + compareMetricType(pmetric.MetricTypeHistogram), + compareMetricUnit(""), + []dataPointExpectation{{ + histogramPointComparator: []histogramPointComparator{ + compareHistogram(1213, 456, []float64{0.5, 10}, []uint64{789, 222, 202}), + }, + }}, + ), + assertMetricPresent( // Fallback to scraping classic histograms if feature is off. + "test_mixed_histogram", + compareMetricType(pmetric.MetricTypeHistogram), + compareMetricUnit(""), + []dataPointExpectation{{ + histogramPointComparator: []histogramPointComparator{ + compareHistogram(1213, 456, []float64{0.5, 10}, []uint64{789, 222, 202}), + }, + }}, + ), + // When the native histograms feature is off, no native histograms are scraped. 
+ assertMetricAbsent("test_native_histogram"), + }, + }, + "feature enabled scrape classic on": { + mutCfg: func(cfg *PromConfig) { + for _, scrapeConfig := range cfg.ScrapeConfigs { + scrapeConfig.ScrapeClassicHistograms = true + } + }, + enableNativeHistograms: true, + expected: []testExpectation{ + assertMetricPresent( // Scrape classic only histograms as is. + "test_classic_histogram", + compareMetricType(pmetric.MetricTypeHistogram), + compareMetricUnit(""), + []dataPointExpectation{{ + histogramPointComparator: []histogramPointComparator{ + compareHistogram(1213, 456, []float64{0.5, 10}, []uint64{789, 222, 202}), + }, + }}, + ), + assertMetricPresent( // Only scrape classic buckets from mixed histograms. + "test_mixed_histogram", + compareMetricType(pmetric.MetricTypeHistogram), + compareMetricUnit(""), + []dataPointExpectation{{ + histogramPointComparator: []histogramPointComparator{ + compareHistogram(1213, 456, []float64{0.5, 10}, []uint64{789, 222, 202}), + }, + }}, + ), + assertMetricPresent( // Scrape native only histograms as is. + "test_native_histogram", + compareMetricType(pmetric.MetricTypeExponentialHistogram), + compareMetricUnit(""), + []dataPointExpectation{{ + exponentialHistogramComparator: []exponentialHistogramComparator{ + compareExponentialHistogram(3, 1214, 3456, 5, -3, []uint64{1, 0, 2}, 2, []uint64{1, 0, 0, 1}), + }, + }}, + ), + }, + }, + "feature disabled scrape classic on": { + mutCfg: func(cfg *PromConfig) { + for _, scrapeConfig := range cfg.ScrapeConfigs { + scrapeConfig.ScrapeClassicHistograms = true + } + }, + enableNativeHistograms: false, + expected: []testExpectation{ + assertMetricPresent( // Scrape classic only histograms as is. + "test_classic_histogram", + compareMetricType(pmetric.MetricTypeHistogram), + compareMetricUnit(""), + []dataPointExpectation{{ + histogramPointComparator: []histogramPointComparator{ + compareHistogram(1213, 456, []float64{0.5, 10}, []uint64{789, 222, 202}), + }, + }}, + ), + assertMetricPresent( // Only scrape classic buckets from mixed histograms. + "test_mixed_histogram", + compareMetricType(pmetric.MetricTypeHistogram), + compareMetricUnit(""), + []dataPointExpectation{{ + histogramPointComparator: []histogramPointComparator{ + compareHistogram(1213, 456, []float64{0.5, 10}, []uint64{789, 222, 202}), + }, + }}, + ), + // When the native histograms feature is off, no native histograms are scraped. 
+ assertMetricAbsent("test_native_histogram"), + }, + }, + } + + defer func() { + _ = featuregate.GlobalRegistry().Set("receiver.prometheusreceiver.EnableNativeHistograms", false) + }() + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + err := featuregate.GlobalRegistry().Set("receiver.prometheusreceiver.EnableNativeHistograms", tc.enableNativeHistograms) + require.NoError(t, err) + + targets := []*testData{ + { + name: "target1", + pages: []mockPrometheusResponse{ + {code: 200, useProtoBuf: true, buf: buffer.Bytes()}, + }, + validateFunc: func(t *testing.T, td *testData, result []pmetric.ResourceMetrics) { + verifyNumValidScrapeResults(t, td, result) + doCompare(t, "target1", td.attributes, result[0], tc.expected) + }, + }, + } + mutCfg := tc.mutCfg + if mutCfg == nil { + mutCfg = func(*PromConfig) {} + } + testComponent(t, targets, func(c *Config) { + c.PrometheusConfig.GlobalConfig.ScrapeProtocols = []config.ScrapeProtocol{config.PrometheusProto} + }, mutCfg) + }) + } +} + +func TestStaleExponentialHistogram(t *testing.T) { + mf := &dto.MetricFamily{ + Name: "test_counter", + Type: dto.MetricType_COUNTER, + Metric: []dto.Metric{ + { + Label: []dto.LabelPair{ + { + Name: "foo", + Value: "bar", + }, + }, + Counter: &dto.Counter{ + Value: 1234, + }, + }, + }, + } + buffer1 := prometheusMetricFamilyToProtoBuf(t, nil, mf) + nativeHistogram := &dto.MetricFamily{ + Name: "test_native_histogram", + Type: dto.MetricType_HISTOGRAM, + Metric: []dto.Metric{ + { + Histogram: &dto.Histogram{ + SampleCount: 1213, + SampleSum: 456, + // Integer counter histogram definition + Schema: 3, + ZeroThreshold: 0.001, + ZeroCount: 2, + NegativeSpan: []dto.BucketSpan{ + {Offset: 0, Length: 1}, + {Offset: 1, Length: 1}, + }, + NegativeDelta: []int64{1, 1}, + PositiveSpan: []dto.BucketSpan{ + {Offset: -2, Length: 1}, + {Offset: 2, Length: 1}, + }, + PositiveDelta: []int64{1, 0}, + }, + }, + }, + } + prometheusMetricFamilyToProtoBuf(t, buffer1, nativeHistogram) + + mf = &dto.MetricFamily{ + Name: "test_counter", + Type: dto.MetricType_COUNTER, + Metric: []dto.Metric{ + { + Label: []dto.LabelPair{ + { + Name: "foo", + Value: "bar", + }, + }, + Counter: &dto.Counter{ + Value: 2222, + }, + }, + }, + } + buffer2 := prometheusMetricFamilyToProtoBuf(t, nil, mf) + + expectations1 := []testExpectation{ + assertMetricPresent( + "test_native_histogram", + compareMetricType(pmetric.MetricTypeExponentialHistogram), + compareMetricUnit(""), + []dataPointExpectation{{ + exponentialHistogramComparator: []exponentialHistogramComparator{ + compareExponentialHistogram(3, 1213, 456, 2, -1, []uint64{1, 0, 2}, -3, []uint64{1, 0, 0, 1}), + }, + }}, + ), + } + + expectations2 := []testExpectation{ + assertMetricPresent( + "test_native_histogram", + compareMetricType(pmetric.MetricTypeExponentialHistogram), + compareMetricUnit(""), + []dataPointExpectation{{ + exponentialHistogramComparator: []exponentialHistogramComparator{ + assertExponentialHistogramPointFlagNoRecordedValue(), + }, + }}, + ), + } + + targets := []*testData{ + { + name: "target1", + pages: []mockPrometheusResponse{ + {code: 200, useProtoBuf: true, buf: buffer1.Bytes()}, + {code: 200, useProtoBuf: true, buf: buffer2.Bytes()}, + }, + validateFunc: func(t *testing.T, td *testData, result []pmetric.ResourceMetrics) { + verifyNumValidScrapeResults(t, td, result) + doCompare(t, "target11", td.attributes, result[0], expectations1) + doCompare(t, "target12", td.attributes, result[1], expectations2) + }, + }, + } + err := 
featuregate.GlobalRegistry().Set("receiver.prometheusreceiver.EnableNativeHistograms", true) + require.NoError(t, err) + defer func() { + _ = featuregate.GlobalRegistry().Set("receiver.prometheusreceiver.EnableNativeHistograms", false) + }() + testComponent(t, targets, func(c *Config) { + c.PrometheusConfig.GlobalConfig.ScrapeProtocols = []config.ScrapeProtocol{config.PrometheusProto} + }) +} + +func TestFloatCounterHistogram(t *testing.T) { + nativeHistogram := &dto.MetricFamily{ + Name: "test_native_histogram", + Type: dto.MetricType_HISTOGRAM, + Metric: []dto.Metric{ + { + Histogram: &dto.Histogram{ + SampleCountFloat: 1213.0, + SampleSum: 456, + // Float counter histogram definition + Schema: -1, + ZeroThreshold: 0.001, + ZeroCountFloat: 2.0, + NegativeSpan: []dto.BucketSpan{ + {Offset: 0, Length: 1}, + {Offset: 1, Length: 1}, + }, + NegativeCount: []float64{1.5, 2.5}, + PositiveSpan: []dto.BucketSpan{ + {Offset: -2, Length: 1}, + {Offset: 2, Length: 1}, + }, + PositiveCount: []float64{1.0, 3.0}, + }, + }, + }, + } + buffer := prometheusMetricFamilyToProtoBuf(t, nil, nativeHistogram) + + expectations1 := []testExpectation{ + assertMetricPresent( + "test_native_histogram", + compareMetricType(pmetric.MetricTypeExponentialHistogram), + compareMetricUnit(""), + []dataPointExpectation{{ + exponentialHistogramComparator: []exponentialHistogramComparator{ + compareExponentialHistogram(-1, 1213, 456, 2, -1, []uint64{1, 0, 2}, -3, []uint64{1, 0, 0, 3}), + }, + }}, + ), + } + + targets := []*testData{ + { + name: "target1", + pages: []mockPrometheusResponse{ + {code: 200, useProtoBuf: true, buf: buffer.Bytes()}, + }, + validateFunc: func(t *testing.T, td *testData, result []pmetric.ResourceMetrics) { + verifyNumValidScrapeResults(t, td, result) + doCompare(t, "target1", td.attributes, result[0], expectations1) + }, + }, + } + err := featuregate.GlobalRegistry().Set("receiver.prometheusreceiver.EnableNativeHistograms", true) + require.NoError(t, err) + defer func() { + _ = featuregate.GlobalRegistry().Set("receiver.prometheusreceiver.EnableNativeHistograms", false) + }() + testComponent(t, targets, func(c *Config) { + c.PrometheusConfig.GlobalConfig.ScrapeProtocols = []config.ScrapeProtocol{config.PrometheusProto} + }) +}