Skip to content

Commit

Permalink
include feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
bourbonkk committed Jan 3, 2024
1 parent b036ba3 commit a8d5e16
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 349 deletions.
26 changes: 3 additions & 23 deletions instrumentation/opentelemetry-instrumentation-asyncio/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ OpenTelemetry asyncio Instrumentation
AsyncioInstrumentor: Tracing Requests Made by the Asyncio Library


The opentelemetry-instrumentation-asycnio package allows tracing asyncio applications.
The opentelemetry-instrumentation-asyncio package allows tracing asyncio applications.
The metric for coroutine, future, is generated even if there is no setting to generate a span.


Expand Down Expand Up @@ -85,28 +85,8 @@ Run instrumented application
asyncio metric types
----------------------

* `asyncio.futures.duration` (ms) - Duration of the future
* `asyncio.futures.exceptions` (count) - Number of exceptions raised by the future
* `asyncio.futures.cancelled` (count) - Number of futures cancelled
* `asyncio.futures.created` (count) - Number of futures created
* `asyncio.futures.active` (count) - Number of futures active
* `asyncio.futures.finished` (count) - Number of futures finished
* `asyncio.futures.timeouts` (count) - Number of futures timed out

* `asyncio.coroutine.duration` (ms) - Duration of the coroutine
* `asyncio.coroutine.exceptions` (count) - Number of exceptions raised by the coroutine
* `asyncio.coroutine.created` (count) - Number of coroutines created
* `asyncio.coroutine.active` (count) - Number of coroutines active
* `asyncio.coroutine.finished` (count) - Number of coroutines finished
* `asyncio.coroutine.timeouts` (count) - Number of coroutines timed out
* `asyncio.coroutine.cancelled` (count) - Number of coroutines cancelled

* `asyncio.to_thread.duration` (ms) - Duration of the to_thread
* `asyncio.to_thread.exceptions` (count) - Number of exceptions raised by the to_thread
* `asyncio.to_thread.created` (count) - Number of to_thread created
* `asyncio.to_thread.active` (count) - Number of to_thread active
* `asyncio.to_thread.finished` (count) - Number of to_thread finished

* `asyncio.process.duration` (seconds) - Duration of asyncio process
* `asyncio.process.count` (count) - Number of asyncio process


API
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,27 +69,8 @@ def func():
asyncio metric types
---------------------
* asyncio.futures.duration (ms) - Duration of the future
* asyncio.futures.exceptions (count) - Number of exceptions raised by the future
* asyncio.futures.cancelled (count) - Number of futures cancelled
* asyncio.futures.created (count) - Number of futures created
* asyncio.futures.active (count) - Number of futures active
* asyncio.futures.finished (count) - Number of futures finished
* asyncio.futures.timeouts (count) - Number of futures timed out
* asyncio.coroutine.duration (ms) - Duration of the coroutine
* asyncio.coroutine.exceptions (count) - Number of exceptions raised by the coroutine
* asyncio.coroutine.created (count) - Number of coroutines created
* asyncio.coroutine.active (count) - Number of coroutines active
* asyncio.coroutine.finished (count) - Number of coroutines finished
* asyncio.coroutine.timeouts (count) - Number of coroutines timed out
* asyncio.coroutine.cancelled (count) - Number of coroutines cancelled
* asyncio.to_thread.duration (ms) - Duration of the to_thread
* asyncio.to_thread.exceptions (count) - Number of exceptions raised by the to_thread
* asyncio.to_thread.created (count) - Number of to_thread created
* asyncio.to_thread.active (count) - Number of to_thread active
* asyncio.to_thread.finished (count) - Number of to_thread finished
* asyncio.process.duration (seconds) - Duration of asyncio process
* asyncio.process.count (count) - Number of asyncio process
API
Expand All @@ -103,30 +84,6 @@ def func():

from wrapt import wrap_function_wrapper as _wrap

# pylint: disable=no-name-in-module
from opentelemetry.instrumentation.asyncio.metrics import (
ASYNCIO_COROUTINE_ACTIVE,
ASYNCIO_COROUTINE_CANCELLED,
ASYNCIO_COROUTINE_CREATED,
ASYNCIO_COROUTINE_DURATION,
ASYNCIO_COROUTINE_EXCEPTIONS,
ASYNCIO_COROUTINE_FINISHED,
ASYNCIO_COROUTINE_NAME,
ASYNCIO_COROUTINE_TIMEOUTS,
ASYNCIO_EXCEPTIONS_NAME,
ASYNCIO_FUTURES_ACTIVE,
ASYNCIO_FUTURES_CANCELLED,
ASYNCIO_FUTURES_CREATED,
ASYNCIO_FUTURES_DURATION,
ASYNCIO_FUTURES_EXCEPTIONS,
ASYNCIO_FUTURES_FINISHED,
ASYNCIO_FUTURES_TIMEOUTS,
ASYNCIO_TO_THREAD_ACTIVE,
ASYNCIO_TO_THREAD_CREATED,
ASYNCIO_TO_THREAD_DURATION,
ASYNCIO_TO_THREAD_EXCEPTIONS,
ASYNCIO_TO_THREAD_FINISHED,
)
from opentelemetry.instrumentation.asyncio.package import _instruments
from opentelemetry.instrumentation.asyncio.utils import (
get_coros_to_trace,
Expand Down Expand Up @@ -161,27 +118,8 @@ class AsyncioInstrumentor(BaseInstrumentor):

def __init__(self):
super().__init__()
self.to_thread_duration_metric = None
self.to_thread_exception_metric = None
self.to_thread_active_metric = None
self.to_thread_created_metric = None
self.to_thread_finished_metric = None

self.coro_duration_metric = None
self.coro_exception_metric = None
self.coro_cancelled_metric = None
self.coro_active_metric = None
self.coro_created_metric = None
self.coro_finished_metric = None
self.coro_timeout_metric = None

self.future_duration_metric = None
self.future_exception_metric = None
self.future_cancelled_metric = None
self.future_active_metric = None
self.future_created_metric = None
self.future_finished_metric = None
self.future_timeout_metric = None
self.process_duration_metric = None
self.process_counts_metric = None

self._tracer = None
self._meter = None
Expand All @@ -203,9 +141,16 @@ def _instrument(self, **kwargs):
self._future_active_enabled = get_future_trace_enabled()
self._to_thread_name_to_trace = get_to_thread_to_trace()

self.create_coro_metric()
self.create_future_metric()
self.create_to_thread_metric()
self.process_duration_metric = self._meter.create_histogram(
name="asyncio.process.duration",
description="Duration of asyncio process",
unit="seconds",
)
self.process_counts_metric = self._meter.create_up_down_counter(
name="asyncio.process.count",
description="Number of asyncio process",
unit="1",
)

for method in self.methods_with_coroutine:
self.instrument_method_with_coroutine(method)
Expand Down Expand Up @@ -295,28 +240,26 @@ def wrap_taskgroup_create_task(method, instance, args, kwargs):
def trace_to_thread(self, func):
"""Trace a function."""
start = default_timer()
self.to_thread_created_metric.add(1)
self.to_thread_active_metric.add(1)
span = (
self._tracer.start_span(
f"{ASYNCIO_PREFIX}to_thread_func-" + func.__name__
)
if func.__name__ in self._to_thread_name_to_trace
else None
)
attr = {"type": "to_thread", "name": func.__name__}
duration_attr = attr.copy()
exception = None
try:
attr["state"] = "finished"
return func
except Exception as exc:
exception_attr = {ASYNCIO_EXCEPTIONS_NAME: exc.__class__.__name__}
exception = exc
self.to_thread_exception_metric.add(1, exception_attr)
except Exception:
attr["state"] = "exception"
raise
finally:
duration = max(round((default_timer() - start) * 1000), 0)
self.to_thread_duration_metric.record(duration)
self.to_thread_finished_metric.add(1)
self.to_thread_active_metric.add(-1)
self.process_duration_metric.record(duration, duration_attr)
self.process_counts_metric.add(1, attr)
if span:
if span.is_recording() and exception:
span.set_status(Status(StatusCode.ERROR))
Expand All @@ -336,42 +279,34 @@ def trace_item(self, coro_or_future):

async def trace_coroutine(self, coro):
start = default_timer()
coro_attr = {
ASYNCIO_COROUTINE_NAME: coro.__name__,
attr = {
"type": "coroutine",
"name": coro.__name__,
}
self.coro_created_metric.add(1, coro_attr)
self.coro_active_metric.add(1, coro_attr)

duration_attr = attr.copy()
span = (
self._tracer.start_span(f"{ASYNCIO_PREFIX}coro-" + coro.__name__)
if coro.__name__ in self._coros_name_to_trace
else None
)

exception = None
try:
attr["state"] = "finished"
return await coro
# CancelledError is raised when a coroutine is cancelled
# before it has a chance to run. We don't want to record
# this as an error.
except asyncio.CancelledError:
self.coro_cancelled_metric.add(1, coro_attr)
except asyncio.TimeoutError:
self.coro_timeout_metric.add(1, coro_attr)
raise
attr["state"] = "cancelled"
except Exception as exc:
exception = exc
coro_exception_attr = coro_attr.copy()
coro_exception_attr[
ASYNCIO_EXCEPTIONS_NAME
] = exc.__class__.__name__
self.coro_exception_metric.add(1, coro_exception_attr)
state = determine_state(exception)
attr["state"] = state
raise
finally:
duration = max(round((default_timer() - start) * 1000), 0)
self.coro_duration_metric.record(duration, coro_attr)
self.coro_finished_metric.add(1, coro_attr)
self.coro_active_metric.add(-1, coro_attr)
duration = max(round(default_timer() - start), 0)
self.process_duration_metric.record(duration, duration_attr)
self.process_counts_metric.add(1, attr)

if span:
if span.is_recording() and exception:
Expand All @@ -381,30 +316,23 @@ async def trace_coroutine(self, coro):

def trace_future(self, future):
start = default_timer()
self.future_created_metric.add(1)
self.future_active_metric.add(1)
span = (
self._tracer.start_span(f"{ASYNCIO_PREFIX}future")
if self._future_active_enabled
else None
)

def callback(f):
duration = max(round(default_timer() - start), 0)
exception = f.exception()
if isinstance(exception, asyncio.CancelledError):
self.future_cancelled_metric.add(1)
elif isinstance(exception, asyncio.TimeoutError):
self.future_timeout_metric.add(1)
elif exception:
exception_attr = {
ASYNCIO_EXCEPTIONS_NAME: exception.__class__.__name__
}
self.future_exception_metric.add(1, exception_attr)

duration = max(round((default_timer() - start) * 1000), 0)
self.future_duration_metric.record(duration)
self.future_finished_metric.add(1)
self.future_active_metric.add(-1)
attr = {
"type": "future",
}
duration_attr = attr.copy()
state = determine_state(exception)
attr["state"] = state
self.process_counts_metric.add(1, attr)
self.process_duration_metric.record(duration, duration_attr)
if span:
if span.is_recording() and exception:
span.set_status(Status(StatusCode.ERROR))
Expand All @@ -414,106 +342,15 @@ def callback(f):
future.add_done_callback(callback)
return future

def create_coro_metric(self):
self.coro_duration_metric = self._meter.create_histogram(
name=ASYNCIO_COROUTINE_DURATION,
description="Duration of asyncio coroutine",
unit="ms",
)
self.coro_exception_metric = self._meter.create_counter(
name=ASYNCIO_COROUTINE_EXCEPTIONS,
description="Number of exceptions in asyncio coroutine",
unit="1",
)
self.coro_cancelled_metric = self._meter.create_counter(
name=ASYNCIO_COROUTINE_CANCELLED,
description="Number of asyncio coroutine cancelled",
unit="1",
)
self.coro_active_metric = self._meter.create_up_down_counter(
name=ASYNCIO_COROUTINE_ACTIVE,
description="Number of asyncio coroutine active",
unit="1",
)
self.coro_created_metric = self._meter.create_counter(
name=ASYNCIO_COROUTINE_CREATED,
description="Number of asyncio coroutine created",
unit="1",
)
self.coro_finished_metric = self._meter.create_counter(
name=ASYNCIO_COROUTINE_FINISHED,
description="Number of asyncio coroutine finished",
unit="1",
)
self.coro_timeout_metric = self._meter.create_counter(
name=ASYNCIO_COROUTINE_TIMEOUTS,
description="Number of asyncio coroutine timeouts",
unit="1",
)

def create_future_metric(self):
self.future_duration_metric = self._meter.create_histogram(
name=ASYNCIO_FUTURES_DURATION,
description="Duration of asyncio future",
unit="ms",
)
self.future_exception_metric = self._meter.create_counter(
name=ASYNCIO_FUTURES_EXCEPTIONS,
description="Number of exceptions in asyncio future",
unit="1",
)
self.future_cancelled_metric = self._meter.create_counter(
name=ASYNCIO_FUTURES_CANCELLED,
description="Number of asyncio future cancelled",
unit="1",
)
self.future_created_metric = self._meter.create_counter(
name=ASYNCIO_FUTURES_CREATED,
description="Number of asyncio future created",
unit="1",
)
self.future_active_metric = self._meter.create_up_down_counter(
name=ASYNCIO_FUTURES_ACTIVE,
description="Number of asyncio future active",
unit="1",
)
self.future_finished_metric = self._meter.create_counter(
name=ASYNCIO_FUTURES_FINISHED,
description="Number of asyncio future finished",
unit="1",
)
self.future_timeout_metric = self._meter.create_counter(
name=ASYNCIO_FUTURES_TIMEOUTS,
description="Number of asyncio future timeouts",
unit="1",
)

def create_to_thread_metric(self):
self.to_thread_duration_metric = self._meter.create_histogram(
name=ASYNCIO_TO_THREAD_DURATION,
description="Duration of asyncio function",
unit="ms",
)
self.to_thread_exception_metric = self._meter.create_counter(
name=ASYNCIO_TO_THREAD_EXCEPTIONS,
description="Number of exceptions in asyncio function",
unit="1",
)
self.to_thread_created_metric = self._meter.create_counter(
name=ASYNCIO_TO_THREAD_CREATED,
description="Number of asyncio function created",
unit="1",
)
self.to_thread_active_metric = self._meter.create_up_down_counter(
name=ASYNCIO_TO_THREAD_ACTIVE,
description="Number of asyncio function active",
unit="1",
)
self.to_thread_finished_metric = self._meter.create_counter(
name=ASYNCIO_TO_THREAD_FINISHED,
description="Number of asyncio function finished",
unit="1",
)
def determine_state(exception):
if isinstance(exception, asyncio.CancelledError):
return "cancelled"
if isinstance(exception, asyncio.TimeoutError):
return "timeout"
if exception:
return "exception"
return "finished"


def uninstrument_taskgroup_create_task():
Expand Down
Loading

0 comments on commit a8d5e16

Please sign in to comment.