Skip to content

Commit

Permalink
Support customized Kafka client.id (#1507)
Browse files Browse the repository at this point in the history
Fixes #1506
  • Loading branch information
newly12 authored and vprithvi committed May 1, 2019
1 parent d60b152 commit 15fb6fe
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 6 deletions.
7 changes: 4 additions & 3 deletions cmd/ingester/app/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,10 @@ func CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWrit
spanProcessor := processor.NewSpanProcessor(spParams)

consumerConfig := kafkaConsumer.Configuration{
Brokers: options.Brokers,
Topic: options.Topic,
GroupID: options.GroupID,
Brokers: options.Brokers,
Topic: options.Topic,
GroupID: options.GroupID,
ClientID: options.ClientID,
}
saramaConsumer, err := consumerConfig.NewConsumer()
if err != nil {
Expand Down
9 changes: 9 additions & 0 deletions cmd/ingester/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ const (
SuffixTopic = ".topic"
// SuffixGroupID is a suffix for the group-id flag
SuffixGroupID = ".group-id"
// SuffixClientID is a suffix for the client-id flag
SuffixClientID = ".client-id"
// SuffixEncoding is a suffix for the encoding flag
SuffixEncoding = ".encoding"
// SuffixDeadlockInterval is a suffix for deadlock detecor flag
Expand All @@ -53,6 +55,8 @@ const (
DefaultTopic = "jaeger-spans"
// DefaultGroupID is the default consumer Group ID
DefaultGroupID = "jaeger-ingester"
// DefaultClientID is the default consumer Client ID
DefaultClientID = "jaeger-ingester"
// DefaultParallelism is the default parallelism for the span processor
DefaultParallelism = 1000
// DefaultEncoding is the default span encoding
Expand Down Expand Up @@ -83,6 +87,10 @@ func AddFlags(flagSet *flag.FlagSet) {
KafkaConsumerConfigPrefix+SuffixGroupID,
DefaultGroupID,
"The Consumer Group that ingester will be consuming on behalf of")
flagSet.String(
KafkaConsumerConfigPrefix+SuffixClientID,
DefaultClientID,
"The Consumer Client ID that ingester will use")
flagSet.String(
KafkaConsumerConfigPrefix+SuffixEncoding,
DefaultEncoding,
Expand All @@ -102,6 +110,7 @@ func (o *Options) InitFromViper(v *viper.Viper) {
o.Brokers = strings.Split(stripWhiteSpace(v.GetString(KafkaConsumerConfigPrefix+SuffixBrokers)), ",")
o.Topic = v.GetString(KafkaConsumerConfigPrefix + SuffixTopic)
o.GroupID = v.GetString(KafkaConsumerConfigPrefix + SuffixGroupID)
o.ClientID = v.GetString(KafkaConsumerConfigPrefix + SuffixClientID)
o.Encoding = v.GetString(KafkaConsumerConfigPrefix + SuffixEncoding)

o.Parallelism = v.GetInt(ConfigPrefix + SuffixParallelism)
Expand Down
3 changes: 3 additions & 0 deletions cmd/ingester/app/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func TestOptionsWithFlags(t *testing.T) {
"--kafka.consumer.topic=topic1",
"--kafka.consumer.brokers=127.0.0.1:9092, 0.0.0:1234",
"--kafka.consumer.group-id=group1",
"--kafka.consumer.client-id=client-id1",
"--kafka.consumer.encoding=json",
"--ingester.parallelism=5",
"--ingester.deadlockInterval=2m",
Expand All @@ -40,6 +41,7 @@ func TestOptionsWithFlags(t *testing.T) {
assert.Equal(t, "topic1", o.Topic)
assert.Equal(t, []string{"127.0.0.1:9092", "0.0.0:1234"}, o.Brokers)
assert.Equal(t, "group1", o.GroupID)
assert.Equal(t, "client-id1", o.ClientID)
assert.Equal(t, 5, o.Parallelism)
assert.Equal(t, 2*time.Minute, o.DeadlockInterval)
assert.Equal(t, kafka.EncodingJSON, o.Encoding)
Expand All @@ -54,6 +56,7 @@ func TestFlagDefaults(t *testing.T) {
assert.Equal(t, DefaultTopic, o.Topic)
assert.Equal(t, []string{DefaultBroker}, o.Brokers)
assert.Equal(t, DefaultGroupID, o.GroupID)
assert.Equal(t, DefaultClientID, o.ClientID)
assert.Equal(t, DefaultParallelism, o.Parallelism)
assert.Equal(t, DefaultEncoding, o.Encoding)
assert.Equal(t, DefaultDeadlockInterval, o.DeadlockInterval)
Expand Down
8 changes: 5 additions & 3 deletions pkg/kafka/consumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,17 @@ type Builder interface {

// Configuration describes the configuration properties needed to create a Kafka consumer
type Configuration struct {
Brokers []string
Topic string
GroupID string
Brokers []string
Topic string
GroupID string
ClientID string
Consumer
}

// NewConsumer creates a new kafka consumer
func (c *Configuration) NewConsumer() (Consumer, error) {
saramaConfig := cluster.NewConfig()
saramaConfig.Group.Mode = cluster.ConsumerModePartitions
saramaConfig.ClientID = c.ClientID
return cluster.NewConsumer(c.Brokers, c.GroupID, []string{c.Topic}, saramaConfig)
}
3 changes: 3 additions & 0 deletions plugin/storage/integration/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func (s *KafkaIntegrationTestSuite) initialize() error {
s.logger, _ = testutils.NewLogger()
const encoding = "json"
const groupID = "kafka-integration-test"
const clientID = "kafka-integration-test"
// A new topic is generated per execution to avoid data overlap
topic := "jaeger-kafka-integration-test-" + strconv.FormatInt(time.Now().UnixNano(), 10)

Expand Down Expand Up @@ -81,6 +82,8 @@ func (s *KafkaIntegrationTestSuite) initialize() error {
encoding,
"--kafka.consumer.group-id",
groupID,
"--kafka.consumer.client-id",
clientID,
"--ingester.parallelism",
"1000",
})
Expand Down

0 comments on commit 15fb6fe

Please sign in to comment.