Skip to content

Commit

Permalink
feat(event): Add specs for all sum related event store methods
Browse files Browse the repository at this point in the history
  • Loading branch information
vincent-pochet committed Dec 11, 2023
1 parent 7226bc8 commit f374f42
Show file tree
Hide file tree
Showing 9 changed files with 217 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,22 +79,7 @@ def cached_aggregation
.where(charge_id: charge.id)
.from_datetime(from_datetime)
.to_datetime(to_datetime)
.order(created_at: :desc)

# NOTE: For now we are using the relation between event and quantified event, but
# this relation will be removed in a comming refactor as it will not possible
# to handle clickhouse events that way
query = query
.joins('INNER JOIN events ON events.id = cached_aggregations.event_id')
.joins('INNER JOIN quantified_events ON events.quantified_event_id = quantified_events.id')
.where('quantified_events.added_at::timestamp(0) >= ?', from_datetime)
.where('quantified_events.added_at::timestamp(0) <= ?', to_datetime)
.where('quantified_events.removed_at::timestamp(0) IS NULL')
.or(
query
.where('quantified_events.removed_at::timestamp(0) >= ?', from_datetime)
.where('quantified_events.removed_at::timestamp(0) <= ?', to_datetime),
)
.order(timestamp: :desc)

# NOTE: For now we are using the relation between event and quantified event, but
# this relation will be removed in a comming refactor as it will not possible
Expand Down
8 changes: 4 additions & 4 deletions app/services/billable_metrics/breakdown/sum_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ def persisted_breakdown
def period_breakdown
event_store.sum_date_breakdown.map do |aggregation|
OpenStruct.new(
date: aggregation.first.to_date,
action: aggregation.last.negative? ? 'remove' : 'add',
amount: aggregation.last,
duration: (to_date_in_customer_timezone + 1.day - aggregation.first).to_i,
date: aggregation[:date],
action: aggregation[:value].negative? ? 'remove' : 'add',
amount: aggregation[:value],
duration: (to_date_in_customer_timezone + 1.day - aggregation[:date]).to_i,
total_duration: period_duration,
)
end
Expand Down
16 changes: 14 additions & 2 deletions app/services/billable_metrics/prorated_aggregations/sum_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def compute_aggregation
result = 0.0

# NOTE: Billed on the full period
result += (persisted_sum || 0)
result += persisted_sum || 0

# NOTE: Added during the period
result + (event_store.prorated_sum(period_duration:) || 0)
Expand Down Expand Up @@ -89,7 +89,19 @@ def recurring_value
previous_charge_fee_units = previous_charge_fee&.units
return previous_charge_fee_units if previous_charge_fee_units

recurring_value_before_first_fee = persisted_query.sum("(#{sanitized_field_name})::numeric")
event_store = event_store_class.new(
code: billable_metric.code,
subscription:,
boundaries: { to_datetime: from_datetime },
group:,
event:,
)

event_store.use_from_boundary = false
event_store.aggregation_property = billable_metric.field_name
event_store.numeric_property = true

recurring_value_before_first_fee = event_store.sum

((recurring_value_before_first_fee || 0) <= 0) ? nil : recurring_value_before_first_fee
end
Expand Down
5 changes: 4 additions & 1 deletion app/services/events/stores/base_store.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,12 @@ def sum
end

def prorated_sum(period_duration:, persisted_duration: nil)
# TODO
raise NotImplementedError
end

# NOTE: returns the breakdown of the sum grouped by date
# The result format will be an array of hash with the format:
# [{ date: Date.parse('2023-11-27'), value: 12.9 }, ...]
def sum_date_breakdown
raise NotImplementedError
end
Expand Down
86 changes: 80 additions & 6 deletions app/services/events/stores/clickhouse_store.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ def events(force_from: false)
.where(code:)
.order(timestamp: :asc)

# TODO: check how to deal with this since events are not stored forever in clickhouse
scope = scope.where('events_raw.timestamp >= ?', from_datetime) if force_from || use_from_boundary
scope = scope.where(numeric_condition) if numeric_property

Expand All @@ -30,8 +31,11 @@ def events_values(limit: nil, force_from: false)
scope.pluck(Arel.sql(sanitized_numeric_property))
end

def prorated_events_values
# TODO
def prorated_events_values(total_duration)
ratio_sql = duration_ratio_sql('events_raw.timestamp', to_datetime, total_duration)

events.group(DEDUPLICATION_GROUP)
.pluck(Arel.sql("#{sanitized_numeric_property} * (#{ratio_sql})"))
end

def count
Expand Down Expand Up @@ -63,15 +67,64 @@ def last
end

def sum
# TODO
cte_sql = events.group(DEDUPLICATION_GROUP)
.select(Arel.sql("#{sanitized_numeric_property} AS property"))
.to_sql

sql = <<-SQL
with events as (#{cte_sql})
select sum(events.property)
from events
SQL

Clickhouse::EventsRaw.connection.select_value(sql)
end

def prorated_sum
# TODO
def prorated_sum(period_duration:, persisted_duration: nil)
ratio = if persisted_duration
persisted_duration.fdiv(period_duration)
else
duration_ratio_sql('events_raw.timestamp', to_datetime, period_duration)
end

cte_sql = events
.reorder('')
.group(DEDUPLICATION_GROUP)
.select(Arel.sql("(#{sanitized_numeric_property}) * (#{ratio}) AS prorated_value"))
.to_sql

sql = <<-SQL
with events as (#{cte_sql})
select sum(events.prorated_value)
from events
SQL

Clickhouse::EventsRaw.connection.select_value(sql)
end

def sum_date_breakdown
# TODO
date_field = date_in_customer_timezone_sql('events_raw.timestamp')

cte_sql = events.group(DEDUPLICATION_GROUP)
.select("toDate(#{date_field}) as day, #{sanitized_numeric_property} as property")
.to_sql

sql = <<-SQL
with events as (#{cte_sql})
select
events.day,
sum(events.property) as day_sum
from events
group by events.day
order by events.day asc
SQL

Clickhouse::EventsRaw.connection.select_all(Arel.sql(sql)).rows.map do |row|
{ date: row.first.to_date, value: row.last }
end
end

private
Expand All @@ -98,6 +151,27 @@ def sanitized_numeric_property
['toDecimal128(events_raw.properties[?], ?)', aggregation_property, DECIMAL_SCALE],
)
end

def date_in_customer_timezone_sql(date)
sql = if date.is_a?(String)
"toTimezone(#{date}, :timezone)"
else
"toTimezone(toDateTime64(:date, 5, 'UTC'), :timezone)"
end

ActiveRecord::Base.sanitize_sql_for_conditions(
[sql, { date:, timezone: customer.applicable_timezone }],
)
end

# NOTE: Compute pro-rata of the duration in days between the datetimes over the duration of the billing period
# Dates are in customer timezone to make sure the duration is good
def duration_ratio_sql(from, to, duration)
from_in_timezone = date_in_customer_timezone_sql(from)
to_in_timezone = date_in_customer_timezone_sql(to)

"(date_diff('days', #{from_in_timezone}, #{to_in_timezone}) + 1) / #{duration}"
end
end
end
end
3 changes: 3 additions & 0 deletions app/services/events/stores/postgres_store.rb
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ def sum_date_breakdown
events.group(Arel.sql("DATE(#{date_field})"))
.reorder(Arel.sql("DATE(#{date_field}) ASC"))
.pluck(Arel.sql("DATE(#{date_field}) AS date, SUM((#{sanitized_propery_name})::numeric)"))
.map do |row|
{ date: row.first.to_date, value: row.last }
end
end

private
Expand Down
2 changes: 0 additions & 2 deletions db/migrate/20231017082921_fill_cached_aggregations.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ class BillableMetric < ApplicationRecord
has_many :charges
end

class Charge < ApplicationRecord; end

def change
reversible do |dir|
dir.up do
Expand Down
54 changes: 54 additions & 0 deletions spec/services/events/stores/clickhouse_store_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,17 @@
end
end

describe '.prorated_events_values' do
it 'returns the values attached to each event with prorata on period duration' do
event_store.aggregation_property = billable_metric.field_name
event_store.numeric_property = true

expect(event_store.prorated_events_values(31).map { |v| v.round(3) }).to eq(
[0.516, 0.968, 1.355, 1.677, 1.935],
)
end
end

describe '.max' do
it 'returns the max value' do
event_store.aggregation_property = billable_metric.field_name
Expand All @@ -114,4 +125,47 @@
expect(event_store.last).to eq(5)
end
end

describe '.sum' do
it 'returns the sum of event properties' do
event_store.aggregation_property = billable_metric.field_name
event_store.numeric_property = true

expect(event_store.sum).to eq(15)
end
end

describe '.prorated_sum' do
it 'returns the prorated sum of event properties' do
event_store.aggregation_property = billable_metric.field_name
event_store.numeric_property = true

expect(event_store.prorated_sum(period_duration: 31).round(5)).to eq(6.45161)
end

context 'with persisted_duration' do
it 'returns the prorated sum of event properties' do
event_store.aggregation_property = billable_metric.field_name
event_store.numeric_property = true

expect(event_store.prorated_sum(period_duration: 31, persisted_duration: 10).round(5)).to eq(4.83871)
end
end
end

describe '.sum_date_breakdown' do
it 'returns the sum grouped by day' do
event_store.aggregation_property = billable_metric.field_name
event_store.numeric_property = true

expect(event_store.sum_date_breakdown).to eq(
events.map do |e|
{
date: e.timestamp.to_date,
value: e.properties[billable_metric.field_name].to_f,
}
end,
)
end
end
end
60 changes: 57 additions & 3 deletions spec/services/events/stores/postgres_store_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
)
end

let(:billable_metric) { create(:billable_metric) }
let(:billable_metric) { create(:billable_metric, field_name: 'value') }
let(:organization) { billable_metric.organization }

let(:customer) { create(:customer, organization:) }
Expand All @@ -23,8 +23,8 @@

let(:boundaries) do
{
charges_from_datetime: subscription.started_at.beginning_of_day,
charges_to_datetime: subscription.started_at.end_of_month.end_of_day,
from_datetime: subscription.started_at.beginning_of_day,
to_datetime: subscription.started_at.end_of_month.end_of_day,
}
end

Expand Down Expand Up @@ -89,6 +89,17 @@
end
end

describe '.prorated_events_values' do
it 'returns the value attached to each event prorated on the provided duration' do
event_store.aggregation_property = billable_metric.field_name
event_store.numeric_property = true

expect(event_store.prorated_events_values(31).map { |v| v.round(3) }).to eq(
[0.516, 0.968, 1.355, 1.677, 1.935],
)
end
end

describe '.max' do
it 'returns the max value' do
event_store.aggregation_property = billable_metric.field_name
Expand All @@ -106,4 +117,47 @@
expect(event_store.last).to eq(5)
end
end

describe '.sum' do
it 'returns the sum of event values' do
event_store.aggregation_property = billable_metric.field_name
event_store.numeric_property = true

expect(event_store.sum).to eq(15)
end
end

describe '.prorated_sum' do
it 'returns the prorated sum of event properties' do
event_store.aggregation_property = billable_metric.field_name
event_store.numeric_property = true

expect(event_store.prorated_sum(period_duration: 31).round(5)).to eq(6.45161)
end

context 'with persisted_duration' do
it 'returns the prorated sum of event properties' do
event_store.aggregation_property = billable_metric.field_name
event_store.numeric_property = true

expect(event_store.prorated_sum(period_duration: 31, persisted_duration: 10).round(5)).to eq(4.83871)
end
end
end

describe '.sum_date_breakdown' do
it 'returns the sum grouped by day' do
event_store.aggregation_property = billable_metric.field_name
event_store.numeric_property = true

expect(event_store.sum_date_breakdown).to eq(
events.map do |e|
{
date: e.timestamp.to_date,
value: e.properties[billable_metric.field_name],
}
end,
)
end
end
end

0 comments on commit f374f42

Please sign in to comment.