Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[processor/spanmetrics] Resource attributes support #7075

Closed
Show file tree
Hide file tree
Changes from 53 commits
Commits
Show all changes
65 commits
Select commit Hold shift + click to select a range
f5553a1
starts initial work on optional resource attributes config to span me…
Tenaria Dec 13, 2021
f186407
adds test to check metric length when copying resources
Tenaria Dec 13, 2021
3206a11
adds changelog and fixes tests
Tenaria Dec 13, 2021
3da4719
Merge branch 'main' into OBC-256-Resource-Attributes-Merge-Upstream
Tenaria Dec 13, 2021
945a6d1
fix tests
Tenaria Dec 13, 2021
fc62eab
remove empty new line
Tenaria Dec 13, 2021
07b5ad3
updates changelog
Tenaria Dec 13, 2021
5782cb0
add accidentally removed comment back in
Tenaria Dec 13, 2021
e2eb41d
revert variable name change and fix comment
Tenaria Dec 13, 2021
321c6e2
revert variable name cahnge
Tenaria Dec 13, 2021
fb0561c
adds test to ensure ordering of keys used for aggregation
Tenaria Dec 13, 2021
80ebf64
fixes lock held; optimise for loop; add parallel in test; add comments
Tenaria Dec 13, 2021
19775dc
reverse conditions to simplify code
Tenaria Dec 13, 2021
8c2d38e
update tests to use defined constant instrumentationLibraryName
Tenaria Dec 13, 2021
cbbf532
Merge branch 'main' into OBC-256-Resource-Attributes-Merge-Upstream
chenzhihao Jan 4, 2022
91c8b7d
update the logic of method updateCallMetrics
chenzhihao Jan 5, 2022
068a1bd
fix tests
chenzhihao Jan 5, 2022
18f20d0
add todos
chenzhihao Jan 5, 2022
4d370ad
add todos
chenzhihao Jan 5, 2022
024a122
Merge branch 'main' into OBC-256-Resource-Attributes-Merge-Upstream
chenzhihao Jan 5, 2022
7561f97
fix lint
chenzhihao Jan 5, 2022
62e892f
fix lint
chenzhihao Jan 5, 2022
d8edfda
remove resourceAttrList
chenzhihao Jan 5, 2022
d175a4e
Update "resource_attributes"
chenzhihao Jan 6, 2022
608a083
fix lint
chenzhihao Jan 6, 2022
a0519c0
remove duplicate p.resetExemplarData()
chenzhihao Jan 6, 2022
9772056
refactor how processor reset after every ConsumeTraces process
chenzhihao Jan 6, 2022
ae17b37
move keybuilder to a package
chenzhihao Jan 6, 2022
8ea124c
update comments
chenzhihao Jan 6, 2022
78c11f3
add copyright license
chenzhihao Jan 6, 2022
fbea55f
user defer for p.reset() so that downstream components can get data q…
chenzhihao Jan 6, 2022
3d6b4c1
gofmt
chenzhihao Jan 6, 2022
fd42b76
refactor how we use p.lock
chenzhihao Jan 6, 2022
7ed4b1c
gofmt
chenzhihao Jan 6, 2022
f676e46
added a set of unit tests to cover the excessive concurrent usage
chenzhihao Jan 7, 2022
4b71675
change the usage of the lock to make sure the processor can only be e…
chenzhihao Jan 7, 2022
494b483
Merge branch 'main' into OBC-256-Resource-Attributes-Merge-Upstream
chenzhihao Jan 7, 2022
38ebb2a
add comments; lint
chenzhihao Jan 7, 2022
cb5c3ac
clean up comments
chenzhihao Jan 7, 2022
f3138ae
extract out consumeTraceCase for multiple tests
chenzhihao Jan 10, 2022
f7fdba6
empty commit to rerun test
chenzhihao Jan 10, 2022
ed910fb
Revert "extract out consumeTraceCase for multiple tests"
chenzhihao Jan 10, 2022
0a209af
add compile time assertion check
chenzhihao Jan 10, 2022
17e35f1
remove dated comment
chenzhihao Jan 10, 2022
4c2cc13
make parameter vars consistent with naming
chenzhihao Jan 11, 2022
662dab0
rewording
chenzhihao Jan 11, 2022
442ab94
rename test function to camel case
chenzhihao Jan 11, 2022
40f7eae
add comments for metricKey and resourceKey
chenzhihao Jan 11, 2022
529a623
fix typos
chenzhihao Jan 11, 2022
dd7609a
iterate the slice via range
chenzhihao Jan 11, 2022
76713ee
simplify code
chenzhihao Jan 11, 2022
21822be
simplify code
chenzhihao Jan 11, 2022
5483b44
fix lint
chenzhihao Jan 11, 2022
4df2bb7
rename variable
chenzhihao Jan 12, 2022
d52bc55
simplify code
chenzhihao Jan 12, 2022
80a1dd2
simplify code
chenzhihao Jan 12, 2022
1a90bae
add comments for keybuilder
chenzhihao Jan 12, 2022
62b1e9a
update comment; remove defer()
chenzhihao Jan 12, 2022
6b507da
treat empty string as valid
chenzhihao Jan 12, 2022
f8f6112
make Seperator public
chenzhihao Jan 12, 2022
b840730
add missed configuration field
chenzhihao Jan 12, 2022
2f4034d
add test for BuildMetricKey() and BuildResourceAttrKey()
chenzhihao Jan 12, 2022
524509f
fix typo; add comments to the testdata config files.
chenzhihao Jan 12, 2022
828b674
gofmt
chenzhihao Jan 12, 2022
c8853c2
service name is not mandatory as an attribute
chenzhihao Jan 13, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
- `jaeger` receiver/exporter: Parse/set Jaeger status with OTel spec values (#6682)
- `awsecscontainermetricsreceiver`: remove tag from `container.image.name` (#6436)
- `k8sclusterreceiver`: remove tag from `container.image.name` (#6436)
- `spanmetricproccessor`: service.name attribute added as a default resource attribute instead of attribute in generated metrics (#6717)

## 🚀 New components 🚀

Expand All @@ -48,6 +49,7 @@
- `skywalkingexporter`: add skywalking metrics exporter (#6528)
- `deltatorateprocessor`: add int counter support (#6982)
- `filestorageextension`: document default values (#7022)
- `spanmetricproccessor`: Support specifying resource attributes to attach to metrics generated from traces (#6717)
- `redisreceiver`: Migrate the scraper to the mdatagen metrics builder (#6938)

## v0.41.0
Expand Down
21 changes: 18 additions & 3 deletions processor/spanmetricsprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@ latency_bucket{http_method="GET",http_status_code="200",label1="value1",operatio
```
chenzhihao marked this conversation as resolved.
Show resolved Hide resolved

Each metric will have _at least_ the following dimensions because they are common across all spans:
- Service name
- Operation
- Span kind
- Status code

Each metric will have _at least_ the following resource attributes because they are common across all spans:
- Service name

This processor lets traces to continue through the pipeline unmodified.

The following settings are required:
Expand All @@ -48,8 +50,7 @@ The following settings can be optionally configured:
- Default: `[2ms, 4ms, 6ms, 8ms, 10ms, 50ms, 100ms, 200ms, 400ms, 800ms, 1s, 1400ms, 2s, 5s, 10s, 15s]`
- `dimensions`: the list of dimensions to add together with the default dimensions defined above.

Each additional dimension is defined with a `name` which is looked up in the span's collection of attributes or
resource attributes (AKA process tags) such as `ip`, `host.name` or `region`.
Each additional dimension is defined with a `name` which is looked up in the span's collection of attributes.

If the `name`d attribute is missing in the span, the optional provided `default` is used.

Expand All @@ -60,6 +61,15 @@ The following settings can be optionally configured:
One of either `AGGREGATION_TEMPORALITY_CUMULATIVE` or `AGGREGATION_TEMPORALITY_DELTA`.
- Default: `AGGREGATION_TEMPORALITY_CUMULATIVE`

- `resource_attributes`: the list of resource attributes to add together with the default resource attributes defined
above. Each additional resource attribute is defined with a `name` which is looked up in the span's collection of
resource attributes. If the `name`d resource attribute is missing in the span, the optional provided `default` is
used. If no `default` is provided, this resource attribute will be **omitted** from the metric.

`service.name` will be automatically added as a resource attribute to all the generated metrics.

- `resource_attributes_cache_size`: the max number of items in the `resouce_key_to_dimensions_cache`. If not provided,
will use default value size `1000`.
## Examples

The following is a simple example usage of the spanmetrics processor.
Expand Down Expand Up @@ -96,6 +106,11 @@ processors:
default: GET
- name: http.status_code
dimensions_cache_size: 1000
resource_attributes:
- name: region
default: us-east-1
- name: host_id
resource_attributes_cache_size: 1000
aggregation_temporality: "AGGREGATION_TEMPORALITY_DELTA"

exporters:
Expand Down
15 changes: 13 additions & 2 deletions processor/spanmetricsprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ const (
cumulative = "AGGREGATION_TEMPORALITY_CUMULATIVE"
)

// Dimension defines the dimension name and optional default value if the Dimension is missing from a span attribute.
// Dimension defines the key and optional default value if the key is missing from a span attribute.
type Dimension struct {
Name string `mapstructure:"name"`
Default *string `mapstructure:"default"`
Expand All @@ -44,7 +44,6 @@ type Config struct {
LatencyHistogramBuckets []time.Duration `mapstructure:"latency_histogram_buckets"`

// Dimensions defines the list of additional dimensions on top of the provided:
// - service.name
// - operation
// - span.kind
// - status.code
Expand All @@ -58,6 +57,18 @@ type Config struct {
DimensionsCacheSize int `mapstructure:"dimensions_cache_size"`

AggregationTemporality string `mapstructure:"aggregation_temporality"`

// ResourceAttributes defines the list of additional resource attributes to attach to metrics on top of the provided:
// - service.name
// These will be fetched from the span's resource attributes. For more details, see:
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/resource/sdk.md
// and https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/resource/semantic_conventions/README.md.
ResourceAttributes []Dimension `mapstructure:"resource_attributes"`

// ResourceAttributesCacheSize defines the size of cache for storing ResourceAttributes, which helps to avoid cache
// memory growing indefinitely over the lifetime of the collector.
// Optional. See defaultResourceAttributesCacheSize in processor.go for the default value.
ResourceAttributesCacheSize int `mapstructure:"resource_attributes_cache_size"`
}

// GetAggregationTemporality converts the string value given in the config into a MetricAggregationTemporality.
Expand Down
58 changes: 36 additions & 22 deletions processor/spanmetricsprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,25 +36,30 @@ import (

func TestLoadConfig(t *testing.T) {
defaultMethod := "GET"
defaultRegion := "us-east-1"
testcases := []struct {
configFile string
wantMetricsExporter string
wantLatencyHistogramBuckets []time.Duration
wantDimensions []Dimension
wantDimensionsCacheSize int
wantAggregationTemporality string
configFile string
wantMetricsExporter string
wantLatencyHistogramBuckets []time.Duration
wantDimensions []Dimension
wantDimensionsCacheSize int
wantResourceAttributes []Dimension
wantResourceAttributesCacheSize int
wantAggregationTemporality string
}{
{
configFile: "config-2-pipelines.yaml",
wantMetricsExporter: "prometheus",
wantAggregationTemporality: cumulative,
wantDimensionsCacheSize: 500,
configFile: "config-2-pipelines.yaml",
wantMetricsExporter: "prometheus",
wantAggregationTemporality: cumulative,
wantDimensionsCacheSize: 500,
wantResourceAttributesCacheSize: 300,
},
{
configFile: "config-3-pipelines.yaml",
wantMetricsExporter: "otlp/spanmetrics",
wantAggregationTemporality: cumulative,
wantDimensionsCacheSize: defaultDimensionsCacheSize,
configFile: "config-3-pipelines.yaml",
wantMetricsExporter: "otlp/spanmetrics",
wantAggregationTemporality: cumulative,
wantDimensionsCacheSize: defaultDimensionsCacheSize,
wantResourceAttributesCacheSize: defaultResourceAttributesCacheSize,
},
{
configFile: "config-full.yaml",
Expand All @@ -72,12 +77,19 @@ func TestLoadConfig(t *testing.T) {
{"http.method", &defaultMethod},
{"http.status_code", nil},
},
wantDimensionsCacheSize: 1500,
wantAggregationTemporality: delta,
wantDimensionsCacheSize: 1500,
wantResourceAttributes: []Dimension{
{"region", &defaultRegion},
{"host_id", nil},
},
wantResourceAttributesCacheSize: 3000,
wantAggregationTemporality: delta,
},
}
for _, tc := range testcases {
tc := tc
t.Run(tc.configFile, func(t *testing.T) {
t.Parallel()
// Prepare
factories, err := componenttest.NopFactories()
require.NoError(t, err)
Expand All @@ -100,12 +112,14 @@ func TestLoadConfig(t *testing.T) {
require.NotNil(t, cfg)
assert.Equal(t,
&Config{
ProcessorSettings: config.NewProcessorSettings(config.NewComponentID(typeStr)),
MetricsExporter: tc.wantMetricsExporter,
LatencyHistogramBuckets: tc.wantLatencyHistogramBuckets,
Dimensions: tc.wantDimensions,
DimensionsCacheSize: tc.wantDimensionsCacheSize,
AggregationTemporality: tc.wantAggregationTemporality,
ProcessorSettings: config.NewProcessorSettings(config.NewComponentID(typeStr)),
MetricsExporter: tc.wantMetricsExporter,
LatencyHistogramBuckets: tc.wantLatencyHistogramBuckets,
Dimensions: tc.wantDimensions,
DimensionsCacheSize: tc.wantDimensionsCacheSize,
ResourceAttributes: tc.wantResourceAttributes,
ResourceAttributesCacheSize: tc.wantResourceAttributesCacheSize,
AggregationTemporality: tc.wantAggregationTemporality,
},
cfg.Processors[config.NewComponentID(typeStr)],
)
Expand Down
7 changes: 4 additions & 3 deletions processor/spanmetricsprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,10 @@ func NewFactory() component.ProcessorFactory {

func createDefaultConfig() config.Processor {
return &Config{
ProcessorSettings: config.NewProcessorSettings(config.NewComponentID(typeStr)),
AggregationTemporality: "AGGREGATION_TEMPORALITY_CUMULATIVE",
DimensionsCacheSize: defaultDimensionsCacheSize,
ProcessorSettings: config.NewProcessorSettings(config.NewComponentID(typeStr)),
AggregationTemporality: "AGGREGATION_TEMPORALITY_CUMULATIVE",
DimensionsCacheSize: defaultDimensionsCacheSize,
ResourceAttributesCacheSize: defaultResourceAttributesCacheSize,
}
}

Expand Down
59 changes: 59 additions & 0 deletions processor/spanmetricsprocessor/keybuilder/keybuilder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package keybuilder // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanmetricsprocessor/keybuilder"

import "strings"

const (
separator = string(byte(0))
defaultCapacity = 1024
)

type KeyBuilder interface {
chenzhihao marked this conversation as resolved.
Show resolved Hide resolved
Append(value ...string)
String() string
}

type keyBuilder struct {
sb strings.Builder
separator string
}

chenzhihao marked this conversation as resolved.
Show resolved Hide resolved
var _ KeyBuilder = (*keyBuilder)(nil)

func New() KeyBuilder {
b := keyBuilder{
sb: strings.Builder{},
separator: separator,
}
b.sb.Grow(defaultCapacity)
return &b
}

func (mkb *keyBuilder) Append(values ...string) {
for _, value := range values {
if len(value) == 0 {
chenzhihao marked this conversation as resolved.
Show resolved Hide resolved
continue
}
if mkb.sb.Len() != 0 {
mkb.sb.WriteString(mkb.separator)
}
chenzhihao marked this conversation as resolved.
Show resolved Hide resolved
mkb.sb.WriteString(value)
}
}

func (mkb *keyBuilder) String() string {
return mkb.sb.String()
}
60 changes: 60 additions & 0 deletions processor/spanmetricsprocessor/keybuilder/keybuilder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package keybuilder

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"
)

func TestNew(t *testing.T) {
t.Run("should create new keybuilder", func(t *testing.T) {
assert.NotPanics(t, func() {
assert.NotNil(t, New())
})
})
}

func TestMetricKeyBuilderAppend(t *testing.T) {
tests := []struct {
name string
args [][]string
result string
}{
{
name: "should skip empty string",
args: [][]string{{"", "abc", "", "def"}},
result: fmt.Sprintf("abc%sdef", separator),
},
{
name: "should concat multiple append",
args: [][]string{{"abc", "def"}, {"", "hij"}},
result: fmt.Sprintf("abc%sdef%shij", separator, separator),
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
kb := New()
for _, arg := range tt.args {
kb.Append(arg...)
}
assert.Equal(t, tt.result, kb.String())
})
}
}
Loading