Skip to content

Commit

Permalink
Fix ExplicitBucketHistogramAggregation
Browse files Browse the repository at this point in the history
  • Loading branch information
ocelotl committed Nov 24, 2023
1 parent 10dfb5f commit 2ca4a99
Showing 1 changed file with 126 additions and 77 deletions.
203 changes: 126 additions & 77 deletions opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@ class _ExplicitBucketHistogramAggregation(_Aggregation[HistogramPoint]):
def __init__(
self,
attributes: Attributes,
instrument_aggregation_temporality: AggregationTemporality,
start_time_unix_nano: int,
boundaries: Sequence[float] = (
0.0,
Expand All @@ -397,33 +398,40 @@ def __init__(
record_min_max: bool = True,
):
super().__init__(attributes)

self._boundaries = tuple(boundaries)
self._bucket_counts = self._get_empty_bucket_counts()
self._record_min_max = record_min_max
self._min = inf
self._max = -inf
self._sum = 0
self._record_min_max = record_min_max

self._start_time_unix_nano = start_time_unix_nano
# It is assumed that the "natural" aggregation temporality for a
# Histogram instrument is DELTA, like the "natural" aggregation
# temporality for a Counter is DELTA and the "natural" aggregation
# temporality for an ObservableCounter is CUMULATIVE.
self._instrument_aggregation_temporality = AggregationTemporality.DELTA
self._instrument_aggregation_temporality = (
instrument_aggregation_temporality
)

self._current_value = None

self._previous_collection_start_nano = self._start_time_unix_nano
self._previous_cumulative_value = self._get_empty_bucket_counts()

def _get_empty_bucket_counts(self) -> List[int]:
return [0] * (len(self._boundaries) + 1)

def aggregate(self, measurement: Measurement) -> None:
with self._lock:
if self._current_value is None:
self._current_value = self._get_empty_bucket_counts()

value = measurement.value
value = measurement.value

if self._record_min_max:
self._min = min(self._min, value)
self._max = max(self._max, value)
self._sum += value

self._sum += value
if self._record_min_max:
self._min = min(self._min, value)
self._max = max(self._max, value)

self._bucket_counts[bisect_left(self._boundaries, value)] += 1
self._current_value[bisect_left(self._boundaries, value)] += 1

def collect(
self,
Expand All @@ -433,84 +441,125 @@ def collect(
"""
Atomically return a point for the current value of the metric.
"""
with self._lock:
if not any(self._bucket_counts):
return None

bucket_counts = self._bucket_counts
start_time_unix_nano = self._start_time_unix_nano
with self._lock:
current_value = self._current_value
sum_ = self._sum
max_ = self._max
min_ = self._min
max_ = self._max

self._bucket_counts = self._get_empty_bucket_counts()
self._start_time_unix_nano = collection_start_nano
self._current_value = None
self._sum = 0
self._min = inf
self._max = -inf

current_point = HistogramDataPoint(
attributes=self._attributes,
start_time_unix_nano=start_time_unix_nano,
time_unix_nano=collection_start_nano,
count=sum(bucket_counts),
sum=sum_,
bucket_counts=tuple(bucket_counts),
explicit_bounds=self._boundaries,
min=min_,
max=max_,
)
if (
self._instrument_aggregation_temporality
is AggregationTemporality.DELTA
):
# This happens when the corresponding instrument for this
# aggregation is synchronous.
if (
collection_aggregation_temporality
is AggregationTemporality.DELTA
):

if self._previous_point is None or (
self._instrument_aggregation_temporality
is collection_aggregation_temporality
):
self._previous_point = current_point
return current_point
if current_value is None:
return None

max_ = current_point.max
min_ = current_point.min
previous_collection_start_nano = (
self._previous_collection_start_nano
)
self._previous_collection_start_nano = (
collection_start_nano
)

if (
collection_aggregation_temporality
is AggregationTemporality.CUMULATIVE
):
start_time_unix_nano = self._previous_point.start_time_unix_nano
sum_ = current_point.sum + self._previous_point.sum
# Only update min/max on delta -> cumulative
max_ = max(current_point.max, self._previous_point.max)
min_ = min(current_point.min, self._previous_point.min)
bucket_counts = [
curr_count + prev_count
for curr_count, prev_count in zip(
current_point.bucket_counts,
self._previous_point.bucket_counts,
return HistogramDataPoint(
attributes=self._attributes,
start_time_unix_nano=previous_collection_start_nano,
time_unix_nano=collection_start_nano,
count=sum(current_value),
sum=sum_,
bucket_counts=tuple(current_value),
explicit_bounds=self._boundaries,
min=min_,
max=max_,
)

if current_value is None:
current_value = self._get_empty_bucket_counts()

self._previous_cumulative_value = [
current_value_element + previous_cumulative_value_element
for (
current_value_element,
previous_cumulative_value_element,
) in zip(current_value, self._previous_cumulative_value)
]

return HistogramDataPoint(
attributes=self._attributes,
start_time_unix_nano=self._start_time_unix_nano,
time_unix_nano=collection_start_nano,
count=sum(current_value),
sum=sum_,
bucket_counts=tuple(current_value),
explicit_bounds=self._boundaries,
min=min_,
max=max_,
)
]
else:
start_time_unix_nano = self._previous_point.time_unix_nano
sum_ = current_point.sum - self._previous_point.sum
bucket_counts = [
curr_count - prev_count
for curr_count, prev_count in zip(
current_point.bucket_counts,
self._previous_point.bucket_counts,

# This happens when the corresponding instrument for this
# aggregation is asynchronous.

if current_value is None:
# This happens when the corresponding instrument callback
# does not produce measurements.
return None

if (
collection_aggregation_temporality
is AggregationTemporality.DELTA
):

result_value = [
current_value_element - previous_cumulative_value_element
for (
current_value_element,
previous_cumulative_value_element,
) in zip(current_value, self._previous_cumulative_value)
]

self._previous_cumulative_value = current_value

previous_collection_start_nano = (
self._previous_collection_start_nano
)
]
self._previous_collection_start_nano = collection_start_nano

current_point = HistogramDataPoint(
attributes=self._attributes,
start_time_unix_nano=start_time_unix_nano,
time_unix_nano=current_point.time_unix_nano,
count=sum(bucket_counts),
sum=sum_,
bucket_counts=tuple(bucket_counts),
explicit_bounds=current_point.explicit_bounds,
min=min_,
max=max_,
)
self._previous_point = current_point
return current_point
return HistogramDataPoint(
attributes=self._attributes,
start_time_unix_nano=previous_collection_start_nano,
time_unix_nano=collection_start_nano,
count=sum(result_value),
sum=sum_,
bucket_counts=tuple(result_value),
explicit_bounds=self._boundaries,
min=min_,
max=max_,
)

return HistogramDataPoint(
attributes=self._attributes,
start_time_unix_nano=self._start_time_unix_nano,
time_unix_nano=collection_start_nano,
count=sum(current_value),
sum=sum_,
bucket_counts=tuple(current_value),
explicit_bounds=self._boundaries,
min=min_,
max=max_,
)


# pylint: disable=protected-access
Expand Down

0 comments on commit 2ca4a99

Please sign in to comment.