Skip to content

Commit

Permalink
Fix explicit bucket histogram aggregation
Browse files Browse the repository at this point in the history
  • Loading branch information
ocelotl committed Feb 5, 2024
1 parent 66e7d61 commit 292ad01
Show file tree
Hide file tree
Showing 4 changed files with 313 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -398,8 +398,12 @@ def __init__(
record_min_max: bool = True,
):
super().__init__(attributes)

self._start_time_unix_nano = start_time_unix_nano
self._boundaries = tuple(boundaries)
self._bucket_counts = self._get_empty_bucket_counts()
self._record_min_max = record_min_max

self._current_value = None
self._min = inf
self._max = -inf
self._sum = 0
Expand All @@ -416,15 +420,19 @@ def _get_empty_bucket_counts(self) -> List[int]:

def aggregate(self, measurement: Measurement) -> None:

value = measurement.value
with self._lock:

if self._record_min_max:
self._min = min(self._min, value)
self._max = max(self._max, value)
if self._current_value is None:
self._current_value = self._get_empty_bucket_counts()

self._sum += value
value = measurement.value

self._bucket_counts[bisect_left(self._boundaries, value)] += 1
if self._record_min_max:
self._min = min(self._min, value)
self._max = max(self._max, value)

self._current_value[bisect_left(self._boundaries, value)] += 1
self._sum = self._sum + value

def collect(
self,
Expand All @@ -435,14 +443,8 @@ def collect(
Atomically return a point for the current value of the metric.
"""
with self._lock:
if not any(self._bucket_counts):
return None

bucket_counts = self._bucket_counts
start_time_unix_nano = self._start_time_unix_nano
sum_ = self._sum
max_ = self._max
min_ = self._min
if aggregation_temporality is AggregationTemporality.DELTA:

self._bucket_counts = self._get_empty_bucket_counts()
self._start_time_unix_nano = collection_start_nano
Expand Down Expand Up @@ -487,31 +489,39 @@ def collect(
current_point.bucket_counts,
self._previous_point.bucket_counts,
)
]
else:
start_time_unix_nano = self._previous_point.time_unix_nano
sum_ = current_point.sum - self._previous_point.sum
bucket_counts = [
curr_count - prev_count
for curr_count, prev_count in zip(
current_point.bucket_counts,
self._previous_point.bucket_counts,

self._current_value = None
self._min = inf
self._max = -inf
self._sum = 0
self._previous_collection_start_nano = collection_start_nano

if current_value is None:
return None

return HistogramDataPoint(
attributes=self._attributes,
start_time_unix_nano=previous_collection_start_nano,
time_unix_nano=collection_start_nano,
count=sum(current_value),
sum=sum_,
bucket_counts=tuple(current_value),
explicit_bounds=self._boundaries,
min=min_,
max=max_,
)
]

current_point = HistogramDataPoint(
attributes=self._attributes,
start_time_unix_nano=start_time_unix_nano,
time_unix_nano=current_point.time_unix_nano,
count=sum(bucket_counts),
sum=sum_,
bucket_counts=tuple(bucket_counts),
explicit_bounds=current_point.explicit_bounds,
min=min_,
max=max_,
)
self._previous_point = current_point
return current_point
return HistogramDataPoint(
attributes=self._attributes,
start_time_unix_nano=self._start_time_unix_nano,
time_unix_nano=collection_start_nano,
count=sum(self._current_value),
sum=self._sum,
bucket_counts=tuple(self._current_value),
explicit_bounds=self._boundaries,
min=self._min,
max=self._max,
)


# pylint: disable=protected-access
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from platform import system
from unittest import TestCase

from pytest import mark

from opentelemetry.sdk.metrics import Histogram, MeterProvider
from opentelemetry.sdk.metrics.export import (
AggregationTemporality,
InMemoryMetricReader,
)
from opentelemetry.sdk.metrics.view import ExplicitBucketHistogramAggregation


class TestExplicitBucketHistogramAggregation(TestCase):
    """Integration tests for ``ExplicitBucketHistogramAggregation``.

    Each test records ``values`` one at a time through a synchronous
    histogram, collecting through an ``InMemoryMetricReader`` after every
    recording, and checks the bucket counts and timestamps of the collected
    data points for one aggregation temporality.
    """

    # The assertions below expect every value to land in its own bucket:
    # the n-th value (0-based) in bucket n + 1 of a 16-bucket histogram.
    values = [
        1.0,
        6.0,
        11.0,
        26.0,
        51.0,
        76.0,
        101.0,
        251.0,
        501.0,
        751.0,
    ]

    @staticmethod
    def _first_data_point(metrics_data):
        """Return the first data point of the first metric in *metrics_data*.

        Only the first resource, scope and metric are inspected, which is
        all these tests ever look at.
        """
        return (
            metrics_data.resource_metrics[0]
            .scope_metrics[0]
            .metrics[0]
            .data.data_points[0]
        )

    @mark.skipif(
        system() != "Linux",
        reason=(
            "Tests fail because Windows time_ns resolution is too low so "
            "two different time measurements may end up having the exact "
            "same value."
        ),
    )
    def test_synchronous_delta_temporality(self):
        """Check per-collection bucket counts and chained timestamps."""
        aggregation = ExplicitBucketHistogramAggregation()

        reader = InMemoryMetricReader(
            preferred_aggregation={Histogram: aggregation},
            preferred_temporality={Histogram: AggregationTemporality.DELTA},
        )

        provider = MeterProvider(metric_readers=[reader])
        meter = provider.get_meter("name", "version")

        histogram = meter.create_histogram("histogram")

        # Collections made before anything is recorded are expected to be
        # empty.
        results = [reader.get_metrics_data() for _ in range(10)]

        for metrics_data in results:
            self.assertIsNone(metrics_data)

        # Collect once after every single recording.
        results = []

        for value in self.values:
            histogram.record(value)
            results.append(reader.get_metrics_data())

        first_point = self._first_data_point(results[0])
        previous_time_unix_nano = first_point.time_unix_nano

        self.assertEqual(
            first_point.bucket_counts,
            (0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0),
        )

        self.assertLess(
            first_point.start_time_unix_nano,
            previous_time_unix_nano,
        )

        # The first value was checked above (bucket 1), so the remaining
        # collections are expected to hit buckets 2, 3, ... in order.
        for expected_bucket, metrics_data in enumerate(results[1:], start=2):

            metric_data = self._first_data_point(metrics_data)

            # Each point is expected to start where the previous one ended.
            self.assertEqual(
                previous_time_unix_nano, metric_data.start_time_unix_nano
            )
            previous_time_unix_nano = metric_data.time_unix_nano

            self.assertEqual(
                metric_data.bucket_counts,
                tuple(
                    1 if bucket == expected_bucket else 0
                    for bucket in range(16)
                ),
            )
            self.assertLess(
                metric_data.start_time_unix_nano, metric_data.time_unix_nano
            )

        # With no new recordings, further collections are expected to be
        # empty again.
        results = [reader.get_metrics_data() for _ in range(10)]

        provider.shutdown()

        for metrics_data in results:
            self.assertIsNone(metrics_data)

    @mark.skipif(
        system() != "Linux",
        reason=(
            "Tests fail because Windows time_ns resolution is too low so "
            "two different time measurements may end up having the exact "
            "same value."
        ),
    )
    def test_synchronous_cumulative_temporality(self):
        """Check accumulated bucket counts and a constant start time."""
        aggregation = ExplicitBucketHistogramAggregation()

        reader = InMemoryMetricReader(
            preferred_aggregation={Histogram: aggregation},
            preferred_temporality={
                Histogram: AggregationTemporality.CUMULATIVE
            },
        )

        provider = MeterProvider(metric_readers=[reader])
        meter = provider.get_meter("name", "version")

        histogram = meter.create_histogram("histogram")

        # Collections made before anything is recorded are expected to be
        # empty.
        results = [reader.get_metrics_data() for _ in range(10)]

        for metrics_data in results:
            self.assertIsNone(metrics_data)

        # Collect once after every single recording.
        results = []

        for value in self.values:
            histogram.record(value)
            results.append(reader.get_metrics_data())

        start_time_unix_nano = self._first_data_point(
            results[0]
        ).start_time_unix_nano

        # After n recordings, buckets 1..n are expected to hold one count
        # each while the start time stays the same.
        for recorded_count, metrics_data in enumerate(results, start=1):

            metric_data = self._first_data_point(metrics_data)

            self.assertEqual(
                start_time_unix_nano, metric_data.start_time_unix_nano
            )
            self.assertEqual(
                metric_data.bucket_counts,
                tuple(
                    1 if 0 < bucket <= recorded_count else 0
                    for bucket in range(16)
                ),
            )

        # With no new recordings, every further collection is expected to
        # report the same accumulated counts.
        results = [reader.get_metrics_data() for _ in range(10)]

        provider.shutdown()

        start_time_unix_nano = self._first_data_point(
            results[0]
        ).start_time_unix_nano

        for metrics_data in results:

            metric_data = self._first_data_point(metrics_data)

            self.assertEqual(
                start_time_unix_nano, metric_data.start_time_unix_nano
            )
            self.assertEqual(
                metric_data.bucket_counts,
                (0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0),
            )
Original file line number Diff line number Diff line change
Expand Up @@ -65,17 +65,26 @@ def test_histogram_counter_collection(self):

metric_data = in_memory_metric_reader.get_metrics_data()

# FIXME ExplicitBucketHistogramAggregation is resetting counts to zero
# even if aggregation temporality is cumulative.
self.assertEqual(
len(metric_data.resource_metrics[0].scope_metrics[0].metrics), 1
len(metric_data.resource_metrics[0].scope_metrics[0].metrics), 2
)

self.assertEqual(
(
metric_data.resource_metrics[0]
.scope_metrics[0]
.metrics[0]
.data.data_points[0]
.bucket_counts
),
(0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0),
)
self.assertEqual(
(
metric_data.resource_metrics[0]
.scope_metrics[0]
.metrics[1]
.data.data_points[0]
.value
),
1,
Expand Down
Loading

0 comments on commit 292ad01

Please sign in to comment.