Introduce new StreamingRunner #44

Merged Dec 25, 2017 (26 commits)
2d09468
Add test ArrayDestination
thbar Dec 22, 2017
ca31e00
Implement YieldingTransform tests
thbar Dec 22, 2017
d30cc0b
Implement YieldingRunner
thbar Dec 22, 2017
e389a32
Attempt to fix JRuby 1.7 build
thbar Dec 22, 2017
55694c6
Implement a mix-inspired config mechanism
thbar Dec 22, 2017
c79c326
Implement config-based runner choice
thbar Dec 22, 2017
aff7a29
Use config-based engine choice
thbar Dec 22, 2017
9b3d107
Clean up tests
thbar Dec 22, 2017
04126fd
Merge branch 'master' into yielding-runner
thbar Dec 23, 2017
749bb94
Merge branch 'master' into yielding-runner
thbar Dec 23, 2017
d27fa6a
Merge branch 'master' into yielding-runner
thbar Dec 23, 2017
9f5e126
Remove close (already called by regular runner)
thbar Dec 23, 2017
2694981
Extract method
thbar Dec 23, 2017
1b209c7
Add minitest-focus to development dependencies
thbar Dec 24, 2017
97418ac
Rename for clarity
thbar Dec 24, 2017
4e31d88
Group code on one line
thbar Dec 24, 2017
48b8cbd
Rewrite in a slightly more functional fashion
thbar Dec 24, 2017
66a4371
Rename for clarity
thbar Dec 24, 2017
4939cab
Remove duplicate lazy invokation
thbar Dec 24, 2017
1190565
Rename YieldingRunner to StreamingRunner
thbar Dec 24, 2017
d230720
Make config access opt-in
thbar Dec 24, 2017
9435737
Use merge! for more concise code
thbar Dec 24, 2017
8d6761c
Use Enumerator instead of Enumerator::Lazy
thbar Dec 24, 2017
96839b4
Remove JRUBY_OPTS
thbar Dec 24, 2017
797a7a9
Merge branch 'master' into yielding-runner
thbar Dec 24, 2017
627dbf2
Remove whitespace
thbar Dec 24, 2017
11 changes: 10 additions & 1 deletion lib/kiba.rb
@@ -5,6 +5,15 @@
require 'kiba/context'
require 'kiba/parser'
require 'kiba/runner'
require 'kiba/streaming_runner'
require 'kiba/dsl_extensions/config'

Kiba.extend(Kiba::Parser)
Kiba.extend(Kiba::Runner)

module Kiba
  def self.run(job)
    # NOTE: use Hash#dig when Ruby 2.2 reaches EOL
    runner = job.config.fetch(:kiba, {}).fetch(:runner, Kiba::Runner)
    runner.run(job)
  end
end
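
Note on the runner lookup above: the nested `fetch` falls back to the default runner when either the `:kiba` context or its `:runner` key is missing. The following standalone sketch (not part of this PR) illustrates that shape and the `Hash#dig` form the NOTE alludes to, which is available from Ruby 2.3. The method names `pick_runner` and `pick_runner_with_dig` and the symbols standing in for runner modules are assumptions made for illustration only.

```ruby
# Standalone sketch: symbols stand in for Kiba::Runner / Kiba::StreamingRunner,
# so this runs without the kiba gem. Method names are hypothetical.

# Same lookup shape as Kiba.run: nested fetch with a default at each level.
def pick_runner(config, default)
  config.fetch(:kiba, {}).fetch(:runner, default)
end

# Hash#dig variant (Ruby 2.3+). Unlike fetch, `||` also replaces an
# explicitly stored nil or false value with the default.
def pick_runner_with_dig(config, default)
  config.dig(:kiba, :runner) || default
end

no_config     = pick_runner({}, :default_runner)
other_key     = pick_runner({ kiba: { other: 1 } }, :default_runner)
explicit      = pick_runner({ kiba: { runner: :streaming_runner } }, :default_runner)
explicit_dig  = pick_runner_with_dig({ kiba: { runner: :streaming_runner } }, :default_runner)
no_config_dig = pick_runner_with_dig({}, :default_runner)
```

Both forms agree whenever the stored runner is truthy, which should hold for any real runner module.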
4 changes: 4 additions & 0 deletions lib/kiba/control.rb
@@ -3,6 +3,10 @@ class Control
    def pre_processes
      @pre_processes ||= []
    end

    def config
      @config ||= {}
    end

    def sources
      @sources ||= []
9 changes: 9 additions & 0 deletions lib/kiba/dsl_extensions/config.rb
@@ -0,0 +1,9 @@
module Kiba
  module DSLExtensions
    module Config
      def config(context, context_config)
        (@control.config[context] ||= {}).merge!(context_config)
      end
    end
  end
end
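
Note on the merge behaviour above: each call merges into the hash stored under its context key, so repeated calls for one context override individual keys while other contexts stay untouched. A minimal standalone sketch (not part of this PR), where `FakeControl` and `FakeContext` are hypothetical stand-ins for `Kiba::Control` and the DSL context that holds `@control`:

```ruby
# Standalone sketch: FakeControl and FakeContext are hypothetical stand-ins,
# so this runs without the kiba gem.
class FakeControl
  def config
    @config ||= {}
  end
end

class FakeContext
  def initialize(control)
    @control = control
  end

  # Same body as Kiba::DSLExtensions::Config#config in this PR.
  def config(context, context_config)
    (@control.config[context] ||= {}).merge!(context_config)
  end
end

control = FakeControl.new
dsl = FakeContext.new(control)

dsl.config :kiba, runner: :streaming, verbose: true
dsl.config :kiba, verbose: false      # overrides one key, keeps :runner
dsl.config :my_extension, retries: 3  # separate context, separate hash

merged = control.config
```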
2 changes: 2 additions & 0 deletions lib/kiba/runner.rb
@@ -1,5 +1,7 @@
module Kiba
  module Runner
    extend self

    # allow to handle a block form just like a regular transform
    class AliasingProc < Proc
      alias_method :process, :call
33 changes: 33 additions & 0 deletions lib/kiba/streaming_runner.rb
@@ -0,0 +1,33 @@
module Kiba
  module StreamingRunner
    include Runner
    extend self

    def transform_stream(stream, t)
      Enumerator.new do |y|
        stream.each do |input_row|
          returned_row = t.process(input_row) do |yielded_row|
            y << yielded_row
          end
          y << returned_row if returned_row
        end
      end
    end

    def source_stream(sources)
      Enumerator.new do |y|
        sources.each do |source|
          source.each { |r| y << r }
        end
      end
    end

    def process_rows(sources, transforms, destinations)
      stream = source_stream(sources)
      recurser = lambda { |stream,t| transform_stream(stream, t) }
      transforms.inject(stream, &recurser).each do |r|
        destinations.each { |d| d.write(r) }
      end
    end
  end
end
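
Note on how the chaining above works: `inject` wraps each transform around the previous stream, so a row yielded by one transform is pushed straight into the next one instead of being collected into an intermediate array. The sketch below (not part of this PR) reproduces the `transform_stream` idea outside Kiba. `SplitTags` and `Upcase` are hypothetical transforms invented for illustration, and a plain array stands in for the sources.

```ruby
# Standalone sketch: runs without the kiba gem. SplitTags and Upcase are
# hypothetical transforms used only to exercise the chaining.
class SplitTags
  def process(row)
    row.fetch(:tags).each { |tag| yield({ item: tag }) }
    nil # nil return value: nothing extra to emit for this input row
  end
end

Upcase = Struct.new(:key) do
  # A classic transform: returns one row and never yields.
  def process(row)
    row.merge(key => row.fetch(key).upcase)
  end
end

# Wraps one transform around a stream, emitting yielded rows first and then
# the return value when it is not nil/false.
def transform_stream(stream, transform)
  Enumerator.new do |y|
    stream.each do |input_row|
      returned_row = transform.process(input_row) { |yielded_row| y << yielded_row }
      y << returned_row if returned_row
    end
  end
end

source = [{ tags: %w[one two] }, { tags: %w[three] }]
transforms = [SplitTags.new, Upcase.new(:item)]

# Fold the transforms over the source stream, as process_rows does.
pipeline = transforms.inject(source.each) { |stream, t| transform_stream(stream, t) }
result = pipeline.to_a
```

Here `result` holds three rows, one per tag, each already upcased by the second transform.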
7 changes: 7 additions & 0 deletions test/common/runner.rb
@@ -1,4 +1,11 @@
require 'minitest/mock'
require_relative '../support/test_enumerable_source'

module SharedRunnerTests
  def kiba_run(job)
    Kiba.run(job)
  end

  def rows
    @rows ||= [
      { identifier: 'first-row' },
9 changes: 9 additions & 0 deletions test/support/test_array_destination.rb
@@ -0,0 +1,9 @@
class TestArrayDestination
  def initialize(array)
    @array = array
  end

  def write(row)
    @array << row
  end
end
8 changes: 8 additions & 0 deletions test/support/test_yielding_transform.rb
@@ -0,0 +1,8 @@
class TestYieldingTransform
  def process(row)
    row.fetch(:tags).each do |value|
      yield({item: value})
    end
    {item: "classic-return-value"}
  end
end
27 changes: 27 additions & 0 deletions test/test_parser.rb
@@ -100,4 +100,31 @@ def common_source_declaration
  ensure
    remove_files('test/tmp/etl-common.rb', 'test/tmp/etl-main.rb')
  end

  def test_config
    control = Kiba.parse do
      extend Kiba::DSLExtensions::Config

      config :context, key: "value", other_key: "other_value"
    end

    assert_equal({ context: {
      key: "value",
      other_key: "other_value"
    }}, control.config)
  end

  def test_config_override
    control = Kiba.parse do
      extend Kiba::DSLExtensions::Config

      config :context, key: "value", other_key: "other_value"
      config :context, key: "new_value"
    end

    assert_equal({ context: {
      key: "new_value",
      other_key: "other_value"
    }}, control.config)
  end
end
8 changes: 0 additions & 8 deletions test/test_runner.rb
@@ -1,14 +1,6 @@
require_relative 'helper'
require 'minitest/mock'
require_relative 'support/test_enumerable_source'
require_relative 'common/runner'

class TestRunner < Kiba::Test
  include SharedRunnerTests

  def kiba_run(job)
    runner = Object.new
    runner.extend(Kiba::Runner)
    runner.run(job)
  end
end
33 changes: 33 additions & 0 deletions test/test_streaming_runner.rb
@@ -0,0 +1,33 @@
require_relative 'helper'
require_relative 'support/test_enumerable_source'
require_relative 'support/test_array_destination'
require_relative 'support/test_yielding_transform'
require_relative 'common/runner'

class TestStreamingRunner < Kiba::Test
  include SharedRunnerTests

  def test_yielding_class_transform
    input_row = {tags: ["one", "two", "three"]}
    destination_array = []

    job = Kiba.parse do
      extend Kiba::DSLExtensions::Config

      config :kiba, runner: Kiba::StreamingRunner

      source TestEnumerableSource, [input_row]
      transform TestYieldingTransform
      destination TestArrayDestination, destination_array
    end

    kiba_run(job)

    assert_equal [
      {item: 'one'},
      {item: 'two'},
      {item: 'three'},
      {item: 'classic-return-value'}
    ], destination_array
  end
end