From 00b6e96c41f7a03fd99243a6cc2a97a629d9e74c Mon Sep 17 00:00:00 2001 From: Pavol Loffay Date: Wed, 1 Jul 2020 14:22:38 +0200 Subject: [PATCH] Add native OTEL ES exporter (#2295) * Reimplement OTEL elasticsearch exporter Signed-off-by: Pavol Loffay * Fix typo in comment Signed-off-by: Pavol Loffay --- .travis.yml | 4 + Makefile | 5 + .../elasticsearchexporter/esclient/client.go | 84 ++++ .../esclient/client_test.go | 63 +++ .../esclient/es6client.go | 87 ++++ .../esclient/es6client_test.go | 119 +++++ .../esclient/es7client.go | 87 ++++ .../esclient/es7client_test.go | 110 +++++ .../elasticsearchexporter/esclient/ping.go | 89 ++++ .../esclient/ping_test.go | 101 ++++ .../esmodeltranslator/modeltranslator.go | 445 ++++++++++++++++++ .../esmodeltranslator/modeltranslator_test.go | 232 +++++++++ .../elasticsearchexporter/exporter.go | 18 +- .../elasticsearchexporter/factory_test.go | 7 +- .../elasticsearchexporter/integration_test.go | 202 ++++++++ .../elasticsearchexporter/spanstore.go | 222 +++++++++ cmd/opentelemetry/go.mod | 3 + cmd/opentelemetry/go.sum | 4 + pkg/es/config/config.go | 100 ++-- plugin/storage/es/factory.go | 24 +- plugin/storage/es/factory_test.go | 2 +- .../storage/integration/badgerstore_test.go | 2 +- .../storage/integration/elasticsearch_test.go | 4 +- .../{integration_test.go => integration.go} | 49 +- ...trace_compare_test.go => trace_compare.go} | 2 + scripts/travis/es-integration-test.sh | 13 +- 26 files changed, 1980 insertions(+), 98 deletions(-) create mode 100644 cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/client.go create mode 100644 cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/client_test.go create mode 100644 cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/es6client.go create mode 100644 cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/es6client_test.go create mode 100644 cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/es7client.go create mode 100644 cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/es7client_test.go create mode 100644 cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/ping.go create mode 100644 cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/ping_test.go create mode 100644 cmd/opentelemetry/app/exporter/elasticsearchexporter/esmodeltranslator/modeltranslator.go create mode 100644 cmd/opentelemetry/app/exporter/elasticsearchexporter/esmodeltranslator/modeltranslator_test.go create mode 100644 cmd/opentelemetry/app/exporter/elasticsearchexporter/integration_test.go create mode 100644 cmd/opentelemetry/app/exporter/elasticsearchexporter/spanstore.go rename plugin/storage/integration/{integration_test.go => integration.go} (91%) rename plugin/storage/integration/{domain_trace_compare_test.go => trace_compare.go} (96%) diff --git a/.travis.yml b/.travis.yml index 3ad7e5540fe..f1871c1dbb1 100644 --- a/.travis.yml +++ b/.travis.yml @@ -30,6 +30,9 @@ matrix: - go: "1.14.x" env: - ES_INTEGRATION_TEST=true + - go: "1.14.x" + env: + - ES_OTEL_INTEGRATION_TEST=true - go: "1.14.x" env: - KAFKA_INTEGRATION_TEST=true @@ -71,6 +74,7 @@ script: - if [ "$DOCKER" == true ]; then bash ./scripts/travis/upload-all-docker-images.sh ; else echo 'skipping docker upload'; fi - if [ "$DEPLOY" == true ]; then make build-all-platforms ; else echo 'skipping build-all-platforms'; fi - if [ "$ES_INTEGRATION_TEST" == true ]; then bash ./scripts/travis/es-integration-test.sh ; else echo 'skipping elastic search integration test'; fi + - if [ 
"$ES_OTEL_INTEGRATION_TEST" == true ]; then bash ./scripts/travis/es-integration-test.sh ; else echo 'skipping elastic search integration test'; fi - if [ "$KAFKA_INTEGRATION_TEST" == true ]; then bash ./scripts/travis/kafka-integration-test.sh ; else echo 'skipping kafka integration test'; fi - if [ "$CASSANDRA_INTEGRATION_TEST" == true ]; then bash ./scripts/travis/cassandra-integration-test.sh ; else echo 'skipping cassandra integration test'; fi - if [ "$HOTROD" == true ]; then bash ./scripts/travis/hotrod-integration-test.sh ; else echo 'skipping hotrod example'; fi diff --git a/Makefile b/Makefile index ca2d987f54b..5a128492e25 100644 --- a/Makefile +++ b/Makefile @@ -108,6 +108,11 @@ storage-integration-test: go-gen go clean -testcache bash -c "set -e; set -o pipefail; $(GOTEST) $(STORAGE_PKGS) | $(COLORIZE)" +.PHONY: es-otel-exporter-integration-test +es-otel-exporter-integration-test: go-gen + go clean -testcache + bash -c "set -e; set -o pipefail; cd ${OTEL_COLLECTOR_DIR} && go clean -testcache && $(GOTEST) -tags=integration ./app/exporter/elasticsearchexporter | $(COLORIZE)" + .PHONY: test-compile-es-scripts test-compile-es-scripts: docker run --rm -it -v ${PWD}:/tmp/jaeger python:3-alpine /usr/local/bin/python -m py_compile /tmp/jaeger/plugin/storage/es/esRollover.py diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/client.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/client.go new file mode 100644 index 00000000000..d6d0f5e2a20 --- /dev/null +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/client.go @@ -0,0 +1,84 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package esclient + +import ( + "bytes" + "fmt" + "io" + + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/pkg/es/config" +) + +// ElasticsearchClient exposes Elasticsearch API used by Jaeger +type ElasticsearchClient interface { + // PutTemplate creates index template + PutTemplate(name string, template io.Reader) error + // Bulk submits a bulk request + Bulk(bulkBody io.Reader) (*BulkResponse, error) + // AddDataToBulkBuffer creates bulk item from data, index and typ and adds it to bulkBody + AddDataToBulkBuffer(bulkBody *bytes.Buffer, data []byte, index, typ string) +} + +// BulkResponse is a response returned by Elasticsearch Bulk API +type BulkResponse struct { + Errors bool `json:"errors"` + Items []struct { + Index struct { + ID string `json:"_id"` + Result string `json:"result"` + Status int `json:"status"` + Error struct { + Type string `json:"type"` + Reason string `json:"reason"` + Cause struct { + Type string `json:"type"` + Reason string `json:"reason"` + } `json:"caused_by"` + } `json:"error"` + } `json:"index"` + } `json:"items"` +} + +// NewElasticsearchClient returns an instance of Elasticsearch client +func NewElasticsearchClient(params config.Configuration, logger *zap.Logger) (ElasticsearchClient, error) { + roundTripper, err := config.GetHTTPRoundTripper(¶ms, logger) + if err != nil { + return nil, err + } + if params.GetVersion() == 0 { + esPing := elasticsearchPing{ + username: params.Username, + password: params.Password, + roundTripper: roundTripper, + } + esVersion, err := esPing.getVersion(params.Servers[0]) + if err != nil { + return nil, err + } + logger.Info("Elasticsearch detected", zap.Int("version", esVersion)) + params.Version = uint(esVersion) + } + switch params.Version { + case 5, 6: + return newElasticsearch6Client(params, roundTripper) + case 7: + return newElasticsearch7Client(params, roundTripper) + default: + return nil, fmt.Errorf("could not create Elasticseach client for version %d", params.Version) + } +} diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/client_test.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/client_test.go new file mode 100644 index 00000000000..196cecd7aa0 --- /dev/null +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/client_test.go @@ -0,0 +1,63 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
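For orientation, a minimal usage sketch of the client factory and bulk API introduced above, assuming a locally reachable Elasticsearch; leaving Version at zero exercises the ping-based detection in NewElasticsearchClient, and the server address, index name and document body are purely illustrative:

package main

import (
	"bytes"
	"fmt"

	"go.uber.org/zap"

	"github.com/jaegertracing/jaeger/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient"
	"github.com/jaegertracing/jaeger/pkg/es/config"
)

func main() {
	// Version 0 makes NewElasticsearchClient ping the server and pick the ES6 or ES7 client.
	client, err := esclient.NewElasticsearchClient(config.Configuration{
		Servers: []string{"http://localhost:9200"}, // illustrative address
	}, zap.NewNop())
	if err != nil {
		panic(err)
	}
	// Bulk bodies are NDJSON: a metadata line per document followed by the document itself.
	buf := &bytes.Buffer{}
	client.AddDataToBulkBuffer(buf, []byte(`{"operationName":"op"}`), "jaeger-span-2020-07-01", "span")
	resp, err := client.Bulk(buf)
	if err != nil {
		panic(err)
	}
	fmt.Println("bulk reported item errors:", resp.Errors)
}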
+ +package esclient + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/pkg/es/config" +) + +func TestGetClient(t *testing.T) { + tests := []struct { + version int + err string + }{ + { + version: 5, + }, + { + version: 6, + }, + { + version: 7, + }, + { + version: 1, + err: "could not create Elasticseach client for version 1", + }, + } + for _, test := range tests { + t.Run(test.err, func(t *testing.T) { + client, err := NewElasticsearchClient(config.Configuration{Servers: []string{""}, Version: uint(test.version)}, zap.NewNop()) + if test.err == "" { + require.NoError(t, err) + } + switch test.version { + case 5, 6: + assert.IsType(t, &elasticsearch6Client{}, client) + case 7: + assert.IsType(t, &elasticsearch7Client{}, client) + default: + assert.EqualError(t, err, test.err) + assert.Nil(t, client) + } + }) + } +} diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/es6client.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/es6client.go new file mode 100644 index 00000000000..bab8f0962d1 --- /dev/null +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/es6client.go @@ -0,0 +1,87 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package esclient + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + + elasticsearch6 "github.com/elastic/go-elasticsearch/v6" + + "github.com/jaegertracing/jaeger/pkg/es/config" +) + +const ( + bulkES6MetaFormat = `{"index":{"_index":"%s","_type":"%s"}}` + "\n" +) + +type elasticsearch6Client struct { + client *elasticsearch6.Client +} + +var _ ElasticsearchClient = (*elasticsearch6Client)(nil) + +func newElasticsearch6Client(params config.Configuration, roundTripper http.RoundTripper) (*elasticsearch6Client, error) { + client, err := elasticsearch6.NewClient(elasticsearch6.Config{ + DiscoverNodesOnStart: params.Sniffer, + Addresses: params.Servers, + Username: params.Username, + Password: params.Password, + Transport: roundTripper, + }) + if err != nil { + return nil, err + } + return &elasticsearch6Client{ + client: client, + }, nil +} + +func (es *elasticsearch6Client) PutTemplate(name string, body io.Reader) error { + resp, err := es.client.Indices.PutTemplate(name, body) + if err != nil { + return err + } + resp.Body.Close() + return nil +} + +func (es *elasticsearch6Client) AddDataToBulkBuffer(buffer *bytes.Buffer, data []byte, index, typ string) { + meta := []byte(fmt.Sprintf(bulkES6MetaFormat, index, typ)) + buffer.Grow(len(meta) + len(data) + len("\n")) + buffer.Write(meta) + buffer.Write(data) + buffer.Write([]byte("\n")) +} + +func (es *elasticsearch6Client) Bulk(reader io.Reader) (*BulkResponse, error) { + response, err := es.client.Bulk(reader) + if err != nil { + return nil, err + } + defer response.Body.Close() + if response.StatusCode >= 400 { + return nil, fmt.Errorf("bulk request failed with code %d", response.StatusCode) + } + var blk BulkResponse + err = json.NewDecoder(response.Body).Decode(&blk) + if err != nil { + return nil, err + } + return &blk, nil +} diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/es6client_test.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/es6client_test.go new file mode 100644 index 00000000000..657843aab4b --- /dev/null +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/es6client_test.go @@ -0,0 +1,119 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package esclient + +import ( + "bytes" + "fmt" + "io/ioutil" + "net/http" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/jaegertracing/jaeger/pkg/es/config" +) + +type mockTransport struct { + Response *http.Response + RoundTripFn func(req *http.Request) (*http.Response, error) +} + +func (t *mockTransport) RoundTrip(req *http.Request) (*http.Response, error) { + return t.RoundTripFn(req) +} + +func TestES6NewClient_err(t *testing.T) { + client, err := newElasticsearch6Client(config.Configuration{ + Sniffer: true, + Servers: []string{"$%"}, + }, &http.Transport{}) + require.Error(t, err) + assert.Nil(t, client) +} + +func TestES6PutTemplateES6Client(t *testing.T) { + mocktrans := &mockTransport{ + Response: &http.Response{ + StatusCode: http.StatusOK, + Body: ioutil.NopCloser(strings.NewReader(`{}`)), + }, + } + mocktrans.RoundTripFn = func(req *http.Request) (*http.Response, error) { return mocktrans.Response, nil } + client, err := newElasticsearch6Client(config.Configuration{}, mocktrans) + require.NoError(t, err) + assert.NotNil(t, client) + err = client.PutTemplate("foo", strings.NewReader("bar")) + require.NoError(t, err) +} + +func TestES6AddDataToBulk(t *testing.T) { + client, err := newElasticsearch6Client(config.Configuration{}, &http.Transport{}) + require.NoError(t, err) + assert.NotNil(t, client) + + buf := &bytes.Buffer{} + client.AddDataToBulkBuffer(buf, []byte("data"), "foo", "bar") + assert.Equal(t, "{\"index\":{\"_index\":\"foo\",\"_type\":\"bar\"}}\ndata\n", buf.String()) +} + +func TestES6Bulk(t *testing.T) { + tests := []struct { + resp *http.Response + err string + }{ + { + resp: &http.Response{ + StatusCode: http.StatusOK, + Body: ioutil.NopCloser(strings.NewReader("{}")), + }, + }, + { + resp: &http.Response{ + StatusCode: http.StatusOK, + Body: ioutil.NopCloser(strings.NewReader("{#}")), + }, + err: "looking for beginning of object key string", + }, + { + resp: &http.Response{ + StatusCode: http.StatusBadRequest, + Body: ioutil.NopCloser(strings.NewReader("{#}")), + }, + err: "bulk request failed with code 400", + }, + } + for _, test := range tests { + t.Run(test.err, func(t *testing.T) { + mocktrans := &mockTransport{ + Response: test.resp, + } + mocktrans.RoundTripFn = func(req *http.Request) (*http.Response, error) { return mocktrans.Response, nil } + + client, err := newElasticsearch6Client(config.Configuration{}, mocktrans) + require.NoError(t, err) + assert.NotNil(t, client) + _, err = client.Bulk(strings.NewReader("data")) + if test.err != "" { + fmt.Println() + assert.Contains(t, err.Error(), test.err) + } else { + require.NoError(t, err) + } + }) + } +} diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/es7client.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/es7client.go new file mode 100644 index 00000000000..ba6cd4f4f79 --- /dev/null +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/es7client.go @@ -0,0 +1,87 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package esclient + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + + elasticsearch7 "github.com/elastic/go-elasticsearch/v7" + + "github.com/jaegertracing/jaeger/pkg/es/config" +) + +const ( + bulkES7MetaFormat = `{"index":{"_index":"%s"}}` + "\n" +) + +type elasticsearch7Client struct { + client *elasticsearch7.Client +} + +var _ ElasticsearchClient = (*elasticsearch7Client)(nil) + +func newElasticsearch7Client(params config.Configuration, roundTripper http.RoundTripper) (*elasticsearch7Client, error) { + client, err := elasticsearch7.NewClient(elasticsearch7.Config{ + Addresses: params.Servers, + Username: params.Username, + Password: params.Password, + Transport: roundTripper, + }) + if err != nil { + return nil, err + } + return &elasticsearch7Client{ + client: client, + }, nil +} + +func (es *elasticsearch7Client) PutTemplate(name string, body io.Reader) error { + resp, err := es.client.Indices.PutTemplate(body, name) + if err != nil { + return err + } + resp.Body.Close() + return nil +} + +func (es *elasticsearch7Client) AddDataToBulkBuffer(buffer *bytes.Buffer, data []byte, index, _ string) { + meta := []byte(fmt.Sprintf(bulkES7MetaFormat, index)) + buffer.Grow(len(meta) + len(data) + len("\n")) + buffer.Write(meta) + buffer.Write(data) + buffer.Write([]byte("\n")) +} + +func (es *elasticsearch7Client) Bulk(reader io.Reader) (*BulkResponse, error) { + response, err := es.client.Bulk(reader) + if err != nil { + return nil, err + } + defer response.Body.Close() + if response.StatusCode >= 400 { + return nil, fmt.Errorf("bulk request failed with code %d", response.StatusCode) + } + + var blk BulkResponse + err = json.NewDecoder(response.Body).Decode(&blk) + if err != nil { + return nil, err + } + return &blk, nil +} diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/es7client_test.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/es7client_test.go new file mode 100644 index 00000000000..5f2c490d75e --- /dev/null +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/es7client_test.go @@ -0,0 +1,110 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
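The ES7 client mirrors the ES6 one; the visible difference is the bulk metadata line, since mapping types were removed in Elasticsearch 7. A small illustrative helper (hypothetical, inside package esclient, mirroring the unit tests below) makes the contrast explicit:

package esclient

import (
	"bytes"
	"fmt"
	"net/http"

	"github.com/jaegertracing/jaeger/pkg/es/config"
)

// compareBulkMeta is illustrative only: it prints the bulk buffer produced by each client.
func compareBulkMeta() {
	es6, _ := newElasticsearch6Client(config.Configuration{}, &http.Transport{})
	es7, _ := newElasticsearch7Client(config.Configuration{}, &http.Transport{})
	buf6, buf7 := &bytes.Buffer{}, &bytes.Buffer{}
	es6.AddDataToBulkBuffer(buf6, []byte(`{"k":"v"}`), "jaeger-span-2020-07-01", "span")
	es7.AddDataToBulkBuffer(buf7, []byte(`{"k":"v"}`), "jaeger-span-2020-07-01", "span")
	fmt.Printf("%q\n", buf6.String()) // metadata line includes "_type":"span"
	fmt.Printf("%q\n", buf7.String()) // the _type key is omitted
}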
+ +package esclient + +import ( + "bytes" + "fmt" + "io/ioutil" + "net/http" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/jaegertracing/jaeger/pkg/es/config" +) + +func TestES7NewClient_err(t *testing.T) { + client, err := newElasticsearch6Client(config.Configuration{ + Sniffer: true, + Servers: []string{"$%"}, + }, &http.Transport{}) + require.Error(t, err) + assert.Nil(t, client) +} + +func TestES7PutTemplateES6Client(t *testing.T) { + mocktrans := &mockTransport{ + Response: &http.Response{ + StatusCode: http.StatusOK, + Body: ioutil.NopCloser(strings.NewReader(`{}`)), + }, + } + mocktrans.RoundTripFn = func(req *http.Request) (*http.Response, error) { return mocktrans.Response, nil } + client, err := newElasticsearch7Client(config.Configuration{}, mocktrans) + require.NoError(t, err) + assert.NotNil(t, client) + err = client.PutTemplate("foo", strings.NewReader("bar")) + require.NoError(t, err) +} + +func TestES7AddDataToBulk(t *testing.T) { + client, err := newElasticsearch7Client(config.Configuration{}, &http.Transport{}) + require.NoError(t, err) + assert.NotNil(t, client) + + buf := &bytes.Buffer{} + client.AddDataToBulkBuffer(buf, []byte("data"), "foo", "bar") + assert.Equal(t, "{\"index\":{\"_index\":\"foo\"}}\ndata\n", buf.String()) +} + +func TestES7Bulk(t *testing.T) { + tests := []struct { + resp *http.Response + err string + }{ + { + resp: &http.Response{ + StatusCode: http.StatusOK, + Body: ioutil.NopCloser(strings.NewReader("{}")), + }, + }, + { + resp: &http.Response{ + StatusCode: http.StatusOK, + Body: ioutil.NopCloser(strings.NewReader("{#}")), + }, + err: "looking for beginning of object key string", + }, + { + resp: &http.Response{ + StatusCode: http.StatusBadRequest, + Body: ioutil.NopCloser(strings.NewReader("{#}")), + }, + err: "bulk request failed with code 400", + }, + } + for _, test := range tests { + t.Run(test.err, func(t *testing.T) { + mocktrans := &mockTransport{ + Response: test.resp, + } + mocktrans.RoundTripFn = func(req *http.Request) (*http.Response, error) { return mocktrans.Response, nil } + + client, err := newElasticsearch7Client(config.Configuration{}, mocktrans) + require.NoError(t, err) + assert.NotNil(t, client) + _, err = client.Bulk(strings.NewReader("data")) + if test.err != "" { + fmt.Println() + assert.Contains(t, err.Error(), test.err) + } else { + require.NoError(t, err) + } + }) + } +} diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/ping.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/ping.go new file mode 100644 index 00000000000..84f99883ffe --- /dev/null +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/ping.go @@ -0,0 +1,89 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package esclient + +import ( + "encoding/base64" + "encoding/json" + "fmt" + "net/http" + "strconv" +) + +// elasticsearchPing is used to get Elasticsearch version. 
+// ES native client cannot be used because its version should be known beforehand.
+type elasticsearchPing struct {
+	username     string
+	password     string
+	roundTripper http.RoundTripper
+}
+
+func (p *elasticsearchPing) getPingResponse(url string) (*pingResponse, error) {
+	client := http.Client{
+		Transport: p.roundTripper,
+	}
+	req, err := http.NewRequest("GET", url, nil)
+	if err != nil {
+		return nil, err
+	}
+	if p.username != "" && p.password != "" {
+		req.Header.Add("Authorization", "Basic "+basicAuth(p.username, p.password))
+		client.CheckRedirect = redirectPolicyFunc(p.username, p.password)
+	}
+	response, err := client.Do(req)
+	if err != nil {
+		return nil, err
+	}
+	defer response.Body.Close()
+	var pingResp pingResponse
+	if err := json.NewDecoder(response.Body).Decode(&pingResp); err != nil {
+		return nil, err
+	}
+	return &pingResp, nil
+}
+
+func (p *elasticsearchPing) getVersion(url string) (int, error) {
+	pingResponse, err := p.getPingResponse(url)
+	if err != nil {
+		return 0, err
+	}
+	esVersion, err := strconv.Atoi(string(pingResponse.Version.Number[0]))
+	if err != nil {
+		return 0, fmt.Errorf("elasticsearch version %s cannot be converted to a number: %v", pingResponse.Version.Number, err)
+	}
+	return esVersion, nil
+}
+
+func basicAuth(username, password string) string {
+	auth := username + ":" + password
+	return base64.StdEncoding.EncodeToString([]byte(auth))
+}
+
+func redirectPolicyFunc(username, password string) func(req *http.Request, via []*http.Request) error {
+	return func(req *http.Request, via []*http.Request) error {
+		auth := basicAuth(username, password)
+		req.Header.Add("Authorization", "Basic "+auth)
+		return nil
+	}
+}
+
+type pingResponse struct {
+	Name        string `json:"name"`
+	ClusterName string `json:"cluster_name"`
+	ClusterUUID string `json:"cluster_uuid"`
+	Version     struct {
+		Number string `json:"number"`
+	} `json:"version"`
+}
diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/ping_test.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/ping_test.go
new file mode 100644
index 00000000000..3958c7219e4
--- /dev/null
+++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient/ping_test.go
@@ -0,0 +1,101 @@
+// Copyright (c) 2020 The Jaeger Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.

+ +package esclient + +import ( + "net/http" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const es5PingResponse = `{ + "name" : "H_3P3Ll", + "cluster_name" : "docker-cluster", + "cluster_uuid" : "j-Kn4lBKTdCE1Cp9V4iYcA", + "version" : { + "number" : "5.6.10", + "build_hash" : "b727a60", + "build_date" : "2018-06-06T15:48:34.860Z", + "build_snapshot" : false, + "lucene_version" : "6.6.1" + }, + "tagline" : "You Know, for Search" +}` +const es8PingResponse = `{ + "name" : "509984c472e3", + "cluster_name" : "docker-cluster", + "cluster_uuid" : "GJbBkpRLQZil3DiqGFUTDQ", + "version" : { + "number" : "8.0.0-SNAPSHOT", + "build_flavor" : "oss", + "build_type" : "docker", + "build_hash" : "ebe89518795211eeba01b21c65d9396702441d0a", + "build_date" : "2020-06-16T17:13:26.051209Z", + "build_snapshot" : true, + "lucene_version" : "8.6.0", + "minimum_wire_compatibility_version" : "7.9.0", + "minimum_index_compatibility_version" : "7.0.0" + }, + "tagline" : "You Know, for Search" +}` + +func TestPing(t *testing.T) { + tests := []struct { + name string + resp string + err string + version int + }{ + { + name: "e5", + resp: es5PingResponse, + version: 5, + }, + { + name: "es8", + resp: es8PingResponse, + version: 8, + }, + { + name: "wrong response", + resp: "foo", + err: "invalid character 'o' in literal false (expecting 'a')", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ts := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Type", "application/json") + w.Write([]byte(test.resp)) + }), + ) + defer ts.Close() + esPing := elasticsearchPing{} + version, err := esPing.getVersion(ts.URL) + if test.err != "" { + assert.Contains(t, err.Error(), test.err) + assert.Equal(t, 0, version) + } else { + require.NoError(t, err) + assert.Equal(t, test.version, version) + } + }) + } +} diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/esmodeltranslator/modeltranslator.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/esmodeltranslator/modeltranslator.go new file mode 100644 index 00000000000..8cb46d181ca --- /dev/null +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/esmodeltranslator/modeltranslator.go @@ -0,0 +1,445 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
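Before the implementation, a minimal sketch of how the translator defined below is driven; the attribute key and the "@" replacement character are illustrative, and an empty pdata.Traces simply yields no spans:

package main

import (
	"fmt"

	"go.opentelemetry.io/collector/consumer/pdata"

	"github.com/jaegertracing/jaeger/cmd/opentelemetry/app/exporter/elasticsearchexporter/esmodeltranslator"
)

func main() {
	// Keys listed here are stored on the span as a nested "tag" object (dots replaced by "@");
	// all remaining attributes end up in the tags key/value array.
	translator := esmodeltranslator.NewTranslator(false, []string{"http.status_code"}, "@")
	traces := pdata.NewTraces() // in the collector this comes from the pipeline
	spans, err := translator.ConvertSpans(traces)
	fmt.Println(len(spans), err) // 0 <nil> for empty input
}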
+ +package esmodeltranslator + +import ( + "errors" + "fmt" + "strconv" + "strings" + "time" + + "go.opentelemetry.io/collector/consumer/pdata" + "go.opentelemetry.io/collector/translator/conventions" + tracetranslator "go.opentelemetry.io/collector/translator/trace" + + "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/plugin/storage/es/spanstore/dbmodel" +) + +const ( + eventNameKey = "event" +) + +var ( + errZeroTraceID = errors.New("traceID is zero") + errZeroSpanID = errors.New("spanID is zero") +) + +// Translator configures translator +type Translator struct { + allTagsAsFields bool + tagKeysAsFields map[string]bool + tagDotReplacement string +} + +// NewTranslator returns new translator instance +func NewTranslator(allTagsAsFields bool, tagsKeysAsFields []string, tagDotReplacement string) *Translator { + tagsKeysAsFieldsMap := map[string]bool{} + for _, v := range tagsKeysAsFields { + tagsKeysAsFieldsMap[v] = true + } + return &Translator{ + allTagsAsFields: allTagsAsFields, + tagKeysAsFields: tagsKeysAsFieldsMap, + tagDotReplacement: tagDotReplacement, + } +} + +// ConvertSpans converts spans from OTEL model to Jaeger Elasticsearch model +func (c *Translator) ConvertSpans(traces pdata.Traces) ([]*dbmodel.Span, error) { + rss := traces.ResourceSpans() + if rss.Len() == 0 { + return nil, nil + } + dbSpans := make([]*dbmodel.Span, 0, traces.SpanCount()) + for i := 0; i < rss.Len(); i++ { + // this would correspond to a single batch + err := c.resourceSpans(rss.At(i), &dbSpans) + if err != nil { + return nil, err + } + } + return dbSpans, nil +} + +func (c *Translator) resourceSpans(spans pdata.ResourceSpans, dbSpans *[]*dbmodel.Span) error { + ils := spans.InstrumentationLibrarySpans() + process := c.process(spans.Resource()) + for i := 0; i < ils.Len(); i++ { + // TODO convert instrumentation library info + //ils.At(i).InstrumentationLibrary() + spans := ils.At(i).Spans() + for j := 0; j < spans.Len(); j++ { + dbSpan, err := c.spanWithoutProcess(spans.At(j)) + if err != nil { + return err + } + dbSpan.Process = *process + *dbSpans = append(*dbSpans, dbSpan) + } + } + return nil +} + +func (c *Translator) spanWithoutProcess(span pdata.Span) (*dbmodel.Span, error) { + if span.IsNil() { + return nil, nil + } + traceID, err := convertTraceID(span.TraceID()) + if err != nil { + return nil, err + } + spanID, err := convertSpanID(span.SpanID()) + if err != nil { + return nil, err + } + references, err := references(span.Links(), span.ParentSpanID(), traceID) + if err != nil { + return nil, err + } + startTime := toTime(span.StartTime()) + startTimeMicros := model.TimeAsEpochMicroseconds(startTime) + tags, tagMap := c.tags(span) + return &dbmodel.Span{ + TraceID: traceID, + SpanID: spanID, + References: references, + OperationName: span.Name(), + StartTime: startTimeMicros, + StartTimeMillis: startTimeMicros / 1000, + Duration: model.DurationAsMicroseconds(toTime(span.EndTime()).Sub(startTime)), + Tags: tags, + Tag: tagMap, + Logs: logs(span.Events()), + }, nil +} + +func toTime(nano pdata.TimestampUnixNano) time.Time { + return time.Unix(0, int64(nano)).UTC() +} + +func references(links pdata.SpanLinkSlice, parentSpanID pdata.SpanID, traceID dbmodel.TraceID) ([]dbmodel.Reference, error) { + parentSpanIDSet := len(parentSpanID.Bytes()) != 0 + if !parentSpanIDSet && links.Len() == 0 { + return nil, nil + } + + refsCount := links.Len() + if parentSpanIDSet { + refsCount++ + } + + refs := make([]dbmodel.Reference, 0, refsCount) + + // Put parent span ID at the first 
place because usually backends look for it + // as the first CHILD_OF item in the model.SpanRef slice. + if parentSpanIDSet { + jParentSpanID, err := convertSpanID(parentSpanID) + if err != nil { + return nil, fmt.Errorf("OC incorrect parent span ID: %v", err) + } + refs = append(refs, dbmodel.Reference{ + TraceID: traceID, + SpanID: jParentSpanID, + RefType: dbmodel.ChildOf, + }) + } + + for i := 0; i < links.Len(); i++ { + link := links.At(i) + if link.IsNil() { + continue + } + + traceID, err := convertTraceID(link.TraceID()) + if err != nil { + continue // skip invalid link + } + + spanID, err := convertSpanID(link.SpanID()) + if err != nil { + continue // skip invalid link + } + + refs = append(refs, dbmodel.Reference{ + TraceID: traceID, + SpanID: spanID, + // Since Jaeger RefType is not captured in internal data, + // use SpanRefType_FOLLOWS_FROM by default. + // SpanRefType_CHILD_OF supposed to be set only from parentSpanID. + RefType: dbmodel.FollowsFrom, + }) + } + return refs, nil +} + +func convertSpanID(spanID pdata.SpanID) (dbmodel.SpanID, error) { + spanIDInt, err := tracetranslator.BytesToUInt64SpanID(spanID) + if err != nil { + return "", err + } + if spanIDInt == 0 { + return "", errZeroSpanID + } + return dbmodel.SpanID(fmt.Sprintf("%016x", spanIDInt)), nil +} + +func convertTraceID(traceID pdata.TraceID) (dbmodel.TraceID, error) { + high, low, err := tracetranslator.BytesToUInt64TraceID(traceID) + if err != nil { + return "", err + } + if low == 0 && high == 0 { + return "", errZeroTraceID + } + return dbmodel.TraceID(traceIDToString(high, low)), nil +} + +func traceIDToString(high, low uint64) string { + if high == 0 { + return fmt.Sprintf("%016x", low) + } + return fmt.Sprintf("%016x%016x", high, low) +} + +func (c *Translator) process(resource pdata.Resource) *dbmodel.Process { + if resource.IsNil() || resource.Attributes().Len() == 0 { + return nil + } + p := &dbmodel.Process{} + attrs := resource.Attributes() + attrsCount := attrs.Len() + if serviceName, ok := attrs.Get(conventions.AttributeServiceName); ok { + p.ServiceName = serviceName.StringVal() + attrsCount-- + } + if attrsCount == 0 { + return p + } + tags := make([]dbmodel.KeyValue, 0, attrsCount) + var tagMap map[string]interface{} + if c.allTagsAsFields || len(c.tagKeysAsFields) > 0 { + tagMap = make(map[string]interface{}, attrsCount) + } + tags, tagMap = c.appendTagsFromAttributes(tags, tagMap, attrs, true) + p.Tags = tags + if len(tagMap) > 0 { + p.Tag = tagMap + } + return p +} + +func (c *Translator) tags(span pdata.Span) ([]dbmodel.KeyValue, map[string]interface{}) { + var spanKindTag, statusCodeTag, errorTag, statusMsgTag dbmodel.KeyValue + var spanKindTagFound, statusCodeTagFound, errorTagFound, statusMsgTagFound bool + tagsCount := span.Attributes().Len() + spanKindTag, spanKindTagFound = getTagFromSpanKind(span.Kind()) + if spanKindTagFound { + tagsCount++ + } + status := span.Status() + if !status.IsNil() { + statusCodeTag, statusCodeTagFound = getTagFromStatusCode(status.Code()) + tagsCount++ + + errorTag, errorTagFound = getErrorTagFromStatusCode(status.Code()) + if errorTagFound { + tagsCount++ + } + + statusMsgTag, statusMsgTagFound = getTagFromStatusMsg(status.Message()) + if statusMsgTagFound { + tagsCount++ + } + } + + if tagsCount == 0 { + return nil, nil + } + + tags := make([]dbmodel.KeyValue, 0, tagsCount) + var tagMap map[string]interface{} + if spanKindTagFound { + if c.allTagsAsFields || c.tagKeysAsFields[spanKindTag.Key] { + tagMap = c.addToTagMap(spanKindTag.Key, 
spanKindTag.Value, tagMap) + } else { + tags = append(tags, spanKindTag) + } + } + if statusCodeTagFound { + if c.allTagsAsFields || c.tagKeysAsFields[statusCodeTag.Key] { + tagMap = c.addToTagMap(statusCodeTag.Key, statusCodeTag.Value, tagMap) + } else { + tags = append(tags, statusCodeTag) + } + } + if errorTagFound { + if c.allTagsAsFields || c.tagKeysAsFields[errorTag.Key] { + tagMap = c.addToTagMap(errorTag.Key, errorTag.Value, tagMap) + } else { + tags = append(tags, errorTag) + } + } + if statusMsgTagFound { + if c.allTagsAsFields || c.tagKeysAsFields[statusMsgTag.Key] { + tagMap = c.addToTagMap(statusMsgTag.Key, statusMsgTag.Value, tagMap) + } else { + tags = append(tags, statusMsgTag) + } + } + return c.appendTagsFromAttributes(tags, tagMap, span.Attributes(), false) +} + +func (c *Translator) addToTagMap(key string, val interface{}, tagMap map[string]interface{}) map[string]interface{} { + if tagMap == nil { + tagMap = map[string]interface{}{} + } + tagMap[strings.Replace(key, ".", c.tagDotReplacement, -1)] = val + return tagMap +} + +func getTagFromSpanKind(spanKind pdata.SpanKind) (dbmodel.KeyValue, bool) { + var tagStr string + switch spanKind { + case pdata.SpanKindCLIENT: + tagStr = string(tracetranslator.OpenTracingSpanKindClient) + case pdata.SpanKindSERVER: + tagStr = string(tracetranslator.OpenTracingSpanKindServer) + case pdata.SpanKindPRODUCER: + tagStr = string(tracetranslator.OpenTracingSpanKindProducer) + case pdata.SpanKindCONSUMER: + tagStr = string(tracetranslator.OpenTracingSpanKindConsumer) + default: + return dbmodel.KeyValue{}, false + } + return dbmodel.KeyValue{ + Key: tracetranslator.TagSpanKind, + Type: dbmodel.StringType, + Value: tagStr, + }, true +} + +func getTagFromStatusCode(statusCode pdata.StatusCode) (dbmodel.KeyValue, bool) { + return dbmodel.KeyValue{ + Key: tracetranslator.TagStatusCode, + // TODO is this ok? 
+ Value: statusCode.String(), + Type: dbmodel.StringType, + }, true +} + +func getErrorTagFromStatusCode(statusCode pdata.StatusCode) (dbmodel.KeyValue, bool) { + if statusCode == pdata.StatusCode(0) { + return dbmodel.KeyValue{}, false + } + return dbmodel.KeyValue{ + Key: tracetranslator.TagError, + Value: "true", + Type: dbmodel.BoolType, + }, true +} + +func getTagFromStatusMsg(statusMsg string) (dbmodel.KeyValue, bool) { + if statusMsg == "" { + return dbmodel.KeyValue{}, false + } + return dbmodel.KeyValue{ + Key: tracetranslator.TagStatusMsg, + Value: statusMsg, + Type: dbmodel.StringType, + }, true +} + +func logs(events pdata.SpanEventSlice) []dbmodel.Log { + if events.Len() == 0 { + return nil + } + logs := make([]dbmodel.Log, 0, events.Len()) + for i := 0; i < events.Len(); i++ { + event := events.At(i) + if event.IsNil() { + continue + } + var fields []dbmodel.KeyValue + if event.Attributes().Len() > 0 { + fields = make([]dbmodel.KeyValue, 0, event.Attributes().Len()+1) + if event.Name() != "" { + fields = append(fields, dbmodel.KeyValue{Key: eventNameKey, Value: event.Name(), Type: dbmodel.StringType}) + } + event.Attributes().ForEach(func(k string, v pdata.AttributeValue) { + fields = append(fields, attributeToKeyValue(k, v)) + }) + } + logs = append(logs, dbmodel.Log{ + Timestamp: model.TimeAsEpochMicroseconds(toTime(event.Timestamp())), + Fields: fields, + }) + } + return logs +} + +func (c *Translator) appendTagsFromAttributes(tags []dbmodel.KeyValue, tagMap map[string]interface{}, attrs pdata.AttributeMap, skipService bool) ([]dbmodel.KeyValue, map[string]interface{}) { + attrs.ForEach(func(key string, attr pdata.AttributeValue) { + if skipService && key == conventions.AttributeServiceName { + return + } + if c.allTagsAsFields || c.tagKeysAsFields[key] { + tagMap = c.addToTagMap(key, attributeValueToInterface(attr), tagMap) + } else { + tags = append(tags, attributeToKeyValue(key, attr)) + } + }) + return tags, tagMap +} + +func attributeToKeyValue(key string, attr pdata.AttributeValue) dbmodel.KeyValue { + tag := dbmodel.KeyValue{ + Key: key, + } + switch attr.Type() { + case pdata.AttributeValueSTRING: + tag.Type = dbmodel.StringType + tag.Value = attr.StringVal() + case pdata.AttributeValueBOOL: + tag.Type = dbmodel.BoolType + if attr.BoolVal() { + tag.Value = "true" + } else { + tag.Value = "false" + } + case pdata.AttributeValueINT: + tag.Type = dbmodel.Int64Type + tag.Value = strconv.FormatInt(attr.IntVal(), 10) + case pdata.AttributeValueDOUBLE: + tag.Type = dbmodel.Float64Type + tag.Value = strconv.FormatFloat(attr.DoubleVal(), 'g', 10, 64) + } + return tag +} + +func attributeValueToInterface(attr pdata.AttributeValue) interface{} { + switch attr.Type() { + case pdata.AttributeValueSTRING: + return attr.StringVal() + case pdata.AttributeValueBOOL: + return attr.BoolVal() + case pdata.AttributeValueINT: + return attr.IntVal() + case pdata.AttributeValueDOUBLE: + return attr.DoubleVal() + } + return nil +} diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/esmodeltranslator/modeltranslator_test.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/esmodeltranslator/modeltranslator_test.go new file mode 100644 index 00000000000..3b9c0027190 --- /dev/null +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/esmodeltranslator/modeltranslator_test.go @@ -0,0 +1,232 @@ +// Copyright (c) 2020 The Jaeger Authors. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package esmodeltranslator + +import ( + "encoding/binary" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/consumer/pdata" + "go.opentelemetry.io/collector/translator/conventions" + + "github.com/jaegertracing/jaeger/plugin/storage/es/spanstore/dbmodel" +) + +var ( + traceID = []byte("0123456789abcdef") + spanID = []byte("01234567") +) + +func TestAttributeToKeyValue(t *testing.T) { + tests := []struct { + key string + attr pdata.AttributeValue + keyValue dbmodel.KeyValue + }{ + { + key: "foo", + attr: pdata.NewAttributeValueString("bar"), + keyValue: dbmodel.KeyValue{Key: "foo", Value: "bar", Type: dbmodel.StringType}, + }, + { + key: "foo", + attr: pdata.NewAttributeValueBool(true), + keyValue: dbmodel.KeyValue{Key: "foo", Value: "true", Type: dbmodel.BoolType}, + }, + { + key: "foo", + attr: pdata.NewAttributeValueBool(false), + keyValue: dbmodel.KeyValue{Key: "foo", Value: "false", Type: dbmodel.BoolType}, + }, + { + key: "foo", + attr: pdata.NewAttributeValueInt(15), + keyValue: dbmodel.KeyValue{Key: "foo", Value: "15", Type: dbmodel.Int64Type}, + }, + { + key: "foo", + attr: pdata.NewAttributeValueDouble(16.42), + keyValue: dbmodel.KeyValue{Key: "foo", Value: "16.42", Type: dbmodel.Float64Type}, + }, + } + for _, test := range tests { + t.Run(fmt.Sprintf("%s:%v", test.keyValue.Key, test.keyValue.Value), func(t *testing.T) { + keyValue := attributeToKeyValue(test.key, test.attr) + assert.Equal(t, test.keyValue, keyValue) + }) + } +} + +func TestTagMapValue(t *testing.T) { + tests := []struct { + attr pdata.AttributeValue + value interface{} + }{ + { + attr: pdata.NewAttributeValueString("foo"), + value: "foo", + }, + { + attr: pdata.NewAttributeValueBool(true), + value: true, + }, + { + attr: pdata.NewAttributeValueInt(15), + value: int64(15), + }, + { + attr: pdata.NewAttributeValueDouble(123.66), + value: float64(123.66), + }, + } + for _, test := range tests { + t.Run(fmt.Sprintf("%v", test.value), func(t *testing.T) { + val := attributeValueToInterface(test.attr) + assert.Equal(t, test.value, val) + }) + } +} + +func TestConvertSpan(t *testing.T) { + traces := traces("myservice") + resource := traces.ResourceSpans().At(0).Resource() + resource.Attributes().InsertDouble("num", 16.66) + span := addSpan(traces, "root", traceID, spanID) + span.SetKind(pdata.SpanKindCLIENT) + span.Status().InitEmpty() + span.Status().SetCode(1) + span.Status().SetMessage("messagetext") + span.SetStartTime(pdata.TimestampUnixNano(1000000)) + span.SetEndTime(pdata.TimestampUnixNano(2000000)) + span.Attributes().Insert("foo", pdata.NewAttributeValueBool(true)) + span.Attributes().Insert("toTagMap", pdata.NewAttributeValueString("val")) + span.Events().Resize(1) + span.Events().At(0).SetName("eventName") + span.Events().At(0).SetTimestamp(500000) + span.Events().At(0).Attributes().InsertString("foo", "bar") + span.SetParentSpanID(spanID) + 
span.Links().Resize(1) + span.Links().At(0).InitEmpty() + span.Links().At(0).SetSpanID(pdata.NewSpanID(spanID)) + span.Links().At(0).SetTraceID(pdata.NewTraceID(traceID)) + + c := &Translator{ + tagKeysAsFields: map[string]bool{"toTagMap": true}, + } + spans, err := c.ConvertSpans(traces) + require.NoError(t, err) + assert.Equal(t, 1, len(spans)) + assert.Equal(t, &dbmodel.Span{ + TraceID: "30313233343536373839616263646566", + SpanID: "3031323334353637", + StartTime: 1000, + Duration: 1000, + OperationName: "root", + StartTimeMillis: 1, + Tags: []dbmodel.KeyValue{ + {Key: "span.kind", Type: dbmodel.StringType, Value: "client"}, + {Key: "status.code", Type: dbmodel.StringType, Value: "Cancelled"}, + {Key: "error", Type: dbmodel.BoolType, Value: "true"}, + {Key: "status.message", Type: dbmodel.StringType, Value: "messagetext"}, + {Key: "foo", Type: dbmodel.BoolType, Value: "true"}}, + Tag: map[string]interface{}{"toTagMap": "val"}, + Logs: []dbmodel.Log{{Fields: []dbmodel.KeyValue{ + {Key: "event", Value: "eventName", Type: dbmodel.StringType}, + {Key: "foo", Value: "bar", Type: dbmodel.StringType}}, Timestamp: 500}}, + References: []dbmodel.Reference{ + {SpanID: "3031323334353637", TraceID: "30313233343536373839616263646566", RefType: dbmodel.ChildOf}, + {SpanID: "3031323334353637", TraceID: "30313233343536373839616263646566", RefType: dbmodel.FollowsFrom}}, + Process: dbmodel.Process{ + ServiceName: "myservice", + Tags: []dbmodel.KeyValue{{Key: "num", Value: "16.66", Type: dbmodel.Float64Type}}, + }, + }, spans[0]) +} + +func TestEmpty(t *testing.T) { + c := &Translator{} + spans, err := c.ConvertSpans(pdata.NewTraces()) + require.NoError(t, err) + assert.Nil(t, spans) +} + +func TestErrorIDs(t *testing.T) { + zero64Bytes := make([]byte, 16) + binary.LittleEndian.PutUint64(zero64Bytes, 0) + binary.LittleEndian.PutUint64(zero64Bytes, 0) + tests := []struct { + spanID []byte + traceID []byte + err string + }{ + { + traceID: []byte("invalid-%"), + err: "TraceID does not have 16 bytes", + }, + { + traceID: traceID, + spanID: []byte("invalid-%"), + err: "SpanID does not have 8 bytes", + }, + { + traceID: traceID, + spanID: zero64Bytes[:8], + err: errZeroSpanID.Error(), + }, + { + traceID: zero64Bytes, + spanID: spanID, + err: errZeroTraceID.Error(), + }, + } + for _, test := range tests { + t.Run(test.err, func(t *testing.T) { + c := &Translator{} + traces := traces("foo") + addSpan(traces, "foo", test.traceID, test.spanID) + spans, err := c.ConvertSpans(traces) + assert.EqualError(t, err, test.err) + assert.Nil(t, spans) + }) + } + +} + +func traces(serviceName string) pdata.Traces { + traces := pdata.NewTraces() + traces.ResourceSpans().Resize(1) + traces.ResourceSpans().At(0).InstrumentationLibrarySpans().Resize(1) + traces.ResourceSpans().At(0).Resource().InitEmpty() + traces.ResourceSpans().At(0).Resource().Attributes().InitFromMap(map[string]pdata.AttributeValue{conventions.AttributeServiceName: pdata.NewAttributeValueString(serviceName)}) + return traces +} + +func addSpan(traces pdata.Traces, name string, traceID []byte, spanID []byte) pdata.Span { + rspans := traces.ResourceSpans() + instSpans := rspans.At(0).InstrumentationLibrarySpans() + spans := instSpans.At(0).Spans() + spans.Resize(spans.Len() + 1) + span := spans.At(spans.Len() - 1) + span.SetName(name) + span.SetTraceID(traceID) + span.SetSpanID(spanID) + span.SetStartTime(pdata.TimestampUnixNano(time.Now().UnixNano())) + span.SetEndTime(pdata.TimestampUnixNano(time.Now().UnixNano())) + return span +} diff --git 
a/cmd/opentelemetry/app/exporter/elasticsearchexporter/exporter.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/exporter.go index e307ba45bbc..393b3b5b809 100644 --- a/cmd/opentelemetry/app/exporter/elasticsearchexporter/exporter.go +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/exporter.go @@ -15,20 +15,26 @@ package elasticsearchexporter import ( - "github.com/uber/jaeger-lib/metrics" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/exporter/exporterhelper" - "github.com/jaegertracing/jaeger/cmd/opentelemetry/app/exporter" "github.com/jaegertracing/jaeger/plugin/storage/es" ) // new creates Elasticsearch exporter/storage. func new(config *Config, params component.ExporterCreateParams) (component.TraceExporter, error) { - factory := es.NewFactory() - factory.InitFromOptions(config.Options) - err := factory.Initialize(metrics.NullFactory, params.Logger) + esCfg := config.GetPrimary() + w, err := newEsSpanWriter(*esCfg, params.Logger) if err != nil { return nil, err } - return exporter.NewSpanWriterExporter(&config.ExporterSettings, factory) + if config.Primary.IsCreateIndexTemplates() { + spanMapping, serviceMapping := es.GetSpanServiceMappings(esCfg.GetNumShards(), esCfg.GetNumReplicas(), esCfg.GetVersion()) + if err = w.CreateTemplates(spanMapping, serviceMapping); err != nil { + return nil, err + } + } + return exporterhelper.NewTraceExporter( + config, + w.WriteTraces) } diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/factory_test.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/factory_test.go index 7061694d5fe..3ffa169dfd4 100644 --- a/cmd/opentelemetry/app/exporter/elasticsearchexporter/factory_test.go +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/factory_test.go @@ -24,6 +24,7 @@ import ( "go.opentelemetry.io/collector/config/configcheck" "go.opentelemetry.io/collector/config/configerror" "go.opentelemetry.io/collector/config/configmodels" + "go.uber.org/zap" jConfig "github.com/jaegertracing/jaeger/pkg/config" "github.com/jaegertracing/jaeger/plugin/storage/es" @@ -36,9 +37,11 @@ func TestCreateTraceExporter(t *testing.T) { factory := &Factory{OptionsFactory: func() *es.Options { return opts }} - exporter, err := factory.CreateTraceExporter(context.Background(), component.ExporterCreateParams{}, factory.CreateDefaultConfig()) + config := factory.CreateDefaultConfig().(*Config) + config.Primary.Servers = []string{"http://foobardoesnotexists.test"} + exporter, err := factory.CreateTraceExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, config) require.Nil(t, exporter) - assert.Contains(t, err.Error(), "failed to create primary Elasticsearch client") + assert.Contains(t, err.Error(), "no such host") } func TestCreateTraceExporter_nilConfig(t *testing.T) { diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/integration_test.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/integration_test.go new file mode 100644 index 00000000000..7f68f10d928 --- /dev/null +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/integration_test.go @@ -0,0 +1,202 @@ +// Copyright (c) 2019 The Jaeger Authors. +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build integration + +package elasticsearchexporter + +import ( + "context" + "errors" + "net/http" + "strconv" + "testing" + "time" + + "github.com/olivere/elastic" + "github.com/stretchr/testify/require" + "github.com/uber/jaeger-lib/metrics" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/pkg/es/config" + eswrapper "github.com/jaegertracing/jaeger/pkg/es/wrapper" + "github.com/jaegertracing/jaeger/pkg/testutils" + "github.com/jaegertracing/jaeger/plugin/storage/es" + "github.com/jaegertracing/jaeger/plugin/storage/es/dependencystore" + "github.com/jaegertracing/jaeger/plugin/storage/es/spanstore" + "github.com/jaegertracing/jaeger/plugin/storage/es/spanstore/dbmodel" + "github.com/jaegertracing/jaeger/plugin/storage/integration" +) + +const ( + host = "0.0.0.0" + queryPort = "9200" + queryHostPort = host + ":" + queryPort + queryURL = "http://" + queryHostPort + indexPrefix = "integration-test" + tagKeyDeDotChar = "@" + maxSpanAge = time.Hour * 72 +) + +type IntegrationTest struct { + integration.StorageIntegration + + client *elastic.Client + bulkProcessor *elastic.BulkProcessor + logger *zap.Logger +} + +type storageWrapper struct { + writer *esSpanWriter +} + +func (s storageWrapper) WriteSpan(span *model.Span) error { + // This fails because there is no binary tag type in OTEL and also OTEL span's status code is always created + //traces := jaegertranslator.ProtoBatchesToInternalTraces([]*model.Batch{{Process: span.Process, Spans: []*model.Span{span}}}) + //_, err := s.writer.WriteTraces(context.Background(), traces) + converter := dbmodel.FromDomain{} + dbSpan := converter.FromDomainEmbedProcess(span) + _, err := s.writer.writeSpans([]*dbmodel.Span{dbSpan}) + return err +} + +func (s *IntegrationTest) getVersion() (uint, error) { + pingResult, _, err := s.client.Ping(queryURL).Do(context.Background()) + if err != nil { + return 0, err + } + esVersion, err := strconv.Atoi(string(pingResult.Version.Number[0])) + if err != nil { + return 0, err + } + return uint(esVersion), nil +} + +func (s *IntegrationTest) initializeES(allTagsAsFields bool) error { + rawClient, err := elastic.NewClient( + elastic.SetURL(queryURL), + elastic.SetSniff(false)) + if err != nil { + return err + } + s.logger, _ = testutils.NewLogger() + + s.client = rawClient + s.initSpanstore(allTagsAsFields) + s.CleanUp = func() error { + return s.esCleanUp(allTagsAsFields) + } + s.Refresh = s.esRefresh + s.esCleanUp(allTagsAsFields) + // TODO: remove this flag after ES support returning spanKind when get operations + s.NotSupportSpanKindWithOperation = true + return nil +} + +func (s *IntegrationTest) esCleanUp(allTagsAsFields bool) error { + _, err := s.client.DeleteIndex("*").Do(context.Background()) + if err != nil { + return err + } + return s.initSpanstore(allTagsAsFields) +} + +func (s *IntegrationTest) initSpanstore(allTagsAsFields bool) error { + bp, _ := s.client.BulkProcessor().BulkActions(1).FlushInterval(time.Nanosecond).Do(context.Background()) + s.bulkProcessor = bp + esVersion, err := s.getVersion() + if err != nil { + return err 
+ } + client := eswrapper.WrapESClient(s.client, bp, esVersion) + spanMapping, serviceMapping := es.GetSpanServiceMappings(5, 1, client.GetVersion()) + + w, err := newEsSpanWriter(config.Configuration{ + Servers: []string{queryURL}, + IndexPrefix: indexPrefix, + Tags: config.TagsAsFields{ + AllAsFields: allTagsAsFields, + }, + }, s.logger) + if err != nil { + return err + } + err = w.CreateTemplates(spanMapping, serviceMapping) + if err != nil { + return err + } + s.SpanWriter = storageWrapper{ + writer: w, + } + s.SpanReader = spanstore.NewSpanReader(spanstore.SpanReaderParams{ + Client: client, + Logger: s.logger, + MetricsFactory: metrics.NullFactory, + IndexPrefix: indexPrefix, + MaxSpanAge: maxSpanAge, + TagDotReplacement: tagKeyDeDotChar, + }) + dependencyStore := dependencystore.NewDependencyStore(client, s.logger, indexPrefix) + depMapping := es.GetDependenciesMappings(5, 1, client.GetVersion()) + err = dependencyStore.CreateTemplates(depMapping) + if err != nil { + return err + } + s.DependencyReader = dependencyStore + s.DependencyWriter = dependencyStore + return nil +} + +func (s *IntegrationTest) esRefresh() error { + err := s.bulkProcessor.Flush() + if err != nil { + return err + } + _, err = s.client.Refresh().Do(context.Background()) + return err +} + +func healthCheck() error { + for i := 0; i < 200; i++ { + if _, err := http.Get(queryURL); err == nil { + return nil + } + time.Sleep(100 * time.Millisecond) + } + return errors.New("elastic search is not ready") +} + +func testElasticsearchStorage(t *testing.T, allTagsAsFields bool) { + if err := healthCheck(); err != nil { + t.Fatal(err) + } + s := &IntegrationTest{ + StorageIntegration: integration.StorageIntegration{ + FixturesPath: "../../../../../plugin/storage/integration", + }, + } + require.NoError(t, s.initializeES(allTagsAsFields)) + s.Fixtures = integration.LoadAndParseQueryTestCases(t, "../../../../../plugin/storage/integration/fixtures/queries_es.json") + s.IntegrationTestAll(t) +} + +func TestElasticsearchStorage(t *testing.T) { + testElasticsearchStorage(t, false) +} + +func TestElasticsearchStorage_AllTagsAsObjectFields(t *testing.T) { + testElasticsearchStorage(t, true) +} diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanstore.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanstore.go new file mode 100644 index 00000000000..938246ed888 --- /dev/null +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanstore.go @@ -0,0 +1,222 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
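A note on index naming in the writer below: names are derived from an optional prefix, the base name, and either a daily date suffix or a "write" alias. A small illustrative helper (hypothetical, in the same package) shows both shapes, reusing the integration test's prefix:

package elasticsearchexporter

import (
	"fmt"
	"time"
)

// printIndexNames is illustrative only.
func printIndexNames() {
	day := time.Date(2020, 7, 1, 0, 0, 0, 0, time.UTC)
	daily := newIndexNameProvider(spanIndexBaseName, "integration-test", false)
	aliased := newIndexNameProvider(spanIndexBaseName, "integration-test", true)
	fmt.Println(daily.get(day))   // integration-test-jaeger-span-2020-07-01
	fmt.Println(aliased.get(day)) // integration-test-jaeger-span-write
}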
+ +package elasticsearchexporter + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "hash/fnv" + "strings" + "time" + + "go.opentelemetry.io/collector/component/componenterror" + "go.opentelemetry.io/collector/consumer/consumererror" + "go.opentelemetry.io/collector/consumer/pdata" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/cmd/opentelemetry/app/exporter/elasticsearchexporter/esclient" + "github.com/jaegertracing/jaeger/cmd/opentelemetry/app/exporter/elasticsearchexporter/esmodeltranslator" + "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/pkg/cache" + "github.com/jaegertracing/jaeger/pkg/es/config" + "github.com/jaegertracing/jaeger/plugin/storage/es/spanstore/dbmodel" +) + +const ( + spanIndexBaseName = "jaeger-span" + serviceIndexBaseName = "jaeger-service" + spanTypeName = "span" + serviceTypeName = "service" + indexDateFormat = "2006-01-02" // date format for index e.g. 2020-01-20 +) + +// esSpanWriter holds components required for ES span writer +type esSpanWriter struct { + logger *zap.Logger + client esclient.ElasticsearchClient + serviceCache cache.Cache + spanIndexName indexNameProvider + serviceIndexName indexNameProvider + translator *esmodeltranslator.Translator +} + +// newEsSpanWriter creates new instance of esSpanWriter +func newEsSpanWriter(params config.Configuration, logger *zap.Logger) (*esSpanWriter, error) { + client, err := esclient.NewElasticsearchClient(params, logger) + if err != nil { + return nil, err + } + tagsKeysAsFields, err := config.LoadTagsFromFile(params.Tags.File) + if err != nil { + return nil, err + } + return &esSpanWriter{ + client: client, + spanIndexName: newIndexNameProvider(spanIndexBaseName, params.IndexPrefix, params.UseReadWriteAliases), + serviceIndexName: newIndexNameProvider(serviceIndexBaseName, params.IndexPrefix, params.UseReadWriteAliases), + translator: esmodeltranslator.NewTranslator(params.Tags.AllAsFields, tagsKeysAsFields, params.GetTagDotReplacement()), + serviceCache: cache.NewLRUWithOptions( + // we do not expect more than 100k unique services + 100_000, + &cache.Options{ + TTL: time.Hour * 12, + }, + ), + }, nil +} + +func newIndexNameProvider(index, prefix string, useAliases bool) indexNameProvider { + if prefix != "" { + prefix = prefix + "-" + index = prefix + index + } + index = index + "-" + if useAliases { + index = index + "write" + } + return indexNameProvider{ + index: index, + useAlias: useAliases, + } +} + +type indexNameProvider struct { + index string + useAlias bool +} + +func (n indexNameProvider) get(date time.Time) string { + if n.useAlias { + return n.index + } + spanDate := date.UTC().Format(indexDateFormat) + return n.index + spanDate +} + +// CreateTemplates creates index templates. 
+func (w *esSpanWriter) CreateTemplates(spanTemplate, serviceTemplate string) error { + err := w.client.PutTemplate(spanIndexBaseName, strings.NewReader(spanTemplate)) + if err != nil { + return err + } + err = w.client.PutTemplate(serviceIndexBaseName, strings.NewReader(serviceTemplate)) + if err != nil { + return err + } + return nil +} + +// WriteTraces writes traces to the storage +func (w *esSpanWriter) WriteTraces(_ context.Context, traces pdata.Traces) (int, error) { + spans, err := w.translator.ConvertSpans(traces) + if err != nil { + return traces.SpanCount(), consumererror.Permanent(err) + } + return w.writeSpans(spans) +} + +func (w *esSpanWriter) writeSpans(spans []*dbmodel.Span) (int, error) { + buffer := &bytes.Buffer{} + // mapping for bulk operation to span + bulkOperations := make([]bulkItem, len(spans)) + var errs []error + dropped := 0 + for _, span := range spans { + data, err := json.Marshal(span) + if err != nil { + errs = append(errs, err) + dropped++ + continue + } + indexName := w.spanIndexName.get(model.EpochMicrosecondsAsTime(span.StartTime)) + bulkOperations = append(bulkOperations, bulkItem{span: span, isService: false}) + w.client.AddDataToBulkBuffer(buffer, data, indexName, spanTypeName) + write, err := w.writeService(span, buffer) + if err != nil { + errs = append(errs, err) + // dropped is not increased since this is only service name, the span could be written well + continue + } else if write { + bulkOperations = append(bulkOperations, bulkItem{span: span, isService: true}) + } + } + res, err := w.client.Bulk(bytes.NewReader(buffer.Bytes())) + if err != nil { + errs = append(errs, err) + return len(spans), componenterror.CombineErrors(errs) + } + droppedFromResponse := w.handleResponse(res, bulkOperations) + dropped += droppedFromResponse + return dropped, componenterror.CombineErrors(errs) +} + +func (w *esSpanWriter) handleResponse(blk *esclient.BulkResponse, operationToSpan []bulkItem) int { + numErrors := 0 + for i, d := range blk.Items { + if d.Index.Status > 201 { + numErrors++ + w.logger.Error("Part of the bulk request failed", + zap.String("result", d.Index.Result), + zap.String("error.reason", d.Index.Error.Reason), + zap.String("error.type", d.Index.Error.Type), + zap.String("error.cause.type", d.Index.Error.Cause.Type), + zap.String("error.cause.reason", d.Index.Error.Cause.Reason)) + // TODO return an error or a struct that indicates which spans should be retried + // https://github.com/open-telemetry/opentelemetry-collector/issues/990 + } else { + // passed + bulkOp := operationToSpan[i] + if bulkOp.isService { + cacheKey := hashCode(bulkOp.span.Process.ServiceName, bulkOp.span.OperationName) + w.serviceCache.Put(cacheKey, cacheKey) + } + } + } + return numErrors +} + +func (w *esSpanWriter) writeService(span *dbmodel.Span, buffer *bytes.Buffer) (bool, error) { + cacheKey := hashCode(span.Process.ServiceName, span.OperationName) + if w.serviceCache.Get(cacheKey) != nil { + return false, nil + } + svc := dbmodel.Service{ + ServiceName: span.Process.ServiceName, + OperationName: span.OperationName, + } + data, err := json.Marshal(svc) + if err != nil { + return false, err + } + indexName := w.serviceIndexName.get(model.EpochMicrosecondsAsTime(span.StartTime)) + w.client.AddDataToBulkBuffer(buffer, data, indexName, serviceTypeName) + return true, nil +} + +func hashCode(serviceName, operationName string) string { + h := fnv.New64a() + h.Write([]byte(serviceName)) + h.Write([]byte(operationName)) + return fmt.Sprintf("%x", h.Sum64()) +} + +type 
bulkItem struct { + // span associated with the bulk operation + span *dbmodel.Span + // isService indicates that this bulk operation is for service index + isService bool +} diff --git a/cmd/opentelemetry/go.mod b/cmd/opentelemetry/go.mod index 32bbe133d17..85d19038ddc 100644 --- a/cmd/opentelemetry/go.mod +++ b/cmd/opentelemetry/go.mod @@ -8,10 +8,13 @@ replace github.com/jaegertracing/jaeger => ./../../ require ( github.com/Shopify/sarama v1.22.2-0.20190604114437-cd910a683f9f + github.com/elastic/go-elasticsearch/v6 v6.8.10 + github.com/elastic/go-elasticsearch/v7 v7.0.0 github.com/golang/protobuf v1.4.2 // indirect github.com/google/go-cmp v0.5.0 // indirect github.com/imdario/mergo v0.3.9 github.com/jaegertracing/jaeger v1.18.2-0.20200626141145-be17169a4179 + github.com/olivere/elastic v6.2.27+incompatible github.com/opentracing/opentracing-go v1.1.1-0.20190913142402-a7454ce5950e github.com/shirou/gopsutil v2.20.4+incompatible // indirect github.com/spf13/pflag v1.0.5 diff --git a/cmd/opentelemetry/go.sum b/cmd/opentelemetry/go.sum index 5e2296726ed..4e80c129c9d 100644 --- a/cmd/opentelemetry/go.sum +++ b/cmd/opentelemetry/go.sum @@ -188,6 +188,10 @@ github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1 github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M= +github.com/elastic/go-elasticsearch/v6 v6.8.10 h1:2lN0gJ93gMBXvkhwih5xquldszpm8FlUwqG5sPzr6a8= +github.com/elastic/go-elasticsearch/v6 v6.8.10/go.mod h1:UwaDJsD3rWLM5rKNFzv9hgox93HoX8utj1kxD9aFUcI= +github.com/elastic/go-elasticsearch/v7 v7.0.0 h1:IWIAwLEjkuF3EzXsMthrSOX8lETspbsblyPVZz4DNEY= +github.com/elastic/go-elasticsearch/v7 v7.0.0/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4= github.com/elazarl/goproxy v0.0.0-20170405201442-c4fc26588b6e/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc= github.com/elazarl/goproxy v0.0.0-20181003060214-f58a169a71a5/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc= github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs= diff --git a/pkg/es/config/config.go b/pkg/es/config/config.go index 49d5b8b29ae..d81417449e5 100644 --- a/pkg/es/config/config.go +++ b/pkg/es/config/config.go @@ -16,11 +16,13 @@ package config import ( + "bufio" "context" "crypto/tls" "errors" "io/ioutil" "net/http" + "os" "path/filepath" "strconv" "strings" @@ -299,52 +301,60 @@ func (c *Configuration) getConfigOptions(logger *zap.Logger) ([]elastic.ClientOp Timeout: c.Timeout, } options = append(options, elastic.SetHttpClient(httpClient)) + options = append(options, elastic.SetBasicAuth(c.Username, c.Password)) + transport, err := GetHTTPRoundTripper(c, logger) + if err != nil { + return nil, err + } + httpClient.Transport = transport + return options, nil +} + +// GetHTTPRoundTripper returns configured http.RoundTripper +func GetHTTPRoundTripper(c *Configuration, logger *zap.Logger) (http.RoundTripper, error) { if c.TLS.Enabled { ctlsConfig, err := c.TLS.Config() if err != nil { return nil, err } - httpClient.Transport = &http.Transport{ + return &http.Transport{ TLSClientConfig: ctlsConfig, + }, nil + } + var transport http.RoundTripper + httpTransport := &http.Transport{ + Proxy: http.ProxyFromEnvironment, + // #nosec G402 + TLSClientConfig: &tls.Config{InsecureSkipVerify: c.TLS.SkipHostVerify}, + } + if 
c.TLS.CAPath != "" { + ctlsConfig, err := c.TLS.Config() + if err != nil { + return nil, err } - } else { - httpTransport := &http.Transport{ - Proxy: http.ProxyFromEnvironment, - // #nosec G402 - TLSClientConfig: &tls.Config{InsecureSkipVerify: c.TLS.SkipHostVerify}, - } - if c.TLS.CAPath != "" { - config, err := c.TLS.Config() - if err != nil { - return nil, err - } - httpTransport.TLSClientConfig = config - } + httpTransport.TLSClientConfig = ctlsConfig + transport = httpTransport + } - token := "" - if c.TokenFilePath != "" { - if c.AllowTokenFromContext { - logger.Warn("Token file and token propagation are both enabled, token from file won't be used") - } - tokenFromFile, err := loadToken(c.TokenFilePath) - if err != nil { - return nil, err - } - token = tokenFromFile + token := "" + if c.TokenFilePath != "" { + if c.AllowTokenFromContext { + logger.Warn("Token file and token propagation are both enabled, token from file won't be used") } - - if token != "" || c.AllowTokenFromContext { - httpClient.Transport = &tokenAuthTransport{ - token: token, - allowOverrideFromCtx: c.AllowTokenFromContext, - wrapped: httpTransport, - } - } else { - httpClient.Transport = httpTransport - options = append(options, elastic.SetBasicAuth(c.Username, c.Password)) + tokenFromFile, err := loadToken(c.TokenFilePath) + if err != nil { + return nil, err } + token = tokenFromFile } - return options, nil + if token != "" || c.AllowTokenFromContext { + transport = &tokenAuthTransport{ + token: token, + allowOverrideFromCtx: c.AllowTokenFromContext, + wrapped: httpTransport, + } + } + return transport, nil } // TokenAuthTransport @@ -373,3 +383,23 @@ func loadToken(path string) (string, error) { } return strings.TrimRight(string(b), "\r\n"), nil } + +// LoadTagsFromFile loads tags from a file +func LoadTagsFromFile(filePath string) ([]string, error) { + file, err := os.Open(filepath.Clean(filePath)) + if err != nil { + return nil, err + } + scanner := bufio.NewScanner(file) + var tags []string + for scanner.Scan() { + line := scanner.Text() + if tag := strings.TrimSpace(line); tag != "" { + tags = append(tags, tag) + } + } + if err := file.Close(); err != nil { + return nil, err + } + return tags, nil +} diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index 232697a41e9..de75f912240 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -16,11 +16,8 @@ package es import ( - "bufio" "flag" "fmt" - "os" - "path/filepath" "strconv" "strings" @@ -117,25 +114,6 @@ func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) { return reader, nil } -func loadTagsFromFile(filePath string) ([]string, error) { - file, err := os.Open(filepath.Clean(filePath)) - if err != nil { - return nil, err - } - /* #nosec G307 */ - defer file.Close() - - scanner := bufio.NewScanner(file) - var tags []string - for scanner.Scan() { - line := scanner.Text() - if tag := strings.TrimSpace(line); tag != "" { - tags = append(tags, tag) - } - } - return tags, nil -} - // CreateArchiveSpanReader implements storage.ArchiveFactory func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) { if !f.archiveConfig.IsStorageEnabled() { @@ -182,7 +160,7 @@ func createSpanWriter( var tags []string if cfg.GetTagsFilePath() != "" { var err error - if tags, err = loadTagsFromFile(cfg.GetTagsFilePath()); err != nil { + if tags, err = config.LoadTagsFromFile(cfg.GetTagsFilePath()); err != nil { logger.Error("Could not open file with tags", zap.Error(err)) return nil, err } diff --git 
a/plugin/storage/es/factory_test.go b/plugin/storage/es/factory_test.go index 6ab82caf9d1..cbd9600bb53 100644 --- a/plugin/storage/es/factory_test.go +++ b/plugin/storage/es/factory_test.go @@ -125,7 +125,7 @@ func TestLoadTagsFromFile(t *testing.T) { } for _, test := range tests { - tags, err := loadTagsFromFile(test.path) + tags, err := escfg.LoadTagsFromFile(test.path) if test.error { require.Error(t, err) assert.Nil(t, tags) diff --git a/plugin/storage/integration/badgerstore_test.go b/plugin/storage/integration/badgerstore_test.go index 407ad8225ee..c5471f1769e 100644 --- a/plugin/storage/integration/badgerstore_test.go +++ b/plugin/storage/integration/badgerstore_test.go @@ -60,7 +60,7 @@ func (s *BadgerIntegrationStorage) initialize() error { s.logger = logger // TODO: remove this flag after badger support returning spanKind when get operations - s.notSupportSpanKindWithOperation = true + s.NotSupportSpanKindWithOperation = true return nil } diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index d899577f4b8..c55a96b311f 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -85,7 +85,7 @@ func (s *ESStorageIntegration) initializeES(allTagsAsFields, archive bool) error s.Refresh = s.esRefresh s.esCleanUp(allTagsAsFields, archive) // TODO: remove this flag after ES support returning spanKind when get operations - s.notSupportSpanKindWithOperation = true + s.NotSupportSpanKindWithOperation = true return nil } @@ -170,7 +170,7 @@ func testElasticsearchStorage(t *testing.T, allTagsAsFields, archive bool) { s := &ESStorageIntegration{} require.NoError(t, s.initializeES(allTagsAsFields, archive)) - s.Fixtures = loadAndParseQueryTestCases(t, "fixtures/queries_es.json") + s.Fixtures = LoadAndParseQueryTestCases(t, "fixtures/queries_es.json") if archive { t.Run("ArchiveTrace", s.testArchiveTrace) diff --git a/plugin/storage/integration/integration_test.go b/plugin/storage/integration/integration.go similarity index 91% rename from plugin/storage/integration/integration_test.go rename to plugin/storage/integration/integration.go index 03a0c7d29f5..75dc69d3779 100644 --- a/plugin/storage/integration/integration_test.go +++ b/plugin/storage/integration/integration.go @@ -21,8 +21,6 @@ import ( "encoding/json" "fmt" "io/ioutil" - "os" - "path/filepath" "strings" "testing" "time" @@ -41,21 +39,7 @@ const ( iterations = 30 ) -func TestParseAllFixtures(t *testing.T) { - fileList := []string{} - err := filepath.Walk("fixtures/traces", func(path string, f os.FileInfo, err error) error { - if !f.IsDir() && strings.HasSuffix(path, ".json") { - fileList = append(fileList, path) - } - return nil - }) - require.NoError(t, err) - for _, file := range fileList { - t.Logf("Parsing %s", file) - getTraceFixtureExact(t, file) - } -} - +// StorageIntegration holds components for storage integration test type StorageIntegration struct { SpanWriter spanstore.Writer SpanReader spanstore.Reader @@ -63,7 +47,8 @@ type StorageIntegration struct { DependencyReader dependencystore.Reader Fixtures []*QueryFixtures // TODO: remove this flag after all storage plugins returns spanKind with operationNames - notSupportSpanKindWithOperation bool + NotSupportSpanKindWithOperation bool + FixturesPath string // CleanUp() should ensure that the storage backend is clean before another test. 
// called either before or after each test, and should be idempotent @@ -153,7 +138,7 @@ func (s *StorageIntegration) testGetOperations(t *testing.T) { defer s.cleanUp(t) var expected []spanstore.Operation - if s.notSupportSpanKindWithOperation { + if s.NotSupportSpanKindWithOperation { expected = []spanstore.Operation{ {Name: "example-operation-1"}, {Name: "example-operation-3"}, @@ -208,8 +193,12 @@ func (s *StorageIntegration) testGetTrace(t *testing.T) { func (s *StorageIntegration) testFindTraces(t *testing.T) { defer s.cleanUp(t) + fixturesPath := s.FixturesPath + if s.FixturesPath == "" { + fixturesPath = "." + } // Note: all cases include ServiceName + StartTime range - s.Fixtures = append(s.Fixtures, loadAndParseQueryTestCases(t, "fixtures/queries.json")...) + s.Fixtures = append(s.Fixtures, LoadAndParseQueryTestCases(t, fmt.Sprintf("%s/fixtures/queries.json", fixturesPath))...) // Each query test case only specifies matching traces, but does not provide counterexamples. // To improve coverage we get all possible traces and store all of them before running queries. @@ -220,7 +209,7 @@ func (s *StorageIntegration) testFindTraces(t *testing.T) { for _, traceFixture := range queryTestCase.ExpectedFixtures { trace, ok := allTraceFixtures[traceFixture] if !ok { - trace = getTraceFixture(t, traceFixture) + trace = s.getTraceFixture(t, traceFixture) err := s.writeTrace(t, trace) require.NoError(t, err, "Unexpected error when writing trace %s to storage", traceFixture) allTraceFixtures[traceFixture] = trace @@ -264,14 +253,14 @@ func (s *StorageIntegration) writeTrace(t *testing.T, trace *model.Trace) error } func (s *StorageIntegration) loadParseAndWriteExampleTrace(t *testing.T) *model.Trace { - trace := getTraceFixture(t, "example_trace") + trace := s.getTraceFixture(t, "example_trace") err := s.writeTrace(t, trace) require.NoError(t, err, "Not expecting error when writing example_trace to storage") return trace } func (s *StorageIntegration) loadParseAndWriteLargeTrace(t *testing.T) *model.Trace { - trace := getTraceFixture(t, "example_trace") + trace := s.getTraceFixture(t, "example_trace") span := trace.Spans[0] spns := make([]*model.Span, 1, 10008) trace.Spans = spns @@ -287,8 +276,12 @@ func (s *StorageIntegration) loadParseAndWriteLargeTrace(t *testing.T) *model.Tr return trace } -func getTraceFixture(t *testing.T, fixture string) *model.Trace { - fileName := fmt.Sprintf("fixtures/traces/%s.json", fixture) +func (s *StorageIntegration) getTraceFixture(t *testing.T, fixture string) *model.Trace { + fixturesPath := s.FixturesPath + if s.FixturesPath == "" { + fixturesPath = "." 
+ } + fileName := fmt.Sprintf("%s/fixtures/traces/%s.json", fixturesPath, fixture) return getTraceFixtureExact(t, fileName) } @@ -299,19 +292,22 @@ func getTraceFixtureExact(t *testing.T, fileName string) *model.Trace { } func loadAndParseJSONPB(t *testing.T, path string, object proto.Message) { + // #nosec inStr, err := ioutil.ReadFile(path) require.NoError(t, err, "Not expecting error when loading fixture %s", path) err = jsonpb.Unmarshal(bytes.NewReader(correctTime(inStr)), object) require.NoError(t, err, "Not expecting error when unmarshaling fixture %s", path) } -func loadAndParseQueryTestCases(t *testing.T, queriesFile string) []*QueryFixtures { +// LoadAndParseQueryTestCases loads and parses query test cases +func LoadAndParseQueryTestCases(t *testing.T, queriesFile string) []*QueryFixtures { var queries []*QueryFixtures loadAndParseJSON(t, queriesFile, &queries) return queries } func loadAndParseJSON(t *testing.T, path string, object interface{}) { + // #nosec inStr, err := ioutil.ReadFile(path) require.NoError(t, err, "Not expecting error when loading fixture %s", path) err = json.Unmarshal(correctTime(inStr), object) @@ -373,6 +369,7 @@ func (s *StorageIntegration) testGetDependencies(t *testing.T) { assert.EqualValues(t, expected, actual) } +// IntegrationTestAll runs all integration tests func (s *StorageIntegration) IntegrationTestAll(t *testing.T) { t.Run("GetServices", s.testGetServices) t.Run("GetOperations", s.testGetOperations) diff --git a/plugin/storage/integration/domain_trace_compare_test.go b/plugin/storage/integration/trace_compare.go similarity index 96% rename from plugin/storage/integration/domain_trace_compare_test.go rename to plugin/storage/integration/trace_compare.go index d295fccfaab..7ec924a4da5 100644 --- a/plugin/storage/integration/domain_trace_compare_test.go +++ b/plugin/storage/integration/trace_compare.go @@ -26,6 +26,7 @@ import ( "github.com/jaegertracing/jaeger/model" ) +// CompareSliceOfTraces compares two trace slices func CompareSliceOfTraces(t *testing.T, expected []*model.Trace, actual []*model.Trace) { require.Equal(t, len(expected), len(actual), "Unequal number of expected vs. 
actual traces") model.SortTraces(expected) @@ -47,6 +48,7 @@ func CompareSliceOfTraces(t *testing.T, expected []*model.Trace, actual []*model } } +// CompareTraces compares two traces func CompareTraces(t *testing.T, expected *model.Trace, actual *model.Trace) { if expected.Spans == nil { require.Nil(t, actual.Spans) diff --git a/scripts/travis/es-integration-test.sh b/scripts/travis/es-integration-test.sh index 63a8d5ee841..4abeb4435df 100755 --- a/scripts/travis/es-integration-test.sh +++ b/scripts/travis/es-integration-test.sh @@ -8,8 +8,12 @@ run_integration_test() { ES_VERSION=$1 docker pull docker.elastic.co/elasticsearch/elasticsearch:${ES_VERSION} CID=$(docker run --rm -d -p 9200:9200 -e "http.host=0.0.0.0" -e "transport.host=127.0.0.1" -e "xpack.security.enabled=false" -e "xpack.monitoring.enabled=false" docker.elastic.co/elasticsearch/elasticsearch:${ES_VERSION}) - STORAGE=elasticsearch make storage-integration-test - make index-cleaner-integration-test + if [ "$ES_OTEL_INTEGRATION_TEST" == true ]; then + make es-otel-exporter-integration-test + else + STORAGE=elasticsearch make storage-integration-test + make index-cleaner-integration-test + fi docker kill $CID } @@ -17,6 +21,11 @@ run_integration_test "5.6.16" run_integration_test "6.8.2" run_integration_test "7.3.0" +if [ "$ES_OTEL_INTEGRATION_TEST" == true ]; then + echo "OpenTelemetry ES exporter test finished, skipping ES script tests and token propagation" + exit 0 +fi + echo "Executing token propatagion test" # Mock UI, needed only for build query service.