Skip to content

Commit

Permalink
Add intercom custom attributes
Browse files Browse the repository at this point in the history
  • Loading branch information
tspenov committed Jul 24, 2023
1 parent e4c537f commit cc67e54
Show file tree
Hide file tree
Showing 3 changed files with 205 additions and 0 deletions.
4 changes: 4 additions & 0 deletions config/scheduler_config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -170,5 +170,9 @@ config :sanbase, Sanbase.Scrapers.Scheduler,
manage_pinecone_index: [
schedule: "15 * * * *",
task: {Sanbase.OpenAI, :manage_pinecone_index, []}
],
sync_stripe_attributes_intercom: [
schedule: "30 01 * * *",
task: {Sanbase.Intercom.StripeAttributes, :run, []}
]
]
17 changes: 17 additions & 0 deletions lib/sanbase/intercom/intercom.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ defmodule Sanbase.Intercom do

@user_events_url "https://api.intercom.io/events?type=user"
@contacts_url "https://api.intercom.io/contacts"
@data_attributes_url "https://api.intercom.io/data_attributes"

@batch_size 100
@max_retries 5
Expand Down Expand Up @@ -111,6 +112,22 @@ defmodule Sanbase.Intercom do
HTTPoison.post(@contacts_url, body, intercom_headers())
end

def create_data_attribute(name, type) do
body =
%{
name: name,
model: "contact",
data_type: type
}
|> Jason.encode!()

HTTPoison.post(@data_attributes_url, body, intercom_headers())
end

def list_data_attributes() do
HTTPoison.get(@data_attributes_url <> "?model=contact", intercom_headers())
end

def get_contact_by_user_id(user_id) do
body =
%{
Expand Down
184 changes: 184 additions & 0 deletions lib/sanbase/intercom/stripe_attributes.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
defmodule Sanbase.Intercom.StripeAttributes do
require Logger

import Ecto.Query

alias Sanbase.Billing.Subscription
alias Sanbase.Repo
alias Sanbase.Billing.Subscription.Timeseries
alias Sanbase.Accounts.User
alias Sanbase.ClickhouseRepo
alias Sanbase.Intercom

def run do
all_stats = all_stats()
user_ids = fetch_all_db_user_ids()
run(user_ids, all_stats)
end

def run(user_ids, all_stats) do
total = length(user_ids)

Enum.with_index(user_ids)
|> Enum.each(fn {user_id, index} ->
try do
stats = stats(all_stats, user_id)

case Intercom.get_contact_by_user_id(user_id) do
nil ->
Intercom.create_contact(user_id)

%{"id" => intercom_id, "custom_attributes" => custom_attributes} ->
Intercom.update_contact(intercom_id, %{
"custom_attributes" => Map.merge(custom_attributes, stats)
})
end

# print progress every 100 user_ids
if rem(index, 100) == 0 do
progress_percent = (index + 1) / total * 100

Logger.info(
"stripe_attributes_intercom: Progress: Processed #{index + 1} out of #{total} user_ids (#{Float.round(progress_percent, 2)}%)"
)
end
rescue
exception ->
Logger.error(
"stripe_attributes_intercom: An error occurred processing user_id: #{user_id} - #{exception.message}"
)
end
end)
end

def fetch_all_db_user_ids() do
from(u in User, order_by: [asc: u.id], select: u.id)
|> Repo.all()
end

def all_stats do
%{
users_with_paid_active_subscriptions: users_with_paid_active_subscriptions(),
users_with_trialing_subscriptions: users_with_trialing_subscriptions(),
users_renewal_upcoming_at: users_renewal_upcoming_at(),
users_subscription_set_to_cancel: users_subscription_set_to_cancel(),
users_last_seen_at: users_last_seen_at()
}
end

def stats(all_stats, user_id) do
%{
paid_active_subscription:
Enum.member?(all_stats[:users_with_paid_active_subscriptions], user_id),
trialing_subscription: Enum.member?(all_stats[:users_with_trialing_subscriptions], user_id),
renewal_upcoming_at: Map.get(all_stats[:users_renewal_upcoming_at], user_id),
subscription_set_to_cancel:
Enum.member?(all_stats[:users_subscription_set_to_cancel], user_id),
last_seen_at: Map.get(all_stats[:users_last_seen_at], user_id)
}
end

def users_with_trialing_subscriptions do
from(
s in Subscription,
where: s.status == "trialing",
select: s.user_id
)
|> Repo.all()
end

def users_with_paid_active_subscriptions() do
stripe_customer_ids = current_active_paid_subs() |> Enum.map(& &1.customer_id)

stripe_customer_sanbase_user_map = stripe_customer_sanbase_user_map()

Enum.map(stripe_customer_ids, fn customer_id ->
Map.get(stripe_customer_sanbase_user_map, customer_id)
end)
end

def users_renewal_upcoming_at do
sub_ids = current_active_subs() |> Enum.map(& &1.id)

from(
s in Subscription,
where: s.stripe_id in ^sub_ids,
select: {s.user_id, s.current_period_end}
)
|> Repo.all()
|> Enum.map(fn {user_id, current_period_end} ->
{user_id, DateTime.to_unix(current_period_end)}
end)
|> Enum.into(%{})
end

def users_subscription_set_to_cancel() do
sub_ids = current_active_subs() |> Enum.map(& &1.id)

from(
s in Subscription,
where: s.stripe_id in ^sub_ids and s.cancel_at_period_end == true,
select: s.user_id
)
|> Repo.all()
end

def users_last_seen_at() do
sql = """
SELECT
user_id,
max(dt) as last_dt
FROM
api_call_data
GROUP BY user_id
"""

query_struct = Sanbase.Clickhouse.Query.new(sql, %{})

ClickhouseRepo.query_transform(query_struct, fn [user_id, dt] -> {user_id, dt} end)
|> case do
{:ok, result} ->
result
|> Enum.map(fn {user_id, dt} ->
{user_id, DateTime.from_naive!(dt, "Etc/UTC") |> DateTime.to_unix()}
end)
|> Enum.into(%{})

{:error, _} ->
%{}
end
end

def stripe_customer_sanbase_user_map do
from(
u in User,
where: not is_nil(u.stripe_customer_id),
select: {u.stripe_customer_id, u.id}
)
|> Repo.all()
|> Enum.into(%{})
end

def current_subs() do
from(s in Timeseries, order_by: [desc: s.id], limit: 1)
|> Repo.one()
|> Map.get(:subscriptions)
|> Enum.map(fn map_with_string_keys ->
Enum.map(map_with_string_keys, fn {key, value} ->
{String.to_existing_atom(key), value}
end)
|> Enum.into(%{})
end)
end

def current_active_subs() do
current_subs()
|> Timeseries.active_subscriptions()
end

def current_active_paid_subs() do
current_subs()
|> Timeseries.active_subscriptions()
|> Timeseries.paid()
end
end

0 comments on commit cc67e54

Please sign in to comment.