diff --git a/CHANGELOG.md b/CHANGELOG.md index f0bb37d48e..77e5579688 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#3226](https://github.com/open-telemetry/opentelemetry-python/pull/3226)) - Fix suppress instrumentation for log batch processor ([#3223](https://github.com/open-telemetry/opentelemetry-python/pull/3223)) +- Add speced out environment variables and arguments for BatchLogRecordProcessor + ([#3237](https://github.com/open-telemetry/opentelemetry-python/pull/3237)) ## Version 1.17.0/0.38b0 (2023-03-22) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py index 6f2f1c2c26..4903873432 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py @@ -19,7 +19,7 @@ import os import sys import threading -from os import linesep +from os import environ, linesep from time import time_ns from typing import IO, Callable, Deque, List, Optional, Sequence @@ -30,8 +30,22 @@ set_value, ) from opentelemetry.sdk._logs import LogData, LogRecord, LogRecordProcessor +from opentelemetry.sdk.environment_variables import ( + OTEL_BLRP_EXPORT_TIMEOUT, + OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, + OTEL_BLRP_MAX_QUEUE_SIZE, + OTEL_BLRP_SCHEDULE_DELAY, +) from opentelemetry.util._once import Once +_DEFAULT_SCHEDULE_DELAY_MILLIS = 5000 +_DEFAULT_MAX_EXPORT_BATCH_SIZE = 512 +_DEFAULT_EXPORT_TIMEOUT_MILLIS = 30000 +_DEFAULT_MAX_QUEUE_SIZE = 2048 +_ENV_VAR_INT_VALUE_ERROR_MESSAGE = ( + "Unable to parse value for %s as integer. Defaulting to %s." +) + _logger = logging.getLogger(__name__) @@ -142,20 +156,54 @@ class BatchLogRecordProcessor(LogRecordProcessor): """This is an implementation of LogRecordProcessor which creates batches of received logs in the export-friendly LogData representation and send to the configured LogExporter, as soon as they are emitted. + + `BatchLogRecordProcessor` is configurable with the following environment + variables which correspond to constructor parameters: + + - :envvar:`OTEL_BLRP_SCHEDULE_DELAY` + - :envvar:`OTEL_BLRP_MAX_QUEUE_SIZE` + - :envvar:`OTEL_BLRP_MAX_EXPORT_BATCH_SIZE` + - :envvar:`OTEL_BLRP_EXPORT_TIMEOUT` """ def __init__( self, exporter: LogExporter, - schedule_delay_millis: int = 5000, - max_export_batch_size: int = 512, - export_timeout_millis: int = 30000, + schedule_delay_millis: float = None, + max_export_batch_size: int = None, + export_timeout_millis: float = None, + max_queue_size: int = None, ): + if max_queue_size is None: + max_queue_size = BatchLogRecordProcessor._default_max_queue_size() + + if schedule_delay_millis is None: + schedule_delay_millis = ( + BatchLogRecordProcessor._default_schedule_delay_millis() + ) + + if max_export_batch_size is None: + max_export_batch_size = ( + BatchLogRecordProcessor._default_max_export_batch_size() + ) + + if export_timeout_millis is None: + export_timeout_millis = ( + BatchLogRecordProcessor._default_export_timeout_millis() + ) + + BatchLogRecordProcessor._validate_arguments( + max_queue_size, schedule_delay_millis, max_export_batch_size + ) + self._exporter = exporter + self._max_queue_size = max_queue_size self._schedule_delay_millis = schedule_delay_millis self._max_export_batch_size = max_export_batch_size self._export_timeout_millis = export_timeout_millis - self._queue = collections.deque() # type: Deque[LogData] + self._queue = collections.deque( + [], max_queue_size + ) # type: Deque[LogData] self._worker_thread = threading.Thread( name="OtelBatchLogRecordProcessor", target=self.worker, @@ -333,3 +381,86 @@ def force_flush(self, timeout_millis: Optional[int] = None) -> bool: if not ret: _logger.warning("Timeout was exceeded in force_flush().") return ret + + @staticmethod + def _default_max_queue_size(): + try: + return int( + environ.get(OTEL_BLRP_MAX_QUEUE_SIZE, _DEFAULT_MAX_QUEUE_SIZE) + ) + except ValueError: + _logger.exception( + _ENV_VAR_INT_VALUE_ERROR_MESSAGE, + OTEL_BLRP_MAX_QUEUE_SIZE, + _DEFAULT_MAX_QUEUE_SIZE, + ) + return _DEFAULT_MAX_QUEUE_SIZE + + @staticmethod + def _default_schedule_delay_millis(): + try: + return int( + environ.get( + OTEL_BLRP_SCHEDULE_DELAY, _DEFAULT_SCHEDULE_DELAY_MILLIS + ) + ) + except ValueError: + _logger.exception( + _ENV_VAR_INT_VALUE_ERROR_MESSAGE, + OTEL_BLRP_SCHEDULE_DELAY, + _DEFAULT_SCHEDULE_DELAY_MILLIS, + ) + return _DEFAULT_SCHEDULE_DELAY_MILLIS + + @staticmethod + def _default_max_export_batch_size(): + try: + return int( + environ.get( + OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, + _DEFAULT_MAX_EXPORT_BATCH_SIZE, + ) + ) + except ValueError: + _logger.exception( + _ENV_VAR_INT_VALUE_ERROR_MESSAGE, + OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, + _DEFAULT_MAX_EXPORT_BATCH_SIZE, + ) + return _DEFAULT_MAX_EXPORT_BATCH_SIZE + + @staticmethod + def _default_export_timeout_millis(): + try: + return int( + environ.get( + OTEL_BLRP_EXPORT_TIMEOUT, _DEFAULT_EXPORT_TIMEOUT_MILLIS + ) + ) + except ValueError: + _logger.exception( + _ENV_VAR_INT_VALUE_ERROR_MESSAGE, + OTEL_BLRP_EXPORT_TIMEOUT, + _DEFAULT_EXPORT_TIMEOUT_MILLIS, + ) + return _DEFAULT_EXPORT_TIMEOUT_MILLIS + + @staticmethod + def _validate_arguments( + max_queue_size, schedule_delay_millis, max_export_batch_size + ): + if max_queue_size <= 0: + raise ValueError("max_queue_size must be a positive integer.") + + if schedule_delay_millis <= 0: + raise ValueError("schedule_delay_millis must be positive.") + + if max_export_batch_size <= 0: + raise ValueError( + "max_export_batch_size must be a positive integer." + ) + + if max_export_batch_size > max_queue_size: + raise ValueError( + "max_export_batch_size must be less than or equal to max_queue_size." + ) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables.py b/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables.py index 8e36ae58c7..275aa48340 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables.py @@ -66,11 +66,43 @@ i.e. the SDK behaves as if OTEL_TRACES_SAMPLER_ARG is not set. """ +OTEL_BLRP_SCHEDULE_DELAY = "OTEL_BLRP_SCHEDULE_DELAY" +""" +.. envvar:: OTEL_BLRP_SCHEDULE_DELAY + +The :envvar:`OTEL_BLRP_SCHEDULE_DELAY` represents the delay interval between two consecutive exports of the BatchLogRecordProcessor. +Default: 5000 +""" + +OTEL_BLRP_EXPORT_TIMEOUT = "OTEL_BLRP_EXPORT_TIMEOUT" +""" +.. envvar:: OTEL_BLRP_EXPORT_TIMEOUT + +The :envvar:`OTEL_BLRP_EXPORT_TIMEOUT` represents the maximum allowed time to export data from the BatchLogRecordProcessor. +Default: 30000 +""" + +OTEL_BLRP_MAX_QUEUE_SIZE = "OTEL_BLRP_MAX_QUEUE_SIZE" +""" +.. envvar:: OTEL_BLRP_MAX_QUEUE_SIZE + +The :envvar:`OTEL_BLRP_MAX_QUEUE_SIZE` represents the maximum queue size for the data export of the BatchLogRecordProcessor. +Default: 2048 +""" + +OTEL_BLRP_MAX_EXPORT_BATCH_SIZE = "OTEL_BLRP_MAX_EXPORT_BATCH_SIZE" +""" +.. envvar:: OTEL_BLRP_MAX_EXPORT_BATCH_SIZE + +The :envvar:`OTEL_BLRP_MAX_EXPORT_BATCH_SIZE` represents the maximum batch size for the data export of the BatchLogRecordProcessor. +Default: 512 +""" + OTEL_BSP_SCHEDULE_DELAY = "OTEL_BSP_SCHEDULE_DELAY" """ .. envvar:: OTEL_BSP_SCHEDULE_DELAY -The :envvar:`OTEL_BSP_SCHEDULE_DELAY` represents the delay interval between two consecutive exports. +The :envvar:`OTEL_BSP_SCHEDULE_DELAY` represents the delay interval between two consecutive exports of the BatchSpanProcessor. Default: 5000 """ @@ -78,7 +110,7 @@ """ .. envvar:: OTEL_BSP_EXPORT_TIMEOUT -The :envvar:`OTEL_BSP_EXPORT_TIMEOUT` represents the maximum allowed time to export data. +The :envvar:`OTEL_BSP_EXPORT_TIMEOUT` represents the maximum allowed time to export data from the BatchSpanProcessor. Default: 30000 """ @@ -86,7 +118,7 @@ """ .. envvar:: OTEL_BSP_MAX_QUEUE_SIZE -The :envvar:`OTEL_BSP_MAX_QUEUE_SIZE` represents the maximum queue size for the data export. +The :envvar:`OTEL_BSP_MAX_QUEUE_SIZE` represents the maximum queue size for the data export of the BatchSpanProcessor. Default: 2048 """ @@ -94,7 +126,7 @@ """ .. envvar:: OTEL_BSP_MAX_EXPORT_BATCH_SIZE -The :envvar:`OTEL_BSP_MAX_EXPORT_BATCH_SIZE` represents the maximum batch size for the data export. +The :envvar:`OTEL_BSP_MAX_EXPORT_BATCH_SIZE` represents the maximum batch size for the data export of the BatchSpanProcessor. Default: 512 """ diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py index ef04895819..7f56a30172 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py @@ -39,6 +39,14 @@ from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor from opentelemetry.util._once import Once +_DEFAULT_SCHEDULE_DELAY_MILLIS = 5000 +_DEFAULT_MAX_EXPORT_BATCH_SIZE = 512 +_DEFAULT_EXPORT_TIMEOUT_MILLIS = 30000 +_DEFAULT_MAX_QUEUE_SIZE = 2048 +_ENV_VAR_INT_VALUE_ERROR_MESSAGE = ( + "Unable to parse value for %s as integer. Defaulting to %s." +) + logger = logging.getLogger(__name__) @@ -152,40 +160,27 @@ def __init__( max_export_batch_size: int = None, export_timeout_millis: float = None, ): - if max_queue_size is None: - max_queue_size = int(environ.get(OTEL_BSP_MAX_QUEUE_SIZE, 2048)) + max_queue_size = BatchSpanProcessor._default_max_queue_size() if schedule_delay_millis is None: - schedule_delay_millis = int( - environ.get(OTEL_BSP_SCHEDULE_DELAY, 5000) + schedule_delay_millis = ( + BatchSpanProcessor._default_schedule_delay_millis() ) if max_export_batch_size is None: - max_export_batch_size = int( - environ.get(OTEL_BSP_MAX_EXPORT_BATCH_SIZE, 512) + max_export_batch_size = ( + BatchSpanProcessor._default_max_export_batch_size() ) if export_timeout_millis is None: - export_timeout_millis = int( - environ.get(OTEL_BSP_EXPORT_TIMEOUT, 30000) + export_timeout_millis = ( + BatchSpanProcessor._default_export_timeout_millis() ) - if max_queue_size <= 0: - raise ValueError("max_queue_size must be a positive integer.") - - if schedule_delay_millis <= 0: - raise ValueError("schedule_delay_millis must be positive.") - - if max_export_batch_size <= 0: - raise ValueError( - "max_export_batch_size must be a positive integer." - ) - - if max_export_batch_size > max_queue_size: - raise ValueError( - "max_export_batch_size must be less than or equal to max_queue_size." - ) + BatchSpanProcessor._validate_arguments( + max_queue_size, schedule_delay_millis, max_export_batch_size + ) self.span_exporter = span_exporter self.queue = collections.deque( @@ -417,6 +412,89 @@ def shutdown(self) -> None: self.worker_thread.join() self.span_exporter.shutdown() + @staticmethod + def _default_max_queue_size(): + try: + return int( + environ.get(OTEL_BSP_MAX_QUEUE_SIZE, _DEFAULT_MAX_QUEUE_SIZE) + ) + except ValueError: + logger.exception( + _ENV_VAR_INT_VALUE_ERROR_MESSAGE, + OTEL_BSP_MAX_QUEUE_SIZE, + _DEFAULT_MAX_QUEUE_SIZE, + ) + return _DEFAULT_MAX_QUEUE_SIZE + + @staticmethod + def _default_schedule_delay_millis(): + try: + return int( + environ.get( + OTEL_BSP_SCHEDULE_DELAY, _DEFAULT_SCHEDULE_DELAY_MILLIS + ) + ) + except ValueError: + logger.exception( + _ENV_VAR_INT_VALUE_ERROR_MESSAGE, + OTEL_BSP_SCHEDULE_DELAY, + _DEFAULT_SCHEDULE_DELAY_MILLIS, + ) + return _DEFAULT_SCHEDULE_DELAY_MILLIS + + @staticmethod + def _default_max_export_batch_size(): + try: + return int( + environ.get( + OTEL_BSP_MAX_EXPORT_BATCH_SIZE, + _DEFAULT_MAX_EXPORT_BATCH_SIZE, + ) + ) + except ValueError: + logger.exception( + _ENV_VAR_INT_VALUE_ERROR_MESSAGE, + OTEL_BSP_MAX_EXPORT_BATCH_SIZE, + _DEFAULT_MAX_EXPORT_BATCH_SIZE, + ) + return _DEFAULT_MAX_EXPORT_BATCH_SIZE + + @staticmethod + def _default_export_timeout_millis(): + try: + return int( + environ.get( + OTEL_BSP_EXPORT_TIMEOUT, _DEFAULT_EXPORT_TIMEOUT_MILLIS + ) + ) + except ValueError: + logger.exception( + _ENV_VAR_INT_VALUE_ERROR_MESSAGE, + OTEL_BSP_EXPORT_TIMEOUT, + _DEFAULT_EXPORT_TIMEOUT_MILLIS, + ) + return _DEFAULT_EXPORT_TIMEOUT_MILLIS + + @staticmethod + def _validate_arguments( + max_queue_size, schedule_delay_millis, max_export_batch_size + ): + if max_queue_size <= 0: + raise ValueError("max_queue_size must be a positive integer.") + + if schedule_delay_millis <= 0: + raise ValueError("schedule_delay_millis must be positive.") + + if max_export_batch_size <= 0: + raise ValueError( + "max_export_batch_size must be a positive integer." + ) + + if max_export_batch_size > max_queue_size: + raise ValueError( + "max_export_batch_size must be less than or equal to max_queue_size." + ) + class ConsoleSpanExporter(SpanExporter): """Implementation of :class:`SpanExporter` that prints spans to the diff --git a/opentelemetry-sdk/tests/logs/test_export.py b/opentelemetry-sdk/tests/logs/test_export.py index 6a6b1d5bf8..aa68b09624 100644 --- a/opentelemetry-sdk/tests/logs/test_export.py +++ b/opentelemetry-sdk/tests/logs/test_export.py @@ -35,6 +35,12 @@ InMemoryLogExporter, SimpleLogRecordProcessor, ) +from opentelemetry.sdk.environment_variables import ( + OTEL_BLRP_EXPORT_TIMEOUT, + OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, + OTEL_BLRP_MAX_QUEUE_SIZE, + OTEL_BLRP_SCHEDULE_DELAY, +) from opentelemetry.sdk.resources import Resource as SDKResource from opentelemetry.sdk.util.instrumentation import InstrumentationScope from opentelemetry.test.concurrency_test import ConcurrencyTestBase @@ -175,6 +181,127 @@ def test_emit_call_log_record(self): logger.error("error") self.assertEqual(log_record_processor.emit.call_count, 1) + def test_args(self): + exporter = InMemoryLogExporter() + log_record_processor = BatchLogRecordProcessor( + exporter, + max_queue_size=1024, + schedule_delay_millis=2500, + max_export_batch_size=256, + export_timeout_millis=15000, + ) + self.assertEqual(log_record_processor._exporter, exporter) + self.assertEqual(log_record_processor._max_queue_size, 1024) + self.assertEqual(log_record_processor._schedule_delay_millis, 2500) + self.assertEqual(log_record_processor._max_export_batch_size, 256) + self.assertEqual(log_record_processor._export_timeout_millis, 15000) + + @patch.dict( + "os.environ", + { + OTEL_BLRP_MAX_QUEUE_SIZE: "1024", + OTEL_BLRP_SCHEDULE_DELAY: "2500", + OTEL_BLRP_MAX_EXPORT_BATCH_SIZE: "256", + OTEL_BLRP_EXPORT_TIMEOUT: "15000", + }, + ) + def test_env_vars(self): + exporter = InMemoryLogExporter() + log_record_processor = BatchLogRecordProcessor(exporter) + self.assertEqual(log_record_processor._exporter, exporter) + self.assertEqual(log_record_processor._max_queue_size, 1024) + self.assertEqual(log_record_processor._schedule_delay_millis, 2500) + self.assertEqual(log_record_processor._max_export_batch_size, 256) + self.assertEqual(log_record_processor._export_timeout_millis, 15000) + + def test_args_defaults(self): + exporter = InMemoryLogExporter() + log_record_processor = BatchLogRecordProcessor(exporter) + self.assertEqual(log_record_processor._exporter, exporter) + self.assertEqual(log_record_processor._max_queue_size, 2048) + self.assertEqual(log_record_processor._schedule_delay_millis, 5000) + self.assertEqual(log_record_processor._max_export_batch_size, 512) + self.assertEqual(log_record_processor._export_timeout_millis, 30000) + + @patch.dict( + "os.environ", + { + OTEL_BLRP_MAX_QUEUE_SIZE: "a", + OTEL_BLRP_SCHEDULE_DELAY: " ", + OTEL_BLRP_MAX_EXPORT_BATCH_SIZE: "One", + OTEL_BLRP_EXPORT_TIMEOUT: "@", + }, + ) + def test_args_env_var_value_error(self): + exporter = InMemoryLogExporter() + log_record_processor = BatchLogRecordProcessor(exporter) + self.assertEqual(log_record_processor._exporter, exporter) + self.assertEqual(log_record_processor._max_queue_size, 2048) + self.assertEqual(log_record_processor._schedule_delay_millis, 5000) + self.assertEqual(log_record_processor._max_export_batch_size, 512) + self.assertEqual(log_record_processor._export_timeout_millis, 30000) + + def test_args_none_defaults(self): + exporter = InMemoryLogExporter() + log_record_processor = BatchLogRecordProcessor( + exporter, + max_queue_size=None, + schedule_delay_millis=None, + max_export_batch_size=None, + export_timeout_millis=None, + ) + self.assertEqual(log_record_processor._exporter, exporter) + self.assertEqual(log_record_processor._max_queue_size, 2048) + self.assertEqual(log_record_processor._schedule_delay_millis, 5000) + self.assertEqual(log_record_processor._max_export_batch_size, 512) + self.assertEqual(log_record_processor._export_timeout_millis, 30000) + + def test_validation_negative_max_queue_size(self): + exporter = InMemoryLogExporter() + self.assertRaises( + ValueError, + BatchLogRecordProcessor, + exporter, + max_queue_size=0, + ) + self.assertRaises( + ValueError, + BatchLogRecordProcessor, + exporter, + max_queue_size=-1, + ) + self.assertRaises( + ValueError, + BatchLogRecordProcessor, + exporter, + schedule_delay_millis=0, + ) + self.assertRaises( + ValueError, + BatchLogRecordProcessor, + exporter, + schedule_delay_millis=-1, + ) + self.assertRaises( + ValueError, + BatchLogRecordProcessor, + exporter, + max_export_batch_size=0, + ) + self.assertRaises( + ValueError, + BatchLogRecordProcessor, + exporter, + max_export_batch_size=-1, + ) + self.assertRaises( + ValueError, + BatchLogRecordProcessor, + exporter, + max_queue_size=100, + max_export_batch_size=101, + ) + def test_shutdown(self): exporter = InMemoryLogExporter() log_record_processor = BatchLogRecordProcessor(exporter) diff --git a/opentelemetry-sdk/tests/trace/export/test_export.py b/opentelemetry-sdk/tests/trace/export/test_export.py index 422945798f..cb7739f328 100644 --- a/opentelemetry-sdk/tests/trace/export/test_export.py +++ b/opentelemetry-sdk/tests/trace/export/test_export.py @@ -175,7 +175,7 @@ class TestBatchSpanProcessor(ConcurrencyTestBase): OTEL_BSP_EXPORT_TIMEOUT: "4", }, ) - def test_batch_span_processor_environment_variables(self): + def test_args_env_var(self): batch_span_processor = export.BatchSpanProcessor( MySpanExporter(destination=[]) @@ -186,6 +186,37 @@ def test_batch_span_processor_environment_variables(self): self.assertEqual(batch_span_processor.max_export_batch_size, 3) self.assertEqual(batch_span_processor.export_timeout_millis, 4) + def test_args_env_var_defaults(self): + + batch_span_processor = export.BatchSpanProcessor( + MySpanExporter(destination=[]) + ) + + self.assertEqual(batch_span_processor.max_queue_size, 2048) + self.assertEqual(batch_span_processor.schedule_delay_millis, 5000) + self.assertEqual(batch_span_processor.max_export_batch_size, 512) + self.assertEqual(batch_span_processor.export_timeout_millis, 30000) + + @mock.patch.dict( + "os.environ", + { + OTEL_BSP_MAX_QUEUE_SIZE: "a", + OTEL_BSP_SCHEDULE_DELAY: " ", + OTEL_BSP_MAX_EXPORT_BATCH_SIZE: "One", + OTEL_BSP_EXPORT_TIMEOUT: "@", + }, + ) + def test_args_env_var_value_error(self): + + batch_span_processor = export.BatchSpanProcessor( + MySpanExporter(destination=[]) + ) + + self.assertEqual(batch_span_processor.max_queue_size, 2048) + self.assertEqual(batch_span_processor.schedule_delay_millis, 5000) + self.assertEqual(batch_span_processor.max_export_batch_size, 512) + self.assertEqual(batch_span_processor.export_timeout_millis, 30000) + def test_on_start_accepts_parent_context(self): # pylint: disable=no-self-use my_exporter = MySpanExporter(destination=[])