From 7251d5efd4b848d4b636a50a2fc527d87ce19276 Mon Sep 17 00:00:00 2001 From: Kalman Meth Date: Wed, 20 Jul 2022 15:04:06 +0300 Subject: [PATCH 1/6] return typed model instead of generic map --- pkg/confgen/flowlogs2metrics_config.go | 174 +++++++++++++------------ 1 file changed, 88 insertions(+), 86 deletions(-) diff --git a/pkg/confgen/flowlogs2metrics_config.go b/pkg/confgen/flowlogs2metrics_config.go index ca9d03981..0340695c6 100644 --- a/pkg/confgen/flowlogs2metrics_config.go +++ b/pkg/confgen/flowlogs2metrics_config.go @@ -21,6 +21,8 @@ import ( "fmt" "io/ioutil" + "github.com/netobserv/flowlogs-pipeline/pkg/api" + config2 "github.com/netobserv/flowlogs-pipeline/pkg/config" log "github.com/sirupsen/logrus" "gopkg.in/yaml.v2" ) @@ -28,72 +30,72 @@ import ( func (cg *ConfGen) GenerateFlowlogs2PipelineConfig() map[string]interface{} { config := map[string]interface{}{ "log-level": "error", - "pipeline": []map[string]string{ - {"name": "ingest_collector"}, - {"name": "transform_generic", - "follows": "ingest_collector", + "pipeline": []config2.Stage{ + {Name: "ingest_collector"}, + {Name: "transform_generic", + Follows: "ingest_collector", }, - {"name": "transform_network", - "follows": "transform_generic", + {Name: "transform_network", + Follows: "transform_generic", }, - {"name": "extract_aggregate", - "follows": "transform_network", + {Name: "extract_aggregate", + Follows: "transform_network", }, - {"name": "encode_prom", - "follows": "extract_aggregate", + {Name: "encode_prom", + Follows: "extract_aggregate", }, - {"name": "write_loki", - "follows": "transform_network", + {Name: "write_loki", + Follows: "transform_network", }, }, - "parameters": []map[string]interface{}{ - {"name": "ingest_collector", - "ingest": map[string]interface{}{ - "type": "collector", - "collector": map[string]interface{}{ - "port": cg.config.Ingest.Collector.Port, - "portLegacy": cg.config.Ingest.Collector.PortLegacy, - "hostname": cg.config.Ingest.Collector.HostName, + "parameters": []config2.StageParam{ + {Name: "ingest_collector", + Ingest: &config2.Ingest{ + Type: "collector", + Collector: &api.IngestCollector{ + Port: cg.config.Ingest.Collector.Port, + PortLegacy: cg.config.Ingest.Collector.PortLegacy, + HostName: cg.config.Ingest.Collector.HostName, }, }, }, - {"name": "transform_generic", - "transform": map[string]interface{}{ - "type": "generic", - "generic": map[string]interface{}{ - "policy": "replace_keys", - "rules": cg.config.Transform.Generic.Rules, + {Name: "transform_generic", + Transform: &config2.Transform{ + Type: "generic", + Generic: &api.TransformGeneric{ + Policy: "replace_keys", + Rules: cg.config.Transform.Generic.Rules, }, }, }, - {"name": "transform_network", - "transform": map[string]interface{}{ - "type": "network", - "network": map[string]interface{}{ - "rules": cg.transformRules, + {Name: "transform_network", + Transform: &config2.Transform{ + Type: "network", + Network: &api.TransformNetwork{ + Rules: cg.transformRules, }, }, }, - {"name": "extract_aggregate", - "extract": map[string]interface{}{ - "type": "aggregates", - "aggregates": cg.aggregateDefinitions, + {Name: "extract_aggregate", + Extract: &config2.Extract{ + Type: "aggregates", + Aggregates: cg.aggregateDefinitions, }, }, - {"name": "encode_prom", - "encode": map[string]interface{}{ - "type": "prom", - "prom": map[string]interface{}{ - "port": cg.config.Encode.Prom.Port, - "prefix": cg.config.Encode.Prom.Prefix, - "metrics": cg.promMetrics, + {Name: "encode_prom", + Encode: &config2.Encode{ + Type: "prom", + Prom: &api.PromEncode{ + Port: cg.config.Encode.Prom.Port, + Prefix: cg.config.Encode.Prom.Prefix, + Metrics: cg.promMetrics, }, }, }, - {"name": "write_loki", - "write": map[string]interface{}{ - "type": cg.config.Write.Type, - "loki": cg.config.Write.Loki, + {Name: "write_loki", + Write: &config2.Write{ + Type: cg.config.Write.Type, + Loki: &cg.config.Write.Loki, }, }, }, @@ -102,68 +104,68 @@ func (cg *ConfGen) GenerateFlowlogs2PipelineConfig() map[string]interface{} { } func (cg *ConfGen) GenerateTruncatedConfig(stages []string) map[string]interface{} { - parameters := make([]map[string]interface{}, len(stages)) + parameters := make([]config2.StageParam, len(stages)) for i, stage := range stages { switch stage { case "ingest": - parameters[i] = map[string]interface{}{ - "name": "ingest_collector", - "ingest": map[string]interface{}{ - "type": "collector", - "collector": map[string]interface{}{ - "port": cg.config.Ingest.Collector.Port, - "portLegacy": cg.config.Ingest.Collector.PortLegacy, - "hostname": cg.config.Ingest.Collector.HostName, + parameters[i] = config2.StageParam{ + Name: "ingest_collector", + Ingest: &config2.Ingest{ + Type: "collector", + Collector: &api.IngestCollector{ + Port: cg.config.Ingest.Collector.Port, + PortLegacy: cg.config.Ingest.Collector.PortLegacy, + HostName: cg.config.Ingest.Collector.HostName, }, }, } case "transform_generic": - parameters[i] = map[string]interface{}{ - "name": "transform_generic", - "transform": map[string]interface{}{ - "type": "generic", - "generic": map[string]interface{}{ - "policy": "replace_keys", - "rules": cg.config.Transform.Generic.Rules, + parameters[i] = config2.StageParam{ + Name: "transform_generic", + Transform: &config2.Transform{ + Type: "generic", + Generic: &api.TransformGeneric{ + Policy: "replace_keys", + Rules: cg.config.Transform.Generic.Rules, }, }, } case "transform_network": - parameters[i] = map[string]interface{}{ - "name": "transform_network", - "transform": map[string]interface{}{ - "type": "network", - "network": map[string]interface{}{ - "rules": cg.transformRules, + parameters[i] = config2.StageParam{ + Name: "transform_network", + Transform: &config2.Transform{ + Type: "network", + Network: &api.TransformNetwork{ + Rules: cg.transformRules, }, }, } case "extract_aggregate": - parameters[i] = map[string]interface{}{ - "name": "extract_aggregate", - "extract": map[string]interface{}{ - "type": "aggregates", - "aggregates": cg.aggregateDefinitions, + parameters[i] = config2.StageParam{ + Name: "extract_aggregate", + Extract: &config2.Extract{ + Type: "aggregates", + Aggregates: cg.aggregateDefinitions, }, } case "encode_prom": - parameters[i] = map[string]interface{}{ - "name": "encode_prom", - "encode": map[string]interface{}{ - "type": "prom", - "prom": map[string]interface{}{ - "port": cg.config.Encode.Prom.Port, - "prefix": cg.config.Encode.Prom.Prefix, - "metrics": cg.promMetrics, + parameters[i] = config2.StageParam{ + Name: "encode_prom", + Encode: &config2.Encode{ + Type: "prom", + Prom: &api.PromEncode{ + Port: cg.config.Encode.Prom.Port, + Prefix: cg.config.Encode.Prom.Prefix, + Metrics: cg.promMetrics, }, }, } case "write_loki": - parameters[i] = map[string]interface{}{ - "name": "write_loki", - "write": map[string]interface{}{ - "type": cg.config.Write.Type, - "loki": cg.config.Write.Loki, + parameters[i] = config2.StageParam{ + Name: "write_loki", + Write: &config2.Write{ + Type: cg.config.Write.Type, + Loki: &cg.config.Write.Loki, }, } } From 60d7ac202b34ed3ba72617240871cd8296110253 Mon Sep 17 00:00:00 2001 From: Kalman Meth Date: Wed, 20 Jul 2022 15:19:29 +0300 Subject: [PATCH 2/6] exported a few more functions --- pkg/confgen/confgen.go | 10 +++++----- pkg/confgen/confgen_test.go | 2 +- pkg/confgen/config.go | 2 +- pkg/confgen/config_test.go | 2 +- pkg/confgen/dedup.go | 10 +++++----- 5 files changed, 13 insertions(+), 13 deletions(-) diff --git a/pkg/confgen/confgen.go b/pkg/confgen/confgen.go index aac44ca94..7ee0d3dde 100644 --- a/pkg/confgen/confgen.go +++ b/pkg/confgen/confgen.go @@ -71,13 +71,13 @@ type DefFile struct { func (cg *ConfGen) Run() error { var err error - cg.config, err = cg.parseConfigFile(Opt.SrcFolder + "/" + configFileName) + cg.config, err = cg.ParseConfigFile(Opt.SrcFolder + "/" + configFileName) if err != nil { - log.Debugf("cg.parseConfigFile err: %v ", err) + log.Debugf("cg.ParseConfigFile err: %v ", err) return err } - definitionFiles := cg.getDefinitionFiles(Opt.SrcFolder) + definitionFiles := cg.GetDefinitionFiles(Opt.SrcFolder) for _, definitionFile := range definitionFiles { err := cg.parseFile(definitionFile) if err != nil { @@ -86,7 +86,7 @@ func (cg *ConfGen) Run() error { } } - cg.dedupe() + cg.Dedupe() if len(Opt.GenerateStages) != 0 { config := cg.GenerateTruncatedConfig(Opt.GenerateStages) @@ -215,7 +215,7 @@ func (cg *ConfGen) parseFile(fileName string) error { return nil } -func (*ConfGen) getDefinitionFiles(rootPath string) []string { +func (*ConfGen) GetDefinitionFiles(rootPath string) []string { var files []string diff --git a/pkg/confgen/confgen_test.go b/pkg/confgen/confgen_test.go index e691f1e78..f2b416a72 100644 --- a/pkg/confgen/confgen_test.go +++ b/pkg/confgen/confgen_test.go @@ -113,7 +113,7 @@ func Test_getDefinitionFiles(t *testing.T) { require.NoError(t, err) err = os.WriteFile(filepath.Join(dirPath, filename), []byte(networkDefinitionConfiguration), 0644) require.NoError(t, err) - files := cg.getDefinitionFiles(dirPath) + files := cg.GetDefinitionFiles(dirPath) require.Equal(t, 1, len(files)) expected := []string{path.Join(dirPath, filename)} require.ElementsMatch(t, expected, files) diff --git a/pkg/confgen/config.go b/pkg/confgen/config.go index 31e0ce1b1..fda43b389 100644 --- a/pkg/confgen/config.go +++ b/pkg/confgen/config.go @@ -68,7 +68,7 @@ type Config struct { Visualization ConfigVisualization `yaml:"visualization"` } -func (cg *ConfGen) parseConfigFile(fileName string) (*Config, error) { +func (cg *ConfGen) ParseConfigFile(fileName string) (*Config, error) { // parse config file yaml var config Config yamlFile, err := ioutil.ReadFile(fileName) diff --git a/pkg/confgen/config_test.go b/pkg/confgen/config_test.go index 87998d5be..2b3937f26 100644 --- a/pkg/confgen/config_test.go +++ b/pkg/confgen/config_test.go @@ -62,7 +62,7 @@ func Test_parseConfigFile(t *testing.T) { cg := getConfGen() err := os.WriteFile(filename, []byte(testConfig), 0644) require.Equal(t, err, nil) - config, err := cg.parseConfigFile(filename) + config, err := cg.ParseConfigFile(filename) require.NoError(t, err) require.Equal(t, config, expectedConfig()) } diff --git a/pkg/confgen/dedup.go b/pkg/confgen/dedup.go index 7e0acdd6d..0255468eb 100644 --- a/pkg/confgen/dedup.go +++ b/pkg/confgen/dedup.go @@ -25,7 +25,7 @@ import ( log "github.com/sirupsen/logrus" ) -func (cg *ConfGen) dedupe() { +func (cg *ConfGen) Dedupe() { cg.transformRules = dedupeNetworkTransformRules(cg.transformRules) cg.aggregateDefinitions = dedupeAggregateDefinitions(cg.aggregateDefinitions) } @@ -54,16 +54,16 @@ func dedupeNetworkTransformRules(rules api.NetworkTransformRules) api.NetworkTra // dedupeAggregateDefinitions is inefficient because we can't use a map to look for duplicates. // The reason is that aggregate.AggregateDefinition is not hashable due to its AggregateBy field which is a slice. func dedupeAggregateDefinitions(aggregateDefinitions aggregate.Definitions) aggregate.Definitions { - var dedpueSlice []api.AggregateDefinition + var dedupeSlice []api.AggregateDefinition for i, aggregateDefinition := range aggregateDefinitions { - if containsAggregateDefinitions(dedpueSlice, aggregateDefinition) { + if containsAggregateDefinitions(dedupeSlice, aggregateDefinition) { // duplicate aggregateDefinition log.Debugf("Remove duplicate AggregateDefinitions %v at index %v", aggregateDefinition, i) continue } - dedpueSlice = append(dedpueSlice, aggregateDefinition) + dedupeSlice = append(dedupeSlice, aggregateDefinition) } - return dedpueSlice + return dedupeSlice } func containsAggregateDefinitions(slice []api.AggregateDefinition, searchItem api.AggregateDefinition) bool { From c4ec1c2dc4d69351517adbb955d0deec1f5c376c Mon Sep 17 00:00:00 2001 From: Kalman Meth Date: Thu, 21 Jul 2022 09:50:43 +0300 Subject: [PATCH 3/6] allowed missing config.yaml file --- go.mod | 2 +- pkg/confgen/config.go | 7 +++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 33bbee57c..0c6c89177 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( github.com/netobserv/loki-client-go v0.0.0-20211018150932-cb17208397a9 github.com/netobserv/netobserv-ebpf-agent v0.1.1-0.20220608092850-3fd4695b7cc2 github.com/netsampler/goflow2 v1.1.1-0.20220509155230-5300494e4785 + github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.12.1 github.com/prometheus/common v0.32.1 github.com/segmentio/kafka-go v0.4.28 @@ -72,7 +73,6 @@ require ( github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect github.com/pelletier/go-toml v1.9.4 // indirect github.com/pierrec/lz4 v2.6.1+incompatible // indirect - github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/procfs v0.7.3 // indirect diff --git a/pkg/confgen/config.go b/pkg/confgen/config.go index fda43b389..9d149169e 100644 --- a/pkg/confgen/config.go +++ b/pkg/confgen/config.go @@ -19,8 +19,10 @@ package confgen import ( "io/ioutil" + "os" "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/pkg/errors" log "github.com/sirupsen/logrus" "gopkg.in/yaml.v2" ) @@ -70,7 +72,12 @@ type Config struct { func (cg *ConfGen) ParseConfigFile(fileName string) (*Config, error) { // parse config file yaml + // provide a minimal config for when config file is missing (as for Netobserv Openshift Operator) var config Config + if _, err := os.Stat(fileName); errors.Is(err, os.ErrNotExist) { + log.Errorf("config file %s does not exist", fileName) + return &Config{}, nil + } yamlFile, err := ioutil.ReadFile(fileName) if err != nil { log.Debugf("ioutil.ReadFile err: %v ", err) From e0c9f0e596909c11275d91ae3bb7c6fc0209898a Mon Sep 17 00:00:00 2001 From: Kalman Meth Date: Thu, 21 Jul 2022 11:27:10 +0300 Subject: [PATCH 4/6] added ConfigFileSgtruct --- docs/api.md | 1 - pkg/api/encode_prom.go | 10 ++-- pkg/confgen/config.go | 5 +- pkg/confgen/flowlogs2metrics_config.go | 18 ++++---- pkg/config/config.go | 64 ++++++++++++++------------ 5 files changed, 53 insertions(+), 45 deletions(-) diff --git a/docs/api.md b/docs/api.md index ba43e959b..aef9da87e 100644 --- a/docs/api.md +++ b/docs/api.md @@ -145,7 +145,6 @@ Following is the supported API format for writing to standard output:
  stdout:
          format: the format of each line: printf (default - writes using golang's default map printing), fields (writes one key and value field per line) or json
-
 
## Aggregate metrics API Following is the supported API format for specifying metrics aggregations: diff --git a/pkg/api/encode_prom.go b/pkg/api/encode_prom.go index e069a90d1..f534d36b7 100644 --- a/pkg/api/encode_prom.go +++ b/pkg/api/encode_prom.go @@ -18,10 +18,10 @@ package api type PromEncode struct { - Metrics PromMetricsItems `yaml:"metrics" json:"metrics,omitempty" doc:"list of prometheus metric definitions, each includes:"` - Port int `yaml:"port" json:"port,omitempty" doc:"port number to expose \"/metrics\" endpoint"` - Prefix string `yaml:"prefix" json:"prefix,omitempty" doc:"prefix added to each metric name"` - ExpiryTime int `yaml:"expiryTime" json:"expiryTime,omitempty" doc:"seconds of no-flow to wait before deleting prometheus data item"` + Metrics PromMetricsItems `yaml:"metrics,omitempty" json:"metrics,omitempty" doc:"list of prometheus metric definitions, each includes:"` + Port int `yaml:"port,omitempty" json:"port,omitempty" doc:"port number to expose \"/metrics\" endpoint"` + Prefix string `yaml:"prefix,omitempty" json:"prefix,omitempty" doc:"prefix added to each metric name"` + ExpiryTime int `yaml:"expiryTime,omitempty" json:"expiryTime,omitempty" doc:"seconds of no-flow to wait before deleting prometheus data item"` } type PromEncodeOperationEnum struct { @@ -40,7 +40,7 @@ type PromMetricsItem struct { Filter PromMetricsFilter `yaml:"filter" json:"filter" doc:"the criterion to filter entries by"` ValueKey string `yaml:"valueKey" json:"valueKey" doc:"entry key from which to resolve metric value"` Labels []string `yaml:"labels" json:"labels" doc:"labels to be associated with the metric"` - Buckets []float64 `yaml:"buckets" json:"buckets" doc:"histogram buckets"` + Buckets []float64 `yaml:"buckets,omitempty" json:"buckets,omitempty" doc:"histogram buckets"` } type PromMetricsItems []PromMetricsItem diff --git a/pkg/confgen/config.go b/pkg/confgen/config.go index 9d149169e..a4743e279 100644 --- a/pkg/confgen/config.go +++ b/pkg/confgen/config.go @@ -75,7 +75,10 @@ func (cg *ConfGen) ParseConfigFile(fileName string) (*Config, error) { // provide a minimal config for when config file is missing (as for Netobserv Openshift Operator) var config Config if _, err := os.Stat(fileName); errors.Is(err, os.ErrNotExist) { - log.Errorf("config file %s does not exist", fileName) + if len(Opt.GenerateStages) == 0 { + log.Errorf("config file %s does not exist", fileName) + return nil, err + } return &Config{}, nil } yamlFile, err := ioutil.ReadFile(fileName) diff --git a/pkg/confgen/flowlogs2metrics_config.go b/pkg/confgen/flowlogs2metrics_config.go index 0340695c6..a4100fbb2 100644 --- a/pkg/confgen/flowlogs2metrics_config.go +++ b/pkg/confgen/flowlogs2metrics_config.go @@ -27,10 +27,10 @@ import ( "gopkg.in/yaml.v2" ) -func (cg *ConfGen) GenerateFlowlogs2PipelineConfig() map[string]interface{} { - config := map[string]interface{}{ - "log-level": "error", - "pipeline": []config2.Stage{ +func (cg *ConfGen) GenerateFlowlogs2PipelineConfig() config2.ConfigFileStruct { + config := config2.ConfigFileStruct{ + LogLevel: "error", + Pipeline: []config2.Stage{ {Name: "ingest_collector"}, {Name: "transform_generic", Follows: "ingest_collector", @@ -48,7 +48,7 @@ func (cg *ConfGen) GenerateFlowlogs2PipelineConfig() map[string]interface{} { Follows: "transform_network", }, }, - "parameters": []config2.StageParam{ + Parameters: []config2.StageParam{ {Name: "ingest_collector", Ingest: &config2.Ingest{ Type: "collector", @@ -103,7 +103,7 @@ func (cg *ConfGen) GenerateFlowlogs2PipelineConfig() map[string]interface{} { return config } -func (cg *ConfGen) GenerateTruncatedConfig(stages []string) map[string]interface{} { +func (cg *ConfGen) GenerateTruncatedConfig(stages []string) config2.ConfigFileStruct { parameters := make([]config2.StageParam, len(stages)) for i, stage := range stages { switch stage { @@ -171,13 +171,13 @@ func (cg *ConfGen) GenerateTruncatedConfig(stages []string) map[string]interface } } log.Debugf("parameters = %v \n", parameters) - config := map[string]interface{}{ - "parameters": parameters, + config := config2.ConfigFileStruct{ + Parameters: parameters, } return config } -func (cg *ConfGen) writeConfigFile(fileName string, config map[string]interface{}) error { +func (cg *ConfGen) writeConfigFile(fileName string, config config2.ConfigFileStruct) error { configData, err := yaml.Marshal(&config) if err != nil { return err diff --git a/pkg/config/config.go b/pkg/config/config.go index bb06a312d..42eb1dfd8 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -40,57 +40,63 @@ type Health struct { Port string } +type ConfigFileStruct struct { + LogLevel string `yaml:"log-level,omitempty" json:"log-level,omitempty"` + Pipeline []Stage `yaml:"pipeline,omitempty" json:"pipeline,omitempty"` + Parameters []StageParam `yaml:"parameters,omitempty" json:"parameters,omitempty"` +} + type Stage struct { - Name string `json:"name"` - Follows string `json:"follows,omitempty"` + Name string `yaml:"name" json:"name"` + Follows string `yaml:"follows,omitempty" json:"follows,omitempty"` } type StageParam struct { - Name string `json:"name"` - Ingest *Ingest `json:"ingest,omitempty"` - Transform *Transform `json:"transform,omitempty"` - Extract *Extract `json:"extract,omitempty"` - Encode *Encode `json:"encode,omitempty"` - Write *Write `json:"write,omitempty"` + Name string `yaml:"name" json:"name"` + Ingest *Ingest `yaml:"ingest,omitempty" json:"ingest,omitempty"` + Transform *Transform `yaml:"transform,omitempty" json:"transform,omitempty"` + Extract *Extract `yaml:"extract,omitempty" json:"extract,omitempty"` + Encode *Encode `yaml:"encode,omitempty" json:"encode,omitempty"` + Write *Write `yaml:"write,omitempty" json:"write,omitempty"` } type Ingest struct { - Type string `json:"type"` - File *File `json:"file,omitempty"` - Collector *api.IngestCollector `json:"collector,omitempty"` - Kafka *api.IngestKafka `json:"kafka,omitempty"` - GRPC *api.IngestGRPCProto `json:"grpc,omitempty"` + Type string `yaml:"type" json:"type"` + File *File `yaml:"file,omitempty" json:"file,omitempty"` + Collector *api.IngestCollector `yaml:"collector,omitempty" json:"collector,omitempty"` + Kafka *api.IngestKafka `yaml:"kafka,omitempty" json:"kafka,omitempty"` + GRPC *api.IngestGRPCProto `yaml:"grpc,omitempty" json:"grpc,omitempty"` } type File struct { - Filename string `json:"filename"` - Decoder api.Decoder `json:"decoder"` - Loop bool `json:"loop"` - Chunks int `json:"chunks"` + Filename string `yaml:"filename" json:"filename"` + Decoder api.Decoder `yaml:"decoder" json:"decoder"` + Loop bool `yaml:"loop" json:"loop"` + Chunks int `yaml:"chunks" json:"chunks"` } type Transform struct { - Type string `json:"type"` - Generic *api.TransformGeneric `json:"generic,omitempty"` - Filter *api.TransformFilter `json:"filter,omitempty"` - Network *api.TransformNetwork `json:"network,omitempty"` + Type string `yaml:"type" json:"type"` + Generic *api.TransformGeneric `yaml:"generic,omitempty" json:"generic,omitempty"` + Filter *api.TransformFilter `yaml:"filter,omitempty" json:"filter,omitempty"` + Network *api.TransformNetwork `yaml:"network,omitempty" json:"network,omitempty"` } type Extract struct { - Type string `json:"type"` - Aggregates []api.AggregateDefinition `json:"aggregates,omitempty"` + Type string `yaml:"type" json:"type"` + Aggregates []api.AggregateDefinition `yaml:"aggregates,omitempty" json:"aggregates,omitempty"` } type Encode struct { - Type string `json:"type"` - Prom *api.PromEncode `json:"prom,omitempty"` - Kafka *api.EncodeKafka `json:"kafka,omitempty"` + Type string `yaml:"type" json:"type"` + Prom *api.PromEncode `yaml:"prom,omitempty" json:"prom,omitempty"` + Kafka *api.EncodeKafka `yaml:"kafka,omitempty" json:"kafka,omitempty"` } type Write struct { - Type string `json:"type"` - Loki *api.WriteLoki `json:"loki,omitempty"` - Stdout *api.WriteStdout `json:"stdout,omitempty"` + Type string `yaml:"type" json:"type"` + Loki *api.WriteLoki `yaml:"loki,omitempty" json:"loki,omitempty"` + Stdout *api.WriteStdout `yaml:"stdout,omitempty" json:"stdout,omitempty"` } // ParseConfig creates the internal unmarshalled representation from the Pipeline and Parameters json From 317ed52934610452399377a72499f190797019f7 Mon Sep 17 00:00:00 2001 From: Kalman Meth Date: Thu, 21 Jul 2022 11:31:16 +0300 Subject: [PATCH 5/6] restored output of Buckets --- pkg/api/encode_prom.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/api/encode_prom.go b/pkg/api/encode_prom.go index f534d36b7..8fee48da4 100644 --- a/pkg/api/encode_prom.go +++ b/pkg/api/encode_prom.go @@ -40,7 +40,7 @@ type PromMetricsItem struct { Filter PromMetricsFilter `yaml:"filter" json:"filter" doc:"the criterion to filter entries by"` ValueKey string `yaml:"valueKey" json:"valueKey" doc:"entry key from which to resolve metric value"` Labels []string `yaml:"labels" json:"labels" doc:"labels to be associated with the metric"` - Buckets []float64 `yaml:"buckets,omitempty" json:"buckets,omitempty" doc:"histogram buckets"` + Buckets []float64 `yaml:"buckets" json:"buckets" doc:"histogram buckets"` } type PromMetricsItems []PromMetricsItem From 87ce1ec3f44a203164df8c760fb0eeca18b817c3 Mon Sep 17 00:00:00 2001 From: Kalman Meth Date: Sun, 24 Jul 2022 13:20:41 +0300 Subject: [PATCH 6/6] changed config2 to config --- pkg/confgen/flowlogs2metrics_config.go | 58 +++++++++++++------------- 1 file changed, 29 insertions(+), 29 deletions(-) diff --git a/pkg/confgen/flowlogs2metrics_config.go b/pkg/confgen/flowlogs2metrics_config.go index a4100fbb2..4f9fe3f1a 100644 --- a/pkg/confgen/flowlogs2metrics_config.go +++ b/pkg/confgen/flowlogs2metrics_config.go @@ -22,15 +22,15 @@ import ( "io/ioutil" "github.com/netobserv/flowlogs-pipeline/pkg/api" - config2 "github.com/netobserv/flowlogs-pipeline/pkg/config" + config "github.com/netobserv/flowlogs-pipeline/pkg/config" log "github.com/sirupsen/logrus" "gopkg.in/yaml.v2" ) -func (cg *ConfGen) GenerateFlowlogs2PipelineConfig() config2.ConfigFileStruct { - config := config2.ConfigFileStruct{ +func (cg *ConfGen) GenerateFlowlogs2PipelineConfig() config.ConfigFileStruct { + configStruct := config.ConfigFileStruct{ LogLevel: "error", - Pipeline: []config2.Stage{ + Pipeline: []config.Stage{ {Name: "ingest_collector"}, {Name: "transform_generic", Follows: "ingest_collector", @@ -48,9 +48,9 @@ func (cg *ConfGen) GenerateFlowlogs2PipelineConfig() config2.ConfigFileStruct { Follows: "transform_network", }, }, - Parameters: []config2.StageParam{ + Parameters: []config.StageParam{ {Name: "ingest_collector", - Ingest: &config2.Ingest{ + Ingest: &config.Ingest{ Type: "collector", Collector: &api.IngestCollector{ Port: cg.config.Ingest.Collector.Port, @@ -60,7 +60,7 @@ func (cg *ConfGen) GenerateFlowlogs2PipelineConfig() config2.ConfigFileStruct { }, }, {Name: "transform_generic", - Transform: &config2.Transform{ + Transform: &config.Transform{ Type: "generic", Generic: &api.TransformGeneric{ Policy: "replace_keys", @@ -69,7 +69,7 @@ func (cg *ConfGen) GenerateFlowlogs2PipelineConfig() config2.ConfigFileStruct { }, }, {Name: "transform_network", - Transform: &config2.Transform{ + Transform: &config.Transform{ Type: "network", Network: &api.TransformNetwork{ Rules: cg.transformRules, @@ -77,13 +77,13 @@ func (cg *ConfGen) GenerateFlowlogs2PipelineConfig() config2.ConfigFileStruct { }, }, {Name: "extract_aggregate", - Extract: &config2.Extract{ + Extract: &config.Extract{ Type: "aggregates", Aggregates: cg.aggregateDefinitions, }, }, {Name: "encode_prom", - Encode: &config2.Encode{ + Encode: &config.Encode{ Type: "prom", Prom: &api.PromEncode{ Port: cg.config.Encode.Prom.Port, @@ -93,24 +93,24 @@ func (cg *ConfGen) GenerateFlowlogs2PipelineConfig() config2.ConfigFileStruct { }, }, {Name: "write_loki", - Write: &config2.Write{ + Write: &config.Write{ Type: cg.config.Write.Type, Loki: &cg.config.Write.Loki, }, }, }, } - return config + return configStruct } -func (cg *ConfGen) GenerateTruncatedConfig(stages []string) config2.ConfigFileStruct { - parameters := make([]config2.StageParam, len(stages)) +func (cg *ConfGen) GenerateTruncatedConfig(stages []string) config.ConfigFileStruct { + parameters := make([]config.StageParam, len(stages)) for i, stage := range stages { switch stage { case "ingest": - parameters[i] = config2.StageParam{ + parameters[i] = config.StageParam{ Name: "ingest_collector", - Ingest: &config2.Ingest{ + Ingest: &config.Ingest{ Type: "collector", Collector: &api.IngestCollector{ Port: cg.config.Ingest.Collector.Port, @@ -120,9 +120,9 @@ func (cg *ConfGen) GenerateTruncatedConfig(stages []string) config2.ConfigFileSt }, } case "transform_generic": - parameters[i] = config2.StageParam{ + parameters[i] = config.StageParam{ Name: "transform_generic", - Transform: &config2.Transform{ + Transform: &config.Transform{ Type: "generic", Generic: &api.TransformGeneric{ Policy: "replace_keys", @@ -131,9 +131,9 @@ func (cg *ConfGen) GenerateTruncatedConfig(stages []string) config2.ConfigFileSt }, } case "transform_network": - parameters[i] = config2.StageParam{ + parameters[i] = config.StageParam{ Name: "transform_network", - Transform: &config2.Transform{ + Transform: &config.Transform{ Type: "network", Network: &api.TransformNetwork{ Rules: cg.transformRules, @@ -141,17 +141,17 @@ func (cg *ConfGen) GenerateTruncatedConfig(stages []string) config2.ConfigFileSt }, } case "extract_aggregate": - parameters[i] = config2.StageParam{ + parameters[i] = config.StageParam{ Name: "extract_aggregate", - Extract: &config2.Extract{ + Extract: &config.Extract{ Type: "aggregates", Aggregates: cg.aggregateDefinitions, }, } case "encode_prom": - parameters[i] = config2.StageParam{ + parameters[i] = config.StageParam{ Name: "encode_prom", - Encode: &config2.Encode{ + Encode: &config.Encode{ Type: "prom", Prom: &api.PromEncode{ Port: cg.config.Encode.Prom.Port, @@ -161,9 +161,9 @@ func (cg *ConfGen) GenerateTruncatedConfig(stages []string) config2.ConfigFileSt }, } case "write_loki": - parameters[i] = config2.StageParam{ + parameters[i] = config.StageParam{ Name: "write_loki", - Write: &config2.Write{ + Write: &config.Write{ Type: cg.config.Write.Type, Loki: &cg.config.Write.Loki, }, @@ -171,13 +171,13 @@ func (cg *ConfGen) GenerateTruncatedConfig(stages []string) config2.ConfigFileSt } } log.Debugf("parameters = %v \n", parameters) - config := config2.ConfigFileStruct{ + configStruct := config.ConfigFileStruct{ Parameters: parameters, } - return config + return configStruct } -func (cg *ConfGen) writeConfigFile(fileName string, config config2.ConfigFileStruct) error { +func (cg *ConfGen) writeConfigFile(fileName string, config config.ConfigFileStruct) error { configData, err := yaml.Marshal(&config) if err != nil { return err