Skip to content

Commit

Permalink
added more outputRecordTypes options
Browse files Browse the repository at this point in the history
  • Loading branch information
jpinsonneau committed Mar 14, 2023
1 parent 84c7fe8 commit 1fed7c9
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 28 deletions.
11 changes: 7 additions & 4 deletions api/v1beta1/flowcollector_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,10 @@ type FLPMetrics struct {
}

const (
OutputRecordFlows = "FLOWS"
OutputRecordAll = "ALL"
OutputRecordFlows = "FLOWS"
OutputRecordConnections = "CONNECTIONS"
OutputRecordEndedConnections = "ENDED_CONNECTIONS"
OutputRecordAll = "ALL"
)

// FlowCollectorFLP defines the desired flowlogs-pipeline state of FlowCollector
Expand Down Expand Up @@ -360,9 +362,10 @@ type FlowCollectorFLP struct {
KafkaConsumerBatchSize int `json:"kafkaConsumerBatchSize"`

// outputRecordTypes defines the desired record types to generate. Possible values are "FLOWS" (default) to export
// flowLogs, or "ALL" to generate both flowLogs and newConnection, heartbeat, endConnection events
// flowLogs, "CONNECTIONS" to generate newConnection, heartbeat, endConnection events, "ENDED_CONNECTIONS" to generate
// only endConnection events or "ALL" to generate both flowLogs and connection events
// +kubebuilder:validation:Optional
// +kubebuilder:validation:Enum:="FLOWS";"ALL"
// +kubebuilder:validation:Enum:="FLOWS";"CONNECTIONS";"ENDED_CONNECTIONS";"ALL"
// +kubebuilder:default:=FLOWS
OutputRecordTypes *string `json:"outputRecordTypes,omitempty"`

Expand Down
7 changes: 5 additions & 2 deletions bundle/manifests/flows.netobserv.io_flowcollectors.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4149,10 +4149,13 @@ spec:
default: FLOWS
description: outputRecordTypes defines the desired record types
to generate. Possible values are "FLOWS" (default) to export
flowLogs, or "ALL" to generate both flowLogs and newConnection,
heartbeat, endConnection events
flowLogs, "CONNECTIONS" to generate newConnection, heartbeat,
endConnection events, "ENDED_CONNECTIONS" to generate only endConnection
events or "ALL" to generate both flowLogs and connection events
enum:
- FLOWS
- CONNECTIONS
- ENDED_CONNECTIONS
- ALL
type: string
port:
Expand Down
7 changes: 5 additions & 2 deletions config/crd/bases/flows.netobserv.io_flowcollectors.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4136,10 +4136,13 @@ spec:
default: FLOWS
description: outputRecordTypes defines the desired record types
to generate. Possible values are "FLOWS" (default) to export
flowLogs, or "ALL" to generate both flowLogs and newConnection,
heartbeat, endConnection events
flowLogs, "CONNECTIONS" to generate newConnection, heartbeat,
endConnection events, "ENDED_CONNECTIONS" to generate only endConnection
events or "ALL" to generate both flowLogs and connection events
enum:
- FLOWS
- CONNECTIONS
- ENDED_CONNECTIONS
- ALL
type: string
port:
Expand Down
12 changes: 2 additions & 10 deletions controllers/consoleplugin/consoleplugin_objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func buildArgs(desired *flowslatest.FlowCollectorSpec) []string {

// check for connection traking to list indexes
indexFields := constants.LokiIndexFields
if desired.Processor.OutputRecordTypes != nil && *desired.Processor.OutputRecordTypes == flowslatest.OutputRecordAll {
if desired.Processor.OutputRecordTypes != nil && *desired.Processor.OutputRecordTypes != flowslatest.OutputRecordFlows {
indexFields = append(indexFields, constants.LokiConnectionIndexFields...)
}

Expand Down Expand Up @@ -305,15 +305,7 @@ func (b *builder) service(old *corev1.Service) *corev1.Service {
// returns a configmap with a digest of its configuration contents, which will be used to
// detect any configuration change
func (b *builder) configMap() (*corev1.ConfigMap, string) {
outputRecordTypes := []string{constants.FlowLogRecordType}
if b.desired.Processor.OutputRecordTypes != nil && *b.desired.Processor.OutputRecordTypes == flowslatest.OutputRecordAll {
outputRecordTypes = []string{
constants.FlowLogRecordType,
constants.NewConnectionRecordType,
constants.HeartbeatRecordType,
constants.EndConnectionRecordType,
}
}
outputRecordTypes := helper.GetRecordTypes(&b.desired.Processor)

config := map[string]interface{}{
"recordTypes": outputRecordTypes,
Expand Down
12 changes: 4 additions & 8 deletions controllers/flowlogspipeline/flp_common_objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,9 +339,10 @@ func (b *builder) addTransformStages(stage *config.PipelineBuilderStage) (*corev
// Else: nothing for eBPF at the moment
}

// Connection tracking stage (only if OutputRecordTypes is set to ALL)
if b.desired.Processor.OutputRecordTypes != nil && *b.desired.Processor.OutputRecordTypes == flowslatest.OutputRecordAll {
// Connection tracking stage (only if OutputRecordTypes is not FLOWS)
if b.desired.Processor.OutputRecordTypes != nil && *b.desired.Processor.OutputRecordTypes != flowslatest.OutputRecordFlows {
indexFields = append(indexFields, constants.LokiConnectionIndexFields...)
outputRecordTypes := helper.GetRecordTypes(&b.desired.Processor)

endTimeout := conntrackEndTimeout
if b.desired.Processor.ConnectionEndTimeout != nil {
Expand All @@ -368,12 +369,7 @@ func (b *builder) addTransformStages(stage *config.PipelineBuilderStage) (*corev
FieldGroupBRef: "dst",
},
},
OutputRecordTypes: []string{
constants.FlowLogRecordType,
constants.NewConnectionRecordType,
constants.HeartbeatRecordType,
constants.EndConnectionRecordType,
},
OutputRecordTypes: outputRecordTypes,
OutputFields: []api.OutputField{
{
Name: "Bytes",
Expand Down
4 changes: 2 additions & 2 deletions docs/FlowCollector.md
Original file line number Diff line number Diff line change
Expand Up @@ -6287,9 +6287,9 @@ processor defines the settings of the component that receives the flows from the
<td><b>outputRecordTypes</b></td>
<td>enum</td>
<td>
outputRecordTypes defines the desired record types to generate. Possible values are "FLOWS" (default) to export flowLogs, or "ALL" to generate both flowLogs and newConnection, heartbeat, endConnection events<br/>
outputRecordTypes defines the desired record types to generate. Possible values are "FLOWS" (default) to export flowLogs, "CONNECTIONS" to generate newConnection, heartbeat, endConnection events, "ENDED_CONNECTIONS" to generate only endConnection events or "ALL" to generate both flowLogs and connection events<br/>
<br/>
<i>Enum</i>: FLOWS, ALL<br/>
<i>Enum</i>: FLOWS, CONNECTIONS, ENDED_CONNECTIONS, ALL<br/>
<i>Default</i>: FLOWS<br/>
</td>
<td>false</td>
Expand Down
31 changes: 31 additions & 0 deletions pkg/helper/flowcollector.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package helper

import (
flowslatest "github.com/netobserv/network-observability-operator/api/v1beta1"
"github.com/netobserv/network-observability-operator/controllers/constants"
)

func GetSampling(spec *flowslatest.FlowCollectorSpec) int {
Expand Down Expand Up @@ -40,3 +41,33 @@ func LokiUseHostToken(spec *flowslatest.FlowCollectorLoki) bool {
func LokiForwardUserToken(spec *flowslatest.FlowCollectorLoki) bool {
return spec.AuthToken == flowslatest.LokiAuthForwardUserToken
}

func GetRecordTypes(processor *flowslatest.FlowCollectorFLP) []string {
outputRecordTypes := []string{constants.FlowLogRecordType}
if processor.OutputRecordTypes != nil {
switch *processor.OutputRecordTypes {
case flowslatest.OutputRecordFlows:
outputRecordTypes = []string{
constants.FlowLogRecordType,
}
case flowslatest.OutputRecordConnections:
outputRecordTypes = []string{
constants.NewConnectionRecordType,
constants.HeartbeatRecordType,
constants.EndConnectionRecordType,
}
case flowslatest.OutputRecordEndedConnections:
outputRecordTypes = []string{
constants.EndConnectionRecordType,
}
case flowslatest.OutputRecordAll:
outputRecordTypes = []string{
constants.FlowLogRecordType,
constants.NewConnectionRecordType,
constants.HeartbeatRecordType,
constants.EndConnectionRecordType,
}
}
}
return outputRecordTypes
}

0 comments on commit 1fed7c9

Please sign in to comment.