From 91d0bd0bafc558d5686acf966d46391ca842c0ae Mon Sep 17 00:00:00 2001 From: Erica Porter Date: Tue, 19 Dec 2023 11:15:22 +0000 Subject: [PATCH] Checksum ordered by update (#102) * Update ordered_by to use created_at or ID if updated_at doesn't exist --- lib/dfe/analytics/entity_table_check_job.rb | 55 +++++++++++++------ .../analytics/entity_table_check_job_spec.rb | 10 +++- 2 files changed, 46 insertions(+), 19 deletions(-) diff --git a/lib/dfe/analytics/entity_table_check_job.rb b/lib/dfe/analytics/entity_table_check_job.rb index 5356325b..c0ce27f4 100644 --- a/lib/dfe/analytics/entity_table_check_job.rb +++ b/lib/dfe/analytics/entity_table_check_job.rb @@ -13,12 +13,16 @@ def perform return unless supported_adapter_and_environment? DfE::Analytics.entities_for_analytics.each do |entity| - entity_table_check_event = build_event_for(entity) + next unless id_column_exists_for_entity?(entity) + + order_column = determine_order_column(entity) + + entity_table_check_event = build_event_for(entity, order_column) DfE::Analytics::SendEvents.perform_later([entity_table_check_event]) if entity_table_check_event.present? end end - def build_event_for(entity) + def build_event_for(entity, order_column) unless DfE::Analytics.models_for_entity(entity).any? Rails.logger.info("DfE::Analytics NOT Processing entity: #{entity} - No associated models") return @@ -27,7 +31,7 @@ def build_event_for(entity) DfE::Analytics::Event.new .with_type('entity_table_check') .with_entity_table_name(entity) - .with_data(entity_table_check_data(entity)) + .with_data(entity_table_check_data(entity, order_column)) .as_json end @@ -35,15 +39,16 @@ def adapter_name @adapter_name ||= ActiveRecord::Base.connection.adapter_name.downcase end - def entity_table_check_data(entity) + def entity_table_check_data(entity, order_column) checksum_calculated_at = fetch_current_timestamp_in_time_zone - row_count, checksum = fetch_checksum_data(entity, checksum_calculated_at) + row_count, checksum = fetch_checksum_data(entity, checksum_calculated_at, order_column) Rails.logger.info("DfE::Analytics Processing entity: #{entity}: Row count: #{row_count}") { row_count: row_count, checksum: checksum, - checksum_calculated_at: checksum_calculated_at + checksum_calculated_at: checksum_calculated_at, + order_column: order_column } end @@ -60,26 +65,44 @@ def fetch_current_timestamp_in_time_zone result.first['current_timestamp'].in_time_zone(TIME_ZONE).iso8601(6) end - def fetch_checksum_data(entity, checksum_calculated_at) + def fetch_checksum_data(entity, checksum_calculated_at, order_column) table_name_sanitized = ActiveRecord::Base.connection.quote_table_name(entity) checksum_calculated_at_sanitized = ActiveRecord::Base.connection.quote(checksum_calculated_at) if adapter_name == 'postgresql' - fetch_postgresql_checksum_data(table_name_sanitized, checksum_calculated_at_sanitized) + fetch_postgresql_checksum_data(table_name_sanitized, checksum_calculated_at_sanitized, order_column) + else + fetch_generic_checksum_data(table_name_sanitized, checksum_calculated_at_sanitized, order_column) + end + end + + def id_column_exists_for_entity?(entity) + return true if ActiveRecord::Base.connection.column_exists?(entity, :id) + + Rails.logger.info("DfE::Analytics: Entity checksum: ID column missing in #{entity} - Skipping checks") + + false + end + + def determine_order_column(entity) + if ActiveRecord::Base.connection.column_exists?(entity, :updated_at) + 'UPDATED_AT' + elsif ActiveRecord::Base.connection.column_exists?(entity, :created_at) + 'CREATED_AT' else - fetch_generic_checksum_data(table_name_sanitized, checksum_calculated_at_sanitized) + 'ID' end end - def fetch_postgresql_checksum_data(table_name_sanitized, checksum_calculated_at_sanitized) + def fetch_postgresql_checksum_data(table_name_sanitized, checksum_calculated_at_sanitized, order_column) checksum_sql_query = <<-SQL SELECT COUNT(*) as row_count, - MD5(COALESCE(STRING_AGG(CHECKSUM_TABLE.ID, '' ORDER BY CHECKSUM_TABLE.UPDATED_AT ASC), '')) as checksum + MD5(COALESCE(STRING_AGG(CHECKSUM_TABLE.ID, '' ORDER BY CHECKSUM_TABLE.#{order_column} ASC), '')) as checksum FROM ( SELECT #{table_name_sanitized}.id::TEXT as ID, - #{table_name_sanitized}.updated_at as UPDATED_AT + #{table_name_sanitized}.#{order_column} as #{order_column} FROM #{table_name_sanitized} - WHERE #{table_name_sanitized}.updated_at < #{checksum_calculated_at_sanitized} + WHERE #{table_name_sanitized}.#{order_column} < #{checksum_calculated_at_sanitized} ) CHECKSUM_TABLE SQL @@ -87,12 +110,12 @@ def fetch_postgresql_checksum_data(table_name_sanitized, checksum_calculated_at_ [result['row_count'].to_i, result['checksum']] end - def fetch_generic_checksum_data(table_name_sanitized, checksum_calculated_at_sanitized) + def fetch_generic_checksum_data(table_name_sanitized, checksum_calculated_at_sanitized, order_column) checksum_sql_query = <<-SQL SELECT #{table_name_sanitized}.ID FROM #{table_name_sanitized} - WHERE #{table_name_sanitized}.UPDATED_AT < #{checksum_calculated_at_sanitized} - ORDER BY #{table_name_sanitized}.UPDATED_AT ASC + WHERE #{table_name_sanitized}.#{order_column} < #{checksum_calculated_at_sanitized} + ORDER BY #{table_name_sanitized}.#{order_column} ASC SQL table_ids = ActiveRecord::Base.connection.execute(checksum_sql_query).pluck('id') diff --git a/spec/dfe/analytics/entity_table_check_job_spec.rb b/spec/dfe/analytics/entity_table_check_job_spec.rb index 34028b89..f8d11130 100644 --- a/spec/dfe/analytics/entity_table_check_job_spec.rb +++ b/spec/dfe/analytics/entity_table_check_job_spec.rb @@ -30,6 +30,7 @@ let(:time_now) { Time.new(2023, 9, 19, 12, 0, 0) } let(:time_zone) { 'London' } let(:checksum_calculated_at) { ActiveRecord::Base.connection.select_all('SELECT CURRENT_TIMESTAMP AS current_timestamp').first['current_timestamp'].in_time_zone('London').iso8601(6) } + let(:order_column) { 'UPDATED_AT' } before { Timecop.freeze(checksum_calculated_at) } after { Timecop.return } @@ -54,7 +55,8 @@ [ { 'key' => 'row_count', 'value' => [table_ids.size] }, { 'key' => 'checksum', 'value' => [checksum] }, - { 'key' => 'checksum_calculated_at', 'value' => [checksum_calculated_at] } + { 'key' => 'checksum_calculated_at', 'value' => [checksum_calculated_at] }, + { 'key' => 'order_column', 'value' => [order_column] } ] })]) end @@ -74,7 +76,8 @@ 'data' => [ { 'key' => 'row_count', 'value' => [table_ids.size] }, { 'key' => 'checksum', 'value' => [checksum] }, - { 'key' => 'checksum_calculated_at', 'value' => [checksum_calculated_at] } + { 'key' => 'checksum_calculated_at', 'value' => [checksum_calculated_at] }, + { 'key' => 'order_column', 'value' => [order_column] } ] })]) end @@ -89,7 +92,8 @@ 'data' => [ { 'key' => 'row_count', 'value' => [0] }, { 'key' => 'checksum', 'value' => [checksum] }, - { 'key' => 'checksum_calculated_at', 'value' => [checksum_calculated_at] } + { 'key' => 'checksum_calculated_at', 'value' => [checksum_calculated_at] }, + { 'key' => 'order_column', 'value' => [order_column] } ] })]) end