Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
james-bebbington committed Nov 17, 2020
1 parent dc66376 commit 7f4a58a
Show file tree
Hide file tree
Showing 10 changed files with 196 additions and 56 deletions.
102 changes: 78 additions & 24 deletions processor/metricstransformprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ const (
// NewNameFieldName is the mapstructure field name for NewName field
NewNameFieldName = "new_name"

// AggregationTypeFieldName is the mapstructure field name for AggregationType field
AggregationTypeFieldName = "aggregation_type"

// LabelFieldName is the mapstructure field name for Label field
LabelFieldName = "label"

Expand All @@ -44,16 +47,6 @@ const (
NewValueFieldName = "new_value"
)

const (
// StrictMatchType is the FilterType for filtering by exact string matches.
StrictMatchType = "strict"

// RegexpMatchType is the FilterType for filtering by regexp string matches.
RegexpMatchType = "regexp"
)

var MatchTypes = []string{StrictMatchType, RegexpMatchType}

// Config defines configuration for Resource processor.
type Config struct {
configmodels.ProcessorSettings `mapstructure:",squash"`
Expand Down Expand Up @@ -93,7 +86,7 @@ type FilterConfig struct {
Include string `mapstructure:"include"`

// MatchType determines how the Include string is matched: <strict|regexp>.
MatchType string `mapstructure:"match_type"`
MatchType MatchType `mapstructure:"match_type"`
}

// Operation defines the specific operation performed on the selected metrics.
Expand Down Expand Up @@ -139,12 +132,6 @@ type ValueAction struct {
// ConfigAction is the enum to capture the two types of actions to perform on a metric.
type ConfigAction string

// OperationAction is the enum to capture the thress types of actions to perform for an operation.
type OperationAction string

// AggregationType os the enum to capture the three types of aggregation for the aggregation operation.
type AggregationType string

const (
// Insert adds a new metric to the batch with a new name.
Insert ConfigAction = "insert"
Expand All @@ -154,10 +141,24 @@ const (

// Combine combines multiple metrics into a single metric.
Combine ConfigAction = "combine"
)

// ToggleScalarDataType changes the data type from int64 to double, or vice-versa
ToggleScalarDataType OperationAction = "toggle_scalar_data_type"
var Actions = []ConfigAction{Insert, Update, Combine}

func (ca ConfigAction) isValid() bool {
for _, configAction := range Actions {
if ca == configAction {
return true
}
}

return false
}

// OperationAction is the enum to capture the thress types of actions to perform for an operation.
type OperationAction string

const (
// AddLabel adds a new label to an existing metric.
AddLabel OperationAction = "add_label"

Expand All @@ -167,25 +168,78 @@ const (
// DeleteLabelValue deletes a label value by also removing all the points associated with this label value
DeleteLabelValue OperationAction = "delete_label_value"

// ToggleScalarDataType changes the data type from int64 to double, or vice-versa
ToggleScalarDataType OperationAction = "toggle_scalar_data_type"

// AggregateLabels aggregates away all labels other than the ones in Operation.LabelSet
// by the method indicated by Operation.AggregationType.
AggregateLabels OperationAction = "aggregate_labels"

// AggregateLabelValues aggregates away the values in Operation.AggregatedValues
// by the method indicated by Operation.AggregationType.
AggregateLabelValues OperationAction = "aggregate_label_values"
)

// Mean indicates taking the mean of the aggregated data.
Mean AggregationType = "mean"
var OperationActions = []OperationAction{AddLabel, UpdateLabel, DeleteLabelValue, ToggleScalarDataType, AggregateLabels, AggregateLabelValues}

// Max indicates taking the max of the aggregated data.
Max AggregationType = "max"
func (oa OperationAction) isValid() bool {
for _, operationAction := range OperationActions {
if oa == operationAction {
return true
}
}

return false
}

// AggregationType is the enum to capture the three types of aggregation for the aggregation operation.
type AggregationType string

const (
// Sum indicates taking the sum of the aggregated data.
Sum AggregationType = "sum"

// Mean indicates taking the mean of the aggregated data.
Mean AggregationType = "mean"

// Min indicates taking the minimum of the aggregated data.
Min AggregationType = "min"

// Max indicates taking the max of the aggregated data.
Max AggregationType = "max"
)

var Actions = []ConfigAction{Insert, Update, Combine}
var AggregationTypes = []AggregationType{Sum, Mean, Min, Max}

func (at AggregationType) isValid() bool {
for _, aggregationType := range AggregationTypes {
if at == aggregationType {
return true
}
}

return false
}

// MatchType is the enum to capture the two types of matching metric(s) that should have operations applied to them.
type MatchType string

const (
// StrictMatchType is the FilterType for filtering by exact string matches.
StrictMatchType MatchType = "strict"

// RegexpMatchType is the FilterType for filtering by regexp string matches.
RegexpMatchType MatchType = "regexp"
)

var MatchTypes = []MatchType{StrictMatchType, RegexpMatchType}

func (mt MatchType) isValid() bool {
for _, matchType := range MatchTypes {
if mt == matchType {
return true
}
}

return false
}
3 changes: 1 addition & 2 deletions processor/metricstransformprocessor/datapoint_aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,10 @@ func (mtp *metricsTransformProcessor) mergeTimeseries(groupedTimeseries map[stri

// sortTimeseries performs an in place sort of a list of timeseries by start timestamp
// Returns the sorted timeseries
func (mtp *metricsTransformProcessor) sortTimeseries(timeseries []*metricspb.TimeSeries) []*metricspb.TimeSeries {
func (mtp *metricsTransformProcessor) sortTimeseries(timeseries []*metricspb.TimeSeries) {
sort.Slice(timeseries, func(i, j int) bool {
return mtp.compareTimestamps(timeseries[i].StartTimestamp, timeseries[j].StartTimestamp)
})
return timeseries
}

// groupPointsByTimestamp groups points by timestamp
Expand Down
42 changes: 18 additions & 24 deletions processor/metricstransformprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,18 +82,8 @@ func validateConfiguration(config *Config) error {
return fmt.Errorf("cannot supply both %q and %q, use %q with %q match type", IncludeFieldName, MetricNameFieldName, IncludeFieldName, StrictMatchType)
}

if transform.MetricIncludeFilter.MatchType != "" {
var validMatchType bool
for _, matchType := range MatchTypes {
if transform.MetricIncludeFilter.MatchType == matchType {
validMatchType = true
break
}
}

if !validMatchType {
return fmt.Errorf("%q must be in %q", MatchTypeFieldName, MatchTypes)
}
if transform.MetricIncludeFilter.MatchType != "" && !transform.MetricIncludeFilter.MatchType.isValid() {
return fmt.Errorf("%q must be in %q", MatchTypeFieldName, MatchTypes)
}

if transform.MetricIncludeFilter.MatchType == RegexpMatchType {
Expand All @@ -103,31 +93,35 @@ func validateConfiguration(config *Config) error {
}
}

var validAction bool
for _, action := range Actions {
if transform.Action == action {
validAction = true
break
}
}

if !validAction {
if !transform.Action.isValid() {
return fmt.Errorf("%q must be in %q", ActionFieldName, Actions)
}

if transform.Action == Insert && transform.NewName == "" {
return fmt.Errorf("missing required field %q while %q is %v", NewNameFieldName, ActionFieldName, Insert)
}

if transform.AggregationType != "" && !transform.AggregationType.isValid() {
return fmt.Errorf("%q must be in %q", AggregationTypeFieldName, AggregationTypes)
}

for i, op := range transform.Operations {
if !op.Action.isValid() {
return fmt.Errorf("operation %v: %q must be in %q", i+1, ActionFieldName, OperationActions)
}

if op.Action == UpdateLabel && op.Label == "" {
return fmt.Errorf("missing required field %q while %q is %v in the %vth operation", LabelFieldName, ActionFieldName, UpdateLabel, i)
return fmt.Errorf("operation %v: missing required field %q while %q is %v", i+1, LabelFieldName, ActionFieldName, UpdateLabel)
}
if op.Action == AddLabel && op.NewLabel == "" {
return fmt.Errorf("missing required field %q while %q is %v in the %vth operation", NewLabelFieldName, ActionFieldName, AddLabel, i)
return fmt.Errorf("operation %v: missing required field %q while %q is %v", i+1, NewLabelFieldName, ActionFieldName, AddLabel)
}
if op.Action == AddLabel && op.NewValue == "" {
return fmt.Errorf("missing required field %q while %q is %v in the %vth operation", NewValueFieldName, ActionFieldName, AddLabel, i)
return fmt.Errorf("operation %v: missing required field %q while %q is %v", i+1, NewValueFieldName, ActionFieldName, AddLabel)
}

if op.AggregationType != "" && !op.AggregationType.isValid() {
return fmt.Errorf("operation %v: %q must be in %q", i+1, AggregationTypeFieldName, AggregationTypes)
}
}
}
Expand Down
21 changes: 18 additions & 3 deletions processor/metricstransformprocessor/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,28 @@ func TestCreateProcessors(t *testing.T) {
{
configName: "config_invalid_label.yaml",
succeed: false,
errorMessage: fmt.Sprintf("missing required field %q while %q is %v in the %vth operation", LabelFieldName, ActionFieldName, UpdateLabel, 0),
errorMessage: fmt.Sprintf("operation %v: missing required field %q while %q is %v", 1, LabelFieldName, ActionFieldName, UpdateLabel),
},
{
configName: "config_invalid_regexp.yaml",
succeed: false,
errorMessage: fmt.Sprintf("%q, error parsing regexp: missing closing ]: `[\\da`", IncludeFieldName),
},
{
configName: "config_invalid_aggregationtype.yaml",
succeed: false,
errorMessage: fmt.Sprintf("%q must be in %q", AggregationTypeFieldName, AggregationTypes),
},
{
configName: "config_invalid_operation_action.yaml",
succeed: false,
errorMessage: fmt.Sprintf("operation %v: %q must be in %q", 1, ActionFieldName, OperationActions),
},
{
configName: "config_invalid_operation_aggregationtype.yaml",
succeed: false,
errorMessage: fmt.Sprintf("operation %v: %q must be in %q", 1, AggregationTypeFieldName, AggregationTypes),
},
}

for _, test := range tests {
Expand Down Expand Up @@ -148,7 +163,7 @@ func TestFactory_validateConfiguration(t *testing.T) {
},
}
err := validateConfiguration(&v1)
assert.Equal(t, "missing required field \"new_label\" while \"action\" is add_label in the 0th operation", err.Error())
assert.Equal(t, "operation 1: missing required field \"new_label\" while \"action\" is add_label", err.Error())

v2 := Config{
Transforms: []Transform{
Expand All @@ -166,7 +181,7 @@ func TestFactory_validateConfiguration(t *testing.T) {
}

err = validateConfiguration(&v2)
assert.Equal(t, "missing required field \"new_value\" while \"action\" is add_label in the 0th operation", err.Error())
assert.Equal(t, "operation 1: missing required field \"new_value\" while \"action\" is add_label", err.Error())
}

func TestCreateProcessorsFilledData(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,9 @@ func (mtp *metricsTransformProcessor) combine(matchedMetrics []*match, transform

groupedTimeseries := mtp.groupTimeseries(allTimeseries, len(combinedMetric.MetricDescriptor.LabelKeys))
aggregatedTimeseries := mtp.mergeTimeseries(groupedTimeseries, transform.AggregationType, combinedMetric.MetricDescriptor.Type)
combinedMetric.Timeseries = mtp.sortTimeseries(aggregatedTimeseries)

mtp.sortTimeseries(aggregatedTimeseries)
combinedMetric.Timeseries = aggregatedTimeseries

return combinedMetric
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ func (mtp *metricsTransformProcessor) aggregateLabelValuesOp(metric *metricspb.M
aggregatedTimeseries := mtp.mergeTimeseries(groupedTimeseries, mtpOp.configOperation.AggregationType, metric.MetricDescriptor.Type)
aggregatedTimeseries = append(aggregatedTimeseries, unchangedTimeseries...)

metric.Timeseries = mtp.sortTimeseries(aggregatedTimeseries)
mtp.sortTimeseries(aggregatedTimeseries)
metric.Timeseries = aggregatedTimeseries
}

// groupTimeseriesByNewLabelValue groups all timeseries in the metric that will be aggregated together based on the entire label values after replacing the aggregatedValues by newValue.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ func (mtp *metricsTransformProcessor) aggregateLabelsOp(metric *metricspb.Metric
aggregatedTimeseries := mtp.mergeTimeseries(groupedTimeseries, mtpOp.configOperation.AggregationType, metric.MetricDescriptor.Type)

metric.MetricDescriptor.LabelKeys = labels
metric.Timeseries = mtp.sortTimeseries(aggregatedTimeseries)

mtp.sortTimeseries(aggregatedTimeseries)
metric.Timeseries = aggregatedTimeseries
}

// groupTimeseries groups all the provided timeseries that will be aggregated together based on all the label values.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
receivers:
examplereceiver:

processors:
metricstransform:
transforms:
- include: old_name
action: combine
new_name: new_name
aggregation_type: invalid

exporters:
exampleexporter:

service:
pipelines:
traces:
receivers: [examplereceiver]
processors: [metricstransform]
exporters: [exampleexporter]
metrics:
receivers: [examplereceiver]
processors: [metricstransform]
exporters: [exampleexporter]
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
receivers:
examplereceiver:

processors:
metricstransform:
transforms:
- include: old_name
action: update
operations:
- action: invalid

exporters:
exampleexporter:

service:
pipelines:
traces:
receivers: [examplereceiver]
processors: [metricstransform]
exporters: [exampleexporter]
metrics:
receivers: [examplereceiver]
processors: [metricstransform]
exporters: [exampleexporter]
Loading

0 comments on commit 7f4a58a

Please sign in to comment.