Skip to content

Commit

Permalink
fix: drop a metric if it has a staleness flag (#6977)
Browse files Browse the repository at this point in the history
  • Loading branch information
hyunuk authored Jan 5, 2022
1 parent 6fe53a4 commit fddf498
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
## 💡 Enhancements 💡

- `prometheusremotewriteexporter`: Handling Staleness flag from OTLP (#6679)
- `prometheusexporter`: Handling Staleness flag from OTLP (#6805)
- `mysqlreceiver`: Add Integration test (#6916)
- `datadogexporter`: Add compatibility with ECS Fargate semantic conventions (#6670)
- `k8s_observer`: discover k8s.node endpoints (#6820)
Expand Down
16 changes: 16 additions & 0 deletions exporter/prometheusexporter/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ func (a *lastValueAccumulator) accumulateSummary(metric pdata.Metric, il pdata.I
ip := dps.At(i)

signature := timeseriesSignature(il.Name(), metric, ip.Attributes())
if ip.Flags().HasFlag(pdata.MetricDataPointFlagNoRecordedValue) {
a.registeredMetrics.Delete(signature)
return 0
}

v, ok := a.registeredMetrics.Load(signature)
stalePoint := ok &&
Expand All @@ -130,6 +134,10 @@ func (a *lastValueAccumulator) accumulateGauge(metric pdata.Metric, il pdata.Ins
ip := dps.At(i)

signature := timeseriesSignature(il.Name(), metric, ip.Attributes())
if ip.Flags().HasFlag(pdata.MetricDataPointFlagNoRecordedValue) {
a.registeredMetrics.Delete(signature)
return 0
}

v, ok := a.registeredMetrics.Load(signature)
if !ok {
Expand Down Expand Up @@ -167,6 +175,10 @@ func (a *lastValueAccumulator) accumulateSum(metric pdata.Metric, il pdata.Instr
ip := dps.At(i)

signature := timeseriesSignature(il.Name(), metric, ip.Attributes())
if ip.Flags().HasFlag(pdata.MetricDataPointFlagNoRecordedValue) {
a.registeredMetrics.Delete(signature)
return 0
}

v, ok := a.registeredMetrics.Load(signature)
if !ok {
Expand Down Expand Up @@ -208,6 +220,10 @@ func (a *lastValueAccumulator) accumulateDoubleHistogram(metric pdata.Metric, il
ip := dps.At(i)

signature := timeseriesSignature(il.Name(), metric, ip.Attributes())
if ip.Flags().HasFlag(pdata.MetricDataPointFlagNoRecordedValue) {
a.registeredMetrics.Delete(signature)
return 0
}

v, ok := a.registeredMetrics.Load(signature)
if !ok {
Expand Down
105 changes: 105 additions & 0 deletions exporter/prometheusexporter/accumulator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package prometheusexporter

import (
"log"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -216,6 +217,102 @@ func TestAccumulateMetrics(t *testing.T) {
dp.SetTimestamp(pdata.NewTimestampFromTime(ts))
},
},
{
name: "Summary",
metric: func(ts time.Time, v float64, metrics pdata.MetricSlice) {
metric := metrics.AppendEmpty()
metric.SetName("test_metric")
metric.SetDataType(pdata.MetricDataTypeSummary)
metric.SetDescription("test description")
dp := metric.Summary().DataPoints().AppendEmpty()
dp.SetCount(10)
dp.SetSum(0.012)
dp.SetCount(10)
dp.Attributes().InsertString("label_1", "1")
dp.Attributes().InsertString("label_2", "2")
dp.SetTimestamp(pdata.NewTimestampFromTime(ts))
fillQuantileValue := func(pN, value float64, dest pdata.ValueAtQuantile) {
dest.SetQuantile(pN)
dest.SetValue(value)
}
fillQuantileValue(0.50, 190, dp.QuantileValues().AppendEmpty())
fillQuantileValue(0.99, 817, dp.QuantileValues().AppendEmpty())
},
},
{
name: "StalenessMarkerGauge",
metric: func(ts time.Time, v float64, metrics pdata.MetricSlice) {
metric := metrics.AppendEmpty()
metric.SetName("test_metric")
metric.SetDataType(pdata.MetricDataTypeGauge)
metric.SetDescription("test description")
dp := metric.Gauge().DataPoints().AppendEmpty()
dp.SetDoubleVal(v)
dp.Attributes().InsertString("label_1", "1")
dp.Attributes().InsertString("label_2", "2")
dp.SetTimestamp(pdata.NewTimestampFromTime(ts))
dp.SetFlags(pdata.MetricDataPointFlags(pdata.MetricDataPointFlagNoRecordedValue))
},
},
{
name: "StalenessMarkerSum",
metric: func(ts time.Time, v float64, metrics pdata.MetricSlice) {
metric := metrics.AppendEmpty()
metric.SetName("test_metric")
metric.SetDataType(pdata.MetricDataTypeSum)
metric.SetDescription("test description")
metric.Sum().SetIsMonotonic(false)
metric.Sum().SetAggregationTemporality(pdata.MetricAggregationTemporalityCumulative)
dp := metric.Sum().DataPoints().AppendEmpty()
dp.SetDoubleVal(v)
dp.Attributes().InsertString("label_1", "1")
dp.Attributes().InsertString("label_2", "2")
dp.SetTimestamp(pdata.NewTimestampFromTime(ts))
dp.SetFlags(pdata.MetricDataPointFlags(pdata.MetricDataPointFlagNoRecordedValue))
},
},
{
name: "StalenessMarkerHistogram",
metric: func(ts time.Time, v float64, metrics pdata.MetricSlice) {
metric := metrics.AppendEmpty()
metric.SetName("test_metric")
metric.SetDataType(pdata.MetricDataTypeHistogram)
metric.Histogram().SetAggregationTemporality(pdata.MetricAggregationTemporalityCumulative)
metric.SetDescription("test description")
dp := metric.Histogram().DataPoints().AppendEmpty()
dp.SetBucketCounts([]uint64{5, 2})
dp.SetCount(7)
dp.SetExplicitBounds([]float64{3.5, 10.0})
dp.SetSum(v)
dp.Attributes().InsertString("label_1", "1")
dp.Attributes().InsertString("label_2", "2")
dp.SetTimestamp(pdata.NewTimestampFromTime(ts))
dp.SetFlags(pdata.MetricDataPointFlags(pdata.MetricDataPointFlagNoRecordedValue))
},
},
{
name: "StalenessMarkerSummary",
metric: func(ts time.Time, v float64, metrics pdata.MetricSlice) {
metric := metrics.AppendEmpty()
metric.SetName("test_metric")
metric.SetDataType(pdata.MetricDataTypeSummary)
metric.SetDescription("test description")
dp := metric.Summary().DataPoints().AppendEmpty()
dp.SetCount(10)
dp.SetSum(0.012)
dp.SetCount(10)
dp.Attributes().InsertString("label_1", "1")
dp.Attributes().InsertString("label_2", "2")
dp.SetTimestamp(pdata.NewTimestampFromTime(ts))
dp.SetFlags(pdata.MetricDataPointFlags(pdata.MetricDataPointFlagNoRecordedValue))
fillQuantileValue := func(pN, value float64, dest pdata.ValueAtQuantile) {
dest.SetQuantile(pN)
dest.SetValue(value)
}
fillQuantileValue(0.50, 190, dp.QuantileValues().AppendEmpty())
fillQuantileValue(0.99, 817, dp.QuantileValues().AppendEmpty())
},
},
}

for _, tt := range tests {
Expand All @@ -235,6 +332,10 @@ func TestAccumulateMetrics(t *testing.T) {

// 2 metric arrived
n := a.Accumulate(resourceMetrics2)
if strings.HasPrefix(tt.name, "StalenessMarker") {
require.Equal(t, 0, n)
return
}
require.Equal(t, 1, n)

m2Labels, _, m2Value, m2Temporality, m2IsMonotonic := getMetricProperties(ilm2.Metrics().At(0))
Expand Down Expand Up @@ -322,6 +423,10 @@ func getMetricProperties(metric pdata.Metric) (
value = metric.Histogram().DataPoints().At(0).Sum()
temporality = metric.Histogram().AggregationTemporality()
isMonotonic = true
case pdata.MetricDataTypeSummary:
attributes = metric.Summary().DataPoints().At(0).Attributes()
ts = metric.Summary().DataPoints().At(0).Timestamp().AsTime()
value = metric.Summary().DataPoints().At(0).Sum()
default:
log.Panicf("Invalid data type %s", metric.DataType().String())
}
Expand Down

0 comments on commit fddf498

Please sign in to comment.