diff --git a/cmd/query/app/token_propagation_test.go b/cmd/query/app/token_propagation_test.go index ddf098d5180..8565667a19b 100644 --- a/cmd/query/app/token_propagation_test.go +++ b/cmd/query/app/token_propagation_test.go @@ -76,7 +76,7 @@ func runQueryService(t *testing.T, esURL string) *Server { })) f.InitFromViper(v, flagsSvc.Logger) // set AllowTokenFromContext manually because we don't register the respective CLI flag from query svc - f.Options.Primary.AllowTokenFromContext = true + f.Options.Primary.Authentication.BearerTokenAuthentication.AllowFromContext = true require.NoError(t, f.Initialize(metrics.NullFactory, flagsSvc.Logger)) defer f.Close() diff --git a/pkg/es/config/config.go b/pkg/es/config/config.go index 99ea50f85fe..72f566c25c5 100644 --- a/pkg/es/config/config.go +++ b/pkg/es/config/config.go @@ -39,15 +39,25 @@ const ( // IndexOptions describes the index format and rollover frequency type IndexOptions struct { - Priority int64 `mapstructure:"priority"` - DateLayout string `mapstructure:"date_layout"` - Shards int64 `mapstructure:"shards"` - Replicas int64 `mapstructure:"replicas"` - RolloverFrequency string `mapstructure:"rollover_frequency"` // "hour" or "day" + // Priority contains the priority of index template (ESv8 only). + Priority int64 `mapstructure:"priority"` + DateLayout string `mapstructure:"date_layout"` + // Shards is the number of shards per index in Elasticsearch. + Shards int64 `mapstructure:"shards"` + // Replicas is the number of replicas per index in Elasticsearch. + Replicas int64 `mapstructure:"replicas"` + // RolloverFrequency contains the rollover frequency setting used to fetch + // indices from elasticsearch. + // Valid configuration options are: [hour, day]. + // This setting does not affect the index rotation and is simply used for + // fetching indices. + RolloverFrequency string `mapstructure:"rollover_frequency"` } // Indices describes different configuration options for each index type type Indices struct { + // IndexPrefix is an optional prefix to prepend to Jaeger indices. + // For example, setting this field to "production" creates "production-jaeger-*". IndexPrefix IndexPrefix `mapstructure:"index_prefix"` Spans IndexOptions `mapstructure:"spans"` Services IndexOptions `mapstructure:"services"` @@ -70,34 +80,58 @@ func (p IndexPrefix) Apply(indexName string) string { // Configuration describes the configuration properties needed to connect to an ElasticSearch cluster type Configuration struct { - Servers []string `mapstructure:"server_urls" valid:"required,url"` - RemoteReadClusters []string `mapstructure:"remote_read_clusters"` - Username string `mapstructure:"username"` - Password string `mapstructure:"password" json:"-"` - TokenFilePath string `mapstructure:"token_file"` - PasswordFilePath string `mapstructure:"password_file"` - AllowTokenFromContext bool `mapstructure:"-"` - Sniffer bool `mapstructure:"sniffer"` // https://github.com/olivere/elastic/wiki/Sniffing - SnifferTLSEnabled bool `mapstructure:"sniffer_tls_enabled"` - MaxDocCount int `mapstructure:"-"` // Defines maximum number of results to fetch from storage per query - MaxSpanAge time.Duration `mapstructure:"-"` // configures the maximum lookback on span reads - Timeout time.Duration `mapstructure:"-"` - BulkSize int `mapstructure:"-"` - BulkWorkers int `mapstructure:"-"` - BulkActions int `mapstructure:"-"` - BulkFlushInterval time.Duration `mapstructure:"-"` - Indices Indices `mapstructure:"indices"` - ServiceCacheTTL time.Duration `mapstructure:"service_cache_ttl"` - AdaptiveSamplingLookback time.Duration `mapstructure:"-"` - Tags TagsAsFields `mapstructure:"tags_as_fields"` - Enabled bool `mapstructure:"-"` - TLS configtls.ClientConfig `mapstructure:"tls"` - UseReadWriteAliases bool `mapstructure:"use_aliases"` - CreateIndexTemplates bool `mapstructure:"create_mappings"` - UseILM bool `mapstructure:"use_ilm"` - Version uint `mapstructure:"version"` - LogLevel string `mapstructure:"log_level"` - SendGetBodyAs string `mapstructure:"send_get_body_as"` + // ---- connection related configs ---- + // Servers is a list of Elasticsearch servers. The strings must must contain full URLs + // (i.e. http://localhost:9200). + Servers []string `mapstructure:"server_urls" valid:"required,url"` + // RemoteReadClusters is a list of Elasticsearch remote cluster names for cross-cluster + // querying. + RemoteReadClusters []string `mapstructure:"remote_read_clusters"` + Authentication Authentication `mapstructure:"auth"` + // TLS contains the TLS configuration for the connection to the ElasticSearch clusters. + TLS configtls.ClientConfig `mapstructure:"tls"` + Sniffing Sniffing `mapstructure:"sniffing"` + // SendGetBodyAs is the HTTP verb to use for requests that contain a body. + SendGetBodyAs string `mapstructure:"send_get_body_as"` + // QueryTimeout contains the timeout used for queries. A timeout of zero means no timeout. + QueryTimeout time.Duration `mapstructure:"query_timeout"` + + // ---- elasticsearch client related configs ---- + BulkProcessing BulkProcessing `mapstructure:"bulk_processing"` + // Version contains the major Elasticsearch version. If this field is not specified, + // the value will be auto-detected from Elasticsearch. + Version uint `mapstructure:"version"` + // LogLevel contains the Elasticsearch client log-level. Valid values for this field + // are: [debug, info, error] + LogLevel string `mapstructure:"log_level"` + + // ---- index related configs ---- + Indices Indices `mapstructure:"indices"` + // UseReadWriteAliases, if set to true, will use read and write aliases for indices. + // Use this option with Elasticsearch rollover API. It requires an external component + // to create aliases before startup and then performing its management. + UseReadWriteAliases bool `mapstructure:"use_aliases"` + // CreateIndexTemplates, if set to true, creates index templates at application startup. + // This configuration should be set to false when templates are installed manually. + CreateIndexTemplates bool `mapstructure:"create_mappings"` + // Option to enable Index Lifecycle Management (ILM) for Jaeger span and service indices. + // Read more about ILM at + // https://www.jaegertracing.io/docs/deployment/#enabling-ilm-support + UseILM bool `mapstructure:"use_ilm"` + + // ---- jaeger-specific configs ---- + // MaxDocCount Defines maximum number of results to fetch from storage per query. + MaxDocCount int `mapstructure:"max_doc_count"` + // MaxSpanAge configures the maximum lookback on span reads. + MaxSpanAge time.Duration `mapstructure:"max_span_age"` + // ServiceCacheTTL contains the TTL for the cache of known service names. + ServiceCacheTTL time.Duration `mapstructure:"service_cache_ttl"` + // AdaptiveSamplingLookback contains the duration to look back for the + // latest adaptive sampling probabilities. + AdaptiveSamplingLookback time.Duration `mapstructure:"adaptive_sampling_lookback"` + Tags TagsAsFields `mapstructure:"tags_as_fields"` + // Enabled, if set to true, enables the namespace for storage pointed to by this configuration. + Enabled bool `mapstructure:"-"` } // TagsAsFields holds configuration for tag schema. @@ -114,6 +148,53 @@ type TagsAsFields struct { Include string `mapstructure:"include"` } +// Sniffing sets the sniffing configuration for the ElasticSearch client, which is the process +// of finding all the nodes of your cluster. Read more about sniffing at +// https://github.com/olivere/elastic/wiki/Sniffing. +type Sniffing struct { + // Enabled, if set to true, enables sniffing for the ElasticSearch client. + Enabled bool `mapstructure:"enabled"` + // UseHTTPS, if set to true, sets the HTTP scheme to HTTPS when performing sniffing. + UseHTTPS bool `mapstructure:"use_https"` +} + +type BulkProcessing struct { + // MaxBytes, contains the number of bytes which specifies when to flush. + MaxBytes int `mapstructure:"max_bytes"` + // MaxActions contain the number of added actions which specifies when to flush. + MaxActions int `mapstructure:"max_actions"` + // FlushInterval is the interval at the end of which a flush occurs. + FlushInterval time.Duration `mapstructure:"flush_interval"` + // Workers contains the number of concurrent workers allowed to be executed. + Workers int `mapstructure:"workers"` +} + +type Authentication struct { + BasicAuthentication BasicAuthentication `mapstructure:"basic"` + BearerTokenAuthentication BearerTokenAuthentication `mapstructure:"bearer_token"` +} + +type BasicAuthentication struct { + // Username contains the username required to connect to Elasticsearch. + Username string `mapstructure:"username"` + // Password contains The password required by Elasticsearch + Password string `mapstructure:"password" json:"-"` + // PasswordFilePath contains the path to a file containing password. + // This file is watched for changes. + PasswordFilePath string `mapstructure:"password_file"` +} + +// BearerTokenAuthentication contains the configuration for attaching bearer tokens +// when making HTTP requests. Note that TokenFilePath and AllowTokenFromContext +// should not both be enabled. If both TokenFilePath and AllowTokenFromContext are set, +// the TokenFilePath will be ignored. +type BearerTokenAuthentication struct { + // FilePath contains the path to a file containing a bearer token. + FilePath string `mapstructure:"file_path"` + // AllowTokenFromContext, if set to true, enables reading bearer token from the context. + AllowFromContext bool `mapstructure:"from_context"` +} + // NewClient creates a new ElasticSearch client func NewClient(c *Configuration, logger *zap.Logger, metricsFactory metrics.Factory) (es.Client, error) { if len(c.Servers) < 1 { @@ -171,10 +252,10 @@ func NewClient(c *Configuration, logger *zap.Logger, metricsFactory metrics.Fact zap.Any("response", response)) } }). - BulkSize(c.BulkSize). - Workers(c.BulkWorkers). - BulkActions(c.BulkActions). - FlushInterval(c.BulkFlushInterval). + BulkSize(c.BulkProcessing.MaxBytes). + Workers(c.BulkProcessing.Workers). + BulkActions(c.BulkProcessing.MaxActions). + FlushInterval(c.BulkProcessing.FlushInterval). Do(context.Background()) if err != nil { return nil, err @@ -220,9 +301,9 @@ func NewClient(c *Configuration, logger *zap.Logger, metricsFactory metrics.Fact func newElasticsearchV8(c *Configuration, logger *zap.Logger) (*esV8.Client, error) { var options esV8.Config options.Addresses = c.Servers - options.Username = c.Username - options.Password = c.Password - options.DiscoverNodesOnStart = c.Sniffer + options.Username = c.Authentication.BasicAuthentication.Username + options.Password = c.Authentication.BasicAuthentication.Password + options.DiscoverNodesOnStart = c.Sniffing.Enabled transport, err := GetHTTPRoundTripper(c, logger) if err != nil { return nil, err @@ -258,14 +339,14 @@ func (c *Configuration) ApplyDefaults(source *Configuration) { if len(c.RemoteReadClusters) == 0 { c.RemoteReadClusters = source.RemoteReadClusters } - if c.Username == "" { - c.Username = source.Username + if c.Authentication.BasicAuthentication.Username == "" { + c.Authentication.BasicAuthentication.Username = source.Authentication.BasicAuthentication.Username } - if c.Password == "" { - c.Password = source.Password + if c.Authentication.BasicAuthentication.Password == "" { + c.Authentication.BasicAuthentication.Password = source.Authentication.BasicAuthentication.Password } - if !c.Sniffer { - c.Sniffer = source.Sniffer + if !c.Sniffing.Enabled { + c.Sniffing.Enabled = source.Sniffing.Enabled } if c.MaxSpanAge == 0 { c.MaxSpanAge = source.MaxSpanAge @@ -281,20 +362,20 @@ func (c *Configuration) ApplyDefaults(source *Configuration) { setDefaultIndexOptions(&c.Indices.Services, &source.Indices.Services) setDefaultIndexOptions(&c.Indices.Dependencies, &source.Indices.Dependencies) - if c.BulkSize == 0 { - c.BulkSize = source.BulkSize + if c.BulkProcessing.MaxBytes == 0 { + c.BulkProcessing.MaxBytes = source.BulkProcessing.MaxBytes } - if c.BulkWorkers == 0 { - c.BulkWorkers = source.BulkWorkers + if c.BulkProcessing.Workers == 0 { + c.BulkProcessing.Workers = source.BulkProcessing.Workers } - if c.BulkActions == 0 { - c.BulkActions = source.BulkActions + if c.BulkProcessing.MaxActions == 0 { + c.BulkProcessing.MaxActions = source.BulkProcessing.MaxActions } - if c.BulkFlushInterval == 0 { - c.BulkFlushInterval = source.BulkFlushInterval + if c.BulkProcessing.FlushInterval == 0 { + c.BulkProcessing.FlushInterval = source.BulkProcessing.FlushInterval } - if !c.SnifferTLSEnabled { - c.SnifferTLSEnabled = source.SnifferTLSEnabled + if !c.Sniffing.UseHTTPS { + c.Sniffing.UseHTTPS = source.Sniffing.UseHTTPS } if !c.Tags.AllAsFields { c.Tags.AllAsFields = source.Tags.AllAsFields @@ -361,31 +442,31 @@ func (c *Configuration) TagKeysAsFields() ([]string, error) { // getConfigOptions wraps the configs to feed to the ElasticSearch client init func (c *Configuration) getConfigOptions(logger *zap.Logger) ([]elastic.ClientOptionFunc, error) { options := []elastic.ClientOptionFunc{ - elastic.SetURL(c.Servers...), elastic.SetSniff(c.Sniffer), + elastic.SetURL(c.Servers...), elastic.SetSniff(c.Sniffing.Enabled), // Disable health check when token from context is allowed, this is because at this time - // we don' have a valid token to do the check ad if we don't disable the check the service that + // we don'r have a valid token to do the check ad if we don't disable the check the service that // uses this won't start. - elastic.SetHealthcheck(!c.AllowTokenFromContext), + elastic.SetHealthcheck(!c.Authentication.BearerTokenAuthentication.AllowFromContext), } - if c.SnifferTLSEnabled { + if c.Sniffing.UseHTTPS { options = append(options, elastic.SetScheme("https")) } httpClient := &http.Client{ - Timeout: c.Timeout, + Timeout: c.QueryTimeout, } options = append(options, elastic.SetHttpClient(httpClient)) - if c.Password != "" && c.PasswordFilePath != "" { + if c.Authentication.BasicAuthentication.Password != "" && c.Authentication.BasicAuthentication.PasswordFilePath != "" { return nil, fmt.Errorf("both Password and PasswordFilePath are set") } - if c.PasswordFilePath != "" { - passwordFromFile, err := loadTokenFromFile(c.PasswordFilePath) + if c.Authentication.BasicAuthentication.PasswordFilePath != "" { + passwordFromFile, err := loadTokenFromFile(c.Authentication.BasicAuthentication.PasswordFilePath) if err != nil { return nil, fmt.Errorf("failed to load password from file: %w", err) } - c.Password = passwordFromFile + c.Authentication.BasicAuthentication.Password = passwordFromFile } - options = append(options, elastic.SetBasicAuth(c.Username, c.Password)) + options = append(options, elastic.SetBasicAuth(c.Authentication.BasicAuthentication.Username, c.Authentication.BasicAuthentication.Password)) if c.SendGetBodyAs != "" { options = append(options, elastic.SetSendGetBodyAs(c.SendGetBodyAs)) @@ -465,20 +546,20 @@ func GetHTTPRoundTripper(c *Configuration, logger *zap.Logger) (http.RoundTrippe } token := "" - if c.TokenFilePath != "" { - if c.AllowTokenFromContext { + if c.Authentication.BearerTokenAuthentication.FilePath != "" { + if c.Authentication.BearerTokenAuthentication.AllowFromContext { logger.Warn("Token file and token propagation are both enabled, token from file won't be used") } - tokenFromFile, err := loadTokenFromFile(c.TokenFilePath) + tokenFromFile, err := loadTokenFromFile(c.Authentication.BearerTokenAuthentication.FilePath) if err != nil { return nil, err } token = tokenFromFile } - if token != "" || c.AllowTokenFromContext { + if token != "" || c.Authentication.BearerTokenAuthentication.AllowFromContext { transport = bearertoken.RoundTripper{ Transport: httpTransport, - OverrideFromCtx: c.AllowTokenFromContext, + OverrideFromCtx: c.Authentication.BearerTokenAuthentication.AllowFromContext, StaticToken: token, } } diff --git a/pkg/es/config/config_test.go b/pkg/es/config/config_test.go index 9fced5e0af6..c295d37b70c 100644 --- a/pkg/es/config/config_test.go +++ b/pkg/es/config/config_test.go @@ -117,182 +117,287 @@ func TestNewClient(t *testing.T) { { name: "success with valid configuration", config: &Configuration{ - Servers: []string{testServer.URL}, - AllowTokenFromContext: true, - Password: "secret", - LogLevel: "debug", - Username: "user", - PasswordFilePath: "", - BulkSize: -1, // disable bulk; we want immediate flush - Version: 8, + Servers: []string{testServer.URL}, + Authentication: Authentication{ + BasicAuthentication: BasicAuthentication{ + Username: "user", + Password: "secret", + PasswordFilePath: "", + }, + BearerTokenAuthentication: BearerTokenAuthentication{ + AllowFromContext: true, + }, + }, + LogLevel: "debug", + BulkProcessing: BulkProcessing{ + MaxBytes: -1, // disable bulk; we want immediate flush + }, + Version: 8, }, expectedError: false, }, { name: "success with valid configuration and tls enabled", config: &Configuration{ - Servers: []string{testServer.URL}, - AllowTokenFromContext: true, - Password: "secret", - LogLevel: "debug", - Username: "user", - PasswordFilePath: "", - BulkSize: -1, // disable bulk; we want immediate flush - Version: 0, - TLS: configtls.ClientConfig{Insecure: false}, + Servers: []string{testServer.URL}, + Authentication: Authentication{ + BasicAuthentication: BasicAuthentication{ + Username: "user", + Password: "secret", + PasswordFilePath: "", + }, + BearerTokenAuthentication: BearerTokenAuthentication{ + AllowFromContext: true, + }, + }, + LogLevel: "debug", + BulkProcessing: BulkProcessing{ + MaxBytes: -1, // disable bulk; we want immediate flush + }, + Version: 0, + TLS: configtls.ClientConfig{Insecure: false}, }, expectedError: false, }, { name: "success with valid configuration and reading token and certicate from file", config: &Configuration{ - Servers: []string{testServer.URL}, - AllowTokenFromContext: true, - Password: "secret", - LogLevel: "debug", - Username: "user", - PasswordFilePath: "", - BulkSize: -1, // disable bulk; we want immediate flush - Version: 0, + Servers: []string{testServer.URL}, + Authentication: Authentication{ + BasicAuthentication: BasicAuthentication{ + Username: "user", + Password: "secret", + PasswordFilePath: "", + }, + BearerTokenAuthentication: BearerTokenAuthentication{ + AllowFromContext: true, + FilePath: pwdtokenFile, + }, + }, + LogLevel: "debug", + BulkProcessing: BulkProcessing{ + MaxBytes: -1, // disable bulk; we want immediate flush + }, + Version: 0, TLS: configtls.ClientConfig{ Insecure: true, Config: configtls.Config{ CAFile: certFilePath.Name(), }, }, - TokenFilePath: pwdtokenFile, }, expectedError: false, }, { name: "succes with invalid configuration of version higher than 8", config: &Configuration{ - Servers: []string{testServer8.URL}, - AllowTokenFromContext: true, - Password: "secret", - LogLevel: "debug", - Username: "user", - PasswordFilePath: "", - BulkSize: -1, // disable bulk; we want immediate flush - Version: 9, + Servers: []string{testServer8.URL}, + Authentication: Authentication{ + BasicAuthentication: BasicAuthentication{ + Username: "user", + Password: "secret", + PasswordFilePath: "", + }, + BearerTokenAuthentication: BearerTokenAuthentication{ + AllowFromContext: true, + }, + }, + LogLevel: "debug", + BulkProcessing: BulkProcessing{ + MaxBytes: -1, // disable bulk; we want immediate flush + }, + Version: 9, }, expectedError: false, }, { name: "success with valid configuration with version 1", config: &Configuration{ - Servers: []string{testServer1.URL}, - AllowTokenFromContext: true, - Password: "secret", - LogLevel: "debug", - Username: "user", - PasswordFilePath: "", - BulkSize: -1, // disable bulk; we want immediate flush + Servers: []string{testServer1.URL}, + Authentication: Authentication{ + BasicAuthentication: BasicAuthentication{ + Username: "user", + Password: "secret", + PasswordFilePath: "", + }, + BearerTokenAuthentication: BearerTokenAuthentication{ + AllowFromContext: true, + }, + }, + LogLevel: "debug", + BulkProcessing: BulkProcessing{ + MaxBytes: -1, // disable bulk; we want immediate flush + }, }, expectedError: false, }, { name: "success with valid configuration with version 2", config: &Configuration{ - Servers: []string{testServer2.URL}, - AllowTokenFromContext: true, - Password: "secret", - LogLevel: "debug", - Username: "user", - PasswordFilePath: "", - BulkSize: -1, // disable bulk; we want immediate flush + Servers: []string{testServer2.URL}, + Authentication: Authentication{ + BasicAuthentication: BasicAuthentication{ + Username: "user", + Password: "secret", + PasswordFilePath: "", + }, + BearerTokenAuthentication: BearerTokenAuthentication{ + AllowFromContext: true, + }, + }, + LogLevel: "debug", + BulkProcessing: BulkProcessing{ + MaxBytes: -1, // disable bulk; we want immediate flush + }, }, expectedError: false, }, { name: "success with valid configuration password from file", config: &Configuration{ - Servers: []string{testServer.URL}, - AllowTokenFromContext: true, - Password: "", - LogLevel: "debug", - Username: "user", - PasswordFilePath: pwdFile, - BulkSize: -1, // disable bulk; we want immediate flush + Servers: []string{testServer.URL}, + Authentication: Authentication{ + BasicAuthentication: BasicAuthentication{ + Username: "user", + Password: "", + PasswordFilePath: pwdFile, + }, + BearerTokenAuthentication: BearerTokenAuthentication{ + AllowFromContext: true, + }, + }, + LogLevel: "debug", + BulkProcessing: BulkProcessing{ + MaxBytes: -1, // disable bulk; we want immediate flush + }, }, expectedError: false, }, { name: "fali with configuration password and password from file are set", config: &Configuration{ - Servers: []string{testServer.URL}, - AllowTokenFromContext: true, - Password: "secret", - LogLevel: "debug", - Username: "user", - PasswordFilePath: pwdFile, - BulkSize: -1, // disable bulk; we want immediate flush + Servers: []string{testServer.URL}, + Authentication: Authentication{ + BasicAuthentication: BasicAuthentication{ + Username: "user", + Password: "secret", + PasswordFilePath: pwdFile, + }, + BearerTokenAuthentication: BearerTokenAuthentication{ + AllowFromContext: true, + }, + }, + LogLevel: "debug", + BulkProcessing: BulkProcessing{ + MaxBytes: -1, // disable bulk; we want immediate flush + }, }, expectedError: true, }, { name: "fail with missing server", config: &Configuration{ - Servers: []string{}, - AllowTokenFromContext: true, - Password: "secret", - LogLevel: "debug", - Username: "user", - PasswordFilePath: "", - BulkSize: -1, // disable bulk; we want immediate flush + Servers: []string{}, + Authentication: Authentication{ + BasicAuthentication: BasicAuthentication{ + Username: "user", + Password: "secret", + PasswordFilePath: "", + }, + BearerTokenAuthentication: BearerTokenAuthentication{ + AllowFromContext: true, + }, + }, + LogLevel: "debug", + BulkProcessing: BulkProcessing{ + MaxBytes: -1, // disable bulk; we want immediate flush + }, }, expectedError: true, }, { name: "fail with invalid configuration invalid loglevel", config: &Configuration{ - Servers: []string{testServer.URL}, - AllowTokenFromContext: true, - Password: "secret", - LogLevel: "invalid", - Username: "user", - PasswordFilePath: "", - BulkSize: -1, // disable bulk; we want immediate flush + Servers: []string{testServer.URL}, + Authentication: Authentication{ + BasicAuthentication: BasicAuthentication{ + Username: "user", + Password: "secret", + PasswordFilePath: "", + }, + BearerTokenAuthentication: BearerTokenAuthentication{ + AllowFromContext: true, + }, + }, + LogLevel: "invalid", + BulkProcessing: BulkProcessing{ + MaxBytes: -1, // disable bulk; we want immediate flush + }, }, expectedError: true, }, { name: "fail with invalid configuration invalid bulkworkers number", config: &Configuration{ - Servers: []string{testServer.URL}, - AllowTokenFromContext: true, - Password: "secret", - LogLevel: "invalid", - Username: "user", - PasswordFilePath: "", - BulkWorkers: 0, - BulkSize: -1, // disable bulk; we want immediate flush + Servers: []string{testServer.URL}, + Authentication: Authentication{ + BasicAuthentication: BasicAuthentication{ + Username: "user", + Password: "secret", + PasswordFilePath: "", + }, + BearerTokenAuthentication: BearerTokenAuthentication{ + AllowFromContext: true, + }, + }, + LogLevel: "invalid", + BulkProcessing: BulkProcessing{ + Workers: 0, + MaxBytes: -1, // disable bulk; we want immediate flush + }, }, expectedError: true, }, { name: "success with valid configuration and info log level", config: &Configuration{ - Servers: []string{testServer.URL}, - AllowTokenFromContext: true, - Password: "secret", - LogLevel: "info", - Username: "user", - PasswordFilePath: "", - BulkSize: -1, // disable bulk; we want immediate flush + Servers: []string{testServer.URL}, + Authentication: Authentication{ + BasicAuthentication: BasicAuthentication{ + Username: "user", + Password: "secret", + PasswordFilePath: "", + }, + BearerTokenAuthentication: BearerTokenAuthentication{ + AllowFromContext: true, + }, + }, + LogLevel: "info", + BulkProcessing: BulkProcessing{ + Workers: 0, + MaxBytes: -1, // disable bulk; we want immediate flush + }, }, expectedError: false, }, { name: "success with valid configuration and error log level", config: &Configuration{ - Servers: []string{testServer.URL}, - AllowTokenFromContext: true, - Password: "secret", - LogLevel: "error", - Username: "user", - PasswordFilePath: "", - BulkSize: -1, // disable bulk; we want immediate flush + Servers: []string{testServer.URL}, + Authentication: Authentication{ + BasicAuthentication: BasicAuthentication{ + Username: "user", + Password: "secret", + PasswordFilePath: "", + }, + BearerTokenAuthentication: BearerTokenAuthentication{ + AllowFromContext: true, + }, + }, + LogLevel: "error", + BulkProcessing: BulkProcessing{ + MaxBytes: -1, // disable bulk; we want immediate flush + }, }, expectedError: false, }, @@ -319,10 +424,17 @@ func TestNewClient(t *testing.T) { func TestApplyDefaults(t *testing.T) { source := &Configuration{ - RemoteReadClusters: []string{"cluster1", "cluster2"}, - Username: "sourceUser", - Password: "sourcePass", - Sniffer: true, + RemoteReadClusters: []string{"cluster1", "cluster2"}, + Authentication: Authentication{ + BasicAuthentication: BasicAuthentication{ + Username: "sourceUser", + Password: "sourcePass", + }, + }, + Sniffing: Sniffing{ + Enabled: true, + UseHTTPS: true, + }, MaxSpanAge: 100, AdaptiveSamplingLookback: 50, Indices: Indices{ @@ -344,15 +456,16 @@ func TestApplyDefaults(t *testing.T) { }, Sampling: IndexOptions{}, }, - BulkSize: 1000, - BulkWorkers: 10, - BulkActions: 100, - BulkFlushInterval: 30, - SnifferTLSEnabled: true, - Tags: TagsAsFields{AllAsFields: true, DotReplacement: "dot", Include: "include", File: "file"}, - MaxDocCount: 10000, - LogLevel: "info", - SendGetBodyAs: "json", + BulkProcessing: BulkProcessing{ + MaxBytes: 1000, + Workers: 10, + MaxActions: 100, + FlushInterval: 30, + }, + Tags: TagsAsFields{AllAsFields: true, DotReplacement: "dot", Include: "include", File: "file"}, + MaxDocCount: 10000, + LogLevel: "info", + SendGetBodyAs: "json", } tests := []struct { @@ -375,7 +488,11 @@ func TestApplyDefaults(t *testing.T) { name: "Some Defaults Applied", target: &Configuration{ RemoteReadClusters: []string{"customCluster"}, - Username: "customUser", + Authentication: Authentication{ + BasicAuthentication: BasicAuthentication{ + Username: "customUser", + }, + }, Indices: Indices{ Spans: IndexOptions{ Priority: 10, @@ -390,11 +507,17 @@ func TestApplyDefaults(t *testing.T) { // Other fields left default }, expected: &Configuration{ - RemoteReadClusters: []string{"customCluster"}, - Username: "customUser", - Password: "sourcePass", - Sniffer: true, - SnifferTLSEnabled: true, + RemoteReadClusters: []string{"customCluster"}, + Authentication: Authentication{ + BasicAuthentication: BasicAuthentication{ + Username: "customUser", + Password: "sourcePass", + }, + }, + Sniffing: Sniffing{ + Enabled: true, + UseHTTPS: true, + }, MaxSpanAge: 100, AdaptiveSamplingLookback: 50, Indices: Indices{ @@ -415,23 +538,32 @@ func TestApplyDefaults(t *testing.T) { Priority: 30, }, }, - BulkSize: 1000, - BulkWorkers: 10, - BulkActions: 100, - BulkFlushInterval: 30, - Tags: TagsAsFields{AllAsFields: true, DotReplacement: "dot", Include: "include", File: "file"}, - MaxDocCount: 10000, - LogLevel: "info", - SendGetBodyAs: "json", + BulkProcessing: BulkProcessing{ + MaxBytes: 1000, + Workers: 10, + MaxActions: 100, + FlushInterval: 30, + }, + Tags: TagsAsFields{AllAsFields: true, DotReplacement: "dot", Include: "include", File: "file"}, + MaxDocCount: 10000, + LogLevel: "info", + SendGetBodyAs: "json", }, }, { name: "No Defaults Applied", target: &Configuration{ - RemoteReadClusters: []string{"cluster1", "cluster2"}, - Username: "sourceUser", - Password: "sourcePass", - Sniffer: true, + RemoteReadClusters: []string{"cluster1", "cluster2"}, + Authentication: Authentication{ + BasicAuthentication: BasicAuthentication{ + Username: "sourceUser", + Password: "sourcePass", + }, + }, + Sniffing: Sniffing{ + Enabled: true, + UseHTTPS: true, + }, MaxSpanAge: 100, AdaptiveSamplingLookback: 50, Indices: Indices{ @@ -452,15 +584,16 @@ func TestApplyDefaults(t *testing.T) { Priority: 30, }, }, - BulkSize: 1000, - BulkWorkers: 10, - BulkActions: 100, - BulkFlushInterval: 30, - SnifferTLSEnabled: true, - Tags: TagsAsFields{AllAsFields: true, DotReplacement: "dot", Include: "include", File: "file"}, - MaxDocCount: 10000, - LogLevel: "info", - SendGetBodyAs: "json", + BulkProcessing: BulkProcessing{ + MaxBytes: 1000, + Workers: 10, + MaxActions: 100, + FlushInterval: 30, + }, + Tags: TagsAsFields{AllAsFields: true, DotReplacement: "dot", Include: "include", File: "file"}, + MaxDocCount: 10000, + LogLevel: "info", + SendGetBodyAs: "json", }, expected: source, }, diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index b2f2713c00f..a204ccd6bc4 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -137,8 +137,8 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) } f.primaryClient.Store(&primaryClient) - if f.primaryConfig.PasswordFilePath != "" { - primaryWatcher, err := fswatcher.New([]string{f.primaryConfig.PasswordFilePath}, f.onPrimaryPasswordChange, f.logger) + if f.primaryConfig.Authentication.BasicAuthentication.PasswordFilePath != "" { + primaryWatcher, err := fswatcher.New([]string{f.primaryConfig.Authentication.BasicAuthentication.PasswordFilePath}, f.onPrimaryPasswordChange, f.logger) if err != nil { return fmt.Errorf("failed to create watcher for primary ES client's password: %w", err) } @@ -152,8 +152,8 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) } f.archiveClient.Store(&archiveClient) - if f.archiveConfig.PasswordFilePath != "" { - archiveWatcher, err := fswatcher.New([]string{f.archiveConfig.PasswordFilePath}, f.onArchivePasswordChange, f.logger) + if f.archiveConfig.Authentication.BasicAuthentication.PasswordFilePath != "" { + archiveWatcher, err := fswatcher.New([]string{f.archiveConfig.Authentication.BasicAuthentication.PasswordFilePath}, f.onArchivePasswordChange, f.logger) if err != nil { return fmt.Errorf("failed to create watcher for archive ES client's password: %w", err) } @@ -360,15 +360,15 @@ func (f *Factory) onArchivePasswordChange() { } func (f *Factory) onClientPasswordChange(cfg *config.Configuration, client *atomic.Pointer[es.Client]) { - newPassword, err := loadTokenFromFile(cfg.PasswordFilePath) + newPassword, err := loadTokenFromFile(cfg.Authentication.BasicAuthentication.PasswordFilePath) if err != nil { f.logger.Error("failed to reload password for Elasticsearch client", zap.Error(err)) return } f.logger.Sugar().Infof("loaded new password of length %d from file", len(newPassword)) newCfg := *cfg // copy by value - newCfg.Password = newPassword - newCfg.PasswordFilePath = "" // avoid error that both are set + newCfg.Authentication.BasicAuthentication.Password = newPassword + newCfg.Authentication.BasicAuthentication.PasswordFilePath = "" // avoid error that both are set newClient, err := f.newClientFn(&newCfg, f.logger, f.metricsFactory) if err != nil { diff --git a/plugin/storage/es/factory_test.go b/plugin/storage/es/factory_test.go index b06438fedea..e62c8d39157 100644 --- a/plugin/storage/es/factory_test.go +++ b/plugin/storage/es/factory_test.go @@ -371,19 +371,31 @@ func testPasswordFromFile(t *testing.T, f *Factory, getClient func() es.Client, require.NoError(t, os.WriteFile(pwdFile, []byte(pwd1), 0o600)) f.primaryConfig = &escfg.Configuration{ - Servers: []string{server.URL}, - LogLevel: "debug", - Username: "user", - PasswordFilePath: pwdFile, - BulkSize: -1, // disable bulk; we want immediate flush + Servers: []string{server.URL}, + LogLevel: "debug", + Authentication: escfg.Authentication{ + BasicAuthentication: escfg.BasicAuthentication{ + Username: "user", + PasswordFilePath: pwdFile, + }, + }, + BulkProcessing: escfg.BulkProcessing{ + MaxBytes: -1, // disable bulk; we want immediate flush + }, } f.archiveConfig = &escfg.Configuration{ - Enabled: true, - Servers: []string{server.URL}, - LogLevel: "debug", - Username: "user", - PasswordFilePath: pwdFile, - BulkSize: -1, // disable bulk; we want immediate flush + Enabled: true, + Servers: []string{server.URL}, + LogLevel: "debug", + Authentication: escfg.Authentication{ + BasicAuthentication: escfg.BasicAuthentication{ + Username: "user", + PasswordFilePath: pwdFile, + }, + }, + BulkProcessing: escfg.BulkProcessing{ + MaxBytes: -1, // disable bulk; we want immediate flush + }, } require.NoError(t, f.Initialize(metrics.NullFactory, zaptest.NewLogger(t))) defer f.Close() @@ -447,14 +459,22 @@ func TestPasswordFromFileErrors(t *testing.T) { f := NewFactory() f.primaryConfig = &escfg.Configuration{ - Servers: []string{server.URL}, - LogLevel: "debug", - PasswordFilePath: pwdFile, + Servers: []string{server.URL}, + LogLevel: "debug", + Authentication: escfg.Authentication{ + BasicAuthentication: escfg.BasicAuthentication{ + PasswordFilePath: pwdFile, + }, + }, } f.archiveConfig = &escfg.Configuration{ - Servers: []string{server.URL}, - LogLevel: "debug", - PasswordFilePath: pwdFile, + Servers: []string{server.URL}, + LogLevel: "debug", + Authentication: escfg.Authentication{ + BasicAuthentication: escfg.BasicAuthentication{ + PasswordFilePath: pwdFile, + }, + }, } logger, buf := testutils.NewEchoLogger(t) diff --git a/plugin/storage/es/options.go b/plugin/storage/es/options.go index 782f24f5854..01cb2ce8d19 100644 --- a/plugin/storage/es/options.go +++ b/plugin/storage/es/options.go @@ -135,23 +135,23 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) { func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) { flagSet.String( nsConfig.namespace+suffixUsername, - nsConfig.Username, + nsConfig.Authentication.BasicAuthentication.Username, "The username required by Elasticsearch. The basic authentication also loads CA if it is specified.") flagSet.String( nsConfig.namespace+suffixPassword, - nsConfig.Password, + nsConfig.Authentication.BasicAuthentication.Password, "The password required by Elasticsearch") flagSet.String( nsConfig.namespace+suffixTokenPath, - nsConfig.TokenFilePath, + nsConfig.Authentication.BearerTokenAuthentication.FilePath, "Path to a file containing bearer token. This flag also loads CA if it is specified.") flagSet.String( nsConfig.namespace+suffixPasswordPath, - nsConfig.PasswordFilePath, + nsConfig.Authentication.BasicAuthentication.PasswordFilePath, "Path to a file containing password. This file is watched for changes.") flagSet.Bool( nsConfig.namespace+suffixSniffer, - nsConfig.Sniffer, + nsConfig.Sniffing.Enabled, "The sniffer config for Elasticsearch; client uses sniffing process to find all nodes automatically, disable if not required") flagSet.String( nsConfig.namespace+suffixServerURLs, @@ -164,7 +164,7 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) { "See Elasticsearch remote clusters and cross-cluster query api.") flagSet.Duration( nsConfig.namespace+suffixTimeout, - nsConfig.Timeout, + nsConfig.QueryTimeout, "Timeout used for queries. A Timeout of zero means no timeout") flagSet.Int64( nsConfig.namespace+suffixNumShards, @@ -193,19 +193,19 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) { "Priority of jaeger-dependecies index template (ESv8 only)") flagSet.Int( nsConfig.namespace+suffixBulkSize, - nsConfig.BulkSize, + nsConfig.BulkProcessing.MaxBytes, "The number of bytes that the bulk requests can take up before the bulk processor decides to commit") flagSet.Int( nsConfig.namespace+suffixBulkWorkers, - nsConfig.BulkWorkers, + nsConfig.BulkProcessing.Workers, "The number of workers that are able to receive bulk requests and eventually commit them to Elasticsearch") flagSet.Int( nsConfig.namespace+suffixBulkActions, - nsConfig.BulkActions, + nsConfig.BulkProcessing.MaxActions, "The number of requests that can be enqueued before the bulk processor decides to commit") flagSet.Duration( nsConfig.namespace+suffixBulkFlushInterval, - nsConfig.BulkFlushInterval, + nsConfig.BulkProcessing.FlushInterval, "A time.Duration after which bulk requests are committed, regardless of other thresholds. Set to zero to disable. By default, this is disabled.") flagSet.String( nsConfig.namespace+suffixIndexPrefix, @@ -217,17 +217,17 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) { "Optional date separator of Jaeger indices. For example \".\" creates \"jaeger-span-2020.11.20\".") flagSet.String( nsConfig.namespace+suffixIndexRolloverFrequencySpans, - defaultIndexRolloverFrequency, + nsConfig.Indices.Spans.RolloverFrequency, "Rotates jaeger-span indices over the given period. For example \"day\" creates \"jaeger-span-yyyy-MM-dd\" every day after UTC 12AM. Valid options: [hour, day]. "+ "This does not delete old indices. For details on complete index management solutions supported by Jaeger, refer to: https://www.jaegertracing.io/docs/deployment/#elasticsearch-rollover") flagSet.String( nsConfig.namespace+suffixIndexRolloverFrequencyServices, - defaultIndexRolloverFrequency, + nsConfig.Indices.Services.RolloverFrequency, "Rotates jaeger-service indices over the given period. For example \"day\" creates \"jaeger-service-yyyy-MM-dd\" every day after UTC 12AM. Valid options: [hour, day]. "+ "This does not delete old indices. For details on complete index management solutions supported by Jaeger, refer to: https://www.jaegertracing.io/docs/deployment/#elasticsearch-rollover") flagSet.String( nsConfig.namespace+suffixIndexRolloverFrequencySampling, - defaultIndexRolloverFrequency, + nsConfig.Indices.Sampling.RolloverFrequency, "Rotates jaeger-sampling indices over the given period. For example \"day\" creates \"jaeger-sampling-yyyy-MM-dd\" every day after UTC 12AM. Valid options: [hour, day]. "+ "This does not delete old indices. For details on complete index management solutions supported by Jaeger, refer to: https://www.jaegertracing.io/docs/deployment/#elasticsearch-rollover") flagSet.Bool( @@ -268,7 +268,7 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) { "The major Elasticsearch version. If not specified, the value will be auto-detected from Elasticsearch.") flagSet.Bool( nsConfig.namespace+suffixSnifferTLSEnabled, - nsConfig.SnifferTLSEnabled, + nsConfig.Sniffing.UseHTTPS, "Option to enable TLS when sniffing an Elasticsearch Cluster ; client uses sniffing process to find all nodes automatically, disabled by default") flagSet.Int( nsConfig.namespace+suffixMaxDocCount, @@ -311,12 +311,12 @@ func (opt *Options) InitFromViper(v *viper.Viper) { } func initFromViper(cfg *namespaceConfig, v *viper.Viper) { - cfg.Username = v.GetString(cfg.namespace + suffixUsername) - cfg.Password = v.GetString(cfg.namespace + suffixPassword) - cfg.TokenFilePath = v.GetString(cfg.namespace + suffixTokenPath) - cfg.PasswordFilePath = v.GetString(cfg.namespace + suffixPasswordPath) - cfg.Sniffer = v.GetBool(cfg.namespace + suffixSniffer) - cfg.SnifferTLSEnabled = v.GetBool(cfg.namespace + suffixSnifferTLSEnabled) + cfg.Authentication.BasicAuthentication.Username = v.GetString(cfg.namespace + suffixUsername) + cfg.Authentication.BasicAuthentication.Password = v.GetString(cfg.namespace + suffixPassword) + cfg.Authentication.BearerTokenAuthentication.FilePath = v.GetString(cfg.namespace + suffixTokenPath) + cfg.Authentication.BasicAuthentication.PasswordFilePath = v.GetString(cfg.namespace + suffixPasswordPath) + cfg.Sniffing.Enabled = v.GetBool(cfg.namespace + suffixSniffer) + cfg.Sniffing.UseHTTPS = v.GetBool(cfg.namespace + suffixSnifferTLSEnabled) cfg.Servers = strings.Split(stripWhiteSpace(v.GetString(cfg.namespace+suffixServerURLs)), ",") cfg.MaxSpanAge = v.GetDuration(cfg.namespace + suffixMaxSpanAge) cfg.AdaptiveSamplingLookback = v.GetDuration(cfg.namespace + suffixAdaptiveSamplingLookback) @@ -336,11 +336,11 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) { // cfg.Indices.Sampling does not have a separate flag cfg.Indices.Dependencies.Priority = v.GetInt64(cfg.namespace + suffixPriorityDependenciesTemplate) - cfg.BulkSize = v.GetInt(cfg.namespace + suffixBulkSize) - cfg.BulkWorkers = v.GetInt(cfg.namespace + suffixBulkWorkers) - cfg.BulkActions = v.GetInt(cfg.namespace + suffixBulkActions) - cfg.BulkFlushInterval = v.GetDuration(cfg.namespace + suffixBulkFlushInterval) - cfg.Timeout = v.GetDuration(cfg.namespace + suffixTimeout) + cfg.BulkProcessing.MaxBytes = v.GetInt(cfg.namespace + suffixBulkSize) + cfg.BulkProcessing.Workers = v.GetInt(cfg.namespace + suffixBulkWorkers) + cfg.BulkProcessing.MaxActions = v.GetInt(cfg.namespace + suffixBulkActions) + cfg.BulkProcessing.FlushInterval = v.GetDuration(cfg.namespace + suffixBulkFlushInterval) + cfg.QueryTimeout = v.GetDuration(cfg.namespace + suffixTimeout) cfg.ServiceCacheTTL = v.GetDuration(cfg.namespace + suffixServiceCacheTTL) indexPrefix := v.GetString(cfg.namespace + suffixIndexPrefix) @@ -361,7 +361,7 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) { cfg.UseILM = v.GetBool(cfg.namespace + suffixUseILM) // TODO: Need to figure out a better way for do this. - cfg.AllowTokenFromContext = v.GetBool(bearertoken.StoragePropagationKey) + cfg.Authentication.BearerTokenAuthentication.AllowFromContext = v.GetBool(bearertoken.StoragePropagationKey) remoteReadClusters := stripWhiteSpace(v.GetString(cfg.namespace + suffixRemoteReadClusters)) if len(remoteReadClusters) > 0 { @@ -422,15 +422,23 @@ func initDateLayout(rolloverFreq, sep string) string { func DefaultConfig() config.Configuration { return config.Configuration{ - Username: "", - Password: "", - Sniffer: false, + Authentication: config.Authentication{ + BasicAuthentication: config.BasicAuthentication{ + Username: "", + Password: "", + }, + }, + Sniffing: config.Sniffing{ + Enabled: false, + }, MaxSpanAge: 72 * time.Hour, AdaptiveSamplingLookback: 72 * time.Hour, - BulkSize: 5 * 1000 * 1000, - BulkWorkers: 1, - BulkActions: 1000, - BulkFlushInterval: time.Millisecond * 200, + BulkProcessing: config.BulkProcessing{ + MaxBytes: 5 * 1000 * 1000, + Workers: 1, + MaxActions: 1000, + FlushInterval: time.Millisecond * 200, + }, Tags: config.TagsAsFields{ DotReplacement: "@", }, diff --git a/plugin/storage/es/options_test.go b/plugin/storage/es/options_test.go index 5d93858f579..33d933b5830 100644 --- a/plugin/storage/es/options_test.go +++ b/plugin/storage/es/options_test.go @@ -18,9 +18,9 @@ import ( func TestOptions(t *testing.T) { opts := NewOptions("foo") primary := opts.GetPrimary() - assert.Empty(t, primary.Username) - assert.Empty(t, primary.Password) - assert.Empty(t, primary.PasswordFilePath) + assert.Empty(t, primary.Authentication.BasicAuthentication.Username) + assert.Empty(t, primary.Authentication.BasicAuthentication.Password) + assert.Empty(t, primary.Authentication.BasicAuthentication.PasswordFilePath) assert.NotEmpty(t, primary.Servers) assert.Empty(t, primary.RemoteReadClusters) assert.EqualValues(t, 5, primary.Indices.Spans.Shards) @@ -32,13 +32,13 @@ func TestOptions(t *testing.T) { assert.EqualValues(t, 1, primary.Indices.Sampling.Replicas) assert.EqualValues(t, 1, primary.Indices.Dependencies.Replicas) assert.Equal(t, 72*time.Hour, primary.MaxSpanAge) - assert.False(t, primary.Sniffer) - assert.False(t, primary.SnifferTLSEnabled) + assert.False(t, primary.Sniffing.Enabled) + assert.False(t, primary.Sniffing.UseHTTPS) aux := opts.Get("archive") - assert.Equal(t, primary.Username, aux.Username) - assert.Equal(t, primary.Password, aux.Password) - assert.Equal(t, primary.PasswordFilePath, aux.PasswordFilePath) + assert.Equal(t, primary.Authentication.BasicAuthentication.Username, aux.Authentication.BasicAuthentication.Username) + assert.Equal(t, primary.Authentication.BasicAuthentication.Password, aux.Authentication.BasicAuthentication.Password) + assert.Equal(t, primary.Authentication.BasicAuthentication.PasswordFilePath, aux.Authentication.BasicAuthentication.PasswordFilePath) assert.Equal(t, primary.Servers, aux.Servers) } @@ -79,15 +79,15 @@ func TestOptionsWithFlags(t *testing.T) { opts.InitFromViper(v) primary := opts.GetPrimary() - assert.Equal(t, "hello", primary.Username) - assert.Equal(t, "world", primary.Password) - assert.Equal(t, "/foo/bar", primary.TokenFilePath) - assert.Equal(t, "/foo/bar/baz", primary.PasswordFilePath) + assert.Equal(t, "hello", primary.Authentication.BasicAuthentication.Username) + assert.Equal(t, "world", primary.Authentication.BasicAuthentication.Password) + assert.Equal(t, "/foo/bar", primary.Authentication.BearerTokenAuthentication.FilePath) + assert.Equal(t, "/foo/bar/baz", primary.Authentication.BasicAuthentication.PasswordFilePath) assert.Equal(t, []string{"1.1.1.1", "2.2.2.2"}, primary.Servers) assert.Equal(t, []string{"cluster_one", "cluster_two"}, primary.RemoteReadClusters) assert.Equal(t, 48*time.Hour, primary.MaxSpanAge) - assert.True(t, primary.Sniffer) - assert.True(t, primary.SnifferTLSEnabled) + assert.True(t, primary.Sniffing.Enabled) + assert.True(t, primary.Sniffing.UseHTTPS) assert.False(t, primary.TLS.Insecure) assert.True(t, primary.TLS.InsecureSkipVerify) assert.True(t, primary.Tags.AllAsFields) @@ -98,8 +98,8 @@ func TestOptionsWithFlags(t *testing.T) { assert.Equal(t, "2006010215", primary.Indices.Spans.DateLayout) aux := opts.Get("es.aux") assert.Equal(t, []string{"3.3.3.3", "4.4.4.4"}, aux.Servers) - assert.Equal(t, "hello", aux.Username) - assert.Equal(t, "world", aux.Password) + assert.Equal(t, "hello", aux.Authentication.BasicAuthentication.Username) + assert.Equal(t, "world", aux.Authentication.BasicAuthentication.Password) assert.EqualValues(t, 5, aux.Indices.Spans.Shards) assert.EqualValues(t, 5, aux.Indices.Services.Shards) assert.EqualValues(t, 5, aux.Indices.Sampling.Shards) @@ -109,7 +109,7 @@ func TestOptionsWithFlags(t *testing.T) { assert.EqualValues(t, 10, aux.Indices.Sampling.Replicas) assert.EqualValues(t, 10, aux.Indices.Dependencies.Replicas) assert.Equal(t, 24*time.Hour, aux.MaxSpanAge) - assert.True(t, aux.Sniffer) + assert.True(t, aux.Sniffing.Enabled) assert.True(t, aux.Tags.AllAsFields) assert.Equal(t, "@", aux.Tags.DotReplacement) assert.Equal(t, "./file.txt", aux.Tags.File)