From dec09451342796029a5c18f054b96ca34ae849d4 Mon Sep 17 00:00:00 2001
From: Diego Hurtado
Date: Thu, 25 Nov 2021 17:15:10 -0600
Subject: [PATCH] Refactor aggregations

Fixes #2305
---
 opentelemetry-sdk/setup.cfg                   |   1 +
 .../opentelemetry/sdk/_metrics/aggregation.py | 160 +++++++++++++-----
 .../opentelemetry/sdk/_metrics/measurement.py |  21 ++-
 .../src/opentelemetry/sdk/_metrics/point.py   |  49 ++++++
 .../tests/metrics/test_aggregation.py         |  68 ++++----
 5 files changed, 219 insertions(+), 80 deletions(-)
 create mode 100644 opentelemetry-sdk/src/opentelemetry/sdk/_metrics/point.py

diff --git a/opentelemetry-sdk/setup.cfg b/opentelemetry-sdk/setup.cfg
index 5606788199c..ca5f78eeba6 100644
--- a/opentelemetry-sdk/setup.cfg
+++ b/opentelemetry-sdk/setup.cfg
@@ -45,6 +45,7 @@ include_package_data = True
 install_requires =
     opentelemetry-api == 1.8.0
     opentelemetry-semantic-conventions == 0.27b0
+    dataclasses == 0.8; python_version < '3.7'
 
 [options.packages.find]
 where = src
diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py
index 456e4471621..525a46e6dd0 100644
--- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py
+++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py
@@ -16,82 +16,138 @@
 from collections import OrderedDict
 from logging import getLogger
 from math import inf
+from threading import Lock
+from typing import Generic, Optional, Sequence, TypeVar
 
-from opentelemetry._metrics.instrument import _Monotonic
+from opentelemetry.sdk._metrics.measurement import Measurement
+from opentelemetry.sdk._metrics.point import Gauge, Histogram, PointT, Sum
 from opentelemetry.util._time import _time_ns
 
+# FIXME this is being copied directly from
+# opentelemetry.proto.metrics.v1.metrics_pb2. The only reason for doing so is
+# to avoid having protobuf as an indirect dependency in the SDK. This
+# duplication of code is not ideal.
+
+AGGREGATION_TEMPORALITY_UNSPECIFIED = 0
+AGGREGATION_TEMPORALITY_DELTA = 1
+AGGREGATION_TEMPORALITY_CUMULATIVE = 2
+
+_PointVarT = TypeVar("_PointVarT", bound=PointT)
+
 _logger = getLogger(__name__)
 
 
-class Aggregation(ABC):
+class Aggregation(ABC, Generic[_PointVarT]):
+    def __init__(self, is_monotonic: bool):
+        self._value = None
+        self._is_monotonic = is_monotonic
+        self._lock = Lock()
+
+    @property
+    def is_monotonic(self):
+        return self._is_monotonic
+
     @property
     def value(self):
-        return self._value  # pylint: disable=no-member
+        return self._value
 
     @abstractmethod
-    def aggregate(self, value):
+    def aggregate(self, measurement: Measurement) -> None:
         pass
 
     @abstractmethod
-    def make_point_and_reset(self):
-        """
-        Atomically return a point for the current value of the metric and reset the internal state.
-        """
-
+    def _collect(self) -> Optional[_PointVarT]:
+        pass
 
 
-class SumAggregation(Aggregation):
-    """
-    This aggregation collects data for the SDK sum metric point.
-    """
-
-    def __init__(self, instrument):
+class SynchronousSumAggregation(Aggregation[Sum]):
+    def __init__(self, is_monotonic: bool):
+        super().__init__(is_monotonic)
         self._value = 0
+        self._start_time_unix_nano = _time_ns()
 
-    def aggregate(self, value):
-        self._value = self._value + value
+    def aggregate(self, measurement: Measurement) -> None:
+        with self._lock:
+            self._value = self._value + measurement.value
 
-    def make_point_and_reset(self):
-        pass
+    def _collect(self):
+        now = _time_ns()
+
+        with self._lock:
+            value = self._value
+            start_time_unix_nano = self._start_time_unix_nano
+            self._value = 0
+            self._start_time_unix_nano = now + 1
 
-class LastValueAggregation(Aggregation):
+        return Sum(
+            aggregation_temporality=AGGREGATION_TEMPORALITY_DELTA,
+            is_monotonic=self._is_monotonic,
+            start_time_unix_nano=start_time_unix_nano,
+            time_unix_nano=now,
+            value=value,
+        )
 
-    """
-    This aggregation collects data for the SDK sum metric point.
-    """
-
-    def __init__(self, instrument):
-        self._value = None
-        self._timestamp = _time_ns()
+class AsynchronousSumAggregation(Aggregation[Sum]):
+    def __init__(self, is_monotonic: bool):
+        super().__init__(is_monotonic)
+        self._start_time_unix_nano = _time_ns()
 
-    def aggregate(self, value):
-        self._value = value
-        self._timestamp = _time_ns()
+    def aggregate(self, measurement: Measurement) -> None:
+        with self._lock:
+            self._value = measurement.value
 
-    def make_point_and_reset(self):
-        pass
+    def _collect(self):
+        if self._value is None:
+            return None
+
+        return Sum(
+            start_time_unix_nano=self._start_time_unix_nano,
+            time_unix_nano=_time_ns(),
+            value=self._value,
+            aggregation_temporality=AGGREGATION_TEMPORALITY_CUMULATIVE,
+            is_monotonic=self._is_monotonic,
+        )
 
 
-class ExplicitBucketHistogramAggregation(Aggregation):
+class LastValueAggregation(Aggregation[Gauge]):
+    def aggregate(self, measurement: Measurement):
+        with self._lock:
+            self._value = measurement.value
+
+    def _collect(self):
+        if self._value is None:
+            return None
+
+        return Gauge(
+            time_unix_nano=_time_ns(),
+            value=self._value,
+        )
 
-    """
-    This aggregation collects data for the SDK sum metric point.
-    """
-
+class ExplicitBucketHistogramAggregation(Aggregation[Histogram]):
     def __init__(
         self,
-        instrument,
-        *args,
-        boundaries=(0, 5, 10, 25, 50, 75, 100, 250, 500, 1000),
-        record_min_max=True,
+        is_monotonic: bool,
+        boundaries: Sequence[int] = (
+            0,
+            5,
+            10,
+            25,
+            50,
+            75,
+            100,
+            250,
+            500,
+            1000,
+        ),
+        record_min_max: bool = True,
     ):
-        super().__init__()
+        super().__init__(is_monotonic)
         self._value = OrderedDict([(key, 0) for key in (*boundaries, inf)])
         self._min = inf
         self._max = -inf
         self._sum = 0
-        self._instrument = instrument
         self._record_min_max = record_min_max
+        self._start_time_unix_nano = _time_ns()
 
     @property
     def min(self):
@@ -109,7 +165,7 @@ def max(self):
 
     @property
     def sum(self):
-        if isinstance(self._instrument, _Monotonic):
+        if self._is_monotonic:
             return self._sum
 
         _logger.warning(
@@ -118,12 +174,15 @@ def sum(self):
         )
         return None
 
-    def aggregate(self, value):
+    def aggregate(self, measurement: Measurement):
+
+        value = measurement.value
+
         if self._record_min_max:
             self._min = min(self._min, value)
             self._max = max(self._max, value)
 
-        if isinstance(self._instrument, _Monotonic):
+        if self._is_monotonic:
             self._sum += value
 
         for key in self._value.keys():
@@ -133,5 +192,16 @@ def aggregate(self, value):
 
                 break
 
-    def make_point_and_reset(self):
-        pass
+    def _collect(self):
+        now = _time_ns()
+
+        with self._lock:
+            value = self._value
+            start_time_unix_nano = self._start_time_unix_nano
+            self._value = OrderedDict([(key, 0) for key in value.keys()])
+            self._start_time_unix_nano = now + 1
+
+        return Histogram(
+            start_time_unix_nano=start_time_unix_nano,
+            time_unix_nano=now,
+            value=value,
+            aggregation_temporality=AGGREGATION_TEMPORALITY_CUMULATIVE,
+        )
diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/measurement.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/measurement.py
index fbaae02c786..13fb5b30abf 100644
--- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/measurement.py
+++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/measurement.py
@@ -12,6 +12,25 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from dataclasses import dataclass
+from typing import Optional, Union
+from opentelemetry.util.types import Attributes
+
+
+@dataclass(frozen=True)
 class Measurement:
-    pass
+    value: Union[int, float]
+    attributes: Optional[Attributes] = None
+
+
+# Can't make Exemplar a subclass of Measurement because that would force
+# time_unix_nano to be defined after attributes, a field with a default value
+@dataclass(frozen=True)
+class Exemplar:
+    value: Union[int, float]
+    time_unix_nano: int
+    span_id: int
+    trace_id: int
+    attributes: Optional[Attributes] = None
+    filtered_attributes: Optional[Attributes] = None
diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/point.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/point.py
new file mode 100644
index 00000000000..96d935a0753
--- /dev/null
+++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/point.py
@@ -0,0 +1,49 @@
+# Copyright The OpenTelemetry Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from dataclasses import dataclass
+from typing import Union
+
+
+@dataclass(frozen=True)
+class Sum:
+    # NumberDataPoint attributes
+    start_time_unix_nano: int
+    time_unix_nano: int
+    value: Union[int, float]
+    # Sum attributes
+    aggregation_temporality: int
+    is_monotonic: bool
+
+
+@dataclass(frozen=True)
+class Gauge:
+    # NumberDataPoint attributes
+    # start_time_unix_nano is not added here because the proto says it is to
+    # be ignored.
+    time_unix_nano: int
+    value: Union[int, float]
+
+
+@dataclass(frozen=True)
+class Histogram:
+    # HistogramDataPoint attributes
+    start_time_unix_nano: int
+    time_unix_nano: int
+    value: Union[int, float]
+    # Histogram attributes
+    aggregation_temporality: int
+
+
+PointT = Union[Sum, Gauge, Histogram]
diff --git a/opentelemetry-sdk/tests/metrics/test_aggregation.py b/opentelemetry-sdk/tests/metrics/test_aggregation.py
index 1c4fa1420e6..91be5514ddb 100644
--- a/opentelemetry-sdk/tests/metrics/test_aggregation.py
+++ b/opentelemetry-sdk/tests/metrics/test_aggregation.py
@@ -16,34 +16,34 @@
 from logging import WARNING
 from math import inf
 from unittest import TestCase
-from unittest.mock import Mock
 
 from opentelemetry.sdk._metrics.aggregation import (
     ExplicitBucketHistogramAggregation,
     LastValueAggregation,
-    SumAggregation,
+    SynchronousSumAggregation,
 )
+from opentelemetry.sdk._metrics.measurement import Measurement
 
 
-class TestSumAggregation(TestCase):
+class TestSynchronousSumAggregation(TestCase):
     def test_aggregate(self):
         """
-        `SumAggregation` collects data for sum metric points
+        `SynchronousSumAggregation` collects data for sum metric points
         """
 
-        sum_aggregation = SumAggregation(Mock())
+        sum_aggregation = SynchronousSumAggregation(True)
 
-        sum_aggregation.aggregate(1)
-        sum_aggregation.aggregate(2)
-        sum_aggregation.aggregate(3)
+        sum_aggregation.aggregate(Measurement(1))
+        sum_aggregation.aggregate(Measurement(2))
+        sum_aggregation.aggregate(Measurement(3))
 
         self.assertEqual(sum_aggregation.value, 6)
 
-        sum_aggregation = SumAggregation(Mock())
+        sum_aggregation = SynchronousSumAggregation(True)
 
-        sum_aggregation.aggregate(1)
-        sum_aggregation.aggregate(-2)
-        sum_aggregation.aggregate(3)
+        sum_aggregation.aggregate(Measurement(1))
+        sum_aggregation.aggregate(Measurement(-2))
+        sum_aggregation.aggregate(Measurement(3))
 
         self.assertEqual(sum_aggregation.value, 2)
 
@@ -55,15 +55,15 @@ def test_aggregate(self):
         temporality
         """
 
-        last_value_aggregation = LastValueAggregation(Mock())
+        last_value_aggregation = LastValueAggregation(True)
 
-        last_value_aggregation.aggregate(1)
+        last_value_aggregation.aggregate(Measurement(1))
         self.assertEqual(last_value_aggregation.value, 1)
 
-        last_value_aggregation.aggregate(2)
+        last_value_aggregation.aggregate(Measurement(2))
         self.assertEqual(last_value_aggregation.value, 2)
 
-        last_value_aggregation.aggregate(3)
+        last_value_aggregation.aggregate(Measurement(3))
         self.assertEqual(last_value_aggregation.value, 3)
 
@@ -74,14 +74,14 @@ def test_aggregate(self):
         """
 
         explicit_bucket_histogram_aggregation = (
-            ExplicitBucketHistogramAggregation(Mock())
+            ExplicitBucketHistogramAggregation(True)
         )
 
-        explicit_bucket_histogram_aggregation.aggregate(-1)
-        explicit_bucket_histogram_aggregation.aggregate(2)
-        explicit_bucket_histogram_aggregation.aggregate(7)
-        explicit_bucket_histogram_aggregation.aggregate(8)
-        explicit_bucket_histogram_aggregation.aggregate(9999)
+        explicit_bucket_histogram_aggregation.aggregate(Measurement(-1))
+        explicit_bucket_histogram_aggregation.aggregate(Measurement(2))
+        explicit_bucket_histogram_aggregation.aggregate(Measurement(7))
+        explicit_bucket_histogram_aggregation.aggregate(Measurement(8))
+        explicit_bucket_histogram_aggregation.aggregate(Measurement(9999))
 
         self.assertEqual(explicit_bucket_histogram_aggregation.value[0], -1)
         self.assertEqual(explicit_bucket_histogram_aggregation.value[5], 2)
@@ -97,27 +97,27 @@ def test_min_max(self):
         """
 
         explicit_bucket_histogram_aggregation = (
-            ExplicitBucketHistogramAggregation(Mock())
+            ExplicitBucketHistogramAggregation(True)
         )
 
-        explicit_bucket_histogram_aggregation.aggregate(-1)
-        explicit_bucket_histogram_aggregation.aggregate(2)
-        explicit_bucket_histogram_aggregation.aggregate(7)
-        explicit_bucket_histogram_aggregation.aggregate(8)
-        explicit_bucket_histogram_aggregation.aggregate(9999)
+        explicit_bucket_histogram_aggregation.aggregate(Measurement(-1))
+        explicit_bucket_histogram_aggregation.aggregate(Measurement(2))
+        explicit_bucket_histogram_aggregation.aggregate(Measurement(7))
+        explicit_bucket_histogram_aggregation.aggregate(Measurement(8))
+        explicit_bucket_histogram_aggregation.aggregate(Measurement(9999))
 
         self.assertEqual(explicit_bucket_histogram_aggregation.min, -1)
         self.assertEqual(explicit_bucket_histogram_aggregation.max, 9999)
 
         explicit_bucket_histogram_aggregation = (
-            ExplicitBucketHistogramAggregation(Mock(), record_min_max=False)
+            ExplicitBucketHistogramAggregation(True, record_min_max=False)
         )
 
-        explicit_bucket_histogram_aggregation.aggregate(-1)
-        explicit_bucket_histogram_aggregation.aggregate(2)
-        explicit_bucket_histogram_aggregation.aggregate(7)
-        explicit_bucket_histogram_aggregation.aggregate(8)
-        explicit_bucket_histogram_aggregation.aggregate(9999)
+        explicit_bucket_histogram_aggregation.aggregate(Measurement(-1))
+        explicit_bucket_histogram_aggregation.aggregate(Measurement(2))
+        explicit_bucket_histogram_aggregation.aggregate(Measurement(7))
+        explicit_bucket_histogram_aggregation.aggregate(Measurement(8))
+        explicit_bucket_histogram_aggregation.aggregate(Measurement(9999))
 
         with self.assertLogs(level=WARNING):
             self.assertEqual(explicit_bucket_histogram_aggregation.min, inf)
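
A minimal usage sketch of the refactored aggregation API, for reviewers. It only uses names introduced by this patch (SynchronousSumAggregation, ExplicitBucketHistogramAggregation, Measurement); calling the internal _collect method directly and the variable names are illustrative only, since the reader/exporter wiring that would normally drive collection is not part of this change:

    from opentelemetry.sdk._metrics.aggregation import (
        ExplicitBucketHistogramAggregation,
        SynchronousSumAggregation,
    )
    from opentelemetry.sdk._metrics.measurement import Measurement

    # Synchronous (delta) sum: aggregate a few measurements, then collect a
    # Sum point; collection snapshots the running total and resets it.
    sum_aggregation = SynchronousSumAggregation(True)
    sum_aggregation.aggregate(Measurement(1))
    sum_aggregation.aggregate(Measurement(2))
    sum_point = sum_aggregation._collect()  # Sum point with value == 3

    # Histogram: a measurement is counted in the first bucket whose upper
    # boundary it is below, so 7 lands in the 10 bucket of the defaults.
    histogram_aggregation = ExplicitBucketHistogramAggregation(True)
    histogram_aggregation.aggregate(Measurement(7))
    histogram_point = histogram_aggregation._collect()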