Skip to content

Commit

Permalink
Batch events in a way that is sensitive to BQ byte limits
Browse files Browse the repository at this point in the history
  • Loading branch information
duncanjbrown committed Sep 1, 2022
1 parent 56ccc8c commit e99a5df
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 26 deletions.
32 changes: 24 additions & 8 deletions lib/dfe/analytics/load_entities.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,44 @@
module DfE
module Analytics
class LoadEntities
DEFAULT_BATCH_SIZE = 200
BQ_BATCH_MAX_BYTES = 10_000

def initialize(entity_name:, batch_size: DEFAULT_BATCH_SIZE)
def initialize(entity_name:)
@entity_name = entity_name
@batch_size = batch_size.to_i
end

def run
model = DfE::Analytics.model_for_entity(@entity_name)

unless model.any?
Rails.logger.info("No entities to process for #{@entity_name}")
return
end

Rails.logger.info("Processing data for #{@entity_name} with row count #{model.count}")

batch_number = 0
# sample 10% of rows for max size, which we’ll use to calculate batch size
sample_events = model.order(:rand).take([model.count / 10, 10].max).map do |m|
DfE::Analytics::Event.new
.with_type('import_entity')
.with_entity_table_name(m.class.table_name)
.with_data(DfE::Analytics.extract_model_attributes(m))
end

max_byte_size = sample_events.map(&:byte_size_in_transit).max
records_per_batch = BQ_BATCH_MAX_BYTES / max_byte_size

batch_count = 0

model.order(:id).in_batches(of: @batch_size) do |relation|
batch_number += 1
model.in_batches(of: records_per_batch) do |relation|
batch_count += 1

ids = relation.pluck(:id)

DfE::Analytics::LoadEntityBatch.perform_later(model.to_s, ids, batch_number)
DfE::Analytics::LoadEntityBatch.perform_later(model.to_s, ids)
end

Rails.logger.info "Enqueued #{batch_number} batches of #{@batch_size} #{@entity_name} for importing to BigQuery"
Rails.logger.info "Enqueued #{batch_count} batches of #{records_per_batch} #{@entity_name} for importing to BigQuery"
end
end
end
Expand Down
14 changes: 11 additions & 3 deletions lib/dfe/analytics/load_entity_batch.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module DfE
module Analytics
class LoadEntityBatch < AnalyticsJob
def perform(model_class, ids, batch_number)
def perform(model_class, ids)
# Support string args for Rails < 6.1
model_class = model_class.constantize if model_class.respond_to?(:constantize)

Expand All @@ -12,9 +12,17 @@ def perform(model_class, ids, batch_number)
.with_data(DfE::Analytics.extract_model_attributes(record))
end

DfE::Analytics::SendEvents.perform_now(events.as_json)
payload_byte_size = events.sum(&:byte_size_in_transit)

Rails.logger.info "Enqueued batch #{batch_number} of #{model_class.table_name}"
# just in case we overrun the max batch size
if payload_byte_size > DfE::Analytics::LoadEntities::BQ_BATCH_MAX_BYTES
events.each_slice((events.size / 2.0).round).to_a.each do |half_batch|
Rails.logger.info "Halving batch of size #{payload_byte_size} for #{model_class.name}"
DfE::Analytics::SendEvents.perform_now(half_batch.as_json)
end
else
DfE::Analytics::SendEvents.perform_now(events.as_json)
end
end
end
end
Expand Down
20 changes: 14 additions & 6 deletions spec/dfe/analytics/load_entities_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
# autogenerate a compliant blocklist
allow(DfE::Analytics).to receive(:blocklist).and_return(DfE::Analytics::Fields.generate_blocklist)

allow(DfE::Analytics::SendEvents).to receive(:perform_later)
allow(DfE::Analytics::SendEvents).to receive(:perform_now)

DfE::Analytics.initialize!
end
Expand All @@ -37,7 +37,8 @@

described_class.new(entity_name: Candidate.table_name).run

expect(DfE::Analytics::SendEvents).to have_received(:perform_later).twice do |payload|
# import process
expect(DfE::Analytics::SendEvents).to have_received(:perform_now).once do |payload|
schema = DfE::Analytics::EventSchema.new.as_json
schema_validator = JSONSchemaValidator.new(schema, payload.first)

Expand All @@ -50,11 +51,18 @@
end

it 'can work in batches' do
Candidate.create
Candidate.create
# each of these records is 211 bytes on the wire
9.times { Candidate.create(email_address: '1234567890') }

described_class.new(entity_name: Candidate.table_name, batch_size: 2).run
very_long_string = '1234567890' * 10

expect(DfE::Analytics::SendEvents).to have_received(:perform_later).exactly(3).times
Candidate.create(email_address: very_long_string) # 90 extra bytes = 301 bytes for this record

stub_const('DfE::Analytics::LoadEntities::BQ_BATCH_MAX_BYTES', 400) # will fit 2 small records, but only one big one

described_class.new(entity_name: Candidate.table_name).run

# one batch per record, because the largest record will only fit once per BQ_BATCH_MAX_BYTES
expect(DfE::Analytics::SendEvents).to have_received(:perform_now).exactly(10).times
end
end
36 changes: 27 additions & 9 deletions spec/dfe/analytics/load_entity_batch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,48 @@
end

before do
allow(DfE::Analytics::SendEvents).to receive(:perform_later)
allow(DfE::Analytics::SendEvents).to receive(:perform_now)

allow(DfE::Analytics).to receive(:allowlist).and_return({
Candidate.table_name.to_sym => ['email_address']
})
end

around do |ex|
perform_enqueued_jobs do
ex.run
end
it 'splits a batch when the batch is too big' do
c = Candidate.create(email_address: '12345678910')
c2 = Candidate.create(email_address: '12345678910')
stub_const('DfE::Analytics::LoadEntities::BQ_BATCH_MAX_BYTES', 250)

described_class.perform_now('Candidate', [c.id, c2.id])

expect(DfE::Analytics::SendEvents).to have_received(:perform_now).twice
end

it 'doesn’t split a batch unless it has to' do
c = Candidate.create(email_address: '12345678910')
c2 = Candidate.create(email_address: '12345678910')
stub_const('DfE::Analytics::LoadEntities::BQ_BATCH_MAX_BYTES', 500)

described_class.perform_now('Candidate', [c.id, c2.id])

expect(DfE::Analytics::SendEvents).to have_received(:perform_now).once
end

it 'accepts its first arg as a String to support Rails < 6.1' do
# Rails 6.1rc1 added support for deserializing Class and Module params
c = Candidate.create(email_address: 'foo@example.com')

described_class.perform_later('Candidate', [c.id], 1)
described_class.perform_now('Candidate', [c.id])

expect(DfE::Analytics::SendEvents).to have_received(:perform_later).once
expect(DfE::Analytics::SendEvents).to have_received(:perform_now).once
end

it 'accepts its first arg as a Class' do
# backwards compatability with existing enqueued jobs
c = Candidate.create(email_address: 'foo@example.com')

described_class.perform_later(Candidate, [c.id], 1)
described_class.perform_now(Candidate, [c.id])

expect(DfE::Analytics::SendEvents).to have_received(:perform_later).once
expect(DfE::Analytics::SendEvents).to have_received(:perform_now).once
end
end

0 comments on commit e99a5df

Please sign in to comment.