diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/aggregation.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/aggregation.py index 3adf3327f63..eca29fb009c 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/aggregation.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/aggregation.py @@ -398,8 +398,12 @@ def __init__( record_min_max: bool = True, ): super().__init__(attributes) + + self._start_time_unix_nano = start_time_unix_nano self._boundaries = tuple(boundaries) - self._bucket_counts = self._get_empty_bucket_counts() + self._record_min_max = record_min_max + + self._current_value = None self._min = inf self._max = -inf self._sum = 0 @@ -416,15 +420,19 @@ def _get_empty_bucket_counts(self) -> List[int]: def aggregate(self, measurement: Measurement) -> None: - value = measurement.value + with self._lock: - if self._record_min_max: - self._min = min(self._min, value) - self._max = max(self._max, value) + if self._current_value is None: + self._current_value = self._get_empty_bucket_counts() - self._sum += value + value = measurement.value - self._bucket_counts[bisect_left(self._boundaries, value)] += 1 + if self._record_min_max: + self._min = min(self._min, value) + self._max = max(self._max, value) + + self._current_value[bisect_left(self._boundaries, value)] += 1 + self._sum = self._sum + value def collect( self, @@ -435,14 +443,8 @@ def collect( Atomically return a point for the current value of the metric. """ with self._lock: - if not any(self._bucket_counts): - return None - bucket_counts = self._bucket_counts - start_time_unix_nano = self._start_time_unix_nano - sum_ = self._sum - max_ = self._max - min_ = self._min + if aggregation_temporality is AggregationTemporality.DELTA: self._bucket_counts = self._get_empty_bucket_counts() self._start_time_unix_nano = collection_start_nano @@ -487,31 +489,39 @@ def collect( current_point.bucket_counts, self._previous_point.bucket_counts, ) - ] - else: - start_time_unix_nano = self._previous_point.time_unix_nano - sum_ = current_point.sum - self._previous_point.sum - bucket_counts = [ - curr_count - prev_count - for curr_count, prev_count in zip( - current_point.bucket_counts, - self._previous_point.bucket_counts, + + self._current_value = None + self._min = inf + self._max = -inf + self._sum = 0 + self._previous_collection_start_nano = collection_start_nano + + if current_value is None: + return None + + return HistogramDataPoint( + attributes=self._attributes, + start_time_unix_nano=previous_collection_start_nano, + time_unix_nano=collection_start_nano, + count=sum(current_value), + sum=sum_, + bucket_counts=tuple(current_value), + explicit_bounds=self._boundaries, + min=min_, + max=max_, ) - ] - current_point = HistogramDataPoint( - attributes=self._attributes, - start_time_unix_nano=start_time_unix_nano, - time_unix_nano=current_point.time_unix_nano, - count=sum(bucket_counts), - sum=sum_, - bucket_counts=tuple(bucket_counts), - explicit_bounds=current_point.explicit_bounds, - min=min_, - max=max_, - ) - self._previous_point = current_point - return current_point + return HistogramDataPoint( + attributes=self._attributes, + start_time_unix_nano=self._start_time_unix_nano, + time_unix_nano=collection_start_nano, + count=sum(self._current_value), + sum=self._sum, + bucket_counts=tuple(self._current_value), + explicit_bounds=self._boundaries, + min=self._min, + max=self._max, + ) # pylint: disable=protected-access diff --git a/opentelemetry-sdk/tests/metrics/integration_test/test_explicit_bucket_histogram_aggregation.py b/opentelemetry-sdk/tests/metrics/integration_test/test_explicit_bucket_histogram_aggregation.py new file mode 100644 index 00000000000..052c5b4edbc --- /dev/null +++ b/opentelemetry-sdk/tests/metrics/integration_test/test_explicit_bucket_histogram_aggregation.py @@ -0,0 +1,250 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from platform import system +from unittest import TestCase + +from pytest import mark + +from opentelemetry.sdk.metrics import Histogram, MeterProvider +from opentelemetry.sdk.metrics.export import ( + AggregationTemporality, + InMemoryMetricReader, +) +from opentelemetry.sdk.metrics.view import ExplicitBucketHistogramAggregation + + +class TestExplicitBucketHistogramAggregation(TestCase): + + values = [ + 1.0, + 6.0, + 11.0, + 26.0, + 51.0, + 76.0, + 101.0, + 251.0, + 501.0, + 751.0, + ] + + @mark.skipif( + system() != "Linux", + reason=( + "Tests fail because Windows time_ns resolution is too low so " + "two different time measurements may end up having the exact same" + "value." + ), + ) + def test_synchronous_delta_temporality(self): + + aggregation = ExplicitBucketHistogramAggregation() + + reader = InMemoryMetricReader( + preferred_aggregation={Histogram: aggregation}, + preferred_temporality={Histogram: AggregationTemporality.DELTA}, + ) + + provider = MeterProvider(metric_readers=[reader]) + meter = provider.get_meter("name", "version") + + histogram = meter.create_histogram("histogram") + + results = [] + + for _ in range(10): + + results.append(reader.get_metrics_data()) + + for metrics_data in results: + self.assertIsNone(metrics_data) + + results = [] + + for value in self.values: + histogram.record(value) + results.append(reader.get_metrics_data()) + + previous_time_unix_nano = ( + results[0] + .resource_metrics[0] + .scope_metrics[0] + .metrics[0] + .data.data_points[0] + .time_unix_nano + ) + + self.assertEqual( + ( + results[0] + .resource_metrics[0] + .scope_metrics[0] + .metrics[0] + .data.data_points[0] + .bucket_counts + ), + (0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0), + ) + + self.assertLess( + ( + results[0] + .resource_metrics[0] + .scope_metrics[0] + .metrics[0] + .data.data_points[0] + .start_time_unix_nano + ), + previous_time_unix_nano, + ) + + for index, metrics_data in enumerate(results[1:]): + + metric_data = ( + metrics_data.resource_metrics[0] + .scope_metrics[0] + .metrics[0] + .data.data_points[0] + ) + + self.assertEqual( + previous_time_unix_nano, metric_data.start_time_unix_nano + ) + previous_time_unix_nano = metric_data.time_unix_nano + + self.assertEqual( + metric_data.bucket_counts, + tuple([1 if value == index + 2 else 0 for value in range(16)]), + ) + self.assertLess( + metric_data.start_time_unix_nano, metric_data.time_unix_nano + ) + + results = [] + + for _ in range(10): + + results.append(reader.get_metrics_data()) + + provider.shutdown() + + for metrics_data in results: + self.assertIsNone(metrics_data) + + @mark.skipif( + system() != "Linux", + reason=( + "Tests fail because Windows time_ns resolution is too low so " + "two different time measurements may end up having the exact same" + "value." + ), + ) + def test_synchronous_cumulative_temporality(self): + + aggregation = ExplicitBucketHistogramAggregation() + + reader = InMemoryMetricReader( + preferred_aggregation={Histogram: aggregation}, + preferred_temporality={ + Histogram: AggregationTemporality.CUMULATIVE + }, + ) + + provider = MeterProvider(metric_readers=[reader]) + meter = provider.get_meter("name", "version") + + histogram = meter.create_histogram("histogram") + + results = [] + + for _ in range(10): + + results.append(reader.get_metrics_data()) + + for metrics_data in results: + self.assertIsNone(metrics_data) + + results = [] + + for value in self.values: + histogram.record(value) + results.append(reader.get_metrics_data()) + + start_time_unix_nano = ( + results[0] + .resource_metrics[0] + .scope_metrics[0] + .metrics[0] + .data.data_points[0] + .start_time_unix_nano + ) + + for index, metrics_data in enumerate(results): + + metric_data = ( + metrics_data.resource_metrics[0] + .scope_metrics[0] + .metrics[0] + .data.data_points[0] + ) + + self.assertEqual( + start_time_unix_nano, metric_data.start_time_unix_nano + ) + self.assertEqual( + metric_data.bucket_counts, + tuple( + [ + 1 + if inner_index <= index + 1 and inner_index > 0 + else 0 + for inner_index, value in enumerate(range(16)) + ] + ), + ) + + results = [] + + for _ in range(10): + + results.append(reader.get_metrics_data()) + + provider.shutdown() + + start_time_unix_nano = ( + results[0] + .resource_metrics[0] + .scope_metrics[0] + .metrics[0] + .data.data_points[0] + .start_time_unix_nano + ) + + for metrics_data in results: + + metric_data = ( + metrics_data.resource_metrics[0] + .scope_metrics[0] + .metrics[0] + .data.data_points[0] + ) + + self.assertEqual( + start_time_unix_nano, metric_data.start_time_unix_nano + ) + self.assertEqual( + metric_data.bucket_counts, + (0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0), + ) diff --git a/opentelemetry-sdk/tests/metrics/integration_test/test_histogram_export.py b/opentelemetry-sdk/tests/metrics/integration_test/test_histogram_export.py index 81d419819a4..eef81524d18 100644 --- a/opentelemetry-sdk/tests/metrics/integration_test/test_histogram_export.py +++ b/opentelemetry-sdk/tests/metrics/integration_test/test_histogram_export.py @@ -65,17 +65,26 @@ def test_histogram_counter_collection(self): metric_data = in_memory_metric_reader.get_metrics_data() - # FIXME ExplicitBucketHistogramAggregation is resetting counts to zero - # even if aggregation temporality is cumulative. self.assertEqual( - len(metric_data.resource_metrics[0].scope_metrics[0].metrics), 1 + len(metric_data.resource_metrics[0].scope_metrics[0].metrics), 2 ) + self.assertEqual( ( metric_data.resource_metrics[0] .scope_metrics[0] .metrics[0] .data.data_points[0] + .bucket_counts + ), + (0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0), + ) + self.assertEqual( + ( + metric_data.resource_metrics[0] + .scope_metrics[0] + .metrics[1] + .data.data_points[0] .value ), 1, diff --git a/opentelemetry-sdk/tests/metrics/test_aggregation.py b/opentelemetry-sdk/tests/metrics/test_aggregation.py index 50561022fb1..0e5a475eea1 100644 --- a/opentelemetry-sdk/tests/metrics/test_aggregation.py +++ b/opentelemetry-sdk/tests/metrics/test_aggregation.py @@ -284,22 +284,22 @@ def test_aggregate(self): # The first bucket keeps count of values between (-inf, 0] (-1 and 0) self.assertEqual( - explicit_bucket_histogram_aggregation._bucket_counts[0], 2 + explicit_bucket_histogram_aggregation._current_value[0], 2 ) # The second bucket keeps count of values between (0, 2] (1 and 2) self.assertEqual( - explicit_bucket_histogram_aggregation._bucket_counts[1], 2 + explicit_bucket_histogram_aggregation._current_value[1], 2 ) # The third bucket keeps count of values between (2, 4] (3 and 4) self.assertEqual( - explicit_bucket_histogram_aggregation._bucket_counts[2], 2 + explicit_bucket_histogram_aggregation._current_value[2], 2 ) # The fourth bucket keeps count of values between (4, inf) (3 and 4) self.assertEqual( - explicit_bucket_histogram_aggregation._bucket_counts[3], 1 + explicit_bucket_histogram_aggregation._current_value[3], 1 ) histo = explicit_bucket_histogram_aggregation.collect(