Skip to content

Commit

Permalink
Checksum ordered by update (#102)
Browse files Browse the repository at this point in the history
* Update ordered_by to use created_at or ID if updated_at doesn't exist
  • Loading branch information
ericaporter authored Dec 19, 2023
1 parent 06b9d7b commit 91d0bd0
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 19 deletions.
55 changes: 39 additions & 16 deletions lib/dfe/analytics/entity_table_check_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,16 @@ def perform
return unless supported_adapter_and_environment?

DfE::Analytics.entities_for_analytics.each do |entity|
entity_table_check_event = build_event_for(entity)
next unless id_column_exists_for_entity?(entity)

order_column = determine_order_column(entity)

entity_table_check_event = build_event_for(entity, order_column)
DfE::Analytics::SendEvents.perform_later([entity_table_check_event]) if entity_table_check_event.present?
end
end

def build_event_for(entity)
def build_event_for(entity, order_column)
unless DfE::Analytics.models_for_entity(entity).any?
Rails.logger.info("DfE::Analytics NOT Processing entity: #{entity} - No associated models")
return
Expand All @@ -27,23 +31,24 @@ def build_event_for(entity)
DfE::Analytics::Event.new
.with_type('entity_table_check')
.with_entity_table_name(entity)
.with_data(entity_table_check_data(entity))
.with_data(entity_table_check_data(entity, order_column))
.as_json
end

# Returns the lower-cased name of the active database adapter.
# The value is looked up once and then reused from @adapter_name.
def adapter_name
  return @adapter_name if @adapter_name

  connection = ActiveRecord::Base.connection
  @adapter_name = connection.adapter_name.downcase
end

# Builds the data payload for an entity table check event.
#
# @param entity [String, Symbol] name of the table being checked
# @param order_column [String] column the IDs are ordered by when the checksum is computed
# @return [Hash] row_count, checksum, checksum_calculated_at and order_column
def entity_table_check_data(entity, order_column)
  # Take the timestamp first so the same value is used for the checksum query and the payload.
  checksum_calculated_at = fetch_current_timestamp_in_time_zone

  row_count, checksum = fetch_checksum_data(entity, checksum_calculated_at, order_column)
  Rails.logger.info("DfE::Analytics Processing entity: #{entity}: Row count: #{row_count}")

  {
    row_count: row_count,
    checksum: checksum,
    checksum_calculated_at: checksum_calculated_at,
    order_column: order_column
  }
end

Expand All @@ -60,39 +65,57 @@ def fetch_current_timestamp_in_time_zone
result.first['current_timestamp'].in_time_zone(TIME_ZONE).iso8601(6)
end

# Quotes the table name and cutoff timestamp, then delegates to the
# adapter-specific checksum query.
#
# @param entity [String, Symbol] name of the table being checked
# @param checksum_calculated_at [String] cutoff timestamp for the checksum
# @param order_column [String] column the IDs are ordered by
# @return the value returned by the adapter-specific fetcher
def fetch_checksum_data(entity, checksum_calculated_at, order_column)
  connection = ActiveRecord::Base.connection
  table_name_sanitized = connection.quote_table_name(entity)
  checksum_calculated_at_sanitized = connection.quote(checksum_calculated_at)

  if adapter_name == 'postgresql'
    fetch_postgresql_checksum_data(table_name_sanitized, checksum_calculated_at_sanitized, order_column)
  else
    fetch_generic_checksum_data(table_name_sanitized, checksum_calculated_at_sanitized, order_column)
  end
end

# Reports whether the entity's table has an ID column.
# When it does not, an info message is logged and false is returned.
#
# @param entity [String, Symbol] name of the table to inspect
# @return [Boolean]
def id_column_exists_for_entity?(entity)
  if ActiveRecord::Base.connection.column_exists?(entity, :id)
    true
  else
    Rails.logger.info("DfE::Analytics: Entity checksum: ID column missing in #{entity} - Skipping checks")
    false
  end
end

# Picks the column used to order IDs for the checksum: updated_at when the
# table has it, otherwise created_at, otherwise the ID column.
#
# @param entity [String, Symbol] name of the table to inspect
# @return [String] 'UPDATED_AT', 'CREATED_AT' or 'ID'
def determine_order_column(entity)
  connection = ActiveRecord::Base.connection

  if connection.column_exists?(entity, :updated_at)
    'UPDATED_AT'
  elsif connection.column_exists?(entity, :created_at)
    'CREATED_AT'
  else
    'ID'
  end
end

# Computes the row count and the MD5 of the concatenated IDs for a table in a
# single PostgreSQL query.
#
# @param table_name_sanitized [String] table name, already quoted for SQL interpolation
# @param checksum_calculated_at_sanitized [String] cutoff timestamp literal, already quoted
# @param order_column [String] column the IDs are ordered by before hashing
# @return [Array] the row count as an Integer and the checksum value from the query
def fetch_postgresql_checksum_data(table_name_sanitized, checksum_calculated_at_sanitized, order_column)
  # The cutoff is a timestamp, so it is only applied when ordering by a
  # timestamp column; comparing it against the ID column is not meaningful.
  where_clause =
    if order_column.to_s.casecmp?('ID')
      ''
    else
      "WHERE #{table_name_sanitized}.#{order_column} < #{checksum_calculated_at_sanitized}"
    end

  # The ordering column gets its own alias so that it never collides with the
  # ID alias of the text-cast ID column when order_column is itself 'ID'.
  checksum_sql_query = <<~SQL
    SELECT COUNT(*) as row_count,
      MD5(COALESCE(STRING_AGG(CHECKSUM_TABLE.ID, '' ORDER BY CHECKSUM_TABLE.ORDER_VALUE ASC), '')) as checksum
    FROM (
      SELECT #{table_name_sanitized}.id::TEXT as ID,
             #{table_name_sanitized}.#{order_column} as ORDER_VALUE
      FROM #{table_name_sanitized}
      #{where_clause}
    ) CHECKSUM_TABLE
  SQL

  result = ActiveRecord::Base.connection.execute(checksum_sql_query).first
  [result['row_count'].to_i, result['checksum']]
end

def fetch_generic_checksum_data(table_name_sanitized, checksum_calculated_at_sanitized)
def fetch_generic_checksum_data(table_name_sanitized, checksum_calculated_at_sanitized, order_column)
checksum_sql_query = <<-SQL
SELECT #{table_name_sanitized}.ID
FROM #{table_name_sanitized}
WHERE #{table_name_sanitized}.UPDATED_AT < #{checksum_calculated_at_sanitized}
ORDER BY #{table_name_sanitized}.UPDATED_AT ASC
WHERE #{table_name_sanitized}.#{order_column} < #{checksum_calculated_at_sanitized}
ORDER BY #{table_name_sanitized}.#{order_column} ASC
SQL

table_ids = ActiveRecord::Base.connection.execute(checksum_sql_query).pluck('id')
Expand Down
10 changes: 7 additions & 3 deletions spec/dfe/analytics/entity_table_check_job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
let(:time_now) { Time.new(2023, 9, 19, 12, 0, 0) }
let(:time_zone) { 'London' }
let(:checksum_calculated_at) { ActiveRecord::Base.connection.select_all('SELECT CURRENT_TIMESTAMP AS current_timestamp').first['current_timestamp'].in_time_zone('London').iso8601(6) }
let(:order_column) { 'UPDATED_AT' }

before { Timecop.freeze(checksum_calculated_at) }
after { Timecop.return }
Expand All @@ -54,7 +55,8 @@
[
{ 'key' => 'row_count', 'value' => [table_ids.size] },
{ 'key' => 'checksum', 'value' => [checksum] },
{ 'key' => 'checksum_calculated_at', 'value' => [checksum_calculated_at] }
{ 'key' => 'checksum_calculated_at', 'value' => [checksum_calculated_at] },
{ 'key' => 'order_column', 'value' => [order_column] }
]
})])
end
Expand All @@ -74,7 +76,8 @@
'data' => [
{ 'key' => 'row_count', 'value' => [table_ids.size] },
{ 'key' => 'checksum', 'value' => [checksum] },
{ 'key' => 'checksum_calculated_at', 'value' => [checksum_calculated_at] }
{ 'key' => 'checksum_calculated_at', 'value' => [checksum_calculated_at] },
{ 'key' => 'order_column', 'value' => [order_column] }
]
})])
end
Expand All @@ -89,7 +92,8 @@
'data' => [
{ 'key' => 'row_count', 'value' => [0] },
{ 'key' => 'checksum', 'value' => [checksum] },
{ 'key' => 'checksum_calculated_at', 'value' => [checksum_calculated_at] }
{ 'key' => 'checksum_calculated_at', 'value' => [checksum_calculated_at] },
{ 'key' => 'order_column', 'value' => [order_column] }
]
})])
end
Expand Down

0 comments on commit 91d0bd0

Please sign in to comment.