diff --git a/CHANGELOG.md b/CHANGELOG.md index 796fda683e2..e3dd0a0c56a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## Unreleased +- PeriodicExportingMetricReader will continue if collection times out + ([#3100](https://github.com/open-telemetry/opentelemetry-python/pull/3100)) + ## Version 1.16.0/0.37b0 (2023-02-17) - Change ``__all__`` to be statically defined. @@ -28,10 +31,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#3156](https://github.com/open-telemetry/opentelemetry-python/pull/3156)) - deprecate jaeger exporters ([#3158](https://github.com/open-telemetry/opentelemetry-python/pull/3158)) - - Create a single resource instance ([#3118](https://github.com/open-telemetry/opentelemetry-python/pull/3118)) + ## Version 1.15.0/0.36b0 (2022-12-09) - PeriodicExportingMetricsReader with +Inf interval diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py index 836bb8eecef..5bd94d5aacc 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py @@ -41,6 +41,7 @@ AggregationTemporality, DefaultAggregation, ) +from opentelemetry.sdk.metrics._internal.exceptions import MetricsTimeoutError from opentelemetry.sdk.metrics._internal.instrument import ( Counter, Histogram, @@ -497,7 +498,14 @@ def _at_fork_reinit(self): def _ticker(self) -> None: interval_secs = self._export_interval_millis / 1e3 while not self._shutdown_event.wait(interval_secs): - self.collect(timeout_millis=self._export_timeout_millis) + try: + self.collect(timeout_millis=self._export_timeout_millis) + except MetricsTimeoutError: + _logger.warning( + "Metric collection timed out. Will try again after %s seconds", + interval_secs, + exc_info=True, + ) # one last collection below before shutting down completely self.collect(timeout_millis=self._export_interval_millis) diff --git a/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py b/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py index 532f25b9e7f..aa0eed285d7 100644 --- a/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py +++ b/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py @@ -14,12 +14,12 @@ import math from time import sleep, time_ns -from typing import Sequence +from typing import Optional, Sequence from unittest.mock import Mock from flaky import flaky -from opentelemetry.sdk.metrics import Counter +from opentelemetry.sdk.metrics import Counter, MetricsTimeoutError from opentelemetry.sdk.metrics._internal import _Counter from opentelemetry.sdk.metrics.export import ( AggregationTemporality, @@ -67,6 +67,25 @@ def force_flush(self, timeout_millis: float = 10_000) -> bool: return True +class ExceptionAtCollectionPeriodicExportingMetricReader( + PeriodicExportingMetricReader +): + def __init__( + self, + exporter: MetricExporter, + exception: Exception, + export_interval_millis: Optional[float] = None, + export_timeout_millis: Optional[float] = None, + ) -> None: + super().__init__( + exporter, export_interval_millis, export_timeout_millis + ) + self._collect_exception = exception + + def collect(self, timeout_millis: float = 10_000) -> None: + raise self._collect_exception + + metrics_list = [ Metric( name="sum_name", @@ -111,11 +130,13 @@ def test_defaults(self): pmr.shutdown() def _create_periodic_reader( - self, metrics, exporter, collect_wait=0, interval=60000 + self, metrics, exporter, collect_wait=0, interval=60000, timeout=30000 ): pmr = PeriodicExportingMetricReader( - exporter, export_interval_millis=interval + exporter, + export_interval_millis=interval, + export_timeout_millis=timeout, ) def _collect(reader, timeout_millis): @@ -219,3 +240,15 @@ def test_exporter_aggregation_preference(self): self.assertTrue(isinstance(value, DefaultAggregation)) else: self.assertTrue(isinstance(value, LastValueAggregation)) + + def test_metric_timeout_does_not_kill_worker_thread(self): + exporter = FakeMetricsExporter() + pmr = ExceptionAtCollectionPeriodicExportingMetricReader( + exporter, + MetricsTimeoutError("test timeout"), + export_timeout_millis=1, + ) + + sleep(0.1) + self.assertTrue(pmr._daemon_thread.is_alive()) + pmr.shutdown()