From 0327842983ddfb6430182f80b9d19c0cf2489a17 Mon Sep 17 00:00:00 2001 From: Bulygin Evgeny Date: Mon, 25 Sep 2023 14:15:20 +0500 Subject: [PATCH 1/2] aio-pika instrumentation: Removed check for non-sampled span when inject message headers. Reason to change is that sampled flag can be propagate https://www.w3.org/TR/trace-context/#sampled-flag and be useful when trace is not sampled. --- CHANGELOG.md | 5 ++ .../aio_pika/publish_decorator.py | 3 +- .../tests/test_publish_decorator.py | 59 +++++++++++++++++++ .../instrumentation/pika/utils.py | 11 ++-- .../tests/test_utils.py | 52 +++++++++++++++- 5 files changed, 119 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1206844e55..0b4d22506b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +### Fixed + +- `opentelemetry-instrumentation-aio-pika` and `opentelemetry-instrumentation-pika` Fix missing trace context propagation when trace not recording. + ([#1969](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1969)) + ## Version 1.20.0/0.41b0 (2023-09-01) ### Fixed diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/publish_decorator.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/publish_decorator.py index cae834a031..03937290ee 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/publish_decorator.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/publish_decorator.py @@ -45,8 +45,7 @@ async def decorated_publish( if not span: return await publish(message, routing_key, **kwargs) with trace.use_span(span, end_on_exit=True): - if span.is_recording(): - propagate.inject(message.properties.headers) + propagate.inject(message.properties.headers) return_value = await publish(message, routing_key, **kwargs) return return_value diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_publish_decorator.py b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_publish_decorator.py index d5291e07d9..e49a90c1ad 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_publish_decorator.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_publish_decorator.py @@ -14,6 +14,7 @@ import asyncio from typing import Type from unittest import TestCase, mock, skipIf +from unittest.mock import MagicMock from aio_pika import Exchange, RobustExchange @@ -92,6 +93,35 @@ def test_publish(self): def test_robust_publish(self): self._test_publish(RobustExchange) + def _test_publish_works_with_not_recording_span(self, exchange_type): + exchange = exchange_type(CONNECTION_7, CHANNEL_7, EXCHANGE_NAME) + with mock.patch.object( + PublishDecorator, "_get_publish_span" + ) as mock_get_publish_span: + mocked_not_recording_span = MagicMock() + mocked_not_recording_span.is_recording.return_value = False + mock_get_publish_span.return_value = mocked_not_recording_span + with mock.patch.object( + Exchange, "publish", return_value=asyncio.sleep(0) + ) as mock_publish: + with mock.patch( + "opentelemetry.instrumentation.aio_pika.publish_decorator.propagate.inject") as mock_inject: + decorated_publish = PublishDecorator( + self.tracer, exchange + ).decorate(mock_publish) + self.loop.run_until_complete( + decorated_publish(MESSAGE, ROUTING_KEY) + ) + mock_publish.assert_called_once() + mock_get_publish_span.assert_called_once() + mock_inject.assert_called_once() + + def test_publish_works_with_not_recording_span(self): + self._test_publish_works_with_not_recording_span(Exchange) + + def test_publish_works_with_not_recording_span_robust(self): + self._test_publish_works_with_not_recording_span(RobustExchange) + @skipIf(AIOPIKA_VERSION_INFO <= (8, 0), "Only for aio_pika 8") class TestInstrumentedExchangeAioRmq8(TestCase): @@ -144,3 +174,32 @@ def test_publish(self): def test_robust_publish(self): self._test_publish(RobustExchange) + + def _test_publish_works_with_not_recording_span(self, exchange_type): + exchange = exchange_type(CONNECTION_7, CHANNEL_7, EXCHANGE_NAME) + with mock.patch.object( + PublishDecorator, "_get_publish_span" + ) as mock_get_publish_span: + mocked_not_recording_span = MagicMock() + mocked_not_recording_span.is_recording.return_value = False + mock_get_publish_span.return_value = mocked_not_recording_span + with mock.patch.object( + Exchange, "publish", return_value=asyncio.sleep(0) + ) as mock_publish: + with mock.patch( + "opentelemetry.instrumentation.aio_pika.publish_decorator.propagate.inject") as mock_inject: + decorated_publish = PublishDecorator( + self.tracer, exchange + ).decorate(mock_publish) + self.loop.run_until_complete( + decorated_publish(MESSAGE, ROUTING_KEY) + ) + mock_publish.assert_called_once() + mock_get_publish_span.assert_called_once() + mock_inject.assert_called_once() + + def test_publish_works_with_not_recording_span(self): + self._test_publish_works_with_not_recording_span(Exchange) + + def test_publish_works_with_not_recording_span_robust(self): + self._test_publish_works_with_not_recording_span(RobustExchange) diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py index e9f819f2d6..881149dbac 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py @@ -113,12 +113,11 @@ def decorated_function( exchange, routing_key, body, properties, mandatory ) with trace.use_span(span, end_on_exit=True): - if span.is_recording(): - propagate.inject(properties.headers) - try: - publish_hook(span, body, properties) - except Exception as hook_exception: # pylint: disable=W0703 - _LOG.exception(hook_exception) + propagate.inject(properties.headers) + try: + publish_hook(span, body, properties) + except Exception as hook_exception: # pylint: disable=W0703 + _LOG.exception(hook_exception) retval = original_function( exchange, routing_key, body, properties, mandatory ) diff --git a/instrumentation/opentelemetry-instrumentation-pika/tests/test_utils.py b/instrumentation/opentelemetry-instrumentation-pika/tests/test_utils.py index 9b1aed7f49..a72e0d4315 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/tests/test_utils.py +++ b/instrumentation/opentelemetry-instrumentation-pika/tests/test_utils.py @@ -292,7 +292,6 @@ def test_decorate_basic_publish( use_span.assert_called_once_with( get_span.return_value, end_on_exit=True ) - get_span.return_value.is_recording.assert_called_once() inject.assert_called_once_with(properties.headers) callback.assert_called_once_with( exchange_name, routing_key, mock_body, properties, False @@ -323,7 +322,6 @@ def test_decorate_basic_publish_no_properties( use_span.assert_called_once_with( get_span.return_value, end_on_exit=True ) - get_span.return_value.is_recording.assert_called_once() inject.assert_called_once_with(basic_properties.return_value.headers) self.assertEqual(retval, callback.return_value) @@ -393,7 +391,6 @@ def test_decorate_basic_publish_with_hook( use_span.assert_called_once_with( get_span.return_value, end_on_exit=True ) - get_span.return_value.is_recording.assert_called_once() inject.assert_called_once_with(properties.headers) publish_hook.assert_called_once_with( get_span.return_value, mock_body, properties @@ -402,3 +399,52 @@ def test_decorate_basic_publish_with_hook( exchange_name, routing_key, mock_body, properties, False ) self.assertEqual(retval, callback.return_value) + + @mock.patch("opentelemetry.instrumentation.pika.utils._get_span") + @mock.patch("opentelemetry.propagate.inject") + @mock.patch("opentelemetry.trace.use_span") + def test_decorate_basic_publish_when_span_is_not_recording( + self, + use_span: mock.MagicMock, + inject: mock.MagicMock, + get_span: mock.MagicMock, + ) -> None: + callback = mock.MagicMock() + tracer = mock.MagicMock() + channel = mock.MagicMock(spec=Channel) + exchange_name = "test-exchange" + routing_key = "test-routing-key" + properties = mock.MagicMock() + mock_body = b"mock_body" + publish_hook = mock.MagicMock() + + mocked_span = mock.MagicMock() + mocked_span.is_recording.return_value = False + get_span.return_value = mocked_span + + decorated_basic_publish = utils._decorate_basic_publish( + callback, channel, tracer, publish_hook + ) + retval = decorated_basic_publish( + exchange_name, routing_key, mock_body, properties + ) + get_span.assert_called_once_with( + tracer, + channel, + properties, + destination=exchange_name, + span_kind=SpanKind.PRODUCER, + task_name="(temporary)", + operation=None, + ) + use_span.assert_called_once_with( + get_span.return_value, end_on_exit=True + ) + inject.assert_called_once_with(properties.headers) + publish_hook.assert_called_once_with( + get_span.return_value, mock_body, properties + ) + callback.assert_called_once_with( + exchange_name, routing_key, mock_body, properties, False + ) + self.assertEqual(retval, callback.return_value) \ No newline at end of file From 589ea7cd9548ddfa33a4b4c680e9a4b7b0ba7cce Mon Sep 17 00:00:00 2001 From: Bulygin Evgeny Date: Mon, 27 Nov 2023 10:34:19 +0500 Subject: [PATCH 2/2] black formting --- .../tests/test_publish_decorator.py | 14 ++++++++------ .../tests/test_utils.py | 2 +- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_publish_decorator.py b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_publish_decorator.py index e49a90c1ad..41cd11d5a6 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_publish_decorator.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_publish_decorator.py @@ -96,16 +96,17 @@ def test_robust_publish(self): def _test_publish_works_with_not_recording_span(self, exchange_type): exchange = exchange_type(CONNECTION_7, CHANNEL_7, EXCHANGE_NAME) with mock.patch.object( - PublishDecorator, "_get_publish_span" + PublishDecorator, "_get_publish_span" ) as mock_get_publish_span: mocked_not_recording_span = MagicMock() mocked_not_recording_span.is_recording.return_value = False mock_get_publish_span.return_value = mocked_not_recording_span with mock.patch.object( - Exchange, "publish", return_value=asyncio.sleep(0) + Exchange, "publish", return_value=asyncio.sleep(0) ) as mock_publish: with mock.patch( - "opentelemetry.instrumentation.aio_pika.publish_decorator.propagate.inject") as mock_inject: + "opentelemetry.instrumentation.aio_pika.publish_decorator.propagate.inject" + ) as mock_inject: decorated_publish = PublishDecorator( self.tracer, exchange ).decorate(mock_publish) @@ -178,16 +179,17 @@ def test_robust_publish(self): def _test_publish_works_with_not_recording_span(self, exchange_type): exchange = exchange_type(CONNECTION_7, CHANNEL_7, EXCHANGE_NAME) with mock.patch.object( - PublishDecorator, "_get_publish_span" + PublishDecorator, "_get_publish_span" ) as mock_get_publish_span: mocked_not_recording_span = MagicMock() mocked_not_recording_span.is_recording.return_value = False mock_get_publish_span.return_value = mocked_not_recording_span with mock.patch.object( - Exchange, "publish", return_value=asyncio.sleep(0) + Exchange, "publish", return_value=asyncio.sleep(0) ) as mock_publish: with mock.patch( - "opentelemetry.instrumentation.aio_pika.publish_decorator.propagate.inject") as mock_inject: + "opentelemetry.instrumentation.aio_pika.publish_decorator.propagate.inject" + ) as mock_inject: decorated_publish = PublishDecorator( self.tracer, exchange ).decorate(mock_publish) diff --git a/instrumentation/opentelemetry-instrumentation-pika/tests/test_utils.py b/instrumentation/opentelemetry-instrumentation-pika/tests/test_utils.py index a72e0d4315..ed33593389 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/tests/test_utils.py +++ b/instrumentation/opentelemetry-instrumentation-pika/tests/test_utils.py @@ -447,4 +447,4 @@ def test_decorate_basic_publish_when_span_is_not_recording( callback.assert_called_once_with( exchange_name, routing_key, mock_body, properties, False ) - self.assertEqual(retval, callback.return_value) \ No newline at end of file + self.assertEqual(retval, callback.return_value)