Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
ahmedabu98 committed Oct 5, 2023
1 parent 929db63 commit 604a836
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 31 deletions.
18 changes: 10 additions & 8 deletions sdks/python/apache_beam/io/gcp/bigquery_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -434,8 +434,8 @@ def test_create_temp_dataset_exception(self, exception_type, error_message):
],
expected_retries=2),
# first attempt returns a 403 rateLimitExceeded error
# second attempt returns a 403 quotaExceeded error
# third attempt returns a Http 403 quotaExceeded error
# second attempt returns a 429 rateLimitExceeded error
# third attempt returns a Http 403 rateLimitExceeded error
# fourth attempt passes
param(
responses=[
Expand All @@ -444,17 +444,18 @@ def test_create_temp_dataset_exception(self, exception_type, error_message):
errors=({
"message": "transient", "reason": "rateLimitExceeded"
}, )),
exceptions.Forbidden(
exceptions.ResourceExhausted(
"some message",
errors=({
"message": "transient", "reason": "quotaExceeded"
"message": "transient", "reason": "rateLimitExceeded"
}, )),
HttpForbiddenError(
response={'status': 403},
content={
"error": {
"errors": [{
"message": "transient", "reason": "quotaExceeded"
"message": "transient",
"reason": "rateLimitExceeded"
}]
}
},
Expand Down Expand Up @@ -504,16 +505,17 @@ def store_callback(unused_request):
self.assertEqual(expected_retries, mock_get_table.call_count - 2)

@parameterized.expand([
# first attempt returns a Http 403 with transient reason and retries
# first attempt returns a Http 429 with transient reason and retries
# second attempt returns a Http 403 with non-transient reason and fails
param(
responses=[
HttpForbiddenError(
response={'status': 403},
response={'status': 429},
content={
"error": {
"errors": [{
"message": "transient", "reason": "quotaExceeded"
"message": "transient",
"reason": "rateLimitExceeded"
}]
}
},
Expand Down
38 changes: 15 additions & 23 deletions sdks/python/apache_beam/utils/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,9 @@
try:
from apitools.base.py.exceptions import HttpError
from google.api_core.exceptions import GoogleAPICallError
from google.api_core import exceptions
except ImportError as e:
HttpError = None
GoogleAPICallError = None # type: ignore
exceptions = None

# Protect against environments where aws tools are not available.
# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
Expand All @@ -60,18 +58,7 @@
# pylint: enable=wrong-import-order, wrong-import-position

_LOGGER = logging.getLogger(__name__)
_RETRYABLE_REASONS = [
"rateLimitExceeded", "quotaExceeded", "internalError", "backendError"
]
_RETRYABLE_TYPES = (
exceptions.TooManyRequests if exceptions else None,
exceptions.InternalServerError if exceptions else None,
exceptions.BadGateway if exceptions else None,
exceptions.ServiceUnavailable if exceptions else None,
exceptions.DeadlineExceeded if exceptions else None,
requests.exceptions.ConnectionError,
requests.exceptions.Timeout,
ConnectionError)
_RETRYABLE_REASONS = ["rateLimitExceeded", "internalError", "backendError"]


class PermanentException(Exception):
Expand Down Expand Up @@ -181,28 +168,33 @@ def retry_on_server_errors_and_timeout_filter(exception):


def retry_on_server_errors_timeout_or_quota_issues_filter(exception):
"""Retry on server, timeout and 403 errors.
"""Retry on server, timeout, 429, and some 403 errors.
403 errors include both transient (accessDenied, billingNotEnabled) and
non-transient errors (quotaExceeded, rateLimitExceeded). Only retry transient
errors."""
403 errors from BigQuery include both non-transient (accessDenied,
billingNotEnabled) and transient errors (rateLimitExceeded).
Only retry transient errors."""
if HttpError is not None and isinstance(exception, HttpError):
if exception.status_code == 429:
return True
if exception.status_code == 403:
try:
# attempt to extract the reason and check if it's retryable
return exception.content["error"]["errors"][0][
"reason"] in _RETRYABLE_REASONS
except Exception:
except (KeyError, IndexError, TypeError):
_LOGGER.debug(
"Could not determine if HttpError is non-transient. Will retry: %s",
"Could not determine if HttpError is non-transient. "
"Will not retry: %s",
exception)
return True
return False
if GoogleAPICallError is not None and isinstance(exception,
GoogleAPICallError):
if exception.code == 429:
return True
if exception.code == 403:
if not hasattr(exception, "errors") or len(exception.errors) == 0:
# default to retrying
return True
# default to not retrying
return False

reason = exception.errors[0]["reason"]
return reason in _RETRYABLE_REASONS
Expand Down

0 comments on commit 604a836

Please sign in to comment.