Skip to content

Commit

Permalink
feat(grouped_by): Add support for unique count (#1668)
Browse files Browse the repository at this point in the history
* feat(grouped-by): Basic refactoring for unique count aggregation

* feat(grouped_by): Add support for unique count
  • Loading branch information
vincent-pochet authored Feb 5, 2024
1 parent 6ff82e4 commit f9fcd13
Show file tree
Hide file tree
Showing 2 changed files with 358 additions and 13 deletions.
62 changes: 53 additions & 9 deletions app/services/billable_metrics/aggregations/unique_count_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ def compute_aggregation(options: {})
# (except for the current_usage update)
# as pay in advance aggregation will be computed on a single group
# with the grouped_by_values filter
def compute_grouped_by_aggregation(_options)
aggregations = [compute_single_aggregation.ceil(5)] # TODO: compute_grouped_aggregations
def compute_grouped_by_aggregation(options: {})
aggregations = compute_grouped_aggregations
return empty_results if aggregations.blank?

result.aggregations = aggregations.map do |aggregation|
Expand Down Expand Up @@ -120,6 +120,7 @@ def find_cached_aggregation(with_from_datetime: from_datetime, with_to_datetime:
'INNER JOIN quantified_events ON quantified_events.organization_id = cached_aggregations.organization_id',
'quantified_events.external_subscription_id = cached_aggregations.external_subscription_id',
'quantified_events.billable_metric_id = charges.billable_metric_id',
'quantified_events.grouped_by = cached_aggregations.grouped_by',
].join(' AND '))
.where(quantified_events: { external_id: event_store.events_values })
.where('quantified_events.added_at::timestamp(0) >= ?', with_from_datetime)
Expand All @@ -130,12 +131,7 @@ def find_cached_aggregation(with_from_datetime: from_datetime, with_to_datetime:
.where('quantified_events.removed_at::timestamp(0) >= ?', with_from_datetime)
.where('quantified_events.removed_at::timestamp(0) <= ?', with_to_datetime),
)

query = if grouped_by.present?
query.where(grouped_by:).where('quantified_events.grouped_by = ?', grouped_by)
else
query.where(grouped_by: {}).where("quantified_events.grouped_by = '{}'")
end
.where(grouped_by: grouped_by.presence || {})

# TODO: event_id for clickhouse events
query = query.where.not(event_id: event.id) if event.present?
Expand Down Expand Up @@ -176,7 +172,9 @@ def compute_single_aggregation
end

# Runs the grouped aggregation SQL and hands the raw result rows to the
# event store, which turns them into the grouped result structure.
def compute_grouped_aggregations
  sql = grouped_aggregation_query
  rows = ActiveRecord::Base.connection.select_all(sql).rows

  event_store.prepare_grouped_result(rows)
end

def aggregation_query
Expand All @@ -193,6 +191,52 @@ def aggregation_query
"SELECT (#{queries.map { |q| "COALESCE((#{q}), 0)" }.join(' + ')}) AS aggregation_result"
end

# Builds the raw SQL computing the aggregation for every combination of
# `grouped_by` values.
#
# Each key of `grouped_by` is read from the `quantified_events.grouped_by`
# column with the `->>` operator and exposed as `g_<index>`. The result set
# holds the `g_*` columns (in `grouped_by` order) followed by the summed
# count, without alias: rows are meant to be consumed positionally.
#
# @return [String] the SQL statement
def grouped_aggregation_query
  groups = grouped_by.map do |group|
    # Bind the key through the sanitizer instead of interpolating it.
    ActiveRecord::Base.sanitize_sql_for_conditions(
      ['quantified_events.grouped_by->>?', group],
    )
  end
  group_names = groups.map.with_index { |_, index| "g_#{index}" }.join(', ')

  # NOTE: Both inner queries are grouped, so they must count the rows of each
  #       group. Selecting a constant 1 would return 1 per group whatever the
  #       number of matching rows.
  select_clause = [
    groups.map.with_index { |group, index| "#{group} AS g_#{index}" },
    'COUNT(*)::numeric AS group_sum',
  ].flatten.join(', ')
  group_clause = groups.join(', ')

  # NOTE: Billed on the full period
  persisted = persisted_query.select(select_clause).group(group_clause).to_sql

  # NOTE: Added during the period
  added = added_query.select(select_clause).group(group_clause).to_sql

  <<-SQL
    with persisted AS (#{persisted}),
    added AS (#{added})
    SELECT
      #{group_names},
      SUM(group_sum)
    FROM (
      (select * from persisted)
      UNION ALL
      (select * from added)
    ) grouped_count
    GROUP BY #{group_names}
  SQL
end

def persisted_query
return QuantifiedEvent.none unless billable_metric.recurring?

Expand Down
Loading

0 comments on commit f9fcd13

Please sign in to comment.