Skip to content

Commit

Permalink
Merge branch 'main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
shalevr authored Dec 6, 2023
2 parents d12e081 + 4bf3577 commit f4babd5
Show file tree
Hide file tree
Showing 13 changed files with 905 additions and 137 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ on:
- 'release/*'
pull_request:
env:
CORE_REPO_SHA: 9831afaff5b4d371fd9a14266ab47884546bd971
CORE_REPO_SHA: 35a021194787359324c46f5ca99d31802e4c92bd

jobs:
build:
Expand Down
8 changes: 7 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#1987](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1987))
- `opentelemetry-instrumentation-httpx` Fix mixing async and non async hooks
([#1920](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1920))
- `opentelemetry-instrumentation-requests` Implement new semantic convention opt-in with stable http semantic conventions
([#2002](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2002))
- `opentelemetry-instrument-grpc` Fix arity of context.abort for AIO RPCs
([#2066](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2066))

### Fixed

- `opentelemetry-instrumentation-urllib`/`opentelemetry-instrumentation-urllib3` Fix metric descriptions to match semantic conventions
([#1959]((https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1959))
([#1959](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1959))

## Version 1.21.0/0.42b0 (2023-11-01)

Expand All @@ -34,6 +38,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Fixed

- `opentelemetry-instrumentation-aio-pika` and `opentelemetry-instrumentation-pika` Fix missing trace context propagation when trace not recording.
([#1969](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1969))
- Fix version of Flask dependency `werkzeug`
([#1980](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1980))
- `opentelemetry-resource-detector-azure` Using new Cloud Resource ID attribute.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ async def decorated_publish(
if not span:
return await publish(message, routing_key, **kwargs)
with trace.use_span(span, end_on_exit=True):
if span.is_recording():
propagate.inject(message.properties.headers)
propagate.inject(message.properties.headers)
return_value = await publish(message, routing_key, **kwargs)
return return_value

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import asyncio
from typing import Type
from unittest import TestCase, mock, skipIf
from unittest.mock import MagicMock

from aio_pika import Exchange, RobustExchange

Expand Down Expand Up @@ -92,6 +93,36 @@ def test_publish(self):
def test_robust_publish(self):
self._test_publish(RobustExchange)

def _test_publish_works_with_not_recording_span(self, exchange_type):
exchange = exchange_type(CONNECTION_7, CHANNEL_7, EXCHANGE_NAME)
with mock.patch.object(
PublishDecorator, "_get_publish_span"
) as mock_get_publish_span:
mocked_not_recording_span = MagicMock()
mocked_not_recording_span.is_recording.return_value = False
mock_get_publish_span.return_value = mocked_not_recording_span
with mock.patch.object(
Exchange, "publish", return_value=asyncio.sleep(0)
) as mock_publish:
with mock.patch(
"opentelemetry.instrumentation.aio_pika.publish_decorator.propagate.inject"
) as mock_inject:
decorated_publish = PublishDecorator(
self.tracer, exchange
).decorate(mock_publish)
self.loop.run_until_complete(
decorated_publish(MESSAGE, ROUTING_KEY)
)
mock_publish.assert_called_once()
mock_get_publish_span.assert_called_once()
mock_inject.assert_called_once()

def test_publish_works_with_not_recording_span(self):
self._test_publish_works_with_not_recording_span(Exchange)

def test_publish_works_with_not_recording_span_robust(self):
self._test_publish_works_with_not_recording_span(RobustExchange)


@skipIf(AIOPIKA_VERSION_INFO <= (8, 0), "Only for aio_pika 8")
class TestInstrumentedExchangeAioRmq8(TestCase):
Expand Down Expand Up @@ -144,3 +175,33 @@ def test_publish(self):

def test_robust_publish(self):
self._test_publish(RobustExchange)

def _test_publish_works_with_not_recording_span(self, exchange_type):
exchange = exchange_type(CONNECTION_7, CHANNEL_7, EXCHANGE_NAME)
with mock.patch.object(
PublishDecorator, "_get_publish_span"
) as mock_get_publish_span:
mocked_not_recording_span = MagicMock()
mocked_not_recording_span.is_recording.return_value = False
mock_get_publish_span.return_value = mocked_not_recording_span
with mock.patch.object(
Exchange, "publish", return_value=asyncio.sleep(0)
) as mock_publish:
with mock.patch(
"opentelemetry.instrumentation.aio_pika.publish_decorator.propagate.inject"
) as mock_inject:
decorated_publish = PublishDecorator(
self.tracer, exchange
).decorate(mock_publish)
self.loop.run_until_complete(
decorated_publish(MESSAGE, ROUTING_KEY)
)
mock_publish.assert_called_once()
mock_get_publish_span.assert_called_once()
mock_inject.assert_called_once()

def test_publish_works_with_not_recording_span(self):
self._test_publish_works_with_not_recording_span(Exchange)

def test_publish_works_with_not_recording_span_robust(self):
self._test_publish_works_with_not_recording_span(RobustExchange)
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,63 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import grpc
import grpc.aio

from ._server import (
OpenTelemetryServerInterceptor,
_OpenTelemetryServicerContext,
_wrap_rpc_behavior,
)
import wrapt

from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace.status import Status, StatusCode

from ._server import OpenTelemetryServerInterceptor, _wrap_rpc_behavior


# pylint:disable=abstract-method
class _OpenTelemetryAioServicerContext(wrapt.ObjectProxy):
def __init__(self, servicer_context, active_span):
super().__init__(servicer_context)
self._self_active_span = active_span
self._self_code = grpc.StatusCode.OK
self._self_details = None

async def abort(self, code, details="", trailing_metadata=tuple()):
self._self_code = code
self._self_details = details
self._self_active_span.set_attribute(
SpanAttributes.RPC_GRPC_STATUS_CODE, code.value[0]
)
self._self_active_span.set_status(
Status(
status_code=StatusCode.ERROR,
description=f"{code}:{details}",
)
)
return await self.__wrapped__.abort(code, details, trailing_metadata)

def set_code(self, code):
self._self_code = code
details = self._self_details or code.value[1]
self._self_active_span.set_attribute(
SpanAttributes.RPC_GRPC_STATUS_CODE, code.value[0]
)
if code != grpc.StatusCode.OK:
self._self_active_span.set_status(
Status(
status_code=StatusCode.ERROR,
description=f"{code}:{details}",
)
)
return self.__wrapped__.set_code(code)

def set_details(self, details):
self._self_details = details
if self._self_code != grpc.StatusCode.OK:
self._self_active_span.set_status(
Status(
status_code=StatusCode.ERROR,
description=f"{self._self_code}:{details}",
)
)
return self.__wrapped__.set_details(details)


class OpenTelemetryAioServerInterceptor(
Expand Down Expand Up @@ -66,7 +116,7 @@ async def _unary_interceptor(request_or_iterator, context):
set_status_on_exception=False,
) as span:
# wrap the context
context = _OpenTelemetryServicerContext(context, span)
context = _OpenTelemetryAioServicerContext(context, span)

# And now we run the actual RPC.
try:
Expand All @@ -91,7 +141,7 @@ async def _stream_interceptor(request_or_iterator, context):
context,
set_status_on_exception=False,
) as span:
context = _OpenTelemetryServicerContext(context, span)
context = _OpenTelemetryAioServicerContext(context, span)

try:
async for response in behavior(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,11 @@ async def run_with_test_server(
channel = grpc.aio.insecure_channel(f"localhost:{port:d}")

await server.start()
resp = await runnable(channel)
await server.stop(1000)

try:
resp = await runnable(channel)
finally:
await server.stop(1000)

return resp

Expand Down Expand Up @@ -514,9 +517,79 @@ async def request(channel):
request = Request(client_id=1, request_data=failure_message)
msg = request.SerializeToString()

with testcase.assertRaises(Exception):
with testcase.assertRaises(grpc.RpcError) as cm:
await channel.unary_unary(rpc_call)(msg)

self.assertEqual(
cm.exception.code(), grpc.StatusCode.FAILED_PRECONDITION
)
self.assertEqual(cm.exception.details(), failure_message)

await run_with_test_server(request, servicer=AbortServicer())

spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 1)
span = spans_list[0]

self.assertEqual(span.name, rpc_call)
self.assertIs(span.kind, trace.SpanKind.SERVER)

# Check version and name in span's instrumentation info
self.assertEqualSpanInstrumentationInfo(
span, opentelemetry.instrumentation.grpc
)

# make sure this span errored, with the right status and detail
self.assertEqual(span.status.status_code, StatusCode.ERROR)
self.assertEqual(
span.status.description,
f"{grpc.StatusCode.FAILED_PRECONDITION}:{failure_message}",
)

# Check attributes
self.assertSpanHasAttributes(
span,
{
SpanAttributes.NET_PEER_IP: "[::1]",
SpanAttributes.NET_PEER_NAME: "localhost",
SpanAttributes.RPC_METHOD: "SimpleMethod",
SpanAttributes.RPC_SERVICE: "GRPCTestServer",
SpanAttributes.RPC_SYSTEM: "grpc",
SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.FAILED_PRECONDITION.value[
0
],
},
)

async def test_abort_with_trailing_metadata(self):
"""Check that we can catch an abort properly when trailing_metadata provided"""
rpc_call = "/GRPCTestServer/SimpleMethod"
failure_message = "failure message"

class AbortServicer(GRPCTestServerServicer):
# pylint:disable=C0103
async def SimpleMethod(self, request, context):
metadata = (("meta", "data"),)
await context.abort(
grpc.StatusCode.FAILED_PRECONDITION,
failure_message,
trailing_metadata=metadata,
)

testcase = self

async def request(channel):
request = Request(client_id=1, request_data=failure_message)
msg = request.SerializeToString()

with testcase.assertRaises(grpc.RpcError) as cm:
await channel.unary_unary(rpc_call)(msg)

self.assertEqual(
cm.exception.code(), grpc.StatusCode.FAILED_PRECONDITION
)
self.assertEqual(cm.exception.details(), failure_message)

await run_with_test_server(request, servicer=AbortServicer())

spans_list = self.memory_exporter.get_finished_spans()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,11 @@ def decorated_function(
exchange, routing_key, body, properties, mandatory
)
with trace.use_span(span, end_on_exit=True):
if span.is_recording():
propagate.inject(properties.headers)
try:
publish_hook(span, body, properties)
except Exception as hook_exception: # pylint: disable=W0703
_LOG.exception(hook_exception)
propagate.inject(properties.headers)
try:
publish_hook(span, body, properties)
except Exception as hook_exception: # pylint: disable=W0703
_LOG.exception(hook_exception)
retval = original_function(
exchange, routing_key, body, properties, mandatory
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,6 @@ def test_decorate_basic_publish(
use_span.assert_called_once_with(
get_span.return_value, end_on_exit=True
)
get_span.return_value.is_recording.assert_called_once()
inject.assert_called_once_with(properties.headers)
callback.assert_called_once_with(
exchange_name, routing_key, mock_body, properties, False
Expand Down Expand Up @@ -323,7 +322,6 @@ def test_decorate_basic_publish_no_properties(
use_span.assert_called_once_with(
get_span.return_value, end_on_exit=True
)
get_span.return_value.is_recording.assert_called_once()
inject.assert_called_once_with(basic_properties.return_value.headers)
self.assertEqual(retval, callback.return_value)

Expand Down Expand Up @@ -393,7 +391,55 @@ def test_decorate_basic_publish_with_hook(
use_span.assert_called_once_with(
get_span.return_value, end_on_exit=True
)
get_span.return_value.is_recording.assert_called_once()
inject.assert_called_once_with(properties.headers)
publish_hook.assert_called_once_with(
get_span.return_value, mock_body, properties
)
callback.assert_called_once_with(
exchange_name, routing_key, mock_body, properties, False
)
self.assertEqual(retval, callback.return_value)

@mock.patch("opentelemetry.instrumentation.pika.utils._get_span")
@mock.patch("opentelemetry.propagate.inject")
@mock.patch("opentelemetry.trace.use_span")
def test_decorate_basic_publish_when_span_is_not_recording(
self,
use_span: mock.MagicMock,
inject: mock.MagicMock,
get_span: mock.MagicMock,
) -> None:
callback = mock.MagicMock()
tracer = mock.MagicMock()
channel = mock.MagicMock(spec=Channel)
exchange_name = "test-exchange"
routing_key = "test-routing-key"
properties = mock.MagicMock()
mock_body = b"mock_body"
publish_hook = mock.MagicMock()

mocked_span = mock.MagicMock()
mocked_span.is_recording.return_value = False
get_span.return_value = mocked_span

decorated_basic_publish = utils._decorate_basic_publish(
callback, channel, tracer, publish_hook
)
retval = decorated_basic_publish(
exchange_name, routing_key, mock_body, properties
)
get_span.assert_called_once_with(
tracer,
channel,
properties,
destination=exchange_name,
span_kind=SpanKind.PRODUCER,
task_name="(temporary)",
operation=None,
)
use_span.assert_called_once_with(
get_span.return_value, end_on_exit=True
)
inject.assert_called_once_with(properties.headers)
publish_hook.assert_called_once_with(
get_span.return_value, mock_body, properties
Expand Down
Loading

0 comments on commit f4babd5

Please sign in to comment.