Skip to content

Commit

Permalink
[processor/groupbyattrs] Updated metric copy to take into consideration custom properties for Histogram, ExponentialHistogram and Sum type metrics (open-telemetry#9088)
Browse files Browse the repository at this point in the history

* Updated metric copy to take into consideration custom properties for Histogram, ExponentialHistogram and Sum type metrics.

* Added error message for unknown data types.

* Fixed parameter count and actually ran make.

* Addressed data object renaming and lint errors.

* Add in sampled logging.

* Updated changelog with fix

* Added monotonic property.

* Removed unnecessary missing metric logs.

* Reverted factory changes.

Co-authored-by: Alex Boten <aboten@lightstep.com>
  • Loading branch information
2 people authored and kentquirk committed Jun 13, 2022
1 parent 570d50b commit dd3b4b5
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
- `logstransformprocessor`: Resolve node ordering to fix intermittent failures (#9761)
- `awsinsightreceiver`: Migrate from `ConfigMapsResourceLock` to `ConfigMapsLeasesResourceLock` as per https://github.com/kubernetes/client-go/commit/276ea3ed979947d7cdd4b3d708862245ddcd8883 (#9885)
- `filelog`, `journald`, `syslog`, `tcplog`, `udplog`: Add support for []string type for converting log record entries (#9887)
- `groupbyattrsprocessor`: copied aggregation temporality when grouping metrics. (#9087)

## v0.50.0

Expand Down
15 changes: 15 additions & 0 deletions processor/groupbyattrsprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,21 @@ func getMetricInInstrumentationLibrary(ilm pmetric.ScopeMetrics, searchedMetric
metric.SetName(searchedMetric.Name())
metric.SetUnit(searchedMetric.Unit())

// Move other special type specific values
switch metric.DataType() {

case pmetric.MetricDataTypeHistogram:
metric.Histogram().SetAggregationTemporality(searchedMetric.Histogram().AggregationTemporality())

case pmetric.MetricDataTypeExponentialHistogram:
metric.ExponentialHistogram().SetAggregationTemporality(searchedMetric.ExponentialHistogram().AggregationTemporality())

case pmetric.MetricDataTypeSum:
metric.Sum().SetAggregationTemporality(searchedMetric.Sum().AggregationTemporality())
metric.Sum().SetIsMonotonic(searchedMetric.Sum().IsMonotonic())

}

return metric
}

Expand Down
83 changes: 83 additions & 0 deletions processor/groupbyattrsprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package groupbyattrsprocessor
import (
"context"
"fmt"
"math/rand"
"sort"
"testing"
"time"

Expand Down Expand Up @@ -138,6 +140,62 @@ func someComplexMetrics(withResourceAttrIndex bool, rmCount int, ilmCount int, d
return metrics
}

// someComplexHistogramMetrics builds a pmetric.Metrics fixture with rmCount
// resource metrics, each containing ilmCount cumulative-temporality histogram
// metrics with dataPointCount data points of histogramSize random buckets.
// When withResourceAttrIndex is true, each resource gets a distinguishing
// "resourceAttrIndex" attribute so grouping keeps resources separate.
// Every data point carries both a grouped and a non-grouped attribute so
// tests can verify which attributes survive processing.
func someComplexHistogramMetrics(withResourceAttrIndex bool, rmCount int, ilmCount int, dataPointCount int, histogramSize int) pmetric.Metrics {
	metrics := pmetric.NewMetrics()

	for i := 0; i < rmCount; i++ {
		rm := metrics.ResourceMetrics().AppendEmpty()
		if withResourceAttrIndex {
			rm.Resource().Attributes().InsertInt("resourceAttrIndex", int64(i))
		}

		for j := 0; j < ilmCount; j++ {
			metric := rm.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
			metric.SetName(fmt.Sprintf("foo-%d-%d", i, j))
			metric.SetDataType(pmetric.MetricDataTypeHistogram)
			metric.Histogram().SetAggregationTemporality(pmetric.MetricAggregationTemporalityCumulative)

			for k := 0; k < dataPointCount; k++ {
				dataPoint := metric.Histogram().DataPoints().AppendEmpty()
				dataPoint.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
				buckets := randUIntArr(histogramSize)
				sort.Slice(buckets, func(i, j int) bool { return buckets[i] < buckets[j] })
				dataPoint.SetBucketCounts(buckets)
				// The OTLP data model requires explicit bounds in strictly
				// ascending order; randFloat64Arr returns them unsorted, so
				// sort before setting to produce a spec-valid histogram.
				bounds := randFloat64Arr(histogramSize)
				sort.Float64s(bounds)
				dataPoint.SetExplicitBounds(bounds)
				dataPoint.SetCount(sum(buckets))
				dataPoint.Attributes().InsertString("commonGroupedAttr", "abc")
				dataPoint.Attributes().InsertString("commonNonGroupedAttr", "xyz")
			}
		}
	}

	return metrics
}

// randUIntArr returns a slice of size pseudo-random uint64 values.
func randUIntArr(size int) []uint64 {
	out := make([]uint64, size)
	for idx := range out {
		out[idx] = rand.Uint64()
	}
	return out
}

// sum returns the total of all values in arr; an empty or nil slice yields 0.
func sum(arr []uint64) uint64 {
	var total uint64
	for i := range arr {
		total += arr[i]
	}
	return total
}

// randFloat64Arr returns a slice of size pseudo-random float64 values in [0.0, 1.0).
func randFloat64Arr(size int) []float64 {
	out := make([]float64, size)
	for idx := range out {
		out[idx] = rand.Float64()
	}
	return out
}

func assertResourceContainsAttributes(t *testing.T, resource pcommon.Resource, attributeMap pcommon.Map) {
attributeMap.Range(func(k string, v pcommon.Value) bool {
rv, found := resource.Attributes().Get(k)
Expand Down Expand Up @@ -229,6 +287,7 @@ func TestComplexAttributeGrouping(t *testing.T) {
inputLogs := someComplexLogs(tt.withResourceAttrIndex, tt.inputResourceCount, tt.inputInstrumentationLibraryCount)
inputTraces := someComplexTraces(tt.withResourceAttrIndex, tt.inputResourceCount, tt.inputInstrumentationLibraryCount)
inputMetrics := someComplexMetrics(tt.withResourceAttrIndex, tt.inputResourceCount, tt.inputInstrumentationLibraryCount, 2)
inputHistogramMetrics := someComplexHistogramMetrics(tt.withResourceAttrIndex, tt.inputResourceCount, tt.inputInstrumentationLibraryCount, 2, 2)

gap := createGroupByAttrsProcessor(zap.NewNop(), tt.groupByKeys)

Expand All @@ -241,6 +300,9 @@ func TestComplexAttributeGrouping(t *testing.T) {
processedMetrics, err := gap.processMetrics(context.Background(), inputMetrics)
assert.NoError(t, err)

processedHistogramMetrics, err := gap.processMetrics(context.Background(), inputHistogramMetrics)
assert.NoError(t, err)

// Following are record-level attributes that should be preserved after processing
outputRecordAttrs := pcommon.NewMap()
outputResourceAttrs := pcommon.NewMap()
Expand Down Expand Up @@ -306,6 +368,27 @@ func TestComplexAttributeGrouping(t *testing.T) {
}
}

rmhs := processedHistogramMetrics.ResourceMetrics()
assert.Equal(t, tt.outputResourceCount, rmhs.Len())
assert.Equal(t, tt.outputTotalRecordsCount, processedHistogramMetrics.MetricCount())
for i := 0; i < rmhs.Len(); i++ {
rm := rmhs.At(i)
assert.Equal(t, tt.outputInstrumentationLibraryCount, rm.ScopeMetrics().Len())

assertResourceContainsAttributes(t, rm.Resource(), outputResourceAttrs)

for j := 0; j < rm.ScopeMetrics().Len(); j++ {
metrics := rm.ScopeMetrics().At(j).Metrics()
for k := 0; k < metrics.Len(); k++ {
metric := metrics.At(k)
assert.Equal(t, metric.Histogram().AggregationTemporality(), pmetric.MetricAggregationTemporalityCumulative)
for l := 0; l < metric.Histogram().DataPoints().Len(); l++ {
assert.EqualValues(t, outputRecordAttrs, metric.Histogram().DataPoints().At(l).Attributes())
}
}
}
}

})
}
}
Expand Down

0 comments on commit dd3b4b5

Please sign in to comment.