Skip to content

Commit

Permalink
Improve configuration parameters for Kafka jaegertracing#1359
Browse files Browse the repository at this point in the history
Signed-off-by: chandresh-pancholi <chandreshpancholi007@gmail.com>
  • Loading branch information
chandresh-pancholi committed May 9, 2019
1 parent 7ae02d4 commit a7b5a7c
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 10 deletions.
9 changes: 8 additions & 1 deletion pkg/kafka/producer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,19 @@ type Builder interface {

// Configuration describes the configuration properties needed to create a Kafka producer
type Configuration struct {
Brokers []string
Brokers []string
RequiredAcks int
Compression int
CompressionLevel int
}

// NewProducer creates a new asynchronous kafka producer
func (c *Configuration) NewProducer() (sarama.AsyncProducer, error) {
saramaConfig := sarama.NewConfig()
saramaConfig.Producer.RequiredAcks = sarama.RequiredAcks(c.RequiredAcks)
saramaConfig.Producer.Compression = sarama.CompressionCodec(c.Compression)
saramaConfig.Producer.CompressionLevel = c.CompressionLevel
saramaConfig.Producer.Return.Successes = true

return sarama.NewAsyncProducer(c.Brokers, saramaConfig)
}
39 changes: 31 additions & 8 deletions plugin/storage/kafka/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,20 @@ const (
// EncodingZipkinThrift is used for spans encoded as Zipkin Thrift.
EncodingZipkinThrift = "zipkin-thrift"

configPrefix = "kafka.producer"
suffixBrokers = ".brokers"
suffixTopic = ".topic"
suffixEncoding = ".encoding"
configPrefix = "kafka.producer"
suffixBrokers = ".brokers"
suffixTopic = ".topic"
suffixEncoding = ".encoding"
suffixRequiredAcks = ".required.acks"
suffixCompression = ".compression"
suffixCompressionLevel = ".compression.level"

defaultBroker = "127.0.0.1:9092"
defaultTopic = "jaeger-spans"
defaultEncoding = EncodingProto
defaultBroker = "127.0.0.1:9092"
defaultTopic = "jaeger-spans"
defaultEncoding = EncodingProto
defaultRequiredAcks = 1
defaultComression = 0
defaultCompressionLevel = -1000
)

var (
Expand Down Expand Up @@ -69,12 +75,29 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) {
defaultEncoding,
fmt.Sprintf(`(experimental) Encoding of spans ("%s" or "%s") sent to kafka.`, EncodingJSON, EncodingProto),
)
flagSet.Int(
configPrefix+suffixRequiredAcks,
defaultRequiredAcks,
"(experimental) Required kafka broker acknowledgement. default = 1, no response = 0, wait for local = 1, wait for all = -1",
)
flagSet.Int(
configPrefix+suffixCompression,
defaultComression,
"(experimental) Type of compression to use on messages. default = 0, none = 0, gzip = 1, snappy = 2, lz4 = 3, zstd = 4",
)
flagSet.Int(
configPrefix+suffixCompressionLevel,
defaultCompressionLevel,
"(experimental) Level of compression to use on messages. default= -1000, gzip = 1-9 (default = 6), snappy = none, lz4 = 1-17 (default = 9), zstd = -131072 - 22 (default = 3)",
)
}

// InitFromViper initializes Options with properties from viper
func (opt *Options) InitFromViper(v *viper.Viper) {
opt.config = producer.Configuration{
Brokers: strings.Split(stripWhiteSpace(v.GetString(configPrefix+suffixBrokers)), ","),
Brokers: strings.Split(stripWhiteSpace(v.GetString(configPrefix+suffixBrokers)), ","),
RequiredAcks: v.GetInt(configPrefix + suffixRequiredAcks),
CompressionLevel: v.GetInt(configPrefix + suffixCompressionLevel),
}
opt.topic = v.GetString(configPrefix + suffixTopic)
opt.encoding = v.GetString(configPrefix + suffixEncoding)
Expand Down
11 changes: 10 additions & 1 deletion plugin/storage/kafka/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,18 @@ func TestOptionsWithFlags(t *testing.T) {
command.ParseFlags([]string{
"--kafka.producer.topic=topic1",
"--kafka.producer.brokers=127.0.0.1:9092, 0.0.0:1234",
"--kafka.producer.encoding=protobuf"})
"--kafka.producer.encoding=protobuf",
"--kafka.producer.required.acks=1",
"--kafka.producer.compression=0",
"--kafka.producer.compression.level=-1000"})
opts.InitFromViper(v)

assert.Equal(t, "topic1", opts.topic)
assert.Equal(t, []string{"127.0.0.1:9092", "0.0.0:1234"}, opts.config.Brokers)
assert.Equal(t, "protobuf", opts.encoding)
assert.Equal(t, 1, opts.config.RequiredAcks)
assert.Equal(t, 0, opts.config.Compression)
assert.Equal(t, -1000, opts.config.CompressionLevel)
}

func TestFlagDefaults(t *testing.T) {
Expand All @@ -45,4 +51,7 @@ func TestFlagDefaults(t *testing.T) {
assert.Equal(t, defaultTopic, opts.topic)
assert.Equal(t, []string{defaultBroker}, opts.config.Brokers)
assert.Equal(t, defaultEncoding, opts.encoding)
assert.Equal(t, defaultRequiredAcks, opts.config.RequiredAcks)
assert.Equal(t, defaultComression, opts.config.Compression)
assert.Equal(t, defaultCompressionLevel, opts.config.CompressionLevel)
}

0 comments on commit a7b5a7c

Please sign in to comment.