diff --git a/opentelemetry-sdk/pyproject.toml b/opentelemetry-sdk/pyproject.toml index 4e3da94bb5e..0f41503e0a9 100644 --- a/opentelemetry-sdk/pyproject.toml +++ b/opentelemetry-sdk/pyproject.toml @@ -27,6 +27,7 @@ dependencies = [ "opentelemetry-api == 1.20.0.dev", "opentelemetry-semantic-conventions == 0.41b0.dev", "typing-extensions >= 3.7.4", + "ipdb" ] [project.optional-dependencies] diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/_view_instrument_match.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/_view_instrument_match.py index ab4645c82f3..d5d8064293e 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/_view_instrument_match.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/_view_instrument_match.py @@ -16,7 +16,7 @@ from logging import getLogger from threading import Lock from time import time_ns -from typing import Dict, List, Sequence +from typing import Dict, List, Sequence, Optional from opentelemetry.metrics import Instrument from opentelemetry.sdk.metrics._internal.aggregation import ( @@ -126,7 +126,7 @@ def collect( self, aggregation_temporality: AggregationTemporality, collection_start_nanos: int, - ) -> Sequence[DataPointT]: + ) -> Optional[Sequence[DataPointT]]: data_points: List[DataPointT] = [] with self._lock: @@ -136,4 +136,6 @@ def collect( ) if data_point is not None: data_points.append(data_point) - return data_points + if data_points: + return data_points + return None diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py index 5bd94d5aacc..0568270ae6b 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py @@ -322,10 +322,14 @@ def collect(self, timeout_millis: float = 10_000) -> None: ) return - self._receive_metrics( - self._collect(self, timeout_millis=timeout_millis), - timeout_millis=timeout_millis, - ) + metrics = self._collect(self, timeout_millis=timeout_millis) + + if metrics is not None: + + self._receive_metrics( + metrics, + timeout_millis=timeout_millis, + ) @final def _set_collect_callback( @@ -515,8 +519,7 @@ def _receive_metrics( timeout_millis: float = 10_000, **kwargs, ) -> None: - if metrics_data is None: - return + token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)) try: with self._export_lock: diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py index 9daf1eff461..fe9936f51fc 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py @@ -17,7 +17,7 @@ from abc import ABC, abstractmethod from threading import Lock from time import time_ns -from typing import Iterable, List, Mapping +from typing import Iterable, List, Mapping, Optional # This kind of import is needed to avoid Sphinx errors. import opentelemetry.sdk.metrics @@ -94,7 +94,7 @@ def collect( self, metric_reader: "opentelemetry.sdk.metrics.MetricReader", timeout_millis: float = 10_000, - ) -> Iterable[Metric]: + ) -> Optional[Iterable[Metric]]: with self._lock: metric_reader_storage = self._reader_storages[metric_reader] @@ -123,4 +123,6 @@ def collect( for measurement in measurements: metric_reader_storage.consume_measurement(measurement) - return self._reader_storages[metric_reader].collect() + result = self._reader_storages[metric_reader].collect() + + return result diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/metric_reader_storage.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/metric_reader_storage.py index bef57eaab09..453d11a4ed5 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/metric_reader_storage.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/metric_reader_storage.py @@ -152,6 +152,13 @@ def collect(self) -> MetricsData: for view_instrument_match in view_instrument_matches: + data_points = view_instrument_match.collect( + aggregation_temporality, collection_start_nanos + ) + + if data_points is None: + continue + if isinstance( # pylint: disable=protected-access view_instrument_match._aggregation, @@ -159,9 +166,7 @@ def collect(self) -> MetricsData: ): data = Sum( aggregation_temporality=aggregation_temporality, - data_points=view_instrument_match.collect( - aggregation_temporality, collection_start_nanos - ), + data_points=data_points, is_monotonic=isinstance( instrument, (Counter, ObservableCounter) ), @@ -171,20 +176,14 @@ def collect(self) -> MetricsData: view_instrument_match._aggregation, _LastValueAggregation, ): - data = Gauge( - data_points=view_instrument_match.collect( - aggregation_temporality, collection_start_nanos - ) - ) + data = Gauge(data_points=data_points) elif isinstance( # pylint: disable=protected-access view_instrument_match._aggregation, _ExplicitBucketHistogramAggregation, ): data = Histogram( - data_points=view_instrument_match.collect( - aggregation_temporality, collection_start_nanos - ), + data_points=data_points, aggregation_temporality=aggregation_temporality, ) elif isinstance( @@ -200,9 +199,7 @@ def collect(self) -> MetricsData: _ExponentialBucketHistogramAggregation, ): data = ExponentialHistogram( - data_points=view_instrument_match.collect( - aggregation_temporality, collection_start_nanos - ), + data_points=data_points, aggregation_temporality=aggregation_temporality, ) @@ -216,32 +213,38 @@ def collect(self) -> MetricsData: ) ) - if instrument.instrumentation_scope not in ( - instrumentation_scope_scope_metrics - ): - instrumentation_scope_scope_metrics[ - instrument.instrumentation_scope - ] = ScopeMetrics( - scope=instrument.instrumentation_scope, - metrics=metrics, - schema_url=instrument.instrumentation_scope.schema_url, + if metrics: + + if instrument.instrumentation_scope not in ( + instrumentation_scope_scope_metrics + ): + instrumentation_scope_scope_metrics[ + instrument.instrumentation_scope + ] = ScopeMetrics( + scope=instrument.instrumentation_scope, + metrics=metrics, + schema_url=instrument.instrumentation_scope.schema_url, + ) + else: + instrumentation_scope_scope_metrics[ + instrument.instrumentation_scope + ].metrics.extend(metrics) + + if instrumentation_scope_scope_metrics: + + return MetricsData( + resource_metrics=[ + ResourceMetrics( + resource=self._sdk_config.resource, + scope_metrics=list( + instrumentation_scope_scope_metrics.values() + ), + schema_url=self._sdk_config.resource.schema_url, ) - else: - instrumentation_scope_scope_metrics[ - instrument.instrumentation_scope - ].metrics.extend(metrics) - - return MetricsData( - resource_metrics=[ - ResourceMetrics( - resource=self._sdk_config.resource, - scope_metrics=list( - instrumentation_scope_scope_metrics.values() - ), - schema_url=self._sdk_config.resource.schema_url, - ) - ] - ) + ] + ) + + return None def _handle_view_instrument_match( self, diff --git a/opentelemetry-sdk/tests/metrics/integration_test/test_console_exporter.py b/opentelemetry-sdk/tests/metrics/integration_test/test_console_exporter.py index 60c4227c3b2..1b3283717ae 100644 --- a/opentelemetry-sdk/tests/metrics/integration_test/test_console_exporter.py +++ b/opentelemetry-sdk/tests/metrics/integration_test/test_console_exporter.py @@ -72,3 +72,19 @@ def test_console_exporter(self): self.assertEqual(metrics["attributes"], {"a": "b"}) self.assertEqual(metrics["value"], 1) + + def test_console_exporter_no_export(self): + + output = StringIO() + exporter = ConsoleMetricExporter(out=output) + reader = PeriodicExportingMetricReader( + exporter, export_interval_millis=100 + ) + provider = MeterProvider(metric_readers=[reader]) + provider.shutdown() + + output.seek(0) + actual = "".join(output.readlines()) + expected = "" + + self.assertEqual(actual, expected) diff --git a/opentelemetry-sdk/tests/metrics/integration_test/test_disable_default_views.py b/opentelemetry-sdk/tests/metrics/integration_test/test_disable_default_views.py index ad90fe9a298..d022456415b 100644 --- a/opentelemetry-sdk/tests/metrics/integration_test/test_disable_default_views.py +++ b/opentelemetry-sdk/tests/metrics/integration_test/test_disable_default_views.py @@ -31,15 +31,7 @@ def test_disable_default_views(self): counter.add(10, {"label": "value1"}) counter.add(10, {"label": "value2"}) counter.add(10, {"label": "value3"}) - self.assertEqual( - ( - reader.get_metrics_data() - .resource_metrics[0] - .scope_metrics[0] - .metrics - ), - [], - ) + self.assertIsNone(reader.get_metrics_data()) def test_disable_default_views_add_custom(self): reader = InMemoryMetricReader() diff --git a/opentelemetry-sdk/tests/metrics/integration_test/test_exporter_concurrency.py b/opentelemetry-sdk/tests/metrics/integration_test/test_exporter_concurrency.py index 045afe0b298..bbc67eac309 100644 --- a/opentelemetry-sdk/tests/metrics/integration_test/test_exporter_concurrency.py +++ b/opentelemetry-sdk/tests/metrics/integration_test/test_exporter_concurrency.py @@ -74,6 +74,15 @@ class TestExporterConcurrency(ConcurrencyTestBase): > be called again only after the current call returns. https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#exportbatch + + This test also tests that a thread that calls the a + ``MetricReader.collect`` method using an asynchronous instrument is able + to perform two actions in the same thread lock space (without it being + interrupted by another thread): + + 1. Consume the measurement produced by the callback associated to the + asynchronous instrument. + 2. Export the measurement mentioned in the step above. """ def test_exporter_not_called_concurrently(self): @@ -84,7 +93,11 @@ def test_exporter_not_called_concurrently(self): ) meter_provider = MeterProvider(metric_readers=[reader]) + counter_cb_counter = 0 + def counter_cb(options: CallbackOptions): + nonlocal counter_cb_counter + counter_cb_counter += 1 yield Observation(2) meter_provider.get_meter(__name__).create_observable_counter( @@ -97,6 +110,7 @@ def test_many_threads(): self.run_with_many_threads(test_many_threads, num_threads=100) + self.assertEqual(counter_cb_counter, 100) # no thread should be in export() now self.assertEqual(exporter.count_in_export, 0) # should be one call for each thread diff --git a/opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py b/opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py index 97b5532feae..1da6d5bcf60 100644 --- a/opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py +++ b/opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py @@ -314,15 +314,7 @@ def test_drop_aggregation(self): ) metric_reader_storage.consume_measurement(Measurement(1, counter)) - self.assertEqual( - [], - ( - metric_reader_storage.collect() - .resource_metrics[0] - .scope_metrics[0] - .metrics - ), - ) + self.assertIsNone(metric_reader_storage.collect()) def test_same_collection_start(self): diff --git a/opentelemetry-sdk/tests/metrics/test_no_data.py b/opentelemetry-sdk/tests/metrics/test_no_data.py new file mode 100644 index 00000000000..aa6a94fe0b0 --- /dev/null +++ b/opentelemetry-sdk/tests/metrics/test_no_data.py @@ -0,0 +1,19 @@ +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import ( + ConsoleMetricExporter, + PeriodicExportingMetricReader, +) +from unittest import TestCase +from time import sleep + + +class TestNoData(TestCase): + + def test_no_data(self): + reader = PeriodicExportingMetricReader( + ConsoleMetricExporter(), + export_interval_millis=10 + ) + provider = MeterProvider(metric_readers=[reader]) + provider + sleep(1)