Tdl 17689 breakdown sync method (#16)
* break down sync method for better readability

* Resolved pylint error

Co-authored-by: savan-chovatiya <80703490+savan-chovatiya@users.noreply.github.com>
prijendev and savan-chovatiya authored Feb 15, 2022
1 parent 60111c2 commit 3bcf786
202 changes: 117 additions & 85 deletions tap_activecampaign/streams.py
@@ -184,10 +184,7 @@ def sync(
parent_id=None):

static_params = self.params
data_key = self.data_key
id_fields = self.key_properties
bookmark_query_field = self.bookmark_query_field
created_timestamp_field = self.created_timestamp
bookmark_field = next(iter(self.replication_keys or []), None)
# Get the latest bookmark for the stream and set the last_integer/datetime
last_datetime = None
@@ -234,43 +231,121 @@ def sync(
'?{}'.format(querystring) if params else ''))

# API request data
data = {}
data = self.client.get(
path=path,
params=querystring,
endpoint=self.stream_name)

# time_extracted: datetime when the data was extracted from the API
time_extracted = utils.now()
if not data or data is None or data == {}:
break # No data results

# Transform data with transform_json from transform.py
# The data_key identifies the array/list of records below the <root> element
# LOGGER.info('data = {}'.format(data)) # TESTING, comment out
transformed_data = [] # initialize the record list
data_list = []
data_dict = {}
if data_key in data:
if isinstance(data[data_key], list):
transformed_data = transform_json(data, self.stream_name, data_key)
elif isinstance(data[data_key], dict):
data_list.append(data[data_key])
data_dict[data_key] = data_list
transformed_data = transform_json(data_dict, self.stream_name, data_key)
else: # data_key not in data
if isinstance(data, list):
data_list = data
data_dict[data_key] = data_list
transformed_data = transform_json(data_dict, self.stream_name, data_key)
elif isinstance(data, dict):
data_list.append(data)
data_dict[data_key] = data_list
transformed_data = transform_json(data_dict, self.stream_name, data_key)
endpoint_total, total_records, record_count, page, offset, max_bookmark_value = self.get_and_transform_records(
querystring, path, max_bookmark_value, state, catalog, start_date, last_datetime, endpoint_total,
limit, total_records, record_count, page, offset, parent, parent_id, selected_streams)

# Update the state with the max_bookmark_value for the endpoint
# ActiveCampaign API does not allow page/batch sorting; bookmark written for endpoint
if bookmark_field:
self.write_bookmark(state, self.stream_name, max_bookmark_value)

# Return total_records (for all pages and date windows)
return endpoint_total
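
The comment above explains why the bookmark is written once per endpoint rather than per page: the API returns pages unsorted, so the tap tracks the maximum replication value seen across all pages and persists it only at the end. A minimal runnable sketch of that pattern (the pages and field name below are made up for illustration):

# Sketch of max-bookmark tracking across unsorted pages; written once at the end.
def track_max_bookmark(pages, bookmark_field):
    max_value = None
    for page in pages:
        for record in page:
            value = record.get(bookmark_field)
            if value and (max_value is None or value > max_value):
                max_value = value
    return max_value

# Pages arrive out of order; the later timestamp still wins.
pages = [[{'updated_timestamp': '2022-02-01T00:00:00Z'}],
         [{'updated_timestamp': '2022-01-15T00:00:00Z'}]]
assert track_max_bookmark(pages, 'updated_timestamp') == '2022-02-01T00:00:00Z'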

def transform_data(self, data):
"""
Transform data with transform_json from transform.py
"""
data_key = self.data_key

# The data_key identifies the array/list of records below the <root> element
transformed_data = [] # initialize the record list
data_list = []
data_dict = {}
if data_key in data:
if isinstance(data[data_key], list):
transformed_data = transform_json(data, self.stream_name, data_key)
elif isinstance(data[data_key], dict):
data_list.append(data[data_key])
data_dict[data_key] = data_list
transformed_data = transform_json(data_dict, self.stream_name, data_key)
else: # data_key not in data
if isinstance(data, list):
data_list = data
data_dict[data_key] = data_list
transformed_data = transform_json(data_dict, self.stream_name, data_key)
elif isinstance(data, dict):
data_list.append(data)
data_dict[data_key] = data_list
transformed_data = transform_json(data_dict, self.stream_name, data_key)

return transformed_data
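
transform_data above accepts three response shapes, a list under data_key, a single dict under data_key, or a bare list/dict with no data_key at all, and wraps each into the {data_key: [records]} form that transform_json expects. A condensed sketch of that normalization, with transform.py's transform_json stubbed out:

# Condensed sketch of the shape normalization in transform_data.
# transform_json is stubbed here; the real one lives in transform.py.
def transform_json(data, stream_name, data_key):
    return data.get(data_key, [])  # stub: the real version also de-nests records

def normalize(data, stream_name, data_key):
    if data_key in data:
        if isinstance(data[data_key], list):
            return transform_json(data, stream_name, data_key)
        if isinstance(data[data_key], dict):
            return transform_json({data_key: [data[data_key]]}, stream_name, data_key)
        return []  # data_key present but neither list nor dict
    records = data if isinstance(data, list) else [data]
    return transform_json({data_key: records}, stream_name, data_key)

# A bare dict and a wrapped dict normalize to the same single record:
assert normalize({'id': '1'}, 'contacts', 'contacts') == \
       normalize({'contacts': {'id': '1'}}, 'contacts', 'contacts')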

def sync_child_stream(self, children, transformed_data, catalog, state, start_date, selected_streams):
"""
sync the child stream. Loop through all children and if it is selected then collect data based on parent_id.
"""
id_fields = self.key_properties

for child_stream_name in children:
if child_stream_name in selected_streams:
LOGGER.info('START Syncing: {}'.format(child_stream_name))
child_stream_obj = STREAMS[child_stream_name](self.client)
child_stream_obj.write_schema(catalog, child_stream_name)
# For each parent record
for record in transformed_data:
i = 0
# Set parent_id
for id_field in id_fields:
if i == 0:
parent_id_field = id_field
if id_field == 'id':
parent_id_field = id_field
i = i + 1
parent_id = record.get(parent_id_field)

# sync_endpoint for child
LOGGER.info(
'START Sync for Stream: {}, parent_stream: {}, parent_id: {}'\
.format(child_stream_name, self.stream_name, parent_id))
child_path = child_stream_obj.path.format(str(parent_id))

child_total_records = child_stream_obj.sync(
client=self.client,
catalog=catalog,
state=state,
start_date=start_date,
path=child_path,
selected_streams=selected_streams,
parent=child_stream_obj.parent,
parent_id=parent_id)
LOGGER.info(
'FINISHED Sync for Stream: {}, parent_id: {}, total_records: {}'\
.format(child_stream_name, parent_id, child_total_records))
# End transformed data record loop
# End if child in selected streams
# End child streams for parent
# End if children
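
The i/parent_id_field loop above boils down to one rule: use the key property named 'id' when present, otherwise fall back to the first key property. A behavior-equivalent sketch, assuming id_fields is a non-empty list:

# Equivalent of the parent_id_field selection loop in sync_child_stream.
def pick_parent_id_field(id_fields):
    return 'id' if 'id' in id_fields else id_fields[0]

assert pick_parent_id_field(['id', 'email']) == 'id'
assert pick_parent_id_field(['contact_id', 'list_id']) == 'contact_id'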

def get_and_transform_records(self, querystring, path, max_bookmark_value, state, catalog, start_date, last_datetime, endpoint_total,
limit, total_records, record_count, page, offset, parent, parent_id, selected_streams):
"""
Get the records using the client's get request and transform them using transform_data.
"""

bookmark_field = next(iter(self.replication_keys or []), None)
created_timestamp_field = self.created_timestamp
id_fields = self.key_properties

# API request data
data = {}
data = self.client.get(
path=path,
params=querystring,
endpoint=self.stream_name)

# time_extracted: datetime when the data was extracted from the API
time_extracted = utils.now()

if not data or data is None or data == {}:
LOGGER.info('No data for URL {}{}{}'.format(self.client.base_url, path, querystring)) # No data results
else: # has data
transformed_data = self.transform_data(data)

if not transformed_data or transformed_data is None:
LOGGER.info('No transformed data for data = {}'.format(data))
break # No data results
LOGGER.info('No transformed data for data = {}'.format(data)) # No data results

i = 0
for record in transformed_data:
@@ -289,7 +364,7 @@ def sync(
self.stream_name, key, record))
raise RuntimeError
i = i + 1

# Process records and get the max_bookmark_value and record_count for the set of records
max_bookmark_value, record_count = self.process_records(
catalog=catalog,
@@ -309,45 +384,8 @@ def sync(
children = self.children

if children:
for child_stream_name in children:
if child_stream_name in selected_streams:
LOGGER.info('START Syncing: {}'.format(child_stream_name))
child_stream_obj = STREAMS[child_stream_name](client)
child_stream_obj.write_schema(catalog, child_stream_name)
# For each parent record
for record in transformed_data:
i = 0
# Set parent_id
for id_field in id_fields:
if i == 0:
parent_id_field = id_field
if id_field == 'id':
parent_id_field = id_field
i = i + 1
parent_id = record.get(parent_id_field)

# sync_endpoint for child
LOGGER.info(
'START Sync for Stream: {}, parent_stream: {}, parent_id: {}'\
.format(child_stream_name, self.stream_name, parent_id))
child_path = child_stream_obj.path.format(str(parent_id))

child_total_records = child_stream_obj.sync(
client=client,
catalog=catalog,
state=state,
start_date=start_date,
path=child_path,
selected_streams=selected_streams,
parent=child_stream_obj.parent,
parent_id=parent_id)
LOGGER.info(
'FINISHED Sync for Stream: {}, parent_id: {}, total_records: {}'\
.format(child_stream_name, parent_id, child_total_records))
# End transformed data record loop
# End if child in selected streams
# End child streams for parent
# End if children
# sync child stream
self.sync_child_stream(children, transformed_data, catalog, state, start_date, selected_streams)

# Parent record batch
# Get pagination details
@@ -373,13 +411,7 @@ def sync(
page = page + 1
# End page/batch - while next URL loop

# Update the state with the max_bookmark_value for the endpoint
# ActiveCampaign API does not allow page/batch sorting; bookmark written for endpoint
if bookmark_field:
self.write_bookmark(state, self.stream_name, max_bookmark_value)

# Return total_records (for all pages and date windows)
return endpoint_total
return endpoint_total, total_records, record_count, page, offset, max_bookmark_value
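
Because get_and_transform_records returns every counter the paging loop mutates, sync reassigns the whole tuple on each pass. A toy, runnable illustration of that tuple-threading pattern (fetch_page and the three-page source are stand-ins, not the tap's API):

# Toy illustration of threading paging state through a helper's return tuple,
# the way sync() threads page/offset/totals through get_and_transform_records.
def fetch_page(total, page, offset):
    records = [offset, offset + 1]  # pretend the API returns two records per page
    return total + len(records), page + 1, offset + len(records)

total, page, offset = 0, 1, 0
while page <= 3:  # three pages in this toy
    total, page, offset = fetch_page(total, page, offset)

print(total, page, offset)  # 6 4 6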

class Accounts(ActiveCampaign):
"""
