From 10dfb5f22b4ea7c06ece6dd1920cc7829dd791a8 Mon Sep 17 00:00:00 2001 From: Diego Hurtado Date: Thu, 23 Nov 2023 18:47:11 -0600 Subject: [PATCH] Revert "Fix explicit bucket histogram aggregation" This reverts commit f1c6683cbf48ad9acdfce8be24b1dbafaa7bc918. --- .../sdk/metrics/_internal/aggregation.py | 84 +++--- ...t_explicit_bucket_histogram_aggregation.py | 250 ------------------ .../integration_test/test_histogram_export.py | 15 +- .../tests/metrics/test_aggregation.py | 8 +- 4 files changed, 44 insertions(+), 313 deletions(-) delete mode 100644 opentelemetry-sdk/tests/metrics/integration_test/test_explicit_bucket_histogram_aggregation.py diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/aggregation.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/aggregation.py index cb55bed942e..62ba091ebfe 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/aggregation.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/aggregation.py @@ -397,12 +397,8 @@ def __init__( record_min_max: bool = True, ): super().__init__(attributes) - - self._start_time_unix_nano = start_time_unix_nano self._boundaries = tuple(boundaries) - self._record_min_max = record_min_max - - self._current_value = None + self._bucket_counts = self._get_empty_bucket_counts() self._min = inf self._max = -inf self._sum = 0 @@ -419,19 +415,15 @@ def _get_empty_bucket_counts(self) -> List[int]: def aggregate(self, measurement: Measurement) -> None: - with self._lock: + value = measurement.value - if self._current_value is None: - self._current_value = self._get_empty_bucket_counts() + if self._record_min_max: + self._min = min(self._min, value) + self._max = max(self._max, value) - value = measurement.value + self._sum += value - if self._record_min_max: - self._min = min(self._min, value) - self._max = max(self._max, value) - - self._current_value[bisect_left(self._boundaries, value)] += 1 - self._sum = self._sum + value + self._bucket_counts[bisect_left(self._boundaries, value)] += 1 def collect( self, @@ -442,8 +434,14 @@ def collect( Atomically return a point for the current value of the metric. """ with self._lock: + if not any(self._bucket_counts): + return None - if aggregation_temporality is AggregationTemporality.DELTA: + bucket_counts = self._bucket_counts + start_time_unix_nano = self._start_time_unix_nano + sum_ = self._sum + max_ = self._max + min_ = self._min self._bucket_counts = self._get_empty_bucket_counts() self._start_time_unix_nano = collection_start_nano @@ -488,39 +486,31 @@ def collect( current_point.bucket_counts, self._previous_point.bucket_counts, ) - - self._current_value = None - self._min = inf - self._max = -inf - self._sum = 0 - self._previous_collection_start_nano = collection_start_nano - - if current_value is None: - return None - - return HistogramDataPoint( - attributes=self._attributes, - start_time_unix_nano=previous_collection_start_nano, - time_unix_nano=collection_start_nano, - count=sum(current_value), - sum=sum_, - bucket_counts=tuple(current_value), - explicit_bounds=self._boundaries, - min=min_, - max=max_, + ] + else: + start_time_unix_nano = self._previous_point.time_unix_nano + sum_ = current_point.sum - self._previous_point.sum + bucket_counts = [ + curr_count - prev_count + for curr_count, prev_count in zip( + current_point.bucket_counts, + self._previous_point.bucket_counts, ) + ] - return HistogramDataPoint( - attributes=self._attributes, - start_time_unix_nano=self._start_time_unix_nano, - time_unix_nano=collection_start_nano, - count=sum(self._current_value), - sum=self._sum, - bucket_counts=tuple(self._current_value), - explicit_bounds=self._boundaries, - min=self._min, - max=self._max, - ) + current_point = HistogramDataPoint( + attributes=self._attributes, + start_time_unix_nano=start_time_unix_nano, + time_unix_nano=current_point.time_unix_nano, + count=sum(bucket_counts), + sum=sum_, + bucket_counts=tuple(bucket_counts), + explicit_bounds=current_point.explicit_bounds, + min=min_, + max=max_, + ) + self._previous_point = current_point + return current_point # pylint: disable=protected-access diff --git a/opentelemetry-sdk/tests/metrics/integration_test/test_explicit_bucket_histogram_aggregation.py b/opentelemetry-sdk/tests/metrics/integration_test/test_explicit_bucket_histogram_aggregation.py deleted file mode 100644 index 052c5b4edbc..00000000000 --- a/opentelemetry-sdk/tests/metrics/integration_test/test_explicit_bucket_histogram_aggregation.py +++ /dev/null @@ -1,250 +0,0 @@ -# Copyright The OpenTelemetry Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from platform import system -from unittest import TestCase - -from pytest import mark - -from opentelemetry.sdk.metrics import Histogram, MeterProvider -from opentelemetry.sdk.metrics.export import ( - AggregationTemporality, - InMemoryMetricReader, -) -from opentelemetry.sdk.metrics.view import ExplicitBucketHistogramAggregation - - -class TestExplicitBucketHistogramAggregation(TestCase): - - values = [ - 1.0, - 6.0, - 11.0, - 26.0, - 51.0, - 76.0, - 101.0, - 251.0, - 501.0, - 751.0, - ] - - @mark.skipif( - system() != "Linux", - reason=( - "Tests fail because Windows time_ns resolution is too low so " - "two different time measurements may end up having the exact same" - "value." - ), - ) - def test_synchronous_delta_temporality(self): - - aggregation = ExplicitBucketHistogramAggregation() - - reader = InMemoryMetricReader( - preferred_aggregation={Histogram: aggregation}, - preferred_temporality={Histogram: AggregationTemporality.DELTA}, - ) - - provider = MeterProvider(metric_readers=[reader]) - meter = provider.get_meter("name", "version") - - histogram = meter.create_histogram("histogram") - - results = [] - - for _ in range(10): - - results.append(reader.get_metrics_data()) - - for metrics_data in results: - self.assertIsNone(metrics_data) - - results = [] - - for value in self.values: - histogram.record(value) - results.append(reader.get_metrics_data()) - - previous_time_unix_nano = ( - results[0] - .resource_metrics[0] - .scope_metrics[0] - .metrics[0] - .data.data_points[0] - .time_unix_nano - ) - - self.assertEqual( - ( - results[0] - .resource_metrics[0] - .scope_metrics[0] - .metrics[0] - .data.data_points[0] - .bucket_counts - ), - (0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0), - ) - - self.assertLess( - ( - results[0] - .resource_metrics[0] - .scope_metrics[0] - .metrics[0] - .data.data_points[0] - .start_time_unix_nano - ), - previous_time_unix_nano, - ) - - for index, metrics_data in enumerate(results[1:]): - - metric_data = ( - metrics_data.resource_metrics[0] - .scope_metrics[0] - .metrics[0] - .data.data_points[0] - ) - - self.assertEqual( - previous_time_unix_nano, metric_data.start_time_unix_nano - ) - previous_time_unix_nano = metric_data.time_unix_nano - - self.assertEqual( - metric_data.bucket_counts, - tuple([1 if value == index + 2 else 0 for value in range(16)]), - ) - self.assertLess( - metric_data.start_time_unix_nano, metric_data.time_unix_nano - ) - - results = [] - - for _ in range(10): - - results.append(reader.get_metrics_data()) - - provider.shutdown() - - for metrics_data in results: - self.assertIsNone(metrics_data) - - @mark.skipif( - system() != "Linux", - reason=( - "Tests fail because Windows time_ns resolution is too low so " - "two different time measurements may end up having the exact same" - "value." - ), - ) - def test_synchronous_cumulative_temporality(self): - - aggregation = ExplicitBucketHistogramAggregation() - - reader = InMemoryMetricReader( - preferred_aggregation={Histogram: aggregation}, - preferred_temporality={ - Histogram: AggregationTemporality.CUMULATIVE - }, - ) - - provider = MeterProvider(metric_readers=[reader]) - meter = provider.get_meter("name", "version") - - histogram = meter.create_histogram("histogram") - - results = [] - - for _ in range(10): - - results.append(reader.get_metrics_data()) - - for metrics_data in results: - self.assertIsNone(metrics_data) - - results = [] - - for value in self.values: - histogram.record(value) - results.append(reader.get_metrics_data()) - - start_time_unix_nano = ( - results[0] - .resource_metrics[0] - .scope_metrics[0] - .metrics[0] - .data.data_points[0] - .start_time_unix_nano - ) - - for index, metrics_data in enumerate(results): - - metric_data = ( - metrics_data.resource_metrics[0] - .scope_metrics[0] - .metrics[0] - .data.data_points[0] - ) - - self.assertEqual( - start_time_unix_nano, metric_data.start_time_unix_nano - ) - self.assertEqual( - metric_data.bucket_counts, - tuple( - [ - 1 - if inner_index <= index + 1 and inner_index > 0 - else 0 - for inner_index, value in enumerate(range(16)) - ] - ), - ) - - results = [] - - for _ in range(10): - - results.append(reader.get_metrics_data()) - - provider.shutdown() - - start_time_unix_nano = ( - results[0] - .resource_metrics[0] - .scope_metrics[0] - .metrics[0] - .data.data_points[0] - .start_time_unix_nano - ) - - for metrics_data in results: - - metric_data = ( - metrics_data.resource_metrics[0] - .scope_metrics[0] - .metrics[0] - .data.data_points[0] - ) - - self.assertEqual( - start_time_unix_nano, metric_data.start_time_unix_nano - ) - self.assertEqual( - metric_data.bucket_counts, - (0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0), - ) diff --git a/opentelemetry-sdk/tests/metrics/integration_test/test_histogram_export.py b/opentelemetry-sdk/tests/metrics/integration_test/test_histogram_export.py index eef81524d18..81d419819a4 100644 --- a/opentelemetry-sdk/tests/metrics/integration_test/test_histogram_export.py +++ b/opentelemetry-sdk/tests/metrics/integration_test/test_histogram_export.py @@ -65,26 +65,17 @@ def test_histogram_counter_collection(self): metric_data = in_memory_metric_reader.get_metrics_data() + # FIXME ExplicitBucketHistogramAggregation is resetting counts to zero + # even if aggregation temporality is cumulative. self.assertEqual( - len(metric_data.resource_metrics[0].scope_metrics[0].metrics), 2 + len(metric_data.resource_metrics[0].scope_metrics[0].metrics), 1 ) - self.assertEqual( ( metric_data.resource_metrics[0] .scope_metrics[0] .metrics[0] .data.data_points[0] - .bucket_counts - ), - (0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0), - ) - self.assertEqual( - ( - metric_data.resource_metrics[0] - .scope_metrics[0] - .metrics[1] - .data.data_points[0] .value ), 1, diff --git a/opentelemetry-sdk/tests/metrics/test_aggregation.py b/opentelemetry-sdk/tests/metrics/test_aggregation.py index 927422538c5..b7cfc63cd4f 100644 --- a/opentelemetry-sdk/tests/metrics/test_aggregation.py +++ b/opentelemetry-sdk/tests/metrics/test_aggregation.py @@ -283,22 +283,22 @@ def test_aggregate(self): # The first bucket keeps count of values between (-inf, 0] (-1 and 0) self.assertEqual( - explicit_bucket_histogram_aggregation._current_value[0], 2 + explicit_bucket_histogram_aggregation._bucket_counts[0], 2 ) # The second bucket keeps count of values between (0, 2] (1 and 2) self.assertEqual( - explicit_bucket_histogram_aggregation._current_value[1], 2 + explicit_bucket_histogram_aggregation._bucket_counts[1], 2 ) # The third bucket keeps count of values between (2, 4] (3 and 4) self.assertEqual( - explicit_bucket_histogram_aggregation._current_value[2], 2 + explicit_bucket_histogram_aggregation._bucket_counts[2], 2 ) # The fourth bucket keeps count of values between (4, inf) (3 and 4) self.assertEqual( - explicit_bucket_histogram_aggregation._current_value[3], 1 + explicit_bucket_histogram_aggregation._bucket_counts[3], 1 ) histo = explicit_bucket_histogram_aggregation.collect(