Skip to content

Commit

Permalink
carbonexporter: fix empty metric serialization (open-telemetry#30182)
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu authored and cparkins committed Jan 10, 2024
1 parent 515a881 commit 74c22e3
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 68 deletions.
22 changes: 22 additions & 0 deletions .chloggen/fixcarbonexporterempty.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'bug_fix'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: 'carbonexporter'

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Fix metric with empty numberdatapoint serialization"

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [30182]

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
2 changes: 2 additions & 0 deletions exporter/carbonexporter/metricdata_to_plaintext.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ func writeNumberDataPoints(buf *bytes.Buffer, metricName string, dps pmetric.Num
dp := dps.At(i)
var valueStr string
switch dp.ValueType() {
case pmetric.NumberDataPointValueTypeEmpty:
continue // skip this data point - otherwise an empty string will be used as the value and the backend will use the timestamp as the metric value
case pmetric.NumberDataPointValueTypeInt:
valueStr = formatInt64(dp.IntValue())
case pmetric.NumberDataPointValueTypeDouble:
Expand Down
127 changes: 59 additions & 68 deletions exporter/carbonexporter/metricdata_to_plaintext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,6 @@ func TestBuildPath(t *testing.T) {
}

func TestToPlaintext(t *testing.T) {
expectedTagsCombinations := []string{";k0=v0;k1=v1", ";k1=v1;k0=v0"}

unixSecs := int64(1574092046)
expectedUnixSecsStr := strconv.FormatInt(unixSecs, 10)
unixNSecs := int64(11 * time.Millisecond)
Expand All @@ -135,13 +133,13 @@ func TestToPlaintext(t *testing.T) {
summaryQuantiles := []float64{90, 95, 99, 99.9}
summaryQuantileValues := []float64{100, 6, 4, 1}
tests := []struct {
name string
metricsDataFn func() pmetric.Metrics
wantLines []string
wantLinesCount int
name string
metricsDataFn func() pmetric.Metrics
wantLines []string
wantExtraLinesCount int
}{
{
name: "no_dims",
name: "gauge",
metricsDataFn: func() pmetric.Metrics {
md := pmetric.NewMetrics()
ms := md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics()
Expand All @@ -153,46 +151,45 @@ func TestToPlaintext(t *testing.T) {
dps2 := ms.At(1).SetEmptyGauge().DataPoints()
dps2.AppendEmpty().SetTimestamp(pcommon.NewTimestampFromTime(tsUnix))
dps2.At(0).SetIntValue(int64Val)

ms.AppendEmpty().SetName("cumulative_double_no_dims")
ms.At(2).SetEmptySum().SetIsMonotonic(true)
dps3 := ms.At(2).Sum().DataPoints()
ms.AppendEmpty().SetName("gauge_double_with_dims")
dps3 := ms.At(2).SetEmptyGauge().DataPoints()
dps3.AppendEmpty().SetTimestamp(pcommon.NewTimestampFromTime(tsUnix))
dps3.At(0).Attributes().PutStr("k0", "v0")
dps3.At(0).Attributes().PutStr("k1", "v1")
dps3.At(0).SetDoubleValue(doubleVal)
ms.AppendEmpty().SetName("cumulative_int_no_dims")
ms.At(3).SetEmptySum().SetIsMonotonic(true)
dps4 := ms.At(3).Sum().DataPoints()
ms.AppendEmpty().SetName("gauge_int_with_dims")
dps4 := ms.At(3).SetEmptyGauge().DataPoints()
dps4.AppendEmpty().SetTimestamp(pcommon.NewTimestampFromTime(tsUnix))
dps4.At(0).Attributes().PutStr("k0", "v0")
dps4.At(0).Attributes().PutStr("k1", "v1")
dps4.At(0).SetIntValue(int64Val)
ms.AppendEmpty().SetName("gauge_no_value")
dps5 := ms.At(4).SetEmptyGauge().DataPoints()
dps5.AppendEmpty().SetTimestamp(pcommon.NewTimestampFromTime(tsUnix))
return md

},
wantLines: []string{
"gauge_double_no_dims " + expectedDobuleValStr + " " + expectedUnixSecsStr,
"gauge_int_no_dims " + expectedInt64ValStr + " " + expectedUnixSecsStr,
"cumulative_double_no_dims " + expectedDobuleValStr + " " + expectedUnixSecsStr,
"cumulative_int_no_dims " + expectedInt64ValStr + " " + expectedUnixSecsStr,
"gauge_double_with_dims;k0=v0;k1=v1 " + expectedDobuleValStr + " " + expectedUnixSecsStr,
"gauge_int_with_dims;k0=v0;k1=v1 " + expectedInt64ValStr + " " + expectedUnixSecsStr,
},
wantLinesCount: 4,
},
{
name: "with_dims",
name: "cumulative_monotonic_sum",
metricsDataFn: func() pmetric.Metrics {
md := pmetric.NewMetrics()
ms := md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics()
ms.AppendEmpty().SetName("gauge_double_with_dims")
dps1 := ms.At(0).SetEmptyGauge().DataPoints()
ms.AppendEmpty().SetName("cumulative_double_no_dims")
ms.At(0).SetEmptySum().SetIsMonotonic(true)
dps1 := ms.At(0).Sum().DataPoints()
dps1.AppendEmpty().SetTimestamp(pcommon.NewTimestampFromTime(tsUnix))
dps1.At(0).Attributes().PutStr("k1", "v1")
dps1.At(0).Attributes().PutStr("k0", "v0")
dps1.At(0).SetDoubleValue(doubleVal)
ms.AppendEmpty().SetName("gauge_int_with_dims")
dps2 := ms.At(1).SetEmptyGauge().DataPoints()
ms.AppendEmpty().SetName("cumulative_int_no_dims")
ms.At(1).SetEmptySum().SetIsMonotonic(true)
dps2 := ms.At(1).Sum().DataPoints()
dps2.AppendEmpty().SetTimestamp(pcommon.NewTimestampFromTime(tsUnix))
dps2.At(0).Attributes().PutStr("k0", "v0")
dps2.At(0).Attributes().PutStr("k1", "v1")
dps2.At(0).SetIntValue(int64Val)

ms.AppendEmpty().SetName("cumulative_double_with_dims")
ms.At(2).SetEmptySum().SetIsMonotonic(true)
dps3 := ms.At(2).Sum().DataPoints()
Expand All @@ -207,45 +204,42 @@ func TestToPlaintext(t *testing.T) {
dps4.At(0).Attributes().PutStr("k0", "v0")
dps4.At(0).Attributes().PutStr("k1", "v1")
dps4.At(0).SetIntValue(int64Val)
ms.AppendEmpty().SetName("cumulative_no_value")
ms.At(4).SetEmptySum().SetIsMonotonic(true)
dps5 := ms.At(4).Sum().DataPoints()
dps5.AppendEmpty().SetTimestamp(pcommon.NewTimestampFromTime(tsUnix))
return md
},
wantLines: func() []string {
combinations := make([]string, 0, 4*len(expectedTagsCombinations))
for _, tags := range expectedTagsCombinations {
combinations = append(combinations,
"gauge_double_with_dims"+tags+" "+expectedDobuleValStr+" "+expectedUnixSecsStr,
"gauge_int_with_dims"+tags+" "+expectedInt64ValStr+" "+expectedUnixSecsStr,
"cumulative_double_with_dims"+tags+" "+expectedDobuleValStr+" "+expectedUnixSecsStr,
"cumulative_int_with_dims"+tags+" "+expectedInt64ValStr+" "+expectedUnixSecsStr,
)
}
return combinations
}(),
wantLinesCount: 4,
wantLines: []string{
"cumulative_double_no_dims " + expectedDobuleValStr + " " + expectedUnixSecsStr,
"cumulative_int_no_dims " + expectedInt64ValStr + " " + expectedUnixSecsStr,
"cumulative_double_with_dims;k0=v0;k1=v1 " + expectedDobuleValStr + " " + expectedUnixSecsStr,
"cumulative_int_with_dims;k0=v0;k1=v1 " + expectedInt64ValStr + " " + expectedUnixSecsStr,
},
},
{
name: "distributions",
name: "histogram",
metricsDataFn: func() pmetric.Metrics {
md := pmetric.NewMetrics()
ms := md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics()
ms.AppendEmpty().SetName("distrib")
ms.At(0).SetEmptyHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
dp := ms.At(0).SetEmptyHistogram().DataPoints().AppendEmpty()
dp.SetTimestamp(pcommon.NewTimestampFromTime(tsUnix))
assert.NoError(t, dp.Attributes().FromRaw(map[string]any{"k0": "v0", "k1": "v1"}))
dp.Attributes().PutStr("k0", "v0")
dp.Attributes().PutStr("k1", "v1")
dp.SetCount(distributionCount)
dp.SetSum(distributionSum)
dp.ExplicitBounds().FromRaw(distributionBounds)
dp.BucketCounts().FromRaw(distributionCounts)
return md
},
wantLines: expectedDistributionLines(
"distrib", expectedTagsCombinations, expectedUnixSecsStr,
"distrib", ";k0=v0;k1=v1", expectedUnixSecsStr,
distributionSum,
distributionCount,
distributionBounds,
distributionCounts),
wantLinesCount: 6,
},
{
name: "summary",
Expand All @@ -255,7 +249,8 @@ func TestToPlaintext(t *testing.T) {
ms.AppendEmpty().SetName("summary")
dp := ms.At(0).SetEmptySummary().DataPoints().AppendEmpty()
dp.SetTimestamp(pcommon.NewTimestampFromTime(tsUnix))
assert.NoError(t, dp.Attributes().FromRaw(map[string]any{"k0": "v0", "k1": "v1"}))
dp.Attributes().PutStr("k0", "v0")
dp.Attributes().PutStr("k1", "v1")
dp.SetCount(summaryCount)
dp.SetSum(summarySum)
for i := range summaryQuantiles {
Expand All @@ -266,68 +261,64 @@ func TestToPlaintext(t *testing.T) {
return md
},
wantLines: expectedSummaryLines(
"summary", expectedTagsCombinations, expectedUnixSecsStr,
"summary", ";k0=v0;k1=v1", expectedUnixSecsStr,
summarySum,
summaryCount,
summaryQuantiles,
summaryQuantileValues),
wantLinesCount: 6,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotLines := metricDataToPlaintext(tt.metricsDataFn())
got := strings.Split(gotLines, "\n")
got = got[:len(got)-1]
assert.Equal(t, tt.wantLinesCount, len(got))
assert.Len(t, got, len(tt.wantLines)+tt.wantExtraLinesCount)
assert.Subset(t, tt.wantLines, got)
})
}
}

func expectedDistributionLines(
metricName string,
tagsCombinations []string,
tags string,
timestampStr string,
sum float64,
count uint64,
bounds []float64,
counts []uint64,
) []string {
var lines []string
for _, tags := range tagsCombinations {
lines = append(lines,
metricName+".count"+tags+" "+formatInt64(int64(count))+" "+timestampStr,
metricName+tags+" "+formatFloatForLabel(sum)+" "+timestampStr,
metricName+".bucket"+tags+";upper_bound=inf "+formatInt64(int64(counts[len(bounds)]))+" "+timestampStr,
)
for i, bound := range bounds {
lines = append(lines,
metricName+".count"+tags+" "+formatInt64(int64(count))+" "+timestampStr,
metricName+tags+" "+formatFloatForLabel(sum)+" "+timestampStr,
metricName+".bucket"+tags+";upper_bound=inf "+formatInt64(int64(counts[len(bounds)]))+" "+timestampStr,
)
for i, bound := range bounds {
lines = append(lines,
metricName+".bucket"+tags+";upper_bound="+formatFloatForLabel(bound)+" "+formatInt64(int64(counts[i]))+" "+timestampStr)
}
metricName+".bucket"+tags+";upper_bound="+formatFloatForLabel(bound)+" "+formatInt64(int64(counts[i]))+" "+timestampStr)
}

return lines
}

func expectedSummaryLines(
metricName string,
tagsCombinations []string,
tags string,
timestampStr string,
sum float64,
count uint64,
summaryQuantiles []float64,
summaryQuantileValues []float64,
) []string {
var lines []string
for _, tags := range tagsCombinations {
lines = append(lines,
metricName+".count"+tags+" "+formatInt64(int64(count))+" "+timestampStr,
metricName+tags+" "+formatFloatForValue(sum)+" "+timestampStr,
)
for i := range summaryQuantiles {
lines = append(lines,
metricName+".count"+tags+" "+formatInt64(int64(count))+" "+timestampStr,
metricName+tags+" "+formatFloatForValue(sum)+" "+timestampStr,
)
for i := range summaryQuantiles {
lines = append(lines,
metricName+".quantile"+tags+";quantile="+formatFloatForLabel(summaryQuantiles[i])+" "+formatFloatForValue(summaryQuantileValues[i])+" "+timestampStr)
}
metricName+".quantile"+tags+";quantile="+formatFloatForLabel(summaryQuantiles[i])+" "+formatFloatForValue(summaryQuantileValues[i])+" "+timestampStr)
}
return lines
}
Expand Down

0 comments on commit 74c22e3

Please sign in to comment.