deprecate file_metadata stream #96

Merged

merged 14 commits on Sep 5, 2024
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,8 @@
# Changelog

## 3.1.0
* Remove deprecated stream file_metadata [#96](https://github.com/singer-io/tap-google-sheets/pull/96)

## 3.0.0
* Remove support for date datatype [#95](https://github.com/singer-io/tap-google-sheets/pull/95)

2 changes: 1 addition & 1 deletion setup.py
@@ -3,7 +3,7 @@
from setuptools import setup, find_packages

setup(name='tap-google-sheets',
version='3.0.0',
version='3.1.0',
description='Singer.io tap for extracting data from the Google Sheets v4 API',
author='jeff.huth@bytecode.io',
classifiers=['Programming Language :: Python :: 3 :: Only'],
44 changes: 0 additions & 44 deletions tap_google_sheets/streams.py
@@ -243,49 +243,6 @@ def sync_stream(self, records, catalog, time_extracted=None):
LOGGER.info('FINISHED Syncing {}, Total Records: {}'.format(self.stream_name, record_count))
update_currently_syncing(self.state, None)

class FileMetadata(GoogleSheets):
stream_name = "file_metadata"
api = "files"
path = "files/{spreadsheet_id}"
key_properties = ["id"]
replication_method = "INCREMENTAL"
replication_keys = ["modifiedTime"]
params = {
"fields": "id,name,createdTime,modifiedTime,version,teamDriveId,driveId,lastModifyingUser",
"supportsAllDrives": True
}

def sync(self, catalog, state, selected_streams):
"""
sync file's metadata
"""
self.state = state
# variable to check if file is changed or not

# get date to start sync from, ie. start date or bookmark date
start_date = strptime_to_utc(get_bookmark(state, self.stream_name, self.config_start_date))

LOGGER.info("GET file_metadata")
file_metadata, time_extracted = self.get_data(stream_name=self.stream_name)
LOGGER.info("Transform file_metadata")

file_modified_time = strptime_to_utc(file_metadata.get("modifiedTime"))
LOGGER.info("last_datetime = {}, file_modified_time = {}".format(start_date, file_modified_time))
if file_modified_time <= start_date:
# if file is not changed, update the variable
LOGGER.info("file_modified_time <= last_datetime, FILE NOT CHANGED. EXITING.")
# return and stop syncing the next streams, as the file is not changed
return False, file_modified_time

# only perform sync if file metadata stream is selected and file is changed
if self.stream_name in selected_streams:
# transform file metadata records
file_metadata_transformed = internal_transform.transform_file_metadata(file_metadata)
# do sync
self.sync_stream(file_metadata_transformed, catalog, time_extracted)

return True, file_modified_time

class SpreadSheetMetadata(GoogleSheets):
stream_name = "spreadsheet_metadata"
api = "sheets"
@@ -645,7 +602,6 @@ def sync(self, catalog, state, sheets_loaded_records):
# "spreadsheet_metadata" -> get sheets in the spreadsheet and load sheet's records
# and prepare records for "sheet_metadata" and "sheets_loaded" streams
STREAMS = OrderedDict()
STREAMS['file_metadata'] = FileMetadata
STREAMS['spreadsheet_metadata'] = SpreadSheetMetadata
STREAMS['sheet_metadata'] = SheetMetadata
STREAMS['sheets_loaded'] = SheetsLoaded
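
Context for the registry above: every remaining stream is declared the same way, by subclassing GoogleSheets and setting a handful of class attributes, exactly as the deleted FileMetadata did. A minimal, self-contained sketch of that contract (the ExampleStream name and the stub base class are illustrative only, not part of this PR):

from collections import OrderedDict

class GoogleSheets:  # stub standing in for the tap's real base class
    pass

class ExampleStream(GoogleSheets):  # hypothetical stream, for illustration
    stream_name = "example_stream"
    api = "sheets"                          # which Google API the tap calls
    path = "spreadsheets/{spreadsheet_id}"  # request path template
    key_properties = ["id"]                 # primary key(s)
    replication_method = "FULL_TABLE"       # all remaining streams are full-table

STREAMS = OrderedDict()
STREAMS["example_stream"] = ExampleStream   # registered for the sync loop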
11 changes: 0 additions & 11 deletions tap_google_sheets/sync.py
@@ -6,7 +6,6 @@
def sync(client, config, catalog, state):
"""
Sync the streams, loop over STREAMS
"file_metadata" -> get the file's metadata and if the spreadsheet file is updated then continue the sync else stop the sync
"spreadsheet_metadata" -> get the spreadsheet's metadata
- sync the spreadsheet_metadata stream if selected
- get the sheets in the spreadsheet and loop over the sheets and sync the sheet's records if selected
@@ -60,14 +59,4 @@ def sync(client, config, catalog, state):
else:
stream_obj.sync(catalog, state, sheets_loaded_records)

# sync file metadata
elif stream_name == "file_metadata":
file_changed, file_modified_time = stream_obj.sync(catalog, state, selected_streams)
if not file_changed:
break

LOGGER.info("FINISHED Syncing: %s", stream_name)

# write "file_metadata" bookmark, as we have successfully synced all the sheet's records
# it will force to re-sync of there is any interrupt between the sync
write_bookmark(state, 'file_metadata', strftime(file_modified_time))
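
The block deleted above was the only consumer of the file_metadata bookmark: the stream ran first, and if the spreadsheet's modifiedTime had not advanced past the bookmarked value, the entire sync short-circuited; the bookmark was only written after a fully successful run, so an interrupted sync was replayed from scratch next time. A minimal, self-contained sketch of that gating rule (the timestamp format is an assumption for illustration, not taken from the tap):

from datetime import datetime, timezone

def should_sync(bookmark_iso, modified_iso):
    # The rule the deleted code implemented: sync only when the file's
    # modifiedTime is strictly newer than the saved bookmark.
    fmt = "%Y-%m-%dT%H:%M:%S.%fZ"  # assumed Drive-style timestamp format
    bookmark = datetime.strptime(bookmark_iso, fmt).replace(tzinfo=timezone.utc)
    modified = datetime.strptime(modified_iso, fmt).replace(tzinfo=timezone.utc)
    return modified > bookmark

# file unchanged -> the whole sync used to be skipped
assert not should_sync("2024-09-01T00:00:00.000000Z", "2024-09-01T00:00:00.000000Z")
# file changed -> the sync proceeded, and the bookmark advanced afterwards
assert should_sync("2024-09-01T00:00:00.000000Z", "2024-09-02T00:00:00.000000Z")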
14 changes: 0 additions & 14 deletions tap_google_sheets/transform.py
@@ -33,20 +33,6 @@ def transform_spreadsheet_metadata(spreadsheet_metadata):
spreadsheet_metadata_arr.append(spreadsheet_metadata_tf)
return spreadsheet_metadata_arr

# Transform file_metadata: remove nodes from lastModifyingUser, format as array
def transform_file_metadata(file_metadata):
# Convert to dict
file_metadata_tf = json.loads(json.dumps(file_metadata))
# Remove keys
if file_metadata_tf.get('lastModifyingUser'):
file_metadata_tf['lastModifyingUser'].pop('photoLink', None)
file_metadata_tf['lastModifyingUser'].pop('me', None)
file_metadata_tf['lastModifyingUser'].pop('permissionId', None)
# Add record to an array of 1
file_metadata_arr = []
file_metadata_arr.append(file_metadata_tf)
return file_metadata_arr

# Convert Excel Date Serial Number (excel_date_sn) to datetime string
# timezone_str: defaults to UTC (which we assume is the timezone for ALL datetimes)
def excel_to_dttm_str(string_value, excel_date_sn, timezone_str=None):
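Aside on the deleted transform_file_metadata: the json.loads(json.dumps(...)) round trip is there purely as a deep copy, so the subsequent pop calls never mutate the caller's dict. A self-contained sketch of the idiom; copy.deepcopy is the more conventional spelling when values are not guaranteed to be JSON-serializable:

import copy
import json

record = {"id": "1", "lastModifyingUser": {"photoLink": "https://example.invalid/p", "me": True}}

# JSON round trip: a deep copy, but only for JSON-serializable values.
via_json = json.loads(json.dumps(record))
# Standard-library equivalent without the serializability restriction.
via_deepcopy = copy.deepcopy(record)

via_json["lastModifyingUser"].pop("photoLink", None)
assert "photoLink" in record["lastModifyingUser"]        # original untouched
assert "photoLink" not in via_json["lastModifyingUser"]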
5 changes: 0 additions & 5 deletions tests/base.py
@@ -72,11 +72,6 @@ def expected_metadata(self):
# self.REPLICATION_KEYS: {"modified_at"}
}
return {
"file_metadata": {
self.PRIMARY_KEYS: {"id", },
self.REPLICATION_METHOD: self.INCREMENTAL,
self.REPLICATION_KEYS: {"modifiedTime"}
},
"sheet_metadata": {
self.PRIMARY_KEYS: {"sheetId"}, # "spreadsheetId"}, # BUG? | This is not in the real tap, "spreadsheetId"},
self.REPLICATION_METHOD: self.FULL_TABLE,
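For context, expected_metadata is the table of per-stream expectations the tests assert discovery against, so removing the file_metadata entry here is what keeps the remaining assertions green. A sketch of how such expectations are typically consumed inside a test method (catalogs_by_stream is a hypothetical lookup, not the harness's actual API):

# Illustrative only; assumes a {stream_name: catalog_entry} mapping.
for stream, expectations in self.expected_metadata().items():
    with self.subTest(stream=stream):
        catalog = catalogs_by_stream[stream]
        self.assertSetEqual(expectations[self.PRIMARY_KEYS],
                            set(catalog["key_properties"]))
        self.assertEqual(expectations[self.REPLICATION_METHOD],
                         catalog["replication_method"])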
56 changes: 5 additions & 51 deletions tests/test_google_sheets_bookmarks.py
@@ -8,7 +8,7 @@

class BookmarksTest(GoogleSheetsBaseTest):
"""Ensure all sheets streams will replicate based off of the most recent bookmarked state for 'file_metadata'"""

conn_id = ""
expected_test_streams = ""
record_count_by_stream_1 = ""
@@ -20,20 +20,12 @@ def name():
def test_run(self):
"""
Run check mode, perform table and field selection, and run a sync.
Replication can be triggered by pushing back state to prior 'file_metadata' state.
Run a second sync after not updating state to verify no streams are being synced
Run a 3rd sync and ensure full table streams are triggered by the simulated bookmark value.

- Verify initial sync message actions include activate versions and the upserts
- Verify no streams are synced when 'file_metadata' bookmark does not change
- Verify that the third sync with the updated simulated bookmark has the same synced streams as the first sync
- Verify that streams will sync based off of 'file_metadata' even when it is not selected
Comment on lines -28 to -30

Member Author:

Since file_metadata is now deprecated, we no longer rely on the file's updated timestamp, which was previously used as a pseudo-bookmark. This has no impact, however, as all the streams are full_table in nature.
Removed the extra criteria from the test for the (now) deprecated stream.
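
(Illustration of the full-table point above, not code from the PR: a full-table stream brackets each run with version messages instead of writing bookmarks, which is the exact shape the first-sync assertions below check.)

# Sketch of the Singer message actions one full-table sync emits:
sync_message_actions = [
    "activate_version",  # start of the run
    "upsert",            # one per replicated record ...
    "upsert",
    "activate_version",  # end of run; the target may discard older versions
]
assert sync_message_actions[0] == "activate_version"
assert sync_message_actions[-1] == "activate_version"
assert set(sync_message_actions[1:-1]) == {"upsert"}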

"""
skipped_streams = {stream
for stream in self.expected_streams()
if stream.startswith('sadsheet')}.union({
'file_metadata' # testing case without file_metadata selected, but still providing bookmark
})
if stream.startswith('sadsheet')}
self.expected_test_streams = self.expected_streams() - skipped_streams

# Grab connection, and run discovery and initial sync
@@ -43,7 +35,7 @@ def test_run(self):

# Grab state to be updated later
state = menagerie.get_state(self.conn_id)

# BUG full table streams are saving bookmarks unnecessarily https://jira.talendforge.org/browse/TDL-14343

# BUG there are no activate version messages in the sheet_metadata, spreadsheet_metadata
@@ -56,40 +48,6 @@ def test_run(self):
self.assertEqual('activate_version', sync1_message_actions[-1])
self.assertSetEqual({'upsert'}, set(sync1_message_actions[1:-1]))

# run a sync again, this time we shouldn't get any records back
sync_job_name = runner.run_sync_mode(self, self.conn_id)
exit_status = menagerie.get_exit_status(self.conn_id, sync_job_name)
menagerie.verify_sync_exit_status(self, exit_status, sync_job_name)
record_count_by_stream_2 = runner.examine_target_output_file(
self, self.conn_id, self.expected_streams(), self.expected_primary_keys())

# verify we do not sync any unexpected streams
self.assertSetEqual(set(), set(record_count_by_stream_2.keys()))

# verify no records were synced for our expected streams
for stream in self.expected_test_streams:
with self.subTest(stream=stream):
self.assertEqual(0, record_count_by_stream_2.get(stream, 0))

# roll back the state of the file_metadata stream to ensure that we sync sheets
# based off of this state
file_metadata_stream = 'file_metadata'
file_metadata_bookmark = state['bookmarks'][file_metadata_stream]
bookmark_datetime = datetime.datetime.strptime(file_metadata_bookmark, self.BOOKMARK_COMPARISON_FORMAT)
target_datetime = bookmark_datetime + datetime.timedelta(days=-1)
target_bookmark = datetime.datetime.strftime(target_datetime, self.BOOKMARK_COMPARISON_FORMAT)

new_state = copy.deepcopy(state)
new_state['bookmarks'][file_metadata_stream] = target_bookmark

menagerie.set_state(self.conn_id, new_state)

record_count_by_stream_3 = self.run_and_verify_sync(self.conn_id)
synced_records_3 = runner.get_records_from_target_output()

# verify we sync sheets based off the state of file_metadata
self.assertDictEqual(self.record_count_by_stream_1, record_count_by_stream_3)

def starter(self):
"""
Instantiate connection, run discovery, and initial sync.
@@ -102,7 +60,7 @@ def starter(self):
### Instantiate connection
##########################################################################
self.conn_id = connections.ensure_connection(self)

##########################################################################
### Discovery without the backoff
##########################################################################
@@ -118,7 +76,7 @@ def starter(self):
self.assertSetEqual(self.expected_streams(), found_catalog_names, msg="discovered schemas do not match")
LOGGER.info("discovered schemas are OK")


# table and field selection
test_catalogs = [catalog for catalog in found_catalogs
if catalog.get('stream_name') in self.expected_test_streams]
@@ -143,7 +101,3 @@ def starter(self):
msg="failed to replicate any data: {}".format(self.record_count_by_stream_1)
)
LOGGER.info("total replicated row count: %s", sum(self.record_count_by_stream_1.values()))