diff --git a/.chloggen/use-hash-groupbyattrproc.yaml b/.chloggen/use-hash-groupbyattrproc.yaml new file mode 100755 index 000000000000..eed34e71be3d --- /dev/null +++ b/.chloggen/use-hash-groupbyattrproc.yaml @@ -0,0 +1,11 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: processor/groupbyattr + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Improve performance by using map hashes for resource grouping + +# One or more tracking issues related to the change +issues: [17527] diff --git a/processor/groupbyattrsprocessor/attribute_groups.go b/processor/groupbyattrsprocessor/attribute_groups.go index 02d982b1302c..70599abeb7ad 100644 --- a/processor/groupbyattrsprocessor/attribute_groups.go +++ b/processor/groupbyattrsprocessor/attribute_groups.go @@ -19,8 +19,93 @@ import ( "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/pdatautil" ) +type tracesGroup struct { + traces ptrace.Traces + resourceHashes [][16]byte +} + +func newTracesGroup() *tracesGroup { + return &tracesGroup{traces: ptrace.NewTraces()} +} + +// findOrCreateResource searches for a Resource with matching attributes and returns it. If nothing is found, it is being created +func (tg *tracesGroup) findOrCreateResourceSpans(originResource pcommon.Resource, requiredAttributes pcommon.Map) ptrace.ResourceSpans { + referenceResource := buildReferenceResource(originResource, requiredAttributes) + referenceResourceHash := pdatautil.MapHash(referenceResource.Attributes()) + + rss := tg.traces.ResourceSpans() + for i := 0; i < rss.Len(); i++ { + if tg.resourceHashes[i] == referenceResourceHash { + return rss.At(i) + } + } + + rs := tg.traces.ResourceSpans().AppendEmpty() + referenceResource.MoveTo(rs.Resource()) + tg.resourceHashes = append(tg.resourceHashes, referenceResourceHash) + return rs +} + +type metricsGroup struct { + metrics pmetric.Metrics + resourceHashes [][16]byte +} + +func newMetricsGroup() *metricsGroup { + return &metricsGroup{metrics: pmetric.NewMetrics()} +} + +// findOrCreateResourceMetrics searches for a Resource with matching attributes and returns it. If nothing is found, it is being created +func (mg *metricsGroup) findOrCreateResourceMetrics(originResource pcommon.Resource, requiredAttributes pcommon.Map) pmetric.ResourceMetrics { + referenceResource := buildReferenceResource(originResource, requiredAttributes) + referenceResourceHash := pdatautil.MapHash(referenceResource.Attributes()) + + rms := mg.metrics.ResourceMetrics() + for i := 0; i < rms.Len(); i++ { + if mg.resourceHashes[i] == referenceResourceHash { + return rms.At(i) + } + } + + rm := mg.metrics.ResourceMetrics().AppendEmpty() + referenceResource.MoveTo(rm.Resource()) + mg.resourceHashes = append(mg.resourceHashes, referenceResourceHash) + return rm + +} + +type logsGroup struct { + logs plog.Logs + resourceHashes [][16]byte +} + +// newLogsGroup returns new logsGroup with predefined capacity +func newLogsGroup() *logsGroup { + return &logsGroup{logs: plog.NewLogs()} +} + +// findOrCreateResourceLogs searches for a Resource with matching attributes and returns it. If nothing is found, it is being created +func (lg *logsGroup) findOrCreateResourceLogs(originResource pcommon.Resource, requiredAttributes pcommon.Map) plog.ResourceLogs { + referenceResource := buildReferenceResource(originResource, requiredAttributes) + referenceResourceHash := pdatautil.MapHash(referenceResource.Attributes()) + + rls := lg.logs.ResourceLogs() + for i := 0; i < rls.Len(); i++ { + if lg.resourceHashes[i] == referenceResourceHash { + return rls.At(i) + } + } + + rl := lg.logs.ResourceLogs().AppendEmpty() + referenceResource.MoveTo(rl.Resource()) + lg.resourceHashes = append(lg.resourceHashes, referenceResourceHash) + return rl +} + func instrumentationLibrariesEqual(il1, il2 pcommon.InstrumentationScope) bool { return il1.Name() == il2.Name() && il1.Version() == il2.Version() } @@ -73,108 +158,14 @@ func matchingScopeMetrics(rm pmetric.ResourceMetrics, library pcommon.Instrument return ilm } -// Build the Attributes that we'll be looking for in existing Resources as a merge of the Attributes -// of the original Resource with the requested Attributes -func buildReferenceAttributes(originResource pcommon.Resource, requiredAttributes pcommon.Map) pcommon.Map { - referenceAttributes := pcommon.NewMap() - originResource.Attributes().CopyTo(referenceAttributes) - requiredAttributes.Range(func(k string, v pcommon.Value) bool { - v.CopyTo(referenceAttributes.PutEmpty(k)) - return true - }) - return referenceAttributes -} - -// resourceMatches verifies if given pcommon.Resource attributes strictly match with the specified -// reference Attributes (all attributes must match strictly) -func resourceMatches(resource pcommon.Resource, referenceAttributes pcommon.Map) bool { - - // If not the same number of attributes, it doesn't match - if referenceAttributes.Len() != resource.Attributes().Len() { - return false - } - - // Go through each attribute and check the corresponding attribute value in the tested Resource - matching := true - referenceAttributes.Range(func(referenceKey string, referenceValue pcommon.Value) bool { - testedValue, foundKey := resource.Attributes().Get(referenceKey) - if !foundKey || !referenceValue.Equal(testedValue) { - // One difference is enough to consider it doesn't match, so fail early - matching = false - return false - } - return true - }) - - return matching -} - -// Update the specified (and new) Resource with the properties of the original Resource, and with the -// required Attributes -func updateResourceToMatch(newResource pcommon.Resource, originResource pcommon.Resource, requiredAttributes pcommon.Map) { - originResource.CopyTo(newResource) - - // This prioritizes required attributes over the original resource attributes, if they overlap - attrs := newResource.Attributes() +// buildReferenceResource returns a new resource that we'll be looking for in existing Resources +// as a merge of the Attributes of the original Resource with the requested Attributes. +func buildReferenceResource(originResource pcommon.Resource, requiredAttributes pcommon.Map) pcommon.Resource { + referenceResource := pcommon.NewResource() + originResource.Attributes().CopyTo(referenceResource.Attributes()) requiredAttributes.Range(func(k string, v pcommon.Value) bool { - v.CopyTo(attrs.PutEmpty(k)) + v.CopyTo(referenceResource.Attributes().PutEmpty(k)) return true }) -} - -// findOrCreateResource searches for a Resource with matching attributes and returns it. If nothing is found, it is being created -func findOrCreateResourceSpans(traces ptrace.Traces, originResource pcommon.Resource, requiredAttributes pcommon.Map) ptrace.ResourceSpans { - - // Build the reference attributes that we're looking for in Resources - referenceAttributes := buildReferenceAttributes(originResource, requiredAttributes) - - rss := traces.ResourceSpans() - for i := 0; i < rss.Len(); i++ { - if resourceMatches(rss.At(i).Resource(), referenceAttributes) { - return rss.At(i) - } - } - - // Not found: append a new ResourceSpans. - rs := rss.AppendEmpty() - updateResourceToMatch(rs.Resource(), originResource, requiredAttributes) - return rs -} - -// findOrCreateResourceLogs searches for a Resource with matching attributes and returns it. If nothing is found, it is being created -func findOrCreateResourceLogs(logs plog.Logs, originResource pcommon.Resource, requiredAttributes pcommon.Map) plog.ResourceLogs { - - // Build the reference attributes that we're looking for in Resources - referenceAttributes := buildReferenceAttributes(originResource, requiredAttributes) - - rls := logs.ResourceLogs() - for i := 0; i < rls.Len(); i++ { - if resourceMatches(rls.At(i).Resource(), referenceAttributes) { - return rls.At(i) - } - } - - // Not found: append a new ResourceLogs - rl := rls.AppendEmpty() - updateResourceToMatch(rl.Resource(), originResource, requiredAttributes) - return rl -} - -// findOrCreateResourceMetrics searches for a Resource with matching attributes and returns it. If nothing is found, it is being created -func findOrCreateResourceMetrics(metrics pmetric.Metrics, originResource pcommon.Resource, requiredAttributes pcommon.Map) pmetric.ResourceMetrics { - - // Build the reference attributes that we're looking for in Resources - referenceAttributes := buildReferenceAttributes(originResource, requiredAttributes) - - rms := metrics.ResourceMetrics() - for i := 0; i < rms.Len(); i++ { - if resourceMatches(rms.At(i).Resource(), referenceAttributes) { - return rms.At(i) - } - } - - // Not found: append a new ResourceMetrics - rm := rms.AppendEmpty() - updateResourceToMatch(rm.Resource(), originResource, requiredAttributes) - return rm + return referenceResource } diff --git a/processor/groupbyattrsprocessor/attribute_groups_test.go b/processor/groupbyattrsprocessor/attribute_groups_test.go index 758fc23bcd86..d92bc5573b9d 100644 --- a/processor/groupbyattrsprocessor/attribute_groups_test.go +++ b/processor/groupbyattrsprocessor/attribute_groups_test.go @@ -106,7 +106,7 @@ func TestResourceAttributeScenarios(t *testing.T) { }, } - logs := plog.NewLogs() + lg := newLogsGroup() for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { recordAttributeMap := pcommon.NewMap() @@ -119,8 +119,8 @@ func TestResourceAttributeScenarios(t *testing.T) { tt.fillExpectedResourceFun(tt.baseResource, expectedResource) } - rl := findOrCreateResourceLogs(logs, tt.baseResource, recordAttributeMap) - assert.EqualValues(t, expectedResource.Attributes(), rl.Resource().Attributes()) + rl := lg.findOrCreateResourceLogs(tt.baseResource, recordAttributeMap) + assert.Equal(t, expectedResource.Attributes().AsRaw(), rl.Resource().Attributes().AsRaw()) }) } } @@ -158,9 +158,9 @@ func TestInstrumentationLibraryMatching(t *testing.T) { } func BenchmarkAttrGrouping(b *testing.B) { - logs := plog.NewLogs() + lg := newLogsGroup() b.ReportAllocs() for n := 0; n < b.N; n++ { - findOrCreateResourceLogs(logs, res, groups[rand.Intn(count)]) + lg.findOrCreateResourceLogs(res, groups[rand.Intn(count)]) } } diff --git a/processor/groupbyattrsprocessor/go.mod b/processor/groupbyattrsprocessor/go.mod index d1ca5692b09b..37b92ab4731b 100644 --- a/processor/groupbyattrsprocessor/go.mod +++ b/processor/groupbyattrsprocessor/go.mod @@ -3,6 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/processor/group go 1.18 require ( + github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.0.0-00010101000000-000000000000 github.com/stretchr/testify v1.8.1 go.opencensus.io v0.24.0 go.opentelemetry.io/collector v0.69.2-0.20230112233839-f2a0133bf677 @@ -14,13 +15,14 @@ require ( ) require ( + github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/knadh/koanf v1.4.5 // indirect - github.com/kr/pretty v0.3.0 // indirect + github.com/kr/text v0.2.0 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect @@ -28,6 +30,7 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pelletier/go-toml v1.9.4 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/rogpeppe/go-internal v1.6.1 // indirect go.opentelemetry.io/collector/featuregate v0.69.2-0.20230112233839-f2a0133bf677 // indirect go.opentelemetry.io/otel v1.11.2 // indirect go.opentelemetry.io/otel/metric v0.34.0 // indirect @@ -40,10 +43,11 @@ require ( google.golang.org/genproto v0.0.0-20221118155620-16455021b5e6 // indirect google.golang.org/grpc v1.52.0 // indirect google.golang.org/protobuf v1.28.1 // indirect - gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) +replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal => ../../internal/coreinternal + replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbytraceprocessor => ../../processor/groupbytraceprocessor replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal => ../../pkg/batchpersignal diff --git a/processor/groupbyattrsprocessor/go.sum b/processor/groupbyattrsprocessor/go.sum index 87c70e820dfc..dd1b25282668 100644 --- a/processor/groupbyattrsprocessor/go.sum +++ b/processor/groupbyattrsprocessor/go.sum @@ -30,7 +30,8 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= @@ -163,9 +164,7 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxv github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= -github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= -github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -452,7 +451,6 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= -gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/square/go-jose.v2 v2.3.1/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/processor/groupbyattrsprocessor/processor.go b/processor/groupbyattrsprocessor/processor.go index 8dd47a4ea690..34ca412aab86 100644 --- a/processor/groupbyattrsprocessor/processor.go +++ b/processor/groupbyattrsprocessor/processor.go @@ -33,7 +33,7 @@ type groupByAttrsProcessor struct { // ProcessTraces process traces and groups traces by attribute. func (gap *groupByAttrsProcessor) processTraces(ctx context.Context, td ptrace.Traces) (ptrace.Traces, error) { rss := td.ResourceSpans() - groupedTraces := ptrace.NewTraces() + tg := newTracesGroup() for i := 0; i < rss.Len(); i++ { rs := rss.At(i) @@ -56,7 +56,7 @@ func (gap *groupByAttrsProcessor) processTraces(ctx context.Context, td ptrace.T // Lets combine the base resource attributes + the extracted (grouped) attributes // and keep them in the grouping entry - groupedResourceSpans := findOrCreateResourceSpans(groupedTraces, rs.Resource(), requiredAttributes) + groupedResourceSpans := tg.findOrCreateResourceSpans(rs.Resource(), requiredAttributes) sp := matchingScopeSpans(groupedResourceSpans, ils.Scope()).Spans().AppendEmpty() span.CopyTo(sp) } @@ -64,14 +64,14 @@ func (gap *groupByAttrsProcessor) processTraces(ctx context.Context, td ptrace.T } // Copy the grouped data into output - stats.Record(ctx, mDistSpanGroups.M(int64(groupedTraces.ResourceSpans().Len()))) + stats.Record(ctx, mDistSpanGroups.M(int64(tg.traces.ResourceSpans().Len()))) - return groupedTraces, nil + return tg.traces, nil } func (gap *groupByAttrsProcessor) processLogs(ctx context.Context, ld plog.Logs) (plog.Logs, error) { rl := ld.ResourceLogs() - groupedLogs := plog.NewLogs() + lg := newLogsGroup() for i := 0; i < rl.Len(); i++ { ls := rl.At(i) @@ -94,7 +94,7 @@ func (gap *groupByAttrsProcessor) processLogs(ctx context.Context, ld plog.Logs) // Lets combine the base resource attributes + the extracted (grouped) attributes // and keep them in the grouping entry - groupedResourceLogs := findOrCreateResourceLogs(groupedLogs, ls.Resource(), requiredAttributes) + groupedResourceLogs := lg.findOrCreateResourceLogs(ls.Resource(), requiredAttributes) lr := matchingScopeLogs(groupedResourceLogs, sl.Scope()).LogRecords().AppendEmpty() log.CopyTo(lr) } @@ -103,14 +103,14 @@ func (gap *groupByAttrsProcessor) processLogs(ctx context.Context, ld plog.Logs) } // Copy the grouped data into output - stats.Record(ctx, mDistLogGroups.M(int64(groupedLogs.ResourceLogs().Len()))) + stats.Record(ctx, mDistLogGroups.M(int64(lg.logs.ResourceLogs().Len()))) - return groupedLogs, nil + return lg.logs, nil } func (gap *groupByAttrsProcessor) processMetrics(ctx context.Context, md pmetric.Metrics) (pmetric.Metrics, error) { rms := md.ResourceMetrics() - groupedMetrics := pmetric.NewMetrics() + mg := newMetricsGroup() for i := 0; i < rms.Len(); i++ { rm := rms.At(i) @@ -126,35 +126,35 @@ func (gap *groupByAttrsProcessor) processMetrics(ctx context.Context, md pmetric case pmetric.MetricTypeGauge: for pointIndex := 0; pointIndex < metric.Gauge().DataPoints().Len(); pointIndex++ { dataPoint := metric.Gauge().DataPoints().At(pointIndex) - groupedMetric := gap.getGroupedMetricsFromAttributes(ctx, groupedMetrics, rm, ilm, metric, dataPoint.Attributes()) + groupedMetric := gap.getGroupedMetricsFromAttributes(ctx, mg, rm, ilm, metric, dataPoint.Attributes()) dataPoint.CopyTo(groupedMetric.Gauge().DataPoints().AppendEmpty()) } case pmetric.MetricTypeSum: for pointIndex := 0; pointIndex < metric.Sum().DataPoints().Len(); pointIndex++ { dataPoint := metric.Sum().DataPoints().At(pointIndex) - groupedMetric := gap.getGroupedMetricsFromAttributes(ctx, groupedMetrics, rm, ilm, metric, dataPoint.Attributes()) + groupedMetric := gap.getGroupedMetricsFromAttributes(ctx, mg, rm, ilm, metric, dataPoint.Attributes()) dataPoint.CopyTo(groupedMetric.Sum().DataPoints().AppendEmpty()) } case pmetric.MetricTypeSummary: for pointIndex := 0; pointIndex < metric.Summary().DataPoints().Len(); pointIndex++ { dataPoint := metric.Summary().DataPoints().At(pointIndex) - groupedMetric := gap.getGroupedMetricsFromAttributes(ctx, groupedMetrics, rm, ilm, metric, dataPoint.Attributes()) + groupedMetric := gap.getGroupedMetricsFromAttributes(ctx, mg, rm, ilm, metric, dataPoint.Attributes()) dataPoint.CopyTo(groupedMetric.Summary().DataPoints().AppendEmpty()) } case pmetric.MetricTypeHistogram: for pointIndex := 0; pointIndex < metric.Histogram().DataPoints().Len(); pointIndex++ { dataPoint := metric.Histogram().DataPoints().At(pointIndex) - groupedMetric := gap.getGroupedMetricsFromAttributes(ctx, groupedMetrics, rm, ilm, metric, dataPoint.Attributes()) + groupedMetric := gap.getGroupedMetricsFromAttributes(ctx, mg, rm, ilm, metric, dataPoint.Attributes()) dataPoint.CopyTo(groupedMetric.Histogram().DataPoints().AppendEmpty()) } case pmetric.MetricTypeExponentialHistogram: for pointIndex := 0; pointIndex < metric.ExponentialHistogram().DataPoints().Len(); pointIndex++ { dataPoint := metric.ExponentialHistogram().DataPoints().At(pointIndex) - groupedMetric := gap.getGroupedMetricsFromAttributes(ctx, groupedMetrics, rm, ilm, metric, dataPoint.Attributes()) + groupedMetric := gap.getGroupedMetricsFromAttributes(ctx, mg, rm, ilm, metric, dataPoint.Attributes()) dataPoint.CopyTo(groupedMetric.ExponentialHistogram().DataPoints().AppendEmpty()) } @@ -163,9 +163,9 @@ func (gap *groupByAttrsProcessor) processMetrics(ctx context.Context, md pmetric } } - stats.Record(ctx, mDistMetricGroups.M(int64(groupedMetrics.ResourceMetrics().Len()))) + stats.Record(ctx, mDistMetricGroups.M(int64(mg.metrics.ResourceMetrics().Len()))) - return groupedMetrics, nil + return mg.metrics, nil } func deleteAttributes(attrsForRemoval, targetAttrs pcommon.Map) { @@ -241,7 +241,7 @@ func getMetricInInstrumentationLibrary(ilm pmetric.ScopeMetrics, searchedMetric // Returns the Metric in the appropriate Resource matching with the specified Attributes func (gap *groupByAttrsProcessor) getGroupedMetricsFromAttributes( ctx context.Context, - groupedMetrics pmetric.Metrics, + mg *metricsGroup, originResourceMetrics pmetric.ResourceMetrics, ilm pmetric.ScopeMetrics, metric pmetric.Metric, @@ -259,7 +259,7 @@ func (gap *groupByAttrsProcessor) getGroupedMetricsFromAttributes( } // Get the ResourceMetrics matching with these attributes - groupedResourceMetrics := findOrCreateResourceMetrics(groupedMetrics, originResourceMetrics.Resource(), requiredAttributes) + groupedResourceMetrics := mg.findOrCreateResourceMetrics(originResourceMetrics.Resource(), requiredAttributes) // Get the corresponding instrumentation library groupedInstrumentationLibrary := matchingScopeMetrics(groupedResourceMetrics, ilm.Scope())