Skip to content

Commit

Permalink
Refactor aggregations
Browse files Browse the repository at this point in the history
  • Loading branch information
ocelotl committed Dec 17, 2021
1 parent 49e41aa commit dec0945
Show file tree
Hide file tree
Showing 5 changed files with 219 additions and 80 deletions.
1 change: 1 addition & 0 deletions opentelemetry-sdk/setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ include_package_data = True
install_requires =
opentelemetry-api == 1.8.0
opentelemetry-semantic-conventions == 0.27b0
dataclasses == 0.8; python_version < '3.7'

[options.packages.find]
where = src
Expand Down
160 changes: 115 additions & 45 deletions opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,82 +16,138 @@
from collections import OrderedDict
from logging import getLogger
from math import inf
from threading import Lock
from typing import Generic, Optional, Sequence, TypeVar

from opentelemetry._metrics.instrument import _Monotonic
from opentelemetry.sdk._metrics.measurement import Measurement
from opentelemetry.sdk._metrics.point import Gauge, Histogram, PointT, Sum
from opentelemetry.util._time import _time_ns

# FIXME this is being copied directly from
# opentelemetry.proto.metrics.v1.metrics_pb2. The only reason for doing so is
# to avoid havinv protobuf as a indirect dependency in the SDK. This
# duplication of code is not ideal.

AGGREGATION_TEMPORALITY_UNSPECIFIED = 0
AGGREGATION_TEMPORALITY_DELTA = 1
AGGREGATION_TEMPORALITY_CUMULATIVE = 2

_PointVarT = TypeVar("_PointVarT", bound=PointT)

_logger = getLogger(__name__)


class Aggregation(ABC):
class Aggregation(ABC, Generic[_PointVarT]):
def __init__(self, is_monotonic: bool):
self._value = None
self._is_monotonic = is_monotonic
self._lock = Lock()

@property
def is_monotonic(self):
return self._is_monotonic

@property
def value(self):
return self._value # pylint: disable=no-member
return self._value

@abstractmethod
def aggregate(self, value):
def aggregate(self, measurement: Measurement) -> None:
pass

@abstractmethod
def make_point_and_reset(self):
"""
Atomically return a point for the current value of the metric and reset the internal state.
"""

def _collect(self) -> Optional[_PointVarT]:
pass

class SumAggregation(Aggregation):
"""
This aggregation collects data for the SDK sum metric point.
"""

def __init__(self, instrument):
class SynchronousSumAggregation(Aggregation[Sum]):
def __init__(self, is_monotonic: bool):
super().__init__(is_monotonic)
self._value = 0
self._start_time_unix_nano = _time_ns()

def aggregate(self, value):
self._value = self._value + value
def aggregate(self, measurement: Measurement) -> None:
with self._lock:
self._value = self._value + measurement.value

def make_point_and_reset(self):
pass
def _collect(self):
now = _time_ns()

with self._lock:
self._value = 0
self._start_time_unix_nano = now + 1

class LastValueAggregation(Aggregation):
return Sum(
aggregation_temporality=AGGREGATION_TEMPORALITY_DELTA,
is_monotonic=self._is_monotonic,
start_time_unix_nano=self._start_time_unix_nano,
time_unix_nano=now,
value=self._value,
)

"""
This aggregation collects data for the SDK sum metric point.
"""

def __init__(self, instrument):
self._value = None
self._timestamp = _time_ns()
class AsynchronousSumAggregation(Aggregation[Sum]):
def __init__(self, is_monotonic: bool):
super().__init__(is_monotonic)
self._start_time_unix_nano = _time_ns()

def aggregate(self, value):
self._value = value
self._timestamp = _time_ns()
def aggregate(self, measurement: Measurement) -> None:
with self._lock:
self._value = measurement.value

def make_point_and_reset(self):
pass
def _collect(self):
if self._value is None:
return None

return Sum(
start_time_unix_nano=self._start_time_unix_nano,
time_unix_nano=_time_ns(),
value=self._value,
aggregation_temporality=AGGREGATION_TEMPORALITY_CUMULATIVE,
is_monotonic=self._is_monotonic,
)


class ExplicitBucketHistogramAggregation(Aggregation):
class LastValueAggregation(Aggregation[Gauge]):
def aggregate(self, measurement: Measurement):
with self._lock:
self._value = measurement.value

def _collect(self):
if self._value is None:
return None

return Gauge(
time_unix_nano=_time_ns(),
value=self._value,
)

"""
This aggregation collects data for the SDK sum metric point.
"""

class ExplicitBucketHistogramAggregation(Aggregation[Histogram]):
def __init__(
self,
instrument,
*args,
boundaries=(0, 5, 10, 25, 50, 75, 100, 250, 500, 1000),
record_min_max=True,
is_monotonic: bool,
boundaries: Sequence[int] = (
0,
5,
10,
25,
50,
75,
100,
250,
500,
1000,
),
record_min_max: bool = True,
):
super().__init__()
super().__init__(is_monotonic)
self._value = OrderedDict([(key, 0) for key in (*boundaries, inf)])
self._min = inf
self._max = -inf
self._sum = 0
self._instrument = instrument
self._record_min_max = record_min_max
self._start_time_unix_nano = _time_ns()

@property
def min(self):
Expand All @@ -109,7 +165,7 @@ def max(self):

@property
def sum(self):
if isinstance(self._instrument, _Monotonic):
if self._is_monotonic:
return self._sum

_logger.warning(
Expand All @@ -118,12 +174,15 @@ def sum(self):
)
return None

def aggregate(self, value):
def aggregate(self, measurement: Measurement):

value = measurement.value

if self._record_min_max:
self._min = min(self._min, value)
self._max = max(self._max, value)

if isinstance(self._instrument, _Monotonic):
if self._is_monotonic:
self._sum += value

for key in self._value.keys():
Expand All @@ -133,5 +192,16 @@ def aggregate(self, value):

break

def make_point_and_reset(self):
pass
def _collect(self):
now = _time_ns()

with self._lock:
self._value = 0
self._start_time_unix_nano = now + 1

return Histogram(
start_time_unix_nano=self._start_time_unix_nano,
time_unix_nano=now,
value=self._value,
aggregation_temporality=AGGREGATION_TEMPORALITY_CUMULATIVE,
)
21 changes: 20 additions & 1 deletion opentelemetry-sdk/src/opentelemetry/sdk/_metrics/measurement.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,25 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from dataclasses import dataclass
from typing import Optional, Union

from opentelemetry.util.types import Attributes


@dataclass(frozen=True)
class Measurement:
pass
value: Union[int, float]
attributes: Optional[Attributes] = None


# Can't make Exemplar a subclass of Measurement because that makes
# time_unix_nano to be defined after attribute, a keyword argument
@dataclass(frozen=True)
class Exemplar:
value: Union[int, float]
time_unix_nano: int
span_id: int
trace_id: int
attributes: Optional[Attributes] = None
filtered_attributes: Optional[Attributes] = None
49 changes: 49 additions & 0 deletions opentelemetry-sdk/src/opentelemetry/sdk/_metrics/point.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from dataclasses import dataclass
from typing import Union


@dataclass(frozen=True)
class Sum:
# NumberDataPoint attributes
start_time_unix_nano: int
time_unix_nano: int
value: Union[int, float]
# Sum attributes
aggregation_temporality: int
is_monotonic: bool


@dataclass(frozen=True)
class Gauge:
# NumberDataPoint attributes
# start_time_unix_nano is not added here because the proto says it is to
# be ignored.
time_unix_nano: int
value: Union[int, float]


@dataclass(frozen=True)
class Histogram:
# NumberDataPoint attributes
start_time_unix_nano: int
time_unix_nano: int
value: Union[int, float]
# Histogram attributes
aggregation_temporality: int


PointT = Union[Sum, Gauge, Histogram]
Loading

0 comments on commit dec0945

Please sign in to comment.