Crest Work (#176)

* TDL-9728: Stream `ads_insights_age_gender` has unexpected datatype for replication key field `date_start` (#172)

* added format as date-time in schema file

* added code coverage

* added a check for the date format in the bookmark test (see the sketch after these commit notes)

* added the check for first sync messages

Co-authored-by: namrata270998 <namrata.brahmbhatt@crestdatasys.com>

* TDL-9809: `forced-replication-method` missing from metadata for some streams and TDL-9872: replication keys are not specified as expected in discoverable metadata (#167)

* added valid replication keys in catalog

* modified the code

* TDL-9809: Added replication keys in metadata

* added code coverage

* Resolved review comments

Co-authored-by: harshpatel4_crest <harsh.patel4@crestdatasys.com>
Co-authored-by: namrata270998 <namrata.brahmbhatt@crestdatasys.com>

* TDL-7455: Add tap-tester test to verify replication of deleted records (#168)

* TDL-7455: Added archived data integration test

* TDL-7455: Updated integration test

* added code coverage

* Resolved review comment

Co-authored-by: namrata270998 <namrata.brahmbhatt@crestdatasys.com>

* TDL-7596: Add tap-tester test for attribution window (#169)

* added tap tester test for attribution window

* updated the code

* added code coverage

* updated the code according to the comments

* updated code to raise an error when the attribution window is not 1, 7, or 28

* test: run invalid attribution window integration test

* updated test case

* test: updated test case code

* test: test invalid attribution window

* test: test invalid attribution window

* test: test invalid attribution window

* test: test invalid attribution window

* test: run invalid attribution window test case

* added integration test for invalid attribution window

Co-authored-by: namrata270998 <namrata.brahmbhatt@crestdatasys.com>

Co-authored-by: namrata270998 <namrata.brahmbhatt@crestdatasys.com>
Co-authored-by: savan-chovatiya <80703490+savan-chovatiya@users.noreply.github.com>
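
For reference, the "check for date format" bullet (TDL-9728) amounts to an assertion of roughly this shape. A minimal sketch, assuming an ISO 8601 replication-key value; the helper name and format string are illustrative, not taken from this commit:

from datetime import datetime

# Assumed bookmark format for `date_start`; the actual test may use a
# different constant from its base class.
BOOKMARK_FORMAT = "%Y-%m-%dT%H:%M:%S%z"

def is_expected_date_format(value, fmt=BOOKMARK_FORMAT):
    """Return True if a replication-key value parses with the expected date-time format."""
    try:
        datetime.strptime(value, fmt)
    except (TypeError, ValueError):
        return False
    return True

assert is_expected_date_format("2021-10-06T00:00:00+0000")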
3 people authored Nov 19, 2021
1 parent 10bad1c commit 1af73ac
Showing 10 changed files with 382 additions and 45 deletions.
50 changes: 49 additions & 1 deletion .circleci/config.yml
@@ -53,7 +53,13 @@ jobs:
          name: 'Run Unit Tests'
          command: |
            source /usr/local/share/virtualenvs/tap-facebook/bin/activate
            nosetests tests/unittests
            pip install nose coverage
            nosetests --with-coverage --cover-erase --cover-package=tap_facebook --cover-html-dir=htmlcov tests/unittests
            coverage html
      - store_test_results:
          path: test_output/report.xml
      - store_artifacts:
          path: htmlcov
      - slack/notify-on-failure:
          only_for_branches: master
  run_integration_tests:
@@ -154,6 +160,24 @@ workflows:
          test_name: "test_facebook_tests_run.py"
          requires:
            - ensure_env
      - run_integration_tests:
          context: circleci-user
          name: "test_facebook_archived_data.py"
          test_name: "test_facebook_archived_data.py"
          requires:
            - ensure_env
      - run_integration_tests:
          context: circleci-user
          name: "test_facebook_attribution_window.py"
          test_name: "test_facebook_attribution_window.py"
          requires:
            - ensure_env
      - run_integration_tests:
          context: circleci-user
          name: "test_facebook_invalid_attribution_window.py"
          test_name: "test_facebook_invalid_attribution_window.py"
          requires:
            - ensure_env
      - build:
          context: circleci-user
          requires:
@@ -165,6 +189,9 @@ workflows:
            - test_facebook_bookmarks.py
            - test_facebook_automatic_fields.py
            - test_facebook_tests_run.py
            - test_facebook_archived_data.py
            - test_facebook_attribution_window.py
            - test_facebook_invalid_attribution_window.py
      - deploy:
          context: circleci-user
          requires:
@@ -220,6 +247,24 @@ workflows:
          test_name: "test_facebook_tests_run.py"
          requires:
            - ensure_env
      - run_integration_tests:
          context: circleci-user
          name: "test_facebook_archived_data.py"
          test_name: "test_facebook_archived_data.py"
          requires:
            - ensure_env
      - run_integration_tests:
          context: circleci-user
          name: "test_facebook_attribution_window.py"
          test_name: "test_facebook_attribution_window.py"
          requires:
            - ensure_env
      - run_integration_tests:
          context: circleci-user
          name: "test_facebook_invalid_attribution_window.py"
          test_name: "test_facebook_invalid_attribution_window.py"
          requires:
            - ensure_env
      - build:
          context: circleci-user
          requires:
@@ -231,6 +276,9 @@ workflows:
            - test_facebook_bookmarks.py
            - test_facebook_automatic_fields.py
            - test_facebook_tests_run.py
            - test_facebook_archived_data.py
            - test_facebook_attribution_window.py
            - test_facebook_invalid_attribution_window.py
    triggers:
      - schedule:
          cron: "0 6 * * *"
24 changes: 17 additions & 7 deletions tap_facebook/__init__.py
@@ -175,6 +175,7 @@ class Stream(object):
    account = attr.ib()
    stream_alias = attr.ib()
    catalog_entry = attr.ib()
    replication_method = 'FULL_TABLE'

    def automatic_fields(self):
        fields = set()
@@ -205,6 +206,7 @@ def fields(self):
class IncrementalStream(Stream):

    state = attr.ib()
    replication_method = 'INCREMENTAL'

    def __attrs_post_init__(self):
        self.current_bookmark = get_start(self, UPDATED_TIME_KEY)
@@ -430,6 +432,7 @@ class Leads(Stream):
    replication_key = "created_time"

    key_properties = ['id']
    replication_method = 'INCREMENTAL'

    def compare_lead_created_times(self, leadA, leadB):
        if leadA is None:
@@ -559,6 +562,7 @@ def advance_bookmark(stream, bookmark_key, date):
@attr.s
class AdsInsights(Stream):
    base_properties = ['campaign_id', 'adset_id', 'ad_id', 'date_start']
    replication_method = 'INCREMENTAL'

    state = attr.ib()
    options = attr.ib()
@@ -584,14 +588,17 @@ def __attrs_post_init__(self):
        if self.options.get('primary-keys'):
            self.key_properties.extend(self.options['primary-keys'])

        self.buffer_days = 28
        if CONFIG.get('insights_buffer_days'):
            self.buffer_days = int(CONFIG.get('insights_buffer_days'))
        # attribution window should only be 1, 7 or 28
        if self.buffer_days not in [1, 7, 28]:
            raise Exception("The attribution window must be 1, 7 or 28.")

    def job_params(self):
        start_date = get_start(self, self.bookmark_key)

        buffer_days = 28
        if CONFIG.get('insights_buffer_days'):
            buffer_days = int(CONFIG.get('insights_buffer_days'))

        buffered_start_date = start_date.subtract(days=buffer_days)
        buffered_start_date = start_date.subtract(days=self.buffer_days)
        min_start_date = pendulum.today().subtract(months=self.FACEBOOK_INSIGHTS_RETENTION_PERIOD)
        if buffered_start_date < min_start_date:
            LOGGER.warning("%s: Start date is earlier than %s months ago, using %s instead. "
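
For context, the attribution-window math above re-fetches a trailing window of `insights_buffer_days` before the bookmarked start date. A minimal sketch, assuming `pip install pendulum` (values illustrative):

import pendulum

start_date = pendulum.parse("2021-10-06T00:00:00Z")
buffer_days = 7  # must now be 1, 7, or 28, otherwise the tap raises an Exception

# the sync window starts `buffer_days` before the bookmarked start date
buffered_start_date = start_date.subtract(days=buffer_days)
print(buffered_start_date)  # e.g. 2021-09-29T00:00:00+00:00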
@@ -805,10 +812,13 @@ def discover_schemas():
        LOGGER.info('Loading schema for %s', stream.name)
        schema = singer.resolve_schema_references(load_schema(stream), refs)

        bookmark_key = BOOKMARK_KEYS.get(stream.name)

        mdata = metadata.to_map(metadata.get_standard_metadata(schema,
                                                               key_properties=stream.key_properties))
                                                               key_properties=stream.key_properties,
                                                               replication_method=stream.replication_method,
                                                               valid_replication_keys=[bookmark_key] if bookmark_key else None))

        bookmark_key = BOOKMARK_KEYS.get(stream.name)
        if bookmark_key == UPDATED_TIME_KEY or bookmark_key == CREATED_TIME_KEY :
            mdata = metadata.write(mdata, ('properties', bookmark_key), 'inclusion', 'automatic')
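
A hedged sketch of what the get_standard_metadata() call above now emits, assuming `pip install singer-python` (the example schema is illustrative, not a stream from this tap):

from singer import metadata

schema = {"type": "object",
          "properties": {"id": {"type": "string"},
                         "updated_time": {"type": "string", "format": "date-time"}}}

mdata = metadata.to_map(metadata.get_standard_metadata(schema,
                                                       key_properties=["id"],
                                                       replication_method="INCREMENTAL",
                                                       valid_replication_keys=["updated_time"]))

# Table-level metadata lives under the empty breadcrumb.
print(mdata[()]["forced-replication-method"])  # INCREMENTAL
print(mdata[()]["valid-replication-keys"])     # ['updated_time']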
6 changes: 4 additions & 2 deletions tap_facebook/schemas/ads_insights_age_and_gender.json
@@ -25,7 +25,8 @@
      "type": [
        "null",
        "string"
      ]
      ],
      "format": "date-time"
    },
    "ad_id": {
      "type": [
@@ -283,7 +284,8 @@
      "type": [
        "null",
        "string"
      ]
      ],
      "format": "date-time"
    },
    "objective": {
      "type": [
6 changes: 4 additions & 2 deletions tap_facebook/schemas/ads_insights_hourly_advertiser.json
@@ -165,13 +165,15 @@
      "type": [
        "null",
        "string"
      ]
      ],
      "format": "date-time"
    },
    "date_stop": {
      "type": [
        "null",
        "string"
      ]
      ],
      "format": "date-time"
    },
    "engagement_rate_ranking": {
      "type": [
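
The "format": "date-time" annotation is what makes the replication key comparable as a timestamp downstream: with it set, Singer's Transformer canonicalizes the field to a UTC RFC 3339 string rather than passing the raw string through. A hedged sketch, assuming `pip install singer-python` (record value illustrative):

import singer

schema = {"type": "object",
          "properties": {"date_start": {"type": ["null", "string"],
                                        "format": "date-time"}}}

with singer.Transformer() as transformer:
    record = transformer.transform({"date_start": "2021-10-06T00:00:00+0000"}, schema)

print(record["date_start"])  # e.g. 2021-10-06T00:00:00.000000Z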
112 changes: 112 additions & 0 deletions tests/test_facebook_archived_data.py
@@ -0,0 +1,112 @@
import os

from tap_tester import connections, runner, menagerie

from base import FacebookBaseTest

class FacebookArchivedData(FacebookBaseTest):

    @staticmethod
    def name():
        return "tap_tester_facebook_archived_data"

    def get_properties(self, original: bool = True):
        """Configuration properties required for the tap."""
        return_value = {
            'account_id': os.getenv('TAP_FACEBOOK_ACCOUNT_ID'),
            'start_date' : '2021-10-06T00:00:00Z',
            'end_date' : '2021-10-07T00:00:00Z',
            'insights_buffer_days': '1',
            'include_deleted': 'false'
        }
        if original:
            return return_value

        return_value["include_deleted"] = 'true'
        return return_value

    def test_run(self):
        '''
        Testing the archived data with the 'include_deleted' parameter
        '''
        # include_deleted is supported only for the streams below
        expected_streams = ['ads', 'adsets', 'campaigns']

        ##########################################################################
        ### First Sync with include_deleted = false
        ##########################################################################

        # instantiate connection with include_deleted = false
        conn_id_1 = connections.ensure_connection(self)

        # run check mode
        found_catalogs_1 = self.run_and_verify_check_mode(conn_id_1)

        # table and field selection
        test_catalogs_1_all_fields = [catalog for catalog in found_catalogs_1
                                      if catalog.get('tap_stream_id') in expected_streams]
        self.perform_and_verify_table_and_field_selection(conn_id_1, test_catalogs_1_all_fields, select_all_fields=True)

        # run initial sync
        record_count_by_stream_1 = self.run_and_verify_sync(conn_id_1)
        synced_records_1 = runner.get_records_from_target_output()

        ##########################################################################
        ### Second Sync with include_deleted = true
        ##########################################################################

        # create a new connection with include_deleted = true
        conn_id_2 = connections.ensure_connection(self, original_properties=False)

        # run check mode
        found_catalogs_2 = self.run_and_verify_check_mode(conn_id_2)

        # table and field selection
        test_catalogs_2_all_fields = [catalog for catalog in found_catalogs_2
                                      if catalog.get('tap_stream_id') in expected_streams]
        self.perform_and_verify_table_and_field_selection(conn_id_2, test_catalogs_2_all_fields, select_all_fields=True)

        # run sync
        record_count_by_stream_2 = self.run_and_verify_sync(conn_id_2)
        synced_records_2 = runner.get_records_from_target_output()

        for stream in expected_streams:
            with self.subTest(stream=stream):

                # expected primary keys
                expected_primary_keys = self.expected_primary_keys()[stream]

                # collect information about the count of records
                record_count_sync_1 = record_count_by_stream_1.get(stream, 0)
                record_count_sync_2 = record_count_by_stream_2.get(stream, 0)

                # collect list and set of primary keys for all the records
                primary_keys_list_1 = [tuple(message.get('data').get(expected_pk) for expected_pk in expected_primary_keys)
                                       for message in synced_records_1.get(stream).get('messages')
                                       if message.get('action') == 'upsert']
                primary_keys_list_2 = [tuple(message.get('data').get(expected_pk) for expected_pk in expected_primary_keys)
                                       for message in synced_records_2.get(stream).get('messages')
                                       if message.get('action') == 'upsert']
                primary_keys_sync_1 = set(primary_keys_list_1)
                primary_keys_sync_2 = set(primary_keys_list_2)

                # collect list of effective_status for all the records
                records_status_sync1 = [message.get('data').get('effective_status')
                                        for message in synced_records_1.get(stream).get('messages')
                                        if message.get('action') == 'upsert']
                records_status_sync2 = [message.get('data').get('effective_status')
                                        for message in synced_records_2.get(stream).get('messages')
                                        if message.get('action') == 'upsert']

                # Verify that no ARCHIVED records are returned for sync 1
                self.assertNotIn('ARCHIVED', records_status_sync1)

                # Verify that ARCHIVED records are returned for sync 2
                self.assertIn('ARCHIVED', records_status_sync2)

                # Verify the number of records replicated in sync 2 is greater than the number
                # of records replicated in sync 1
                self.assertGreater(record_count_sync_2, record_count_sync_1)

                # Verify the records replicated in sync 1 were also replicated in sync 2
                self.assertTrue(primary_keys_sync_1.issubset(primary_keys_sync_2))
89 changes: 89 additions & 0 deletions tests/test_facebook_attribution_window.py
@@ -0,0 +1,89 @@
import os

from tap_tester import runner, connections

from base import FacebookBaseTest

class FacebookAttributionWindow(FacebookBaseTest):

    # set attribution window
    ATTRIBUTION_WINDOW = 7

    @staticmethod
    def name():
        return "tap_tester_facebook_attribution_window"

    def get_properties(self, original: bool = True):
        """Configuration properties required for the tap."""
        return_value = {
            'account_id': os.getenv('TAP_FACEBOOK_ACCOUNT_ID'),
            'start_date' : '2019-07-24T00:00:00Z',
            'end_date' : '2019-07-26T00:00:00Z',
            'insights_buffer_days': str(self.ATTRIBUTION_WINDOW)
        }
        if original:
            return return_value

        return_value["start_date"] = self.start_date
        return return_value

    def test_run(self):
        self.run_test(self.ATTRIBUTION_WINDOW) # attribution window: 7

        self.ATTRIBUTION_WINDOW = 28
        self.run_test(self.ATTRIBUTION_WINDOW) # attribution window: 28

    def run_test(self, attr_window):
        """
        Test to check the attribution window
        """

        conn_id = connections.ensure_connection(self)

        # get start date
        start_date = self.get_properties()['start_date']
        # calculate start date with attribution window
        start_date_with_attribution_window = self.timedelta_formatted(start_date, days=-attr_window)

        # 'attribution window' is only supported for 'ads_insights' streams
        expected_streams = []
        for stream in self.expected_streams():
            if self.is_insight(stream):
                expected_streams.append(stream)

        # Run in check mode
        found_catalogs = self.run_and_verify_check_mode(conn_id)

        # Select only the expected streams' tables
        catalog_entries = [ce for ce in found_catalogs if ce['tap_stream_id'] in expected_streams]
        self.perform_and_verify_table_and_field_selection(conn_id, catalog_entries, select_all_fields=True)

        # Run a sync job using orchestrator
        self.run_and_verify_sync(conn_id)
        sync_records = runner.get_records_from_target_output()

        expected_replication_keys = self.expected_replication_keys()

        for stream in expected_streams:
            with self.subTest(stream=stream):

                replication_key = next(iter(expected_replication_keys[stream]))

                # get records
                records = [record.get('data') for record in sync_records.get(stream).get('messages')]

                # check whether any record falls between the attribution date and the start date
                is_between = False

                for record in records:
                    replication_key_value = record.get(replication_key)

                    # Verify the sync records respect the (simulated) start date value
                    self.assertGreaterEqual(self.parse_date(replication_key_value), self.parse_date(start_date_with_attribution_window),
                                            msg="The record does not respect the attribution window.")

                    # verify if the record's bookmark value is between the start date and the (simulated) start date value
                    if self.parse_date(start_date_with_attribution_window) <= self.parse_date(replication_key_value) < self.parse_date(start_date):
                        is_between = True

                self.assertTrue(is_between, msg="No record found between the start date and the attribution date.")
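
The base-class helpers used above (parse_date, timedelta_formatted) are defined in base.py, which is not part of this diff. A hypothetical sketch of their behavior, with names and format string assumed:

from datetime import datetime, timedelta

DATETIME_FMT = "%Y-%m-%dT%H:%M:%SZ"  # assumed tap-tester timestamp format

def parse_date(value, fmt=DATETIME_FMT):
    # Parse a formatted timestamp string into a comparable datetime.
    return datetime.strptime(value, fmt)

def timedelta_formatted(value, days=0, fmt=DATETIME_FMT):
    # Shift a formatted timestamp by `days` (negative shifts backwards)
    # and return it in the same format.
    return (parse_date(value, fmt) + timedelta(days=days)).strftime(fmt)

print(timedelta_formatted("2019-07-24T00:00:00Z", days=-7))  # 2019-07-17T00:00:00Z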