Skip to content

Commit

Permalink
StreamingRunner (improved version with config-based opt-in) (#44)
Browse files Browse the repository at this point in the history
Introduce new StreamingRunner (#44)
  • Loading branch information
thbar authored Dec 25, 2017
1 parent efd0d7b commit 8e1b8ab
Show file tree
Hide file tree
Showing 11 changed files with 142 additions and 9 deletions.
11 changes: 10 additions & 1 deletion lib/kiba.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,15 @@
require 'kiba/context'
require 'kiba/parser'
require 'kiba/runner'
require 'kiba/streaming_runner'
require 'kiba/dsl_extensions/config'

Kiba.extend(Kiba::Parser)
Kiba.extend(Kiba::Runner)

module Kiba
def self.run(job)
# NOTE: use Hash#dig when Ruby 2.2 reaches EOL
runner = job.config.fetch(:kiba, {}).fetch(:runner, Kiba::Runner)
runner.run(job)
end
end
4 changes: 4 additions & 0 deletions lib/kiba/control.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ class Control
def pre_processes
@pre_processes ||= []
end

def config
@config ||= {}
end

def sources
@sources ||= []
Expand Down
9 changes: 9 additions & 0 deletions lib/kiba/dsl_extensions/config.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
module Kiba
module DSLExtensions
module Config
def config(context, context_config)
(@control.config[context] ||= {}).merge!(context_config)
end
end
end
end
2 changes: 2 additions & 0 deletions lib/kiba/runner.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
module Kiba
module Runner
extend self

# allow to handle a block form just like a regular transform
class AliasingProc < Proc
alias_method :process, :call
Expand Down
33 changes: 33 additions & 0 deletions lib/kiba/streaming_runner.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
module Kiba
module StreamingRunner
include Runner
extend self

def transform_stream(stream, t)
Enumerator.new do |y|
stream.each do |input_row|
returned_row = t.process(input_row) do |yielded_row|
y << yielded_row
end
y << returned_row if returned_row
end
end
end

def source_stream(sources)
Enumerator.new do |y|
sources.each do |source|
source.each { |r| y << r }
end
end
end

def process_rows(sources, transforms, destinations)
stream = source_stream(sources)
recurser = lambda { |stream,t| transform_stream(stream, t) }
transforms.inject(stream, &recurser).each do |r|
destinations.each { |d| d.write(r) }
end
end
end
end
7 changes: 7 additions & 0 deletions test/common/runner.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
require 'minitest/mock'
require_relative '../support/test_enumerable_source'

module SharedRunnerTests
def kiba_run(job)
Kiba.run(job)
end

def rows
@rows ||= [
{ identifier: 'first-row' },
Expand Down
9 changes: 9 additions & 0 deletions test/support/test_array_destination.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
class TestArrayDestination
def initialize(array)
@array = array
end

def write(row)
@array << row
end
end
8 changes: 8 additions & 0 deletions test/support/test_yielding_transform.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
class TestYieldingTransform
def process(row)
row.fetch(:tags).each do |value|
yield({item: value})
end
{item: "classic-return-value"}
end
end
27 changes: 27 additions & 0 deletions test/test_parser.rb
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,31 @@ def common_source_declaration
ensure
remove_files('test/tmp/etl-common.rb', 'test/tmp/etl-main.rb')
end

def test_config
control = Kiba.parse do
extend Kiba::DSLExtensions::Config

config :context, key: "value", other_key: "other_value"
end

assert_equal({ context: {
key: "value",
other_key: "other_value"
}}, control.config)
end

def test_config_override
control = Kiba.parse do
extend Kiba::DSLExtensions::Config

config :context, key: "value", other_key: "other_value"
config :context, key: "new_value"
end

assert_equal({ context: {
key: "new_value",
other_key: "other_value"
}}, control.config)
end
end
8 changes: 0 additions & 8 deletions test/test_runner.rb
Original file line number Diff line number Diff line change
@@ -1,14 +1,6 @@
require_relative 'helper'
require 'minitest/mock'
require_relative 'support/test_enumerable_source'
require_relative 'common/runner'

class TestRunner < Kiba::Test
include SharedRunnerTests

def kiba_run(job)
runner = Object.new
runner.extend(Kiba::Runner)
runner.run(job)
end
end
33 changes: 33 additions & 0 deletions test/test_streaming_runner.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
require_relative 'helper'
require_relative 'support/test_enumerable_source'
require_relative 'support/test_array_destination'
require_relative 'support/test_yielding_transform'
require_relative 'common/runner'

class TestStreamingRunner < Kiba::Test
include SharedRunnerTests

def test_yielding_class_transform
input_row = {tags: ["one", "two", "three"]}
destination_array = []

job = Kiba.parse do
extend Kiba::DSLExtensions::Config

config :kiba, runner: Kiba::StreamingRunner

source TestEnumerableSource, [input_row]
transform TestYieldingTransform
destination TestArrayDestination, destination_array
end

kiba_run(job)

assert_equal [
{item: 'one'},
{item: 'two'},
{item: 'three'},
{item: 'classic-return-value'}
], destination_array
end
end

0 comments on commit 8e1b8ab

Please sign in to comment.