Skip to content

Commit

Permalink
fetch deleted stages (#107)
Browse files Browse the repository at this point in the history
  • Loading branch information
keyn4 authored Jan 3, 2025
1 parent c78153e commit 101ec5e
Showing 1 changed file with 59 additions and 0 deletions.
59 changes: 59 additions & 0 deletions tap_hubspot_beta/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import pytz
from pendulum import parse
from urllib.parse import urlencode
import json
from itertools import groupby

from singer_sdk.helpers._state import (
finalize_state_progress_markers,
Expand Down Expand Up @@ -661,6 +663,63 @@ class DealsPipelinesStream(hubspotV1Stream):
))),
).to_dict()

def get_deleted_stages(self, row):
if not "stages" in row:
row["stages"] = []

# get stages ids to not send dups
row_stages = [stage["stageId"] for stage in row.get("stages")]
# get audit history of each pipeline
stages_history = requests.get(
f"{self.url_base}crm/v3/pipelines/deals/{row['pipelineId']}/audit",
headers=self.authenticator.auth_headers or {},
)
# join all stages from history
stages_ = []
for obj in stages_history.json().get("results", []):
obj_stages = json.loads(obj.get("rawObject", "{}")).get("stages") or []
stages_.extend(obj_stages)

# Sort data by stageId and by updatedAt (None values at the end)
stages_.sort(
key=lambda x: (
x["stageId"],
x["updatedAt"] if x["updatedAt"] is not None else float("-inf"),
),
reverse=True,
)

# Group by stageId and pick the first entry per group
for stage_id, group in groupby(stages_, key=lambda x: x["stageId"]):
# only add stages that are not in the response
if stage_id in row_stages:
continue
# get stage with highest updatedAt value, if no updatedAt values it means the
# stage was never updated after creation so we chose the one with updatedAt value null
group = list(group)
best = next((item for item in group if item["updatedAt"] is not None), None)
if best is None:
best = group[0]
# change active value to false (to know it's a deleted stage) and add it to the row
row["stages"].append(
{
"label": best["label"],
"displayOrder": best["displayOrder"],
"metadata": best["metadata"],
"stageId": best["stageId"],
"createdAt": best["createdAt"],
"updatedAt": best["updatedAt"],
"active": False,
}
)

return row

def post_process(self, row, context) -> dict:
row = super().post_process(row, context)
row = self.get_deleted_stages(row)
return row


class ContactListsStream(hubspotStreamSchema):
"""Lists Stream"""
Expand Down

0 comments on commit 101ec5e

Please sign in to comment.