Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TDL-15863: Implement Request Timeout #173

Merged
merged 19 commits into from
Dec 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions tap_facebook/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@

from facebook_business.exceptions import FacebookError, FacebookRequestError, FacebookBadObjectError

from requests.exceptions import ConnectionError, Timeout

API = None

INSIGHTS_MAX_WAIT_TO_START_SECONDS = 5 * 60
Expand All @@ -45,6 +47,8 @@

RESULT_RETURN_LIMIT = 100

REQUEST_TIMEOUT = 300

STREAMS = [
'adcreative',
'ads',
Expand Down Expand Up @@ -145,7 +149,7 @@ def log_retry_attempt(details):
if isinstance(exception, TypeError) and str(exception) == "string indices must be integers":
LOGGER.info('TypeError due to bad JSON response')
def should_retry_api_error(exception):
if isinstance(exception, FacebookBadObjectError) or isinstance(exception, AttributeError):
if isinstance(exception, FacebookBadObjectError) or isinstance(exception, Timeout) or isinstance(exception, ConnectionError) or isinstance(exception, AttributeError):
return True
elif isinstance(exception, FacebookRequestError):
return (exception.api_transient_error()
Expand Down Expand Up @@ -278,6 +282,7 @@ def sync_batches(self, stream_objects):

key_properties = ['id']

@retry_pattern(backoff.expo, (Timeout, ConnectionError), max_tries=5, factor=2)
# Added retry_pattern to handle AttributeError raised from account.get_ad_creatives() below
@retry_pattern(backoff.expo, (FacebookRequestError, TypeError, AttributeError), max_tries=5, factor=5)
def get_adcreatives(self):
Expand All @@ -295,6 +300,7 @@ class Ads(IncrementalStream):

key_properties = ['id', 'updated_time']

@retry_pattern(backoff.expo, (Timeout, ConnectionError), max_tries=5, factor=2)
# Added retry_pattern to handle AttributeError raised from account.get_ads() below
@retry_pattern(backoff.expo, (FacebookRequestError, AttributeError), max_tries=5, factor=5)
def _call_get_ads(self, params):
Expand All @@ -321,6 +327,7 @@ def do_request_multiple():
filt_ads = self._call_get_ads(params)
yield filt_ads

@retry_pattern(backoff.expo, (Timeout, ConnectionError), max_tries=5, factor=2)
# Added retry_pattern to handle AttributeError raised from ad.api_get() below
@retry_pattern(backoff.expo, (FacebookRequestError, AttributeError), max_tries=5, factor=5)
def prepare_record(ad):
Expand All @@ -341,6 +348,7 @@ class AdSets(IncrementalStream):

key_properties = ['id', 'updated_time']

@retry_pattern(backoff.expo, (Timeout, ConnectionError), max_tries=5, factor=2)
# Added retry_pattern to handle AttributeError raised from account.get_ad_sets() below
@retry_pattern(backoff.expo, (FacebookRequestError, AttributeError), max_tries=5, factor=5)
def _call_get_ad_sets(self, params):
Expand All @@ -367,6 +375,7 @@ def do_request_multiple():
filt_adsets = self._call_get_ad_sets(params)
yield filt_adsets

@retry_pattern(backoff.expo, (Timeout, ConnectionError), max_tries=5, factor=2)
# Added retry_pattern to handle AttributeError raised from ad_set.api_get() below
@retry_pattern(backoff.expo, (FacebookRequestError, AttributeError), max_tries=5, factor=5)
def prepare_record(ad_set):
Expand All @@ -384,6 +393,7 @@ class Campaigns(IncrementalStream):

key_properties = ['id']

@retry_pattern(backoff.expo, (Timeout, ConnectionError), max_tries=5, factor=2)
# Added retry_pattern to handle AttributeError raised from account.get_campaigns() below
@retry_pattern(backoff.expo, (FacebookRequestError, AttributeError), max_tries=5, factor=5)
def _call_get_campaigns(self, params):
Expand Down Expand Up @@ -415,6 +425,7 @@ def do_request_multiple():
filt_campaigns = self._call_get_campaigns(params)
yield filt_campaigns

@retry_pattern(backoff.expo, (Timeout, ConnectionError), max_tries=5, factor=2)
# Added retry_pattern to handle AttributeError raised from request call below
@retry_pattern(backoff.expo, (FacebookRequestError, AttributeError), max_tries=5, factor=5)
def prepare_record(campaign):
Expand Down Expand Up @@ -488,12 +499,14 @@ def sync_batches(self, stream_objects):
api_batch.execute()
return str(pendulum.parse(latest_lead[self.replication_key]))

@retry_pattern(backoff.expo, (Timeout, ConnectionError), max_tries=5, factor=2)
# Added retry_pattern to handle AttributeError raised from account.get_ads() below
@retry_pattern(backoff.expo, (FacebookRequestError, AttributeError), max_tries=5, factor=5)
def get_ads(self):
params = {'limit': RESULT_RETURN_LIMIT}
yield from self.account.get_ads(params=params)

@retry_pattern(backoff.expo, (Timeout, ConnectionError), max_tries=5, factor=2)
# Added retry_pattern to handle AttributeError raised from ad.get_leads() below
@retry_pattern(backoff.expo, (FacebookRequestError, AttributeError), max_tries=5, factor=5)
def get_leads(self, ads, start_time, previous_start_time):
Expand Down Expand Up @@ -646,6 +659,7 @@ def __api_get_with_retry(job):
job = job.api_get()
return job

@retry_pattern(backoff.expo, (Timeout, ConnectionError), max_tries=5, factor=2)
# Added retry_pattern to handle AttributeError raised from requests call below
@retry_pattern(backoff.expo, (FacebookRequestError, InsightsJobTimeout, FacebookBadObjectError, TypeError, AttributeError), max_tries=5, factor=5)
def run_job(self, params):
Expand Down Expand Up @@ -871,8 +885,15 @@ def main_impl():
global RESULT_RETURN_LIMIT
RESULT_RETURN_LIMIT = CONFIG.get('result_return_limit', RESULT_RETURN_LIMIT)

# Set request timeout with config param `request_timeout`.
config_request_timeout = CONFIG.get('request_timeout')
if config_request_timeout and float(config_request_timeout):
request_timeout = float(config_request_timeout)
else:
request_timeout = REQUEST_TIMEOUT # If value is 0,"0","" or not passed then set default to 300 seconds.

global API
API = FacebookAdsApi.init(access_token=access_token)
API = FacebookAdsApi.init(access_token=access_token, timeout=request_timeout)
user = fb_user.User(fbid='me')

accounts = user.get_ad_accounts()
Expand Down
Loading