From 85b920ff21fe6ecf8a28e20967fb53c44dd59319 Mon Sep 17 00:00:00 2001 From: leonsodhi-lf <42376753+leonsodhi-lf@users.noreply.github.com> Date: Mon, 7 Feb 2022 18:23:14 +0000 Subject: [PATCH] Export Task ARN resource via Prometheus (#334) * Export Task ARN resource via Prometheus * Populate cluster name and task id labels --- internal/ecsservicediscovery/decoratedtask.go | 21 +++++ .../ecsservicediscovery/decoratedtask_test.go | 81 ++++++++++++---- plugins/outputs/cloudwatch/aggregator_test.go | 94 ++++++++++--------- 3 files changed, 133 insertions(+), 63 deletions(-) diff --git a/internal/ecsservicediscovery/decoratedtask.go b/internal/ecsservicediscovery/decoratedtask.go index 6962883b01..2358da9581 100644 --- a/internal/ecsservicediscovery/decoratedtask.go +++ b/internal/ecsservicediscovery/decoratedtask.go @@ -7,8 +7,10 @@ import ( "fmt" "regexp" "strconv" + "strings" "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/arn" "github.com/aws/aws-sdk-go/service/ecs" ) @@ -22,6 +24,8 @@ const ( taskLaunchTypeLabel = "LaunchType" taskJobNameLabel = "job" taskMetricsPathLabel = "__metrics_path__" + taskClusterNameLabel = "TaskClusterName" + taskIdLabel = "TaskId" ec2InstanceTypeLabel = "InstanceType" ec2VpcIdLabel = "VpcId" ec2SubnetIdLabel = "SubnetId" @@ -138,6 +142,23 @@ func (t *DecoratedTask) generatePrometheusTarget( addExporterLabels(labels, taskGroupLabel, t.Task.Group) addExporterLabels(labels, taskStartedbyLabel, t.Task.StartedBy) addExporterLabels(labels, taskLaunchTypeLabel, t.Task.LaunchType) + + if arn, err := arn.Parse(*t.Task.TaskArn); err == nil { + // ARN formats: https://docs.aws.amazon.com/AmazonECS/latest/developerguide/ecs-account-settings.html#ecs-resource-ids + splitResource := strings.Split(arn.Resource, "/")[1:] + if len(splitResource) == 1 { + // Old ARN format + taskId := splitResource[0] + addExporterLabels(labels, taskIdLabel, &taskId) + } else if len(splitResource) == 2 { + // New ARN format + clusterName := splitResource[0] + taskId := splitResource[1] + addExporterLabels(labels, taskClusterNameLabel, &clusterName) + addExporterLabels(labels, taskIdLabel, &taskId) + } + } + if t.EC2Info != nil { addExporterLabels(labels, ec2InstanceTypeLabel, &t.EC2Info.InstanceType) addExporterLabels(labels, ec2VpcIdLabel, &t.EC2Info.VpcId) diff --git a/internal/ecsservicediscovery/decoratedtask_test.go b/internal/ecsservicediscovery/decoratedtask_test.go index d22f4593c0..fc3cb7ae41 100644 --- a/internal/ecsservicediscovery/decoratedtask_test.go +++ b/internal/ecsservicediscovery/decoratedtask_test.go @@ -24,7 +24,30 @@ func TestAddExporterLabels(t *testing.T) { assert.True(t, reflect.DeepEqual(labels, expected)) } -func buildWorkloadFargateAwsvpc(dockerLabel bool, taskDef bool, serviceName string) *DecoratedTask { +// ARN formats: https://docs.aws.amazon.com/AmazonECS/latest/developerguide/ecs-account-settings.html#ecs-resource-ids +func TestGeneratePrometheusTargetOldARNFormat(t *testing.T) { + fullTask := buildWorkloadFargateAwsvpc(false, true, false, "") + assert.Equal(t, "10.0.0.129", fullTask.getPrivateIp()) + + config := &ServiceDiscoveryConfig{ + DockerLabel: &DockerLabelConfig{ + JobNameLabel: "FARGATE_PROMETHEUS_JOB_NAME", + PortLabel: "FARGATE_PROMETHEUS_EXPORTER_PORT", + MetricsPathLabel: "ECS_PROMETHEUS_METRICS_PATH", + }, + } + + targets := make(map[string]*PrometheusTarget) + dockerLabelRegex := regexp.MustCompile(prometheusLabelNamePattern) + fullTask.ExporterInformation(config, dockerLabelRegex, targets) + + target, ok := targets["10.0.0.129:9406/metrics"] + assert.True(t, ok, "Missing target: 10.0.0.129:9406/metrics") + assert.Equal(t, "", target.Labels["TaskClusterName"]) + assert.Equal(t, "1234567890123456789", target.Labels["TaskId"]) +} + +func buildWorkloadFargateAwsvpc(useNewTaskArnFormat bool, dockerLabel bool, taskDef bool, serviceName string) *DecoratedTask { networkMode := ecs.NetworkModeAwsvpc taskAttachmentId := "775c6c63-b5f7-4a5b-8a60-8f8295a04cda" taskAttachmentType := "ElasticNetworkInterface" @@ -34,7 +57,11 @@ func buildWorkloadFargateAwsvpc(dockerLabel bool, taskDef bool, serviceName stri taskAttachmentDetailsValue1 := "eni-03de9d47faaa2e5ec" taskAttachmentDetailsValue2 := "10.0.0.129" - taskArn := "arn:aws:ecs:us-east-2:211220956907:task/ExampleCluster/1234567890123456789" + taskArn := "arn:aws:ecs:us-east-2:211220956907:task/1234567890123456789" + if useNewTaskArnFormat { + taskArn = "arn:aws:ecs:us-east-2:211220956907:task/ExampleCluster/1234567890123456789" + } + taskDefinitionArn := "arn:aws:ecs:us-east-2:211220956907:task-definition/prometheus-java-tomcat-fargate-awsvpc:1" var taskRevision int64 = 4 port9404String := "9404" @@ -109,7 +136,7 @@ func buildWorkloadFargateAwsvpc(dockerLabel bool, taskDef bool, serviceName stri } func Test_ExportDockerLabelBasedTarget_Fargate_AWSVPC(t *testing.T) { - fullTask := buildWorkloadFargateAwsvpc(true, false, "") + fullTask := buildWorkloadFargateAwsvpc(true, true, false, "") assert.Equal(t, "10.0.0.129", fullTask.getPrivateIp()) config := &ServiceDiscoveryConfig{ @@ -128,25 +155,31 @@ func Test_ExportDockerLabelBasedTarget_Fargate_AWSVPC(t *testing.T) { target, ok := targets["10.0.0.129:9404/metrics"] assert.True(t, ok, "Missing target: 10.0.0.129:9404/metrics") - assert.Equal(t, 5, len(target.Labels)) + assert.Equal(t, 7, len(target.Labels)) assert.Equal(t, "java-tomcat-fargate-awsvpc", target.Labels["job"]) assert.Equal(t, "bugbash-tomcat-fargate-awsvpc-with-docker-label", target.Labels["container_name"]) assert.Equal(t, "4", target.Labels["TaskRevision"]) + assert.Equal(t, "ExampleCluster", target.Labels["TaskClusterName"]) + assert.Equal(t, "1234567890123456789", target.Labels["TaskId"]) assert.Equal(t, "9404", target.Labels["FARGATE_PROMETHEUS_EXPORTER_PORT"]) assert.Equal(t, "java-tomcat-fargate-awsvpc", target.Labels["FARGATE_PROMETHEUS_JOB_NAME"]) target, ok = targets["10.0.0.129:9406/metrics"] assert.True(t, ok, "Missing target: 10.0.0.129:9406/metrics") - assert.Equal(t, 5, len(target.Labels)) + assert.Equal(t, 7, len(target.Labels)) + assert.Equal(t, "ExampleCluster", target.Labels["TaskClusterName"]) + assert.Equal(t, "1234567890123456789", target.Labels["TaskId"]) assert.Equal(t, "4", target.Labels["TaskRevision"]) assert.Equal(t, "bugbash-jar-fargate-awsvpc-with-dockerlabel", target.Labels["container_name"]) assert.Equal(t, "9406", target.Labels["FARGATE_PROMETHEUS_EXPORTER_PORT"]) assert.Equal(t, "/metrics", target.Labels["__metrics_path__"]) assert.Equal(t, "/metrics", target.Labels["ECS_PROMETHEUS_METRICS_PATH"]) + + } func Test_ExportTaskDefBasedTarget_Fargate_AWSVPC(t *testing.T) { - fullTask := buildWorkloadFargateAwsvpc(false, true, "") + fullTask := buildWorkloadFargateAwsvpc(true, false, true, "") assert.Equal(t, "10.0.0.129", fullTask.getPrivateIp()) config := &ServiceDiscoveryConfig{ TaskDefinitions: []*TaskDefinitionConfig{ @@ -169,16 +202,20 @@ func Test_ExportTaskDefBasedTarget_Fargate_AWSVPC(t *testing.T) { target, ok := targets["10.0.0.129:9404/stats/metrics"] assert.True(t, ok, "Missing target: 10.0.0.129:9404/stats/metrics") - assert.Equal(t, 5, len(target.Labels)) + assert.Equal(t, 7, len(target.Labels)) assert.Equal(t, "java-tomcat-fargate-awsvpc", target.Labels["FARGATE_PROMETHEUS_JOB_NAME"]) - assert.Equal(t, "bugbash-tomcat-fargate-awsvpc-with-docker-label", target.Labels["container_name"]) assert.Equal(t, "4", target.Labels["TaskRevision"]) + assert.Equal(t, "bugbash-tomcat-fargate-awsvpc-with-docker-label", target.Labels["container_name"]) + assert.Equal(t, "ExampleCluster", target.Labels["TaskClusterName"]) + assert.Equal(t, "1234567890123456789", target.Labels["TaskId"]) assert.Equal(t, "9404", target.Labels["FARGATE_PROMETHEUS_EXPORTER_PORT"]) assert.Equal(t, "/stats/metrics", target.Labels["__metrics_path__"]) target, ok = targets["10.0.0.129:9406/stats/metrics"] assert.True(t, ok, "Missing target: 10.0.0.129:9406/stats/metrics") - assert.Equal(t, 5, len(target.Labels)) + assert.Equal(t, 7, len(target.Labels)) + assert.Equal(t, "ExampleCluster", target.Labels["TaskClusterName"]) + assert.Equal(t, "1234567890123456789", target.Labels["TaskId"]) assert.Equal(t, "4", target.Labels["TaskRevision"]) assert.Equal(t, "bugbash-jar-fargate-awsvpc-with-dockerlabel", target.Labels["container_name"]) assert.Equal(t, "9406", target.Labels["FARGATE_PROMETHEUS_EXPORTER_PORT"]) @@ -187,7 +224,7 @@ func Test_ExportTaskDefBasedTarget_Fargate_AWSVPC(t *testing.T) { } func Test_exportServiceEndpointBasedTarget_Fargate_AWSVPC(t *testing.T) { - fullTask := buildWorkloadFargateAwsvpc(false, false, "true") + fullTask := buildWorkloadFargateAwsvpc(true, false, false, "true") assert.Equal(t, "10.0.0.129", fullTask.getPrivateIp()) config := &ServiceDiscoveryConfig{ ServiceNamesForTasks: []*ServiceNameForTasksConfig{ @@ -213,16 +250,20 @@ func Test_exportServiceEndpointBasedTarget_Fargate_AWSVPC(t *testing.T) { target, ok := targets["10.0.0.129:9404/stats/metrics"] assert.True(t, ok, "Missing target: 10.0.0.129:9404/stats/metrics") - assert.Equal(t, 6, len(target.Labels)) + assert.Equal(t, 8, len(target.Labels)) assert.Equal(t, "java-tomcat-fargate-awsvpc", target.Labels["FARGATE_PROMETHEUS_JOB_NAME"]) - assert.Equal(t, "bugbash-tomcat-fargate-awsvpc-with-docker-label", target.Labels["container_name"]) assert.Equal(t, "4", target.Labels["TaskRevision"]) + assert.Equal(t, "bugbash-tomcat-fargate-awsvpc-with-docker-label", target.Labels["container_name"]) + assert.Equal(t, "ExampleCluster", target.Labels["TaskClusterName"]) + assert.Equal(t, "1234567890123456789", target.Labels["TaskId"]) assert.Equal(t, "9404", target.Labels["FARGATE_PROMETHEUS_EXPORTER_PORT"]) assert.Equal(t, "/stats/metrics", target.Labels["__metrics_path__"]) target, ok = targets["10.0.0.129:9406/stats/metrics"] assert.True(t, ok, "Missing target: 10.0.0.129:9406/stats/metrics") - assert.Equal(t, 6, len(target.Labels)) + assert.Equal(t, 8, len(target.Labels)) + assert.Equal(t, "ExampleCluster", target.Labels["TaskClusterName"]) + assert.Equal(t, "1234567890123456789", target.Labels["TaskId"]) assert.Equal(t, "4", target.Labels["TaskRevision"]) assert.Equal(t, "bugbash-jar-fargate-awsvpc-with-dockerlabel", target.Labels["container_name"]) assert.Equal(t, "9406", target.Labels["FARGATE_PROMETHEUS_EXPORTER_PORT"]) @@ -231,7 +272,7 @@ func Test_exportServiceEndpointBasedTarget_Fargate_AWSVPC(t *testing.T) { } func Test_ExportMixedSDTarget_Fargate_AWSVPC(t *testing.T) { - fullTask := buildWorkloadFargateAwsvpc(true, true, "") + fullTask := buildWorkloadFargateAwsvpc(true, true, true, "") log.Print(fullTask) assert.Equal(t, "10.0.0.129", fullTask.getPrivateIp()) config := &ServiceDiscoveryConfig{ @@ -411,7 +452,7 @@ func testExportMixedSDTarget_EC2_Bridge_DynamicPort(t *testing.T, networkMode st target, ok := targets["10.4.0.205:32774/metrics"] assert.True(t, ok, "Missing target: 10.4.0.205:32774/metrics") - assert.Equal(t, 8, len(target.Labels)) + assert.Equal(t, 10, len(target.Labels)) assert.Equal(t, "/metrics", target.Labels["EC2_PROMETHEUS_METRICS_PATH"]) assert.Equal(t, "9406", target.Labels["EC2_PROMETHEUS_EXPORTER_PORT"]) assert.Equal(t, "t3.medium", target.Labels["InstanceType"]) @@ -420,10 +461,12 @@ func testExportMixedSDTarget_EC2_Bridge_DynamicPort(t *testing.T, networkMode st assert.Equal(t, "vpc-03e9f55a92516a5e4", target.Labels["VpcId"]) assert.Equal(t, "/metrics", target.Labels["__metrics_path__"]) assert.Equal(t, "bugbash-jar-prometheus-workload-java-ec2-bridge", target.Labels["container_name"]) + assert.Equal(t, "ExampleCluster", target.Labels["TaskClusterName"]) + assert.Equal(t, "1234567890123456789", target.Labels["TaskId"]) target, ok = targets["10.4.0.205:9494/metrics"] assert.True(t, ok, "Missing target: 10.4.0.205:9494/metrics") - assert.Equal(t, 8, len(target.Labels)) + assert.Equal(t, 10, len(target.Labels)) assert.Equal(t, "9404", target.Labels["EC2_PROMETHEUS_EXPORTER_PORT"]) assert.Equal(t, "bugbash-tomcat-ec2-bridge-mapped-port", target.Labels["EC2_PROMETHEUS_JOB_NAME"]) assert.Equal(t, "t3.medium", target.Labels["InstanceType"]) @@ -431,6 +474,8 @@ func testExportMixedSDTarget_EC2_Bridge_DynamicPort(t *testing.T, networkMode st assert.Equal(t, "5", target.Labels["TaskRevision"]) assert.Equal(t, "vpc-03e9f55a92516a5e4", target.Labels["VpcId"]) assert.Equal(t, "bugbash-tomcat-prometheus-workload-java-ec2-bridge-mapped-port", target.Labels["container_name"]) + assert.Equal(t, "ExampleCluster", target.Labels["TaskClusterName"]) + assert.Equal(t, "1234567890123456789", target.Labels["TaskId"]) assert.Equal(t, "bugbash-tomcat-ec2-bridge-mapped-port", target.Labels["job"]) } @@ -480,7 +525,7 @@ func testExportContainerNameSDTarget_EC2_Bridge_DynamicPort(t *testing.T, networ target, ok := targets["10.4.0.205:9494/metrics"] log.Print(target) assert.True(t, ok, "Missing target: 10.4.0.205:9494/metrics") - assert.Equal(t, 8, len(target.Labels)) + assert.Equal(t, 10, len(target.Labels)) assert.Equal(t, "9404", target.Labels["EC2_PROMETHEUS_EXPORTER_PORT"]) assert.Equal(t, "bugbash-tomcat-ec2-bridge-mapped-port", target.Labels["EC2_PROMETHEUS_JOB_NAME"]) assert.Equal(t, "t3.medium", target.Labels["InstanceType"]) @@ -489,4 +534,6 @@ func testExportContainerNameSDTarget_EC2_Bridge_DynamicPort(t *testing.T, networ assert.Equal(t, "vpc-03e9f55a92516a5e4", target.Labels["VpcId"]) assert.Equal(t, "/metrics", target.Labels["__metrics_path__"]) assert.Equal(t, "bugbash-tomcat-prometheus-workload-java-ec2-bridge-mapped-port", target.Labels["container_name"]) + assert.Equal(t, "ExampleCluster", target.Labels["TaskClusterName"]) + assert.Equal(t, "1234567890123456789", target.Labels["TaskId"]) } diff --git a/plugins/outputs/cloudwatch/aggregator_test.go b/plugins/outputs/cloudwatch/aggregator_test.go index b734ac78fa..5205d80dde 100644 --- a/plugins/outputs/cloudwatch/aggregator_test.go +++ b/plugins/outputs/cloudwatch/aggregator_test.go @@ -73,8 +73,7 @@ func TestAggregator_ProperAggregationKey(t *testing.T) { aggregator.AddMetric(m) assertNoMetricsInChan(t, metricChan) - time.Sleep(2 * aggregationInterval) - assertMetricContent(t, metricChan, m, expectedFieldContent{"value", 1, 1, 1, 1, "", + assertMetricContent(t, metricChan, aggregationInterval * 2, m, expectedFieldContent{"value", 1, 1, 1, 1, "", []float64{1.0488088481701516}, []float64{1}}) assertNoMetricsInChan(t, metricChan) @@ -110,14 +109,12 @@ func TestAggregator_MultipleAggregationPeriods(t *testing.T) { aggregator.AddMetric(m) assertNoMetricsInChan(t, metricChan) - time.Sleep(2 * aggregationInterval) - assertMetricContent(t, metricChan, m, expectedFieldContent{"value", 3, 1, 3, 6, "", + assertMetricContent(t, metricChan, aggregationInterval * 2, m, expectedFieldContent{"value", 3, 1, 3, 6, "", []float64{1.0488088481701516, 2.0438317370604793, 2.992374046230249}, []float64{1, 1, 1}}) assertNoMetricsInChan(t, metricChan) - time.Sleep(2 * aggregationInterval) - assertMetricContent(t, metricChan, m, expectedFieldContent{"value", 5, 4, 2, 9, "", + assertMetricContent(t, metricChan, aggregationInterval * 2, m, expectedFieldContent{"value", 5, 4, 2, 9, "", []float64{3.9828498555324616, 4.819248325194279}, []float64{1, 1}}, expectedFieldContent{"2nd value", 2, 1, 2, 3, "", []float64{1.0488088481701516, 2.0438317370604793}, []float64{1, 1}}) @@ -143,7 +140,7 @@ func TestAggregator_ShutdownBehavior(t *testing.T) { close(shutdownChan) wg.Wait() - assertMetricContent(t, metricChan, m, expectedFieldContent{"value", 1, 1, 1, 1, "", []float64{1.0488088481701516}, []float64{1}}) + assertMetricContent(t, metricChan, 0 * time.Second, m, expectedFieldContent{"value", 1, 1, 1, 1, "", []float64{1.0488088481701516}, []float64{1}}) assertNoMetricsInChan(t, metricChan) } @@ -208,48 +205,53 @@ func testPreparation() (chan telegraf.Metric, chan struct{}, Aggregator) { return metricChan, shutdownChan, aggregator } -func assertMetricContent(t *testing.T, metricChan <-chan telegraf.Metric, originalMetric telegraf.Metric, expectedFieldContent ...expectedFieldContent) { - log.Printf("The metric chan len is %v.", len(metricChan)) +func assertMetricContent(t *testing.T, metricChan <-chan telegraf.Metric, metricMaxWait time.Duration, originalMetric telegraf.Metric, expectedFieldContent ...expectedFieldContent) { + var aggregatedMetric telegraf.Metric + + log.Printf("Waiting for metric.") select { - case aggregatedMetric := <-metricChan: - assert.False(t, aggregatedMetric.HasTag(aggregationIntervalTagKey)) - assert.Equal(t, "true", aggregatedMetric.Tags()[highResolutionTagKey]) - - for _, fieldContent := range expectedFieldContent { - dist, ok := aggregatedMetric.Fields()[fieldContent.fieldName].(distribution.Distribution) - assert.True(t, ok) - - assert.Equal(t, fieldContent.max, dist.Maximum()) - assert.Equal(t, fieldContent.sampleCount, dist.SampleCount()) - assert.Equal(t, fieldContent.unit, dist.Unit()) - assert.Equal(t, fieldContent.min, dist.Minimum()) - assert.Equal(t, fieldContent.sum, dist.Sum()) - - values, counts := dist.ValuesAndCounts() - assert.Equal(t, len(fieldContent.expectedValues), len(values)) - assert.Equal(t, len(fieldContent.expectedCounts), len(counts)) - - sort.Float64s(fieldContent.expectedValues) - sort.Float64s(values) - assert.Equal(t, fieldContent.expectedValues, values) - - var expectedCountInts []int - for _, count := range fieldContent.expectedCounts { - expectedCountInts = append(expectedCountInts, int(count)) - } - sort.Ints(expectedCountInts) - var countInts []int - for _, count := range counts { - countInts = append(countInts, int(count)) - } - sort.Ints(countInts) - assert.Equal(t, expectedCountInts, countInts) - } + case aggregatedMetric = <-metricChan: + case <-time.After(metricMaxWait): + assert.FailNow(t, "We should've seen 1 metric by now") + } - assert.NotEqual(t, originalMetric, aggregatedMetric, "The aggregatedMetric should not exactly equal to m since the field will be distribution.Distribution") - default: - assert.Fail(t, "We should got 1 metric now") + log.Printf("Checking metric.") + + assert.False(t, aggregatedMetric.HasTag(aggregationIntervalTagKey)) + assert.Equal(t, "true", aggregatedMetric.Tags()[highResolutionTagKey]) + + for _, fieldContent := range expectedFieldContent { + dist, ok := aggregatedMetric.Fields()[fieldContent.fieldName].(distribution.Distribution) + assert.True(t, ok) + + assert.Equal(t, fieldContent.max, dist.Maximum()) + assert.Equal(t, fieldContent.sampleCount, dist.SampleCount()) + assert.Equal(t, fieldContent.unit, dist.Unit()) + assert.Equal(t, fieldContent.min, dist.Minimum()) + assert.Equal(t, fieldContent.sum, dist.Sum()) + + values, counts := dist.ValuesAndCounts() + assert.Equal(t, len(fieldContent.expectedValues), len(values)) + assert.Equal(t, len(fieldContent.expectedCounts), len(counts)) + + sort.Float64s(fieldContent.expectedValues) + sort.Float64s(values) + assert.Equal(t, fieldContent.expectedValues, values) + + var expectedCountInts []int + for _, count := range fieldContent.expectedCounts { + expectedCountInts = append(expectedCountInts, int(count)) + } + sort.Ints(expectedCountInts) + var countInts []int + for _, count := range counts { + countInts = append(countInts, int(count)) + } + sort.Ints(countInts) + assert.Equal(t, expectedCountInts, countInts) } + + assert.NotEqual(t, originalMetric, aggregatedMetric, "The aggregatedMetric should not exactly equal to m since the field will be distribution.Distribution") } func assertNoMetricsInChan(t *testing.T, metricChan <-chan telegraf.Metric) {