From a5fca4401cf3ea2b2d1e3636b58cc744a376e80c Mon Sep 17 00:00:00 2001 From: Pavol Loffay Date: Thu, 18 Jan 2018 15:38:39 +0100 Subject: [PATCH] Use elasticsearch bulk API Signed-off-by: Pavol Loffay --- Makefile | 10 ++ cmd/collector/main.go | 8 + pkg/es/client.go | 6 +- pkg/es/config/config.go | 61 +++++-- pkg/es/mocks/Client.go | 16 +- pkg/es/mocks/IndexService.go | 32 +--- pkg/es/mocks/IndicesCreateService.go | 2 +- pkg/es/mocks/IndicesExistsService.go | 2 +- pkg/es/mocks/MultiSearchService.go | 2 +- pkg/es/mocks/SearchService.go | 2 +- pkg/es/wrapper.go | 37 +++-- plugin/storage/cassandra/spanstore/writer.go | 6 + .../cassandra/spanstore/writer_test.go | 8 + plugin/storage/es/dependencystore/storage.go | 18 +- .../es/dependencystore/storage_test.go | 6 +- plugin/storage/es/factory.go | 2 +- plugin/storage/es/factory_test.go | 2 +- plugin/storage/es/options.go | 54 ++++-- .../storage/es/spanstore/service_operation.go | 23 +-- .../es/spanstore/service_operation_test.go | 32 +--- plugin/storage/es/spanstore/writer.go | 31 ++-- plugin/storage/es/spanstore/writer_test.go | 156 +++--------------- .../integration/es_integration_test.go | 13 +- 23 files changed, 242 insertions(+), 287 deletions(-) diff --git a/Makefile b/Makefile index f44b888b60f..26965fb296f 100644 --- a/Makefile +++ b/Makefile @@ -45,6 +45,8 @@ COLORIZE=$(SED) ''/PASS/s//$(PASS)/'' | $(SED) ''/FAIL/s//$(FAIL)/'' DOCKER_NAMESPACE?=jaegertracing DOCKER_TAG?=latest +MOCKERY=mockery + .DEFAULT_GOAL := test-and-lint .PHONY: test-and-lint @@ -236,3 +238,11 @@ thrift-image: generate-zipkin-swagger: idl-submodule $(SWAGGER) generate server -f ./idl/swagger/zipkin2-api.yaml -t $(SWAGGER_GEN_DIR) -O PostSpans --exclude-main rm $(SWAGGER_GEN_DIR)/restapi/operations/post_spans_urlbuilder.go $(SWAGGER_GEN_DIR)/restapi/server.go $(SWAGGER_GEN_DIR)/restapi/configure_zipkin.go $(SWAGGER_GEN_DIR)/models/trace.go $(SWAGGER_GEN_DIR)/models/list_of_traces.go $(SWAGGER_GEN_DIR)/models/dependency_link.go + +.PHONY: 
install-mockery +install-mockery: + go get github.com/vektra/mockery + +.PHONY: generate-mocks +generate-mocks: install-mockery + $(MOCKERY) -all -dir ./pkg/es/ -output ./pkg/es/mocks && rm pkg/es/mocks/ClientBuilder.go diff --git a/cmd/collector/main.go b/cmd/collector/main.go index bc3d1d696ca..27a5d00be74 100644 --- a/cmd/collector/main.go +++ b/cmd/collector/main.go @@ -16,6 +16,7 @@ package main import ( "fmt" + "io" "log" "net" "net/http" @@ -148,6 +149,13 @@ func main() { hc.Ready() select { case <-signalsChannel: + if closer, ok := spanWriter.(io.Closer); ok { + err := closer.Close() + if err != nil { + logger.Error("Failed to close span writer", zap.Error(err)) + } + } + logger.Info("Jaeger Collector is finishing") } return nil diff --git a/pkg/es/client.go b/pkg/es/client.go index 4472eb8d30d..30e03aa9cbb 100644 --- a/pkg/es/client.go +++ b/pkg/es/client.go @@ -16,6 +16,7 @@ package es import ( "context" + "io" "gopkg.in/olivere/elastic.v5" ) @@ -27,6 +28,7 @@ type Client interface { Index() IndexService Search(indices ...string) SearchService MultiSearch() MultiSearchService + io.Closer } // IndicesExistsService is an abstraction for elastic.IndicesExistsService @@ -40,13 +42,13 @@ type IndicesCreateService interface { Do(ctx context.Context) (*elastic.IndicesCreateResult, error) } -// IndexService is an abstraction for elastic.IndexService +// IndexService is an abstraction for elastic BulkService type IndexService interface { Index(index string) IndexService Type(typ string) IndexService Id(id string) IndexService BodyJson(body interface{}) IndexService - Do(ctx context.Context) (*elastic.IndexResponse, error) + Add() } // SearchService is an abstraction for elastic.SearchService diff --git a/pkg/es/config/config.go b/pkg/es/config/config.go index 28cddb57e15..d37ec99ab7b 100644 --- a/pkg/es/config/config.go +++ b/pkg/es/config/config.go @@ -15,9 +15,12 @@ package config import ( + "bytes" + "context" "time" "github.com/pkg/errors" + "go.uber.org/zap" 
"gopkg.in/olivere/elastic.v5" "github.com/jaegertracing/jaeger/pkg/es" @@ -25,25 +28,29 @@ import ( // Configuration describes the configuration properties needed to connect to an ElasticSearch cluster type Configuration struct { - Servers []string - Username string - Password string - Sniffer bool // https://github.com/olivere/elastic/wiki/Sniffing - MaxSpanAge time.Duration `yaml:"max_span_age"` // configures the maximum lookback on span reads - NumShards int64 `yaml:"shards"` - NumReplicas int64 `yaml:"replicas"` + Servers []string + Username string + Password string + Sniffer bool // https://github.com/olivere/elastic/wiki/Sniffing + MaxSpanAge time.Duration `yaml:"max_span_age"` // configures the maximum lookback on span reads + NumShards int64 `yaml:"shards"` + NumReplicas int64 `yaml:"replicas"` + BulkSize int + BulkWorkers int + BulkActions int + BulkFlushInterval time.Duration } // ClientBuilder creates new es.Client type ClientBuilder interface { - NewClient() (es.Client, error) + NewClient(logger *zap.Logger) (es.Client, error) GetNumShards() int64 GetNumReplicas() int64 GetMaxSpanAge() time.Duration } // NewClient creates a new ElasticSearch client -func (c *Configuration) NewClient() (es.Client, error) { +func (c *Configuration) NewClient(logger *zap.Logger) (es.Client, error) { if len(c.Servers) < 1 { return nil, errors.New("No servers specified") } @@ -51,7 +58,29 @@ func (c *Configuration) NewClient() (es.Client, error) { if err != nil { return nil, err } - return es.WrapESClient(rawClient), nil + service, err := rawClient.BulkProcessor(). 
+ After(func(id int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) { + if err != nil { + var buffer bytes.Buffer + for i, r := range requests { + buffer.WriteString(r.String()) + if i+1 < len(requests) { + buffer.WriteByte('\n') + } + } + logger.Error("Elasticsearch could not process bulk request", zap.Error(err), + zap.Any("response", response), zap.String("requests", buffer.String())) + } + }). + BulkSize(c.BulkSize). + Workers(c.BulkWorkers). + BulkActions(c.BulkActions). + FlushInterval(c.BulkFlushInterval). + Do(context.Background()) + if err != nil { + return nil, err + } + return es.WrapESClient(rawClient, service), nil } // ApplyDefaults copies settings from source unless its own value is non-zero. @@ -74,6 +103,18 @@ func (c *Configuration) ApplyDefaults(source *Configuration) { if c.NumReplicas == 0 { c.NumReplicas = source.NumReplicas } + if c.BulkSize == 0 { + c.BulkSize = source.BulkSize + } + if c.BulkWorkers == 0 { + c.BulkWorkers = source.BulkWorkers + } + if c.BulkActions == 0 { + c.BulkActions = source.BulkActions + } + if c.BulkFlushInterval == 0 { + c.BulkFlushInterval = source.BulkFlushInterval + } } // GetNumShards returns number of shards from Configuration diff --git a/pkg/es/mocks/Client.go b/pkg/es/mocks/Client.go index 90783af50c5..fd4d82be937 100644 --- a/pkg/es/mocks/Client.go +++ b/pkg/es/mocks/Client.go @@ -1,6 +1,6 @@ // Code generated by mockery v1.0.0 -// Copyright (c) 2017 The Jaeger Authors. +// Copyright (c) 2018 The Jaeger Authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
@@ -24,6 +24,20 @@ type Client struct { mock.Mock } +// Close provides a mock function with given fields: +func (_m *Client) Close() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + // CreateIndex provides a mock function with given fields: index func (_m *Client) CreateIndex(index string) es.IndicesCreateService { ret := _m.Called(index) diff --git a/pkg/es/mocks/IndexService.go b/pkg/es/mocks/IndexService.go index bf49c20d176..a17caf23533 100644 --- a/pkg/es/mocks/IndexService.go +++ b/pkg/es/mocks/IndexService.go @@ -1,6 +1,6 @@ // Code generated by mockery v1.0.0 -// Copyright (c) 2017 The Jaeger Authors. +// Copyright (c) 2018 The Jaeger Authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -16,8 +16,6 @@ package mocks -import context "context" -import elastic "gopkg.in/olivere/elastic.v5" import es "github.com/jaegertracing/jaeger/pkg/es" import mock "github.com/stretchr/testify/mock" @@ -26,6 +24,11 @@ type IndexService struct { mock.Mock } +// Add provides a mock function with given fields: +func (_m *IndexService) Add() { + _m.Called() +} + // BodyJson provides a mock function with given fields: body func (_m *IndexService) BodyJson(body interface{}) es.IndexService { ret := _m.Called(body) @@ -42,29 +45,6 @@ func (_m *IndexService) BodyJson(body interface{}) es.IndexService { return r0 } -// Do provides a mock function with given fields: ctx -func (_m *IndexService) Do(ctx context.Context) (*elastic.IndexResponse, error) { - ret := _m.Called(ctx) - - var r0 *elastic.IndexResponse - if rf, ok := ret.Get(0).(func(context.Context) *elastic.IndexResponse); ok { - r0 = rf(ctx) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(*elastic.IndexResponse) - } - } - - var r1 error - if rf, ok := ret.Get(1).(func(context.Context) error); ok { - r1 = rf(ctx) - } 
else { - r1 = ret.Error(1) - } - - return r0, r1 -} - // Id provides a mock function with given fields: id func (_m *IndexService) Id(id string) es.IndexService { ret := _m.Called(id) diff --git a/pkg/es/mocks/IndicesCreateService.go b/pkg/es/mocks/IndicesCreateService.go index f9b9a1751cf..5c2fcda996f 100644 --- a/pkg/es/mocks/IndicesCreateService.go +++ b/pkg/es/mocks/IndicesCreateService.go @@ -1,6 +1,6 @@ // Code generated by mockery v1.0.0 -// Copyright (c) 2017 The Jaeger Authors. +// Copyright (c) 2018 The Jaeger Authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/es/mocks/IndicesExistsService.go b/pkg/es/mocks/IndicesExistsService.go index 415a526e6a5..43234846716 100644 --- a/pkg/es/mocks/IndicesExistsService.go +++ b/pkg/es/mocks/IndicesExistsService.go @@ -1,6 +1,6 @@ // Code generated by mockery v1.0.0 -// Copyright (c) 2017 The Jaeger Authors. +// Copyright (c) 2018 The Jaeger Authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/es/mocks/MultiSearchService.go b/pkg/es/mocks/MultiSearchService.go index a565e0a4a47..fbb8db3d7cb 100644 --- a/pkg/es/mocks/MultiSearchService.go +++ b/pkg/es/mocks/MultiSearchService.go @@ -1,6 +1,6 @@ // Code generated by mockery v1.0.0 -// Copyright (c) 2017 The Jaeger Authors. +// Copyright (c) 2018 The Jaeger Authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/es/mocks/SearchService.go b/pkg/es/mocks/SearchService.go index 6bb72f59333..46d6b84c101 100644 --- a/pkg/es/mocks/SearchService.go +++ b/pkg/es/mocks/SearchService.go @@ -1,6 +1,6 @@ // Code generated by mockery v1.0.0 -// Copyright (c) 2017 The Jaeger Authors. +// Copyright (c) 2018 The Jaeger Authors. 
// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/es/wrapper.go b/pkg/es/wrapper.go index c29aa22dc37..4fa9a3b0c11 100644 --- a/pkg/es/wrapper.go +++ b/pkg/es/wrapper.go @@ -24,12 +24,13 @@ import ( // ESClient is a wrapper around elastic.Client type ESClient struct { - client *elastic.Client + client *elastic.Client + bulkService *elastic.BulkProcessor } // WrapESClient creates a ESClient out of *elastic.Client. -func WrapESClient(client *elastic.Client) ESClient { - return ESClient{client: client} +func WrapESClient(client *elastic.Client, s *elastic.BulkProcessor) ESClient { + return ESClient{client: client, bulkService: s} } // IndexExists calls this function to internal client. @@ -44,7 +45,8 @@ func (c ESClient) CreateIndex(index string) IndicesCreateService { // Index calls this function to internal client. func (c ESClient) Index() IndexService { - return WrapESIndexService(c.client.Index()) + r := elastic.NewBulkIndexRequest() + return WrapESIndexService(r, c.bulkService) } // Search calls this function to internal client. @@ -57,6 +59,11 @@ func (c ESClient) MultiSearch() MultiSearchService { return WrapESMultiSearchService(c.client.MultiSearch()) } +// Close closes ESClient and flushes all data to the storage. +func (c ESClient) Close() error { + return c.bulkService.Close() +} + // --- // ESIndicesExistsService is a wrapper around elastic.IndicesExistsService @@ -100,37 +107,39 @@ func (c ESIndicesCreateService) Do(ctx context.Context) (*elastic.IndicesCreateR // ESIndexService is a wrapper around elastic.ESIndexService type ESIndexService struct { - indexService *elastic.IndexService + bulkIndexReq *elastic.BulkIndexRequest + bulkService *elastic.BulkProcessor } // WrapESIndexService creates an ESIndexService out of *elastic.ESIndexService. 
-func WrapESIndexService(indexService *elastic.IndexService) ESIndexService { - return ESIndexService{indexService: indexService} +func WrapESIndexService(indexService *elastic.BulkIndexRequest, bulkService *elastic.BulkProcessor) ESIndexService { + return ESIndexService{bulkIndexReq: indexService, bulkService: bulkService} } // Index calls this function to internal service. func (i ESIndexService) Index(index string) IndexService { - return WrapESIndexService(i.indexService.Index(index)) + return WrapESIndexService(i.bulkIndexReq.Index(index), i.bulkService) } // Type calls this function to internal service. func (i ESIndexService) Type(typ string) IndexService { - return WrapESIndexService(i.indexService.Type(typ)) + return WrapESIndexService(i.bulkIndexReq.Type(typ), i.bulkService) } // Id calls this function to internal service. func (i ESIndexService) Id(id string) IndexService { - return WrapESIndexService(i.indexService.Id(id)) + a := WrapESIndexService(i.bulkIndexReq.Id(id), i.bulkService) + return a } // BodyJson calls this function to internal service. func (i ESIndexService) BodyJson(body interface{}) IndexService { - return WrapESIndexService(i.indexService.BodyJson(body)) + return WrapESIndexService(i.bulkIndexReq.Doc(body), i.bulkService) } -// Do calls this function to internal service. 
-func (i ESIndexService) Do(ctx context.Context) (*elastic.IndexResponse, error) { - return i.indexService.Do(ctx) +// Add adds the request to the bulk service +func (i ESIndexService) Add() { + i.bulkService.Add(i.bulkIndexReq) } // --- diff --git a/plugin/storage/cassandra/spanstore/writer.go b/plugin/storage/cassandra/spanstore/writer.go index 555e6c41adf..5cf3ab962ae 100644 --- a/plugin/storage/cassandra/spanstore/writer.go +++ b/plugin/storage/cassandra/spanstore/writer.go @@ -116,6 +116,12 @@ func NewSpanWriter( } } +// Close closes SpanWriter +func (s *SpanWriter) Close() error { + s.session.Close() + return nil +} + // WriteSpan saves the span into Cassandra func (s *SpanWriter) WriteSpan(span *model.Span) error { ds := dbmodel.FromDomain(span) diff --git a/plugin/storage/cassandra/spanstore/writer_test.go b/plugin/storage/cassandra/spanstore/writer_test.go index 005c0126aca..67a1233a2d0 100644 --- a/plugin/storage/cassandra/spanstore/writer_test.go +++ b/plugin/storage/cassandra/spanstore/writer_test.go @@ -53,6 +53,14 @@ func withSpanWriter(writeCacheTTL time.Duration, fn func(w *spanWriterTest)) { var _ spanstore.Writer = &SpanWriter{} // check API conformance +func TestClientClose(t *testing.T) { + withSpanWriter(0, func(w *spanWriterTest) { + w.session.On("Close").Return(nil) + w.writer.Close() + w.session.AssertNumberOfCalls(t, "Close", 1) + }) +} + func TestSpanWriter(t *testing.T) { testCases := []struct { caption string diff --git a/plugin/storage/es/dependencystore/storage.go b/plugin/storage/es/dependencystore/storage.go index 34f1dfecc40..741f254a262 100644 --- a/plugin/storage/es/dependencystore/storage.go +++ b/plugin/storage/es/dependencystore/storage.go @@ -59,7 +59,8 @@ func (s *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.D if err := s.createIndex(indexName); err != nil { return err } - return s.writeDependencies(indexName, ts, dependencies) + s.writeDependencies(indexName, ts, dependencies) + return nil } func (s 
*DependencyStore) createIndex(indexName string) error { @@ -70,18 +71,11 @@ func (s *DependencyStore) createIndex(indexName string) error { return nil } -func (s *DependencyStore) writeDependencies(indexName string, ts time.Time, dependencies []model.DependencyLink) error { - _, err := s.client.Index().Index(indexName). - Type(dependencyType). - BodyJson(&timeToDependencies{ - Timestamp: ts, +func (s *DependencyStore) writeDependencies(indexName string, ts time.Time, dependencies []model.DependencyLink) { + s.client.Index().Index(indexName).Type(dependencyType). + BodyJson(&timeToDependencies{Timestamp: ts, Dependencies: dependencies, - }). - Do(s.ctx) - if err != nil { - return errors.Wrap(err, "Failed to write dependencies") - } - return nil + }).Add() } // GetDependencies returns all interservice dependencies diff --git a/plugin/storage/es/dependencystore/storage_test.go b/plugin/storage/es/dependencystore/storage_test.go index 48dbf431788..5be0afed25f 100644 --- a/plugin/storage/es/dependencystore/storage_test.go +++ b/plugin/storage/es/dependencystore/storage_test.go @@ -64,10 +64,6 @@ func TestWriteDependencies(t *testing.T) { createIndexError: errors.New("index not created"), expectedError: "Failed to create index: index not created", }, - { - writeError: errors.New("write failed"), - expectedError: "Failed to write dependencies: write failed", - }, {}, } for _, testCase := range testCases { @@ -86,7 +82,7 @@ func TestWriteDependencies(t *testing.T) { writeService.On("Index", stringMatcher(indexName)).Return(writeService) writeService.On("Type", stringMatcher(dependencyType)).Return(writeService) writeService.On("BodyJson", mock.Anything).Return(writeService) - writeService.On("Do", mock.Anything).Return(nil, testCase.writeError) + writeService.On("Add", mock.Anything).Return(nil, testCase.writeError) err := r.storage.WriteDependencies(fixedTime, []model.DependencyLink{}) if testCase.expectedError != "" { diff --git a/plugin/storage/es/factory.go 
b/plugin/storage/es/factory.go index 5aca99e88fd..4527ffb394f 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -62,7 +62,7 @@ func (f *Factory) InitFromViper(v *viper.Viper) { func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error { f.metricsFactory, f.logger = metricsFactory, logger - primaryClient, err := f.primaryConfig.NewClient() + primaryClient, err := f.primaryConfig.NewClient(logger) if err != nil { return err } diff --git a/plugin/storage/es/factory_test.go b/plugin/storage/es/factory_test.go index cc9edffae21..f8f3fb0a03f 100644 --- a/plugin/storage/es/factory_test.go +++ b/plugin/storage/es/factory_test.go @@ -36,7 +36,7 @@ type mockClientBuilder struct { err error } -func (m *mockClientBuilder) NewClient() (es.Client, error) { +func (m *mockClientBuilder) NewClient(logger *zap.Logger) (es.Client, error) { if m.err == nil { return &mocks.Client{}, nil } diff --git a/plugin/storage/es/options.go b/plugin/storage/es/options.go index a46e46143ae..66a50cc95a3 100644 --- a/plugin/storage/es/options.go +++ b/plugin/storage/es/options.go @@ -25,13 +25,17 @@ import ( ) const ( - suffixUsername = ".username" - suffixPassword = ".password" - suffixSniffer = ".sniffer" - suffixServerURLs = ".server-urls" - suffixMaxSpanAge = ".max-span-age" - suffixNumShards = ".num-shards" - suffixNumReplicas = ".num-replicas" + suffixUsername = ".username" + suffixPassword = ".password" + suffixSniffer = ".sniffer" + suffixServerURLs = ".server-urls" + suffixMaxSpanAge = ".max-span-age" + suffixNumShards = ".num-shards" + suffixNumReplicas = ".num-replicas" + suffixBulkSize = ".bulk.size" + suffixBulkWorkers = ".bulk.workers" + suffixBulkActions = ".bulk.actions" + suffixBulkFlushInterval = ".bulk.flush-interval" ) // TODO this should be moved next to config.Configuration struct (maybe ./flags package) @@ -60,12 +64,16 @@ func NewOptions(primaryNamespace string, otherNamespaces ...string) *Options { options := 
&Options{ primary: &namespaceConfig{ Configuration: config.Configuration{ - Username: "", - Password: "", - Sniffer: false, - MaxSpanAge: 72 * time.Hour, - NumShards: 5, - NumReplicas: 1, + Username: "", + Password: "", + Sniffer: false, + MaxSpanAge: 72 * time.Hour, + NumShards: 5, + NumReplicas: 1, + BulkSize: 5 * 1000 * 1000, + BulkWorkers: 1, + BulkActions: 1000, + BulkFlushInterval: time.Millisecond * 200, }, servers: "http://127.0.0.1:9200", namespace: primaryNamespace, @@ -117,6 +125,22 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) { nsConfig.namespace+suffixNumReplicas, nsConfig.NumReplicas, "The number of replicas per index in ElasticSearch") + flagSet.Int( + nsConfig.namespace+suffixBulkSize, + nsConfig.BulkSize, + "The number of bytes that the bulk requests can take up before the bulk processor decides to commit") + flagSet.Int( + nsConfig.namespace+suffixBulkWorkers, + nsConfig.BulkWorkers, + "The number of workers that are able to receive bulk requests and eventually commit them to Elasticsearch") + flagSet.Int( + nsConfig.namespace+suffixBulkActions, + nsConfig.BulkActions, + "The number of requests that can be enqueued before the bulk processor decides to commit") + flagSet.Duration( + nsConfig.namespace+suffixBulkFlushInterval, + nsConfig.BulkFlushInterval, + "A time.Duration after which bulk requests are committed, regardless of other tresholds. Set to zero to disable. 
By default, this is disabled.") } // InitFromViper initializes Options with properties from viper @@ -135,6 +159,10 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) { cfg.MaxSpanAge = v.GetDuration(cfg.namespace + suffixMaxSpanAge) cfg.NumShards = v.GetInt64(cfg.namespace + suffixNumShards) cfg.NumReplicas = v.GetInt64(cfg.namespace + suffixNumReplicas) + cfg.BulkSize = v.GetInt(cfg.namespace + suffixBulkSize) + cfg.BulkWorkers = v.GetInt(cfg.namespace + suffixBulkWorkers) + cfg.BulkActions = v.GetInt(cfg.namespace + suffixBulkActions) + cfg.BulkFlushInterval = v.GetDuration(cfg.namespace + suffixBulkFlushInterval) } // GetPrimary returns primary configuration. diff --git a/plugin/storage/es/spanstore/service_operation.go b/plugin/storage/es/spanstore/service_operation.go index 8e28dec1ecb..033b6d875f4 100644 --- a/plugin/storage/es/spanstore/service_operation.go +++ b/plugin/storage/es/spanstore/service_operation.go @@ -55,10 +55,9 @@ func NewServiceOperationStorage( cacheTTL time.Duration, ) *ServiceOperationStorage { return &ServiceOperationStorage{ - ctx: ctx, - client: client, - metrics: storageMetrics.NewWriteMetrics(metricsFactory, "ServiceOperation"), - logger: logger, + ctx: ctx, + client: client, + logger: logger, serviceCache: cache.NewLRUWithOptions( 100000, &cache.Options{ @@ -69,7 +68,7 @@ func NewServiceOperationStorage( } // Write saves a service to operation pair. 
-func (s *ServiceOperationStorage) Write(indexName string, jsonSpan *jModel.Span) error { +func (s *ServiceOperationStorage) Write(indexName string, jsonSpan *jModel.Span) { // Insert serviceName:operationName document service := Service{ ServiceName: jsonSpan.Process.ServiceName, @@ -78,15 +77,9 @@ func (s *ServiceOperationStorage) Write(indexName string, jsonSpan *jModel.Span) serviceID := fmt.Sprintf("%s|%s", service.ServiceName, service.OperationName) cacheKey := fmt.Sprintf("%s:%s", indexName, serviceID) if !keyInCache(cacheKey, s.serviceCache) { - start := time.Now() - _, err := s.client.Index().Index(indexName).Type(serviceType).Id(serviceID).BodyJson(service).Do(s.ctx) - s.metrics.Emit(err, time.Since(start)) - if err != nil { - return s.logError(jsonSpan, err, "Failed to insert service:operation", s.logger) - } + s.client.Index().Index(indexName).Type(serviceType).Id(serviceID).BodyJson(service).Add() writeCache(cacheKey, s.serviceCache) } - return nil } func (s *ServiceOperationStorage) getServices(indices []string) ([]string, error) { @@ -150,9 +143,3 @@ func getOperationsAggregation() elastic.Query { Field(operationNameField). Size(defaultDocCount) // Must set to some large number. ES deprecated size omission for aggregating all. 
https://github.com/elastic/elasticsearch/issues/18838 } - -func (s *ServiceOperationStorage) logError(span *jModel.Span, err error, msg string, logger *zap.Logger) error { - logger.Debug("trace info:", zap.String("trace_id", string(span.TraceID)), zap.String("span_id", string(span.SpanID))) - logger.Error(msg, zap.Error(err)) - return errors.Wrap(err, msg) -} diff --git a/plugin/storage/es/spanstore/service_operation_test.go b/plugin/storage/es/spanstore/service_operation_test.go index 6b2d8249963..b2dc7d652dc 100644 --- a/plugin/storage/es/spanstore/service_operation_test.go +++ b/plugin/storage/es/spanstore/service_operation_test.go @@ -15,8 +15,6 @@ package spanstore import ( - "errors" - "strings" "testing" "github.com/stretchr/testify/assert" @@ -37,7 +35,7 @@ func TestWriteService(t *testing.T) { indexService.On("Type", stringMatcher(serviceType)).Return(indexService) indexService.On("Id", stringMatcher("service|operation")).Return(indexService) indexService.On("BodyJson", mock.AnythingOfType("spanstore.Service")).Return(indexService) - indexService.On("Do", mock.AnythingOfType("*context.emptyCtx")).Return(&elastic.IndexResponse{}, nil) + indexService.On("Add") w.client.On("Index").Return(indexService) @@ -50,16 +48,14 @@ func TestWriteService(t *testing.T) { }, } - err := w.writer.writeService(indexName, jsonSpan) - require.NoError(t, err) + w.writer.writeService(indexName, jsonSpan) - indexService.AssertNumberOfCalls(t, "Do", 1) + indexService.AssertNumberOfCalls(t, "Add", 1) assert.Equal(t, "", w.logBuffer.String()) // test that cache works, will call the index service only once. 
- err = w.writer.writeService(indexName, jsonSpan) - require.NoError(t, err) - indexService.AssertNumberOfCalls(t, "Do", 1) + w.writer.writeService(indexName, jsonSpan) + indexService.AssertNumberOfCalls(t, "Add", 1) }) } @@ -72,7 +68,7 @@ func TestWriteServiceError(t *testing.T) { indexService.On("Type", stringMatcher(serviceType)).Return(indexService) indexService.On("Id", stringMatcher("service|operation")).Return(indexService) indexService.On("BodyJson", mock.AnythingOfType("spanstore.Service")).Return(indexService) - indexService.On("Do", mock.AnythingOfType("*context.emptyCtx")).Return(nil, errors.New("service insertion error")) + indexService.On("Add") w.client.On("Index").Return(indexService) @@ -85,21 +81,7 @@ func TestWriteServiceError(t *testing.T) { }, } - err := w.writer.writeService(indexName, jsonSpan) - assert.EqualError(t, err, "Failed to insert service:operation: service insertion error") - - indexService.AssertNumberOfCalls(t, "Do", 1) - - expectedLogs := []string{ - `"msg":"Failed to insert service:operation"`, - `"trace_id":"1"`, - `"span_id":"0"`, - `"error":"service insertion error"`, - } - - for _, expectedLog := range expectedLogs { - assert.True(t, strings.Contains(w.logBuffer.String(), expectedLog), "Log must contain %s, but was %s", expectedLog, w.logBuffer.String()) - } + w.writer.writeService(indexName, jsonSpan) }) } diff --git a/plugin/storage/es/spanstore/writer.go b/plugin/storage/es/spanstore/writer.go index 87985e8dc94..7fe59c377a1 100644 --- a/plugin/storage/es/spanstore/writer.go +++ b/plugin/storage/es/spanstore/writer.go @@ -43,10 +43,9 @@ const ( type spanWriterMetrics struct { indexCreate *storageMetrics.WriteMetrics - spans *storageMetrics.WriteMetrics } -type serviceWriter func(string, *jModel.Span) error +type serviceWriter func(string, *jModel.Span) // SpanWriter is a wrapper around elastic.Client type SpanWriter struct { @@ -98,7 +97,6 @@ func NewSpanWriter( logger: logger, writerMetrics: spanWriterMetrics{ 
indexCreate: storageMetrics.NewWriteMetrics(metricsFactory, "IndexCreate"), - spans: storageMetrics.NewWriteMetrics(metricsFactory, "Spans"), }, serviceWriter: serviceOperationStorage.Write, indexCache: cache.NewLRUWithOptions( @@ -121,13 +119,17 @@ func (s *SpanWriter) WriteSpan(span *model.Span) error { if err := s.createIndex(serviceIndexName, serviceMapping, jsonSpan); err != nil { return err } - if err := s.writeService(serviceIndexName, jsonSpan); err != nil { - return err - } + s.writeService(serviceIndexName, jsonSpan) if err := s.createIndex(spanIndexName, spanMapping, jsonSpan); err != nil { return err } - return s.writeSpan(spanIndexName, jsonSpan) + s.writeSpan(spanIndexName, jsonSpan) + return nil +} + +// Close closes SpanWriter +func (s *SpanWriter) Close() error { + return s.client.Close() } func indexNames(span *model.Span) (string, string) { @@ -170,19 +172,14 @@ func (s *SpanWriter) fixMapping(mapping string) string { return mapping } -func (s *SpanWriter) writeService(indexName string, jsonSpan *jModel.Span) error { - return s.serviceWriter(indexName, jsonSpan) +func (s *SpanWriter) writeService(indexName string, jsonSpan *jModel.Span) { + s.serviceWriter(indexName, jsonSpan) } -func (s *SpanWriter) writeSpan(indexName string, jsonSpan *jModel.Span) error { - start := time.Now() +func (s *SpanWriter) writeSpan(indexName string, jsonSpan *jModel.Span) { elasticSpan := Span{Span: jsonSpan, StartTimeMillis: jsonSpan.StartTime / 1000} // Microseconds to milliseconds - _, err := s.client.Index().Index(indexName).Type(spanType).BodyJson(&elasticSpan).Do(s.ctx) - s.writerMetrics.spans.Emit(err, time.Since(start)) - if err != nil { - return s.logError(jsonSpan, err, "Failed to insert span", s.logger) - } - return nil + + s.client.Index().Index(indexName).Type(spanType).BodyJson(&elasticSpan).Add() } func (s *SpanWriter) logError(span *jModel.Span, err error, msg string, logger *zap.Logger) error { diff --git a/plugin/storage/es/spanstore/writer_test.go 
b/plugin/storage/es/spanstore/writer_test.go index ccefa514273..d2df42946e9 100644 --- a/plugin/storage/es/spanstore/writer_test.go +++ b/plugin/storage/es/spanstore/writer_test.go @@ -25,7 +25,6 @@ import ( "github.com/stretchr/testify/require" "github.com/uber/jaeger-lib/metrics" "go.uber.org/zap" - "gopkg.in/olivere/elastic.v5" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/model/json" @@ -56,6 +55,14 @@ func withSpanWriter(fn func(w *spanWriterTest)) { var _ spanstore.Writer = &SpanWriter{} // check API conformance +func TestClientClose(t *testing.T) { + withSpanWriter(func(w *spanWriterTest) { + w.client.On("Close").Return(nil) + w.writer.Close() + w.client.AssertNumberOfCalls(t, "Close", 1) + }) +} + // This test behaves as a large test that checks WriteSpan's behavior as a whole. // Extra tests for individual functions are below. func TestSpanWriter_WriteSpan(t *testing.T) { @@ -70,23 +77,6 @@ func TestSpanWriter_WriteSpan(t *testing.T) { expectedError string expectedLogs []string }{ - { - caption: "index exists query", - - serviceIndexExists: true, - spanIndexExists: true, - - expectedError: "", - expectedLogs: []string{}, - }, - { - caption: "index dne/creation query", - - serviceIndexExists: false, - - expectedError: "", - expectedLogs: []string{}, - }, { caption: "index creation error", @@ -101,33 +91,13 @@ func TestSpanWriter_WriteSpan(t *testing.T) { `"error":"index creation error"`, }, }, - { - caption: "service insertion error", - - serviceIndexExists: false, - - servicePutError: errors.New("service insertion error"), - expectedError: "Failed to insert service:operation: service insertion error", - expectedLogs: []string{ - `"msg":"Failed to insert service:operation"`, - `"trace_id":"1"`, - `"span_id":"0"`, - `"error":"service insertion error"`, - }, - }, { caption: "span insertion error", serviceIndexExists: false, - spanPutError: errors.New("span insertion error"), - expectedError: "Failed to insert span: span insertion 
error", - expectedLogs: []string{ - `"msg":"Failed to insert span"`, - `"trace_id":"1"`, - `"span_id":"0"`, - `"error":"span insertion error"`, - }, + expectedError: "", + expectedLogs: []string{}, }, { caption: "span index dne error", @@ -191,11 +161,11 @@ func TestSpanWriter_WriteSpan(t *testing.T) { indexServicePut.On("Id", stringMatcher("service|operation")).Return(indexServicePut) indexServicePut.On("BodyJson", mock.AnythingOfType("spanstore.Service")).Return(indexServicePut) - indexServicePut.On("Do", mock.AnythingOfType("*context.emptyCtx")).Return(nil, testCase.servicePutError) + indexServicePut.On("Add") indexSpanPut.On("Id", mock.AnythingOfType("string")).Return(indexSpanPut) indexSpanPut.On("BodyJson", mock.AnythingOfType("*spanstore.Span")).Return(indexSpanPut) - indexSpanPut.On("Do", mock.AnythingOfType("*context.emptyCtx")).Return(nil, testCase.spanPutError) + indexSpanPut.On("Add") w.client.On("IndexExists", stringMatcher(spanIndexName)).Return(spanExistsService) w.client.On("CreateIndex", stringMatcher(spanIndexName)).Return(spanCreateService) @@ -206,9 +176,9 @@ func TestSpanWriter_WriteSpan(t *testing.T) { err = w.writer.WriteSpan(span) if testCase.expectedError == "" { - assert.NoError(t, err) - indexServicePut.AssertNumberOfCalls(t, "Do", 1) - indexSpanPut.AssertNumberOfCalls(t, "Do", 1) + require.NoError(t, err) + indexServicePut.AssertNumberOfCalls(t, "Add", 1) + indexSpanPut.AssertNumberOfCalls(t, "Add", 1) } else { assert.EqualError(t, err, testCase.expectedError) } @@ -235,75 +205,6 @@ func TestSpanIndexName(t *testing.T) { assert.Equal(t, "jaeger-service-1995-04-21", serviceIndexName) } -func TestCheckAndCreateIndex(t *testing.T) { - testCases := []struct { - indexExists bool - indexExistsError error - createResult *elastic.IndicesCreateResult - createError error - expectedError string - expectedLogs []string - }{ - { - indexExists: false, - createResult: &elastic.IndicesCreateResult{}, - }, - { - createError: errors.New("index creation 
error"), - expectedError: "Failed to create index: index creation error", - expectedLogs: []string{ - `"msg":"Failed to create index"`, - `"trace_id":"1"`, - `"span_id":"0"`, - `"error":"index creation error"`, - }, - }, - { - indexExists: false, - createError: &elastic.Error{Details: &elastic.ErrorDetails{Type: "index_already_exists_exception"}}, - indexExistsError: &elastic.Error{Details: &elastic.ErrorDetails{Type: "index_already_exists_exception"}}, - }, - } - for _, testCase := range testCases { - withSpanWriter(func(w *spanWriterTest) { - existsService := &mocks.IndicesExistsService{} - existsService.On("Do", mock.AnythingOfType("*context.emptyCtx")).Return(testCase.indexExists, testCase.indexExistsError) - - createService := &mocks.IndicesCreateService{} - createService.On("Body", stringMatcher(w.writer.fixMapping(spanMapping))).Return(createService) - createService.On("Do", mock.AnythingOfType("*context.emptyCtx")).Return(testCase.createResult, testCase.createError) - - indexName := "jaeger-1995-04-21" - w.client.On("IndexExists", stringMatcher(indexName)).Return(existsService) - w.client.On("CreateIndex", stringMatcher(indexName)).Return(createService) - - jsonSpan := &json.Span{ - TraceID: json.TraceID("1"), - SpanID: json.SpanID("0"), - } - - err := w.writer.createIndex(indexName, spanMapping, jsonSpan) - createService.AssertNumberOfCalls(t, "Do", 1) - - if testCase.expectedError == "" { - assert.NoError(t, err) - // makes sure that the cache works - _ = w.writer.createIndex(indexName, spanMapping, jsonSpan) - createService.AssertNumberOfCalls(t, "Do", 1) - } else { - assert.EqualError(t, err, testCase.expectedError) - } - - for _, expectedLog := range testCase.expectedLogs { - assert.True(t, strings.Contains(w.logBuffer.String(), expectedLog), "Log must contain %s, but was %s", expectedLog, w.logBuffer.String()) - } - if len(testCase.expectedLogs) == 0 { - assert.Equal(t, "", w.logBuffer.String()) - } - }) - } -} - func TestFixMapping(t *testing.T) { 
withSpanWriter(func(w *spanWriterTest) { testMapping := `{ @@ -351,16 +252,14 @@ func TestWriteSpanInternal(t *testing.T) { indexService.On("Index", stringMatcher(indexName)).Return(indexService) indexService.On("Type", stringMatcher(spanType)).Return(indexService) indexService.On("BodyJson", mock.AnythingOfType("*spanstore.Span")).Return(indexService) - indexService.On("Do", mock.AnythingOfType("*context.emptyCtx")).Return(&elastic.IndexResponse{}, nil) + indexService.On("Add") w.client.On("Index").Return(indexService) jsonSpan := &json.Span{} - err := w.writer.writeSpan(indexName, jsonSpan) - require.NoError(t, err) - - indexService.AssertNumberOfCalls(t, "Do", 1) + w.writer.writeSpan(indexName, jsonSpan) + indexService.AssertNumberOfCalls(t, "Add", 1) assert.Equal(t, "", w.logBuffer.String()) }) } @@ -373,7 +272,7 @@ func TestWriteSpanInternalError(t *testing.T) { indexService.On("Index", stringMatcher(indexName)).Return(indexService) indexService.On("Type", stringMatcher(spanType)).Return(indexService) indexService.On("BodyJson", mock.AnythingOfType("*spanstore.Span")).Return(indexService) - indexService.On("Do", mock.AnythingOfType("*context.emptyCtx")).Return(nil, errors.New("span insertion error")) + indexService.On("Add") w.client.On("Index").Return(indexService) @@ -382,21 +281,8 @@ func TestWriteSpanInternalError(t *testing.T) { SpanID: json.SpanID("0"), } - err := w.writer.writeSpan(indexName, jsonSpan) - assert.EqualError(t, err, "Failed to insert span: span insertion error") - - indexService.AssertNumberOfCalls(t, "Do", 1) - - expectedLogs := []string{ - `"msg":"Failed to insert span"`, - `"trace_id":"1"`, - `"span_id":"0"`, - `"error":"span insertion error"`, - } - - for _, expectedLog := range expectedLogs { - assert.True(t, strings.Contains(w.logBuffer.String(), expectedLog), "Log must contain %s, but was %s", expectedLog, w.logBuffer.String()) - } + w.writer.writeSpan(indexName, jsonSpan) + indexService.AssertNumberOfCalls(t, "Add", 1) }) } diff 
--git a/plugin/storage/integration/es_integration_test.go b/plugin/storage/integration/es_integration_test.go index 923c3d2e360..9ad162d6c79 100644 --- a/plugin/storage/integration/es_integration_test.go +++ b/plugin/storage/integration/es_integration_test.go @@ -44,6 +44,7 @@ const ( type ESStorageIntegration struct { client *elastic.Client StorageIntegration + bulkProcessor *elastic.BulkProcessor } func (s *ESStorageIntegration) initializeES() error { @@ -59,7 +60,8 @@ func (s *ESStorageIntegration) initializeES() error { s.client = rawClient s.logger = logger - client := es.WrapESClient(s.client) + s.bulkProcessor, _ = s.client.BulkProcessor().Do(context.Background()) + client := es.WrapESClient(s.client, s.bulkProcessor) dependencyStore := dependencystore.NewDependencyStore(client, logger) s.dependencyReader = dependencyStore s.dependencyWriter = dependencyStore @@ -77,13 +79,18 @@ func (s *ESStorageIntegration) esCleanUp() error { } func (s *ESStorageIntegration) initSpanstore() { - client := es.WrapESClient(s.client) + bp, _ := s.client.BulkProcessor().BulkActions(1).FlushInterval(time.Nanosecond).Do(context.Background()) + client := es.WrapESClient(s.client, bp) s.spanWriter = spanstore.NewSpanWriter(client, s.logger, metrics.NullFactory, 0, 0) s.spanReader = spanstore.NewSpanReader(client, s.logger, 72*time.Hour, metrics.NullFactory) } func (s *ESStorageIntegration) esRefresh() error { - _, err := s.client.Refresh().Do(context.Background()) + err := s.bulkProcessor.Flush() + if err != nil { + return err + } + _, err = s.client.Refresh().Do(context.Background()) return err }