Skip to content

Commit

Permalink
feat(event): Use event stores for unique count aggregation
Browse files Browse the repository at this point in the history
  • Loading branch information
vincent-pochet committed Dec 11, 2023
1 parent 2ce5d02 commit 69034a7
Show file tree
Hide file tree
Showing 12 changed files with 120 additions and 106 deletions.
1 change: 0 additions & 1 deletion app/models/event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ class Event < EventsRecord
include OrganizationTimezone

belongs_to :organization
belongs_to :quantified_event, optional: true

validates :transaction_id, presence: true, uniqueness: { scope: %i[organization_id external_subscription_id] }
validates :code, presence: true
Expand Down
24 changes: 13 additions & 11 deletions app/services/billable_metrics/aggregations/unique_count_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -162,19 +162,21 @@ def base_scope
quantified_events = QuantifiedEvent
.where(organization_id: billable_metric.organization_id)
.where(billable_metric_id: billable_metric.id)
.where(external_subscription_id: subscription.external_id)

quantified_events = if billable_metric.recurring?
quantified_events.where(external_subscription_id: subscription.external_id)
else
scope = Event.where(external_subscription_id: subscription.external_id)
.where('quantified_event_id IS NOT NULL')
.where(timestamp: subscription.started_at..)

scope = scope.where(timestamp: ...subscription.terminated_at) if subscription.terminated?

quantified_event_ids = scope.pluck('DISTINCT(quantified_event_id)')
unless billable_metric.recurring?
store = event_store_class.new(
code: billable_metric.code,
subscription:,
boundaries: {
from_datetime: subscription.started_at,
to_datetime: subscription.terminated_at,
},
group:,
)
store.aggregation_property = billable_metric.field_name

quantified_events.where(id: quantified_event_ids)
quantified_events = quantified_events.where(external_id: store.events_values)
end

return quantified_events unless group
Expand Down
4 changes: 0 additions & 4 deletions app/services/events/post_process_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,6 @@ def should_handle_quantified_event?
def handle_quantified_event
service_result = quantified_event_service.call
service_result.raise_if_error!

# TODO: Remove this relation
event.quantified_event_id = service_result.quantified_event&.id
event.save!
end

def handle_pay_in_advance
Expand Down
2 changes: 1 addition & 1 deletion app/services/events/stores/clickhouse_store.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ class ClickhouseStore < BaseStore
# and should be deduplicated depending on the aggregation logic
def events(force_from: false)
scope = Clickhouse::EventsRaw.where(external_subscription_id: subscription.external_id)
.where('events_raw.timestamp <= ?', to_datetime)
.where(code:)
.order(timestamp: :asc)

# TODO: check how to deal with this since events are not stored forever in clickhouse
scope = scope.where('events_raw.timestamp >= ?', from_datetime) if force_from || use_from_boundary
scope = scope.where('events_raw.timestamp <= ?', to_datetime) if to_datetime
scope = scope.where(numeric_condition) if numeric_property

return scope unless group
Expand Down
2 changes: 1 addition & 1 deletion app/services/events/stores/postgres_store.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ module Stores
class PostgresStore < BaseStore
def events(force_from: false)
scope = Event.where(external_subscription_id: subscription.external_id)
.to_datetime(to_datetime)
.where(code:)
.order(timestamp: :asc)

scope = scope.from_datetime(from_datetime) if force_from || use_from_boundary
scope = scope.to_datetime(to_datetime) if to_datetime

if numeric_property
scope = scope.where(presence_condition)
Expand Down
89 changes: 45 additions & 44 deletions db/clickhouse_schema.rb

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# frozen_string_literal: true

class RemoveEventsQuantifiedEventsRelation < ActiveRecord::Migration[7.0]
def up
remove_column :events, :quantified_event_id
end

def down
add_column :events, :quantified_event_id, :uuid
add_index :events, :quantified_event_id
end
end
2 changes: 0 additions & 2 deletions db/schema.rb

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,18 @@
let(:unique_count_event) do
create(
:event,
organization_id: organization.id,
code: billable_metric.code,
external_customer_id: customer.external_id,
external_subscription_id: subscription.external_id,
timestamp: added_at,
quantified_event:,
properties: { unique_id: quantified_event.external_id },
)
end
let(:quantified_event) do
create(
:quantified_event,
organization:,
added_at:,
removed_at:,
external_subscription_id: subscription.external_id,
Expand All @@ -86,6 +88,7 @@
let(:new_quantified_event) do
create(
:quantified_event,
organization:,
added_at: from_datetime + 10.days,
removed_at:,
external_subscription_id: subscription.external_id,
Expand All @@ -95,11 +98,12 @@
let(:new_unique_count_event) do
create(
:event,
organization_id: organization.id,
code: billable_metric.code,
external_customer_id: customer.external_id,
external_subscription_id: subscription.external_id,
timestamp: from_datetime + 10.days,
quantified_event: new_quantified_event,
properties: { unique_id: new_quantified_event.external_id },
)
end

Expand All @@ -123,16 +127,18 @@
let(:new_unique_count_event) do
create(
:event,
organization_id: organization.id,
code: billable_metric.code,
external_customer_id: customer.external_id,
external_subscription_id: subscription.external_id,
timestamp: from_datetime + 10.days,
quantified_event: new_quantified_event,
properties: { unique_id: new_quantified_event.external_id },
)
end
let(:new_quantified_event) do
create(
:quantified_event,
organization:,
added_at: from_datetime + 10.days,
removed_at:,
external_subscription_id: subscription.external_id,
Expand Down Expand Up @@ -275,14 +281,13 @@
let(:previous_event) do
create(
:event,
organization:,
organization_id: organization.id,
code: billable_metric.code,
external_customer_id: customer.external_id,
external_subscription_id: subscription.external_id,
timestamp: from_datetime + 5.days,
quantified_event: previous_quantified_event,
properties: {
unique_id: '000',
unique_id: previous_quantified_event.external_id,
},
)
end
Expand Down Expand Up @@ -335,21 +340,22 @@
end

context 'when event is given' do
let(:properties) { { unique_id: '111' } }
let(:properties) { { unique_id: new_quantified_event.external_id } }
let(:pay_in_advance_event) do
create(
:event,
organization_id: organization.id,
code: billable_metric.code,
external_customer_id: customer.external_id,
external_subscription_id: subscription.external_id,
timestamp: from_datetime + 10.days,
properties:,
quantified_event: new_quantified_event,
)
end
let(:new_quantified_event) do
create(
:quantified_event,
organization:,
added_at: from_datetime + 10.days,
removed_at:,
external_subscription_id: subscription.external_id,
Expand Down Expand Up @@ -393,20 +399,21 @@
let(:previous_event) do
create(
:event,
organization_id: organization.id,
code: billable_metric.code,
external_customer_id: customer.external_id,
external_subscription_id: subscription.external_id,
timestamp: from_datetime + 5.days,
quantified_event: previous_quantified_event,
properties: {
unique_id: '000',
unique_id: previous_quantified_event.external_id,
},
)
end

let(:previous_quantified_event) do
create(
:quantified_event,
organization:,
added_at: from_datetime + 5.days,
removed_at:,
external_id: '000',
Expand Down Expand Up @@ -446,9 +453,8 @@
external_customer_id: customer.external_id,
external_subscription_id: subscription.external_id,
timestamp: from_datetime + 5.days,
quantified_event: previous_quantified_event,
properties: {
unique_id: '000',
unique_id: previous_quantified_event.external_id,
},
)
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,13 +263,13 @@
let(:previous_event) do
create(
:event,
organization_id: organization.id,
code: billable_metric.code,
external_customer_id: customer.external_id,
external_subscription_id: subscription.external_id,
timestamp: from_datetime + 5.days,
quantified_event: previous_quantified_event,
properties: {
unique_id: '000',
unique_id: previous_quantified_event.external_id,
},
)
end
Expand Down Expand Up @@ -318,16 +318,16 @@
end

context 'when event is given' do
let(:properties) { { unique_id: '111' } }
let(:properties) { { unique_id: new_quantified_event.external_id } }
let(:pay_in_advance_event) do
create(
:event,
organization_id: organization.id,
code: billable_metric.code,
external_customer_id: customer.external_id,
external_subscription_id: subscription.external_id,
timestamp: from_datetime + 10.days,
properties:,
quantified_event: new_quantified_event,
)
end

Expand Down Expand Up @@ -360,24 +360,18 @@
let(:previous_event) do
create(
:event,
organization_id: organization.id,
code: billable_metric.code,
external_customer_id: customer.external_id,
external_subscription_id: subscription.external_id,
timestamp: from_datetime + 5.days,
quantified_event: previous_quantified_event,
properties: {
unique_id: '000',
},
metadata: {
current_aggregation: '7',
max_aggregation: '7',
max_aggregation_with_proration: '5.8',
},
properties: { unique_id: previous_quantified_event.external_id },
)
end
let(:previous_quantified_event) do
create(
:quantified_event,
organization:,
added_at: from_datetime + 5.days,
removed_at:,
external_id: '000',
Expand All @@ -397,20 +391,21 @@
let(:previous_event) do
create(
:event,
organization_id: organization.id,
code: billable_metric.code,
external_customer_id: customer.external_id,
external_subscription_id: subscription.external_id,
timestamp: from_datetime + 5.days,
quantified_event: previous_quantified_event,
properties: {
unique_id: '000',
unique_id: previous_quantified_event.external_id,
},
)
end

let(:previous_quantified_event) do
create(
:quantified_event,
organization:,
added_at: from_datetime + 5.days,
removed_at:,
external_id: '000',
Expand Down
5 changes: 2 additions & 3 deletions spec/services/events/post_process_service_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
let(:event) do
create(
:event,
organization:,
organization_id: organization.id,
external_customer_id:,
external_subscription_id:,
timestamp:,
Expand Down Expand Up @@ -108,15 +108,14 @@
}
end

it 'creates an association with a quantified event' do
it 'creates a quantified event' do
result = nil

aggregate_failures do
expect { result = process_service.call }
.to change(QuantifiedEvent, :count).by(1)

expect(result).to be_success
expect(event.reload.quantified_event).to be_present
end
end
end
Expand Down
Loading

0 comments on commit 69034a7

Please sign in to comment.