Skip to content

Commit

Permalink
[processor/groupbyattr] Use hash of resource attributes for grouping
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitryax committed Jan 17, 2023
1 parent 956d956 commit 77f5dd2
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 130 deletions.
11 changes: 11 additions & 0 deletions .chloggen/use-hash-groupbyattrproc.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: processor/groupbyattr

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Improve performance by using map hashes for resource grouping

# One or more tracking issues related to the change
issues: [17527]
193 changes: 92 additions & 101 deletions processor/groupbyattrsprocessor/attribute_groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,93 @@ import (
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/pdatautil"
)

type tracesGroup struct {
traces ptrace.Traces
resourceHashes [][16]byte
}

func newTracesGroup() *tracesGroup {
return &tracesGroup{traces: ptrace.NewTraces()}
}

// findOrCreateResource searches for a Resource with matching attributes and returns it. If nothing is found, it is being created
func (tg *tracesGroup) findOrCreateResourceSpans(originResource pcommon.Resource, requiredAttributes pcommon.Map) ptrace.ResourceSpans {
referenceResource := buildReferenceResource(originResource, requiredAttributes)
referenceResourceHash := pdatautil.MapHash(referenceResource.Attributes())

rss := tg.traces.ResourceSpans()
for i := 0; i < rss.Len(); i++ {
if tg.resourceHashes[i] == referenceResourceHash {
return rss.At(i)
}
}

rs := tg.traces.ResourceSpans().AppendEmpty()
referenceResource.MoveTo(rs.Resource())
tg.resourceHashes = append(tg.resourceHashes, referenceResourceHash)
return rs
}

type metricsGroup struct {
metrics pmetric.Metrics
resourceHashes [][16]byte
}

func newMetricsGroup() *metricsGroup {
return &metricsGroup{metrics: pmetric.NewMetrics()}
}

// findOrCreateResourceMetrics searches for a Resource with matching attributes and returns it. If nothing is found, it is being created
func (mg *metricsGroup) findOrCreateResourceMetrics(originResource pcommon.Resource, requiredAttributes pcommon.Map) pmetric.ResourceMetrics {
referenceResource := buildReferenceResource(originResource, requiredAttributes)
referenceResourceHash := pdatautil.MapHash(referenceResource.Attributes())

rms := mg.metrics.ResourceMetrics()
for i := 0; i < rms.Len(); i++ {
if mg.resourceHashes[i] == referenceResourceHash {
return rms.At(i)
}
}

rm := mg.metrics.ResourceMetrics().AppendEmpty()
referenceResource.MoveTo(rm.Resource())
mg.resourceHashes = append(mg.resourceHashes, referenceResourceHash)
return rm

}

type logsGroup struct {
logs plog.Logs
resourceHashes [][16]byte
}

// newLogsGroup returns new logsGroup with predefined capacity
func newLogsGroup() *logsGroup {
return &logsGroup{logs: plog.NewLogs()}
}

// findOrCreateResourceLogs searches for a Resource with matching attributes and returns it. If nothing is found, it is being created
func (lg *logsGroup) findOrCreateResourceLogs(originResource pcommon.Resource, requiredAttributes pcommon.Map) plog.ResourceLogs {
referenceResource := buildReferenceResource(originResource, requiredAttributes)
referenceResourceHash := pdatautil.MapHash(referenceResource.Attributes())

rls := lg.logs.ResourceLogs()
for i := 0; i < rls.Len(); i++ {
if lg.resourceHashes[i] == referenceResourceHash {
return rls.At(i)
}
}

rl := lg.logs.ResourceLogs().AppendEmpty()
referenceResource.MoveTo(rl.Resource())
lg.resourceHashes = append(lg.resourceHashes, referenceResourceHash)
return rl
}

func instrumentationLibrariesEqual(il1, il2 pcommon.InstrumentationScope) bool {
return il1.Name() == il2.Name() && il1.Version() == il2.Version()
}
Expand Down Expand Up @@ -73,108 +158,14 @@ func matchingScopeMetrics(rm pmetric.ResourceMetrics, library pcommon.Instrument
return ilm
}

// Build the Attributes that we'll be looking for in existing Resources as a merge of the Attributes
// of the original Resource with the requested Attributes
func buildReferenceAttributes(originResource pcommon.Resource, requiredAttributes pcommon.Map) pcommon.Map {
referenceAttributes := pcommon.NewMap()
originResource.Attributes().CopyTo(referenceAttributes)
requiredAttributes.Range(func(k string, v pcommon.Value) bool {
v.CopyTo(referenceAttributes.PutEmpty(k))
return true
})
return referenceAttributes
}

// resourceMatches verifies if given pcommon.Resource attributes strictly match with the specified
// reference Attributes (all attributes must match strictly)
func resourceMatches(resource pcommon.Resource, referenceAttributes pcommon.Map) bool {

// If not the same number of attributes, it doesn't match
if referenceAttributes.Len() != resource.Attributes().Len() {
return false
}

// Go through each attribute and check the corresponding attribute value in the tested Resource
matching := true
referenceAttributes.Range(func(referenceKey string, referenceValue pcommon.Value) bool {
testedValue, foundKey := resource.Attributes().Get(referenceKey)
if !foundKey || !referenceValue.Equal(testedValue) {
// One difference is enough to consider it doesn't match, so fail early
matching = false
return false
}
return true
})

return matching
}

// Update the specified (and new) Resource with the properties of the original Resource, and with the
// required Attributes
func updateResourceToMatch(newResource pcommon.Resource, originResource pcommon.Resource, requiredAttributes pcommon.Map) {
originResource.CopyTo(newResource)

// This prioritizes required attributes over the original resource attributes, if they overlap
attrs := newResource.Attributes()
// buildReferenceResource returns a new resource that we'll be looking for in existing Resources
// as a merge of the Attributes of the original Resource with the requested Attributes.
func buildReferenceResource(originResource pcommon.Resource, requiredAttributes pcommon.Map) pcommon.Resource {
referenceResource := pcommon.NewResource()
originResource.Attributes().CopyTo(referenceResource.Attributes())
requiredAttributes.Range(func(k string, v pcommon.Value) bool {
v.CopyTo(attrs.PutEmpty(k))
v.CopyTo(referenceResource.Attributes().PutEmpty(k))
return true
})
}

// findOrCreateResource searches for a Resource with matching attributes and returns it. If nothing is found, it is being created
func findOrCreateResourceSpans(traces ptrace.Traces, originResource pcommon.Resource, requiredAttributes pcommon.Map) ptrace.ResourceSpans {

// Build the reference attributes that we're looking for in Resources
referenceAttributes := buildReferenceAttributes(originResource, requiredAttributes)

rss := traces.ResourceSpans()
for i := 0; i < rss.Len(); i++ {
if resourceMatches(rss.At(i).Resource(), referenceAttributes) {
return rss.At(i)
}
}

// Not found: append a new ResourceSpans.
rs := rss.AppendEmpty()
updateResourceToMatch(rs.Resource(), originResource, requiredAttributes)
return rs
}

// findOrCreateResourceLogs searches for a Resource with matching attributes and returns it. If nothing is found, it is being created
func findOrCreateResourceLogs(logs plog.Logs, originResource pcommon.Resource, requiredAttributes pcommon.Map) plog.ResourceLogs {

// Build the reference attributes that we're looking for in Resources
referenceAttributes := buildReferenceAttributes(originResource, requiredAttributes)

rls := logs.ResourceLogs()
for i := 0; i < rls.Len(); i++ {
if resourceMatches(rls.At(i).Resource(), referenceAttributes) {
return rls.At(i)
}
}

// Not found: append a new ResourceLogs
rl := rls.AppendEmpty()
updateResourceToMatch(rl.Resource(), originResource, requiredAttributes)
return rl
}

// findOrCreateResourceMetrics searches for a Resource with matching attributes and returns it. If nothing is found, it is being created
func findOrCreateResourceMetrics(metrics pmetric.Metrics, originResource pcommon.Resource, requiredAttributes pcommon.Map) pmetric.ResourceMetrics {

// Build the reference attributes that we're looking for in Resources
referenceAttributes := buildReferenceAttributes(originResource, requiredAttributes)

rms := metrics.ResourceMetrics()
for i := 0; i < rms.Len(); i++ {
if resourceMatches(rms.At(i).Resource(), referenceAttributes) {
return rms.At(i)
}
}

// Not found: append a new ResourceMetrics
rm := rms.AppendEmpty()
updateResourceToMatch(rm.Resource(), originResource, requiredAttributes)
return rm
return referenceResource
}
10 changes: 5 additions & 5 deletions processor/groupbyattrsprocessor/attribute_groups_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func TestResourceAttributeScenarios(t *testing.T) {
},
}

logs := plog.NewLogs()
lg := newLogsGroup()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
recordAttributeMap := pcommon.NewMap()
Expand All @@ -119,8 +119,8 @@ func TestResourceAttributeScenarios(t *testing.T) {
tt.fillExpectedResourceFun(tt.baseResource, expectedResource)
}

rl := findOrCreateResourceLogs(logs, tt.baseResource, recordAttributeMap)
assert.EqualValues(t, expectedResource.Attributes(), rl.Resource().Attributes())
rl := lg.findOrCreateResourceLogs(tt.baseResource, recordAttributeMap)
assert.Equal(t, expectedResource.Attributes().AsRaw(), rl.Resource().Attributes().AsRaw())
})
}
}
Expand Down Expand Up @@ -158,9 +158,9 @@ func TestInstrumentationLibraryMatching(t *testing.T) {
}

func BenchmarkAttrGrouping(b *testing.B) {
logs := plog.NewLogs()
lg := newLogsGroup()
b.ReportAllocs()
for n := 0; n < b.N; n++ {
findOrCreateResourceLogs(logs, res, groups[rand.Intn(count)])
lg.findOrCreateResourceLogs(res, groups[rand.Intn(count)])
}
}
8 changes: 6 additions & 2 deletions processor/groupbyattrsprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/processor/group
go 1.18

require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.0.0-00010101000000-000000000000
github.com/stretchr/testify v1.8.1
go.opencensus.io v0.24.0
go.opentelemetry.io/collector v0.69.2-0.20230112233839-f2a0133bf677
Expand All @@ -14,20 +15,22 @@ require (
)

require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/knadh/koanf v1.4.5 // indirect
github.com/kr/pretty v0.3.0 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pelletier/go-toml v1.9.4 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rogpeppe/go-internal v1.6.1 // indirect
go.opentelemetry.io/collector/featuregate v0.69.2-0.20230112233839-f2a0133bf677 // indirect
go.opentelemetry.io/otel v1.11.2 // indirect
go.opentelemetry.io/otel/metric v0.34.0 // indirect
Expand All @@ -40,10 +43,11 @@ require (
google.golang.org/genproto v0.0.0-20221118155620-16455021b5e6 // indirect
google.golang.org/grpc v1.52.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal => ../../internal/coreinternal

replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbytraceprocessor => ../../processor/groupbytraceprocessor

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal => ../../pkg/batchpersignal
Expand Down
6 changes: 2 additions & 4 deletions processor/groupbyattrsprocessor/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 77f5dd2

Please sign in to comment.