From 91b71a713ae913b250fff1219a048f4d44ad24bf Mon Sep 17 00:00:00 2001 From: Jeffrey Chien Date: Tue, 30 Jul 2024 15:32:46 -0400 Subject: [PATCH] Add cumulative to delta conversion for JMX metrics. (#1264) --- .../sampleConfig/complete_linux_config.yaml | 9 +++ .../sampleConfig/jmx_config_linux.yaml | 8 +++ translator/translate/otel/common/common.go | 10 +++ .../translate/otel/common/common_test.go | 69 +++++++++++++------ translator/translate/otel/common/options.go | 30 ++++++++ .../translate/otel/common/options_test.go | 17 +++++ .../otel/pipeline/host/translator.go | 2 +- .../translate/otel/pipeline/jmx/translator.go | 2 + .../otel/pipeline/jmx/translator_test.go | 6 +- .../cumulativetodeltaprocessor/translator.go | 66 +++++++++++------- .../translator_test.go | 4 +- 11 files changed, 172 insertions(+), 51 deletions(-) create mode 100644 translator/translate/otel/common/options.go create mode 100644 translator/translate/otel/common/options_test.go diff --git a/translator/tocwconfig/sampleConfig/complete_linux_config.yaml b/translator/tocwconfig/sampleConfig/complete_linux_config.yaml index 1e4bae98a2..d707817271 100644 --- a/translator/tocwconfig/sampleConfig/complete_linux_config.yaml +++ b/translator/tocwconfig/sampleConfig/complete_linux_config.yaml @@ -117,6 +117,13 @@ processors: match_type: "" initial_value: 0 max_staleness: 0s + cumulativetodelta/jmx: + exclude: + match_type: "" + include: + match_type: "" + initial_value: 0 + max_staleness: 0s ec2tagger: ec2_instance_tag_keys: - AutoScalingGroupName @@ -384,6 +391,7 @@ service: processors: - filter/jmx/0 - resource/jmx + - cumulativetodelta/jmx - transform/jmx/0 - ec2tagger receivers: @@ -394,6 +402,7 @@ service: processors: - filter/jmx/1 - resource/jmx + - cumulativetodelta/jmx - transform/jmx/1 - ec2tagger receivers: diff --git a/translator/tocwconfig/sampleConfig/jmx_config_linux.yaml b/translator/tocwconfig/sampleConfig/jmx_config_linux.yaml index 1a6ae36898..81238e5f6a 100644 --- a/translator/tocwconfig/sampleConfig/jmx_config_linux.yaml +++ b/translator/tocwconfig/sampleConfig/jmx_config_linux.yaml @@ -21,6 +21,13 @@ extensions: mode: EC2 region_type: ACJ processors: + cumulativetodelta/jmx: + exclude: + match_type: "" + include: + match_type: "" + initial_value: 0 + max_staleness: 0s filter/jmx: error_mode: propagate logs: {} @@ -106,6 +113,7 @@ service: processors: - filter/jmx - resource/jmx + - cumulativetodelta/jmx - transform/jmx receivers: - jmx diff --git a/translator/translate/otel/common/common.go b/translator/translate/otel/common/common.go index a19eb2ab38..14350b2022 100644 --- a/translator/translate/otel/common/common.go +++ b/translator/translate/otel/common/common.go @@ -392,3 +392,13 @@ func GetMeasurements(m map[string]any) []string { } return results } + +// IsAnySet checks if any of the provided keys are present in the configuration. +func IsAnySet(conf *confmap.Conf, keys []string) bool { + for _, key := range keys { + if conf.IsSet(key) { + return true + } + } + return false +} diff --git a/translator/translate/otel/common/common_test.go b/translator/translate/otel/common/common_test.go index 1b34d8206f..d4b9a24d30 100644 --- a/translator/translate/otel/common/common_test.go +++ b/translator/translate/otel/common/common_test.go @@ -34,7 +34,7 @@ func TestConfigKeys(t *testing.T) { } func TestGetString(t *testing.T) { - conf := confmap.NewFromStringMap(map[string]interface{}{"int": 10, "string": "test"}) + conf := confmap.NewFromStringMap(map[string]any{"int": 10, "string": "test"}) got, ok := GetString(conf, "int") require.True(t, ok) // converts int to string @@ -69,7 +69,7 @@ func TestGetArray(t *testing.T) { } func TestGetBool(t *testing.T) { - conf := confmap.NewFromStringMap(map[string]interface{}{"int": 10, "string": "test", "bool1": false, "bool2": true}) + conf := confmap.NewFromStringMap(map[string]any{"int": 10, "string": "test", "bool1": false, "bool2": true}) got, ok := GetBool(conf, "int") require.False(t, ok) require.False(t, got) @@ -88,7 +88,7 @@ func TestGetBool(t *testing.T) { } func TestGetOrDefaultBool(t *testing.T) { - conf := confmap.NewFromStringMap(map[string]interface{}{"int": 10, "string": "test", "bool1": false, "bool2": true}) + conf := confmap.NewFromStringMap(map[string]any{"int": 10, "string": "test", "bool1": false, "bool2": true}) got := GetOrDefaultBool(conf, "int", false) require.False(t, got) @@ -106,10 +106,10 @@ func TestGetOrDefaultBool(t *testing.T) { } func TestGetNumber(t *testing.T) { - test := map[string]interface{}{"int": 10, "string": "test", "bool": false, "float": 1.3} + test := map[string]any{"int": 10, "string": "test", "bool": false, "float": 1.3} marshalled, err := json.Marshal(test) require.NoError(t, err) - var unmarshalled map[string]interface{} + var unmarshalled map[string]any require.NoError(t, json.Unmarshal(marshalled, &unmarshalled)) conf := confmap.NewFromStringMap(unmarshalled) @@ -131,7 +131,7 @@ func TestGetNumber(t *testing.T) { } func TestGetDuration(t *testing.T) { - conf := confmap.NewFromStringMap(map[string]interface{}{"invalid": "invalid", "valid": 1, "zero": 0}) + conf := confmap.NewFromStringMap(map[string]any{"invalid": "invalid", "valid": 1, "zero": 0}) got, ok := GetDuration(conf, "invalid") require.False(t, ok) require.Equal(t, time.Duration(0), got) @@ -145,7 +145,7 @@ func TestGetDuration(t *testing.T) { func TestParseDuration(t *testing.T) { testCases := map[string]struct { - input interface{} + input any want time.Duration wantErr bool }{ @@ -201,50 +201,50 @@ func TestMissingKeyError(t *testing.T) { func TestGetOrDefaultDuration(t *testing.T) { sectionKeys := []string{"section::metrics_collection_interval", "backup::metrics_collection_interval"} testCases := map[string]struct { - input map[string]interface{} + input map[string]any want time.Duration }{ "WithDefault": { - input: map[string]interface{}{}, + input: map[string]any{}, want: time.Minute, }, "WithZeroInterval": { - input: map[string]interface{}{ - "backup": map[string]interface{}{ + input: map[string]any{ + "backup": map[string]any{ "metrics_collection_interval": 0, }, - "section": map[string]interface{}{ + "section": map[string]any{ "metrics_collection_interval": 0, }, }, want: time.Minute, }, "WithoutSectionOverride": { - input: map[string]interface{}{ - "backup": map[string]interface{}{ + input: map[string]any{ + "backup": map[string]any{ "metrics_collection_interval": 10, }, - "section": map[string]interface{}{}, + "section": map[string]any{}, }, want: 10 * time.Second, }, "WithInvalidSectionOverride": { - input: map[string]interface{}{ - "backup": map[string]interface{}{ + input: map[string]any{ + "backup": map[string]any{ "metrics_collection_interval": 10, }, - "section": map[string]interface{}{ + "section": map[string]any{ "metrics_collection_interval": "invalid", }, }, want: 10 * time.Second, }, "WithSectionOverride": { - input: map[string]interface{}{ - "backup": map[string]interface{}{ + input: map[string]any{ + "backup": map[string]any{ "metrics_collection_interval": 10, }, - "section": map[string]interface{}{ + "section": map[string]any{ "metrics_collection_interval": 120, }, }, @@ -357,3 +357,30 @@ func TestGetMeasurements(t *testing.T) { }) } } + +func TestIsAnySet(t *testing.T) { + conf := confmap.NewFromStringMap(map[string]any{ + "one": map[string]any{ + "endpoint": "test", + }, + "two": map[string]any{}, + }) + testCases := map[string]struct { + keys []string + want bool + }{ + "NotSet": { + keys: []string{"test"}, + want: false, + }, + "Set": { + keys: []string{"test", "one::endpoint"}, + want: true, + }, + } + for name, testCase := range testCases { + t.Run(name, func(t *testing.T) { + assert.Equal(t, testCase.want, IsAnySet(conf, testCase.keys)) + }) + } +} diff --git a/translator/translate/otel/common/options.go b/translator/translate/otel/common/options.go new file mode 100644 index 0000000000..fc58bd9203 --- /dev/null +++ b/translator/translate/otel/common/options.go @@ -0,0 +1,30 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package common + +type TranslatorOption func(any) + +type NameSetter interface { + SetName(string) +} + +func WithName(name string) TranslatorOption { + return func(target any) { + if setter, ok := target.(NameSetter); ok { + setter.SetName(name) + } + } +} + +type NameProvider struct { + name string +} + +func (p *NameProvider) Name() string { + return p.name +} + +func (p *NameProvider) SetName(name string) { + p.name = name +} diff --git a/translator/translate/otel/common/options_test.go b/translator/translate/otel/common/options_test.go new file mode 100644 index 0000000000..1ed2534217 --- /dev/null +++ b/translator/translate/otel/common/options_test.go @@ -0,0 +1,17 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package common + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestWithName(t *testing.T) { + p := &NameProvider{name: "a"} + opt := WithName("b") + opt(p) + assert.Equal(t, "b", p.Name()) +} diff --git a/translator/translate/otel/pipeline/host/translator.go b/translator/translate/otel/pipeline/host/translator.go index 9401e0abc9..1b7b01598f 100644 --- a/translator/translate/otel/pipeline/host/translator.go +++ b/translator/translate/otel/pipeline/host/translator.go @@ -74,7 +74,7 @@ func (t translator) Translate(conf *confmap.Conf) (*common.ComponentTranslators, // we need to add delta processor because (only) diskio and net input plugins report delta metric if common.PipelineNameHostDeltaMetrics == t.name { log.Printf("D! delta processor required because metrics with diskio or net are set") - translators.Processors.Set(cumulativetodeltaprocessor.NewTranslatorWithName(t.name)) + translators.Processors.Set(cumulativetodeltaprocessor.NewTranslator(common.WithName(t.name), cumulativetodeltaprocessor.WithDiskIONetKeys())) } if conf.IsSet(common.ConfigKey(common.MetricsKey, common.AppendDimensionsKey)) { diff --git a/translator/translate/otel/pipeline/jmx/translator.go b/translator/translate/otel/pipeline/jmx/translator.go index dca302dc0e..bd6cf43e08 100644 --- a/translator/translate/otel/pipeline/jmx/translator.go +++ b/translator/translate/otel/pipeline/jmx/translator.go @@ -13,6 +13,7 @@ import ( "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/common" "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/exporter/awscloudwatch" "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/extension/agenthealth" + "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/cumulativetodeltaprocessor" "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/ec2taggerprocessor" "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/filterprocessor" "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/metricsdecorator" @@ -76,6 +77,7 @@ func (t *translator) Translate(conf *confmap.Conf) (*common.ComponentTranslators Processors: common.NewTranslatorMap( filterprocessor.NewTranslator(filterprocessor.WithName(common.PipelineNameJmx), filterprocessor.WithIndex(t.index)), resourceprocessor.NewTranslator(resourceprocessor.WithName(common.PipelineNameJmx)), + cumulativetodeltaprocessor.NewTranslator(common.WithName(common.PipelineNameJmx), cumulativetodeltaprocessor.WithConfigKeys(common.JmxConfigKey)), ), Exporters: common.NewTranslatorMap(awscloudwatch.NewTranslator()), Extensions: common.NewTranslatorMap(agenthealth.NewTranslator(component.DataTypeMetrics, []string{agenthealth.OperationPutMetricData})), diff --git a/translator/translate/otel/pipeline/jmx/translator_test.go b/translator/translate/otel/pipeline/jmx/translator_test.go index fee6272793..1370111946 100644 --- a/translator/translate/otel/pipeline/jmx/translator_test.go +++ b/translator/translate/otel/pipeline/jmx/translator_test.go @@ -123,7 +123,7 @@ func TestTranslator(t *testing.T) { want: &want{ pipelineID: "metrics/jmx", receivers: []string{"jmx"}, - processors: []string{"filter/jmx", "resource/jmx"}, + processors: []string{"filter/jmx", "resource/jmx", "cumulativetodelta/jmx"}, exporters: []string{"awscloudwatch"}, extensions: []string{"agenthealth/metrics"}, }, @@ -151,7 +151,7 @@ func TestTranslator(t *testing.T) { want: &want{ pipelineID: "metrics/jmx", receivers: []string{"jmx"}, - processors: []string{"filter/jmx", "resource/jmx", "transform/jmx"}, + processors: []string{"filter/jmx", "resource/jmx", "cumulativetodelta/jmx", "transform/jmx"}, exporters: []string{"awscloudwatch"}, extensions: []string{"agenthealth/metrics"}, }, @@ -185,7 +185,7 @@ func TestTranslator(t *testing.T) { want: &want{ pipelineID: "metrics/jmx/0", receivers: []string{"jmx/0"}, - processors: []string{"filter/jmx/0", "resource/jmx", "transform/jmx/0", "ec2tagger"}, + processors: []string{"filter/jmx/0", "resource/jmx", "cumulativetodelta/jmx", "transform/jmx/0", "ec2tagger"}, exporters: []string{"awscloudwatch"}, extensions: []string{"agenthealth/metrics"}, }, diff --git a/translator/translate/otel/processor/cumulativetodeltaprocessor/translator.go b/translator/translate/otel/processor/cumulativetodeltaprocessor/translator.go index 3d7700cb54..a53afd043c 100644 --- a/translator/translate/otel/processor/cumulativetodeltaprocessor/translator.go +++ b/translator/translate/otel/processor/cumulativetodeltaprocessor/translator.go @@ -4,7 +4,7 @@ package cumulativetodeltaprocessor import ( - "fmt" + "strings" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/cumulativetodeltaprocessor" "go.opentelemetry.io/collector/component" @@ -23,38 +23,59 @@ const ( var ( netKey = common.ConfigKey(common.MetricsKey, common.MetricsCollectedKey, common.NetKey) diskioKey = common.ConfigKey(common.MetricsKey, common.MetricsCollectedKey, common.DiskIOKey) + + exclusions = map[string][]string{ + // DiskIO and Net Metrics are cumulative metrics + // DiskIO: https://github.com/shirou/gopsutil/blob/master/disk/disk.go#L32-L47 + // Net: https://github.com/shirou/gopsutil/blob/master/net/net.go#L13-L25 + // https://github.com/aws/amazon-cloudwatch-agent/blob/5ace5aa6d817684cf82f4e6aa82d9596fb56d74b/translator/translate/metrics/util/deltasutil.go#L33-L65 + diskioKey: {"iops_in_progress", "diskio_iops_in_progress"}, + } ) +func WithDiskIONetKeys() common.TranslatorOption { + return WithConfigKeys(diskioKey, netKey) +} + +func WithConfigKeys(keys ...string) common.TranslatorOption { + return func(target any) { + if setter, ok := target.(*translator); ok { + setter.keys = keys + } + } +} + type translator struct { - name string factory processor.Factory + common.NameProvider + keys []string } var _ common.Translator[component.Config] = (*translator)(nil) +var _ common.NameSetter = (*translator)(nil) -func NewTranslator() common.Translator[component.Config] { - return NewTranslatorWithName("") -} - -func NewTranslatorWithName(name string) common.Translator[component.Config] { - return &translator{name, cumulativetodeltaprocessor.NewFactory()} +func NewTranslator(opts ...common.TranslatorOption) common.Translator[component.Config] { + t := &translator{factory: cumulativetodeltaprocessor.NewFactory()} + for _, opt := range opts { + opt(t) + } + return t } func (t *translator) ID() component.ID { - return component.NewIDWithName(t.factory.Type(), t.name) + return component.NewIDWithName(t.factory.Type(), t.Name()) } // Translate creates a processor config based on the fields in the // Metrics section of the JSON config. func (t *translator) Translate(conf *confmap.Conf) (component.Config, error) { - if conf == nil || (!conf.IsSet(diskioKey) && !conf.IsSet(netKey)) { - return nil, &common.MissingKeyError{ID: t.ID(), JsonKey: fmt.Sprint(diskioKey, " or ", netKey)} + if conf == nil || !common.IsAnySet(conf, t.keys) { + return nil, &common.MissingKeyError{ID: t.ID(), JsonKey: strings.Join(t.keys, " or ")} } cfg := t.factory.CreateDefaultConfig().(*cumulativetodeltaprocessor.Config) - excludeMetrics := t.getExcludeNetAndDiskIOMetrics(conf) - + excludeMetrics := t.getExcludeMetrics(conf) if len(excludeMetrics) != 0 { cfg.Exclude.MatchType = strict cfg.Exclude.Metrics = excludeMetrics @@ -62,16 +83,13 @@ func (t *translator) Translate(conf *confmap.Conf) (component.Config, error) { return cfg, nil } -// DiskIO and Net Metrics are cumulative metrics -// DiskIO: https://github.com/shirou/gopsutil/blob/master/disk/disk.go#L32-L47 -// Net: https://github.com/shirou/gopsutil/blob/master/net/net.go#L13-L25 -// However, CloudWatch does have an upper bound https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_PutMetricData.html -// Therefore, we calculate the delta values for customers instead of using the original values -// https://github.com/aws/amazon-cloudwatch-agent/blob/5ace5aa6d817684cf82f4e6aa82d9596fb56d74b/translator/translate/metrics/util/deltasutil.go#L33-L65 -func (t *translator) getExcludeNetAndDiskIOMetrics(conf *confmap.Conf) []string { - var excludeMetricName []string - if conf.IsSet(diskioKey) { - excludeMetricName = append(excludeMetricName, "iops_in_progress", "diskio_iops_in_progress") +func (t *translator) getExcludeMetrics(conf *confmap.Conf) []string { + var excludeMetricNames []string + for _, key := range t.keys { + exclude, ok := exclusions[key] + if ok && conf.IsSet(key) { + excludeMetricNames = append(excludeMetricNames, exclude...) + } } - return excludeMetricName + return excludeMetricNames } diff --git a/translator/translate/otel/processor/cumulativetodeltaprocessor/translator_test.go b/translator/translate/otel/processor/cumulativetodeltaprocessor/translator_test.go index 23526b4742..534d5edc81 100644 --- a/translator/translate/otel/processor/cumulativetodeltaprocessor/translator_test.go +++ b/translator/translate/otel/processor/cumulativetodeltaprocessor/translator_test.go @@ -15,8 +15,8 @@ import ( ) func TestTranslator(t *testing.T) { - cdpTranslator := NewTranslator() - require.EqualValues(t, "cumulativetodelta", cdpTranslator.ID().String()) + cdpTranslator := NewTranslator(common.WithName("test"), WithDiskIONetKeys()) + require.EqualValues(t, "cumulativetodelta/test", cdpTranslator.ID().String()) testCases := map[string]struct { input map[string]interface{} want *cumulativetodeltaprocessor.Config