diff --git a/lib/kiba.rb b/lib/kiba.rb index 898d36d..16c90a4 100644 --- a/lib/kiba.rb +++ b/lib/kiba.rb @@ -5,6 +5,15 @@ require 'kiba/context' require 'kiba/parser' require 'kiba/runner' +require 'kiba/streaming_runner' +require 'kiba/dsl_extensions/config' Kiba.extend(Kiba::Parser) -Kiba.extend(Kiba::Runner) + +module Kiba + def self.run(job) + # NOTE: use Hash#dig when Ruby 2.2 reaches EOL + runner = job.config.fetch(:kiba, {}).fetch(:runner, Kiba::Runner) + runner.run(job) + end +end diff --git a/lib/kiba/control.rb b/lib/kiba/control.rb index 8abfd91..dc27eee 100644 --- a/lib/kiba/control.rb +++ b/lib/kiba/control.rb @@ -3,6 +3,10 @@ class Control def pre_processes @pre_processes ||= [] end + + def config + @config ||= {} + end def sources @sources ||= [] diff --git a/lib/kiba/dsl_extensions/config.rb b/lib/kiba/dsl_extensions/config.rb new file mode 100644 index 0000000..616dbc3 --- /dev/null +++ b/lib/kiba/dsl_extensions/config.rb @@ -0,0 +1,9 @@ +module Kiba + module DSLExtensions + module Config + def config(context, context_config) + (@control.config[context] ||= {}).merge!(context_config) + end + end + end +end \ No newline at end of file diff --git a/lib/kiba/runner.rb b/lib/kiba/runner.rb index e3fab74..d66dedb 100644 --- a/lib/kiba/runner.rb +++ b/lib/kiba/runner.rb @@ -1,5 +1,7 @@ module Kiba module Runner + extend self + # allow to handle a block form just like a regular transform class AliasingProc < Proc alias_method :process, :call diff --git a/lib/kiba/streaming_runner.rb b/lib/kiba/streaming_runner.rb new file mode 100644 index 0000000..69eece8 --- /dev/null +++ b/lib/kiba/streaming_runner.rb @@ -0,0 +1,33 @@ +module Kiba + module StreamingRunner + include Runner + extend self + + def transform_stream(stream, t) + Enumerator.new do |y| + stream.each do |input_row| + returned_row = t.process(input_row) do |yielded_row| + y << yielded_row + end + y << returned_row if returned_row + end + end + end + + def source_stream(sources) + Enumerator.new do |y| + sources.each do |source| + source.each { |r| y << r } + end + end + end + + def process_rows(sources, transforms, destinations) + stream = source_stream(sources) + recurser = lambda { |stream,t| transform_stream(stream, t) } + transforms.inject(stream, &recurser).each do |r| + destinations.each { |d| d.write(r) } + end + end + end +end \ No newline at end of file diff --git a/test/common/runner.rb b/test/common/runner.rb index 1915401..c7452d5 100644 --- a/test/common/runner.rb +++ b/test/common/runner.rb @@ -1,4 +1,11 @@ +require 'minitest/mock' +require_relative '../support/test_enumerable_source' + module SharedRunnerTests + def kiba_run(job) + Kiba.run(job) + end + def rows @rows ||= [ { identifier: 'first-row' }, diff --git a/test/support/test_array_destination.rb b/test/support/test_array_destination.rb new file mode 100644 index 0000000..d4229c4 --- /dev/null +++ b/test/support/test_array_destination.rb @@ -0,0 +1,9 @@ +class TestArrayDestination + def initialize(array) + @array = array + end + + def write(row) + @array << row + end +end \ No newline at end of file diff --git a/test/support/test_yielding_transform.rb b/test/support/test_yielding_transform.rb new file mode 100644 index 0000000..04dad21 --- /dev/null +++ b/test/support/test_yielding_transform.rb @@ -0,0 +1,8 @@ +class TestYieldingTransform + def process(row) + row.fetch(:tags).each do |value| + yield({item: value}) + end + {item: "classic-return-value"} + end +end \ No newline at end of file diff --git a/test/test_parser.rb b/test/test_parser.rb index b183ebb..a69da2f 100644 --- a/test/test_parser.rb +++ b/test/test_parser.rb @@ -100,4 +100,31 @@ def common_source_declaration ensure remove_files('test/tmp/etl-common.rb', 'test/tmp/etl-main.rb') end + + def test_config + control = Kiba.parse do + extend Kiba::DSLExtensions::Config + + config :context, key: "value", other_key: "other_value" + end + + assert_equal({ context: { + key: "value", + other_key: "other_value" + }}, control.config) + end + + def test_config_override + control = Kiba.parse do + extend Kiba::DSLExtensions::Config + + config :context, key: "value", other_key: "other_value" + config :context, key: "new_value" + end + + assert_equal({ context: { + key: "new_value", + other_key: "other_value" + }}, control.config) + end end diff --git a/test/test_runner.rb b/test/test_runner.rb index 70a2096..9fc0bbf 100644 --- a/test/test_runner.rb +++ b/test/test_runner.rb @@ -1,14 +1,6 @@ require_relative 'helper' -require 'minitest/mock' -require_relative 'support/test_enumerable_source' require_relative 'common/runner' class TestRunner < Kiba::Test include SharedRunnerTests - - def kiba_run(job) - runner = Object.new - runner.extend(Kiba::Runner) - runner.run(job) - end end diff --git a/test/test_streaming_runner.rb b/test/test_streaming_runner.rb new file mode 100644 index 0000000..723616b --- /dev/null +++ b/test/test_streaming_runner.rb @@ -0,0 +1,33 @@ +require_relative 'helper' +require_relative 'support/test_enumerable_source' +require_relative 'support/test_array_destination' +require_relative 'support/test_yielding_transform' +require_relative 'common/runner' + +class TestStreamingRunner < Kiba::Test + include SharedRunnerTests + + def test_yielding_class_transform + input_row = {tags: ["one", "two", "three"]} + destination_array = [] + + job = Kiba.parse do + extend Kiba::DSLExtensions::Config + + config :kiba, runner: Kiba::StreamingRunner + + source TestEnumerableSource, [input_row] + transform TestYieldingTransform + destination TestArrayDestination, destination_array + end + + kiba_run(job) + + assert_equal [ + {item: 'one'}, + {item: 'two'}, + {item: 'three'}, + {item: 'classic-return-value'} + ], destination_array + end +end