diff --git a/pkg/cassandra/config/config.go b/pkg/cassandra/config/config.go index f33a577c511..5dd12bad634 100644 --- a/pkg/cassandra/config/config.go +++ b/pkg/cassandra/config/config.go @@ -29,6 +29,7 @@ import ( type Configuration struct { Servers []string `validate:"nonzero"` Keyspace string `validate:"nonzero"` + LocalDC string `yaml:"local_dc"` ConnectionsPerHost int `validate:"min=1" yaml:"connections_per_host"` Timeout time.Duration `validate:"min=500"` ReconnectInterval time.Duration `validate:"min=500" yaml:"reconnect_interval"` @@ -130,7 +131,13 @@ func (c *Configuration) NewCluster() *gocql.ClusterConfig { } else { cluster.Consistency = gocql.ParseConsistency(c.Consistency) } - cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.RoundRobinHostPolicy()) + + fallbackHostSelectionPolicy := gocql.RoundRobinHostPolicy() + if c.LocalDC != "" { + fallbackHostSelectionPolicy = gocql.DCAwareRoundRobinPolicy(c.LocalDC) + } + cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(fallbackHostSelectionPolicy, gocql.ShuffleReplicas()) + if c.Authenticator.Basic.Username != "" && c.Authenticator.Basic.Password != "" { cluster.Authenticator = gocql.PasswordAuthenticator{ Username: c.Authenticator.Basic.Username, diff --git a/plugin/storage/cassandra/options.go b/plugin/storage/cassandra/options.go index 5427cdec836..7652e7246c1 100644 --- a/plugin/storage/cassandra/options.go +++ b/plugin/storage/cassandra/options.go @@ -34,6 +34,7 @@ const ( suffixServers = ".servers" suffixPort = ".port" suffixKeyspace = ".keyspace" + suffixDC = ".local-dc" suffixConsistency = ".consistency" suffixProtoVer = ".proto-version" suffixSocketKeepAlive = ".socket-keep-alive" @@ -148,6 +149,10 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) { nsConfig.namespace+suffixKeyspace, nsConfig.Keyspace, "The Cassandra keyspace for Jaeger data") + flagSet.String( + nsConfig.namespace+suffixDC, + nsConfig.LocalDC, + "The name of the Cassandra local data center for DC Aware host selection") flagSet.String( nsConfig.namespace+suffixConsistency, nsConfig.Consistency, @@ -214,6 +219,7 @@ func (cfg *namespaceConfig) initFromViper(v *viper.Viper) { cfg.servers = v.GetString(cfg.namespace + suffixServers) cfg.Port = v.GetInt(cfg.namespace + suffixPort) cfg.Keyspace = v.GetString(cfg.namespace + suffixKeyspace) + cfg.LocalDC = v.GetString(cfg.namespace + suffixDC) cfg.Consistency = v.GetString(cfg.namespace + suffixConsistency) cfg.ProtoVersion = v.GetInt(cfg.namespace + suffixProtoVer) cfg.SocketKeepAlive = v.GetDuration(cfg.namespace + suffixSocketKeepAlive) diff --git a/plugin/storage/cassandra/options_test.go b/plugin/storage/cassandra/options_test.go index 262172b4edc..a4af1b5ec55 100644 --- a/plugin/storage/cassandra/options_test.go +++ b/plugin/storage/cassandra/options_test.go @@ -49,6 +49,7 @@ func TestOptionsWithFlags(t *testing.T) { v, command := config.Viperize(opts.AddFlags) command.ParseFlags([]string{ "--cas.keyspace=jaeger", + "--cas.local-dc=mojave", "--cas.servers=1.1.1.1,2.2.2.2", "--cas.connections-per-host=42", "--cas.reconnect-interval=42s", @@ -67,6 +68,7 @@ func TestOptionsWithFlags(t *testing.T) { primary := opts.GetPrimary() assert.Equal(t, "jaeger", primary.Keyspace) + assert.Equal(t, "mojave", primary.LocalDC) assert.Equal(t, []string{"1.1.1.1", "2.2.2.2"}, primary.Servers) assert.Equal(t, "ONE", primary.Consistency)