Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

aio-pika instrumentation: Removed check for non-sampled span when inject message header. #1969

Merged
merged 10 commits into from
Nov 27, 2023
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Fixed

- `opentelemetry-instrumentation-aio-pika` and `opentelemetry-instrumentation-pika` Fix missing trace context propagation when trace not recording.
([#1969](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1969))
- Fix version of Flask dependency `werkzeug`
([#1980](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1980))
- `opentelemetry-resource-detector-azure` Using new Cloud Resource ID attribute.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ async def decorated_publish(
if not span:
return await publish(message, routing_key, **kwargs)
with trace.use_span(span, end_on_exit=True):
if span.is_recording():
propagate.inject(message.properties.headers)
propagate.inject(message.properties.headers)
return_value = await publish(message, routing_key, **kwargs)
return return_value

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import asyncio
from typing import Type
from unittest import TestCase, mock, skipIf
from unittest.mock import MagicMock

from aio_pika import Exchange, RobustExchange

Expand Down Expand Up @@ -92,6 +93,36 @@ def test_publish(self):
def test_robust_publish(self):
self._test_publish(RobustExchange)

def _test_publish_works_with_not_recording_span(self, exchange_type):
exchange = exchange_type(CONNECTION_7, CHANNEL_7, EXCHANGE_NAME)
with mock.patch.object(
PublishDecorator, "_get_publish_span"
) as mock_get_publish_span:
mocked_not_recording_span = MagicMock()
mocked_not_recording_span.is_recording.return_value = False
mock_get_publish_span.return_value = mocked_not_recording_span
with mock.patch.object(
Exchange, "publish", return_value=asyncio.sleep(0)
) as mock_publish:
with mock.patch(
"opentelemetry.instrumentation.aio_pika.publish_decorator.propagate.inject"
) as mock_inject:
decorated_publish = PublishDecorator(
self.tracer, exchange
).decorate(mock_publish)
self.loop.run_until_complete(
decorated_publish(MESSAGE, ROUTING_KEY)
)
mock_publish.assert_called_once()
mock_get_publish_span.assert_called_once()
mock_inject.assert_called_once()

def test_publish_works_with_not_recording_span(self):
self._test_publish_works_with_not_recording_span(Exchange)

def test_publish_works_with_not_recording_span_robust(self):
self._test_publish_works_with_not_recording_span(RobustExchange)


@skipIf(AIOPIKA_VERSION_INFO <= (8, 0), "Only for aio_pika 8")
class TestInstrumentedExchangeAioRmq8(TestCase):
Expand Down Expand Up @@ -144,3 +175,33 @@ def test_publish(self):

def test_robust_publish(self):
self._test_publish(RobustExchange)

def _test_publish_works_with_not_recording_span(self, exchange_type):
exchange = exchange_type(CONNECTION_7, CHANNEL_7, EXCHANGE_NAME)
with mock.patch.object(
PublishDecorator, "_get_publish_span"
) as mock_get_publish_span:
mocked_not_recording_span = MagicMock()
mocked_not_recording_span.is_recording.return_value = False
mock_get_publish_span.return_value = mocked_not_recording_span
with mock.patch.object(
Exchange, "publish", return_value=asyncio.sleep(0)
) as mock_publish:
with mock.patch(
"opentelemetry.instrumentation.aio_pika.publish_decorator.propagate.inject"
) as mock_inject:
decorated_publish = PublishDecorator(
self.tracer, exchange
).decorate(mock_publish)
self.loop.run_until_complete(
decorated_publish(MESSAGE, ROUTING_KEY)
)
mock_publish.assert_called_once()
mock_get_publish_span.assert_called_once()
mock_inject.assert_called_once()

def test_publish_works_with_not_recording_span(self):
self._test_publish_works_with_not_recording_span(Exchange)

def test_publish_works_with_not_recording_span_robust(self):
self._test_publish_works_with_not_recording_span(RobustExchange)
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,11 @@ def decorated_function(
exchange, routing_key, body, properties, mandatory
)
with trace.use_span(span, end_on_exit=True):
if span.is_recording():
propagate.inject(properties.headers)
try:
publish_hook(span, body, properties)
except Exception as hook_exception: # pylint: disable=W0703
_LOG.exception(hook_exception)
propagate.inject(properties.headers)
try:
publish_hook(span, body, properties)
except Exception as hook_exception: # pylint: disable=W0703
_LOG.exception(hook_exception)
retval = original_function(
exchange, routing_key, body, properties, mandatory
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,6 @@ def test_decorate_basic_publish(
use_span.assert_called_once_with(
get_span.return_value, end_on_exit=True
)
get_span.return_value.is_recording.assert_called_once()
inject.assert_called_once_with(properties.headers)
callback.assert_called_once_with(
exchange_name, routing_key, mock_body, properties, False
Expand Down Expand Up @@ -323,7 +322,6 @@ def test_decorate_basic_publish_no_properties(
use_span.assert_called_once_with(
get_span.return_value, end_on_exit=True
)
get_span.return_value.is_recording.assert_called_once()
inject.assert_called_once_with(basic_properties.return_value.headers)
self.assertEqual(retval, callback.return_value)

Expand Down Expand Up @@ -393,7 +391,55 @@ def test_decorate_basic_publish_with_hook(
use_span.assert_called_once_with(
get_span.return_value, end_on_exit=True
)
get_span.return_value.is_recording.assert_called_once()
inject.assert_called_once_with(properties.headers)
publish_hook.assert_called_once_with(
get_span.return_value, mock_body, properties
)
callback.assert_called_once_with(
exchange_name, routing_key, mock_body, properties, False
)
self.assertEqual(retval, callback.return_value)

@mock.patch("opentelemetry.instrumentation.pika.utils._get_span")
@mock.patch("opentelemetry.propagate.inject")
@mock.patch("opentelemetry.trace.use_span")
def test_decorate_basic_publish_when_span_is_not_recording(
self,
use_span: mock.MagicMock,
inject: mock.MagicMock,
get_span: mock.MagicMock,
) -> None:
callback = mock.MagicMock()
tracer = mock.MagicMock()
channel = mock.MagicMock(spec=Channel)
exchange_name = "test-exchange"
routing_key = "test-routing-key"
properties = mock.MagicMock()
mock_body = b"mock_body"
publish_hook = mock.MagicMock()

mocked_span = mock.MagicMock()
mocked_span.is_recording.return_value = False
get_span.return_value = mocked_span

decorated_basic_publish = utils._decorate_basic_publish(
callback, channel, tracer, publish_hook
)
retval = decorated_basic_publish(
exchange_name, routing_key, mock_body, properties
)
get_span.assert_called_once_with(
tracer,
channel,
properties,
destination=exchange_name,
span_kind=SpanKind.PRODUCER,
task_name="(temporary)",
operation=None,
)
use_span.assert_called_once_with(
get_span.return_value, end_on_exit=True
)
inject.assert_called_once_with(properties.headers)
publish_hook.assert_called_once_with(
get_span.return_value, mock_body, properties
Expand Down
Loading