Skip to content

Commit

Permalink
Improve configuration parameters for Kafka jaegertracing#1359
Browse files Browse the repository at this point in the history
Signed-off-by: chandresh-pancholi <chandreshpancholi007@gmail.com>
  • Loading branch information
chandresh-pancholi committed May 7, 2019
1 parent 7ae02d4 commit 7ad7d93
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 2 deletions.
7 changes: 6 additions & 1 deletion pkg/kafka/producer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,17 @@ type Builder interface {

// Configuration describes the configuration properties needed to create a Kafka producer
type Configuration struct {
Brokers []string
Brokers []string
RequiredAcks int
CompressionLevel int
}

// NewProducer creates a new asynchronous kafka producer
func (c *Configuration) NewProducer() (sarama.AsyncProducer, error) {
saramaConfig := sarama.NewConfig()
saramaConfig.Producer.RequiredAcks = sarama.RequiredAcks(c.RequiredAcks)
saramaConfig.Producer.CompressionLevel = c.CompressionLevel
saramaConfig.Producer.Return.Successes = true

return sarama.NewAsyncProducer(c.Brokers, saramaConfig)
}
16 changes: 16 additions & 0 deletions plugin/storage/kafka/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,14 @@ const (
suffixBrokers = ".brokers"
suffixTopic = ".topic"
suffixEncoding = ".encoding"
suffixRequiredAcks = ".requiredacks"
suffixCompressionLevel = ".compression.level"

defaultBroker = "127.0.0.1:9092"
defaultTopic = "jaeger-spans"
defaultEncoding = EncodingProto
defaultRequiredAcks = 1
defaultCompressionLevel = -1000
)

var (
Expand Down Expand Up @@ -69,12 +73,24 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) {
defaultEncoding,
fmt.Sprintf(`(experimental) Encoding of spans ("%s" or "%s") sent to kafka.`, EncodingJSON, EncodingProto),
)
flagSet.Int(
configPrefix+suffixRequiredAcks,
defaultRequiredAcks,
"(experimental) Required kafka broker acknowledgement",
)
flagSet.Int(
configPrefix+suffixCompressionLevel,
defaultCompressionLevel,
"(experimental) Select compression level",
)
}

// InitFromViper initializes Options with properties from viper
func (opt *Options) InitFromViper(v *viper.Viper) {
opt.config = producer.Configuration{
Brokers: strings.Split(stripWhiteSpace(v.GetString(configPrefix+suffixBrokers)), ","),
RequiredAcks: v.GetInt(configPrefix + suffixRequiredAcks),
CompressionLevel: v.GetInt(configPrefix + suffixCompressionLevel),
}
opt.topic = v.GetString(configPrefix + suffixTopic)
opt.encoding = v.GetString(configPrefix + suffixEncoding)
Expand Down
8 changes: 7 additions & 1 deletion plugin/storage/kafka/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,16 @@ func TestOptionsWithFlags(t *testing.T) {
command.ParseFlags([]string{
"--kafka.producer.topic=topic1",
"--kafka.producer.brokers=127.0.0.1:9092, 0.0.0:1234",
"--kafka.producer.encoding=protobuf"})
"--kafka.producer.encoding=protobuf",
"--kafka.producer.requiredacks=1",
"--kafka.producer.compression.level=-1000"})
opts.InitFromViper(v)

assert.Equal(t, "topic1", opts.topic)
assert.Equal(t, []string{"127.0.0.1:9092", "0.0.0:1234"}, opts.config.Brokers)
assert.Equal(t, "protobuf", opts.encoding)
assert.Equal(t, 1, opts.config.RequiredAcks)
assert.Equal(t, -1000, opts.config.CompressionLevel)
}

func TestFlagDefaults(t *testing.T) {
Expand All @@ -45,4 +49,6 @@ func TestFlagDefaults(t *testing.T) {
assert.Equal(t, defaultTopic, opts.topic)
assert.Equal(t, []string{defaultBroker}, opts.config.Brokers)
assert.Equal(t, defaultEncoding, opts.encoding)
assert.Equal(t, defaultRequiredAcks, opts.config.RequiredAcks)
assert.Equal(t, defaultCompressionLevel, opts.config.CompressionLevel)
}

0 comments on commit 7ad7d93

Please sign in to comment.