Add exponential backoff to avoid rate limiting issues
moradology committed Jun 4, 2024
1 parent 7167f96 commit e7b60e0
Showing 1 changed file with 32 additions and 4 deletions.
36 changes: 32 additions & 4 deletions pangeo_forge_recipes/transforms.py
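The retry wait in this change grows geometrically: initial_backoff * backoff_factor ** retries. A minimal sketch of the resulting schedule, assuming the defaults added below (max_retries=5, initial_backoff=1.0, backoff_factor=2.0):

# Illustrative sketch only: mirrors the wait computation in the retry loop below.
initial_backoff = 1.0
backoff_factor = 2.0
max_retries = 5

waits = [initial_backoff * backoff_factor**n for n in range(max_retries)]
print(waits)  # [1.0, 2.0, 4.0, 8.0, 16.0] seconds slept before retries 1..5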
@@ -4,6 +4,7 @@
import math
import random
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional, Tuple, TypeVar, Union
@@ -150,13 +151,19 @@ class TransferFilesWithConcurrency(beam.DoFn):
fsspec_sync_patch: Experimental. Likely slower. When enabled, this attempts to
replace asynchronous code with synchronous implementations to potentially address
deadlocking issues. cf. https://github.com/h5py/h5py/issues/2019
max_retries: The maximum number of retries for failed file transfers.
initial_backoff: The initial backoff time in seconds before retrying a failed transfer.
backoff_factor: The factor by which the backoff time is multiplied after each retry.
"""

transfer_target: CacheFSSpecTarget
max_concurrency: int
secrets: Optional[Dict] = None
open_kwargs: Optional[Dict] = None
fsspec_sync_patch: bool = False
max_retries: int
initial_backoff: float
backoff_factor: float

def process(self, indexed_urls):
with ThreadPoolExecutor(max_workers=self.max_concurrency) as executor:
@@ -172,10 +179,22 @@ def process(self, indexed_urls):
_, url = futures[future]
logger.error(f"Error transferring file {url}: {e}")

def transfer_file(self, index: Index, url: str) -> Tuple[Index, str]:
open_kwargs = self.open_kwargs or {}
self.transfer_target.cache_file(url, self.secrets, self.fsspec_sync_patch, **open_kwargs)
return (index, self.transfer_target._full_path(url))
def transfer_file(self, index: int, url: str) -> Tuple[int, str]:
retries = 0
while retries <= self.max_retries:
try:
open_kwargs = self.open_kwargs or {}
self.transfer_target.cache_file(url, self.secrets, self.fsspec_sync_patch, **open_kwargs)
return (index, self.transfer_target._full_path(url))
except Exception as e:
if retries == self.max_retries:
logger.error(f"Max retries reached for {url}: {e}")
raise e
else:
backoff_time = self.initial_backoff * (self.backoff_factor ** retries)
logger.warning(f"Error transferring file {url}: {e}. Retrying in {backoff_time} seconds...")
time.sleep(backoff_time)
retries += 1


@dataclass
@@ -195,6 +214,9 @@ class CheckpointFileTransfer(beam.PTransform):
fsspec_sync_patch: Experimental. Likely slower. When enabled, this attempts to
replace asynchronous code with synchronous implementations to potentially address
deadlocking issues. cf. https://github.com/h5py/h5py/issues/2019
max_retries: The maximum number of retries for failed file transfers.
initial_backoff: The initial backoff time in seconds before retrying a failed transfer.
backoff_factor: The factor by which the backoff time is multiplied after each retry.
"""

transfer_target: Union[str, CacheFSSpecTarget]
@@ -203,6 +225,9 @@ class CheckpointFileTransfer(beam.PTransform):
max_executors: int = 20
concurrency_per_executor: int = 10
fsspec_sync_patch: bool = False
max_retries: int = 5
initial_backoff: float = 1.0
backoff_factor: float = 2.0

def assign_keys(self, element) -> Tuple[int, Any]:
index, url = element
@@ -228,6 +253,9 @@ def expand(self, pcoll):
open_kwargs=self.open_kwargs,
max_concurrency=self.concurrency_per_executor,
fsspec_sync_patch=self.fsspec_sync_patch,
max_retries=self.max_retries,
initial_backoff=self.initial_backoff,
backoff_factor=self.backoff_factor
)
)
)
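For reference, a hypothetical usage sketch of the new retry knobs on CheckpointFileTransfer; the pipeline, cache path, and input elements here are assumptions for illustration, not part of this commit:

# Hypothetical usage sketch (not from this commit). Assumes elements are
# (index, url) pairs, which is what the transform's assign_keys/transfer_file
# path expects, and that a plain string path is accepted for transfer_target
# per its Union[str, CacheFSSpecTarget] annotation.
import apache_beam as beam

from pangeo_forge_recipes.transforms import CheckpointFileTransfer

with beam.Pipeline() as p:
    (
        p
        | beam.Create([(0, "https://example.com/a.nc"), (1, "https://example.com/b.nc")])
        | CheckpointFileTransfer(
            transfer_target="s3://my-bucket/cache",  # assumed cache location
            max_executors=20,
            concurrency_per_executor=10,
            max_retries=5,        # give up after five failed attempts per file
            initial_backoff=1.0,  # first retry waits 1 s
            backoff_factor=2.0,   # then 2 s, 4 s, 8 s, 16 s
        )
    )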
