Skip to content

Commit

Permalink
Merge pull request #18 from onemedical/update_to_use_import
Browse files Browse the repository at this point in the history
Adding support for mysql upsert via AR import
  • Loading branch information
mitch-lindsay authored Aug 17, 2021
2 parents 929fd7f + dfac3cf commit b827696
Show file tree
Hide file tree
Showing 14 changed files with 328 additions and 179 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
Gemfile.lock

.idea/*
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,22 @@ ActsAsScrubbable.configure do |c|
end
end
```

### UPDATE VS UPSERT

By default, the scrubbing proces will run independent UPDATE statements for each scrubbed model. This can be time
consuming if you are scrubbing a large number of records. As an alternative, some
databases support doing bulk database updates using an upsert using the INSERT command. `activerecord-import`
gives us easy support for this and as such it is a requirement to using
this feature. Some details about the implementation can be found here. https://github.com/zdennis/activerecord-import#duplicate-key-update

Note that we only support the MySQL implementation at this time.

To use UPSERT over UPDATE, it can be enabled by specifying the environment variable `USE_UPSERT='true'` or through configuration.

```ruby
ActsAsScrubbable.configure do |c|
c.use_upsert = true
end
```

67 changes: 42 additions & 25 deletions lib/acts_as_scrubbable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,59 +2,76 @@
require 'active_record/version'
require 'active_support/core_ext/module'
require 'acts_as_scrubbable/tasks'

require 'term/ansicolor'
require 'logger'

module ActsAsScrubbable
extend self
extend ActiveSupport::Autoload
include Term::ANSIColor

autoload :Scrubbable
autoload :Scrub
autoload :VERSION

attr_accessor :use_upsert

class << self
def configure(&block)
self.use_upsert = ENV["USE_UPSERT"] == "true"

def self.configure(&block)
yield self
yield self
end
end

def self.after_hook(&block)
def after_hook(&block)
@after_hook = block
end

def self.execute_after_hook
def execute_after_hook
@after_hook.call if @after_hook
end

def self.add(key, value)
def logger
@logger ||= begin
loggger = Logger.new($stdout)
loggger.formatter = proc do |severity, datetime, progname, msg|
"#{datetime}: [#{severity}] - #{msg}\n"
end
loggger
end
end

def add(key, value)
ActsAsScrubbable.scrub_map[key] = value
end

def self.scrub_map
def scrub_map
require 'faker'

@_scrub_map ||= {
:first_name => -> { Faker::Name.first_name },
:last_name => -> { Faker::Name.last_name },
:middle_name => -> { Faker::Name.name },
:name => -> { Faker::Name.name },
:email => -> { Faker::Internet.email },
:name_title => -> { defined? Faker::Job ? Faker::Job.title : Faker::Name.title },
:company_name => -> { Faker::Company.name },
:street_address => -> { Faker::Address.street_address },
:first_name => -> { Faker::Name.first_name },
:last_name => -> { Faker::Name.last_name },
:middle_name => -> { Faker::Name.name },
:name => -> { Faker::Name.name },
:email => -> { Faker::Internet.email },
:name_title => -> { defined? Faker::Job ? Faker::Job.title : Faker::Name.title },
:company_name => -> { Faker::Company.name },
:street_address => -> { Faker::Address.street_address },
:secondary_address => -> { Faker::Address.secondary_address },
:zip_code => -> { Faker::Address.zip_code },
:state_abbr => -> { Faker::Address.state_abbr },
:state => -> { Faker::Address.state },
:city => -> { Faker::Address.city },
:latitude => -> { Faker::Address.latitude },
:longitude => -> { Faker::Address.longitude },
:username => -> { Faker::Internet.user_name },
:boolean => -> { [true, false ].sample },
:school => -> { Faker::University.name }
:zip_code => -> { Faker::Address.zip_code },
:state_abbr => -> { Faker::Address.state_abbr },
:state => -> { Faker::Address.state },
:city => -> { Faker::Address.city },
:latitude => -> { Faker::Address.latitude },
:longitude => -> { Faker::Address.longitude },
:username => -> { Faker::Internet.user_name },
:boolean => -> { [true, false].sample },
:school => -> { Faker::University.name }
}
end
end


ActiveSupport.on_load(:active_record) do
extend ActsAsScrubbable::Scrubbable
end
36 changes: 36 additions & 0 deletions lib/acts_as_scrubbable/ar_class_processor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
require 'acts_as_scrubbable/import_processor'
require 'acts_as_scrubbable/update_processor'
require 'term/ansicolor'

module ActsAsScrubbable
class ArClassProcessor

attr_reader :ar_class, :query_processor

def initialize(ar_class)
@ar_class = ar_class

if ActsAsScrubbable.use_upsert
ActsAsScrubbable.logger.info Term::ANSIColor.white("Using Upsert")
@query_processor = ImportProcessor.new(ar_class)
else
ActsAsScrubbable.logger.info Term::ANSIColor.white("Using Update")
@query_processor = UpdateProcessor.new(ar_class)
end
end

def process(num_of_batches)
ActsAsScrubbable.logger.info Term::ANSIColor.green("Scrubbing #{ar_class} ...")

num_of_batches = Integer(ENV.fetch("SCRUB_BATCHES", "256")) if num_of_batches.nil?
scrubbed_count = ActsAsScrubbable::ParallelTableScrubber.new(ar_class, num_of_batches).each_query do |query|
query_processor.scrub_query(query)
end

ActsAsScrubbable.logger.info Term::ANSIColor.blue("#{scrubbed_count} #{ar_class} objects scrubbed")
ActiveRecord::Base.connection.verify!

ActsAsScrubbable.logger.info Term::ANSIColor.white("Scrub Complete!")
end
end
end
28 changes: 28 additions & 0 deletions lib/acts_as_scrubbable/base_processor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
module ActsAsScrubbable
module BaseProcessor
attr_reader :ar_class
private :ar_class

def initialize(ar_class)
@ar_class = ar_class
end

def scrub_query(query = nil)
scrubbed_count = 0
ActiveRecord::Base.connection_pool.with_connection do
if ar_class.respond_to?(:scrubbable_scope)
relation = ar_class.send(:scrubbable_scope)
else
relation = ar_class.all
end

relation.where(query).find_in_batches(batch_size: 1000) do |batch|
ActiveRecord::Base.transaction do
scrubbed_count += handle_batch(batch)
end
end
end
scrubbed_count
end
end
end
24 changes: 24 additions & 0 deletions lib/acts_as_scrubbable/import_processor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
require 'acts_as_scrubbable/base_processor'

module ActsAsScrubbable
class ImportProcessor
include BaseProcessor

private
def handle_batch(batch)
scrubbed_count = 0
batch.each do |obj|
_updates = obj.scrubbed_values
obj.assign_attributes(_updates)
scrubbed_count += 1
end
ar_class.import(
batch,
on_duplicate_key_update: ar_class.scrubbable_fields.keys.map { |x| "#{x} = values(#{x})" }.join(" , "),
validate: false,
timestamps: false
)
scrubbed_count
end
end
end
40 changes: 14 additions & 26 deletions lib/acts_as_scrubbable/parallel_table_scrubber.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,28 @@

module ActsAsScrubbable
class ParallelTableScrubber
def initialize(ar_class)
attr_reader :ar_class, :num_of_batches
private :ar_class, :num_of_batches

def initialize(ar_class, num_of_batches)
@ar_class = ar_class
@num_of_batches = num_of_batches
end

def scrub(num_batches:)
def each_query
# Removing any find or initialize callbacks from model
ar_class.reset_callbacks(:initialize)
ar_class.reset_callbacks(:find)

queries = parallel_queries(ar_class: ar_class, num_batches: num_batches)
scrubbed_count = Parallel.map(queries) { |query|
scrubbed_count = 0
ActiveRecord::Base.connection_pool.with_connection do
relation = ar_class
relation = relation.send(:scrubbable_scope) if ar_class.respond_to?(:scrubbable_scope)
relation.where(query).find_in_batches(batch_size: 1000) do |batch|
ActiveRecord::Base.transaction do
batch.each do |obj|
obj.scrub!
scrubbed_count += 1
end
end
end
end
scrubbed_count
}.reduce(:+)
Parallel.map(parallel_queries) { |query|
yield(query)
}.reduce(:+) # returns the aggregated scrub count
end

private

attr_reader :ar_class

# create even ID ranges for the table
def parallel_queries(ar_class:, num_batches:)
def parallel_queries
raise "Model is missing id column" if ar_class.columns.none? { |column| column.name == "id" }

if ar_class.respond_to?(:scrubbable_scope)
Expand All @@ -45,22 +33,22 @@ def parallel_queries(ar_class:, num_batches:)
end
return [] if num_records == 0 # no records to import

record_window_size, modulus = num_records.divmod(num_batches)
record_window_size, modulus = num_records.divmod(num_of_batches)
if record_window_size < 1
record_window_size = 1
modulus = 0
end

start_id = next_id(ar_class: ar_class, offset: 0)
queries = num_batches.times.each_with_object([]) do |_, queries|
queries = num_of_batches.times.each_with_object([]) do |_, queries|
next unless start_id

end_id = next_id(ar_class: ar_class, id: start_id, offset: record_window_size-1)
end_id = next_id(ar_class: ar_class, id: start_id, offset: record_window_size - 1)
if modulus > 0
end_id = next_id(ar_class: ar_class, id: end_id)
modulus -= 1
end
queries << {id: start_id..end_id} if end_id
queries << { id: start_id..end_id } if end_id
start_id = next_id(ar_class: ar_class, id: end_id || start_id)
end

Expand Down
4 changes: 2 additions & 2 deletions lib/acts_as_scrubbable/scrub.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module ActsAsScrubbable
module Scrub

def scrub!
def scrubbed_values
return unless self.class.scrubbable?

run_callbacks(:scrub) do
Expand All @@ -20,7 +20,7 @@ def scrub!
end
end

self.update_columns(_updates) unless _updates.empty?
_updates
end
end
end
Expand Down
66 changes: 66 additions & 0 deletions lib/acts_as_scrubbable/task_runner.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
require 'acts_as_scrubbable/parallel_table_scrubber'
require 'highline/import'
require 'acts_as_scrubbable/ar_class_processor'
require 'term/ansicolor'

module ActsAsScrubbable
class TaskRunner
attr_reader :ar_classes
private :ar_classes

def initialize
@ar_classes = []
end

def prompt_db_configuration
db_host = ActiveRecord::Base.connection_config[:host]
db_name = ActiveRecord::Base.connection_config[:database]

ActsAsScrubbable.logger.warn Term::ANSIColor.red("Please verify the information below to continue")
ActsAsScrubbable.logger.warn Term::ANSIColor.red("Host: ") + Term::ANSIColor.white(" #{db_host}")
ActsAsScrubbable.logger.warn Term::ANSIColor.red("Database: ") + Term::ANSIColor.white("#{db_name}")
end

def confirmed_configuration?
db_host = ActiveRecord::Base.connection_config[:host]

unless ENV["SKIP_CONFIRM"] == "true"
answer = ask("Type '#{db_host}' to continue. \n".red + '-> '.white)
unless answer == db_host
ActsAsScrubbable.logger.error Term::ANSIColor.red("exiting ...")
return false
end
end
true
end

def extract_ar_classes
Rails.application.eager_load! # make sure all the classes are loaded
@ar_classes = ActiveRecord::Base.descendants.select { |d| d.scrubbable? }.sort_by { |d| d.to_s }

if ENV["SCRUB_CLASSES"].present?
class_list = ENV["SCRUB_CLASSES"].split(",")
class_list = class_list.map { |_class_str| _class_str.constantize }
@ar_classes = ar_classes & class_list
end
end

def set_ar_class(ar_class)
ar_classes << ar_class
end

def scrub(num_of_batches: nil)
Parallel.each(ar_classes) do |ar_class|
ActsAsScrubbable::ArClassProcessor.new(ar_class).process(num_of_batches)
end
ActiveRecord::Base.connection.verify!
end

def after_hooks
if ENV["SKIP_AFTERHOOK"].blank?
ActsAsScrubbable.logger.info Term::ANSIColor.red("Running after hook")
ActsAsScrubbable.execute_after_hook
end
end
end
end
Loading

0 comments on commit b827696

Please sign in to comment.