Tdl 15623 crest master #57

Merged — 8 commits merged on Dec 13, 2021
13 changes: 12 additions & 1 deletion .circleci/config.yml
@@ -13,7 +13,7 @@ jobs:
command: |
python3 -m venv /usr/local/share/virtualenvs/tap-google-sheets
source /usr/local/share/virtualenvs/tap-google-sheets/bin/activate
pip install .[dev]
pip install .[test]
- run:
name: 'pylint'
command: |
@@ -25,6 +25,17 @@
source /usr/local/share/virtualenvs/tap-tester/bin/activate
stitch-validate-json tap_google_sheets/schemas/*.json
- add_ssh_keys
- run:
name: 'Unit Tests'
command: |
source /usr/local/share/virtualenvs/tap-google-sheets/bin/activate
pip install nose coverage
nosetests --with-coverage --cover-erase --cover-package=tap_google_sheets --cover-html-dir=htmlcov tests/unittests
coverage html
- store_test_results:
path: test_output/report.xml
- store_artifacts:
path: htmlcov
- run:
name: 'Integration Tests'
command: |
5 changes: 3 additions & 2 deletions README.md
@@ -112,12 +112,13 @@ The [**Google Sheets Setup & Authentication**](https://drive.google.com/open?id=
"refresh_token": "YOUR_REFRESH_TOKEN",
"spreadsheet_id": "YOUR_GOOGLE_SPREADSHEET_ID",
"start_date": "2019-01-01T00:00:00Z",
"user_agent": "tap-google-sheets <api_user_email@example.com>"
"user_agent": "tap-google-sheets <api_user_email@example.com>",
"request_timeout": 300
}
```

Optionally, also create a `state.json` file. `currently_syncing` is an optional attribute used for identifying the last object to be synced in case the job is interrupted mid-stream. The next run would begin where the last job left off.
Only the `performance_reports` uses a bookmark. The date-time bookmark is stored in a nested structure based on the endpoint, site, and sub_type.
Only the `performance_reports` uses a bookmark. The date-time bookmark is stored in a nested structure based on the endpoint, site, and sub_type. The `request_timeout` is an optional parameter to set the timeout for requests. Default: 300 seconds

```json
{
3 changes: 2 additions & 1 deletion config.json.example
@@ -4,5 +4,6 @@
"refresh_token": "YOUR_REFRESH_TOKEN",
"spreadsheet_id": "YOUR_GOOGLE_SPREADSHEET_ID",
"start_date": "2019-01-01T00:00:00Z",
"user_agent": "tap-google-search-console <api_user_email@example.com>"
"user_agent": "tap-google-search-console <api_user_email@example.com>",
"request_timeout": 300
}
8 changes: 5 additions & 3 deletions setup.py
@@ -11,13 +11,15 @@
install_requires=[
'backoff==1.8.0',
'requests==2.22.0',
'singer-python==5.9.0'
'singer-python==5.12.2'
],
extras_require={
'dev': [
'ipdb==0.11',
'test': [
'pylint',
'nose'
],
'dev': [
'ipdb==0.11',
]
},
entry_points='''
4 changes: 3 additions & 1 deletion tap_google_sheets/__init__.py
@@ -36,7 +36,9 @@ def main():
with GoogleClient(parsed_args.config['client_id'],
parsed_args.config['client_secret'],
parsed_args.config['refresh_token'],
parsed_args.config['user_agent']) as client:
parsed_args.config.get('request_timeout'),
parsed_args.config['user_agent']
) as client:

state = {}
if parsed_args.state:
32 changes: 26 additions & 6 deletions tap_google_sheets/client.py
@@ -5,11 +5,12 @@
import singer
from singer import metrics
from singer import utils
from requests.exceptions import Timeout, ConnectionError

BASE_URL = 'https://www.googleapis.com'
GOOGLE_TOKEN_URI = 'https://oauth2.googleapis.com/token'
LOGGER = singer.get_logger()

REQUEST_TIMEOUT = 300

class Server5xxError(Exception):
pass
@@ -131,6 +132,7 @@ def __init__(self,
client_id,
client_secret,
refresh_token,
request_timeout=REQUEST_TIMEOUT,
user_agent=None):
self.__client_id = client_id
self.__client_secret = client_secret
@@ -140,16 +142,26 @@ def __init__(self,
self.__expires = None
self.__session = requests.Session()
self.base_url = None


# If request_timeout is anything other than 0, "0" or "", use it as the timeout
if request_timeout and float(request_timeout):
request_timeout = float(request_timeout)
else: # If the value is 0, "0" or "", fall back to the default of 300 seconds.
request_timeout = REQUEST_TIMEOUT
self.request_timeout = request_timeout

# Retry the request up to 5 times at 10-second intervals on a Timeout or ConnectionError
@backoff.on_exception(backoff.constant,
(Timeout, ConnectionError),
max_tries=5,
interval=10,
jitter=None) # Interval value not consistent if jitter not None
def __enter__(self):
self.get_access_token()
return self

def __exit__(self, exception_type, exception_value, traceback):
self.__session.close()


@backoff.on_exception(backoff.expo,
Server5xxError,
max_tries=5,
@@ -172,7 +184,8 @@ def get_access_token(self):
'client_id': self.__client_id,
'client_secret': self.__client_secret,
'refresh_token': self.__refresh_token,
})
},
timeout=self.request_timeout)

if response.status_code >= 500:
raise Server5xxError()
@@ -186,6 +199,12 @@ def get_access_token(self):
LOGGER.info('Authorized, token expires = {}'.format(self.__expires))


# Retry the request up to 5 times at 10-second intervals on a Timeout error
@backoff.on_exception(backoff.constant,
(Timeout),
max_tries=5,
interval=10,
jitter=None) # Interval value not consistent if jitter not None
# Rate Limit: https://developers.google.com/sheets/api/limits
# 100 request per 100 seconds per User
@backoff.on_exception(backoff.expo,
@@ -221,7 +240,8 @@ def request(self, method, path=None, url=None, api=None, **kwargs):
kwargs['headers']['Content-Type'] = 'application/json'

with metrics.http_request_timer(endpoint) as timer:
response = self.__session.request(method, url, **kwargs)

response = self.__session.request(method, url, timeout=self.request_timeout, **kwargs)
timer.tags[metrics.Tag.http_status_code] = response.status_code

if response.status_code >= 500:
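The timeout plumbing above boils down to two steps: normalize whatever the config provides (falling back to 300 seconds for 0, "0", "", or a missing value) and pass the result as `timeout=` on every `requests` call. A minimal sketch of that normalization, written as a standalone helper for illustration — `resolve_timeout` is not part of the tap, which does this inline in `GoogleClient.__init__`:

```python
# Standalone sketch of the request_timeout normalization in client.py.
# `resolve_timeout` is an illustrative helper, not part of the tap.
REQUEST_TIMEOUT = 300  # default timeout in seconds

def resolve_timeout(config_value):
    # Any value other than 0, "0", "" or None is used (converted to float);
    # otherwise fall back to the 300-second default.
    if config_value and float(config_value):
        return float(config_value)
    return REQUEST_TIMEOUT

assert resolve_timeout(None) == 300
assert resolve_timeout("") == 300
assert resolve_timeout("0") == 300
assert resolve_timeout("100.5") == 100.5
assert resolve_timeout(450) == 450.0
```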
36 changes: 31 additions & 5 deletions tap_google_sheets/schema.py
@@ -65,13 +65,18 @@ def get_sheet_schema_columns(sheet):
prior_header = None
i = 0
skipped = 0

# If no headers are present, log that the sheet is being skipped
if not headers:
LOGGER.warn('SKIPPING THE SHEET AS HEADERS ROW IS EMPTY. SHEET: {}'.format(sheet_title))

# Read column headers until end or 2 consecutive skipped headers
for header in headers:
# LOGGER.info('header = {}'.format(json.dumps(header, indent=2, sort_keys=True)))
column_index = i + 1
column_letter = colnum_string(column_index)
header_value = header.get('formattedValue')
if header_value: # NOT skipped
if header_value: # if the column is NOT to be skipped
column_is_skipped = False
skipped = 0
column_name = '{}'.format(header_value)
@@ -170,12 +175,13 @@ def get_sheet_schema_columns(sheet):
LOGGER.info('WARNING: UNSUPPORTED 2ND ROW VALUE: SHEET: {}, COL: {}, CELL: {}2, TYPE: {}'.format(
sheet_title, column_name, column_letter, column_effective_value_type))
LOGGER.info('Converting to string.')
else: # skipped
else: # if the column is to be skipped
column_is_skipped = True
skipped = skipped + 1
column_index_str = str(column_index).zfill(2)
column_name = '__sdc_skip_col_{}'.format(column_index_str)
col_properties = {'type': ['null', 'string']}
# Add a description noting that the column is unsupported when it is skipped
col_properties = {'type': ['null', 'string'], 'description': 'Column is unsupported and would be skipped because header is not available'}
column_gs_type = 'stringValue'
LOGGER.info('WARNING: SKIPPED COLUMN; NO COLUMN HEADER. SHEET: {}, COL: {}, CELL: {}1'.format(
sheet_title, column_name, column_letter))
@@ -184,12 +190,20 @@
if skipped >= 2:
# skipped = 2 consecutive skipped headers
# Remove prior_header column_name
# stop scanning the sheet and break
sheet_json_schema['properties'].pop(prior_header, None)
# prior_index is the index of the column immediately before the current column
prior_index = column_index - 1
# Set a boolean key `prior_column_skipped` to mark the first of the two columns with consecutive empty headers,
# since neither of those columns should be included in the schema or the metadata
columns[prior_index-1]['prior_column_skipped'] = True
LOGGER.info('TWO CONSECUTIVE SKIPPED COLUMNS. STOPPING SCAN AT: SHEET: {}, COL: {}, CELL {}1'.format(
sheet_title, column_name, column_letter))
break

else:
# skipped < 2: build the `columns` dictionary for each column with its index, letter, name, type,
# and whether the column is to be skipped
column = {}
column = {
'columnIndex': column_index,
@@ -204,10 +218,10 @@
col_properties = {
'anyOf': [
col_properties,
{'type': ['null', 'string']}
{'type': ['null', 'string']} # date and time values use string types in the schema
]
}

# Add the column properties under `properties` in the JSON schema for this column name
sheet_json_schema['properties'][column_name] = col_properties

prior_header = column_name
@@ -231,8 +245,10 @@ def get_sheet_metadata(sheet, spreadsheet_id, client):
params = stream_metadata.get('params', {})
sheet_title_encoded = urllib.parse.quote_plus(sheet_title)
sheet_title_escaped = re.escape(sheet_title)
# create querystring for preparing the request
querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in \
params.items()]).replace('{sheet_title}', sheet_title_encoded)
# create path for preparing the request
path = '{}?{}'.format(stream_metadata.get('path').replace('{spreadsheet_id}', \
spreadsheet_id), querystring)

@@ -280,7 +296,9 @@ def get_schemas(client, spreadsheet_id):
if stream_name == 'spreadsheet_metadata':
api = stream_metadata.get('api', 'sheets')
params = stream_metadata.get('params', {})
# prepare the query string for the request
querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()])
# prepare the path for request
path = '{}?{}'.format(stream_metadata.get('path').replace('{spreadsheet_id}', \
spreadsheet_id), querystring)

@@ -306,6 +324,14 @@
valid_replication_keys=None,
replication_method='FULL_TABLE'
)
# For each column, check whether `columnSkipped` is true and `prior_column_skipped` is false or None
# in the columns dict. `prior_column_skipped` is true when the column is the first of the two
# consecutive empty-header columns. If the check passes, update the inclusion property to `unsupported`.
for column in columns:
if column.get('columnSkipped') and not column.get('prior_column_skipped'):
mdata = metadata.to_map(sheet_mdata)
sheet_mdata = metadata.write(mdata, ('properties', column.get('columnName')), 'inclusion', 'unsupported')
sheet_mdata = metadata.to_list(mdata)
field_metadata[sheet_title] = sheet_mdata

return schemas, field_metadata
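The inclusion override at the end of `get_schemas` uses singer-python's `metadata` helpers (`to_map`, `write`, `to_list`). A small isolated sketch of the same marking step, with made-up column data rather than a real sheet:

```python
# Illustrative sketch of flipping a skipped column's inclusion to 'unsupported'
# with singer-python's metadata helpers; the column data below is made up.
from singer import metadata

sheet_mdata = [
    {'breadcrumb': [], 'metadata': {'inclusion': 'available'}},
    {'breadcrumb': ['properties', '__sdc_skip_col_03'], 'metadata': {'inclusion': 'available'}},
]
columns = [
    {'columnName': '__sdc_skip_col_03', 'columnSkipped': True, 'prior_column_skipped': None},
]

for column in columns:
    if column.get('columnSkipped') and not column.get('prior_column_skipped'):
        mdata = metadata.to_map(sheet_mdata)
        metadata.write(mdata, ('properties', column['columnName']), 'inclusion', 'unsupported')
        sheet_mdata = metadata.to_list(mdata)

# sheet_mdata now carries inclusion == 'unsupported' for the skipped column.
```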
2 changes: 1 addition & 1 deletion tap_google_sheets/schemas/file_metadata.json
@@ -35,7 +35,7 @@
"displayName": {
"type": ["null", "string"]
},
"emailAdress": {
"emailAddress": {
"type": ["null", "string"]
}
}
13 changes: 12 additions & 1 deletion tap_google_sheets/sync.py
@@ -514,7 +514,18 @@ def sync(client, config, catalog, state):
from_row=from_row,
columns=columns,
sheet_data_rows=sheet_data_rows)
if row_num < to_row:

# Here row_num is from_row plus the number of records returned in the response (per batch).
# The old condition `row_num < to_row` checked whether the current page returned fewer records than expected (to_row)
# and broke the loop if so. However, the API does not return trailing empty rows in the response.
# For example, if rows 199 and 200 are empty and the sheet has 400 rows in total, then in the 1st iteration
# to_row = 200, from_row = 2, and row_num = 2 (from_row) + 197 = 199 (row 1 holds the header values),
# so the old condition became true and broke the loop without syncing records 201 to 400.
# sheet_data_rows is the number of records returned for the current page; only a completely blank page stops the loop.
# In the example above, records 201 to 400 are therefore still synced even though rows 199 and 200 are blank,
# and the loop only breaks when the next batch (401 to 600) comes back empty.
if not sheet_data_rows: # If a whole blank page found, then stop looping.
is_last_row = True

# Process records, send batch of records to target
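Compressed into a sketch, the fixed loop looks roughly like the following; `get_data_rows` is a hypothetical stand-in for the tap's per-batch Sheets request, and the real sync code also processes records and writes them to the target:

```python
# Simplified sketch of the batching behaviour described above.
# get_data_rows(from_row, to_row) is a hypothetical stand-in for the
# per-batch Sheets API call; it returns the non-empty rows in that range.
def sync_sheet_rows(get_data_rows, batch_size=200):
    from_row = 2                  # row 1 holds the column headers
    to_row = batch_size           # first batch covers rows 2..200, as in the example above
    is_last_row = False
    total_rows = 0
    while not is_last_row:
        sheet_data_rows = get_data_rows(from_row, to_row)
        # Old behaviour: `if row_num < to_row: break` aborted the sync when a batch
        # merely contained blank rows in the middle of the sheet.
        # New behaviour: stop only when the API returns a completely empty batch.
        if not sheet_data_rows:
            is_last_row = True
        total_rows += len(sheet_data_rows)
        from_row = to_row + 1
        to_row += batch_size
    return total_rows
```

With a fetcher that returns nothing for rows 199–200 but data for 201–400, this loop keeps going and only stops when the 401–600 batch comes back empty.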
11 changes: 10 additions & 1 deletion tests/test_google_sheets_all_fields.py
@@ -80,7 +80,16 @@ def test_run(self):
self.assertGreater(len(expected_all_keys), len(expected_automatic_keys))
self.assertTrue(expected_automatic_keys.issubset(expected_all_keys), msg=f'{expected_automatic_keys-expected_all_keys} is not in "expected_all_keys"')
if stream == "file_metadata":
# BUG | below keys are not synced https://jira.talendforge.org/browse/TDL-14409

# Per the Google documentation https://developers.google.com/drive/api/v3/reference/files, `teamDriveId` is deprecated and `driveId` should be used instead.
# `driveId` is only populated for items in shared team drives, and the Stitch integration does not support shared team drives, so replicating `driveId` is not possible.
# These two fields will therefore not be synced.
expected_all_keys.remove('teamDriveId')
expected_all_keys.remove('driveId')
# Previously the field `emailAddress` was misspelled as `emailAdress` in file_metadata.json,
# so that field was never collected even though the API response contains `emailAddress`.
# The typo has now been corrected, so verify that the `emailAddress` field is collected.
lastModifyingUser_fields = set(messages['messages'][0].get('data', {}).get('lastModifyingUser', {}).keys()) # Get `lastModifyingUser` from file_metadata records
# Verify that `emailAddress` field under `lastModifyingUser` collected.
self.assertTrue({'emailAddress'}.issubset(lastModifyingUser_fields), msg="emailAddress does not found in lastModifyingUser")
self.assertSetEqual(expected_all_keys, actual_all_keys)
10 changes: 6 additions & 4 deletions tests/test_google_sheets_discovery.py
@@ -118,11 +118,13 @@ def test_run(self):
expected_automatic_fields.remove('modifiedTime')
self.assertSetEqual(expected_automatic_fields, actual_automatic_fields)

# verify columns missing headers or missing values where __sdc_row = 2
# verify missing values where __sdc_row = 2
# are marked with inclusion of unsupported
# BUG_TDL-14475 | https://jira.talendforge.org/browse/TDL-14475
failing_streams = {'sadsheet-column-skip-bug', 'Item Master'} # BUG_TDL-14475
if stream not in failing_streams: # BUG_TDL-14475
# The card TDL-14475 only covered adding the unsupported
# inclusion property for empty header values. The sheet
# `Item Master` has columns with empty row values
failing_streams = {'Item Master'}
if stream not in failing_streams:
self.assertSetEqual(expected_unsupported_fields, actual_unsupported_fields)

# verify that all other fields have inclusion of available