From 1fed7c967ec92ed616dda23e6add6d9ba043f802 Mon Sep 17 00:00:00 2001 From: Julien Pinsonneau Date: Tue, 14 Mar 2023 17:54:44 +0100 Subject: [PATCH] added more outputRecordTypes options --- api/v1beta1/flowcollector_types.go | 11 ++++--- .../flows.netobserv.io_flowcollectors.yaml | 7 +++-- .../flows.netobserv.io_flowcollectors.yaml | 7 +++-- .../consoleplugin/consoleplugin_objects.go | 12 ++----- .../flowlogspipeline/flp_common_objects.go | 12 +++---- docs/FlowCollector.md | 4 +-- pkg/helper/flowcollector.go | 31 +++++++++++++++++++ 7 files changed, 56 insertions(+), 28 deletions(-) diff --git a/api/v1beta1/flowcollector_types.go b/api/v1beta1/flowcollector_types.go index 13e5aebba..e63bbc135 100644 --- a/api/v1beta1/flowcollector_types.go +++ b/api/v1beta1/flowcollector_types.go @@ -283,8 +283,10 @@ type FLPMetrics struct { } const ( - OutputRecordFlows = "FLOWS" - OutputRecordAll = "ALL" + OutputRecordFlows = "FLOWS" + OutputRecordConnections = "CONNECTIONS" + OutputRecordEndedConnections = "ENDED_CONNECTIONS" + OutputRecordAll = "ALL" ) // FlowCollectorFLP defines the desired flowlogs-pipeline state of FlowCollector @@ -360,9 +362,10 @@ type FlowCollectorFLP struct { KafkaConsumerBatchSize int `json:"kafkaConsumerBatchSize"` // outputRecordTypes defines the desired record types to generate. Possible values are "FLOWS" (default) to export - // flowLogs, or "ALL" to generate both flowLogs and newConnection, heartbeat, endConnection events + // flowLogs, "CONNECTIONS" to generate newConnection, heartbeat, endConnection events, "ENDED_CONNECTIONS" to generate + // only endConnection events or "ALL" to generate both flowLogs and connection events // +kubebuilder:validation:Optional - // +kubebuilder:validation:Enum:="FLOWS";"ALL" + // +kubebuilder:validation:Enum:="FLOWS";"CONNECTIONS";"ENDED_CONNECTIONS";"ALL" // +kubebuilder:default:=FLOWS OutputRecordTypes *string `json:"outputRecordTypes,omitempty"` diff --git a/bundle/manifests/flows.netobserv.io_flowcollectors.yaml b/bundle/manifests/flows.netobserv.io_flowcollectors.yaml index 80b9ff3b3..2af654d7b 100644 --- a/bundle/manifests/flows.netobserv.io_flowcollectors.yaml +++ b/bundle/manifests/flows.netobserv.io_flowcollectors.yaml @@ -4149,10 +4149,13 @@ spec: default: FLOWS description: outputRecordTypes defines the desired record types to generate. Possible values are "FLOWS" (default) to export - flowLogs, or "ALL" to generate both flowLogs and newConnection, - heartbeat, endConnection events + flowLogs, "CONNECTIONS" to generate newConnection, heartbeat, + endConnection events, "ENDED_CONNECTIONS" to generate only endConnection + events or "ALL" to generate both flowLogs and connection events enum: - FLOWS + - CONNECTIONS + - ENDED_CONNECTIONS - ALL type: string port: diff --git a/config/crd/bases/flows.netobserv.io_flowcollectors.yaml b/config/crd/bases/flows.netobserv.io_flowcollectors.yaml index 88f93a654..92e460c05 100644 --- a/config/crd/bases/flows.netobserv.io_flowcollectors.yaml +++ b/config/crd/bases/flows.netobserv.io_flowcollectors.yaml @@ -4136,10 +4136,13 @@ spec: default: FLOWS description: outputRecordTypes defines the desired record types to generate. Possible values are "FLOWS" (default) to export - flowLogs, or "ALL" to generate both flowLogs and newConnection, - heartbeat, endConnection events + flowLogs, "CONNECTIONS" to generate newConnection, heartbeat, + endConnection events, "ENDED_CONNECTIONS" to generate only endConnection + events or "ALL" to generate both flowLogs and connection events enum: - FLOWS + - CONNECTIONS + - ENDED_CONNECTIONS - ALL type: string port: diff --git a/controllers/consoleplugin/consoleplugin_objects.go b/controllers/consoleplugin/consoleplugin_objects.go index e8850e92e..ea838a54a 100644 --- a/controllers/consoleplugin/consoleplugin_objects.go +++ b/controllers/consoleplugin/consoleplugin_objects.go @@ -155,7 +155,7 @@ func buildArgs(desired *flowslatest.FlowCollectorSpec) []string { // check for connection traking to list indexes indexFields := constants.LokiIndexFields - if desired.Processor.OutputRecordTypes != nil && *desired.Processor.OutputRecordTypes == flowslatest.OutputRecordAll { + if desired.Processor.OutputRecordTypes != nil && *desired.Processor.OutputRecordTypes != flowslatest.OutputRecordFlows { indexFields = append(indexFields, constants.LokiConnectionIndexFields...) } @@ -305,15 +305,7 @@ func (b *builder) service(old *corev1.Service) *corev1.Service { // returns a configmap with a digest of its configuration contents, which will be used to // detect any configuration change func (b *builder) configMap() (*corev1.ConfigMap, string) { - outputRecordTypes := []string{constants.FlowLogRecordType} - if b.desired.Processor.OutputRecordTypes != nil && *b.desired.Processor.OutputRecordTypes == flowslatest.OutputRecordAll { - outputRecordTypes = []string{ - constants.FlowLogRecordType, - constants.NewConnectionRecordType, - constants.HeartbeatRecordType, - constants.EndConnectionRecordType, - } - } + outputRecordTypes := helper.GetRecordTypes(&b.desired.Processor) config := map[string]interface{}{ "recordTypes": outputRecordTypes, diff --git a/controllers/flowlogspipeline/flp_common_objects.go b/controllers/flowlogspipeline/flp_common_objects.go index 8c6e1f0cc..f374ec3a5 100644 --- a/controllers/flowlogspipeline/flp_common_objects.go +++ b/controllers/flowlogspipeline/flp_common_objects.go @@ -339,9 +339,10 @@ func (b *builder) addTransformStages(stage *config.PipelineBuilderStage) (*corev // Else: nothing for eBPF at the moment } - // Connection tracking stage (only if OutputRecordTypes is set to ALL) - if b.desired.Processor.OutputRecordTypes != nil && *b.desired.Processor.OutputRecordTypes == flowslatest.OutputRecordAll { + // Connection tracking stage (only if OutputRecordTypes is not FLOWS) + if b.desired.Processor.OutputRecordTypes != nil && *b.desired.Processor.OutputRecordTypes != flowslatest.OutputRecordFlows { indexFields = append(indexFields, constants.LokiConnectionIndexFields...) + outputRecordTypes := helper.GetRecordTypes(&b.desired.Processor) endTimeout := conntrackEndTimeout if b.desired.Processor.ConnectionEndTimeout != nil { @@ -368,12 +369,7 @@ func (b *builder) addTransformStages(stage *config.PipelineBuilderStage) (*corev FieldGroupBRef: "dst", }, }, - OutputRecordTypes: []string{ - constants.FlowLogRecordType, - constants.NewConnectionRecordType, - constants.HeartbeatRecordType, - constants.EndConnectionRecordType, - }, + OutputRecordTypes: outputRecordTypes, OutputFields: []api.OutputField{ { Name: "Bytes", diff --git a/docs/FlowCollector.md b/docs/FlowCollector.md index 14a33c723..49be4bec7 100644 --- a/docs/FlowCollector.md +++ b/docs/FlowCollector.md @@ -6287,9 +6287,9 @@ processor defines the settings of the component that receives the flows from the outputRecordTypes enum - outputRecordTypes defines the desired record types to generate. Possible values are "FLOWS" (default) to export flowLogs, or "ALL" to generate both flowLogs and newConnection, heartbeat, endConnection events
+ outputRecordTypes defines the desired record types to generate. Possible values are "FLOWS" (default) to export flowLogs, "CONNECTIONS" to generate newConnection, heartbeat, endConnection events, "ENDED_CONNECTIONS" to generate only endConnection events or "ALL" to generate both flowLogs and connection events

- Enum: FLOWS, ALL
+ Enum: FLOWS, CONNECTIONS, ENDED_CONNECTIONS, ALL
Default: FLOWS
false diff --git a/pkg/helper/flowcollector.go b/pkg/helper/flowcollector.go index 3741ae8c4..0777f1b3d 100644 --- a/pkg/helper/flowcollector.go +++ b/pkg/helper/flowcollector.go @@ -2,6 +2,7 @@ package helper import ( flowslatest "github.com/netobserv/network-observability-operator/api/v1beta1" + "github.com/netobserv/network-observability-operator/controllers/constants" ) func GetSampling(spec *flowslatest.FlowCollectorSpec) int { @@ -40,3 +41,33 @@ func LokiUseHostToken(spec *flowslatest.FlowCollectorLoki) bool { func LokiForwardUserToken(spec *flowslatest.FlowCollectorLoki) bool { return spec.AuthToken == flowslatest.LokiAuthForwardUserToken } + +func GetRecordTypes(processor *flowslatest.FlowCollectorFLP) []string { + outputRecordTypes := []string{constants.FlowLogRecordType} + if processor.OutputRecordTypes != nil { + switch *processor.OutputRecordTypes { + case flowslatest.OutputRecordFlows: + outputRecordTypes = []string{ + constants.FlowLogRecordType, + } + case flowslatest.OutputRecordConnections: + outputRecordTypes = []string{ + constants.NewConnectionRecordType, + constants.HeartbeatRecordType, + constants.EndConnectionRecordType, + } + case flowslatest.OutputRecordEndedConnections: + outputRecordTypes = []string{ + constants.EndConnectionRecordType, + } + case flowslatest.OutputRecordAll: + outputRecordTypes = []string{ + constants.FlowLogRecordType, + constants.NewConnectionRecordType, + constants.HeartbeatRecordType, + constants.EndConnectionRecordType, + } + } + } + return outputRecordTypes +}