Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce new StreamingRunner #44

Merged
merged 26 commits into from
Dec 25, 2017
Merged

Introduce new StreamingRunner #44

merged 26 commits into from
Dec 25, 2017

Conversation

thbar
Copy link
Owner

@thbar thbar commented Dec 22, 2017

This PR introduces a new "Runner" implementation named StreamingRunner. The "Runner" is the part of Kiba which is responsible for carrying out the actual data processing.

Previously, a given "class transform" receiving one input row was only able to generate one output row, or no output row.

With this new runner, leveraging Ruby's Enumerator, each "class transform" can generate N output rows, by calling yield row, in addition to the row returned by process as usual.

While this can appear like something simple, it has a massive consequence, which is the ability to separate concerns when writing reusable Kiba components. This leads the way to writing more focused, generic & composable transforms when working with Kiba.

Let's pick an example.

Imagine you have created a Kiba source able to extract XML elements from a group of files on disk (with each file containing N elements).

It would typically look like this:

class MySource
  attr_reader :dir_pattern

  def initialize(dir_pattern:)
    @dir_pattern = dir_pattern
  end

  def each
    Dir[dir_pattern].sort.each do |file|
      doc = Nokogiri::XML(IO.binread(file))
      doc.search('/invoices/invoice').each do |item|
        yield(item)
      end
    end
  end
end

Such a class has 4 responsabilities:

  • The directory iteration
  • The XML parsing
  • The XML node research
  • The act of exploding each detected sub-node as an independent row

With Kiba 1 you can achieve some level of splitting here by using the decorator technique I outlined here, but this can only take you so far.

With the new Kiba runner, you can first rewrite the code above as 4 independent components (1 source & 3 transforms):

class DirectoryLister
  def initialize(dir_pattern:)

  def each
    Dir[dir_pattern].sort.each do |filename|
      yield(filename)
    end
  end
end

class XMLReader
  def process(filename)
    Nokogiri::XML(IO.binread(filename))
  end
end

class XMLSearcher
  def initialize(selector:)

  def process(doc)
    doc.search(selector)
  end
end

class EnumerableExploder
  def process(row)
    row.each do |item|
      yield(item)
    end
    nil # tell the pipeline to ignore the final value
  end
end   

Which can then be used:

source DirectoryLister, dir_pattern: '*.csv'
transform XMLReader
transform XMLSearcher, selector: '/invoices/invoice'
transform EnumerableExploder

While it can appear to be more complicated at first, each of these 4 components can now be mix-and-matched with other components in completely unrelated scenarios.

For instance:

  • The DirectoryLister could be used to list anything (JSON files etc).
  • The EnumerableExploder, similarly, could be used for pretty much anything.

This opens the door to provide more composable & more reusable components to Kiba users, or as part of Kiba Common or Kiba Pro.

How to enable the StreamingRunner

Using the new runner is opt-in. You'll have to do:

extend Kiba::DSLExtensions::Config

config :kiba, runner: Kiba::StreamingRunner

From there, you can write "yielding class transforms", like:

class MyTransform
  def process(row)
    yield {key: 1}
    yield {key: 2}
    {key: 3}
  end
end

Limitations

You can only yield rows from "class transforms". Calling yield from a "block transform" will generate an error.

Benchmark

Real-life benchmarks I made showed that the impact (on real-life examples) is negligible at this point.

A non-real-life benchmark used in 82bc8e5 shows that the new runner takes 5% to 30% more time. It's a non-real-life example because there are no real sources nor real destinations.

Notes on current implementation

This was initiated in a private repo, later published as #41, which has been reworked & finalized here.

This cherry-picks from #41 but improves the syntax a bit.
This cherry-picks from #41 but with a more DRY code.
A much better choice than what was originally implemented in #41, since:
- it allows to decide which runner to pick on a per-ETL basis
- it will work inside sidekiq (vs only on command line)
@thbar thbar self-assigned this Dec 22, 2017
@thbar thbar mentioned this pull request Dec 22, 2017
thbar added a commit that referenced this pull request Dec 23, 2017
@thbar thbar changed the title Yielding Runner (improved version with config-based opt-in) StreamingRunner (improved version with config-based opt-in) Dec 24, 2017
I've realized that lazy is not useful yet at this stage, and also brings an extra cost. I will switch back to lazy when I'll have an actual need (future work on parallelization etc).
I think the previous commit should make the runner work on all supported platforms, unmodified.
@thbar thbar changed the title StreamingRunner (improved version with config-based opt-in) Introduce new StreamingRunner Dec 25, 2017
@thbar thbar removed the wip label Dec 25, 2017
@thbar thbar merged commit 8e1b8ab into master Dec 25, 2017
@thbar thbar deleted the yielding-runner branch December 25, 2017 00:26
@vfonic
Copy link
Contributor

vfonic commented Nov 5, 2018

This is amazing feature! Thank you so much for taking the time to add it! ❤️

@thbar
Copy link
Owner Author

thbar commented Nov 5, 2018

@vfonic thanks for your feedback! Much appreciated ^_^ 💙 💚

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants