Skip to content

Commit

Permalink
[Metricbeat] Add Data Granularity config option for AWS Cloudwatch me…
Browse files Browse the repository at this point in the history
…trics (#33166)

* [Metricbeat] Add Data Granularity config option for AWS Cloudwatch metrics

Co-authored-by: kaiyan-sheng <kaiyan.sheng@elastic.co>
Co-authored-by: girodav <1390902+girodav@users.noreply.github.com>
  • Loading branch information
3 people authored Oct 25, 2022
1 parent 5dd5e88 commit 4cac5d8
Show file tree
Hide file tree
Showing 9 changed files with 195 additions and 201 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]


*Metricbeat*

- Add Data Granularity option to AWS module to allow for for fewer API calls of longer periods and keep small intervals. {issue}33133[33133] {pull}33166[33166]
- Update README file on how to run Metricbeat on Kubernetes. {pull}33308[33308]

*Packetbeat*
Expand Down
13 changes: 12 additions & 1 deletion metricbeat/docs/modules/aws.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,17 @@ or none get collected by Metricbeat. In this case, please specify a `latency`
parameter so collection start time and end time will be shifted by the given
latency amount.

* *data_granularity*

AWS CloudWatch allows to define the granularity of the returned datapoints, by setting "Period" while querying metrics.
Please see https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_MetricDataQuery.html[MetricDataQuery parameters] for more information.

By default, metricbeat will query CloudWatch setting "Period" to Metricbeat collection period. If you wish to set a custom value for "Period", please specify a `data_granularity` parameter.
By setting `period` and `data_granularity` together, you can control, respectively, how frequently you want your metrics to be collected and how granular they have to be.

If you are concerned about reducing the cost derived by CloudWatch API calls made by Metricbeat with an extra delay in retrieving metrics as trade off, you may consider setting `data_granularity` and increase Metricbeat collection period. For example,
setting `data_granularity` to your current value for `period`, and doubling the value of `period`, may lead to a 50% savings in terms of GetMetricData API calls cost.

* *endpoint*

Most AWS services offer a regional endpoint that can be used to make requests.
Expand All @@ -69,7 +80,7 @@ For example, if tags parameter is given as `Organization=Engineering` under
`Organization` and tag value equals to `Engineering`. In order to filter for different
values for the same key, add the values to the value array (see example)

Note: tag filtering only works for metricsets with `resource_type` specified in the
Note: tag filtering only works for metricsets with `resource_type` specified in the
metricset-specific configuration.

[source,yaml]
Expand Down
13 changes: 12 additions & 1 deletion x-pack/metricbeat/module/aws/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,17 @@ or none get collected by Metricbeat. In this case, please specify a `latency`
parameter so collection start time and end time will be shifted by the given
latency amount.

* *data_granularity*

AWS CloudWatch allows to define the granularity of the returned datapoints, by setting "Period" while querying metrics.
Please see https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_MetricDataQuery.html[MetricDataQuery parameters] for more information.

By default, metricbeat will query CloudWatch setting "Period" to Metricbeat collection period. If you wish to set a custom value for "Period", please specify a `data_granularity` parameter.
By setting `period` and `data_granularity` together, you can control, respectively, how frequently you want your metrics to be collected and how granular they have to be.

If you are concerned about reducing the cost derived by CloudWatch API calls made by Metricbeat with an extra delay in retrieving metrics as trade off, you may consider setting `data_granularity` and increase Metricbeat collection period. For example,
setting `data_granularity` to your current value for `period`, and doubling the value of `period`, may lead to a 50% savings in terms of GetMetricData API calls cost.

* *endpoint*

Most AWS services offer a regional endpoint that can be used to make requests.
Expand All @@ -57,7 +68,7 @@ For example, if tags parameter is given as `Organization=Engineering` under
`Organization` and tag value equals to `Engineering`. In order to filter for different
values for the same key, add the values to the value array (see example)

Note: tag filtering only works for metricsets with `resource_type` specified in the
Note: tag filtering only works for metricsets with `resource_type` specified in the
metricset-specific configuration.

[source,yaml]
Expand Down
49 changes: 30 additions & 19 deletions x-pack/metricbeat/module/aws/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,26 @@ type describeRegionsClient interface {

// Config defines all required and optional parameters for aws metricsets
type Config struct {
Period time.Duration `config:"period" validate:"nonzero,required"`
Regions []string `config:"regions"`
Latency time.Duration `config:"latency"`
AWSConfig awscommon.ConfigAWS `config:",inline"`
TagsFilter []Tag `config:"tags_filter"`
Period time.Duration `config:"period" validate:"nonzero,required"`
DataGranularity time.Duration `config:"data_granularity"`
Regions []string `config:"regions"`
Latency time.Duration `config:"latency"`
AWSConfig awscommon.ConfigAWS `config:",inline"`
TagsFilter []Tag `config:"tags_filter"`
}

// MetricSet is the base metricset for all aws metricsets
type MetricSet struct {
mb.BaseMetricSet
RegionsList []string
Endpoint string
Period time.Duration
Latency time.Duration
AwsConfig *awssdk.Config
AccountName string
AccountID string
TagsFilter []Tag
RegionsList []string
Endpoint string
Period time.Duration
DataGranularity time.Duration
Latency time.Duration
AwsConfig *awssdk.Config
AccountName string
AccountID string
TagsFilter []Tag
}

// Tag holds a configuration specific for ec2 and cloudwatch metricset.
Expand Down Expand Up @@ -91,16 +93,25 @@ func NewMetricSet(base mb.BaseMetricSet) (*MetricSet, error) {
}

base.Logger().Debug("aws config endpoint = ", config.AWSConfig.Endpoint)
if config.DataGranularity > config.Period {
return nil, fmt.Errorf("Data Granularity cannot be larger than the period")
}

if config.DataGranularity == 0 {
config.DataGranularity = config.Period
}
metricSet := MetricSet{
BaseMetricSet: base,
Period: config.Period,
Latency: config.Latency,
AwsConfig: &awsConfig,
TagsFilter: config.TagsFilter,
Endpoint: config.AWSConfig.Endpoint,
BaseMetricSet: base,
Period: config.Period,
DataGranularity: config.DataGranularity,
Latency: config.Latency,
AwsConfig: &awsConfig,
TagsFilter: config.TagsFilter,
Endpoint: config.AWSConfig.Endpoint,
}

base.Logger().Debug("Metricset level config for period: ", metricSet.Period)
base.Logger().Debug("Metricset level config for data granularity: ", metricSet.DataGranularity)
base.Logger().Debug("Metricset level config for tags filter: ", metricSet.TagsFilter)
base.Logger().Warn("extra charges on AWS API requests will be generated by this metricset")

Expand Down
25 changes: 9 additions & 16 deletions x-pack/metricbeat/module/aws/billing/billing.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,30 +181,23 @@ func (m *MetricSet) getCloudWatchBillingMetrics(
return events
}

metricDataQueriesTotal := constructMetricQueries(listMetricsOutput, m.Period)
metricDataQueriesTotal := constructMetricQueries(listMetricsOutput, m.DataGranularity)
metricDataOutput, err := aws.GetMetricDataResults(metricDataQueriesTotal, svcCloudwatch, startTime, endTime)
if err != nil {
err = fmt.Errorf("aws GetMetricDataResults failed with %w, skipping region %s", err, regionName)
m.Logger().Error(err.Error())
return nil
}

// Find a timestamp for all metrics in output
timestamp := aws.FindTimestamp(metricDataOutput)
if timestamp.IsZero() {
return nil
}

for _, output := range metricDataOutput {
if len(output.Values) == 0 {
continue
}
exists, timestampIdx := aws.CheckTimestampInArray(timestamp, output.Timestamps)
if exists {
for valI, metricDataResultValue := range output.Values {
labels := strings.Split(*output.Label, labelSeparator)

event := aws.InitEvent("", m.AccountName, m.AccountID, timestamp)
_, _ = event.MetricSetFields.Put(labels[0], output.Values[timestampIdx])
event := aws.InitEvent("", m.AccountName, m.AccountID, output.Timestamps[valI])
_, _ = event.MetricSetFields.Put(labels[0], metricDataResultValue)

i := 1
for i < len(labels)-1 {
Expand Down Expand Up @@ -345,11 +338,11 @@ func (m *MetricSet) addCostMetrics(metrics map[string]costexplorertypes.MetricVa
return event
}

func constructMetricQueries(listMetricsOutput []types.Metric, period time.Duration) []types.MetricDataQuery {
func constructMetricQueries(listMetricsOutput []types.Metric, dataGranularity time.Duration) []types.MetricDataQuery {
var metricDataQueries []types.MetricDataQuery
metricDataQueryEmpty := types.MetricDataQuery{}
for i, listMetric := range listMetricsOutput {
metricDataQuery := createMetricDataQuery(listMetric, i, period)
metricDataQuery := createMetricDataQuery(listMetric, i, dataGranularity)
if metricDataQuery == metricDataQueryEmpty {
continue
}
Expand All @@ -358,9 +351,9 @@ func constructMetricQueries(listMetricsOutput []types.Metric, period time.Durati
return metricDataQueries
}

func createMetricDataQuery(metric types.Metric, index int, period time.Duration) types.MetricDataQuery {
func createMetricDataQuery(metric types.Metric, index int, dataGranularity time.Duration) types.MetricDataQuery {
statistic := "Maximum"
periodInSeconds := int32(period.Seconds())
dataGranularityInSeconds := int32(dataGranularity.Seconds())
id := metricsetName + strconv.Itoa(index)
metricDims := metric.Dimensions
metricName := *metric.MetricName
Expand All @@ -373,7 +366,7 @@ func createMetricDataQuery(metric types.Metric, index int, period time.Duration)
return types.MetricDataQuery{
Id: &id,
MetricStat: &types.MetricStat{
Period: &periodInSeconds,
Period: &dataGranularityInSeconds,
Stat: &statistic,
Metric: &metric,
},
Expand Down
62 changes: 27 additions & 35 deletions x-pack/metricbeat/module/aws/cloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,20 +378,20 @@ func (m *MetricSet) readCloudwatchConfig() (listMetricWithDetail, map[string][]n
return listMetricDetailTotal, namespaceDetailTotal
}

func createMetricDataQueries(listMetricsTotal []metricsWithStatistics, period time.Duration) []types.MetricDataQuery {
func createMetricDataQueries(listMetricsTotal []metricsWithStatistics, dataGranularity time.Duration) []types.MetricDataQuery {
var metricDataQueries []types.MetricDataQuery
for i, listMetric := range listMetricsTotal {
for j, statistic := range listMetric.statistic {
stat := statistic
metric := listMetric.cloudwatchMetric
label := constructLabel(listMetric.cloudwatchMetric, statistic)
periodInSec := int32(period.Seconds())
dataGranularityInSec := int32(dataGranularity.Seconds())

id := "cw" + strconv.Itoa(i) + "stats" + strconv.Itoa(j)
metricDataQueries = append(metricDataQueries, types.MetricDataQuery{
Id: &id,
MetricStat: &types.MetricStat{
Period: &periodInSec,
Period: &dataGranularityInSec,
Stat: &stat,
Metric: &metric,
},
Expand Down Expand Up @@ -473,10 +473,10 @@ func insertRootFields(event mb.Event, metricValue float64, labels []string) mb.E

func (m *MetricSet) createEvents(svcCloudwatch cloudwatch.GetMetricDataAPIClient, svcResourceAPI resourcegroupstaggingapi.GetResourcesAPIClient, listMetricWithStatsTotal []metricsWithStatistics, resourceTypeTagFilters map[string][]aws.Tag, regionName string, startTime time.Time, endTime time.Time) (map[string]mb.Event, error) {
// Initialize events for each identifier.
events := map[string]mb.Event{}
events := make(map[string]mb.Event)

// Construct metricDataQueries
metricDataQueries := createMetricDataQueries(listMetricWithStatsTotal, m.Period)
metricDataQueries := createMetricDataQueries(listMetricWithStatsTotal, m.DataGranularity)
m.logger.Debugf("Number of MetricDataQueries = %d", len(metricDataQueries))
if len(metricDataQueries) == 0 {
return events, nil
Expand All @@ -489,37 +489,29 @@ func (m *MetricSet) createEvents(svcCloudwatch cloudwatch.GetMetricDataAPIClient
return events, fmt.Errorf("getMetricDataResults failed: %w", err)
}

// Find a timestamp for all metrics in output
timestamp := aws.FindTimestamp(metricDataResults)
if timestamp.IsZero() {
return nil, nil
}

// Create events when there is no tags_filter or resource_type specified.
if len(resourceTypeTagFilters) == 0 {
for _, metricDataResult := range metricDataResults {
if len(metricDataResult.Values) == 0 {
continue
}

exists, timestampIdx := aws.CheckTimestampInArray(timestamp, metricDataResult.Timestamps)
if exists {
labels := strings.Split(*metricDataResult.Label, labelSeparator)
labels := strings.Split(*metricDataResult.Label, labelSeparator)
for valI, metricDataResultValue := range metricDataResult.Values {
if len(labels) != 5 {
// when there is no identifier value in label, use region+accountID+namespace instead
identifier := regionName + m.AccountID + labels[namespaceIdx]
// when there is no identifier value in label, use region+accountID+label+index instead
identifier := regionName + m.AccountID + *metricDataResult.Label + fmt.Sprint("-", valI)
if _, ok := events[identifier]; !ok {
events[identifier] = aws.InitEvent(regionName, m.AccountName, m.AccountID, timestamp)
events[identifier] = aws.InitEvent(regionName, m.AccountName, m.AccountID, metricDataResult.Timestamps[valI])
}
events[identifier] = insertRootFields(events[identifier], metricDataResult.Values[timestampIdx], labels)
events[identifier] = insertRootFields(events[identifier], metricDataResultValue, labels)
continue
}

identifierValue := labels[identifierValueIdx]
identifierValue := *metricDataResult.Label + fmt.Sprint("-", valI)
if _, ok := events[identifierValue]; !ok {
events[identifierValue] = aws.InitEvent(regionName, m.AccountName, m.AccountID, timestamp)
events[identifierValue] = aws.InitEvent(regionName, m.AccountName, m.AccountID, metricDataResult.Timestamps[valI])
}
events[identifierValue] = insertRootFields(events[identifierValue], metricDataResult.Values[timestampIdx], labels)
events[identifierValue] = insertRootFields(events[identifierValue], metricDataResultValue, labels)
}
}
return events, nil
Expand Down Expand Up @@ -554,38 +546,38 @@ func (m *MetricSet) createEvents(svcCloudwatch cloudwatch.GetMetricDataAPIClient
continue
}

exists, timestampIdx := aws.CheckTimestampInArray(timestamp, output.Timestamps)
if exists {
labels := strings.Split(*output.Label, labelSeparator)
labels := strings.Split(*output.Label, labelSeparator)
for valI, metricDataResultValue := range output.Values {
if len(labels) != 5 {
// if there is no tag in labels but there is a tagsFilter, then no event should be reported.
if len(tagsFilter) != 0 {
continue
}

// when there is no identifier value in label, use region+accountID+namespace instead
identifier := regionName + m.AccountID + labels[namespaceIdx]
// when there is no identifier value in label, use region+accountID+labels instead
identifier := regionName + m.AccountID + *output.Label + fmt.Sprint("-", valI)
if _, ok := events[identifier]; !ok {
events[identifier] = aws.InitEvent(regionName, m.AccountName, m.AccountID, timestamp)
events[identifier] = aws.InitEvent(regionName, m.AccountName, m.AccountID, output.Timestamps[valI])
}
events[identifier] = insertRootFields(events[identifier], output.Values[timestampIdx], labels)
events[identifier] = insertRootFields(events[identifier], metricDataResultValue, labels)
continue
}

identifierValue := labels[identifierValueIdx]
if _, ok := events[identifierValue]; !ok {
uniqueIdentifierValue := *output.Label + fmt.Sprint("-", valI)
if _, ok := events[uniqueIdentifierValue]; !ok {
// when tagsFilter is not empty but no entry in
// resourceTagMap for this identifier, do not initialize
// an event for this identifier.
if len(tagsFilter) != 0 && resourceTagMap[identifierValue] == nil {
continue
}
events[identifierValue] = aws.InitEvent(regionName, m.AccountName, m.AccountID, timestamp)
events[uniqueIdentifierValue] = aws.InitEvent(regionName, m.AccountName, m.AccountID, output.Timestamps[valI])
}
events[identifierValue] = insertRootFields(events[identifierValue], output.Values[timestampIdx], labels)
events[uniqueIdentifierValue] = insertRootFields(events[uniqueIdentifierValue], metricDataResultValue, labels)

// add tags to event based on identifierValue
insertTags(events, identifierValue, resourceTagMap)
insertTags(events, uniqueIdentifierValue, identifierValue, resourceTagMap)
}
}
}
Expand Down Expand Up @@ -625,7 +617,7 @@ func compareAWSDimensions(dim1 []types.Dimension, dim2 []types.Dimension) bool {
return reflect.DeepEqual(dim1NameToValue, dim2NameToValue)
}

func insertTags(events map[string]mb.Event, identifier string, resourceTagMap map[string][]resourcegroupstaggingapitypes.Tag) {
func insertTags(events map[string]mb.Event, uniqueIdentifierValue string, identifier string, resourceTagMap map[string][]resourcegroupstaggingapitypes.Tag) {
// Check if identifier includes dimensionSeparator (comma in this case),
// split the identifier and check for each sub-identifier.
// For example, identifier might be [storageType, s3BucketName].
Expand All @@ -644,7 +636,7 @@ func insertTags(events map[string]mb.Event, identifier string, resourceTagMap ma
// By default, replace dot "." using underscore "_" for tag keys.
// Note: tag values are not dedotted.
for _, tag := range tags {
_, _ = events[identifier].RootFields.Put("aws.tags."+common.DeDot(*tag.Key), *tag.Value)
_, _ = events[uniqueIdentifierValue].RootFields.Put("aws.tags."+common.DeDot(*tag.Key), *tag.Value)
}
continue
}
Expand Down
Loading

0 comments on commit 4cac5d8

Please sign in to comment.