Skip to content

Commit

Permalink
add context.db.instance to elasticsearch spans
Browse files Browse the repository at this point in the history
The cluster name is taken as instance. This only
works for Elastic Cloud clusters, as they expose
the cluster name in form of a header.

closes elastic#1586
  • Loading branch information
beniwohli committed Oct 19, 2022
1 parent fc7064f commit 8600b1e
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 0 deletions.
4 changes: 4 additions & 0 deletions elasticapm/instrumentation/packages/elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,13 @@ def call(self, module, method, wrapped, instance, args, kwargs):
result = wrapped(*args, **kwargs)
if hasattr(result, "meta"): # elasticsearch-py 8.x+
status_code = result.meta.status
cluster = result.meta.headers.get("x-found-handling-cluster")
else:
status_code = result[0]
cluster = result[1].get("x-found-handling-cluster")
span.context["http"] = {"status_code": status_code}
if cluster:
span.context["db"] = {"instance": cluster}

return result

Expand Down
36 changes: 36 additions & 0 deletions tests/instrumentation/elasticsearch_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import json
import os
import urllib.parse
from unittest import mock

from elasticsearch import VERSION as ES_VERSION
from elasticsearch import Elasticsearch
Expand Down Expand Up @@ -126,6 +127,41 @@ def test_ping(instrument, elasticapm_client, elasticsearch):
assert span["context"]["http"]["status_code"] == 200


@pytest.mark.integrationtest
def test_ping_db_instance(instrument, elasticapm_client, elasticsearch):
# some ugly code to smuggle the "x-found-handling-cluster" header into
# the response from Elasticsearch
if hasattr(elasticsearch, "transport"):
pool = elasticsearch.transport.connection_pool.get_connection().pool
else:
pool = elasticsearch._transport.node_pool.get().pool
original_urlopen = pool.urlopen

def wrapper(*args, **kwargs):
result = original_urlopen(*args, **kwargs)
result.headers.update({"x-found-handling-cluster": "foo"})
return result

elasticapm_client.begin_transaction("test")
with mock.patch.object(pool, "urlopen", wraps=wrapper):
result = elasticsearch.ping()

elasticapm_client.end_transaction("test", "OK")
parsed_url = urllib.parse.urlparse(os.environ["ES_URL"])

transaction = elasticapm_client.events[TRANSACTION][0]
spans = elasticapm_client.spans_for_transaction(transaction)
assert len(spans) == 1
span = spans[0]
assert span["name"] == "ES HEAD /"
assert span["context"]["destination"] == {
"address": parsed_url.hostname,
"port": parsed_url.port,
"service": {"name": "", "resource": "elasticsearch/foo", "type": ""},
}
assert span["context"]["db"]["instance"] == "foo"


@pytest.mark.integrationtest
def test_info(instrument, elasticapm_client, elasticsearch):
elasticapm_client.begin_transaction("test")
Expand Down

0 comments on commit 8600b1e

Please sign in to comment.