Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify backfill process #21

Merged
merged 4 commits into from
Jun 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .rubocop_todo.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# This configuration was generated by
# `rubocop --auto-gen-config`
# on 2022-06-16 10:08:54 UTC using RuboCop version 1.26.1.
# on 2022-06-24 12:31:36 UTC using RuboCop version 1.26.1.
# The point is for the user to remove these configuration records
# one by one as the offenses are removed from the code base.
# Note that changes in the inspected code, or installation of new
Expand All @@ -14,17 +14,19 @@ Naming/FileName:
Exclude:
- 'spec/dummy/config/initializers/dfe-analytics.rb'

# Offense count: 13
# Offense count: 15
# Configuration parameters: AllowedConstants.
Style/Documentation:
Exclude:
- 'spec/**/*'
- 'test/**/*'
- 'lib/dfe/analytics.rb'
- 'lib/dfe/analytics/analytics_job.rb'
- 'lib/dfe/analytics/entities.rb'
- 'lib/dfe/analytics/event.rb'
- 'lib/dfe/analytics/event_schema.rb'
- 'lib/dfe/analytics/load_entities.rb'
- 'lib/dfe/analytics/load_entity_batch.rb'
- 'lib/dfe/analytics/requests.rb'
- 'lib/dfe/analytics/send_events.rb'
- 'lib/dfe/analytics/testing.rb'
Expand Down
8 changes: 5 additions & 3 deletions lib/dfe/analytics.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
require 'dfe/analytics/fields'
require 'dfe/analytics/entities'
require 'dfe/analytics/event'
require 'dfe/analytics/analytics_job'
require 'dfe/analytics/send_events'
require 'dfe/analytics/load_entities'
require 'dfe/analytics/load_entity_batch'
require 'dfe/analytics/requests'
require 'dfe/analytics/version'
require 'dfe/analytics/middleware/request_identity'
Expand Down Expand Up @@ -77,15 +79,15 @@ def self.enabled?
end

def self.allowlist
Rails.application.config_for(:analytics)
@allowlist ||= Rails.application.config_for(:analytics)
end

def self.allowlist_pii
Rails.application.config_for(:analytics_pii)
@allowlist_pii ||= Rails.application.config_for(:analytics_pii)
end

def self.blocklist
Rails.application.config_for(:analytics_blocklist)
@blocklist ||= Rails.application.config_for(:analytics_blocklist)
end

def self.environment
Expand Down
8 changes: 8 additions & 0 deletions lib/dfe/analytics/analytics_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
module DfE
module Analytics
class AnalyticsJob < ActiveJob::Base
queue_as { DfE::Analytics.config.queue }
retry_on StandardError, wait: :exponentially_longer, attempts: 5
end
end
end
32 changes: 9 additions & 23 deletions lib/dfe/analytics/load_entities.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,42 +3,28 @@
module DfE
module Analytics
class LoadEntities
DEFAULT_SLEEP_TIME = 2
DEFAULT_BATCH_SIZE = 200

def initialize(model_name:, start_at_id: nil, sleep_time: nil, batch_size: nil)
def initialize(model_name:, batch_size: DEFAULT_BATCH_SIZE)
@model_name = model_name
@model_class = Object.const_get(model_name)
@sleep_time = (sleep_time.presence || DEFAULT_SLEEP_TIME).to_i
@batch_size = (batch_size.presence || DEFAULT_BATCH_SIZE).to_i
@starting_id = start_at_id || 0 # enable us to complete from a known point of failure :\
@batch_size = batch_size.to_i
end

def run
Rails.logger.info("Processing data for #{@model_class.name} with row count #{@model_class.count}")

processed_so_far = 0
batch_number = 0

@model_class.order(:id).where('id >= ?', @starting_id).find_in_batches(batch_size: @batch_size) do |records|
id = records.first.id
@model_class.order(:id).in_batches(of: @batch_size) do |relation|
batch_number += 1

events = records.map do |record|
DfE::Analytics::Event.new
.with_type('import_entity')
.with_entity_table_name(@model_class.table_name)
.with_data(DfE::Analytics.extract_model_attributes(record))
end
ids = relation.pluck(:id)

DfE::Analytics::SendEvents.do(events.as_json)

processed_so_far += records.count

sleep @sleep_time
rescue StandardError => e
Rails.logger.info("Process failed while processing #{@model_class.name} within the id range #{id} to #{id + @batch_size}")
Rails.logger.info(e.message)
DfE::Analytics::LoadEntityBatch.perform_later(@model_class, ids, batch_number)
end

Rails.logger.info "Processed #{processed_so_far} records importing #{@model_class.name}"
Rails.logger.info "Enqueued #{batch_number} batches of #{@batch_size} #{@model_name} for importing to BigQuery"
end
end
end
Expand Down
18 changes: 18 additions & 0 deletions lib/dfe/analytics/load_entity_batch.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
module DfE
module Analytics
class LoadEntityBatch < AnalyticsJob
def perform(model_class, ids, batch_number)
events = model_class.where(id: ids).map do |record|
DfE::Analytics::Event.new
.with_type('import_entity')
.with_entity_table_name(model_class.table_name)
.with_data(DfE::Analytics.extract_model_attributes(record))
end

DfE::Analytics::SendEvents.do(events.as_json)

Rails.logger.info "Enqueued batch #{batch_number} of #{model_class.table_name}"
end
end
end
end
5 changes: 1 addition & 4 deletions lib/dfe/analytics/send_events.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@

module DfE
module Analytics
class SendEvents < ActiveJob::Base
queue_as { DfE::Analytics.config.queue }
retry_on StandardError, wait: :exponentially_longer, attempts: 5

class SendEvents < AnalyticsJob
def self.do(events)
if DfE::Analytics.async?
perform_later(events)
Expand Down
4 changes: 2 additions & 2 deletions lib/dfe/analytics/tasks/import_entities.rake
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
namespace :dfe do
namespace :analytics do
desc 'Send Analytics events for the (allowlisted) state of all records in the database'
task :import_all_entities, %i[sleep_time batch_size] => :environment do |_, args|
task :import_all_entities, %i[batch_size] => :environment do |_, args|
models_for_analytics.each do |model_name|
DfE::Analytics::LoadEntities.new(model_name: model_name, **args).run
end
end

desc 'Send Analytics events for the state of all records in a specified model'
task :import_entity, %i[model_name sleep_time batch_size start_at_id] => :environment do |_, args|
task :import_entity, %i[model_name batch_size] => :environment do |_, args|
abort('You need to specify a model name as an argument to the Rake task, eg dfe:analytics:import_entity[Model]') unless args[:model_name]

DfE::Analytics::LoadEntities.new(args).run
Expand Down
31 changes: 10 additions & 21 deletions spec/dfe/analytics/load_entities_spec.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# frozen_string_literal: true

RSpec.describe DfE::Analytics::LoadEntities do
include ActiveJob::TestHelper

before do
allow(DfE::Analytics).to receive(:allowlist).and_return(
{
Expand All @@ -17,10 +19,16 @@
allow(DfE::Analytics::SendEvents).to receive(:perform_later)
end

around do |ex|
perform_enqueued_jobs do
ex.run
end
end

it 'sends a model’s fields to BQ' do
Candidate.create(email_address: 'known@address.com')

described_class.new(model_name: 'Candidate', sleep_time: 0).run
described_class.new(model_name: 'Candidate').run

expect(DfE::Analytics::SendEvents).to have_received(:perform_later) do |payload|
schema = DfE::Analytics::EventSchema.new.as_json
Expand All @@ -34,30 +42,11 @@
end
end

it 'converts arguments values' do
Candidate.create
Candidate.create

described_class.new(model_name: 'Candidate', batch_size: '1', sleep_time: '0').run

expect(DfE::Analytics::SendEvents).to have_received(:perform_later).twice
end

it 'can work in batches' do
Candidate.create
Candidate.create

described_class.new(model_name: 'Candidate', batch_size: 1, sleep_time: 0).run

expect(DfE::Analytics::SendEvents).to have_received(:perform_later).twice
end

it 'can start from an id' do
Candidate.create
Candidate.create
highest_id = Candidate.maximum(:id)

described_class.new(model_name: 'Candidate', batch_size: 1, sleep_time: 0, start_at_id: highest_id).run
described_class.new(model_name: 'Candidate', batch_size: 2).run

expect(DfE::Analytics::SendEvents).to have_received(:perform_later).once
end
Expand Down