Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add callback options with timeout to observable callbacks #2664

Merged
merged 3 commits into from
May 7, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions docs/examples/metrics/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
get_meter_provider,
set_meter_provider,
)
from opentelemetry._metrics._internal.instrument import CallbackOptions
from opentelemetry.exporter.otlp.proto.grpc._metric_exporter import (
OTLPMetricExporter,
)
Expand All @@ -17,15 +18,17 @@
set_meter_provider(provider)


def observable_counter_func() -> Iterable[Observation]:
def observable_counter_func(options: CallbackOptions) -> Iterable[Observation]:
yield Observation(1, {})


def observable_up_down_counter_func() -> Iterable[Observation]:
def observable_up_down_counter_func(
options: CallbackOptions,
srikanthccv marked this conversation as resolved.
Show resolved Hide resolved
) -> Iterable[Observation]:
yield Observation(-10, {})


def observable_gauge_func() -> Iterable[Observation]:
def observable_gauge_func(options: CallbackOptions) -> Iterable[Observation]:
yield Observation(9, {})


Expand All @@ -37,7 +40,8 @@ def observable_gauge_func() -> Iterable[Observation]:

# Async Counter
observable_counter = meter.create_observable_counter(
"observable_counter", observable_counter_func
"observable_counter",
[observable_counter_func],
)

# UpDownCounter
Expand All @@ -47,12 +51,12 @@ def observable_gauge_func() -> Iterable[Observation]:

# Async UpDownCounter
observable_updown_counter = meter.create_observable_up_down_counter(
"observable_updown_counter", observable_up_down_counter_func
"observable_updown_counter", [observable_up_down_counter_func]
)

# Histogram
histogram = meter.create_histogram("histogram")
histogram.record(99.9)

# Async Gauge
gauge = meter.create_observable_gauge("gauge", observable_gauge_func)
gauge = meter.create_observable_gauge("gauge", [observable_gauge_func])
15 changes: 9 additions & 6 deletions docs/getting_started/metrics_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from typing import Iterable

from opentelemetry._metrics import (
CallbackOptions,
Observation,
get_meter_provider,
set_meter_provider,
Expand All @@ -34,15 +35,17 @@
set_meter_provider(provider)


def observable_counter_func() -> Iterable[Observation]:
def observable_counter_func(options: CallbackOptions) -> Iterable[Observation]:
yield Observation(1, {})


def observable_up_down_counter_func() -> Iterable[Observation]:
def observable_up_down_counter_func(
options: CallbackOptions,
) -> Iterable[Observation]:
yield Observation(-10, {})


def observable_gauge_func() -> Iterable[Observation]:
def observable_gauge_func(options: CallbackOptions) -> Iterable[Observation]:
yield Observation(9, {})


Expand All @@ -54,7 +57,7 @@ def observable_gauge_func() -> Iterable[Observation]:

# Async Counter
observable_counter = meter.create_observable_counter(
"observable_counter", observable_counter_func
"observable_counter", [observable_counter_func]
)

# UpDownCounter
Expand All @@ -64,12 +67,12 @@ def observable_gauge_func() -> Iterable[Observation]:

# Async UpDownCounter
observable_updown_counter = meter.create_observable_up_down_counter(
"observable_updown_counter", observable_up_down_counter_func
"observable_updown_counter", [observable_up_down_counter_func]
)

# Histogram
histogram = meter.create_histogram("histogram")
histogram.record(99.9)

# Async Gauge
gauge = meter.create_observable_gauge("gauge", observable_gauge_func)
gauge = meter.create_observable_gauge("gauge", [observable_gauge_func])
3 changes: 3 additions & 0 deletions opentelemetry-api/src/opentelemetry/_metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
)
from opentelemetry._metrics._internal.instrument import (
Asynchronous,
CallbackOptions,
CallbackT,
Counter,
Histogram,
Expand All @@ -71,6 +72,7 @@
Counter,
Synchronous,
Asynchronous,
CallbackOptions,
get_meter_provider,
get_meter,
Histogram,
Expand All @@ -95,6 +97,7 @@
obj.__module__ = __name__

__all__ = [
"CallbackOptions",
"MeterProvider",
"NoOpMeterProvider",
"Meter",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,12 +280,13 @@ def create_observable_counter(
"""Creates an `ObservableCounter` instrument

An observable counter observes a monotonically increasing count by calling provided
callbacks which returns multiple :class:`~opentelemetry._metrics.Observation`.
callbacks which accept a :class:`~opentelemetry._metrics.CallbackOptions` and return
multiple :class:`~opentelemetry._metrics.Observation`.

For example, an observable counter could be used to report system CPU
time periodically. Here is a basic implementation::

def cpu_time_callback() -> Iterable[Observation]:
def cpu_time_callback(options: CallbackOptions) -> Iterable[Observation]:
observations = []
with open("/proc/stat") as procstat:
procstat.readline() # skip the first line
Expand All @@ -308,7 +309,7 @@ def cpu_time_callback() -> Iterable[Observation]:
To reduce memory usage, you can use generator callbacks instead of
building the full list::

def cpu_time_callback() -> Iterable[Observation]:
def cpu_time_callback(options: CallbackOptions) -> Iterable[Observation]:
with open("/proc/stat") as procstat:
procstat.readline() # skip the first line
for line in procstat:
Expand All @@ -322,6 +323,8 @@ def cpu_time_callback() -> Iterable[Observation]:
callbacks, which each should return iterables of :class:`~opentelemetry._metrics.Observation`::

def cpu_time_callback(states_to_include: set[str]) -> Iterable[Iterable[Observation]]:
# accept options sent in from OpenTelemetry
options = yield
while True:
observations = []
with open("/proc/stat") as procstat:
Expand All @@ -334,7 +337,8 @@ def cpu_time_callback(states_to_include: set[str]) -> Iterable[Iterable[Observat
if "nice" in states_to_include:
observations.append(Observation(int(states[1]) // 100, {"cpu": cpu, "state": "nice"}))
# ... other states
yield observations
# yield the observations and receive the options for next iteration
options = yield observations

meter.create_observable_counter(
"system.cpu.time",
Expand All @@ -343,6 +347,15 @@ def cpu_time_callback(states_to_include: set[str]) -> Iterable[Iterable[Observat
description="CPU time"
)

The :class:`~opentelemetry._metrics.CallbackOptions` contain a timeout which the
callback should respect. For example if the callback does asynchronous work, like
making HTTP requests, it should respect the timeout::

def scrape_http_callback(options: CallbackOptions) -> Iterable[Observation]:
r = requests.get('http://scrapethis.com', timeout=options.timeout_millis / 10**3)
for value in r.json():
yield Observation(value)

Args:
name: The name of the instrument to be created
callbacks: A sequence of callbacks that return an iterable of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@


from abc import ABC, abstractmethod
from dataclasses import dataclass
from logging import getLogger
from re import ASCII
from re import compile as re_compile
Expand All @@ -36,19 +37,31 @@
from opentelemetry._metrics._internal.observation import Observation
from opentelemetry.util.types import Attributes

InstrumentT = TypeVar("InstrumentT", bound="Instrument")
CallbackT = Union[
Callable[[], Iterable[Observation]],
Generator[Iterable[Observation], None, None],
]


_logger = getLogger(__name__)

_name_regex = re_compile(r"[a-zA-Z][-.\w]{0,62}", ASCII)
_unit_regex = re_compile(r"\w{0,63}", ASCII)


@dataclass(frozen=True)
class CallbackOptions:
"""Options for the callback

Args:
timeout_millis: Timeout for the callback's execution. If the callback does asynchronous
work (e.g. HTTP requests), it should respect this timeout.
"""

timeout_millis: float = 10_000


InstrumentT = TypeVar("InstrumentT", bound="Instrument")
CallbackT = Union[
Callable[[CallbackOptions], Iterable[Observation]],
Generator[Iterable[Observation], CallbackOptions, None],
]


class Instrument(ABC):
"""Abstract class that serves as base for all instruments."""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,15 @@
# pylint: disable=too-many-ancestors

from logging import getLogger
from typing import TYPE_CHECKING, Dict, Generator, Iterable, Optional, Union
from typing import (
TYPE_CHECKING,
Dict,
Generator,
Iterable,
List,
Optional,
Union,
)

from opentelemetry._metrics import CallbackT
from opentelemetry._metrics import Counter as APICounter
Expand All @@ -26,6 +34,7 @@
ObservableUpDownCounter as APIObservableUpDownCounter,
)
from opentelemetry._metrics import UpDownCounter as APIUpDownCounter
from opentelemetry._metrics._internal.instrument import CallbackOptions
from opentelemetry.sdk._metrics.measurement import Measurement
from opentelemetry.sdk.util.instrumentation import InstrumentationScope

Expand Down Expand Up @@ -93,32 +102,41 @@ def __init__(
self._measurement_consumer = measurement_consumer
super().__init__(name, callbacks, unit=unit, description=description)

self._callbacks = []
self._callbacks: List[CallbackT] = []

if callbacks is not None:

for callback in callbacks:
for i, callback in enumerate(callbacks):
aabmass marked this conversation as resolved.
Show resolved Hide resolved

if isinstance(callback, Generator):

def inner(callback=callback) -> Iterable[Measurement]:
return next(callback)
# advance generator to it's first yield
next(callback)

def inner(
options: CallbackOptions,
callback=callback,
) -> Iterable[Measurement]:
try:
return callback.send(options)
except StopIteration:
return []

self._callbacks.append(inner)
else:
self._callbacks.append(callback)

def callback(self) -> Iterable[Measurement]:
def callback(
self, callback_options: CallbackOptions
) -> Iterable[Measurement]:
for callback in self._callbacks:
try:
for api_measurement in callback():
for api_measurement in callback(callback_options):
yield Measurement(
api_measurement.value,
instrument=self,
attributes=api_measurement.attributes,
)
except StopIteration:
pass
except Exception: # pylint: disable=broad-except
_logger.exception(
"Callback failed for instrument %s.", self.name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from threading import Lock
from typing import TYPE_CHECKING, Dict, Iterable, List, Mapping

from opentelemetry._metrics._internal.instrument import CallbackOptions
from opentelemetry.sdk._metrics._internal.metric_reader_storage import (
MetricReaderStorage,
)
Expand All @@ -27,7 +28,7 @@
from opentelemetry.sdk._metrics.point import AggregationTemporality, Metric

if TYPE_CHECKING:
from opentelemetry.sdk._metrics.instrument import _Asynchronous
from opentelemetry.sdk._metrics._internal.instrument import _Asynchronous


class MeasurementConsumer(ABC):
Expand Down Expand Up @@ -78,8 +79,10 @@ def collect(
) -> Iterable[Metric]:
with self._lock:
metric_reader_storage = self._reader_storages[metric_reader]
# for now, just use the defaults
callback_options = CallbackOptions()
for async_instrument in self._async_instruments:
for measurement in async_instrument.callback():
for measurement in async_instrument.callback(callback_options):
metric_reader_storage.consume_measurement(measurement)
return self._reader_storages[metric_reader].collect(
instrument_type_temporality
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from typing import Generator, Iterable, List
from unittest import TestCase

from opentelemetry._metrics import Instrument, Observation
from opentelemetry._metrics import CallbackOptions, Instrument, Observation
from opentelemetry.sdk._metrics import MeterProvider
from opentelemetry.sdk._metrics.measurement import Measurement

Expand Down Expand Up @@ -138,7 +138,9 @@ def create_measurements_expected(
]

def test_cpu_time_callback(self):
def cpu_time_callback() -> Iterable[Observation]:
def cpu_time_callback(
options: CallbackOptions,
) -> Iterable[Observation]:
procstat = io.StringIO(self.procstat_str)
procstat.readline() # skip the first line
for line in procstat:
Expand Down Expand Up @@ -180,7 +182,7 @@ def cpu_time_callback() -> Iterable[Observation]:
unit="s",
description="CPU time",
)
measurements = list(observable_counter.callback())
measurements = list(observable_counter.callback(CallbackOptions()))
self.assertEqual(
measurements, self.create_measurements_expected(observable_counter)
)
Expand All @@ -189,7 +191,9 @@ def test_cpu_time_generator(self):
def cpu_time_generator() -> Generator[
Iterable[Observation], None, None
]:
options = yield
while True:
self.assertIsInstance(options, CallbackOptions)
measurements = []
procstat = io.StringIO(self.procstat_str)
procstat.readline() # skip the first line
Expand Down Expand Up @@ -250,7 +254,7 @@ def cpu_time_generator() -> Generator[
{"cpu": cpu, "state": "guest_nice"},
)
)
yield measurements
options = yield measurements

meter = MeterProvider().get_meter("name")
observable_counter = meter.create_observable_counter(
Expand All @@ -259,7 +263,7 @@ def cpu_time_generator() -> Generator[
unit="s",
description="CPU time",
)
measurements = list(observable_counter.callback())
measurements = list(observable_counter.callback(CallbackOptions()))
self.assertEqual(
measurements, self.create_measurements_expected(observable_counter)
)
Expand Down
Loading