Skip to content

Commit

Permalink
Add variadic arguments to metric exporter/reader interfaces
Browse files Browse the repository at this point in the history
Fixes #2650
  • Loading branch information
ocelotl committed May 2, 2022
1 parent 5456988 commit 8d7b958
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,10 @@ def _translate_data(
)
)

def export(self, metrics: Sequence[Metric]) -> MetricExportResult:
def export(
self, metrics: Sequence[Metric], *args, **kwargs
) -> MetricExportResult:
return self._export(metrics)

def shutdown(self):
def shutdown(self, *args, **kwargs):
pass
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,14 @@ def __init__(self, prefix: str = "") -> None:
REGISTRY.register(self._collector)
self._collector._callback = self.collect

def _receive_metrics(self, metrics: Iterable[Metric]) -> None:
def _receive_metrics(
self, metrics: Iterable[Metric], *args, **kwargs
) -> None:
if metrics is None:
return
self._collector.add_metrics_data(metrics)

def shutdown(self) -> bool:
def shutdown(self, *args, **kwargs) -> bool:
REGISTRY.unregister(self._collector)
return True

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ class MetricExporter(ABC):
"""

@abstractmethod
def export(self, metrics: Sequence[Metric]) -> "MetricExportResult":
def export(
self, metrics: Sequence[Metric], *args, **kwargs
) -> "MetricExportResult":
"""Exports a batch of telemetry data.
Args:
Expand All @@ -63,7 +65,7 @@ def export(self, metrics: Sequence[Metric]) -> "MetricExportResult":
"""

@abstractmethod
def shutdown(self) -> None:
def shutdown(self, *args, **kwargs) -> None:
"""Shuts down the exporter.
Called when the SDK is shut down.
Expand All @@ -87,13 +89,15 @@ def __init__(
self.out = out
self.formatter = formatter

def export(self, metrics: Sequence[Metric]) -> MetricExportResult:
def export(
self, metrics: Sequence[Metric], *args, **kwargs
) -> MetricExportResult:
for metric in metrics:
self.out.write(self.formatter(metric))
self.out.flush()
return MetricExportResult.SUCCESS

def shutdown(self) -> None:
def shutdown(self, *args, **kwargs) -> None:
pass


Expand Down Expand Up @@ -123,11 +127,11 @@ def get_metrics(self) -> List[Metric]:
self._metrics = []
return metrics

def _receive_metrics(self, metrics: Iterable[Metric]):
def _receive_metrics(self, metrics: Iterable[Metric], *args, **kwargs):
with self._lock:
self._metrics = list(metrics)

def shutdown(self):
def shutdown(self, *args, **kwargs):
pass


Expand Down Expand Up @@ -193,7 +197,9 @@ def _ticker(self) -> None:
# one last collection below before shutting down completely
self.collect()

def _receive_metrics(self, metrics: Iterable[Metric]) -> None:
def _receive_metrics(
self, metrics: Iterable[Metric], *args, **kwargs
) -> None:
if metrics is None:
return
token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
Expand All @@ -203,7 +209,7 @@ def _receive_metrics(self, metrics: Iterable[Metric]) -> None:
_logger.exception("Exception while exporting metrics %s", str(e))
detach(token)

def shutdown(self):
def shutdown(self, *args, **kwargs):
def _shutdown():
self._shutdown = True

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,24 @@

class MeasurementConsumer(ABC):
@abstractmethod
def consume_measurement(self, measurement: Measurement) -> None:
def consume_measurement(
self, measurement: Measurement, *args, **kwargs
) -> None:
pass

@abstractmethod
def register_asynchronous_instrument(self, instrument: "_Asynchronous"):
def register_asynchronous_instrument(
self, instrument: "_Asynchronous", *args, **kwargs
):
pass

@abstractmethod
def collect(
self,
metric_reader: MetricReader,
instrument_type_temporality: Dict[type, AggregationTemporality],
*args,
**kwargs
) -> Iterable[Metric]:
pass

Expand All @@ -60,12 +66,14 @@ def __init__(self, sdk_config: SdkConfiguration) -> None:
}
self._async_instruments: List["_Asynchronous"] = []

def consume_measurement(self, measurement: Measurement) -> None:
def consume_measurement(
self, measurement: Measurement, *args, **kwargs
) -> None:
for reader_storage in self._reader_storages.values():
reader_storage.consume_measurement(measurement)

def register_asynchronous_instrument(
self, instrument: "_Asynchronous"
self, instrument: "_Asynchronous", *args, **kwargs
) -> None:
with self._lock:
self._async_instruments.append(instrument)
Expand All @@ -74,6 +82,8 @@ def collect(
self,
metric_reader: MetricReader,
instrument_type_temporality: Dict[type, AggregationTemporality],
*args,
**kwargs
) -> Iterable[Metric]:
with self._lock:
metric_reader_storage = self._reader_storages[metric_reader]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,11 @@ def _set_collect_callback(
self._collect = func

@abstractmethod
def _receive_metrics(self, metrics: Iterable[Metric]):
def _receive_metrics(self, metrics: Iterable[Metric], *args, **kwargs):
"""Called by `MetricReader.collect` when it receives a batch of metrics"""

@abstractmethod
def shutdown(self):
def shutdown(self, *args, **kwargs):
"""Shuts down the MetricReader. This method provides a way
for the MetricReader to do any cleanup required. A metric reader can
only be shutdown once, any subsequent calls are ignored and return
Expand Down

0 comments on commit 8d7b958

Please sign in to comment.