diff --git a/cmd/agent/app/builder_test.go b/cmd/agent/app/builder_test.go index 58503c72bb9..7d2dd74500d 100644 --- a/cmd/agent/app/builder_test.go +++ b/cmd/agent/app/builder_test.go @@ -57,7 +57,7 @@ processors: - model: jaeger protocol: compact server: - hostPort: 3.3.3.3:6831 + hostPort: 3.3.3.3:6831 socketBufferSize: 16384 - model: jaeger protocol: binary diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/dependencystore/dependency_store.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/dependencystore/dependency_store.go new file mode 100644 index 00000000000..96b4e3e9bbe --- /dev/null +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/dependencystore/dependency_store.go @@ -0,0 +1,128 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package dependencystore + +import ( + "bytes" + "context" + "encoding/json" + "strings" + "time" + + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient" + "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/plugin/storage/es/dependencystore/dbmodel" + "github.com/jaegertracing/jaeger/storage/dependencystore" +) + +const ( + dependencyType = "dependencies" + dependencyIndexBaseName = "jaeger-dependencies" + + timestampField = "timestamp" + + // default number of documents to fetch in a query + // see search.max_buckets and index.max_result_window + defaultDocCount = 10_000 + indexDateFormat = "2006-01-02" // date format for index e.g. 2020-01-20 +) + +// DependencyStore defines Elasticsearch dependency store. +type DependencyStore struct { + client esclient.ElasticsearchClient + logger *zap.Logger + indexPrefix string +} + +var _ dependencystore.Reader = (*DependencyStore)(nil) +var _ dependencystore.Writer = (*DependencyStore)(nil) + +// NewDependencyStore creates dependency store. 
+func NewDependencyStore(client esclient.ElasticsearchClient, logger *zap.Logger, indexPrefix string) *DependencyStore { + if indexPrefix != "" { + indexPrefix += "-" + } + return &DependencyStore{ + client: client, + logger: logger, + indexPrefix: indexPrefix + dependencyIndexBaseName + "-", + } +} + +// CreateTemplates creates index templates for dependency index +func (r *DependencyStore) CreateTemplates(dependenciesTemplate string) error { + return r.client.PutTemplate(context.Background(), dependencyIndexBaseName, strings.NewReader(dependenciesTemplate)) +} + +// WriteDependencies implements dependencystore.Writer +func (r *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.DependencyLink) error { + d := &dbmodel.TimeDependencies{ + Timestamp: ts, + Dependencies: dbmodel.FromDomainDependencies(dependencies), + } + data, err := json.Marshal(d) + if err != nil { + return err + } + return r.client.Index(context.Background(), bytes.NewReader(data), indexWithDate(r.indexPrefix, ts), dependencyType) +} + +// GetDependencies implements dependencystore.Reader +func (r *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) { + searchBody := getSearchBody(endTs, lookback) + + indices := dailyIndices(r.indexPrefix, endTs, lookback) + response, err := r.client.Search(context.Background(), searchBody, defaultDocCount, indices...) + if err != nil { + return nil, err + } + + var dependencies []dbmodel.DependencyLink + for _, hit := range response.Hits.Hits { + var d dbmodel.TimeDependencies + if err := json.Unmarshal(*hit.Source, &d); err != nil { + return nil, err + } + dependencies = append(dependencies, d.Dependencies...) 
+ } + return dbmodel.ToDomainDependencies(dependencies), nil +} + +func getSearchBody(endTs time.Time, lookback time.Duration) esclient.SearchBody { + return esclient.SearchBody{ + Query: &esclient.Query{ + RangeQueries: map[string]esclient.RangeQuery{timestampField: {GTE: endTs.Add(-lookback), LTE: endTs}}, + }, + Size: defaultDocCount, + } +} + +func indexWithDate(indexNamePrefix string, date time.Time) string { + return indexNamePrefix + date.UTC().Format(indexDateFormat) +} + +func dailyIndices(prefix string, ts time.Time, lookback time.Duration) []string { + var indices []string + firstIndex := indexWithDate(prefix, ts.Add(-lookback)) + currentIndex := indexWithDate(prefix, ts) + for lookback > 0 && currentIndex != firstIndex { // lookback <= 0: stepping back in time would never reach firstIndex + indices = append(indices, currentIndex) + ts = ts.Add(-24 * time.Hour) + currentIndex = indexWithDate(prefix, ts) + } + return append(indices, firstIndex) +} diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/dependencystore/dependency_store_test.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/dependencystore/dependency_store_test.go new file mode 100644 index 00000000000..ee1e90518cd --- /dev/null +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/dependencystore/dependency_store_test.go @@ -0,0 +1,191 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package dependencystore + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient" + "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/plugin/storage/es/dependencystore/dbmodel" +) + +func TestCreateTemplates(t *testing.T) { + client := &mockClient{} + store := NewDependencyStore(client, zap.NewNop(), "foo") + template := "template" + err := store.CreateTemplates(template) + require.NoError(t, err) + receivedBody, err := ioutil.ReadAll(client.receivedBody) + require.NoError(t, err) + assert.Equal(t, template, string(receivedBody)) +} + +func TestWriteDependencies(t *testing.T) { + client := &mockClient{} + store := NewDependencyStore(client, zap.NewNop(), "foo") + dependencies := []model.DependencyLink{{Parent: "foo", Child: "bar", CallCount: 1}} + tsNow := time.Now() + err := store.WriteDependencies(tsNow, dependencies) + require.NoError(t, err) + + d := &dbmodel.TimeDependencies{ + Timestamp: tsNow, + Dependencies: dbmodel.FromDomainDependencies(dependencies), + } + jsonDependencies, err := json.Marshal(d) + require.NoError(t, err) + + receivedBody, err := ioutil.ReadAll(client.receivedBody) + require.NoError(t, err) + assert.Equal(t, jsonDependencies, receivedBody) +} + +func TestGetDependencies(t *testing.T) { + tsNow := time.Now() + timeDependencies := dbmodel.TimeDependencies{ + Timestamp: tsNow, + Dependencies: []dbmodel.DependencyLink{ + {Parent: "foo", Child: "bar"}, + }, + } + jsonDep, err := json.Marshal(timeDependencies) + require.NoError(t, err) + rawMessage := json.RawMessage(jsonDep) + client := &mockClient{ + searchResponse: &esclient.SearchResponse{ + Hits: esclient.Hits{ + Total: 1, + Hits: []esclient.Hit{ + {Source: &rawMessage}, + }, + }, + }, + } + store := 
NewDependencyStore(client, zap.NewNop(), "foo") + dependencies, err := store.GetDependencies(tsNow, time.Hour) + require.NoError(t, err) + assert.Equal(t, timeDependencies, dbmodel.TimeDependencies{ + Timestamp: tsNow, + Dependencies: dbmodel.FromDomainDependencies(dependencies), + }) +} + +func TestGetDependencies_err_unmarshall(t *testing.T) { + tsNow := time.Now() + rawMessage := json.RawMessage("#") + client := &mockClient{ + searchResponse: &esclient.SearchResponse{ + Hits: esclient.Hits{ + Total: 1, + Hits: []esclient.Hit{ + {Source: &rawMessage}, + }, + }, + }, + } + store := NewDependencyStore(client, zap.NewNop(), "foo") + dependencies, err := store.GetDependencies(tsNow, time.Hour) + require.Contains(t, err.Error(), "invalid character") + assert.Nil(t, dependencies) +} + +func TestGetDependencies_err_client(t *testing.T) { + searchErr := fmt.Errorf("client err") + client := &mockClient{ + searchErr: searchErr, + } + store := NewDependencyStore(client, zap.NewNop(), "foo") + tsNow := time.Now() + dependencies, err := store.GetDependencies(tsNow, time.Hour) + require.Error(t, err) + assert.Nil(t, dependencies) + assert.Contains(t, err.Error(), searchErr.Error()) +} + +const query = `{ + "query": { + "range": { + "timestamp": { + "gte": "2020-08-30T14:00:00Z", + "lte": "2020-08-30T15:00:00Z" + } + } + }, + "size": 10000, + "terminate_after": 0 +}` + +func TestSearchBody(t *testing.T) { + date := time.Date(2020, 8, 30, 15, 0, 0, 0, time.UTC) + sb := getSearchBody(date, time.Hour) + jsonQuery, err := json.MarshalIndent(sb, "", " ") + require.NoError(t, err) + assert.Equal(t, query, string(jsonQuery)) +} + +func TestIndexWithDate(t *testing.T) { + assert.Equal(t, "foo-2020-09-30", indexWithDate("foo-", time.Date(2020, 9, 30, 0, 0, 0, 0, time.UTC))) +} + +func TestDailyIndices(t *testing.T) { + indices := dailyIndices("foo-", time.Date(2020, 9, 30, 0, 0, 0, 0, time.UTC), time.Hour) + assert.Equal(t, []string{"foo-2020-09-30", "foo-2020-09-29"}, indices) +} + 
+type mockClient struct { + receivedBody io.Reader + searchResponse *esclient.SearchResponse + searchErr error +} + +var _ esclient.ElasticsearchClient = (*mockClient)(nil) + +func (m *mockClient) PutTemplate(ctx context.Context, name string, template io.Reader) error { + m.receivedBody = template + return nil +} + +func (m mockClient) Bulk(ctx context.Context, bulkBody io.Reader) (*esclient.BulkResponse, error) { + panic("implement me") +} + +func (m mockClient) AddDataToBulkBuffer(bulkBody *bytes.Buffer, data []byte, index, typ string) { + panic("implement me") +} + +func (m *mockClient) Index(ctx context.Context, body io.Reader, index, typ string) error { + m.receivedBody = body + return nil +} + +func (m *mockClient) Search(ctx context.Context, query esclient.SearchBody, size int, indices ...string) (*esclient.SearchResponse, error) { + return m.searchResponse, m.searchErr +} + +func (m mockClient) MultiSearch(ctx context.Context, queries []esclient.SearchBody) (*esclient.MultiSearchResponse, error) { + panic("implement me") +} diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/client.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/client.go index d6d0f5e2a20..a33c4363fd8 100644 --- a/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/client.go +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/client.go @@ -16,8 +16,11 @@ package esclient import ( "bytes" + "context" + "encoding/json" "fmt" "io" + "net/http" "go.uber.org/zap" @@ -27,11 +30,18 @@ import ( // ElasticsearchClient exposes Elasticsearch API used by Jaeger type ElasticsearchClient interface { // PutTemplate creates index template - PutTemplate(name string, template io.Reader) error + PutTemplate(ctx context.Context, name string, template io.Reader) error // Bulk submits a bulk request - Bulk(bulkBody io.Reader) (*BulkResponse, error) + Bulk(ctx context.Context, bulkBody io.Reader) (*BulkResponse, error) // AddDataToBulkBuffer 
creates bulk item from data, index and typ and adds it to bulkBody AddDataToBulkBuffer(bulkBody *bytes.Buffer, data []byte, index, typ string) + // Index data + Index(ctx context.Context, body io.Reader, index, typ string) error + + // Search searches data via /_search + Search(ctx context.Context, query SearchBody, size int, indices ...string) (*SearchResponse, error) + // MultiSearch searches data via /_msearch + MultiSearch(ctx context.Context, queries []SearchBody) (*MultiSearchResponse, error) } // BulkResponse is a response returned by Elasticsearch Bulk API @@ -54,6 +64,106 @@ type BulkResponse struct { } `json:"items"` } +// SearchBody defines search request. +type SearchBody struct { + // indices are not in body, the ES client puts them to request path + Indices []string `json:"-"` + Aggregations json.RawMessage `json:"aggs,omitempty"` + Query *Query `json:"query,omitempty"` + Sort []map[string]Order `json:"sort,omitempty"` + Size int `json:"size"` + TerminateAfter int `json:"terminate_after"` + SearchAfter []interface{} `json:"search_after,omitempty"` +} + +// Order defines order in the query. +type Order string + +const ( + // AscOrder defines ascending order. + AscOrder Order = "asc" +) + +// BoolQueryType defines bool query type. +type BoolQueryType string + +// Must defines must bool query type. +const Must BoolQueryType = "must" + +// Should defines should bool query type. +const Should BoolQueryType = "should" + +// Query defines search query. +type Query struct { + Term *Terms `json:"term,omitempty"` + RangeQueries map[string]RangeQuery `json:"range,omitempty"` + BoolQuery map[BoolQueryType][]BoolQuery `json:"bool,omitempty"` +} + +// BoolQuery defines bool query. 
+type BoolQuery struct { + Term map[string]string `json:"term,omitempty"` + Regexp map[string]TermQuery `json:"regexp,omitempty"` + Nested *NestedQuery `json:"nested,omitempty"` + BoolQuery map[BoolQueryType][]BoolQuery `json:"bool,omitempty"` + RangeQueries map[string]RangeQuery `json:"range,omitempty"` + MatchQueries map[string]MatchQuery `json:"match,omitempty"` +} + +// NestedQuery defines nested query. +type NestedQuery struct { + Path string `json:"path"` + Query Query `json:"query"` +} + +// RangeQuery defines range query. +type RangeQuery struct { + GTE interface{} `json:"gte"` + LTE interface{} `json:"lte"` +} + +// Terms defines terms query. +type Terms map[string]TermQuery + +// TermQuery defines term query. +type TermQuery struct { + Value string `json:"value"` +} + +// MatchQuery defines match query. +type MatchQuery struct { + Query string `json:"query"` +} + +// MultiSearchResponse defines multi search response. +type MultiSearchResponse struct { + Responses []SearchResponse `json:"responses"` +} + +// SearchResponse defines search response. +type SearchResponse struct { + Hits Hits `json:"hits"` + Aggs map[string]AggregationResponse `json:"aggregations,omitempty"` +} + +// Hits defines search hits. +type Hits struct { + Total int `json:"total"` + Hits []Hit `json:"hits"` +} + +// Hit defines a single search hit. +type Hit struct { + Source *json.RawMessage `json:"_source"` +} + +// AggregationResponse defines aggregation response. 
+type AggregationResponse struct { + Buckets []struct { + Key string `json:"key"` + } `json:"buckets"` +} + // NewElasticsearchClient returns an instance of Elasticsearch client func NewElasticsearchClient(params config.Configuration, logger *zap.Logger) (ElasticsearchClient, error) { roundTripper, err := config.GetHTTPRoundTripper(&params, logger) @@ -73,12 +183,28 @@ func NewElasticsearchClient(params config.Configuration, logger *zap.Logger) (El logger.Info("Elasticsearch detected", zap.Int("version", esVersion)) params.Version = uint(esVersion) } - switch params.Version { + return newElasticsearchClient(int(params.Version), clientConfig{ + DiscoverNotesOnStartup: params.Sniffer, + Addresses: params.Servers, + Username: params.Username, + Password: params.Password, + }, roundTripper) +} + +type clientConfig struct { + DiscoverNotesOnStartup bool + Addresses []string + Username string + Password string +} + +func newElasticsearchClient(version int, params clientConfig, roundTripper http.RoundTripper) (ElasticsearchClient, error) { + switch version { case 5, 6: return newElasticsearch6Client(params, roundTripper) case 7: return newElasticsearch7Client(params, roundTripper) default: - return nil, fmt.Errorf("could not create Elasticseach client for version %d", params.Version) + return nil, fmt.Errorf("could not create Elasticseach client for version %d", version) } } diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/client_test.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/client_test.go index 196cecd7aa0..d68af133d7a 100644 --- a/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/client_test.go +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/client_test.go @@ -15,6 +15,12 @@ package esclient import ( + "context" + "fmt" + "io" + "io/ioutil" + "net/http" + "strings" "testing" "github.com/stretchr/testify/assert" @@ -61,3 +67,271 @@ func TestGetClient(t *testing.T) { }) } } + +type 
mockTransport struct { + Response *http.Response + Err error +} + +func (t *mockTransport) RoundTrip(req *http.Request) (*http.Response, error) { + return t.Response, t.Err +} + +type mockReader struct { + err error +} + +var _ io.Reader = (*mockReader)(nil) + +func (m mockReader) Read(p []byte) (n int, err error) { + return 0, m.err +} + +func testPutTemplate(t *testing.T, clientFactory func(tripper http.RoundTripper) (ElasticsearchClient, error)) { + tests := []struct { + name string + transportErr error + }{ + { + name: "body reader error", + transportErr: fmt.Errorf("failed to get body"), + }, + { + name: "success", + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + mocktrans := &mockTransport{ + Response: &http.Response{ + StatusCode: http.StatusOK, + Body: ioutil.NopCloser(strings.NewReader("{}")), + }, + Err: test.transportErr, + } + client, err := clientFactory(mocktrans) + require.NoError(t, err) + assert.NotNil(t, client) + + err = client.PutTemplate(context.Background(), "foo", strings.NewReader("")) + if test.transportErr != nil { + require.Error(t, err) + assert.Contains(t, err.Error(), test.transportErr.Error()) + return + } + require.NoError(t, err) + }) + } +} + +func testBulk(t *testing.T, clientFactory func(tripper http.RoundTripper) (ElasticsearchClient, error)) { + tests := []struct { + expectedResponse *BulkResponse + resp *http.Response + err string + }{ + { + resp: &http.Response{ + StatusCode: http.StatusOK, + Body: ioutil.NopCloser(strings.NewReader("{}")), + }, + expectedResponse: &BulkResponse{}, + }, + { + resp: &http.Response{ + StatusCode: http.StatusOK, + Body: ioutil.NopCloser(strings.NewReader("{#}")), + }, + err: "looking for beginning of object key string", + }, + { + resp: &http.Response{ + StatusCode: http.StatusBadRequest, + Body: ioutil.NopCloser(strings.NewReader("{#}")), + }, + err: "bulk request failed with code 400", + }, + } + for _, test := range tests { + t.Run(test.err, func(t *testing.T) 
{ + mocktrans := &mockTransport{ + Response: test.resp, + } + client, err := clientFactory(mocktrans) + require.NoError(t, err) + assert.NotNil(t, client) + + bulkResp, err := client.Bulk(context.Background(), strings.NewReader("data")) + if test.err != "" { + assert.Contains(t, err.Error(), test.err) + return + } + require.NoError(t, err) + assert.Equal(t, test.expectedResponse, bulkResp) + }) + } +} + +func testIndex(t *testing.T, clientFactory func(tripper http.RoundTripper) (ElasticsearchClient, error)) { + tests := []struct { + err error + }{ + {}, + { + err: fmt.Errorf("wrong request"), + }, + } + for _, test := range tests { + t.Run(fmt.Sprintf("%v", test.err), func(t *testing.T) { + mocktrans := &mockTransport{ + Response: &http.Response{ + StatusCode: 200, + Body: ioutil.NopCloser(strings.NewReader("")), + }, + Err: test.err, + } + client, err := clientFactory(mocktrans) + require.NoError(t, err) + require.NotNil(t, client) + + err = client.Index(context.Background(), strings.NewReader(""), "", "") + if test.err != nil { + assert.EqualError(t, err, test.err.Error()) + return + } + require.NoError(t, err) + }) + } +} + +func testSearch(t *testing.T, clientFactory func(tripper http.RoundTripper) (ElasticsearchClient, error)) { + tests := []struct { + name string + searchBody SearchBody + transportError error + response *http.Response + expectedError string + expectedResponse *SearchResponse + }{ + { + name: "marshall query error", + searchBody: SearchBody{SearchAfter: []interface{}{make(chan bool)}}, + expectedError: "unsupported type", + }, + { + name: "transport error", + transportError: fmt.Errorf("transport err"), + expectedError: "transport err", + }, + { + name: "read response body error", + response: &http.Response{ + Body: ioutil.NopCloser(mockReader{err: fmt.Errorf("failed to read body")}), + }, + expectedError: "failed to read body", + }, + { + name: "unmarshall body error", + response: &http.Response{ + Body: 
ioutil.NopCloser(strings.NewReader("@")), + }, + expectedError: "invalid character '@'", + }, + { + name: "success", + response: &http.Response{ + Body: ioutil.NopCloser(strings.NewReader("{}")), + }, + expectedResponse: &SearchResponse{}, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + mocktrans := &mockTransport{ + Response: test.response, + Err: test.transportError, + } + client, err := clientFactory(mocktrans) + require.NoError(t, err) + require.NotNil(t, client) + + response, err := client.Search(context.Background(), test.searchBody, 0) + if test.expectedError != "" { + require.Error(t, err) + assert.Contains(t, err.Error(), test.expectedError) + assert.Nil(t, response) + return + } + + require.NoError(t, err) + assert.Equal(t, test.expectedResponse, response) + }) + } +} + +func testMultiSearch(t *testing.T, clientFactory func(tripper http.RoundTripper) (ElasticsearchClient, error)) { + tests := []struct { + name string + searchBody SearchBody + transportError error + response *http.Response + expectedError string + expectedResponse *MultiSearchResponse + }{ + { + name: "marshall query error", + searchBody: SearchBody{SearchAfter: []interface{}{make(chan bool)}}, + expectedError: "unsupported type", + }, + { + name: "transport error", + transportError: fmt.Errorf("transport err"), + expectedError: "transport err", + }, + { + name: "read response body error", + response: &http.Response{ + Body: ioutil.NopCloser(mockReader{err: fmt.Errorf("failed to read body")}), + }, + expectedError: "failed to read body", + }, + { + name: "unmarshall body error", + response: &http.Response{ + Body: ioutil.NopCloser(strings.NewReader("@")), + }, + expectedError: "invalid character '@'", + }, + { + name: "success", + response: &http.Response{ + Body: ioutil.NopCloser(strings.NewReader("{\"responses\": [{}, {}]}")), + }, + expectedResponse: &MultiSearchResponse{Responses: []SearchResponse{{}, {}}}, + }, + } + + for _, test := range tests { + 
t.Run(test.name, func(t *testing.T) { + mocktrans := &mockTransport{ + Response: test.response, + Err: test.transportError, + } + client, err := clientFactory(mocktrans) + require.NoError(t, err) + require.NotNil(t, client) + + response, err := client.MultiSearch(context.Background(), []SearchBody{test.searchBody}) + if test.expectedError != "" { + require.Error(t, err) + assert.Contains(t, err.Error(), test.expectedError) + assert.Nil(t, response) + return + } + + require.NoError(t, err) + assert.Equal(t, test.expectedResponse, response) + }) + } +} diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/es6client.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/es6client.go index bab8f0962d1..8bc567a7938 100644 --- a/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/es6client.go +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/es6client.go @@ -16,14 +16,14 @@ package esclient import ( "bytes" + "context" "encoding/json" "fmt" "io" + "io/ioutil" "net/http" elasticsearch6 "github.com/elastic/go-elasticsearch/v6" - - "github.com/jaegertracing/jaeger/pkg/es/config" ) const ( @@ -36,10 +36,10 @@ type elasticsearch6Client struct { var _ ElasticsearchClient = (*elasticsearch6Client)(nil) -func newElasticsearch6Client(params config.Configuration, roundTripper http.RoundTripper) (*elasticsearch6Client, error) { +func newElasticsearch6Client(params clientConfig, roundTripper http.RoundTripper) (*elasticsearch6Client, error) { client, err := elasticsearch6.NewClient(elasticsearch6.Config{ - DiscoverNodesOnStart: params.Sniffer, - Addresses: params.Servers, + DiscoverNodesOnStart: params.DiscoverNotesOnStartup, + Addresses: params.Addresses, Username: params.Username, Password: params.Password, Transport: roundTripper, @@ -52,8 +52,8 @@ func newElasticsearch6Client(params config.Configuration, roundTripper http.Roun }, nil } -func (es *elasticsearch6Client) PutTemplate(name string, body io.Reader) error { - 
resp, err := es.client.Indices.PutTemplate(name, body) +func (es *elasticsearch6Client) PutTemplate(ctx context.Context, name string, body io.Reader) error { + resp, err := es.client.Indices.PutTemplate(name, body, es.client.Indices.PutTemplate.WithContext(ctx)) if err != nil { return err } @@ -69,8 +69,8 @@ func (es *elasticsearch6Client) AddDataToBulkBuffer(buffer *bytes.Buffer, data [ buffer.Write([]byte("\n")) } -func (es *elasticsearch6Client) Bulk(reader io.Reader) (*BulkResponse, error) { - response, err := es.client.Bulk(reader) +func (es *elasticsearch6Client) Bulk(ctx context.Context, reader io.Reader) (*BulkResponse, error) { + response, err := es.client.Bulk(reader, es.client.Bulk.WithContext(ctx)) if err != nil { return nil, err } @@ -85,3 +85,77 @@ func (es *elasticsearch6Client) Bulk(reader io.Reader) (*BulkResponse, error) { } return &blk, nil } + +// Index sends body as a single document to the given index and document type. +// It returns an error when the request fails or Elasticsearch answers with an error status. +func (es *elasticsearch6Client) Index(ctx context.Context, body io.Reader, index, typ string) error { + response, err := es.client.Index(index, body, es.client.Index.WithContext(ctx), es.client.Index.WithDocumentType(typ)) + if err != nil { + return err + } + defer response.Body.Close() + // An HTTP error status arrives as a regular response, not as err, so check it explicitly. + if response.IsError() { + return fmt.Errorf("index request failed with code %d", response.StatusCode) + } + return nil +} + +func (es *elasticsearch6Client) Search(ctx context.Context, query SearchBody, size int, indices ...string) (*SearchResponse, error) { + body, err := encodeSearchBody(query) + if err != nil { + return nil, err + } + + response, err := es.client.Search( + es.client.Search.WithContext(ctx), + es.client.Search.WithIndex(indices...), + es.client.Search.WithBody(body), + es.client.Search.WithIgnoreUnavailable(true), + es.client.Search.WithSize(size)) + if err != nil { + return nil, err + } + defer response.Body.Close() + + data, err := ioutil.ReadAll(response.Body) + if err != nil { + return nil, err + } + r := &SearchResponse{} + if err = json.Unmarshal(data, r); err != nil { + return nil, err + } + return r, nil +} + +func (es *elasticsearch6Client) MultiSearch(ctx context.Context, queries []SearchBody) 
(*MultiSearchResponse, error) { + body, err := encodeSearchBodies(queries) + if err != nil { + return nil, err + } + + var indices []string + for _, q := range queries { + indices = append(indices, q.Indices...) + } + + response, err := es.client.Msearch(body, + es.client.Msearch.WithContext(ctx), + es.client.Msearch.WithIndex(indices...), + ) + if err != nil { + return nil, err + } + defer response.Body.Close() + + data, err := ioutil.ReadAll(response.Body) + if err != nil { + return nil, err + } + r := &MultiSearchResponse{} + if err = json.Unmarshal(data, r); err != nil { + return nil, err + } + return r, nil +} diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/es6client_test.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/es6client_test.go index 657843aab4b..25977251862 100644 --- a/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/es6client_test.go +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/es6client_test.go @@ -16,53 +16,29 @@ package esclient import ( "bytes" - "fmt" - "io/ioutil" "net/http" - "strings" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - - "github.com/jaegertracing/jaeger/pkg/es/config" ) -type mockTransport struct { - Response *http.Response - RoundTripFn func(req *http.Request) (*http.Response, error) -} - -func (t *mockTransport) RoundTrip(req *http.Request) (*http.Response, error) { - return t.RoundTripFn(req) -} - func TestES6NewClient_err(t *testing.T) { - client, err := newElasticsearch6Client(config.Configuration{ - Sniffer: true, - Servers: []string{"$%"}, + client, err := newElasticsearch6Client(clientConfig{ + Addresses: []string{"$%"}, }, &http.Transport{}) require.Error(t, err) assert.Nil(t, client) } func TestES6PutTemplateES6Client(t *testing.T) { - mocktrans := &mockTransport{ - Response: &http.Response{ - StatusCode: http.StatusOK, - Body: ioutil.NopCloser(strings.NewReader(`{}`)), - }, - } - 
mocktrans.RoundTripFn = func(req *http.Request) (*http.Response, error) { return mocktrans.Response, nil } - client, err := newElasticsearch6Client(config.Configuration{}, mocktrans) - require.NoError(t, err) - assert.NotNil(t, client) - err = client.PutTemplate("foo", strings.NewReader("bar")) - require.NoError(t, err) + testPutTemplate(t, func(tripper http.RoundTripper) (ElasticsearchClient, error) { + return newElasticsearch6Client(clientConfig{}, tripper) + }) } func TestES6AddDataToBulk(t *testing.T) { - client, err := newElasticsearch6Client(config.Configuration{}, &http.Transport{}) + client, err := newElasticsearch6Client(clientConfig{}, &http.Transport{}) require.NoError(t, err) assert.NotNil(t, client) @@ -72,48 +48,25 @@ func TestES6AddDataToBulk(t *testing.T) { } func TestES6Bulk(t *testing.T) { - tests := []struct { - resp *http.Response - err string - }{ - { - resp: &http.Response{ - StatusCode: http.StatusOK, - Body: ioutil.NopCloser(strings.NewReader("{}")), - }, - }, - { - resp: &http.Response{ - StatusCode: http.StatusOK, - Body: ioutil.NopCloser(strings.NewReader("{#}")), - }, - err: "looking for beginning of object key string", - }, - { - resp: &http.Response{ - StatusCode: http.StatusBadRequest, - Body: ioutil.NopCloser(strings.NewReader("{#}")), - }, - err: "bulk request failed with code 400", - }, - } - for _, test := range tests { - t.Run(test.err, func(t *testing.T) { - mocktrans := &mockTransport{ - Response: test.resp, - } - mocktrans.RoundTripFn = func(req *http.Request) (*http.Response, error) { return mocktrans.Response, nil } + testBulk(t, func(tripper http.RoundTripper) (ElasticsearchClient, error) { + return newElasticsearch6Client(clientConfig{}, tripper) + }) +} + +func TestES6Index(t *testing.T) { + testIndex(t, func(tripper http.RoundTripper) (ElasticsearchClient, error) { + return newElasticsearch6Client(clientConfig{}, tripper) + }) +} + +func TestES6Search(t *testing.T) { + testSearch(t, func(tripper http.RoundTripper) 
(ElasticsearchClient, error) { + return newElasticsearch6Client(clientConfig{}, tripper) + }) +} - client, err := newElasticsearch6Client(config.Configuration{}, mocktrans) - require.NoError(t, err) - assert.NotNil(t, client) - _, err = client.Bulk(strings.NewReader("data")) - if test.err != "" { - fmt.Println() - assert.Contains(t, err.Error(), test.err) - } else { - require.NoError(t, err) - } - }) - } +func TestES6MultiSearch(t *testing.T) { + testMultiSearch(t, func(tripper http.RoundTripper) (ElasticsearchClient, error) { + return newElasticsearch6Client(clientConfig{}, tripper) + }) } diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/es7client.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/es7client.go index ba6cd4f4f79..efceb684d3f 100644 --- a/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/es7client.go +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/es7client.go @@ -16,14 +16,14 @@ package esclient import ( "bytes" + "context" "encoding/json" "fmt" "io" + "io/ioutil" "net/http" elasticsearch7 "github.com/elastic/go-elasticsearch/v7" - - "github.com/jaegertracing/jaeger/pkg/es/config" ) const ( @@ -36,11 +36,11 @@ type elasticsearch7Client struct { var _ ElasticsearchClient = (*elasticsearch7Client)(nil) -func newElasticsearch7Client(params config.Configuration, roundTripper http.RoundTripper) (*elasticsearch7Client, error) { +func newElasticsearch7Client(config clientConfig, roundTripper http.RoundTripper) (*elasticsearch7Client, error) { client, err := elasticsearch7.NewClient(elasticsearch7.Config{ - Addresses: params.Servers, - Username: params.Username, - Password: params.Password, + Addresses: config.Addresses, + Username: config.Username, + Password: config.Password, Transport: roundTripper, }) if err != nil { @@ -51,8 +51,8 @@ func newElasticsearch7Client(params config.Configuration, roundTripper http.Roun }, nil } -func (es *elasticsearch7Client) PutTemplate(name string, 
body io.Reader) error { - resp, err := es.client.Indices.PutTemplate(body, name) +func (es *elasticsearch7Client) PutTemplate(ctx context.Context, name string, body io.Reader) error { + resp, err := es.client.Indices.PutTemplate(body, name, es.client.Indices.PutTemplate.WithContext(ctx)) if err != nil { return err } @@ -68,8 +68,8 @@ func (es *elasticsearch7Client) AddDataToBulkBuffer(buffer *bytes.Buffer, data [ buffer.Write([]byte("\n")) } -func (es *elasticsearch7Client) Bulk(reader io.Reader) (*BulkResponse, error) { - response, err := es.client.Bulk(reader) +func (es *elasticsearch7Client) Bulk(ctx context.Context, reader io.Reader) (*BulkResponse, error) { + response, err := es.client.Bulk(reader, es.client.Bulk.WithContext(ctx)) if err != nil { return nil, err } @@ -85,3 +85,127 @@ func (es *elasticsearch7Client) Bulk(reader io.Reader) (*BulkResponse, error) { } return &blk, nil } + +func (es *elasticsearch7Client) Index(ctx context.Context, body io.Reader, index string, _ string) error { + response, err := es.client.Index(index, body, es.client.Index.WithContext(ctx)) + if err != nil { + return err + } + return response.Body.Close() +} + +func (es *elasticsearch7Client) Search(ctx context.Context, query SearchBody, size int, indices ...string) (*SearchResponse, error) { + body, err := encodeSearchBody(query) + if err != nil { + return nil, err + } + + response, err := es.client.Search( + es.client.Search.WithContext(ctx), + es.client.Search.WithIndex(indices...), + es.client.Search.WithBody(body), + es.client.Search.WithIgnoreUnavailable(true), + es.client.Search.WithSize(size)) + if err != nil { + return nil, err + } + defer response.Body.Close() + + data, err := ioutil.ReadAll(response.Body) + if err != nil { + return nil, err + } + r := &es7searchResponse{} + if err = json.Unmarshal(data, r); err != nil { + return nil, err + } + convertedResp := convertSearchResponse(*r) + return &convertedResp, nil +} + +func (es *elasticsearch7Client) MultiSearch(ctx 
context.Context, queries []SearchBody) (*MultiSearchResponse, error) { + var indices []string + var es7Queries []es7SearchBody + for _, q := range queries { + es7Queries = append(es7Queries, es7SearchBody{ + SearchBody: q, + TrackTotalHits: true, + }) + indices = append(indices, q.Indices...) + } + body, err := es7QueryBodies(es7Queries) + if err != nil { + return nil, err + } + + response, err := es.client.Msearch(body, + es.client.Msearch.WithContext(ctx), + es.client.Msearch.WithIndex(indices...), + ) + if err != nil { + return nil, err + } + defer response.Body.Close() + + data, err := ioutil.ReadAll(response.Body) + if err != nil { + return nil, err + } + r := &es7multiSearchResponse{} + if err = json.Unmarshal(data, r); err != nil { + return nil, err + } + return convertMultiSearchResponse(r), nil +} + +func convertMultiSearchResponse(response *es7multiSearchResponse) *MultiSearchResponse { + mResponse := &MultiSearchResponse{} + for _, r := range response.Responses { + mResponse.Responses = append(mResponse.Responses, convertSearchResponse(r)) + } + return mResponse +} + +func convertSearchResponse(response es7searchResponse) SearchResponse { + return SearchResponse{ + Aggs: response.Aggs, + Hits: Hits{ + Total: response.Hits.Total.Value, + Hits: response.Hits.Hits, + }, + } +} + +// Override the SearchBody to add TrackTotalHits compatible with ES7 +type es7SearchBody struct { + SearchBody + TrackTotalHits bool `json:"track_total_hits"` +} + +type es7multiSearchResponse struct { + Responses []es7searchResponse `json:"responses"` +} + +type es7searchResponse struct { + Hits es7its `json:"hits"` + Aggs map[string]AggregationResponse `json:"aggregations,omitempty"` +} + +type es7its struct { + Total struct { + Value int `json:"value"` + } `json:"total"` + Hits []Hit `json:"hits"` +} + +func es7QueryBodies(searchBodies []es7SearchBody) (io.Reader, error) { + buf := &bytes.Buffer{} + for _, sb := range searchBodies { + data, err := json.Marshal(sb) + if err != 
nil { + return nil, err + } + addDataToMSearchBuffer(buf, data) + } + return buf, nil +} diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/es7client_test.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/es7client_test.go index 5f2c490d75e..c159c1dd6b1 100644 --- a/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/es7client_test.go +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/es7client_test.go @@ -16,44 +16,29 @@ package esclient import ( "bytes" - "fmt" - "io/ioutil" "net/http" - "strings" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - - "github.com/jaegertracing/jaeger/pkg/es/config" ) func TestES7NewClient_err(t *testing.T) { - client, err := newElasticsearch6Client(config.Configuration{ - Sniffer: true, - Servers: []string{"$%"}, + client, err := newElasticsearch7Client(clientConfig{ + Addresses: []string{"$%"}, }, &http.Transport{}) require.Error(t, err) assert.Nil(t, client) } -func TestES7PutTemplateES6Client(t *testing.T) { - mocktrans := &mockTransport{ - Response: &http.Response{ - StatusCode: http.StatusOK, - Body: ioutil.NopCloser(strings.NewReader(`{}`)), - }, - } - mocktrans.RoundTripFn = func(req *http.Request) (*http.Response, error) { return mocktrans.Response, nil } - client, err := newElasticsearch7Client(config.Configuration{}, mocktrans) - require.NoError(t, err) - assert.NotNil(t, client) - err = client.PutTemplate("foo", strings.NewReader("bar")) - require.NoError(t, err) +func TestES7PutTemplate(t *testing.T) { + testPutTemplate(t, func(tripper http.RoundTripper) (ElasticsearchClient, error) { + return newElasticsearch7Client(clientConfig{}, tripper) + }) } func TestES7AddDataToBulk(t *testing.T) { - client, err := newElasticsearch7Client(config.Configuration{}, &http.Transport{}) + client, err := newElasticsearch7Client(clientConfig{}, &http.Transport{}) require.NoError(t, err) assert.NotNil(t, client) @@ -63,48 +48,25 @@ 
func TestES7AddDataToBulk(t *testing.T) { } func TestES7Bulk(t *testing.T) { - tests := []struct { - resp *http.Response - err string - }{ - { - resp: &http.Response{ - StatusCode: http.StatusOK, - Body: ioutil.NopCloser(strings.NewReader("{}")), - }, - }, - { - resp: &http.Response{ - StatusCode: http.StatusOK, - Body: ioutil.NopCloser(strings.NewReader("{#}")), - }, - err: "looking for beginning of object key string", - }, - { - resp: &http.Response{ - StatusCode: http.StatusBadRequest, - Body: ioutil.NopCloser(strings.NewReader("{#}")), - }, - err: "bulk request failed with code 400", - }, - } - for _, test := range tests { - t.Run(test.err, func(t *testing.T) { - mocktrans := &mockTransport{ - Response: test.resp, - } - mocktrans.RoundTripFn = func(req *http.Request) (*http.Response, error) { return mocktrans.Response, nil } + testBulk(t, func(tripper http.RoundTripper) (ElasticsearchClient, error) { + return newElasticsearch7Client(clientConfig{}, tripper) + }) +} + +func TestES7Index(t *testing.T) { + testIndex(t, func(tripper http.RoundTripper) (ElasticsearchClient, error) { + return newElasticsearch7Client(clientConfig{}, tripper) + }) +} + +func TestES7Search(t *testing.T) { + testSearch(t, func(tripper http.RoundTripper) (ElasticsearchClient, error) { + return newElasticsearch7Client(clientConfig{}, tripper) + }) +} - client, err := newElasticsearch7Client(config.Configuration{}, mocktrans) - require.NoError(t, err) - assert.NotNil(t, client) - _, err = client.Bulk(strings.NewReader("data")) - if test.err != "" { - fmt.Println() - assert.Contains(t, err.Error(), test.err) - } else { - require.NoError(t, err) - } - }) - } +func TestES7MultiSearch(t *testing.T) { + testMultiSearch(t, func(tripper http.RoundTripper) (ElasticsearchClient, error) { + return newElasticsearch7Client(clientConfig{}, tripper) + }) } diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/search_body_encoder.go 
b/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/search_body_encoder.go new file mode 100644 index 00000000000..12be366c0b4 --- /dev/null +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/search_body_encoder.go @@ -0,0 +1,53 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package esclient + +import ( + "bytes" + "encoding/json" + "io" +) + +// Elasticsearch header for multi search API +// https://www.elastic.co/guide/en/elasticsearch/reference/current/search-multi-search.html +const multiSearchHeaderFormat = `{"ignore_unavailable": "true"}` + "\n" + +func encodeSearchBody(searchBody SearchBody) (io.Reader, error) { + buf := &bytes.Buffer{} + if err := json.NewEncoder(buf).Encode(searchBody); err != nil { + return nil, err + } + return buf, nil +} + +func encodeSearchBodies(searchBodies []SearchBody) (io.Reader, error) { + buf := &bytes.Buffer{} + for _, sb := range searchBodies { + data, err := json.Marshal(sb) + if err != nil { + return nil, err + } + addDataToMSearchBuffer(buf, data) + } + return buf, nil +} + +func addDataToMSearchBuffer(buffer *bytes.Buffer, data []byte) { + meta := []byte(multiSearchHeaderFormat) + buffer.Grow(len(data) + len(meta) + len("\n")) + buffer.Write(meta) + buffer.Write(data) + buffer.Write([]byte("\n")) +} diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/search_body_encoder_test.go 
b/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/search_body_encoder_test.go new file mode 100644 index 00000000000..9e80cd78667 --- /dev/null +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/search_body_encoder_test.go @@ -0,0 +1,39 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package esclient + +import ( + "io/ioutil" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestEncodeSearchBody(t *testing.T) { + bodyReader, err := encodeSearchBody(SearchBody{}) + require.NoError(t, err) + bodyBytes, err := ioutil.ReadAll(bodyReader) + require.NoError(t, err) + assert.Equal(t, "{\"size\":0,\"terminate_after\":0}\n", string(bodyBytes)) +} + +func TestEncodeSearchBodies(t *testing.T) { + bodyReader, err := encodeSearchBodies([]SearchBody{{}}) + require.NoError(t, err) + bodyBytes, err := ioutil.ReadAll(bodyReader) + require.NoError(t, err) + assert.Equal(t, "{\"ignore_unavailable\": \"true\"}\n{\"size\":0,\"terminate_after\":0}\n", string(bodyBytes)) +} diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/exporter.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/exporter.go index 393b3b5b809..6ccef91cf69 100644 --- a/cmd/opentelemetry/app/exporter/elasticsearchexporter/exporter.go +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/exporter.go @@ -15,6 +15,8 @@ package elasticsearchexporter import ( 
+ "context" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/exporter/exporterhelper" @@ -22,7 +24,7 @@ import ( ) // new creates Elasticsearch exporter/storage. -func new(config *Config, params component.ExporterCreateParams) (component.TraceExporter, error) { +func new(ctx context.Context, config *Config, params component.ExporterCreateParams) (component.TraceExporter, error) { esCfg := config.GetPrimary() w, err := newEsSpanWriter(*esCfg, params.Logger) if err != nil { @@ -30,7 +32,7 @@ func new(config *Config, params component.ExporterCreateParams) (component.Trace } if config.Primary.IsCreateIndexTemplates() { spanMapping, serviceMapping := es.GetSpanServiceMappings(esCfg.GetNumShards(), esCfg.GetNumReplicas(), esCfg.GetVersion()) - if err = w.CreateTemplates(spanMapping, serviceMapping); err != nil { + if err = w.CreateTemplates(ctx, spanMapping, serviceMapping); err != nil { return nil, err } } diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/factory.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/factory.go index bc8b03f3864..5105311eebd 100644 --- a/cmd/opentelemetry/app/exporter/elasticsearchexporter/factory.go +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/factory.go @@ -66,7 +66,7 @@ func (f Factory) CreateDefaultConfig() configmodels.Exporter { // CreateTraceExporter creates Jaeger Elasticsearch trace exporter. // This function implements OTEL component.ExporterFactory interface. func (Factory) CreateTraceExporter( - _ context.Context, + ctx context.Context, params component.ExporterCreateParams, cfg configmodels.Exporter, ) (component.TraceExporter, error) { @@ -74,15 +74,15 @@ func (Factory) CreateTraceExporter( if !ok { return nil, fmt.Errorf("could not cast configuration to %s", TypeStr) } - return new(esCfg, params) + return new(ctx, esCfg, params) } // CreateMetricsExporter is not implemented. // This function implements OTEL component.ExporterFactory interface. 
func (Factory) CreateMetricsExporter( - _ context.Context, - _ component.ExporterCreateParams, - _ configmodels.Exporter, + context.Context, + component.ExporterCreateParams, + configmodels.Exporter, ) (component.MetricsExporter, error) { return nil, configerror.ErrDataTypeIsNotSupported } diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/integration_test.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/integration_test.go index 7f68f10d928..d794aa6fe8f 100644 --- a/cmd/opentelemetry/app/exporter/elasticsearchexporter/integration_test.go +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/integration_test.go @@ -27,16 +27,15 @@ import ( "github.com/olivere/elastic" "github.com/stretchr/testify/require" - "github.com/uber/jaeger-lib/metrics" "go.uber.org/zap" + "github.com/jaegertracing/jaeger/cmd/opentelemetry/app/exporter/elasticsearchexporter/dependencystore" + "github.com/jaegertracing/jaeger/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient" + "github.com/jaegertracing/jaeger/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanreader" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/es/config" - eswrapper "github.com/jaegertracing/jaeger/pkg/es/wrapper" "github.com/jaegertracing/jaeger/pkg/testutils" "github.com/jaegertracing/jaeger/plugin/storage/es" - "github.com/jaegertracing/jaeger/plugin/storage/es/dependencystore" - "github.com/jaegertracing/jaeger/plugin/storage/es/spanstore" "github.com/jaegertracing/jaeger/plugin/storage/es/spanstore/dbmodel" "github.com/jaegertracing/jaeger/plugin/storage/integration" ) @@ -54,9 +53,8 @@ const ( type IntegrationTest struct { integration.StorageIntegration - client *elastic.Client - bulkProcessor *elastic.BulkProcessor - logger *zap.Logger + client *elastic.Client + logger *zap.Logger } type storageWrapper struct { @@ -69,7 +67,7 @@ func (s storageWrapper) WriteSpan(span *model.Span) error { //_, err := 
s.writer.WriteTraces(context.Background(), traces) converter := dbmodel.FromDomain{} dbSpan := converter.FromDomainEmbedProcess(span) - _, err := s.writer.writeSpans([]*dbmodel.Span{dbSpan}) + _, err := s.writer.writeSpans(context.Background(), []*dbmodel.Span{dbSpan}) return err } @@ -115,57 +113,55 @@ func (s *IntegrationTest) esCleanUp(allTagsAsFields bool) error { } func (s *IntegrationTest) initSpanstore(allTagsAsFields bool) error { - bp, _ := s.client.BulkProcessor().BulkActions(1).FlushInterval(time.Nanosecond).Do(context.Background()) - s.bulkProcessor = bp esVersion, err := s.getVersion() if err != nil { return err } - client := eswrapper.WrapESClient(s.client, bp, esVersion) - spanMapping, serviceMapping := es.GetSpanServiceMappings(5, 1, client.GetVersion()) + spanMapping, serviceMapping := es.GetSpanServiceMappings(5, 1, esVersion) - w, err := newEsSpanWriter(config.Configuration{ + cfg := config.Configuration{ Servers: []string{queryURL}, IndexPrefix: indexPrefix, Tags: config.TagsAsFields{ AllAsFields: allTagsAsFields, }, - }, s.logger) + } + w, err := newEsSpanWriter(cfg, s.logger) if err != nil { return err } - err = w.CreateTemplates(spanMapping, serviceMapping) + err = w.CreateTemplates(context.Background(), spanMapping, serviceMapping) if err != nil { return err } s.SpanWriter = storageWrapper{ writer: w, } - s.SpanReader = spanstore.NewSpanReader(spanstore.SpanReaderParams{ - Client: client, - Logger: s.logger, - MetricsFactory: metrics.NullFactory, + + elasticsearchClient, err := esclient.NewElasticsearchClient(cfg, s.logger) + if err != nil { + return err + } + reader := spanreader.NewEsSpanReader(elasticsearchClient, s.logger, spanreader.Config{ IndexPrefix: indexPrefix, - MaxSpanAge: maxSpanAge, TagDotReplacement: tagKeyDeDotChar, + MaxSpanAge: maxSpanAge, + MaxNumSpans: 10_000, }) - dependencyStore := dependencystore.NewDependencyStore(client, s.logger, indexPrefix) - depMapping := es.GetDependenciesMappings(5, 1, client.GetVersion()) - 
err = dependencyStore.CreateTemplates(depMapping) - if err != nil { - return err + s.SpanReader = reader + + depMapping := es.GetDependenciesMappings(5, 1, esVersion) + depStore := dependencystore.NewDependencyStore(elasticsearchClient, s.logger, indexPrefix) + if err := depStore.CreateTemplates(depMapping); err != nil { + return nil } - s.DependencyReader = dependencyStore - s.DependencyWriter = dependencyStore + s.DependencyReader = depStore + s.DependencyWriter = depStore return nil } func (s *IntegrationTest) esRefresh() error { - err := s.bulkProcessor.Flush() - if err != nil { - return err - } - _, err = s.client.Refresh().Do(context.Background()) + _, err := s.client.Refresh().Do(context.Background()) return err } diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanreader/index_name.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanreader/index_name.go new file mode 100644 index 00000000000..c54a7f106d3 --- /dev/null +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanreader/index_name.go @@ -0,0 +1,60 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package spanreader + +import "time" + +const indexDateFormat = "2006-01-02" // date format for index e.g. 
2020-01-20 + +type indexNameProvider struct { + index string + noDateIndex bool +} + +func newIndexNameProvider(index, prefix string, useAliases bool, archive bool) indexNameProvider { + if prefix != "" { + index = prefix + "-" + index + } + index += "-" + if archive { + index += "archive" + } + if useAliases { + if index[len(index)-1] != '-' { + index += "-" + } + index += "read" + } + return indexNameProvider{ + index: index, + noDateIndex: archive || useAliases, + } +} + +func (n indexNameProvider) get(start, end time.Time) []string { + if n.noDateIndex { + return []string{n.index} + } + var indices []string + firstIndex := n.index + start.UTC().Format(indexDateFormat) + currentIndex := n.index + end.UTC().Format(indexDateFormat) + for currentIndex != firstIndex { + indices = append(indices, currentIndex) + end = end.Add(-24 * time.Hour) + currentIndex = n.index + end.UTC().Format(indexDateFormat) + } + indices = append(indices, firstIndex) + return indices +} diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanreader/index_name_test.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanreader/index_name_test.go new file mode 100644 index 00000000000..e581c351e04 --- /dev/null +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanreader/index_name_test.go @@ -0,0 +1,72 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package spanreader + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestIndexName(t *testing.T) { + tests := []struct { + name string + indices []string + nameProvider indexNameProvider + start time.Time + end time.Time + }{ + { + name: "index prefix", + nameProvider: newIndexNameProvider("myindex", "production", false, false), + indices: []string{"production-myindex-0001-01-01"}, + }, + { + name: "multiple dates", + nameProvider: newIndexNameProvider("myindex", "", false, false), + indices: []string{"myindex-2020-08-30", "myindex-2020-08-29", "myindex-2020-08-28"}, + start: time.Date(2020, 8, 28, 0, 0, 0, 0, time.UTC), + end: time.Date(2020, 8, 30, 0, 0, 0, 0, time.UTC), + }, + { + name: "use aliases", + nameProvider: newIndexNameProvider("myindex", "", true, false), + indices: []string{"myindex-read"}, + start: time.Date(2020, 8, 28, 0, 0, 0, 0, time.UTC), + end: time.Date(2020, 8, 30, 0, 0, 0, 0, time.UTC), + }, + { + name: "use archive", + nameProvider: newIndexNameProvider("myindex", "", false, true), + indices: []string{"myindex-archive"}, + start: time.Date(2020, 8, 28, 0, 0, 0, 0, time.UTC), + end: time.Date(2020, 8, 30, 0, 0, 0, 0, time.UTC), + }, + { + name: "use archive alias", + nameProvider: newIndexNameProvider("myindex", "", true, true), + indices: []string{"myindex-archive-read"}, + start: time.Date(2020, 8, 28, 0, 0, 0, 0, time.UTC), + end: time.Date(2020, 8, 30, 0, 0, 0, 0, time.UTC), + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + indices := test.nameProvider.get(test.start, test.end) + assert.Equal(t, test.indices, indices) + }) + } +} diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanreader/query.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanreader/query.go new file mode 100644 index 00000000000..b41b2026266 --- /dev/null +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanreader/query.go @@ -0,0 +1,207 @@ +// Copyright (c) 
2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package spanreader + +import ( + "encoding/json" + "fmt" + "time" + + "github.com/jaegertracing/jaeger/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient" + "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/plugin/storage/es/spanstore/dbmodel" + "github.com/jaegertracing/jaeger/storage/spanstore" +) + +const ( + getServicesAggregation = `{ + "serviceName": { + "terms": { + "field": "serviceName", + "size": %d + } + } + } +` + getOperationsAggregation = `{ + "operationName": { + "terms": { + "field": "operationName", + "size": %d + } + } + } +` + findTraceIDsAggregation = `{ + "traceID": { + "aggs": { + "startTime": { + "max": { + "field": "startTime" + } + } + }, + "terms": { + "field": "traceID", + "size": %d, + "order": { + "startTime": "desc" + } + } + } + } +` +) + +var ( + defaultMaxDuration = model.DurationAsMicroseconds(time.Hour * 24) + objectTagFieldList = []string{objectTagsField, objectProcessTagsField} + nestedTagFieldList = []string{nestedTagsField, nestedProcessTagsField, nestedLogFieldsField} +) + +func buildDurationQuery(durationMin time.Duration, durationMax time.Duration, query esclient.Query) { + minDurationMicros := model.DurationAsMicroseconds(durationMin) + maxDurationMicros := defaultMaxDuration + if durationMax != 0 { + maxDurationMicros = model.DurationAsMicroseconds(durationMax) + } + query.BoolQuery[esclient.Must] = 
append(query.BoolQuery[esclient.Must], + esclient.BoolQuery{ + RangeQueries: map[string]esclient.RangeQuery{durationField: {GTE: minDurationMicros, LTE: maxDurationMicros}}}) +} + +func addStartTimeQuery(startTimeMin time.Time, startTimeMax time.Time, query esclient.Query) { + minStartTimeMicros := model.TimeAsEpochMicroseconds(startTimeMin) + maxStartTimeMicros := model.TimeAsEpochMicroseconds(startTimeMax) + query.BoolQuery[esclient.Must] = append(query.BoolQuery[esclient.Must], esclient.BoolQuery{RangeQueries: map[string]esclient.RangeQuery{startTimeField: {GTE: minStartTimeMicros, LTE: maxStartTimeMicros}}}) +} + +func addServiceNameQuery(serviceName string, query esclient.Query) { + query.BoolQuery[esclient.Must] = append(query.BoolQuery[esclient.Must], esclient.BoolQuery{Term: map[string]string{processServiceNameField: serviceName}}) +} + +func addOperationNameQuery(operationName string, query esclient.Query) { + query.BoolQuery[esclient.Must] = append(query.BoolQuery[esclient.Must], esclient.BoolQuery{Term: map[string]string{operationNameField: operationName}}) +} + +func addTagQuery(converter dbmodel.ToDomain, tags map[string]string, query esclient.Query) { + if tags == nil || len(tags) == 0 { + return + } + + tagQueries := esclient.BoolQuery{BoolQuery: map[esclient.BoolQueryType][]esclient.BoolQuery{}} + for i := range objectTagFieldList { + addObjectQuery(converter, objectTagFieldList[i], tags, tagQueries) + } + for i := range nestedTagFieldList { + addNestedQuery(nestedTagFieldList[i], tags, tagQueries) + } + query.BoolQuery[esclient.Must] = append(query.BoolQuery[esclient.Must], tagQueries) +} + +func addObjectQuery(converter dbmodel.ToDomain, field string, tags map[string]string, query esclient.BoolQuery) { + for k, v := range tags { + kd := converter.ReplaceDot(k) + keyField := fmt.Sprintf("%s.%s", field, kd) + query.BoolQuery[esclient.Should] = append(query.BoolQuery[esclient.Should], + esclient.BoolQuery{BoolQuery: 
map[esclient.BoolQueryType][]esclient.BoolQuery{ + esclient.Must: { + {Regexp: map[string]esclient.TermQuery{keyField: {Value: v}}}, + }, + }}, + ) + } +} + +func addNestedQuery(nestedField string, tags map[string]string, query esclient.BoolQuery) { + keyField := fmt.Sprintf("%s.%s", nestedField, tagKeyField) + valueField := fmt.Sprintf("%s.%s", nestedField, tagValueField) + for k, v := range tags { + nestedQuery := &esclient.NestedQuery{ + Path: nestedField, + Query: esclient.Query{ + BoolQuery: map[esclient.BoolQueryType][]esclient.BoolQuery{ + esclient.Must: { + {MatchQueries: map[string]esclient.MatchQuery{keyField: {Query: k}}}, + {Regexp: map[string]esclient.TermQuery{valueField: {Value: v}}}, + }, + }, + }, + } + query.BoolQuery[esclient.Should] = append(query.BoolQuery[esclient.Should], + esclient.BoolQuery{ + Nested: nestedQuery, + }) + } +} + +func findTraceIDsQuery(converter dbmodel.ToDomain, query *spanstore.TraceQueryParameters) esclient.Query { + q := esclient.Query{} + q.BoolQuery = map[esclient.BoolQueryType][]esclient.BoolQuery{} + // add startTime query + addStartTimeQuery(query.StartTimeMin, query.StartTimeMax, q) + // add duration query + if query.DurationMax != 0 || query.DurationMin != 0 { + buildDurationQuery(query.DurationMin, query.DurationMax, q) + } + // add process.serviceName query + if query.ServiceName != "" { + addServiceNameQuery(query.ServiceName, q) + } + // add operationName query + if query.OperationName != "" { + addOperationNameQuery(query.OperationName, q) + } + // add tag query + addTagQuery(converter, query.Tags, q) + return q +} + +func traceIDQuery(traceID model.TraceID) *esclient.Query { + traceIDStr := traceID.String() + return &esclient.Query{Term: &esclient.Terms{ + traceIDField: esclient.TermQuery{ + Value: traceIDStr, + }, + }} +} + +func findTraceIDsSearchBody(converter dbmodel.ToDomain, query *spanstore.TraceQueryParameters) esclient.SearchBody { + q := findTraceIDsQuery(converter, query) + aggs := 
fmt.Sprintf(findTraceIDsAggregation, query.NumTraces) + return esclient.SearchBody{ + Aggregations: json.RawMessage(aggs), + Query: &q, + } +} + +func getServicesSearchBody() esclient.SearchBody { + aggs := fmt.Sprintf(getServicesAggregation, defaultDocCount) + return esclient.SearchBody{ + Aggregations: json.RawMessage(aggs), + } +} + +func getOperationsSearchBody(serviceName string) esclient.SearchBody { + aggs := fmt.Sprintf(getOperationsAggregation, defaultDocCount) + return esclient.SearchBody{ + Aggregations: json.RawMessage(aggs), + Query: &esclient.Query{ + Term: &esclient.Terms{ + serviceNameField: esclient.TermQuery{Value: serviceName}, + }, + }, + } +} diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanreader/query_test.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanreader/query_test.go new file mode 100644 index 00000000000..b1d7bfa840f --- /dev/null +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanreader/query_test.go @@ -0,0 +1,268 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package spanreader + +import ( + "encoding/json" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/plugin/storage/es/spanstore/dbmodel" + "github.com/jaegertracing/jaeger/storage/spanstore" +) + +const ( + servicesSearchBodyFixture = `{ + "aggs": { + "serviceName": { + "terms": { + "field": "serviceName", + "size": 10000 + } + } + }, + "size": 0, + "terminate_after": 0 +}` + operationsSearchBodyFixture = `{ + "aggs": { + "operationName": { + "terms": { + "field": "operationName", + "size": 10000 + } + } + }, + "query": { + "term": { + "serviceName": { + "value": "foo" + } + } + }, + "size": 0, + "terminate_after": 0 +}` + findTraceIDsSearchBodyFixture = `{ + "aggs": { + "traceID": { + "aggs": { + "startTime": { + "max": { + "field": "startTime" + } + } + }, + "terms": { + "field": "traceID", + "size": 0, + "order": { + "startTime": "desc" + } + } + } + }, + "query": { + "bool": { + "must": [ + { + "range": { + "startTime": { + "gte": 18439948709130680271, + "lte": 18439948709130680271 + } + } + }, + { + "range": { + "duration": { + "gte": 1000000, + "lte": 60000000 + } + } + }, + { + "term": { + "process.serviceName": "foo" + } + }, + { + "term": { + "operationName": "bar" + } + }, + { + "bool": { + "should": [ + { + "bool": { + "must": [ + { + "regexp": { + "tag.key": { + "value": "val" + } + } + } + ] + } + }, + { + "bool": { + "must": [ + { + "regexp": { + "process.tag.key": { + "value": "val" + } + } + } + ] + } + }, + { + "nested": { + "path": "tags", + "query": { + "bool": { + "must": [ + { + "match": { + "tags.key": { + "query": "key" + } + } + }, + { + "regexp": { + "tags.value": { + "value": "val" + } + } + } + ] + } + } + } + }, + { + "nested": { + "path": "process.tags", + "query": { + "bool": { + "must": [ + { + "match": { + "process.tags.key": { + "query": "key" + } + } + }, + { + "regexp": { + "process.tags.value": { + 
"value": "val" + } + } + } + ] + } + } + } + }, + { + "nested": { + "path": "logs.fields", + "query": { + "bool": { + "must": [ + { + "match": { + "logs.fields.key": { + "query": "key" + } + } + }, + { + "regexp": { + "logs.fields.value": { + "value": "val" + } + } + } + ] + } + } + } + } + ] + } + } + ] + } + }, + "size": 0, + "terminate_after": 0 +}` + + findTraceIDQuery = `{ + "term": { + "traceID": { + "value": "000000000000aaaa" + } + } +}` +) + +func TestGetServicesSearchBody(t *testing.T) { + sb := getServicesSearchBody() + jsonQuery, err := json.MarshalIndent(sb, "", " ") + require.NoError(t, err) + assert.Equal(t, servicesSearchBodyFixture, string(jsonQuery)) +} + +func TestGetOperationsSearchBody(t *testing.T) { + sb := getOperationsSearchBody("foo") + jsonQuery, err := json.MarshalIndent(sb, "", " ") + require.NoError(t, err) + assert.Equal(t, operationsSearchBodyFixture, string(jsonQuery)) +} + +func TestFindTraceIDsSearchBody(t *testing.T) { + q := &spanstore.TraceQueryParameters{ + ServiceName: "foo", + OperationName: "bar", + DurationMin: time.Second, + DurationMax: time.Minute, + Tags: map[string]string{"key": "val"}, + } + sb := findTraceIDsSearchBody(dbmodel.NewToDomain("@"), q) + jsonQuery, err := json.MarshalIndent(sb, "", " ") + require.NoError(t, err) + assert.Equal(t, findTraceIDsSearchBodyFixture, string(jsonQuery)) +} + +func TestTraceIDQuery(t *testing.T) { + traceID, err := model.TraceIDFromString("AAAA") + require.NoError(t, err) + query := traceIDQuery(traceID) + jsonQuery, err := json.MarshalIndent(query, "", " ") + require.NoError(t, err) + assert.Equal(t, findTraceIDQuery, string(jsonQuery)) +} diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanreader/span_reader.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanreader/span_reader.go new file mode 100644 index 00000000000..05e30313fa0 --- /dev/null +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanreader/span_reader.go @@ -0,0 +1,291 @@ +// 
Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package spanreader + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "time" + + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient" + "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/plugin/storage/es/spanstore/dbmodel" + "github.com/jaegertracing/jaeger/storage/spanstore" +) + +const ( + // by default UI fetches 20 results + defaultNumTraces = 20 + // default number of documents to fetch in a query + // see search.max_buckets and index.max_result_window + defaultDocCount = 10_000 + + spanIndexBaseName = "jaeger-span" + serviceIndexBaseName = "jaeger-service" + + operationNameField = "operationName" + serviceNameField = "serviceName" + traceIDField = "traceID" + startTimeField = "startTime" + durationField = "duration" + objectTagsField = "tag" + objectProcessTagsField = "process.tag" + nestedProcessTagsField = "process.tags" + nestedTagsField = "tags" + nestedLogFieldsField = "logs.fields" + processServiceNameField = "process.serviceName" + tagKeyField = "key" + tagValueField = "value" +) + +// Reader defines Elasticsearch reader. 
+type Reader struct { + logger *zap.Logger + client esclient.ElasticsearchClient + converter dbmodel.ToDomain + serviceIndexName indexNameProvider + spanIndexName indexNameProvider + maxSpanAge time.Duration + // maximum number of spans to fetch per query in multi search + maxNumberOfSpans int + archive bool +} + +var _ spanstore.Reader = (*Reader)(nil) + +// Config defines configuration for span reader. +type Config struct { + Archive bool + UseReadWriteAliases bool + IndexPrefix string + MaxSpanAge time.Duration + MaxNumSpans int + TagDotReplacement string +} + +// NewEsSpanReader creates Elasticsearch span reader. +func NewEsSpanReader(client esclient.ElasticsearchClient, logger *zap.Logger, config Config) *Reader { + return &Reader{ + client: client, + logger: logger, + archive: config.Archive, + maxSpanAge: config.MaxSpanAge, + maxNumberOfSpans: config.MaxNumSpans, + converter: dbmodel.NewToDomain(config.TagDotReplacement), + spanIndexName: newIndexNameProvider(spanIndexBaseName, config.IndexPrefix, config.UseReadWriteAliases, config.Archive), + serviceIndexName: newIndexNameProvider(serviceIndexBaseName, config.IndexPrefix, config.UseReadWriteAliases, config.Archive), + } +} + +// GetTrace implements spanstore.Reader +func (r *Reader) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) { + currentTime := time.Now() + traces, err := r.traceIDsMultiSearch(ctx, []model.TraceID{traceID}, currentTime.Add(-r.maxSpanAge), currentTime) + if err != nil { + return nil, err + } + if len(traces) == 0 { + return nil, spanstore.ErrTraceNotFound + } + return traces[0], nil +} + +// FindTraces implements spanstore.Reader +func (r *Reader) FindTraces(ctx context.Context, query *spanstore.TraceQueryParameters) ([]*model.Trace, error) { + uniqueTraceIDs, err := r.FindTraceIDs(ctx, query) + if err != nil { + return nil, err + } + return r.traceIDsMultiSearch(ctx, uniqueTraceIDs, query.StartTimeMin, query.StartTimeMax) +} + +// FindTraceIDs implements
spanstore.Reader +func (r *Reader) FindTraceIDs(ctx context.Context, query *spanstore.TraceQueryParameters) ([]model.TraceID, error) { + if err := validateQuery(query); err != nil { + return nil, err + } + if query.NumTraces == 0 { + query.NumTraces = defaultNumTraces + } + esTraceIDs, err := r.findTraceIDs(ctx, query) + if err != nil { + return nil, err + } + return convertTraceIDsStringsToModel(esTraceIDs) +} + +func convertTraceIDsStringsToModel(traceIDs []string) ([]model.TraceID, error) { + traceIDsModels := make([]model.TraceID, 0, len(traceIDs)) + for _, ID := range traceIDs { + traceID, err := model.TraceIDFromString(ID) + if err != nil { + return nil, fmt.Errorf("making traceID from string '%s' failed: %w", ID, err) + } + traceIDsModels = append(traceIDsModels, traceID) + } + return traceIDsModels, nil +} + +func (r *Reader) findTraceIDs(ctx context.Context, query *spanstore.TraceQueryParameters) ([]string, error) { + searchBody := findTraceIDsSearchBody(r.converter, query) + indices := r.spanIndexName.get(query.StartTimeMin, query.StartTimeMax) + response, err := r.client.Search(ctx, searchBody, 0, indices...) + if err != nil { + return nil, err + } + + var traceIDs []string + for _, k := range response.Aggs[traceIDField].Buckets { + traceIDs = append(traceIDs, k.Key) + } + return traceIDs, nil +} + +// GetServices implements spanstore.Reader +func (r *Reader) GetServices(ctx context.Context) ([]string, error) { + searchBody := getServicesSearchBody() + currentTime := time.Now() + indices := r.serviceIndexName.get(currentTime.Add(-r.maxSpanAge), currentTime) + response, err := r.client.Search(ctx, searchBody, 0, indices...) 
+ if err != nil { + return nil, err + } + + var serviceNames []string + for _, k := range response.Aggs[serviceNameField].Buckets { + serviceNames = append(serviceNames, k.Key) + } + return serviceNames, nil +} + +// GetOperations implements spanstore.Reader +func (r *Reader) GetOperations(ctx context.Context, query spanstore.OperationQueryParameters) ([]spanstore.Operation, error) { + searchBody := getOperationsSearchBody(query.ServiceName) + currentTime := time.Now() + indices := r.serviceIndexName.get(currentTime.Add(-r.maxSpanAge), currentTime) + searchResponse, err := r.client.Search(ctx, searchBody, 0, indices...) + if err != nil { + return nil, err + } + + var operations []spanstore.Operation + for _, k := range searchResponse.Aggs[operationNameField].Buckets { + operations = append(operations, spanstore.Operation{ + Name: k.Key, + }) + } + return operations, nil +} + +// traceIDsMultiSearch invokes ES multi search API to search for traces with given traceIDs +func (r *Reader) traceIDsMultiSearch(ctx context.Context, traceIDs []model.TraceID, startTime, endTime time.Time) ([]*model.Trace, error) { + if len(traceIDs) == 0 { + return []*model.Trace{}, nil + } + + indices := r.spanIndexName.get(startTime.Add(-time.Hour), endTime.Add(time.Hour)) + nextTime := model.TimeAsEpochMicroseconds(startTime.Add(-time.Hour)) + tracesMap := make(map[model.TraceID]*model.Trace) + searchAfterTime := make(map[model.TraceID]uint64) + totalFetchedSpans := make(map[model.TraceID]int) + + // The loop creates request for each traceID + // If the response has more total hits than fetched spans + // then second query for a trace is initiated with later timestamp. 
+ for { + if len(traceIDs) == 0 { + break + } + searchRequests, nt := r.multiSearchRequests(indices, traceIDs, searchAfterTime, nextTime) + nextTime = nt + traceIDs = nil // set traceIDs to empty + response, err := r.client.MultiSearch(ctx, searchRequests) + if err != nil { + return nil, err + } + + for _, resp := range response.Responses { + if resp.Hits.Total == 0 { + continue + } + spans, err := r.getSpans(resp) + if err != nil { + return nil, err + } + + lastSpan := spans[len(spans)-1] + if trace, ok := tracesMap[lastSpan.TraceID]; ok { + trace.Spans = append(trace.Spans, spans...) + } else { + tracesMap[lastSpan.TraceID] = &model.Trace{Spans: spans} + } + + totalFetchedSpans[lastSpan.TraceID] = totalFetchedSpans[lastSpan.TraceID] + len(resp.Hits.Hits) + if totalFetchedSpans[lastSpan.TraceID] < resp.Hits.Total { + traceIDs = append(traceIDs, lastSpan.TraceID) + searchAfterTime[lastSpan.TraceID] = model.TimeAsEpochMicroseconds(lastSpan.StartTime) + } + } + } + + var traces []*model.Trace + for _, trace := range tracesMap { + traces = append(traces, trace) + } + return traces, nil +} + +func (r *Reader) getSpans(response esclient.SearchResponse) ([]*model.Span, error) { + spans := make([]*model.Span, len(response.Hits.Hits)) + for i, hit := range response.Hits.Hits { + var dbSpan *dbmodel.Span + d := json.NewDecoder(bytes.NewReader(*hit.Source)) + d.UseNumber() + if err := d.Decode(&dbSpan); err != nil { + return nil, err + } + span, err := r.converter.SpanToDomain(dbSpan) + if err != nil { + return nil, err + } + spans[i] = span + } + return spans, nil +} + +func (r *Reader) multiSearchRequests(indices []string, traceIDs []model.TraceID, searchAfterTime map[model.TraceID]uint64, nextTime uint64) ([]esclient.SearchBody, uint64) { + queries := make([]esclient.SearchBody, len(traceIDs)) + for i, traceID := range traceIDs { + if v, ok := searchAfterTime[traceID]; ok { + nextTime = v + } + s := esclient.SearchBody{ + Indices: indices, + Query: traceIDQuery(traceID), 
+ Size: defaultDocCount, + TerminateAfter: r.maxNumberOfSpans, + SearchAfter: []interface{}{nextTime}, + } + if !r.archive { + s.Sort = []map[string]esclient.Order{{startTimeField: esclient.AscOrder}} + } + queries[i] = s + } + return queries, nextTime +} diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanreader/span_reader_test.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanreader/span_reader_test.go new file mode 100644 index 00000000000..b18706652ff --- /dev/null +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanreader/span_reader_test.go @@ -0,0 +1,184 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package spanreader + +import ( + "bytes" + "context" + "encoding/json" + "io" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient" + "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/plugin/storage/es/spanstore/dbmodel" + "github.com/jaegertracing/jaeger/storage/spanstore" +) + +func TestGetServices(t *testing.T) { + client := &mockClient{ + searchResponse: &esclient.SearchResponse{ + Aggs: map[string]esclient.AggregationResponse{ + serviceNameField: { + Buckets: []struct { + Key string `json:"key"` + }{{Key: "foo"}, {Key: "bar"}}, + }, + }, + }, + } + reader := NewEsSpanReader(client, zap.NewNop(), Config{}) + services, err := reader.GetServices(context.Background()) + require.NoError(t, err) + assert.Equal(t, []string{"foo", "bar"}, services) +} + +func TestGetOperations(t *testing.T) { + client := &mockClient{ + searchResponse: &esclient.SearchResponse{ + Aggs: map[string]esclient.AggregationResponse{ + operationNameField: { + Buckets: []struct { + Key string `json:"key"` + }{{Key: "foo"}, {Key: "bar"}}, + }, + }, + }, + } + reader := NewEsSpanReader(client, zap.NewNop(), Config{}) + operations, err := reader.GetOperations(context.Background(), spanstore.OperationQueryParameters{ServiceName: "baz"}) + require.NoError(t, err) + assert.Equal(t, []spanstore.Operation{{Name: "foo"}, {Name: "bar"}}, operations) +} + +func TestGetTrace(t *testing.T) { + s := dbmodel.Span{ + TraceID: dbmodel.TraceID("aaaa"), + SpanID: dbmodel.SpanID("aaaa"), + } + jsonSpan, err := json.Marshal(s) + require.NoError(t, err) + jsonMessage := json.RawMessage(jsonSpan) + + client := &mockClient{ + multiSearchResponse: &esclient.MultiSearchResponse{ + Responses: []esclient.SearchResponse{ + { + Hits: esclient.Hits{ + Total: 1, + Hits: []esclient.Hit{ + { + Source: &jsonMessage, + }, + }, + }, + }, 
+ }, + }, + } + reader := NewEsSpanReader(client, zap.NewNop(), Config{TagDotReplacement: "@"}) + + trace, err := reader.GetTrace(context.Background(), model.TraceID{}) + require.NoError(t, err) + domain := dbmodel.NewToDomain("@") + modelSpan, err := domain.SpanToDomain(&s) + assert.Equal(t, &model.Trace{Spans: []*model.Span{modelSpan}}, trace) +} + +func TestFindTraces(t *testing.T) { + dbSpan := dbmodel.Span{ + TraceID: dbmodel.TraceID("aaaa"), + SpanID: dbmodel.SpanID("aaaa"), + } + jsonSpan, err := json.Marshal(dbSpan) + require.NoError(t, err) + jsonMessage := json.RawMessage(jsonSpan) + + client := &mockClient{ + searchResponse: &esclient.SearchResponse{ + Aggs: map[string]esclient.AggregationResponse{ + traceIDField: { + Buckets: []struct { + Key string `json:"key"` + }{{Key: "aaaa"}}, + }, + }, + }, + multiSearchResponse: &esclient.MultiSearchResponse{ + Responses: []esclient.SearchResponse{ + { + Hits: esclient.Hits{ + Total: 1, + Hits: []esclient.Hit{ + { + Source: &jsonMessage, + }, + }, + }, + }, + }, + }, + } + reader := NewEsSpanReader(client, zap.NewNop(), Config{TagDotReplacement: "@"}) + traces, err := reader.FindTraces(context.Background(), &spanstore.TraceQueryParameters{ + StartTimeMin: time.Now().Add(-time.Hour), + StartTimeMax: time.Now(), + }) + require.NoError(t, err) + + domain := dbmodel.NewToDomain("@") + modelSpan, err := domain.SpanToDomain(&dbSpan) + assert.Equal(t, []*model.Trace{{Spans: []*model.Span{modelSpan}}}, traces) +} + +type mockClient struct { + receivedBody io.Reader + searchResponse *esclient.SearchResponse + multiSearchResponse *esclient.MultiSearchResponse + searchErr error +} + +var _ esclient.ElasticsearchClient = (*mockClient)(nil) + +func (m *mockClient) PutTemplate(ctx context.Context, name string, template io.Reader) error { + m.receivedBody = template + return nil +} + +func (m mockClient) Bulk(ctx context.Context, bulkBody io.Reader) (*esclient.BulkResponse, error) { + panic("implement me") +} + +func (m 
mockClient) AddDataToBulkBuffer(bulkBody *bytes.Buffer, data []byte, index, typ string) { + panic("implement me") +} + +func (m *mockClient) Index(ctx context.Context, body io.Reader, index, typ string) error { + m.receivedBody = body + return nil +} + +func (m *mockClient) Search(ctx context.Context, query esclient.SearchBody, size int, indices ...string) (*esclient.SearchResponse, error) { + return m.searchResponse, m.searchErr +} + +func (m *mockClient) MultiSearch(ctx context.Context, queries []esclient.SearchBody) (*esclient.MultiSearchResponse, error) { + return m.multiSearchResponse, nil +} diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanreader/validate_query.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanreader/validate_query.go new file mode 100644 index 00000000000..2d9503ab578 --- /dev/null +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanreader/validate_query.go @@ -0,0 +1,48 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package spanreader + +import ( + "errors" + + "github.com/jaegertracing/jaeger/storage/spanstore" +) + +var ( + errNilQuery = errors.New("query is nil") + errServiceNameNotSet = errors.New("service name must be set") + errStartAndEndTimeNotSet = errors.New("start and end time must be set") + errStartTimeMinGreaterThanMax = errors.New("start time minimum is above maximum") + errDurationMinGreaterThanMax = errors.New("duration minimum is above maximum") +) + +func validateQuery(p *spanstore.TraceQueryParameters) error { + if p == nil { + return errNilQuery + } + if p.ServiceName == "" && len(p.Tags) > 0 { + return errServiceNameNotSet + } + if p.StartTimeMin.IsZero() || p.StartTimeMax.IsZero() { + return errStartAndEndTimeNotSet + } + if p.StartTimeMax.Before(p.StartTimeMin) { + return errStartTimeMinGreaterThanMax + } + if p.DurationMin != 0 && p.DurationMax != 0 && p.DurationMin > p.DurationMax { + return errDurationMinGreaterThanMax + } + return nil +} diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanreader/validate_query_test.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanreader/validate_query_test.go new file mode 100644 index 00000000000..dfb53c920c0 --- /dev/null +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanreader/validate_query_test.go @@ -0,0 +1,71 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package spanreader + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/jaegertracing/jaeger/storage/spanstore" +) + +func TestValidateQuery(t *testing.T) { + tests := []struct { + query *spanstore.TraceQueryParameters + err error + }{ + { + err: errNilQuery, + }, + { + query: &spanstore.TraceQueryParameters{Tags: map[string]string{"foo": "bar"}}, + err: errServiceNameNotSet, + }, + { + query: &spanstore.TraceQueryParameters{}, + err: errStartAndEndTimeNotSet, + }, + { + query: &spanstore.TraceQueryParameters{StartTimeMax: time.Now().Add(-time.Hour), StartTimeMin: time.Now()}, + err: errStartTimeMinGreaterThanMax, + }, + { + query: &spanstore.TraceQueryParameters{ + StartTimeMax: time.Now(), StartTimeMin: time.Now().Add(-time.Hour), + DurationMin: time.Hour, DurationMax: time.Minute, + }, + err: errDurationMinGreaterThanMax, + }, + { + query: &spanstore.TraceQueryParameters{ + StartTimeMax: time.Now(), StartTimeMin: time.Now().Add(-time.Hour), + DurationMin: time.Minute, DurationMax: time.Hour, + }, + }, + } + for _, test := range tests { + t.Run(fmt.Sprintf("%v", test.err), func(t *testing.T) { + err := validateQuery(test.query) + if test.err != nil { + assert.EqualError(t, err, test.err.Error()) + return + } + assert.NoError(t, err) + }) + } +} diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanstore.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanstore.go index 938246ed888..49f17d62b88 100644 --- a/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanstore.go +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanstore.go @@ -108,12 +108,12 @@ func (n indexNameProvider) get(date time.Time) string { } // CreateTemplates creates index templates. 
-func (w *esSpanWriter) CreateTemplates(spanTemplate, serviceTemplate string) error { - err := w.client.PutTemplate(spanIndexBaseName, strings.NewReader(spanTemplate)) +func (w *esSpanWriter) CreateTemplates(ctx context.Context, spanTemplate, serviceTemplate string) error { + err := w.client.PutTemplate(ctx, spanIndexBaseName, strings.NewReader(spanTemplate)) if err != nil { return err } - err = w.client.PutTemplate(serviceIndexBaseName, strings.NewReader(serviceTemplate)) + err = w.client.PutTemplate(ctx, serviceIndexBaseName, strings.NewReader(serviceTemplate)) if err != nil { return err } @@ -121,15 +121,15 @@ func (w *esSpanWriter) CreateTemplates(spanTemplate, serviceTemplate string) err } // WriteTraces writes traces to the storage -func (w *esSpanWriter) WriteTraces(_ context.Context, traces pdata.Traces) (int, error) { +func (w *esSpanWriter) WriteTraces(ctx context.Context, traces pdata.Traces) (int, error) { spans, err := w.translator.ConvertSpans(traces) if err != nil { return traces.SpanCount(), consumererror.Permanent(err) } - return w.writeSpans(spans) + return w.writeSpans(ctx, spans) } -func (w *esSpanWriter) writeSpans(spans []*dbmodel.Span) (int, error) { +func (w *esSpanWriter) writeSpans(ctx context.Context, spans []*dbmodel.Span) (int, error) { buffer := &bytes.Buffer{} // mapping for bulk operation to span bulkOperations := make([]bulkItem, len(spans)) @@ -154,7 +154,7 @@ func (w *esSpanWriter) writeSpans(spans []*dbmodel.Span) (int, error) { bulkOperations = append(bulkOperations, bulkItem{span: span, isService: true}) } } - res, err := w.client.Bulk(bytes.NewReader(buffer.Bytes())) + res, err := w.client.Bulk(ctx, bytes.NewReader(buffer.Bytes())) if err != nil { errs = append(errs, err) return len(spans), componenterror.CombineErrors(errs)