Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[processor/groupbyattr] Use hash of resource attributes for grouping #17527

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .chloggen/use-hash-groupbyattrproc.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern (e.g. filelogreceiver)
component: processor/groupbyattr

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Improve performance by using map hashes for resource grouping

# One or more tracking issues related to the change
issues: [17527]
193 changes: 92 additions & 101 deletions processor/groupbyattrsprocessor/attribute_groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,93 @@ import (
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/pdatautil"
)

// tracesGroup pairs the regrouped traces with the attribute hashes of the
// resources they contain, so an existing resource can be located by hash.
type tracesGroup struct {
	// traces holds the ResourceSpans created so far.
	traces ptrace.Traces
	// resourceHashes holds one hash per ResourceSpans entry, in the order the
	// entries were appended to traces.
	resourceHashes [][16]byte
}

// newTracesGroup returns a tracesGroup backed by a newly created ptrace.Traces
// and with no resource hashes recorded yet.
func newTracesGroup() *tracesGroup {
	group := new(tracesGroup)
	group.traces = ptrace.NewTraces()
	return group
}

// findOrCreateResourceSpans returns the ResourceSpans of this group whose
// resource corresponds to the reference resource built from originResource and
// requiredAttributes. If no such entry exists yet, a new one is appended and
// returned.
//
// Entries are matched by comparing the hash of the reference resource's
// attributes with the hashes recorded in tg.resourceHashes; the hash at index
// i belongs to the ResourceSpans at index i. Only hashes are compared, the
// attribute values themselves are not.
func (tg *tracesGroup) findOrCreateResourceSpans(originResource pcommon.Resource, requiredAttributes pcommon.Map) ptrace.ResourceSpans {
	referenceResource := buildReferenceResource(originResource, requiredAttributes)
	referenceResourceHash := pdatautil.MapHash(referenceResource.Attributes())

	rss := tg.traces.ResourceSpans()
	for i := 0; i < rss.Len(); i++ {
		if tg.resourceHashes[i] == referenceResourceHash {
			return rss.At(i)
		}
	}

	// Not found: append a new entry through the slice already fetched above and
	// record its hash at the same index.
	rs := rss.AppendEmpty()
	referenceResource.MoveTo(rs.Resource())
	tg.resourceHashes = append(tg.resourceHashes, referenceResourceHash)
	return rs
}

// metricsGroup pairs the regrouped metrics with the attribute hashes of the
// resources they contain, so an existing resource can be located by hash.
type metricsGroup struct {
	// metrics holds the ResourceMetrics created so far.
	metrics pmetric.Metrics
	// resourceHashes holds one hash per ResourceMetrics entry, in the order the
	// entries were appended to metrics.
	resourceHashes [][16]byte
}

// newMetricsGroup returns a metricsGroup backed by a newly created
// pmetric.Metrics and with no resource hashes recorded yet.
func newMetricsGroup() *metricsGroup {
	group := new(metricsGroup)
	group.metrics = pmetric.NewMetrics()
	return group
}

// findOrCreateResourceMetrics looks up the ResourceMetrics whose recorded
// attribute hash equals the hash of the reference resource built from
// originResource and requiredAttributes. When no recorded hash matches, a new
// ResourceMetrics is appended, the reference resource is moved into it, and
// its hash is recorded.
func (mg *metricsGroup) findOrCreateResourceMetrics(originResource pcommon.Resource, requiredAttributes pcommon.Map) pmetric.ResourceMetrics {
	ref := buildReferenceResource(originResource, requiredAttributes)
	wantHash := pdatautil.MapHash(ref.Attributes())

	existing := mg.metrics.ResourceMetrics()
	for idx := 0; idx < existing.Len(); idx++ {
		if mg.resourceHashes[idx] != wantHash {
			continue
		}
		return existing.At(idx)
	}

	// No match: create the entry and remember its hash alongside it.
	created := mg.metrics.ResourceMetrics().AppendEmpty()
	ref.MoveTo(created.Resource())
	mg.resourceHashes = append(mg.resourceHashes, wantHash)
	return created
}

// logsGroup pairs the regrouped logs with the attribute hashes of the
// resources they contain, so an existing resource can be located by hash.
type logsGroup struct {
	// logs holds the ResourceLogs created so far.
	logs plog.Logs
	// resourceHashes holds one hash per ResourceLogs entry, in the order the
	// entries were appended to logs.
	resourceHashes [][16]byte
}

// newLogsGroup returns a logsGroup backed by a newly created plog.Logs and
// with no resource hashes recorded yet. No capacity is reserved up front; the
// hash slice grows as resources are appended.
func newLogsGroup() *logsGroup {
	return &logsGroup{logs: plog.NewLogs()}
}

// findOrCreateResourceLogs returns the ResourceLogs whose recorded attribute
// hash equals the hash of the reference resource built from originResource and
// requiredAttributes. If none is recorded, a new ResourceLogs is appended,
// populated from the reference resource, and its hash is stored.
func (lg *logsGroup) findOrCreateResourceLogs(originResource pcommon.Resource, requiredAttributes pcommon.Map) plog.ResourceLogs {
	reference := buildReferenceResource(originResource, requiredAttributes)
	hash := pdatautil.MapHash(reference.Attributes())

	entries := lg.logs.ResourceLogs()
	for pos := 0; pos < entries.Len(); pos++ {
		if hash == lg.resourceHashes[pos] {
			return entries.At(pos)
		}
	}

	// Nothing recorded under this hash yet: add a fresh entry.
	added := lg.logs.ResourceLogs().AppendEmpty()
	reference.MoveTo(added.Resource())
	lg.resourceHashes = append(lg.resourceHashes, hash)
	return added
}

// instrumentationLibrariesEqual reports whether the two instrumentation scopes
// have the same name and the same version.
func instrumentationLibrariesEqual(il1, il2 pcommon.InstrumentationScope) bool {
	if il1.Name() != il2.Name() {
		return false
	}
	return il1.Version() == il2.Version()
}
Expand Down Expand Up @@ -73,108 +158,14 @@ func matchingScopeMetrics(rm pmetric.ResourceMetrics, library pcommon.Instrument
return ilm
}

// buildReferenceAttributes returns a new map holding a copy of the attributes
// of originResource, with every entry of requiredAttributes then written into
// it under the same key. The result is the attribute set looked for in
// existing Resources.
func buildReferenceAttributes(originResource pcommon.Resource, requiredAttributes pcommon.Map) pcommon.Map {
	merged := pcommon.NewMap()
	originResource.Attributes().CopyTo(merged)
	requiredAttributes.Range(func(key string, value pcommon.Value) bool {
		target := merged.PutEmpty(key)
		value.CopyTo(target)
		return true
	})
	return merged
}

// resourceMatches reports whether the attributes of resource are exactly the
// reference attributes: both must have the same number of entries, and every
// reference key must be present in resource with an equal value.
func resourceMatches(resource pcommon.Resource, referenceAttributes pcommon.Map) bool {
	// A differing attribute count can never be a strict match.
	if resource.Attributes().Len() != referenceAttributes.Len() {
		return false
	}

	mismatch := false
	referenceAttributes.Range(func(key string, want pcommon.Value) bool {
		got, present := resource.Attributes().Get(key)
		if present && want.Equal(got) {
			return true
		}
		// A single difference settles the answer, so stop iterating.
		mismatch = true
		return false
	})

	return !mismatch
}

// Update the specified (and new) Resource with the properties of the original Resource, and with the
// required Attributes
func updateResourceToMatch(newResource pcommon.Resource, originResource pcommon.Resource, requiredAttributes pcommon.Map) {
originResource.CopyTo(newResource)

// This prioritizes required attributes over the original resource attributes, if they overlap
attrs := newResource.Attributes()
// buildReferenceResource returns a new resource that we'll be looking for in existing Resources
// as a merge of the Attributes of the original Resource with the requested Attributes.
func buildReferenceResource(originResource pcommon.Resource, requiredAttributes pcommon.Map) pcommon.Resource {
referenceResource := pcommon.NewResource()
originResource.Attributes().CopyTo(referenceResource.Attributes())
requiredAttributes.Range(func(k string, v pcommon.Value) bool {
v.CopyTo(attrs.PutEmpty(k))
v.CopyTo(referenceResource.Attributes().PutEmpty(k))
return true
})
}

// findOrCreateResourceSpans returns the ResourceSpans in traces whose resource
// attributes strictly match the reference attributes built from originResource
// and requiredAttributes. If none matches, a new ResourceSpans is appended to
// traces, its resource is updated to match, and it is returned.
func findOrCreateResourceSpans(traces ptrace.Traces, originResource pcommon.Resource, requiredAttributes pcommon.Map) ptrace.ResourceSpans {

	// Build the reference attributes that we're looking for in Resources
	referenceAttributes := buildReferenceAttributes(originResource, requiredAttributes)

	rss := traces.ResourceSpans()
	for i := 0; i < rss.Len(); i++ {
		if resourceMatches(rss.At(i).Resource(), referenceAttributes) {
			return rss.At(i)
		}
	}

	// Not found: append a new ResourceSpans.
	rs := rss.AppendEmpty()
	updateResourceToMatch(rs.Resource(), originResource, requiredAttributes)
	return rs
}

// findOrCreateResourceLogs scans logs for a ResourceLogs whose resource
// attributes strictly match the reference attributes built from originResource
// and requiredAttributes, and returns the first match. Without a match it
// appends a new ResourceLogs, updates its resource to match, and returns it.
func findOrCreateResourceLogs(logs plog.Logs, originResource pcommon.Resource, requiredAttributes pcommon.Map) plog.ResourceLogs {
	wanted := buildReferenceAttributes(originResource, requiredAttributes)

	all := logs.ResourceLogs()
	for idx := 0; idx < all.Len(); idx++ {
		if !resourceMatches(all.At(idx).Resource(), wanted) {
			continue
		}
		return all.At(idx)
	}

	// No existing entry matched, so create one.
	created := all.AppendEmpty()
	updateResourceToMatch(created.Resource(), originResource, requiredAttributes)
	return created
}

// findOrCreateResourceMetrics searches for a Resource with matching attributes and returns it. If nothing is found, it is being created
func findOrCreateResourceMetrics(metrics pmetric.Metrics, originResource pcommon.Resource, requiredAttributes pcommon.Map) pmetric.ResourceMetrics {

// Build the reference attributes that we're looking for in Resources
referenceAttributes := buildReferenceAttributes(originResource, requiredAttributes)

rms := metrics.ResourceMetrics()
for i := 0; i < rms.Len(); i++ {
if resourceMatches(rms.At(i).Resource(), referenceAttributes) {
return rms.At(i)
}
}

// Not found: append a new ResourceMetrics
rm := rms.AppendEmpty()
updateResourceToMatch(rm.Resource(), originResource, requiredAttributes)
return rm
return referenceResource
}
10 changes: 5 additions & 5 deletions processor/groupbyattrsprocessor/attribute_groups_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func TestResourceAttributeScenarios(t *testing.T) {
},
}

logs := plog.NewLogs()
lg := newLogsGroup()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
recordAttributeMap := pcommon.NewMap()
Expand All @@ -119,8 +119,8 @@ func TestResourceAttributeScenarios(t *testing.T) {
tt.fillExpectedResourceFun(tt.baseResource, expectedResource)
}

rl := findOrCreateResourceLogs(logs, tt.baseResource, recordAttributeMap)
assert.EqualValues(t, expectedResource.Attributes(), rl.Resource().Attributes())
rl := lg.findOrCreateResourceLogs(tt.baseResource, recordAttributeMap)
assert.Equal(t, expectedResource.Attributes().AsRaw(), rl.Resource().Attributes().AsRaw())
})
}
}
Expand Down Expand Up @@ -158,9 +158,9 @@ func TestInstrumentationLibraryMatching(t *testing.T) {
}

func BenchmarkAttrGrouping(b *testing.B) {
logs := plog.NewLogs()
lg := newLogsGroup()
b.ReportAllocs()
for n := 0; n < b.N; n++ {
findOrCreateResourceLogs(logs, res, groups[rand.Intn(count)])
lg.findOrCreateResourceLogs(res, groups[rand.Intn(count)])
}
}
8 changes: 6 additions & 2 deletions processor/groupbyattrsprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/processor/group
go 1.18

require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.0.0-00010101000000-000000000000
github.com/stretchr/testify v1.8.1
go.opencensus.io v0.24.0
go.opentelemetry.io/collector v0.69.2-0.20230112233839-f2a0133bf677
Expand All @@ -14,20 +15,22 @@ require (
)

require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/knadh/koanf v1.4.5 // indirect
github.com/kr/pretty v0.3.0 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pelletier/go-toml v1.9.4 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rogpeppe/go-internal v1.6.1 // indirect
go.opentelemetry.io/collector/featuregate v0.69.2-0.20230112233839-f2a0133bf677 // indirect
go.opentelemetry.io/otel v1.11.2 // indirect
go.opentelemetry.io/otel/metric v0.34.0 // indirect
Expand All @@ -40,10 +43,11 @@ require (
google.golang.org/genproto v0.0.0-20221118155620-16455021b5e6 // indirect
google.golang.org/grpc v1.52.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal => ../../internal/coreinternal

replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbytraceprocessor => ../../processor/groupbytraceprocessor

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal => ../../pkg/batchpersignal
Expand Down
6 changes: 2 additions & 4 deletions processor/groupbyattrsprocessor/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading