Skip to content

Commit

Permalink
Batch events in a way that is sensitive to BQ quotas
Browse files Browse the repository at this point in the history
BQ accepts max 10MB per request and recommends batches of 500.

This allows 20kb per event.

Batch everything in 500s. If a given batch payload exceeds 10MB, split
it before sending.
  • Loading branch information
duncanjbrown committed Sep 16, 2022
1 parent 56ccc8c commit d40d0ff
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 28 deletions.
24 changes: 14 additions & 10 deletions lib/dfe/analytics/load_entities.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,32 @@
module DfE
module Analytics
class LoadEntities
DEFAULT_BATCH_SIZE = 200
# from https://cloud.google.com/bigquery/quotas#streaming_inserts
BQ_BATCH_ROWS = 500

def initialize(entity_name:, batch_size: DEFAULT_BATCH_SIZE)
def initialize(entity_name:)
@entity_name = entity_name
@batch_size = batch_size.to_i
end

def run
model = DfE::Analytics.model_for_entity(@entity_name)
Rails.logger.info("Processing data for #{@entity_name} with row count #{model.count}")

batch_number = 0
unless model.any?
Rails.logger.info("No entities to process for #{@entity_name}")
return
end

model.order(:id).in_batches(of: @batch_size) do |relation|
batch_number += 1
Rails.logger.info("Processing data for #{@entity_name} with row count #{model.count}")

ids = relation.pluck(:id)
batch_count = 0

DfE::Analytics::LoadEntityBatch.perform_later(model.to_s, ids, batch_number)
model.in_batches(of: BQ_BATCH_ROWS) do |relation|
batch_count += 1
ids = relation.pluck(:id)
DfE::Analytics::LoadEntityBatch.perform_later(model.to_s, ids)
end

Rails.logger.info "Enqueued #{batch_number} batches of #{@batch_size} #{@entity_name} for importing to BigQuery"
Rails.logger.info "Enqueued #{batch_count} batches of #{BQ_BATCH_ROWS} #{@entity_name} records for importing to BigQuery"
end
end
end
Expand Down
18 changes: 15 additions & 3 deletions lib/dfe/analytics/load_entity_batch.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
module DfE
module Analytics
class LoadEntityBatch < AnalyticsJob
def perform(model_class, ids, batch_number)
# https://cloud.google.com/bigquery/quotas#streaming_inserts
# at a batch size of 500, this allows 20kb per record
BQ_BATCH_MAX_BYTES = 10_000_000

def perform(model_class, ids)
# Support string args for Rails < 6.1
model_class = model_class.constantize if model_class.respond_to?(:constantize)

Expand All @@ -12,9 +16,17 @@ def perform(model_class, ids, batch_number)
.with_data(DfE::Analytics.extract_model_attributes(record))
end

DfE::Analytics::SendEvents.perform_now(events.as_json)
payload_byte_size = events.sum(&:byte_size_in_transit)

Rails.logger.info "Enqueued batch #{batch_number} of #{model_class.table_name}"
# just in case we overrun the max batch size
if payload_byte_size > BQ_BATCH_MAX_BYTES
events.each_slice((events.size / 2.0).round).to_a.each do |half_batch|
Rails.logger.info "Halving batch of size #{payload_byte_size} for #{model_class.name}"
DfE::Analytics::SendEvents.perform_now(half_batch.as_json)
end
else
DfE::Analytics::SendEvents.perform_now(events.as_json)
end
end
end
end
Expand Down
14 changes: 8 additions & 6 deletions spec/dfe/analytics/load_entities_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
# autogenerate a compliant blocklist
allow(DfE::Analytics).to receive(:blocklist).and_return(DfE::Analytics::Fields.generate_blocklist)

allow(DfE::Analytics::SendEvents).to receive(:perform_later)
allow(DfE::Analytics::SendEvents).to receive(:perform_now)

DfE::Analytics.initialize!
end
Expand All @@ -37,7 +37,8 @@

described_class.new(entity_name: Candidate.table_name).run

expect(DfE::Analytics::SendEvents).to have_received(:perform_later).twice do |payload|
# import process
expect(DfE::Analytics::SendEvents).to have_received(:perform_now).once do |payload|
schema = DfE::Analytics::EventSchema.new.as_json
schema_validator = JSONSchemaValidator.new(schema, payload.first)

Expand All @@ -50,11 +51,12 @@
end

it 'can work in batches' do
Candidate.create
Candidate.create
stub_const('DfE::Analytics::LoadEntities::BQ_BATCH_ROWS', 2)

described_class.new(entity_name: Candidate.table_name, batch_size: 2).run
3.times { Candidate.create }

expect(DfE::Analytics::SendEvents).to have_received(:perform_later).exactly(3).times
described_class.new(entity_name: Candidate.table_name).run

expect(DfE::Analytics::SendEvents).to have_received(:perform_now).exactly(2).times
end
end
36 changes: 27 additions & 9 deletions spec/dfe/analytics/load_entity_batch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,48 @@
end

before do
allow(DfE::Analytics::SendEvents).to receive(:perform_later)
allow(DfE::Analytics::SendEvents).to receive(:perform_now)

allow(DfE::Analytics).to receive(:allowlist).and_return({
Candidate.table_name.to_sym => ['email_address']
})
end

around do |ex|
perform_enqueued_jobs do
ex.run
end
it 'splits a batch when the batch is too big' do
c = Candidate.create(email_address: '12345678910')
c2 = Candidate.create(email_address: '12345678910')
stub_const('DfE::Analytics::LoadEntityBatch::BQ_BATCH_MAX_BYTES', 250)

described_class.perform_now('Candidate', [c.id, c2.id])

expect(DfE::Analytics::SendEvents).to have_received(:perform_now).twice
end

it 'doesn’t split a batch unless it has to' do
c = Candidate.create(email_address: '12345678910')
c2 = Candidate.create(email_address: '12345678910')
stub_const('DfE::Analytics::LoadEntityBatch::BQ_BATCH_MAX_BYTES', 500)

described_class.perform_now('Candidate', [c.id, c2.id])

expect(DfE::Analytics::SendEvents).to have_received(:perform_now).once
end

it 'accepts its first arg as a String to support Rails < 6.1' do
# Rails 6.1rc1 added support for deserializing Class and Module params
c = Candidate.create(email_address: 'foo@example.com')

described_class.perform_later('Candidate', [c.id], 1)
described_class.perform_now('Candidate', [c.id])

expect(DfE::Analytics::SendEvents).to have_received(:perform_later).once
expect(DfE::Analytics::SendEvents).to have_received(:perform_now).once
end

it 'accepts its first arg as a Class' do
# backwards compatability with existing enqueued jobs
c = Candidate.create(email_address: 'foo@example.com')

described_class.perform_later(Candidate, [c.id], 1)
described_class.perform_now(Candidate, [c.id])

expect(DfE::Analytics::SendEvents).to have_received(:perform_later).once
expect(DfE::Analytics::SendEvents).to have_received(:perform_now).once
end
end

0 comments on commit d40d0ff

Please sign in to comment.