From 42a1b0a59a641623d0842be31d6d30403aef2540 Mon Sep 17 00:00:00 2001 From: Jothi Prakash Date: Mon, 11 Nov 2024 15:25:16 +0530 Subject: [PATCH 1/5] Added the exponential backoff code --- src/databricks/sql/auth/retry.py | 15 ++++++++++++++- src/databricks/sql/thrift_backend.py | 4 ++-- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/src/databricks/sql/auth/retry.py b/src/databricks/sql/auth/retry.py index 0c6547cb..92f0a9da 100755 --- a/src/databricks/sql/auth/retry.py +++ b/src/databricks/sql/auth/retry.py @@ -285,7 +285,7 @@ def sleep_for_retry(self, response: BaseHTTPResponse) -> bool: """ retry_after = self.get_retry_after(response) if retry_after: - backoff = self.get_backoff_time() + backoff = self.get_exponential_backoff() proposed_wait = max(backoff, retry_after) self.check_proposed_wait(proposed_wait) time.sleep(proposed_wait) @@ -293,6 +293,19 @@ def sleep_for_retry(self, response: BaseHTTPResponse) -> bool: return False + def get_exponential_backoff(self) -> float: + """ + This method implements the exponential backoff algorithm to calculate the delay between retries. + :return: + """ + + current_attempt = self.stop_after_attempts_count - self.total + proposed_backoff = (2**current_attempt) * self.delay_min + proposed_backoff = min(proposed_backoff, self.delay_max) + self.check_proposed_wait(proposed_backoff) + + return proposed_backoff + def get_backoff_time(self) -> float: """Calls urllib3's built-in get_backoff_time. diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py index cf5cd906..29be5482 100644 --- a/src/databricks/sql/thrift_backend.py +++ b/src/databricks/sql/thrift_backend.py @@ -64,8 +64,8 @@ # - 900s attempts-duration lines up w ODBC/JDBC drivers (for cluster startup > 10 mins) _retry_policy = { # (type, default, min, max) "_retry_delay_min": (float, 1, 0.1, 60), - "_retry_delay_max": (float, 60, 5, 3600), - "_retry_stop_after_attempts_count": (int, 30, 1, 60), + "_retry_delay_max": (float, 30, 5, 3600), + "_retry_stop_after_attempts_count": (int, 5, 1, 60), "_retry_stop_after_attempts_duration": (float, 900, 1, 86400), "_retry_delay_default": (float, 5, 1, 60), } From 1330ad7f59d82a8c3bc94917cae59e776d8c7a3f Mon Sep 17 00:00:00 2001 From: Jothi Prakash Date: Tue, 12 Nov 2024 11:52:00 +0530 Subject: [PATCH 2/5] Added the exponential backoff algorithm and refractored the code --- src/databricks/sql/auth/retry.py | 31 ++++++++++--------------------- 1 file changed, 10 insertions(+), 21 deletions(-) diff --git a/src/databricks/sql/auth/retry.py b/src/databricks/sql/auth/retry.py index 92f0a9da..f813b01e 100755 --- a/src/databricks/sql/auth/retry.py +++ b/src/databricks/sql/auth/retry.py @@ -285,38 +285,27 @@ def sleep_for_retry(self, response: BaseHTTPResponse) -> bool: """ retry_after = self.get_retry_after(response) if retry_after: - backoff = self.get_exponential_backoff() - proposed_wait = max(backoff, retry_after) - self.check_proposed_wait(proposed_wait) - time.sleep(proposed_wait) - return True + proposed_wait = retry_after + else: + proposed_wait = self.get_exponential_backoff() - return False + proposed_wait = min(proposed_wait, self.delay_max) + self.check_proposed_wait(proposed_wait) + time.sleep(proposed_wait) + return True def get_exponential_backoff(self) -> float: """ This method implements the exponential backoff algorithm to calculate the delay between retries. - :return: - """ - - current_attempt = self.stop_after_attempts_count - self.total - proposed_backoff = (2**current_attempt) * self.delay_min - proposed_backoff = min(proposed_backoff, self.delay_max) - self.check_proposed_wait(proposed_backoff) - - return proposed_backoff - - def get_backoff_time(self) -> float: - """Calls urllib3's built-in get_backoff_time. Never returns a value larger than self.delay_max A MaxRetryDurationError will be raised if the calculated backoff would exceed self.max_attempts_duration - Note: within urllib3, a backoff is only calculated in cases where a Retry-After header is not present - in the previous unsuccessful request and `self.respect_retry_after_header` is True (which is always true) + :return: """ - proposed_backoff = super().get_backoff_time() + current_attempt = self.stop_after_attempts_count - self.total + proposed_backoff = (2**current_attempt) * self.delay_min proposed_backoff = min(proposed_backoff, self.delay_max) self.check_proposed_wait(proposed_backoff) From 288f09c1d36c1b86a072511b99e670edb973af55 Mon Sep 17 00:00:00 2001 From: Jothi Prakash Date: Tue, 19 Nov 2024 11:45:36 +0530 Subject: [PATCH 3/5] Added jitter and added unit tests --- src/databricks/sql/auth/retry.py | 8 +++-- tests/unit/test_retry.py | 56 +++++++++++++++++++++++--------- 2 files changed, 46 insertions(+), 18 deletions(-) diff --git a/src/databricks/sql/auth/retry.py b/src/databricks/sql/auth/retry.py index f813b01e..ec321bed 100755 --- a/src/databricks/sql/auth/retry.py +++ b/src/databricks/sql/auth/retry.py @@ -1,4 +1,5 @@ import logging +import random import time import typing from enum import Enum @@ -287,14 +288,14 @@ def sleep_for_retry(self, response: BaseHTTPResponse) -> bool: if retry_after: proposed_wait = retry_after else: - proposed_wait = self.get_exponential_backoff() + proposed_wait = self.get_backoff_time() proposed_wait = min(proposed_wait, self.delay_max) self.check_proposed_wait(proposed_wait) time.sleep(proposed_wait) return True - def get_exponential_backoff(self) -> float: + def get_backoff_time(self) -> float: """ This method implements the exponential backoff algorithm to calculate the delay between retries. @@ -306,6 +307,9 @@ def get_exponential_backoff(self) -> float: current_attempt = self.stop_after_attempts_count - self.total proposed_backoff = (2**current_attempt) * self.delay_min + if self.backoff_jitter != 0.0: + proposed_backoff += random.random() * self.backoff_jitter + proposed_backoff = min(proposed_backoff, self.delay_max) self.check_proposed_wait(proposed_backoff) diff --git a/tests/unit/test_retry.py b/tests/unit/test_retry.py index 2108af4f..d5e56361 100644 --- a/tests/unit/test_retry.py +++ b/tests/unit/test_retry.py @@ -1,11 +1,12 @@ from os import error import time -from unittest.mock import Mock, patch +from unittest.mock import Mock, patch, call import pytest from requests import Request from urllib3 import HTTPResponse -from databricks.sql.auth.retry import DatabricksRetryPolicy, RequestHistory - +from databricks.sql.auth.retry import DatabricksRetryPolicy, RequestHistory, CommandType +from databricks.sql.exc import MaxRetryDurationError +from urllib3.exceptions import MaxRetryError class TestRetry: @pytest.fixture() @@ -25,32 +26,55 @@ def error_history(self) -> RequestHistory: method="POST", url=None, error=None, status=503, redirect_location=None ) + def calculate_backoff_time(self, attempt, delay_min, delay_max): + exponential_backoff_time = (2**attempt) * delay_min + return min(exponential_backoff_time, delay_max) + @patch("time.sleep") def test_sleep__no_retry_after(self, t_mock, retry_policy, error_history): retry_policy._retry_start_time = time.time() retry_policy.history = [error_history, error_history] retry_policy.sleep(HTTPResponse(status=503)) - t_mock.assert_called_with(2) + + expected_backoff_time = self.calculate_backoff_time(0, retry_policy.delay_min, retry_policy.delay_max) + t_mock.assert_called_with(expected_backoff_time) @patch("time.sleep") - def test_sleep__retry_after_is_binding(self, t_mock, retry_policy, error_history): + def test_sleep__no_retry_after_header__multiple_retries(self, t_mock, retry_policy): + num_attempts = retry_policy.stop_after_attempts_count + retry_policy._retry_start_time = time.time() - retry_policy.history = [error_history, error_history] - retry_policy.sleep(HTTPResponse(status=503, headers={"Retry-After": "3"})) - t_mock.assert_called_with(3) + retry_policy.command_type = CommandType.OTHER + + for attempt in range(num_attempts): + retry_policy.sleep(HTTPResponse(status=503)) + # Internally urllib3 calls the increment function generating a new instance for every retry + retry_policy = retry_policy.increment() + + expected_backoff_times = [] + for attempt in range(num_attempts): + expected_backoff_times.append(self.calculate_backoff_time(attempt, retry_policy.delay_min, retry_policy.delay_max)) + + # Asserts if the sleep value was called in the expected order + t_mock.assert_has_calls([call(expected_time) for expected_time in expected_backoff_times]) @patch("time.sleep") - def test_sleep__retry_after_present_but_not_binding( - self, t_mock, retry_policy, error_history - ): + def test_excessive_retry_attempts_error(self, t_mock, retry_policy): + # Attempting more than stop_after_attempt_count + num_attempts = retry_policy.stop_after_attempts_count + 1 + retry_policy._retry_start_time = time.time() - retry_policy.history = [error_history, error_history] - retry_policy.sleep(HTTPResponse(status=503, headers={"Retry-After": "1"})) - t_mock.assert_called_with(2) + retry_policy.command_type = CommandType.OTHER + + with pytest.raises(MaxRetryError): + for attempt in range(num_attempts): + retry_policy.sleep(HTTPResponse(status=503)) + # Internally urllib3 calls the increment function generating a new instance for every retry + retry_policy = retry_policy.increment() @patch("time.sleep") - def test_sleep__retry_after_surpassed(self, t_mock, retry_policy, error_history): + def test_sleep__retry_after_present(self, t_mock, retry_policy, error_history): retry_policy._retry_start_time = time.time() retry_policy.history = [error_history, error_history, error_history] retry_policy.sleep(HTTPResponse(status=503, headers={"Retry-After": "3"})) - t_mock.assert_called_with(4) + t_mock.assert_called_with(3) From bb47c92d3161a3b610487c16ce104a0f6a395d3f Mon Sep 17 00:00:00 2001 From: Jothi Prakash Date: Tue, 19 Nov 2024 11:47:06 +0530 Subject: [PATCH 4/5] Reformatted --- tests/unit/test_retry.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/tests/unit/test_retry.py b/tests/unit/test_retry.py index d5e56361..b7648ffb 100644 --- a/tests/unit/test_retry.py +++ b/tests/unit/test_retry.py @@ -1,11 +1,8 @@ -from os import error import time -from unittest.mock import Mock, patch, call +from unittest.mock import patch, call import pytest -from requests import Request from urllib3 import HTTPResponse from databricks.sql.auth.retry import DatabricksRetryPolicy, RequestHistory, CommandType -from databricks.sql.exc import MaxRetryDurationError from urllib3.exceptions import MaxRetryError class TestRetry: From 9baf38074855445859d8b18f57ff6323e329527b Mon Sep 17 00:00:00 2001 From: Jothi Prakash Date: Wed, 20 Nov 2024 12:34:24 +0530 Subject: [PATCH 5/5] Fixed the test_retry_exponential_backoff integration test --- tests/e2e/common/retry_test_mixins.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/e2e/common/retry_test_mixins.py b/tests/e2e/common/retry_test_mixins.py index 7dd5f745..942955ca 100755 --- a/tests/e2e/common/retry_test_mixins.py +++ b/tests/e2e/common/retry_test_mixins.py @@ -174,7 +174,7 @@ def test_retry_max_count_not_exceeded(self): def test_retry_exponential_backoff(self): """GIVEN the retry policy is configured for reasonable exponential backoff WHEN the server sends nothing but 429 responses with retry-afters - THEN the connector will use those retry-afters as a floor + THEN the connector will use those retry-afters values as delay """ retry_policy = self._retry_policy.copy() retry_policy["_retry_delay_min"] = 1 @@ -191,10 +191,10 @@ def test_retry_exponential_backoff(self): assert isinstance(cm.value.args[1], MaxRetryDurationError) # With setting delay_min to 1, the expected retry delays should be: - # 3, 3, 4 - # The first 2 retries are allowed, the 3rd retry puts the total duration over the limit + # 3, 3, 3, 3 + # The first 3 retries are allowed, the 4th retry puts the total duration over the limit # of 10 seconds - assert mock_obj.return_value.getresponse.call_count == 3 + assert mock_obj.return_value.getresponse.call_count == 4 assert duration > 6 # Should be less than 7, but this is a safe margin for CI/CD slowness