From b1ecb5b51b9b640be71b3667da7b61e387674e66 Mon Sep 17 00:00:00 2001 From: WalkerWang731 Date: Thu, 14 Jan 2021 05:11:16 +0800 Subject: [PATCH 1/5] [agent] Add metrics to show connections status between agent and collectors (#2657) * add metrics that show agent connection collector status Signed-off-by: WalkerWang731 * update comment Signed-off-by: WalkerWang731 * exec make fmt Signed-off-by: WalkerWang731 * simplify function and add testing relevant code in the builder_test.go Signed-off-by: WalkerWang731 * add comment in connect_metrics.go Signed-off-by: WalkerWang731 * simplify code and changed use expvar to show target Signed-off-by: WalkerWang731 * simplify code and changed use expvar to show target Signed-off-by: WalkerWang731 * exec make fmt Signed-off-by: WalkerWang731 * Fix collector panic due to sarama sdk returning nil error (#2654) Signed-off-by: luhualin Co-authored-by: luhualin Signed-off-by: WalkerWang731 * Fix flaky tbuffered server test (#2635) * Fix flaky tbuffered server test Signed-off-by: Pavel Kositsyn * Apply suggestions from code review - more readable comments Co-authored-by: Yuri Shkuro Signed-off-by: Pavel Kositsyn Co-authored-by: Yuri Shkuro Signed-off-by: WalkerWang731 * Add github actions for integration tests (#2649) * Add github action for jaeger integration tests Signed-off-by: Ashmita Bohara * Create separate workflow for each integration test Signed-off-by: Ashmita Bohara * Feedbacks changes Signed-off-by: Ashmita Bohara Signed-off-by: WalkerWang731 * Clean-up GH action names (#2661) Signed-off-by: WalkerWang731 * Fix for failures in badger integration tests (#2660) Signed-off-by: Ashmita Bohara Signed-off-by: WalkerWang731 * Add protogen validation test (#2662) Signed-off-by: Ashmita Bohara Signed-off-by: WalkerWang731 * Add github action for jaeger all-in-one image (#2663) * Add github action for jaeger all-in-one image Signed-off-by: Ashmita Bohara * Feedbacks changes Signed-off-by: Ashmita Bohara * Feedbacks changes Signed-off-by: Ashmita Bohara * Feedbacks changes Signed-off-by: Ashmita Bohara * Feedbacks changes Signed-off-by: Ashmita Bohara * Feedbacks changes Signed-off-by: Ashmita Bohara * Make steps self-explantory Signed-off-by: Ashmita Bohara * Fix git tags issue Signed-off-by: Ashmita Bohara * Fix ES integration test Signed-off-by: Ashmita Bohara Signed-off-by: WalkerWang731 * Update comment that looks confusing during builds Signed-off-by: Yuri Shkuro Signed-off-by: WalkerWang731 * Use GitHub actions based build badges Signed-off-by: Yuri Shkuro Signed-off-by: WalkerWang731 * Fix and minor improvements to all-in-one github action (#2667) Signed-off-by: Ashmita Bohara Signed-off-by: WalkerWang731 * Fix docker login issue with all-in-one build (#2668) * Fix docker login issue with all-in-one build Signed-off-by: Ashmita Bohara * Fix docker login issue with all-in-one build Signed-off-by: Ashmita Bohara Signed-off-by: WalkerWang731 * Fix issue with all-in-one build (#2669) Signed-off-by: Ashmita Bohara Signed-off-by: WalkerWang731 * Update cmd/agent/app/reporter/connect_metrics.go accept suggestions Co-authored-by: Yuri Shkuro Signed-off-by: WalkerWang731 * Update cmd/agent/app/reporter/connect_metrics.go accept suggestions Co-authored-by: Yuri Shkuro Signed-off-by: WalkerWang731 * simplify the code that remove ConnectMetricsParams{} and integrate ConnectMetrics{} Signed-off-by: WalkerWang731 * simplify the code that remove ConnectMetricsParams{} and integrate ConnectMetrics{} Signed-off-by: WalkerWang731 * merage from the lastest master branch and exec make fmt Signed-off-by: walker.wangxy * add comment on ConnectMetrics Signed-off-by: WalkerWang731 * clear up redundant codes Signed-off-by: WalkerWang731 Co-authored-by: WalkerWang731 Co-authored-by: Betula-L Co-authored-by: luhualin Co-authored-by: Pavel Kositsyn Co-authored-by: Yuri Shkuro Co-authored-by: Ashmita Co-authored-by: Yuri Shkuro Co-authored-by: walker.wangxy --- cmd/agent/app/reporter/connect_metrics.go | 54 +++++++++++++ .../app/reporter/connect_metrics_test.go | 81 +++++++++++++++++++ cmd/agent/app/reporter/grpc/builder.go | 39 ++++++++- cmd/agent/app/reporter/grpc/builder_test.go | 4 +- .../app/reporter/grpc/collector_proxy.go | 2 +- 5 files changed, 173 insertions(+), 7 deletions(-) create mode 100644 cmd/agent/app/reporter/connect_metrics.go create mode 100644 cmd/agent/app/reporter/connect_metrics_test.go diff --git a/cmd/agent/app/reporter/connect_metrics.go b/cmd/agent/app/reporter/connect_metrics.go new file mode 100644 index 00000000000..6efe555a6ba --- /dev/null +++ b/cmd/agent/app/reporter/connect_metrics.go @@ -0,0 +1,54 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package reporter + +import ( + "github.com/uber/jaeger-lib/metrics" + "go.uber.org/zap" +) + +type connectMetrics struct { + // used for reflect current connection stability + Reconnects metrics.Counter `metric:"collector_reconnects" help:"Number of successful connections (including reconnects) to the collector."` + + // Connection status that jaeger-agent to jaeger-collector, 1 is connected, 0 is disconnected + Status metrics.Gauge `metric:"collector_connected" help:"Status of connection between the agent and the collector; 1 is connected, 0 is disconnected"` +} + +// ConnectMetrics include connectMetrics necessary params if want to modify metrics of connectMetrics, must via ConnectMetrics API +type ConnectMetrics struct { + Logger *zap.Logger // required + MetricsFactory metrics.Factory // required + connectMetrics *connectMetrics +} + +// NewConnectMetrics will be initialize ConnectMetrics +func (r *ConnectMetrics) NewConnectMetrics() { + r.connectMetrics = new(connectMetrics) + r.MetricsFactory = r.MetricsFactory.Namespace(metrics.NSOptions{Name: "connection_status"}) + metrics.MustInit(r.connectMetrics, r.MetricsFactory, nil) +} + +// OnConnectionStatusChange used for pass the status parameter when connection is changed +// 0 is disconnected, 1 is connected +// For quick view status via use `sum(jaeger_agent_connection_status_collector_connected{}) by (instance) > bool 0` +func (r *ConnectMetrics) OnConnectionStatusChange(connected bool) { + if connected { + r.connectMetrics.Status.Update(1) + r.connectMetrics.Reconnects.Inc(1) + } else { + r.connectMetrics.Status.Update(0) + } +} diff --git a/cmd/agent/app/reporter/connect_metrics_test.go b/cmd/agent/app/reporter/connect_metrics_test.go new file mode 100644 index 00000000000..09bec30987e --- /dev/null +++ b/cmd/agent/app/reporter/connect_metrics_test.go @@ -0,0 +1,81 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package reporter + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/uber/jaeger-lib/metrics/metricstest" +) + +type connectMetricsTest struct { + mf *metricstest.Factory +} + +func testConnectMetrics(fn func(tr *connectMetricsTest, r *ConnectMetrics)) { + testConnectMetricsWithParams(&ConnectMetrics{}, fn) +} + +func testConnectMetricsWithParams(cm *ConnectMetrics, fn func(tr *connectMetricsTest, r *ConnectMetrics)) { + mf := metricstest.NewFactory(time.Hour) + cm.MetricsFactory = mf + cm.NewConnectMetrics() + + tr := &connectMetricsTest{ + mf: mf, + } + + fn(tr, cm) +} + +func testCollectorConnected(r *ConnectMetrics) { + r.OnConnectionStatusChange(true) +} + +func testCollectorAborted(r *ConnectMetrics) { + r.OnConnectionStatusChange(false) +} + +func TestConnectMetrics(t *testing.T) { + + testConnectMetrics(func(tr *connectMetricsTest, r *ConnectMetrics) { + getGauge := func() map[string]int64 { + _, gauges := tr.mf.Snapshot() + return gauges + } + + getCount := func() map[string]int64 { + counts, _ := tr.mf.Snapshot() + return counts + } + + // testing connect aborted + testCollectorAborted(r) + assert.EqualValues(t, 0, getGauge()["connection_status.collector_connected"]) + + // testing connect connected + testCollectorConnected(r) + assert.EqualValues(t, 1, getGauge()["connection_status.collector_connected"]) + assert.EqualValues(t, 1, getCount()["connection_status.collector_reconnects"]) + + // testing reconnect counts + testCollectorAborted(r) + testCollectorConnected(r) + assert.EqualValues(t, 2, getCount()["connection_status.collector_reconnects"]) + + }) +} diff --git a/cmd/agent/app/reporter/grpc/builder.go b/cmd/agent/app/reporter/grpc/builder.go index d1a2a04ec04..91dc6171efa 100644 --- a/cmd/agent/app/reporter/grpc/builder.go +++ b/cmd/agent/app/reporter/grpc/builder.go @@ -17,16 +17,20 @@ package grpc import ( "context" "errors" + "expvar" "fmt" "strings" - grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" + "github.com/grpc-ecosystem/go-grpc-middleware/retry" + "github.com/uber/jaeger-lib/metrics" "go.uber.org/zap" "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" "google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver/manual" + "github.com/jaegertracing/jaeger/cmd/agent/app/reporter" "github.com/jaegertracing/jaeger/pkg/config/tlscfg" "github.com/jaegertracing/jaeger/pkg/discovery" "github.com/jaegertracing/jaeger/pkg/discovery/grpcresolver" @@ -43,6 +47,9 @@ type ConnBuilder struct { DiscoveryMinPeers int Notifier discovery.Notifier Discoverer discovery.Discoverer + + // for unit test and provide ConnectMetrics and outside call + ConnectMetrics *reporter.ConnectMetrics } // NewConnBuilder creates a new grpc connection builder. @@ -51,7 +58,7 @@ func NewConnBuilder() *ConnBuilder { } // CreateConnection creates the gRPC connection -func (b *ConnBuilder) CreateConnection(logger *zap.Logger) (*grpc.ClientConn, error) { +func (b *ConnBuilder) CreateConnection(logger *zap.Logger, mFactory metrics.Factory) (*grpc.ClientConn, error) { var dialOptions []grpc.DialOption var dialTarget string if b.TLS.Enabled { // user requested a secure connection @@ -97,14 +104,38 @@ func (b *ConnBuilder) CreateConnection(logger *zap.Logger) (*grpc.ClientConn, er return nil, err } - go func(cc *grpc.ClientConn) { + if b.ConnectMetrics == nil { + cm := reporter.ConnectMetrics{ + Logger: logger, + MetricsFactory: mFactory.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"protocol": "grpc"}}), + } + cm.NewConnectMetrics() + b.ConnectMetrics = &cm + } + + go func(cc *grpc.ClientConn, cm *reporter.ConnectMetrics) { logger.Info("Checking connection to collector") + var egt *expvar.String + r := expvar.Get("gRPCTarget") + if r == nil { + egt = expvar.NewString("gRPCTarget") + } else { + egt = r.(*expvar.String) + } + for { s := cc.GetState() + if s == connectivity.Ready { + cm.OnConnectionStatusChange(true) + egt.Set(cc.Target()) + } else { + cm.OnConnectionStatusChange(false) + } + logger.Info("Agent collector connection state change", zap.String("dialTarget", dialTarget), zap.Stringer("status", s)) cc.WaitForStateChange(context.Background(), s) } - }(conn) + }(conn, b.ConnectMetrics) return conn, nil } diff --git a/cmd/agent/app/reporter/grpc/builder_test.go b/cmd/agent/app/reporter/grpc/builder_test.go index 31949754a2a..4df5c395ba4 100644 --- a/cmd/agent/app/reporter/grpc/builder_test.go +++ b/cmd/agent/app/reporter/grpc/builder_test.go @@ -59,7 +59,7 @@ func TestBuilderFromConfig(t *testing.T) { t, []string{"127.0.0.1:14268", "127.0.0.1:14269"}, cfg.CollectorHostPorts) - r, err := cfg.CreateConnection(zap.NewNop()) + r, err := cfg.CreateConnection(zap.NewNop(), metrics.NullFactory) require.NoError(t, err) assert.NotNil(t, r) } @@ -149,7 +149,7 @@ func TestBuilderWithCollectors(t *testing.T) { cfg.Notifier = test.notifier cfg.Discoverer = test.discoverer - conn, err := cfg.CreateConnection(zap.NewNop()) + conn, err := cfg.CreateConnection(zap.NewNop(), metrics.NullFactory) if test.expectedError == "" { require.NoError(t, err) require.NotNil(t, conn) diff --git a/cmd/agent/app/reporter/grpc/collector_proxy.go b/cmd/agent/app/reporter/grpc/collector_proxy.go index 6c5b19f4e65..2d9dc94783f 100644 --- a/cmd/agent/app/reporter/grpc/collector_proxy.go +++ b/cmd/agent/app/reporter/grpc/collector_proxy.go @@ -37,7 +37,7 @@ type ProxyBuilder struct { // NewCollectorProxy creates ProxyBuilder func NewCollectorProxy(builder *ConnBuilder, agentTags map[string]string, mFactory metrics.Factory, logger *zap.Logger) (*ProxyBuilder, error) { - conn, err := builder.CreateConnection(logger) + conn, err := builder.CreateConnection(logger, mFactory) if err != nil { return nil, err } From 893c8371d18f61be973ad2831fb362a1362350a1 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Wed, 13 Jan 2021 17:59:43 -0500 Subject: [PATCH 2/5] [refactor] Minor clean-up of #2657 (#2728) * [refactor] Minor clean-up of #2657 Signed-off-by: Yuri Shkuro * add test Signed-off-by: Yuri Shkuro --- cmd/agent/app/reporter/connect_metrics.go | 36 ++++++--- .../app/reporter/connect_metrics_test.go | 73 +++++++------------ cmd/agent/app/reporter/grpc/builder.go | 28 ++----- 3 files changed, 56 insertions(+), 81 deletions(-) diff --git a/cmd/agent/app/reporter/connect_metrics.go b/cmd/agent/app/reporter/connect_metrics.go index 6efe555a6ba..2c416ffbb6f 100644 --- a/cmd/agent/app/reporter/connect_metrics.go +++ b/cmd/agent/app/reporter/connect_metrics.go @@ -15,8 +15,9 @@ package reporter import ( + "expvar" + "github.com/uber/jaeger-lib/metrics" - "go.uber.org/zap" ) type connectMetrics struct { @@ -29,26 +30,37 @@ type connectMetrics struct { // ConnectMetrics include connectMetrics necessary params if want to modify metrics of connectMetrics, must via ConnectMetrics API type ConnectMetrics struct { - Logger *zap.Logger // required - MetricsFactory metrics.Factory // required - connectMetrics *connectMetrics + metrics connectMetrics + target *expvar.String } // NewConnectMetrics will be initialize ConnectMetrics -func (r *ConnectMetrics) NewConnectMetrics() { - r.connectMetrics = new(connectMetrics) - r.MetricsFactory = r.MetricsFactory.Namespace(metrics.NSOptions{Name: "connection_status"}) - metrics.MustInit(r.connectMetrics, r.MetricsFactory, nil) +func NewConnectMetrics(mf metrics.Factory) *ConnectMetrics { + cm := &ConnectMetrics{} + metrics.MustInit(&cm.metrics, mf.Namespace(metrics.NSOptions{Name: "connection_status"}), nil) + + if r := expvar.Get("gRPCTarget"); r == nil { + cm.target = expvar.NewString("gRPCTarget") + } else { + cm.target = r.(*expvar.String) + } + + return cm } // OnConnectionStatusChange used for pass the status parameter when connection is changed // 0 is disconnected, 1 is connected // For quick view status via use `sum(jaeger_agent_connection_status_collector_connected{}) by (instance) > bool 0` -func (r *ConnectMetrics) OnConnectionStatusChange(connected bool) { +func (cm *ConnectMetrics) OnConnectionStatusChange(connected bool) { if connected { - r.connectMetrics.Status.Update(1) - r.connectMetrics.Reconnects.Inc(1) + cm.metrics.Status.Update(1) + cm.metrics.Reconnects.Inc(1) } else { - r.connectMetrics.Status.Update(0) + cm.metrics.Status.Update(0) } } + +// RecordTarget writes the current connection target to an expvar field. +func (cm *ConnectMetrics) RecordTarget(target string) { + cm.target.Set(target) +} diff --git a/cmd/agent/app/reporter/connect_metrics_test.go b/cmd/agent/app/reporter/connect_metrics_test.go index 09bec30987e..d5ff2007f21 100644 --- a/cmd/agent/app/reporter/connect_metrics_test.go +++ b/cmd/agent/app/reporter/connect_metrics_test.go @@ -15,6 +15,7 @@ package reporter import ( + "expvar" "testing" "time" @@ -22,60 +23,38 @@ import ( "github.com/uber/jaeger-lib/metrics/metricstest" ) -type connectMetricsTest struct { - mf *metricstest.Factory -} - -func testConnectMetrics(fn func(tr *connectMetricsTest, r *ConnectMetrics)) { - testConnectMetricsWithParams(&ConnectMetrics{}, fn) -} - -func testConnectMetricsWithParams(cm *ConnectMetrics, fn func(tr *connectMetricsTest, r *ConnectMetrics)) { +func TestConnectMetrics(t *testing.T) { mf := metricstest.NewFactory(time.Hour) - cm.MetricsFactory = mf - cm.NewConnectMetrics() + cm := NewConnectMetrics(mf) - tr := &connectMetricsTest{ - mf: mf, + getGauge := func() map[string]int64 { + _, gauges := mf.Snapshot() + return gauges } - fn(tr, cm) -} - -func testCollectorConnected(r *ConnectMetrics) { - r.OnConnectionStatusChange(true) -} - -func testCollectorAborted(r *ConnectMetrics) { - r.OnConnectionStatusChange(false) -} - -func TestConnectMetrics(t *testing.T) { - - testConnectMetrics(func(tr *connectMetricsTest, r *ConnectMetrics) { - getGauge := func() map[string]int64 { - _, gauges := tr.mf.Snapshot() - return gauges - } + getCount := func() map[string]int64 { + counts, _ := mf.Snapshot() + return counts + } - getCount := func() map[string]int64 { - counts, _ := tr.mf.Snapshot() - return counts - } + // no connection + cm.OnConnectionStatusChange(false) + assert.EqualValues(t, 0, getGauge()["connection_status.collector_connected"]) - // testing connect aborted - testCollectorAborted(r) - assert.EqualValues(t, 0, getGauge()["connection_status.collector_connected"]) + // first connection + cm.OnConnectionStatusChange(true) + assert.EqualValues(t, 1, getGauge()["connection_status.collector_connected"]) + assert.EqualValues(t, 1, getCount()["connection_status.collector_reconnects"]) - // testing connect connected - testCollectorConnected(r) - assert.EqualValues(t, 1, getGauge()["connection_status.collector_connected"]) - assert.EqualValues(t, 1, getCount()["connection_status.collector_reconnects"]) + // reconnect + cm.OnConnectionStatusChange(false) + cm.OnConnectionStatusChange(true) + assert.EqualValues(t, 2, getCount()["connection_status.collector_reconnects"]) - // testing reconnect counts - testCollectorAborted(r) - testCollectorConnected(r) - assert.EqualValues(t, 2, getCount()["connection_status.collector_reconnects"]) + cm.RecordTarget("collector-host") + assert.Equal(t, `"collector-host"`, expvar.Get("gRPCTarget").String()) - }) + // since expvars are singletons, the second constructor should grab the same var + cm2 := NewConnectMetrics(mf) + assert.Same(t, cm.target, cm2.target) } diff --git a/cmd/agent/app/reporter/grpc/builder.go b/cmd/agent/app/reporter/grpc/builder.go index 91dc6171efa..4f220c0a9b9 100644 --- a/cmd/agent/app/reporter/grpc/builder.go +++ b/cmd/agent/app/reporter/grpc/builder.go @@ -17,11 +17,10 @@ package grpc import ( "context" "errors" - "expvar" "fmt" "strings" - "github.com/grpc-ecosystem/go-grpc-middleware/retry" + grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" "github.com/uber/jaeger-lib/metrics" "go.uber.org/zap" "google.golang.org/grpc" @@ -47,9 +46,6 @@ type ConnBuilder struct { DiscoveryMinPeers int Notifier discovery.Notifier Discoverer discovery.Discoverer - - // for unit test and provide ConnectMetrics and outside call - ConnectMetrics *reporter.ConnectMetrics } // NewConnBuilder creates a new grpc connection builder. @@ -104,30 +100,18 @@ func (b *ConnBuilder) CreateConnection(logger *zap.Logger, mFactory metrics.Fact return nil, err } - if b.ConnectMetrics == nil { - cm := reporter.ConnectMetrics{ - Logger: logger, - MetricsFactory: mFactory.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"protocol": "grpc"}}), - } - cm.NewConnectMetrics() - b.ConnectMetrics = &cm - } + connectMetrics := reporter.NewConnectMetrics( + mFactory.Namespace(metrics.NSOptions{Tags: map[string]string{"protocol": "grpc"}}), + ) go func(cc *grpc.ClientConn, cm *reporter.ConnectMetrics) { logger.Info("Checking connection to collector") - var egt *expvar.String - r := expvar.Get("gRPCTarget") - if r == nil { - egt = expvar.NewString("gRPCTarget") - } else { - egt = r.(*expvar.String) - } for { s := cc.GetState() if s == connectivity.Ready { cm.OnConnectionStatusChange(true) - egt.Set(cc.Target()) + cm.RecordTarget(cc.Target()) } else { cm.OnConnectionStatusChange(false) } @@ -135,7 +119,7 @@ func (b *ConnBuilder) CreateConnection(logger *zap.Logger, mFactory metrics.Fact logger.Info("Agent collector connection state change", zap.String("dialTarget", dialTarget), zap.Stringer("status", s)) cc.WaitForStateChange(context.Background(), s) } - }(conn, b.ConnectMetrics) + }(conn, connectMetrics) return conn, nil } From 2ffdaae188084e72f7eb29f6a8764fb312a8e651 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Wed, 13 Jan 2021 20:26:46 -0500 Subject: [PATCH 3/5] [tests] Increase timeout for TestGRPCResolverRoundRobin (#2729) * [tests] Add logging for TestGRPCResolverRoundRobin Signed-off-by: Yuri Shkuro * rename var for clarity Signed-off-by: Yuri Shkuro * cleanup Signed-off-by: Yuri Shkuro * increase timeout Signed-off-by: Yuri Shkuro --- pkg/discovery/grpcresolver/grpc_resolver.go | 27 ++++++++++--------- .../grpcresolver/grpc_resolver_test.go | 12 ++++++--- 2 files changed, 23 insertions(+), 16 deletions(-) diff --git a/pkg/discovery/grpcresolver/grpc_resolver.go b/pkg/discovery/grpcresolver/grpc_resolver.go index ba345c90280..2329870f7ca 100644 --- a/pkg/discovery/grpcresolver/grpc_resolver.go +++ b/pkg/discovery/grpcresolver/grpc_resolver.go @@ -123,22 +123,25 @@ func (r *Resolver) Close() { r.closing.Wait() } +// rendezvousHash is the core of the algorithm. It takes input addresses, +// assigns each of them a hash, sorts them by those hash values, and +// returns top N of entries from the sorted list, up to minPeers parameter. func (r *Resolver) rendezvousHash(addresses []string) []string { hasher := fnv.New32() - hosts := hostScores{} - for _, address := range addresses { - hosts = append(hosts, hostScore{ + hosts := make(hostScores, len(addresses)) + for i, address := range addresses { + hosts[i] = hostScore{ address: address, score: hashAddr(hasher, []byte(address), r.salt), - }) + } } sort.Sort(hosts) - size := min(r.discoveryMinPeers, len(hosts)) - addressesPerHost := make([]string, size) - for i := 0; i < size; i++ { - addressesPerHost[i] = hosts[i].address + n := min(r.discoveryMinPeers, len(hosts)) + topN := make([]string, n) + for i := 0; i < n; i++ { + topN[i] = hosts[i].address } - return addressesPerHost + return topN } func min(a, b int) int { @@ -162,9 +165,9 @@ func (r *Resolver) updateAddresses(hostPorts []string) { } func generateAddresses(instances []string) []resolver.Address { - var addrs []resolver.Address - for _, instance := range instances { - addrs = append(addrs, resolver.Address{Addr: instance}) + addrs := make([]resolver.Address, len(instances)) + for i, instance := range instances { + addrs[i] = resolver.Address{Addr: instance} } return addrs } diff --git a/pkg/discovery/grpcresolver/grpc_resolver_test.go b/pkg/discovery/grpcresolver/grpc_resolver_test.go index f6bcdd7cf67..7840efbee52 100644 --- a/pkg/discovery/grpcresolver/grpc_resolver_test.go +++ b/pkg/discovery/grpcresolver/grpc_resolver_test.go @@ -84,7 +84,7 @@ func makeSureConnectionsUp(t *testing.T, count int, testc grpctest.TestServiceCl // Make sure connections to all servers are up. for si := 0; si < count; si++ { connected := false - for i := 0; i < 1000; i++ { + for i := 0; i < 3000; i++ { // 3000 * 10ms = 30s _, err := testc.EmptyCall(context.Background(), &grpctest.Empty{}, grpc.Peer(&p)) if err != nil { continue @@ -92,11 +92,12 @@ func makeSureConnectionsUp(t *testing.T, count int, testc grpctest.TestServiceCl if _, ok := addrs[p.Addr.String()]; !ok { addrs[p.Addr.String()] = struct{}{} connected = true + t.Logf("connected to peer #%d (%v) on iteration %d", si, p.Addr, i) break } time.Sleep(time.Millisecond * 10) } - assert.True(t, connected, "Connection was still not up") + assert.True(t, connected, "Connection #%d was still not up. Connections so far: %+v", si, addrs) } } @@ -135,10 +136,13 @@ func TestGRPCResolverRoundRobin(t *testing.T) { minPeers int connections int // expected number of unique connections to servers }{ - {3, 3}, {5, 5}, {7, 5}, + {minPeers: 3, connections: 3}, + {minPeers: 5, connections: 3}, + // note: test cannot succeed with minPeers < connections because resolver + // will never return more than minPeers addresses. } for _, test := range tests { - t.Run(fmt.Sprintf("minPeers=%d", test.minPeers), func(t *testing.T) { + t.Run(fmt.Sprintf("%+v", test), func(t *testing.T) { res := New(notifier, discoverer, zap.NewNop(), test.minPeers) defer resolver.UnregisterForTesting(res.Scheme()) From b73b0c8c4fb23ba2b50dc6673e689b020228659a Mon Sep 17 00:00:00 2001 From: Mykhailo Semenchenko <45467774+th3M1ke@users.noreply.github.com> Date: Thu, 14 Jan 2021 17:45:56 +0200 Subject: [PATCH 4/5] Add ability to use JS file for UI configuration (jaeger-ui/#123) (#2707) * Add ability to use JS file for UI configuration (#123 from jaeger-ui) Signed-off-by: Mykhailo Semenchenko * Update UI config file injection Signed-off-by: Mykhailo Semenchenko * Update update test, refactor loadUIConfig function Signed-off-by: Mykhailo Semenchenko * Update errors text, minor refactor Signed-off-by: Mykhailo Semenchenko * Remove debug code Signed-off-by: Mykhailo Semenchenko * Update jaeger-ui to latest master with index.html changes Signed-off-by: Mykhailo Semenchenko --- cmd/query/app/fixture/index.html | 8 +- cmd/query/app/fixture/ui-config-malformed.js | 10 +++ cmd/query/app/fixture/ui-config-menu.js | 10 +++ cmd/query/app/fixture/ui-config.js | 5 ++ cmd/query/app/static_handler.go | 59 ++++++++----- cmd/query/app/static_handler_test.go | 90 ++++++++++++++++---- jaeger-ui | 2 +- 7 files changed, 143 insertions(+), 41 deletions(-) create mode 100644 cmd/query/app/fixture/ui-config-malformed.js create mode 100644 cmd/query/app/fixture/ui-config-menu.js create mode 100644 cmd/query/app/fixture/ui-config.js diff --git a/cmd/query/app/fixture/index.html b/cmd/query/app/fixture/index.html index 1fa312504ce..0bce65817e0 100644 --- a/cmd/query/app/fixture/index.html +++ b/cmd/query/app/fixture/index.html @@ -3,6 +3,10 @@ Test Page - - + diff --git a/cmd/query/app/fixture/ui-config-malformed.js b/cmd/query/app/fixture/ui-config-malformed.js new file mode 100644 index 00000000000..1956f1b31e6 --- /dev/null +++ b/cmd/query/app/fixture/ui-config-malformed.js @@ -0,0 +1,10 @@ +() => { + return { + menu: [ + { + label: "GitHub", + url: "https://github.com/jaegertracing/jaeger" + } + ] + } +} diff --git a/cmd/query/app/fixture/ui-config-menu.js b/cmd/query/app/fixture/ui-config-menu.js new file mode 100644 index 00000000000..74863a63ee4 --- /dev/null +++ b/cmd/query/app/fixture/ui-config-menu.js @@ -0,0 +1,10 @@ +function UIConfig(){ + return { + menu: [ + { + label: "GitHub", + url: "https://github.com/jaegertracing/jaeger" + } + ] + } +} diff --git a/cmd/query/app/fixture/ui-config.js b/cmd/query/app/fixture/ui-config.js new file mode 100644 index 00000000000..edd2a0bfcd8 --- /dev/null +++ b/cmd/query/app/fixture/ui-config.js @@ -0,0 +1,5 @@ +function UIConfig(){ + return { + x: "y" + } +} diff --git a/cmd/query/app/static_handler.go b/cmd/query/app/static_handler.go index bf8632c1859..e60a451b979 100644 --- a/cmd/query/app/static_handler.go +++ b/cmd/query/app/static_handler.go @@ -16,6 +16,7 @@ package app import ( + "bytes" "encoding/json" "fmt" "io/ioutil" @@ -40,6 +41,7 @@ var ( // The following patterns are searched and replaced in the index.html as a way of customizing the UI. configPattern = regexp.MustCompile("JAEGER_CONFIG *= *DEFAULT_CONFIG;") + configJsPattern = regexp.MustCompile(`(?im)^\s*\/\/\s*JAEGER_CONFIG_JS.*\n.*`) versionPattern = regexp.MustCompile("JAEGER_VERSION *= *DEFAULT_VERSION;") basePathPattern = regexp.MustCompile(` Date: Thu, 14 Jan 2021 23:55:19 +0800 Subject: [PATCH 5/5] Add support for Kafka SASL/PLAIN authentication via SCRAM-SHA-256 or SCRAM-SHA-512 mechanism (#2724) * add that suppot Kafka SASL/PLAIN authentication of SCRAM-SHA-256 or SCRAM-SHA-512 mechanism Signed-off-by: WalkerWang731 * rename XDGSCRAMClient to scramClient and remove paramater that no point on factory_test.go Signed-off-by: WalkerWang731 * add type assertion Signed-off-by: WalkerWang731 * replacement UserName to Username Signed-off-by: WalkerWang731 --- cmd/ingester/app/flags_test.go | 11 +-- .../exporter/kafkaexporter/kafka_exporter.go | 2 +- .../receiver/kafkareceiver/kafka_receiver.go | 2 +- go.mod | 1 + go.sum | 2 + pkg/kafka/auth/config.go | 10 ++- pkg/kafka/auth/options.go | 24 ++++--- pkg/kafka/auth/plaintext.go | 68 +++++++++++++++++-- plugin/storage/kafka/options_test.go | 11 +-- 9 files changed, 103 insertions(+), 28 deletions(-) diff --git a/cmd/ingester/app/flags_test.go b/cmd/ingester/app/flags_test.go index 93be0d13f54..5659dbc4bc4 100644 --- a/cmd/ingester/app/flags_test.go +++ b/cmd/ingester/app/flags_test.go @@ -55,29 +55,30 @@ func TestOptionsWithFlags(t *testing.T) { func TestTLSFlags(t *testing.T) { kerb := auth.KerberosConfig{ServiceName: "kafka", ConfigPath: "/etc/krb5.conf", KeyTabPath: "/etc/security/kafka.keytab"} + plain := auth.PlainTextConfig{Username: "", Password: "", Mechanism: "PLAIN"} tests := []struct { flags []string expected auth.AuthenticationConfig }{ { flags: []string{}, - expected: auth.AuthenticationConfig{Authentication: "none", Kerberos: kerb}, + expected: auth.AuthenticationConfig{Authentication: "none", Kerberos: kerb, PlainText: plain}, }, { flags: []string{"--kafka.consumer.authentication=foo"}, - expected: auth.AuthenticationConfig{Authentication: "foo", Kerberos: kerb}, + expected: auth.AuthenticationConfig{Authentication: "foo", Kerberos: kerb, PlainText: plain}, }, { flags: []string{"--kafka.consumer.authentication=kerberos", "--kafka.consumer.tls.enabled=true"}, - expected: auth.AuthenticationConfig{Authentication: "kerberos", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}}, + expected: auth.AuthenticationConfig{Authentication: "kerberos", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}, PlainText: plain}, }, { flags: []string{"--kafka.consumer.authentication=tls"}, - expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}}, + expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}, PlainText: plain}, }, { flags: []string{"--kafka.consumer.authentication=tls", "--kafka.consumer.tls.enabled=false"}, - expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}}, + expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}, PlainText: plain}, }, } diff --git a/cmd/opentelemetry/app/exporter/kafkaexporter/kafka_exporter.go b/cmd/opentelemetry/app/exporter/kafkaexporter/kafka_exporter.go index 4cf4c583b66..49e84f20941 100644 --- a/cmd/opentelemetry/app/exporter/kafkaexporter/kafka_exporter.go +++ b/cmd/opentelemetry/app/exporter/kafkaexporter/kafka_exporter.go @@ -77,7 +77,7 @@ func (f Factory) CreateDefaultConfig() configmodels.Exporter { if opts.Config.Authentication == "plaintext" { cfg.Authentication.PlainText = &kafkaexporter.PlainTextConfig{ - Username: opts.Config.PlainText.UserName, + Username: opts.Config.PlainText.Username, Password: opts.Config.PlainText.Password, } } diff --git a/cmd/opentelemetry/app/receiver/kafkareceiver/kafka_receiver.go b/cmd/opentelemetry/app/receiver/kafkareceiver/kafka_receiver.go index 912dc906de9..514b5e99a78 100644 --- a/cmd/opentelemetry/app/receiver/kafkareceiver/kafka_receiver.go +++ b/cmd/opentelemetry/app/receiver/kafkareceiver/kafka_receiver.go @@ -84,7 +84,7 @@ func (f *Factory) CreateDefaultConfig() configmodels.Receiver { if opts.Authentication == "plaintext" { cfg.Authentication.PlainText = &kafkaexporter.PlainTextConfig{ - Username: opts.PlainText.UserName, + Username: opts.PlainText.Username, Password: opts.PlainText.Password, } } diff --git a/go.mod b/go.mod index 5c6299e7588..f099265a018 100644 --- a/go.mod +++ b/go.mod @@ -67,6 +67,7 @@ require ( github.com/uber/jaeger-lib v2.4.0+incompatible github.com/vektra/mockery v0.0.0-20181123154057-e78b021dcbb5 github.com/wadey/gocovmerge v0.0.0-20160331181800-b5bfa59ec0ad + github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c go.mongodb.org/mongo-driver v1.3.2 // indirect go.uber.org/atomic v1.6.0 go.uber.org/automaxprocs v1.3.0 diff --git a/go.sum b/go.sum index 7317ca9694d..67bd1db2f53 100644 --- a/go.sum +++ b/go.sum @@ -587,8 +587,10 @@ github.com/vektra/mockery v0.0.0-20181123154057-e78b021dcbb5 h1:Xim2mBRFdXzXmKRO github.com/vektra/mockery v0.0.0-20181123154057-e78b021dcbb5/go.mod h1:ppEjwdhyy7Y31EnHRDm1JkChoC7LXIJ7Ex0VYLWtZtQ= github.com/wadey/gocovmerge v0.0.0-20160331181800-b5bfa59ec0ad h1:W0LEBv82YCGEtcmPA3uNZBI33/qF//HAAs3MawDjRa0= github.com/wadey/gocovmerge v0.0.0-20160331181800-b5bfa59ec0ad/go.mod h1:Hy8o65+MXnS6EwGElrSRjUzQDLXreJlzYLlWiHtt8hM= +github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= github.com/xdg/stringprep v0.0.0-20180714160509-73f8eece6fdc/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= +github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0= github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= diff --git a/pkg/kafka/auth/config.go b/pkg/kafka/auth/config.go index c9e42e0c0a5..a133b1a29de 100644 --- a/pkg/kafka/auth/config.go +++ b/pkg/kafka/auth/config.go @@ -68,7 +68,10 @@ func (config *AuthenticationConfig) SetConfiguration(saramaConfig *sarama.Config setKerberosConfiguration(&config.Kerberos, saramaConfig) return nil case plaintext: - setPlainTextConfiguration(&config.PlainText, saramaConfig) + err := setPlainTextConfiguration(&config.PlainText, saramaConfig) + if err != nil { + return err + } return nil default: return fmt.Errorf("Unknown/Unsupported authentication method %s to kafka cluster", config.Authentication) @@ -81,7 +84,7 @@ func (config *AuthenticationConfig) InitFromViper(configPrefix string, v *viper. config.Kerberos.ServiceName = v.GetString(configPrefix + kerberosPrefix + suffixKerberosServiceName) config.Kerberos.Realm = v.GetString(configPrefix + kerberosPrefix + suffixKerberosRealm) config.Kerberos.UseKeyTab = v.GetBool(configPrefix + kerberosPrefix + suffixKerberosUseKeyTab) - config.Kerberos.Username = v.GetString(configPrefix + kerberosPrefix + suffixKerberosUserName) + config.Kerberos.Username = v.GetString(configPrefix + kerberosPrefix + suffixKerberosUsername) config.Kerberos.Password = v.GetString(configPrefix + kerberosPrefix + suffixKerberosPassword) config.Kerberos.ConfigPath = v.GetString(configPrefix + kerberosPrefix + suffixKerberosConfig) config.Kerberos.KeyTabPath = v.GetString(configPrefix + kerberosPrefix + suffixKerberosKeyTab) @@ -97,6 +100,7 @@ func (config *AuthenticationConfig) InitFromViper(configPrefix string, v *viper. config.TLS.Enabled = true } - config.PlainText.UserName = v.GetString(configPrefix + plainTextPrefix + suffixPlainTextUserName) + config.PlainText.Username = v.GetString(configPrefix + plainTextPrefix + suffixPlainTextUsername) config.PlainText.Password = v.GetString(configPrefix + plainTextPrefix + suffixPlainTextPassword) + config.PlainText.Mechanism = v.GetString(configPrefix + plainTextPrefix + suffixPlainTextMechanism) } diff --git a/pkg/kafka/auth/options.go b/pkg/kafka/auth/options.go index e2e7edb4804..44e860ced72 100644 --- a/pkg/kafka/auth/options.go +++ b/pkg/kafka/auth/options.go @@ -30,7 +30,7 @@ const ( suffixKerberosServiceName = ".service-name" suffixKerberosRealm = ".realm" suffixKerberosUseKeyTab = ".use-keytab" - suffixKerberosUserName = ".username" + suffixKerberosUsername = ".username" suffixKerberosPassword = ".password" suffixKerberosConfig = ".config-file" suffixKerberosKeyTab = ".keytab-file" @@ -43,12 +43,14 @@ const ( defaultKerberosUsername = "" defaultKerberosKeyTab = "/etc/security/kafka.keytab" - plainTextPrefix = ".plaintext" - suffixPlainTextUserName = ".username" - suffixPlainTextPassword = ".password" + plainTextPrefix = ".plaintext" + suffixPlainTextUsername = ".username" + suffixPlainTextPassword = ".password" + suffixPlainTextMechanism = ".mechanism" - defaultPlainTextUserName = "" - defaultPlainTextPassword = "" + defaultPlainTextUsername = "" + defaultPlainTextPassword = "" + defaultPlainTextMechanism = "PLAIN" ) func addKerberosFlags(configPrefix string, flagSet *flag.FlagSet) { @@ -65,7 +67,7 @@ func addKerberosFlags(configPrefix string, flagSet *flag.FlagSet) { defaultKerberosPassword, "The Kerberos password used for authenticate with KDC") flagSet.String( - configPrefix+kerberosPrefix+suffixKerberosUserName, + configPrefix+kerberosPrefix+suffixKerberosUsername, defaultKerberosUsername, "The Kerberos username used for authenticate with KDC") flagSet.String( @@ -84,13 +86,17 @@ func addKerberosFlags(configPrefix string, flagSet *flag.FlagSet) { func addPlainTextFlags(configPrefix string, flagSet *flag.FlagSet) { flagSet.String( - configPrefix+plainTextPrefix+suffixPlainTextUserName, - defaultPlainTextUserName, + configPrefix+plainTextPrefix+suffixPlainTextUsername, + defaultPlainTextUsername, "The plaintext Username for SASL/PLAIN authentication") flagSet.String( configPrefix+plainTextPrefix+suffixPlainTextPassword, defaultPlainTextPassword, "The plaintext Password for SASL/PLAIN authentication") + flagSet.String( + configPrefix+plainTextPrefix+suffixPlainTextMechanism, + defaultPlainTextMechanism, + "The plaintext Mechanism for SASL/PLAIN authentication, e.g. 'SCRAM-SHA-256' or 'SCRAM-SHA-512' or 'PLAIN'") } // AddFlags add configuration flags to a flagSet. diff --git a/pkg/kafka/auth/plaintext.go b/pkg/kafka/auth/plaintext.go index 277d5d3c58f..1bb95334ec7 100644 --- a/pkg/kafka/auth/plaintext.go +++ b/pkg/kafka/auth/plaintext.go @@ -15,17 +15,77 @@ package auth import ( + "crypto/sha256" + "crypto/sha512" + "fmt" + "hash" + "strings" + "github.com/Shopify/sarama" + "github.com/xdg/scram" ) +// scramClient is the client to use when the auth mechanism is SCRAM +type scramClient struct { + *scram.Client + *scram.ClientConversation + scram.HashGeneratorFcn +} + +// Begin prepares the client for the SCRAM exchange +// with the server with a user name and a password +func (x *scramClient) Begin(userName, password, authzID string) (err error) { + x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID) + if err != nil { + return err + } + x.ClientConversation = x.Client.NewConversation() + return nil +} + +// Step steps client through the SCRAM exchange. It is +// called repeatedly until it errors or `Done` returns true. +func (x *scramClient) Step(challenge string) (response string, err error) { + response, err = x.ClientConversation.Step(challenge) + return +} + +// Done should return true when the SCRAM conversation +// is over. +func (x *scramClient) Done() bool { + return x.ClientConversation.Done() +} + // PlainTextConfig describes the configuration properties needed for SASL/PLAIN with kafka type PlainTextConfig struct { - UserName string `mapstructure:"username"` - Password string `mapstructure:"password" json:"-"` + Username string `mapstructure:"username"` + Password string `mapstructure:"password" json:"-"` + Mechanism string `mapstructure:"mechanism"` } -func setPlainTextConfiguration(config *PlainTextConfig, saramaConfig *sarama.Config) { +var _ sarama.SCRAMClient = (*scramClient)(nil) + +func setPlainTextConfiguration(config *PlainTextConfig, saramaConfig *sarama.Config) error { saramaConfig.Net.SASL.Enable = true - saramaConfig.Net.SASL.User = config.UserName + saramaConfig.Net.SASL.User = config.Username saramaConfig.Net.SASL.Password = config.Password + switch strings.ToUpper(config.Mechanism) { + case "SCRAM-SHA-256": + saramaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { + return &scramClient{HashGeneratorFcn: func() hash.Hash { return sha256.New() }} + } + saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256 + case "SCRAM-SHA-512": + saramaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { + return &scramClient{HashGeneratorFcn: func() hash.Hash { return sha512.New() }} + } + saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512 + case "PLAIN": + saramaConfig.Net.SASL.Mechanism = sarama.SASLTypePlaintext + + default: + return fmt.Errorf("config plaintext.mechanism error: %s, only support 'SCRAM-SHA-256' or 'SCRAM-SHA-512' or 'PLAIN'", config.Mechanism) + + } + return nil } diff --git a/plugin/storage/kafka/options_test.go b/plugin/storage/kafka/options_test.go index 5fa6f249301..f90b6040650 100644 --- a/plugin/storage/kafka/options_test.go +++ b/plugin/storage/kafka/options_test.go @@ -173,29 +173,30 @@ func TestRequiredAcksFailures(t *testing.T) { func TestTLSFlags(t *testing.T) { kerb := auth.KerberosConfig{ServiceName: "kafka", ConfigPath: "/etc/krb5.conf", KeyTabPath: "/etc/security/kafka.keytab"} + plain := auth.PlainTextConfig{Username: "", Password: "", Mechanism: "PLAIN"} tests := []struct { flags []string expected auth.AuthenticationConfig }{ { flags: []string{}, - expected: auth.AuthenticationConfig{Authentication: "none", Kerberos: kerb}, + expected: auth.AuthenticationConfig{Authentication: "none", Kerberos: kerb, PlainText: plain}, }, { flags: []string{"--kafka.producer.authentication=foo"}, - expected: auth.AuthenticationConfig{Authentication: "foo", Kerberos: kerb}, + expected: auth.AuthenticationConfig{Authentication: "foo", Kerberos: kerb, PlainText: plain}, }, { flags: []string{"--kafka.producer.authentication=kerberos", "--kafka.producer.tls.enabled=true"}, - expected: auth.AuthenticationConfig{Authentication: "kerberos", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}}, + expected: auth.AuthenticationConfig{Authentication: "kerberos", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}, PlainText: plain}, }, { flags: []string{"--kafka.producer.authentication=tls"}, - expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}}, + expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}, PlainText: plain}, }, { flags: []string{"--kafka.producer.authentication=tls", "--kafka.producer.tls.enabled=false"}, - expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}}, + expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}, PlainText: plain}, }, }