From 075f24ff5184cdc1933afebd4d0ea4a2671f7d85 Mon Sep 17 00:00:00 2001 From: savan-chovatiya Date: Mon, 11 Oct 2021 17:19:45 +0530 Subject: [PATCH 01/12] TDL-6148: Added retry for Attribute error of sync batches --- tap_facebook/__init__.py | 4 + tests/unittests/test_attribute_error_retry.py | 98 +++++++++++++++++++ 2 files changed, 102 insertions(+) create mode 100644 tests/unittests/test_attribute_error_retry.py diff --git a/tap_facebook/__init__.py b/tap_facebook/__init__.py index 126d46d4..927ecccc 100755 --- a/tap_facebook/__init__.py +++ b/tap_facebook/__init__.py @@ -153,6 +153,8 @@ def should_retry_api_error(exception): return True elif isinstance(exception, TypeError) and str(exception) == "string indices must be integers": return True + elif isinstance(exception, AttributeError): + return True return False return backoff.on_exception( @@ -241,6 +243,7 @@ class AdCreative(Stream): doc: https://developers.facebook.com/docs/marketing-api/reference/adgroup/adcreatives/ ''' + @retry_pattern(backoff.expo, AttributeError, max_tries=5, factor=5) def sync_batches(self, stream_objects): refs = load_shared_schema_refs() schema = singer.resolve_schema_references(self.catalog_entry.schema.to_dict(), refs) @@ -436,6 +439,7 @@ def compare_lead_created_times(self, leadA, leadB): else: return leadA + @retry_pattern(backoff.expo, AttributeError, max_tries=5, factor=5) def sync_batches(self, stream_objects): refs = load_shared_schema_refs() schema = singer.resolve_schema_references(self.catalog_entry.schema.to_dict(), refs) diff --git a/tests/unittests/test_attribute_error_retry.py b/tests/unittests/test_attribute_error_retry.py new file mode 100644 index 00000000..84635021 --- /dev/null +++ b/tests/unittests/test_attribute_error_retry.py @@ -0,0 +1,98 @@ +import json +import unittest +from unittest import mock +from unittest.mock import Mock, patch +from tap_facebook import FacebookRequestError +from facebook_business import FacebookAdsApi +from tap_facebook import AdCreative, Leads +from singer import resolve_schema_references +from singer.schema import Schema +from singer.catalog import Catalog, CatalogEntry + +# Mock object for the batch object +class MockBatch: + + def __init__(self, exception): + self.exception = exception + + def execute(self): + if self.exception: + raise AttributeError("'str' object has no attribute 'get'") + +class TestAdCreativeSyncBbatches(unittest.TestCase): + + @mock.patch("tap_facebook.API") + @mock.patch("singer.resolve_schema_references") + def test_retries_on_attribute_error_sync_batches(self, mocked_schema, mocked_api): + """ + AdCreative.sync_batches calls a `facebook_business` method,`api_batch.execute()`, to get a batch of ad creatives. + We mock this method to raise a `AttributeError` and expect the tap to retry this that function up to 5 times, + which is the current hard coded `max_tries` value. + """ + # Mock new_batch() function of API + mocked_api.new_batch = Mock() + mocked_api.new_batch.return_value = MockBatch(exception=True) + + # Initialize AdCreative and mock catalog_entry + mock_catalog_entry = CatalogEntry(schema=Schema()) + ad_creative_object = AdCreative('', '', '', '') + ad_creative_object.catalog_entry = mock_catalog_entry + + # Call sync_batches() function of AdCreatives and verify AttributeError is raised + with self.assertRaises(AttributeError): + ad_creative_object.sync_batches([]) + + # verify calls inside sync_batches are called 5 times as max 5 reties provided for function + self.assertEquals(5, mocked_api.new_batch.call_count) + self.assertEquals(5, mocked_schema.call_count) + + @mock.patch("tap_facebook.API") + @mock.patch("singer.resolve_schema_references") + def test_no_error_on_sync_batches(self, mocked_schema, mocked_api): + """ + AdCreative.sync_batches calls a `facebook_business` method,`api_batch.execute()`, to get a batch of ad creatives. + We mock this method to simply pass the things and expect the tap to run without exception + """ + # Mock new_batch() function of API + mocked_api.new_batch = Mock() + mocked_api.new_batch.return_value = MockBatch(exception=False) # No exception + + # Initialize AdCreative and mock catalog_entry + mock_catalog_entry = CatalogEntry(schema=Schema()) + ad_creative_object = AdCreative('', '', '', '') + ad_creative_object.catalog_entry = mock_catalog_entry + + # Call sync_batches() function of AdCreatives + ad_creative_object.sync_batches([]) + + # verify calls inside sync_batches are called once as no exception is thrown + self.assertEquals(1, mocked_api.new_batch.call_count) + self.assertEquals(1, mocked_schema.call_count) + + +class TestLeadsSyncBatches(unittest.TestCase): + + @mock.patch("tap_facebook.API") + @mock.patch("singer.resolve_schema_references") + def test_retries_on_attribute_error_sync_batches(self, mocked_schema, mocked_api): + """ + Leads.sync_batches calls a `facebook_business` method,`api_batch.execute()`, to get a batch of Leads. + We mock this method to raise a `AttributeError` and expect the tap to retry this that function up to 5 times, + which is the current hard coded `max_tries` value. + """ + # Mock new_batch() function of API + mocked_api.new_batch = Mock() + mocked_api.new_batch.return_value = MockBatch(exception=True) + + # Initialize Leads and mock catalog_entry + mock_catalog_entry = CatalogEntry(schema=Schema()) + leads_object = Leads('', '', '', '', '') + leads_object.catalog_entry = mock_catalog_entry + + # Call sync_batches() function of Leads and verify AttributeError is raised + with self.assertRaises(AttributeError): + leads_object.sync_batches([]) + + # verify calls inside sync_batches are called 5 times as max 5 reties provided for function + self.assertEquals(5, mocked_api.new_batch.call_count) + self.assertEquals(5, mocked_schema.call_count) From 07024274061c5937015d14274ee26c158833cc42 Mon Sep 17 00:00:00 2001 From: savan-chovatiya Date: Tue, 12 Oct 2021 12:58:00 +0530 Subject: [PATCH 02/12] TDL-6148: Removed unused imports from unit tests --- tests/unittests/test_attribute_error_retry.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/tests/unittests/test_attribute_error_retry.py b/tests/unittests/test_attribute_error_retry.py index 84635021..35e20e19 100644 --- a/tests/unittests/test_attribute_error_retry.py +++ b/tests/unittests/test_attribute_error_retry.py @@ -1,13 +1,10 @@ -import json import unittest from unittest import mock -from unittest.mock import Mock, patch -from tap_facebook import FacebookRequestError -from facebook_business import FacebookAdsApi +from unittest.mock import Mock from tap_facebook import AdCreative, Leads from singer import resolve_schema_references from singer.schema import Schema -from singer.catalog import Catalog, CatalogEntry +from singer.catalog import CatalogEntry # Mock object for the batch object class MockBatch: From 63e361b3c71b06828fbd21878bd683f334aae6da Mon Sep 17 00:00:00 2001 From: savan-chovatiya Date: Tue, 12 Oct 2021 16:53:04 +0530 Subject: [PATCH 03/12] TDL-13267: Added retry for 500 error of AdCreatives --- tap_facebook/__init__.py | 4 +- ...or_retry.py => test_sync_batches_retry.py} | 71 +++++++++++++++++-- 2 files changed, 67 insertions(+), 8 deletions(-) rename tests/unittests/{test_attribute_error_retry.py => test_sync_batches_retry.py} (55%) diff --git a/tap_facebook/__init__.py b/tap_facebook/__init__.py index 927ecccc..319e8d58 100755 --- a/tap_facebook/__init__.py +++ b/tap_facebook/__init__.py @@ -243,7 +243,7 @@ class AdCreative(Stream): doc: https://developers.facebook.com/docs/marketing-api/reference/adgroup/adcreatives/ ''' - @retry_pattern(backoff.expo, AttributeError, max_tries=5, factor=5) + @retry_pattern(backoff.expo, (FacebookRequestError, AttributeError), max_tries=5, factor=5) def sync_batches(self, stream_objects): refs = load_shared_schema_refs() schema = singer.resolve_schema_references(self.catalog_entry.schema.to_dict(), refs) @@ -439,7 +439,7 @@ def compare_lead_created_times(self, leadA, leadB): else: return leadA - @retry_pattern(backoff.expo, AttributeError, max_tries=5, factor=5) + @retry_pattern(backoff.expo, (FacebookRequestError, AttributeError), max_tries=5, factor=5) def sync_batches(self, stream_objects): refs = load_shared_schema_refs() schema = singer.resolve_schema_references(self.catalog_entry.schema.to_dict(), refs) diff --git a/tests/unittests/test_attribute_error_retry.py b/tests/unittests/test_sync_batches_retry.py similarity index 55% rename from tests/unittests/test_attribute_error_retry.py rename to tests/unittests/test_sync_batches_retry.py index 35e20e19..28bbbb3a 100644 --- a/tests/unittests/test_attribute_error_retry.py +++ b/tests/unittests/test_sync_batches_retry.py @@ -1,20 +1,29 @@ import unittest from unittest import mock from unittest.mock import Mock +from tap_facebook import FacebookRequestError from tap_facebook import AdCreative, Leads from singer import resolve_schema_references from singer.schema import Schema from singer.catalog import CatalogEntry -# Mock object for the batch object +# Mock object for the batch object to raise exception class MockBatch: - def __init__(self, exception): + def __init__(self, exception="NoException"): self.exception = exception def execute(self): - if self.exception: + if self.exception == "AttributeError": raise AttributeError("'str' object has no attribute 'get'") + elif self.exception == "FacebookRequestError": + raise FacebookRequestError( + message='', + request_context={"":Mock()}, + http_status=500, + http_headers=Mock(), + body={} + ) class TestAdCreativeSyncBbatches(unittest.TestCase): @@ -28,7 +37,7 @@ def test_retries_on_attribute_error_sync_batches(self, mocked_schema, mocked_api """ # Mock new_batch() function of API mocked_api.new_batch = Mock() - mocked_api.new_batch.return_value = MockBatch(exception=True) + mocked_api.new_batch.return_value = MockBatch(exception="AttributeError") # Raise AttributeError exception # Initialize AdCreative and mock catalog_entry mock_catalog_entry = CatalogEntry(schema=Schema()) @@ -43,6 +52,31 @@ def test_retries_on_attribute_error_sync_batches(self, mocked_schema, mocked_api self.assertEquals(5, mocked_api.new_batch.call_count) self.assertEquals(5, mocked_schema.call_count) + @mock.patch("tap_facebook.API") + @mock.patch("singer.resolve_schema_references") + def test_retries_on_facebook_request_error_sync_batches(self, mocked_schema, mocked_api): + """ + AdCreative.sync_batches calls a `facebook_business` method,`api_batch.execute()`, to get a batch of ad creatives. + We mock this method to raise a `FacebookRequestError` and expect the tap to retry this that function up to 5 times, + which is the current hard coded `max_tries` value. + """ + # Mock new_batch() function of API + mocked_api.new_batch = Mock() + mocked_api.new_batch.return_value = MockBatch(exception="FacebookRequestError") # Raise FacebookRequestError exception + + # Initialize AdCreative and mock catalog_entry + mock_catalog_entry = CatalogEntry(schema=Schema()) + ad_creative_object = AdCreative('', '', '', '') + ad_creative_object.catalog_entry = mock_catalog_entry + + # Call sync_batches() function of AdCreatives and verify FacebookRequestError is raised + with self.assertRaises(FacebookRequestError): + ad_creative_object.sync_batches([]) + + # verify calls inside sync_batches are called 5 times as max 5 reties provided for function + self.assertEquals(5, mocked_api.new_batch.call_count) + self.assertEquals(5, mocked_schema.call_count) + @mock.patch("tap_facebook.API") @mock.patch("singer.resolve_schema_references") def test_no_error_on_sync_batches(self, mocked_schema, mocked_api): @@ -52,7 +86,7 @@ def test_no_error_on_sync_batches(self, mocked_schema, mocked_api): """ # Mock new_batch() function of API mocked_api.new_batch = Mock() - mocked_api.new_batch.return_value = MockBatch(exception=False) # No exception + mocked_api.new_batch.return_value = MockBatch() # No exception # Initialize AdCreative and mock catalog_entry mock_catalog_entry = CatalogEntry(schema=Schema()) @@ -79,7 +113,7 @@ def test_retries_on_attribute_error_sync_batches(self, mocked_schema, mocked_api """ # Mock new_batch() function of API mocked_api.new_batch = Mock() - mocked_api.new_batch.return_value = MockBatch(exception=True) + mocked_api.new_batch.return_value = MockBatch(exception="AttributeError") # Raise AttributeError exception # Initialize Leads and mock catalog_entry mock_catalog_entry = CatalogEntry(schema=Schema()) @@ -93,3 +127,28 @@ def test_retries_on_attribute_error_sync_batches(self, mocked_schema, mocked_api # verify calls inside sync_batches are called 5 times as max 5 reties provided for function self.assertEquals(5, mocked_api.new_batch.call_count) self.assertEquals(5, mocked_schema.call_count) + + @mock.patch("tap_facebook.API") + @mock.patch("singer.resolve_schema_references") + def test_retries_on_facebook_request_error_sync_batches(self, mocked_schema, mocked_api): + """ + Leads.sync_batches calls a `facebook_business` method,`api_batch.execute()`, to get a batch of Leads. + We mock this method to raise a `FacebookRequestError` and expect the tap to retry this that function up to 5 times, + which is the current hard coded `max_tries` value. + """ + # Mock new_batch() function of API + mocked_api.new_batch = Mock() + mocked_api.new_batch.return_value = MockBatch(exception="FacebookRequestError") # Raise FacebookRequestError exception + + # Initialize Leads and mock catalog_entry + mock_catalog_entry = CatalogEntry(schema=Schema()) + leads_object = Leads('', '', '', '', '') + leads_object.catalog_entry = mock_catalog_entry + + # Call sync_batches() function of Leads and verify FacebookRequestError is raised + with self.assertRaises(FacebookRequestError): + leads_object.sync_batches([]) + + # verify calls inside sync_batches are called 5 times as max 5 reties provided for function + self.assertEquals(5, mocked_api.new_batch.call_count) + self.assertEquals(5, mocked_schema.call_count) From 92c5e35a3495f2d6edc326b769635f5583fd8d65 Mon Sep 17 00:00:00 2001 From: savan-chovatiya Date: Wed, 20 Oct 2021 13:26:42 +0530 Subject: [PATCH 04/12] TDL-6148: Add AttributeError backoff for all sync functions --- tap_facebook/__init__.py | 20 +- tests/unittests/test_attribute_error_retry.py | 193 ++++++++++++++++++ 2 files changed, 203 insertions(+), 10 deletions(-) create mode 100644 tests/unittests/test_attribute_error_retry.py diff --git a/tap_facebook/__init__.py b/tap_facebook/__init__.py index 319e8d58..62d26dd4 100755 --- a/tap_facebook/__init__.py +++ b/tap_facebook/__init__.py @@ -272,7 +272,7 @@ def sync_batches(self, stream_objects): key_properties = ['id'] - @retry_pattern(backoff.expo, (FacebookRequestError, TypeError), max_tries=5, factor=5) + @retry_pattern(backoff.expo, (FacebookRequestError, TypeError, AttributeError), max_tries=5, factor=5) def get_adcreatives(self): return self.account.get_ad_creatives(params={'limit': RESULT_RETURN_LIMIT}) @@ -288,7 +288,7 @@ class Ads(IncrementalStream): key_properties = ['id', 'updated_time'] - @retry_pattern(backoff.expo, FacebookRequestError, max_tries=5, factor=5) + @retry_pattern(backoff.expo, (FacebookRequestError, AttributeError), max_tries=5, factor=5) def _call_get_ads(self, params): """ This is necessary because the functions that call this endpoint return @@ -313,7 +313,7 @@ def do_request_multiple(): filt_ads = self._call_get_ads(params) yield filt_ads - @retry_pattern(backoff.expo, FacebookRequestError, max_tries=5, factor=5) + @retry_pattern(backoff.expo, (FacebookRequestError, AttributeError), max_tries=5, factor=5) def prepare_record(ad): return ad.api_get(fields=self.fields()).export_all_data() @@ -332,7 +332,7 @@ class AdSets(IncrementalStream): key_properties = ['id', 'updated_time'] - @retry_pattern(backoff.expo, FacebookRequestError, max_tries=5, factor=5) + @retry_pattern(backoff.expo, (FacebookRequestError, AttributeError), max_tries=5, factor=5) def _call_get_ad_sets(self, params): """ This is necessary because the functions that call this endpoint return @@ -357,7 +357,7 @@ def do_request_multiple(): filt_adsets = self._call_get_ad_sets(params) yield filt_adsets - @retry_pattern(backoff.expo, FacebookRequestError, max_tries=5, factor=5) + @retry_pattern(backoff.expo, (FacebookRequestError, AttributeError), max_tries=5, factor=5) def prepare_record(ad_set): return ad_set.api_get(fields=self.fields()).export_all_data() @@ -373,7 +373,7 @@ class Campaigns(IncrementalStream): key_properties = ['id'] - @retry_pattern(backoff.expo, FacebookRequestError, max_tries=5, factor=5) + @retry_pattern(backoff.expo, (FacebookRequestError, AttributeError), max_tries=5, factor=5) def _call_get_campaigns(self, params): """ This is necessary because the functions that call this endpoint return @@ -403,7 +403,7 @@ def do_request_multiple(): filt_campaigns = self._call_get_campaigns(params) yield filt_campaigns - @retry_pattern(backoff.expo, FacebookRequestError, max_tries=5, factor=5) + @retry_pattern(backoff.expo, (FacebookRequestError, AttributeError), max_tries=5, factor=5) def prepare_record(campaign): """If campaign.ads is selected, make the request and insert the data here""" campaign_out = campaign.api_get(fields=fields).export_all_data() @@ -473,12 +473,12 @@ def sync_batches(self, stream_objects): api_batch.execute() return str(pendulum.parse(latest_lead[self.replication_key])) - @retry_pattern(backoff.expo, FacebookRequestError, max_tries=5, factor=5) + @retry_pattern(backoff.expo, (FacebookRequestError, AttributeError), max_tries=5, factor=5) def get_ads(self): params = {'limit': RESULT_RETURN_LIMIT} yield from self.account.get_ads(params=params) - @retry_pattern(backoff.expo, FacebookRequestError, max_tries=5, factor=5) + @retry_pattern(backoff.expo, (FacebookRequestError, AttributeError), max_tries=5, factor=5) def get_leads(self, ads, start_time, previous_start_time): start_time = int(start_time.timestamp()) # Get unix timestamp params = {'limit': RESULT_RETURN_LIMIT, @@ -619,7 +619,7 @@ def job_params(self): } buffered_start_date = buffered_start_date.add(days=1) - @retry_pattern(backoff.expo, (FacebookRequestError, InsightsJobTimeout, FacebookBadObjectError, TypeError), max_tries=5, factor=5) + @retry_pattern(backoff.expo, (FacebookRequestError, InsightsJobTimeout, FacebookBadObjectError, TypeError, AttributeError), max_tries=5, factor=5) def run_job(self, params): LOGGER.info('Starting adsinsights job with params %s', params) job = self.account.get_insights( # pylint: disable=no-member diff --git a/tests/unittests/test_attribute_error_retry.py b/tests/unittests/test_attribute_error_retry.py new file mode 100644 index 00000000..33a38972 --- /dev/null +++ b/tests/unittests/test_attribute_error_retry.py @@ -0,0 +1,193 @@ +import unittest +from unittest.mock import Mock +from unittest import mock +from tap_facebook import AdCreative, Ads, AdSets, Campaigns, AdsInsights, Leads + +@mock.patch("time.sleep") +class TestAttributErrorBackoff(unittest.TestCase): + """A set of unit tests to ensure that requests are retrying properly for AttributeError Error""" + def test_get_adcreatives(self, mocked_sleep): + """ + AdCreative.get_adcreatives calls a `facebook_business` method,`get_ad_creatives()`, to get a batch of ad creatives. + We mock this method to raise a `AttributeError` and expect the tap to retry this that function up to 5 times, + which is the current hard coded `max_tries` value. + """ + + # Mock get_ad_creatives function to throw AttributeError exception + mocked_account = Mock() + mocked_account.get_ad_creatives = Mock() + mocked_account.get_ad_creatives.side_effect = AttributeError + + # Call get_adcreatives() function of AdCreatives and verify AttributeError is raised + ad_creative_object = AdCreative('', mocked_account, '', '') + with self.assertRaises(AttributeError): + ad_creative_object.get_adcreatives() + + # verify get_ad_creatives() is called 5 times as max 5 reties provided for function + self.assertEquals(mocked_account.get_ad_creatives.call_count, 5) + + def test__call_get_ads(self, mocked_sleep): + """ + Ads._call_get_ads calls a `facebook_business` method,`get_ads()`, to get a batch of ads. + We mock this method to raise a `AttributeError` and expect the tap to retry this that function up to 5 times, + which is the current hard coded `max_tries` value. + """ + + # Mock get_ads function to throw AttributeError exception + mocked_account = Mock() + mocked_account.get_ads = Mock() + mocked_account.get_ads.side_effect = AttributeError + + # Call _call_get_ads() function of Ads and verify AttributeError is raised + ad_object = Ads('', mocked_account, '', '', '') + with self.assertRaises(AttributeError): + ad_object._call_get_ads('test') + + # verify get_ads() is called 5 times as max 5 reties provided for function + self.assertEquals(mocked_account.get_ads.call_count, 5) + + @mock.patch("pendulum.parse") + def test_ad_prepare_record(self, mocked_parse, mocked_sleep): + """ + __iter__ of Ads calls a function _iterate which calls a nested prepare_record function. + Prepare_record calls a `facebook_business` method,`ad.api_get()`, to get a ad fields. + We mock this method to raise a `AttributeError` and expect the tap to retry this that function up to 5 times, + which is the current hard coded `max_tries` value. + """ + # Mock ad object + mocked_ad = Mock() + mocked_ad.api_get = Mock() + mocked_ad.__getitem__ = Mock() + mocked_ad.api_get.side_effect = AttributeError + + # # Mock get_ads function return mocked ad object + mocked_account = Mock() + mocked_account.get_ads = Mock() + mocked_account.get_ads.side_effect = [[mocked_ad]] + + # Iterate ads object which calls prepare_record() inside and verify AttributeError is raised + ad_object = Ads('', mocked_account, '', '', '') + with self.assertRaises(AttributeError): + for message in ad_object: + pass + + # verify prepare_record() function by checking call count of mocked ad.api_get() + self.assertEquals(mocked_ad.api_get.call_count, 5) + + def test__call_get_ad_sets(self, mocked_sleep): + """ + AdSets._call_get_ad_sets calls a `facebook_business` method,`get_ad_sets()`, to get a batch of adsets. + We mock this method to raise a `AttributeError` and expect the tap to retry this that function up to 5 times, + which is the current hard coded `max_tries` value. + """ + + # Mock get_ad_sets function to throw AttributeError exception + mocked_account = Mock() + mocked_account.get_ad_sets = Mock() + mocked_account.get_ad_sets.side_effect = AttributeError + + # Call _call_get_ad_sets() function of AdSets and verify AttributeError is raised + ad_set_object = AdSets('', mocked_account, '', '', '') + with self.assertRaises(AttributeError): + ad_set_object._call_get_ad_sets('test') + + # verify get_ad_sets() is called 5 times as max 5 reties provided for function + self.assertEquals(mocked_account.get_ad_sets.call_count, 5) + + @mock.patch("pendulum.parse") + def test_adset_prepare_record(self, mocked_parse, mocked_sleep): + """ + __iter__ of AdSets calls a function _iterate which calls a nested prepare_record function. + Prepare_record calls a `facebook_business` method,`ad.api_get()`, to get a ad fields. + We mock this method to raise a `AttributeError` and expect the tap to retry this that function up to 5 times, + which is the current hard coded `max_tries` value. + """ + + # Mock adset object + mocked_adset = Mock() + mocked_adset.api_get = Mock() + mocked_adset.__getitem__ = Mock() + mocked_adset.api_get.side_effect = AttributeError + + # Mock get_ad_sets function return mocked ad object + mocked_account = Mock() + mocked_account.get_ad_sets = Mock() + mocked_account.get_ad_sets.side_effect = [[mocked_adset]] + + # Iterate adset object which calls prepare_record() inside and verify AttributeError is raised + ad_set_object = AdSets('', mocked_account, '', '', '') + with self.assertRaises(AttributeError): + for message in ad_set_object: + pass + + # verify prepare_record() function by checking call count of mocked ad.api_get() + self.assertEquals(mocked_adset.api_get.call_count, 5) + + def test__call_get_campaigns(self, mocked_sleep): + """ + Campaigns._call_get_campaigns calls a `facebook_business` method,`get_campaigns()`, to get a batch of campaigns. + We mock this method to raise a `AttributeError` and expect the tap to retry this that function up to 5 times, + which is the current hard coded `max_tries` value. + """ + + # Mock get_campaigns function to throw AttributeError exception + mocked_account = Mock() + mocked_account.get_campaigns = Mock() + mocked_account.get_campaigns.side_effect = AttributeError + + # Call _call_get_campaigns() function of Campaigns and verify AttributeError is raised + campaigns_object = Campaigns('', mocked_account, '', '', '') + with self.assertRaises(AttributeError): + campaigns_object._call_get_campaigns('test') + + # verify get_campaigns() is called 5 times as max 5 reties provided for function + self.assertEquals(mocked_account.get_campaigns.call_count, 5) + + @mock.patch("pendulum.parse") + def test_campaign_prepare_record(self, mocked_parse, mocked_sleep): + """ + __iter__ of Campaigns calls a function _iterate which calls a nested prepare_record function. + Prepare_record calls a `facebook_business` method,`ad.api_get()`, to get a ad fields. + We mock this method to raise a `AttributeError` and expect the tap to retry this that function up to 5 times, + which is the current hard coded `max_tries` value. + """ + + # # Mock campaign object + mocked_campaign = Mock() + mocked_campaign.api_get = Mock() + mocked_campaign.__getitem__ = Mock() + mocked_campaign.api_get.side_effect = AttributeError + + # # Mock get_campaigns function return mocked ad object + mocked_account = Mock() + mocked_account.get_campaigns = Mock() + mocked_account.get_campaigns.side_effect = [[mocked_campaign]] + + # Iterate campaigns object which calls prepare_record() inside and verify AttributeError is raised + campaign_object = Campaigns('', mocked_account, '', '', '') + with self.assertRaises(AttributeError): + for message in campaign_object: + pass + + # verify prepare_record() function by checking call count of mocked ad.api_get() + self.assertEquals(mocked_campaign.api_get.call_count, 5) + + def test_run_job(self, mocked_sleep): + """ + AdsInsights.run_job calls a `facebook_business` method,`get_insights()`, to get a batch of insights. + We mock this method to raise a `AttributeError` and expect the tap to retry this that function up to 5 times, + which is the current hard coded `max_tries` value. + """ + + # Mock get_insights function to throw AttributeError exception + mocked_account = Mock() + mocked_account.get_insights = Mock() + mocked_account.get_insights.side_effect = AttributeError + + # Call run_job() function of Campaigns and verify AttributeError is raised + ads_insights_object = AdsInsights('', mocked_account, '', '', '', {}) + with self.assertRaises(AttributeError): + ads_insights_object.run_job('test') + + # verify get_insights() is called 5 times as max 5 reties provided for function + self.assertEquals(mocked_account.get_insights.call_count, 5) From acf9a61e2d38dd1f6d1c614d881240dfc1dd60c0 Mon Sep 17 00:00:00 2001 From: namrata270998 Date: Thu, 21 Oct 2021 19:03:31 +0530 Subject: [PATCH 05/12] added code coverage --- .circleci/config.yml | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 77a8115a..523b94bc 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -53,7 +53,13 @@ jobs: name: 'Run Unit Tests' command: | source /usr/local/share/virtualenvs/tap-facebook/bin/activate - nosetests tests/unittests + pip install nose coverage + nosetests --with-coverage --cover-erase --cover-package=tap_facebook --cover-html-dir=htmlcov tests/unittests + coverage html + - store_test_results: + path: test_output/report.xml + - store_artifacts: + path: htmlcov - slack/notify-on-failure: only_for_branches: master run_integration_tests: From 49ce423c1f1ce99037a082ea9d5b1c797e223318 Mon Sep 17 00:00:00 2001 From: savan-chovatiya Date: Fri, 12 Nov 2021 17:15:50 +0530 Subject: [PATCH 06/12] Resolved review comment --- tap_facebook/__init__.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tap_facebook/__init__.py b/tap_facebook/__init__.py index 62d26dd4..e895d9e0 100755 --- a/tap_facebook/__init__.py +++ b/tap_facebook/__init__.py @@ -145,7 +145,7 @@ def log_retry_attempt(details): if isinstance(exception, TypeError) and str(exception) == "string indices must be integers": LOGGER.info('TypeError due to bad JSON response') def should_retry_api_error(exception): - if isinstance(exception, FacebookBadObjectError): + if isinstance(exception, FacebookBadObjectError) or isinstance(exception, AttributeError): return True elif isinstance(exception, FacebookRequestError): return exception.api_transient_error() or exception.api_error_subcode() == 99 or exception.http_status() == 500 @@ -153,8 +153,6 @@ def should_retry_api_error(exception): return True elif isinstance(exception, TypeError) and str(exception) == "string indices must be integers": return True - elif isinstance(exception, AttributeError): - return True return False return backoff.on_exception( From c54cbabf7e6e28d5451b2c32397c7251af4243f5 Mon Sep 17 00:00:00 2001 From: savan-chovatiya Date: Wed, 17 Nov 2021 16:53:45 +0530 Subject: [PATCH 07/12] Resolved review comments --- tests/unittests/test_sync_batches_retry.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unittests/test_sync_batches_retry.py b/tests/unittests/test_sync_batches_retry.py index 28bbbb3a..53cccbfa 100644 --- a/tests/unittests/test_sync_batches_retry.py +++ b/tests/unittests/test_sync_batches_retry.py @@ -48,7 +48,7 @@ def test_retries_on_attribute_error_sync_batches(self, mocked_schema, mocked_api with self.assertRaises(AttributeError): ad_creative_object.sync_batches([]) - # verify calls inside sync_batches are called 5 times as max 5 reties provided for function + # verify calls inside sync_batches are called 5 times as max 5 retries provided for function self.assertEquals(5, mocked_api.new_batch.call_count) self.assertEquals(5, mocked_schema.call_count) From 5b6e9fef809fb42fad36a7ccca1f89a4dc5619da Mon Sep 17 00:00:00 2001 From: savan-chovatiya Date: Thu, 18 Nov 2021 13:47:07 +0530 Subject: [PATCH 08/12] Added code comments --- tap_facebook/__init__.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/tap_facebook/__init__.py b/tap_facebook/__init__.py index e895d9e0..09f5d703 100755 --- a/tap_facebook/__init__.py +++ b/tap_facebook/__init__.py @@ -241,6 +241,7 @@ class AdCreative(Stream): doc: https://developers.facebook.com/docs/marketing-api/reference/adgroup/adcreatives/ ''' + # Added retry_pattern to handle AttributeError raised from api_batch.execute() below @retry_pattern(backoff.expo, (FacebookRequestError, AttributeError), max_tries=5, factor=5) def sync_batches(self, stream_objects): refs = load_shared_schema_refs() @@ -270,6 +271,7 @@ def sync_batches(self, stream_objects): key_properties = ['id'] + # Added retry_pattern to handle AttributeError raised from account.get_ad_creatives() below @retry_pattern(backoff.expo, (FacebookRequestError, TypeError, AttributeError), max_tries=5, factor=5) def get_adcreatives(self): return self.account.get_ad_creatives(params={'limit': RESULT_RETURN_LIMIT}) @@ -286,6 +288,7 @@ class Ads(IncrementalStream): key_properties = ['id', 'updated_time'] + # Added retry_pattern to handle AttributeError raised from account.get_ads() below @retry_pattern(backoff.expo, (FacebookRequestError, AttributeError), max_tries=5, factor=5) def _call_get_ads(self, params): """ @@ -311,6 +314,7 @@ def do_request_multiple(): filt_ads = self._call_get_ads(params) yield filt_ads + # Added retry_pattern to handle AttributeError raised from ad.api_get() below @retry_pattern(backoff.expo, (FacebookRequestError, AttributeError), max_tries=5, factor=5) def prepare_record(ad): return ad.api_get(fields=self.fields()).export_all_data() @@ -330,6 +334,7 @@ class AdSets(IncrementalStream): key_properties = ['id', 'updated_time'] + # Added retry_pattern to handle AttributeError raised from account.get_ad_sets() below @retry_pattern(backoff.expo, (FacebookRequestError, AttributeError), max_tries=5, factor=5) def _call_get_ad_sets(self, params): """ @@ -355,6 +360,7 @@ def do_request_multiple(): filt_adsets = self._call_get_ad_sets(params) yield filt_adsets + # Added retry_pattern to handle AttributeError raised from ad_set.api_get() below @retry_pattern(backoff.expo, (FacebookRequestError, AttributeError), max_tries=5, factor=5) def prepare_record(ad_set): return ad_set.api_get(fields=self.fields()).export_all_data() @@ -371,6 +377,7 @@ class Campaigns(IncrementalStream): key_properties = ['id'] + # Added retry_pattern to handle AttributeError raised from account.get_campaigns() below @retry_pattern(backoff.expo, (FacebookRequestError, AttributeError), max_tries=5, factor=5) def _call_get_campaigns(self, params): """ @@ -401,6 +408,7 @@ def do_request_multiple(): filt_campaigns = self._call_get_campaigns(params) yield filt_campaigns + # Added retry_pattern to handle AttributeError raised from request call below @retry_pattern(backoff.expo, (FacebookRequestError, AttributeError), max_tries=5, factor=5) def prepare_record(campaign): """If campaign.ads is selected, make the request and insert the data here""" @@ -437,6 +445,7 @@ def compare_lead_created_times(self, leadA, leadB): else: return leadA + # Added retry_pattern to handle AttributeError raised from api_batch.execute() below @retry_pattern(backoff.expo, (FacebookRequestError, AttributeError), max_tries=5, factor=5) def sync_batches(self, stream_objects): refs = load_shared_schema_refs() @@ -471,11 +480,13 @@ def sync_batches(self, stream_objects): api_batch.execute() return str(pendulum.parse(latest_lead[self.replication_key])) + # Added retry_pattern to handle AttributeError raised from account.get_ads() below @retry_pattern(backoff.expo, (FacebookRequestError, AttributeError), max_tries=5, factor=5) def get_ads(self): params = {'limit': RESULT_RETURN_LIMIT} yield from self.account.get_ads(params=params) + # Added retry_pattern to handle AttributeError raised from ad.get_leads() below @retry_pattern(backoff.expo, (FacebookRequestError, AttributeError), max_tries=5, factor=5) def get_leads(self, ads, start_time, previous_start_time): start_time = int(start_time.timestamp()) # Get unix timestamp @@ -617,6 +628,7 @@ def job_params(self): } buffered_start_date = buffered_start_date.add(days=1) + # Added retry_pattern to handle AttributeError raised from requests call below @retry_pattern(backoff.expo, (FacebookRequestError, InsightsJobTimeout, FacebookBadObjectError, TypeError, AttributeError), max_tries=5, factor=5) def run_job(self, params): LOGGER.info('Starting adsinsights job with params %s', params) From 74b5826e1c53823597b5d6e0983d147ab7e6fa10 Mon Sep 17 00:00:00 2001 From: savan-chovatiya Date: Thu, 18 Nov 2021 19:18:26 +0530 Subject: [PATCH 09/12] Resolved review comment --- tests/unittests/test_attribute_error_retry.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unittests/test_attribute_error_retry.py b/tests/unittests/test_attribute_error_retry.py index 33a38972..63949a2b 100644 --- a/tests/unittests/test_attribute_error_retry.py +++ b/tests/unittests/test_attribute_error_retry.py @@ -26,7 +26,7 @@ def test_get_adcreatives(self, mocked_sleep): # verify get_ad_creatives() is called 5 times as max 5 reties provided for function self.assertEquals(mocked_account.get_ad_creatives.call_count, 5) - def test__call_get_ads(self, mocked_sleep): + def test_call_get_ads(self, mocked_sleep): """ Ads._call_get_ads calls a `facebook_business` method,`get_ads()`, to get a batch of ads. We mock this method to raise a `AttributeError` and expect the tap to retry this that function up to 5 times, From 5402c465b733e03e743482242d671fe4305ba55e Mon Sep 17 00:00:00 2001 From: Harsh <80324346+harshpatel4crest@users.noreply.github.com> Date: Fri, 19 Nov 2021 13:24:07 +0530 Subject: [PATCH 10/12] TDL-9728: Stream `ads_insights_age_gender` has unexpected datatype for replication key field `date_start` (#172) * added format as date-time in schema file * added code coverage * added check for date format in the bookmark test * added the check for first sync messages Co-authored-by: namrata270998 --- .circleci/config.yml | 8 ++++- .../schemas/ads_insights_age_and_gender.json | 6 ++-- .../ads_insights_hourly_advertiser.json | 6 ++-- tests/test_facebook_bookmarks.py | 29 +++++++++++++++---- 4 files changed, 39 insertions(+), 10 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 77a8115a..523b94bc 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -53,7 +53,13 @@ jobs: name: 'Run Unit Tests' command: | source /usr/local/share/virtualenvs/tap-facebook/bin/activate - nosetests tests/unittests + pip install nose coverage + nosetests --with-coverage --cover-erase --cover-package=tap_facebook --cover-html-dir=htmlcov tests/unittests + coverage html + - store_test_results: + path: test_output/report.xml + - store_artifacts: + path: htmlcov - slack/notify-on-failure: only_for_branches: master run_integration_tests: diff --git a/tap_facebook/schemas/ads_insights_age_and_gender.json b/tap_facebook/schemas/ads_insights_age_and_gender.json index f17f3dca..e1d91df6 100644 --- a/tap_facebook/schemas/ads_insights_age_and_gender.json +++ b/tap_facebook/schemas/ads_insights_age_and_gender.json @@ -25,7 +25,8 @@ "type": [ "null", "string" - ] + ], + "format": "date-time" }, "ad_id": { "type": [ @@ -283,7 +284,8 @@ "type": [ "null", "string" - ] + ], + "format": "date-time" }, "objective": { "type": [ diff --git a/tap_facebook/schemas/ads_insights_hourly_advertiser.json b/tap_facebook/schemas/ads_insights_hourly_advertiser.json index 70f8c8e1..27a3e9f5 100644 --- a/tap_facebook/schemas/ads_insights_hourly_advertiser.json +++ b/tap_facebook/schemas/ads_insights_hourly_advertiser.json @@ -165,13 +165,15 @@ "type": [ "null", "string" - ] + ], + "format": "date-time" }, "date_stop": { "type": [ "null", "string" - ] + ], + "format": "date-time" }, "engagement_rate_ranking": { "type": [ diff --git a/tests/test_facebook_bookmarks.py b/tests/test_facebook_bookmarks.py index ada0da17..6efe77ee 100644 --- a/tests/test_facebook_bookmarks.py +++ b/tests/test_facebook_bookmarks.py @@ -84,6 +84,16 @@ def calculated_states_by_stream(self, current_state): return stream_to_calculated_state + # function for verifying the date format + def is_expected_date_format(self, date): + try: + # parse date + datetime.datetime.strptime(date, "%Y-%m-%dT%H:%M:%S.%fZ") + except ValueError: + # return False if date is in not expected format + return False + # return True in case of no error + return True def test_run(self): expected_streams = self.expected_streams() @@ -193,14 +203,16 @@ def bookmarks_test(self, expected_streams): for record in second_sync_messages: + # for "ads_insights_age_and_gender" and "ads_insights_hourly_advertiser" + # verify that the "date_start" and "date_stop" is in expected format + if stream in ["ads_insights_age_and_gender", "ads_insights_hourly_advertiser"]: + date_start = record.get("date_start") + self.assertTrue(self.is_expected_date_format(date_start)) + date_stop = record.get("date_stop") + self.assertTrue(self.is_expected_date_format(date_stop)) # Verify the second sync records respect the previous (simulated) bookmark value replication_key_value = record.get(replication_key) - if stream in {'ads_insights_age_and_gender', 'ads_insights_hourly_advertiser'}: # BUG | https://stitchdata.atlassian.net/browse/SRCE-4873 - replication_key_value = datetime.datetime.strftime( - dateutil.parser.parse(replication_key_value), - self.BOOKMARK_COMPARISON_FORMAT - ) self.assertGreaterEqual(replication_key_value, simulated_bookmark_minus_lookback, msg="Second sync records do not repect the previous bookmark.") @@ -211,6 +223,13 @@ def bookmarks_test(self, expected_streams): ) for record in first_sync_messages: + # for "ads_insights_age_and_gender" and "ads_insights_hourly_advertiser" + # verify that the "date_start" and "date_stop" is in expected format + if stream in ["ads_insights_age_and_gender", "ads_insights_hourly_advertiser"]: + date_start = record.get("date_start") + self.assertTrue(self.is_expected_date_format(date_start)) + date_stop = record.get("date_stop") + self.assertTrue(self.is_expected_date_format(date_stop)) # Verify the first sync bookmark value is the max replication key value for a given stream replication_key_value = record.get(replication_key) From fc9abc8e4805445d43c9464eab396b42b553ab33 Mon Sep 17 00:00:00 2001 From: savan-chovatiya <80703490+savan-chovatiya@users.noreply.github.com> Date: Fri, 19 Nov 2021 13:41:28 +0530 Subject: [PATCH 11/12] TDL-9809: `forced-replication-method` missing from metadata for some streams and TDL-9872: replication keys are not specified as expected in discoverable metadata (#167) * added valid replication keys in catalog * modified the code * TDL-9809: Added replication keys in metadata * adde code coverage * Resolved review comments Co-authored-by: harshpatel4_crest Co-authored-by: namrata270998 --- tap_facebook/__init__.py | 11 +++++++-- tests/test_facebook_discovery.py | 42 +++++++++++--------------------- 2 files changed, 23 insertions(+), 30 deletions(-) diff --git a/tap_facebook/__init__.py b/tap_facebook/__init__.py index 126d46d4..63512dac 100755 --- a/tap_facebook/__init__.py +++ b/tap_facebook/__init__.py @@ -170,6 +170,7 @@ class Stream(object): account = attr.ib() stream_alias = attr.ib() catalog_entry = attr.ib() + replication_method = 'FULL_TABLE' def automatic_fields(self): fields = set() @@ -200,6 +201,7 @@ def fields(self): class IncrementalStream(Stream): state = attr.ib() + replication_method = 'INCREMENTAL' def __attrs_post_init__(self): self.current_bookmark = get_start(self, UPDATED_TIME_KEY) @@ -425,6 +427,7 @@ class Leads(Stream): replication_key = "created_time" key_properties = ['id'] + replication_method = 'INCREMENTAL' def compare_lead_created_times(self, leadA, leadB): if leadA is None: @@ -554,6 +557,7 @@ def advance_bookmark(stream, bookmark_key, date): @attr.s class AdsInsights(Stream): base_properties = ['campaign_id', 'adset_id', 'ad_id', 'date_start'] + replication_method = 'INCREMENTAL' state = attr.ib() options = attr.ib() @@ -794,10 +798,13 @@ def discover_schemas(): LOGGER.info('Loading schema for %s', stream.name) schema = singer.resolve_schema_references(load_schema(stream), refs) + bookmark_key = BOOKMARK_KEYS.get(stream.name) + mdata = metadata.to_map(metadata.get_standard_metadata(schema, - key_properties=stream.key_properties)) + key_properties=stream.key_properties, + replication_method=stream.replication_method, + valid_replication_keys=[bookmark_key] if bookmark_key else None)) - bookmark_key = BOOKMARK_KEYS.get(stream.name) if bookmark_key == UPDATED_TIME_KEY or bookmark_key == CREATED_TIME_KEY : mdata = metadata.write(mdata, ('properties', bookmark_key), 'inclusion', 'automatic') diff --git a/tests/test_facebook_discovery.py b/tests/test_facebook_discovery.py index 46e04fc0..a97947e9 100644 --- a/tests/test_facebook_discovery.py +++ b/tests/test_facebook_discovery.py @@ -84,40 +84,26 @@ def test_run(self): msg="There is NOT only one top level breadcrumb for {}".format(stream) + \ "\nstream_properties | {}".format(stream_properties)) - # BUG_1 | https://stitchdata.atlassian.net/browse/SRCE-4855 - failing_with_no_replication_keys = { - 'ads_insights_country', 'adsets', 'adcreative', 'ads', 'ads_insights_region', - 'campaigns', 'ads_insights_age_and_gender', 'ads_insights_platform_and_device', - 'ads_insights_dma', 'ads_insights', 'leads', 'ads_insights_hourly_advertiser' - } - if stream not in failing_with_no_replication_keys: # BUG_1 - # verify replication key(s) match expectations - self.assertSetEqual( - expected_replication_keys, actual_replication_keys - ) + # verify replication key(s) match expectations + self.assertSetEqual( + expected_replication_keys, actual_replication_keys + ) # verify primary key(s) match expectations self.assertSetEqual( expected_primary_keys, actual_primary_keys, ) - # BUG_2 | https://stitchdata.atlassian.net/browse/SRCE-4856 - failing_with_no_replication_method = { - 'ads_insights_country', 'adsets', 'adcreative', 'ads', 'ads_insights_region', - 'campaigns', 'ads_insights_age_and_gender', 'ads_insights_platform_and_device', - 'ads_insights_dma', 'ads_insights', 'leads', 'ads_insights_hourly_advertiser' - } - if stream not in failing_with_no_replication_method: # BUG_2 - # verify the replication method matches our expectations - self.assertEqual( - expected_replication_method, actual_replication_method - ) - - # verify that if there is a replication key we are doing INCREMENTAL otherwise FULL - if actual_replication_keys: - self.assertEqual(self.INCREMENTAL, actual_replication_method) - else: - self.assertEqual(self.FULL_TABLE, actual_replication_method) + # verify the replication method matches our expectations + self.assertEqual( + expected_replication_method, actual_replication_method + ) + + # verify that if there is a replication key we are doing INCREMENTAL otherwise FULL + if actual_replication_keys: + self.assertEqual(self.INCREMENTAL, actual_replication_method) + else: + self.assertEqual(self.FULL_TABLE, actual_replication_method) # verify that primary keys and replication keys # are given the inclusion of automatic in metadata. From 12bb05d04ef076241fc9a894d6c6fe0c166c4bc5 Mon Sep 17 00:00:00 2001 From: savan-chovatiya <80703490+savan-chovatiya@users.noreply.github.com> Date: Fri, 19 Nov 2021 14:00:11 +0530 Subject: [PATCH 12/12] TDL-7455: Add tap-tester test to verify replication of deleted records (#168) * TDL-7455: Added archived data integration test * TDL-7455: Updated integration test * added code coverage * Resolved review comment Co-authored-by: namrata270998 --- .circleci/config.yml | 14 ++++ tests/test_facebook_archived_data.py | 112 +++++++++++++++++++++++++++ 2 files changed, 126 insertions(+) create mode 100644 tests/test_facebook_archived_data.py diff --git a/.circleci/config.yml b/.circleci/config.yml index 523b94bc..83f84179 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -160,6 +160,12 @@ workflows: test_name: "test_facebook_tests_run.py" requires: - ensure_env + - run_integration_tests: + context: circleci-user + name: "test_facebook_archived_data.py" + test_name: "test_facebook_archived_data.py" + requires: + - ensure_env - build: context: circleci-user requires: @@ -171,6 +177,7 @@ workflows: - test_facebook_bookmarks.py - test_facebook_automatic_fields.py - test_facebook_tests_run.py + - test_facebook_archived_data.py - deploy: context: circleci-user requires: @@ -226,6 +233,12 @@ workflows: test_name: "test_facebook_tests_run.py" requires: - ensure_env + - run_integration_tests: + context: circleci-user + name: "test_facebook_archived_data.py" + test_name: "test_facebook_archived_data.py" + requires: + - ensure_env - build: context: circleci-user requires: @@ -237,6 +250,7 @@ workflows: - test_facebook_bookmarks.py - test_facebook_automatic_fields.py - test_facebook_tests_run.py + - test_facebook_archived_data.py triggers: - schedule: cron: "0 6 * * *" diff --git a/tests/test_facebook_archived_data.py b/tests/test_facebook_archived_data.py new file mode 100644 index 00000000..c86113b6 --- /dev/null +++ b/tests/test_facebook_archived_data.py @@ -0,0 +1,112 @@ +import os + +from tap_tester import connections, runner, menagerie + +from base import FacebookBaseTest + +class FacebookArchivedData(FacebookBaseTest): + + @staticmethod + def name(): + return "tap_tester_facebook_archived_data" + + def get_properties(self, original: bool = True): + """Configuration properties required for the tap.""" + return_value = { + 'account_id': os.getenv('TAP_FACEBOOK_ACCOUNT_ID'), + 'start_date' : '2021-10-06T00:00:00Z', + 'end_date' : '2021-10-07T00:00:00Z', + 'insights_buffer_days': '1', + 'include_deleted': 'false' + } + if original: + return return_value + + return_value["include_deleted"] = 'true' + return return_value + + def test_run(self): + ''' + Testing the archived data with 'include_deleted' parameter + ''' + # include_deleted is supported for below streams only + expected_streams = ['ads', 'adsets', 'campaigns'] + + ########################################################################## + ### First Sync with include_deleted = false + ########################################################################## + + # instantiate connection with the include_deleted = false + conn_id_1 = connections.ensure_connection(self) + + # run check mode + found_catalogs_1 = self.run_and_verify_check_mode(conn_id_1) + + # table and field selection + test_catalogs_1_all_fields = [catalog for catalog in found_catalogs_1 + if catalog.get('tap_stream_id') in expected_streams] + self.perform_and_verify_table_and_field_selection(conn_id_1, test_catalogs_1_all_fields, select_all_fields=True) + + # run initial sync + record_count_by_stream_1 = self.run_and_verify_sync(conn_id_1) + synced_records_1 = runner.get_records_from_target_output() + + ########################################################################## + ### Second Sync with include_deleted = true + ########################################################################## + + # create a new connection with the include_deleted = true + conn_id_2 = connections.ensure_connection(self, original_properties=False) + + # run check mode + found_catalogs_2 = self.run_and_verify_check_mode(conn_id_2) + + # table and field selection + test_catalogs_2_all_fields = [catalog for catalog in found_catalogs_2 + if catalog.get('tap_stream_id') in expected_streams] + self.perform_and_verify_table_and_field_selection(conn_id_2, test_catalogs_2_all_fields, select_all_fields=True) + + # run sync + record_count_by_stream_2 = self.run_and_verify_sync(conn_id_2) + synced_records_2 = runner.get_records_from_target_output() + + for stream in expected_streams: + with self.subTest(stream=stream): + + # expected primary keys + expected_primary_keys = self.expected_primary_keys()[stream] + + # collect information about count of records + record_count_sync_1 = record_count_by_stream_1.get(stream, 0) + record_count_sync_2 = record_count_by_stream_2.get(stream, 0) + + # collect list and set of primary keys for all the records + primary_keys_list_1 = [tuple(message.get('data').get(expected_pk) for expected_pk in expected_primary_keys) + for message in synced_records_1.get(stream).get('messages') + if message.get('action') == 'upsert'] + primary_keys_list_2 = [tuple(message.get('data').get(expected_pk) for expected_pk in expected_primary_keys) + for message in synced_records_2.get(stream).get('messages') + if message.get('action') == 'upsert'] + primary_keys_sync_1 = set(primary_keys_list_1) + primary_keys_sync_2 = set(primary_keys_list_2) + + # collect list of effective_status for all the records + records_status_sync1 = [message.get('data').get('effective_status') + for message in synced_records_1.get(stream).get('messages') + if message.get('action') == 'upsert'] + records_status_sync2 = [message.get('data').get('effective_status') + for message in synced_records_2.get(stream).get('messages') + if message.get('action') == 'upsert'] + + # Verifying that no ARCHIVED records are returned for sync 1 + self.assertNotIn('ARCHIVED', records_status_sync1) + + # Verifying that ARCHIVED records are returned for sync 2 + self.assertIn('ARCHIVED', records_status_sync2) + + # Verify the number of records replicated in sync 2 is greater than the number + # of records replicated in sync 1 + self.assertGreater(record_count_sync_2, record_count_sync_1) + + # Verify the records replicated in sync 1 were also replicated in sync 2 + self.assertTrue(primary_keys_sync_1.issubset(primary_keys_sync_2))