Add new filters allowing FLP-based dedup (#640)
* Add new filters allowing FLP-based dedup

- New "remove_entry_all_satisfied" filter type: the entry is removed only if
  all the conditions (represented by nested rules) are satisfied. This
  allows a logical AND in filter conditions, whereas previously only a
  logical OR was possible

- New "conditional_sampling" filter type: allows random sampling
  based on conditions. For example, a flow matching conditions A and B
  may have a sampling ratio of 1:10, whereas a flow matching condition C
  has 1:100 sampling and all other flows are 1:1

- Introduced a "preprocess" function on rules; currently it is only used
  to cast the `Value interface{}` as an int (otherwise it
  comes as a float64), but it could also be used in the future for other
  purposes, e.g. regex pre-compiling

- Add tests

* Fix tests
jotak authored Jun 26, 2024
1 parent 14e9b59 commit 3139b30
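
The AND semantics described for "remove_entry_all_satisfied" can be illustrated with a minimal sketch. This is not the code from this commit: the `entry` and `condition` types, the `satisfied` and `allSatisfied` helpers, and the `Proto` field are hypothetical, and the handling of missing fields in the equality checks is an assumption.

```go
package main

import "fmt"

// entry is a hypothetical stand-in for a flow log entry.
type entry map[string]interface{}

// condition loosely mirrors a nested rule: a remove_entry_* type,
// an input field and an optional value. Illustrative only.
type condition struct {
	Type  string
	Input string
	Value interface{}
}

// satisfied reports whether one condition holds for the entry.
// Assumption: equality checks require the field to be present.
// Values are compared as interfaces, so int(53) != float64(53).
func (c condition) satisfied(e entry) bool {
	v, ok := e[c.Input]
	switch c.Type {
	case "remove_entry_if_exists":
		return ok
	case "remove_entry_if_doesnt_exist":
		return !ok
	case "remove_entry_if_equal":
		return ok && v == c.Value
	case "remove_entry_if_not_equal":
		return ok && v != c.Value
	}
	return false
}

// allSatisfied returns true only when every condition holds (logical AND).
// An empty list is vacuously satisfied in this sketch.
func allSatisfied(e entry, conds []condition) bool {
	for _, c := range conds {
		if !c.satisfied(e) {
			return false
		}
	}
	return true
}

func main() {
	conds := []condition{
		{Type: "remove_entry_if_equal", Input: "SrcPort", Value: 53},
		{Type: "remove_entry_if_exists", Input: "Proto"},
	}
	// Both conditions hold, so the entry would be removed.
	fmt.Println(allSatisfied(entry{"SrcPort": 53, "Proto": 17}, conds))
	// Only the first condition holds, so the entry would be kept.
	fmt.Println(allSatisfied(entry{"SrcPort": 53}, conds))
}
```

Because the comparison is done on interface values, a configured `53` that arrives as `float64` would not equal an `int` field, which is presumably what the `castInt` option in this commit addresses.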
Showing 6 changed files with 344 additions and 112 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -376,7 +376,7 @@ pipeline:
filter:
rules:
- type: remove_entry_if_exists
removeEntryIfExists:
removeEntry:
input: SrcPort
```

40 changes: 30 additions & 10 deletions docs/api.md
@@ -159,33 +159,40 @@ Following is the supported API format for filter transformations:
remove_entry_if_doesnt_exist: removes the entry if the field does not exist
remove_entry_if_equal: removes the entry if the field value equals specified value
remove_entry_if_not_equal: removes the entry if the field value does not equal specified value
remove_entry_all_satisfied: removes the entry if all of the defined rules are satisfied
add_field: adds (input) field to the entry; overrides previous value if present (key=input, value=value)
add_field_if_doesnt_exist: adds a field to the entry if the field does not exist
add_field_if: add output field set to assignee if input field satisfies criteria from parameters field
add_regex_if: add output field if input field satisfies regex pattern from parameters field
add_label: add (input) field to list of labels with value taken from Value field (key=input, value=value)
add_label_if: add output field to list of labels with value taken from assignee field if input field satisfies criteria from parameters field
conditional_sampling: define conditional sampling rules
removeField: configuration for remove_field rule
input: entry input field
value: specified value of input field:
removeEntryIfExists: configuration for remove_entry_if_exists rule
input: entry input field
value: specified value of input field:
removeEntryIfDoesntExist: configuration for remove_entry_if_doesnt_exist rule
input: entry input field
value: specified value of input field:
removeEntryIfEqual: configuration for remove_entry_if_equal rule
input: entry input field
value: specified value of input field:
removeEntryIfNotEqual: configuration for remove_entry_if_not_equal rule
castInt: set true to cast the value field as an int (numeric values are float64 otherwise)
removeEntry: configuration for remove_entry_* rules
input: entry input field
value: specified value of input field:
castInt: set true to cast the value field as an int (numeric values are float64 otherwise)
removeEntryAllSatisfied: configuration for remove_entry_all_satisfied rule
type: (enum) one of the following:
remove_entry_if_exists: removes the entry if the field exists
remove_entry_if_doesnt_exist: removes the entry if the field does not exist
remove_entry_if_equal: removes the entry if the field value equals specified value
remove_entry_if_not_equal: removes the entry if the field value does not equal specified value
removeEntry: configuration for remove_entry_* rules
input: entry input field
value: specified value of input field:
castInt: set true to cast the value field as an int (numeric values are float64 otherwise)
addField: configuration for add_field rule
input: entry input field
value: specified value of input field:
castInt: set true to cast the value field as an int (numeric values are float64 otherwise)
addFieldIfDoesntExist: configuration for add_field_if_doesnt_exist rule
input: entry input field
value: specified value of input field:
castInt: set true to cast the value field as an int (numeric values are float64 otherwise)
addFieldIf: configuration for add_field_if rule
input: entry input field
output: entry output field
@@ -199,11 +206,24 @@ Following is the supported API format for filter transformations:
addLabel: configuration for add_label rule
input: entry input field
value: specified value of input field:
castInt: set true to cast the value field as an int (numeric values are float64 otherwise)
addLabelIf: configuration for add_label_if rule
input: entry input field
output: entry output field
parameters: parameters specific to type
assignee: value needs to assign to output field
conditionalSampling: sampling configuration rules
value: sampling value: 1 flow on <sampling> is kept
rules: rules to be satisfied for this sampling configuration
type: (enum) one of the following:
remove_entry_if_exists: removes the entry if the field exists
remove_entry_if_doesnt_exist: removes the entry if the field does not exist
remove_entry_if_equal: removes the entry if the field value equals specified value
remove_entry_if_not_equal: removes the entry if the field value does not equal specified value
removeEntry: configuration for remove_entry_* rules
input: entry input field
value: specified value of input field:
castInt: set true to cast the value field as an int (numeric values are float64 otherwise)
</pre>
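
As a rough illustration of how the `conditionalSampling` settings above could be evaluated, here is a small sketch. It is not taken from this commit: `samplingCondition`, `has` and `keep` are hypothetical names, the rules are reduced to plain predicates, and "first matching condition wins" as well as "a value of 0 or 1 keeps everything" are assumptions.

```go
package main

import (
	"fmt"
	"math/rand"
)

// entry is a hypothetical stand-in for a flow log entry.
type entry map[string]interface{}

// samplingCondition is a simplified, hypothetical counterpart of the
// documented conditionalSampling item: a sampling value plus rules
// (here plain predicates) that must all be satisfied.
type samplingCondition struct {
	Value uint16
	Rules []func(entry) bool
}

// has builds a predicate that checks whether a field exists.
func has(key string) func(entry) bool {
	return func(e entry) bool {
		_, ok := e[key]
		return ok
	}
}

// matches reports whether every rule of the condition holds.
func (s samplingCondition) matches(e entry) bool {
	for _, rule := range s.Rules {
		if !rule(e) {
			return false
		}
	}
	return true
}

// keep decides whether an entry survives sampling. The first matching
// condition wins (assumption); entries matching nothing are kept (1:1).
// intn is injected so that the random draw can be replaced in tests.
func keep(e entry, conds []samplingCondition, intn func(n int) int) bool {
	for _, c := range conds {
		if c.matches(e) {
			if c.Value <= 1 {
				return true
			}
			// Keep roughly 1 entry out of c.Value.
			return intn(int(c.Value)) == 0
		}
	}
	return true
}

func main() {
	conds := []samplingCondition{
		{Value: 10, Rules: []func(entry) bool{has("A"), has("B")}},
		{Value: 100, Rules: []func(entry) bool{has("C")}},
	}
	kept := 0
	for i := 0; i < 10000; i++ {
		if keep(entry{"A": 1, "B": 1}, conds, rand.Intn) {
			kept++
		}
	}
	fmt.Printf("kept %d of 10000 entries matching A and B\n", kept)
}
```

Injecting the random source keeps the decision logic testable; with `rand.Intn` the number of kept entries varies from run to run.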
## Transform Network API
Following is the supported API format for network transformations:
84 changes: 70 additions & 14 deletions pkg/api/transform_filter.go
@@ -21,6 +21,12 @@ type TransformFilter struct {
Rules []TransformFilterRule `yaml:"rules,omitempty" json:"rules,omitempty" doc:"list of filter rules, each includes:"`
}

func (tf *TransformFilter) Preprocess() {
for i := range tf.Rules {
tf.Rules[i].preprocess()
}
}

type TransformFilterEnum string

const (
@@ -30,32 +36,66 @@ const (
RemoveEntryIfDoesntExist TransformFilterEnum = "remove_entry_if_doesnt_exist" // removes the entry if the field does not exist
RemoveEntryIfEqual TransformFilterEnum = "remove_entry_if_equal" // removes the entry if the field value equals specified value
RemoveEntryIfNotEqual TransformFilterEnum = "remove_entry_if_not_equal" // removes the entry if the field value does not equal specified value
RemoveEntryAllSatisfied TransformFilterEnum = "remove_entry_all_satisfied" // removes the entry if all of the defined rules are satisfied
AddField TransformFilterEnum = "add_field" // adds (input) field to the entry; overrides previous value if present (key=input, value=value)
AddFieldIfDoesntExist TransformFilterEnum = "add_field_if_doesnt_exist" // adds a field to the entry if the field does not exist
AddFieldIf TransformFilterEnum = "add_field_if" // add output field set to assignee if input field satisfies criteria from parameters field
AddRegExIf TransformFilterEnum = "add_regex_if" // add output field if input field satisfies regex pattern from parameters field
AddLabel TransformFilterEnum = "add_label" // add (input) field to list of labels with value taken from Value field (key=input, value=value)
AddLabelIf TransformFilterEnum = "add_label_if" // add output field to list of labels with value taken from assignee field if input field satisfies criteria from parameters field
ConditionalSampling TransformFilterEnum = "conditional_sampling" // define conditional sampling rules
)

type TransformFilterRemoveEntryEnum string

const (
RemoveEntryIfExistsD TransformFilterRemoveEntryEnum = "remove_entry_if_exists" // removes the entry if the field exists
RemoveEntryIfDoesntExistD TransformFilterRemoveEntryEnum = "remove_entry_if_doesnt_exist" // removes the entry if the field does not exist
RemoveEntryIfEqualD TransformFilterRemoveEntryEnum = "remove_entry_if_equal" // removes the entry if the field value equals specified value
RemoveEntryIfNotEqualD TransformFilterRemoveEntryEnum = "remove_entry_if_not_equal" // removes the entry if the field value does not equal specified value
)

type TransformFilterRule struct {
Type TransformFilterEnum `yaml:"type,omitempty" json:"type,omitempty" doc:"(enum) one of the following:"`
RemoveField *TransformFilterGenericRule `yaml:"removeField,omitempty" json:"removeField,omitempty" doc:"configuration for remove_field rule"`
RemoveEntryIfExists *TransformFilterGenericRule `yaml:"removeEntryIfExists,omitempty" json:"removeEntryIfExists,omitempty" doc:"configuration for remove_entry_if_exists rule"`
RemoveEntryIfDoesntExist *TransformFilterGenericRule `yaml:"removeEntryIfDoesntExist,omitempty" json:"removeEntryIfDoesntExist,omitempty" doc:"configuration for remove_entry_if_doesnt_exist rule"`
RemoveEntryIfEqual *TransformFilterGenericRule `yaml:"removeEntryIfEqual,omitempty" json:"removeEntryIfEqual,omitempty" doc:"configuration for remove_entry_if_equal rule"`
RemoveEntryIfNotEqual *TransformFilterGenericRule `yaml:"removeEntryIfNotEqual,omitempty" json:"removeEntryIfNotEqual,omitempty" doc:"configuration for remove_entry_if_not_equal rule"`
AddField *TransformFilterGenericRule `yaml:"addField,omitempty" json:"addField,omitempty" doc:"configuration for add_field rule"`
AddFieldIfDoesntExist *TransformFilterGenericRule `yaml:"addFieldIfDoesntExist,omitempty" json:"addFieldIfDoesntExist,omitempty" doc:"configuration for add_field_if_doesnt_exist rule"`
AddFieldIf *TransformFilterRuleWithAssignee `yaml:"addFieldIf,omitempty" json:"addFieldIf,omitempty" doc:"configuration for add_field_if rule"`
AddRegExIf *TransformFilterRuleWithAssignee `yaml:"addRegexIf,omitempty" json:"addRegexIf,omitempty" doc:"configuration for add_regex_if rule"`
AddLabel *TransformFilterGenericRule `yaml:"addLabel,omitempty" json:"addLabel,omitempty" doc:"configuration for add_label rule"`
AddLabelIf *TransformFilterRuleWithAssignee `yaml:"addLabelIf,omitempty" json:"addLabelIf,omitempty" doc:"configuration for add_label_if rule"`
Type TransformFilterEnum `yaml:"type,omitempty" json:"type,omitempty" doc:"(enum) one of the following:"`
RemoveField *TransformFilterGenericRule `yaml:"removeField,omitempty" json:"removeField,omitempty" doc:"configuration for remove_field rule"`
RemoveEntry *TransformFilterGenericRule `yaml:"removeEntry,omitempty" json:"removeEntry,omitempty" doc:"configuration for remove_entry_* rules"`
RemoveEntryAllSatisfied []*RemoveEntryRule `yaml:"removeEntryAllSatisfied,omitempty" json:"removeEntryAllSatisfied,omitempty" doc:"configuration for remove_entry_all_satisfied rule"`
AddField *TransformFilterGenericRule `yaml:"addField,omitempty" json:"addField,omitempty" doc:"configuration for add_field rule"`
AddFieldIfDoesntExist *TransformFilterGenericRule `yaml:"addFieldIfDoesntExist,omitempty" json:"addFieldIfDoesntExist,omitempty" doc:"configuration for add_field_if_doesnt_exist rule"`
AddFieldIf *TransformFilterRuleWithAssignee `yaml:"addFieldIf,omitempty" json:"addFieldIf,omitempty" doc:"configuration for add_field_if rule"`
AddRegExIf *TransformFilterRuleWithAssignee `yaml:"addRegexIf,omitempty" json:"addRegexIf,omitempty" doc:"configuration for add_regex_if rule"`
AddLabel *TransformFilterGenericRule `yaml:"addLabel,omitempty" json:"addLabel,omitempty" doc:"configuration for add_label rule"`
AddLabelIf *TransformFilterRuleWithAssignee `yaml:"addLabelIf,omitempty" json:"addLabelIf,omitempty" doc:"configuration for add_label_if rule"`
ConditionalSampling []*SamplingCondition `yaml:"conditionalSampling,omitempty" json:"conditionalSampling,omitempty" doc:"sampling configuration rules"`
}

func (r *TransformFilterRule) preprocess() {
if r.RemoveField != nil {
r.RemoveField.preprocess()
}
if r.RemoveEntry != nil {
r.RemoveEntry.preprocess()
}
for i := range r.RemoveEntryAllSatisfied {
r.RemoveEntryAllSatisfied[i].RemoveEntry.preprocess()
}
for i := range r.ConditionalSampling {
r.ConditionalSampling[i].preprocess()
}
}

type TransformFilterGenericRule struct {
Input string `yaml:"input,omitempty" json:"input,omitempty" doc:"entry input field"`
Value interface{} `yaml:"value,omitempty" json:"value,omitempty" doc:"specified value of input field:"`
Input string `yaml:"input,omitempty" json:"input,omitempty" doc:"entry input field"`
Value interface{} `yaml:"value,omitempty" json:"value,omitempty" doc:"specified value of input field:"`
CastInt bool `yaml:"castInt,omitempty" json:"castInt,omitempty" doc:"set true to cast the value field as an int (numeric values are float64 otherwise)"`
}

func (r *TransformFilterGenericRule) preprocess() {
if r.CastInt {
if f, ok := r.Value.(float64); ok {
r.Value = int(f)
}
}
}

type TransformFilterRuleWithAssignee struct {
@@ -64,3 +104,19 @@ type TransformFilterRuleWithAssignee struct {
Parameters string `yaml:"parameters,omitempty" json:"parameters,omitempty" doc:"parameters specific to type"`
Assignee string `yaml:"assignee,omitempty" json:"assignee,omitempty" doc:"value needs to assign to output field"`
}

type RemoveEntryRule struct {
Type TransformFilterRemoveEntryEnum `yaml:"type,omitempty" json:"type,omitempty" doc:"(enum) one of the following:"`
RemoveEntry *TransformFilterGenericRule `yaml:"removeEntry,omitempty" json:"removeEntry,omitempty" doc:"configuration for remove_entry_* rules"`
}

type SamplingCondition struct {
Value uint16 `yaml:"value,omitempty" json:"value,omitempty" doc:"sampling value: 1 flow on <sampling> is kept"`
Rules []*RemoveEntryRule `yaml:"rules,omitempty" json:"rules,omitempty" doc:"rules to be satisfied for this sampling configuration"`
}

func (s *SamplingCondition) preprocess() {
for i := range s.Rules {
s.Rules[i].RemoveEntry.preprocess()
}
}
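
The reason for `castInt` can be shown with a short standalone sketch. Go's `encoding/json` decodes JSON numbers into `interface{}` as `float64`, which is the situation the `preprocess` function above deals with. The `genericRule` type below is a local copy of the fields shown in the diff, and `decode` is a hypothetical helper. The `nil` guard is an addition that is not part of this commit; as written in the diff, the nested loops call `preprocess` on `RemoveEntry` without checking it, which appears to dereference a nil pointer if a nested rule omits `removeEntry`.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// genericRule is a local copy of the fields of TransformFilterGenericRule.
type genericRule struct {
	Input   string      `json:"input,omitempty"`
	Value   interface{} `json:"value,omitempty"`
	CastInt bool        `json:"castInt,omitempty"`
}

// preprocess follows the logic from the diff, with an extra nil guard
// (the guard is an assumption of this sketch, not part of the commit).
func (r *genericRule) preprocess() {
	if r == nil || !r.CastInt {
		return
	}
	if f, ok := r.Value.(float64); ok {
		r.Value = int(f)
	}
}

// decode is a hypothetical helper: parse a rule from JSON, then preprocess it.
func decode(raw string) (*genericRule, error) {
	var r genericRule
	if err := json.Unmarshal([]byte(raw), &r); err != nil {
		return nil, err
	}
	r.preprocess()
	return &r, nil
}

func main() {
	for _, raw := range []string{
		`{"input":"SrcPort","value":53}`,
		`{"input":"SrcPort","value":53,"castInt":true}`,
	} {
		r, err := decode(raw)
		if err != nil {
			fmt.Println("decode error:", err)
			continue
		}
		// Prints "float64 53" for the first rule and "int 53" for the second.
		fmt.Printf("%T %v\n", r.Value, r.Value)
	}
}
```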
12 changes: 6 additions & 6 deletions pkg/config/pipeline_builder_test.go
@@ -69,8 +69,8 @@ func TestGRPCPipeline(t *testing.T) {
pl := NewGRPCPipeline("grpc", api.IngestGRPCProto{Port: 9050, BufferLen: 50})
pl = pl.TransformFilter("filter", api.TransformFilter{
Rules: []api.TransformFilterRule{{
Type: "remove_entry_if_doesnt_exist",
RemoveEntryIfDoesntExist: &api.TransformFilterGenericRule{Input: "doesnt_exist"},
Type: "remove_entry_if_doesnt_exist",
RemoveEntry: &api.TransformFilterGenericRule{Input: "doesnt_exist"},
}},
})
pl = pl.WriteStdout("stdout", api.WriteStdout{Format: "json"})
@@ -90,7 +90,7 @@ func TestGRPCPipeline(t *testing.T) {

b, err = json.Marshal(params[1])
require.NoError(t, err)
require.JSONEq(t, `{"name":"filter","transform":{"type":"filter","filter":{"rules":[{"removeEntryIfDoesntExist":{"input":"doesnt_exist"},"type":"remove_entry_if_doesnt_exist"}]}}}`, string(b))
require.JSONEq(t, `{"name":"filter","transform":{"type":"filter","filter":{"rules":[{"removeEntry":{"input":"doesnt_exist"},"type":"remove_entry_if_doesnt_exist"}]}}}`, string(b))

b, err = json.Marshal(params[2])
require.NoError(t, err)
@@ -110,8 +110,8 @@ func TestKafkaPromPipeline(t *testing.T) {
})
pl = pl.TransformFilter("filter", api.TransformFilter{
Rules: []api.TransformFilterRule{{
Type: "remove_entry_if_doesnt_exist",
RemoveEntryIfDoesntExist: &api.TransformFilterGenericRule{Input: "doesnt_exist"},
Type: "remove_entry_if_doesnt_exist",
RemoveEntry: &api.TransformFilterGenericRule{Input: "doesnt_exist"},
}},
})
pl = pl.ConnTrack("conntrack", api.ConnTrack{
@@ -158,7 +158,7 @@ func TestKafkaPromPipeline(t *testing.T) {

b, err = json.Marshal(params[1])
require.NoError(t, err)
require.JSONEq(t, `{"name":"filter","transform":{"type":"filter","filter":{"rules":[{"removeEntryIfDoesntExist":{"input":"doesnt_exist"},"type":"remove_entry_if_doesnt_exist"}]}}}`, string(b))
require.JSONEq(t, `{"name":"filter","transform":{"type":"filter","filter":{"rules":[{"removeEntry":{"input":"doesnt_exist"},"type":"remove_entry_if_doesnt_exist"}]}}}`, string(b))

b, err = json.Marshal(params[2])
require.NoError(t, err)