-
Notifications
You must be signed in to change notification settings - Fork 48
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
deprecate file_metadata stream #96
Changes from all commits
70b81a9
892d3f6
a55a103
e8fa9e9
de26fb6
9cfcd5e
447605d
5d7d388
5087fa0
c5d5da8
a53c768
9626d4e
bb8e62d
583a797
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,3 @@ | ||
{ | ||
"currently_syncing": "file_metadata", | ||
"bookmarks": { | ||
"file_metadata": "2019-09-27T22:34:39.000000Z" | ||
} | ||
"currently_syncing": "sheet_metadata" | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,8 +7,8 @@ | |
|
||
|
||
class BookmarksTest(GoogleSheetsBaseTest): | ||
"""Ensure all sheets streams will replicate based off of the most recent bookmarked state for 'file_metadata'""" | ||
"""Ensure all sheets streams will replicate in full table mode and create appropriate bookmarks""" | ||
|
||
conn_id = "" | ||
expected_test_streams = "" | ||
record_count_by_stream_1 = "" | ||
|
@@ -20,20 +20,12 @@ def name(): | |
def test_run(self): | ||
""" | ||
Run check mode, perform table and field selection, and run a sync. | ||
Replication can be triggered by pushing back state to prior 'file_metadata' state. | ||
Run a second sync after not updating state to verify no streams are being synced | ||
Run a 3rd sync and ensure full table streams are triggered by the simulated bookmark value. | ||
|
||
- Verify initial sync message actions include activate versions and the upserts | ||
- Verify no streams are synced when 'file_metadata' bookmark does not change | ||
- Verify that the third sync with the updated simulated bookmark has the same synced streams as the first sync | ||
- Verify that streams will sync based off of 'file_metadata' even when it is not selected | ||
Comment on lines
-28
to
-30
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. since file_metadata is now deprecated we don't rely on the file updated timestamp anymore, which was used as a pseudo bookmark earlier, however this does not impact as all the streams are full_table in nature. |
||
- check if bookmark include activate versions for all streams | ||
""" | ||
skipped_streams = {stream | ||
for stream in self.expected_streams() | ||
if stream.startswith('sadsheet')}.union({ | ||
'file_metadata' # testing case without file_metadata selected, but still providing bookmark | ||
}) | ||
if stream.startswith('sadsheet')} | ||
self.expected_test_streams = self.expected_streams() - skipped_streams | ||
|
||
# Grab connection, and run discovery and initial sync | ||
|
@@ -43,7 +35,7 @@ def test_run(self): | |
|
||
# Grab state to be updated later | ||
state = menagerie.get_state(self.conn_id) | ||
|
||
# BUG full table streams are saving bookmarks unnecessarily https://jira.talendforge.org/browse/TDL-14343 | ||
|
||
# BUG there are no activate version messages in the sheet_metadata, spreadsheet_metadata | ||
|
@@ -55,40 +47,7 @@ def test_run(self): | |
self.assertEqual('activate_version', sync1_message_actions[0]) | ||
self.assertEqual('activate_version', sync1_message_actions[-1]) | ||
self.assertSetEqual({'upsert'}, set(sync1_message_actions[1:-1])) | ||
|
||
# run a sync again, this time we shouldn't get any records back | ||
sync_job_name = runner.run_sync_mode(self, self.conn_id) | ||
exit_status = menagerie.get_exit_status(self.conn_id, sync_job_name) | ||
menagerie.verify_sync_exit_status(self, exit_status, sync_job_name) | ||
record_count_by_stream_2 = runner.examine_target_output_file( | ||
self, self.conn_id, self.expected_streams(), self.expected_primary_keys()) | ||
|
||
# verify we do not sync any unexpected streams | ||
self.assertSetEqual(set(), set(record_count_by_stream_2.keys())) | ||
|
||
# verify no records were synced for our expected streams | ||
for stream in self.expected_test_streams: | ||
with self.subTest(stream=stream): | ||
self.assertEqual(0, record_count_by_stream_2.get(stream, 0)) | ||
|
||
# roll back the state of the file_metadata stream to ensure that we sync sheets | ||
# based off of this state | ||
file_metadata_stream = 'file_metadata' | ||
file_metadata_bookmark = state['bookmarks'][file_metadata_stream] | ||
bookmark_datetime = datetime.datetime.strptime(file_metadata_bookmark, self.BOOKMARK_COMPARISON_FORMAT) | ||
target_datetime = bookmark_datetime + datetime.timedelta(days=-1) | ||
target_bookmark = datetime.datetime.strftime(target_datetime, self.BOOKMARK_COMPARISON_FORMAT) | ||
|
||
new_state = copy.deepcopy(state) | ||
new_state['bookmarks'][file_metadata_stream] = target_bookmark | ||
|
||
menagerie.set_state(self.conn_id, new_state) | ||
|
||
record_count_by_stream_3 = self.run_and_verify_sync(self.conn_id) | ||
synced_records_3 = runner.get_records_from_target_output() | ||
|
||
# verify we sync sheets based off the state of file_metadata | ||
self.assertDictEqual(self.record_count_by_stream_1, record_count_by_stream_3) | ||
self.assertIn(stream, state["bookmarks"].keys()) | ||
|
||
def starter(self): | ||
""" | ||
|
@@ -102,7 +61,7 @@ def starter(self): | |
### Instantiate connection | ||
########################################################################## | ||
self.conn_id = connections.ensure_connection(self) | ||
|
||
########################################################################## | ||
### Discovery without the backoff | ||
########################################################################## | ||
|
@@ -118,7 +77,7 @@ def starter(self): | |
self.assertSetEqual(self.expected_streams(), found_catalog_names, msg="discovered schemas do not match") | ||
LOGGER.info("discovered schemas are OK") | ||
|
||
|
||
# table and field selection | ||
test_catalogs = [catalog for catalog in found_catalogs | ||
if catalog.get('stream_name') in self.expected_test_streams] | ||
|
@@ -143,7 +102,3 @@ def starter(self): | |
msg="failed to replicate any data: {}".format(self.record_count_by_stream_1) | ||
) | ||
LOGGER.info("total replicated row count: %s", sum(self.record_count_by_stream_1.values())) | ||
|
||
|
||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please include some stream name with the value. The state file looks incomplete.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @sgandhi1311, all the streams are essentially full table, so we don't store date-specific bookmarks for any stream.