Skip to content

Commit

Permalink
Merge branch 'master' into Add-ability-to-use-JS-file-for-UI-configur…
Browse files Browse the repository at this point in the history
…ation
  • Loading branch information
th3M1ke committed Jan 18, 2021
2 parents 12394b3 + 8fb235a commit 5a46489
Show file tree
Hide file tree
Showing 16 changed files with 273 additions and 50 deletions.
66 changes: 66 additions & 0 deletions cmd/agent/app/reporter/connect_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package reporter

import (
"expvar"

"github.com/uber/jaeger-lib/metrics"
)

type connectMetrics struct {
// used for reflect current connection stability
Reconnects metrics.Counter `metric:"collector_reconnects" help:"Number of successful connections (including reconnects) to the collector."`

// Connection status that jaeger-agent to jaeger-collector, 1 is connected, 0 is disconnected
Status metrics.Gauge `metric:"collector_connected" help:"Status of connection between the agent and the collector; 1 is connected, 0 is disconnected"`
}

// ConnectMetrics include connectMetrics necessary params if want to modify metrics of connectMetrics, must via ConnectMetrics API
type ConnectMetrics struct {
metrics connectMetrics
target *expvar.String
}

// NewConnectMetrics will be initialize ConnectMetrics
func NewConnectMetrics(mf metrics.Factory) *ConnectMetrics {
cm := &ConnectMetrics{}
metrics.MustInit(&cm.metrics, mf.Namespace(metrics.NSOptions{Name: "connection_status"}), nil)

if r := expvar.Get("gRPCTarget"); r == nil {
cm.target = expvar.NewString("gRPCTarget")
} else {
cm.target = r.(*expvar.String)
}

return cm
}

// OnConnectionStatusChange used for pass the status parameter when connection is changed
// 0 is disconnected, 1 is connected
// For quick view status via use `sum(jaeger_agent_connection_status_collector_connected{}) by (instance) > bool 0`
func (cm *ConnectMetrics) OnConnectionStatusChange(connected bool) {
if connected {
cm.metrics.Status.Update(1)
cm.metrics.Reconnects.Inc(1)
} else {
cm.metrics.Status.Update(0)
}
}

// RecordTarget writes the current connection target to an expvar field.
func (cm *ConnectMetrics) RecordTarget(target string) {
cm.target.Set(target)
}
60 changes: 60 additions & 0 deletions cmd/agent/app/reporter/connect_metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package reporter

import (
"expvar"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/uber/jaeger-lib/metrics/metricstest"
)

func TestConnectMetrics(t *testing.T) {
mf := metricstest.NewFactory(time.Hour)
cm := NewConnectMetrics(mf)

getGauge := func() map[string]int64 {
_, gauges := mf.Snapshot()
return gauges
}

getCount := func() map[string]int64 {
counts, _ := mf.Snapshot()
return counts
}

// no connection
cm.OnConnectionStatusChange(false)
assert.EqualValues(t, 0, getGauge()["connection_status.collector_connected"])

// first connection
cm.OnConnectionStatusChange(true)
assert.EqualValues(t, 1, getGauge()["connection_status.collector_connected"])
assert.EqualValues(t, 1, getCount()["connection_status.collector_reconnects"])

// reconnect
cm.OnConnectionStatusChange(false)
cm.OnConnectionStatusChange(true)
assert.EqualValues(t, 2, getCount()["connection_status.collector_reconnects"])

cm.RecordTarget("collector-host")
assert.Equal(t, `"collector-host"`, expvar.Get("gRPCTarget").String())

// since expvars are singletons, the second constructor should grab the same var
cm2 := NewConnectMetrics(mf)
assert.Same(t, cm.target, cm2.target)
}
21 changes: 18 additions & 3 deletions cmd/agent/app/reporter/grpc/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@ import (
"strings"

grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"

"github.com/jaegertracing/jaeger/cmd/agent/app/reporter"
"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/pkg/discovery"
"github.com/jaegertracing/jaeger/pkg/discovery/grpcresolver"
Expand All @@ -51,7 +54,7 @@ func NewConnBuilder() *ConnBuilder {
}

// CreateConnection creates the gRPC connection
func (b *ConnBuilder) CreateConnection(logger *zap.Logger) (*grpc.ClientConn, error) {
func (b *ConnBuilder) CreateConnection(logger *zap.Logger, mFactory metrics.Factory) (*grpc.ClientConn, error) {
var dialOptions []grpc.DialOption
var dialTarget string
if b.TLS.Enabled { // user requested a secure connection
Expand Down Expand Up @@ -97,14 +100,26 @@ func (b *ConnBuilder) CreateConnection(logger *zap.Logger) (*grpc.ClientConn, er
return nil, err
}

go func(cc *grpc.ClientConn) {
connectMetrics := reporter.NewConnectMetrics(
mFactory.Namespace(metrics.NSOptions{Tags: map[string]string{"protocol": "grpc"}}),
)

go func(cc *grpc.ClientConn, cm *reporter.ConnectMetrics) {
logger.Info("Checking connection to collector")

for {
s := cc.GetState()
if s == connectivity.Ready {
cm.OnConnectionStatusChange(true)
cm.RecordTarget(cc.Target())
} else {
cm.OnConnectionStatusChange(false)
}

logger.Info("Agent collector connection state change", zap.String("dialTarget", dialTarget), zap.Stringer("status", s))
cc.WaitForStateChange(context.Background(), s)
}
}(conn)
}(conn, connectMetrics)

return conn, nil
}
4 changes: 2 additions & 2 deletions cmd/agent/app/reporter/grpc/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestBuilderFromConfig(t *testing.T) {
t,
[]string{"127.0.0.1:14268", "127.0.0.1:14269"},
cfg.CollectorHostPorts)
r, err := cfg.CreateConnection(zap.NewNop())
r, err := cfg.CreateConnection(zap.NewNop(), metrics.NullFactory)
require.NoError(t, err)
assert.NotNil(t, r)
}
Expand Down Expand Up @@ -149,7 +149,7 @@ func TestBuilderWithCollectors(t *testing.T) {
cfg.Notifier = test.notifier
cfg.Discoverer = test.discoverer

conn, err := cfg.CreateConnection(zap.NewNop())
conn, err := cfg.CreateConnection(zap.NewNop(), metrics.NullFactory)
if test.expectedError == "" {
require.NoError(t, err)
require.NotNil(t, conn)
Expand Down
2 changes: 1 addition & 1 deletion cmd/agent/app/reporter/grpc/collector_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type ProxyBuilder struct {

// NewCollectorProxy creates ProxyBuilder
func NewCollectorProxy(builder *ConnBuilder, agentTags map[string]string, mFactory metrics.Factory, logger *zap.Logger) (*ProxyBuilder, error) {
conn, err := builder.CreateConnection(logger)
conn, err := builder.CreateConnection(logger, mFactory)
if err != nil {
return nil, err
}
Expand Down
11 changes: 6 additions & 5 deletions cmd/ingester/app/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,29 +55,30 @@ func TestOptionsWithFlags(t *testing.T) {

func TestTLSFlags(t *testing.T) {
kerb := auth.KerberosConfig{ServiceName: "kafka", ConfigPath: "/etc/krb5.conf", KeyTabPath: "/etc/security/kafka.keytab"}
plain := auth.PlainTextConfig{Username: "", Password: "", Mechanism: "PLAIN"}
tests := []struct {
flags []string
expected auth.AuthenticationConfig
}{
{
flags: []string{},
expected: auth.AuthenticationConfig{Authentication: "none", Kerberos: kerb},
expected: auth.AuthenticationConfig{Authentication: "none", Kerberos: kerb, PlainText: plain},
},
{
flags: []string{"--kafka.consumer.authentication=foo"},
expected: auth.AuthenticationConfig{Authentication: "foo", Kerberos: kerb},
expected: auth.AuthenticationConfig{Authentication: "foo", Kerberos: kerb, PlainText: plain},
},
{
flags: []string{"--kafka.consumer.authentication=kerberos", "--kafka.consumer.tls.enabled=true"},
expected: auth.AuthenticationConfig{Authentication: "kerberos", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}},
expected: auth.AuthenticationConfig{Authentication: "kerberos", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}, PlainText: plain},
},
{
flags: []string{"--kafka.consumer.authentication=tls"},
expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}},
expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}, PlainText: plain},
},
{
flags: []string{"--kafka.consumer.authentication=tls", "--kafka.consumer.tls.enabled=false"},
expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}},
expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}, PlainText: plain},
},
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (f Factory) CreateDefaultConfig() configmodels.Exporter {

if opts.Config.Authentication == "plaintext" {
cfg.Authentication.PlainText = &kafkaexporter.PlainTextConfig{
Username: opts.Config.PlainText.UserName,
Username: opts.Config.PlainText.Username,
Password: opts.Config.PlainText.Password,
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (f *Factory) CreateDefaultConfig() configmodels.Receiver {

if opts.Authentication == "plaintext" {
cfg.Authentication.PlainText = &kafkaexporter.PlainTextConfig{
Username: opts.PlainText.UserName,
Username: opts.PlainText.Username,
Password: opts.PlainText.Password,
}
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ require (
github.com/uber/jaeger-lib v2.4.0+incompatible
github.com/vektra/mockery v0.0.0-20181123154057-e78b021dcbb5
github.com/wadey/gocovmerge v0.0.0-20160331181800-b5bfa59ec0ad
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
go.mongodb.org/mongo-driver v1.3.2 // indirect
go.uber.org/atomic v1.6.0
go.uber.org/automaxprocs v1.3.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -587,8 +587,10 @@ github.com/vektra/mockery v0.0.0-20181123154057-e78b021dcbb5 h1:Xim2mBRFdXzXmKRO
github.com/vektra/mockery v0.0.0-20181123154057-e78b021dcbb5/go.mod h1:ppEjwdhyy7Y31EnHRDm1JkChoC7LXIJ7Ex0VYLWtZtQ=
github.com/wadey/gocovmerge v0.0.0-20160331181800-b5bfa59ec0ad h1:W0LEBv82YCGEtcmPA3uNZBI33/qF//HAAs3MawDjRa0=
github.com/wadey/gocovmerge v0.0.0-20160331181800-b5bfa59ec0ad/go.mod h1:Hy8o65+MXnS6EwGElrSRjUzQDLXreJlzYLlWiHtt8hM=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v0.0.0-20180714160509-73f8eece6fdc/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
Expand Down
27 changes: 15 additions & 12 deletions pkg/discovery/grpcresolver/grpc_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,22 +123,25 @@ func (r *Resolver) Close() {
r.closing.Wait()
}

// rendezvousHash is the core of the algorithm. It takes input addresses,
// assigns each of them a hash, sorts them by those hash values, and
// returns top N of entries from the sorted list, up to minPeers parameter.
func (r *Resolver) rendezvousHash(addresses []string) []string {
hasher := fnv.New32()
hosts := hostScores{}
for _, address := range addresses {
hosts = append(hosts, hostScore{
hosts := make(hostScores, len(addresses))
for i, address := range addresses {
hosts[i] = hostScore{
address: address,
score: hashAddr(hasher, []byte(address), r.salt),
})
}
}
sort.Sort(hosts)
size := min(r.discoveryMinPeers, len(hosts))
addressesPerHost := make([]string, size)
for i := 0; i < size; i++ {
addressesPerHost[i] = hosts[i].address
n := min(r.discoveryMinPeers, len(hosts))
topN := make([]string, n)
for i := 0; i < n; i++ {
topN[i] = hosts[i].address
}
return addressesPerHost
return topN
}

func min(a, b int) int {
Expand All @@ -162,9 +165,9 @@ func (r *Resolver) updateAddresses(hostPorts []string) {
}

func generateAddresses(instances []string) []resolver.Address {
var addrs []resolver.Address
for _, instance := range instances {
addrs = append(addrs, resolver.Address{Addr: instance})
addrs := make([]resolver.Address, len(instances))
for i, instance := range instances {
addrs[i] = resolver.Address{Addr: instance}
}
return addrs
}
12 changes: 8 additions & 4 deletions pkg/discovery/grpcresolver/grpc_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,19 +84,20 @@ func makeSureConnectionsUp(t *testing.T, count int, testc grpctest.TestServiceCl
// Make sure connections to all servers are up.
for si := 0; si < count; si++ {
connected := false
for i := 0; i < 1000; i++ {
for i := 0; i < 3000; i++ { // 3000 * 10ms = 30s
_, err := testc.EmptyCall(context.Background(), &grpctest.Empty{}, grpc.Peer(&p))
if err != nil {
continue
}
if _, ok := addrs[p.Addr.String()]; !ok {
addrs[p.Addr.String()] = struct{}{}
connected = true
t.Logf("connected to peer #%d (%v) on iteration %d", si, p.Addr, i)
break
}
time.Sleep(time.Millisecond * 10)
}
assert.True(t, connected, "Connection was still not up")
assert.True(t, connected, "Connection #%d was still not up. Connections so far: %+v", si, addrs)
}
}

Expand Down Expand Up @@ -135,10 +136,13 @@ func TestGRPCResolverRoundRobin(t *testing.T) {
minPeers int
connections int // expected number of unique connections to servers
}{
{3, 3}, {5, 5}, {7, 5},
{minPeers: 3, connections: 3},
{minPeers: 5, connections: 3},
// note: test cannot succeed with minPeers < connections because resolver
// will never return more than minPeers addresses.
}
for _, test := range tests {
t.Run(fmt.Sprintf("minPeers=%d", test.minPeers), func(t *testing.T) {
t.Run(fmt.Sprintf("%+v", test), func(t *testing.T) {
res := New(notifier, discoverer, zap.NewNop(), test.minPeers)
defer resolver.UnregisterForTesting(res.Scheme())

Expand Down
10 changes: 7 additions & 3 deletions pkg/kafka/auth/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,10 @@ func (config *AuthenticationConfig) SetConfiguration(saramaConfig *sarama.Config
setKerberosConfiguration(&config.Kerberos, saramaConfig)
return nil
case plaintext:
setPlainTextConfiguration(&config.PlainText, saramaConfig)
err := setPlainTextConfiguration(&config.PlainText, saramaConfig)
if err != nil {
return err
}
return nil
default:
return fmt.Errorf("Unknown/Unsupported authentication method %s to kafka cluster", config.Authentication)
Expand All @@ -81,7 +84,7 @@ func (config *AuthenticationConfig) InitFromViper(configPrefix string, v *viper.
config.Kerberos.ServiceName = v.GetString(configPrefix + kerberosPrefix + suffixKerberosServiceName)
config.Kerberos.Realm = v.GetString(configPrefix + kerberosPrefix + suffixKerberosRealm)
config.Kerberos.UseKeyTab = v.GetBool(configPrefix + kerberosPrefix + suffixKerberosUseKeyTab)
config.Kerberos.Username = v.GetString(configPrefix + kerberosPrefix + suffixKerberosUserName)
config.Kerberos.Username = v.GetString(configPrefix + kerberosPrefix + suffixKerberosUsername)
config.Kerberos.Password = v.GetString(configPrefix + kerberosPrefix + suffixKerberosPassword)
config.Kerberos.ConfigPath = v.GetString(configPrefix + kerberosPrefix + suffixKerberosConfig)
config.Kerberos.KeyTabPath = v.GetString(configPrefix + kerberosPrefix + suffixKerberosKeyTab)
Expand All @@ -97,6 +100,7 @@ func (config *AuthenticationConfig) InitFromViper(configPrefix string, v *viper.
config.TLS.Enabled = true
}

config.PlainText.UserName = v.GetString(configPrefix + plainTextPrefix + suffixPlainTextUserName)
config.PlainText.Username = v.GetString(configPrefix + plainTextPrefix + suffixPlainTextUsername)
config.PlainText.Password = v.GetString(configPrefix + plainTextPrefix + suffixPlainTextPassword)
config.PlainText.Mechanism = v.GetString(configPrefix + plainTextPrefix + suffixPlainTextMechanism)
}
Loading

0 comments on commit 5a46489

Please sign in to comment.