Skip to content

Commit

Permalink
✨ Source Facebook Marketing: improve rate limit error message (#37341)
Browse files Browse the repository at this point in the history
  • Loading branch information
maxi297 authored Apr 17, 2024
1 parent 2e00b2a commit db9c993
Show file tree
Hide file tree
Showing 11 changed files with 364 additions and 182 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: e7778cfc-e97c-4458-9ecb-b4f2bba8946c
dockerImageTag: 2.1.4
dockerImageTag: 2.1.5
dockerRepository: airbyte/source-facebook-marketing
documentationUrl: https://docs.airbyte.com/integrations/sources/facebook-marketing
githubIssueLabel: source-facebook-marketing
Expand Down
455 changes: 295 additions & 160 deletions airbyte-integrations/connectors/source-facebook-marketing/poetry.lock

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "2.1.4"
version = "2.1.5"
name = "source-facebook-marketing"
description = "Source implementation for Facebook Marketing."
authors = [ "Airbyte <contact@airbyte.io>",]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ def _get_start_date(self) -> Mapping[str, pendulum.Date]:
:return: the first date to sync
"""
today = pendulum.today().date()
today = pendulum.today(tz=pendulum.tz.UTC).date()
oldest_date = today - self.INSIGHTS_RETENTION_PERIOD

start_dates_for_account = {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

# The Facebook API error codes indicating rate-limiting are listed at
# https://developers.facebook.com/docs/graph-api/overview/rate-limiting/
FACEBOOK_RATE_LIMIT_ERROR_CODES = (
FACEBOOK_RATE_LIMIT_ERROR_CODES = {
4,
17,
32,
Expand All @@ -29,7 +29,7 @@
80005,
80006,
80008,
)
}
FACEBOOK_TEMPORARY_OAUTH_ERROR_CODE = 2
FACEBOOK_BATCH_ERROR_CODE = 960
FACEBOOK_UNKNOWN_ERROR_CODE = 99
Expand Down Expand Up @@ -85,6 +85,10 @@ def revert_request_record_limit(details):
# set the flag to the api class that the `limit` param is restored
details.get("args")[0].request_record_limit_is_reduced = False

def give_up(details):
if isinstance(details["exception"], FacebookRequestError):
raise traced_exception(details["exception"])

def is_transient_cannot_include_error(exc: FacebookRequestError) -> bool:
"""After migration to API v19.0, some customers randomly face a BAD_REQUEST error (OAuthException) with the pattern:"Cannot include ..."
According to the last comment in https://developers.facebook.com/community/threads/286697364476462/, this might be a transient issue that can be solved with a retry."""
Expand Down Expand Up @@ -119,6 +123,7 @@ def should_retry_api_error(exc):
jitter=None,
on_backoff=[log_retry_attempt, reduce_request_record_limit],
on_success=[revert_request_record_limit],
on_giveup=[give_up],
giveup=lambda exc: not should_retry_api_error(exc),
**wait_gen_kwargs,
)
Expand Down Expand Up @@ -175,6 +180,14 @@ def traced_exception(fb_exception: FacebookRequestError):
"access token with all required permissions."
)

elif fb_exception.api_error_code() in FACEBOOK_RATE_LIMIT_ERROR_CODES:
return AirbyteTracedException(
message="The maximum number of requests on the Facebook API has been reached. See https://developers.facebook.com/docs/graph-api/overview/rate-limiting/ for more information",
internal_message=str(fb_exception),
failure_type=FailureType.transient_error,
exception=fb_exception,
)

else:
failure_type = FailureType.system_error
friendly_msg = f"Error: {fb_exception.api_error_code()}, {fb_exception.api_error_message()}."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def cast_to_type(input_date: DateOrDateTime, target_date: DateOrDateTime) -> Dat


def validate_start_date(start_date: DateOrDateTime) -> DateOrDateTime:
today = cast_to_type(start_date, pendulum.today())
today = cast_to_type(start_date, pendulum.today(tz=pendulum.tz.UTC))
retention_date = today.subtract(months=DATA_RETENTION_PERIOD)
if retention_date.day != today.day:
# `.subtract(months=37)` can be erroneous, for instance:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ def test_get_result_when_job_is_failed(self, failed_job):
@freezegun.freeze_time("2023-10-29")
def test_split_job(self, mocker, api, edge_class, next_edge_class, id_field):
"""Test that split will correctly downsize edge_object"""
today = pendulum.today().date()
today = pendulum.today(tz=pendulum.tz.UTC).date()
start, end = today - pendulum.duration(days=365 * 3 + 20), today - pendulum.duration(days=365 * 3 + 10)
params = {"time_increment": 1, "breakdowns": []}
job = InsightAsyncJob(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import pytest
from airbyte_cdk.models import SyncMode
from airbyte_cdk.utils import AirbyteTracedException
from airbyte_protocol.models import FailureType
from facebook_business import FacebookAdsApi, FacebookSession
from facebook_business.exceptions import FacebookRequestError
from source_facebook_marketing.streams import Activities, AdAccount, AdCreatives, Campaigns, Videos
Expand Down Expand Up @@ -105,6 +106,31 @@ def test_limit_reached(self, mocker, requests_mock, api, fb_call_rate_response,
except FacebookRequestError:
pytest.fail("Call rate error has not being handled")

def test_given_rate_limit_reached_when_read_then_raise_transient_traced_exception(self, requests_mock, api, fb_call_rate_response, account_id, some_config):
requests_mock.register_uri(
"GET",
FacebookSession.GRAPH + f"/{FB_API_VERSION}/act_{account_id}/campaigns",
[fb_call_rate_response],
)

stream = Campaigns(
api=api,
account_ids=[account_id],
start_date=pendulum.now(),
end_date=pendulum.now(),
)

with pytest.raises(AirbyteTracedException) as exception:
list(
stream.read_records(
sync_mode=SyncMode.full_refresh,
stream_state={},
stream_slice={"account_id": account_id},
)
)

assert exception.value.failure_type == FailureType.transient_error

def test_batch_limit_reached(self, requests_mock, api, fb_call_rate_response, account_id):
"""Error once, check that we retry and not fail"""
responses = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@


import json
import pathlib
from typing import Any, Mapping

import pytest
Expand All @@ -15,19 +16,25 @@
# BASE ARGS
CMD = "check"
SOURCE: Source = SourceFacebookMarketing()
_EXCLUDE_DELETE_CONFIGS_PATH = "test_migrations/include_deleted_to_status_filters/include_deleted_false"
_INCLUDE_DELETE_CONFIGS_PATH = "test_migrations/include_deleted_to_status_filters/include_deleted_true"
_ACCOUNT_ID_TO_ARRAY_CONFIGS_PATH = "test_migrations/account_id_to_array"


# HELPERS
def load_config(config_path: str) -> Mapping[str, Any]:
with open(config_path, "r") as config:
return json.load(config)


def _config_path(path_from_unit_test_folder: str) -> str:
return str(pathlib.Path(__file__).parent / path_from_unit_test_folder)


class TestMigrateAccountIdToArray:
TEST_CONFIG_PATH = "unit_tests/test_migrations/account_id_to_array/test_old_config.json"
NEW_TEST_CONFIG_PATH = "unit_tests/test_migrations/account_id_to_array/test_new_config.json"
UPGRADED_TEST_CONFIG_PATH = "unit_tests/test_migrations/account_id_to_array/test_upgraded_config.json"
NEW_CONFIG_WITHOUT_ACCOUNT_ID = "unit_tests/test_migrations/account_id_to_array/test_new_config_without_account_id.json"
TEST_CONFIG_PATH = _config_path(f"{_ACCOUNT_ID_TO_ARRAY_CONFIGS_PATH}/test_old_config.json")
NEW_TEST_CONFIG_PATH = _config_path(f"{_ACCOUNT_ID_TO_ARRAY_CONFIGS_PATH}/test_new_config.json")
UPGRADED_TEST_CONFIG_PATH = _config_path(f"{_ACCOUNT_ID_TO_ARRAY_CONFIGS_PATH}/test_upgraded_config.json")
NEW_CONFIG_WITHOUT_ACCOUNT_ID = _config_path(f"{_ACCOUNT_ID_TO_ARRAY_CONFIGS_PATH}/test_new_config_without_account_id.json")

@staticmethod
def revert_migration(config_path: str = TEST_CONFIG_PATH) -> None:
Expand Down Expand Up @@ -97,12 +104,12 @@ def test_should_not_migrate_config_without_account_id(self):


class TestMigrateIncludeDeletedToStatusFilters:
OLD_TEST1_CONFIG_PATH = "unit_tests/test_migrations/include_deleted_to_status_filters/include_deleted_false/test_old_config.json"
NEW_TEST1_CONFIG_PATH = "unit_tests/test_migrations/include_deleted_to_status_filters/include_deleted_false/test_new_config.json"
OLD_TEST2_CONFIG_PATH = "unit_tests/test_migrations/include_deleted_to_status_filters/include_deleted_true/test_old_config.json"
NEW_TEST2_CONFIG_PATH = "unit_tests/test_migrations/include_deleted_to_status_filters/include_deleted_true/test_new_config.json"
OLD_TEST1_CONFIG_PATH = _config_path(f"{_EXCLUDE_DELETE_CONFIGS_PATH}/test_old_config.json")
NEW_TEST1_CONFIG_PATH = _config_path(f"{_EXCLUDE_DELETE_CONFIGS_PATH}/test_new_config.json")
OLD_TEST2_CONFIG_PATH = _config_path(f"{_INCLUDE_DELETE_CONFIGS_PATH}/test_old_config.json")
NEW_TEST2_CONFIG_PATH = _config_path(f"{_INCLUDE_DELETE_CONFIGS_PATH}/test_new_config.json")

UPGRADED_TEST_CONFIG_PATH = "unit_tests/test_migrations/account_id_to_array/test_upgraded_config.json"
UPGRADED_TEST_CONFIG_PATH = _config_path("test_migrations/account_id_to_array/test_upgraded_config.json")

filter_properties = ["ad_statuses", "adset_statuses", "campaign_statuses"]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import pytest
from source_facebook_marketing.utils import DATA_RETENTION_PERIOD, validate_end_date, validate_start_date

TODAY = pendulum.local(2023, 3, 31)
TODAY = pendulum.datetime(2023, 3, 31)


@pytest.mark.parametrize(
Expand All @@ -21,9 +21,9 @@
),
(
"start_date",
pendulum.local(2019, 1, 1),
pendulum.local(2020, 3, 2),
[f"The start date cannot be beyond 37 months from the current date. " f"Set start date to {pendulum.local(2020, 3, 2)}."],
pendulum.datetime(2019, 1, 1),
pendulum.datetime(2020, 3, 2),
[f"The start date cannot be beyond 37 months from the current date. " f"Set start date to {pendulum.datetime(2020, 3, 2)}."],
),
(
"start_date",
Expand Down
3 changes: 2 additions & 1 deletion docs/integrations/sources/facebook-marketing.md
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,8 @@ The Facebook Marketing connector uses the `lookback_window` parameter to repeate

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 2.1.4 | 2024-04-16 | [37367](https://github.com/airbytehq/airbyte/pull/37367) | Skip config migration when the legacy account_id field does not exist |
| 2.1.5 | 2024-04-17 | [37341](https://github.com/airbytehq/airbyte/pull/37341) | Move rate limit errors to transient errors. |
| 2.1.4 | 2024-04-16 | [37367](https://github.com/airbytehq/airbyte/pull/37367) | Skip config migration when the legacy account_id field does not exist |
| 2.1.3 | 2024-04-16 | [37320](https://github.com/airbytehq/airbyte/pull/37320) | Add retry for transient error |
| 2.1.2 | 2024-03-29 | [36689](https://github.com/airbytehq/airbyte/pull/36689) | Fix key error `account_id` for custom reports. |
| 2.1.1 | 2024-03-18 | [36025](https://github.com/airbytehq/airbyte/pull/36025) | Fix start_date selection behaviour |
Expand Down

0 comments on commit db9c993

Please sign in to comment.