From 8d7b95875b9115af23e610a63cffb378ac4f30a5 Mon Sep 17 00:00:00 2001 From: Diego Hurtado Date: Sun, 1 May 2022 15:25:58 -0600 Subject: [PATCH] Add variadic arguments to metric exporter/reader interfaces Fixes #2650 --- .../proto/grpc/_metric_exporter/__init__.py | 6 +++-- .../exporter/prometheus/__init__.py | 6 +++-- .../sdk/_metrics/export/__init__.py | 22 ++++++++++++------- .../sdk/_metrics/measurement_consumer.py | 18 +++++++++++---- .../sdk/_metrics/metric_reader.py | 4 ++-- 5 files changed, 38 insertions(+), 18 deletions(-) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_metric_exporter/__init__.py index 1ad5cc4d808..c40ff1ddd21 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_metric_exporter/__init__.py @@ -168,8 +168,10 @@ def _translate_data( ) ) - def export(self, metrics: Sequence[Metric]) -> MetricExportResult: + def export( + self, metrics: Sequence[Metric], *args, **kwargs + ) -> MetricExportResult: return self._export(metrics) - def shutdown(self): + def shutdown(self, *args, **kwargs): pass diff --git a/exporter/opentelemetry-exporter-prometheus/src/opentelemetry/exporter/prometheus/__init__.py b/exporter/opentelemetry-exporter-prometheus/src/opentelemetry/exporter/prometheus/__init__.py index 09ca38c57b6..20ee0d291a8 100644 --- a/exporter/opentelemetry-exporter-prometheus/src/opentelemetry/exporter/prometheus/__init__.py +++ b/exporter/opentelemetry-exporter-prometheus/src/opentelemetry/exporter/prometheus/__init__.py @@ -110,12 +110,14 @@ def __init__(self, prefix: str = "") -> None: REGISTRY.register(self._collector) self._collector._callback = self.collect - def _receive_metrics(self, metrics: Iterable[Metric]) -> None: + def _receive_metrics( + self, metrics: Iterable[Metric], *args, **kwargs + ) -> None: if metrics is None: return self._collector.add_metrics_data(metrics) - def shutdown(self) -> bool: + def shutdown(self, *args, **kwargs) -> bool: REGISTRY.unregister(self._collector) return True diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py index 00db5c1a915..ab91c4a0a51 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py @@ -52,7 +52,9 @@ class MetricExporter(ABC): """ @abstractmethod - def export(self, metrics: Sequence[Metric]) -> "MetricExportResult": + def export( + self, metrics: Sequence[Metric], *args, **kwargs + ) -> "MetricExportResult": """Exports a batch of telemetry data. Args: @@ -63,7 +65,7 @@ def export(self, metrics: Sequence[Metric]) -> "MetricExportResult": """ @abstractmethod - def shutdown(self) -> None: + def shutdown(self, *args, **kwargs) -> None: """Shuts down the exporter. Called when the SDK is shut down. @@ -87,13 +89,15 @@ def __init__( self.out = out self.formatter = formatter - def export(self, metrics: Sequence[Metric]) -> MetricExportResult: + def export( + self, metrics: Sequence[Metric], *args, **kwargs + ) -> MetricExportResult: for metric in metrics: self.out.write(self.formatter(metric)) self.out.flush() return MetricExportResult.SUCCESS - def shutdown(self) -> None: + def shutdown(self, *args, **kwargs) -> None: pass @@ -123,11 +127,11 @@ def get_metrics(self) -> List[Metric]: self._metrics = [] return metrics - def _receive_metrics(self, metrics: Iterable[Metric]): + def _receive_metrics(self, metrics: Iterable[Metric], *args, **kwargs): with self._lock: self._metrics = list(metrics) - def shutdown(self): + def shutdown(self, *args, **kwargs): pass @@ -193,7 +197,9 @@ def _ticker(self) -> None: # one last collection below before shutting down completely self.collect() - def _receive_metrics(self, metrics: Iterable[Metric]) -> None: + def _receive_metrics( + self, metrics: Iterable[Metric], *args, **kwargs + ) -> None: if metrics is None: return token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)) @@ -203,7 +209,7 @@ def _receive_metrics(self, metrics: Iterable[Metric]) -> None: _logger.exception("Exception while exporting metrics %s", str(e)) detach(token) - def shutdown(self): + def shutdown(self, *args, **kwargs): def _shutdown(): self._shutdown = True diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/measurement_consumer.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/measurement_consumer.py index df77bf71e4d..7c392d9eae8 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/measurement_consumer.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/measurement_consumer.py @@ -31,11 +31,15 @@ class MeasurementConsumer(ABC): @abstractmethod - def consume_measurement(self, measurement: Measurement) -> None: + def consume_measurement( + self, measurement: Measurement, *args, **kwargs + ) -> None: pass @abstractmethod - def register_asynchronous_instrument(self, instrument: "_Asynchronous"): + def register_asynchronous_instrument( + self, instrument: "_Asynchronous", *args, **kwargs + ): pass @abstractmethod @@ -43,6 +47,8 @@ def collect( self, metric_reader: MetricReader, instrument_type_temporality: Dict[type, AggregationTemporality], + *args, + **kwargs ) -> Iterable[Metric]: pass @@ -60,12 +66,14 @@ def __init__(self, sdk_config: SdkConfiguration) -> None: } self._async_instruments: List["_Asynchronous"] = [] - def consume_measurement(self, measurement: Measurement) -> None: + def consume_measurement( + self, measurement: Measurement, *args, **kwargs + ) -> None: for reader_storage in self._reader_storages.values(): reader_storage.consume_measurement(measurement) def register_asynchronous_instrument( - self, instrument: "_Asynchronous" + self, instrument: "_Asynchronous", *args, **kwargs ) -> None: with self._lock: self._async_instruments.append(instrument) @@ -74,6 +82,8 @@ def collect( self, metric_reader: MetricReader, instrument_type_temporality: Dict[type, AggregationTemporality], + *args, + **kwargs ) -> Iterable[Metric]: with self._lock: metric_reader_storage = self._reader_storages[metric_reader] diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py index 7bcdce465d4..0fd34aa7291 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py @@ -162,11 +162,11 @@ def _set_collect_callback( self._collect = func @abstractmethod - def _receive_metrics(self, metrics: Iterable[Metric]): + def _receive_metrics(self, metrics: Iterable[Metric], *args, **kwargs): """Called by `MetricReader.collect` when it receives a batch of metrics""" @abstractmethod - def shutdown(self): + def shutdown(self, *args, **kwargs): """Shuts down the MetricReader. This method provides a way for the MetricReader to do any cleanup required. A metric reader can only be shutdown once, any subsequent calls are ignored and return