Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Data points in exporter shouldnt use bound instruments #1237

Merged
merged 6 commits into from
Oct 31, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions exporter/opentelemetry-exporter-otlp/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

- Add Env variables in OTLP exporter
([#1101](https://github.com/open-telemetry/opentelemetry-python/pull/1101))
- Do not use bound instruments in OTLP exporter
([#1237](https://github.com/open-telemetry/opentelemetry-python/pull/1237))

## Version 0.14b0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,48 +58,54 @@
MetricsExporter,
MetricsExportResult,
)
from opentelemetry.sdk.metrics.export.aggregate import (
HistogramAggregator,
LastValueAggregator,
MinMaxSumCountAggregator,
SumAggregator,
ValueObserverAggregator,
)

logger = logging.getLogger(__name__)
DataPointT = TypeVar("DataPointT", IntDataPoint, DoubleDataPoint)


def _get_data_points(
sdk_metric: MetricRecord, data_point_class: Type[DataPointT]
sdk_metric_record: MetricRecord, data_point_class: Type[DataPointT]
) -> List[DataPointT]:

data_points = []

for (
label,
bound_counter,
) in sdk_metric.instrument.bound_instruments.items():

string_key_values = []

for label_key, label_value in label:
string_key_values.append(
StringKeyValue(key=label_key, value=label_value)
)

for view_data in bound_counter.view_datas:

if view_data.labels == label:

data_points.append(
data_point_class(
labels=string_key_values,
value=view_data.aggregator.current,
start_time_unix_nano=(
view_data.aggregator.last_checkpoint_timestamp
),
time_unix_nano=(
view_data.aggregator.last_update_timestamp
),
)
)
break

return data_points
if isinstance(sdk_metric_record.aggregator, SumAggregator):
value = sdk_metric_record.aggregator.checkpoint

elif isinstance(sdk_metric_record.aggregator, MinMaxSumCountAggregator):
# FIXME: How are values to be interpreted from this aggregator?
ocelotl marked this conversation as resolved.
Show resolved Hide resolved
raise Exception("MinMaxSumCount aggregator data not supported")

elif isinstance(sdk_metric_record.aggregator, HistogramAggregator):
# FIXME: How are values to be interpreted from this aggregator?
raise Exception("Histogram aggregator data not supported")

elif isinstance(sdk_metric_record.aggregator, LastValueAggregator):
value = sdk_metric_record.aggregator.checkpoint

elif isinstance(sdk_metric_record.aggregator, ValueObserverAggregator):
value = sdk_metric_record.aggregator.checkpoint.last

return [
data_point_class(
labels=[
StringKeyValue(key=str(label_key), value=str(label_value))
for label_key, label_value in sdk_metric_record.labels
],
value=value,
start_time_unix_nano=(
sdk_metric_record.aggregator.initial_checkpoint_timestamp
ocelotl marked this conversation as resolved.
Show resolved Hide resolved
),
time_unix_nano=(
sdk_metric_record.aggregator.last_update_timestamp
),
)
]


class OTLPMetricsExporter(
Expand Down Expand Up @@ -179,13 +185,13 @@ def _translate_data(
# SumObserver Sum(aggregation_temporality=cumulative;is_monotonic=true)
# UpDownSumObserver Sum(aggregation_temporality=cumulative;is_monotonic=false)
# ValueObserver Gauge()
for sdk_metric in data:
for sdk_metric_record in data:

if sdk_metric.resource not in (
if sdk_metric_record.resource not in (
sdk_resource_instrumentation_library_metrics.keys()
):
sdk_resource_instrumentation_library_metrics[
sdk_metric.resource
sdk_metric_record.resource
] = InstrumentationLibraryMetrics()

type_class = {
Expand All @@ -204,70 +210,82 @@ def _translate_data(
},
}

value_type = sdk_metric.instrument.value_type
value_type = sdk_metric_record.instrument.value_type

sum_class = type_class[value_type]["sum"]["class"]
gauge_class = type_class[value_type]["gauge"]["class"]
data_point_class = type_class[value_type]["data_point_class"]

if isinstance(sdk_metric.instrument, Counter):
if isinstance(sdk_metric_record.instrument, Counter):
otlp_metric_data = sum_class(
data_points=_get_data_points(sdk_metric, data_point_class),
data_points=_get_data_points(
sdk_metric_record, data_point_class
),
aggregation_temporality=(
AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA
),
is_monotonic=True,
)
argument = type_class[value_type]["sum"]["argument"]

elif isinstance(sdk_metric.instrument, UpDownCounter):
elif isinstance(sdk_metric_record.instrument, UpDownCounter):
otlp_metric_data = sum_class(
data_points=_get_data_points(sdk_metric, data_point_class),
data_points=_get_data_points(
sdk_metric_record, data_point_class
),
aggregation_temporality=(
AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA
),
is_monotonic=False,
)
argument = type_class[value_type]["sum"]["argument"]

elif isinstance(sdk_metric.instrument, (ValueRecorder)):
elif isinstance(sdk_metric_record.instrument, (ValueRecorder)):
logger.warning("Skipping exporting of ValueRecorder metric")
continue

elif isinstance(sdk_metric.instrument, SumObserver):
elif isinstance(sdk_metric_record.instrument, SumObserver):
otlp_metric_data = sum_class(
data_points=_get_data_points(sdk_metric, data_point_class),
data_points=_get_data_points(
sdk_metric_record, data_point_class
),
aggregation_temporality=(
AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE
),
is_monotonic=True,
)
argument = type_class[value_type]["sum"]["argument"]

elif isinstance(sdk_metric.instrument, UpDownSumObserver):
elif isinstance(sdk_metric_record.instrument, UpDownSumObserver):
otlp_metric_data = sum_class(
data_points=_get_data_points(sdk_metric, data_point_class),
data_points=_get_data_points(
sdk_metric_record, data_point_class
),
aggregation_temporality=(
AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE
),
is_monotonic=False,
)
argument = type_class[value_type]["sum"]["argument"]

elif isinstance(sdk_metric.instrument, (ValueObserver)):
elif isinstance(sdk_metric_record.instrument, (ValueObserver)):
otlp_metric_data = gauge_class(
data_points=_get_data_points(sdk_metric, data_point_class)
data_points=_get_data_points(
sdk_metric_record, data_point_class
)
)
argument = type_class[value_type]["gauge"]["argument"]

sdk_resource_instrumentation_library_metrics[
sdk_metric.resource
sdk_metric_record.resource
].metrics.append(
OTLPMetric(
**{
"name": sdk_metric.instrument.name,
"description": sdk_metric.instrument.description,
"unit": sdk_metric.instrument.unit,
"name": sdk_metric_record.instrument.name,
"description": (
sdk_metric_record.instrument.description
),
"unit": sdk_metric_record.instrument.unit,
argument: otlp_metric_data,
}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,14 @@ def setUp(self):

self.counter_metric_record = MetricRecord(
Counter(
"a",
"b",
"c",
"d",
"e",
int,
MeterProvider(resource=resource,).get_meter(__name__),
("d",),
("f",),
),
OrderedDict([("e", "f")]),
[("g", "h")],
SumAggregator(),
resource,
)
Expand Down Expand Up @@ -97,7 +97,9 @@ def test_translate_metrics(self, mock_time_ns):

mock_time_ns.configure_mock(**{"return_value": 1})

self.counter_metric_record.instrument.add(1, OrderedDict([("a", "b")]))
self.counter_metric_record.aggregator.checkpoint = 1
self.counter_metric_record.aggregator.initial_checkpoint_timestamp = 1
self.counter_metric_record.aggregator.last_update_timestamp = 1

expected = ExportMetricsServiceRequest(
resource_metrics=[
Expand All @@ -114,19 +116,20 @@ def test_translate_metrics(self, mock_time_ns):
InstrumentationLibraryMetrics(
metrics=[
OTLPMetric(
name="a",
description="b",
unit="c",
name="c",
description="d",
unit="e",
int_sum=IntSum(
data_points=[
IntDataPoint(
labels=[
StringKeyValue(
key="a", value="b"
key="g", value="h"
)
],
value=1,
time_unix_nano=1,
start_time_unix_nano=1,
)
],
aggregation_temporality=(
Expand Down
2 changes: 2 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
([#1282](https://github.com/open-telemetry/opentelemetry-python/pull/1282))
- Span.is_recording() returns false after span has ended
([#1289](https://github.com/open-telemetry/opentelemetry-python/pull/1289))
- Set initial checkpoint timestamp in aggregators
([#1237](https://github.com/open-telemetry/opentelemetry-python/pull/1237))

## Version 0.14b0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ class Aggregator(abc.ABC):
def __init__(self, config=None):
self._lock = threading.Lock()
self.last_update_timestamp = 0
self.last_checkpoint_timestamp = 0
self.initial_checkpoint_timestamp = 0
self.checkpointed = True
if config is not None:
self.config = config
else:
Expand All @@ -42,21 +43,25 @@ def __init__(self, config=None):
@abc.abstractmethod
def update(self, value):
"""Updates the current with the new value."""
if self.checkpointed:
ocelotl marked this conversation as resolved.
Show resolved Hide resolved
self.initial_checkpoint_timestamp = time_ns()
self.checkpointed = False
self.last_update_timestamp = time_ns()

@abc.abstractmethod
def take_checkpoint(self):
"""Stores a snapshot of the current value."""
self.last_checkpoint_timestamp = time_ns()
self.checkpointed = True

@abc.abstractmethod
def merge(self, other):
"""Combines two aggregator values."""
self.last_update_timestamp = max(
self.last_update_timestamp, other.last_update_timestamp
)
self.last_checkpoint_timestamp = max(
self.last_checkpoint_timestamp, other.last_checkpoint_timestamp
self.initial_checkpoint_timestamp = max(
self.initial_checkpoint_timestamp,
other.initial_checkpoint_timestamp,
)

def _verify_type(self, other):
Expand Down
62 changes: 62 additions & 0 deletions tests/util/src/opentelemetry/test/controller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from time import sleep

from opentelemetry.context import attach, detach, set_value
from opentelemetry.metrics import Meter
from opentelemetry.sdk.metrics.export import MetricsExporter


class DebugController:
"""A debug controller, used to replace Push controller when debugging

Push controller uses a thread which makes it hard to use the IPython
debugger. This controller does not use a thread, but relies on the user
manually calling its ``run`` method to start the controller.

Args:
meter: The meter used to collect metrics.
exporter: The exporter used to export metrics.
interval: The collect/export interval in seconds.
"""

daemon = True

def __init__(
self, meter: Meter, exporter: MetricsExporter, interval: float
):
super().__init__()
self.meter = meter
self.exporter = exporter
self.interval = interval

def run(self):
while True:
self.tick()
sleep(self.interval)

def shutdown(self):
# Run one more collection pass to flush metrics batched in the meter
self.tick()

def tick(self):
# Collect all of the meter's metrics to be exported
self.meter.collect()
# Export the collected metrics
token = attach(set_value("suppress_instrumentation", True))
self.exporter.export(self.meter.processor.checkpoint_set())
detach(token)
# Perform post-exporting logic
self.meter.processor.finished_collection()