From 7bb87dbf3ad55153b90c5c7634658c99e6961f23 Mon Sep 17 00:00:00 2001 From: prijendev Date: Tue, 8 Feb 2022 15:26:07 +0530 Subject: [PATCH 1/2] break down sync method for better readability --- tap_activecampaign/streams.py | 205 ++++++++++++++++++++-------------- 1 file changed, 120 insertions(+), 85 deletions(-) diff --git a/tap_activecampaign/streams.py b/tap_activecampaign/streams.py index 11f15f4..8ad6c43 100644 --- a/tap_activecampaign/streams.py +++ b/tap_activecampaign/streams.py @@ -184,10 +184,7 @@ def sync( parent_id=None): static_params = self.params - data_key = self.data_key - id_fields = self.key_properties bookmark_query_field = self.bookmark_query_field - created_timestamp_field = self.created_timestamp bookmark_field = next(iter(self.replication_keys or []), None) # Get the latest bookmark for the stream and set the last_integer/datetime last_datetime = None @@ -234,43 +231,124 @@ def sync( '?{}'.format(querystring) if params else '')) # API request data - data = {} - data = self.client.get( - path=path, - params=querystring, - endpoint=self.stream_name) - - # time_extracted: datetime when the data was extracted from the API - time_extracted = utils.now() - if not data or data is None or data == {}: - break # No data results - - # Transform data with transform_json from transform.py - # The data_key identifies the array/list of records below the element - # LOGGER.info('data = {}'.format(data)) # TESTING, comment out - transformed_data = [] # initialize the record list - data_list = [] - data_dict = {} - if data_key in data: - if isinstance(data[data_key], list): - transformed_data = transform_json(data, self.stream_name, data_key) - elif isinstance(data[data_key], dict): - data_list.append(data[data_key]) - data_dict[data_key] = data_list - transformed_data = transform_json(data_dict, self.stream_name, data_key) - else: # data_key not in data - if isinstance(data, list): - data_list = data - data_dict[data_key] = data_list - transformed_data = 
transform_json(data_dict, self.stream_name, data_key) - elif isinstance(data, dict): - data_list.append(data) - data_dict[data_key] = data_list - transformed_data = transform_json(data_dict, self.stream_name, data_key) + endpoint_total, total_records, record_count, page, offset, max_bookmark_value = self.get_and_transform_records( + querystring, path, max_bookmark_value, state, catalog, start_date, last_datetime, endpoint_total, + limit, total_records, record_count, page, offset, parent, parent_id, selected_streams) + + # Update the state with the max_bookmark_value for the endpoint + # ActiveCampaign API does not allow page/batch sorting; bookmark written for endpoint + if bookmark_field: + self.write_bookmark(state, self.stream_name, max_bookmark_value) + + # Return total_records (for all pages and date windows) + return endpoint_total + def transform_data(self, data): + """ + Transform data with transform_json from transform.py + """ + data_key = self.data_key + + # The data_key identifies the array/list of records below the element + transformed_data = [] # initialize the record list + data_list = [] + data_dict = {} + if data_key in data: + if isinstance(data[data_key], list): + transformed_data = transform_json(data, self.stream_name, data_key) + elif isinstance(data[data_key], dict): + data_list.append(data[data_key]) + data_dict[data_key] = data_list + transformed_data = transform_json(data_dict, self.stream_name, data_key) + else: # data_key not in data + if isinstance(data, list): + data_list = data + data_dict[data_key] = data_list + transformed_data = transform_json(data_dict, self.stream_name, data_key) + elif isinstance(data, dict): + data_list.append(data) + data_dict[data_key] = data_list + transformed_data = transform_json(data_dict, self.stream_name, data_key) + + return transformed_data + + def sync_child_stream(self, children, transformed_data, catalog, state, start_date, selected_streams): + """ + sync the child stream. 
Loop through all children and if it is selected then collect data based on parent_id. + """ + id_fields = self.key_properties + + for child_stream_name in children: + if child_stream_name in selected_streams: + LOGGER.info('START Syncing: {}'.format(child_stream_name)) + child_stream_obj = STREAMS[child_stream_name](self.client) + child_stream_obj.write_schema(catalog, child_stream_name) + # For each parent record + for record in transformed_data: + i = 0 + # Set parent_id + for id_field in id_fields: + if i == 0: + parent_id_field = id_field + if id_field == 'id': + parent_id_field = id_field + i = i + 1 + parent_id = record.get(parent_id_field) + + # sync_endpoint for child + LOGGER.info( + 'START Sync for Stream: {}, parent_stream: {}, parent_id: {}'\ + .format(child_stream_name, self.stream_name, parent_id)) + child_path = child_stream_obj.path.format(str(parent_id)) + + child_total_records = child_stream_obj.sync( + client=self.client, + catalog=catalog, + state=state, + start_date=start_date, + path=child_path, + selected_streams=selected_streams, + parent=child_stream_obj.parent, + parent_id=parent_id) + LOGGER.info( + 'FINISHED Sync for Stream: {}, parent_id: {}, total_records: {}'\ + .format(child_stream_name, parent_id, child_total_records)) + # End transformed data record loop + # End if child in selected streams + # End child streams for parent + # End if children + + def get_and_transform_records(self, querystring, path, max_bookmark_value, state, catalog, start_date, last_datetime, endpoint_total, + limit, total_records, record_count, page, offset, parent, parent_id, selected_streams): + + """ + Get the records using the client get request and transform it using transform_records + """ + + bookmark_field = next(iter(self.replication_keys or []), None) + created_timestamp_field = self.created_timestamp + id_fields = self.key_properties + + # API request data + data = {} + data = self.client.get( + path=path, + params=querystring, + 
endpoint=self.stream_name) + + # time_extracted: datetime when the data was extracted from the API + time_extracted = utils.now() + + if not data or data is None or data == {}: # No data results + LOGGER.info('No data for URL {}{}{}'.format( + self.client.base_url, + path, + '?{}'.format(querystring) if params else '')) + else: # has data + transformed_data = self.transform_data(data) + if not transformed_data or transformed_data is None: - LOGGER.info('No transformed data for data = {}'.format(data)) - break # No data results + LOGGER.info('No transformed data for data = {}'.format(data)) # No data results i = 0 for record in transformed_data: @@ -289,7 +367,7 @@ def sync( self.stream_name, key, record)) raise RuntimeError i = i + 1 - + # Process records and get the max_bookmark_value and record_count for the set of records max_bookmark_value, record_count = self.process_records( catalog=catalog, @@ -309,45 +387,8 @@ def sync( children = self.children if children: - for child_stream_name in children: - if child_stream_name in selected_streams: - LOGGER.info('START Syncing: {}'.format(child_stream_name)) - child_stream_obj = STREAMS[child_stream_name](client) - child_stream_obj.write_schema(catalog, child_stream_name) - # For each parent record - for record in transformed_data: - i = 0 - # Set parent_id - for id_field in id_fields: - if i == 0: - parent_id_field = id_field - if id_field == 'id': - parent_id_field = id_field - i = i + 1 - parent_id = record.get(parent_id_field) - - # sync_endpoint for child - LOGGER.info( - 'START Sync for Stream: {}, parent_stream: {}, parent_id: {}'\ - .format(child_stream_name, self.stream_name, parent_id)) - child_path = child_stream_obj.path.format(str(parent_id)) - - child_total_records = child_stream_obj.sync( - client=client, - catalog=catalog, - state=state, - start_date=start_date, - path=child_path, - selected_streams=selected_streams, - parent=child_stream_obj.parent, - parent_id=parent_id) - LOGGER.info( - 'FINISHED 
Sync for Stream: {}, parent_id: {}, total_records: {}'\ - .format(child_stream_name, parent_id, child_total_records)) - # End transformed data record loop - # End if child in selected streams - # End child streams for parent - # End if children + # sync child stream + self.sync_child_stream(children, transformed_data, catalog, state, start_date, selected_streams) # Parent record batch # Get pagination details @@ -373,13 +414,7 @@ def sync( page = page + 1 # End page/batch - while next URL loop - # Update the state with the max_bookmark_value for the endpoint - # ActiveCampaign API does not allow page/batch sorting; bookmark written for endpoint - if bookmark_field: - self.write_bookmark(state, self.stream_name, max_bookmark_value) - - # Return total_records (for all pages and date windows) - return endpoint_total + return endpoint_total, total_records, record_count, page, offset, max_bookmark_value class Accounts(ActiveCampaign): """ From 4cb1cd46eac34888888a8c9119314e8affca6128 Mon Sep 17 00:00:00 2001 From: prijendev Date: Tue, 8 Feb 2022 15:33:53 +0530 Subject: [PATCH 2/2] Resolved pylint error --- tap_activecampaign/streams.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/tap_activecampaign/streams.py b/tap_activecampaign/streams.py index 8ad6c43..bc9682e 100644 --- a/tap_activecampaign/streams.py +++ b/tap_activecampaign/streams.py @@ -339,11 +339,8 @@ def get_and_transform_records(self, querystring, path, max_bookmark_value, state # time_extracted: datetime when the data was extracted from the API time_extracted = utils.now() - if not data or data is None or data == {}: # No data results - LOGGER.info('No data for URL {}{}{}'.format( - self.client.base_url, - path, - '?{}'.format(querystring) if params else '')) + if not data or data is None or data == {}: + LOGGER.info('No data for URL {}{}{}'.format(self.client.base_url, path, querystring)) # No data results else: # has data transformed_data = self.transform_data(data)