From 393772ff8ecb313e3e531e33a038a57b56e08f16 Mon Sep 17 00:00:00 2001 From: albertteoh Date: Mon, 31 Aug 2020 20:40:00 +1000 Subject: [PATCH 01/16] Configurable limits Signed-off-by: albertteoh --- .../app/internal/reader/es/esspanreader/query.go | 8 ++++---- .../reader/es/esspanreader/query_test.go | 4 ++-- .../reader/es/esspanreader/span_reader.go | 11 +++++++---- pkg/es/config/config.go | 10 ++++++++++ plugin/storage/es/factory.go | 1 + plugin/storage/es/options.go | 10 +++++++++- plugin/storage/es/spanstore/reader.go | 14 ++++++++------ plugin/storage/es/spanstore/reader_test.go | 3 ++- plugin/storage/es/spanstore/service_operation.go | 16 ++++++++-------- 9 files changed, 51 insertions(+), 26 deletions(-) diff --git a/cmd/opentelemetry/app/internal/reader/es/esspanreader/query.go b/cmd/opentelemetry/app/internal/reader/es/esspanreader/query.go index fd94be96683..b61650fa7b4 100644 --- a/cmd/opentelemetry/app/internal/reader/es/esspanreader/query.go +++ b/cmd/opentelemetry/app/internal/reader/es/esspanreader/query.go @@ -187,15 +187,15 @@ func findTraceIDsSearchBody(converter dbmodel.ToDomain, query *spanstore.TraceQu } } -func getServicesSearchBody() esclient.SearchBody { - aggs := fmt.Sprintf(getServicesAggregation, defaultDocCount) +func getServicesSearchBody(aggregationSize int) esclient.SearchBody { + aggs := fmt.Sprintf(getServicesAggregation, aggregationSize) return esclient.SearchBody{ Aggregations: json.RawMessage(aggs), } } -func getOperationsSearchBody(serviceName string) esclient.SearchBody { - aggs := fmt.Sprintf(getOperationsAggregation, defaultDocCount) +func getOperationsSearchBody(serviceName string, aggregationSize int) esclient.SearchBody { + aggs := fmt.Sprintf(getOperationsAggregation, aggregationSize) return esclient.SearchBody{ Aggregations: json.RawMessage(aggs), Query: &esclient.Query{ diff --git a/cmd/opentelemetry/app/internal/reader/es/esspanreader/query_test.go b/cmd/opentelemetry/app/internal/reader/es/esspanreader/query_test.go index b127113af2b..3d37dca7a81 100644 --- a/cmd/opentelemetry/app/internal/reader/es/esspanreader/query_test.go +++ b/cmd/opentelemetry/app/internal/reader/es/esspanreader/query_test.go @@ -231,14 +231,14 @@ const ( ) func TestGetServicesSearchBody(t *testing.T) { - sb := getServicesSearchBody() + sb := getServicesSearchBody(defaultAggregationSize) jsonQuery, err := json.MarshalIndent(sb, "", " ") require.NoError(t, err) assert.Equal(t, servicesSearchBodyFixture, string(jsonQuery)) } func TestGetOperationsSearchBody(t *testing.T) { - sb := getOperationsSearchBody("foo") + sb := getOperationsSearchBody("foo", defaultAggregationSize) jsonQuery, err := json.MarshalIndent(sb, "", " ") require.NoError(t, err) assert.Equal(t, operationsSearchBodyFixture, string(jsonQuery)) diff --git a/cmd/opentelemetry/app/internal/reader/es/esspanreader/span_reader.go b/cmd/opentelemetry/app/internal/reader/es/esspanreader/span_reader.go index d10aa60fb3b..74cdff7352c 100644 --- a/cmd/opentelemetry/app/internal/reader/es/esspanreader/span_reader.go +++ b/cmd/opentelemetry/app/internal/reader/es/esspanreader/span_reader.go @@ -34,7 +34,7 @@ const ( defaultNumTraces = 20 // default number of documents to fetch in a query // see search.max_buckets and index.max_result_window - defaultDocCount = 10_000 + defaultAggregationSize = 10_000 spanIndexBaseName = "jaeger-span" serviceIndexBaseName = "jaeger-service" @@ -65,6 +65,7 @@ type Reader struct { // maximum number of spans to fetch per query in multi search maxNumberOfSpans int archive bool + aggregationSize int } var _ spanstore.Reader = (*Reader)(nil) @@ -77,6 +78,7 @@ type Config struct { MaxSpanAge time.Duration MaxNumSpans int TagDotReplacement string + AggregationSize int } // NewEsSpanReader creates Elasticseach span reader. @@ -90,6 +92,7 @@ func NewEsSpanReader(client esclient.ElasticsearchClient, logger *zap.Logger, co converter: dbmodel.NewToDomain(config.TagDotReplacement), spanIndexName: newIndexNameProvider(spanIndexBaseName, config.IndexPrefix, config.UseReadWriteAliases, config.Archive), serviceIndexName: newIndexNameProvider(serviceIndexBaseName, config.IndexPrefix, config.UseReadWriteAliases, config.Archive), + aggregationSize: config.AggregationSize, } } @@ -162,7 +165,7 @@ func (r *Reader) findTraceIDs(ctx context.Context, query *spanstore.TraceQueryPa // GetServices implements spanstore.Reader func (r *Reader) GetServices(ctx context.Context) ([]string, error) { - searchBody := getServicesSearchBody() + searchBody := getServicesSearchBody(r.aggregationSize) currentTime := time.Now() indices := r.serviceIndexName.get(currentTime.Add(-r.maxSpanAge), currentTime) response, err := r.client.Search(ctx, searchBody, 0, indices...) @@ -182,7 +185,7 @@ func (r *Reader) GetServices(ctx context.Context) ([]string, error) { // GetOperations implements spanstore.Reader func (r *Reader) GetOperations(ctx context.Context, query spanstore.OperationQueryParameters) ([]spanstore.Operation, error) { - searchBody := getOperationsSearchBody(query.ServiceName) + searchBody := getOperationsSearchBody(query.ServiceName, r.aggregationSize) currentTime := time.Now() indices := r.serviceIndexName.get(currentTime.Add(-r.maxSpanAge), currentTime) response, err := r.client.Search(ctx, searchBody, 0, indices...) @@ -290,7 +293,7 @@ func (r *Reader) multiSearchRequests(indices []string, traceIDs []model.TraceID, s := esclient.SearchBody{ Indices: indices, Query: traceIDQuery(traceID), - Size: defaultDocCount, + Size: defaultAggregationSize, TerminateAfter: r.maxNumberOfSpans, } if !r.archive { diff --git a/pkg/es/config/config.go b/pkg/es/config/config.go index 0a0562fffd4..d0b74d5ab9d 100644 --- a/pkg/es/config/config.go +++ b/pkg/es/config/config.go @@ -65,6 +65,7 @@ type Configuration struct { UseReadWriteAliases bool `mapstructure:"use_aliases"` CreateIndexTemplates bool `mapstructure:"create_mappings"` Version uint `mapstructure:"version"` + AggregationSize int `mapstructure:"-"` } // TagsAsFields holds configuration for tag schema. @@ -98,6 +99,7 @@ type ClientBuilder interface { IsCreateIndexTemplates() bool GetVersion() uint TagKeysAsFields() ([]string, error) + GetAggregationSize() int } // NewClient creates a new ElasticSearch client @@ -233,6 +235,9 @@ func (c *Configuration) ApplyDefaults(source *Configuration) { if c.Tags.File == "" { c.Tags.File = source.Tags.File } + if c.AggregationSize == 0 { + c.AggregationSize = source.AggregationSize + } } // GetNumShards returns number of shards from Configuration @@ -291,6 +296,11 @@ func (c *Configuration) GetTokenFilePath() string { return c.TokenFilePath } +// GetAggregationSize returns the number of results (buckets) to return from a query +func (c *Configuration) GetAggregationSize() int { + return c.AggregationSize +} + // IsStorageEnabled determines whether storage is enabled func (c *Configuration) IsStorageEnabled() bool { return c.Enabled diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index 53522305386..04e3c9a126b 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -148,6 +148,7 @@ func createSpanReader( TagDotReplacement: cfg.GetTagDotReplacement(), UseReadWriteAliases: cfg.GetUseReadWriteAliases(), Archive: archive, + AggregationSize: cfg.GetAggregationSize(), }), nil } diff --git a/plugin/storage/es/options.go b/plugin/storage/es/options.go index 7b751a93341..959ef0b6a49 100644 --- a/plugin/storage/es/options.go +++ b/plugin/storage/es/options.go @@ -53,8 +53,10 @@ const ( suffixCreateIndexTemplate = ".create-index-templates" suffixEnabled = ".enabled" suffixVersion = ".version" + suffixAggregationSize = ".aggregation.size" - defaultServerURL = "http://127.0.0.1:9200" + defaultServerURL = "http://127.0.0.1:9200" + defaultAggregationSize = 10000 // the default elasticsearch allowed limit ) // TODO this should be moved next to config.Configuration struct (maybe ./flags package) @@ -97,6 +99,7 @@ func NewOptions(primaryNamespace string, otherNamespaces ...string) *Options { CreateIndexTemplates: true, Version: 0, Servers: []string{defaultServerURL}, + AggregationSize: defaultAggregationSize, }, namespace: primaryNamespace, }, @@ -237,6 +240,10 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) { nsConfig.namespace+suffixSnifferTLSEnabled, nsConfig.SnifferTLSEnabled, "Option to enable TLS when sniffing an Elasticsearch Cluster ; client uses sniffing process to find all nodes automatically, disabled by default") + flagSet.Int( + nsConfig.namespace+suffixAggregationSize, + nsConfig.AggregationSize, + "The aggregation size to set in Elasticsearch queries to limit the number of results returned; used primarily for querying for distinct services and operations.") if nsConfig.namespace == archiveNamespace { flagSet.Bool( nsConfig.namespace+suffixEnabled, @@ -279,6 +286,7 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) { cfg.Enabled = v.GetBool(cfg.namespace + suffixEnabled) cfg.CreateIndexTemplates = v.GetBool(cfg.namespace + suffixCreateIndexTemplate) cfg.Version = uint(v.GetInt(cfg.namespace + suffixVersion)) + cfg.AggregationSize = v.GetInt(cfg.namespace + suffixAggregationSize) // TODO: Need to figure out a better way for do this. cfg.AllowTokenFromContext = v.GetBool(spanstore.StoragePropagationKey) cfg.TLS = cfg.getTLSFlagsConfig().InitFromViper(v) diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go index 038bbe5130b..f1a27321a36 100644 --- a/plugin/storage/es/spanstore/reader.go +++ b/plugin/storage/es/spanstore/reader.go @@ -58,7 +58,6 @@ const ( tagKeyField = "key" tagValueField = "value" - defaultDocCount = 10000 // the default elasticsearch allowed limit defaultNumTraces = 100 ) @@ -101,6 +100,7 @@ type SpanReader struct { spanConverter dbmodel.ToDomain timeRangeIndices timeRangeIndexFn sourceFn sourceFn + aggregationSize int } // SpanReaderParams holds constructor params for NewSpanReader @@ -114,6 +114,7 @@ type SpanReaderParams struct { TagDotReplacement string Archive bool UseReadWriteAliases bool + AggregationSize int } // NewSpanReader returns a new SpanReader with a metrics. @@ -127,7 +128,8 @@ func NewSpanReader(p SpanReaderParams) *SpanReader { serviceIndexPrefix: indexNames(p.IndexPrefix, serviceIndex), spanConverter: dbmodel.NewToDomain(p.TagDotReplacement), timeRangeIndices: getTimeRangeIndexFn(p.Archive, p.UseReadWriteAliases), - sourceFn: getSourceFn(p.Archive, p.MaxNumSpans), + sourceFn: getSourceFn(p.Archive, p.MaxNumSpans, p.AggregationSize), + aggregationSize: p.AggregationSize, } } @@ -155,11 +157,11 @@ func getTimeRangeIndexFn(archive, useReadWriteAliases bool) timeRangeIndexFn { return timeRangeIndices } -func getSourceFn(archive bool, maxNumSpans int) sourceFn { +func getSourceFn(archive bool, maxNumSpans int, aggregationSize int) sourceFn { return func(query elastic.Query, nextTime uint64) *elastic.SearchSource { s := elastic.NewSearchSource(). Query(query). - Size(defaultDocCount). + Size(aggregationSize). TerminateAfter(maxNumSpans) if !archive { s.Sort("startTime", true). @@ -241,7 +243,7 @@ func (s *SpanReader) GetServices(ctx context.Context) ([]string, error) { defer span.Finish() currentTime := time.Now() jaegerIndices := s.timeRangeIndices(s.serviceIndexPrefix, currentTime.Add(-s.maxSpanAge), currentTime) - return s.serviceOperationStorage.getServices(ctx, jaegerIndices) + return s.serviceOperationStorage.getServices(ctx, jaegerIndices, s.aggregationSize) } // GetOperations returns all operations for a specific service traced by Jaeger @@ -253,7 +255,7 @@ func (s *SpanReader) GetOperations( defer span.Finish() currentTime := time.Now() jaegerIndices := s.timeRangeIndices(s.serviceIndexPrefix, currentTime.Add(-s.maxSpanAge), currentTime) - operations, err := s.serviceOperationStorage.getOperations(ctx, jaegerIndices, query.ServiceName) + operations, err := s.serviceOperationStorage.getOperations(ctx, jaegerIndices, query.ServiceName, s.aggregationSize) if err != nil { return nil, err } diff --git a/plugin/storage/es/spanstore/reader_test.go b/plugin/storage/es/spanstore/reader_test.go index 1901f3c7378..28e13df2717 100644 --- a/plugin/storage/es/spanstore/reader_test.go +++ b/plugin/storage/es/spanstore/reader_test.go @@ -101,6 +101,7 @@ func withSpanReader(fn func(r *spanReaderTest)) { MaxSpanAge: 0, IndexPrefix: "", TagDotReplacement: "@", + AggregationSize: 999, }), } fn(r) @@ -840,7 +841,7 @@ func mockSearchService(r *spanReaderTest) *mock.Call { searchService.On("Query", mock.Anything).Return(searchService) searchService.On("IgnoreUnavailable", mock.AnythingOfType("bool")).Return(searchService) searchService.On("Size", mock.MatchedBy(func(i int) bool { - return i == 0 || i == defaultDocCount + return i == 0 })).Return(searchService) searchService.On("Aggregation", stringMatcher(servicesAggregation), mock.AnythingOfType("*elastic.TermsAggregation")).Return(searchService) searchService.On("Aggregation", stringMatcher(operationsAggregation), mock.AnythingOfType("*elastic.TermsAggregation")).Return(searchService) diff --git a/plugin/storage/es/spanstore/service_operation.go b/plugin/storage/es/spanstore/service_operation.go index 44f2acb8a24..80b2868e845 100644 --- a/plugin/storage/es/spanstore/service_operation.go +++ b/plugin/storage/es/spanstore/service_operation.go @@ -77,8 +77,8 @@ func (s *ServiceOperationStorage) Write(indexName string, jsonSpan *dbmodel.Span } } -func (s *ServiceOperationStorage) getServices(context context.Context, indices []string) ([]string, error) { - serviceAggregation := getServicesAggregation() +func (s *ServiceOperationStorage) getServices(context context.Context, indices []string, aggregationSize int) ([]string, error) { + serviceAggregation := getServicesAggregation(aggregationSize) searchService := s.client.Search(indices...). Size(0). // set to 0 because we don't want actual documents. @@ -100,15 +100,15 @@ func (s *ServiceOperationStorage) getServices(context context.Context, indices [ return bucketToStringArray(serviceNamesBucket) } -func getServicesAggregation() elastic.Query { +func getServicesAggregation(aggregationSize int) elastic.Query { return elastic.NewTermsAggregation(). Field(serviceName). - Size(defaultDocCount) // Must set to some large number. ES deprecated size omission for aggregating all. https://github.com/elastic/elasticsearch/issues/18838 + Size(aggregationSize) // ES deprecated size omission for aggregating all. https://github.com/elastic/elasticsearch/issues/18838 } -func (s *ServiceOperationStorage) getOperations(context context.Context, indices []string, service string) ([]string, error) { +func (s *ServiceOperationStorage) getOperations(context context.Context, indices []string, service string, aggregationSize int) ([]string, error) { serviceQuery := elastic.NewTermQuery(serviceName, service) - serviceFilter := getOperationsAggregation() + serviceFilter := getOperationsAggregation(aggregationSize) searchService := s.client.Search(indices...). Size(0). @@ -131,10 +131,10 @@ func (s *ServiceOperationStorage) getOperations(context context.Context, indices return bucketToStringArray(operationNamesBucket) } -func getOperationsAggregation() elastic.Query { +func getOperationsAggregation(aggregationSize int) elastic.Query { return elastic.NewTermsAggregation(). Field(operationNameField). - Size(defaultDocCount) // Must set to some large number. ES deprecated size omission for aggregating all. https://github.com/elastic/elasticsearch/issues/18838 + Size(aggregationSize) // ES deprecated size omission for aggregating all. https://github.com/elastic/elasticsearch/issues/18838 } func hashCode(s dbmodel.Service) string { From e7d69883555da8f33267c33a0faaf9253c127fc0 Mon Sep 17 00:00:00 2001 From: albertteoh Date: Tue, 1 Sep 2020 19:44:40 +1000 Subject: [PATCH 02/16] aggregationSize -> maxDocCount Signed-off-by: albertteoh --- .../es/esdependencyreader/dependency_store.go | 8 ++++---- .../app/internal/reader/es/esspanreader/query.go | 8 ++++---- .../reader/es/esspanreader/query_test.go | 4 ++-- .../reader/es/esspanreader/span_reader.go | 15 ++++++--------- pkg/es/config/config.go | 14 +++++++------- plugin/storage/es/factory.go | 2 +- plugin/storage/es/options.go | 16 +++++++++------- plugin/storage/es/spanstore/reader.go | 16 ++++++++-------- plugin/storage/es/spanstore/reader_test.go | 2 +- plugin/storage/es/spanstore/service_operation.go | 16 ++++++++-------- 10 files changed, 50 insertions(+), 51 deletions(-) diff --git a/cmd/opentelemetry/app/internal/reader/es/esdependencyreader/dependency_store.go b/cmd/opentelemetry/app/internal/reader/es/esdependencyreader/dependency_store.go index 40ff973953e..f2c0d8411a3 100644 --- a/cmd/opentelemetry/app/internal/reader/es/esdependencyreader/dependency_store.go +++ b/cmd/opentelemetry/app/internal/reader/es/esdependencyreader/dependency_store.go @@ -38,8 +38,8 @@ const ( // default number of documents to fetch in a query // see search.max_buckets and index.max_result_window - defaultDocCount = 10_000 - indexDateFormat = "2006-01-02" // date format for index e.g. 2020-01-20 + defaultMaxDocCount = 10_000 + indexDateFormat = "2006-01-02" // date format for index e.g. 2020-01-20 ) // DependencyStore defines Elasticsearch dependency store. @@ -87,7 +87,7 @@ func (r *DependencyStore) GetDependencies(ctx context.Context, endTs time.Time, searchBody := getSearchBody(endTs, lookback) indices := dailyIndices(r.indexPrefix, endTs, lookback) - response, err := r.client.Search(ctx, searchBody, defaultDocCount, indices...) + response, err := r.client.Search(ctx, searchBody,defaultMaxDocCount, indices...) if err != nil { return nil, err } @@ -111,7 +111,7 @@ func getSearchBody(endTs time.Time, lookback time.Duration) esclient.SearchBody Query: &esclient.Query{ RangeQueries: map[string]esclient.RangeQuery{timestampField: {GTE: endTs.Add(-lookback), LTE: endTs}}, }, - Size: defaultDocCount, + Size: defaultMaxDocCount, } } diff --git a/cmd/opentelemetry/app/internal/reader/es/esspanreader/query.go b/cmd/opentelemetry/app/internal/reader/es/esspanreader/query.go index b61650fa7b4..f015320beb9 100644 --- a/cmd/opentelemetry/app/internal/reader/es/esspanreader/query.go +++ b/cmd/opentelemetry/app/internal/reader/es/esspanreader/query.go @@ -187,15 +187,15 @@ func findTraceIDsSearchBody(converter dbmodel.ToDomain, query *spanstore.TraceQu } } -func getServicesSearchBody(aggregationSize int) esclient.SearchBody { - aggs := fmt.Sprintf(getServicesAggregation, aggregationSize) +func getServicesSearchBody(maxDocCount int) esclient.SearchBody { + aggs := fmt.Sprintf(getServicesAggregation, maxDocCount) return esclient.SearchBody{ Aggregations: json.RawMessage(aggs), } } -func getOperationsSearchBody(serviceName string, aggregationSize int) esclient.SearchBody { - aggs := fmt.Sprintf(getOperationsAggregation, aggregationSize) +func getOperationsSearchBody(serviceName string, maxDocCount int) esclient.SearchBody { + aggs := fmt.Sprintf(getOperationsAggregation, maxDocCount) return esclient.SearchBody{ Aggregations: json.RawMessage(aggs), Query: &esclient.Query{ diff --git a/cmd/opentelemetry/app/internal/reader/es/esspanreader/query_test.go b/cmd/opentelemetry/app/internal/reader/es/esspanreader/query_test.go index 3d37dca7a81..315155cdebf 100644 --- a/cmd/opentelemetry/app/internal/reader/es/esspanreader/query_test.go +++ b/cmd/opentelemetry/app/internal/reader/es/esspanreader/query_test.go @@ -231,14 +231,14 @@ const ( ) func TestGetServicesSearchBody(t *testing.T) { - sb := getServicesSearchBody(defaultAggregationSize) + sb := getServicesSearchBody(defaultMaxDocCount) jsonQuery, err := json.MarshalIndent(sb, "", " ") require.NoError(t, err) assert.Equal(t, servicesSearchBodyFixture, string(jsonQuery)) } func TestGetOperationsSearchBody(t *testing.T) { - sb := getOperationsSearchBody("foo", defaultAggregationSize) + sb := getOperationsSearchBody("foo", defaultMaxDocCount) jsonQuery, err := json.MarshalIndent(sb, "", " ") require.NoError(t, err) assert.Equal(t, operationsSearchBodyFixture, string(jsonQuery)) diff --git a/cmd/opentelemetry/app/internal/reader/es/esspanreader/span_reader.go b/cmd/opentelemetry/app/internal/reader/es/esspanreader/span_reader.go index 74cdff7352c..70c897e8783 100644 --- a/cmd/opentelemetry/app/internal/reader/es/esspanreader/span_reader.go +++ b/cmd/opentelemetry/app/internal/reader/es/esspanreader/span_reader.go @@ -32,9 +32,6 @@ import ( const ( // by default UI fetches 20 results defaultNumTraces = 20 - // default number of documents to fetch in a query - // see search.max_buckets and index.max_result_window - defaultAggregationSize = 10_000 spanIndexBaseName = "jaeger-span" serviceIndexBaseName = "jaeger-service" @@ -65,7 +62,7 @@ type Reader struct { // maximum number of spans to fetch per query in multi search maxNumberOfSpans int archive bool - aggregationSize int + maxDocCount int } var _ spanstore.Reader = (*Reader)(nil) @@ -78,7 +75,7 @@ type Config struct { MaxSpanAge time.Duration MaxNumSpans int TagDotReplacement string - AggregationSize int + MaxDocCount int } // NewEsSpanReader creates Elasticseach span reader. @@ -92,7 +89,7 @@ func NewEsSpanReader(client esclient.ElasticsearchClient, logger *zap.Logger, co converter: dbmodel.NewToDomain(config.TagDotReplacement), spanIndexName: newIndexNameProvider(spanIndexBaseName, config.IndexPrefix, config.UseReadWriteAliases, config.Archive), serviceIndexName: newIndexNameProvider(serviceIndexBaseName, config.IndexPrefix, config.UseReadWriteAliases, config.Archive), - aggregationSize: config.AggregationSize, + maxDocCount: config.MaxDocCount, } } @@ -165,7 +162,7 @@ func (r *Reader) findTraceIDs(ctx context.Context, query *spanstore.TraceQueryPa // GetServices implements spanstore.Reader func (r *Reader) GetServices(ctx context.Context) ([]string, error) { - searchBody := getServicesSearchBody(r.aggregationSize) + searchBody := getServicesSearchBody(r.maxDocCount) currentTime := time.Now() indices := r.serviceIndexName.get(currentTime.Add(-r.maxSpanAge), currentTime) response, err := r.client.Search(ctx, searchBody, 0, indices...) @@ -185,7 +182,7 @@ func (r *Reader) GetServices(ctx context.Context) ([]string, error) { // GetOperations implements spanstore.Reader func (r *Reader) GetOperations(ctx context.Context, query spanstore.OperationQueryParameters) ([]spanstore.Operation, error) { - searchBody := getOperationsSearchBody(query.ServiceName, r.aggregationSize) + searchBody := getOperationsSearchBody(query.ServiceName, r.maxDocCount) currentTime := time.Now() indices := r.serviceIndexName.get(currentTime.Add(-r.maxSpanAge), currentTime) response, err := r.client.Search(ctx, searchBody, 0, indices...) @@ -293,7 +290,7 @@ func (r *Reader) multiSearchRequests(indices []string, traceIDs []model.TraceID, s := esclient.SearchBody{ Indices: indices, Query: traceIDQuery(traceID), - Size: defaultAggregationSize, + Size: r.maxDocCount, TerminateAfter: r.maxNumberOfSpans, } if !r.archive { diff --git a/pkg/es/config/config.go b/pkg/es/config/config.go index d0b74d5ab9d..c956c34bf29 100644 --- a/pkg/es/config/config.go +++ b/pkg/es/config/config.go @@ -65,7 +65,7 @@ type Configuration struct { UseReadWriteAliases bool `mapstructure:"use_aliases"` CreateIndexTemplates bool `mapstructure:"create_mappings"` Version uint `mapstructure:"version"` - AggregationSize int `mapstructure:"-"` + MaxDocCount int `mapstructure:"-"` } // TagsAsFields holds configuration for tag schema. @@ -99,7 +99,7 @@ type ClientBuilder interface { IsCreateIndexTemplates() bool GetVersion() uint TagKeysAsFields() ([]string, error) - GetAggregationSize() int + GetMaxDocCount() int } // NewClient creates a new ElasticSearch client @@ -235,8 +235,8 @@ func (c *Configuration) ApplyDefaults(source *Configuration) { if c.Tags.File == "" { c.Tags.File = source.Tags.File } - if c.AggregationSize == 0 { - c.AggregationSize = source.AggregationSize + if c.MaxDocCount == 0 { + c.MaxDocCount = source.MaxDocCount } } @@ -296,9 +296,9 @@ func (c *Configuration) GetTokenFilePath() string { return c.TokenFilePath } -// GetAggregationSize returns the number of results (buckets) to return from a query -func (c *Configuration) GetAggregationSize() int { - return c.AggregationSize +// GetMaxDocCount returns the number of results (buckets) to return from a query +func (c *Configuration) GetMaxDocCount() int { + return c.MaxDocCount } // IsStorageEnabled determines whether storage is enabled diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index 04e3c9a126b..64b83d02530 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -148,7 +148,7 @@ func createSpanReader( TagDotReplacement: cfg.GetTagDotReplacement(), UseReadWriteAliases: cfg.GetUseReadWriteAliases(), Archive: archive, - AggregationSize: cfg.GetAggregationSize(), + MaxDocCount: cfg.GetMaxDocCount(), }), nil } diff --git a/plugin/storage/es/options.go b/plugin/storage/es/options.go index 959ef0b6a49..6701980f69e 100644 --- a/plugin/storage/es/options.go +++ b/plugin/storage/es/options.go @@ -53,10 +53,12 @@ const ( suffixCreateIndexTemplate = ".create-index-templates" suffixEnabled = ".enabled" suffixVersion = ".version" - suffixAggregationSize = ".aggregation.size" + suffixMaxDocCount = ".max-doc-count" - defaultServerURL = "http://127.0.0.1:9200" - defaultAggregationSize = 10000 // the default elasticsearch allowed limit + // default number of documents to fetch in a query (elasticsearch allowed limit) + // see search.max_buckets and index.max_result_window + defaultMaxDocCount = 10_000 + defaultServerURL = "http://127.0.0.1:9200" ) // TODO this should be moved next to config.Configuration struct (maybe ./flags package) @@ -99,7 +101,7 @@ func NewOptions(primaryNamespace string, otherNamespaces ...string) *Options { CreateIndexTemplates: true, Version: 0, Servers: []string{defaultServerURL}, - AggregationSize: defaultAggregationSize, + MaxDocCount: defaultMaxDocCount, }, namespace: primaryNamespace, }, @@ -241,8 +243,8 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) { nsConfig.SnifferTLSEnabled, "Option to enable TLS when sniffing an Elasticsearch Cluster ; client uses sniffing process to find all nodes automatically, disabled by default") flagSet.Int( - nsConfig.namespace+suffixAggregationSize, - nsConfig.AggregationSize, + nsConfig.namespace+suffixMaxDocCount, + nsConfig.MaxDocCount, "The aggregation size to set in Elasticsearch queries to limit the number of results returned; used primarily for querying for distinct services and operations.") if nsConfig.namespace == archiveNamespace { flagSet.Bool( @@ -286,7 +288,7 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) { cfg.Enabled = v.GetBool(cfg.namespace + suffixEnabled) cfg.CreateIndexTemplates = v.GetBool(cfg.namespace + suffixCreateIndexTemplate) cfg.Version = uint(v.GetInt(cfg.namespace + suffixVersion)) - cfg.AggregationSize = v.GetInt(cfg.namespace + suffixAggregationSize) + cfg.MaxDocCount = v.GetInt(cfg.namespace + suffixMaxDocCount) // TODO: Need to figure out a better way for do this. cfg.AllowTokenFromContext = v.GetBool(spanstore.StoragePropagationKey) cfg.TLS = cfg.getTLSFlagsConfig().InitFromViper(v) diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go index f1a27321a36..ff4595dd3d1 100644 --- a/plugin/storage/es/spanstore/reader.go +++ b/plugin/storage/es/spanstore/reader.go @@ -100,7 +100,7 @@ type SpanReader struct { spanConverter dbmodel.ToDomain timeRangeIndices timeRangeIndexFn sourceFn sourceFn - aggregationSize int + maxDocCount int } // SpanReaderParams holds constructor params for NewSpanReader @@ -114,7 +114,7 @@ type SpanReaderParams struct { TagDotReplacement string Archive bool UseReadWriteAliases bool - AggregationSize int + MaxDocCount int } // NewSpanReader returns a new SpanReader with a metrics. @@ -128,8 +128,8 @@ func NewSpanReader(p SpanReaderParams) *SpanReader { serviceIndexPrefix: indexNames(p.IndexPrefix, serviceIndex), spanConverter: dbmodel.NewToDomain(p.TagDotReplacement), timeRangeIndices: getTimeRangeIndexFn(p.Archive, p.UseReadWriteAliases), - sourceFn: getSourceFn(p.Archive, p.MaxNumSpans, p.AggregationSize), - aggregationSize: p.AggregationSize, + sourceFn: getSourceFn(p.Archive, p.MaxNumSpans, p.MaxDocCount), + maxDocCount: p.MaxDocCount, } } @@ -157,11 +157,11 @@ func getTimeRangeIndexFn(archive, useReadWriteAliases bool) timeRangeIndexFn { return timeRangeIndices } -func getSourceFn(archive bool, maxNumSpans int, aggregationSize int) sourceFn { +func getSourceFn(archive bool, maxNumSpans int, maxDocCount int) sourceFn { return func(query elastic.Query, nextTime uint64) *elastic.SearchSource { s := elastic.NewSearchSource(). Query(query). - Size(aggregationSize). + Size(maxDocCount). TerminateAfter(maxNumSpans) if !archive { s.Sort("startTime", true). @@ -243,7 +243,7 @@ func (s *SpanReader) GetServices(ctx context.Context) ([]string, error) { defer span.Finish() currentTime := time.Now() jaegerIndices := s.timeRangeIndices(s.serviceIndexPrefix, currentTime.Add(-s.maxSpanAge), currentTime) - return s.serviceOperationStorage.getServices(ctx, jaegerIndices, s.aggregationSize) + return s.serviceOperationStorage.getServices(ctx, jaegerIndices, s.maxDocCount) } // GetOperations returns all operations for a specific service traced by Jaeger @@ -255,7 +255,7 @@ func (s *SpanReader) GetOperations( defer span.Finish() currentTime := time.Now() jaegerIndices := s.timeRangeIndices(s.serviceIndexPrefix, currentTime.Add(-s.maxSpanAge), currentTime) - operations, err := s.serviceOperationStorage.getOperations(ctx, jaegerIndices, query.ServiceName, s.aggregationSize) + operations, err := s.serviceOperationStorage.getOperations(ctx, jaegerIndices, query.ServiceName, s.maxDocCount) if err != nil { return nil, err } diff --git a/plugin/storage/es/spanstore/reader_test.go b/plugin/storage/es/spanstore/reader_test.go index 28e13df2717..a06acab201d 100644 --- a/plugin/storage/es/spanstore/reader_test.go +++ b/plugin/storage/es/spanstore/reader_test.go @@ -101,7 +101,7 @@ func withSpanReader(fn func(r *spanReaderTest)) { MaxSpanAge: 0, IndexPrefix: "", TagDotReplacement: "@", - AggregationSize: 999, + MaxDocCount: 999, }), } fn(r) diff --git a/plugin/storage/es/spanstore/service_operation.go b/plugin/storage/es/spanstore/service_operation.go index 80b2868e845..71ad0c2ee39 100644 --- a/plugin/storage/es/spanstore/service_operation.go +++ b/plugin/storage/es/spanstore/service_operation.go @@ -77,8 +77,8 @@ func (s *ServiceOperationStorage) Write(indexName string, jsonSpan *dbmodel.Span } } -func (s *ServiceOperationStorage) getServices(context context.Context, indices []string, aggregationSize int) ([]string, error) { - serviceAggregation := getServicesAggregation(aggregationSize) +func (s *ServiceOperationStorage) getServices(context context.Context, indices []string, maxDocCount int) ([]string, error) { + serviceAggregation := getServicesAggregation(maxDocCount) searchService := s.client.Search(indices...). Size(0). // set to 0 because we don't want actual documents. @@ -100,15 +100,15 @@ func (s *ServiceOperationStorage) getServices(context context.Context, indices [ return bucketToStringArray(serviceNamesBucket) } -func getServicesAggregation(aggregationSize int) elastic.Query { +func getServicesAggregation(maxDocCount int) elastic.Query { return elastic.NewTermsAggregation(). Field(serviceName). - Size(aggregationSize) // ES deprecated size omission for aggregating all. https://github.com/elastic/elasticsearch/issues/18838 + Size(maxDocCount) // ES deprecated size omission for aggregating all. https://github.com/elastic/elasticsearch/issues/18838 } -func (s *ServiceOperationStorage) getOperations(context context.Context, indices []string, service string, aggregationSize int) ([]string, error) { +func (s *ServiceOperationStorage) getOperations(context context.Context, indices []string, service string, maxDocCount int) ([]string, error) { serviceQuery := elastic.NewTermQuery(serviceName, service) - serviceFilter := getOperationsAggregation(aggregationSize) + serviceFilter := getOperationsAggregation(maxDocCount) searchService := s.client.Search(indices...). Size(0). @@ -131,10 +131,10 @@ func (s *ServiceOperationStorage) getOperations(context context.Context, indices return bucketToStringArray(operationNamesBucket) } -func getOperationsAggregation(aggregationSize int) elastic.Query { +func getOperationsAggregation(maxDocCount int) elastic.Query { return elastic.NewTermsAggregation(). Field(operationNameField). - Size(aggregationSize) // ES deprecated size omission for aggregating all. https://github.com/elastic/elasticsearch/issues/18838 + Size(maxDocCount) // ES deprecated size omission for aggregating all. https://github.com/elastic/elasticsearch/issues/18838 } func hashCode(s dbmodel.Service) string { From fe99b9ab1f12e50cbca565abe01e9d4c01dc5cdf Mon Sep 17 00:00:00 2001 From: albertteoh Date: Tue, 1 Sep 2020 19:59:13 +1000 Subject: [PATCH 03/16] Refactor create dependency reader Signed-off-by: albertteoh --- .../app/internal/reader/es/esspanreader/query_test.go | 1 + plugin/storage/es/factory.go | 8 ++++++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/cmd/opentelemetry/app/internal/reader/es/esspanreader/query_test.go b/cmd/opentelemetry/app/internal/reader/es/esspanreader/query_test.go index 315155cdebf..f1f4b24af3e 100644 --- a/cmd/opentelemetry/app/internal/reader/es/esspanreader/query_test.go +++ b/cmd/opentelemetry/app/internal/reader/es/esspanreader/query_test.go @@ -28,6 +28,7 @@ import ( ) const ( + defaultMaxDocCount = 10000 servicesSearchBodyFixture = `{ "aggs": { "serviceName": { diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index 64b83d02530..6c66664bb44 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -111,8 +111,7 @@ func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) { // CreateDependencyReader implements storage.Factory func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) { - reader := esDepStore.NewDependencyStore(f.primaryClient, f.logger, f.primaryConfig.GetIndexPrefix()) - return reader, nil + return createDependencyReader(f.primaryClient, f.logger, f.primaryConfig) } // CreateArchiveSpanReader implements storage.ArchiveFactory @@ -187,6 +186,11 @@ func createSpanWriter( return writer, nil } +func createDependencyReader(client es.Client, logger *zap.Logger, cfg config.ClientBuilder) (dependencystore.Reader, error) { + reader := esDepStore.NewDependencyStore(client, logger, cfg.GetIndexPrefix()) + return reader, nil +} + // GetSpanServiceMappings returns span and service mappings func GetSpanServiceMappings(shards, replicas int64, esVersion uint) (string, string) { if esVersion == 7 { From b809361b6ae655dac5c45cf2bc8443d97e12675c Mon Sep 17 00:00:00 2001 From: albertteoh Date: Wed, 2 Sep 2020 21:36:50 +1000 Subject: [PATCH 04/16] Stronger assertions on aggregation sizes Signed-off-by: albertteoh --- plugin/storage/es/dependencystore/storage.go | 6 ++- .../es/dependencystore/storage_test.go | 18 +++++---- plugin/storage/es/factory.go | 2 +- plugin/storage/es/spanstore/reader_test.go | 40 ++++++++++++++----- .../storage/integration/elasticsearch_test.go | 2 +- 5 files changed, 47 insertions(+), 21 deletions(-) diff --git a/plugin/storage/es/dependencystore/storage.go b/plugin/storage/es/dependencystore/storage.go index fc983bbf109..46bfd6b5d30 100644 --- a/plugin/storage/es/dependencystore/storage.go +++ b/plugin/storage/es/dependencystore/storage.go @@ -40,10 +40,11 @@ type DependencyStore struct { client es.Client logger *zap.Logger indexPrefix string + maxDocCount int } // NewDependencyStore returns a DependencyStore -func NewDependencyStore(client es.Client, logger *zap.Logger, indexPrefix string) *DependencyStore { +func NewDependencyStore(client es.Client, logger *zap.Logger, indexPrefix string, maxDocCount int) *DependencyStore { var prefix string if indexPrefix != "" { prefix = indexPrefix + "-" @@ -52,6 +53,7 @@ func NewDependencyStore(client es.Client, logger *zap.Logger, indexPrefix string client: client, logger: logger, indexPrefix: prefix + dependencyIndex, + maxDocCount: maxDocCount, } } @@ -82,7 +84,7 @@ func (s *DependencyStore) writeDependencies(indexName string, ts time.Time, depe func (s *DependencyStore) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) { indices := getIndices(s.indexPrefix, endTs, lookback) searchResult, err := s.client.Search(indices...). - Size(10000). // the default elasticsearch allowed limit + Size(s.maxDocCount). Query(buildTSQuery(endTs, lookback)). IgnoreUnavailable(true). Do(ctx) diff --git a/plugin/storage/es/dependencystore/storage_test.go b/plugin/storage/es/dependencystore/storage_test.go index 2b6f460fad4..f353fb92126 100644 --- a/plugin/storage/es/dependencystore/storage_test.go +++ b/plugin/storage/es/dependencystore/storage_test.go @@ -41,14 +41,14 @@ type depStorageTest struct { storage *DependencyStore } -func withDepStorage(indexPrefix string, fn func(r *depStorageTest)) { +func withDepStorage(indexPrefix string, maxDocCount int, fn func(r *depStorageTest)) { client := &mocks.Client{} logger, logBuffer := testutils.NewLogger() r := &depStorageTest{ client: client, logger: logger, logBuffer: logBuffer, - storage: NewDependencyStore(client, logger, indexPrefix), + storage: NewDependencyStore(client, logger, indexPrefix, maxDocCount), } fn(r) } @@ -67,7 +67,7 @@ func TestNewSpanReaderIndexPrefix(t *testing.T) { } for _, testCase := range testCases { client := &mocks.Client{} - r := NewDependencyStore(client, zap.NewNop(), testCase.prefix) + r := NewDependencyStore(client, zap.NewNop(), testCase.prefix, 0) assert.Equal(t, testCase.expected+dependencyIndex, r.indexPrefix) } } @@ -88,7 +88,7 @@ func TestWriteDependencies(t *testing.T) { }, } for _, testCase := range testCases { - withDepStorage("", func(r *depStorageTest) { + withDepStorage("", 0, func(r *depStorageTest) { fixedTime := time.Date(1995, time.April, 21, 4, 21, 19, 95, time.UTC) indexName := indexWithDate("", fixedTime) writeService := &mocks.IndexService{} @@ -130,6 +130,7 @@ func TestGetDependencies(t *testing.T) { expectedError string expectedOutput []model.DependencyLink indexPrefix string + maxDocCount int indices []interface{} }{ { @@ -141,7 +142,8 @@ func TestGetDependencies(t *testing.T) { CallCount: 12, }, }, - indices: []interface{}{"jaeger-dependencies-1995-04-21", "jaeger-dependencies-1995-04-20"}, + indices: []interface{}{"jaeger-dependencies-1995-04-21", "jaeger-dependencies-1995-04-20"}, + maxDocCount: 1000, }, { searchResult: createSearchResult(badDependencies), @@ -161,13 +163,15 @@ func TestGetDependencies(t *testing.T) { }, } for _, testCase := range testCases { - withDepStorage(testCase.indexPrefix, func(r *depStorageTest) { + withDepStorage(testCase.indexPrefix, testCase.maxDocCount, func(r *depStorageTest) { fixedTime := time.Date(1995, time.April, 21, 4, 21, 19, 95, time.UTC) searchService := &mocks.SearchService{} r.client.On("Search", testCase.indices...).Return(searchService) - searchService.On("Size", mock.Anything).Return(searchService) + searchService.On("Size", mock.MatchedBy(func(size int) bool { + return size == testCase.maxDocCount + })).Return(searchService) searchService.On("Query", mock.Anything).Return(searchService) searchService.On("IgnoreUnavailable", mock.AnythingOfType("bool")).Return(searchService) searchService.On("Do", mock.Anything).Return(testCase.searchResult, testCase.searchError) diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index 6c66664bb44..e1c0848395d 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -187,7 +187,7 @@ func createSpanWriter( } func createDependencyReader(client es.Client, logger *zap.Logger, cfg config.ClientBuilder) (dependencystore.Reader, error) { - reader := esDepStore.NewDependencyStore(client, logger, cfg.GetIndexPrefix()) + reader := esDepStore.NewDependencyStore(client, logger, cfg.GetIndexPrefix(), cfg.GetMaxDocCount()) return reader, nil } diff --git a/plugin/storage/es/spanstore/reader_test.go b/plugin/storage/es/spanstore/reader_test.go index a06acab201d..42bbaa3f1f0 100644 --- a/plugin/storage/es/spanstore/reader_test.go +++ b/plugin/storage/es/spanstore/reader_test.go @@ -40,6 +40,8 @@ import ( "github.com/jaegertracing/jaeger/storage/spanstore" ) +const defaultMaxDocCount = 10000 + var exampleESSpan = []byte( `{ "traceID": "1", @@ -82,10 +84,11 @@ var exampleESSpan = []byte( }`) type spanReaderTest struct { - client *mocks.Client - logger *zap.Logger - logBuffer *testutils.Buffer - reader *SpanReader + client *mocks.Client + logger *zap.Logger + logBuffer *testutils.Buffer + reader *SpanReader + maxDocCount int } func withSpanReader(fn func(r *spanReaderTest)) { @@ -101,7 +104,7 @@ func withSpanReader(fn func(r *spanReaderTest)) { MaxSpanAge: 0, IndexPrefix: "", TagDotReplacement: "@", - MaxDocCount: 999, + MaxDocCount: defaultMaxDocCount, }), } fn(r) @@ -805,7 +808,18 @@ func TestSpanReader_FindTracesSpanCollectionFailure(t *testing.T) { } func TestFindTraceIDs(t *testing.T) { - testGet(traceIDAggregation, t) + testCases := []struct { + aggregrationID string + }{ + {traceIDAggregation}, + {servicesAggregation}, + {operationsAggregation}, + } + for _, testCase := range testCases { + t.Run(testCase.aggregrationID, func(t *testing.T) { + testGet(testCase.aggregrationID, t) + }) + } } func TestTraceIDsStringsToModelsConversion(t *testing.T) { @@ -836,15 +850,21 @@ func mockArchiveMultiSearchService(r *spanReaderTest, indexName string) *mock.Ca return multiSearchService.On("Do", mock.AnythingOfType("*context.valueCtx")) } +func matchTermsAggregation(termsAgg *elastic.TermsAggregation) bool { + val := reflect.ValueOf(termsAgg).Elem() + sizeVal := val.FieldByName("size").Elem().Int() + return sizeVal == defaultMaxDocCount +} + func mockSearchService(r *spanReaderTest) *mock.Call { searchService := &mocks.SearchService{} searchService.On("Query", mock.Anything).Return(searchService) searchService.On("IgnoreUnavailable", mock.AnythingOfType("bool")).Return(searchService) - searchService.On("Size", mock.MatchedBy(func(i int) bool { - return i == 0 + searchService.On("Size", mock.MatchedBy(func(size int) bool { + return size == 0 })).Return(searchService) - searchService.On("Aggregation", stringMatcher(servicesAggregation), mock.AnythingOfType("*elastic.TermsAggregation")).Return(searchService) - searchService.On("Aggregation", stringMatcher(operationsAggregation), mock.AnythingOfType("*elastic.TermsAggregation")).Return(searchService) + searchService.On("Aggregation", servicesAggregation, mock.MatchedBy(matchTermsAggregation)).Return(searchService) + searchService.On("Aggregation", stringMatcher(operationsAggregation), mock.MatchedBy(matchTermsAggregation)).Return(searchService) searchService.On("Aggregation", stringMatcher(traceIDAggregation), mock.AnythingOfType("*elastic.TermsAggregation")).Return(searchService) r.client.On("Search", mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("string")).Return(searchService) return searchService.On("Do", mock.MatchedBy(func(ctx context.Context) bool { diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index 6bd0de3f3f5..43ab53da09c 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -130,7 +130,7 @@ func (s *ESStorageIntegration) initSpanstore(allTagsAsFields, archive bool) erro TagDotReplacement: tagKeyDeDotChar, Archive: archive, }) - dependencyStore := dependencystore.NewDependencyStore(client, s.logger, indexPrefix) + dependencyStore := dependencystore.NewDependencyStore(client, s.logger, indexPrefix, 0) depMapping := es.GetDependenciesMappings(5, 1, client.GetVersion()) err = dependencyStore.CreateTemplates(depMapping) if err != nil { From 45e4e1e5a17869d7d3a7d96a263ce3b850cbb072 Mon Sep 17 00:00:00 2001 From: albertteoh Date: Thu, 3 Sep 2020 22:25:31 +1000 Subject: [PATCH 05/16] Add deprecation note Signed-off-by: albertteoh --- pkg/es/config/config.go | 2 +- plugin/storage/es/options.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/es/config/config.go b/pkg/es/config/config.go index c956c34bf29..99bdae0964b 100644 --- a/pkg/es/config/config.go +++ b/pkg/es/config/config.go @@ -49,7 +49,7 @@ type Configuration struct { AllowTokenFromContext bool `mapstructure:"-"` Sniffer bool `mapstructure:"sniffer"` // https://github.com/olivere/elastic/wiki/Sniffing SnifferTLSEnabled bool `mapstructure:"sniffer_tls_enabled"` - MaxNumSpans int `mapstructure:"-"` // defines maximum number of spans to fetch from storage per query + MaxNumSpans int `mapstructure:"-"` // deprecated: use MaxDocCount instead. Defines maximum number of spans to fetch from storage per query MaxSpanAge time.Duration `yaml:"max_span_age" mapstructure:"-"` // configures the maximum lookback on span reads NumShards int64 `yaml:"shards" mapstructure:"num_shards"` NumReplicas int64 `yaml:"replicas" mapstructure:"num_replicas"` diff --git a/plugin/storage/es/options.go b/plugin/storage/es/options.go index 6701980f69e..59ac46f1107 100644 --- a/plugin/storage/es/options.go +++ b/plugin/storage/es/options.go @@ -35,7 +35,7 @@ const ( suffixTokenPath = ".token-file" suffixServerURLs = ".server-urls" suffixMaxSpanAge = ".max-span-age" - suffixMaxNumSpans = ".max-num-spans" + suffixMaxNumSpans = ".max-num-spans" // deprecated suffixNumShards = ".num-shards" suffixNumReplicas = ".num-replicas" suffixBulkSize = ".bulk.size" @@ -179,7 +179,7 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) { flagSet.Int( nsConfig.namespace+suffixMaxNumSpans, nsConfig.MaxNumSpans, - "The maximum number of spans to fetch at a time per query in Elasticsearch") + "(deprecated, will be removed in release v1.21.0. Please use es.max-doc-count.). The maximum number of spans to fetch at a time per query in Elasticsearch") flagSet.Int64( nsConfig.namespace+suffixNumShards, nsConfig.NumShards, From 702bffa3dce3f9133e8b4ef92c3a1d02e9344304 Mon Sep 17 00:00:00 2001 From: albertteoh Date: Thu, 3 Sep 2020 22:44:59 +1000 Subject: [PATCH 06/16] Remove stuttered full-stop Signed-off-by: albertteoh --- plugin/storage/es/options.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugin/storage/es/options.go b/plugin/storage/es/options.go index 59ac46f1107..0d78ff0d358 100644 --- a/plugin/storage/es/options.go +++ b/plugin/storage/es/options.go @@ -179,7 +179,7 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) { flagSet.Int( nsConfig.namespace+suffixMaxNumSpans, nsConfig.MaxNumSpans, - "(deprecated, will be removed in release v1.21.0. Please use es.max-doc-count.). The maximum number of spans to fetch at a time per query in Elasticsearch") + "(deprecated, will be removed in release v1.21.0. Please use es.max-doc-count). The maximum number of spans to fetch at a time per query in Elasticsearch") flagSet.Int64( nsConfig.namespace+suffixNumShards, nsConfig.NumShards, From 2a0d5d0e8d47c12b99d5903cb697c3116735040f Mon Sep 17 00:00:00 2001 From: albertteoh Date: Thu, 3 Sep 2020 23:22:39 +1000 Subject: [PATCH 07/16] Better comments and readability Signed-off-by: albertteoh --- .../reader/es/esdependencyreader/dependency_store.go | 2 +- .../app/internal/reader/es/esspanreader/query_test.go | 2 +- pkg/es/config/config.go | 2 +- plugin/storage/es/dependencystore/storage_test.go | 8 +++++--- plugin/storage/es/options.go | 4 ++-- plugin/storage/es/spanstore/reader_test.go | 4 ++-- plugin/storage/integration/elasticsearch_test.go | 3 ++- 7 files changed, 14 insertions(+), 11 deletions(-) diff --git a/cmd/opentelemetry/app/internal/reader/es/esdependencyreader/dependency_store.go b/cmd/opentelemetry/app/internal/reader/es/esdependencyreader/dependency_store.go index f2c0d8411a3..cb497e4b8e4 100644 --- a/cmd/opentelemetry/app/internal/reader/es/esdependencyreader/dependency_store.go +++ b/cmd/opentelemetry/app/internal/reader/es/esdependencyreader/dependency_store.go @@ -36,7 +36,7 @@ const ( timestampField = "timestamp" - // default number of documents to fetch in a query + // default number of documents to return from a query (elasticsearch allowed limit) // see search.max_buckets and index.max_result_window defaultMaxDocCount = 10_000 indexDateFormat = "2006-01-02" // date format for index e.g. 2020-01-20 diff --git a/cmd/opentelemetry/app/internal/reader/es/esspanreader/query_test.go b/cmd/opentelemetry/app/internal/reader/es/esspanreader/query_test.go index f1f4b24af3e..33ea5b45994 100644 --- a/cmd/opentelemetry/app/internal/reader/es/esspanreader/query_test.go +++ b/cmd/opentelemetry/app/internal/reader/es/esspanreader/query_test.go @@ -28,7 +28,7 @@ import ( ) const ( - defaultMaxDocCount = 10000 + defaultMaxDocCount = 10_000 servicesSearchBodyFixture = `{ "aggs": { "serviceName": { diff --git a/pkg/es/config/config.go b/pkg/es/config/config.go index 99bdae0964b..70347a8cded 100644 --- a/pkg/es/config/config.go +++ b/pkg/es/config/config.go @@ -296,7 +296,7 @@ func (c *Configuration) GetTokenFilePath() string { return c.TokenFilePath } -// GetMaxDocCount returns the number of results (buckets) to return from a query +// GetMaxDocCount returns the maximum number of documents that a query should return func (c *Configuration) GetMaxDocCount() int { return c.MaxDocCount } diff --git a/plugin/storage/es/dependencystore/storage_test.go b/plugin/storage/es/dependencystore/storage_test.go index f353fb92126..a4558f80be0 100644 --- a/plugin/storage/es/dependencystore/storage_test.go +++ b/plugin/storage/es/dependencystore/storage_test.go @@ -34,6 +34,8 @@ import ( "github.com/jaegertracing/jaeger/storage/dependencystore" ) +const defaultMaxDocCount = 10_000 + type depStorageTest struct { client *mocks.Client logger *zap.Logger @@ -67,7 +69,7 @@ func TestNewSpanReaderIndexPrefix(t *testing.T) { } for _, testCase := range testCases { client := &mocks.Client{} - r := NewDependencyStore(client, zap.NewNop(), testCase.prefix, 0) + r := NewDependencyStore(client, zap.NewNop(), testCase.prefix, defaultMaxDocCount) assert.Equal(t, testCase.expected+dependencyIndex, r.indexPrefix) } } @@ -88,7 +90,7 @@ func TestWriteDependencies(t *testing.T) { }, } for _, testCase := range testCases { - withDepStorage("", 0, func(r *depStorageTest) { + withDepStorage("", defaultMaxDocCount, func(r *depStorageTest) { fixedTime := time.Date(1995, time.April, 21, 4, 21, 19, 95, time.UTC) indexName := indexWithDate("", fixedTime) writeService := &mocks.IndexService{} @@ -143,7 +145,7 @@ func TestGetDependencies(t *testing.T) { }, }, indices: []interface{}{"jaeger-dependencies-1995-04-21", "jaeger-dependencies-1995-04-20"}, - maxDocCount: 1000, + maxDocCount: 1000, // can be anything, assertion will check this value is used in search query. }, { searchResult: createSearchResult(badDependencies), diff --git a/plugin/storage/es/options.go b/plugin/storage/es/options.go index 0d78ff0d358..cc9db06c8dd 100644 --- a/plugin/storage/es/options.go +++ b/plugin/storage/es/options.go @@ -55,7 +55,7 @@ const ( suffixVersion = ".version" suffixMaxDocCount = ".max-doc-count" - // default number of documents to fetch in a query (elasticsearch allowed limit) + // default number of documents to return from a query (elasticsearch allowed limit) // see search.max_buckets and index.max_result_window defaultMaxDocCount = 10_000 defaultServerURL = "http://127.0.0.1:9200" @@ -245,7 +245,7 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) { flagSet.Int( nsConfig.namespace+suffixMaxDocCount, nsConfig.MaxDocCount, - "The aggregation size to set in Elasticsearch queries to limit the number of results returned; used primarily for querying for distinct services and operations.") + "The maximum document count to return from an Elasticsearch query. This will also apply to aggregations.") if nsConfig.namespace == archiveNamespace { flagSet.Bool( nsConfig.namespace+suffixEnabled, diff --git a/plugin/storage/es/spanstore/reader_test.go b/plugin/storage/es/spanstore/reader_test.go index 42bbaa3f1f0..517c10d2f6e 100644 --- a/plugin/storage/es/spanstore/reader_test.go +++ b/plugin/storage/es/spanstore/reader_test.go @@ -40,7 +40,7 @@ import ( "github.com/jaegertracing/jaeger/storage/spanstore" ) -const defaultMaxDocCount = 10000 +const defaultMaxDocCount = 10_000 var exampleESSpan = []byte( `{ @@ -863,7 +863,7 @@ func mockSearchService(r *spanReaderTest) *mock.Call { searchService.On("Size", mock.MatchedBy(func(size int) bool { return size == 0 })).Return(searchService) - searchService.On("Aggregation", servicesAggregation, mock.MatchedBy(matchTermsAggregation)).Return(searchService) + searchService.On("Aggregation", stringMatcher(servicesAggregation), mock.MatchedBy(matchTermsAggregation)).Return(searchService) searchService.On("Aggregation", stringMatcher(operationsAggregation), mock.MatchedBy(matchTermsAggregation)).Return(searchService) searchService.On("Aggregation", stringMatcher(traceIDAggregation), mock.AnythingOfType("*elastic.TermsAggregation")).Return(searchService) r.client.On("Search", mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("string")).Return(searchService) diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index 43ab53da09c..03ebbad69cb 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -46,6 +46,7 @@ const ( indexPrefix = "integration-test" tagKeyDeDotChar = "@" maxSpanAge = time.Hour * 72 + defaultMaxDocCount = 10_000 ) type ESStorageIntegration struct { @@ -130,7 +131,7 @@ func (s *ESStorageIntegration) initSpanstore(allTagsAsFields, archive bool) erro TagDotReplacement: tagKeyDeDotChar, Archive: archive, }) - dependencyStore := dependencystore.NewDependencyStore(client, s.logger, indexPrefix, 0) + dependencyStore := dependencystore.NewDependencyStore(client, s.logger, indexPrefix, defaultMaxDocCount) depMapping := es.GetDependenciesMappings(5, 1, client.GetVersion()) err = dependencyStore.CreateTemplates(depMapping) if err != nil { From 2347697364e3f85c2a7799b83ce1f3dd9a7622d6 Mon Sep 17 00:00:00 2001 From: albertteoh Date: Thu, 3 Sep 2020 23:33:19 +1000 Subject: [PATCH 08/16] Comments Signed-off-by: albertteoh --- .../app/exporter/elasticsearchexporter/integration_test.go | 1 + plugin/storage/es/spanstore/reader_test.go | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/integration_test.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/integration_test.go index 5819806152b..48da7ed1aa8 100644 --- a/cmd/opentelemetry/app/exporter/elasticsearchexporter/integration_test.go +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/integration_test.go @@ -37,6 +37,7 @@ import ( "github.com/jaegertracing/jaeger/plugin/storage/es" "github.com/jaegertracing/jaeger/plugin/storage/es/spanstore/dbmodel" "github.com/jaegertracing/jaeger/plugin/storage/integration" + "github.com/jaegertracing/jaeger/storage/spanstore" ) const ( diff --git a/plugin/storage/es/spanstore/reader_test.go b/plugin/storage/es/spanstore/reader_test.go index 517c10d2f6e..fe54208797d 100644 --- a/plugin/storage/es/spanstore/reader_test.go +++ b/plugin/storage/es/spanstore/reader_test.go @@ -850,6 +850,8 @@ func mockArchiveMultiSearchService(r *spanReaderTest, indexName string) *mock.Ca return multiSearchService.On("Do", mock.AnythingOfType("*context.valueCtx")) } +// matchTermsAggregation uses reflection to match the size attribute of the TermsAggregation; neither +// attributes nor getters are exported by TermsAggregation. func matchTermsAggregation(termsAgg *elastic.TermsAggregation) bool { val := reflect.ValueOf(termsAgg).Elem() sizeVal := val.FieldByName("size").Elem().Int() @@ -861,7 +863,7 @@ func mockSearchService(r *spanReaderTest) *mock.Call { searchService.On("Query", mock.Anything).Return(searchService) searchService.On("IgnoreUnavailable", mock.AnythingOfType("bool")).Return(searchService) searchService.On("Size", mock.MatchedBy(func(size int) bool { - return size == 0 + return size == 0 // Aggregations apply size (bucket) limits in their own query objects, and do not apply at the parent query level. })).Return(searchService) searchService.On("Aggregation", stringMatcher(servicesAggregation), mock.MatchedBy(matchTermsAggregation)).Return(searchService) searchService.On("Aggregation", stringMatcher(operationsAggregation), mock.MatchedBy(matchTermsAggregation)).Return(searchService) From 11de7e3d48f3e6a1260cb2f625dfc74a8f78dd63 Mon Sep 17 00:00:00 2001 From: albertteoh Date: Fri, 4 Sep 2020 07:33:29 +1000 Subject: [PATCH 09/16] Revert spanstore import Signed-off-by: albertteoh --- .../app/exporter/elasticsearchexporter/integration_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/integration_test.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/integration_test.go index 48da7ed1aa8..5819806152b 100644 --- a/cmd/opentelemetry/app/exporter/elasticsearchexporter/integration_test.go +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/integration_test.go @@ -37,7 +37,6 @@ import ( "github.com/jaegertracing/jaeger/plugin/storage/es" "github.com/jaegertracing/jaeger/plugin/storage/es/spanstore/dbmodel" "github.com/jaegertracing/jaeger/plugin/storage/integration" - "github.com/jaegertracing/jaeger/storage/spanstore" ) const ( From febfbaf93c9b7e8f659b0c5cbc054104c5e08b20 Mon Sep 17 00:00:00 2001 From: albertteoh Date: Fri, 4 Sep 2020 17:40:45 +1000 Subject: [PATCH 10/16] One-liner create dep reader Signed-off-by: albertteoh --- plugin/storage/es/factory.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index e1c0848395d..37240670ea4 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -111,7 +111,8 @@ func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) { // CreateDependencyReader implements storage.Factory func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) { - return createDependencyReader(f.primaryClient, f.logger, f.primaryConfig) + reader := esDepStore.NewDependencyStore(f.primaryClient, f.logger, f.primaryConfig.GetIndexPrefix(), f.primaryConfig.GetMaxDocCount()) + return reader, nil } // CreateArchiveSpanReader implements storage.ArchiveFactory @@ -186,11 +187,6 @@ func createSpanWriter( return writer, nil } -func createDependencyReader(client es.Client, logger *zap.Logger, cfg config.ClientBuilder) (dependencystore.Reader, error) { - reader := esDepStore.NewDependencyStore(client, logger, cfg.GetIndexPrefix(), cfg.GetMaxDocCount()) - return reader, nil -} - // GetSpanServiceMappings returns span and service mappings func GetSpanServiceMappings(shards, replicas int64, esVersion uint) (string, string) { if esVersion == 7 { From d931a7953fd86eb0c8b3375f0d928b3d138809b4 Mon Sep 17 00:00:00 2001 From: albertteoh Date: Fri, 4 Sep 2020 18:31:59 +1000 Subject: [PATCH 11/16] MaxNumSpans -> MaxDocCount Signed-off-by: albertteoh --- .../elasticsearchexporter/integration_test.go | 22 +++++++++------- .../elasticsearchexporter/storagefactory.go | 6 ++--- .../es/esdependencyreader/dependency_store.go | 17 ++++++------ .../dependency_store_test.go | 14 +++++----- pkg/es/config/config.go | 20 ++++---------- plugin/storage/es/factory.go | 3 +-- plugin/storage/es/options.go | 16 +++++++++--- plugin/storage/es/options_test.go | 26 +++++++++++++++++++ 8 files changed, 76 insertions(+), 48 deletions(-) diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/integration_test.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/integration_test.go index 5819806152b..f0b45e75a6c 100644 --- a/cmd/opentelemetry/app/exporter/elasticsearchexporter/integration_test.go +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/integration_test.go @@ -40,15 +40,16 @@ import ( ) const ( - host = "0.0.0.0" - esPort = "9200" - esHostPort = host + ":" + esPort - esURL = "http://" + esHostPort - indexPrefix = "integration-test" - tagKeyDeDotChar = "@" - maxSpanAge = time.Hour * 72 - numShards = 5 - numReplicas = 0 + host = "0.0.0.0" + esPort = "9200" + esHostPort = host + ":" + esPort + esURL = "http://" + esHostPort + indexPrefix = "integration-test" + tagKeyDeDotChar = "@" + maxSpanAge = time.Hour * 72 + numShards = 5 + numReplicas = 0 + defaultMaxDocCount = 10_000 ) type IntegrationTest struct { @@ -120,11 +121,12 @@ func (s *IntegrationTest) initSpanstore(allTagsAsFields bool) error { TagDotReplacement: tagKeyDeDotChar, MaxSpanAge: maxSpanAge, MaxNumSpans: 10_000, + MaxDocCount: defaultMaxDocCount, }) s.SpanReader = reader depMapping := es.GetDependenciesMappings(numShards, numReplicas, esVersion) - depStore := esdependencyreader.NewDependencyStore(elasticsearchClient, s.logger, indexPrefix) + depStore := esdependencyreader.NewDependencyStore(elasticsearchClient, s.logger, indexPrefix, defaultMaxDocCount) if err := depStore.CreateTemplates(depMapping); err != nil { return nil } diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/storagefactory.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/storagefactory.go index b821c81270d..c631314e70f 100644 --- a/cmd/opentelemetry/app/exporter/elasticsearchexporter/storagefactory.go +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/storagefactory.go @@ -88,7 +88,7 @@ func (s *StorageFactory) CreateSpanReader() (spanstore.Reader, error) { UseReadWriteAliases: cfg.GetUseReadWriteAliases(), IndexPrefix: cfg.GetIndexPrefix(), MaxSpanAge: cfg.GetMaxSpanAge(), - MaxNumSpans: cfg.GetMaxNumSpans(), + MaxDocCount: cfg.GetMaxDocCount(), TagDotReplacement: cfg.GetTagDotReplacement(), }), nil } @@ -100,7 +100,7 @@ func (s *StorageFactory) CreateDependencyReader() (dependencystore.Reader, error if err != nil { return nil, err } - return esdependencyreader.NewDependencyStore(client, s.logger, cfg.GetIndexPrefix()), nil + return esdependencyreader.NewDependencyStore(client, s.logger, cfg.GetIndexPrefix(), cfg.GetMaxDocCount()), nil } // CreateArchiveSpanReader creates archive spanstore.Reader @@ -115,7 +115,7 @@ func (s *StorageFactory) CreateArchiveSpanReader() (spanstore.Reader, error) { UseReadWriteAliases: cfg.GetUseReadWriteAliases(), IndexPrefix: cfg.GetIndexPrefix(), MaxSpanAge: cfg.GetMaxSpanAge(), - MaxNumSpans: cfg.GetMaxNumSpans(), + MaxDocCount: cfg.GetMaxDocCount(), TagDotReplacement: cfg.GetTagDotReplacement(), }), nil } diff --git a/cmd/opentelemetry/app/internal/reader/es/esdependencyreader/dependency_store.go b/cmd/opentelemetry/app/internal/reader/es/esdependencyreader/dependency_store.go index cb497e4b8e4..174a54b2780 100644 --- a/cmd/opentelemetry/app/internal/reader/es/esdependencyreader/dependency_store.go +++ b/cmd/opentelemetry/app/internal/reader/es/esdependencyreader/dependency_store.go @@ -36,10 +36,7 @@ const ( timestampField = "timestamp" - // default number of documents to return from a query (elasticsearch allowed limit) - // see search.max_buckets and index.max_result_window - defaultMaxDocCount = 10_000 - indexDateFormat = "2006-01-02" // date format for index e.g. 2020-01-20 + indexDateFormat = "2006-01-02" // date format for index e.g. 2020-01-20 ) // DependencyStore defines Elasticsearch dependency store. @@ -47,13 +44,14 @@ type DependencyStore struct { client esclient.ElasticsearchClient logger *zap.Logger indexPrefix string + maxDocCount int } var _ dependencystore.Reader = (*DependencyStore)(nil) var _ dependencystore.Writer = (*DependencyStore)(nil) // NewDependencyStore creates dependency store. -func NewDependencyStore(client esclient.ElasticsearchClient, logger *zap.Logger, indexPrefix string) *DependencyStore { +func NewDependencyStore(client esclient.ElasticsearchClient, logger *zap.Logger, indexPrefix string, maxDocCount int) *DependencyStore { if indexPrefix != "" { indexPrefix += "-" } @@ -61,6 +59,7 @@ func NewDependencyStore(client esclient.ElasticsearchClient, logger *zap.Logger, client: client, logger: logger, indexPrefix: indexPrefix + dependencyIndexBaseName + "-", + maxDocCount: maxDocCount, } } @@ -84,10 +83,10 @@ func (r *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.D // GetDependencies implements dependencystore.Reader func (r *DependencyStore) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) { - searchBody := getSearchBody(endTs, lookback) + searchBody := getSearchBody(endTs, lookback, r.maxDocCount) indices := dailyIndices(r.indexPrefix, endTs, lookback) - response, err := r.client.Search(ctx, searchBody,defaultMaxDocCount, indices...) + response, err := r.client.Search(ctx, searchBody, r.maxDocCount, indices...) if err != nil { return nil, err } @@ -106,12 +105,12 @@ func (r *DependencyStore) GetDependencies(ctx context.Context, endTs time.Time, return dbmodel.ToDomainDependencies(dependencies), nil } -func getSearchBody(endTs time.Time, lookback time.Duration) esclient.SearchBody { +func getSearchBody(endTs time.Time, lookback time.Duration, maxDocCount int) esclient.SearchBody { return esclient.SearchBody{ Query: &esclient.Query{ RangeQueries: map[string]esclient.RangeQuery{timestampField: {GTE: endTs.Add(-lookback), LTE: endTs}}, }, - Size: defaultMaxDocCount, + Size: maxDocCount, } } diff --git a/cmd/opentelemetry/app/internal/reader/es/esdependencyreader/dependency_store_test.go b/cmd/opentelemetry/app/internal/reader/es/esdependencyreader/dependency_store_test.go index c7d05a342ed..ac19e02b5c0 100644 --- a/cmd/opentelemetry/app/internal/reader/es/esdependencyreader/dependency_store_test.go +++ b/cmd/opentelemetry/app/internal/reader/es/esdependencyreader/dependency_store_test.go @@ -33,9 +33,11 @@ import ( "github.com/jaegertracing/jaeger/plugin/storage/es/dependencystore/dbmodel" ) +const defaultMaxDocCount = 10_000 + func TestCreateTemplates(t *testing.T) { client := &mockClient{} - store := NewDependencyStore(client, zap.NewNop(), "foo") + store := NewDependencyStore(client, zap.NewNop(), "foo", defaultMaxDocCount) template := "template" err := store.CreateTemplates(template) require.NoError(t, err) @@ -46,7 +48,7 @@ func TestCreateTemplates(t *testing.T) { func TestWriteDependencies(t *testing.T) { client := &mockClient{} - store := NewDependencyStore(client, zap.NewNop(), "foo") + store := NewDependencyStore(client, zap.NewNop(), "foo", defaultMaxDocCount) dependencies := []model.DependencyLink{{Parent: "foo", Child: "bar", CallCount: 1}} tsNow := time.Now() err := store.WriteDependencies(tsNow, dependencies) @@ -85,7 +87,7 @@ func TestGetDependencies(t *testing.T) { }, }, } - store := NewDependencyStore(client, zap.NewNop(), "foo") + store := NewDependencyStore(client, zap.NewNop(), "foo", defaultMaxDocCount) dependencies, err := store.GetDependencies(context.Background(), tsNow, time.Hour) require.NoError(t, err) assert.Equal(t, timeDependencies, dbmodel.TimeDependencies{ @@ -107,7 +109,7 @@ func TestGetDependencies_err_unmarshall(t *testing.T) { }, }, } - store := NewDependencyStore(client, zap.NewNop(), "foo") + store := NewDependencyStore(client, zap.NewNop(), "foo", defaultMaxDocCount) dependencies, err := store.GetDependencies(context.Background(), tsNow, time.Hour) require.Contains(t, err.Error(), "invalid character") assert.Nil(t, dependencies) @@ -118,7 +120,7 @@ func TestGetDependencies_err_client(t *testing.T) { client := &mockClient{ searchErr: searchErr, } - store := NewDependencyStore(client, zap.NewNop(), "foo") + store := NewDependencyStore(client, zap.NewNop(), "foo", defaultMaxDocCount) tsNow := time.Now() dependencies, err := store.GetDependencies(context.Background(), tsNow, time.Hour) require.Error(t, err) @@ -141,7 +143,7 @@ const query = `{ func TestSearchBody(t *testing.T) { date := time.Date(2020, 8, 30, 15, 0, 0, 0, time.UTC) - sb := getSearchBody(date, time.Hour) + sb := getSearchBody(date, time.Hour, defaultMaxDocCount) jsonQuery, err := json.MarshalIndent(sb, "", " ") require.NoError(t, err) assert.Equal(t, query, string(jsonQuery)) diff --git a/pkg/es/config/config.go b/pkg/es/config/config.go index 70347a8cded..861f7033356 100644 --- a/pkg/es/config/config.go +++ b/pkg/es/config/config.go @@ -49,7 +49,7 @@ type Configuration struct { AllowTokenFromContext bool `mapstructure:"-"` Sniffer bool `mapstructure:"sniffer"` // https://github.com/olivere/elastic/wiki/Sniffing SnifferTLSEnabled bool `mapstructure:"sniffer_tls_enabled"` - MaxNumSpans int `mapstructure:"-"` // deprecated: use MaxDocCount instead. Defines maximum number of spans to fetch from storage per query + MaxDocCount int `mapstructure:"-"` // Defines maximum number of results to fetch from storage per query MaxSpanAge time.Duration `yaml:"max_span_age" mapstructure:"-"` // configures the maximum lookback on span reads NumShards int64 `yaml:"shards" mapstructure:"num_shards"` NumReplicas int64 `yaml:"replicas" mapstructure:"num_replicas"` @@ -65,7 +65,6 @@ type Configuration struct { UseReadWriteAliases bool `mapstructure:"use_aliases"` CreateIndexTemplates bool `mapstructure:"create_mappings"` Version uint `mapstructure:"version"` - MaxDocCount int `mapstructure:"-"` } // TagsAsFields holds configuration for tag schema. @@ -88,7 +87,7 @@ type ClientBuilder interface { GetNumShards() int64 GetNumReplicas() int64 GetMaxSpanAge() time.Duration - GetMaxNumSpans() int + GetMaxDocCount() int GetIndexPrefix() string GetTagsFilePath() string GetAllTagsAsFields() bool @@ -99,7 +98,6 @@ type ClientBuilder interface { IsCreateIndexTemplates() bool GetVersion() uint TagKeysAsFields() ([]string, error) - GetMaxDocCount() int } // NewClient creates a new ElasticSearch client @@ -199,9 +197,6 @@ func (c *Configuration) ApplyDefaults(source *Configuration) { if c.MaxSpanAge == 0 { c.MaxSpanAge = source.MaxSpanAge } - if c.MaxNumSpans == 0 { - c.MaxNumSpans = source.MaxNumSpans - } if c.NumShards == 0 { c.NumShards = source.NumShards } @@ -255,9 +250,9 @@ func (c *Configuration) GetMaxSpanAge() time.Duration { return c.MaxSpanAge } -// GetMaxNumSpans returns max spans allowed per query from Configuration -func (c *Configuration) GetMaxNumSpans() int { - return c.MaxNumSpans +// GetMaxDocCount returns the maximum number of documents that a query should return +func (c *Configuration) GetMaxDocCount() int { + return c.MaxDocCount } // GetIndexPrefix returns index prefix @@ -296,11 +291,6 @@ func (c *Configuration) GetTokenFilePath() string { return c.TokenFilePath } -// GetMaxDocCount returns the maximum number of documents that a query should return -func (c *Configuration) GetMaxDocCount() int { - return c.MaxDocCount -} - // IsStorageEnabled determines whether storage is enabled func (c *Configuration) IsStorageEnabled() bool { return c.Enabled diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index 37240670ea4..b41724db7b0 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -142,13 +142,12 @@ func createSpanReader( Client: client, Logger: logger, MetricsFactory: mFactory, - MaxNumSpans: cfg.GetMaxNumSpans(), + MaxDocCount: cfg.GetMaxDocCount(), MaxSpanAge: cfg.GetMaxSpanAge(), IndexPrefix: cfg.GetIndexPrefix(), TagDotReplacement: cfg.GetTagDotReplacement(), UseReadWriteAliases: cfg.GetUseReadWriteAliases(), Archive: archive, - MaxDocCount: cfg.GetMaxDocCount(), }), nil } diff --git a/plugin/storage/es/options.go b/plugin/storage/es/options.go index cc9db06c8dd..3bfafee8f48 100644 --- a/plugin/storage/es/options.go +++ b/plugin/storage/es/options.go @@ -17,6 +17,7 @@ package es import ( "flag" + "math" "strings" "time" @@ -87,7 +88,6 @@ func NewOptions(primaryNamespace string, otherNamespaces ...string) *Options { Password: "", Sniffer: false, MaxSpanAge: 72 * time.Hour, - MaxNumSpans: 10000, NumShards: 5, NumReplicas: 1, BulkSize: 5 * 1000 * 1000, @@ -178,7 +178,7 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) { "The maximum lookback for spans in Elasticsearch") flagSet.Int( nsConfig.namespace+suffixMaxNumSpans, - nsConfig.MaxNumSpans, + nsConfig.MaxDocCount, "(deprecated, will be removed in release v1.21.0. Please use es.max-doc-count). The maximum number of spans to fetch at a time per query in Elasticsearch") flagSet.Int64( nsConfig.namespace+suffixNumShards, @@ -271,7 +271,6 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) { cfg.SnifferTLSEnabled = v.GetBool(cfg.namespace + suffixSnifferTLSEnabled) cfg.Servers = strings.Split(stripWhiteSpace(v.GetString(cfg.namespace+suffixServerURLs)), ",") cfg.MaxSpanAge = v.GetDuration(cfg.namespace + suffixMaxSpanAge) - cfg.MaxNumSpans = v.GetInt(cfg.namespace + suffixMaxNumSpans) cfg.NumShards = v.GetInt64(cfg.namespace + suffixNumShards) cfg.NumReplicas = v.GetInt64(cfg.namespace + suffixNumReplicas) cfg.BulkSize = v.GetInt(cfg.namespace + suffixBulkSize) @@ -288,7 +287,18 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) { cfg.Enabled = v.GetBool(cfg.namespace + suffixEnabled) cfg.CreateIndexTemplates = v.GetBool(cfg.namespace + suffixCreateIndexTemplate) cfg.Version = uint(v.GetInt(cfg.namespace + suffixVersion)) + + maxNumSpans := v.GetInt(cfg.namespace + suffixMaxNumSpans) cfg.MaxDocCount = v.GetInt(cfg.namespace + suffixMaxDocCount) + + if maxNumSpans != 0 { + if cfg.MaxDocCount != 0 { + cfg.MaxDocCount = int(math.Min(float64(maxNumSpans), float64(cfg.MaxDocCount))) + } else { + cfg.MaxDocCount = maxNumSpans + } + } + // TODO: Need to figure out a better way for do this. cfg.AllowTokenFromContext = v.GetBool(spanstore.StoragePropagationKey) cfg.TLS = cfg.getTLSFlagsConfig().InitFromViper(v) diff --git a/plugin/storage/es/options_test.go b/plugin/storage/es/options_test.go index 0e2daf5ecde..57a8c4ec083 100644 --- a/plugin/storage/es/options_test.go +++ b/plugin/storage/es/options_test.go @@ -95,3 +95,29 @@ func TestOptionsWithFlags(t *testing.T) { assert.Equal(t, "./file.txt", aux.Tags.File) assert.Equal(t, "test,tags", aux.Tags.Include) } + +func TestMaxNumSpans(t *testing.T) { + testCases := []struct { + name string + flags []string + wantMaxDocCount int + }{ + {"neither defined", []string{}, 10_000}, + {"max-num-spans only", []string{"--es.max-num-spans=1000"}, 1000}, + {"max-doc-count only", []string{"--es.max-doc-count=1000"}, 1000}, + {"max-num-spans == max-doc-count", []string{"--es.max-num-spans=1000", "--es.max-doc-count=1000"}, 1000}, + {"max-num-spans < max-doc-count", []string{"--es.max-num-spans=999", "--es.max-doc-count=1000"}, 999}, + {"max-num-spans > max-doc-count", []string{"--es.max-num-spans=1000", "--es.max-doc-count=999"}, 999}, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + opts := NewOptions("es", "es.aux") + v, command := config.Viperize(opts.AddFlags) + command.ParseFlags(tc.flags) + opts.InitFromViper(v) + + primary := opts.GetPrimary() + assert.Equal(t, tc.wantMaxDocCount, primary.MaxDocCount) + }) + } +} From 4aaa2ac41ed68e435ea63148a07c5a45f5bef7a2 Mon Sep 17 00:00:00 2001 From: albertteoh Date: Fri, 4 Sep 2020 18:35:56 +1000 Subject: [PATCH 12/16] OTEL: MaxNumSpans -> MaxDocCount Signed-off-by: albertteoh --- .../elasticsearchexporter/integration_test.go | 1 - .../internal/reader/es/esspanreader/span_reader.go | 12 ++++-------- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/integration_test.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/integration_test.go index f0b45e75a6c..18274db2222 100644 --- a/cmd/opentelemetry/app/exporter/elasticsearchexporter/integration_test.go +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/integration_test.go @@ -120,7 +120,6 @@ func (s *IntegrationTest) initSpanstore(allTagsAsFields bool) error { IndexPrefix: indexPrefix, TagDotReplacement: tagKeyDeDotChar, MaxSpanAge: maxSpanAge, - MaxNumSpans: 10_000, MaxDocCount: defaultMaxDocCount, }) s.SpanReader = reader diff --git a/cmd/opentelemetry/app/internal/reader/es/esspanreader/span_reader.go b/cmd/opentelemetry/app/internal/reader/es/esspanreader/span_reader.go index 70c897e8783..42587ab26a8 100644 --- a/cmd/opentelemetry/app/internal/reader/es/esspanreader/span_reader.go +++ b/cmd/opentelemetry/app/internal/reader/es/esspanreader/span_reader.go @@ -59,10 +59,8 @@ type Reader struct { serviceIndexName indexNameProvider spanIndexName indexNameProvider maxSpanAge time.Duration - // maximum number of spans to fetch per query in multi search - maxNumberOfSpans int - archive bool maxDocCount int + archive bool } var _ spanstore.Reader = (*Reader)(nil) @@ -73,9 +71,8 @@ type Config struct { UseReadWriteAliases bool IndexPrefix string MaxSpanAge time.Duration - MaxNumSpans int - TagDotReplacement string MaxDocCount int + TagDotReplacement string } // NewEsSpanReader creates Elasticseach span reader. @@ -85,11 +82,10 @@ func NewEsSpanReader(client esclient.ElasticsearchClient, logger *zap.Logger, co logger: logger, archive: config.Archive, maxSpanAge: config.MaxSpanAge, - maxNumberOfSpans: config.MaxNumSpans, + maxDocCount: config.MaxDocCount, converter: dbmodel.NewToDomain(config.TagDotReplacement), spanIndexName: newIndexNameProvider(spanIndexBaseName, config.IndexPrefix, config.UseReadWriteAliases, config.Archive), serviceIndexName: newIndexNameProvider(serviceIndexBaseName, config.IndexPrefix, config.UseReadWriteAliases, config.Archive), - maxDocCount: config.MaxDocCount, } } @@ -291,7 +287,7 @@ func (r *Reader) multiSearchRequests(indices []string, traceIDs []model.TraceID, Indices: indices, Query: traceIDQuery(traceID), Size: r.maxDocCount, - TerminateAfter: r.maxNumberOfSpans, + TerminateAfter: r.maxDocCount, } if !r.archive { s.SearchAfter = []interface{}{nextTime} From 3f38316adfed4cd1fc9bcab2e6ccea8e99de878e Mon Sep 17 00:00:00 2001 From: albertteoh Date: Fri, 4 Sep 2020 19:33:24 +1000 Subject: [PATCH 13/16] Fix integration test Signed-off-by: albertteoh --- plugin/storage/es/spanstore/reader.go | 9 ++++----- plugin/storage/integration/elasticsearch_test.go | 15 ++++++++------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go index ff4595dd3d1..5c7176d514e 100644 --- a/plugin/storage/es/spanstore/reader.go +++ b/plugin/storage/es/spanstore/reader.go @@ -108,13 +108,12 @@ type SpanReaderParams struct { Client es.Client Logger *zap.Logger MaxSpanAge time.Duration - MaxNumSpans int + MaxDocCount int MetricsFactory metrics.Factory IndexPrefix string TagDotReplacement string Archive bool UseReadWriteAliases bool - MaxDocCount int } // NewSpanReader returns a new SpanReader with a metrics. @@ -128,7 +127,7 @@ func NewSpanReader(p SpanReaderParams) *SpanReader { serviceIndexPrefix: indexNames(p.IndexPrefix, serviceIndex), spanConverter: dbmodel.NewToDomain(p.TagDotReplacement), timeRangeIndices: getTimeRangeIndexFn(p.Archive, p.UseReadWriteAliases), - sourceFn: getSourceFn(p.Archive, p.MaxNumSpans, p.MaxDocCount), + sourceFn: getSourceFn(p.Archive, p.MaxDocCount), maxDocCount: p.MaxDocCount, } } @@ -157,12 +156,12 @@ func getTimeRangeIndexFn(archive, useReadWriteAliases bool) timeRangeIndexFn { return timeRangeIndices } -func getSourceFn(archive bool, maxNumSpans int, maxDocCount int) sourceFn { +func getSourceFn(archive bool, maxDocCount int) sourceFn { return func(query elastic.Query, nextTime uint64) *elastic.SearchSource { s := elastic.NewSearchSource(). Query(query). Size(maxDocCount). - TerminateAfter(maxNumSpans) + TerminateAfter(maxDocCount) if !archive { s.Sort("startTime", true). SearchAfter(nextTime) diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index 03ebbad69cb..301f68a3a3c 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -39,13 +39,13 @@ import ( ) const ( - host = "0.0.0.0" - queryPort = "9200" - queryHostPort = host + ":" + queryPort - queryURL = "http://" + queryHostPort - indexPrefix = "integration-test" - tagKeyDeDotChar = "@" - maxSpanAge = time.Hour * 72 + host = "0.0.0.0" + queryPort = "9200" + queryHostPort = host + ":" + queryPort + queryURL = "http://" + queryHostPort + indexPrefix = "integration-test" + tagKeyDeDotChar = "@" + maxSpanAge = time.Hour * 72 defaultMaxDocCount = 10_000 ) @@ -130,6 +130,7 @@ func (s *ESStorageIntegration) initSpanstore(allTagsAsFields, archive bool) erro MaxSpanAge: maxSpanAge, TagDotReplacement: tagKeyDeDotChar, Archive: archive, + MaxDocCount: defaultMaxDocCount, }) dependencyStore := dependencystore.NewDependencyStore(client, s.logger, indexPrefix, defaultMaxDocCount) depMapping := es.GetDependenciesMappings(5, 1, client.GetVersion()) From 5706854ab286afdc0e142bf4d4c1eed602faaf15 Mon Sep 17 00:00:00 2001 From: albertteoh Date: Fri, 4 Sep 2020 21:00:58 +1000 Subject: [PATCH 14/16] TestMaxNumSpans -> TestMaxDocCount Signed-off-by: albertteoh --- plugin/storage/es/options_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugin/storage/es/options_test.go b/plugin/storage/es/options_test.go index 57a8c4ec083..65ac35e405a 100644 --- a/plugin/storage/es/options_test.go +++ b/plugin/storage/es/options_test.go @@ -96,7 +96,7 @@ func TestOptionsWithFlags(t *testing.T) { assert.Equal(t, "test,tags", aux.Tags.Include) } -func TestMaxNumSpans(t *testing.T) { +func TestMaxDocCount(t *testing.T) { testCases := []struct { name string flags []string From 9ebf9d37a2662f12696c4664d043d5b6c61eaebe Mon Sep 17 00:00:00 2001 From: albertteoh Date: Fri, 4 Sep 2020 21:38:54 +1000 Subject: [PATCH 15/16] Remove unused struct member in test Signed-off-by: albertteoh --- plugin/storage/es/spanstore/reader_test.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/plugin/storage/es/spanstore/reader_test.go b/plugin/storage/es/spanstore/reader_test.go index fe54208797d..997579d19e4 100644 --- a/plugin/storage/es/spanstore/reader_test.go +++ b/plugin/storage/es/spanstore/reader_test.go @@ -84,11 +84,10 @@ var exampleESSpan = []byte( }`) type spanReaderTest struct { - client *mocks.Client - logger *zap.Logger - logBuffer *testutils.Buffer - reader *SpanReader - maxDocCount int + client *mocks.Client + logger *zap.Logger + logBuffer *testutils.Buffer + reader *SpanReader } func withSpanReader(fn func(r *spanReaderTest)) { From 5a9d1c9ff6de2636f8d5539533548487d4873487 Mon Sep 17 00:00:00 2001 From: albertteoh Date: Sat, 5 Sep 2020 07:45:02 +1000 Subject: [PATCH 16/16] Address review comments Signed-off-by: albertteoh --- CHANGELOG.md | 14 ++++++++++++++ plugin/storage/es/options.go | 14 ++++++-------- 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d85e1a64dc6..e7ffde30491 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,20 @@ Changes by Version ### Backend Changes #### Breaking Changes +* Configurable ES doc count ([#2453](https://github.com/jaegertracing/jaeger/pull/2453), [@albertteoh](https://github.com/albertteoh)) + + The `--es.max-num-spans` flag has been deprecated in favour of `--es.max-doc-count`. + `--es.max-num-spans` is marked for removal in v1.21.0 as indicated in the flag description. + + If both `--es.max-num-spans` and `--es.max-doc-count` are set, the lesser of the two will be used. + + The use of `--es.max-doc-count` (which defaults to 10,000) will limit the results from all Elasticsearch + queries by the configured value, limiting counts for Jaeger UI: + + * Services + * Operations + * Dependencies (edges in a dependency graph) + * Span fetch size for a trace #### New Features diff --git a/plugin/storage/es/options.go b/plugin/storage/es/options.go index 3bfafee8f48..06da56a3d8f 100644 --- a/plugin/storage/es/options.go +++ b/plugin/storage/es/options.go @@ -179,7 +179,9 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) { flagSet.Int( nsConfig.namespace+suffixMaxNumSpans, nsConfig.MaxDocCount, - "(deprecated, will be removed in release v1.21.0. Please use es.max-doc-count). The maximum number of spans to fetch at a time per query in Elasticsearch") + "(deprecated, will be removed in release v1.21.0. Please use es.max-doc-count). "+ + "The maximum number of spans to fetch at a time per query in Elasticsearch. "+ + "The lesser of es.max-num-spans and es.max-doc-count will be used if both are set.") flagSet.Int64( nsConfig.namespace+suffixNumShards, nsConfig.NumShards, @@ -288,15 +290,11 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) { cfg.CreateIndexTemplates = v.GetBool(cfg.namespace + suffixCreateIndexTemplate) cfg.Version = uint(v.GetInt(cfg.namespace + suffixVersion)) - maxNumSpans := v.GetInt(cfg.namespace + suffixMaxNumSpans) cfg.MaxDocCount = v.GetInt(cfg.namespace + suffixMaxDocCount) - if maxNumSpans != 0 { - if cfg.MaxDocCount != 0 { - cfg.MaxDocCount = int(math.Min(float64(maxNumSpans), float64(cfg.MaxDocCount))) - } else { - cfg.MaxDocCount = maxNumSpans - } + if v.IsSet(cfg.namespace + suffixMaxNumSpans) { + maxNumSpans := v.GetInt(cfg.namespace + suffixMaxNumSpans) + cfg.MaxDocCount = int(math.Min(float64(maxNumSpans), float64(cfg.MaxDocCount))) } // TODO: Need to figure out a better way for do this.