Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nightly checksum and checksum mismatch detection assertion #85

Merged
merged 13 commits into from
Sep 27, 2023
Merged
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,15 @@ database is only in `analytics.yml`. This would result in the web request
version being pseudonymised and the database version not being pseudonymised,
which means they can't be joined up.

### Entity Table Check Job

The entity table check job will run every night and sends data to verify that the latest version of an entity table in BigQuery matches the database. The job is defaulted to false but can be changed by updating the configuration option in
`config/initializers/dfe_analytics.rb`:

```ruby
config.entity_table_checks_enabled = true
```

## Testing

### Testing modes
Expand Down
4 changes: 4 additions & 0 deletions config/locales/en.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ en:
description: |
Whether to pseudonymise the user_id field in the web request event.
default: false
entity_table_checks_enabled:
description: |
Whether to run entity table checksum job.
default: false
rack_page_cached:
description: |
A proc which will be called with the rack env, and which should
Expand Down
9 changes: 8 additions & 1 deletion lib/dfe/analytics.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
require 'dfe/analytics/load_entities'
require 'dfe/analytics/load_entity_batch'
require 'dfe/analytics/requests'
require 'dfe/analytics/initialise'
require 'dfe/analytics/entity_table_check_job'
require 'dfe/analytics/initialisation_events'
require 'dfe/analytics/version'
require 'dfe/analytics/middleware/request_identity'
require 'dfe/analytics/middleware/send_cached_page_request_event'
Expand Down Expand Up @@ -60,6 +61,7 @@ def self.config
environment
user_identifier
pseudonymise_web_request_user_id
entity_table_checks_enabled
rack_page_cached
]

Expand All @@ -82,6 +84,7 @@ def self.configure
config.queue ||= :default
config.user_identifier ||= proc { |user| user&.id }
config.pseudonymise_web_request_user_id ||= false
config.entity_table_checks_enabled ||= false
config.rack_page_cached ||= proc { |_rack_env| false }
end

Expand Down Expand Up @@ -230,5 +233,9 @@ def self.user_identifier(user)
def self.rack_page_cached?(rack_env)
config.rack_page_cached.call(rack_env)
end

def self.entity_table_checks_enabled?
config.entity_table_checks_enabled
end
end
end
47 changes: 47 additions & 0 deletions lib/dfe/analytics/entity_table_check_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# frozen_string_literal: true

require 'active_support/values/time_zone'

module DfE
module Analytics
# Reschedules to run every 24hours
class EntityTableCheckJob < AnalyticsJob
WAIT_TIME = Date.tomorrow.midnight
TIME_ZONE = 'London'

def perform
DfE::Analytics.entities_for_analytics.each do |entity_name|
DfE::Analytics.models_for_entity(entity_name).each do |model|
entity_table_check_event = DfE::Analytics::Event.new
.with_type('entity_table_check')
.with_entity_table_name(model.table_name)
.with_data(entity_table_check_data(model))
.as_json
DfE::Analytics::SendEvents.perform_later([entity_table_check_event])
Rails.logger.info("Processing data for #{model.table_name} with row count #{model.count}")
end
end
ericaporter marked this conversation as resolved.
Show resolved Hide resolved
ensure
reschedule_job
end

def entity_table_check_data(model)
ericaporter marked this conversation as resolved.
Show resolved Hide resolved
checksum_calculated_at = Time.now.in_time_zone(TIME_ZONE).iso8601(6)
table_ids =
model
.where('updated_at < ?', Time.parse(checksum_calculated_at))
.order(updated_at: :asc)
.pluck(:id)
{
row_count: table_ids.count,
checksum: Digest::SHA256.hexdigest(table_ids.join),
checksum_calculated_at: checksum_calculated_at
}
end

def reschedule_job
self.class.set(wait_until: WAIT_TIME).perform_later
end
end
end
end
2 changes: 1 addition & 1 deletion lib/dfe/analytics/event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ module DfE
module Analytics
class Event
EVENT_TYPES = %w[
web_request create_entity update_entity delete_entity import_entity initialise_analytics
web_request create_entity update_entity delete_entity import_entity initialise_analytics entity_table_check
].freeze

def initialize
Expand Down
58 changes: 58 additions & 0 deletions lib/dfe/analytics/initialisation_events.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# frozen_string_literal: true

module DfE
module Analytics
# DfE Analytics initialisation events
# - Event should only be sent once, but NOT on startup as this causes errors on some services
# - Event contains the dfe analytics version, config and other items
class InitialisationEvents
# Disable rubocop class variable warnings for class - class variable required to control sending of event
# rubocop:disable Style:ClassVars
@@initialisation_events_sent = false # rubocop:disable Style:ClassVars

def self.trigger_initialisation_events
new.send_initialisation_events
end

def self.initialisation_events_sent?
@@initialisation_events_sent
end

def self.initialisation_events_sent=(value)
@@initialisation_events_sent = value # rubocop:disable Style:ClassVars
end

def send_initialisation_events
return unless DfE::Analytics.enabled?

initialise_analytics_event = DfE::Analytics::Event.new
.with_type('initialise_analytics')
.with_data(initialise_analytics_data)
.as_json

if DfE::Analytics.async?
DfE::Analytics::SendEvents.perform_later([initialise_analytics_event])
else
DfE::Analytics::SendEvents.perform_now([initialise_analytics_event])
end

DfE::Analytics::EntityTableCheckJob.perform_later if DfE::Analytics.entity_table_checks_enabled?

@@initialisation_events_sent = true # rubocop:disable Style:ClassVars
end

private

def initialise_analytics_data
{
analytics_version: DfE::Analytics::VERSION,
config: {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be useful to send the entity_table_checks_enabled config item here in the initialise event. I'm not sure though if Analytics dataform will use it though. What do you think @stevenleggdfe ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes please @asatwal @ericaporter ! This would enable us to switch checks on/off in Dataform depending

pseudonymise_web_request_user_id: DfE::Analytics.config.pseudonymise_web_request_user_id,
entity_table_checks_enabled: DfE::Analytics.config.entity_table_checks_enabled
}
}
end
# rubocop:enable Style:ClassVars
end
end
end
55 changes: 0 additions & 55 deletions lib/dfe/analytics/initialise.rb

This file was deleted.

3 changes: 2 additions & 1 deletion lib/dfe/analytics/send_events.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ module Analytics
class SendEvents < AnalyticsJob
def self.do(events)
# The initialise event is a one-off event that must be sent to BigQuery once only
DfE::Analytics::Initialise.trigger_initialise_event unless DfE::Analytics::Initialise.initialise_event_sent?

DfE::Analytics::InitialisationEvents.trigger_initialisation_events unless DfE::Analytics::InitialisationEvents.initialisation_events_sent?

events = events.map { |event| event.is_a?(Event) ? event.as_json : event }

Expand Down
94 changes: 94 additions & 0 deletions spec/dfe/analytics/entity_table_check_job_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
# frozen_string_literal: true

RSpec.describe DfE::Analytics::EntityTableCheckJob do
include ActiveJob::TestHelper

with_model :Candidate do
table do |t|
t.string :email_address
t.string :first_name
t.string :last_name
t.datetime :updated_at
end
end

before do
allow(DfE::Analytics::EntityTableCheckJob).to receive(:perform_later)
allow(DfE::Analytics::SendEvents).to receive(:perform_later)
allow(DfE::Analytics).to receive(:allowlist).and_return({
Candidate.table_name.to_sym => %w[id]
})
allow(DfE::Analytics).to receive(:allowlist_pii).and_return({
Candidate.table_name.to_sym => %w[]
})
allow(Rails.logger).to receive(:info)
allow(Time).to receive(:now).and_return(time_now)
end

describe '#perform' do
let(:wait_time) { Date.tomorrow.midnight }
let(:time_now) { Time.new(2023, 9, 19, 12, 0, 0) }
let(:time_zone) { 'London' }
let(:checksum_calculated_at) { time_now.in_time_zone(time_zone).iso8601(6) }

it 'sends the entity_table_check event to BigQuery' do
[123, 124, 125].map { |id| Candidate.create(id: id) }
table_ids = Candidate.where('updated_at < ?', Time.parse(checksum_calculated_at)).order(updated_at: :asc).pluck(:id)
checksum = Digest::SHA256.hexdigest(table_ids.join)
described_class.new.perform

expect(DfE::Analytics::SendEvents).to have_received(:perform_later)
.with([a_hash_including({
'entity_table_name' => Candidate.table_name,
'event_type' => 'entity_table_check',
'data' => [
{ 'key' => 'row_count', 'value' => [table_ids.size] },
{ 'key' => 'checksum', 'value' => [checksum] },
{ 'key' => 'checksum_calculated_at', 'value' => [checksum_calculated_at] }
]
})])
end

it 'does not send the event if updated_at is greater than checksum_calculated_at' do
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💯

checksum_calculated_at = Time.parse(time_now.in_time_zone(time_zone).iso8601(6))
Candidate.create(id: '123', updated_at: checksum_calculated_at - 2.hours)
Candidate.create(id: '124', updated_at: checksum_calculated_at - 5.hours)
Candidate.create(id: '125', updated_at: checksum_calculated_at + 5.hours)

table_ids = Candidate.where('updated_at < ?', checksum_calculated_at).order(updated_at: :asc).pluck(:id)
checksum = Digest::SHA256.hexdigest(table_ids.join)
described_class.new.perform

expect(DfE::Analytics::SendEvents).to have_received(:perform_later)
.with([a_hash_including({
'entity_table_name' => Candidate.table_name,
'event_type' => 'entity_table_check',
'data' => [
{ 'key' => 'row_count', 'value' => [table_ids.size] },
{ 'key' => 'checksum', 'value' => [checksum] },
{ 'key' => 'checksum_calculated_at', 'value' => [checksum_calculated_at] }
]
})])
end

it 'reschedules the job to the expected wait time' do
expected_time = Date.tomorrow.midnight.to_i
described_class.new.perform

assert_enqueued_with(job: described_class) do
described_class.set(wait_until: wait_time).perform_later
end

enqueued_job = ActiveJob::Base.queue_adapter.enqueued_jobs.last
expect(enqueued_job[:at].to_i).to be_within(2).of(expected_time)
end

it 'logs the entity name and row count' do
Candidate.create(id: 123)
described_class.new.perform

expect(Rails.logger).to have_received(:info)
.with("Processing data for #{Candidate.table_name} with row count #{Candidate.count}")
end
end
end
Original file line number Diff line number Diff line change
@@ -1,31 +1,36 @@
# frozen_string_literal: true

RSpec.describe DfE::Analytics::Initialise do
RSpec.describe DfE::Analytics::InitialisationEvents do
before do
allow(DfE::Analytics::SendEvents).to receive(:perform_later)
allow(DfE::Analytics.config).to receive(:entity_table_checks_enabled).and_return(true)
allow(DfE::Analytics::EntityTableCheckJob).to receive(:perform_later)
allow(DfE::Analytics).to receive(:enabled?).and_return(true)
described_class.trigger_initialisation_events
end

describe 'trigger_initialise_event ' do
describe 'trigger_initialisation_events ' do
it 'includes the expected attributes' do
described_class.trigger_initialise_event

expect(DfE::Analytics::SendEvents).to have_received(:perform_later)
.with([a_hash_including({
'event_type' => 'initialise_analytics',
'data' => [
{ 'key' => 'analytics_version', 'value' => [DfE::Analytics::VERSION] },
{ 'key' => 'config',
'value' => ['{"pseudonymise_web_request_user_id":false}'] }
'value' => ['{"pseudonymise_web_request_user_id":false,"entity_table_checks_enabled":true}'] }
]
})])
end

it 'calls the entity_table_check_job' do
expect(DfE::Analytics::EntityTableCheckJob).to have_received(:perform_later)
end
end

describe '.initialise_event_sent=' do
describe '.initialisation_events_sent=' do
it 'allows setting of the class variable' do
described_class.initialise_event_sent = true
expect(described_class.initialise_event_sent?).to eq(true)
described_class.initialisation_events_sent = true
expect(described_class.initialisation_events_sent?).to eq(true)
end
end
end
Loading