diff --git a/instrumentation/opentelemetry-instrumentation-asyncio/src/opentelemetry/instrumentation/asyncio/__init__.py b/instrumentation/opentelemetry-instrumentation-asyncio/src/opentelemetry/instrumentation/asyncio/__init__.py index bee967b770..d80ea28076 100644 --- a/instrumentation/opentelemetry-instrumentation-asyncio/src/opentelemetry/instrumentation/asyncio/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-asyncio/src/opentelemetry/instrumentation/asyncio/__init__.py @@ -97,7 +97,7 @@ def func(): from opentelemetry.trace import get_tracer from opentelemetry.trace.status import Status, StatusCode -ASYNCIO_PREFIX = "asyncio." +ASYNCIO_PREFIX = "asyncio" class AsyncioInstrumentor(BaseInstrumentor): @@ -118,8 +118,8 @@ class AsyncioInstrumentor(BaseInstrumentor): def __init__(self): super().__init__() - self.process_duration_metric = None - self.process_counts_metric = None + self.process_duration_histogram = None + self.process_counts_counter = None self._tracer = None self._meter = None @@ -131,8 +131,9 @@ def instrumentation_dependencies(self) -> Collection[str]: return _instruments def _instrument(self, **kwargs): - tracer_provider = kwargs.get("tracer_provider") - self._tracer = get_tracer(__name__, __version__, tracer_provider) + self._tracer = get_tracer( + __name__, __version__, kwargs.get("tracer_provider") + ) self._meter = get_meter( __name__, __version__, kwargs.get("meter_provider") ) @@ -141,12 +142,12 @@ def _instrument(self, **kwargs): self._future_active_enabled = get_future_trace_enabled() self._to_thread_name_to_trace = get_to_thread_to_trace() - self.process_duration_metric = self._meter.create_histogram( + self.process_duration_histogram = self._meter.create_histogram( name="asyncio.process.duration", description="Duration of asyncio process", unit="seconds", ) - self.process_counts_metric = self._meter.create_up_down_counter( + self.process_counts_counter = self._meter.create_counter( name="asyncio.process.count", description="Number of asyncio process", unit="1", @@ -166,7 +167,7 @@ def _uninstrument(self, **kwargs): uninstrument_to_thread() uninstrument_taskgroup_create_task() - def instrument_method_with_coroutine(self, method_name): + def instrument_method_with_coroutine(self, method_name: str): """ Instruments specified asyncio method. """ @@ -201,12 +202,12 @@ def wrap_coros_or_futures(method, instance, args, kwargs): _wrap(asyncio, "gather", wrap_coros_or_futures) - def instrument_to_thread(self): + def instrument_to_thread(self) -> None: # to_thread was added in Python 3.9 if sys.version_info < (3, 9): return - def wrap_to_thread(method, instance, args, kwargs): + def wrap_to_thread(method, instance, args, kwargs) -> None: if args: first_arg = args[0] # Wrap the first argument @@ -218,12 +219,12 @@ def wrap_to_thread(method, instance, args, kwargs): _wrap(asyncio, "to_thread", wrap_to_thread) - def instrument_taskgroup_create_task(self): + def instrument_taskgroup_create_task(self) -> None: # TaskGroup.create_task was added in Python 3.11 if sys.version_info < (3, 11): return - def wrap_taskgroup_create_task(method, instance, args, kwargs): + def wrap_taskgroup_create_task(method, instance, args, kwargs) -> None: if args: coro = args[0] wrapped_coro = self.trace_coroutine(coro) @@ -237,18 +238,17 @@ def wrap_taskgroup_create_task(method, instance, args, kwargs): wrap_taskgroup_create_task, ) - def trace_to_thread(self, func): + def trace_to_thread(self, func: callable): """Trace a function.""" start = default_timer() span = ( self._tracer.start_span( - f"{ASYNCIO_PREFIX}to_thread_func-" + func.__name__ + f"{ASYNCIO_PREFIX} to_thread-" + func.__name__ ) if func.__name__ in self._to_thread_name_to_trace else None ) attr = {"type": "to_thread", "name": func.__name__} - duration_attr = attr.copy() exception = None try: attr["state"] = "finished" @@ -257,14 +257,7 @@ def trace_to_thread(self, func): attr["state"] = "exception" raise finally: - duration = max(round((default_timer() - start) * 1000), 0) - self.process_duration_metric.record(duration, duration_attr) - self.process_counts_metric.add(1, attr) - if span: - if span.is_recording() and exception: - span.set_status(Status(StatusCode.ERROR)) - span.record_exception(exception) - span.end() + self.record_process(start, attr, span, exception) def trace_item(self, coro_or_future): """Trace a coroutine or future item.""" @@ -283,9 +276,8 @@ async def trace_coroutine(self, coro): "type": "coroutine", "name": coro.__name__, } - duration_attr = attr.copy() span = ( - self._tracer.start_span(f"{ASYNCIO_PREFIX}coro-" + coro.__name__) + self._tracer.start_span(f"{ASYNCIO_PREFIX} coro-" + coro.__name__) if coro.__name__ in self._coros_name_to_trace else None ) @@ -304,46 +296,51 @@ async def trace_coroutine(self, coro): attr["state"] = state raise finally: - duration = max(round(default_timer() - start), 0) - self.process_duration_metric.record(duration, duration_attr) - self.process_counts_metric.add(1, attr) - - if span: - if span.is_recording() and exception: - span.set_status(Status(StatusCode.ERROR)) - span.record_exception(exception) - span.end() + self.record_process(start, attr, span, exception) - def trace_future(self, future): + def trace_future(self, future) -> futures.Future: start = default_timer() span = ( - self._tracer.start_span(f"{ASYNCIO_PREFIX}future") + self._tracer.start_span(f"{ASYNCIO_PREFIX} future") if self._future_active_enabled else None ) def callback(f): - duration = max(round(default_timer() - start), 0) exception = f.exception() attr = { "type": "future", } - duration_attr = attr.copy() state = determine_state(exception) attr["state"] = state - self.process_counts_metric.add(1, attr) - self.process_duration_metric.record(duration, duration_attr) - if span: - if span.is_recording() and exception: - span.set_status(Status(StatusCode.ERROR)) - span.record_exception(exception) - span.end() + self.record_process(start, attr, span, exception) future.add_done_callback(callback) return future + def record_process( + self, start: float, attr: dict, span=None, exception=None + ) -> None: + """ + Record the processing time, update histogram and counter, and handle span. + + :param start: Start time of the process. + :param attr: Attributes for the histogram and counter. + :param span: Optional span for tracing. + :param exception: Optional exception occurred during the process. + """ + duration = max(default_timer() - start, 0) + self.process_duration_histogram.record(duration, attr) + self.process_counts_counter.add(1, attr) + + if span: + if span.is_recording() and exception: + span.set_status(Status(StatusCode.ERROR)) + span.record_exception(exception) + span.end() + -def determine_state(exception): +def determine_state(exception: Exception) -> str: if isinstance(exception, asyncio.CancelledError): return "cancelled" if isinstance(exception, asyncio.TimeoutError): @@ -353,25 +350,25 @@ def determine_state(exception): return "finished" -def uninstrument_taskgroup_create_task(): +def uninstrument_taskgroup_create_task() -> None: # TaskGroup.create_task was added in Python 3.11 if sys.version_info < (3, 11): return unwrap(asyncio.TaskGroup, "create_task") # pylint: disable=no-member -def uninstrument_to_thread(): +def uninstrument_to_thread() -> None: # to_thread was added in Python 3.9 if sys.version_info < (3, 9): return unwrap(asyncio, "to_thread") -def uninstrument_gather(): +def uninstrument_gather() -> None: unwrap(asyncio, "gather") -def uninstrument_method_with_coroutine(method_name): +def uninstrument_method_with_coroutine(method_name: str) -> None: """ Uninstrument specified asyncio method. """ diff --git a/instrumentation/opentelemetry-instrumentation-asyncio/src/opentelemetry/instrumentation/asyncio/utils.py b/instrumentation/opentelemetry-instrumentation-asyncio/src/opentelemetry/instrumentation/asyncio/utils.py index c196a7bda7..f95281949c 100644 --- a/instrumentation/opentelemetry-instrumentation-asyncio/src/opentelemetry/instrumentation/asyncio/utils.py +++ b/instrumentation/opentelemetry-instrumentation-asyncio/src/opentelemetry/instrumentation/asyncio/utils.py @@ -21,7 +21,7 @@ ) -def separate_coro_names_by_comma(coro_names) -> set: +def separate_coro_names_by_comma(coro_names: str) -> set: """ Function to separate the coroutines to be traced by comma """ diff --git a/instrumentation/opentelemetry-instrumentation-asyncio/tests/common_test_func.py b/instrumentation/opentelemetry-instrumentation-asyncio/tests/common_test_func.py index ed8803a1f0..5d06641bc0 100644 --- a/instrumentation/opentelemetry-instrumentation-asyncio/tests/common_test_func.py +++ b/instrumentation/opentelemetry-instrumentation-asyncio/tests/common_test_func.py @@ -19,7 +19,7 @@ async def async_func(): await asyncio.sleep(0.1) -async def factorial(number): +async def factorial(number: int): factorial_value = 1 for value in range(2, number + 1): await asyncio.sleep(0) diff --git a/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_cancellation.py b/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_cancellation.py index 32fc1caffa..62905e3eee 100644 --- a/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_cancellation.py +++ b/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_cancellation.py @@ -51,8 +51,8 @@ def test_cancel(self): self.assertEqual(spans[0].context.trace_id, spans[1].context.trace_id) self.assertEqual(spans[2].context.trace_id, spans[1].context.trace_id) - self.assertEqual(spans[0].name, "asyncio.coro-cancellable_coroutine") - self.assertEqual(spans[1].name, "asyncio.coro-cancellation_coro") + self.assertEqual(spans[0].name, "asyncio coro-cancellable_coroutine") + self.assertEqual(spans[1].name, "asyncio coro-cancellation_coro") for metric in ( self.memory_metrics_reader.get_metrics_data() .resource_metrics[0] diff --git a/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_create_task.py b/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_create_task.py index 38962c6b1e..9df8d9d14b 100644 --- a/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_create_task.py +++ b/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_create_task.py @@ -49,4 +49,4 @@ async def async_func(): spans = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans), 1) - self.assertEqual(spans[0].name, "asyncio.coro-sleep") + self.assertEqual(spans[0].name, "asyncio coro-sleep") diff --git a/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_gather.py b/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_gather.py index 6d9533c271..395b46b698 100644 --- a/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_gather.py +++ b/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_gather.py @@ -48,6 +48,6 @@ async def gather_factorial(): asyncio.run(gather_factorial()) spans = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans), 3) - self.assertEqual(spans[0].name, "asyncio.coro-factorial") - self.assertEqual(spans[1].name, "asyncio.coro-factorial") - self.assertEqual(spans[2].name, "asyncio.coro-factorial") + self.assertEqual(spans[0].name, "asyncio coro-factorial") + self.assertEqual(spans[1].name, "asyncio coro-factorial") + self.assertEqual(spans[2].name, "asyncio coro-factorial") diff --git a/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_to_thread.py b/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_to_thread.py index 8f2538af21..54c85374c9 100644 --- a/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_to_thread.py +++ b/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_to_thread.py @@ -56,26 +56,18 @@ async def to_thread(): spans = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans), 2) - assert spans[0].name == "asyncio.to_thread_func-multiply" + assert spans[0].name == "asyncio to_thread-multiply" for metric in ( self.memory_metrics_reader.get_metrics_data() .resource_metrics[0] .scope_metrics[0] .metrics ): - if metric.name == "asyncio.to_thread.duration": - self.assertEqual(metric.data.data_points[0].count, 1) - elif metric.name == "asyncio.to_thread.active": - self.assertEqual(metric.data.data_points[0].value, 0) - elif metric.name == "asyncio.to_thread.created": - self.assertEqual(metric.data.data_points[0].value, 1) - elif metric.name == "asyncio.to_thread.finished": - self.assertEqual(metric.data.data_points[0].value, 1) - elif metric.name == "asyncio.to_thread.exceptions": - self.assertEqual(metric.data.data_points[0].value, 0) - elif metric.name == "asyncio.to_thread.cancelled": - self.assertEqual(metric.data.data_points[0].value, 0) - elif metric.name == "asyncio.to_thread.name": - self.assertEqual( - metric.data.data_points[0].value, "multiply" - ) + if metric.name == "asyncio.process.duration": + for point in metric.data.data_points: + self.assertEqual(point.attributes["type"], "to_thread") + self.assertEqual(point.attributes["name"], "multiply") + if metric.name == "asyncio.process.count": + for point in metric.data.data_points: + self.assertEqual(point.attributes["type"], "to_thread") + self.assertEqual(point.attributes["name"], "multiply")