From 73b5f3cce37180e0498832b2fdba970d879e286f Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Mon, 26 Jul 2021 18:51:28 -0700 Subject: [PATCH] receiver/prometheus: roundtrip Prometheus->Pdata direct conversion Wire up and use the direct Prometheus->Pdata conversion end to end. With this change the receiver will no longer need OpenCensus. This change will involve more follow-ups that just migrate over the tests, because we don't want a super bloated/massive PR. Fixes #3691 Depends on PR #3694 Depends on PR #3695 --- .../internal/metricsbuilder_test.go | 155 ------- .../prometheusreceiver/internal/ocastore.go | 26 +- .../internal/otlp_metricfamily.go | 16 +- .../internal/otlp_metricfamily_test.go | 5 +- .../internal/otlp_metrics_adjuster.go | 384 ++++++++++++++++++ .../internal/otlp_metricsbuilder.go | 29 +- .../internal/otlp_metricsbuilder_test.go | 314 +++++++++++++- .../internal/otlp_transaction.go | 205 ++++++++++ .../internal/prom_to_otlp.go | 4 +- .../internal/prom_to_otlp_test.go | 6 +- .../internal/staleness_end_to_end_test.go | 1 + .../prometheusreceiver/metrics_receiver.go | 4 +- 12 files changed, 966 insertions(+), 183 deletions(-) create mode 100644 receiver/prometheusreceiver/internal/otlp_metrics_adjuster.go create mode 100644 receiver/prometheusreceiver/internal/otlp_transaction.go diff --git a/receiver/prometheusreceiver/internal/metricsbuilder_test.go b/receiver/prometheusreceiver/internal/metricsbuilder_test.go index d398ace689a..7047e402541 100644 --- a/receiver/prometheusreceiver/internal/metricsbuilder_test.go +++ b/receiver/prometheusreceiver/internal/metricsbuilder_test.go @@ -194,161 +194,6 @@ func Test_startTimeMetricMatch(t *testing.T) { runBuilderStartTimeTests(t, nomatchTests, "^(.+_)*process_start_time_seconds$", defaultBuilderStartTime) } -func Test_metricBuilder_counters(t *testing.T) { - tests := []buildTestData{ - { - name: "single-item", - inputs: []*testScrapedPage{ - { - pts: []*testDataPoint{ - 
createDataPoint("counter_test", 100, "foo", "bar"), - }, - }, - }, - wants: [][]*metricspb.Metric{ - { - { - MetricDescriptor: &metricspb.MetricDescriptor{ - Name: "counter_test", - Type: metricspb.MetricDescriptor_CUMULATIVE_DOUBLE, - LabelKeys: []*metricspb.LabelKey{{Key: "foo"}}}, - Timeseries: []*metricspb.TimeSeries{ - { - StartTimestamp: timestampFromMs(startTs), - LabelValues: []*metricspb.LabelValue{{Value: "bar", HasValue: true}}, - Points: []*metricspb.Point{ - {Timestamp: timestampFromMs(startTs), Value: &metricspb.Point_DoubleValue{DoubleValue: 100.0}}, - }, - }, - }, - }, - }, - }, - }, - { - name: "two-items", - inputs: []*testScrapedPage{ - { - pts: []*testDataPoint{ - createDataPoint("counter_test", 150, "foo", "bar"), - createDataPoint("counter_test", 25, "foo", "other"), - }, - }, - }, - wants: [][]*metricspb.Metric{ - { - { - MetricDescriptor: &metricspb.MetricDescriptor{ - Name: "counter_test", - Type: metricspb.MetricDescriptor_CUMULATIVE_DOUBLE, - LabelKeys: []*metricspb.LabelKey{{Key: "foo"}}}, - Timeseries: []*metricspb.TimeSeries{ - { - StartTimestamp: timestampFromMs(startTs), - LabelValues: []*metricspb.LabelValue{{Value: "bar", HasValue: true}}, - Points: []*metricspb.Point{ - {Timestamp: timestampFromMs(startTs), Value: &metricspb.Point_DoubleValue{DoubleValue: 150.0}}, - }, - }, - { - StartTimestamp: timestampFromMs(startTs), - LabelValues: []*metricspb.LabelValue{{Value: "other", HasValue: true}}, - Points: []*metricspb.Point{ - {Timestamp: timestampFromMs(startTs), Value: &metricspb.Point_DoubleValue{DoubleValue: 25.0}}, - }, - }, - }, - }, - }, - }, - }, - { - name: "two-metrics", - inputs: []*testScrapedPage{ - { - pts: []*testDataPoint{ - createDataPoint("counter_test", 150, "foo", "bar"), - createDataPoint("counter_test", 25, "foo", "other"), - createDataPoint("counter_test2", 100, "foo", "bar"), - }, - }, - }, - wants: [][]*metricspb.Metric{ - { - { - MetricDescriptor: &metricspb.MetricDescriptor{ - Name: "counter_test", - Type: 
metricspb.MetricDescriptor_CUMULATIVE_DOUBLE, - LabelKeys: []*metricspb.LabelKey{{Key: "foo"}}}, - Timeseries: []*metricspb.TimeSeries{ - { - StartTimestamp: timestampFromMs(startTs), - LabelValues: []*metricspb.LabelValue{{Value: "bar", HasValue: true}}, - Points: []*metricspb.Point{ - {Timestamp: timestampFromMs(startTs), Value: &metricspb.Point_DoubleValue{DoubleValue: 150.0}}, - }, - }, - { - StartTimestamp: timestampFromMs(startTs), - LabelValues: []*metricspb.LabelValue{{Value: "other", HasValue: true}}, - Points: []*metricspb.Point{ - {Timestamp: timestampFromMs(startTs), Value: &metricspb.Point_DoubleValue{DoubleValue: 25.0}}, - }, - }, - }, - }, - { - MetricDescriptor: &metricspb.MetricDescriptor{ - Name: "counter_test2", - Type: metricspb.MetricDescriptor_CUMULATIVE_DOUBLE, - LabelKeys: []*metricspb.LabelKey{{Key: "foo"}}}, - Timeseries: []*metricspb.TimeSeries{ - { - StartTimestamp: timestampFromMs(startTs), - LabelValues: []*metricspb.LabelValue{{Value: "bar", HasValue: true}}, - Points: []*metricspb.Point{ - {Timestamp: timestampFromMs(startTs), Value: &metricspb.Point_DoubleValue{DoubleValue: 100.0}}, - }, - }, - }, - }, - }, - }, - }, - { - name: "metrics-with-poor-names", - inputs: []*testScrapedPage{ - { - pts: []*testDataPoint{ - createDataPoint("poor_name_count", 100, "foo", "bar"), - }, - }, - }, - wants: [][]*metricspb.Metric{ - { - { - MetricDescriptor: &metricspb.MetricDescriptor{ - Name: "poor_name_count", - Type: metricspb.MetricDescriptor_CUMULATIVE_DOUBLE, - LabelKeys: []*metricspb.LabelKey{{Key: "foo"}}}, - Timeseries: []*metricspb.TimeSeries{ - { - StartTimestamp: timestampFromMs(startTs), - LabelValues: []*metricspb.LabelValue{{Value: "bar", HasValue: true}}, - Points: []*metricspb.Point{ - {Timestamp: timestampFromMs(startTs), Value: &metricspb.Point_DoubleValue{DoubleValue: 100.0}}, - }, - }, - }, - }, - }, - }, - }, - } - - runBuilderTests(t, tests) -} - func Test_metricBuilder_gauges(t *testing.T) { tests := []buildTestData{ { diff 
--git a/receiver/prometheusreceiver/internal/ocastore.go b/receiver/prometheusreceiver/internal/ocastore.go index b7ed0427b18..86fe6c4633f 100644 --- a/receiver/prometheusreceiver/internal/ocastore.go +++ b/receiver/prometheusreceiver/internal/ocastore.go @@ -45,7 +45,7 @@ type OcaStore struct { running int32 // access atomically sink consumer.Metrics mc *metadataService - jobsMap *JobsMap + jobsMap *JobsMapPdata useStartTimeMetric bool startTimeMetricRegex string receiverID config.ComponentID @@ -60,7 +60,7 @@ func NewOcaStore( ctx context.Context, sink consumer.Metrics, logger *zap.Logger, - jobsMap *JobsMap, + jobsMap *JobsMapPdata, useStartTimeMetric bool, startTimeMetricRegex string, receiverID config.ComponentID, @@ -93,17 +93,19 @@ func (o *OcaStore) Appender(context.Context) storage.Appender { // Firstly prepare the stalenessStore for a new scrape cyle. o.stalenessStore.refresh() - return newTransaction( + return newTransactionPdata( o.ctx, - o.jobsMap, - o.useStartTimeMetric, - o.startTimeMetricRegex, - o.receiverID, - o.mc, - o.sink, - o.externalLabels, - o.logger, - o.stalenessStore, + &txConfig{ + jobsMap: o.jobsMap, + useStartTimeMetric: o.useStartTimeMetric, + startTimeMetricRegex: o.startTimeMetricRegex, + receiverID: o.receiverID, + ms: o.mc, + sink: o.sink, + externalLabels: o.externalLabels, + logger: o.logger, + stalenessStore: o.stalenessStore, + }, ) } else if state == runningStateInit { panic("ScrapeManager is not set") diff --git a/receiver/prometheusreceiver/internal/otlp_metricfamily.go b/receiver/prometheusreceiver/internal/otlp_metricfamily.go index 427309e2dde..bf08c21fb23 100644 --- a/receiver/prometheusreceiver/internal/otlp_metricfamily.go +++ b/receiver/prometheusreceiver/internal/otlp_metricfamily.go @@ -274,6 +274,9 @@ func (mf *metricFamilyPdata) getGroups() []*metricGroupPdata { func (mf *metricFamilyPdata) ToMetricPdata(metrics *pdata.MetricSlice) (int, int) { metric := pdata.NewMetric() + metric.SetDataType(mf.mtype) + 
metric.SetName(mf.name) + pointCount := 0 switch mf.mtype { @@ -297,7 +300,18 @@ func (mf *metricFamilyPdata) ToMetricPdata(metrics *pdata.MetricSlice) (int, int } pointCount = sdpL.Len() - default: + case pdata.MetricDataTypeSum: + sum := metric.Sum() + sdpL := sum.DataPoints() + for _, mg := range mf.getGroups() { + if !mg.toNumberDataPoint(mf.labelKeysOrdered, &sdpL) { + mf.droppedTimeseries++ + } + } + pointCount = sdpL.Len() + + default: // Everything else should be set to a Gauge. + metric.SetDataType(pdata.MetricDataTypeGauge) gauge := metric.Gauge() gdpL := gauge.DataPoints() for _, mg := range mf.getGroups() { diff --git a/receiver/prometheusreceiver/internal/otlp_metricfamily_test.go b/receiver/prometheusreceiver/internal/otlp_metricfamily_test.go index b5c0d2c0c42..fb345287354 100644 --- a/receiver/prometheusreceiver/internal/otlp_metricfamily_test.go +++ b/receiver/prometheusreceiver/internal/otlp_metricfamily_test.go @@ -96,7 +96,10 @@ func TestIsCumulativeEquivalence(t *testing.T) { t.Run(tt.name, func(t *testing.T) { mf := newMetricFamily(tt.name, mc, zap.NewNop(), 1).(*metricFamily) mfp := newMetricFamilyPdata(tt.name, mc, 1).(*metricFamilyPdata) - assert.Equal(t, mf.isCumulativeType(), mfp.isCumulativeTypePdata(), "mismatch in isCumulative") + msg := fmt.Sprintf("\n%q::mf.isCumulativeType()=%t\n%q::mp.isCumulativeType()=%t\n", + mf.mtype, mf.isCumulativeType(), + mfp.mtype, mfp.isCumulativeTypePdata()) + assert.Equal(t, mf.isCumulativeType(), mfp.isCumulativeTypePdata(), "mismatch in isCumulative "+msg) assert.Equal(t, mf.isCumulativeType(), tt.want, "isCumulative does not match for regular metricFamily") assert.Equal(t, mfp.isCumulativeTypePdata(), tt.want, "isCumulative does not match for pdata metricFamily") }) diff --git a/receiver/prometheusreceiver/internal/otlp_metrics_adjuster.go b/receiver/prometheusreceiver/internal/otlp_metrics_adjuster.go new file mode 100644 index 00000000000..7ee3052cec1 --- /dev/null +++ 
b/receiver/prometheusreceiver/internal/otlp_metrics_adjuster.go @@ -0,0 +1,384 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import ( + "fmt" + "strings" + "sync" + "time" + + "go.opentelemetry.io/collector/model/pdata" + "go.uber.org/zap" +) + +// Notes on garbage collection (gc): +// +// Job-level gc: +// The Prometheus receiver will likely execute in a long running service whose lifetime may exceed +// the lifetimes of many of the jobs that it is collecting from. In order to keep the JobsMap from +// leaking memory for entries of no-longer existing jobs, the JobsMap needs to remove entries that +// haven't been accessed for a long period of time. +// +// Timeseries-level gc: +// Some jobs that the Prometheus receiver is collecting from may export timeseries based on metrics +// from other jobs (e.g. cAdvisor). In order to keep the timeseriesMap from leaking memory for entries +// of no-longer existing jobs, the timeseriesMap for each job needs to remove entries that haven't +// been accessed for a long period of time. +// +// The gc strategy uses a standard mark-and-sweep approach - each time a timeseriesMap is accessed, +// it is marked. Similarly, each time a timeseriesinfo is accessed, it is also marked. 
+// +// At the end of each JobsMap.get(), if the last time the JobsMap was gc'd exceeds the 'gcInterval', +// the JobsMap is locked and any timeseriesMaps that are unmarked are removed from the JobsMap +// otherwise the timeseriesMap is gc'd +// +// The gc for the timeseriesMap is straightforward - the map is locked and, for each timeseriesinfo +// in the map, if it has not been marked, it is removed otherwise it is unmarked. +// +// Alternative Strategies +// 1. If the job-level gc doesn't run often enough, or runs too often, a separate go routine can +// be spawned at JobMap creation time that gc's at periodic intervals. This approach potentially +// adds more contention and latency to each scrape so the current approach is used. Note that +// the go routine will need to be cancelled upon Shutdown(). +// 2. If the gc of each timeseriesMap during the gc of the JobsMap causes too much contention, +// the gc of timeseriesMaps can be moved to the end of MetricsAdjuster().AdjustMetrics(). This +// approach requires adding 'lastGC' Time and (potentially) a gcInterval duration to +// timeseriesMap so the current approach is used instead. + +// timeseriesinfo contains the information necessary to adjust from the initial point and to detect +// resets. +type timeseriesinfoPdata struct { + mark bool + initial *pdata.Metric + previous *pdata.Metric +} + +// timeseriesMap maps from a timeseries instance (metric * label values) to the timeseries info for +// the instance. +type timeseriesMapPdata struct { + sync.RWMutex + mark bool + tsiMap map[string]*timeseriesinfoPdata +} + +// Get the timeseriesinfo for the timeseries associated with the metric and label values. 
+func (tsm *timeseriesMapPdata) get(metric *pdata.Metric, kv pdata.StringMap) *timeseriesinfoPdata {
+ name := metric.Name()
+ sig := getTimeseriesSignaturePdata(name, kv)
+ tsi, ok := tsm.tsiMap[sig]
+ if !ok {
+ tsi = &timeseriesinfoPdata{}
+ tsm.tsiMap[sig] = tsi
+ }
+ tsm.mark = true
+ tsi.mark = true
+ return tsi
+}
+
+// Create a unique timeseries signature consisting of the metric name and label values.
+func getTimeseriesSignaturePdata(name string, kv pdata.StringMap) string {
+ labelValues := make([]string, 0, kv.Len())
+ kv.Sort().Range(func(_, value string) bool {
+ if value != "" {
+ labelValues = append(labelValues, value)
+ }
+ return true
+ })
+ return fmt.Sprintf("%s,%s", name, strings.Join(labelValues, ","))
+}
+
+// Remove timeseries that have aged out.
+func (tsm *timeseriesMapPdata) gc() {
+ tsm.Lock()
+ defer tsm.Unlock()
+ // this shouldn't happen under the current gc() strategy
+ if !tsm.mark {
+ return
+ }
+ for ts, tsi := range tsm.tsiMap {
+ if !tsi.mark {
+ delete(tsm.tsiMap, ts)
+ } else {
+ tsi.mark = false
+ }
+ }
+ tsm.mark = false
+}
+
+func newTimeseriesMapPdata() *timeseriesMapPdata {
+ return &timeseriesMapPdata{mark: true, tsiMap: map[string]*timeseriesinfoPdata{}}
+}
+
+// JobsMapPdata maps from a job instance to a map of timeseriesPdata instances for the job.
+type JobsMapPdata struct {
+ sync.RWMutex
+ gcInterval time.Duration
+ lastGC time.Time
+ jobsMap map[string]*timeseriesMapPdata
+}
+
+// NewJobsMapPdata creates a new (empty) JobsMapPdata.
+func NewJobsMapPdata(gcInterval time.Duration) *JobsMapPdata {
+ return &JobsMapPdata{gcInterval: gcInterval, lastGC: time.Now(), jobsMap: make(map[string]*timeseriesMapPdata)}
+}
+
+// Remove jobs and timeseries that have aged out. 
+func (jm *JobsMapPdata) gc() {
+ jm.Lock()
+ defer jm.Unlock()
+ // once the structure is locked, confirm that gc() is still necessary
+ if time.Since(jm.lastGC) > jm.gcInterval {
+ for sig, tsm := range jm.jobsMap {
+ tsm.RLock()
+ tsmNotMarked := !tsm.mark
+ tsm.RUnlock()
+ if tsmNotMarked {
+ delete(jm.jobsMap, sig)
+ } else {
+ tsm.gc()
+ }
+ }
+ jm.lastGC = time.Now()
+ }
+}
+
+func (jm *JobsMapPdata) maybeGC() {
+ // speculatively check if gc() is necessary, recheck once the structure is locked
+ jm.RLock()
+ defer jm.RUnlock()
+ if time.Since(jm.lastGC) > jm.gcInterval {
+ go jm.gc()
+ }
+}
+
+func (jm *JobsMapPdata) get(job, instance string) *timeseriesMapPdata {
+ sig := job + ":" + instance
+ jm.RLock()
+ tsm, ok := jm.jobsMap[sig]
+ jm.RUnlock()
+ defer jm.maybeGC()
+ if ok {
+ return tsm
+ }
+ jm.Lock()
+ defer jm.Unlock()
+ tsm2, ok2 := jm.jobsMap[sig]
+ if ok2 {
+ return tsm2
+ }
+ tsm2 = newTimeseriesMapPdata()
+ jm.jobsMap[sig] = tsm2
+ return tsm2
+}
+
+// MetricsAdjusterPdata takes a map from a metric instance to the initial point in the metrics instance
+// and provides AdjustMetrics, which takes a sequence of metrics and adjusts their start times based on
+// the initial points.
+type MetricsAdjusterPdata struct {
+ tsm *timeseriesMapPdata
+ logger *zap.Logger
+}
+
+// NewMetricsAdjusterPdata is a constructor for MetricsAdjusterPdata.
+func NewMetricsAdjusterPdata(tsm *timeseriesMapPdata, logger *zap.Logger) *MetricsAdjusterPdata {
+ return &MetricsAdjusterPdata{
+ tsm: tsm,
+ logger: logger,
+ }
+}
+
+// AdjustMetrics takes a sequence of metrics and adjusts their start times based on the initial and
+// previous points in the timeseriesMap.
+// Returns the total number of timeseries that had reset start times. 
+func (ma *MetricsAdjusterPdata) AdjustMetrics(metricL *pdata.MetricSlice) int { + resets := 0 + ma.tsm.Lock() + defer ma.tsm.Unlock() + for i := 0; i < metricL.Len(); i++ { + metric := metricL.At(i) + resets += ma.adjustMetric(&metric) + } + return resets +} + +// Returns the number of timeseries with reset start times. +func (ma *MetricsAdjusterPdata) adjustMetric(metric *pdata.Metric) int { + switch metric.DataType() { + case pdata.MetricDataTypeGauge: + // gauges don't need to be adjusted so no additional processing is necessary + return 0 + default: + return ma.adjustMetricPoints(metric) + } +} + +// Returns the number of timeseries that had reset start times. +func (ma *MetricsAdjusterPdata) adjustMetricPoints(metric *pdata.Metric) int { + switch dataType := metric.DataType(); dataType { + case pdata.MetricDataTypeGauge: + return ma.adjustMetricGauge(metric) + + case pdata.MetricDataTypeHistogram: + return ma.adjustMetricHistogram(metric) + + case pdata.MetricDataTypeSummary: + return ma.adjustMetricSummary(metric) + + default: + // this shouldn't happen + ma.logger.Info("Adjust - skipping unexpected point", zap.String("type", dataType.String())) + return 0 + } + + /* + resets := 0 + filtered := make([]*metricspb.TimeSeries, 0, len(metric.GetTimeseries())) + for _, current := range metric.GetTimeseries() { + tsi := ma.tsm.get(metric, current.GetLabelValues()) + if tsi.initial == nil || !ma.adjustTimeseries(metric.MetricDescriptor.Type, current, tsi.initial, tsi.previous) { + // initial || reset timeseries + tsi.initial = current + resets++ + } + tsi.previous = current + filtered = append(filtered, current) + } + metric.Timeseries = filtered + return resets + */ +} + +// Returns true if 'current' was adjusted and false if 'current' is an the initial occurrence or a +// reset of the timeseries. 
+func (ma *MetricsAdjusterPdata) adjustMetricGauge(current *pdata.Metric) (resets int) { + currentPoints := current.Gauge().DataPoints() + + for i := 0; i < currentPoints.Len(); i++ { + currentGauge := currentPoints.At(i) + tsi := ma.tsm.get(current, currentGauge.LabelsMap()) + if tsi.initial == nil { + // initial || reset timeseries. + tsi.initial = current + resets++ + } + initialPoints := tsi.initial.Gauge().DataPoints() + previousPoints := tsi.previous.Gauge().DataPoints() + if i >= initialPoints.Len() || i >= previousPoints.Len() { + ma.logger.Info("Adjusting Points, all lengths should be equal", + zap.Int("len(current)", currentPoints.Len()), + zap.Int("len(initial)", initialPoints.Len()), + zap.Int("len(previous)", previousPoints.Len())) + // initial || reset timeseries. + tsi.initial = current + resets++ + continue + } + + currentGauge, previousGauge := currentPoints.At(i), previousPoints.At(i) + if currentGauge.DoubleVal() < previousGauge.DoubleVal() { + // reset detected + tsi.initial = current + continue + } + initialGauge := initialPoints.At(i) + currentGauge.SetStartTimestamp(initialGauge.StartTimestamp()) + resets++ + } + return +} + +func (ma *MetricsAdjusterPdata) adjustMetricHistogram(current *pdata.Metric) (resets int) { + // note: sum of squared deviation not currently supported + currentPoints := current.Histogram().DataPoints() + + for i := 0; i < currentPoints.Len(); i++ { + currentDist := currentPoints.At(i) + tsi := ma.tsm.get(current, currentDist.LabelsMap()) + if tsi.initial == nil { + // initial || reset timeseries. 
+ tsi.initial = current + resets++ + continue + } + initialPoints := tsi.initial.Histogram().DataPoints() + previousPoints := tsi.previous.Histogram().DataPoints() + if i >= initialPoints.Len() || i >= previousPoints.Len() { + ma.logger.Info("Adjusting Points, all lengths should be equal", + zap.Int("len(current)", currentPoints.Len()), + zap.Int("len(initial)", initialPoints.Len()), + zap.Int("len(previous)", previousPoints.Len())) + // initial || reset timeseries. + tsi.initial = current + resets++ + continue + } + + previousDist := previousPoints.At(i) + if currentDist.Count() < previousDist.Count() || currentDist.Sum() < previousDist.Sum() { + // reset detected + tsi.initial = current + resets++ + continue + } + initialDist := initialPoints.At(i) + currentDist.SetStartTimestamp(initialDist.StartTimestamp()) + } + return +} + +func (ma *MetricsAdjusterPdata) adjustMetricSummary(current *pdata.Metric) (resets int) { + currentPoints := current.Summary().DataPoints() + + for i := 0; i < currentPoints.Len(); i++ { + currentSummary := currentPoints.At(i) + tsi := ma.tsm.get(current, currentSummary.LabelsMap()) + if tsi.initial == nil { + // initial || reset timeseries. 
+ tsi.initial = current + resets++ + continue + } + initialPoints := tsi.initial.Summary().DataPoints() + previousPoints := tsi.previous.Summary().DataPoints() + if i >= initialPoints.Len() || i >= previousPoints.Len() { + ma.logger.Info("Adjusting Points, all lengths should be equal", + zap.Int("len(current)", currentPoints.Len()), + zap.Int("len(initial)", initialPoints.Len()), + zap.Int("len(previous)", previousPoints.Len())) + tsi.initial = current + resets++ + continue + } + + previousSummary := previousPoints.At(i) + if (currentSummary.Count() != 0 && + previousSummary.Count() != 0 && + currentSummary.Count() < previousSummary.Count()) || + + (currentSummary.Sum() != 0 && + previousSummary.Sum() != 0 && + currentSummary.Sum() < previousSummary.Sum()) { + // reset detected + tsi.initial = current + resets++ + continue + } + initialSummary := initialPoints.At(i) + currentSummary.SetStartTimestamp(initialSummary.StartTimestamp()) + } + + return +} diff --git a/receiver/prometheusreceiver/internal/otlp_metricsbuilder.go b/receiver/prometheusreceiver/internal/otlp_metricsbuilder.go index 7b2da6d82bd..ba0ee3ac24e 100644 --- a/receiver/prometheusreceiver/internal/otlp_metricsbuilder.go +++ b/receiver/prometheusreceiver/internal/otlp_metricsbuilder.go @@ -89,7 +89,7 @@ type metricBuilderPdata struct { } // newMetricBuilder creates a MetricBuilder which is allowed to feed all the datapoints from a single prometheus -// scraped page by calling its AddDataPoint function, and turn them into an opencensus data.MetricsData object +// scraped page by calling its AddDataPoint function, and turn them into a pdata.Metrics object. 
// by calling its Build function func newMetricBuilderPdata(mc MetadataCache, useStartTimeMetric bool, startTimeMetricRegex string, logger *zap.Logger, stalenessStore *stalenessStore) *metricBuilderPdata { var regex *regexp.Regexp @@ -167,9 +167,9 @@ func (b *metricBuilderPdata) AddDataPoint(ls labels.Labels, t int64, v float64) b.hasData = true if b.currentMf != nil && !b.currentMf.IsSameFamily(metricName) { - ts, dts := b.currentMf.ToMetricPdata(&b.metrics) - b.numTimeseries += ts - b.droppedTimeseries += dts + nTs, nDts := b.currentMf.ToMetricPdata(&b.metrics) + b.numTimeseries += nTs + b.droppedTimeseries += nDts b.currentMf = newMetricFamilyPdata(metricName, b.mc, b.intervalStartTimeMs) } else if b.currentMf == nil { b.currentMf = newMetricFamilyPdata(metricName, b.mc, b.intervalStartTimeMs) @@ -177,3 +177,24 @@ func (b *metricBuilderPdata) AddDataPoint(ls labels.Labels, t int64, v float64) return b.currentMf.Add(metricName, ls, t, v) } + +// Build an pdata.MetricSlice based on all added data complexValue. +// The only error returned by this function is errNoDataToBuild. 
+func (b *metricBuilderPdata) Build() (*pdata.MetricSlice, int, int, error) { + if !b.hasData { + if b.hasInternalMetric { + metricsL := pdata.NewMetricSlice() + return &metricsL, 0, 0, nil + } + return nil, 0, 0, errNoDataToBuild + } + + if b.currentMf != nil { + ts, dts := b.currentMf.ToMetricPdata(&b.metrics) + b.numTimeseries += ts + b.droppedTimeseries += dts + b.currentMf = nil + } + + return &b.metrics, b.numTimeseries, b.droppedTimeseries, nil +} diff --git a/receiver/prometheusreceiver/internal/otlp_metricsbuilder_test.go b/receiver/prometheusreceiver/internal/otlp_metricsbuilder_test.go index 48dc485d9f5..6935cac6ea1 100644 --- a/receiver/prometheusreceiver/internal/otlp_metricsbuilder_test.go +++ b/receiver/prometheusreceiver/internal/otlp_metricsbuilder_test.go @@ -198,8 +198,8 @@ func TestConvToPdataMetricType(t *testing.T) { }, { name: "textparse.gauge", - mtype: textparse.MetricTypeCounter, - want: pdata.MetricDataTypeSum, + mtype: textparse.MetricTypeGauge, + want: pdata.MetricDataTypeGauge, }, { name: "textparse.unknown", @@ -232,7 +232,7 @@ func TestConvToPdataMetricType(t *testing.T) { tt := tt t.Run(tt.name, func(t *testing.T) { got := convToPdataMetricType(tt.mtype) - require.Equal(t, got, tt.want) + require.Equal(t, got.String(), tt.want.String()) }) } } @@ -326,3 +326,311 @@ func TestIsUsefulLabelPdata(t *testing.T) { }) } } + +type buildTestDataPdata struct { + name string + inputs []*testScrapedPage + wants func() []*pdata.MetricSlice +} + +func Test_OTLPMetricBuilder_counters(t *testing.T) { + startTsNanos := pdata.Timestamp(startTs * 1e6) + tests := []buildTestDataPdata{ + { + name: "single-item", + inputs: []*testScrapedPage{ + { + pts: []*testDataPoint{ + createDataPoint("counter_test", 100, "foo", "bar"), + }, + }, + }, + wants: func() []*pdata.MetricSlice { + mL := pdata.NewMetricSlice() + m0 := mL.AppendEmpty() + m0.SetName("counter_test") + m0.SetDataType(pdata.MetricDataTypeSum) + sum := m0.Sum() + pt0 := 
sum.DataPoints().AppendEmpty() + pt0.SetDoubleVal(100.0) + pt0.SetStartTimestamp(0) + pt0.SetTimestamp(startTsNanos) + pt0.LabelsMap().Insert("foo", "bar") + + return []*pdata.MetricSlice{&mL} + }, + }, + { + name: "two-items", + inputs: []*testScrapedPage{ + { + pts: []*testDataPoint{ + createDataPoint("counter_test", 150, "foo", "bar"), + createDataPoint("counter_test", 25, "foo", "other"), + }, + }, + }, + wants: func() []*pdata.MetricSlice { + mL := pdata.NewMetricSlice() + m0 := mL.AppendEmpty() + m0.SetName("counter_test") + m0.SetDataType(pdata.MetricDataTypeSum) + sum := m0.Sum() + pt0 := sum.DataPoints().AppendEmpty() + pt0.SetDoubleVal(150.0) + pt0.SetStartTimestamp(0) + pt0.SetTimestamp(startTsNanos) + pt0.LabelsMap().Insert("foo", "bar") + + pt1 := sum.DataPoints().AppendEmpty() + pt1.SetDoubleVal(25.0) + pt1.SetStartTimestamp(0) + pt1.SetTimestamp(startTsNanos) + pt1.LabelsMap().Insert("foo", "other") + + return []*pdata.MetricSlice{&mL} + }, + }, + { + name: "two-metrics", + inputs: []*testScrapedPage{ + { + pts: []*testDataPoint{ + createDataPoint("counter_test", 150, "foo", "bar"), + createDataPoint("counter_test", 25, "foo", "other"), + createDataPoint("counter_test2", 100, "foo", "bar"), + }, + }, + }, + wants: func() []*pdata.MetricSlice { + mL0 := pdata.NewMetricSlice() + m0 := mL0.AppendEmpty() + m0.SetName("counter_test") + m0.SetDataType(pdata.MetricDataTypeSum) + sum0 := m0.Sum() + pt0 := sum0.DataPoints().AppendEmpty() + pt0.SetDoubleVal(150.0) + pt0.SetStartTimestamp(0) + pt0.SetTimestamp(startTsNanos) + pt0.LabelsMap().Insert("foo", "bar") + + pt1 := sum0.DataPoints().AppendEmpty() + pt1.SetDoubleVal(25.0) + pt1.SetStartTimestamp(0) + pt1.SetTimestamp(startTsNanos) + pt1.LabelsMap().Insert("foo", "other") + + m1 := mL0.AppendEmpty() + m1.SetName("counter_test2") + m1.SetDataType(pdata.MetricDataTypeSum) + sum1 := m1.Sum() + pt2 := sum1.DataPoints().AppendEmpty() + pt2.SetDoubleVal(100.0) + pt2.SetStartTimestamp(0) + 
pt2.SetTimestamp(startTsNanos) + pt2.LabelsMap().Insert("foo", "bar") + + return []*pdata.MetricSlice{&mL0} + }, + }, + { + name: "metrics-with-poor-names", + inputs: []*testScrapedPage{ + { + pts: []*testDataPoint{ + createDataPoint("poor_name_count", 100, "foo", "bar"), + }, + }, + }, + wants: func() []*pdata.MetricSlice { + mL := pdata.NewMetricSlice() + m0 := mL.AppendEmpty() + m0.SetName("poor_name_count") + m0.SetDataType(pdata.MetricDataTypeSum) + sum := m0.Sum() + pt0 := sum.DataPoints().AppendEmpty() + pt0.SetDoubleVal(100.0) + pt0.SetStartTimestamp(0) + pt0.SetTimestamp(startTsNanos) + pt0.LabelsMap().Insert("foo", "bar") + + return []*pdata.MetricSlice{&mL} + }, + }, + } + + runBuilderTestsPdata(t, tests) +} + +func runBuilderTestsPdata(t *testing.T, tests []buildTestDataPdata) { + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + wants := tt.wants() + assert.EqualValues(t, len(wants), len(tt.inputs)) + mc := newMockMetadataCache(testMetadata) + st := startTs + for i, page := range tt.inputs { + b := newMetricBuilderPdata(mc, true, "", testLogger, dummyStalenessStore()) + b.startTime = defaultBuilderStartTime // set to a non-zero value + for _, pt := range page.pts { + // set ts for testing + pt.t = st + assert.NoError(t, b.AddDataPoint(pt.lb, pt.t, pt.v)) + } + metrics, _, _, err := b.Build() + assert.NoError(t, err) + assert.EqualValues(t, wants[i], metrics) + st += interval + } + }) + } +} + +func Test_OTLPMetricBuilder_gauges(t *testing.T) { + tests := []buildTestData{ + { + name: "one-gauge", + inputs: []*testScrapedPage{ + { + pts: []*testDataPoint{ + createDataPoint("gauge_test", 100, "foo", "bar"), + }, + }, + { + pts: []*testDataPoint{ + createDataPoint("gauge_test", 90, "foo", "bar"), + }, + }, + }, + wants: [][]*metricspb.Metric{ + { + { + MetricDescriptor: &metricspb.MetricDescriptor{ + Name: "gauge_test", + Type: metricspb.MetricDescriptor_GAUGE_DOUBLE, + LabelKeys: []*metricspb.LabelKey{{Key: "foo"}}}, + Timeseries: 
[]*metricspb.TimeSeries{ + { + LabelValues: []*metricspb.LabelValue{{Value: "bar", HasValue: true}}, + Points: []*metricspb.Point{ + {Timestamp: timestampFromMs(startTs), Value: &metricspb.Point_DoubleValue{DoubleValue: 100.0}}, + }, + }, + }, + }, + }, + { + { + MetricDescriptor: &metricspb.MetricDescriptor{ + Name: "gauge_test", + Type: metricspb.MetricDescriptor_GAUGE_DOUBLE, + LabelKeys: []*metricspb.LabelKey{{Key: "foo"}}}, + Timeseries: []*metricspb.TimeSeries{ + { + LabelValues: []*metricspb.LabelValue{{Value: "bar", HasValue: true}}, + Points: []*metricspb.Point{ + {Timestamp: timestampFromMs(startTs + interval), Value: &metricspb.Point_DoubleValue{DoubleValue: 90.0}}, + }, + }, + }, + }, + }, + }, + }, + { + name: "gauge-with-different-tags", + inputs: []*testScrapedPage{ + { + pts: []*testDataPoint{ + createDataPoint("gauge_test", 100, "foo", "bar"), + createDataPoint("gauge_test", 200, "bar", "foo"), + }, + }, + }, + wants: [][]*metricspb.Metric{ + { + { + MetricDescriptor: &metricspb.MetricDescriptor{ + Name: "gauge_test", + Type: metricspb.MetricDescriptor_GAUGE_DOUBLE, + LabelKeys: []*metricspb.LabelKey{{Key: "bar"}, {Key: "foo"}}}, + Timeseries: []*metricspb.TimeSeries{ + { + LabelValues: []*metricspb.LabelValue{{Value: "", HasValue: false}, {Value: "bar", HasValue: true}}, + Points: []*metricspb.Point{ + {Timestamp: timestampFromMs(startTs), Value: &metricspb.Point_DoubleValue{DoubleValue: 100.0}}, + }, + }, + { + LabelValues: []*metricspb.LabelValue{{Value: "foo", HasValue: true}, {Value: "", HasValue: false}}, + Points: []*metricspb.Point{ + {Timestamp: timestampFromMs(startTs), Value: &metricspb.Point_DoubleValue{DoubleValue: 200.0}}, + }, + }, + }, + }, + }, + }, + }, + { + // TODO: A decision need to be made. 
If we want to have the behavior which can generate different tag key + // sets because metrics come and go + name: "gauge-comes-and-go-with-different-tagset", + inputs: []*testScrapedPage{ + { + pts: []*testDataPoint{ + createDataPoint("gauge_test", 100, "foo", "bar"), + createDataPoint("gauge_test", 200, "bar", "foo"), + }, + }, + { + pts: []*testDataPoint{ + createDataPoint("gauge_test", 20, "foo", "bar"), + }, + }, + }, + wants: [][]*metricspb.Metric{ + { + { + MetricDescriptor: &metricspb.MetricDescriptor{ + Name: "gauge_test", + Type: metricspb.MetricDescriptor_GAUGE_DOUBLE, + LabelKeys: []*metricspb.LabelKey{{Key: "bar"}, {Key: "foo"}}}, + Timeseries: []*metricspb.TimeSeries{ + { + LabelValues: []*metricspb.LabelValue{{Value: "", HasValue: false}, {Value: "bar", HasValue: true}}, + Points: []*metricspb.Point{ + {Timestamp: timestampFromMs(startTs), Value: &metricspb.Point_DoubleValue{DoubleValue: 100.0}}, + }, + }, + { + LabelValues: []*metricspb.LabelValue{{Value: "foo", HasValue: true}, {Value: "", HasValue: false}}, + Points: []*metricspb.Point{ + {Timestamp: timestampFromMs(startTs), Value: &metricspb.Point_DoubleValue{DoubleValue: 200.0}}, + }, + }, + }, + }, + }, + { + { + MetricDescriptor: &metricspb.MetricDescriptor{ + Name: "gauge_test", + Type: metricspb.MetricDescriptor_GAUGE_DOUBLE, + LabelKeys: []*metricspb.LabelKey{{Key: "foo"}}}, + Timeseries: []*metricspb.TimeSeries{ + { + LabelValues: []*metricspb.LabelValue{{Value: "bar", HasValue: true}}, + Points: []*metricspb.Point{ + {Timestamp: timestampFromMs(startTs + interval), Value: &metricspb.Point_DoubleValue{DoubleValue: 20.0}}, + }, + }, + }, + }, + }, + }, + }, + } + + runBuilderTests(t, tests) +} diff --git a/receiver/prometheusreceiver/internal/otlp_transaction.go b/receiver/prometheusreceiver/internal/otlp_transaction.go new file mode 100644 index 00000000000..ab74553c8ea --- /dev/null +++ b/receiver/prometheusreceiver/internal/otlp_transaction.go @@ -0,0 +1,205 @@ +// Copyright The 
OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import ( + "context" + "math" + "sync/atomic" + + "go.opentelemetry.io/collector/config" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/model/pdata" + "go.opentelemetry.io/collector/obsreport" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/exemplar" + "github.com/prometheus/prometheus/pkg/labels" + "go.uber.org/zap" +) + +type transactionPdata struct { + id int64 + startTimeMs int64 + isNew bool + ctx context.Context + useStartTimeMetric bool + startTimeMetricRegex string + sink consumer.Metrics + metadataService *metadataService + externalLabels labels.Labels + nodeResource *pdata.Resource + stalenessStore *stalenessStore + logger *zap.Logger + receiverID config.ComponentID + metricBuilder *metricBuilderPdata + job, instance string + jobsMap *JobsMapPdata + obsrecv *obsreport.Receiver +} + +type txConfig struct { + jobsMap *JobsMapPdata + useStartTimeMetric bool + startTimeMetricRegex string + receiverID config.ComponentID + ms *metadataService + sink consumer.Metrics + externalLabels labels.Labels + logger *zap.Logger + stalenessStore *stalenessStore +} + +func newTransactionPdata(ctx context.Context, txc *txConfig) *transactionPdata { + return &transactionPdata{ + id: atomic.AddInt64(&idSeq, 1), + ctx: ctx, + isNew: true, + sink: txc.sink, + jobsMap: txc.jobsMap, + useStartTimeMetric: 
txc.useStartTimeMetric, + startTimeMetricRegex: txc.startTimeMetricRegex, + receiverID: txc.receiverID, + metadataService: txc.ms, + externalLabels: txc.externalLabels, + logger: txc.logger, + obsrecv: obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: txc.receiverID, Transport: transport}), + stalenessStore: txc.stalenessStore, + startTimeMs: -1, + } +} + +// Append always returns 0 to disable label caching. +func (t *transactionPdata) Append(ref uint64, labels labels.Labels, atMs int64, value float64) (pointCount uint64, err error) { + if t.startTimeMs < 0 { + t.startTimeMs = atMs + } + if math.IsNaN(value) { + return 0, nil + } + + select { + case <-t.ctx.Done(): + return 0, errTransactionAborted + default: + } + + if len(t.externalLabels) != 0 { + labels = append(labels, t.externalLabels...) + } + + if t.isNew { + if err := t.initTransaction(labels); err != nil { + return 0, err + } + } + + return 0, t.metricBuilder.AddDataPoint(labels, atMs, value) +} + +func (t *transactionPdata) AppendExemplar(ref uint64, l labels.Labels, e exemplar.Exemplar) (uint64, error) { + return 0, nil +} + +func (t *transactionPdata) initTransaction(labels labels.Labels) error { + job, instance := labels.Get(model.JobLabel), labels.Get(model.InstanceLabel) + if job == "" || instance == "" { + return errNoJobInstance + } + metadataCache, err := t.metadataService.Get(job, instance) + if err != nil { + return err + } + if t.jobsMap != nil { + t.job = job + t.instance = instance + } + t.nodeResource = createNodeAndResourcePdata(job, instance, metadataCache.SharedLabels().Get(model.SchemeLabel)) + t.metricBuilder = newMetricBuilderPdata(metadataCache, t.useStartTimeMetric, t.startTimeMetricRegex, t.logger, t.stalenessStore) + t.isNew = false + return nil +} + +func (t *transactionPdata) Commit() error { + if t.isNew { + return nil + } + + // Emit the staleness markers. 
+ staleLabels := t.stalenessStore.emitStaleLabels() + for _, sEntry := range staleLabels { + t.metricBuilder.AddDataPoint(sEntry.labels, sEntry.seenAtMs, stalenessSpecialValue) + } + t.startTimeMs = -1 + + ctx := t.obsrecv.StartMetricsOp(t.ctx) + metricsL, numPoints, _, err := t.metricBuilder.Build() + if err != nil { + t.obsrecv.EndMetricsOp(ctx, dataformat, 0, err) + return err + } + + if t.useStartTimeMetric { + if t.metricBuilder.startTime == 0.0 { + err = errNoStartTimeMetrics + t.obsrecv.EndMetricsOp(ctx, dataformat, 0, err) + return err + } + // Otherwise adjust the startTimestamp for all the metrics. + adjustStartTimestampPdata(t.metricBuilder.startTime, metricsL) + } else { + // TODO: Derive numPoints in this case. + _ = NewMetricsAdjusterPdata(t.jobsMap.get(t.job, t.instance), t.logger).AdjustMetrics(metricsL) + } + + if metricsL.Len() > 0 { + metrics := t.metricSliceToMetrics(metricsL) + t.sink.ConsumeMetrics(ctx, *metrics) + } + + t.obsrecv.EndMetricsOp(ctx, dataformat, numPoints, nil) + return nil +} + +func (t *transactionPdata) Rollback() error { + t.startTimeMs = -1 + return nil +} + +func adjustStartTimestampPdata(startTime float64, metricsL *pdata.MetricSlice) { + for i := 0; i < metricsL.Len(); i++ { + metric := metricsL.At(i) + switch metric.DataType() { + case pdata.MetricDataTypeGauge, pdata.MetricDataTypeHistogram: + continue + + default: + dataPoints := metric.Summary().DataPoints() + for i := 0; i < dataPoints.Len(); i++ { + dataPoint := dataPoints.At(i) + dataPoint.SetStartTimestamp(pdata.Timestamp(startTime)) + } + } + } +} + +func (t *transactionPdata) metricSliceToMetrics(metricsL *pdata.MetricSlice) *pdata.Metrics { + metrics := pdata.NewMetrics() + rms := metrics.ResourceMetrics().AppendEmpty() + ilm := rms.InstrumentationLibraryMetrics().AppendEmpty() + metricsL.CopyTo(ilm.Metrics()) + t.nodeResource.CopyTo(rms.Resource()) + return &metrics +} diff --git a/receiver/prometheusreceiver/internal/prom_to_otlp.go 
b/receiver/prometheusreceiver/internal/prom_to_otlp.go index 9f8aa13cadf..2053b36ecec 100644 --- a/receiver/prometheusreceiver/internal/prom_to_otlp.go +++ b/receiver/prometheusreceiver/internal/prom_to_otlp.go @@ -21,7 +21,7 @@ import ( "go.opentelemetry.io/collector/translator/conventions" ) -func createNodeAndResourcePdata(job, instance, scheme string) pdata.Resource { +func createNodeAndResourcePdata(job, instance, scheme string) *pdata.Resource { host, port, err := net.SplitHostPort(instance) if err != nil { host = instance @@ -35,5 +35,5 @@ func createNodeAndResourcePdata(job, instance, scheme string) pdata.Resource { attrs.UpsertString(portAttr, port) attrs.UpsertString(schemeAttr, scheme) - return resource + return &resource } diff --git a/receiver/prometheusreceiver/internal/prom_to_otlp_test.go b/receiver/prometheusreceiver/internal/prom_to_otlp_test.go index 8a8a421ed5c..f110999f6a3 100644 --- a/receiver/prometheusreceiver/internal/prom_to_otlp_test.go +++ b/receiver/prometheusreceiver/internal/prom_to_otlp_test.go @@ -52,7 +52,7 @@ type jobInstanceDefinition struct { job, instance, host, scheme, port string } -func makeResourceWithJobInstanceScheme(def *jobInstanceDefinition) pdata.Resource { +func makeResourceWithJobInstanceScheme(def *jobInstanceDefinition) *pdata.Resource { resource := pdata.NewResource() attrs := resource.Attributes() // Using hardcoded values to assert on outward expectations so that @@ -63,7 +63,7 @@ func makeResourceWithJobInstanceScheme(def *jobInstanceDefinition) pdata.Resourc attrs.UpsertString("instance", def.instance) attrs.UpsertString("port", def.port) attrs.UpsertString("scheme", def.scheme) - return resource + return &resource } func TestCreateNodeAndResourcePromToOTLP(t *testing.T) { @@ -71,7 +71,7 @@ func TestCreateNodeAndResourcePromToOTLP(t *testing.T) { name, job string instance string scheme string - want pdata.Resource + want *pdata.Resource }{ { name: "all attributes proper", diff --git 
a/receiver/prometheusreceiver/internal/staleness_end_to_end_test.go b/receiver/prometheusreceiver/internal/staleness_end_to_end_test.go index bf34180cca5..974c24389e4 100644 --- a/receiver/prometheusreceiver/internal/staleness_end_to_end_test.go +++ b/receiver/prometheusreceiver/internal/staleness_end_to_end_test.go @@ -199,6 +199,7 @@ service: // 6. Assert that we encounter the stale markers aka special NaNs for the various time series. staleMarkerCount := 0 totalSamples := 0 + require.True(t, len(wReqL) > 0, "Expecting at least one WriteRequest") for i, wReq := range wReqL { name := fmt.Sprintf("WriteRequest#%d", i) require.True(t, len(wReq.Timeseries) > 0, "Expecting at least 1 timeSeries for:: "+name) diff --git a/receiver/prometheusreceiver/metrics_receiver.go b/receiver/prometheusreceiver/metrics_receiver.go index e43ab7c4483..49ab4e10fea 100644 --- a/receiver/prometheusreceiver/metrics_receiver.go +++ b/receiver/prometheusreceiver/metrics_receiver.go @@ -71,9 +71,9 @@ func (r *pReceiver) Start(_ context.Context, host component.Host) error { } }() - var jobsMap *internal.JobsMap + var jobsMap *internal.JobsMapPdata if !r.cfg.UseStartTimeMetric { - jobsMap = internal.NewJobsMap(2 * time.Minute) + jobsMap = internal.NewJobsMapPdata(2 * time.Minute) } // Per component.Component Start instructions, for async operations we should not use the // incoming context, it may get cancelled.