Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement shutdown procedure for OTLP grpc exporters #3138

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 18 additions & 9 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,17 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased

- PeriodicExportingMetricReader will continue if collection times out
([#3100](https://github.com/open-telemetry/opentelemetry-python/pull/3100))
- Fix formatting of ConsoleMetricExporter.
([#3197](https://github.com/open-telemetry/opentelemetry-python/pull/3197))

- Implement shutdown procedure forOTLP grpc exporters
([#3138](https://github.com/open-telemetry/opentelemetry-python/pull/3138))
- Add exponential histogram
([#2964](https://github.com/open-telemetry/opentelemetry-python/pull/2964))

## Version 1.16.0/0.37b0 (2023-02-15)
## Version 1.16.0/0.37b0 (2023-02-17)

- Change ``__all__`` to be statically defined.
([#3143](https://github.com/open-telemetry/opentelemetry-python/pull/3143))
Expand Down Expand Up @@ -398,7 +400,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `opentelemetry-distro` & `opentelemetry-sdk` Moved Auto Instrumentation Configurator code to SDK
to let distros use its default implementation
([#1937](https://github.com/open-telemetry/opentelemetry-python/pull/1937))
- Add Trace ID validation to meet [TraceID spec](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/overview.md#spancontext) ([#1992](https://github.com/open-telemetry/opentelemetry-python/pull/1992))
- Add Trace ID validation to
meet [TraceID spec](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/overview.md#spancontext) ([#1992](https://github.com/open-telemetry/opentelemetry-python/pull/1992))
- Fixed Python 3.10 incompatibility in `opentelemetry-opentracing-shim` tests
([#2018](https://github.com/open-telemetry/opentelemetry-python/pull/2018))
- `opentelemetry-sdk` added support for `OTEL_SPAN_ATTRIBUTE_VALUE_LENGTH_LIMIT`
Expand Down Expand Up @@ -729,7 +732,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#1471](https://github.com/open-telemetry/opentelemetry-python/pull/1471))
- Add support for Python 3.9
([#1441](https://github.com/open-telemetry/opentelemetry-python/pull/1441))
- Added the ability to disable instrumenting libraries specified by OTEL_PYTHON_DISABLED_INSTRUMENTATIONS env variable, when using opentelemetry-instrument command.
- Added the ability to disable instrumenting libraries specified by OTEL_PYTHON_DISABLED_INSTRUMENTATIONS env variable,
when using opentelemetry-instrument command.
([#1461](https://github.com/open-telemetry/opentelemetry-python/pull/1461))
- Add `fields` to propagators
([#1374](https://github.com/open-telemetry/opentelemetry-python/pull/1374))
Expand Down Expand Up @@ -778,7 +782,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#1533](https://github.com/open-telemetry/opentelemetry-python/pull/1533))
- `opentelemetry-sdk` The JaegerPropagator has been moved into its own package: `opentelemetry-propagator-jaeger`
([#1525](https://github.com/open-telemetry/opentelemetry-python/pull/1525))
- `opentelemetry-exporter-jaeger`, `opentelemetry-exporter-zipkin` Update InstrumentationInfo tag keys for Jaeger and Zipkin exporters
- `opentelemetry-exporter-jaeger`, `opentelemetry-exporter-zipkin` Update InstrumentationInfo tag keys for Jaeger and
Zipkin exporters
([#1535](https://github.com/open-telemetry/opentelemetry-python/pull/1535))
- `opentelemetry-sdk` Remove rate property setter from TraceIdRatioBasedSampler
([#1536](https://github.com/open-telemetry/opentelemetry-python/pull/1536))
Expand Down Expand Up @@ -888,7 +893,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#1199](https://github.com/open-telemetry/opentelemetry-python/pull/1199))
- Add Global Error Handler
([#1080](https://github.com/open-telemetry/opentelemetry-python/pull/1080))
- Add support for `OTEL_BSP_MAX_QUEUE_SIZE`, `OTEL_BSP_SCHEDULE_DELAY_MILLIS`, `OTEL_BSP_MAX_EXPORT_BATCH_SIZE` and `OTEL_BSP_EXPORT_TIMEOUT_MILLIS` environment variables
- Add support for `OTEL_BSP_MAX_QUEUE_SIZE`, `OTEL_BSP_SCHEDULE_DELAY_MILLIS`, `OTEL_BSP_MAX_EXPORT_BATCH_SIZE`
and `OTEL_BSP_EXPORT_TIMEOUT_MILLIS` environment variables
([#1105](https://github.com/open-telemetry/opentelemetry-python/pull/1120))
- Adding Resource to MeterRecord
([#1209](https://github.com/open-telemetry/opentelemetry-python/pull/1209))
Expand All @@ -913,7 +919,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#1151](https://github.com/open-telemetry/opentelemetry-python/pull/1151))
- Fixed OTLP events to Zipkin annotations translation.
([#1161](https://github.com/open-telemetry/opentelemetry-python/pull/1161))
- Fixed bootstrap command to correctly install opentelemetry-instrumentation-falcon instead of opentelemetry-instrumentation-flask.
- Fixed bootstrap command to correctly install opentelemetry-instrumentation-falcon instead of
opentelemetry-instrumentation-flask.
([#1138](https://github.com/open-telemetry/opentelemetry-python/pull/1138))
- Update sampling result names
([#1128](https://github.com/open-telemetry/opentelemetry-python/pull/1128))
Expand All @@ -923,7 +930,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#1203](https://github.com/open-telemetry/opentelemetry-python/pull/1203))
- Protect access to Span implementation
([#1188](https://github.com/open-telemetry/opentelemetry-python/pull/1188))
- `start_as_current_span` and `use_span` can now optionally auto-record any exceptions raised inside the context manager.
- `start_as_current_span` and `use_span` can now optionally auto-record any exceptions raised inside the context
manager.
([#1162](https://github.com/open-telemetry/opentelemetry-python/pull/1162))

## Version 0.13b0 (2020-09-17)
Expand Down Expand Up @@ -1000,7 +1008,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#959](https://github.com/open-telemetry/opentelemetry-python/pull/959))
- Update default port to 55680
([#977](https://github.com/open-telemetry/opentelemetry-python/pull/977))
- Add proper length zero padding to hex strings of traceId, spanId, parentId sent on the wire, for compatibility with jaeger-collector
- Add proper length zero padding to hex strings of traceId, spanId, parentId sent on the wire, for compatibility with
jaeger-collector
([#908](https://github.com/open-telemetry/opentelemetry-python/pull/908))
- Send start_timestamp and convert labels to strings
([#937](https://github.com/open-telemetry/opentelemetry-python/pull/937))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@

"""OTLP Exporter"""

from logging import getLogger
import threading
from abc import ABC, abstractmethod
from collections.abc import Sequence
from logging import getLogger
from os import environ
from time import sleep
from typing import Any, Callable, Dict, Generic, List, Optional, Tuple, Union
from typing import Sequence as TypingSequence
from typing import TypeVar
from urllib.parse import urlparse
from opentelemetry.sdk.trace import ReadableSpan

import backoff
from google.rpc.error_details_pb2 import RetryInfo
Expand All @@ -37,6 +37,9 @@
ssl_channel_credentials,
)

from opentelemetry.exporter.otlp.proto.grpc import (
_OTLP_GRPC_HEADERS,
)
from opentelemetry.proto.common.v1.common_pb2 import (
AnyValue,
ArrayValue,
Expand All @@ -51,12 +54,10 @@
OTEL_EXPORTER_OTLP_INSECURE,
OTEL_EXPORTER_OTLP_TIMEOUT,
)
from opentelemetry.sdk.resources import Resource as SDKResource
from opentelemetry.sdk.metrics.export import MetricsData
from opentelemetry.sdk.resources import Resource as SDKResource
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.util.re import parse_env_headers
from opentelemetry.exporter.otlp.proto.grpc import (
_OTLP_GRPC_HEADERS,
)

logger = getLogger(__name__)
SDKDataT = TypeVar("SDKDataT")
Expand Down Expand Up @@ -92,7 +93,6 @@ def environ_to_compression(environ_key: str) -> Optional[Compression]:


def _translate_value(value: Any) -> KeyValue:

if isinstance(value, bool):
any_value = AnyValue(bool_value=value)

Expand Down Expand Up @@ -135,7 +135,6 @@ def get_resource_data(
resource_class: Callable[..., TypingResourceT],
name: str,
) -> List[TypingResourceT]:

resource_data = []

for (
Expand Down Expand Up @@ -282,6 +281,9 @@ def __init__(
secure_channel(endpoint, credentials, compression=compression)
)

self._export_lock = threading.Lock()
self._shutdown = False

@abstractmethod
def _translate_data(
self, data: TypingSequence[SDKDataT]
Expand All @@ -302,6 +304,11 @@ def _translate_attributes(self, attributes) -> TypingSequence[KeyValue]:
def _export(
self, data: Union[TypingSequence[ReadableSpan], MetricsData]
) -> ExportResultT:
# After the call to shutdown, subsequent calls to Export are
# not allowed and should return a Failure result.
if self._shutdown:
logger.warning("Exporter already shutdown, ignoring batch")
return self._result.FAILURE

# FIXME remove this check if the export type for traces
# gets updated to a class that represents the proto
Expand All @@ -317,69 +324,75 @@ def _export(
# exponentially. Once delay is greater than max_value, the yielded
# value will remain constant.
for delay in _expo(max_value=max_value):

if delay == max_value:
if delay == max_value or self._shutdown:
return self._result.FAILURE

try:
self._client.Export(
request=self._translate_data(data),
metadata=self._headers,
timeout=self._timeout,
)
with self._export_lock:
try:
self._client.Export(
request=self._translate_data(data),
metadata=self._headers,
timeout=self._timeout,
)

return self._result.SUCCESS
return self._result.SUCCESS

except RpcError as error:
except RpcError as error:

if error.code() in [
StatusCode.CANCELLED,
StatusCode.DEADLINE_EXCEEDED,
StatusCode.RESOURCE_EXHAUSTED,
StatusCode.ABORTED,
StatusCode.OUT_OF_RANGE,
StatusCode.UNAVAILABLE,
StatusCode.DATA_LOSS,
]:
if error.code() in [
StatusCode.CANCELLED,
StatusCode.DEADLINE_EXCEEDED,
StatusCode.RESOURCE_EXHAUSTED,
StatusCode.ABORTED,
StatusCode.OUT_OF_RANGE,
StatusCode.UNAVAILABLE,
StatusCode.DATA_LOSS,
]:

retry_info_bin = dict(error.trailing_metadata()).get(
"google.rpc.retryinfo-bin"
)
if retry_info_bin is not None:
retry_info = RetryInfo()
retry_info.ParseFromString(retry_info_bin)
delay = (
retry_info.retry_delay.seconds
+ retry_info.retry_delay.nanos / 1.0e9
retry_info_bin = dict(error.trailing_metadata()).get(
"google.rpc.retryinfo-bin"
)
if retry_info_bin is not None:
retry_info = RetryInfo()
retry_info.ParseFromString(retry_info_bin)
delay = (
retry_info.retry_delay.seconds
+ retry_info.retry_delay.nanos / 1.0e9
)

logger.warning(
(
"Transient error %s encountered while exporting "
"%s, retrying in %ss."
),
error.code(),
self._exporting,
delay,
)
sleep(delay)
continue
else:
logger.error(
"Failed to export %s, error code: %s",
self._exporting,
error.code(),
)

logger.warning(
(
"Transient error %s encountered while exporting "
"%s, retrying in %ss."
),
error.code(),
self._exporting,
delay,
)
sleep(delay)
continue
else:
logger.error(
"Failed to export %s, error code: %s",
self._exporting,
error.code(),
)

if error.code() == StatusCode.OK:
return self._result.SUCCESS
if error.code() == StatusCode.OK:
return self._result.SUCCESS

return self._result.FAILURE
return self._result.FAILURE

return self._result.FAILURE

def shutdown(self) -> None:
pass
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
if self._shutdown:
logger.warning("Exporter already shutdown, ignoring call")
return
# wait for the last export if any
self._export_lock.acquire(timeout=timeout_millis)
girishc13 marked this conversation as resolved.
Show resolved Hide resolved
self._shutdown = True
self._export_lock.release()

@property
@abstractmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ def _split_metrics_data(
yield MetricsData(resource_metrics=split_resource_metrics)

def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
pass
OTLPExporterMixin.shutdown(self, timeout_millis=timeout_millis)

@property
def _exporting(self) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,9 @@ def _translate_data(
def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
return self._export(spans)

def shutdown(self) -> None:
OTLPExporterMixin.shutdown(self)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@girishc13 Can you please help me understand why you are calling shutdown on mixin class directly instead of using super()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't remember the exact details but the shutdown method is implemented by both the OTLPExporterMixin and the SpanExporter interfaces. The exporter.shutdown is handled by different logic and this pr was targeting the backend client that needs to be shutdown. You need to trace the calls for shutdown from the usage of the OTLPSpanExporter.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the issue maybe due to mixin inheritance being applied incorrectly, currently we have

class OTLPSpanExporter(SpanExporter, OTLPExporterMixin[...]):

but usually in python mixin should come before the base class, e.g.

class OTLPSpanExporter(OTLPExporterMixin[...], SpanExporter):

this way, super().shutdown() call will correctly use shutdown method from mixin


def force_flush(self, timeout_millis: int = 30000) -> bool:
return True

Expand Down
Loading