Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor aggregations #2308

Merged
merged 36 commits into from
Jan 11, 2022
Merged
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
ba80a97
Refactor aggregations
ocelotl Nov 25, 2021
bbe7097
Update opentelemetry-sdk/src/opentelemetry/sdk/_metrics/measurement.py
ocelotl Dec 15, 2021
fa2f9ce
Removing is_monotonic property
ocelotl Dec 15, 2021
85965d3
Make collect public
ocelotl Dec 15, 2021
7f185b4
Remove warning
ocelotl Dec 15, 2021
4803336
Add docstring
ocelotl Dec 15, 2021
eba1d82
Removing properties
ocelotl Dec 15, 2021
63fb67b
Change to delta
ocelotl Dec 15, 2021
3a12e3f
Update opentelemetry-sdk/src/opentelemetry/sdk/_metrics/point.py
ocelotl Dec 15, 2021
6452eae
Remove comments
ocelotl Dec 15, 2021
322e531
Remove Exemplars
ocelotl Dec 15, 2021
040f119
Update opentelemetry-sdk/src/opentelemetry/sdk/_metrics/point.py
ocelotl Dec 15, 2021
628a067
Adding AggregationTemporality enum
ocelotl Dec 15, 2021
d0787db
Removing Generic
ocelotl Dec 15, 2021
6965cad
Remove types from aggregations
ocelotl Dec 15, 2021
b1fd1fa
Fix lint
ocelotl Dec 15, 2021
e2c4bad
Fix histogram test cases
ocelotl Dec 15, 2021
c313eb4
Revert "Removing Generic"
ocelotl Dec 15, 2021
3712d38
Revert "Remove types from aggregations"
ocelotl Dec 15, 2021
c0d7f56
Fix self._value
ocelotl Dec 15, 2021
74d6c8d
Update opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py
ocelotl Dec 16, 2021
59bd4aa
Update opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py
ocelotl Dec 16, 2021
e0a78db
Update opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py
ocelotl Dec 16, 2021
a347bb4
Update opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py
ocelotl Dec 16, 2021
8595ad8
Update opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py
ocelotl Dec 16, 2021
eb67095
Use self._boundaries
ocelotl Dec 16, 2021
38ebe4a
Remove aggregation temporality prefix
ocelotl Dec 16, 2021
66ce908
Add MonotonicitySensitiveAggregation
ocelotl Dec 17, 2021
6c84c66
Add collect test cases
ocelotl Dec 17, 2021
8ab680d
Use assertEqual instead
ocelotl Dec 17, 2021
0ee91c4
Add delay
ocelotl Dec 17, 2021
3c68beb
Use Aware instead of Sensitive
ocelotl Dec 17, 2021
c40bfd6
Rename is_monotonic
ocelotl Dec 21, 2021
e511c96
Make Histogram non monotonicity aware
ocelotl Jan 11, 2022
0ebac64
Update SHA
ocelotl Jan 11, 2022
4fafc44
Merge branch 'main' into issue_2305
ocelotl Jan 11, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions opentelemetry-sdk/setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ install_requires =
opentelemetry-api == 1.8.0
opentelemetry-semantic-conventions == 0.27b0
setuptools >= 16.0
dataclasses == 0.8; python_version < '3.7'

[options.packages.find]
where = src
Expand Down
206 changes: 138 additions & 68 deletions opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,124 +14,194 @@

from abc import ABC, abstractmethod
from collections import OrderedDict
from enum import IntEnum
from logging import getLogger
from math import inf
from threading import Lock
from typing import Generic, Optional, Sequence, TypeVar

from opentelemetry._metrics.instrument import _Monotonic
from opentelemetry.sdk._metrics.measurement import Measurement
from opentelemetry.sdk._metrics.point import Gauge, Histogram, PointT, Sum
from opentelemetry.util._time import _time_ns


class AggregationTemporality(IntEnum):
UNSPECIFIED = 0
DELTA = 1
CUMULATIVE = 2


_PointVarT = TypeVar("_PointVarT", bound=PointT)

_logger = getLogger(__name__)


class Aggregation(ABC):
@property
def value(self):
return self._value # pylint: disable=no-member
class _InstrumentMonotonicityAwareAggregation:
lzchen marked this conversation as resolved.
Show resolved Hide resolved
def __init__(self, instrument_is_monotonic: bool):
self._instrument_is_monotonic = instrument_is_monotonic
super().__init__()


class Aggregation(ABC, Generic[_PointVarT]):
def __init__(self):
self._lock = Lock()

@abstractmethod
def aggregate(self, value):
def aggregate(self, measurement: Measurement) -> None:
pass

@abstractmethod
def make_point_and_reset(self):
def collect(self) -> Optional[_PointVarT]:
pass


class SynchronousSumAggregation(
_InstrumentMonotonicityAwareAggregation, Aggregation[Sum]
):
def __init__(self, instrument_is_monotonic: bool):
super().__init__(instrument_is_monotonic)
self._value = 0
self._start_time_unix_nano = _time_ns()
aabmass marked this conversation as resolved.
Show resolved Hide resolved

def aggregate(self, measurement: Measurement) -> None:
with self._lock:
self._value = self._value + measurement.value

def collect(self) -> Optional[Sum]:
"""
ocelotl marked this conversation as resolved.
Show resolved Hide resolved
Atomically return a point for the current value of the metric and reset the internal state.
Atomically return a point for the current value of the metric and
reset the aggregation value.
lzchen marked this conversation as resolved.
Show resolved Hide resolved
"""
now = _time_ns()

with self._lock:
value = self._value
start_time_unix_nano = self._start_time_unix_nano

class SumAggregation(Aggregation):
"""
This aggregation collects data for the SDK sum metric point.
"""
self._value = 0
self._start_time_unix_nano = now + 1

def __init__(self, instrument):
self._value = 0
return Sum(
aggregation_temporality=AggregationTemporality.DELTA,
is_monotonic=self._instrument_is_monotonic,
start_time_unix_nano=start_time_unix_nano,
time_unix_nano=now,
value=value,
)

def aggregate(self, value):
self._value = self._value + value

def make_point_and_reset(self):
pass
class AsynchronousSumAggregation(
_InstrumentMonotonicityAwareAggregation, Aggregation[Sum]
):
def __init__(self, instrument_is_monotonic: bool):
super().__init__(instrument_is_monotonic)
self._value = None
self._start_time_unix_nano = _time_ns()

def aggregate(self, measurement: Measurement) -> None:
with self._lock:
self._value = measurement.value

class LastValueAggregation(Aggregation):
def collect(self) -> Optional[Sum]:
"""
Atomically return a point for the current value of the metric.
"""
if self._value is None:
return None

return Sum(
start_time_unix_nano=self._start_time_unix_nano,
time_unix_nano=_time_ns(),
value=self._value,
aggregation_temporality=AggregationTemporality.CUMULATIVE,
is_monotonic=self._instrument_is_monotonic,
)

"""
This aggregation collects data for the SDK sum metric point.
"""

def __init__(self, instrument):
class LastValueAggregation(Aggregation[Gauge]):
def __init__(self):
super().__init__()
self._value = None
self._timestamp = _time_ns()

def aggregate(self, value):
self._value = value
self._timestamp = _time_ns()

def make_point_and_reset(self):
pass
def aggregate(self, measurement: Measurement):
with self._lock:
self._value = measurement.value

def collect(self) -> Optional[Gauge]:
"""
Atomically return a point for the current value of the metric.
"""
if self._value is None:
return None

class ExplicitBucketHistogramAggregation(Aggregation):
return Gauge(
time_unix_nano=_time_ns(),
value=self._value,
)

"""
This aggregation collects data for the SDK sum metric point.
"""

class ExplicitBucketHistogramAggregation(Aggregation[Histogram]):
def __init__(
self,
instrument,
*args,
boundaries=(0, 5, 10, 25, 50, 75, 100, 250, 500, 1000),
record_min_max=True,
boundaries: Sequence[int] = (
0,
5,
10,
25,
50,
75,
100,
250,
500,
1000,
),
record_min_max: bool = True,
):
super().__init__()
self._value = OrderedDict([(key, 0) for key in (*boundaries, inf)])
self._min = inf
self._max = -inf
self._sum = 0
self._instrument = instrument
self._record_min_max = record_min_max
self._start_time_unix_nano = _time_ns()
self._boundaries = boundaries

@property
def min(self):
if not self._record_min_max:
_logger.warning("Min is not being recorded")

return self._min
def aggregate(self, measurement: Measurement) -> None:

@property
def max(self):
if not self._record_min_max:
_logger.warning("Max is not being recorded")
value = measurement.value

return self._max

@property
def sum(self):
if isinstance(self._instrument, _Monotonic):
return self._sum

_logger.warning(
"Sum is not filled out when the associated "
"instrument is not monotonic"
)
return None

def aggregate(self, value):
if self._record_min_max:
self._min = min(self._min, value)
self._max = max(self._max, value)

if isinstance(self._instrument, _Monotonic):
self._sum += value
self._sum += value

for key in self._value.keys():

if value < key:
self._value[key] = self._value[key] + value
self._value[key] = self._value[key] + 1

break

def make_point_and_reset(self):
pass
def collect(self) -> Optional[Histogram]:
"""
Atomically return a point for the current value of the metric.
"""
now = _time_ns()

with self._lock:
value = self._value
start_time_unix_nano = self._start_time_unix_nano

self._value = OrderedDict(
[(key, 0) for key in (*self._boundaries, inf)]
)
self._start_time_unix_nano = now + 1

return Histogram(
start_time_unix_nano=start_time_unix_nano,
time_unix_nano=now,
bucket_counts=tuple(value.values()),
explicit_bounds=self._boundaries,
aggregation_temporality=AggregationTemporality.DELTA,
)
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from dataclasses import dataclass
from typing import Union

from opentelemetry.util.types import Attributes


@dataclass(frozen=True)
class Measurement:
pass
value: Union[int, float]
attributes: Attributes = None
43 changes: 43 additions & 0 deletions opentelemetry-sdk/src/opentelemetry/sdk/_metrics/point.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from dataclasses import dataclass
from typing import Sequence, Union


@dataclass(frozen=True)
class Sum:
start_time_unix_nano: int
time_unix_nano: int
value: Union[int, float]
aggregation_temporality: int
ocelotl marked this conversation as resolved.
Show resolved Hide resolved
is_monotonic: bool


@dataclass(frozen=True)
class Gauge:
time_unix_nano: int
value: Union[int, float]


@dataclass(frozen=True)
class Histogram:
start_time_unix_nano: int
time_unix_nano: int
bucket_counts: Sequence[int]
explicit_bounds: Sequence[float]
aggregation_temporality: int


PointT = Union[Sum, Gauge, Histogram]
Loading