From c84ba9483ca9b4642904b69897f931813d60cff0 Mon Sep 17 00:00:00 2001 From: Girish Chandrashekar Date: Tue, 14 Mar 2023 19:38:43 +0100 Subject: [PATCH] Implement shutdown procedure for OTLP grpc exporters (#3138) * Implement shutdown procedure for OTLP grpc exporters - Add `_shutdown` variable for checking if the exporter has been shutdown. - Prevent export if the `_shutdown` flag has been set. Log a warning message is exporter has been shutdown. - Use thread lock to synchronize the last export call before shutdown timeout. The `shutdown` method will wait until the `timeout_millis` if there is an ongoing export. If there is no ongiong export, set the `_shutdown` flag to prevent further exports and return. - Add unit tests for the `OTLPExporterMixIn` and the sub classes for traces and metrics. * lint files * add changelog entry for fix * lint test files --------- Co-authored-by: Srikanth Chekuri Co-authored-by: Leighton Chen --- CHANGELOG.md | 27 ++-- .../exporter/otlp/proto/grpc/exporter.py | 131 ++++++++++-------- .../proto/grpc/metric_exporter/__init__.py | 2 +- .../proto/grpc/trace_exporter/__init__.py | 3 + .../tests/test_otlp_exporter_mixin.py | 93 ++++++++++++- .../tests/test_otlp_metrics_exporter.py | 49 ++++++- .../tests/test_otlp_trace_exporter.py | 44 ++++++ 7 files changed, 277 insertions(+), 72 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a72276d75dd..31dd72bdcc4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,15 +6,17 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## Unreleased + - PeriodicExportingMetricReader will continue if collection times out ([#3100](https://github.com/open-telemetry/opentelemetry-python/pull/3100)) - Fix formatting of ConsoleMetricExporter. ([#3197](https://github.com/open-telemetry/opentelemetry-python/pull/3197)) - +- Implement shutdown procedure forOTLP grpc exporters + ([#3138](https://github.com/open-telemetry/opentelemetry-python/pull/3138)) - Add exponential histogram ([#2964](https://github.com/open-telemetry/opentelemetry-python/pull/2964)) -## Version 1.16.0/0.37b0 (2023-02-15) +## Version 1.16.0/0.37b0 (2023-02-17) - Change ``__all__`` to be statically defined. ([#3143](https://github.com/open-telemetry/opentelemetry-python/pull/3143)) @@ -398,7 +400,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `opentelemetry-distro` & `opentelemetry-sdk` Moved Auto Instrumentation Configurator code to SDK to let distros use its default implementation ([#1937](https://github.com/open-telemetry/opentelemetry-python/pull/1937)) -- Add Trace ID validation to meet [TraceID spec](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/overview.md#spancontext) ([#1992](https://github.com/open-telemetry/opentelemetry-python/pull/1992)) +- Add Trace ID validation to + meet [TraceID spec](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/overview.md#spancontext) ([#1992](https://github.com/open-telemetry/opentelemetry-python/pull/1992)) - Fixed Python 3.10 incompatibility in `opentelemetry-opentracing-shim` tests ([#2018](https://github.com/open-telemetry/opentelemetry-python/pull/2018)) - `opentelemetry-sdk` added support for `OTEL_SPAN_ATTRIBUTE_VALUE_LENGTH_LIMIT` @@ -729,7 +732,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#1471](https://github.com/open-telemetry/opentelemetry-python/pull/1471)) - Add support for Python 3.9 ([#1441](https://github.com/open-telemetry/opentelemetry-python/pull/1441)) -- Added the ability to disable instrumenting libraries specified by OTEL_PYTHON_DISABLED_INSTRUMENTATIONS env variable, when using opentelemetry-instrument command. +- Added the ability to disable instrumenting libraries specified by OTEL_PYTHON_DISABLED_INSTRUMENTATIONS env variable, + when using opentelemetry-instrument command. ([#1461](https://github.com/open-telemetry/opentelemetry-python/pull/1461)) - Add `fields` to propagators ([#1374](https://github.com/open-telemetry/opentelemetry-python/pull/1374)) @@ -778,7 +782,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#1533](https://github.com/open-telemetry/opentelemetry-python/pull/1533)) - `opentelemetry-sdk` The JaegerPropagator has been moved into its own package: `opentelemetry-propagator-jaeger` ([#1525](https://github.com/open-telemetry/opentelemetry-python/pull/1525)) -- `opentelemetry-exporter-jaeger`, `opentelemetry-exporter-zipkin` Update InstrumentationInfo tag keys for Jaeger and Zipkin exporters +- `opentelemetry-exporter-jaeger`, `opentelemetry-exporter-zipkin` Update InstrumentationInfo tag keys for Jaeger and + Zipkin exporters ([#1535](https://github.com/open-telemetry/opentelemetry-python/pull/1535)) - `opentelemetry-sdk` Remove rate property setter from TraceIdRatioBasedSampler ([#1536](https://github.com/open-telemetry/opentelemetry-python/pull/1536)) @@ -888,7 +893,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#1199](https://github.com/open-telemetry/opentelemetry-python/pull/1199)) - Add Global Error Handler ([#1080](https://github.com/open-telemetry/opentelemetry-python/pull/1080)) -- Add support for `OTEL_BSP_MAX_QUEUE_SIZE`, `OTEL_BSP_SCHEDULE_DELAY_MILLIS`, `OTEL_BSP_MAX_EXPORT_BATCH_SIZE` and `OTEL_BSP_EXPORT_TIMEOUT_MILLIS` environment variables +- Add support for `OTEL_BSP_MAX_QUEUE_SIZE`, `OTEL_BSP_SCHEDULE_DELAY_MILLIS`, `OTEL_BSP_MAX_EXPORT_BATCH_SIZE` + and `OTEL_BSP_EXPORT_TIMEOUT_MILLIS` environment variables ([#1105](https://github.com/open-telemetry/opentelemetry-python/pull/1120)) - Adding Resource to MeterRecord ([#1209](https://github.com/open-telemetry/opentelemetry-python/pull/1209)) @@ -913,7 +919,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#1151](https://github.com/open-telemetry/opentelemetry-python/pull/1151)) - Fixed OTLP events to Zipkin annotations translation. ([#1161](https://github.com/open-telemetry/opentelemetry-python/pull/1161)) -- Fixed bootstrap command to correctly install opentelemetry-instrumentation-falcon instead of opentelemetry-instrumentation-flask. +- Fixed bootstrap command to correctly install opentelemetry-instrumentation-falcon instead of + opentelemetry-instrumentation-flask. ([#1138](https://github.com/open-telemetry/opentelemetry-python/pull/1138)) - Update sampling result names ([#1128](https://github.com/open-telemetry/opentelemetry-python/pull/1128)) @@ -923,7 +930,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#1203](https://github.com/open-telemetry/opentelemetry-python/pull/1203)) - Protect access to Span implementation ([#1188](https://github.com/open-telemetry/opentelemetry-python/pull/1188)) -- `start_as_current_span` and `use_span` can now optionally auto-record any exceptions raised inside the context manager. +- `start_as_current_span` and `use_span` can now optionally auto-record any exceptions raised inside the context + manager. ([#1162](https://github.com/open-telemetry/opentelemetry-python/pull/1162)) ## Version 0.13b0 (2020-09-17) @@ -1000,7 +1008,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#959](https://github.com/open-telemetry/opentelemetry-python/pull/959)) - Update default port to 55680 ([#977](https://github.com/open-telemetry/opentelemetry-python/pull/977)) -- Add proper length zero padding to hex strings of traceId, spanId, parentId sent on the wire, for compatibility with jaeger-collector +- Add proper length zero padding to hex strings of traceId, spanId, parentId sent on the wire, for compatibility with + jaeger-collector ([#908](https://github.com/open-telemetry/opentelemetry-python/pull/908)) - Send start_timestamp and convert labels to strings ([#937](https://github.com/open-telemetry/opentelemetry-python/pull/937)) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py index c068f87d783..7a56120014c 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py @@ -14,16 +14,16 @@ """OTLP Exporter""" -from logging import getLogger +import threading from abc import ABC, abstractmethod from collections.abc import Sequence +from logging import getLogger from os import environ from time import sleep from typing import Any, Callable, Dict, Generic, List, Optional, Tuple, Union from typing import Sequence as TypingSequence from typing import TypeVar from urllib.parse import urlparse -from opentelemetry.sdk.trace import ReadableSpan import backoff from google.rpc.error_details_pb2 import RetryInfo @@ -37,6 +37,9 @@ ssl_channel_credentials, ) +from opentelemetry.exporter.otlp.proto.grpc import ( + _OTLP_GRPC_HEADERS, +) from opentelemetry.proto.common.v1.common_pb2 import ( AnyValue, ArrayValue, @@ -51,12 +54,10 @@ OTEL_EXPORTER_OTLP_INSECURE, OTEL_EXPORTER_OTLP_TIMEOUT, ) -from opentelemetry.sdk.resources import Resource as SDKResource from opentelemetry.sdk.metrics.export import MetricsData +from opentelemetry.sdk.resources import Resource as SDKResource +from opentelemetry.sdk.trace import ReadableSpan from opentelemetry.util.re import parse_env_headers -from opentelemetry.exporter.otlp.proto.grpc import ( - _OTLP_GRPC_HEADERS, -) logger = getLogger(__name__) SDKDataT = TypeVar("SDKDataT") @@ -92,7 +93,6 @@ def environ_to_compression(environ_key: str) -> Optional[Compression]: def _translate_value(value: Any) -> KeyValue: - if isinstance(value, bool): any_value = AnyValue(bool_value=value) @@ -135,7 +135,6 @@ def get_resource_data( resource_class: Callable[..., TypingResourceT], name: str, ) -> List[TypingResourceT]: - resource_data = [] for ( @@ -282,6 +281,9 @@ def __init__( secure_channel(endpoint, credentials, compression=compression) ) + self._export_lock = threading.Lock() + self._shutdown = False + @abstractmethod def _translate_data( self, data: TypingSequence[SDKDataT] @@ -302,6 +304,11 @@ def _translate_attributes(self, attributes) -> TypingSequence[KeyValue]: def _export( self, data: Union[TypingSequence[ReadableSpan], MetricsData] ) -> ExportResultT: + # After the call to shutdown, subsequent calls to Export are + # not allowed and should return a Failure result. + if self._shutdown: + logger.warning("Exporter already shutdown, ignoring batch") + return self._result.FAILURE # FIXME remove this check if the export type for traces # gets updated to a class that represents the proto @@ -317,69 +324,75 @@ def _export( # exponentially. Once delay is greater than max_value, the yielded # value will remain constant. for delay in _expo(max_value=max_value): - - if delay == max_value: + if delay == max_value or self._shutdown: return self._result.FAILURE - try: - self._client.Export( - request=self._translate_data(data), - metadata=self._headers, - timeout=self._timeout, - ) + with self._export_lock: + try: + self._client.Export( + request=self._translate_data(data), + metadata=self._headers, + timeout=self._timeout, + ) - return self._result.SUCCESS + return self._result.SUCCESS - except RpcError as error: + except RpcError as error: - if error.code() in [ - StatusCode.CANCELLED, - StatusCode.DEADLINE_EXCEEDED, - StatusCode.RESOURCE_EXHAUSTED, - StatusCode.ABORTED, - StatusCode.OUT_OF_RANGE, - StatusCode.UNAVAILABLE, - StatusCode.DATA_LOSS, - ]: + if error.code() in [ + StatusCode.CANCELLED, + StatusCode.DEADLINE_EXCEEDED, + StatusCode.RESOURCE_EXHAUSTED, + StatusCode.ABORTED, + StatusCode.OUT_OF_RANGE, + StatusCode.UNAVAILABLE, + StatusCode.DATA_LOSS, + ]: - retry_info_bin = dict(error.trailing_metadata()).get( - "google.rpc.retryinfo-bin" - ) - if retry_info_bin is not None: - retry_info = RetryInfo() - retry_info.ParseFromString(retry_info_bin) - delay = ( - retry_info.retry_delay.seconds - + retry_info.retry_delay.nanos / 1.0e9 + retry_info_bin = dict(error.trailing_metadata()).get( + "google.rpc.retryinfo-bin" + ) + if retry_info_bin is not None: + retry_info = RetryInfo() + retry_info.ParseFromString(retry_info_bin) + delay = ( + retry_info.retry_delay.seconds + + retry_info.retry_delay.nanos / 1.0e9 + ) + + logger.warning( + ( + "Transient error %s encountered while exporting " + "%s, retrying in %ss." + ), + error.code(), + self._exporting, + delay, + ) + sleep(delay) + continue + else: + logger.error( + "Failed to export %s, error code: %s", + self._exporting, + error.code(), ) - logger.warning( - ( - "Transient error %s encountered while exporting " - "%s, retrying in %ss." - ), - error.code(), - self._exporting, - delay, - ) - sleep(delay) - continue - else: - logger.error( - "Failed to export %s, error code: %s", - self._exporting, - error.code(), - ) - - if error.code() == StatusCode.OK: - return self._result.SUCCESS + if error.code() == StatusCode.OK: + return self._result.SUCCESS - return self._result.FAILURE + return self._result.FAILURE return self._result.FAILURE - def shutdown(self) -> None: - pass + def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: + if self._shutdown: + logger.warning("Exporter already shutdown, ignoring call") + return + # wait for the last export if any + self._export_lock.acquire(timeout=timeout_millis) + self._shutdown = True + self._export_lock.release() @property @abstractmethod diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py index 942ef1d4aaa..8abb381cce8 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py @@ -433,7 +433,7 @@ def _split_metrics_data( yield MetricsData(resource_metrics=split_resource_metrics) def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: - pass + OTLPExporterMixin.shutdown(self, timeout_millis=timeout_millis) @property def _exporting(self) -> str: diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py index 555c9031560..0203c00ec36 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py @@ -290,6 +290,9 @@ def _translate_data( def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: return self._export(spans) + def shutdown(self) -> None: + OTLPExporterMixin.shutdown(self) + def force_flush(self, timeout_millis: int = 30000) -> bool: return True diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py index 81a874af705..c7577557405 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py @@ -12,12 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. +import threading +import time from logging import WARNING from types import MethodType from typing import Sequence from unittest import TestCase from unittest.mock import Mock, patch +from google.protobuf.duration_pb2 import Duration +from google.rpc.error_details_pb2 import RetryInfo from grpc import Compression from opentelemetry.exporter.otlp.proto.grpc.exporter import ( @@ -58,7 +62,6 @@ def test_environ_to_compression(self): @patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo") def test_export_warning(self, mock_expo): - mock_expo.configure_mock(**{"return_value": [0]}) rpc_error = RpcError() @@ -69,7 +72,6 @@ def code(self): rpc_error.code = MethodType(code, rpc_error) class OTLPMockExporter(OTLPExporterMixin): - _result = Mock() _stub = Mock( **{"return_value": Mock(**{"Export.side_effect": rpc_error})} @@ -113,3 +115,90 @@ def trailing_metadata(self): "while exporting mock, retrying in 0s." ), ) + + def test_shutdown(self): + result_mock = Mock() + + class OTLPMockExporter(OTLPExporterMixin): + _result = result_mock + _stub = Mock(**{"return_value": Mock()}) + + def _translate_data( + self, data: Sequence[SDKDataT] + ) -> ExportServiceRequestT: + pass + + @property + def _exporting(self) -> str: + return "mock" + + otlp_mock_exporter = OTLPMockExporter() + + with self.assertLogs(level=WARNING) as warning: + # pylint: disable=protected-access + self.assertEqual( + otlp_mock_exporter._export(data={}), result_mock.SUCCESS + ) + otlp_mock_exporter.shutdown() + # pylint: disable=protected-access + self.assertEqual( + otlp_mock_exporter._export(data={}), result_mock.FAILURE + ) + self.assertEqual( + warning.records[0].message, + "Exporter already shutdown, ignoring batch", + ) + + def test_shutdown_wait_last_export(self): + result_mock = Mock() + rpc_error = RpcError() + + def code(self): + return StatusCode.UNAVAILABLE + + def trailing_metadata(self): + return { + "google.rpc.retryinfo-bin": RetryInfo( + retry_delay=Duration(seconds=1) + ).SerializeToString() + } + + rpc_error.code = MethodType(code, rpc_error) + rpc_error.trailing_metadata = MethodType(trailing_metadata, rpc_error) + + class OTLPMockExporter(OTLPExporterMixin): + _result = result_mock + _stub = Mock( + **{"return_value": Mock(**{"Export.side_effect": rpc_error})} + ) + + def _translate_data( + self, data: Sequence[SDKDataT] + ) -> ExportServiceRequestT: + pass + + @property + def _exporting(self) -> str: + return "mock" + + otlp_mock_exporter = OTLPMockExporter() + + # pylint: disable=protected-access + export_thread = threading.Thread( + target=otlp_mock_exporter._export, args=({},) + ) + export_thread.start() + try: + # pylint: disable=protected-access + self.assertTrue(otlp_mock_exporter._export_lock.locked()) + # delay is 1 second while the default shutdown timeout is 30_000 milliseconds + start_time = time.time() + otlp_mock_exporter.shutdown() + now = time.time() + self.assertGreaterEqual(now, (start_time + 30 / 1000)) + # pylint: disable=protected-access + self.assertTrue(otlp_mock_exporter._shutdown) + # pylint: disable=protected-access + self.assertFalse(otlp_mock_exporter._export_lock.locked()) + finally: + export_thread.join() diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py index b38d91eb83f..64ab205ad0e 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py @@ -12,8 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -# pylint: disable=too-many-lines +import threading +import time from concurrent.futures import ThreadPoolExecutor + +# pylint: disable=too-many-lines +from logging import WARNING from os.path import dirname from typing import List from unittest import TestCase @@ -1485,6 +1489,49 @@ def test_insecure_https_endpoint(self, mock_secure_channel): OTLPMetricExporter(endpoint="https://ab.c:123", insecure=True) mock_secure_channel.assert_called() + def test_shutdown(self): + add_MetricsServiceServicer_to_server( + MetricsServiceServicerSUCCESS(), self.server + ) + self.assertEqual( + self.exporter.export(self.metrics["sum_int"]), + MetricExportResult.SUCCESS, + ) + self.exporter.shutdown() + with self.assertLogs(level=WARNING) as warning: + self.assertEqual( + self.exporter.export(self.metrics["sum_int"]), + MetricExportResult.FAILURE, + ) + self.assertEqual( + warning.records[0].message, + "Exporter already shutdown, ignoring batch", + ) + + def test_shutdown_wait_last_export(self): + add_MetricsServiceServicer_to_server( + MetricsServiceServicerUNAVAILABLEDelay(), self.server + ) + + export_thread = threading.Thread( + target=self.exporter.export, args=(self.metrics["sum_int"],) + ) + export_thread.start() + try: + # pylint: disable=protected-access + self.assertTrue(self.exporter._export_lock.locked()) + # delay is 4 seconds while the default shutdown timeout is 30_000 milliseconds + start_time = time.time() + self.exporter.shutdown() + now = time.time() + self.assertGreaterEqual(now, (start_time + 30 / 1000)) + # pylint: disable=protected-access + self.assertTrue(self.exporter._shutdown) + # pylint: disable=protected-access + self.assertFalse(self.exporter._export_lock.locked()) + finally: + export_thread.join() + def _resource_metrics( index: int, scope_metrics: List[ScopeMetrics] diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py index 36ae0a7c11e..2498da74b81 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py @@ -13,8 +13,11 @@ # limitations under the License. import os +import threading +import time from collections import OrderedDict from concurrent.futures import ThreadPoolExecutor +from logging import WARNING from unittest import TestCase from unittest.mock import Mock, PropertyMock, patch @@ -929,6 +932,47 @@ def test_dropped_values(self): .dropped_attributes_count, ) + def test_shutdown(self): + add_TraceServiceServicer_to_server( + TraceServiceServicerSUCCESS(), self.server + ) + self.assertEqual( + self.exporter.export([self.span]), SpanExportResult.SUCCESS + ) + self.exporter.shutdown() + with self.assertLogs(level=WARNING) as warning: + self.assertEqual( + self.exporter.export([self.span]), SpanExportResult.FAILURE + ) + self.assertEqual( + warning.records[0].message, + "Exporter already shutdown, ignoring batch", + ) + + def test_shutdown_wait_last_export(self): + add_TraceServiceServicer_to_server( + TraceServiceServicerUNAVAILABLEDelay(), self.server + ) + + export_thread = threading.Thread( + target=self.exporter.export, args=([self.span],) + ) + export_thread.start() + try: + # pylint: disable=protected-access + self.assertTrue(self.exporter._export_lock.locked()) + # delay is 4 seconds while the default shutdown timeout is 30_000 milliseconds + start_time = time.time() + self.exporter.shutdown() + now = time.time() + self.assertGreaterEqual(now, (start_time + 30 / 1000)) + # pylint: disable=protected-access + self.assertTrue(self.exporter._shutdown) + # pylint: disable=protected-access + self.assertFalse(self.exporter._export_lock.locked()) + finally: + export_thread.join() + def _create_span_with_status(status: SDKStatus): span = _Span(