Skip to content

Commit

Permalink
Adding Resource to MetricRecord (#1209)
Browse files Browse the repository at this point in the history
Co-authored-by: Diego Hurtado <ocelotl@users.noreply.github.com>
  • Loading branch information
alrex and ocelotl authored Oct 7, 2020
1 parent 803f582 commit 8d7cba0
Show file tree
Hide file tree
Showing 11 changed files with 98 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ def get_collector_point(metric_record: MetricRecord) -> metrics_pb2.Point:


def get_resource(metric_record: MetricRecord) -> resource_pb2.Resource:
resource_attributes = metric_record.instrument.meter.resource.attributes
resource_attributes = metric_record.resource.attributes
return resource_pb2.Resource(
type=infer_oc_resource_type(resource_attributes),
labels={k: str(v) for k, v in resource_attributes.items()},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,21 +100,36 @@ def test_get_collector_point(self):
"testName", "testDescription", "unit", float, ValueRecorder
)
result = metrics_exporter.get_collector_point(
MetricRecord(int_counter, self._key_labels, aggregator)
MetricRecord(
int_counter,
self._key_labels,
aggregator,
metrics.get_meter_provider().resource,
)
)
self.assertIsInstance(result, metrics_pb2.Point)
self.assertIsInstance(result.timestamp, Timestamp)
self.assertEqual(result.int64_value, 0)
aggregator.update(123.5)
aggregator.take_checkpoint()
result = metrics_exporter.get_collector_point(
MetricRecord(float_counter, self._key_labels, aggregator)
MetricRecord(
float_counter,
self._key_labels,
aggregator,
metrics.get_meter_provider().resource,
)
)
self.assertEqual(result.double_value, 123.5)
self.assertRaises(
TypeError,
metrics_exporter.get_collector_point(
MetricRecord(valuerecorder, self._key_labels, aggregator)
MetricRecord(
valuerecorder,
self._key_labels,
aggregator,
metrics.get_meter_provider().resource,
)
),
)

Expand All @@ -130,7 +145,10 @@ def test_export(self):
"testname", "testdesc", "unit", int, Counter, self._labels.keys(),
)
record = MetricRecord(
test_metric, self._key_labels, aggregate.SumAggregator(),
test_metric,
self._key_labels,
aggregate.SumAggregator(),
metrics.get_meter_provider().resource,
)

result = collector_exporter.export([record])
Expand All @@ -155,7 +173,12 @@ def test_translate_to_collector(self):
aggregator = aggregate.SumAggregator()
aggregator.update(123)
aggregator.take_checkpoint()
record = MetricRecord(test_metric, self._key_labels, aggregator,)
record = MetricRecord(
test_metric,
self._key_labels,
aggregator,
metrics.get_meter_provider().resource,
)
start_timestamp = Timestamp()
output_metrics = metrics_exporter.translate_to_collector(
[record], start_timestamp,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,11 @@ def _translate_data(
# ValueObserver Gauge()
for sdk_metric in data:

if sdk_metric.instrument.meter.resource not in (
if sdk_metric.resource not in (
sdk_resource_instrumentation_library_metrics.keys()
):
sdk_resource_instrumentation_library_metrics[
sdk_metric.instrument.meter.resource
sdk_metric.resource
] = InstrumentationLibraryMetrics()

type_class = {
Expand Down Expand Up @@ -217,7 +217,7 @@ def _translate_data(
argument = type_class[value_type]["gauge"]["argument"]

sdk_resource_instrumentation_library_metrics[
sdk_metric.instrument.meter.resource
sdk_metric.resource
].metrics.append(
OTLPMetric(
**{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,19 @@
class TestOTLPMetricExporter(TestCase):
def setUp(self):
self.exporter = OTLPMetricsExporter()

resource = SDKResource(OrderedDict([("a", 1), ("b", False)]))
self.counter_metric_record = MetricRecord(
Counter(
"a",
"b",
"c",
int,
MeterProvider(
resource=SDKResource(OrderedDict([("a", 1), ("b", False)]))
).get_meter(__name__),
MeterProvider(resource=resource,).get_meter(__name__),
("d",),
),
OrderedDict([("e", "f")]),
SumAggregator(),
resource,
)

def test_translate_metrics(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@ def test_shutdown(self):
def test_export(self):
with self._registry_register_patch:
record = MetricRecord(
self._test_metric, self._labels_key, SumAggregator(),
self._test_metric,
self._labels_key,
SumAggregator(),
get_meter_provider().resource,
)
exporter = PrometheusMetricsExporter()
result = exporter.export([record])
Expand All @@ -86,7 +89,9 @@ def test_min_max_sum_aggregator_to_prometheus(self):
aggregator.update(123)
aggregator.update(456)
aggregator.take_checkpoint()
record = MetricRecord(metric, key_labels, aggregator)
record = MetricRecord(
metric, key_labels, aggregator, get_meter_provider().resource
)
collector = CustomCollector("testprefix")
collector.add_metrics_data([record])
result_bytes = generate_latest(collector)
Expand All @@ -104,7 +109,9 @@ def test_counter_to_prometheus(self):
aggregator = SumAggregator()
aggregator.update(123)
aggregator.take_checkpoint()
record = MetricRecord(metric, key_labels, aggregator)
record = MetricRecord(
metric, key_labels, aggregator, get_meter_provider().resource
)
collector = CustomCollector("testprefix")
collector.add_metrics_data([record])

Expand Down Expand Up @@ -132,7 +139,9 @@ def test_invalid_metric(self):
)
labels = {"environment": "staging"}
key_labels = get_dict_as_key(labels)
record = MetricRecord(metric, key_labels, None)
record = MetricRecord(
metric, key_labels, None, get_meter_provider().resource
)
collector = CustomCollector("testprefix")
collector.add_metrics_data([record])
collector.collect()
Expand Down
5 changes: 4 additions & 1 deletion opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@
([#1203](https://github.com/open-telemetry/opentelemetry-python/pull/1203))
- Protect access to Span implementation
([#1188](https://github.com/open-telemetry/opentelemetry-python/pull/1188))
- `start_as_current_span` and `use_span` can now optionally auto-record any exceptions raised inside the context manager. ([#1162](https://github.com/open-telemetry/opentelemetry-python/pull/1162))
- `start_as_current_span` and `use_span` can now optionally auto-record any exceptions raised inside the context manager.
([#1162](https://github.com/open-telemetry/opentelemetry-python/pull/1162))
- Adding Resource to MeterRecord
([#1209](https://github.com/open-telemetry/opentelemetry-python/pull/1209))

## Version 0.13b0

Expand Down
3 changes: 1 addition & 2 deletions opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,8 +352,7 @@ def __init__(
instrumentation_info: "InstrumentationInfo",
):
self.instrumentation_info = instrumentation_info
self.processor = Processor(source.stateful)
self.resource = source.resource
self.processor = Processor(source.stateful, source.resource)
self.metrics = set()
self.observers = set()
self.metrics_lock = threading.Lock()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from opentelemetry import metrics as metrics_api
from opentelemetry.sdk.metrics.export.aggregate import Aggregator
from opentelemetry.sdk.resources import Resource


class MetricsExportResult(Enum):
Expand All @@ -30,10 +31,12 @@ def __init__(
instrument: metrics_api.InstrumentT,
labels: Tuple[Tuple[str, str]],
aggregator: Aggregator,
resource: Resource,
):
self.instrument = instrument
self.labels = labels
self.aggregator = aggregator
self.resource = resource


class MetricsExporter:
Expand Down Expand Up @@ -77,11 +80,12 @@ def export(
) -> "MetricsExportResult":
for record in metric_records:
print(
'{}(data="{}", labels="{}", value={})'.format(
'{}(data="{}", labels="{}", value={}, resource={})'.format(
type(self).__name__,
record.instrument,
record.labels,
record.aggregator.checkpoint,
record.resource.attributes,
)
)
return MetricsExportResult.SUCCESS
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from typing import Sequence

from opentelemetry.sdk.metrics.export import MetricRecord
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.util import get_dict_as_key


Expand All @@ -26,13 +27,14 @@ class Processor:
will be sent to an exporter for exporting.
"""

def __init__(self, stateful: bool):
def __init__(self, stateful: bool, resource: Resource):
self._batch_map = {}
# stateful=True indicates the processor computes checkpoints from over
# the process lifetime. False indicates the processor computes
# checkpoints which describe the updates of a single collection period
# (deltas)
self.stateful = stateful
self._resource = resource

def checkpoint_set(self) -> Sequence[MetricRecord]:
"""Returns a list of MetricRecords used for exporting.
Expand All @@ -46,7 +48,9 @@ def checkpoint_set(self) -> Sequence[MetricRecord]:
(instrument, aggregator_type, _, labels),
aggregator,
) in self._batch_map.items():
metric_records.append(MetricRecord(instrument, labels, aggregator))
metric_records.append(
MetricRecord(instrument, labels, aggregator, self._resource)
)
return metric_records

def finished_collection(self):
Expand Down
48 changes: 31 additions & 17 deletions opentelemetry-sdk/tests/metrics/export/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,15 @@
)
from opentelemetry.sdk.metrics.export.controller import PushController
from opentelemetry.sdk.metrics.export.processor import Processor
from opentelemetry.sdk.resources import Resource


# pylint: disable=protected-access
class TestConsoleMetricsExporter(unittest.TestCase):
# pylint: disable=no-self-use
def test_export(self):
meter = metrics.MeterProvider().get_meter(__name__)
meter_provider = metrics.MeterProvider()
meter = meter_provider.get_meter(__name__)
exporter = ConsoleMetricsExporter()
metric = metrics.Counter(
"available memory",
Expand All @@ -49,12 +51,15 @@ def test_export(self):
)
labels = {"environment": "staging"}
aggregator = SumAggregator()
record = MetricRecord(metric, labels, aggregator)
result = '{}(data="{}", labels="{}", value={})'.format(
record = MetricRecord(
metric, labels, aggregator, meter_provider.resource
)
result = '{}(data="{}", labels="{}", value={}, resource={})'.format(
ConsoleMetricsExporter.__name__,
metric,
labels,
aggregator.checkpoint,
meter_provider.resource.attributes,
)
with mock.patch("sys.stdout") as mock_stdout:
exporter.export([record])
Expand All @@ -63,8 +68,9 @@ def test_export(self):

class TestProcessor(unittest.TestCase):
def test_checkpoint_set(self):
meter = metrics.MeterProvider().get_meter(__name__)
processor = Processor(True)
meter_provider = metrics.MeterProvider()
meter = meter_provider.get_meter(__name__)
processor = Processor(True, meter_provider.resource)
aggregator = SumAggregator()
metric = metrics.Counter(
"available memory", "available memory", "bytes", int, meter
Expand All @@ -81,13 +87,14 @@ def test_checkpoint_set(self):
self.assertEqual(records[0].aggregator, aggregator)

def test_checkpoint_set_empty(self):
processor = Processor(True)
processor = Processor(True, Resource.create_empty())
records = processor.checkpoint_set()
self.assertEqual(len(records), 0)

def test_finished_collection_stateless(self):
meter = metrics.MeterProvider().get_meter(__name__)
processor = Processor(False)
meter_provider = metrics.MeterProvider()
meter = meter_provider.get_meter(__name__)
processor = Processor(False, meter_provider.resource)
aggregator = SumAggregator()
metric = metrics.Counter(
"available memory", "available memory", "bytes", int, meter
Expand All @@ -101,8 +108,9 @@ def test_finished_collection_stateless(self):
self.assertEqual(len(processor._batch_map), 0)

def test_finished_collection_stateful(self):
meter = metrics.MeterProvider().get_meter(__name__)
processor = Processor(True)
meter_provider = metrics.MeterProvider()
meter = meter_provider.get_meter(__name__)
processor = Processor(True, meter_provider.resource)
aggregator = SumAggregator()
metric = metrics.Counter(
"available memory", "available memory", "bytes", int, meter
Expand All @@ -116,8 +124,9 @@ def test_finished_collection_stateful(self):
self.assertEqual(len(processor._batch_map), 1)

def test_processor_process_exists(self):
meter = metrics.MeterProvider().get_meter(__name__)
processor = Processor(True)
meter_provider = metrics.MeterProvider()
meter = meter_provider.get_meter(__name__)
processor = Processor(True, meter_provider.resource)
aggregator = SumAggregator()
aggregator2 = SumAggregator()
metric = metrics.Counter(
Expand All @@ -137,8 +146,9 @@ def test_processor_process_exists(self):
self.assertEqual(processor._batch_map.get(batch_key).checkpoint, 1.0)

def test_processor_process_not_exists(self):
meter = metrics.MeterProvider().get_meter(__name__)
processor = Processor(True)
meter_provider = metrics.MeterProvider()
meter = meter_provider.get_meter(__name__)
processor = Processor(True, meter_provider.resource)
aggregator = SumAggregator()
metric = metrics.Counter(
"available memory", "available memory", "bytes", int, meter
Expand All @@ -156,11 +166,15 @@ def test_processor_process_not_exists(self):
self.assertEqual(processor._batch_map.get(batch_key).checkpoint, 1.0)

def test_processor_process_not_stateful(self):
meter = metrics.MeterProvider().get_meter(__name__)
processor = Processor(True)
meter_provider = metrics.MeterProvider()
processor = Processor(True, meter_provider.resource)
aggregator = SumAggregator()
metric = metrics.Counter(
"available memory", "available memory", "bytes", int, meter
"available memory",
"available memory",
"bytes",
int,
meter_provider.get_meter(__name__),
)
labels = ()
_batch_map = {}
Expand Down
Loading

0 comments on commit 8d7cba0

Please sign in to comment.