From 1b795e0640b618bf4f6946274482f59a7da7e818 Mon Sep 17 00:00:00 2001 From: Azmat Ali Date: Wed, 5 Feb 2025 15:52:00 -0500 Subject: [PATCH 1/2] HGI-7143 implemented archived products stream --- .gitignore | 2 ++ tap_hubspot_beta/streams.py | 68 +++++++++++++++++++++++++++++++++++++ tap_hubspot_beta/tap.py | 28 ++++++++++++--- 3 files changed, 94 insertions(+), 4 deletions(-) diff --git a/.gitignore b/.gitignore index bbec7fc..e77d5c5 100644 --- a/.gitignore +++ b/.gitignore @@ -135,3 +135,5 @@ dmypy.json # Pyre type checker .pyre/ +.vscode +.history diff --git a/tap_hubspot_beta/streams.py b/tap_hubspot_beta/streams.py index 69508af..d22dbe3 100644 --- a/tap_hubspot_beta/streams.py +++ b/tap_hubspot_beta/streams.py @@ -1050,6 +1050,74 @@ def get_url_params(self, context, next_page_token): params["properties"] = "id,createdAt,updatedAt,archived,archivedAt" return params +class ArchivedProductsStream(hubspotV3Stream): + """Archived Products Stream""" + + name = "products_archived" + path = "crm/v3/objects/products?archived=true" + replication_key = "archivedAt" + properties_url = "properties/v2/products/properties" + primary_keys = ["id"] + + base_properties = [ + th.Property("id", th.StringType), + th.Property("archived", th.BooleanType), + th.Property("_hg_archived", th.BooleanType), + th.Property("archivedAt", th.DateTimeType), + th.Property("createdAt", th.DateTimeType), + th.Property("updatedAt", th.DateTimeType) + ] + + @property + def selected(self) -> bool: + """Check if stream is selected. + Returns: + True if the stream is selected. + """ + # It has to be in the catalog or it will cause issues + if not self._tap.catalog.get("products_archived"): + return False + + try: + # Make this stream auto-select if products is selected + self._tap.catalog["products_archived"] = self._tap.catalog["products"] + return self.mask.get((), False) or self._tap.catalog["products"].metadata.get(()).selected + except: + return self.mask.get((), False) + + def _write_record_message(self, record: dict) -> None: + """Write out a RECORD message. + Args: + record: A single stream record. + """ + for record_message in self._generate_record_messages(record): + # force this to think it's the products stream + record_message.stream = "products" + singer.write_message(record_message) + + @property + def metadata(self): + new_metadata = super().metadata + new_metadata[("properties", "archivedAt")].selected = True + new_metadata[("properties", "archivedAt")].selected_by_default = True + return new_metadata + + def get_url_params(self, context, next_page_token): + params = super().get_url_params(context, next_page_token) + if len(urlencode(params)) > 3000: + params["properties"] = "id,createdAt,updatedAt,archived,archivedAt" + return params + + def post_process(self, row, context): + row = super().post_process(row, context) + + rep_key = self.get_starting_timestamp(context).replace(tzinfo=pytz.utc) + archived_at = parse(row['archivedAt']).replace(tzinfo=pytz.utc) + + if archived_at > rep_key: + return row + + return None class TicketsStream(ObjectSearchV3): """Companies Stream""" diff --git a/tap_hubspot_beta/tap.py b/tap_hubspot_beta/tap.py index a81849d..ad62371 100644 --- a/tap_hubspot_beta/tap.py +++ b/tap_hubspot_beta/tap.py @@ -1,5 +1,6 @@ """hubspot tap class.""" +import os from typing import List from singer_sdk import Stream, Tap @@ -72,10 +73,28 @@ AssociationTasksDealsStream, DealsHistoryPropertiesStream, ContactsHistoryPropertiesStream, - ArchivedOwnersStream + ArchivedOwnersStream, + ArchivedProductsStream, ) -STREAM_TYPES = [ + #When a new stream is added to the tap, it would break existing test suites. +# By allowing caller to ignore the stream we are able ensure existing tests continue to pass. +# 1. Get the environment variable IGNORE_STREAMS and split by commas +ignore_streams = os.environ.get('IGNORE_STREAMS', '').split(',') +print(f"IGNORE_STREAMS: "+ os.environ.get('IGNORE_STREAMS', '')) + +# Function to add multiple streams to STREAM_TYPES if not in ignore_streams +def add_streams(stream_classes): + + stream_types = [] + for stream_class in stream_classes: + if stream_class.__name__ not in ignore_streams: + stream_types.append(stream_class) + else: + print(f"Ignored stream {stream_class.__name__} as it's in IGNORE_STREAMS.") + return stream_types + +STREAM_TYPES = add_streams([ ContactsStream, ListsStream, CompaniesStream, @@ -142,8 +161,9 @@ AssociationTasksDealsStream, DealsHistoryPropertiesStream, ContactsHistoryPropertiesStream, - ArchivedOwnersStream -] + ArchivedOwnersStream, + ArchivedProductsStream +]) class Taphubspot(Tap): From cf19b9d391520979600fb5ff363281d3ce99a38a Mon Sep 17 00:00:00 2001 From: Azmat Ali Date: Thu, 6 Feb 2025 05:43:07 -0500 Subject: [PATCH 2/2] HGI-7143 changed base class and refactored code --- tap_hubspot_beta/streams.py | 13 +------------ tap_hubspot_beta/tap.py | 5 +++-- 2 files changed, 4 insertions(+), 14 deletions(-) diff --git a/tap_hubspot_beta/streams.py b/tap_hubspot_beta/streams.py index d22dbe3..7b80244 100644 --- a/tap_hubspot_beta/streams.py +++ b/tap_hubspot_beta/streams.py @@ -1050,7 +1050,7 @@ def get_url_params(self, context, next_page_token): params["properties"] = "id,createdAt,updatedAt,archived,archivedAt" return params -class ArchivedProductsStream(hubspotV3Stream): +class ArchivedProductsStream(ArchivedStream): """Archived Products Stream""" name = "products_archived" @@ -1108,17 +1108,6 @@ def get_url_params(self, context, next_page_token): params["properties"] = "id,createdAt,updatedAt,archived,archivedAt" return params - def post_process(self, row, context): - row = super().post_process(row, context) - - rep_key = self.get_starting_timestamp(context).replace(tzinfo=pytz.utc) - archived_at = parse(row['archivedAt']).replace(tzinfo=pytz.utc) - - if archived_at > rep_key: - return row - - return None - class TicketsStream(ObjectSearchV3): """Companies Stream""" diff --git a/tap_hubspot_beta/tap.py b/tap_hubspot_beta/tap.py index ad62371..f4383d3 100644 --- a/tap_hubspot_beta/tap.py +++ b/tap_hubspot_beta/tap.py @@ -2,6 +2,7 @@ import os from typing import List +import logging from singer_sdk import Stream, Tap from singer_sdk import typing as th @@ -81,7 +82,7 @@ # By allowing caller to ignore the stream we are able ensure existing tests continue to pass. # 1. Get the environment variable IGNORE_STREAMS and split by commas ignore_streams = os.environ.get('IGNORE_STREAMS', '').split(',') -print(f"IGNORE_STREAMS: "+ os.environ.get('IGNORE_STREAMS', '')) +logging.info(f"IGNORE_STREAMS: "+ os.environ.get('IGNORE_STREAMS', '')) # Function to add multiple streams to STREAM_TYPES if not in ignore_streams def add_streams(stream_classes): @@ -91,7 +92,7 @@ def add_streams(stream_classes): if stream_class.__name__ not in ignore_streams: stream_types.append(stream_class) else: - print(f"Ignored stream {stream_class.__name__} as it's in IGNORE_STREAMS.") + logging.info(f"Ignored stream {stream_class.__name__} as it's in IGNORE_STREAMS.") return stream_types STREAM_TYPES = add_streams([