Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Yielding Runner #41

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
language: ruby
env:
- JRUBY_OPTS=--2.0
before_install:
- gem install bundler
- gem update bundler
rvm:
- 2.4.0
Expand Down
30 changes: 30 additions & 0 deletions benchmark/csv_process.etl
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
$LOAD_PATH << File.dirname(__FILE__) + '/../test'

require 'support/test_csv_source'
require 'support/test_csv_destination'

pre_process do
Dir.chdir(File.expand_path(File.dirname(__FILE__))) do
file = 'sirene_2017111_E_Q.zip'
unless File.exist?(file)
puts "Downloading data file"
`axel -n 10 http://files.data.gouv.fr/sirene/#{file}`
`unzip #{file}`
`head -10000 #{file} > extract.csv`
puts "Please restart"
abort
end
end
end

source TestCsvSource, 'benchmark/extract.csv', col_sep: ';', encoding: "ISO-8859-1:UTF-8"

@count = 0

transform do |row|
@count += 1
if @count % 1000 == 0
puts "Read #{@count} rows so far"
end
row
end
1 change: 1 addition & 0 deletions lib/kiba.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
require 'kiba/context'
require 'kiba/parser'
require 'kiba/runner'
require 'kiba/alternate_runner'

Kiba.extend(Kiba::Parser)
Kiba.extend(Kiba::Runner)
85 changes: 85 additions & 0 deletions lib/kiba/alternate_runner.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
module Kiba
# WIP
module AlternateRunner
# allow to handle a block form just like a regular transform
class AliasingProc < Proc
alias_method :process, :call
end

def run(control)
# TODO: add a dry-run (not instantiating mode) to_instances call
# that will validate the job definition from a syntax pov before
# going any further. This could be shared with the parser.
run_pre_processes(control)
process_rows(
to_instances(control.sources),
to_instances(control.transforms, true),
to_instances(control.destinations)
)
# TODO: when I add post processes as class, I'll have to add a test to
# make sure instantiation occurs after the main processing is done (#16)
run_post_processes(control)
end

def run_pre_processes(control)
to_instances(control.pre_processes, true, false).each(&:call)
end

def run_post_processes(control)
to_instances(control.post_processes, true, false).each(&:call)
end

def lazy_transform(from, t)
Enumerator::Lazy.new(from) do |yielder, input_row|
final_row = t.process(input_row) do |yielded_row|
yielder << yielded_row
end
yielder << final_row if final_row
end
end

def process_rows(sources, transforms, destinations)
source_rows = Enumerator::Lazy.new(sources) do |yielder, source|
source.each do |row|
yielder << row
end
end

rows = source_rows.lazy

transforms.each do |transform|
rows = lazy_transform(rows, transform)
end

rows.each do |row|
destinations.each do |destination|
destination.write(row)
end
end
destinations.find_all { |d| d.respond_to?(:close) }.each(&:close)
end

# not using keyword args because JRuby defaults to 1.9 syntax currently
def to_instances(definitions, allow_block = false, allow_class = true)
definitions.map do |definition|
to_instance(
*definition.values_at(:klass, :args, :block),
allow_block, allow_class
)
end
end

def to_instance(klass, args, block, allow_block, allow_class)
if klass
fail 'Class form is not allowed here' unless allow_class
klass.new(*args)
elsif block
fail 'Block form is not allowed here' unless allow_block
AliasingProc.new(&block)
else
# TODO: support block passing to a class form definition?
fail 'Class and block form cannot be used together at the moment'
end
end
end
end
21 changes: 19 additions & 2 deletions lib/kiba/cli.rb
Original file line number Diff line number Diff line change
@@ -1,16 +1,33 @@
require 'kiba'
require 'optparse'

module Kiba
class Cli
def self.run(args)
options = {}
OptionParser.new do |opts|
opts.banner = "Usage: kiba your-script.etl [options]"
opts.on("-r", "--runner [RUNNER_CLASS]", "Specify Kiba runner class") do |runner|
options[:runner] = runner
end
end.parse!(args)

unless args.size == 1
puts 'Syntax: kiba your-script.etl'
puts 'Usage: kiba your-script.etl'
exit(-1)
end
filename = args[0]
script_content = IO.read(filename)
job_definition = Kiba.parse(script_content, filename)
Kiba.run(job_definition)
runner_instance(options).run(job_definition)
end

def self.runner_instance(options)
runner_class = options[:runner]
runner_class = runner_class ? Object.const_get(runner_class) : Kiba::Runner
runner = Object.new
runner.extend(runner_class)
runner
end
end
end
26 changes: 26 additions & 0 deletions test/common/runner.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
require 'benchmark'

module SharedRunnerTests
def rows
@rows ||= [
Expand Down Expand Up @@ -112,4 +114,28 @@ def test_use_next_to_exit_early_from_block_transform
# and the second row should have been reformatted
assert_equal [{new_identifier: 'second-row'}], @remaining_rows
end

def test_benchmark
rows = []
job = Kiba.parse do
source TestEnumerableSource, (1..100_000)

transform do |row|
{item: row}
end

20.times do
transform do |row|
{item: row.fetch(:item) + 1}
end
end

destination TestArrayDestination, rows
end
time = Benchmark.measure do
kiba_run(job)
end.real
puts "\n#{self} : #{time.real.round(2)} sec"
assert_equal 100_000, rows.size
end
end
9 changes: 9 additions & 0 deletions test/support/test_array_destination.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
class TestArrayDestination
def initialize(array)
@array = array
end

def write(row)
@array << row
end
end
4 changes: 2 additions & 2 deletions test/support/test_csv_source.rb
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
require 'csv'

class TestCsvSource
def initialize(input_file)
@csv = CSV.open(input_file, headers: true, header_converters: :symbol)
def initialize(input_file, csv_options = {})
@csv = CSV.open(input_file, {headers: true, header_converters: :symbol}.merge(csv_options))
end

def each
Expand Down
8 changes: 8 additions & 0 deletions test/support/test_yielding_transform.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
class TestYieldingTransform
def process(row)
row.fetch(:tags).each do |value|
yield({item: value})
end
{item: "classic-return-value"}
end
end
44 changes: 44 additions & 0 deletions test/test_alternate_runner.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
require_relative 'helper'
require 'minitest/mock'
require_relative 'support/test_enumerable_source'
require_relative 'support/test_array_destination'
require_relative 'support/test_yielding_transform'
require_relative 'common/runner'

class TestAlternateRunner < Kiba::Test
include SharedRunnerTests

def kiba_run(job)
runner = Object.new
runner.extend(Kiba::AlternateRunner)
runner.run(job)
end


def test_yielding_class_transform
input_row = {tags: ["one", "two", "three"]}

control = Kiba::Control.new
control.sources << {
klass: TestEnumerableSource,
args: [[input_row]]
}
control.transforms << {
klass: TestYieldingTransform
}
array = []
control.destinations << {
klass: TestArrayDestination,
args: [array]
}

kiba_run(control)

assert_equal [
{item: 'one'},
{item: 'two'},
{item: 'three'},
{item: 'classic-return-value'}
], array
end
end