diff --git a/CHANGELOG.md b/CHANGELOG.md index 4db9af65e73..96c13d34019 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +- Update PeriodicExportingMetricReader to never call export() concurrently + ([#2873](https://github.com/open-telemetry/opentelemetry-python/pull/2873)) + ## [1.12.0-0.33b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.12.0-0.33b0) - 2022-08-08 - Add `force_flush` method to metrics exporter diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py index 3701229346e..7137fd53958 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py @@ -18,7 +18,7 @@ from logging import getLogger from os import environ, linesep from sys import stdout -from threading import Event, RLock, Thread +from threading import Event, Lock, RLock, Thread from typing import IO, Callable, Dict, Iterable, Optional from typing_extensions import final @@ -404,6 +404,9 @@ class PeriodicExportingMetricReader(MetricReader): """`PeriodicExportingMetricReader` is an implementation of `MetricReader` that collects metrics based on a user-configurable time interval, and passes the metrics to the configured exporter. + + The configured exporter's :py:meth:`~MetricExporter.export` method will not be called + concurrently. """ def __init__( @@ -417,6 +420,12 @@ def __init__( preferred_temporality=exporter._preferred_temporality, preferred_aggregation=exporter._preferred_aggregation, ) + + # This lock is held whenever calling self._exporter.export() to prevent concurrent + # execution of MetricExporter.export() + # https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#exportbatch + self._export_lock = Lock() + self._exporter = exporter if export_interval_millis is None: try: @@ -479,7 +488,10 @@ def _receive_metrics( return token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)) try: - self._exporter.export(metrics_data, timeout_millis=timeout_millis) + with self._export_lock: + self._exporter.export( + metrics_data, timeout_millis=timeout_millis + ) except Exception as e: # pylint: disable=broad-except,invalid-name _logger.exception("Exception while exporting metrics %s", str(e)) detach(token) diff --git a/opentelemetry-sdk/tests/metrics/integration_test/test_exporter_concurrency.py b/opentelemetry-sdk/tests/metrics/integration_test/test_exporter_concurrency.py new file mode 100644 index 00000000000..045afe0b298 --- /dev/null +++ b/opentelemetry-sdk/tests/metrics/integration_test/test_exporter_concurrency.py @@ -0,0 +1,105 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time +from threading import Lock + +from opentelemetry.metrics import CallbackOptions, Observation +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import ( + MetricExporter, + MetricExportResult, + MetricsData, + PeriodicExportingMetricReader, +) +from opentelemetry.test.concurrency_test import ConcurrencyTestBase + + +class MaxCountExporter(MetricExporter): + def __init__(self) -> None: + super().__init__(None, None) + self._lock = Lock() + + # the number of threads inside of export() + self.count_in_export = 0 + + # the total count of calls to export() + self.export_count = 0 + + # the maximum number of threads in export() ever + self.max_count_in_export = 0 + + def export( + self, + metrics_data: MetricsData, + timeout_millis: float = 10_000, + **kwargs, + ) -> MetricExportResult: + with self._lock: + self.export_count += 1 + self.count_in_export += 1 + + # yield to other threads + time.sleep(0) + + with self._lock: + self.max_count_in_export = max( + self.max_count_in_export, self.count_in_export + ) + self.count_in_export -= 1 + + def force_flush(self, timeout_millis: float = 10_000) -> bool: + return True + + def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: + pass + + +class TestExporterConcurrency(ConcurrencyTestBase): + """ + Tests the requirement that: + + > `Export` will never be called concurrently for the same exporter instance. `Export` can + > be called again only after the current call returns. + + https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#exportbatch + """ + + def test_exporter_not_called_concurrently(self): + exporter = MaxCountExporter() + reader = PeriodicExportingMetricReader( + exporter=exporter, + export_interval_millis=100_000, + ) + meter_provider = MeterProvider(metric_readers=[reader]) + + def counter_cb(options: CallbackOptions): + yield Observation(2) + + meter_provider.get_meter(__name__).create_observable_counter( + "testcounter", callbacks=[counter_cb] + ) + + # call collect from a bunch of threads to try and enter export() concurrently + def test_many_threads(): + reader.collect() + + self.run_with_many_threads(test_many_threads, num_threads=100) + + # no thread should be in export() now + self.assertEqual(exporter.count_in_export, 0) + # should be one call for each thread + self.assertEqual(exporter.export_count, 100) + # should never have been more than one concurrent call + self.assertEqual(exporter.max_count_in_export, 1)