diff --git a/lib/strategy/base.rb b/lib/strategy/base.rb index d2d2891..7110a50 100644 --- a/lib/strategy/base.rb +++ b/lib/strategy/base.rb @@ -114,14 +114,16 @@ def process def process_table progress index = 0 - source_table_limited.each do |record| - index += 1 - begin - process_record_if index, record - rescue => exception - @errors.log_error record, exception + source_table.transaction do + source_table_limited.each do |record| + index += 1 + begin + process_record_if index, record + rescue => exception + @errors.log_error record, exception + end + progress.show index end - progress.show index end end @@ -129,14 +131,16 @@ def process_table_in_batches progress logger.info "Processing table #{@name} records in batch size of #{@batch_size}" index = 0 - source_table_limited.find_each(:batch_size => @batch_size) do |record| - index += 1 - begin - process_record_if index, record - rescue => exception - @errors.log_error record, exception + source_table.transaction do + source_table_limited.find_each(:batch_size => @batch_size) do |record| + index += 1 + begin + process_record_if index, record + rescue => exception + @errors.log_error record, exception + end + progress.show index end - progress.show index end end @@ -146,25 +150,27 @@ def process_table_in_threads progress index = 0 threads = [] - source_table.find_in_batches(batch_size: @batch_size) do |records| - until threads.count(&:alive?) <= @thread_num - thr = threads.delete_at 0 - thr.join - progress.show index - end + source_table.transaction do + source_table.find_in_batches(batch_size: @batch_size) do |records| + until threads.count(&:alive?) <= @thread_num + thr = threads.delete_at 0 + thr.join + progress.show index + end - thr = Thread.new { - records.each do |record| - begin - process_record_if index, record - index += 1 - rescue => exception - puts exception.inspect - @errors.log_error record, exception + thr = Thread.new { + records.each do |record| + begin + process_record_if index, record + index += 1 + rescue => exception + puts exception.inspect + @errors.log_error record, exception + end end - end - } - threads << thr + } + threads << thr + end end until threads.empty?