Skip to content

Commit

Permalink
Merge branch 'main' into audit-and-test-opentelemetry-instrumentation…
Browse files Browse the repository at this point in the history
…-dbapi-no-op-tracer
  • Loading branch information
srikanthccv authored Feb 5, 2023
2 parents a985f1d + 66ceef5 commit d172a55
Show file tree
Hide file tree
Showing 17 changed files with 325 additions and 39 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- `opentelemetry-instrumentation-redis` Add `sanitize_query` config option to allow query sanitization. ([#1572](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1572))
- `opentelemetry-instrumentation-celery` Record exceptions as events on the span.
([#1573](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1573))
- Add metric instrumentation for urllib
([#1553](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1553))
- `opentelemetry/sdk/extension/aws` Implement [`aws.ecs.*`](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/resource/semantic_conventions/cloud_provider/aws/ecs.md) and [`aws.logs.*`](https://opentelemetry.io/docs/reference/specification/resource/semantic_conventions/cloud_provider/aws/logs/) resource attributes in the `AwsEcsResourceDetector` detector when the ECS Metadata v4 is available
([#1212](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1212))
- `opentelemetry-instrumentation-aio-pika` Support `aio_pika` 8.x
([#1481](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1481))
- `opentelemetry-instrumentation-aws-lambda` Flush `MeterProvider` at end of function invocation.
([#1613](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1613))
- Fix aiohttp bug with unset `trace_configs` ([#1592](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1592))
- Fix aiohttp bug with unset `trace_configs`
([#1592](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1592))
- `opentelemetry-instrumentation-django` Allow explicit `excluded_urls` configuration through `instrument()`
([#1618](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1618))

Expand Down
2 changes: 1 addition & 1 deletion instrumentation/README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

| Instrumentation | Supported Packages | Metrics support |
| --------------- | ------------------ | --------------- |
| [opentelemetry-instrumentation-aio-pika](./opentelemetry-instrumentation-aio-pika) | aio_pika ~= 7.2.0 | No
| [opentelemetry-instrumentation-aio-pika](./opentelemetry-instrumentation-aio-pika) | aio_pika >= 7.2.0, < 9.0.0 | No
| [opentelemetry-instrumentation-aiohttp-client](./opentelemetry-instrumentation-aiohttp-client) | aiohttp ~= 3.0 | No
| [opentelemetry-instrumentation-aiopg](./opentelemetry-instrumentation-aiopg) | aiopg >= 0.13.0, < 2.0.0 | No
| [opentelemetry-instrumentation-asgi](./opentelemetry-instrumentation-asgi) | asgiref ~= 3.0 | No
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ dependencies = [

[project.optional-dependencies]
instruments = [
"aio_pika ~= 7.2.0",
"aio_pika >= 7.2.0, < 9.0.0",
]
test = [
"opentelemetry-instrumentation-aio-pika[instruments]",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@
# limitations under the License.
from typing import Collection

_instruments: Collection[str] = ("aio_pika ~= 7.2.0",)
_instruments: Collection[str] = ("aio_pika >= 7.2.0, < 9.0.0",)
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
)
from opentelemetry.trace import Span, SpanKind, Tracer

_DEFAULT_ATTRIBUTES = {SpanAttributes.MESSAGING_SYSTEM: 'rabbitmq'}
_DEFAULT_ATTRIBUTES = {SpanAttributes.MESSAGING_SYSTEM: "rabbitmq"}


class SpanBuilder:
Expand All @@ -49,18 +49,30 @@ def set_destination(self, destination: str):
self._attributes[SpanAttributes.MESSAGING_DESTINATION] = destination

def set_channel(self, channel: AbstractChannel):
url = channel.connection.connection.url
self._attributes.update({
SpanAttributes.NET_PEER_NAME: url.host,
SpanAttributes.NET_PEER_PORT: url.port
})
connection = channel.connection
if getattr(connection, "connection", None):
# aio_rmq 7
url = connection.connection.url
else:
# aio_rmq 8
url = connection.url
self._attributes.update(
{
SpanAttributes.NET_PEER_NAME: url.host,
SpanAttributes.NET_PEER_PORT: url.port,
}
)

def set_message(self, message: AbstractMessage):
properties = message.properties
if properties.message_id:
self._attributes[SpanAttributes.MESSAGING_MESSAGE_ID] = properties.message_id
self._attributes[
SpanAttributes.MESSAGING_MESSAGE_ID
] = properties.message_id
if properties.correlation_id:
self._attributes[SpanAttributes.MESSAGING_CONVERSATION_ID] = properties.correlation_id
self._attributes[
SpanAttributes.MESSAGING_CONVERSATION_ID
] = properties.correlation_id

def build(self) -> Optional[Span]:
if not is_instrumentation_enabled():
Expand All @@ -69,9 +81,11 @@ def build(self) -> Optional[Span]:
self._attributes[SpanAttributes.MESSAGING_OPERATION] = self._operation.value
else:
self._attributes[SpanAttributes.MESSAGING_TEMP_DESTINATION] = True
span = self._tracer.start_span(self._generate_span_name(), kind=self._kind, attributes=self._attributes)
span = self._tracer.start_span(
self._generate_span_name(), kind=self._kind, attributes=self._attributes
)
return span

def _generate_span_name(self) -> str:
operation_value = self._operation.value if self._operation else 'send'
return f'{self._destination} {operation_value}'
operation_value = self._operation.value if self._operation else "send"
return f"{self._destination} {operation_value}"
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
SERVER_URL = URL(
f"amqp://{SERVER_USER}:{SERVER_PASS}@{SERVER_HOST}:{SERVER_PORT}/"
)
CONNECTION = Namespace(connection=Namespace(url=SERVER_URL))
CHANNEL = Namespace(connection=CONNECTION, loop=None)
CONNECTION_7 = Namespace(connection=Namespace(url=SERVER_URL))
CONNECTION_8 = Namespace(url=SERVER_URL)
CHANNEL_7 = Namespace(connection=CONNECTION_7, loop=None)
CHANNEL_8 = Namespace(connection=CONNECTION_8, loop=None)
MESSAGE = Namespace(
properties=Namespace(
message_id=MESSAGE_ID, correlation_id=CORRELATION_ID, headers={}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
from unittest import TestCase, mock
from unittest import TestCase, mock, skipIf

from aio_pika import Queue
from aio_pika import Queue, version_info

from opentelemetry.instrumentation.aio_pika.callback_decorator import (
CallbackDecorator,
Expand All @@ -23,7 +23,8 @@
from opentelemetry.trace import SpanKind, get_tracer

from .consts import (
CHANNEL,
CHANNEL_7,
CHANNEL_8,
CORRELATION_ID,
EXCHANGE_NAME,
MESSAGE,
Expand All @@ -35,7 +36,8 @@
)


class TestInstrumentedQueue(TestCase):
@skipIf(version_info >= (8, 0), "Only for aio_pika 7")
class TestInstrumentedQueueAioRmq7(TestCase):
EXPECTED_ATTRIBUTES = {
SpanAttributes.MESSAGING_SYSTEM: MESSAGING_SYSTEM,
SpanAttributes.MESSAGING_DESTINATION: EXCHANGE_NAME,
Expand All @@ -52,7 +54,7 @@ def setUp(self):
asyncio.set_event_loop(self.loop)

def test_get_callback_span(self):
queue = Queue(CHANNEL, QUEUE_NAME, False, False, False, None)
queue = Queue(CHANNEL_7, QUEUE_NAME, False, False, False, None)
tracer = mock.MagicMock()
CallbackDecorator(tracer, queue)._get_span(MESSAGE)
tracer.start_span.assert_called_once_with(
Expand All @@ -62,7 +64,47 @@ def test_get_callback_span(self):
)

def test_decorate_callback(self):
queue = Queue(CHANNEL, QUEUE_NAME, False, False, False, None)
queue = Queue(CHANNEL_7, QUEUE_NAME, False, False, False, None)
callback = mock.MagicMock(return_value=asyncio.sleep(0))
with mock.patch.object(
CallbackDecorator, "_get_span"
) as mocked_get_callback_span:
callback_decorator = CallbackDecorator(self.tracer, queue)
decorated_callback = callback_decorator.decorate(callback)
self.loop.run_until_complete(decorated_callback(MESSAGE))
mocked_get_callback_span.assert_called_once()
callback.assert_called_once_with(MESSAGE)


@skipIf(version_info <= (8, 0), "Only for aio_pika 8")
class TestInstrumentedQueueAioRmq8(TestCase):
EXPECTED_ATTRIBUTES = {
SpanAttributes.MESSAGING_SYSTEM: MESSAGING_SYSTEM,
SpanAttributes.MESSAGING_DESTINATION: EXCHANGE_NAME,
SpanAttributes.NET_PEER_NAME: SERVER_HOST,
SpanAttributes.NET_PEER_PORT: SERVER_PORT,
SpanAttributes.MESSAGING_MESSAGE_ID: MESSAGE_ID,
SpanAttributes.MESSAGING_CONVERSATION_ID: CORRELATION_ID,
SpanAttributes.MESSAGING_OPERATION: "receive",
}

def setUp(self):
self.tracer = get_tracer(__name__)
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)

def test_get_callback_span(self):
queue = Queue(CHANNEL_8, QUEUE_NAME, False, False, False, None)
tracer = mock.MagicMock()
CallbackDecorator(tracer, queue)._get_span(MESSAGE)
tracer.start_span.assert_called_once_with(
f"{EXCHANGE_NAME} receive",
kind=SpanKind.CONSUMER,
attributes=self.EXPECTED_ATTRIBUTES,
)

def test_decorate_callback(self):
queue = Queue(CHANNEL_8, QUEUE_NAME, False, False, False, None)
callback = mock.MagicMock(return_value=asyncio.sleep(0))
with mock.patch.object(
CallbackDecorator, "_get_span"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
# limitations under the License.
import asyncio
from typing import Type
from unittest import TestCase, mock
from unittest import TestCase, mock, skipIf

from aio_pika import Exchange, RobustExchange
from aio_pika import Exchange, RobustExchange, version_info

from opentelemetry.instrumentation.aio_pika.publish_decorator import (
PublishDecorator,
Expand All @@ -24,8 +24,10 @@
from opentelemetry.trace import SpanKind, get_tracer

from .consts import (
CHANNEL,
CONNECTION,
CHANNEL_7,
CHANNEL_8,
CONNECTION_7,
CONNECTION_8,
CORRELATION_ID,
EXCHANGE_NAME,
MESSAGE,
Expand All @@ -37,7 +39,8 @@
)


class TestInstrumentedExchange(TestCase):
@skipIf(version_info >= (8, 0), "Only for aio_pika 7")
class TestInstrumentedExchangeAioRmq7(TestCase):
EXPECTED_ATTRIBUTES = {
SpanAttributes.MESSAGING_SYSTEM: MESSAGING_SYSTEM,
SpanAttributes.MESSAGING_DESTINATION: f"{EXCHANGE_NAME},{ROUTING_KEY}",
Expand All @@ -54,7 +57,7 @@ def setUp(self):
asyncio.set_event_loop(self.loop)

def test_get_publish_span(self):
exchange = Exchange(CONNECTION, CHANNEL, EXCHANGE_NAME)
exchange = Exchange(CONNECTION_7, CHANNEL_7, EXCHANGE_NAME)
tracer = mock.MagicMock()
PublishDecorator(tracer, exchange)._get_publish_span(
MESSAGE, ROUTING_KEY
Expand All @@ -66,7 +69,60 @@ def test_get_publish_span(self):
)

def _test_publish(self, exchange_type: Type[Exchange]):
exchange = exchange_type(CONNECTION, CHANNEL, EXCHANGE_NAME)
exchange = exchange_type(CONNECTION_7, CHANNEL_7, EXCHANGE_NAME)
with mock.patch.object(
PublishDecorator, "_get_publish_span"
) as mock_get_publish_span:
with mock.patch.object(
Exchange, "publish", return_value=asyncio.sleep(0)
) as mock_publish:
decorated_publish = PublishDecorator(
self.tracer, exchange
).decorate(mock_publish)
self.loop.run_until_complete(
decorated_publish(MESSAGE, ROUTING_KEY)
)
mock_publish.assert_called_once()
mock_get_publish_span.assert_called_once()

def test_publish(self):
self._test_publish(Exchange)

def test_robust_publish(self):
self._test_publish(RobustExchange)


@skipIf(version_info <= (8, 0), "Only for aio_pika 8")
class TestInstrumentedExchangeAioRmq8(TestCase):
EXPECTED_ATTRIBUTES = {
SpanAttributes.MESSAGING_SYSTEM: MESSAGING_SYSTEM,
SpanAttributes.MESSAGING_DESTINATION: f"{EXCHANGE_NAME},{ROUTING_KEY}",
SpanAttributes.NET_PEER_NAME: SERVER_HOST,
SpanAttributes.NET_PEER_PORT: SERVER_PORT,
SpanAttributes.MESSAGING_MESSAGE_ID: MESSAGE_ID,
SpanAttributes.MESSAGING_CONVERSATION_ID: CORRELATION_ID,
SpanAttributes.MESSAGING_TEMP_DESTINATION: True,
}

def setUp(self):
self.tracer = get_tracer(__name__)
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)

def test_get_publish_span(self):
exchange = Exchange(CHANNEL_8, EXCHANGE_NAME)
tracer = mock.MagicMock()
PublishDecorator(tracer, exchange)._get_publish_span(
MESSAGE, ROUTING_KEY
)
tracer.start_span.assert_called_once_with(
f"{EXCHANGE_NAME},{ROUTING_KEY} send",
kind=SpanKind.PRODUCER,
attributes=self.EXPECTED_ATTRIBUTES,
)

def _test_publish(self, exchange_type: Type[Exchange]):
exchange = exchange_type(CONNECTION_8, CHANNEL_8, EXCHANGE_NAME)
with mock.patch.object(
PublishDecorator, "_get_publish_span"
) as mock_get_publish_span:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@ class TestBuilder(TestCase):
def test_build(self):
builder = SpanBuilder(get_tracer(__name__))
builder.set_as_consumer()
builder.set_destination('destination')
builder.set_destination("destination")
span = builder.build()
self.assertTrue(isinstance(span, Span))
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,19 @@ def test_instrument_connection_after_instrument(self):
spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 1)

def test_no_op_tracer_provider(self):
cnx = async_call(aiopg.connect(database="test"))
AiopgInstrumentor().instrument_connection(
cnx, tracer_provider=trace_api.NoOpTracerProvider()
)

cursor = async_call(cnx.cursor())
query = "SELECT * FROM test"
async_call(cursor.execute(query))

spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 0)

def test_custom_tracer_provider_instrument_connection(self):
resource = resources.Resource.create(
{"service.name": "db-test-service"}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
from opentelemetry.semconv.resource import ResourceAttributes
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.test.test_base import TestBase
from opentelemetry.trace import SpanKind
from opentelemetry.trace import NoOpTracerProvider, SpanKind
from opentelemetry.trace.propagation.tracecontext import (
TraceContextTextMapPropagator,
)
Expand Down Expand Up @@ -413,3 +413,12 @@ def test_uninstrument(self):
mock_execute_lambda(MOCK_LAMBDA_API_GATEWAY_HTTP_API_EVENT)
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 0)

def test_no_op_tracer_provider(self):
tracer_provider = NoOpTracerProvider()
AwsLambdaInstrumentor().instrument(tracer_provider=tracer_provider)

mock_execute_lambda(MOCK_LAMBDA_API_GATEWAY_HTTP_API_EVENT)
spans = self.memory_exporter.get_finished_spans()
assert spans is not None
self.assertEqual(len(spans), 0)
Loading

0 comments on commit d172a55

Please sign in to comment.