Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make backfill resilient #45

Merged
merged 5 commits into from
Sep 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@
/spec/dummy/storage/
/spec/dummy/tmp/
.rspec_status
.yardoc
4 changes: 4 additions & 0 deletions lib/dfe/analytics/event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ def with_request_uuid(request_id)
self
end

def byte_size_in_transit
as_json.to_json.size
end

private

def convert_value_to_json(value)
Expand Down
24 changes: 14 additions & 10 deletions lib/dfe/analytics/load_entities.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,32 @@
module DfE
module Analytics
class LoadEntities
DEFAULT_BATCH_SIZE = 200
# from https://cloud.google.com/bigquery/quotas#streaming_inserts
BQ_BATCH_ROWS = 500

def initialize(entity_name:, batch_size: DEFAULT_BATCH_SIZE)
def initialize(entity_name:)
@entity_name = entity_name
@batch_size = batch_size.to_i
end

def run
model = DfE::Analytics.model_for_entity(@entity_name)
Rails.logger.info("Processing data for #{@entity_name} with row count #{model.count}")

batch_number = 0
unless model.any?
Rails.logger.info("No entities to process for #{@entity_name}")
return
end

model.order(:id).in_batches(of: @batch_size) do |relation|
batch_number += 1
Rails.logger.info("Processing data for #{@entity_name} with row count #{model.count}")

ids = relation.pluck(:id)
batch_count = 0

DfE::Analytics::LoadEntityBatch.perform_later(model.to_s, ids, batch_number)
model.in_batches(of: BQ_BATCH_ROWS) do |relation|
batch_count += 1
ids = relation.pluck(:id)
DfE::Analytics::LoadEntityBatch.perform_later(model.to_s, ids)
end

Rails.logger.info "Enqueued #{batch_number} batches of #{@batch_size} #{@entity_name} for importing to BigQuery"
Rails.logger.info "Enqueued #{batch_count} batches of #{BQ_BATCH_ROWS} #{@entity_name} records for importing to BigQuery"
end
end
end
Expand Down
24 changes: 20 additions & 4 deletions lib/dfe/analytics/load_entity_batch.rb
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
module DfE
module Analytics
class LoadEntityBatch < AnalyticsJob
def perform(model_class, ids, batch_number)
# https://cloud.google.com/bigquery/quotas#streaming_inserts
# at a batch size of 500, this allows 20kb per record
BQ_BATCH_MAX_BYTES = 10_000_000

def perform(model_class_arg, ids)
# Support string args for Rails < 6.1
model_class = model_class.constantize if model_class.respond_to?(:constantize)
model_class = if model_class_arg.respond_to?(:constantize)
model_class_arg.constantize
else
model_class_arg
end

events = model_class.where(id: ids).map do |record|
DfE::Analytics::Event.new
Expand All @@ -12,9 +20,17 @@ def perform(model_class, ids, batch_number)
.with_data(DfE::Analytics.extract_model_attributes(record))
end

DfE::Analytics::SendEvents.do(events.as_json)
payload_byte_size = events.sum(&:byte_size_in_transit)

Rails.logger.info "Enqueued batch #{batch_number} of #{model_class.table_name}"
# if we overrun the max batch size, recurse with each half of the list
if payload_byte_size > BQ_BATCH_MAX_BYTES
ids.each_slice((ids.size / 2.0).round).to_a.each do |half_batch|
Rails.logger.info "Halving batch of size #{payload_byte_size} for #{model_class.name}"
self.class.perform_later(model_class_arg, half_batch)
end
else
DfE::Analytics::SendEvents.perform_now(events.as_json)
end
end
end
end
Expand Down
11 changes: 7 additions & 4 deletions lib/dfe/analytics/testing.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,13 @@ def switch_test_mode(test_mode)
raise('Invalid test mode for DfE::Analytics') unless %i[fake webmock].include? test_mode

if block_given?
old_test_mode = @test_mode
@test_mode = test_mode
yield
@test_mode = old_test_mode
begin
old_test_mode = @test_mode
@test_mode = test_mode
yield
ensure
@test_mode = old_test_mode
end
else
@test_mode = test_mode
end
Expand Down
14 changes: 8 additions & 6 deletions spec/dfe/analytics/load_entities_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
# autogenerate a compliant blocklist
allow(DfE::Analytics).to receive(:blocklist).and_return(DfE::Analytics::Fields.generate_blocklist)

allow(DfE::Analytics::SendEvents).to receive(:perform_later)
allow(DfE::Analytics::SendEvents).to receive(:perform_now)

DfE::Analytics.initialize!
end
Expand All @@ -37,7 +37,8 @@

described_class.new(entity_name: Candidate.table_name).run

expect(DfE::Analytics::SendEvents).to have_received(:perform_later).twice do |payload|
# import process
expect(DfE::Analytics::SendEvents).to have_received(:perform_now).once do |payload|
schema = DfE::Analytics::EventSchema.new.as_json
schema_validator = JSONSchemaValidator.new(schema, payload.first)

Expand All @@ -50,11 +51,12 @@
end

it 'can work in batches' do
Candidate.create
Candidate.create
stub_const('DfE::Analytics::LoadEntities::BQ_BATCH_ROWS', 2)

described_class.new(entity_name: Candidate.table_name, batch_size: 2).run
3.times { Candidate.create }

expect(DfE::Analytics::SendEvents).to have_received(:perform_later).exactly(3).times
described_class.new(entity_name: Candidate.table_name).run

expect(DfE::Analytics::SendEvents).to have_received(:perform_now).exactly(2).times
end
end
34 changes: 27 additions & 7 deletions spec/dfe/analytics/load_entity_batch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,32 +8,52 @@
end

before do
allow(DfE::Analytics::SendEvents).to receive(:perform_later)
allow(DfE::Analytics::SendEvents).to receive(:perform_now)

allow(DfE::Analytics).to receive(:allowlist).and_return({
Candidate.table_name.to_sym => ['email_address']
})
end

around do |ex|
it 'splits a batch when the batch is too big' do
perform_enqueued_jobs do
ex.run
c = Candidate.create(email_address: '12345678910')
c2 = Candidate.create(email_address: '12345678910')
stub_const('DfE::Analytics::LoadEntityBatch::BQ_BATCH_MAX_BYTES', 250)

described_class.perform_now('Candidate', [c.id, c2.id])

expect(DfE::Analytics::SendEvents).to have_received(:perform_now).twice
end
end

it 'doesn’t split a batch unless it has to' do
c = Candidate.create(email_address: '12345678910')
c2 = Candidate.create(email_address: '12345678910')
stub_const('DfE::Analytics::LoadEntityBatch::BQ_BATCH_MAX_BYTES', 500)

described_class.perform_now('Candidate', [c.id, c2.id])

expect(DfE::Analytics::SendEvents).to have_received(:perform_now).once
end

it 'accepts its first arg as a String to support Rails < 6.1' do
# Rails 6.1rc1 added support for deserializing Class and Module params
c = Candidate.create(email_address: 'foo@example.com')

described_class.perform_later('Candidate', [c.id], 1)
described_class.perform_now('Candidate', [c.id])

expect(DfE::Analytics::SendEvents).to have_received(:perform_later).once
expect(DfE::Analytics::SendEvents).to have_received(:perform_now).once
end

if Gem::Version.new(Rails.version) >= Gem::Version.new('6.1')
it 'accepts its first arg as a Class' do
# backwards compatibility with existing enqueued jobs
c = Candidate.create(email_address: 'foo@example.com')

described_class.perform_later(Candidate, [c.id], 1)
described_class.perform_now(Candidate, [c.id])

expect(DfE::Analytics::SendEvents).to have_received(:perform_later).once
expect(DfE::Analytics::SendEvents).to have_received(:perform_now).once
end
end
end