Skip to content

Commit

Permalink
Fix ExplicitBucketHistogramAggregation
Browse files Browse the repository at this point in the history
  • Loading branch information
ocelotl committed Feb 5, 2024
1 parent f9b7647 commit 5242de4
Show file tree
Hide file tree
Showing 2 changed files with 169 additions and 91 deletions.
229 changes: 147 additions & 82 deletions opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ class _ExplicitBucketHistogramAggregation(_Aggregation[HistogramPoint]):
def __init__(
self,
attributes: Attributes,
instrument_aggregation_temporality: AggregationTemporality,
start_time_unix_nano: int,
boundaries: Sequence[float] = (
0.0,
Expand All @@ -398,33 +399,40 @@ def __init__(
record_min_max: bool = True,
):
super().__init__(attributes)

self._boundaries = tuple(boundaries)
self._bucket_counts = self._get_empty_bucket_counts()
self._record_min_max = record_min_max
self._min = inf
self._max = -inf
self._sum = 0
self._record_min_max = record_min_max

self._start_time_unix_nano = start_time_unix_nano
# It is assumed that the "natural" aggregation temporality for a
# Histogram instrument is DELTA, like the "natural" aggregation
# temporality for a Counter is DELTA and the "natural" aggregation
# temporality for an ObservableCounter is CUMULATIVE.
self._instrument_aggregation_temporality = AggregationTemporality.DELTA
self._instrument_aggregation_temporality = (
instrument_aggregation_temporality
)

self._current_value = None

self._previous_collection_start_nano = self._start_time_unix_nano
self._previous_cumulative_value = self._get_empty_bucket_counts()

def _get_empty_bucket_counts(self) -> List[int]:
return [0] * (len(self._boundaries) + 1)

def aggregate(self, measurement: Measurement) -> None:
with self._lock:
if self._current_value is None:
self._current_value = self._get_empty_bucket_counts()

value = measurement.value
value = measurement.value

if self._record_min_max:
self._min = min(self._min, value)
self._max = max(self._max, value)
self._sum += value

self._sum += value
if self._record_min_max:
self._min = min(self._min, value)
self._max = max(self._max, value)

self._bucket_counts[bisect_left(self._boundaries, value)] += 1
self._current_value[bisect_left(self._boundaries, value)] += 1

def collect(
self,
Expand All @@ -434,84 +442,125 @@ def collect(
"""
Atomically return a point for the current value of the metric.
"""
with self._lock:
if not any(self._bucket_counts):
return None

bucket_counts = self._bucket_counts
start_time_unix_nano = self._start_time_unix_nano
with self._lock:
current_value = self._current_value
sum_ = self._sum
max_ = self._max
min_ = self._min
max_ = self._max

self._bucket_counts = self._get_empty_bucket_counts()
self._start_time_unix_nano = collection_start_nano
self._current_value = None
self._sum = 0
self._min = inf
self._max = -inf

current_point = HistogramDataPoint(
attributes=self._attributes,
start_time_unix_nano=start_time_unix_nano,
time_unix_nano=collection_start_nano,
count=sum(bucket_counts),
sum=sum_,
bucket_counts=tuple(bucket_counts),
explicit_bounds=self._boundaries,
min=min_,
max=max_,
)
if (
self._instrument_aggregation_temporality
is AggregationTemporality.DELTA
):
# This happens when the corresponding instrument for this
# aggregation is synchronous.
if (
collection_aggregation_temporality
is AggregationTemporality.DELTA
):

if self._previous_point is None or (
self._instrument_aggregation_temporality
is collection_aggregation_temporality
):
self._previous_point = current_point
return current_point
if current_value is None:
return None

max_ = current_point.max
min_ = current_point.min
previous_collection_start_nano = (
self._previous_collection_start_nano
)
self._previous_collection_start_nano = (
collection_start_nano
)

if (
collection_aggregation_temporality
is AggregationTemporality.CUMULATIVE
):
start_time_unix_nano = self._previous_point.start_time_unix_nano
sum_ = current_point.sum + self._previous_point.sum
# Only update min/max on delta -> cumulative
max_ = max(current_point.max, self._previous_point.max)
min_ = min(current_point.min, self._previous_point.min)
bucket_counts = [
curr_count + prev_count
for curr_count, prev_count in zip(
current_point.bucket_counts,
self._previous_point.bucket_counts,
return HistogramDataPoint(
attributes=self._attributes,
start_time_unix_nano=previous_collection_start_nano,
time_unix_nano=collection_start_nano,
count=sum(current_value),
sum=sum_,
bucket_counts=tuple(current_value),
explicit_bounds=self._boundaries,
min=min_,
max=max_,
)

if current_value is None:
current_value = self._get_empty_bucket_counts()

self._previous_cumulative_value = [
current_value_element + previous_cumulative_value_element
for (
current_value_element,
previous_cumulative_value_element,
) in zip(current_value, self._previous_cumulative_value)
]

return HistogramDataPoint(
attributes=self._attributes,
start_time_unix_nano=self._start_time_unix_nano,
time_unix_nano=collection_start_nano,
count=sum(current_value),
sum=sum_,
bucket_counts=tuple(current_value),
explicit_bounds=self._boundaries,
min=min_,
max=max_,
)
]
else:
start_time_unix_nano = self._previous_point.time_unix_nano
sum_ = current_point.sum - self._previous_point.sum
bucket_counts = [
curr_count - prev_count
for curr_count, prev_count in zip(
current_point.bucket_counts,
self._previous_point.bucket_counts,

# This happens when the corresponding instrument for this
# aggregation is asynchronous.

if current_value is None:
# This happens when the corresponding instrument callback
# does not produce measurements.
return None

if (
collection_aggregation_temporality
is AggregationTemporality.DELTA
):

result_value = [
current_value_element - previous_cumulative_value_element
for (
current_value_element,
previous_cumulative_value_element,
) in zip(current_value, self._previous_cumulative_value)
]

self._previous_cumulative_value = current_value

previous_collection_start_nano = (
self._previous_collection_start_nano
)
]
self._previous_collection_start_nano = collection_start_nano

current_point = HistogramDataPoint(
attributes=self._attributes,
start_time_unix_nano=start_time_unix_nano,
time_unix_nano=current_point.time_unix_nano,
count=sum(bucket_counts),
sum=sum_,
bucket_counts=tuple(bucket_counts),
explicit_bounds=current_point.explicit_bounds,
min=min_,
max=max_,
)
self._previous_point = current_point
return current_point
return HistogramDataPoint(
attributes=self._attributes,
start_time_unix_nano=previous_collection_start_nano,
time_unix_nano=collection_start_nano,
count=sum(result_value),
sum=sum_,
bucket_counts=tuple(result_value),
explicit_bounds=self._boundaries,
min=min_,
max=max_,
)

return HistogramDataPoint(
attributes=self._attributes,
start_time_unix_nano=self._start_time_unix_nano,
time_unix_nano=collection_start_nano,
count=sum(current_value),
sum=sum_,
bucket_counts=tuple(current_value),
explicit_bounds=self._boundaries,
min=min_,
max=max_,
)


# pylint: disable=protected-access
Expand Down Expand Up @@ -1100,7 +1149,11 @@ def _create_aggregation(

if isinstance(instrument, Histogram):
return _ExplicitBucketHistogramAggregation(
attributes, start_time_unix_nano
attributes,
instrument_aggregation_temporality=(
AggregationTemporality.CUMULATIVE
),
start_time_unix_nano=start_time_unix_nano,
)

if isinstance(instrument, ObservableGauge):
Expand Down Expand Up @@ -1179,8 +1232,18 @@ def _create_aggregation(
attributes: Attributes,
start_time_unix_nano: int,
) -> _Aggregation:

instrument_aggregation_temporality = AggregationTemporality.UNSPECIFIED
if isinstance(instrument, Synchronous):
instrument_aggregation_temporality = AggregationTemporality.DELTA
elif isinstance(instrument, Asynchronous):
instrument_aggregation_temporality = (
AggregationTemporality.CUMULATIVE
)

return _ExplicitBucketHistogramAggregation(
attributes,
instrument_aggregation_temporality,
start_time_unix_nano,
self._boundaries,
self._record_min_max,
Expand All @@ -1200,16 +1263,18 @@ def _create_aggregation(
start_time_unix_nano: int,
) -> _Aggregation:

temporality = AggregationTemporality.UNSPECIFIED
instrument_aggregation_temporality = AggregationTemporality.UNSPECIFIED
if isinstance(instrument, Synchronous):
temporality = AggregationTemporality.DELTA
instrument_aggregation_temporality = AggregationTemporality.DELTA
elif isinstance(instrument, Asynchronous):
temporality = AggregationTemporality.CUMULATIVE
instrument_aggregation_temporality = (
AggregationTemporality.CUMULATIVE
)

return _SumAggregation(
attributes,
isinstance(instrument, (Counter, ObservableCounter)),
temporality,
instrument_aggregation_temporality,
start_time_unix_nano,
)

Expand Down
31 changes: 22 additions & 9 deletions opentelemetry-sdk/tests/metrics/test_aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,10 @@ def test_aggregate(self):

explicit_bucket_histogram_aggregation = (
_ExplicitBucketHistogramAggregation(
Mock(), 0, boundaries=[0, 2, 4]
Mock(),
AggregationTemporality.CUMULATIVE,
0,
boundaries=[0, 2, 4],
)
)

Expand All @@ -284,22 +287,22 @@ def test_aggregate(self):

# The first bucket keeps count of values between (-inf, 0] (-1 and 0)
self.assertEqual(
explicit_bucket_histogram_aggregation._bucket_counts[0], 2
explicit_bucket_histogram_aggregation._current_value[0], 2
)

# The second bucket keeps count of values between (0, 2] (1 and 2)
self.assertEqual(
explicit_bucket_histogram_aggregation._bucket_counts[1], 2
explicit_bucket_histogram_aggregation._current_value[1], 2
)

# The third bucket keeps count of values between (2, 4] (3 and 4)
self.assertEqual(
explicit_bucket_histogram_aggregation._bucket_counts[2], 2
explicit_bucket_histogram_aggregation._current_value[2], 2
)

# The fourth bucket keeps count of values between (4, inf) (the single value above 4)
self.assertEqual(
explicit_bucket_histogram_aggregation._bucket_counts[3], 1
explicit_bucket_histogram_aggregation._current_value[3], 1
)

histo = explicit_bucket_histogram_aggregation.collect(
Expand All @@ -314,7 +317,9 @@ def test_min_max(self):
"""

explicit_bucket_histogram_aggregation = (
_ExplicitBucketHistogramAggregation(Mock(), 0)
_ExplicitBucketHistogramAggregation(
Mock(), AggregationTemporality.CUMULATIVE, 0
)
)

explicit_bucket_histogram_aggregation.aggregate(measurement(-1))
Expand All @@ -328,7 +333,10 @@ def test_min_max(self):

explicit_bucket_histogram_aggregation = (
_ExplicitBucketHistogramAggregation(
Mock(), 0, record_min_max=False
Mock(),
AggregationTemporality.CUMULATIVE,
0,
record_min_max=False,
)
)

Expand All @@ -348,7 +356,10 @@ def test_collect(self):

explicit_bucket_histogram_aggregation = (
_ExplicitBucketHistogramAggregation(
Mock(), 0, boundaries=[0, 1, 2]
Mock(),
AggregationTemporality.CUMULATIVE,
0,
boundaries=[0, 1, 2],
)
)

Expand Down Expand Up @@ -381,7 +392,9 @@ def test_collect(self):

def test_boundaries(self):
self.assertEqual(
_ExplicitBucketHistogramAggregation(Mock(), 0)._boundaries,
_ExplicitBucketHistogramAggregation(
Mock(), AggregationTemporality.CUMULATIVE, 0
)._boundaries,
(
0.0,
5.0,
Expand Down

0 comments on commit 5242de4

Please sign in to comment.