[exporter-otlp-proto-common] Include metric info in encoding exception #4154

Merged: 7 commits, Sep 4, 2024
Changes from 1 commit
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -34,6 +34,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Remove `_start_time_unix_nano` attribute from `_ViewInstrumentMatch` in favor
of using `time_ns()` at the moment when the aggregation object is created
([#4137](https://github.com/open-telemetry/opentelemetry-python/pull/4137))
- Include metric info in encoding exceptions
  ([#4154](https://github.com/open-telemetry/opentelemetry-python/pull/4154))

## Version 1.26.0/0.47b0 (2024-07-25)

@@ -173,152 +173,26 @@ def _get_aggregation(
return instrument_class_aggregation


class EncodingException(Exception):
    """
    Raised by encode_metrics() when an exception is caught during encoding.
    Contains the problematic metric so the misbehaving metric name and
    details can be logged during exception handling.
    """

    def __init__(self, original_exception, metric):
        super().__init__()
        self.original_exception = original_exception
        self.metric = metric

    def __str__(self):
        return f"{self.metric}\n{self.original_exception}"


def encode_metrics(data: MetricsData) -> ExportMetricsServiceRequest:
    resource_metrics_dict = {}

    for resource_metrics in data.resource_metrics:
        _encode_resource_metrics(resource_metrics, resource_metrics_dict)

    resource_data = []
    for (
@@ -334,5 +208,137 @@ def encode_metrics(data: MetricsData) -> ExportMetricsServiceRequest:
                schema_url=sdk_resource.schema_url,
            )
        )
    return ExportMetricsServiceRequest(resource_metrics=resource_data)
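
For context, a minimal sketch of how a caller could use the new exception type; this is illustrative only (not part of the PR), and `metrics_data` stands in for any MetricsData instance:

import logging

from opentelemetry.exporter.otlp.proto.common._internal.metrics_encoder import (
    EncodingException,
)
from opentelemetry.exporter.otlp.proto.common.metrics_encoder import (
    encode_metrics,
)

logger = logging.getLogger(__name__)

try:
    request = encode_metrics(metrics_data)  # metrics_data: a MetricsData batch
except EncodingException as ex:
    # ex.metric is the SDK Metric that failed to encode; str(ex) renders
    # the metric followed by the original exception on the next line.
    logger.warning("Metrics batch failed to encode: %s", ex)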


def _encode_resource_metrics(resource_metrics, resource_metrics_dict):
resource = resource_metrics.resource
    # It is safe to assume that each entry in data.resource_metrics is
    # associated with a unique resource.
scope_metrics_dict = {}
resource_metrics_dict[resource] = scope_metrics_dict
for scope_metrics in resource_metrics.scope_metrics:
instrumentation_scope = scope_metrics.scope

# The SDK groups metrics in instrumentation scopes already so
# there is no need to check for existing instrumentation scopes
# here.
pb2_scope_metrics = pb2.ScopeMetrics(
scope=InstrumentationScope(
name=instrumentation_scope.name,
version=instrumentation_scope.version,
)
)

scope_metrics_dict[instrumentation_scope] = pb2_scope_metrics

for metric in scope_metrics.metrics:
pb2_metric = pb2.Metric(
name=metric.name,
description=metric.description,
unit=metric.unit,
)

try:
_encode_metric(metric, pb2_metric)
except Exception as ex:
# `from None` so we don't get "During handling of the above exception, another exception occurred:"
raise EncodingException(ex, metric) from None

pb2_scope_metrics.metrics.append(pb2_metric)
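
An editorial aside on the `from None` above: it suppresses Python's implicit exception chaining, so the traceback shows only the EncodingException. A self-contained illustration, independent of this PR:

try:
    try:
        1 / 0
    except ZeroDivisionError:
        # hides "During handling of the above exception, another
        # exception occurred" from the traceback
        raise RuntimeError("wrapped") from None
except RuntimeError as ex:
    assert ex.__suppress_context__      # set by `from None`
    assert ex.__context__ is not None   # original is stored, just not printed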


def _encode_metric(metric, pb2_metric):
if isinstance(metric.data, Gauge):
for data_point in metric.data.data_points:
pt = pb2.NumberDataPoint(
attributes=_encode_attributes(data_point.attributes),
time_unix_nano=data_point.time_unix_nano,
)
if isinstance(data_point.value, int):
pt.as_int = data_point.value
else:
pt.as_double = data_point.value
pb2_metric.gauge.data_points.append(pt)

elif isinstance(metric.data, HistogramType):
for data_point in metric.data.data_points:
pt = pb2.HistogramDataPoint(
attributes=_encode_attributes(data_point.attributes),
time_unix_nano=data_point.time_unix_nano,
start_time_unix_nano=data_point.start_time_unix_nano,
count=data_point.count,
sum=data_point.sum,
bucket_counts=data_point.bucket_counts,
explicit_bounds=data_point.explicit_bounds,
max=data_point.max,
min=data_point.min,
)
pb2_metric.histogram.aggregation_temporality = (
metric.data.aggregation_temporality
)
pb2_metric.histogram.data_points.append(pt)

elif isinstance(metric.data, Sum):
for data_point in metric.data.data_points:
pt = pb2.NumberDataPoint(
attributes=_encode_attributes(data_point.attributes),
start_time_unix_nano=data_point.start_time_unix_nano,
time_unix_nano=data_point.time_unix_nano,
)
if isinstance(data_point.value, int):
pt.as_int = data_point.value
else:
pt.as_double = data_point.value
# note that because sum is a message type, the
# fields must be set individually rather than
# instantiating a pb2.Sum and setting it once
pb2_metric.sum.aggregation_temporality = (
metric.data.aggregation_temporality
)
pb2_metric.sum.is_monotonic = metric.data.is_monotonic
pb2_metric.sum.data_points.append(pt)

elif isinstance(metric.data, ExponentialHistogramType):
for data_point in metric.data.data_points:

if data_point.positive.bucket_counts:
positive = pb2.ExponentialHistogramDataPoint.Buckets(
offset=data_point.positive.offset,
bucket_counts=data_point.positive.bucket_counts,
)
else:
positive = None

if data_point.negative.bucket_counts:
negative = pb2.ExponentialHistogramDataPoint.Buckets(
offset=data_point.negative.offset,
bucket_counts=data_point.negative.bucket_counts,
)
else:
negative = None

pt = pb2.ExponentialHistogramDataPoint(
attributes=_encode_attributes(data_point.attributes),
time_unix_nano=data_point.time_unix_nano,
start_time_unix_nano=data_point.start_time_unix_nano,
count=data_point.count,
sum=data_point.sum,
scale=data_point.scale,
zero_count=data_point.zero_count,
positive=positive,
negative=negative,
flags=data_point.flags,
max=data_point.max,
min=data_point.min,
)
pb2_metric.exponential_histogram.aggregation_temporality = (
metric.data.aggregation_temporality
)
pb2_metric.exponential_histogram.data_points.append(pt)

else:
_logger.warning(
"unsupported data type %s",
metric.data.__class__.__name__,
)
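
The comment in the Sum branch notes that, because `sum` is a message-typed field, its subfields must be set individually. That is a protobuf-python constraint: assigning to a composite field raises AttributeError. A small demonstration, assuming the generated OTLP classes from the opentelemetry-proto package:

from opentelemetry.proto.metrics.v1 import metrics_pb2 as pb2

metric = pb2.Metric(name="example")
try:
    metric.sum = pb2.Sum()  # AttributeError: assignment not allowed to composite field
except AttributeError as ex:
    print(ex)

# Setting subfields works; protobuf creates the nested message on first access.
metric.sum.is_monotonic = True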
@@ -15,6 +15,9 @@
# pylint: disable=protected-access
import unittest

from opentelemetry.exporter.otlp.proto.common._internal.metrics_encoder import (
EncodingException,
)
from opentelemetry.exporter.otlp.proto.common.metrics_encoder import (
encode_metrics,
)
@@ -814,3 +817,35 @@ def test_encode_exponential_histogram(self):
# pylint: disable=protected-access
actual = encode_metrics(metrics_data)
self.assertEqual(expected, actual)

def test_encoding_exception_reraise(self):
# this number is too big to fit in a signed 64-bit proto field and causes a ValueError
big_number = 2**63
metrics_data = MetricsData(
resource_metrics=[
ResourceMetrics(
resource=Resource(
attributes={},
schema_url="resource_schema_url",
),
scope_metrics=[
ScopeMetrics(
scope=SDKInstrumentationScope(
name="first_name",
version="first_version",
schema_url="insrumentation_scope_schema_url",
),
metrics=[_generate_sum("sum_double", big_number)],
schema_url="instrumentation_scope_schema_url",
)
],
schema_url="resource_schema_url",
)
]
)
with self.assertRaises(EncodingException) as context:
encode_metrics(metrics_data)

# assert that the EncodingException wraps the metric and original exception
assert isinstance(context.exception.metric, Metric)
assert isinstance(context.exception.original_exception, ValueError)
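
The failure mode this test relies on can be seen in isolation: protobuf validates integer ranges at assignment time. A standalone sketch, again assuming the generated OTLP classes:

from opentelemetry.proto.metrics.v1 import metrics_pb2 as pb2

pt = pb2.NumberDataPoint()
pt.as_int = 2**63 - 1  # largest value a signed 64-bit field can hold
try:
    pt.as_int = 2**63  # one past the maximum
except ValueError as ex:
    print(f"encoding failed: {ex}")  # e.g. "Value out of range: ..."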
@@ -541,8 +541,8 @@ def _receive_metrics(
self._exporter.export(
metrics_data, timeout_millis=timeout_millis
)
except Exception:
    _logger.exception("Exception while exporting metrics")
detach(token)

def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
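
The logging change above drops the redundant `%s` interpolation: `Logger.exception` already appends the exception type, message, and traceback to the log record. A quick illustration, independent of this PR:

import logging

logging.basicConfig()

try:
    raise ValueError("boom")
except Exception:
    # Logs at ERROR level and appends the traceback ending in
    # "ValueError: boom", so str(e) need not be interpolated.
    logging.getLogger(__name__).exception("Exception while exporting metrics")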