From 2628271e7075159cb9b609eec36f0adfd88c14dc Mon Sep 17 00:00:00 2001 From: Kyrylo Shpytsya Date: Fri, 18 Jan 2019 00:12:49 +0200 Subject: [PATCH 1/3] Add SASL/PLAIN config to kafka --- sink/kafka.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/sink/kafka.go b/sink/kafka.go index c366f8dc..10279cc4 100644 --- a/sink/kafka.go +++ b/sink/kafka.go @@ -70,6 +70,14 @@ func NewKafka() (*KafkaSink, error) { config.Net.TLS.Enable = true } + user := os.Getenv("SINK_KAFKA_USER") + if user != "" { + password := os.Getenv("SINK_KAFKA_PASSWORD") + config.SASL.Enable = true + config.SASL.User = user + config.SASL.Password = password + } + producer, err := sarama.NewSyncProducer(brokerList, config) if err != nil { log.Fatal(err) From 8b43c08d28a6d85b9b626a2d0b54ea59b7ce538f Mon Sep 17 00:00:00 2001 From: Kyrylo Shpytsya Date: Fri, 18 Jan 2019 01:22:47 +0200 Subject: [PATCH 2/3] Fix SASL/PLAIN config for vendorized sarama version --- sink/kafka.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sink/kafka.go b/sink/kafka.go index 10279cc4..57f1fe8b 100644 --- a/sink/kafka.go +++ b/sink/kafka.go @@ -73,9 +73,9 @@ func NewKafka() (*KafkaSink, error) { user := os.Getenv("SINK_KAFKA_USER") if user != "" { password := os.Getenv("SINK_KAFKA_PASSWORD") - config.SASL.Enable = true - config.SASL.User = user - config.SASL.Password = password + config.Net.SASL.Enable = true + config.Net.SASL.User = user + config.Net.SASL.Password = password } producer, err := sarama.NewSyncProducer(brokerList, config) From e556d30ce07107d8c4e83cb4e7cb2fc3944903fd Mon Sep 17 00:00:00 2001 From: Kyrylo Shpytsya Date: Sat, 19 Jan 2019 03:01:10 +0200 Subject: [PATCH 3/3] Document SASL/PLAIN config for kafka in README --- README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 34281431..0af174ea 100644 --- a/README.md +++ b/README.md @@ -62,7 +62,8 @@ session "" { ### Kafka -To connect to Kafka with TLS, set the SINK_KAFKA_CA_CERT_PATH to the path to your CA cert file +To connect to Kafka with TLS, set the SINK_KAFKA_CA_CERT_PATH to the path to your CA cert file. +To use SASL/PLAIN authentication, set `$SINK_KAFKA_USER` and `$SINK_KAFKA_PASSWORD` environment variables. ## Usage