-
Notifications
You must be signed in to change notification settings - Fork 2.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adding OpenTelemetry exporter code #14784
Changes from 15 commits
55343e0
3590ac8
b55b73b
5abed09
e9eb839
495c119
78a6953
22026dc
fc05356
a5e2b89
e08fa26
bf1093b
51489a9
549f203
6fdfb37
1ce6bbb
de56d02
1adc16c
871b55e
75e1912
11d17cb
bd971d8
c6ab18b
611bb43
b8a57cc
faaf083
7c8bbe1
8e2500a
3bd0773
f4e9524
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,196 @@ | ||||||
# Copyright (c) Microsoft Corporation. All rights reserved. | ||||||
# Licensed under the MIT License. | ||||||
import json | ||||||
import logging | ||||||
import typing | ||||||
from enum import Enum | ||||||
from urllib.parse import urlparse | ||||||
|
||||||
from opentelemetry.sdk.metrics.export import MetricsExportResult | ||||||
from opentelemetry.sdk.trace.export import SpanExportResult | ||||||
from opentelemetry.sdk.util import ns_to_iso_str | ||||||
from opentelemetry.trace import Span, SpanKind | ||||||
from opentelemetry.trace.status import StatusCanonicalCode | ||||||
|
||||||
from azure.core.exceptions import HttpResponseError | ||||||
from azure.core.pipeline.policies import ProxyPolicy, RetryPolicy | ||||||
from opentelemetry.exporter.azuremonitor import utils | ||||||
from opentelemetry.exporter.azuremonitor._generated import AzureMonitorClient | ||||||
from opentelemetry.exporter.azuremonitor._generated.models import TelemetryItem, TrackResponse | ||||||
from opentelemetry.exporter.azuremonitor.options import ExporterOptions | ||||||
from opentelemetry.exporter.azuremonitor.storage import LocalFileStorage | ||||||
|
||||||
logger = logging.getLogger(__name__) | ||||||
|
||||||
|
||||||
class ExportResult(Enum): | ||||||
SUCCESS = 0 | ||||||
FAILED_RETRYABLE = 1 | ||||||
FAILED_NOT_RETRYABLE = 2 | ||||||
|
||||||
|
||||||
# pylint: disable=broad-except | ||||||
class BaseExporter: | ||||||
"""Azure Monitor base exporter for OpenTelemetry. | ||||||
|
||||||
Args: | ||||||
options: :doc:`export.options` to allow configuration for the exporter | ||||||
""" | ||||||
|
||||||
def __init__(self, **options): | ||||||
self._telemetry_processors = [] | ||||||
self.options = ExporterOptions(**options) | ||||||
retry_policy = RetryPolicy(timeout=self.options.timeout) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seconds are used as well in RetryPolicy There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Haha timeout is 7 days? Interesting. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So should we use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. users should also have the option to provide their own retry policy.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure if we want to provide the users with the option to provide their own There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would incline towards having kwargs where users have the option to send in their own policies only because of the language precendent set in other python libraries in this repo. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm fine keeping it as is and later bringing this up in the arch board too. |
||||||
proxy_policy = ProxyPolicy(proxies=self.options.proxies) | ||||||
rakshith91 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
self.client = AzureMonitorClient( | ||||||
self.options.endpoint, proxy_policy=proxy_policy, retry_policy=retry_policy) | ||||||
self.storage = LocalFileStorage( | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Curious as to whether there is logic in |
||||||
path=self.options.storage_path, | ||||||
max_size=self.options.storage_max_size, | ||||||
maintenance_period=self.options.storage_maintenance_period, | ||||||
retention_period=self.options.storage_retention_period, | ||||||
) | ||||||
|
||||||
def add_telemetry_processor( | ||||||
self, processor: typing.Callable[..., any] | ||||||
) -> None: | ||||||
"""Adds telemetry processor to the collection. | ||||||
|
||||||
Telemetry processors will be called one by one before telemetry | ||||||
item is pushed for sending and in the order they were added. | ||||||
|
||||||
Args: | ||||||
processor: Processor to add | ||||||
""" | ||||||
self._telemetry_processors.append(processor) | ||||||
|
||||||
def clear_telemetry_processors(self) -> None: | ||||||
"""Removes all telemetry processors""" | ||||||
self._telemetry_processors = [] | ||||||
|
||||||
def _apply_telemetry_processors( | ||||||
self, envelopes: typing.List[TelemetryItem] | ||||||
) -> typing.List[TelemetryItem]: | ||||||
"""Applies all telemetry processors in the order they were added. | ||||||
|
||||||
This function will return the list of envelopes to be exported after | ||||||
each processor has been run sequentially. Individual processors can | ||||||
throw exceptions and fail, but the applying of all telemetry processors | ||||||
will proceed (not fast fail). Processors also return True if envelope | ||||||
should be included for exporting, False otherwise. | ||||||
|
||||||
Args: | ||||||
envelopes: The envelopes to apply each processor to. | ||||||
""" | ||||||
filtered_envelopes = [] | ||||||
for envelope in envelopes: | ||||||
accepted = True | ||||||
for processor in self._telemetry_processors: | ||||||
try: | ||||||
if processor(envelope) is False: | ||||||
accepted = False | ||||||
break | ||||||
except Exception as ex: | ||||||
logger.warning("Telemetry processor failed with: %s.", ex) | ||||||
if accepted: | ||||||
filtered_envelopes.append(envelope) | ||||||
return filtered_envelopes | ||||||
|
||||||
def _transmit_from_storage(self) -> None: | ||||||
for blob in self.storage.gets(): | ||||||
# give a few more seconds for blob lease operation | ||||||
# to reduce the chance of race (for perf consideration) | ||||||
if blob.lease(self.options.timeout + 5): | ||||||
envelopes = blob.get() | ||||||
result = self._transmit(envelopes) | ||||||
if result == ExportResult.FAILED_RETRYABLE: | ||||||
blob.lease(1) | ||||||
else: | ||||||
blob.delete() | ||||||
|
||||||
# pylint: disable=too-many-branches | ||||||
# pylint: disable=too-many-nested-blocks | ||||||
# pylint: disable=too-many-return-statements | ||||||
def _transmit(self, envelopes: typing.List[TelemetryItem]) -> ExportResult: | ||||||
""" | ||||||
Transmit the data envelopes to the ingestion service. | ||||||
|
||||||
Returns an ExportResult, this function should never | ||||||
throw an exception. | ||||||
""" | ||||||
if len(envelopes) > 0: | ||||||
try: | ||||||
track_response = self.client.track(envelopes) | ||||||
if not track_response.errors: | ||||||
logger.info("Transmission succeeded: Item received: %s. Items accepted: %s", | ||||||
track_response.items_received, track_response.items_accepted) | ||||||
return ExportResult.SUCCESS | ||||||
else: | ||||||
resend_envelopes = [] | ||||||
lzchen marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
for error in track_response.errors: | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should be a try/catch here as well, in case local storage messes up. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is already included in a try/catch block, are you thinking about a specific exception to consider? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, |
||||||
if self._is_retryable_code(error.statusCode): | ||||||
resend_envelopes.append( | ||||||
envelopes[error.index] | ||||||
) | ||||||
else: | ||||||
logger.error( | ||||||
hectorhdzg marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
"Data drop %s: %s %s.", | ||||||
error.statusCode, | ||||||
error.message, | ||||||
envelopes[error.index], | ||||||
) | ||||||
if resend_envelopes: | ||||||
self.storage.put(resend_envelopes) | ||||||
|
||||||
except HttpResponseError as response_error: | ||||||
if self._is_retryable_code(response_error.status_code): | ||||||
return ExportResult.FAILED_RETRYABLE | ||||||
else: | ||||||
return ExportResult.FAILED_NOT_RETRYABLE | ||||||
except Exception as ex: | ||||||
lzchen marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
logger.warning( | ||||||
"Retrying due to transient client side error %s.", ex | ||||||
) | ||||||
# client side error (retryable) | ||||||
return ExportResult.FAILED_RETRYABLE | ||||||
return ExportResult.FAILED_NOT_RETRYABLE | ||||||
# No spans to export | ||||||
return ExportResult.SUCCESS | ||||||
|
||||||
def _is_retryable_code(self, response_code: int) -> bool: | ||||||
""" | ||||||
Determine if response is retryable | ||||||
""" | ||||||
if response_code in ( | ||||||
206, # Retriable | ||||||
408, # Timeout | ||||||
hectorhdzg marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
429, # Throttle, too Many Requests | ||||||
439, # Quota, too Many Requests over extended time | ||||||
500, # Internal Server Error | ||||||
503, # Service Unavailable | ||||||
): | ||||||
return True | ||||||
else: | ||||||
return False | ||||||
|
||||||
|
||||||
def get_trace_export_result(result: ExportResult) -> SpanExportResult: | ||||||
if result == ExportResult.SUCCESS: | ||||||
return SpanExportResult.SUCCESS | ||||||
if result in ( | ||||||
ExportResult.FAILED_RETRYABLE, | ||||||
ExportResult.FAILED_NOT_RETRYABLE, | ||||||
): | ||||||
return SpanExportResult.FAILURE | ||||||
return None | ||||||
|
||||||
|
||||||
def get_metrics_export_result(result: ExportResult) -> MetricsExportResult: | ||||||
if result == ExportResult.SUCCESS: | ||||||
return MetricsExportResult.SUCCESS | ||||||
if result in ( | ||||||
ExportResult.FAILED_RETRYABLE, | ||||||
ExportResult.FAILED_NOT_RETRYABLE, | ||||||
): | ||||||
return MetricsExportResult.FAILURE | ||||||
return None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are we changing the version?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe this also doesn't follow semver? (open-telemetry/opentelemetry-python#1222 (comment))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We would release this package eventually as the first preview of the major version.
I've just updated it with the language precedent set. the python way to say 1.0.0-beta (a semantic guideline of semver) would be 1.0.0b1 (syntactic to python)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also, we currently adhere to semvers for all the GAed libraries- and for pre releases, we are compatible with pep 440 , https://www.python.org/dev/peps/pep-0440/#id27