Skip to content

Commit

Permalink
NETOBSERV-974 Add SASL support (#112)
Browse files Browse the repository at this point in the history
* Add SASL support

* Address feedback, split buildFlowExporter for linter
  • Loading branch information
jotak authored May 10, 2023
1 parent 3698de1 commit 4272333
Show file tree
Hide file tree
Showing 33 changed files with 5,074 additions and 65 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ require (
github.com/prometheus/client_golang v1.12.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df // indirect
github.com/xdg/scram v1.0.5 // indirect
github.com/xdg/stringprep v1.0.3 // indirect
golang.org/x/crypto v0.5.0 // indirect
golang.org/x/net v0.7.0 // indirect
golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5 // indirect
Expand Down
136 changes: 73 additions & 63 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,79 +188,89 @@ func flowDirections(cfg *Config) (ingress, egress bool) {
// buildFlowExporter instantiates the exporter selected by cfg.Export and
// returns its ExportFlows terminal function. It returns an error when the
// export type is unknown or when the selected exporter's configuration is
// invalid. Per-exporter construction is delegated to small helpers to keep
// this function within linter limits.
func buildFlowExporter(cfg *Config) (node.TerminalFunc[[]*flow.Record], error) {
	switch cfg.Export {
	case "grpc":
		return buildGRPCExporter(cfg)
	case "kafka":
		return buildKafkaExporter(cfg)
	case "ipfix+udp":
		return buildIPFIXExporter(cfg, "udp")
	case "ipfix+tcp":
		return buildIPFIXExporter(cfg, "tcp")
	default:
		// List every supported value: the previous message omitted the
		// ipfix variants even though they are handled above.
		return nil, fmt.Errorf("wrong export type %s. Admitted values are grpc, kafka, ipfix+udp, ipfix+tcp", cfg.Export)
	}
}

// buildGRPCExporter validates the gRPC target coordinates and starts the
// protobuf/gRPC exporter, returning its ExportFlows terminal function.
func buildGRPCExporter(cfg *Config) (node.TerminalFunc[[]*flow.Record], error) {
	// Both host and port are mandatory for a gRPC target.
	if cfg.TargetHost == "" || cfg.TargetPort == 0 {
		return nil, fmt.Errorf("missing target host or port: %s:%d",
			cfg.TargetHost, cfg.TargetPort)
	}
	exp, err := exporter.StartGRPCProto(cfg.TargetHost, cfg.TargetPort, cfg.GRPCMessageMaxFlows)
	if err != nil {
		return nil, err
	}
	return exp.ExportFlows, nil
}

func buildKafkaExporter(cfg *Config) (node.TerminalFunc[[]*flow.Record], error) {
if len(cfg.KafkaBrokers) == 0 {
return nil, errors.New("at least one Kafka broker is needed")
}
var compression compress.Compression
if err := compression.UnmarshalText([]byte(cfg.KafkaCompression)); err != nil {
return nil, fmt.Errorf("wrong Kafka compression value %s. Admitted values are "+
"none, gzip, snappy, lz4, zstd: %w", cfg.KafkaCompression, err)
}
transport := kafkago.Transport{}
if cfg.KafkaEnableTLS {
tlsConfig, err := buildTLSConfig(cfg)
if err != nil {
return nil, err
}
return ipfix.ExportFlows, nil
case "ipfix+tcp":
if cfg.TargetHost == "" || cfg.TargetPort == 0 {
return nil, fmt.Errorf("missing target host or port: %s:%d",
cfg.TargetHost, cfg.TargetPort)
}
ipfix, err := exporter.StartIPFIXExporter(cfg.TargetHost, cfg.TargetPort, "tcp")
transport.TLS = tlsConfig
}
if cfg.KafkaEnableSASL {
mechanism, err := buildSASLConfig(cfg)
if err != nil {
return nil, err
}
return ipfix.ExportFlows, nil
default:
return nil, fmt.Errorf("wrong export type %s. Admitted values are grpc, kafka", cfg.Export)
transport.SASL = mechanism
}
return (&exporter.KafkaProto{
Writer: &kafkago.Writer{
Addr: kafkago.TCP(cfg.KafkaBrokers...),
Topic: cfg.KafkaTopic,
BatchSize: cfg.KafkaBatchMessages,
// Assigning KafkaBatchSize to BatchBytes instead of BatchSize might be confusing here.
// The reason is that the "standard" Kafka name for this variable is "batch.size",
// which specifies the size of messages in terms of bytes, and not in terms of entries.
// We have decided to hide this library implementation detail and expose to the
// customer the common, standard name and meaning for batch.size
BatchBytes: int64(cfg.KafkaBatchSize),
// Segmentio's Kafka-go does not behave as standard Kafka library, and would
// throttle any Write invocation until reaching the timeout.
// Since we invoke write once each CacheActiveTimeout, we can safely disable this
// timeout throttling
// https://github.com/netobserv/flowlogs-pipeline/pull/233#discussion_r897830057
BatchTimeout: time.Nanosecond,
Async: cfg.KafkaAsync,
Compression: compression,
Transport: &transport,
Balancer: &kafkago.RoundRobin{},
},
}).ExportFlows, nil
}

// buildIPFIXExporter validates the IPFIX collector coordinates and starts an
// IPFIX exporter over the given transport protocol ("udp" or "tcp"),
// returning its ExportFlows terminal function.
func buildIPFIXExporter(cfg *Config, proto string) (node.TerminalFunc[[]*flow.Record], error) {
	// Both host and port are mandatory for an IPFIX collector.
	if cfg.TargetHost == "" || cfg.TargetPort == 0 {
		return nil, fmt.Errorf("missing target host or port: %s:%d",
			cfg.TargetHost, cfg.TargetPort)
	}
	exp, err := exporter.StartIPFIXExporter(cfg.TargetHost, cfg.TargetPort, proto)
	if err != nil {
		return nil, err
	}
	return exp.ExportFlows, nil
}

// Run a Flows agent. The function will keep running in the same thread
Expand Down
8 changes: 8 additions & 0 deletions pkg/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,14 @@ type Config struct {
KafkaTLSUserCertPath string `env:"KAFKA_TLS_USER_CERT_PATH"`
// KafkaTLSUserKeyPath is the path to the user (client) private key for mTLS connections
KafkaTLSUserKeyPath string `env:"KAFKA_TLS_USER_KEY_PATH"`
// KafkaEnableSASL set true to enable SASL auth
KafkaEnableSASL bool `env:"KAFKA_ENABLE_SASL" envDefault:"false"`
// KafkaSASLType type of SASL mechanism: plain or scramSHA512
KafkaSASLType string `env:"KAFKA_SASL_TYPE" envDefault:"plain"`
// KafkaSASLClientIDPath is the path to the client ID (username) for SASL auth
KafkaSASLClientIDPath string `env:"KAFKA_SASL_CLIENT_ID_PATH"`
// KafkaSASLClientSecretPath is the path to the client secret (password) for SASL auth
KafkaSASLClientSecretPath string `env:"KAFKA_SASL_CLIENT_SECRET_PATH"`
// ProfilePort sets the listening port for Go's Pprof tool. If it is not set, profile is disabled
ProfilePort int `env:"PROFILE_PORT"`
}
39 changes: 39 additions & 0 deletions pkg/agent/sasl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package agent

import (
"fmt"
"os"
"strings"

"github.com/segmentio/kafka-go/sasl"
"github.com/segmentio/kafka-go/sasl/plain"
"github.com/segmentio/kafka-go/sasl/scram"
)

// buildSASLConfig reads the SASL credentials from the configured file paths
// and returns the kafka-go authentication mechanism matching
// cfg.KafkaSASLType ("plain" or "scramSHA512").
func buildSASLConfig(cfg *Config) (sasl.Mechanism, error) {
	// Credentials are read from files (e.g. mounted secrets) rather than
	// taken directly from environment variables.
	rawID, err := os.ReadFile(cfg.KafkaSASLClientIDPath)
	if err != nil {
		return nil, err
	}
	rawSecret, err := os.ReadFile(cfg.KafkaSASLClientSecretPath)
	if err != nil {
		return nil, err
	}
	// Trim surrounding whitespace/newlines that commonly end up in secret files.
	username := strings.TrimSpace(string(rawID))
	password := strings.TrimSpace(string(rawSecret))
	switch cfg.KafkaSASLType {
	case "plain":
		return plain.Mechanism{Username: username, Password: password}, nil
	case "scramSHA512":
		return scram.Mechanism(scram.SHA512, username, password)
	default:
		return nil, fmt.Errorf("unknown SASL type: %s", cfg.KafkaSASLType)
	}
}
3 changes: 1 addition & 2 deletions pkg/agent/tls.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ func buildTLSConfig(cfg *Config) (*tls.Config, error) {
}
tlsConfig.Certificates = []tls.Certificate{pair}
}
return tlsConfig, nil
}
return nil, nil
return tlsConfig, nil
}
30 changes: 30 additions & 0 deletions vendor/github.com/segmentio/kafka-go/sasl/plain/plain.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

91 changes: 91 additions & 0 deletions vendor/github.com/segmentio/kafka-go/sasl/scram/scram.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Empty file.
11 changes: 11 additions & 0 deletions vendor/github.com/xdg/scram/.travis.yml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 4272333

Please sign in to comment.