diff --git a/elasticapm/instrumentation/packages/elasticsearch.py b/elasticapm/instrumentation/packages/elasticsearch.py index a8ec64412..02fd91a7f 100644 --- a/elasticapm/instrumentation/packages/elasticsearch.py +++ b/elasticapm/instrumentation/packages/elasticsearch.py @@ -71,9 +71,13 @@ def call(self, module, method, wrapped, instance, args, kwargs): result = wrapped(*args, **kwargs) if hasattr(result, "meta"): # elasticsearch-py 8.x+ status_code = result.meta.status + cluster = result.meta.headers.get("x-found-handling-cluster") else: status_code = result[0] + cluster = result[1].get("x-found-handling-cluster") span.context["http"] = {"status_code": status_code} + if cluster: + span.context["db"] = {"instance": cluster} return result diff --git a/tests/instrumentation/elasticsearch_tests.py b/tests/instrumentation/elasticsearch_tests.py index 09408cbe4..dbdc9a808 100644 --- a/tests/instrumentation/elasticsearch_tests.py +++ b/tests/instrumentation/elasticsearch_tests.py @@ -35,6 +35,7 @@ import json import os import urllib.parse +from unittest import mock from elasticsearch import VERSION as ES_VERSION from elasticsearch import Elasticsearch @@ -126,6 +127,41 @@ def test_ping(instrument, elasticapm_client, elasticsearch): assert span["context"]["http"]["status_code"] == 200 +@pytest.mark.integrationtest +def test_ping_db_instance(instrument, elasticapm_client, elasticsearch): + # some ugly code to smuggle the "x-found-handling-cluster" header into + # the response from Elasticsearch + if hasattr(elasticsearch, "transport"): + pool = elasticsearch.transport.connection_pool.get_connection().pool + else: + pool = elasticsearch._transport.node_pool.get().pool + original_urlopen = pool.urlopen + + def wrapper(*args, **kwargs): + result = original_urlopen(*args, **kwargs) + result.headers.update({"x-found-handling-cluster": "foo"}) + return result + + elasticapm_client.begin_transaction("test") + with mock.patch.object(pool, "urlopen", wraps=wrapper): + result = elasticsearch.ping() + + elasticapm_client.end_transaction("test", "OK") + parsed_url = urllib.parse.urlparse(os.environ["ES_URL"]) + + transaction = elasticapm_client.events[TRANSACTION][0] + spans = elasticapm_client.spans_for_transaction(transaction) + assert len(spans) == 1 + span = spans[0] + assert span["name"] == "ES HEAD /" + assert span["context"]["destination"] == { + "address": parsed_url.hostname, + "port": parsed_url.port, + "service": {"name": "", "resource": "elasticsearch/foo", "type": ""}, + } + assert span["context"]["db"]["instance"] == "foo" + + @pytest.mark.integrationtest def test_info(instrument, elasticapm_client, elasticsearch): elasticapm_client.begin_transaction("test")