From f9fcd13fc2e8d91d395d82250e573dfd430316ec Mon Sep 17 00:00:00 2001 From: Vincent Pochet Date: Mon, 5 Feb 2024 14:54:46 +0100 Subject: [PATCH] feat(grouped_by): Add support for unique count (#1668) * feat(grouped-by): Basic refactoring for unique count aggregation * feat(grouped_by): Add support for unique count --- .../aggregations/unique_count_service.rb | 62 +++- .../aggregations/unique_count_service_spec.rb | 309 +++++++++++++++++- 2 files changed, 358 insertions(+), 13 deletions(-) diff --git a/app/services/billable_metrics/aggregations/unique_count_service.rb b/app/services/billable_metrics/aggregations/unique_count_service.rb index 69186071598..5f650677497 100644 --- a/app/services/billable_metrics/aggregations/unique_count_service.rb +++ b/app/services/billable_metrics/aggregations/unique_count_service.rb @@ -32,8 +32,8 @@ def compute_aggregation(options: {}) # (exept for the current_usage update) # as pay in advance aggregation will be computed on a single group # with the grouped_by_values filter - def compute_grouped_by_aggregation(_options) - aggregations = [compute_single_aggregation.ceil(5)] # TODO: compute_grouped_aggregations + def compute_grouped_by_aggregation(options: {}) + aggregations = compute_grouped_aggregations return empty_results if aggregations.blank? result.aggregations = aggregations.map do |aggregation| @@ -120,6 +120,7 @@ def find_cached_aggregation(with_from_datetime: from_datetime, with_to_datetime: 'INNER JOIN quantified_events ON quantified_events.organization_id = cached_aggregations.organization_id', 'quantified_events.external_subscription_id = cached_aggregations.external_subscription_id', 'quantified_events.billable_metric_id = charges.billable_metric_id', + 'quantified_events.grouped_by = cached_aggregations.grouped_by', ].join(' AND ')) .where(quantified_events: { external_id: event_store.events_values }) .where('quantified_events.added_at::timestamp(0) >= ?', with_from_datetime) @@ -130,12 +131,7 @@ def find_cached_aggregation(with_from_datetime: from_datetime, with_to_datetime: .where('quantified_events.removed_at::timestamp(0) >= ?', with_from_datetime) .where('quantified_events.removed_at::timestamp(0) <= ?', with_to_datetime), ) - - query = if grouped_by.present? - query.where(grouped_by:).where('quantified_events.grouped_by = ?', grouped_by) - else - query.where(grouped_by: {}).where("quantified_events.grouped_by = '{}'") - end + .where(grouped_by: grouped_by.presence || {}) # TODO: event_id for clickhouse events query = query.where.not(event_id: event.id) if event.present? @@ -176,7 +172,9 @@ def compute_single_aggregation end def compute_grouped_aggregations - # TODO + event_store.prepare_grouped_result( + ActiveRecord::Base.connection.select_all(grouped_aggregation_query).rows, + ) end def aggregation_query @@ -193,6 +191,52 @@ def aggregation_query "SELECT (#{queries.map { |q| "COALESCE((#{q}), 0)" }.join(' + ')}) AS aggregation_result" end + def grouped_aggregation_query + groups = grouped_by.map do |group| + ActiveRecord::Base.sanitize_sql_for_conditions( + ['quantified_events.grouped_by->>?', group], + ) + end + group_names = groups.map.with_index { |_, index| "g_#{index}" }.join(', ') + + # NOTE: Billed on the full period + persisted = persisted_query + .select( + [ + groups.map.with_index { |group, index| "#{group} AS g_#{index}" }, + '1::numeric AS group_sum', + ].flatten.join(', '), + ) + .group(groups.join(', ')) + .to_sql + + # NOTE: Added during the period + added = added_query + .select( + [ + groups.map.with_index { |group, index| "#{group} AS g_#{index}" }, + '1::numeric AS group_sum', + ].flatten.join(', '), + ) + .group(groups.join(', ')) + .to_sql + + <<-SQL + with persisted AS (#{persisted}), + added AS (#{added}) + + SELECT + #{group_names}, + SUM(group_sum) + FROM ( + (select * from persisted) + UNION ALL + (select * from added) + ) grouped_count + GROUP BY #{group_names} + SQL + end + def persisted_query return QuantifiedEvent.none unless billable_metric.recurring? diff --git a/spec/services/billable_metrics/aggregations/unique_count_service_spec.rb b/spec/services/billable_metrics/aggregations/unique_count_service_spec.rb index 7916099bbf9..e82754b2fdb 100644 --- a/spec/services/billable_metrics/aggregations/unique_count_service_spec.rb +++ b/spec/services/billable_metrics/aggregations/unique_count_service_spec.rb @@ -12,14 +12,12 @@ from_datetime:, to_datetime:, }, - filters: { - group:, - event: pay_in_advance_event, - }, + filters:, ) end let(:event_store_class) { Events::Stores::PostgresStore } + let(:filters) { { group:, event: pay_in_advance_event, grouped_by: } } let(:subscription) do create( @@ -36,6 +34,7 @@ let(:organization) { subscription.organization } let(:customer) { subscription.customer } let(:group) { nil } + let(:grouped_by) { nil } let(:billable_metric) do create( @@ -497,6 +496,308 @@ end end + describe '.grouped_by_aggregation' do + let(:grouped_by) { ['agent_name'] } + let(:agent_names) { %w[aragorn frodo] } + let(:quantified_event) { nil } + let(:unique_count_event) { nil } + + context 'when there is persisted event and event added in period' do + let(:unique_count_events) do + agent_names.map.with_index do |agent_name, index| + create( + :event, + organization_id: organization.id, + code: billable_metric.code, + external_customer_id: customer.external_id, + external_subscription_id: subscription.external_id, + timestamp: added_at, + properties: { + unique_id: quantified_events[index].external_id, + agent_name:, + }, + ) + end + end + + let(:quantified_events) do + agent_names.map do |agent_name| + create( + :quantified_event, + organization:, + added_at:, + removed_at:, + external_subscription_id: subscription.external_id, + billable_metric:, + grouped_by: { 'agent_name' => agent_name }, + ) + end + end + + let(:new_quantified_events) do + agent_names.map do |agent_name| + create( + :quantified_event, + organization:, + added_at: from_datetime + 10.days, + removed_at:, + external_subscription_id: subscription.external_id, + billable_metric:, + grouped_by: { 'agent_name' => agent_name }, + ) + end + end + + let(:new_unique_count_events) do + agent_names.map.with_index do |agent_name, index| + create( + :event, + organization_id: organization.id, + code: billable_metric.code, + external_customer_id: customer.external_id, + external_subscription_id: subscription.external_id, + timestamp: from_datetime + 10.days, + properties: { + unique_id: new_quantified_events[index].external_id, + agent_name:, + }, + ) + end + end + + before do + unique_count_events + new_unique_count_events + end + + it 'returns the correct result' do + result = count_service.aggregate + + expect(result.aggregations.count).to eq(2) + + result.aggregations.each do |aggregation| + expect(aggregation.grouped_by.keys).to include('agent_name') + expect(aggregation.grouped_by['agent_name']).to eq('frodo').or eq('aragorn') + expect(aggregation.count).to eq(2) + expect(aggregation.aggregation).to eq(2) + end + end + + context 'when billable metric is not recurring' do + let(:billable_metric) do + create( + :billable_metric, + organization:, + aggregation_type: 'unique_count_agg', + field_name: 'unique_id', + recurring: false, + ) + end + + it 'returns only the number of events ingested in the current period' do + result = count_service.aggregate + + expect(result.aggregations.count).to eq(2) + + result.aggregations.each do |aggregation| + expect(aggregation.grouped_by.keys).to include('agent_name') + expect(aggregation.grouped_by['agent_name']).to eq('frodo').or eq('aragorn') + expect(aggregation.count).to eq(1) + expect(aggregation.aggregation).to eq(1) + end + end + end + end + + context 'without events in the period' do + let(:unique_count_events) do + agent_names.map.with_index do |agent_name, index| + create( + :event, + organization_id: organization.id, + code: billable_metric.code, + external_customer_id: customer.external_id, + external_subscription_id: subscription.external_id, + timestamp: added_at, + properties: { + unique_id: quantified_events[index].external_id, + agent_name:, + }, + ) + end + end + + let(:quantified_events) do + agent_names.map do |agent_name| + create( + :quantified_event, + organization:, + added_at:, + removed_at:, + external_subscription_id: subscription.external_id, + billable_metric:, + grouped_by: { 'agent_name' => agent_name }, + ) + end + end + + before { unique_count_events } + + it 'returns only the number of events persisted events' do + result = count_service.aggregate + + expect(result.aggregations.count).to eq(2) + + result.aggregations.each do |aggregation| + expect(aggregation.grouped_by.keys).to include('agent_name') + expect(aggregation.grouped_by['agent_name']).to eq('frodo').or eq('aragorn') + expect(aggregation.count).to eq(1) + expect(aggregation.aggregation).to eq(1) + end + end + end + + context 'without quantified events' do + let(:quantified_event) { nil } + let(:unique_count_event) { nil } + + it 'returns an empty result' do + result = count_service.aggregate + + expect(result.aggregations.count).to eq(1) + + aggregation = result.aggregations.first + expect(aggregation.aggregation).to eq(0) + expect(aggregation.count).to eq(0) + expect(aggregation.grouped_by).to eq({ 'agent_name' => nil }) + end + end + + context 'when current usage context and charge is pay in advance' do + let(:options) do + { is_pay_in_advance: true, is_current_usage: true } + end + + let(:quantified_event) { nil } + let(:unique_count_event) { nil } + + let(:unique_count_events) do + agent_names.map.with_index do |agent_name, index| + create( + :event, + organization_id: organization.id, + code: billable_metric.code, + external_customer_id: customer.external_id, + external_subscription_id: subscription.external_id, + timestamp: added_at, + properties: { + unique_id: quantified_events[index].external_id, + agent_name:, + }, + ) + end + end + + let(:quantified_events) do + agent_names.map do |agent_name| + create( + :quantified_event, + organization:, + added_at:, + removed_at:, + external_subscription_id: subscription.external_id, + billable_metric:, + grouped_by: { 'agent_name' => agent_name }, + ) + end + end + + let(:previous_events) do + agent_names.map.with_index do |agent_name, index| + create( + :event, + organization_id: organization.id, + code: billable_metric.code, + external_customer_id: customer.external_id, + external_subscription_id: subscription.external_id, + timestamp: from_datetime + 5.days, + properties: { + unique_id: previous_quantified_events[index].external_id, + agent_name:, + }, + ) + end + end + + let(:previous_quantified_events) do + agent_names.map do |agent_name| + create( + :quantified_event, + organization:, + added_at: from_datetime + 5.days, + removed_at:, + external_id: '000', + external_subscription_id: subscription.external_id, + billable_metric:, + grouped_by: { 'agent_name' => agent_name }, + ) + end + end + + let(:cached_aggregations) do + agent_names.map.with_index do |agent_name, index| + create( + :cached_aggregation, + organization:, + charge:, + event_id: previous_events[index].id, + external_subscription_id: subscription.external_id, + timestamp: previous_events[index].timestamp, + current_aggregation: '1', + max_aggregation: '3', + grouped_by: { 'agent_name' => agent_name }, + ) + end + end + + before do + unique_count_events + cached_aggregations + end + + it 'returns period maximum as aggregation' do + result = count_service.aggregate(options:) + + expect(result.aggregations.count).to eq(2) + + result.aggregations.each do |aggregation| + expect(aggregation.grouped_by.keys).to include('agent_name') + expect(aggregation.grouped_by['agent_name']).to eq('frodo').or eq('aragorn') + expect(aggregation.count).to eq(2) + expect(aggregation.aggregation).to eq(4) + end + end + + context 'when cached aggregation does not exist' do + let(:cached_aggregations) { nil } + let(:previous_quantified_event) { nil } + + before { billable_metric.update!(recurring: false) } + + it 'returns an empty result' do + result = count_service.aggregate(options:) + + expect(result.aggregations.count).to eq(1) + + aggregation = result.aggregations.first + expect(aggregation.aggregation).to eq(0) + expect(aggregation.count).to eq(0) + expect(aggregation.grouped_by).to eq({ 'agent_name' => nil }) + end + end + end + end + describe '.per_event_aggregation' do let(:added_at) { from_datetime }