TDL-17689 breakdown sync method #16

Merged · 3 commits · Feb 15, 2022
202 changes (117 additions, 85 deletions): tap_activecampaign/streams.py
@@ -184,10 +184,7 @@ def sync(
parent_id=None):

static_params = self.params
data_key = self.data_key
id_fields = self.key_properties
bookmark_query_field = self.bookmark_query_field
created_timestamp_field = self.created_timestamp
bookmark_field = next(iter(self.replication_keys or []), None)
# Get the latest bookmark for the stream and set the last_integer/datetime
last_datetime = None
@@ -234,43 +231,121 @@ def sync(
'?{}'.format(querystring) if params else ''))

# API request data
data = {}
data = self.client.get(
path=path,
params=querystring,
endpoint=self.stream_name)

# time_extracted: datetime when the data was extracted from the API
time_extracted = utils.now()
if not data or data is None or data == {}:
break # No data results

# Transform data with transform_json from transform.py
# The data_key identifies the array/list of records below the <root> element
# LOGGER.info('data = {}'.format(data)) # TESTING, comment out
transformed_data = [] # initialize the record list
data_list = []
data_dict = {}
if data_key in data:
if isinstance(data[data_key], list):
transformed_data = transform_json(data, self.stream_name, data_key)
elif isinstance(data[data_key], dict):
data_list.append(data[data_key])
data_dict[data_key] = data_list
transformed_data = transform_json(data_dict, self.stream_name, data_key)
else: # data_key not in data
if isinstance(data, list):
data_list = data
data_dict[data_key] = data_list
transformed_data = transform_json(data_dict, self.stream_name, data_key)
elif isinstance(data, dict):
data_list.append(data)
data_dict[data_key] = data_list
transformed_data = transform_json(data_dict, self.stream_name, data_key)
endpoint_total, total_records, record_count, page, offset, max_bookmark_value = self.get_and_transform_records(
querystring, path, max_bookmark_value, state, catalog, start_date, last_datetime, endpoint_total,
limit, total_records, record_count, page, offset, parent, parent_id, selected_streams)

# Update the state with the max_bookmark_value for the endpoint
# ActiveCampaign API does not allow page/batch sorting; bookmark written for endpoint
if bookmark_field:
self.write_bookmark(state, self.stream_name, max_bookmark_value)

# Return total_records (for all pages and date windows)
return endpoint_total
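# Illustrative note (not part of the diff): the bookmark above is written once
# per endpoint, after all pages, because the API cannot return records in sorted
# order. A typical Singer state shape after write_bookmark, with a hypothetical
# stream name and timestamp:
#
#     {'bookmarks': {'accounts': '2022-02-15T00:00:00Z'}}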

def transform_data(self, data):
"""
Transform data with transform_json from transform.py
"""
data_key = self.data_key

# The data_key identifies the array/list of records below the <root> element
transformed_data = [] # initialize the record list
data_list = []
data_dict = {}
if data_key in data:
if isinstance(data[data_key], list):
transformed_data = transform_json(data, self.stream_name, data_key)
elif isinstance(data[data_key], dict):
data_list.append(data[data_key])
data_dict[data_key] = data_list
transformed_data = transform_json(data_dict, self.stream_name, data_key)
else: # data_key not in data
if isinstance(data, list):
data_list = data
data_dict[data_key] = data_list
transformed_data = transform_json(data_dict, self.stream_name, data_key)
elif isinstance(data, dict):
data_list.append(data)
data_dict[data_key] = data_list
transformed_data = transform_json(data_dict, self.stream_name, data_key)

return transformed_data
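# Illustrative sketch (not part of the diff): transform_data's branching always
# hands transform_json a payload shaped {data_key: [records]}. A standalone
# mirror of that normalization, assuming data_key = 'accounts' (hypothetical):

def _normalize(data, data_key='accounts'):
    # Dict with data_key present: pass a list through, wrap a lone dict
    if isinstance(data, dict) and data_key in data:
        value = data[data_key]
        return {data_key: value if isinstance(value, list) else [value]}
    # Bare list: re-key it under data_key
    if isinstance(data, list):
        return {data_key: data}
    # Bare dict: wrap and re-key
    if isinstance(data, dict):
        return {data_key: [data]}
    return {data_key: []}

assert _normalize({'accounts': {'id': '1'}}) == {'accounts': [{'id': '1'}]}
assert _normalize([{'id': '1'}]) == {'accounts': [{'id': '1'}]}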

def sync_child_stream(self, children, transformed_data, catalog, state, start_date, selected_streams):
"""
Sync the child streams: loop through all children and, for each selected child, collect data based on parent_id.
"""
id_fields = self.key_properties

for child_stream_name in children:
if child_stream_name in selected_streams:
LOGGER.info('START Syncing: {}'.format(child_stream_name))
child_stream_obj = STREAMS[child_stream_name](self.client)
child_stream_obj.write_schema(catalog, child_stream_name)
# For each parent record
for record in transformed_data:
i = 0
# Set parent_id
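# First key property is the default; a later 'id' key property takes precedence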
for id_field in id_fields:
if i == 0:
parent_id_field = id_field
if id_field == 'id':
parent_id_field = id_field
i = i + 1
parent_id = record.get(parent_id_field)

# sync_endpoint for child
LOGGER.info(
'START Sync for Stream: {}, parent_stream: {}, parent_id: {}'\
.format(child_stream_name, self.stream_name, parent_id))
child_path = child_stream_obj.path.format(str(parent_id))

child_total_records = child_stream_obj.sync(
client=self.client,
catalog=catalog,
state=state,
start_date=start_date,
path=child_path,
selected_streams=selected_streams,
parent=child_stream_obj.parent,
parent_id=parent_id)
LOGGER.info(
'FINISHED Sync for Stream: {}, parent_id: {}, total_records: {}'\
.format(child_stream_name, parent_id, child_total_records))
# End transformed data record loop
# End if child in selected streams
# End child streams for parent
# End if children
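# Illustrative sketch (not part of the diff): how the parent id picked above
# becomes a child request path. The key properties and path template here are
# hypothetical, not read from the tap's STREAMS config:

key_properties = ['id']
record = {'id': '42', 'name': 'Acme'}
# Same selection rule as the loop above: first key property, 'id' preferred
parent_id_field = 'id' if 'id' in key_properties else key_properties[0]
parent_id = record.get(parent_id_field)
child_path = 'deals/{}/dealActivities'.format(str(parent_id))  # hypothetical template
assert child_path == 'deals/42/dealActivities'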

def get_and_transform_records(self, querystring, path, max_bookmark_value, state, catalog, start_date, last_datetime, endpoint_total,
limit, total_records, record_count, page, offset, parent, parent_id, selected_streams):

"""
Get the records via the client get request and transform them using transform_data
"""

bookmark_field = next(iter(self.replication_keys or []), None)
created_timestamp_field = self.created_timestamp
id_fields = self.key_properties

# API request data
data = {}
data = self.client.get(
path=path,
params=querystring,
endpoint=self.stream_name)

# time_extracted: datetime when the data was extracted from the API
time_extracted = utils.now()

if not data or data is None or data == {}:
LOGGER.info('No data for URL {}{}{}'.format(self.client.base_url, path, querystring)) # No data results
else: # has data
transformed_data = self.transform_data(data)

if not transformed_data or transformed_data is None:
LOGGER.info('No transformed data for data = {}'.format(data))
break # No data results
LOGGER.info('No transformed data for data = {}'.format(data)) # No data results

i = 0
for record in transformed_data:
@@ -289,7 +364,7 @@ def sync(
self.stream_name, key, record))
raise RuntimeError
i = i + 1

# Process records and get the max_bookmark_value and record_count for the set of records
max_bookmark_value, record_count = self.process_records(
catalog=catalog,
@@ -309,45 +384,8 @@ def sync(
children = self.children

if children:
for child_stream_name in children:
if child_stream_name in selected_streams:
LOGGER.info('START Syncing: {}'.format(child_stream_name))
child_stream_obj = STREAMS[child_stream_name](client)
child_stream_obj.write_schema(catalog, child_stream_name)
# For each parent record
for record in transformed_data:
i = 0
# Set parent_id
for id_field in id_fields:
if i == 0:
parent_id_field = id_field
if id_field == 'id':
parent_id_field = id_field
i = i + 1
parent_id = record.get(parent_id_field)

# sync_endpoint for child
LOGGER.info(
'START Sync for Stream: {}, parent_stream: {}, parent_id: {}'\
.format(child_stream_name, self.stream_name, parent_id))
child_path = child_stream_obj.path.format(str(parent_id))

child_total_records = child_stream_obj.sync(
client=client,
catalog=catalog,
state=state,
start_date=start_date,
path=child_path,
selected_streams=selected_streams,
parent=child_stream_obj.parent,
parent_id=parent_id)
LOGGER.info(
'FINISHED Sync for Stream: {}, parent_id: {}, total_records: {}'\
.format(child_stream_name, parent_id, child_total_records))
# End transformed data record loop
# End if child in selected streams
# End child streams for parent
# End if children
# sync child stream
self.sync_child_stream(children, transformed_data, catalog, state, start_date, selected_streams)

# Parent record batch
# Get pagination details
@@ -373,13 +411,7 @@ def sync(
page = page + 1
# End page/batch - while next URL loop

# Update the state with the max_bookmark_value for the endpoint
# ActiveCampaign API does not allow page/batch sorting; bookmark written for endpoint
if bookmark_field:
self.write_bookmark(state, self.stream_name, max_bookmark_value)

# Return total_records (for all pages and date windows)
return endpoint_total
return endpoint_total, total_records, record_count, page, offset, max_bookmark_value
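# Illustrative sketch (not part of the diff): sync's page/batch loop threads
# its counters through this helper and back via the returned tuple. A toy
# standalone version with a fake helper, showing the tuple contract only:

def _fake_get_and_transform_records(endpoint_total, total_records, record_count,
                                    page, offset, max_bookmark_value, limit=100):
    record_count = limit                      # pretend one full page was processed
    endpoint_total += record_count
    total_records += record_count
    return (endpoint_total, total_records, record_count,
            page + 1, offset + limit, max_bookmark_value)

endpoint_total = total_records = record_count = offset = 0
page, bookmark = 1, '2022-01-01T00:00:00Z'
(endpoint_total, total_records, record_count,
 page, offset, bookmark) = _fake_get_and_transform_records(
    endpoint_total, total_records, record_count, page, offset, bookmark)
assert (page, offset, endpoint_total) == (2, 100, 100)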

class Accounts(ActiveCampaign):
"""