Skip to content

Commit

Permalink
Export Task ARN resource via Prometheus (#334)
Browse files Browse the repository at this point in the history
* Export Task ARN resource via Prometheus
* Populate cluster name and task id labels
  • Loading branch information
leonsodhi-lf authored Feb 7, 2022
1 parent 49550ff commit 85b920f
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 63 deletions.
21 changes: 21 additions & 0 deletions internal/ecsservicediscovery/decoratedtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ import (
"fmt"
"regexp"
"strconv"
"strings"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/arn"
"github.com/aws/aws-sdk-go/service/ecs"
)

Expand All @@ -22,6 +24,8 @@ const (
taskLaunchTypeLabel = "LaunchType"
taskJobNameLabel = "job"
taskMetricsPathLabel = "__metrics_path__"
taskClusterNameLabel = "TaskClusterName"
taskIdLabel = "TaskId"
ec2InstanceTypeLabel = "InstanceType"
ec2VpcIdLabel = "VpcId"
ec2SubnetIdLabel = "SubnetId"
Expand Down Expand Up @@ -138,6 +142,23 @@ func (t *DecoratedTask) generatePrometheusTarget(
addExporterLabels(labels, taskGroupLabel, t.Task.Group)
addExporterLabels(labels, taskStartedbyLabel, t.Task.StartedBy)
addExporterLabels(labels, taskLaunchTypeLabel, t.Task.LaunchType)

// Derive the task-id label (and, when present, the cluster-name label) from
// the task ARN.
// ARN formats: https://docs.aws.amazon.com/AmazonECS/latest/developerguide/ecs-account-settings.html#ecs-resource-ids
// aws.StringValue turns a nil TaskArn into "", which arn.Parse rejects, so a
// task without an ARN simply gets no extra labels instead of panicking on a
// nil dereference. The parsed value is not named "arn" so that it does not
// shadow the imported arn package.
if parsedArn, err := arn.Parse(aws.StringValue(t.Task.TaskArn)); err == nil {
	// Drop the first "/"-separated segment of the resource; what remains is
	// either <task-id> or <cluster-name>/<task-id>.
	splitResource := strings.Split(parsedArn.Resource, "/")[1:]
	switch len(splitResource) {
	case 1:
		// Old ARN format
		taskID := splitResource[0]
		addExporterLabels(labels, taskIdLabel, &taskID)
	case 2:
		// New ARN format
		clusterName := splitResource[0]
		taskID := splitResource[1]
		addExporterLabels(labels, taskClusterNameLabel, &clusterName)
		addExporterLabels(labels, taskIdLabel, &taskID)
	}
}

if t.EC2Info != nil {
addExporterLabels(labels, ec2InstanceTypeLabel, &t.EC2Info.InstanceType)
addExporterLabels(labels, ec2VpcIdLabel, &t.EC2Info.VpcId)
Expand Down
81 changes: 64 additions & 17 deletions internal/ecsservicediscovery/decoratedtask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,30 @@ func TestAddExporterLabels(t *testing.T) {
assert.True(t, reflect.DeepEqual(labels, expected))
}

func buildWorkloadFargateAwsvpc(dockerLabel bool, taskDef bool, serviceName string) *DecoratedTask {
// ARN formats: https://docs.aws.amazon.com/AmazonECS/latest/developerguide/ecs-account-settings.html#ecs-resource-ids
// TestGeneratePrometheusTargetOldARNFormat builds a Fargate/awsvpc task whose
// ARN uses the old format (no cluster segment) and checks that the exported
// target carries the TaskId label while TaskClusterName is empty.
func TestGeneratePrometheusTargetOldARNFormat(t *testing.T) {
	// First argument false selects the old-format task ARN in the fixture.
	fullTask := buildWorkloadFargateAwsvpc(false, true, false, "")
	assert.Equal(t, "10.0.0.129", fullTask.getPrivateIp())

	config := &ServiceDiscoveryConfig{
		DockerLabel: &DockerLabelConfig{
			JobNameLabel:     "FARGATE_PROMETHEUS_JOB_NAME",
			PortLabel:        "FARGATE_PROMETHEUS_EXPORTER_PORT",
			MetricsPathLabel: "ECS_PROMETHEUS_METRICS_PATH",
		},
	}

	targets := make(map[string]*PrometheusTarget)
	dockerLabelRegex := regexp.MustCompile(prometheusLabelNamePattern)
	fullTask.ExporterInformation(config, dockerLabelRegex, targets)

	target, ok := targets["10.0.0.129:9406/metrics"]
	if !assert.True(t, ok, "Missing target: 10.0.0.129:9406/metrics") {
		// A missing key yields a nil *PrometheusTarget; stop here so the test
		// reports the failed assertion instead of panicking on target.Labels.
		return
	}
	assert.Equal(t, "", target.Labels["TaskClusterName"])
	assert.Equal(t, "1234567890123456789", target.Labels["TaskId"])
}

func buildWorkloadFargateAwsvpc(useNewTaskArnFormat bool, dockerLabel bool, taskDef bool, serviceName string) *DecoratedTask {
networkMode := ecs.NetworkModeAwsvpc
taskAttachmentId := "775c6c63-b5f7-4a5b-8a60-8f8295a04cda"
taskAttachmentType := "ElasticNetworkInterface"
Expand All @@ -34,7 +57,11 @@ func buildWorkloadFargateAwsvpc(dockerLabel bool, taskDef bool, serviceName stri
taskAttachmentDetailsValue1 := "eni-03de9d47faaa2e5ec"
taskAttachmentDetailsValue2 := "10.0.0.129"

taskArn := "arn:aws:ecs:us-east-2:211220956907:task/ExampleCluster/1234567890123456789"
taskArn := "arn:aws:ecs:us-east-2:211220956907:task/1234567890123456789"
if useNewTaskArnFormat {
taskArn = "arn:aws:ecs:us-east-2:211220956907:task/ExampleCluster/1234567890123456789"
}

taskDefinitionArn := "arn:aws:ecs:us-east-2:211220956907:task-definition/prometheus-java-tomcat-fargate-awsvpc:1"
var taskRevision int64 = 4
port9404String := "9404"
Expand Down Expand Up @@ -109,7 +136,7 @@ func buildWorkloadFargateAwsvpc(dockerLabel bool, taskDef bool, serviceName stri
}

func Test_ExportDockerLabelBasedTarget_Fargate_AWSVPC(t *testing.T) {
fullTask := buildWorkloadFargateAwsvpc(true, false, "")
fullTask := buildWorkloadFargateAwsvpc(true, true, false, "")
assert.Equal(t, "10.0.0.129", fullTask.getPrivateIp())

config := &ServiceDiscoveryConfig{
Expand All @@ -128,25 +155,31 @@ func Test_ExportDockerLabelBasedTarget_Fargate_AWSVPC(t *testing.T) {
target, ok := targets["10.0.0.129:9404/metrics"]
assert.True(t, ok, "Missing target: 10.0.0.129:9404/metrics")

assert.Equal(t, 5, len(target.Labels))
assert.Equal(t, 7, len(target.Labels))
assert.Equal(t, "java-tomcat-fargate-awsvpc", target.Labels["job"])
assert.Equal(t, "bugbash-tomcat-fargate-awsvpc-with-docker-label", target.Labels["container_name"])
assert.Equal(t, "4", target.Labels["TaskRevision"])
assert.Equal(t, "ExampleCluster", target.Labels["TaskClusterName"])
assert.Equal(t, "1234567890123456789", target.Labels["TaskId"])
assert.Equal(t, "9404", target.Labels["FARGATE_PROMETHEUS_EXPORTER_PORT"])
assert.Equal(t, "java-tomcat-fargate-awsvpc", target.Labels["FARGATE_PROMETHEUS_JOB_NAME"])

target, ok = targets["10.0.0.129:9406/metrics"]
assert.True(t, ok, "Missing target: 10.0.0.129:9406/metrics")
assert.Equal(t, 5, len(target.Labels))
assert.Equal(t, 7, len(target.Labels))
assert.Equal(t, "ExampleCluster", target.Labels["TaskClusterName"])
assert.Equal(t, "1234567890123456789", target.Labels["TaskId"])
assert.Equal(t, "4", target.Labels["TaskRevision"])
assert.Equal(t, "bugbash-jar-fargate-awsvpc-with-dockerlabel", target.Labels["container_name"])
assert.Equal(t, "9406", target.Labels["FARGATE_PROMETHEUS_EXPORTER_PORT"])
assert.Equal(t, "/metrics", target.Labels["__metrics_path__"])
assert.Equal(t, "/metrics", target.Labels["ECS_PROMETHEUS_METRICS_PATH"])


}

func Test_ExportTaskDefBasedTarget_Fargate_AWSVPC(t *testing.T) {
fullTask := buildWorkloadFargateAwsvpc(false, true, "")
fullTask := buildWorkloadFargateAwsvpc(true, false, true, "")
assert.Equal(t, "10.0.0.129", fullTask.getPrivateIp())
config := &ServiceDiscoveryConfig{
TaskDefinitions: []*TaskDefinitionConfig{
Expand All @@ -169,16 +202,20 @@ func Test_ExportTaskDefBasedTarget_Fargate_AWSVPC(t *testing.T) {
target, ok := targets["10.0.0.129:9404/stats/metrics"]
assert.True(t, ok, "Missing target: 10.0.0.129:9404/stats/metrics")

assert.Equal(t, 5, len(target.Labels))
assert.Equal(t, 7, len(target.Labels))
assert.Equal(t, "java-tomcat-fargate-awsvpc", target.Labels["FARGATE_PROMETHEUS_JOB_NAME"])
assert.Equal(t, "bugbash-tomcat-fargate-awsvpc-with-docker-label", target.Labels["container_name"])
assert.Equal(t, "4", target.Labels["TaskRevision"])
assert.Equal(t, "bugbash-tomcat-fargate-awsvpc-with-docker-label", target.Labels["container_name"])
assert.Equal(t, "ExampleCluster", target.Labels["TaskClusterName"])
assert.Equal(t, "1234567890123456789", target.Labels["TaskId"])
assert.Equal(t, "9404", target.Labels["FARGATE_PROMETHEUS_EXPORTER_PORT"])
assert.Equal(t, "/stats/metrics", target.Labels["__metrics_path__"])

target, ok = targets["10.0.0.129:9406/stats/metrics"]
assert.True(t, ok, "Missing target: 10.0.0.129:9406/stats/metrics")
assert.Equal(t, 5, len(target.Labels))
assert.Equal(t, 7, len(target.Labels))
assert.Equal(t, "ExampleCluster", target.Labels["TaskClusterName"])
assert.Equal(t, "1234567890123456789", target.Labels["TaskId"])
assert.Equal(t, "4", target.Labels["TaskRevision"])
assert.Equal(t, "bugbash-jar-fargate-awsvpc-with-dockerlabel", target.Labels["container_name"])
assert.Equal(t, "9406", target.Labels["FARGATE_PROMETHEUS_EXPORTER_PORT"])
Expand All @@ -187,7 +224,7 @@ func Test_ExportTaskDefBasedTarget_Fargate_AWSVPC(t *testing.T) {
}

func Test_exportServiceEndpointBasedTarget_Fargate_AWSVPC(t *testing.T) {
fullTask := buildWorkloadFargateAwsvpc(false, false, "true")
fullTask := buildWorkloadFargateAwsvpc(true, false, false, "true")
assert.Equal(t, "10.0.0.129", fullTask.getPrivateIp())
config := &ServiceDiscoveryConfig{
ServiceNamesForTasks: []*ServiceNameForTasksConfig{
Expand All @@ -213,16 +250,20 @@ func Test_exportServiceEndpointBasedTarget_Fargate_AWSVPC(t *testing.T) {
target, ok := targets["10.0.0.129:9404/stats/metrics"]
assert.True(t, ok, "Missing target: 10.0.0.129:9404/stats/metrics")

assert.Equal(t, 6, len(target.Labels))
assert.Equal(t, 8, len(target.Labels))
assert.Equal(t, "java-tomcat-fargate-awsvpc", target.Labels["FARGATE_PROMETHEUS_JOB_NAME"])
assert.Equal(t, "bugbash-tomcat-fargate-awsvpc-with-docker-label", target.Labels["container_name"])
assert.Equal(t, "4", target.Labels["TaskRevision"])
assert.Equal(t, "bugbash-tomcat-fargate-awsvpc-with-docker-label", target.Labels["container_name"])
assert.Equal(t, "ExampleCluster", target.Labels["TaskClusterName"])
assert.Equal(t, "1234567890123456789", target.Labels["TaskId"])
assert.Equal(t, "9404", target.Labels["FARGATE_PROMETHEUS_EXPORTER_PORT"])
assert.Equal(t, "/stats/metrics", target.Labels["__metrics_path__"])

target, ok = targets["10.0.0.129:9406/stats/metrics"]
assert.True(t, ok, "Missing target: 10.0.0.129:9406/stats/metrics")
assert.Equal(t, 6, len(target.Labels))
assert.Equal(t, 8, len(target.Labels))
assert.Equal(t, "ExampleCluster", target.Labels["TaskClusterName"])
assert.Equal(t, "1234567890123456789", target.Labels["TaskId"])
assert.Equal(t, "4", target.Labels["TaskRevision"])
assert.Equal(t, "bugbash-jar-fargate-awsvpc-with-dockerlabel", target.Labels["container_name"])
assert.Equal(t, "9406", target.Labels["FARGATE_PROMETHEUS_EXPORTER_PORT"])
Expand All @@ -231,7 +272,7 @@ func Test_exportServiceEndpointBasedTarget_Fargate_AWSVPC(t *testing.T) {
}

func Test_ExportMixedSDTarget_Fargate_AWSVPC(t *testing.T) {
fullTask := buildWorkloadFargateAwsvpc(true, true, "")
fullTask := buildWorkloadFargateAwsvpc(true, true, true, "")
log.Print(fullTask)
assert.Equal(t, "10.0.0.129", fullTask.getPrivateIp())
config := &ServiceDiscoveryConfig{
Expand Down Expand Up @@ -411,7 +452,7 @@ func testExportMixedSDTarget_EC2_Bridge_DynamicPort(t *testing.T, networkMode st
target, ok := targets["10.4.0.205:32774/metrics"]
assert.True(t, ok, "Missing target: 10.4.0.205:32774/metrics")

assert.Equal(t, 8, len(target.Labels))
assert.Equal(t, 10, len(target.Labels))
assert.Equal(t, "/metrics", target.Labels["EC2_PROMETHEUS_METRICS_PATH"])
assert.Equal(t, "9406", target.Labels["EC2_PROMETHEUS_EXPORTER_PORT"])
assert.Equal(t, "t3.medium", target.Labels["InstanceType"])
Expand All @@ -420,17 +461,21 @@ func testExportMixedSDTarget_EC2_Bridge_DynamicPort(t *testing.T, networkMode st
assert.Equal(t, "vpc-03e9f55a92516a5e4", target.Labels["VpcId"])
assert.Equal(t, "/metrics", target.Labels["__metrics_path__"])
assert.Equal(t, "bugbash-jar-prometheus-workload-java-ec2-bridge", target.Labels["container_name"])
assert.Equal(t, "ExampleCluster", target.Labels["TaskClusterName"])
assert.Equal(t, "1234567890123456789", target.Labels["TaskId"])

target, ok = targets["10.4.0.205:9494/metrics"]
assert.True(t, ok, "Missing target: 10.4.0.205:9494/metrics")
assert.Equal(t, 8, len(target.Labels))
assert.Equal(t, 10, len(target.Labels))
assert.Equal(t, "9404", target.Labels["EC2_PROMETHEUS_EXPORTER_PORT"])
assert.Equal(t, "bugbash-tomcat-ec2-bridge-mapped-port", target.Labels["EC2_PROMETHEUS_JOB_NAME"])
assert.Equal(t, "t3.medium", target.Labels["InstanceType"])
assert.Equal(t, "subnet-0d0b0212d14b70250", target.Labels["SubnetId"])
assert.Equal(t, "5", target.Labels["TaskRevision"])
assert.Equal(t, "vpc-03e9f55a92516a5e4", target.Labels["VpcId"])
assert.Equal(t, "bugbash-tomcat-prometheus-workload-java-ec2-bridge-mapped-port", target.Labels["container_name"])
assert.Equal(t, "ExampleCluster", target.Labels["TaskClusterName"])
assert.Equal(t, "1234567890123456789", target.Labels["TaskId"])
assert.Equal(t, "bugbash-tomcat-ec2-bridge-mapped-port", target.Labels["job"])
}

Expand Down Expand Up @@ -480,7 +525,7 @@ func testExportContainerNameSDTarget_EC2_Bridge_DynamicPort(t *testing.T, networ
target, ok := targets["10.4.0.205:9494/metrics"]
log.Print(target)
assert.True(t, ok, "Missing target: 10.4.0.205:9494/metrics")
assert.Equal(t, 8, len(target.Labels))
assert.Equal(t, 10, len(target.Labels))
assert.Equal(t, "9404", target.Labels["EC2_PROMETHEUS_EXPORTER_PORT"])
assert.Equal(t, "bugbash-tomcat-ec2-bridge-mapped-port", target.Labels["EC2_PROMETHEUS_JOB_NAME"])
assert.Equal(t, "t3.medium", target.Labels["InstanceType"])
Expand All @@ -489,4 +534,6 @@ func testExportContainerNameSDTarget_EC2_Bridge_DynamicPort(t *testing.T, networ
assert.Equal(t, "vpc-03e9f55a92516a5e4", target.Labels["VpcId"])
assert.Equal(t, "/metrics", target.Labels["__metrics_path__"])
assert.Equal(t, "bugbash-tomcat-prometheus-workload-java-ec2-bridge-mapped-port", target.Labels["container_name"])
assert.Equal(t, "ExampleCluster", target.Labels["TaskClusterName"])
assert.Equal(t, "1234567890123456789", target.Labels["TaskId"])
}
94 changes: 48 additions & 46 deletions plugins/outputs/cloudwatch/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ func TestAggregator_ProperAggregationKey(t *testing.T) {

aggregator.AddMetric(m)
assertNoMetricsInChan(t, metricChan)
time.Sleep(2 * aggregationInterval)
assertMetricContent(t, metricChan, m, expectedFieldContent{"value", 1, 1, 1, 1, "",
assertMetricContent(t, metricChan, aggregationInterval * 2, m, expectedFieldContent{"value", 1, 1, 1, 1, "",
[]float64{1.0488088481701516}, []float64{1}})

assertNoMetricsInChan(t, metricChan)
Expand Down Expand Up @@ -110,14 +109,12 @@ func TestAggregator_MultipleAggregationPeriods(t *testing.T) {
aggregator.AddMetric(m)

assertNoMetricsInChan(t, metricChan)
time.Sleep(2 * aggregationInterval)
assertMetricContent(t, metricChan, m, expectedFieldContent{"value", 3, 1, 3, 6, "",
assertMetricContent(t, metricChan, aggregationInterval * 2, m, expectedFieldContent{"value", 3, 1, 3, 6, "",
[]float64{1.0488088481701516, 2.0438317370604793, 2.992374046230249}, []float64{1, 1, 1}})

assertNoMetricsInChan(t, metricChan)

time.Sleep(2 * aggregationInterval)
assertMetricContent(t, metricChan, m, expectedFieldContent{"value", 5, 4, 2, 9, "",
assertMetricContent(t, metricChan, aggregationInterval * 2, m, expectedFieldContent{"value", 5, 4, 2, 9, "",
[]float64{3.9828498555324616, 4.819248325194279}, []float64{1, 1}},
expectedFieldContent{"2nd value", 2, 1, 2, 3, "",
[]float64{1.0488088481701516, 2.0438317370604793}, []float64{1, 1}})
Expand All @@ -143,7 +140,7 @@ func TestAggregator_ShutdownBehavior(t *testing.T) {
close(shutdownChan)
wg.Wait()

assertMetricContent(t, metricChan, m, expectedFieldContent{"value", 1, 1, 1, 1, "", []float64{1.0488088481701516}, []float64{1}})
assertMetricContent(t, metricChan, 0 * time.Second, m, expectedFieldContent{"value", 1, 1, 1, 1, "", []float64{1.0488088481701516}, []float64{1}})
assertNoMetricsInChan(t, metricChan)
}

Expand Down Expand Up @@ -208,48 +205,53 @@ func testPreparation() (chan telegraf.Metric, chan struct{}, Aggregator) {
return metricChan, shutdownChan, aggregator
}

func assertMetricContent(t *testing.T, metricChan <-chan telegraf.Metric, originalMetric telegraf.Metric, expectedFieldContent ...expectedFieldContent) {
log.Printf("The metric chan len is %v.", len(metricChan))
func assertMetricContent(t *testing.T, metricChan <-chan telegraf.Metric, metricMaxWait time.Duration, originalMetric telegraf.Metric, expectedFieldContent ...expectedFieldContent) {
var aggregatedMetric telegraf.Metric

log.Printf("Waiting for metric.")
select {
case aggregatedMetric := <-metricChan:
assert.False(t, aggregatedMetric.HasTag(aggregationIntervalTagKey))
assert.Equal(t, "true", aggregatedMetric.Tags()[highResolutionTagKey])

for _, fieldContent := range expectedFieldContent {
dist, ok := aggregatedMetric.Fields()[fieldContent.fieldName].(distribution.Distribution)
assert.True(t, ok)

assert.Equal(t, fieldContent.max, dist.Maximum())
assert.Equal(t, fieldContent.sampleCount, dist.SampleCount())
assert.Equal(t, fieldContent.unit, dist.Unit())
assert.Equal(t, fieldContent.min, dist.Minimum())
assert.Equal(t, fieldContent.sum, dist.Sum())

values, counts := dist.ValuesAndCounts()
assert.Equal(t, len(fieldContent.expectedValues), len(values))
assert.Equal(t, len(fieldContent.expectedCounts), len(counts))

sort.Float64s(fieldContent.expectedValues)
sort.Float64s(values)
assert.Equal(t, fieldContent.expectedValues, values)

var expectedCountInts []int
for _, count := range fieldContent.expectedCounts {
expectedCountInts = append(expectedCountInts, int(count))
}
sort.Ints(expectedCountInts)
var countInts []int
for _, count := range counts {
countInts = append(countInts, int(count))
}
sort.Ints(countInts)
assert.Equal(t, expectedCountInts, countInts)
}
case aggregatedMetric = <-metricChan:
case <-time.After(metricMaxWait):
assert.FailNow(t, "We should've seen 1 metric by now")
}

assert.NotEqual(t, originalMetric, aggregatedMetric, "The aggregatedMetric should not exactly equal to m since the field will be distribution.Distribution")
default:
assert.Fail(t, "We should got 1 metric now")
log.Printf("Checking metric.")

assert.False(t, aggregatedMetric.HasTag(aggregationIntervalTagKey))
assert.Equal(t, "true", aggregatedMetric.Tags()[highResolutionTagKey])

for _, fieldContent := range expectedFieldContent {
dist, ok := aggregatedMetric.Fields()[fieldContent.fieldName].(distribution.Distribution)
assert.True(t, ok)

assert.Equal(t, fieldContent.max, dist.Maximum())
assert.Equal(t, fieldContent.sampleCount, dist.SampleCount())
assert.Equal(t, fieldContent.unit, dist.Unit())
assert.Equal(t, fieldContent.min, dist.Minimum())
assert.Equal(t, fieldContent.sum, dist.Sum())

values, counts := dist.ValuesAndCounts()
assert.Equal(t, len(fieldContent.expectedValues), len(values))
assert.Equal(t, len(fieldContent.expectedCounts), len(counts))

sort.Float64s(fieldContent.expectedValues)
sort.Float64s(values)
assert.Equal(t, fieldContent.expectedValues, values)

var expectedCountInts []int
for _, count := range fieldContent.expectedCounts {
expectedCountInts = append(expectedCountInts, int(count))
}
sort.Ints(expectedCountInts)
var countInts []int
for _, count := range counts {
countInts = append(countInts, int(count))
}
sort.Ints(countInts)
assert.Equal(t, expectedCountInts, countInts)
}

assert.NotEqual(t, originalMetric, aggregatedMetric, "The aggregatedMetric should not exactly equal to m since the field will be distribution.Distribution")
}

func assertNoMetricsInChan(t *testing.T, metricChan <-chan telegraf.Metric) {
Expand Down

0 comments on commit 85b920f

Please sign in to comment.