-
Notifications
You must be signed in to change notification settings - Fork 660
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Implement shutdown procedure for OTLP grpc exporters (#3138)
* Implement shutdown procedure for OTLP grpc exporters - Add `_shutdown` variable for checking if the exporter has been shut down. - Prevent export if the `_shutdown` flag has been set. Log a warning message if the exporter has been shut down. - Use thread lock to synchronize the last export call before shutdown timeout. The `shutdown` method will wait up to `timeout_millis` if there is an ongoing export. If there is no ongoing export, set the `_shutdown` flag to prevent further exports and return. - Add unit tests for the `OTLPExporterMixIn` and the sub classes for traces and metrics. * lint files * add changelog entry for fix * lint test files --------- Co-authored-by: Srikanth Chekuri <srikanth.chekuri92@gmail.com> Co-authored-by: Leighton Chen <lechen@microsoft.com>
- Loading branch information
1 parent
af582e9
commit c84ba94
Showing
7 changed files
with
277 additions
and
72 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,16 +14,16 @@ | |
|
||
"""OTLP Exporter""" | ||
|
||
from logging import getLogger | ||
import threading | ||
from abc import ABC, abstractmethod | ||
from collections.abc import Sequence | ||
from logging import getLogger | ||
from os import environ | ||
from time import sleep | ||
from typing import Any, Callable, Dict, Generic, List, Optional, Tuple, Union | ||
from typing import Sequence as TypingSequence | ||
from typing import TypeVar | ||
from urllib.parse import urlparse | ||
from opentelemetry.sdk.trace import ReadableSpan | ||
|
||
import backoff | ||
from google.rpc.error_details_pb2 import RetryInfo | ||
|
@@ -37,6 +37,9 @@ | |
ssl_channel_credentials, | ||
) | ||
|
||
from opentelemetry.exporter.otlp.proto.grpc import ( | ||
_OTLP_GRPC_HEADERS, | ||
) | ||
from opentelemetry.proto.common.v1.common_pb2 import ( | ||
AnyValue, | ||
ArrayValue, | ||
|
@@ -51,12 +54,10 @@ | |
OTEL_EXPORTER_OTLP_INSECURE, | ||
OTEL_EXPORTER_OTLP_TIMEOUT, | ||
) | ||
from opentelemetry.sdk.resources import Resource as SDKResource | ||
from opentelemetry.sdk.metrics.export import MetricsData | ||
from opentelemetry.sdk.resources import Resource as SDKResource | ||
from opentelemetry.sdk.trace import ReadableSpan | ||
from opentelemetry.util.re import parse_env_headers | ||
from opentelemetry.exporter.otlp.proto.grpc import ( | ||
_OTLP_GRPC_HEADERS, | ||
) | ||
|
||
logger = getLogger(__name__) | ||
SDKDataT = TypeVar("SDKDataT") | ||
|
@@ -92,7 +93,6 @@ def environ_to_compression(environ_key: str) -> Optional[Compression]: | |
|
||
|
||
def _translate_value(value: Any) -> KeyValue: | ||
|
||
if isinstance(value, bool): | ||
any_value = AnyValue(bool_value=value) | ||
|
||
|
@@ -135,7 +135,6 @@ def get_resource_data( | |
resource_class: Callable[..., TypingResourceT], | ||
name: str, | ||
) -> List[TypingResourceT]: | ||
|
||
resource_data = [] | ||
|
||
for ( | ||
|
@@ -282,6 +281,9 @@ def __init__( | |
secure_channel(endpoint, credentials, compression=compression) | ||
) | ||
|
||
self._export_lock = threading.Lock() | ||
self._shutdown = False | ||
|
||
@abstractmethod | ||
def _translate_data( | ||
self, data: TypingSequence[SDKDataT] | ||
|
@@ -302,6 +304,11 @@ def _translate_attributes(self, attributes) -> TypingSequence[KeyValue]: | |
def _export( | ||
self, data: Union[TypingSequence[ReadableSpan], MetricsData] | ||
) -> ExportResultT: | ||
# After the call to shutdown, subsequent calls to Export are | ||
# not allowed and should return a Failure result. | ||
if self._shutdown: | ||
logger.warning("Exporter already shutdown, ignoring batch") | ||
return self._result.FAILURE | ||
|
||
# FIXME remove this check if the export type for traces | ||
# gets updated to a class that represents the proto | ||
|
@@ -317,69 +324,75 @@ def _export( | |
# exponentially. Once delay is greater than max_value, the yielded | ||
# value will remain constant. | ||
for delay in _expo(max_value=max_value): | ||
|
||
if delay == max_value: | ||
if delay == max_value or self._shutdown: | ||
return self._result.FAILURE | ||
|
||
try: | ||
self._client.Export( | ||
request=self._translate_data(data), | ||
metadata=self._headers, | ||
timeout=self._timeout, | ||
) | ||
with self._export_lock: | ||
try: | ||
self._client.Export( | ||
request=self._translate_data(data), | ||
metadata=self._headers, | ||
timeout=self._timeout, | ||
) | ||
|
||
return self._result.SUCCESS | ||
return self._result.SUCCESS | ||
|
||
except RpcError as error: | ||
except RpcError as error: | ||
|
||
if error.code() in [ | ||
StatusCode.CANCELLED, | ||
StatusCode.DEADLINE_EXCEEDED, | ||
StatusCode.RESOURCE_EXHAUSTED, | ||
StatusCode.ABORTED, | ||
StatusCode.OUT_OF_RANGE, | ||
StatusCode.UNAVAILABLE, | ||
StatusCode.DATA_LOSS, | ||
]: | ||
if error.code() in [ | ||
StatusCode.CANCELLED, | ||
StatusCode.DEADLINE_EXCEEDED, | ||
StatusCode.RESOURCE_EXHAUSTED, | ||
StatusCode.ABORTED, | ||
StatusCode.OUT_OF_RANGE, | ||
StatusCode.UNAVAILABLE, | ||
StatusCode.DATA_LOSS, | ||
]: | ||
|
||
retry_info_bin = dict(error.trailing_metadata()).get( | ||
"google.rpc.retryinfo-bin" | ||
) | ||
if retry_info_bin is not None: | ||
retry_info = RetryInfo() | ||
retry_info.ParseFromString(retry_info_bin) | ||
delay = ( | ||
retry_info.retry_delay.seconds | ||
+ retry_info.retry_delay.nanos / 1.0e9 | ||
retry_info_bin = dict(error.trailing_metadata()).get( | ||
"google.rpc.retryinfo-bin" | ||
) | ||
if retry_info_bin is not None: | ||
retry_info = RetryInfo() | ||
retry_info.ParseFromString(retry_info_bin) | ||
delay = ( | ||
retry_info.retry_delay.seconds | ||
+ retry_info.retry_delay.nanos / 1.0e9 | ||
) | ||
|
||
logger.warning( | ||
( | ||
"Transient error %s encountered while exporting " | ||
"%s, retrying in %ss." | ||
), | ||
error.code(), | ||
self._exporting, | ||
delay, | ||
) | ||
sleep(delay) | ||
continue | ||
else: | ||
logger.error( | ||
"Failed to export %s, error code: %s", | ||
self._exporting, | ||
error.code(), | ||
) | ||
|
||
logger.warning( | ||
( | ||
"Transient error %s encountered while exporting " | ||
"%s, retrying in %ss." | ||
), | ||
error.code(), | ||
self._exporting, | ||
delay, | ||
) | ||
sleep(delay) | ||
continue | ||
else: | ||
logger.error( | ||
"Failed to export %s, error code: %s", | ||
self._exporting, | ||
error.code(), | ||
) | ||
|
||
if error.code() == StatusCode.OK: | ||
return self._result.SUCCESS | ||
if error.code() == StatusCode.OK: | ||
return self._result.SUCCESS | ||
|
||
return self._result.FAILURE | ||
return self._result.FAILURE | ||
|
||
return self._result.FAILURE | ||
|
||
def shutdown(self) -> None: | ||
pass | ||
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
    """Mark the exporter as shut down, waiting for an in-flight export.

    Waits up to ``timeout_millis`` for the export lock so that an export
    call currently holding it can finish, then sets ``_shutdown`` so that
    later export calls are rejected. A repeated call only logs a warning.

    Args:
        timeout_millis: Maximum time to wait for an ongoing export, in
            milliseconds. Negative values are treated as zero.
        **kwargs: Accepted for interface compatibility; unused here.
    """
    if self._shutdown:
        logger.warning("Exporter already shutdown, ignoring call")
        return
    # Lock.acquire() takes its timeout in *seconds*, so convert from
    # milliseconds. Clamp at zero because negative timeouts (other than
    # the special value -1, "wait forever") raise ValueError.
    timeout_seconds = max(timeout_millis, 0) / 1e3
    # wait for the last export if any
    acquired = self._export_lock.acquire(timeout=timeout_seconds)
    try:
        # Set the flag even if the wait timed out: the retry loop of an
        # ongoing export checks it and stops retrying.
        self._shutdown = True
    finally:
        # Only release a lock this call actually obtained; otherwise we
        # would release the lock held by the in-flight export (or raise
        # RuntimeError if it has been released in the meantime).
        if acquired:
            self._export_lock.release()
This comment has been minimized.
Sorry, something went wrong.
This comment has been minimized.
Sorry, something went wrong.
This comment has been minimized.
Sorry, something went wrong.
dimaqq
|
||
|
||
@property | ||
@abstractmethod | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
🐛 lock is released even if not acquired.
https://docs.python.org/3/library/threading.html#threading.Lock.acquire