Skip to content

Commit

Permalink
include feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
bourbonkk committed Jan 4, 2024
1 parent a8d5e16 commit 936dc37
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def func():
from opentelemetry.trace import get_tracer
from opentelemetry.trace.status import Status, StatusCode

ASYNCIO_PREFIX = "asyncio."
ASYNCIO_PREFIX = "asyncio"


class AsyncioInstrumentor(BaseInstrumentor):
Expand All @@ -118,8 +118,8 @@ class AsyncioInstrumentor(BaseInstrumentor):

def __init__(self):
super().__init__()
self.process_duration_metric = None
self.process_counts_metric = None
self.process_duration_histogram = None
self.process_counts_counter = None

self._tracer = None
self._meter = None
Expand All @@ -131,8 +131,9 @@ def instrumentation_dependencies(self) -> Collection[str]:
return _instruments

def _instrument(self, **kwargs):
tracer_provider = kwargs.get("tracer_provider")
self._tracer = get_tracer(__name__, __version__, tracer_provider)
self._tracer = get_tracer(
__name__, __version__, kwargs.get("tracer_provider")
)
self._meter = get_meter(
__name__, __version__, kwargs.get("meter_provider")
)
Expand All @@ -141,12 +142,12 @@ def _instrument(self, **kwargs):
self._future_active_enabled = get_future_trace_enabled()
self._to_thread_name_to_trace = get_to_thread_to_trace()

self.process_duration_metric = self._meter.create_histogram(
self.process_duration_histogram = self._meter.create_histogram(
name="asyncio.process.duration",
description="Duration of asyncio process",
unit="seconds",
)
self.process_counts_metric = self._meter.create_up_down_counter(
self.process_counts_counter = self._meter.create_counter(
name="asyncio.process.count",
description="Number of asyncio process",
unit="1",
Expand All @@ -166,7 +167,7 @@ def _uninstrument(self, **kwargs):
uninstrument_to_thread()
uninstrument_taskgroup_create_task()

def instrument_method_with_coroutine(self, method_name):
def instrument_method_with_coroutine(self, method_name: str):
"""
Instruments specified asyncio method.
"""
Expand Down Expand Up @@ -201,12 +202,12 @@ def wrap_coros_or_futures(method, instance, args, kwargs):

_wrap(asyncio, "gather", wrap_coros_or_futures)

def instrument_to_thread(self):
def instrument_to_thread(self) -> None:
# to_thread was added in Python 3.9
if sys.version_info < (3, 9):
return

def wrap_to_thread(method, instance, args, kwargs):
def wrap_to_thread(method, instance, args, kwargs) -> None:
if args:
first_arg = args[0]
# Wrap the first argument
Expand All @@ -218,12 +219,12 @@ def wrap_to_thread(method, instance, args, kwargs):

_wrap(asyncio, "to_thread", wrap_to_thread)

def instrument_taskgroup_create_task(self):
def instrument_taskgroup_create_task(self) -> None:
# TaskGroup.create_task was added in Python 3.11
if sys.version_info < (3, 11):
return

def wrap_taskgroup_create_task(method, instance, args, kwargs):
def wrap_taskgroup_create_task(method, instance, args, kwargs) -> None:
if args:
coro = args[0]
wrapped_coro = self.trace_coroutine(coro)
Expand All @@ -237,18 +238,17 @@ def wrap_taskgroup_create_task(method, instance, args, kwargs):
wrap_taskgroup_create_task,
)

def trace_to_thread(self, func):
def trace_to_thread(self, func: callable):
"""Trace a function."""
start = default_timer()
span = (
self._tracer.start_span(
f"{ASYNCIO_PREFIX}to_thread_func-" + func.__name__
f"{ASYNCIO_PREFIX} to_thread-" + func.__name__
)
if func.__name__ in self._to_thread_name_to_trace
else None
)
attr = {"type": "to_thread", "name": func.__name__}
duration_attr = attr.copy()
exception = None
try:
attr["state"] = "finished"
Expand All @@ -257,14 +257,7 @@ def trace_to_thread(self, func):
attr["state"] = "exception"
raise
finally:
duration = max(round((default_timer() - start) * 1000), 0)
self.process_duration_metric.record(duration, duration_attr)
self.process_counts_metric.add(1, attr)
if span:
if span.is_recording() and exception:
span.set_status(Status(StatusCode.ERROR))
span.record_exception(exception)
span.end()
self.record_process(start, attr, span, exception)

def trace_item(self, coro_or_future):
"""Trace a coroutine or future item."""
Expand All @@ -283,9 +276,8 @@ async def trace_coroutine(self, coro):
"type": "coroutine",
"name": coro.__name__,
}
duration_attr = attr.copy()
span = (
self._tracer.start_span(f"{ASYNCIO_PREFIX}coro-" + coro.__name__)
self._tracer.start_span(f"{ASYNCIO_PREFIX} coro-" + coro.__name__)
if coro.__name__ in self._coros_name_to_trace
else None
)
Expand All @@ -304,46 +296,51 @@ async def trace_coroutine(self, coro):
attr["state"] = state
raise
finally:
duration = max(round(default_timer() - start), 0)
self.process_duration_metric.record(duration, duration_attr)
self.process_counts_metric.add(1, attr)

if span:
if span.is_recording() and exception:
span.set_status(Status(StatusCode.ERROR))
span.record_exception(exception)
span.end()
self.record_process(start, attr, span, exception)

def trace_future(self, future):
def trace_future(self, future) -> futures.Future:
start = default_timer()
span = (
self._tracer.start_span(f"{ASYNCIO_PREFIX}future")
self._tracer.start_span(f"{ASYNCIO_PREFIX} future")
if self._future_active_enabled
else None
)

def callback(f):
duration = max(round(default_timer() - start), 0)
exception = f.exception()
attr = {
"type": "future",
}
duration_attr = attr.copy()
state = determine_state(exception)
attr["state"] = state
self.process_counts_metric.add(1, attr)
self.process_duration_metric.record(duration, duration_attr)
if span:
if span.is_recording() and exception:
span.set_status(Status(StatusCode.ERROR))
span.record_exception(exception)
span.end()
self.record_process(start, attr, span, exception)

future.add_done_callback(callback)
return future

def record_process(
self, start: float, attr: dict, span=None, exception=None
) -> None:
"""
Record the processing time, update histogram and counter, and handle span.
:param start: Start time of the process.
:param attr: Attributes for the histogram and counter.
:param span: Optional span for tracing.
:param exception: Optional exception occurred during the process.
"""
duration = max(default_timer() - start, 0)
self.process_duration_histogram.record(duration, attr)
self.process_counts_counter.add(1, attr)

if span:
if span.is_recording() and exception:
span.set_status(Status(StatusCode.ERROR))
span.record_exception(exception)
span.end()


def determine_state(exception):
def determine_state(exception: Exception) -> str:
if isinstance(exception, asyncio.CancelledError):
return "cancelled"
if isinstance(exception, asyncio.TimeoutError):
Expand All @@ -353,25 +350,25 @@ def determine_state(exception):
return "finished"


def uninstrument_taskgroup_create_task():
def uninstrument_taskgroup_create_task() -> None:
# TaskGroup.create_task was added in Python 3.11
if sys.version_info < (3, 11):
return
unwrap(asyncio.TaskGroup, "create_task") # pylint: disable=no-member


def uninstrument_to_thread():
def uninstrument_to_thread() -> None:
# to_thread was added in Python 3.9
if sys.version_info < (3, 9):
return
unwrap(asyncio, "to_thread")


def uninstrument_gather():
def uninstrument_gather() -> None:
unwrap(asyncio, "gather")


def uninstrument_method_with_coroutine(method_name):
def uninstrument_method_with_coroutine(method_name: str) -> None:
"""
Uninstrument specified asyncio method.
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
)


def separate_coro_names_by_comma(coro_names) -> set:
def separate_coro_names_by_comma(coro_names: str) -> set:
"""
Function to separate the coroutines to be traced by comma
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ async def async_func():
await asyncio.sleep(0.1)


async def factorial(number):
async def factorial(number: int):
factorial_value = 1
for value in range(2, number + 1):
await asyncio.sleep(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ def test_cancel(self):
self.assertEqual(spans[0].context.trace_id, spans[1].context.trace_id)
self.assertEqual(spans[2].context.trace_id, spans[1].context.trace_id)

self.assertEqual(spans[0].name, "asyncio.coro-cancellable_coroutine")
self.assertEqual(spans[1].name, "asyncio.coro-cancellation_coro")
self.assertEqual(spans[0].name, "asyncio coro-cancellable_coroutine")
self.assertEqual(spans[1].name, "asyncio coro-cancellation_coro")
for metric in (
self.memory_metrics_reader.get_metrics_data()
.resource_metrics[0]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,4 @@ async def async_func():
spans = self.memory_exporter.get_finished_spans()

self.assertEqual(len(spans), 1)
self.assertEqual(spans[0].name, "asyncio.coro-sleep")
self.assertEqual(spans[0].name, "asyncio coro-sleep")
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,6 @@ async def gather_factorial():
asyncio.run(gather_factorial())
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 3)
self.assertEqual(spans[0].name, "asyncio.coro-factorial")
self.assertEqual(spans[1].name, "asyncio.coro-factorial")
self.assertEqual(spans[2].name, "asyncio.coro-factorial")
self.assertEqual(spans[0].name, "asyncio coro-factorial")
self.assertEqual(spans[1].name, "asyncio coro-factorial")
self.assertEqual(spans[2].name, "asyncio coro-factorial")
Original file line number Diff line number Diff line change
Expand Up @@ -56,26 +56,18 @@ async def to_thread():
spans = self.memory_exporter.get_finished_spans()

self.assertEqual(len(spans), 2)
assert spans[0].name == "asyncio.to_thread_func-multiply"
assert spans[0].name == "asyncio to_thread-multiply"
for metric in (
self.memory_metrics_reader.get_metrics_data()
.resource_metrics[0]
.scope_metrics[0]
.metrics
):
if metric.name == "asyncio.to_thread.duration":
self.assertEqual(metric.data.data_points[0].count, 1)
elif metric.name == "asyncio.to_thread.active":
self.assertEqual(metric.data.data_points[0].value, 0)
elif metric.name == "asyncio.to_thread.created":
self.assertEqual(metric.data.data_points[0].value, 1)
elif metric.name == "asyncio.to_thread.finished":
self.assertEqual(metric.data.data_points[0].value, 1)
elif metric.name == "asyncio.to_thread.exceptions":
self.assertEqual(metric.data.data_points[0].value, 0)
elif metric.name == "asyncio.to_thread.cancelled":
self.assertEqual(metric.data.data_points[0].value, 0)
elif metric.name == "asyncio.to_thread.name":
self.assertEqual(
metric.data.data_points[0].value, "multiply"
)
if metric.name == "asyncio.process.duration":
for point in metric.data.data_points:
self.assertEqual(point.attributes["type"], "to_thread")
self.assertEqual(point.attributes["name"], "multiply")
if metric.name == "asyncio.process.count":
for point in metric.data.data_points:
self.assertEqual(point.attributes["type"], "to_thread")
self.assertEqual(point.attributes["name"], "multiply")

0 comments on commit 936dc37

Please sign in to comment.