-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
3 changed files
with
326 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,190 @@ | ||
from datetime import datetime, timedelta | ||
from time import time | ||
|
||
import pytest | ||
|
||
from zsuite import exponential_delay | ||
from zsuite.backoff import ( | ||
_apply_bounds, | ||
_normalize_cutoff, | ||
_normalize_jitter_pct, | ||
_validate_arguments, | ||
) | ||
|
||
|
||
def test_initial_conditions(): | ||
with pytest.raises(ValueError): | ||
list(exponential_delay(minimum_sleep=0, enable_sleep=False)) | ||
|
||
with pytest.raises(ValueError): | ||
list(exponential_delay(max_sleep=0, enable_sleep=False)) | ||
|
||
with pytest.raises(ValueError): | ||
list(exponential_delay(backoff_factor=1, enable_sleep=False)) | ||
|
||
|
||
def test_jitter_conditions(): | ||
with pytest.raises(ValueError): | ||
list(exponential_delay(jitter_pct=-1, enable_sleep=False)) | ||
|
||
with pytest.raises(ValueError): | ||
list(exponential_delay(jitter_pct=101, enable_sleep=False)) | ||
|
||
|
||
def test_cutoff(): | ||
total_delay = 1 | ||
max_sleep = 0.25 | ||
cutoff = datetime.now() + timedelta(seconds=total_delay) | ||
delays = list(exponential_delay(minimum_sleep=0.001, max_sleep=max_sleep, cutoff=cutoff)) | ||
assert sum(delays) <= total_delay + max_sleep + max_sleep * 0.1 | ||
|
||
|
||
def test_max_attempts(): | ||
delays = list(exponential_delay(minimum_sleep=1, max_sleep=10, max_attempts=3, enable_sleep=False)) | ||
assert len(delays) == 3 # Should have 3 elements including the first 0 | ||
|
||
|
||
def test_minimum_sleep(): | ||
delays = list(exponential_delay(minimum_sleep=1, jitter_pct=0.9, max_attempts=100, enable_sleep=False)) | ||
assert delays[0] == 0 # The first sleep should be 0 | ||
assert all(d >= 1 for d in delays[1:]) # All other sleeps should be >= 1 | ||
|
||
|
||
def test_maximum_sleep(): | ||
delays = list( | ||
exponential_delay( | ||
minimum_sleep=1, | ||
max_sleep=10, | ||
jitter_pct=0.5, | ||
max_attempts=100, | ||
enable_sleep=False, | ||
) | ||
) | ||
assert delays[0] == 0 # The first sleep should be 0 | ||
assert all(1 <= d <= 10 for d in delays[1:]) # All other sleeps should be >= 1 | ||
|
||
|
||
def test_first_sleep(): | ||
first_sleep = next(exponential_delay(minimum_sleep=1, max_sleep=10)) | ||
assert first_sleep == 0 # The first sleep should be 0 | ||
|
||
|
||
def test_backoff(): | ||
max_attempts = 15 | ||
delays = list( | ||
exponential_delay( | ||
minimum_sleep=0.1, | ||
jitter_pct=0.0, | ||
max_sleep=60, | ||
max_attempts=max_attempts, | ||
enable_sleep=False, | ||
backoff_factor=2, | ||
) | ||
) | ||
expected = [ | ||
0, | ||
0.1, | ||
0.2, | ||
0.4, | ||
0.8, | ||
1.6, | ||
3.2, | ||
6.4, | ||
12.8, | ||
25.6, | ||
51.2, | ||
60, | ||
60, | ||
60, | ||
60, | ||
] | ||
assert len(delays) == max_attempts | ||
assert delays == expected | ||
|
||
|
||
# Tests for _normalize_jitter_pct function | ||
def test_normalize_jitter_pct_with_int(): | ||
assert _normalize_jitter_pct(10) == 0.1 | ||
assert _normalize_jitter_pct(100) == 1.0 | ||
assert _normalize_jitter_pct(0) == 0.0 | ||
|
||
|
||
def test_normalize_jitter_pct_with_float(): | ||
assert _normalize_jitter_pct(0.1) == 0.1 | ||
assert _normalize_jitter_pct(1.0) == 1.0 | ||
assert _normalize_jitter_pct(0.0) == 0.0 | ||
|
||
|
||
def test_normalize_jitter_pct_with_invalid_int(): | ||
with pytest.raises(ValueError): | ||
_normalize_jitter_pct(-1) | ||
with pytest.raises(ValueError): | ||
_normalize_jitter_pct(101) | ||
|
||
|
||
def test_normalize_jitter_pct_with_invalid_float(): | ||
with pytest.raises(ValueError): | ||
_normalize_jitter_pct(-0.1) | ||
with pytest.raises(ValueError): | ||
_normalize_jitter_pct(1.1) | ||
|
||
|
||
# Tests for _normalize_cutoff function | ||
def test_normalize_cutoff_with_datetime(): | ||
dt = datetime.now() | ||
assert round(_normalize_cutoff(dt) - dt.timestamp(), 3) == 0.0 | ||
|
||
|
||
def test_normalize_cutoff_with_timedelta(): | ||
td = timedelta(seconds=10) | ||
assert round(_normalize_cutoff(td) - (time() + td.total_seconds()), 3) == 0.0 | ||
|
||
|
||
def test_normalize_cutoff_with_int(): | ||
assert round(_normalize_cutoff(10) - (time() + 10), 3) == 0.0 | ||
|
||
|
||
# Tests for _validate_arguments function | ||
def test_validate_arguments_all_good(): | ||
_validate_arguments(1.1, 1, 1, 0.1) # Should not raise any exception | ||
|
||
|
||
def test_validate_arguments_bad_minimum_sleep(): | ||
with pytest.raises(ValueError, match="Initial sleep time must be greater than 0"): | ||
_validate_arguments(1.1, 1, 1, 0) | ||
|
||
|
||
def test_validate_arguments_bad_max_sleep(): | ||
with pytest.raises(ValueError, match="Maximum sleep time must be greater than 0"): | ||
_validate_arguments(1.1, 1, 0, 0.1) | ||
|
||
|
||
def test_validate_arguments_bad_backoff_factor(): | ||
with pytest.raises(ValueError, match="Backoff factor must be greater than 1"): | ||
_validate_arguments(1, 1, 1, 0.1) | ||
|
||
|
||
def test_validate_arguments_bad_max_attempts(): | ||
with pytest.raises(ValueError, match="If provided, max_attempts must be greater than 0"): | ||
_validate_arguments(1.1, 0, 1, 0.1) | ||
|
||
|
||
# Tests for _apply_bounds function | ||
def test_apply_bounds_within_range(): | ||
assert _apply_bounds(5, 1, 10) == 5 # Inside the range, should return the value itself | ||
|
||
|
||
def test_apply_bounds_equal_to_min(): | ||
assert _apply_bounds(1, 1, 10) == 1 # Equal to minimum, should return the minimum | ||
|
||
|
||
def test_apply_bounds_equal_to_max(): | ||
assert _apply_bounds(10, 1, 10) == 10 # Equal to maximum, should return the maximum | ||
|
||
|
||
def test_apply_bounds_below_min(): | ||
assert _apply_bounds(0, 1, 10) == 1 # Below minimum, should return the minimum | ||
|
||
|
||
def test_apply_bounds_above_max(): | ||
assert _apply_bounds(11, 1, 10) == 10 # Above maximum, should return the maximum |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
from .backoff import exponential_delay |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,135 @@ | ||
from datetime import datetime, timedelta | ||
from random import uniform | ||
from time import sleep, time | ||
|
||
|
||
def exponential_delay( | ||
minimum_sleep: int | float = 0.1, | ||
max_sleep: int | float = 60, | ||
jitter_pct: int | float = 0.1, | ||
backoff_factor: float = 1.25, | ||
cutoff: datetime | timedelta | int | None = None, | ||
max_attempts: int | None = None, | ||
enable_sleep: bool = True, | ||
): | ||
""" | ||
Generates sleep times based on exponential backoff algorithm. | ||
NOTE: First sleep is always 0 seconds. This is to allow the caller to perform an initial action before sleeping. | ||
Parameters: | ||
- minimum_sleep (float): The initial sleep time in seconds. Must be greater than 0. | ||
- max_sleep (float): The maximum sleep time in seconds. Must be greater than 0. | ||
- backoff_factor (float): The multiplier applied to the current sleep time to get the next sleep time. | ||
- jitter_pct (int | float): Optional jitter percentage to apply. | ||
- If int, should be between 0 and 100. | ||
- If float, should be between 0 and 1. | ||
- cutoff (datetime | timedelta | int | None): Optional cutoff time for the sleep. | ||
- If datetime, uses the given datetime as the cutoff. | ||
- If timedelta, adds the delta to the current time to get the cutoff. | ||
- If int, adds the number of seconds to the current time to get the cutoff. | ||
- enable_sleep (bool): Whether to actually sleep for the generated sleep times. Useful for testing. | ||
Yields: | ||
- sleep_time (float): The next sleep time in seconds. | ||
Raises: | ||
- ValueError: If any of the parameters are out of their valid range. | ||
Examples:: | ||
for sleep_time in exponential_delay(initial_sleep=0.1, max_sleep=10, backoff_factor=1.15): | ||
print(f"Slept for {sleep_time} seconds") | ||
""" | ||
current_delay = minimum_sleep | ||
count = 1 | ||
|
||
# Validate & normalize arguments | ||
_validate_arguments(backoff_factor, max_attempts, max_sleep, minimum_sleep) | ||
cutoff = _normalize_cutoff(cutoff) | ||
jitter_pct = _normalize_jitter_pct(jitter_pct) | ||
|
||
# Main loop | ||
|
||
yield 0 # Always yield at least once, and the first sleep should be 0 | ||
while True: | ||
count += 1 | ||
if max_attempts is not None and count > max_attempts: | ||
return | ||
if cutoff and time() >= cutoff: | ||
return # stop iteration if we've reached the cutoff time | ||
|
||
this_sleep = current_delay # only apply jitter to the current sleep | ||
if jitter_pct: | ||
this_sleep = _apply_jitter(this_sleep, jitter_pct, minimum_sleep, max_sleep) | ||
this_sleep = _apply_bounds(this_sleep, minimum_sleep, max_sleep) | ||
|
||
if cutoff and time() + this_sleep > cutoff: | ||
this_sleep = cutoff - time() | ||
|
||
# Sleep (assuming it isn't disabled) and yield the sleep time | ||
if enable_sleep: | ||
sleep(this_sleep) # sleep before yielding the slept time | ||
yield this_sleep | ||
|
||
if current_delay < max_sleep: | ||
current_delay *= backoff_factor | ||
current_delay = min(current_delay, max_sleep) | ||
|
||
|
||
def _normalize_jitter_pct(jitter_pct): | ||
match jitter_pct: | ||
case int(): | ||
if jitter_pct < 0 or jitter_pct > 100: | ||
raise ValueError("Jitter must be int between 0 and 100 or float between 0 and 1") | ||
jitter_pct = float(jitter_pct * 0.01) | ||
case float(): | ||
if jitter_pct < 0 or jitter_pct > 1: | ||
raise ValueError("Jitter must be float between 0 and 1, or int between 0 and 100") | ||
return jitter_pct | ||
|
||
|
||
def _normalize_cutoff(cutoff): | ||
match cutoff: | ||
case datetime(): | ||
cutoff = cutoff.timestamp() | ||
case timedelta(): | ||
cutoff = time() + cutoff.total_seconds() | ||
case int(): | ||
cutoff = cutoff + time() | ||
return cutoff | ||
|
||
|
||
def _validate_arguments(backoff_factor, max_attempts, max_sleep, minimum_sleep): | ||
if minimum_sleep <= 0: | ||
raise ValueError("Initial sleep time must be greater than 0") | ||
if max_sleep <= 0: | ||
raise ValueError("Maximum sleep time must be greater than 0") | ||
if backoff_factor <= 1: | ||
raise ValueError("Backoff factor must be greater than 1") | ||
if max_attempts is not None and max_attempts <= 0: | ||
raise ValueError("If provided, max_attempts must be greater than 0") | ||
|
||
|
||
def _apply_bounds(this_sleep, minimum_sleep, max_sleep): | ||
if this_sleep < minimum_sleep: | ||
this_sleep = minimum_sleep | ||
if this_sleep > max_sleep: | ||
this_sleep = max_sleep | ||
return this_sleep | ||
|
||
|
||
def _apply_jitter(this_sleep, jitter_pct, minimum_sleep, max_sleep): | ||
upper = this_sleep + this_sleep * jitter_pct | ||
lower = this_sleep - this_sleep * jitter_pct | ||
if lower < minimum_sleep: | ||
lower = minimum_sleep | ||
if upper > max_sleep: | ||
upper = max_sleep | ||
this_sleep += uniform( | ||
lower, | ||
upper, | ||
) | ||
return this_sleep |