Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add streams for properties with history #37

Merged
merged 2 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 140 additions & 69 deletions tap_hubspot_beta/client_v3.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class hubspotV3SearchStream(hubspotStream):
starting_time = None
page_size = 100
special_replication = False
bulk_child = True

def get_starting_time(self, context):
start_date = self.get_starting_timestamp(context)
Expand Down Expand Up @@ -115,76 +116,141 @@ def _sync_records( # noqa C901 # too complex
self, context: Optional[dict] = None
) -> None:
"""Sync records, emitting RECORD and STATE messages. """
record_count = 0
current_context: Optional[dict]
context_list: Optional[List[dict]]
context_list = [context] if context is not None else self.partitions
selected = self.selected

for current_context in context_list or [{}]:
partition_record_count = 0
current_context = current_context or None
state = self.get_context_state(current_context)
state_partition_context = self._get_state_partition_context(current_context)
self._write_starting_replication_value(current_context)
child_context: Optional[dict] = (
None if current_context is None else copy.copy(current_context)
)
child_context_bulk = {"ids": []}
for record_result in self.get_records(current_context):
if isinstance(record_result, tuple):
# Tuple items should be the record and the child context
record, child_context = record_result
else:
record = record_result
child_context = copy.copy(
self.get_child_context(record=record, context=child_context)

if not self.bulk_child:
record_count = 0
current_context: Optional[dict]
context_list: Optional[List[dict]]
context_list = [context] if context is not None else self.partitions
selected = self.selected

for current_context in context_list or [{}]:
partition_record_count = 0
current_context = current_context or None
state = self.get_context_state(current_context)
state_partition_context = self._get_state_partition_context(current_context)
self._write_starting_replication_value(current_context)
child_context: Optional[dict] = (
None if current_context is None else copy.copy(current_context)
)
for key, val in (state_partition_context or {}).items():
# Add state context to records if not already present
if key not in record:
record[key] = val

# Sync children, except when primary mapper filters out the record
if self.stream_maps[0].get_filter_result(record):
child_context_bulk["ids"].append(child_context)
if len(child_context_bulk["ids"])>=5000:
child_context_bulk = {"ids": []}
for record_result in self.get_records(current_context):
if isinstance(record_result, tuple):
# Tuple items should be the record and the child context
record, child_context = record_result
else:
record = record_result
child_context = copy.copy(
self.get_child_context(record=record, context=child_context)
)
for key, val in (state_partition_context or {}).items():
# Add state context to records if not already present
if key not in record:
record[key] = val

# Sync children, except when primary mapper filters out the record
if self.stream_maps[0].get_filter_result(record):
child_context_bulk["ids"].append(child_context)
if len(child_context_bulk["ids"])>=5000:
self._sync_children(child_context_bulk)
child_context_bulk = {"ids": []}
self._check_max_record_limit(record_count)
if selected:
if (record_count - 1) % self.STATE_MSG_FREQUENCY == 0:
self._write_state_message()
self._write_record_message(record)
try:
self._increment_stream_state(record, context=current_context)
except InvalidStreamSortException as ex:
log_sort_error(
log_fn=self.logger.error,
ex=ex,
record_count=record_count + 1,
partition_record_count=partition_record_count + 1,
current_context=current_context,
state_partition_context=state_partition_context,
stream_name=self.name,
)
raise ex

record_count += 1
partition_record_count += 1
if len(child_context_bulk):
self._sync_children(child_context_bulk)
child_context_bulk = {"ids": []}
self._check_max_record_limit(record_count)
if selected:
if (record_count - 1) % self.STATE_MSG_FREQUENCY == 0:
self._write_state_message()
self._write_record_message(record)
try:
self._increment_stream_state(record, context=current_context)
except InvalidStreamSortException as ex:
log_sort_error(
log_fn=self.logger.error,
ex=ex,
record_count=record_count + 1,
partition_record_count=partition_record_count + 1,
current_context=current_context,
state_partition_context=state_partition_context,
stream_name=self.name,
)
raise ex

record_count += 1
partition_record_count += 1
if len(child_context_bulk):
self._sync_children(child_context_bulk)
if current_context == state_partition_context:
# Finalize per-partition state only if 1:1 with context
finalize_state_progress_markers(state)
if not context:
# Finalize total stream only if we have the full full context.
# Otherwise will be finalized by tap at end of sync.
finalize_state_progress_markers(self.stream_state)
self._write_record_count_log(record_count=record_count, context=context)
# Reset interim bookmarks before emitting final STATE message:
self._write_state_message()

if current_context == state_partition_context:
# Finalize per-partition state only if 1:1 with context
finalize_state_progress_markers(state)
if not context:
# Finalize total stream only if we have the full full context.
# Otherwise will be finalized by tap at end of sync.
finalize_state_progress_markers(self.stream_state)
self._write_record_count_log(record_count=record_count, context=context)
# Reset interim bookmarks before emitting final STATE message:
self._write_state_message()
else:
record_count = 0
current_context: Optional[dict]
context_list: Optional[List[dict]]
context_list = [context] if context is not None else self.partitions
selected = self.selected

for current_context in context_list or [{}]:
partition_record_count = 0
current_context = current_context or None
state = self.get_context_state(current_context)
state_partition_context = self._get_state_partition_context(current_context)
self._write_starting_replication_value(current_context)
child_context: Optional[dict] = (
None if current_context is None else copy.copy(current_context)
)
for record_result in self.get_records(current_context):
if isinstance(record_result, tuple):
# Tuple items should be the record and the child context
record, child_context = record_result
else:
record = record_result
child_context = copy.copy(
self.get_child_context(record=record, context=child_context)
)
for key, val in (state_partition_context or {}).items():
# Add state context to records if not already present
if key not in record:
record[key] = val

# Sync children, except when primary mapper filters out the record
if self.stream_maps[0].get_filter_result(record):
self._sync_children(child_context)
self._check_max_record_limit(record_count)
if selected:
if (record_count - 1) % self.STATE_MSG_FREQUENCY == 0:
self._write_state_message()
self._write_record_message(record)
try:
self._increment_stream_state(record, context=current_context)
except InvalidStreamSortException as ex:
log_sort_error(
log_fn=self.logger.error,
ex=ex,
record_count=record_count + 1,
partition_record_count=partition_record_count + 1,
current_context=current_context,
state_partition_context=state_partition_context,
stream_name=self.name,
)
raise ex

record_count += 1
partition_record_count += 1
if current_context == state_partition_context:
# Finalize per-partition state only if 1:1 with context
finalize_state_progress_markers(state)
if not context:
# Finalize total stream only if we have the full full context.
# Otherwise will be finalized by tap at end of sync.
finalize_state_progress_markers(self.stream_state)
self._write_record_count_log(record_count=record_count, context=context)
# Reset interim bookmarks before emitting final STATE message:
self._write_state_message()

class hubspotV3Stream(hubspotStream):
"""hubspot stream class."""
Expand All @@ -207,7 +273,12 @@ def get_url_params(
params["limit"] = self.page_size
params.update(self.additional_prarams)
if self.properties_url:
params["properties"] = ",".join(self.selected_properties)
# requesting either properties or properties with history
# if we send both it returns an error saying the url is too long
if params.get("propertiesWithHistory"):
params["propertiesWithHistory"] = ",".join(self.selected_properties)
else:
params["properties"] = ",".join(self.selected_properties)
if next_page_token:
params["after"] = next_page_token
return params
Expand Down
30 changes: 29 additions & 1 deletion tap_hubspot_beta/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,21 @@ def apply_catalog(self, catalog) -> None:
def get_child_context(self, record: dict, context) -> dict:
return {"id": record["id"]}


class ContactsHistoryPropertiesStream(hubspotV3Stream):
"""Contacts History Properties Stream"""

name = "contacts_history_properties"
path = "crm/v3/objects/contacts/{id}"
properties_url = "properties/v1/contacts/properties"
additional_prarams = {"propertiesWithHistory": True}
parent_stream_type = ContactsV3Stream
no_bulk_child = True
records_jsonpath = "$[*]"
base_properties = [
th.Property("propertiesWithHistory", th.CustomType({"type": ["object", "string"]})),
]

class ArchivedStream(hubspotV3Stream):

def post_process(self, row, context):
Expand All @@ -812,7 +827,6 @@ def post_process(self, row, context):
return None



class CompaniesStream(ObjectSearchV3):
"""Companies Stream"""

Expand Down Expand Up @@ -954,6 +968,20 @@ class DealsStream(ObjectSearchV3):

def get_child_context(self, record: dict, context) -> dict:
return {"id": record["id"]}

class DealsHistoryPropertiesStream(hubspotV3Stream):
"""Deals Stream"""

name = "deals_history_properties"
path = "crm/v3/objects/deals/{id}"
properties_url = "properties/v1/deals/properties"
additional_prarams = {"propertiesWithHistory": True}
parent_stream_type = DealsStream
no_bulk_child = True
records_jsonpath = "$[*]"
base_properties = [
th.Property("propertiesWithHistory", th.CustomType({"type": ["object", "string"]})),
]

class DealsAssociationParent(DealsStream):
name = "deals_association_parent"
Expand Down
4 changes: 4 additions & 0 deletions tap_hubspot_beta/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@
AssociationTasksCompaniesStream,
AssociationTasksContactsStream,
AssociationTasksDealsStream,
DealsHistoryPropertiesStream,
ContactsHistoryPropertiesStream
)

STREAM_TYPES = [
Expand Down Expand Up @@ -137,6 +139,8 @@
AssociationTasksCompaniesStream,
AssociationTasksContactsStream,
AssociationTasksDealsStream,
DealsHistoryPropertiesStream,
ContactsHistoryPropertiesStream
]


Expand Down