diff --git a/instrumentation/opentelemetry-instrumentation-asyncio/README.rst b/instrumentation/opentelemetry-instrumentation-asyncio/README.rst index 5749379690..35934f937c 100644 --- a/instrumentation/opentelemetry-instrumentation-asyncio/README.rst +++ b/instrumentation/opentelemetry-instrumentation-asyncio/README.rst @@ -8,36 +8,104 @@ OpenTelemetry asyncio Instrumentation AsyncioInstrumentor: Tracing Requests Made by the Asyncio Library -Primary Use Case: ------------------ -1. Performance and Error Monitoring: -The AsyncioInstrumentor tool offers significant advantages for developers and system administrators. It's designed to monitor real-time performance bottlenecks and catch exceptions within specific asynchronous tasks. -When It's Not Ideal to Use AsyncioInstrumentor: ------------------------------------------------- -1. Frameworks with Built-in Instrumentation: -If you're utilizing a framework like aiohttp that already includes built-in instrumentation, you might not need this library. In such cases, leveraging the built-in tools of the framework is generally more beneficial than using external ones like AsyncioInstrumentor. +The opentelemetry-instrumentation-asycnio package allows tracing asyncio applications. +The metric for coroutine, future, is generated even if there is no setting to generate a span. -2. Libraries Lacking Instrumentation: -Should you employ a library that isn't inherently instrumented, AsyncioInstrumentor can step in to fill that gap. -3. Concerns about Overhead: -Tracing each task and future consistently can lead to added overhead. As a best practice, it's wise to enable tracing only when crucial, especially during the development stage. +Set the name of the coroutine you want to trace. +----------------------------------------------- +.. code:: + export OTEL_PYTHON_ASYNCIO_COROUTINE_NAMES_TO_TRACE=coro_name,coro_name2,coro_name3 -Example -------- +If you want to keep track of which function to use in the to_thread function of asyncio, set the name of the function. +---- +.. code:: + export OTEL_PYTHON_ASYNCIO_TO_THREAD_FUNCTION_NAMES_TO_TRACE=func_name,func_name2,func_name3 + +For future, set it up like this +---- +.. code:: + export OTEL_PYTHON_ASYNCIO_FUTURE_TRACE_ENABLED=true + +Run instrumented taskcoroutine +---- +1. coroutine +---- .. code:: python + # export OTEL_PYTHON_ASYNCIO_COROUTINE_NAMES_TO_TRACE=sleep + + import asyncio from opentelemetry.instrumentation.asyncio import AsyncioInstrumentor + AsyncioInstrumentor().instrument() + async def main(): + await asyncio.create_task(asyncio.sleep(0.1)) + + asyncio.run(main()) + +2. future +---- +.. code:: python + + # export OTEL_PYTHON_ASYNCIO_FUTURE_TRACE_ENABLED=true + + loop = asyncio.get_event_loop() + + future = asyncio.Future() + future.set_result(1) + task = asyncio.ensure_future(future) + loop.run_until_complete(task) + +3. to_thread +---- +.. code:: python + + # export OTEL_PYTHON_ASYNCIO_TO_THREAD_FUNCTION_NAMES_TO_TRACE=func + import asyncio + from opentelemetry.instrumentation.asyncio import AsyncioInstrumentor + + AsyncioInstrumentor().instrument() async def main(): - await asyncio.create_task(asyncio.sleep(0.1)) + await asyncio.to_thread(func) + + def func(): + pass asyncio.run(main()) + +asyncio metric types +------- + +* `asyncio.futures.duration` (ms) - Duration of the future +* `asyncio.futures.exceptions` (count) - Number of exceptions raised by the future +* `asyncio.futures.cancelled` (count) - Number of futures cancelled +* `asyncio.futures.created` (count) - Number of futures created +* `asyncio.futures.active` (count) - Number of futures active +* `asyncio.futures.finished` (count) - Number of futures finished +* `asyncio.futures.timeouts` (count) - Number of futures timed out + +* `asyncio.coroutine.duration` (ms) - Duration of the coroutine +* `asyncio.coroutine.exceptions` (count) - Number of exceptions raised by the coroutine +* `asyncio.coroutine.created` (count) - Number of coroutines created +* `asyncio.coroutine.active` (count) - Number of coroutines active +* `asyncio.coroutine.finished` (count) - Number of coroutines finished +* `asyncio.coroutine.timeouts` (count) - Number of coroutines timed out +* `asyncio.coroutine.cancelled` (count) - Number of coroutines cancelled + +* `asyncio.to_thread.duration` (ms) - Duration of the to_thread +* `asyncio.to_thread.exceptions` (count) - Number of exceptions raised by the to_thread +* `asyncio.to_thread.created` (count) - Number of to_thread created +* `asyncio.to_thread.active` (count) - Number of to_thread active +* `asyncio.to_thread.finished` (count) - Number of to_thread finished + + + API --- diff --git a/instrumentation/opentelemetry-instrumentation-asyncio/src/opentelemetry/instrumentation/asyncio/__init__.py b/instrumentation/opentelemetry-instrumentation-asyncio/src/opentelemetry/instrumentation/asyncio/__init__.py index f6c2afd41b..dc0643c9f0 100644 --- a/instrumentation/opentelemetry-instrumentation-asyncio/src/opentelemetry/instrumentation/asyncio/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-asyncio/src/opentelemetry/instrumentation/asyncio/__init__.py @@ -14,10 +14,32 @@ """ .. asyncio: https://github.com/python/asyncio -Usage ------ +The opentelemetry-instrumentation-asycnio package allows tracing asyncio applications. +The metric for coroutine, future, is generated even if there is no setting to generate a span. -.. code-block:: python + +Set the name of the coroutine you want to trace. +----------------------------------------------- +.. code:: + export OTEL_PYTHON_ASYNCIO_COROUTINE_NAMES_TO_TRACE=coro_name,coro_name2,coro_name3 + +If you want to keep track of which function to use in the to_thread function of asyncio, set the name of the function. +---- +.. code:: + export OTEL_PYTHON_ASYNCIO_TO_THREAD_FUNCTION_NAMES_TO_TRACE=func_name,func_name2,func_name3 + +For future, set it up like this +---- +.. code:: + export OTEL_PYTHON_ASYNCIO_FUTURE_TRACE_ENABLED=true + +Run instrumented taskcoroutine +---- +1. coroutine +---- +.. code:: python + + # export OTEL_PYTHON_ASYNCIO_COROUTINE_NAMES_TO_TRACE=sleep import asyncio from opentelemetry.instrumentation.asyncio import AsyncioInstrumentor @@ -29,19 +51,83 @@ async def main(): asyncio.run(main()) +2. future +---- +.. code:: python + + # export OTEL_PYTHON_ASYNCIO_FUTURE_TRACE_ENABLED=true + + loop = asyncio.get_event_loop() + + future = asyncio.Future() + future.set_result(1) + task = asyncio.ensure_future(future) + loop.run_until_complete(task) + +3. to_thread +---- +.. code:: python + + # export OTEL_PYTHON_ASYNCIO_TO_THREAD_FUNCTION_NAMES_TO_TRACE=func + + import asyncio + from opentelemetry.instrumentation.asyncio import AsyncioInstrumentor + + AsyncioInstrumentor().instrument() + + async def main(): + await asyncio.to_thread(func) + + def func(): + pass + + asyncio.run(main()) + + +asyncio metric types +------- + +* `asyncio.futures.duration` (ms) - Duration of the future +* `asyncio.futures.exceptions` (count) - Number of exceptions raised by the future +* `asyncio.futures.cancelled` (count) - Number of futures cancelled +* `asyncio.futures.created` (count) - Number of futures created +* `asyncio.futures.active` (count) - Number of futures active +* `asyncio.futures.finished` (count) - Number of futures finished +* `asyncio.futures.timeouts` (count) - Number of futures timed out + +* `asyncio.coroutine.duration` (ms) - Duration of the coroutine +* `asyncio.coroutine.exceptions` (count) - Number of exceptions raised by the coroutine +* `asyncio.coroutine.created` (count) - Number of coroutines created +* `asyncio.coroutine.active` (count) - Number of coroutines active +* `asyncio.coroutine.finished` (count) - Number of coroutines finished +* `asyncio.coroutine.timeouts` (count) - Number of coroutines timed out +* `asyncio.coroutine.cancelled` (count) - Number of coroutines cancelled + +* `asyncio.to_thread.duration` (ms) - Duration of the to_thread +* `asyncio.to_thread.exceptions` (count) - Number of exceptions raised by the to_thread +* `asyncio.to_thread.created` (count) - Number of to_thread created +* `asyncio.to_thread.active` (count) - Number of to_thread active +* `asyncio.to_thread.finished` (count) - Number of to_thread finished + + API --- """ import asyncio import sys from asyncio import futures +from timeit import default_timer from typing import Collection +from opentelemetry.metrics import get_meter from opentelemetry.trace import get_tracer from opentelemetry.trace.status import Status, StatusCode from wrapt import wrap_function_wrapper as _wrap +from opentelemetry.instrumentation.asyncio.metrics import * from opentelemetry.instrumentation.asyncio.package import _instruments +from opentelemetry.instrumentation.asyncio.utils import get_coros_to_trace, get_future_trace_enabled, \ + get_to_thread_to_trace from opentelemetry.instrumentation.asyncio.version import __version__ from opentelemetry.instrumentation.instrumentor import BaseInstrumentor from opentelemetry.instrumentation.utils import unwrap @@ -67,7 +153,33 @@ class AsyncioInstrumentor(BaseInstrumentor): def __init__(self): super().__init__() + self.to_thread_duration_metric = None + self.to_thread_exception_metric = None + self.to_thread_active_metric = None + self.to_thread_created_metric = None + self.to_thread_finished_metric = None + + self.coro_duration_metric = None + self.coro_exception_metric = None + self.coro_cancelled_metric = None + self.coro_active_metric = None + self.coro_created_metric = None + self.coro_finished_metric = None + self.coro_timeout_metric = None + + self.future_duration_metric = None + self.future_exception_metric = None + self.future_cancelled_metric = None + self.future_active_metric = None + self.future_created_metric = None + self.future_finished_metric = None + self.future_timeout_metric = None + self._tracer = None + self._meter = None + self._coros_name_to_trace: set = set() + self._to_thread_name_to_trace: set = set() + self._future_active_enabled: bool = False def instrumentation_dependencies(self) -> Collection[str]: return _instruments @@ -77,6 +189,18 @@ def _instrument(self, **kwargs): self._tracer = get_tracer( __name__, __version__, tracer_provider ) + self._meter = get_meter( + __name__, __version__, kwargs.get("meter_provider") + ) + + self._coros_name_to_trace = get_coros_to_trace() + self._future_active_enabled = get_future_trace_enabled() + self._to_thread_name_to_trace = get_to_thread_to_trace() + + self.create_coro_metric() + self.create_future_metric() + self.create_to_thread_metric() + for method in self.methods_with_coroutine: self.instrument_method_with_coroutine(method) @@ -140,7 +264,7 @@ def wrap_to_thread(method, instance, args, kwargs): if args: first_arg = args[0] # Wrap the first argument - wrapped_first_arg = self.trace_func(first_arg) + wrapped_first_arg = self.trace_to_thread(first_arg) wrapped_args = (wrapped_first_arg,) + args[1:] return method(*wrapped_args, **kwargs) @@ -173,15 +297,31 @@ def uninstrument_taskgroup_create_task(self): return unwrap(asyncio.TaskGroup, "create_task") - def trace_func(self, func): + def trace_to_thread(self, func): """Trace a function.""" - with self._tracer.start_as_current_span(f"{ASYNCIO_PREFIX}to_thread_func-" + func.__name__) as span: - try: - return func - except Exception as exc: - span.set_status(Status(StatusCode.ERROR)) - span.record_exception(exc) - raise + start = default_timer() + self.to_thread_created_metric.add(1) + self.to_thread_active_metric.add(1) + span = self._tracer.start_span( + f"{ASYNCIO_PREFIX}to_thread_func-" + func.__name__) if func.__name__ in self._to_thread_name_to_trace else None + exception = None + try: + return func + except Exception as exc: + exception_attr = {ASYNCIO_EXCEPTIONS_NAME: exc.__class__.__name__} + exception = exc + self.to_thread_exception_metric.add(1, exception_attr) + raise + finally: + duration = max(round((default_timer() - start) * 1000), 0) + self.to_thread_duration_metric.record(duration) + self.to_thread_finished_metric.add(1) + self.to_thread_active_metric.add(-1) + if span: + if span.is_recording() and exception: + span.set_status(Status(StatusCode.ERROR)) + span.record_exception(exception) + span.end() def trace_item(self, coro_or_future): """Trace a coroutine or future item.""" @@ -195,39 +335,171 @@ def trace_item(self, coro_or_future): return coro_or_future async def trace_coroutine(self, coro): - - with self._tracer.start_as_current_span(f"{ASYNCIO_PREFIX}coro-" + coro.__name__) as span: - exception = None # Initialize the exception variable - try: - return await coro - # CancelledError is raised when a coroutine is cancelled - # before it has a chance to run. We don't want to record - # this as an error. - except asyncio.CancelledError: - pass - except (asyncio.TimeoutError, - asyncio.InvalidStateError, - asyncio.SendfileNotAvailableError, - asyncio.IncompleteReadError, - asyncio.LimitOverrunError, - asyncio.BrokenBarrierError, - Exception) as exc: # General exception should be the last - exception = exc - raise - finally: + start = default_timer() + coro_attr = { + ASYNCIO_COROUTINE_NAME: coro.__name__, + } + self.coro_created_metric.add(1, coro_attr) + self.coro_active_metric.add(1, coro_attr) + + span = self._tracer.start_span( + f"{ASYNCIO_PREFIX}coro-" + coro.__name__) if coro.__name__ in self._coros_name_to_trace else None + + exception = None + try: + return await coro + # CancelledError is raised when a coroutine is cancelled + # before it has a chance to run. We don't want to record + # this as an error. + except asyncio.CancelledError: + self.coro_cancelled_metric.add(1, coro_attr) + except asyncio.TimeoutError: + self.coro_timeout_metric.add(1, coro_attr) + raise + except Exception as exc: + exception = exc + coro_exception_attr = coro_attr.copy() + coro_exception_attr[ASYNCIO_EXCEPTIONS_NAME] = exc.__class__.__name__ + self.coro_exception_metric.add(1, coro_exception_attr) + raise + finally: + duration = max(round((default_timer() - start) * 1000), 0) + self.coro_duration_metric.record(duration, coro_attr) + self.coro_finished_metric.add(1, coro_attr) + self.coro_active_metric.add(-1, coro_attr) + + if span: if span.is_recording() and exception: span.set_status(Status(StatusCode.ERROR)) span.record_exception(exception) + span.end() def trace_future(self, future): - span = self._tracer.start_span(f"{ASYNCIO_PREFIX}" + future.__class__.__name__) + start = default_timer() + self.future_created_metric.add(1) + self.future_active_metric.add(1) + span = self._tracer.start_span(f"{ASYNCIO_PREFIX}future") if self._future_active_enabled else None def callback(f): exception = f.exception() - if exception: - span.set_status(Status(StatusCode.ERROR)) - span.record_exception(exception) - span.end() + if isinstance(exception, asyncio.CancelledError): + self.future_cancelled_metric.add(1) + elif isinstance(exception, asyncio.TimeoutError): + self.future_timeout_metric.add(1) + elif exception: + exception_attr = {ASYNCIO_EXCEPTIONS_NAME: exception.__class__.__name__} + self.future_exception_metric.add(1, exception_attr) + + duration = max(round((default_timer() - start) * 1000), 0) + self.future_duration_metric.record(duration) + self.future_finished_metric.add(1) + self.future_active_metric.add(-1) + if span: + if span.is_recording() and exception: + span.set_status(Status(StatusCode.ERROR)) + span.record_exception(exception) + span.end() future.add_done_callback(callback) return future + + def create_coro_metric(self): + self.coro_duration_metric = self._meter.create_histogram( + name=ASYNCIO_COROUTINE_DURATION, + description="Duration of asyncio coroutine", + unit="ms", + ) + self.coro_exception_metric = self._meter.create_counter( + name=ASYNCIO_COROUTINE_EXCEPTIONS, + description="Number of exceptions in asyncio coroutine", + unit="1", + ) + self.coro_cancelled_metric = self._meter.create_counter( + name=ASYNCIO_COROUTINE_CANCELLED, + description="Number of asyncio coroutine cancelled", + unit="1", + ) + self.coro_active_metric = self._meter.create_up_down_counter( + name=ASYNCIO_COROUTINE_ACTIVE, + description="Number of asyncio coroutine active", + unit="1", + ) + self.coro_created_metric = self._meter.create_counter( + name=ASYNCIO_COROUTINE_CREATED, + description="Number of asyncio coroutine created", + unit="1", + ) + self.coro_finished_metric = self._meter.create_counter( + name=ASYNCIO_COROUTINE_FINISHED, + description="Number of asyncio coroutine finished", + unit="1", + ) + self.coro_timeout_metric = self._meter.create_counter( + name=ASYNCIO_COROUTINE_TIMEOUTS, + description="Number of asyncio coroutine timeouts", + unit="1", + ) + + def create_future_metric(self): + self.future_duration_metric = self._meter.create_histogram( + name=ASYNCIO_FUTURES_DURATION, + description="Duration of asyncio future", + unit="ms", + ) + self.future_exception_metric = self._meter.create_counter( + name=ASYNCIO_FUTURES_EXCEPTIONS, + description="Number of exceptions in asyncio future", + unit="1", + ) + self.future_cancelled_metric = self._meter.create_counter( + name=ASYNCIO_FUTURES_CANCELLED, + description="Number of asyncio future cancelled", + unit="1", + ) + self.future_created_metric = self._meter.create_counter( + name=ASYNCIO_FUTURES_CREATED, + description="Number of asyncio future created", + unit="1", + ) + self.future_active_metric = self._meter.create_up_down_counter( + name=ASYNCIO_FUTURES_ACTIVE, + description="Number of asyncio future active", + unit="1", + ) + self.future_finished_metric = self._meter.create_counter( + name=ASYNCIO_FUTURES_FINISHED, + description="Number of asyncio future finished", + unit="1", + ) + self.future_timeout_metric = self._meter.create_counter( + name=ASYNCIO_FUTURES_TIMEOUTS, + description="Number of asyncio future timeouts", + unit="1", + ) + + def create_to_thread_metric(self): + self.to_thread_duration_metric = self._meter.create_histogram( + name=ASYNCIO_TO_THREAD_DURATION, + description="Duration of asyncio function", + unit="ms", + ) + self.to_thread_exception_metric = self._meter.create_counter( + name=ASYNCIO_TO_THREAD_EXCEPTIONS, + description="Number of exceptions in asyncio function", + unit="1", + ) + self.to_thread_created_metric = self._meter.create_counter( + name=ASYNCIO_TO_THREAD_CREATED, + description="Number of asyncio function created", + unit="1", + ) + self.to_thread_active_metric = self._meter.create_up_down_counter( + name=ASYNCIO_TO_THREAD_ACTIVE, + description="Number of asyncio function active", + unit="1", + ) + self.to_thread_finished_metric = self._meter.create_counter( + name=ASYNCIO_TO_THREAD_FINISHED, + description="Number of asyncio function finished", + unit="1", + ) diff --git a/instrumentation/opentelemetry-instrumentation-asyncio/src/opentelemetry/instrumentation/asyncio/environment_variables.py b/instrumentation/opentelemetry-instrumentation-asyncio/src/opentelemetry/instrumentation/asyncio/environment_variables.py new file mode 100644 index 0000000000..4e64eb5d73 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-asyncio/src/opentelemetry/instrumentation/asyncio/environment_variables.py @@ -0,0 +1,28 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Enter the names of the coroutines to be traced through the environment variable below, separated by commas. +""" +OTEL_PYTHON_ASYNCIO_COROUTINE_NAMES_TO_TRACE = "OTEL_PYTHON_ASYNCIO_COROUTINE_NAMES_TO_TRACE" + +""" +To determines whether the tracing feature for Future of Asyncio in Python is enabled or not. +""" +OTEL_PYTHON_ASYNCIO_FUTURE_TRACE_ENABLED = "OTEL_PYTHON_ASYNCIO_FUTURE_TRACE_ENABLED" + +""" +Enter the names of the functions to be traced through the environment variable below, separated by commas. +""" +OTEL_PYTHON_ASYNCIO_TO_THREAD_FUNCTION_NAMES_TO_TRACE = "OTEL_PYTHON_ASYNCIO_TO_THREAD_FUNCTION_NAMES_TO_TRACE" diff --git a/instrumentation/opentelemetry-instrumentation-asyncio/src/opentelemetry/instrumentation/asyncio/metrics.py b/instrumentation/opentelemetry-instrumentation-asyncio/src/opentelemetry/instrumentation/asyncio/metrics.py new file mode 100644 index 0000000000..90551bf873 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-asyncio/src/opentelemetry/instrumentation/asyncio/metrics.py @@ -0,0 +1,65 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +ASYNCIO_COROUTINE_DURATION = "asyncio.coroutine.duration" +ASYNCIO_COROUTINE_EXCEPTIONS = "asyncio.coroutine.exceptions" +ASYNCIO_COROUTINE_CANCELLED = "asyncio.coroutine.cancelled" +ASYNCIO_COROUTINE_ACTIVE = "asyncio.coroutine.active" +ASYNCIO_COROUTINE_CREATED = "asyncio.coroutine.created" +ASYNCIO_COROUTINE_FINISHED = "asyncio.coroutine.finished" +ASYNCIO_COROUTINE_TIMEOUTS = "asyncio.coroutine.timeouts" + +ASYNCIO_COROUTINE_NAME = "coroutine.name" +ASYNCIO_EXCEPTIONS_NAME = "exceptions.name" + +ASYNCIO_FUTURES_DURATION = "asyncio.futures.duration" +ASYNCIO_FUTURES_EXCEPTIONS = "asyncio.futures.exceptions" +ASYNCIO_FUTURES_CANCELLED = "asyncio.futures.cancelled" +ASYNCIO_FUTURES_CREATED = "asyncio.futures.created" +ASYNCIO_FUTURES_ACTIVE = "asyncio.futures.active" +ASYNCIO_FUTURES_FINISHED = "asyncio.futures.finished" +ASYNCIO_FUTURES_TIMEOUTS = "asyncio.futures.timeouts" + +ASYNCIO_TO_THREAD_DURATION = "asyncio.to_thread.duration" +ASYNCIO_TO_THREAD_EXCEPTIONS = "asyncio.to_thread.exceptions" +ASYNCIO_TO_THREAD_CREATED = "asyncio.to_thread.created" +ASYNCIO_TO_THREAD_ACTIVE = "asyncio.to_thread.active" +ASYNCIO_TO_THREAD_FINISHED = "asyncio.to_thread.finished" + +__all__ = [ + "ASYNCIO_COROUTINE_DURATION", + "ASYNCIO_COROUTINE_EXCEPTIONS", + "ASYNCIO_COROUTINE_CANCELLED", + "ASYNCIO_COROUTINE_ACTIVE", + "ASYNCIO_COROUTINE_CREATED", + "ASYNCIO_COROUTINE_FINISHED", + "ASYNCIO_COROUTINE_TIMEOUTS", + + "ASYNCIO_COROUTINE_NAME", + "ASYNCIO_EXCEPTIONS_NAME", + + "ASYNCIO_FUTURES_DURATION", + "ASYNCIO_FUTURES_EXCEPTIONS", + "ASYNCIO_FUTURES_CANCELLED", + "ASYNCIO_FUTURES_CREATED", + "ASYNCIO_FUTURES_ACTIVE", + "ASYNCIO_FUTURES_FINISHED", + "ASYNCIO_FUTURES_TIMEOUTS", + + "ASYNCIO_TO_THREAD_DURATION", + "ASYNCIO_TO_THREAD_EXCEPTIONS", + "ASYNCIO_TO_THREAD_CREATED", + "ASYNCIO_TO_THREAD_ACTIVE", + "ASYNCIO_TO_THREAD_FINISHED", +] diff --git a/instrumentation/opentelemetry-instrumentation-asyncio/src/opentelemetry/instrumentation/asyncio/utils.py b/instrumentation/opentelemetry-instrumentation-asyncio/src/opentelemetry/instrumentation/asyncio/utils.py new file mode 100644 index 0000000000..133c4ad47a --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-asyncio/src/opentelemetry/instrumentation/asyncio/utils.py @@ -0,0 +1,53 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import os + +from opentelemetry.instrumentation.asyncio.environment_variables import OTEL_PYTHON_ASYNCIO_COROUTINE_NAMES_TO_TRACE, \ + OTEL_PYTHON_ASYNCIO_FUTURE_TRACE_ENABLED, OTEL_PYTHON_ASYNCIO_TO_THREAD_FUNCTION_NAMES_TO_TRACE + + +def separate_coro_names_by_comma(coro_names) -> set: + """ + Function to separate the coroutines to be traced by comma + """ + if coro_names is None: + return set() + return set(coro_name.strip() for coro_name in coro_names.split(",")) + + +def get_coros_to_trace() -> set: + """ + Function to get the coroutines to be traced from the environment variable + """ + coro_names = os.getenv(OTEL_PYTHON_ASYNCIO_COROUTINE_NAMES_TO_TRACE) + return separate_coro_names_by_comma(coro_names) + + +def get_future_trace_enabled() -> bool: + """ + Function to get the future active enabled flag from the environment variable + default value is False + """ + return os.getenv(OTEL_PYTHON_ASYNCIO_FUTURE_TRACE_ENABLED, "False").lower() == "true" + + +def get_to_thread_to_trace() -> set: + """ + Function to get the functions to be traced from the environment variable + """ + func_names = os.getenv(OTEL_PYTHON_ASYNCIO_TO_THREAD_FUNCTION_NAMES_TO_TRACE) + return separate_coro_names_by_comma(func_names) + + +__all__ = ["get_coros_to_trace", "get_future_trace_enabled", "get_to_thread_to_trace"] diff --git a/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_cancellation.py b/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_cancellation.py index c8419ecc02..9829db50a4 100644 --- a/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_cancellation.py +++ b/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_cancellation.py @@ -12,15 +12,23 @@ # See the License for the specific language governing permissions and # limitations under the License. import asyncio +from unittest.mock import patch from opentelemetry.test.test_base import TestBase -from opentelemetry.trace import get_tracer +from opentelemetry.trace import get_tracer, SpanKind +from opentelemetry.instrumentation.asyncio import AsyncioInstrumentor, ASYNCIO_COROUTINE_CANCELLED, \ + ASYNCIO_COROUTINE_DURATION, ASYNCIO_COROUTINE_ACTIVE, ASYNCIO_COROUTINE_CREATED, ASYNCIO_COROUTINE_FINISHED +from opentelemetry.instrumentation.asyncio.environment_variables import OTEL_PYTHON_ASYNCIO_COROUTINE_NAMES_TO_TRACE from .common_test_func import cancellation_create_task -from opentelemetry.instrumentation.asyncio import AsyncioInstrumentor class TestAsyncioCancel(TestBase): + @patch.dict( + "os.environ", { + OTEL_PYTHON_ASYNCIO_COROUTINE_NAMES_TO_TRACE: "cancellation_coro, cancellable_coroutine" + } + ) def setUp(self): super().setUp() AsyncioInstrumentor().instrument() @@ -33,6 +41,23 @@ def tearDown(self): AsyncioInstrumentor().uninstrument() def test_cancel(self): - asyncio.run(cancellation_create_task()) + with self._tracer.start_as_current_span("root", kind=SpanKind.SERVER): + asyncio.run(cancellation_create_task()) spans = self.memory_exporter.get_finished_spans() - self.assertEqual(len(spans), 2) + self.assertEqual(len(spans), 3) + self.assertEqual(spans[0].context.trace_id, spans[1].context.trace_id) + self.assertEqual(spans[2].context.trace_id, spans[1].context.trace_id) + + self.assertEqual(spans[0].name, 'asyncio.coro-cancellable_coroutine') + self.assertEqual(spans[1].name, 'asyncio.coro-cancellation_coro') + for metric in self.memory_metrics_reader.get_metrics_data().resource_metrics[0].scope_metrics[0].metrics: + if metric.name == ASYNCIO_COROUTINE_CANCELLED: + self.assertEqual(metric.data.data_points[0].value, 1) + elif metric.name == ASYNCIO_COROUTINE_DURATION: + self.assertNotEquals(metric.data.data_points[0].min, 0) + elif metric.name == ASYNCIO_COROUTINE_ACTIVE: + self.assertEqual(metric.data.data_points[0].value, 0) + elif metric.name == ASYNCIO_COROUTINE_CREATED: + self.assertEqual(metric.data.data_points[0].value, 1) + elif metric.name == ASYNCIO_COROUTINE_FINISHED: + self.assertEqual(metric.data.data_points[0].value, 1) diff --git a/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_create_task.py b/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_create_task.py index 5d1edb40a2..c98f405759 100644 --- a/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_create_task.py +++ b/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_create_task.py @@ -12,14 +12,22 @@ # See the License for the specific language governing permissions and # limitations under the License. import asyncio +from unittest.mock import patch from opentelemetry.test.test_base import TestBase from opentelemetry.trace import get_tracer +from common_test_func import factorial from opentelemetry.instrumentation.asyncio import AsyncioInstrumentor +from opentelemetry.instrumentation.asyncio.environment_variables import OTEL_PYTHON_ASYNCIO_COROUTINE_NAMES_TO_TRACE class TestAsyncioCreateTask(TestBase): + @patch.dict( + "os.environ", { + OTEL_PYTHON_ASYNCIO_COROUTINE_NAMES_TO_TRACE: "sleep" + } + ) def setUp(self): super().setUp() AsyncioInstrumentor().instrument() @@ -34,7 +42,12 @@ def tearDown(self): def test_asyncio_create_task(self): async def async_func(): await asyncio.create_task(asyncio.sleep(0)) + await asyncio.create_task(factorial("A", 3)) asyncio.run(async_func()) spans = self.memory_exporter.get_finished_spans() + """ + OTEL_PYTHON_ASYNCIO_COROUTINE_NAMES_TO_TRACE: "sleep" + """ self.assertEqual(len(spans), 1) + self.assertEqual(spans[0].name, "asyncio.coro-sleep") diff --git a/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_ensure_future.py b/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_ensure_future.py index 42ed1a1c29..b45d768aea 100644 --- a/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_ensure_future.py +++ b/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_ensure_future.py @@ -12,16 +12,36 @@ # See the License for the specific language governing permissions and # limitations under the License. import asyncio +from unittest.mock import patch import pytest +from opentelemetry.sdk.metrics._internal.point import HistogramDataPoint, NumberDataPoint from opentelemetry.test.test_base import TestBase from opentelemetry.trace import get_tracer -from opentelemetry.instrumentation.asyncio import AsyncioInstrumentor +from opentelemetry.instrumentation.asyncio import AsyncioInstrumentor, ASYNCIO_FUTURES_DURATION, \ + ASYNCIO_FUTURES_CANCELLED, ASYNCIO_FUTURES_CREATED, ASYNCIO_FUTURES_ACTIVE, ASYNCIO_FUTURES_EXCEPTIONS, \ + ASYNCIO_FUTURES_FINISHED, ASYNCIO_FUTURES_TIMEOUTS +from opentelemetry.instrumentation.asyncio.environment_variables import OTEL_PYTHON_ASYNCIO_FUTURE_TRACE_ENABLED from .common_test_func import async_func +_expected_metric_names = [ + ASYNCIO_FUTURES_DURATION, + ASYNCIO_FUTURES_EXCEPTIONS, + ASYNCIO_FUTURES_CANCELLED, + ASYNCIO_FUTURES_CREATED, + ASYNCIO_FUTURES_ACTIVE, + ASYNCIO_FUTURES_FINISHED, + ASYNCIO_FUTURES_TIMEOUTS +] + class TestAsyncioEnsureFuture(TestBase): + @patch.dict( + "os.environ", { + OTEL_PYTHON_ASYNCIO_FUTURE_TRACE_ENABLED: "true" + } + ) def setUp(self): super().setUp() AsyncioInstrumentor().instrument() @@ -35,23 +55,41 @@ def tearDown(self): @pytest.mark.asyncio def test_asyncio_loop_ensure_future(self): + """ + async_func is not traced because it is not set in the environment variable + """ loop = asyncio.get_event_loop() task = asyncio.ensure_future(async_func()) loop.run_until_complete(task) spans = self.memory_exporter.get_finished_spans() - self.assertEqual(len(spans), 1) - self.assertEqual(spans[0].name, "asyncio.coro-async_func") + self.assertEqual(len(spans), 0) @pytest.mark.asyncio def test_asyncio_ensure_future_with_future(self): - loop = asyncio.get_event_loop() + with self._tracer.start_as_current_span("root") as root: + loop = asyncio.get_event_loop() - future = asyncio.Future() - future.set_result(1) - task = asyncio.ensure_future(future) - loop.run_until_complete(task) + future = asyncio.Future() + future.set_result(1) + task = asyncio.ensure_future(future) + loop.run_until_complete(task) spans = self.memory_exporter.get_finished_spans() - self.assertEqual(len(spans), 1) - self.assertEqual(spans[0].name, "asyncio.Future") + self.assertEqual(len(spans), 2) + self.assertEqual(spans[0].name, "asyncio.future") + for metric in self.memory_metrics_reader.get_metrics_data().resource_metrics[0].scope_metrics[0].metrics: + if metric.name == ASYNCIO_FUTURES_DURATION: + self.assertEquals(metric.data.data_points[0].count, 1) + elif metric.name == ASYNCIO_FUTURES_ACTIVE: + self.assertEqual(metric.data.data_points[0].value, 0) + elif metric.name == ASYNCIO_FUTURES_CREATED: + self.assertEqual(metric.data.data_points[0].value, 1) + elif metric.name == ASYNCIO_FUTURES_FINISHED: + self.assertEqual(metric.data.data_points[0].value, 1) + elif metric.name == ASYNCIO_FUTURES_EXCEPTIONS: + self.assertEqual(metric.data.data_points[0].value, 0) + elif metric.name == ASYNCIO_FUTURES_CANCELLED: + self.assertEqual(metric.data.data_points[0].value, 0) + elif metric.name == ASYNCIO_FUTURES_TIMEOUTS: + self.assertEqual(metric.data.data_points[0].value, 0) diff --git a/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_gather.py b/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_gather.py index ea02612893..1881f6b003 100644 --- a/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_gather.py +++ b/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_gather.py @@ -12,15 +12,23 @@ # See the License for the specific language governing permissions and # limitations under the License. import asyncio +from unittest.mock import patch from opentelemetry.test.test_base import TestBase from opentelemetry.trace import get_tracer +from opentelemetry.instrumentation.asyncio.environment_variables import OTEL_PYTHON_ASYNCIO_COROUTINE_NAMES_TO_TRACE from .common_test_func import factorial from opentelemetry.instrumentation.asyncio import AsyncioInstrumentor class TestAsyncioGather(TestBase): + + @patch.dict( + "os.environ", { + OTEL_PYTHON_ASYNCIO_COROUTINE_NAMES_TO_TRACE: "factorial" + } + ) def setUp(self): super().setUp() AsyncioInstrumentor().instrument() diff --git a/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_integration.py b/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_integration.py index b10a327bc6..25bbd68f3f 100644 --- a/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_integration.py +++ b/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_integration.py @@ -12,12 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. import asyncio +from unittest.mock import patch from opentelemetry.test.test_base import TestBase from opentelemetry.trace import get_tracer -from .common_test_func import ensure_future from opentelemetry.instrumentation.asyncio import AsyncioInstrumentor +from opentelemetry.instrumentation.asyncio.environment_variables import OTEL_PYTHON_ASYNCIO_COROUTINE_NAMES_TO_TRACE +from .common_test_func import ensure_future class TestAsyncioInstrumentor(TestBase): @@ -30,6 +32,11 @@ def setUp(self): def tearDown(self): super().tearDown() + @patch.dict( + "os.environ", { + OTEL_PYTHON_ASYNCIO_COROUTINE_NAMES_TO_TRACE: "sleep" + } + ) def test_asyncio_integration(self): AsyncioInstrumentor().instrument() diff --git a/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_run_coroutine_threadsafe.py b/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_run_coroutine_threadsafe.py index 000168509f..573da05c2e 100644 --- a/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_run_coroutine_threadsafe.py +++ b/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_run_coroutine_threadsafe.py @@ -14,14 +14,22 @@ import asyncio import threading from concurrent.futures import ThreadPoolExecutor +from unittest.mock import patch from opentelemetry.test.test_base import TestBase from opentelemetry.trace import get_tracer from opentelemetry.instrumentation.asyncio import AsyncioInstrumentor +from opentelemetry.instrumentation.asyncio.environment_variables import OTEL_PYTHON_ASYNCIO_COROUTINE_NAMES_TO_TRACE class TestRunCoroutineThreadSafe(TestBase): + + @patch.dict( + "os.environ", { + OTEL_PYTHON_ASYNCIO_COROUTINE_NAMES_TO_TRACE: "coro" + } + ) def setUp(self): super().setUp() AsyncioInstrumentor().instrument() diff --git a/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_taskgroup.py b/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_taskgroup.py index fb617770eb..f063a09fbd 100644 --- a/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_taskgroup.py +++ b/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_taskgroup.py @@ -13,11 +13,13 @@ # limitations under the License. import asyncio import sys +from unittest.mock import patch from opentelemetry.test.test_base import TestBase from opentelemetry.trace import get_tracer from opentelemetry.instrumentation.asyncio import AsyncioInstrumentor +from opentelemetry.instrumentation.asyncio.environment_variables import OTEL_PYTHON_ASYNCIO_COROUTINE_NAMES_TO_TRACE from .common_test_func import async_func py11 = False @@ -26,6 +28,11 @@ class TestAsyncioTaskgroup(TestBase): + @patch.dict( + "os.environ", { + OTEL_PYTHON_ASYNCIO_COROUTINE_NAMES_TO_TRACE: "async_func" + } + ) def setUp(self): super().setUp() AsyncioInstrumentor().instrument() @@ -39,13 +46,16 @@ def tearDown(self): def test_task_group_create_task(self): # TaskGroup is only available in Python 3.11+ + if not py11: + return + async def main(): - if py11: - async with asyncio.TaskGroup() as tg: - for _ in range(10): - tg.create_task(async_func()) + async with asyncio.TaskGroup() as tg: + for _ in range(10): + tg.create_task(async_func()) - asyncio.run(main()) + with self._tracer.start_as_current_span("root"): + asyncio.run(main()) spans = self.memory_exporter.get_finished_spans() - if py11: - self.assertEqual(len(spans), 10) + self.assertEqual(len(spans), 11) + self.assertEqual(spans[4].context.trace_id, spans[5].context.trace_id) diff --git a/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_to_thread.py b/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_to_thread.py index ae1c660ce5..c62cfdff35 100644 --- a/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_to_thread.py +++ b/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_to_thread.py @@ -13,14 +13,22 @@ # limitations under the License. import asyncio import sys +from unittest.mock import patch from opentelemetry.test.test_base import TestBase from opentelemetry.trace import get_tracer from opentelemetry.instrumentation.asyncio import AsyncioInstrumentor +from opentelemetry.instrumentation.asyncio.environment_variables import \ + OTEL_PYTHON_ASYNCIO_TO_THREAD_FUNCTION_NAMES_TO_TRACE class TestAsyncioToThread(TestBase): + @patch.dict( + "os.environ", { + OTEL_PYTHON_ASYNCIO_TO_THREAD_FUNCTION_NAMES_TO_TRACE: "multiply" + } + ) def setUp(self): super().setUp() AsyncioInstrumentor().instrument() @@ -42,8 +50,24 @@ async def to_thread(): result = await asyncio.to_thread(multiply, 2, 3) assert result == 6 - asyncio.run(to_thread()) + with self._tracer.start_as_current_span("root"): + asyncio.run(to_thread()) spans = self.memory_exporter.get_finished_spans() - self.assertEqual(len(spans), 1) + self.assertEqual(len(spans), 2) assert spans[0].name == "asyncio.to_thread_func-multiply" + for metric in self.memory_metrics_reader.get_metrics_data().resource_metrics[0].scope_metrics[0].metrics: + if metric.name == "asyncio.to_thread.duration": + self.assertEquals(metric.data.data_points[0].count, 1) + elif metric.name == "asyncio.to_thread.active": + self.assertEqual(metric.data.data_points[0].value, 0) + elif metric.name == "asyncio.to_thread.created": + self.assertEqual(metric.data.data_points[0].value, 1) + elif metric.name == "asyncio.to_thread.finished": + self.assertEqual(metric.data.data_points[0].value, 1) + elif metric.name == "asyncio.to_thread.exceptions": + self.assertEqual(metric.data.data_points[0].value, 0) + elif metric.name == "asyncio.to_thread.cancelled": + self.assertEqual(metric.data.data_points[0].value, 0) + elif metric.name == "asyncio.to_thread.name": + self.assertEqual(metric.data.data_points[0].value, "multiply") diff --git a/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_utils.py b/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_utils.py new file mode 100644 index 0000000000..f1749d8401 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_utils.py @@ -0,0 +1,41 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from unittest import TestCase +from unittest.mock import patch + +from opentelemetry.instrumentation.asyncio.environment_variables import OTEL_PYTHON_ASYNCIO_COROUTINE_NAMES_TO_TRACE, \ + OTEL_PYTHON_ASYNCIO_FUTURE_TRACE_ENABLED + + +class TestAsyncioToThread(TestCase): + + @patch.dict( + "os.environ", { + OTEL_PYTHON_ASYNCIO_COROUTINE_NAMES_TO_TRACE: "test1,test2,test3 ,test4" + } + ) + def test_separator(self): + from opentelemetry.instrumentation.asyncio.utils import get_coros_to_trace + self.assertEqual( + get_coros_to_trace(), {"test1", "test2", "test3", "test4"} + ) + + @patch.dict( + "os.environ", { + OTEL_PYTHON_ASYNCIO_FUTURE_TRACE_ENABLED: "true" + } + ) + def test_future_trace_enabled(self): + from opentelemetry.instrumentation.asyncio.utils import get_future_trace_enabled + self.assertEqual(get_future_trace_enabled(), True) diff --git a/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_wait.py b/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_wait.py index 070617ca94..b03d9b1652 100644 --- a/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_wait.py +++ b/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_wait.py @@ -13,15 +13,22 @@ # limitations under the License. import asyncio import sys +from unittest.mock import patch from opentelemetry.test.test_base import TestBase from opentelemetry.trace import get_tracer +from opentelemetry.instrumentation.asyncio.environment_variables import OTEL_PYTHON_ASYNCIO_COROUTINE_NAMES_TO_TRACE from .common_test_func import async_func from opentelemetry.instrumentation.asyncio import AsyncioInstrumentor class TestAsyncioWait(TestBase): + @patch.dict( + "os.environ", { + OTEL_PYTHON_ASYNCIO_COROUTINE_NAMES_TO_TRACE: "async_func" + } + ) def setUp(self): super().setUp() AsyncioInstrumentor().instrument()