Skip to content

Commit

Permalink
Add native OTEL ES exporter (#2295)
Browse files Browse the repository at this point in the history
* Reimplement OTEL elasticsearch exporter

Signed-off-by: Pavol Loffay <ploffay@redhat.com>

* Fix typo in comment

Signed-off-by: Pavol Loffay <ploffay@redhat.com>
  • Loading branch information
pavolloffay authored Jul 1, 2020
1 parent 7c9e1d3 commit 00b6e96
Show file tree
Hide file tree
Showing 26 changed files with 1,980 additions and 98 deletions.
4 changes: 4 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ matrix:
- go: "1.14.x"
env:
- ES_INTEGRATION_TEST=true
- go: "1.14.x"
env:
- ES_OTEL_INTEGRATION_TEST=true
- go: "1.14.x"
env:
- KAFKA_INTEGRATION_TEST=true
Expand Down Expand Up @@ -71,6 +74,7 @@ script:
- if [ "$DOCKER" == true ]; then bash ./scripts/travis/upload-all-docker-images.sh ; else echo 'skipping docker upload'; fi
- if [ "$DEPLOY" == true ]; then make build-all-platforms ; else echo 'skipping build-all-platforms'; fi
- if [ "$ES_INTEGRATION_TEST" == true ]; then bash ./scripts/travis/es-integration-test.sh ; else echo 'skipping elastic search integration test'; fi
- if [ "$ES_OTEL_INTEGRATION_TEST" == true ]; then bash ./scripts/travis/es-integration-test.sh ; else echo 'skipping elastic search integration test'; fi
- if [ "$KAFKA_INTEGRATION_TEST" == true ]; then bash ./scripts/travis/kafka-integration-test.sh ; else echo 'skipping kafka integration test'; fi
- if [ "$CASSANDRA_INTEGRATION_TEST" == true ]; then bash ./scripts/travis/cassandra-integration-test.sh ; else echo 'skipping cassandra integration test'; fi
- if [ "$HOTROD" == true ]; then bash ./scripts/travis/hotrod-integration-test.sh ; else echo 'skipping hotrod example'; fi
Expand Down
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ storage-integration-test: go-gen
go clean -testcache
bash -c "set -e; set -o pipefail; $(GOTEST) $(STORAGE_PKGS) | $(COLORIZE)"

.PHONY: es-otel-exporter-integration-test
es-otel-exporter-integration-test: go-gen
go clean -testcache
bash -c "set -e; set -o pipefail; cd ${OTEL_COLLECTOR_DIR} && go clean -testcache && $(GOTEST) -tags=integration ./app/exporter/elasticsearchexporter | $(COLORIZE)"

.PHONY: test-compile-es-scripts
test-compile-es-scripts:
docker run --rm -it -v ${PWD}:/tmp/jaeger python:3-alpine /usr/local/bin/python -m py_compile /tmp/jaeger/plugin/storage/es/esRollover.py
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package esclient

import (
"bytes"
"fmt"
"io"

"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/es/config"
)

// ElasticsearchClient exposes Elasticsearch API used by Jaeger
type ElasticsearchClient interface {
// PutTemplate creates index template
PutTemplate(name string, template io.Reader) error
// Bulk submits a bulk request
Bulk(bulkBody io.Reader) (*BulkResponse, error)
// AddDataToBulkBuffer creates bulk item from data, index and typ and adds it to bulkBody
AddDataToBulkBuffer(bulkBody *bytes.Buffer, data []byte, index, typ string)
}

// BulkResponse is a response returned by Elasticsearch Bulk API
type BulkResponse struct {
Errors bool `json:"errors"`
Items []struct {
Index struct {
ID string `json:"_id"`
Result string `json:"result"`
Status int `json:"status"`
Error struct {
Type string `json:"type"`
Reason string `json:"reason"`
Cause struct {
Type string `json:"type"`
Reason string `json:"reason"`
} `json:"caused_by"`
} `json:"error"`
} `json:"index"`
} `json:"items"`
}

// NewElasticsearchClient returns an instance of Elasticsearch client
func NewElasticsearchClient(params config.Configuration, logger *zap.Logger) (ElasticsearchClient, error) {
roundTripper, err := config.GetHTTPRoundTripper(&params, logger)
if err != nil {
return nil, err
}
if params.GetVersion() == 0 {
esPing := elasticsearchPing{
username: params.Username,
password: params.Password,
roundTripper: roundTripper,
}
esVersion, err := esPing.getVersion(params.Servers[0])
if err != nil {
return nil, err
}
logger.Info("Elasticsearch detected", zap.Int("version", esVersion))
params.Version = uint(esVersion)
}
switch params.Version {
case 5, 6:
return newElasticsearch6Client(params, roundTripper)
case 7:
return newElasticsearch7Client(params, roundTripper)
default:
return nil, fmt.Errorf("could not create Elasticseach client for version %d", params.Version)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package esclient

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/es/config"
)

func TestGetClient(t *testing.T) {
tests := []struct {
version int
err string
}{
{
version: 5,
},
{
version: 6,
},
{
version: 7,
},
{
version: 1,
err: "could not create Elasticseach client for version 1",
},
}
for _, test := range tests {
t.Run(test.err, func(t *testing.T) {
client, err := NewElasticsearchClient(config.Configuration{Servers: []string{""}, Version: uint(test.version)}, zap.NewNop())
if test.err == "" {
require.NoError(t, err)
}
switch test.version {
case 5, 6:
assert.IsType(t, &elasticsearch6Client{}, client)
case 7:
assert.IsType(t, &elasticsearch7Client{}, client)
default:
assert.EqualError(t, err, test.err)
assert.Nil(t, client)
}
})
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package esclient

import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"

elasticsearch6 "github.com/elastic/go-elasticsearch/v6"

"github.com/jaegertracing/jaeger/pkg/es/config"
)

const (
bulkES6MetaFormat = `{"index":{"_index":"%s","_type":"%s"}}` + "\n"
)

type elasticsearch6Client struct {
client *elasticsearch6.Client
}

var _ ElasticsearchClient = (*elasticsearch6Client)(nil)

func newElasticsearch6Client(params config.Configuration, roundTripper http.RoundTripper) (*elasticsearch6Client, error) {
client, err := elasticsearch6.NewClient(elasticsearch6.Config{
DiscoverNodesOnStart: params.Sniffer,
Addresses: params.Servers,
Username: params.Username,
Password: params.Password,
Transport: roundTripper,
})
if err != nil {
return nil, err
}
return &elasticsearch6Client{
client: client,
}, nil
}

func (es *elasticsearch6Client) PutTemplate(name string, body io.Reader) error {
resp, err := es.client.Indices.PutTemplate(name, body)
if err != nil {
return err
}
resp.Body.Close()
return nil
}

func (es *elasticsearch6Client) AddDataToBulkBuffer(buffer *bytes.Buffer, data []byte, index, typ string) {
meta := []byte(fmt.Sprintf(bulkES6MetaFormat, index, typ))
buffer.Grow(len(meta) + len(data) + len("\n"))
buffer.Write(meta)
buffer.Write(data)
buffer.Write([]byte("\n"))
}

func (es *elasticsearch6Client) Bulk(reader io.Reader) (*BulkResponse, error) {
response, err := es.client.Bulk(reader)
if err != nil {
return nil, err
}
defer response.Body.Close()
if response.StatusCode >= 400 {
return nil, fmt.Errorf("bulk request failed with code %d", response.StatusCode)
}
var blk BulkResponse
err = json.NewDecoder(response.Body).Decode(&blk)
if err != nil {
return nil, err
}
return &blk, nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package esclient

import (
"bytes"
"fmt"
"io/ioutil"
"net/http"
"strings"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/jaegertracing/jaeger/pkg/es/config"
)

type mockTransport struct {
Response *http.Response
RoundTripFn func(req *http.Request) (*http.Response, error)
}

func (t *mockTransport) RoundTrip(req *http.Request) (*http.Response, error) {
return t.RoundTripFn(req)
}

func TestES6NewClient_err(t *testing.T) {
client, err := newElasticsearch6Client(config.Configuration{
Sniffer: true,
Servers: []string{"$%"},
}, &http.Transport{})
require.Error(t, err)
assert.Nil(t, client)
}

func TestES6PutTemplateES6Client(t *testing.T) {
mocktrans := &mockTransport{
Response: &http.Response{
StatusCode: http.StatusOK,
Body: ioutil.NopCloser(strings.NewReader(`{}`)),
},
}
mocktrans.RoundTripFn = func(req *http.Request) (*http.Response, error) { return mocktrans.Response, nil }
client, err := newElasticsearch6Client(config.Configuration{}, mocktrans)
require.NoError(t, err)
assert.NotNil(t, client)
err = client.PutTemplate("foo", strings.NewReader("bar"))
require.NoError(t, err)
}

func TestES6AddDataToBulk(t *testing.T) {
client, err := newElasticsearch6Client(config.Configuration{}, &http.Transport{})
require.NoError(t, err)
assert.NotNil(t, client)

buf := &bytes.Buffer{}
client.AddDataToBulkBuffer(buf, []byte("data"), "foo", "bar")
assert.Equal(t, "{\"index\":{\"_index\":\"foo\",\"_type\":\"bar\"}}\ndata\n", buf.String())
}

func TestES6Bulk(t *testing.T) {
tests := []struct {
resp *http.Response
err string
}{
{
resp: &http.Response{
StatusCode: http.StatusOK,
Body: ioutil.NopCloser(strings.NewReader("{}")),
},
},
{
resp: &http.Response{
StatusCode: http.StatusOK,
Body: ioutil.NopCloser(strings.NewReader("{#}")),
},
err: "looking for beginning of object key string",
},
{
resp: &http.Response{
StatusCode: http.StatusBadRequest,
Body: ioutil.NopCloser(strings.NewReader("{#}")),
},
err: "bulk request failed with code 400",
},
}
for _, test := range tests {
t.Run(test.err, func(t *testing.T) {
mocktrans := &mockTransport{
Response: test.resp,
}
mocktrans.RoundTripFn = func(req *http.Request) (*http.Response, error) { return mocktrans.Response, nil }

client, err := newElasticsearch6Client(config.Configuration{}, mocktrans)
require.NoError(t, err)
assert.NotNil(t, client)
_, err = client.Bulk(strings.NewReader("data"))
if test.err != "" {
fmt.Println()
assert.Contains(t, err.Error(), test.err)
} else {
require.NoError(t, err)
}
})
}
}
Loading

0 comments on commit 00b6e96

Please sign in to comment.