From 3139b300cd35f1550424f412de3a1c6af2312b81 Mon Sep 17 00:00:00 2001 From: Joel Takvorian Date: Wed, 26 Jun 2024 13:04:24 +0200 Subject: [PATCH] Add new filters allowing FLP-based dedup (#640) * Add new filters allowing FLP-based dedup - New "remove_entry_all_satisfied" filter type: entry is removed only if all the conditions (represented by nested rules) are satisfied. This allows to have logical AND in filter conditions, whereas previously it was only possible to have logical OR - New "conditional_sampling" filter type: allows to have random sampling based on conditions. For example, a flow matching conditions A and B may have a sampling ratio of 1:10 whereas a flow matching condition C has 1:100 sampling and all other flows are 1:1 - Introduced a "preprocess" function on rules; currently it's only used to be able to cast the `Value interface{}` as an int (otherwise it comes as a float64); but could be also used in the future for other purpose, e.g. regex pre-compiling - Add tests * Fix tests --- README.md | 2 +- docs/api.md | 40 +++- pkg/api/transform_filter.go | 84 ++++++-- pkg/config/pipeline_builder_test.go | 12 +- pkg/pipeline/transform/transform_filter.go | 188 +++++++++++------- .../transform/transform_filter_test.go | 130 +++++++++++- 6 files changed, 344 insertions(+), 112 deletions(-) diff --git a/README.md b/README.md index 61e3efccb..76a42c053 100644 --- a/README.md +++ b/README.md @@ -376,7 +376,7 @@ pipeline: filter: rules: - type: remove_entry_if_exists - removeEntryIfExists: + removeEntry: input: SrcPort ``` diff --git a/docs/api.md b/docs/api.md index f4d096f52..1be2f395a 100644 --- a/docs/api.md +++ b/docs/api.md @@ -159,33 +159,40 @@ Following is the supported API format for filter transformations: remove_entry_if_doesnt_exist: removes the entry if the field does not exist remove_entry_if_equal: removes the entry if the field value equals specified value remove_entry_if_not_equal: removes the entry if the field value does not equal specified value + remove_entry_all_satisfied: removes the entry if all of the defined rules are satisfied add_field: adds (input) field to the entry; overrides previous value if present (key=input, value=value) add_field_if_doesnt_exist: adds a field to the entry if the field does not exist add_field_if: add output field set to assignee if input field satisfies criteria from parameters field add_regex_if: add output field if input field satisfies regex pattern from parameters field add_label: add (input) field to list of labels with value taken from Value field (key=input, value=value) add_label_if: add output field to list of labels with value taken from assignee field if input field satisfies criteria from parameters field + conditional_sampling: define conditional sampling rules removeField: configuration for remove_field rule input: entry input field value: specified value of input field: - removeEntryIfExists: configuration for remove_entry_if_exists rule - input: entry input field - value: specified value of input field: - removeEntryIfDoesntExist: configuration for remove_entry_if_doesnt_exist rule - input: entry input field - value: specified value of input field: - removeEntryIfEqual: configuration for remove_entry_if_equal rule - input: entry input field - value: specified value of input field: - removeEntryIfNotEqual: configuration for remove_entry_if_not_equal rule + castInt: set true to cast the value field as an int (numeric values are float64 otherwise) + removeEntry: configuration for remove_entry_* rules input: entry input field value: specified value of input field: + castInt: set true to cast the value field as an int (numeric values are float64 otherwise) + removeEntryAllSatisfied: configuration for remove_entry_all_satisfied rule + type: (enum) one of the following: + remove_entry_if_exists: removes the entry if the field exists + remove_entry_if_doesnt_exist: removes the entry if the field does not exist + remove_entry_if_equal: removes the entry if the field value equals specified value + remove_entry_if_not_equal: removes the entry if the field value does not equal specified value + removeEntry: configuration for remove_entry_* rules + input: entry input field + value: specified value of input field: + castInt: set true to cast the value field as an int (numeric values are float64 otherwise) addField: configuration for add_field rule input: entry input field value: specified value of input field: + castInt: set true to cast the value field as an int (numeric values are float64 otherwise) addFieldIfDoesntExist: configuration for add_field_if_doesnt_exist rule input: entry input field value: specified value of input field: + castInt: set true to cast the value field as an int (numeric values are float64 otherwise) addFieldIf: configuration for add_field_if rule input: entry input field output: entry output field @@ -199,11 +206,24 @@ Following is the supported API format for filter transformations: addLabel: configuration for add_label rule input: entry input field value: specified value of input field: + castInt: set true to cast the value field as an int (numeric values are float64 otherwise) addLabelIf: configuration for add_label_if rule input: entry input field output: entry output field parameters: parameters specific to type assignee: value needs to assign to output field + conditionalSampling: sampling configuration rules + value: sampling value: 1 flow on is kept + rules: rules to be satisfied for this sampling configuration + type: (enum) one of the following: + remove_entry_if_exists: removes the entry if the field exists + remove_entry_if_doesnt_exist: removes the entry if the field does not exist + remove_entry_if_equal: removes the entry if the field value equals specified value + remove_entry_if_not_equal: removes the entry if the field value does not equal specified value + removeEntry: configuration for remove_entry_* rules + input: entry input field + value: specified value of input field: + castInt: set true to cast the value field as an int (numeric values are float64 otherwise) ## Transform Network API Following is the supported API format for network transformations: diff --git a/pkg/api/transform_filter.go b/pkg/api/transform_filter.go index f2b995fde..5cc84a755 100644 --- a/pkg/api/transform_filter.go +++ b/pkg/api/transform_filter.go @@ -21,6 +21,12 @@ type TransformFilter struct { Rules []TransformFilterRule `yaml:"rules,omitempty" json:"rules,omitempty" doc:"list of filter rules, each includes:"` } +func (tf *TransformFilter) Preprocess() { + for i := range tf.Rules { + tf.Rules[i].preprocess() + } +} + type TransformFilterEnum string const ( @@ -30,32 +36,66 @@ const ( RemoveEntryIfDoesntExist TransformFilterEnum = "remove_entry_if_doesnt_exist" // removes the entry if the field does not exist RemoveEntryIfEqual TransformFilterEnum = "remove_entry_if_equal" // removes the entry if the field value equals specified value RemoveEntryIfNotEqual TransformFilterEnum = "remove_entry_if_not_equal" // removes the entry if the field value does not equal specified value + RemoveEntryAllSatisfied TransformFilterEnum = "remove_entry_all_satisfied" // removes the entry if all of the defined rules are satisfied AddField TransformFilterEnum = "add_field" // adds (input) field to the entry; overrides previous value if present (key=input, value=value) AddFieldIfDoesntExist TransformFilterEnum = "add_field_if_doesnt_exist" // adds a field to the entry if the field does not exist AddFieldIf TransformFilterEnum = "add_field_if" // add output field set to assignee if input field satisfies criteria from parameters field AddRegExIf TransformFilterEnum = "add_regex_if" // add output field if input field satisfies regex pattern from parameters field AddLabel TransformFilterEnum = "add_label" // add (input) field to list of labels with value taken from Value field (key=input, value=value) AddLabelIf TransformFilterEnum = "add_label_if" // add output field to list of labels with value taken from assignee field if input field satisfies criteria from parameters field + ConditionalSampling TransformFilterEnum = "conditional_sampling" // define conditional sampling rules +) + +type TransformFilterRemoveEntryEnum string + +const ( + RemoveEntryIfExistsD TransformFilterRemoveEntryEnum = "remove_entry_if_exists" // removes the entry if the field exists + RemoveEntryIfDoesntExistD TransformFilterRemoveEntryEnum = "remove_entry_if_doesnt_exist" // removes the entry if the field does not exist + RemoveEntryIfEqualD TransformFilterRemoveEntryEnum = "remove_entry_if_equal" // removes the entry if the field value equals specified value + RemoveEntryIfNotEqualD TransformFilterRemoveEntryEnum = "remove_entry_if_not_equal" // removes the entry if the field value does not equal specified value ) type TransformFilterRule struct { - Type TransformFilterEnum `yaml:"type,omitempty" json:"type,omitempty" doc:"(enum) one of the following:"` - RemoveField *TransformFilterGenericRule `yaml:"removeField,omitempty" json:"removeField,omitempty" doc:"configuration for remove_field rule"` - RemoveEntryIfExists *TransformFilterGenericRule `yaml:"removeEntryIfExists,omitempty" json:"removeEntryIfExists,omitempty" doc:"configuration for remove_entry_if_exists rule"` - RemoveEntryIfDoesntExist *TransformFilterGenericRule `yaml:"removeEntryIfDoesntExist,omitempty" json:"removeEntryIfDoesntExist,omitempty" doc:"configuration for remove_entry_if_doesnt_exist rule"` - RemoveEntryIfEqual *TransformFilterGenericRule `yaml:"removeEntryIfEqual,omitempty" json:"removeEntryIfEqual,omitempty" doc:"configuration for remove_entry_if_equal rule"` - RemoveEntryIfNotEqual *TransformFilterGenericRule `yaml:"removeEntryIfNotEqual,omitempty" json:"removeEntryIfNotEqual,omitempty" doc:"configuration for remove_entry_if_not_equal rule"` - AddField *TransformFilterGenericRule `yaml:"addField,omitempty" json:"addField,omitempty" doc:"configuration for add_field rule"` - AddFieldIfDoesntExist *TransformFilterGenericRule `yaml:"addFieldIfDoesntExist,omitempty" json:"addFieldIfDoesntExist,omitempty" doc:"configuration for add_field_if_doesnt_exist rule"` - AddFieldIf *TransformFilterRuleWithAssignee `yaml:"addFieldIf,omitempty" json:"addFieldIf,omitempty" doc:"configuration for add_field_if rule"` - AddRegExIf *TransformFilterRuleWithAssignee `yaml:"addRegexIf,omitempty" json:"addRegexIf,omitempty" doc:"configuration for add_regex_if rule"` - AddLabel *TransformFilterGenericRule `yaml:"addLabel,omitempty" json:"addLabel,omitempty" doc:"configuration for add_label rule"` - AddLabelIf *TransformFilterRuleWithAssignee `yaml:"addLabelIf,omitempty" json:"addLabelIf,omitempty" doc:"configuration for add_label_if rule"` + Type TransformFilterEnum `yaml:"type,omitempty" json:"type,omitempty" doc:"(enum) one of the following:"` + RemoveField *TransformFilterGenericRule `yaml:"removeField,omitempty" json:"removeField,omitempty" doc:"configuration for remove_field rule"` + RemoveEntry *TransformFilterGenericRule `yaml:"removeEntry,omitempty" json:"removeEntry,omitempty" doc:"configuration for remove_entry_* rules"` + RemoveEntryAllSatisfied []*RemoveEntryRule `yaml:"removeEntryAllSatisfied,omitempty" json:"removeEntryAllSatisfied,omitempty" doc:"configuration for remove_entry_all_satisfied rule"` + AddField *TransformFilterGenericRule `yaml:"addField,omitempty" json:"addField,omitempty" doc:"configuration for add_field rule"` + AddFieldIfDoesntExist *TransformFilterGenericRule `yaml:"addFieldIfDoesntExist,omitempty" json:"addFieldIfDoesntExist,omitempty" doc:"configuration for add_field_if_doesnt_exist rule"` + AddFieldIf *TransformFilterRuleWithAssignee `yaml:"addFieldIf,omitempty" json:"addFieldIf,omitempty" doc:"configuration for add_field_if rule"` + AddRegExIf *TransformFilterRuleWithAssignee `yaml:"addRegexIf,omitempty" json:"addRegexIf,omitempty" doc:"configuration for add_regex_if rule"` + AddLabel *TransformFilterGenericRule `yaml:"addLabel,omitempty" json:"addLabel,omitempty" doc:"configuration for add_label rule"` + AddLabelIf *TransformFilterRuleWithAssignee `yaml:"addLabelIf,omitempty" json:"addLabelIf,omitempty" doc:"configuration for add_label_if rule"` + ConditionalSampling []*SamplingCondition `yaml:"conditionalSampling,omitempty" json:"conditionalSampling,omitempty" doc:"sampling configuration rules"` +} + +func (r *TransformFilterRule) preprocess() { + if r.RemoveField != nil { + r.RemoveField.preprocess() + } + if r.RemoveEntry != nil { + r.RemoveEntry.preprocess() + } + for i := range r.RemoveEntryAllSatisfied { + r.RemoveEntryAllSatisfied[i].RemoveEntry.preprocess() + } + for i := range r.ConditionalSampling { + r.ConditionalSampling[i].preprocess() + } } type TransformFilterGenericRule struct { - Input string `yaml:"input,omitempty" json:"input,omitempty" doc:"entry input field"` - Value interface{} `yaml:"value,omitempty" json:"value,omitempty" doc:"specified value of input field:"` + Input string `yaml:"input,omitempty" json:"input,omitempty" doc:"entry input field"` + Value interface{} `yaml:"value,omitempty" json:"value,omitempty" doc:"specified value of input field:"` + CastInt bool `yaml:"castInt,omitempty" json:"castInt,omitempty" doc:"set true to cast the value field as an int (numeric values are float64 otherwise)"` +} + +func (r *TransformFilterGenericRule) preprocess() { + if r.CastInt { + if f, ok := r.Value.(float64); ok { + r.Value = int(f) + } + } } type TransformFilterRuleWithAssignee struct { @@ -64,3 +104,19 @@ type TransformFilterRuleWithAssignee struct { Parameters string `yaml:"parameters,omitempty" json:"parameters,omitempty" doc:"parameters specific to type"` Assignee string `yaml:"assignee,omitempty" json:"assignee,omitempty" doc:"value needs to assign to output field"` } + +type RemoveEntryRule struct { + Type TransformFilterRemoveEntryEnum `yaml:"type,omitempty" json:"type,omitempty" doc:"(enum) one of the following:"` + RemoveEntry *TransformFilterGenericRule `yaml:"removeEntry,omitempty" json:"removeEntry,omitempty" doc:"configuration for remove_entry_* rules"` +} + +type SamplingCondition struct { + Value uint16 `yaml:"value,omitempty" json:"value,omitempty" doc:"sampling value: 1 flow on is kept"` + Rules []*RemoveEntryRule `yaml:"rules,omitempty" json:"rules,omitempty" doc:"rules to be satisfied for this sampling configuration"` +} + +func (s *SamplingCondition) preprocess() { + for i := range s.Rules { + s.Rules[i].RemoveEntry.preprocess() + } +} diff --git a/pkg/config/pipeline_builder_test.go b/pkg/config/pipeline_builder_test.go index 568006dec..6dc06df81 100644 --- a/pkg/config/pipeline_builder_test.go +++ b/pkg/config/pipeline_builder_test.go @@ -69,8 +69,8 @@ func TestGRPCPipeline(t *testing.T) { pl := NewGRPCPipeline("grpc", api.IngestGRPCProto{Port: 9050, BufferLen: 50}) pl = pl.TransformFilter("filter", api.TransformFilter{ Rules: []api.TransformFilterRule{{ - Type: "remove_entry_if_doesnt_exist", - RemoveEntryIfDoesntExist: &api.TransformFilterGenericRule{Input: "doesnt_exist"}, + Type: "remove_entry_if_doesnt_exist", + RemoveEntry: &api.TransformFilterGenericRule{Input: "doesnt_exist"}, }}, }) pl = pl.WriteStdout("stdout", api.WriteStdout{Format: "json"}) @@ -90,7 +90,7 @@ func TestGRPCPipeline(t *testing.T) { b, err = json.Marshal(params[1]) require.NoError(t, err) - require.JSONEq(t, `{"name":"filter","transform":{"type":"filter","filter":{"rules":[{"removeEntryIfDoesntExist":{"input":"doesnt_exist"},"type":"remove_entry_if_doesnt_exist"}]}}}`, string(b)) + require.JSONEq(t, `{"name":"filter","transform":{"type":"filter","filter":{"rules":[{"removeEntry":{"input":"doesnt_exist"},"type":"remove_entry_if_doesnt_exist"}]}}}`, string(b)) b, err = json.Marshal(params[2]) require.NoError(t, err) @@ -110,8 +110,8 @@ func TestKafkaPromPipeline(t *testing.T) { }) pl = pl.TransformFilter("filter", api.TransformFilter{ Rules: []api.TransformFilterRule{{ - Type: "remove_entry_if_doesnt_exist", - RemoveEntryIfDoesntExist: &api.TransformFilterGenericRule{Input: "doesnt_exist"}, + Type: "remove_entry_if_doesnt_exist", + RemoveEntry: &api.TransformFilterGenericRule{Input: "doesnt_exist"}, }}, }) pl = pl.ConnTrack("conntrack", api.ConnTrack{ @@ -158,7 +158,7 @@ func TestKafkaPromPipeline(t *testing.T) { b, err = json.Marshal(params[1]) require.NoError(t, err) - require.JSONEq(t, `{"name":"filter","transform":{"type":"filter","filter":{"rules":[{"removeEntryIfDoesntExist":{"input":"doesnt_exist"},"type":"remove_entry_if_doesnt_exist"}]}}}`, string(b)) + require.JSONEq(t, `{"name":"filter","transform":{"type":"filter","filter":{"rules":[{"removeEntry":{"input":"doesnt_exist"},"type":"remove_entry_if_doesnt_exist"}]}}}`, string(b)) b, err = json.Marshal(params[2]) require.NoError(t, err) diff --git a/pkg/pipeline/transform/transform_filter.go b/pkg/pipeline/transform/transform_filter.go index db233da4d..96a200897 100644 --- a/pkg/pipeline/transform/transform_filter.go +++ b/pkg/pipeline/transform/transform_filter.go @@ -19,8 +19,10 @@ package transform import ( "fmt" + "math/rand" "regexp" "strings" + "time" "github.com/Knetic/govaluate" "github.com/netobserv/flowlogs-pipeline/pkg/api" @@ -29,91 +31,24 @@ import ( "github.com/sirupsen/logrus" ) -var tlog = logrus.WithField("component", "transform.Filter") +var ( + tlog = logrus.WithField("component", "transform.Filter") + rndgen = rand.New(rand.NewSource(time.Now().UnixNano())) +) type Filter struct { Rules []api.TransformFilterRule } -// Transform transforms a flow -// -//nolint:cyclop +// Transform transforms a flow; if false is returned as a second argument, the entry is dropped func (f *Filter) Transform(entry config.GenericMap) (config.GenericMap, bool) { tlog.Tracef("f = %v", f) outputEntry := entry.Copy() labels := make(map[string]string) - for _, rule := range f.Rules { - tlog.Tracef("rule = %v", rule) - switch rule.Type { - case api.RemoveField: - delete(outputEntry, rule.RemoveField.Input) - case api.RemoveEntryIfExists: - if _, ok := entry[rule.RemoveEntryIfExists.Input]; ok { - return nil, false - } - case api.RemoveEntryIfDoesntExist: - if _, ok := entry[rule.RemoveEntryIfDoesntExist.Input]; !ok { - return nil, false - } - case api.RemoveEntryIfEqual: - if val, ok := entry[rule.RemoveEntryIfEqual.Input]; ok { - if val == rule.RemoveEntryIfEqual.Value { - return nil, false - } - } - case api.RemoveEntryIfNotEqual: - if val, ok := entry[rule.RemoveEntryIfNotEqual.Input]; ok { - if val != rule.RemoveEntryIfNotEqual.Value { - return nil, false - } - } - case api.AddField: - outputEntry[rule.AddField.Input] = rule.AddField.Value - case api.AddFieldIfDoesntExist: - if _, ok := entry[rule.AddFieldIfDoesntExist.Input]; !ok { - outputEntry[rule.AddFieldIfDoesntExist.Input] = rule.AddFieldIfDoesntExist.Value - } - case api.AddRegExIf: - matched, err := regexp.MatchString(rule.AddRegExIf.Parameters, fmt.Sprintf("%s", outputEntry[rule.AddRegExIf.Input])) - if err != nil { - continue - } - if matched { - outputEntry[rule.AddRegExIf.Output] = outputEntry[rule.AddRegExIf.Input] - outputEntry[rule.AddRegExIf.Output+"_Matched"] = true - } - case api.AddFieldIf: - expressionString := fmt.Sprintf("val %s", rule.AddFieldIf.Parameters) - expression, err := govaluate.NewEvaluableExpression(expressionString) - if err != nil { - log.Warningf("Can't evaluate AddIf rule: %+v expression: %v. err %v", rule, expressionString, err) - continue - } - result, evaluateErr := expression.Evaluate(map[string]interface{}{"val": outputEntry[rule.AddFieldIf.Input]}) - if evaluateErr == nil && result.(bool) { - if rule.AddFieldIf.Assignee != "" { - outputEntry[rule.AddFieldIf.Output] = rule.AddFieldIf.Assignee - } else { - outputEntry[rule.AddFieldIf.Output] = outputEntry[rule.AddFieldIf.Input] - } - outputEntry[rule.AddFieldIf.Output+"_Evaluate"] = true - } - case api.AddLabel: - labels[rule.AddLabel.Input], _ = utils.ConvertToString(rule.AddLabel.Value) - case api.AddLabelIf: - // TODO perhaps add a cache of previously evaluated expressions - expressionString := fmt.Sprintf("val %s", rule.AddLabelIf.Parameters) - expression, err := govaluate.NewEvaluableExpression(expressionString) - if err != nil { - log.Warningf("Can't evaluate AddLabelIf rule: %+v expression: %v. err %v", rule, expressionString, err) - continue - } - result, evaluateErr := expression.Evaluate(map[string]interface{}{"val": outputEntry[rule.AddLabelIf.Input]}) - if evaluateErr == nil && result.(bool) { - labels[rule.AddLabelIf.Output] = rule.AddLabelIf.Assignee - } - default: - tlog.Panicf("unknown type %s for transform.Filter rule: %v", rule.Type, rule) + for i := range f.Rules { + tlog.Tracef("rule = %v", f.Rules[i]) + if cont := applyRule(outputEntry, labels, &f.Rules[i]); !cont { + return nil, false } } // process accumulated labels into comma separated string @@ -133,11 +68,112 @@ func (f *Filter) Transform(entry config.GenericMap) (config.GenericMap, bool) { return outputEntry, true } +// Apply a rule. Returns false if it must stop processing rules (e.g. if entry must be removed) +// nolint:cyclop +func applyRule(entry config.GenericMap, labels map[string]string, rule *api.TransformFilterRule) bool { + switch rule.Type { + case api.RemoveField: + delete(entry, rule.RemoveField.Input) + case api.RemoveEntryIfExists: + if _, ok := entry[rule.RemoveEntry.Input]; ok { + return false + } + case api.RemoveEntryIfDoesntExist: + if _, ok := entry[rule.RemoveEntry.Input]; !ok { + return false + } + case api.RemoveEntryIfEqual: + if val, ok := entry[rule.RemoveEntry.Input]; ok { + if val == rule.RemoveEntry.Value { + return false + } + } + case api.RemoveEntryIfNotEqual: + if val, ok := entry[rule.RemoveEntry.Input]; ok { + if val != rule.RemoveEntry.Value { + return false + } + } + case api.AddField: + entry[rule.AddField.Input] = rule.AddField.Value + case api.AddFieldIfDoesntExist: + if _, ok := entry[rule.AddFieldIfDoesntExist.Input]; !ok { + entry[rule.AddFieldIfDoesntExist.Input] = rule.AddFieldIfDoesntExist.Value + } + case api.AddRegExIf: + matched, err := regexp.MatchString(rule.AddRegExIf.Parameters, fmt.Sprintf("%s", entry[rule.AddRegExIf.Input])) + if err != nil { + return true + } + if matched { + entry[rule.AddRegExIf.Output] = entry[rule.AddRegExIf.Input] + entry[rule.AddRegExIf.Output+"_Matched"] = true + } + case api.AddFieldIf: + expressionString := fmt.Sprintf("val %s", rule.AddFieldIf.Parameters) + expression, err := govaluate.NewEvaluableExpression(expressionString) + if err != nil { + log.Warningf("Can't evaluate AddIf rule: %+v expression: %v. err %v", rule, expressionString, err) + return true + } + result, evaluateErr := expression.Evaluate(map[string]interface{}{"val": entry[rule.AddFieldIf.Input]}) + if evaluateErr == nil && result.(bool) { + if rule.AddFieldIf.Assignee != "" { + entry[rule.AddFieldIf.Output] = rule.AddFieldIf.Assignee + } else { + entry[rule.AddFieldIf.Output] = entry[rule.AddFieldIf.Input] + } + entry[rule.AddFieldIf.Output+"_Evaluate"] = true + } + case api.AddLabel: + labels[rule.AddLabel.Input], _ = utils.ConvertToString(rule.AddLabel.Value) + case api.AddLabelIf: + // TODO perhaps add a cache of previously evaluated expressions + expressionString := fmt.Sprintf("val %s", rule.AddLabelIf.Parameters) + expression, err := govaluate.NewEvaluableExpression(expressionString) + if err != nil { + log.Warningf("Can't evaluate AddLabelIf rule: %+v expression: %v. err %v", rule, expressionString, err) + return true + } + result, evaluateErr := expression.Evaluate(map[string]interface{}{"val": entry[rule.AddLabelIf.Input]}) + if evaluateErr == nil && result.(bool) { + labels[rule.AddLabelIf.Output] = rule.AddLabelIf.Assignee + } + case api.RemoveEntryAllSatisfied: + return !isRemoveEntrySatisfied(entry, rule.RemoveEntryAllSatisfied) + case api.ConditionalSampling: + return sample(entry, rule.ConditionalSampling) + default: + tlog.Panicf("unknown type %s for transform.Filter rule: %v", rule.Type, rule) + } + return true +} + +func isRemoveEntrySatisfied(entry config.GenericMap, rules []*api.RemoveEntryRule) bool { + for _, r := range rules { + // applyRule returns false if the entry must be removed + if dontRemove := applyRule(entry, nil, &api.TransformFilterRule{Type: api.TransformFilterEnum(r.Type), RemoveEntry: r.RemoveEntry}); dontRemove { + return false + } + } + return true +} + +func sample(entry config.GenericMap, rules []*api.SamplingCondition) bool { + for _, r := range rules { + if isRemoveEntrySatisfied(entry, r.Rules) { + return r.Value == 0 || (rndgen.Intn(int(r.Value)) == 0) + } + } + return true +} + // NewTransformFilter create a new filter transform func NewTransformFilter(params config.StageParam) (Transformer, error) { tlog.Debugf("entering NewTransformFilter") rules := []api.TransformFilterRule{} if params.Transform != nil && params.Transform.Filter != nil { + params.Transform.Filter.Preprocess() rules = params.Transform.Filter.Rules } transformFilter := &Filter{ diff --git a/pkg/pipeline/transform/transform_filter_test.go b/pkg/pipeline/transform/transform_filter_test.go index 6051e7d20..56c7c044f 100644 --- a/pkg/pipeline/transform/transform_filter_test.go +++ b/pkg/pipeline/transform/transform_filter_test.go @@ -23,6 +23,7 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/netobserv/flowlogs-pipeline/pkg/test" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -55,7 +56,7 @@ parameters: filter: rules: - type: remove_entry_if_exists - removeEntryIfExists: + removeEntry: input: srcPort ` @@ -71,7 +72,7 @@ parameters: filter: rules: - type: remove_entry_if_doesnt_exist - removeEntryIfDoesntExist: + removeEntry: input: doesntSrcPort ` @@ -86,11 +87,11 @@ parameters: filter: rules: - type: remove_entry_if_equal - removeEntryIfEqual: + removeEntry: input: message value: "test message" - type: remove_entry_if_equal - removeEntryIfEqual: + removeEntry: input: value value: 8.0 ` @@ -106,7 +107,7 @@ parameters: filter: rules: - type: remove_entry_if_not_equal - removeEntryIfNotEqual: + removeEntry: input: message value: "test message" ` @@ -545,3 +546,122 @@ func Test_AddField(t *testing.T) { require.Equal(t, "value1", output["field1"]) require.Equal(t, "new_value", output["param1"]) } + +func Test_Transform_RemoveEntryAllSatisfied(t *testing.T) { + newFilter := Filter{ + Rules: []api.TransformFilterRule{ + { + Type: api.RemoveEntryAllSatisfied, + RemoveEntryAllSatisfied: []*api.RemoveEntryRule{ + { + Type: api.RemoveEntryIfEqualD, + RemoveEntry: &api.TransformFilterGenericRule{ + Input: "FlowDirection", + Value: 1, + }, + }, + { + Type: api.RemoveEntryIfExistsD, + RemoveEntry: &api.TransformFilterGenericRule{ + Input: "DstK8S_OwnerName", + }, + }, + }, + }, + }, + } + + _, keep := newFilter.Transform(config.GenericMap{ + "FlowDirection": 0, + "SrcK8S_OwnerName": "my-deployment", + "DstK8S_OwnerName": "my-service", + "Bytes": 10, + }) // Ingress and internal => keep + require.True(t, keep) + + _, keep = newFilter.Transform(config.GenericMap{ + "FlowDirection": 1, + "SrcK8S_OwnerName": "my-deployment", + "DstK8S_OwnerName": "my-service", + "Bytes": 10, + }) // Egress and internal => remove + require.False(t, keep) + + _, keep = newFilter.Transform(config.GenericMap{ + "FlowDirection": 1, + "SrcK8S_OwnerName": "my-deployment", + "Bytes": 10, + }) // Egress and external => keep + require.True(t, keep) + + _, keep = newFilter.Transform(config.GenericMap{ + "FlowDirection": 0, + "DstK8S_OwnerName": "my-deployment", + "Bytes": 10, + }) // Ingress and external => keep + require.True(t, keep) +} + +func Test_Transform_Sampling(t *testing.T) { + newFilter := Filter{ + Rules: []api.TransformFilterRule{ + { + Type: api.ConditionalSampling, + ConditionalSampling: []*api.SamplingCondition{ + { + Rules: []*api.RemoveEntryRule{ + { + Type: api.RemoveEntryIfEqualD, + RemoveEntry: &api.TransformFilterGenericRule{ + Input: "FlowDirection", + Value: 1, + }, + }, + { + Type: api.RemoveEntryIfExistsD, + RemoveEntry: &api.TransformFilterGenericRule{ + Input: "DstK8S_OwnerName", + }, + }, + }, + Value: 10, + }, + }, + }, + }, + } + + input := []config.GenericMap{} + for i := 0; i < 1000; i++ { + input = append(input, config.GenericMap{ + "FlowDirection": 0, + "SrcK8S_OwnerName": "my-deployment", + "DstK8S_OwnerName": "my-service", + "Bytes": 10, + }) // Ingress and internal => always keep + input = append(input, config.GenericMap{ + "FlowDirection": 1, + "SrcK8S_OwnerName": "my-deployment", + "DstK8S_OwnerName": "my-service", + "Bytes": 10, + }) // Egress and internal => sample 1:10 + } + + countIngress := 0 + countEgress := 0 + + for _, flow := range input { + if out, ok := newFilter.Transform(flow); ok { + switch out["FlowDirection"] { + case 0: + countIngress++ + case 1: + countEgress++ + } + } + } + + assert.Equal(t, countIngress, 1000) + assert.Less(t, countEgress, 300) + assert.Greater(t, countEgress, 30) +}