Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Factor out duplicate backoff code #3396

Merged
merged 8 commits into from
Aug 30, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ classifiers = [
]
dependencies = [
"opentelemetry-proto == 1.20.0.dev",
"backoff >= 1.10.0, < 2.0.0; python_version<'3.7'",
"backoff >= 1.10.0, < 3.0.0; python_version>='3.7'",
]

[project.urls]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
from collections.abc import Sequence
from typing import Any, Mapping, Optional, List, Callable, TypeVar, Dict

import backoff

from opentelemetry.sdk.util.instrumentation import InstrumentationScope
from opentelemetry.proto.common.v1.common_pb2 import (
InstrumentationScope as PB2InstrumentationScope,
Expand Down Expand Up @@ -130,3 +132,16 @@ def _get_resource_data(
)
)
return resource_data


# Work around API change between backoff 1.x and 2.x. Since 2.0.0 the backoff
# wait generator API requires a first .send(None) before reading the backoff
# values from the generator.
_is_backoff_v2 = next(backoff.expo()) is None


def _create_exp_backoff_generator(*args, **kwargs):
gen = backoff.expo(*args, **kwargs)
if _is_backoff_v2:
gen.send(None)
return gen
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@

from opentelemetry.exporter.otlp.proto.common._internal import (
_get_resource_data,
_create_exp_backoff_generator,
)
import backoff
from google.rpc.error_details_pb2 import RetryInfo
from grpc import (
ChannelCredentials,
Expand Down Expand Up @@ -137,19 +137,6 @@ def _get_credentials(creds, environ_key):
return ssl_channel_credentials()


# Work around API change between backoff 1.x and 2.x. Since 2.0.0 the backoff
# wait generator API requires a first .send(None) before reading the backoff
# values from the generator.
_is_backoff_v2 = next(backoff.expo()) is None


def _expo(*args, **kwargs):
gen = backoff.expo(*args, **kwargs)
if _is_backoff_v2:
gen.send(None)
return gen


# pylint: disable=no-member
class OTLPExporterMixin(
ABC, Generic[SDKDataT, ExportServiceRequestT, ExportResultT]
Expand Down Expand Up @@ -266,7 +253,7 @@ def _export(
# expo returns a generator that yields delay values which grow
# exponentially. Once delay is greater than max_value, the yielded
# value will remain constant.
for delay in _expo(max_value=max_value):
for delay in _create_exp_backoff_generator(max_value=max_value):
if delay == max_value or self._shutdown:
return self._result.FAILURE

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,9 @@ def test_otlp_headers_from_env(self):
(("user-agent", "OTel-OTLP-Exporter-Python/" + __version__),),
)

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo")
@patch(
"opentelemetry.exporter.otlp.proto.grpc.exporter._create_exp_backoff_generator"
)
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
def test_unavailable(self, mock_sleep, mock_expo):

Expand All @@ -306,7 +308,9 @@ def test_unavailable(self, mock_sleep, mock_expo):
)
mock_sleep.assert_called_with(1)

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo")
@patch(
"opentelemetry.exporter.otlp.proto.grpc.exporter._create_exp_backoff_generator"
)
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
def test_unavailable_delay(self, mock_sleep, mock_expo):

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ def test_environ_to_compression(self):
with self.assertRaises(InvalidCompressionValueException):
environ_to_compression("test_invalid")

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo")
@patch(
"opentelemetry.exporter.otlp.proto.grpc.exporter._create_exp_backoff_generator"
)
def test_export_warning(self, mock_expo):
mock_expo.configure_mock(**{"return_value": [0]})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,9 @@ def test_otlp_exporter_endpoint(self, mock_secure, mock_insecure):
mock_method.reset_mock()

# pylint: disable=no-self-use
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo")
@patch(
"opentelemetry.exporter.otlp.proto.grpc.exporter._create_exp_backoff_generator"
)
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.insecure_channel")
@patch.dict("os.environ", {OTEL_EXPORTER_OTLP_COMPRESSION: "gzip"})
def test_otlp_exporter_otlp_compression_envvar(
Expand Down Expand Up @@ -405,7 +407,9 @@ def test_otlp_exporter_otlp_compression_unspecified(
"localhost:4317", compression=Compression.NoCompression
)

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo")
@patch(
"opentelemetry.exporter.otlp.proto.grpc.exporter._create_exp_backoff_generator"
)
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
def test_unavailable(self, mock_sleep, mock_expo):

Expand All @@ -420,7 +424,9 @@ def test_unavailable(self, mock_sleep, mock_expo):
)
mock_sleep.assert_called_with(1)

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo")
@patch(
"opentelemetry.exporter.otlp.proto.grpc.exporter._create_exp_backoff_generator"
)
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
def test_unavailable_delay(self, mock_sleep, mock_expo):

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
from opentelemetry.exporter.otlp.proto.common._internal import (
_encode_key_value,
)
from opentelemetry.exporter.otlp.proto.grpc.exporter import _is_backoff_v2
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
OTLPSpanExporter,
)
Expand Down Expand Up @@ -460,24 +459,9 @@ def test_otlp_headers(self, mock_ssl_channel, mock_secure):
(("user-agent", "OTel-OTLP-Exporter-Python/" + __version__),),
)

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.backoff")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
def test_handles_backoff_v2_api(self, mock_sleep, mock_backoff):
pmcollins marked this conversation as resolved.
Show resolved Hide resolved
# In backoff ~= 2.0.0 the first value yielded from expo is None.
def generate_delays(*args, **kwargs):
if _is_backoff_v2:
yield None
yield 1

mock_backoff.expo.configure_mock(**{"side_effect": generate_delays})

add_TraceServiceServicer_to_server(
TraceServiceServicerUNAVAILABLE(), self.server
)
self.exporter.export([self.span])
mock_sleep.assert_called_once_with(1)

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo")
@patch(
"opentelemetry.exporter.otlp.proto.grpc.exporter._create_exp_backoff_generator"
)
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
def test_unavailable(self, mock_sleep, mock_expo):

Expand All @@ -486,12 +470,13 @@ def test_unavailable(self, mock_sleep, mock_expo):
add_TraceServiceServicer_to_server(
TraceServiceServicerUNAVAILABLE(), self.server
)
self.assertEqual(
self.exporter.export([self.span]), SpanExportResult.FAILURE
)
result = self.exporter.export([self.span])
self.assertEqual(result, SpanExportResult.FAILURE)
mock_sleep.assert_called_with(1)

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo")
@patch(
"opentelemetry.exporter.otlp.proto.grpc.exporter._create_exp_backoff_generator"
)
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
def test_unavailable_delay(self, mock_sleep, mock_expo):

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
from typing import Dict, Optional, Sequence
from time import sleep

import backoff
import requests

from opentelemetry.exporter.otlp.proto.common._internal import (
_create_exp_backoff_generator,
)
from opentelemetry.exporter.otlp.proto.common._log_encoder import encode_logs
from opentelemetry.sdk.environment_variables import (
OTEL_EXPORTER_OTLP_CERTIFICATE,
Expand Down Expand Up @@ -56,18 +58,6 @@
DEFAULT_LOGS_EXPORT_PATH = "v1/logs"
DEFAULT_TIMEOUT = 10 # in seconds

# Work around API change between backoff 1.x and 2.x. Since 2.0.0 the backoff
# wait generator API requires a first .send(None) before reading the backoff
# values from the generator.
_is_backoff_v2 = next(backoff.expo()) is None


def _expo(*args, **kwargs):
gen = backoff.expo(*args, **kwargs)
if _is_backoff_v2:
gen.send(None)
return gen


class OTLPLogExporter(LogExporter):

Expand Down Expand Up @@ -147,7 +137,9 @@ def export(self, batch: Sequence[LogData]) -> LogExportResult:

serialized_data = encode_logs(batch).SerializeToString()

for delay in _expo(max_value=self._MAX_RETRY_TIMEOUT):
for delay in _create_exp_backoff_generator(
max_value=self._MAX_RETRY_TIMEOUT
):

if delay == self._MAX_RETRY_TIMEOUT:
return LogExportResult.FAILURE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

from opentelemetry.exporter.otlp.proto.common._internal import (
_get_resource_data,
_create_exp_backoff_generator,
)
from opentelemetry.exporter.otlp.proto.common._internal.metrics_encoder import (
OTLPMetricExporterMixin,
Expand Down Expand Up @@ -73,7 +74,6 @@
from opentelemetry.sdk.resources import Resource as SDKResource
from opentelemetry.util.re import parse_env_headers

import backoff
import requests
from opentelemetry.proto.resource.v1.resource_pb2 import (
Resource as PB2Resource,
Expand All @@ -87,18 +87,6 @@
DEFAULT_METRICS_EXPORT_PATH = "v1/metrics"
DEFAULT_TIMEOUT = 10 # in seconds

# Work around API change between backoff 1.x and 2.x. Since 2.0.0 the backoff
# wait generator API requires a first .send(None) before reading the backoff
# values from the generator.
_is_backoff_v2 = next(backoff.expo()) is None


def _expo(*args, **kwargs):
gen = backoff.expo(*args, **kwargs)
if _is_backoff_v2:
gen.send(None)
return gen


class OTLPMetricExporter(MetricExporter, OTLPMetricExporterMixin):

Expand Down Expand Up @@ -181,7 +169,9 @@ def export(
**kwargs,
) -> MetricExportResult:
serialized_data = encode_metrics(metrics_data)
for delay in _expo(max_value=self._MAX_RETRY_TIMEOUT):
for delay in _create_exp_backoff_generator(
max_value=self._MAX_RETRY_TIMEOUT
):

if delay == self._MAX_RETRY_TIMEOUT:
return MetricExportResult.FAILURE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
from typing import Dict, Optional
from time import sleep

import backoff
import requests

from opentelemetry.exporter.otlp.proto.common._internal import (
_create_exp_backoff_generator,
)
from opentelemetry.exporter.otlp.proto.common.trace_encoder import (
encode_spans,
)
Expand Down Expand Up @@ -54,18 +56,6 @@
DEFAULT_TRACES_EXPORT_PATH = "v1/traces"
DEFAULT_TIMEOUT = 10 # in seconds

# Work around API change between backoff 1.x and 2.x. Since 2.0.0 the backoff
# wait generator API requires a first .send(None) before reading the backoff
# values from the generator.
_is_backoff_v2 = next(backoff.expo()) is None


def _expo(*args, **kwargs):
gen = backoff.expo(*args, **kwargs)
if _is_backoff_v2:
gen.send(None)
return gen


class OTLPSpanExporter(SpanExporter):

Expand Down Expand Up @@ -145,7 +135,9 @@ def export(self, spans) -> SpanExportResult:

serialized_data = encode_spans(spans).SerializeToString()

for delay in _expo(max_value=self._MAX_RETRY_TIMEOUT):
for delay in _create_exp_backoff_generator(
max_value=self._MAX_RETRY_TIMEOUT
):

if delay == self._MAX_RETRY_TIMEOUT:
return SpanExportResult.FAILURE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from requests.models import Response
from responses import POST, activate, add

from opentelemetry.exporter.otlp.proto.common._internal import _is_backoff_v2
from opentelemetry.exporter.otlp.proto.common.metrics_encoder import (
encode_metrics,
)
Expand All @@ -31,7 +32,6 @@
DEFAULT_METRICS_EXPORT_PATH,
DEFAULT_TIMEOUT,
OTLPMetricExporter,
_is_backoff_v2,
)
from opentelemetry.sdk.environment_variables import (
OTEL_EXPORTER_OTLP_CERTIFICATE,
Expand Down Expand Up @@ -298,7 +298,7 @@ def test_serialization(self, mock_post):
)

@activate
@patch("opentelemetry.exporter.otlp.proto.http.metric_exporter.backoff")
@patch("opentelemetry.exporter.otlp.proto.common._internal.backoff")
@patch("opentelemetry.exporter.otlp.proto.http.metric_exporter.sleep")
def test_handles_backoff_v2_api(self, mock_sleep, mock_backoff):
# In backoff ~= 2.0.0 the first value yielded from expo is None.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@
import responses

from opentelemetry._logs import SeverityNumber
from opentelemetry.exporter.otlp.proto.common._internal import _is_backoff_v2
from opentelemetry.exporter.otlp.proto.http import Compression
from opentelemetry.exporter.otlp.proto.http._log_exporter import (
DEFAULT_COMPRESSION,
DEFAULT_ENDPOINT,
DEFAULT_LOGS_EXPORT_PATH,
DEFAULT_TIMEOUT,
OTLPLogExporter,
_is_backoff_v2,
)
from opentelemetry.exporter.otlp.proto.http.version import __version__
from opentelemetry.sdk._logs import LogData
Expand Down Expand Up @@ -168,7 +168,7 @@ def test_exporter_env(self):
self.assertIsInstance(exporter._session, requests.Session)

@responses.activate
@patch("opentelemetry.exporter.otlp.proto.http._log_exporter.backoff")
@patch("opentelemetry.exporter.otlp.proto.common._internal.backoff")
@patch("opentelemetry.exporter.otlp.proto.http._log_exporter.sleep")
def test_handles_backoff_v2_api(self, mock_sleep, mock_backoff):
# In backoff ~= 2.0.0 the first value yielded from expo is None.
Expand Down
Loading