From 7f4a58ac682e6760d9af33399b66d2b352cc0180 Mon Sep 17 00:00:00 2001 From: James Bebbington Date: Tue, 17 Nov 2020 14:10:39 +1100 Subject: [PATCH] Address review comments --- processor/metricstransformprocessor/config.go | 102 +++++++++++++----- .../datapoint_aggregation.go | 3 +- .../metricstransformprocessor/factory.go | 42 ++++---- .../metricstransformprocessor/factory_test.go | 21 +++- .../metrics_transform_processor.go | 4 +- .../operation_aggregate_label_values.go | 3 +- .../operation_aggregate_labels.go | 4 +- .../config_invalid_aggregationtype.yaml | 24 +++++ .../config_invalid_operation_action.yaml | 24 +++++ ...fig_invalid_operation_aggregationtype.yaml | 25 +++++ 10 files changed, 196 insertions(+), 56 deletions(-) create mode 100644 processor/metricstransformprocessor/testdata/config_invalid_aggregationtype.yaml create mode 100644 processor/metricstransformprocessor/testdata/config_invalid_operation_action.yaml create mode 100644 processor/metricstransformprocessor/testdata/config_invalid_operation_aggregationtype.yaml diff --git a/processor/metricstransformprocessor/config.go b/processor/metricstransformprocessor/config.go index bdbdd0f30e0a..57b6b461754f 100644 --- a/processor/metricstransformprocessor/config.go +++ b/processor/metricstransformprocessor/config.go @@ -34,6 +34,9 @@ const ( // NewNameFieldName is the mapstructure field name for NewName field NewNameFieldName = "new_name" + // AggregationTypeFieldName is the mapstructure field name for AggregationType field + AggregationTypeFieldName = "aggregation_type" + // LabelFieldName is the mapstructure field name for Label field LabelFieldName = "label" @@ -44,16 +47,6 @@ const ( NewValueFieldName = "new_value" ) -const ( - // StrictMatchType is the FilterType for filtering by exact string matches. - StrictMatchType = "strict" - - // RegexpMatchType is the FilterType for filtering by regexp string matches. - RegexpMatchType = "regexp" -) - -var MatchTypes = []string{StrictMatchType, RegexpMatchType} - // Config defines configuration for Resource processor. type Config struct { configmodels.ProcessorSettings `mapstructure:",squash"` @@ -93,7 +86,7 @@ type FilterConfig struct { Include string `mapstructure:"include"` // MatchType determines how the Include string is matched: . - MatchType string `mapstructure:"match_type"` + MatchType MatchType `mapstructure:"match_type"` } // Operation defines the specific operation performed on the selected metrics. @@ -139,12 +132,6 @@ type ValueAction struct { // ConfigAction is the enum to capture the two types of actions to perform on a metric. type ConfigAction string -// OperationAction is the enum to capture the thress types of actions to perform for an operation. -type OperationAction string - -// AggregationType os the enum to capture the three types of aggregation for the aggregation operation. -type AggregationType string - const ( // Insert adds a new metric to the batch with a new name. Insert ConfigAction = "insert" @@ -154,10 +141,24 @@ const ( // Combine combines multiple metrics into a single metric. Combine ConfigAction = "combine" +) - // ToggleScalarDataType changes the data type from int64 to double, or vice-versa - ToggleScalarDataType OperationAction = "toggle_scalar_data_type" +var Actions = []ConfigAction{Insert, Update, Combine} + +func (ca ConfigAction) isValid() bool { + for _, configAction := range Actions { + if ca == configAction { + return true + } + } + + return false +} + +// OperationAction is the enum to capture the thress types of actions to perform for an operation. +type OperationAction string +const ( // AddLabel adds a new label to an existing metric. AddLabel OperationAction = "add_label" @@ -167,6 +168,9 @@ const ( // DeleteLabelValue deletes a label value by also removing all the points associated with this label value DeleteLabelValue OperationAction = "delete_label_value" + // ToggleScalarDataType changes the data type from int64 to double, or vice-versa + ToggleScalarDataType OperationAction = "toggle_scalar_data_type" + // AggregateLabels aggregates away all labels other than the ones in Operation.LabelSet // by the method indicated by Operation.AggregationType. AggregateLabels OperationAction = "aggregate_labels" @@ -174,18 +178,68 @@ const ( // AggregateLabelValues aggregates away the values in Operation.AggregatedValues // by the method indicated by Operation.AggregationType. AggregateLabelValues OperationAction = "aggregate_label_values" +) - // Mean indicates taking the mean of the aggregated data. - Mean AggregationType = "mean" +var OperationActions = []OperationAction{AddLabel, UpdateLabel, DeleteLabelValue, ToggleScalarDataType, AggregateLabels, AggregateLabelValues} - // Max indicates taking the max of the aggregated data. - Max AggregationType = "max" +func (oa OperationAction) isValid() bool { + for _, operationAction := range OperationActions { + if oa == operationAction { + return true + } + } + + return false +} + +// AggregationType is the enum to capture the three types of aggregation for the aggregation operation. +type AggregationType string +const ( // Sum indicates taking the sum of the aggregated data. Sum AggregationType = "sum" + // Mean indicates taking the mean of the aggregated data. + Mean AggregationType = "mean" + // Min indicates taking the minimum of the aggregated data. Min AggregationType = "min" + + // Max indicates taking the max of the aggregated data. + Max AggregationType = "max" ) -var Actions = []ConfigAction{Insert, Update, Combine} +var AggregationTypes = []AggregationType{Sum, Mean, Min, Max} + +func (at AggregationType) isValid() bool { + for _, aggregationType := range AggregationTypes { + if at == aggregationType { + return true + } + } + + return false +} + +// MatchType is the enum to capture the two types of matching metric(s) that should have operations applied to them. +type MatchType string + +const ( + // StrictMatchType is the FilterType for filtering by exact string matches. + StrictMatchType MatchType = "strict" + + // RegexpMatchType is the FilterType for filtering by regexp string matches. + RegexpMatchType MatchType = "regexp" +) + +var MatchTypes = []MatchType{StrictMatchType, RegexpMatchType} + +func (mt MatchType) isValid() bool { + for _, matchType := range MatchTypes { + if mt == matchType { + return true + } + } + + return false +} diff --git a/processor/metricstransformprocessor/datapoint_aggregation.go b/processor/metricstransformprocessor/datapoint_aggregation.go index 2e5b6ce2d387..6117393ad039 100644 --- a/processor/metricstransformprocessor/datapoint_aggregation.go +++ b/processor/metricstransformprocessor/datapoint_aggregation.go @@ -51,11 +51,10 @@ func (mtp *metricsTransformProcessor) mergeTimeseries(groupedTimeseries map[stri // sortTimeseries performs an in place sort of a list of timeseries by start timestamp // Returns the sorted timeseries -func (mtp *metricsTransformProcessor) sortTimeseries(timeseries []*metricspb.TimeSeries) []*metricspb.TimeSeries { +func (mtp *metricsTransformProcessor) sortTimeseries(timeseries []*metricspb.TimeSeries) { sort.Slice(timeseries, func(i, j int) bool { return mtp.compareTimestamps(timeseries[i].StartTimestamp, timeseries[j].StartTimestamp) }) - return timeseries } // groupPointsByTimestamp groups points by timestamp diff --git a/processor/metricstransformprocessor/factory.go b/processor/metricstransformprocessor/factory.go index 5f1500f5cf69..8a59cd992932 100644 --- a/processor/metricstransformprocessor/factory.go +++ b/processor/metricstransformprocessor/factory.go @@ -82,18 +82,8 @@ func validateConfiguration(config *Config) error { return fmt.Errorf("cannot supply both %q and %q, use %q with %q match type", IncludeFieldName, MetricNameFieldName, IncludeFieldName, StrictMatchType) } - if transform.MetricIncludeFilter.MatchType != "" { - var validMatchType bool - for _, matchType := range MatchTypes { - if transform.MetricIncludeFilter.MatchType == matchType { - validMatchType = true - break - } - } - - if !validMatchType { - return fmt.Errorf("%q must be in %q", MatchTypeFieldName, MatchTypes) - } + if transform.MetricIncludeFilter.MatchType != "" && !transform.MetricIncludeFilter.MatchType.isValid() { + return fmt.Errorf("%q must be in %q", MatchTypeFieldName, MatchTypes) } if transform.MetricIncludeFilter.MatchType == RegexpMatchType { @@ -103,15 +93,7 @@ func validateConfiguration(config *Config) error { } } - var validAction bool - for _, action := range Actions { - if transform.Action == action { - validAction = true - break - } - } - - if !validAction { + if !transform.Action.isValid() { return fmt.Errorf("%q must be in %q", ActionFieldName, Actions) } @@ -119,15 +101,27 @@ func validateConfiguration(config *Config) error { return fmt.Errorf("missing required field %q while %q is %v", NewNameFieldName, ActionFieldName, Insert) } + if transform.AggregationType != "" && !transform.AggregationType.isValid() { + return fmt.Errorf("%q must be in %q", AggregationTypeFieldName, AggregationTypes) + } + for i, op := range transform.Operations { + if !op.Action.isValid() { + return fmt.Errorf("operation %v: %q must be in %q", i+1, ActionFieldName, OperationActions) + } + if op.Action == UpdateLabel && op.Label == "" { - return fmt.Errorf("missing required field %q while %q is %v in the %vth operation", LabelFieldName, ActionFieldName, UpdateLabel, i) + return fmt.Errorf("operation %v: missing required field %q while %q is %v", i+1, LabelFieldName, ActionFieldName, UpdateLabel) } if op.Action == AddLabel && op.NewLabel == "" { - return fmt.Errorf("missing required field %q while %q is %v in the %vth operation", NewLabelFieldName, ActionFieldName, AddLabel, i) + return fmt.Errorf("operation %v: missing required field %q while %q is %v", i+1, NewLabelFieldName, ActionFieldName, AddLabel) } if op.Action == AddLabel && op.NewValue == "" { - return fmt.Errorf("missing required field %q while %q is %v in the %vth operation", NewValueFieldName, ActionFieldName, AddLabel, i) + return fmt.Errorf("operation %v: missing required field %q while %q is %v", i+1, NewValueFieldName, ActionFieldName, AddLabel) + } + + if op.AggregationType != "" && !op.AggregationType.isValid() { + return fmt.Errorf("operation %v: %q must be in %q", i+1, AggregationTypeFieldName, AggregationTypes) } } } diff --git a/processor/metricstransformprocessor/factory_test.go b/processor/metricstransformprocessor/factory_test.go index 21e8894d5007..6a7bc0f6d218 100644 --- a/processor/metricstransformprocessor/factory_test.go +++ b/processor/metricstransformprocessor/factory_test.go @@ -87,13 +87,28 @@ func TestCreateProcessors(t *testing.T) { { configName: "config_invalid_label.yaml", succeed: false, - errorMessage: fmt.Sprintf("missing required field %q while %q is %v in the %vth operation", LabelFieldName, ActionFieldName, UpdateLabel, 0), + errorMessage: fmt.Sprintf("operation %v: missing required field %q while %q is %v", 1, LabelFieldName, ActionFieldName, UpdateLabel), }, { configName: "config_invalid_regexp.yaml", succeed: false, errorMessage: fmt.Sprintf("%q, error parsing regexp: missing closing ]: `[\\da`", IncludeFieldName), }, + { + configName: "config_invalid_aggregationtype.yaml", + succeed: false, + errorMessage: fmt.Sprintf("%q must be in %q", AggregationTypeFieldName, AggregationTypes), + }, + { + configName: "config_invalid_operation_action.yaml", + succeed: false, + errorMessage: fmt.Sprintf("operation %v: %q must be in %q", 1, ActionFieldName, OperationActions), + }, + { + configName: "config_invalid_operation_aggregationtype.yaml", + succeed: false, + errorMessage: fmt.Sprintf("operation %v: %q must be in %q", 1, AggregationTypeFieldName, AggregationTypes), + }, } for _, test := range tests { @@ -148,7 +163,7 @@ func TestFactory_validateConfiguration(t *testing.T) { }, } err := validateConfiguration(&v1) - assert.Equal(t, "missing required field \"new_label\" while \"action\" is add_label in the 0th operation", err.Error()) + assert.Equal(t, "operation 1: missing required field \"new_label\" while \"action\" is add_label", err.Error()) v2 := Config{ Transforms: []Transform{ @@ -166,7 +181,7 @@ func TestFactory_validateConfiguration(t *testing.T) { } err = validateConfiguration(&v2) - assert.Equal(t, "missing required field \"new_value\" while \"action\" is add_label in the 0th operation", err.Error()) + assert.Equal(t, "operation 1: missing required field \"new_value\" while \"action\" is add_label", err.Error()) } func TestCreateProcessorsFilledData(t *testing.T) { diff --git a/processor/metricstransformprocessor/metrics_transform_processor.go b/processor/metricstransformprocessor/metrics_transform_processor.go index 093bbc2d6fcd..7614ef12b24a 100644 --- a/processor/metricstransformprocessor/metrics_transform_processor.go +++ b/processor/metricstransformprocessor/metrics_transform_processor.go @@ -254,7 +254,9 @@ func (mtp *metricsTransformProcessor) combine(matchedMetrics []*match, transform groupedTimeseries := mtp.groupTimeseries(allTimeseries, len(combinedMetric.MetricDescriptor.LabelKeys)) aggregatedTimeseries := mtp.mergeTimeseries(groupedTimeseries, transform.AggregationType, combinedMetric.MetricDescriptor.Type) - combinedMetric.Timeseries = mtp.sortTimeseries(aggregatedTimeseries) + + mtp.sortTimeseries(aggregatedTimeseries) + combinedMetric.Timeseries = aggregatedTimeseries return combinedMetric } diff --git a/processor/metricstransformprocessor/operation_aggregate_label_values.go b/processor/metricstransformprocessor/operation_aggregate_label_values.go index dd8462f6e150..c44cfa61767d 100644 --- a/processor/metricstransformprocessor/operation_aggregate_label_values.go +++ b/processor/metricstransformprocessor/operation_aggregate_label_values.go @@ -35,7 +35,8 @@ func (mtp *metricsTransformProcessor) aggregateLabelValuesOp(metric *metricspb.M aggregatedTimeseries := mtp.mergeTimeseries(groupedTimeseries, mtpOp.configOperation.AggregationType, metric.MetricDescriptor.Type) aggregatedTimeseries = append(aggregatedTimeseries, unchangedTimeseries...) - metric.Timeseries = mtp.sortTimeseries(aggregatedTimeseries) + mtp.sortTimeseries(aggregatedTimeseries) + metric.Timeseries = aggregatedTimeseries } // groupTimeseriesByNewLabelValue groups all timeseries in the metric that will be aggregated together based on the entire label values after replacing the aggregatedValues by newValue. diff --git a/processor/metricstransformprocessor/operation_aggregate_labels.go b/processor/metricstransformprocessor/operation_aggregate_labels.go index d77a1f909b6b..06205f5b2cc1 100644 --- a/processor/metricstransformprocessor/operation_aggregate_labels.go +++ b/processor/metricstransformprocessor/operation_aggregate_labels.go @@ -28,7 +28,9 @@ func (mtp *metricsTransformProcessor) aggregateLabelsOp(metric *metricspb.Metric aggregatedTimeseries := mtp.mergeTimeseries(groupedTimeseries, mtpOp.configOperation.AggregationType, metric.MetricDescriptor.Type) metric.MetricDescriptor.LabelKeys = labels - metric.Timeseries = mtp.sortTimeseries(aggregatedTimeseries) + + mtp.sortTimeseries(aggregatedTimeseries) + metric.Timeseries = aggregatedTimeseries } // groupTimeseries groups all the provided timeseries that will be aggregated together based on all the label values. diff --git a/processor/metricstransformprocessor/testdata/config_invalid_aggregationtype.yaml b/processor/metricstransformprocessor/testdata/config_invalid_aggregationtype.yaml new file mode 100644 index 000000000000..6abc7f22d436 --- /dev/null +++ b/processor/metricstransformprocessor/testdata/config_invalid_aggregationtype.yaml @@ -0,0 +1,24 @@ +receivers: + examplereceiver: + +processors: + metricstransform: + transforms: + - include: old_name + action: combine + new_name: new_name + aggregation_type: invalid + +exporters: + exampleexporter: + +service: + pipelines: + traces: + receivers: [examplereceiver] + processors: [metricstransform] + exporters: [exampleexporter] + metrics: + receivers: [examplereceiver] + processors: [metricstransform] + exporters: [exampleexporter] \ No newline at end of file diff --git a/processor/metricstransformprocessor/testdata/config_invalid_operation_action.yaml b/processor/metricstransformprocessor/testdata/config_invalid_operation_action.yaml new file mode 100644 index 000000000000..c4800372b67e --- /dev/null +++ b/processor/metricstransformprocessor/testdata/config_invalid_operation_action.yaml @@ -0,0 +1,24 @@ +receivers: + examplereceiver: + +processors: + metricstransform: + transforms: + - include: old_name + action: update + operations: + - action: invalid + +exporters: + exampleexporter: + +service: + pipelines: + traces: + receivers: [examplereceiver] + processors: [metricstransform] + exporters: [exampleexporter] + metrics: + receivers: [examplereceiver] + processors: [metricstransform] + exporters: [exampleexporter] \ No newline at end of file diff --git a/processor/metricstransformprocessor/testdata/config_invalid_operation_aggregationtype.yaml b/processor/metricstransformprocessor/testdata/config_invalid_operation_aggregationtype.yaml new file mode 100644 index 000000000000..6ef28a41d953 --- /dev/null +++ b/processor/metricstransformprocessor/testdata/config_invalid_operation_aggregationtype.yaml @@ -0,0 +1,25 @@ +receivers: + examplereceiver: + +processors: + metricstransform: + transforms: + - include: old_name + action: update + operations: + - action: aggregate_labels + aggregation_type: invalid + +exporters: + exampleexporter: + +service: + pipelines: + traces: + receivers: [examplereceiver] + processors: [metricstransform] + exporters: [exampleexporter] + metrics: + receivers: [examplereceiver] + processors: [metricstransform] + exporters: [exampleexporter] \ No newline at end of file