Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding Resource to MetricRecord #1209

Merged
merged 10 commits into from
Oct 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ def get_collector_point(metric_record: MetricRecord) -> metrics_pb2.Point:


def get_resource(metric_record: MetricRecord) -> resource_pb2.Resource:
resource_attributes = metric_record.instrument.meter.resource.attributes
resource_attributes = metric_record.resource.attributes
return resource_pb2.Resource(
type=infer_oc_resource_type(resource_attributes),
labels={k: str(v) for k, v in resource_attributes.items()},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,21 +100,36 @@ def test_get_collector_point(self):
"testName", "testDescription", "unit", float, ValueRecorder
)
result = metrics_exporter.get_collector_point(
MetricRecord(int_counter, self._key_labels, aggregator)
MetricRecord(
int_counter,
self._key_labels,
aggregator,
metrics.get_meter_provider().resource,
)
)
self.assertIsInstance(result, metrics_pb2.Point)
self.assertIsInstance(result.timestamp, Timestamp)
self.assertEqual(result.int64_value, 0)
aggregator.update(123.5)
aggregator.take_checkpoint()
result = metrics_exporter.get_collector_point(
MetricRecord(float_counter, self._key_labels, aggregator)
MetricRecord(
float_counter,
self._key_labels,
aggregator,
metrics.get_meter_provider().resource,
)
)
self.assertEqual(result.double_value, 123.5)
self.assertRaises(
TypeError,
metrics_exporter.get_collector_point(
MetricRecord(valuerecorder, self._key_labels, aggregator)
MetricRecord(
valuerecorder,
self._key_labels,
aggregator,
metrics.get_meter_provider().resource,
)
),
)

Expand All @@ -130,7 +145,10 @@ def test_export(self):
"testname", "testdesc", "unit", int, Counter, self._labels.keys(),
)
record = MetricRecord(
test_metric, self._key_labels, aggregate.SumAggregator(),
test_metric,
self._key_labels,
aggregate.SumAggregator(),
metrics.get_meter_provider().resource,
)

result = collector_exporter.export([record])
Expand All @@ -155,7 +173,12 @@ def test_translate_to_collector(self):
aggregator = aggregate.SumAggregator()
aggregator.update(123)
aggregator.take_checkpoint()
record = MetricRecord(test_metric, self._key_labels, aggregator,)
record = MetricRecord(
test_metric,
self._key_labels,
aggregator,
metrics.get_meter_provider().resource,
)
start_timestamp = Timestamp()
output_metrics = metrics_exporter.translate_to_collector(
[record], start_timestamp,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,11 @@ def _translate_data(
# ValueObserver Gauge()
for sdk_metric in data:

if sdk_metric.instrument.meter.resource not in (
if sdk_metric.resource not in (
sdk_resource_instrumentation_library_metrics.keys()
):
sdk_resource_instrumentation_library_metrics[
sdk_metric.instrument.meter.resource
sdk_metric.resource
] = InstrumentationLibraryMetrics()

type_class = {
Expand Down Expand Up @@ -217,7 +217,7 @@ def _translate_data(
argument = type_class[value_type]["gauge"]["argument"]

sdk_resource_instrumentation_library_metrics[
sdk_metric.instrument.meter.resource
sdk_metric.resource
].metrics.append(
OTLPMetric(
**{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,19 @@
class TestOTLPMetricExporter(TestCase):
def setUp(self):
self.exporter = OTLPMetricsExporter()

resource = SDKResource(OrderedDict([("a", 1), ("b", False)]))
self.counter_metric_record = MetricRecord(
Counter(
"a",
"b",
"c",
int,
MeterProvider(
resource=SDKResource(OrderedDict([("a", 1), ("b", False)]))
).get_meter(__name__),
MeterProvider(resource=resource,).get_meter(__name__),
("d",),
),
OrderedDict([("e", "f")]),
SumAggregator(),
resource,
)

def test_translate_metrics(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@ def test_shutdown(self):
def test_export(self):
with self._registry_register_patch:
record = MetricRecord(
self._test_metric, self._labels_key, SumAggregator(),
self._test_metric,
self._labels_key,
SumAggregator(),
get_meter_provider().resource,
)
exporter = PrometheusMetricsExporter()
result = exporter.export([record])
Expand All @@ -86,7 +89,9 @@ def test_min_max_sum_aggregator_to_prometheus(self):
aggregator.update(123)
aggregator.update(456)
aggregator.take_checkpoint()
record = MetricRecord(metric, key_labels, aggregator)
record = MetricRecord(
metric, key_labels, aggregator, get_meter_provider().resource
)
collector = CustomCollector("testprefix")
collector.add_metrics_data([record])
result_bytes = generate_latest(collector)
Expand All @@ -104,7 +109,9 @@ def test_counter_to_prometheus(self):
aggregator = SumAggregator()
aggregator.update(123)
aggregator.take_checkpoint()
record = MetricRecord(metric, key_labels, aggregator)
record = MetricRecord(
metric, key_labels, aggregator, get_meter_provider().resource
)
collector = CustomCollector("testprefix")
collector.add_metrics_data([record])

Expand Down Expand Up @@ -132,7 +139,9 @@ def test_invalid_metric(self):
)
labels = {"environment": "staging"}
key_labels = get_dict_as_key(labels)
record = MetricRecord(metric, key_labels, None)
record = MetricRecord(
metric, key_labels, None, get_meter_provider().resource
)
collector = CustomCollector("testprefix")
collector.add_metrics_data([record])
collector.collect()
Expand Down
5 changes: 4 additions & 1 deletion opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@
([#1203](https://github.com/open-telemetry/opentelemetry-python/pull/1203))
- Protect access to Span implementation
([#1188](https://github.com/open-telemetry/opentelemetry-python/pull/1188))
- `start_as_current_span` and `use_span` can now optionally auto-record any exceptions raised inside the context manager. ([#1162](https://github.com/open-telemetry/opentelemetry-python/pull/1162))
- `start_as_current_span` and `use_span` can now optionally auto-record any exceptions raised inside the context manager.
([#1162](https://github.com/open-telemetry/opentelemetry-python/pull/1162))
- Adding Resource to MeterRecord
([#1209](https://github.com/open-telemetry/opentelemetry-python/pull/1209))

## Version 0.13b0

Expand Down
3 changes: 1 addition & 2 deletions opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,8 +352,7 @@ def __init__(
instrumentation_info: "InstrumentationInfo",
):
self.instrumentation_info = instrumentation_info
self.processor = Processor(source.stateful)
self.resource = source.resource
self.processor = Processor(source.stateful, source.resource)
Copy link
Contributor

@lzchen lzchen Oct 6, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Take a look at this comment. I believe we should actually decouple Tracer from TracerProvider, and equivalently, Meter from MeterProvider by removing the source attribute. We should be passing in configuration through the Provider into the constructor of both Tracer and Meter. resource would be one of those configurations. Not sure if you want to do this as part of this PR however.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like issue #1181 already tracks this work, I'd rather do this as a follow up to this PR.

self.metrics = set()
self.observers = set()
self.metrics_lock = threading.Lock()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from opentelemetry import metrics as metrics_api
from opentelemetry.sdk.metrics.export.aggregate import Aggregator
from opentelemetry.sdk.resources import Resource


class MetricsExportResult(Enum):
Expand All @@ -30,10 +31,12 @@ def __init__(
instrument: metrics_api.InstrumentT,
labels: Tuple[Tuple[str, str]],
aggregator: Aggregator,
resource: Resource,
):
self.instrument = instrument
self.labels = labels
self.aggregator = aggregator
self.resource = resource


class MetricsExporter:
Expand Down Expand Up @@ -77,11 +80,12 @@ def export(
) -> "MetricsExportResult":
for record in metric_records:
print(
'{}(data="{}", labels="{}", value={})'.format(
'{}(data="{}", labels="{}", value={}, resource={})'.format(
type(self).__name__,
record.instrument,
record.labels,
record.aggregator.checkpoint,
record.resource.attributes,
)
)
return MetricsExportResult.SUCCESS
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from typing import Sequence

from opentelemetry.sdk.metrics.export import MetricRecord
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.util import get_dict_as_key


Expand All @@ -26,13 +27,14 @@ class Processor:
will be sent to an exporter for exporting.
"""

def __init__(self, stateful: bool):
def __init__(self, stateful: bool, resource: Resource):
self._batch_map = {}
# stateful=True indicates the processor computes checkpoints from over
# the process lifetime. False indicates the processor computes
# checkpoints which describe the updates of a single collection period
# (deltas)
self.stateful = stateful
self._resource = resource

def checkpoint_set(self) -> Sequence[MetricRecord]:
"""Returns a list of MetricRecords used for exporting.
Expand All @@ -46,7 +48,9 @@ def checkpoint_set(self) -> Sequence[MetricRecord]:
(instrument, aggregator_type, _, labels),
aggregator,
) in self._batch_map.items():
metric_records.append(MetricRecord(instrument, labels, aggregator))
metric_records.append(
MetricRecord(instrument, labels, aggregator, self._resource)
)
return metric_records

def finished_collection(self):
Expand Down
48 changes: 31 additions & 17 deletions opentelemetry-sdk/tests/metrics/export/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,15 @@
)
from opentelemetry.sdk.metrics.export.controller import PushController
from opentelemetry.sdk.metrics.export.processor import Processor
from opentelemetry.sdk.resources import Resource


# pylint: disable=protected-access
class TestConsoleMetricsExporter(unittest.TestCase):
# pylint: disable=no-self-use
def test_export(self):
meter = metrics.MeterProvider().get_meter(__name__)
meter_provider = metrics.MeterProvider()
meter = meter_provider.get_meter(__name__)
exporter = ConsoleMetricsExporter()
metric = metrics.Counter(
"available memory",
Expand All @@ -49,12 +51,15 @@ def test_export(self):
)
labels = {"environment": "staging"}
aggregator = SumAggregator()
record = MetricRecord(metric, labels, aggregator)
result = '{}(data="{}", labels="{}", value={})'.format(
record = MetricRecord(
metric, labels, aggregator, meter_provider.resource
)
result = '{}(data="{}", labels="{}", value={}, resource={})'.format(
ConsoleMetricsExporter.__name__,
metric,
labels,
aggregator.checkpoint,
meter_provider.resource.attributes,
)
with mock.patch("sys.stdout") as mock_stdout:
exporter.export([record])
Expand All @@ -63,8 +68,9 @@ def test_export(self):

class TestProcessor(unittest.TestCase):
def test_checkpoint_set(self):
meter = metrics.MeterProvider().get_meter(__name__)
processor = Processor(True)
meter_provider = metrics.MeterProvider()
meter = meter_provider.get_meter(__name__)
processor = Processor(True, meter_provider.resource)
aggregator = SumAggregator()
metric = metrics.Counter(
"available memory", "available memory", "bytes", int, meter
Expand All @@ -81,13 +87,14 @@ def test_checkpoint_set(self):
self.assertEqual(records[0].aggregator, aggregator)

def test_checkpoint_set_empty(self):
processor = Processor(True)
processor = Processor(True, Resource.create_empty())
records = processor.checkpoint_set()
self.assertEqual(len(records), 0)

def test_finished_collection_stateless(self):
meter = metrics.MeterProvider().get_meter(__name__)
processor = Processor(False)
meter_provider = metrics.MeterProvider()
meter = meter_provider.get_meter(__name__)
processor = Processor(False, meter_provider.resource)
aggregator = SumAggregator()
metric = metrics.Counter(
"available memory", "available memory", "bytes", int, meter
Expand All @@ -101,8 +108,9 @@ def test_finished_collection_stateless(self):
self.assertEqual(len(processor._batch_map), 0)

def test_finished_collection_stateful(self):
meter = metrics.MeterProvider().get_meter(__name__)
processor = Processor(True)
meter_provider = metrics.MeterProvider()
meter = meter_provider.get_meter(__name__)
processor = Processor(True, meter_provider.resource)
aggregator = SumAggregator()
metric = metrics.Counter(
"available memory", "available memory", "bytes", int, meter
Expand All @@ -116,8 +124,9 @@ def test_finished_collection_stateful(self):
self.assertEqual(len(processor._batch_map), 1)

def test_processor_process_exists(self):
meter = metrics.MeterProvider().get_meter(__name__)
processor = Processor(True)
meter_provider = metrics.MeterProvider()
meter = meter_provider.get_meter(__name__)
processor = Processor(True, meter_provider.resource)
aggregator = SumAggregator()
aggregator2 = SumAggregator()
metric = metrics.Counter(
Expand All @@ -137,8 +146,9 @@ def test_processor_process_exists(self):
self.assertEqual(processor._batch_map.get(batch_key).checkpoint, 1.0)

def test_processor_process_not_exists(self):
meter = metrics.MeterProvider().get_meter(__name__)
processor = Processor(True)
meter_provider = metrics.MeterProvider()
meter = meter_provider.get_meter(__name__)
processor = Processor(True, meter_provider.resource)
aggregator = SumAggregator()
metric = metrics.Counter(
"available memory", "available memory", "bytes", int, meter
Expand All @@ -156,11 +166,15 @@ def test_processor_process_not_exists(self):
self.assertEqual(processor._batch_map.get(batch_key).checkpoint, 1.0)

def test_processor_process_not_stateful(self):
meter = metrics.MeterProvider().get_meter(__name__)
processor = Processor(True)
meter_provider = metrics.MeterProvider()
processor = Processor(True, meter_provider.resource)
aggregator = SumAggregator()
metric = metrics.Counter(
"available memory", "available memory", "bytes", int, meter
"available memory",
"available memory",
"bytes",
int,
meter_provider.get_meter(__name__),
)
labels = ()
_batch_map = {}
Expand Down
Loading