From 8d7cba05f401f76babc6acd646a3acee02a715ae Mon Sep 17 00:00:00 2001 From: alrex Date: Wed, 7 Oct 2020 09:07:49 -0700 Subject: [PATCH] Adding Resource to MetricRecord (#1209) Co-authored-by: Diego Hurtado --- .../opencensus/metrics_exporter/__init__.py | 2 +- .../test_otcollector_metrics_exporter.py | 33 +++++++++++-- .../otlp/metrics_exporter/__init__.py | 6 +-- .../tests/test_otlp_metric_exporter.py | 7 ++- .../tests/test_prometheus_exporter.py | 17 +++++-- opentelemetry-sdk/CHANGELOG.md | 5 +- .../src/opentelemetry/sdk/metrics/__init__.py | 3 +- .../sdk/metrics/export/__init__.py | 6 ++- .../sdk/metrics/export/processor.py | 8 +++- .../tests/metrics/export/test_export.py | 48 ++++++++++++------- .../tests/metrics/test_metrics.py | 8 ++-- 11 files changed, 98 insertions(+), 45 deletions(-) diff --git a/exporter/opentelemetry-exporter-opencensus/src/opentelemetry/exporter/opencensus/metrics_exporter/__init__.py b/exporter/opentelemetry-exporter-opencensus/src/opentelemetry/exporter/opencensus/metrics_exporter/__init__.py index db7af753aa7..de3c8a8b453 100644 --- a/exporter/opentelemetry-exporter-opencensus/src/opentelemetry/exporter/opencensus/metrics_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-opencensus/src/opentelemetry/exporter/opencensus/metrics_exporter/__init__.py @@ -191,7 +191,7 @@ def get_collector_point(metric_record: MetricRecord) -> metrics_pb2.Point: def get_resource(metric_record: MetricRecord) -> resource_pb2.Resource: - resource_attributes = metric_record.instrument.meter.resource.attributes + resource_attributes = metric_record.resource.attributes return resource_pb2.Resource( type=infer_oc_resource_type(resource_attributes), labels={k: str(v) for k, v in resource_attributes.items()}, diff --git a/exporter/opentelemetry-exporter-opencensus/tests/test_otcollector_metrics_exporter.py b/exporter/opentelemetry-exporter-opencensus/tests/test_otcollector_metrics_exporter.py index 1ec1a574487..4dd34bf1ba1 100644 --- a/exporter/opentelemetry-exporter-opencensus/tests/test_otcollector_metrics_exporter.py +++ b/exporter/opentelemetry-exporter-opencensus/tests/test_otcollector_metrics_exporter.py @@ -100,7 +100,12 @@ def test_get_collector_point(self): "testName", "testDescription", "unit", float, ValueRecorder ) result = metrics_exporter.get_collector_point( - MetricRecord(int_counter, self._key_labels, aggregator) + MetricRecord( + int_counter, + self._key_labels, + aggregator, + metrics.get_meter_provider().resource, + ) ) self.assertIsInstance(result, metrics_pb2.Point) self.assertIsInstance(result.timestamp, Timestamp) @@ -108,13 +113,23 @@ def test_get_collector_point(self): aggregator.update(123.5) aggregator.take_checkpoint() result = metrics_exporter.get_collector_point( - MetricRecord(float_counter, self._key_labels, aggregator) + MetricRecord( + float_counter, + self._key_labels, + aggregator, + metrics.get_meter_provider().resource, + ) ) self.assertEqual(result.double_value, 123.5) self.assertRaises( TypeError, metrics_exporter.get_collector_point( - MetricRecord(valuerecorder, self._key_labels, aggregator) + MetricRecord( + valuerecorder, + self._key_labels, + aggregator, + metrics.get_meter_provider().resource, + ) ), ) @@ -130,7 +145,10 @@ def test_export(self): "testname", "testdesc", "unit", int, Counter, self._labels.keys(), ) record = MetricRecord( - test_metric, self._key_labels, aggregate.SumAggregator(), + test_metric, + self._key_labels, + aggregate.SumAggregator(), + metrics.get_meter_provider().resource, ) result = collector_exporter.export([record]) @@ -155,7 +173,12 @@ def test_translate_to_collector(self): aggregator = aggregate.SumAggregator() aggregator.update(123) aggregator.take_checkpoint() - record = MetricRecord(test_metric, self._key_labels, aggregator,) + record = MetricRecord( + test_metric, + self._key_labels, + aggregator, + metrics.get_meter_provider().resource, + ) start_timestamp = Timestamp() output_metrics = metrics_exporter.translate_to_collector( [record], start_timestamp, diff --git a/exporter/opentelemetry-exporter-otlp/src/opentelemetry/exporter/otlp/metrics_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp/src/opentelemetry/exporter/otlp/metrics_exporter/__init__.py index 08a47c601e2..1fa1bf24f11 100644 --- a/exporter/opentelemetry-exporter-otlp/src/opentelemetry/exporter/otlp/metrics_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp/src/opentelemetry/exporter/otlp/metrics_exporter/__init__.py @@ -137,11 +137,11 @@ def _translate_data( # ValueObserver Gauge() for sdk_metric in data: - if sdk_metric.instrument.meter.resource not in ( + if sdk_metric.resource not in ( sdk_resource_instrumentation_library_metrics.keys() ): sdk_resource_instrumentation_library_metrics[ - sdk_metric.instrument.meter.resource + sdk_metric.resource ] = InstrumentationLibraryMetrics() type_class = { @@ -217,7 +217,7 @@ def _translate_data( argument = type_class[value_type]["gauge"]["argument"] sdk_resource_instrumentation_library_metrics[ - sdk_metric.instrument.meter.resource + sdk_metric.resource ].metrics.append( OTLPMetric( **{ diff --git a/exporter/opentelemetry-exporter-otlp/tests/test_otlp_metric_exporter.py b/exporter/opentelemetry-exporter-otlp/tests/test_otlp_metric_exporter.py index 1218fbbb330..1eba2bef66d 100644 --- a/exporter/opentelemetry-exporter-otlp/tests/test_otlp_metric_exporter.py +++ b/exporter/opentelemetry-exporter-otlp/tests/test_otlp_metric_exporter.py @@ -44,20 +44,19 @@ class TestOTLPMetricExporter(TestCase): def setUp(self): self.exporter = OTLPMetricsExporter() - + resource = SDKResource(OrderedDict([("a", 1), ("b", False)])) self.counter_metric_record = MetricRecord( Counter( "a", "b", "c", int, - MeterProvider( - resource=SDKResource(OrderedDict([("a", 1), ("b", False)])) - ).get_meter(__name__), + MeterProvider(resource=resource,).get_meter(__name__), ("d",), ), OrderedDict([("e", "f")]), SumAggregator(), + resource, ) def test_translate_metrics(self): diff --git a/exporter/opentelemetry-exporter-prometheus/tests/test_prometheus_exporter.py b/exporter/opentelemetry-exporter-prometheus/tests/test_prometheus_exporter.py index 936a6eb8221..4e2075b8b6a 100644 --- a/exporter/opentelemetry-exporter-prometheus/tests/test_prometheus_exporter.py +++ b/exporter/opentelemetry-exporter-prometheus/tests/test_prometheus_exporter.py @@ -67,7 +67,10 @@ def test_shutdown(self): def test_export(self): with self._registry_register_patch: record = MetricRecord( - self._test_metric, self._labels_key, SumAggregator(), + self._test_metric, + self._labels_key, + SumAggregator(), + get_meter_provider().resource, ) exporter = PrometheusMetricsExporter() result = exporter.export([record]) @@ -86,7 +89,9 @@ def test_min_max_sum_aggregator_to_prometheus(self): aggregator.update(123) aggregator.update(456) aggregator.take_checkpoint() - record = MetricRecord(metric, key_labels, aggregator) + record = MetricRecord( + metric, key_labels, aggregator, get_meter_provider().resource + ) collector = CustomCollector("testprefix") collector.add_metrics_data([record]) result_bytes = generate_latest(collector) @@ -104,7 +109,9 @@ def test_counter_to_prometheus(self): aggregator = SumAggregator() aggregator.update(123) aggregator.take_checkpoint() - record = MetricRecord(metric, key_labels, aggregator) + record = MetricRecord( + metric, key_labels, aggregator, get_meter_provider().resource + ) collector = CustomCollector("testprefix") collector.add_metrics_data([record]) @@ -132,7 +139,9 @@ def test_invalid_metric(self): ) labels = {"environment": "staging"} key_labels = get_dict_as_key(labels) - record = MetricRecord(metric, key_labels, None) + record = MetricRecord( + metric, key_labels, None, get_meter_provider().resource + ) collector = CustomCollector("testprefix") collector.add_metrics_data([record]) collector.collect() diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index 388296fbe3e..6890a2e2b7f 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -14,7 +14,10 @@ ([#1203](https://github.com/open-telemetry/opentelemetry-python/pull/1203)) - Protect access to Span implementation ([#1188](https://github.com/open-telemetry/opentelemetry-python/pull/1188)) -- `start_as_current_span` and `use_span` can now optionally auto-record any exceptions raised inside the context manager. ([#1162](https://github.com/open-telemetry/opentelemetry-python/pull/1162)) +- `start_as_current_span` and `use_span` can now optionally auto-record any exceptions raised inside the context manager. + ([#1162](https://github.com/open-telemetry/opentelemetry-python/pull/1162)) +- Adding Resource to MeterRecord + ([#1209](https://github.com/open-telemetry/opentelemetry-python/pull/1209)) ## Version 0.13b0 diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py index 01242b7d074..382dbeb9b9d 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py @@ -352,8 +352,7 @@ def __init__( instrumentation_info: "InstrumentationInfo", ): self.instrumentation_info = instrumentation_info - self.processor = Processor(source.stateful) - self.resource = source.resource + self.processor = Processor(source.stateful, source.resource) self.metrics = set() self.observers = set() self.metrics_lock = threading.Lock() diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/__init__.py index 16911f94efb..e7882217ec4 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/__init__.py @@ -17,6 +17,7 @@ from opentelemetry import metrics as metrics_api from opentelemetry.sdk.metrics.export.aggregate import Aggregator +from opentelemetry.sdk.resources import Resource class MetricsExportResult(Enum): @@ -30,10 +31,12 @@ def __init__( instrument: metrics_api.InstrumentT, labels: Tuple[Tuple[str, str]], aggregator: Aggregator, + resource: Resource, ): self.instrument = instrument self.labels = labels self.aggregator = aggregator + self.resource = resource class MetricsExporter: @@ -77,11 +80,12 @@ def export( ) -> "MetricsExportResult": for record in metric_records: print( - '{}(data="{}", labels="{}", value={})'.format( + '{}(data="{}", labels="{}", value={}, resource={})'.format( type(self).__name__, record.instrument, record.labels, record.aggregator.checkpoint, + record.resource.attributes, ) ) return MetricsExportResult.SUCCESS diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/processor.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/processor.py index c012d7382bf..fa16f5f4ea7 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/processor.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/processor.py @@ -15,6 +15,7 @@ from typing import Sequence from opentelemetry.sdk.metrics.export import MetricRecord +from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.util import get_dict_as_key @@ -26,13 +27,14 @@ class Processor: will be sent to an exporter for exporting. """ - def __init__(self, stateful: bool): + def __init__(self, stateful: bool, resource: Resource): self._batch_map = {} # stateful=True indicates the processor computes checkpoints from over # the process lifetime. False indicates the processor computes # checkpoints which describe the updates of a single collection period # (deltas) self.stateful = stateful + self._resource = resource def checkpoint_set(self) -> Sequence[MetricRecord]: """Returns a list of MetricRecords used for exporting. @@ -46,7 +48,9 @@ def checkpoint_set(self) -> Sequence[MetricRecord]: (instrument, aggregator_type, _, labels), aggregator, ) in self._batch_map.items(): - metric_records.append(MetricRecord(instrument, labels, aggregator)) + metric_records.append( + MetricRecord(instrument, labels, aggregator, self._resource) + ) return metric_records def finished_collection(self): diff --git a/opentelemetry-sdk/tests/metrics/export/test_export.py b/opentelemetry-sdk/tests/metrics/export/test_export.py index efa6bcd24e1..b0c74e7093d 100644 --- a/opentelemetry-sdk/tests/metrics/export/test_export.py +++ b/opentelemetry-sdk/tests/metrics/export/test_export.py @@ -31,13 +31,15 @@ ) from opentelemetry.sdk.metrics.export.controller import PushController from opentelemetry.sdk.metrics.export.processor import Processor +from opentelemetry.sdk.resources import Resource # pylint: disable=protected-access class TestConsoleMetricsExporter(unittest.TestCase): # pylint: disable=no-self-use def test_export(self): - meter = metrics.MeterProvider().get_meter(__name__) + meter_provider = metrics.MeterProvider() + meter = meter_provider.get_meter(__name__) exporter = ConsoleMetricsExporter() metric = metrics.Counter( "available memory", @@ -49,12 +51,15 @@ def test_export(self): ) labels = {"environment": "staging"} aggregator = SumAggregator() - record = MetricRecord(metric, labels, aggregator) - result = '{}(data="{}", labels="{}", value={})'.format( + record = MetricRecord( + metric, labels, aggregator, meter_provider.resource + ) + result = '{}(data="{}", labels="{}", value={}, resource={})'.format( ConsoleMetricsExporter.__name__, metric, labels, aggregator.checkpoint, + meter_provider.resource.attributes, ) with mock.patch("sys.stdout") as mock_stdout: exporter.export([record]) @@ -63,8 +68,9 @@ def test_export(self): class TestProcessor(unittest.TestCase): def test_checkpoint_set(self): - meter = metrics.MeterProvider().get_meter(__name__) - processor = Processor(True) + meter_provider = metrics.MeterProvider() + meter = meter_provider.get_meter(__name__) + processor = Processor(True, meter_provider.resource) aggregator = SumAggregator() metric = metrics.Counter( "available memory", "available memory", "bytes", int, meter @@ -81,13 +87,14 @@ def test_checkpoint_set(self): self.assertEqual(records[0].aggregator, aggregator) def test_checkpoint_set_empty(self): - processor = Processor(True) + processor = Processor(True, Resource.create_empty()) records = processor.checkpoint_set() self.assertEqual(len(records), 0) def test_finished_collection_stateless(self): - meter = metrics.MeterProvider().get_meter(__name__) - processor = Processor(False) + meter_provider = metrics.MeterProvider() + meter = meter_provider.get_meter(__name__) + processor = Processor(False, meter_provider.resource) aggregator = SumAggregator() metric = metrics.Counter( "available memory", "available memory", "bytes", int, meter @@ -101,8 +108,9 @@ def test_finished_collection_stateless(self): self.assertEqual(len(processor._batch_map), 0) def test_finished_collection_stateful(self): - meter = metrics.MeterProvider().get_meter(__name__) - processor = Processor(True) + meter_provider = metrics.MeterProvider() + meter = meter_provider.get_meter(__name__) + processor = Processor(True, meter_provider.resource) aggregator = SumAggregator() metric = metrics.Counter( "available memory", "available memory", "bytes", int, meter @@ -116,8 +124,9 @@ def test_finished_collection_stateful(self): self.assertEqual(len(processor._batch_map), 1) def test_processor_process_exists(self): - meter = metrics.MeterProvider().get_meter(__name__) - processor = Processor(True) + meter_provider = metrics.MeterProvider() + meter = meter_provider.get_meter(__name__) + processor = Processor(True, meter_provider.resource) aggregator = SumAggregator() aggregator2 = SumAggregator() metric = metrics.Counter( @@ -137,8 +146,9 @@ def test_processor_process_exists(self): self.assertEqual(processor._batch_map.get(batch_key).checkpoint, 1.0) def test_processor_process_not_exists(self): - meter = metrics.MeterProvider().get_meter(__name__) - processor = Processor(True) + meter_provider = metrics.MeterProvider() + meter = meter_provider.get_meter(__name__) + processor = Processor(True, meter_provider.resource) aggregator = SumAggregator() metric = metrics.Counter( "available memory", "available memory", "bytes", int, meter @@ -156,11 +166,15 @@ def test_processor_process_not_exists(self): self.assertEqual(processor._batch_map.get(batch_key).checkpoint, 1.0) def test_processor_process_not_stateful(self): - meter = metrics.MeterProvider().get_meter(__name__) - processor = Processor(True) + meter_provider = metrics.MeterProvider() + processor = Processor(True, meter_provider.resource) aggregator = SumAggregator() metric = metrics.Counter( - "available memory", "available memory", "bytes", int, meter + "available memory", + "available memory", + "bytes", + int, + meter_provider.get_meter(__name__), ) labels = () _batch_map = {} diff --git a/opentelemetry-sdk/tests/metrics/test_metrics.py b/opentelemetry-sdk/tests/metrics/test_metrics.py index 2f833f47820..1fcb2bbda1b 100644 --- a/opentelemetry-sdk/tests/metrics/test_metrics.py +++ b/opentelemetry-sdk/tests/metrics/test_metrics.py @@ -33,14 +33,12 @@ def test_stateful(self): def test_resource(self): resource = resources.Resource.create({}) meter_provider = metrics.MeterProvider(resource=resource) - meter = meter_provider.get_meter(__name__) - self.assertIs(meter.resource, resource) + self.assertIs(meter_provider.resource, resource) def test_resource_empty(self): meter_provider = metrics.MeterProvider() - meter = meter_provider.get_meter(__name__) # pylint: disable=protected-access - self.assertEqual(meter.resource, resources._DEFAULT_RESOURCE) + self.assertEqual(meter_provider.resource, resources._DEFAULT_RESOURCE) def test_start_pipeline(self): exporter = mock.Mock() @@ -167,7 +165,7 @@ def test_create_metric(self): self.assertIsInstance(counter, metrics.Counter) self.assertEqual(counter.value_type, int) self.assertEqual(counter.name, "name") - self.assertIs(counter.meter.resource, resource) + self.assertIs(meter_provider.resource, resource) self.assertEqual(counter.meter, meter) def test_create_updowncounter(self):