Skip to content

Commit

Permalink
Add append operation (#10)
Browse files Browse the repository at this point in the history
  • Loading branch information
mrkamel authored Apr 9, 2024
1 parent 48ac7cf commit 8cc4b67
Show file tree
Hide file tree
Showing 11 changed files with 419 additions and 4 deletions.
3 changes: 3 additions & 0 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ Style/StringLiteralsInInterpolation:
Enabled: true
EnforcedStyle: double_quotes

Style/RedundantSelfAssignment:
Enabled: false

Layout/LineLength:
Max: 250

Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# CHANGELOG

## v0.10.0

* `append` operation added

## v0.9.0

* Arguments are no longer passed to the `call` method, but to the
Expand Down
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,15 @@ The `key` itself is also passed to the block for the case that you need to
customize the reduce calculation according to the value of the key. However,
most of the time, this is not necessary and the key can simply be ignored.

* `append`: Appends the results of 2 jobs, such that all key-value pairs
of both jobs will be in the result. `append` does not accept any block.

```ruby
job.append(other_job, worker: MyKrapsWorker, jobs: 8)
```
Please note that the partitioners and the number of partitions must match for
the jobs to be appended.

* `combine`: Combines the results of 2 jobs by combining every key available
in the current job result with the corresponding key from the passed job
result. When the passed job result does not have the corresponding key,
Expand Down
3 changes: 2 additions & 1 deletion lib/kraps/actions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ module Actions
MAP_PARTITIONS = "map_partitions",
REDUCE = "reduce",
COMBINE = "combine",
EACH_PARTITION = "each_partition"
EACH_PARTITION = "each_partition",
APPEND = "append"
]
end
end
18 changes: 18 additions & 0 deletions lib/kraps/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,24 @@ def combine(other_job, jobs: nil, worker: @worker, before: nil, &block)
end
end

# Adds an append step combining this job's result with +other_job+'s result,
# such that the key-value pairs of both jobs end up in the output.
#
# other_job - the job whose final step output gets appended
# jobs      - optional cap on the number of background jobs used; always
#             capped at the number of partitions
# worker    - the worker class used to run the step
# before    - optional callable invoked before the step is scheduled
#
# Returns a new Job instance (jobs are immutable; `fresh` duplicates).
#
# NOTE(review): the README states `append` does not accept a block, and the
# specs expect `block: nil`, yet a block is still captured and stored here —
# confirm whether `&block` should be dropped or is reserved for future use.
def append(other_job, jobs: nil, worker: @worker, before: nil, &block)
  fresh.tap do |job|
    job.instance_eval do
      # instance_eval runs in the duplicated job's context, so @steps,
      # @partitions and @partitioner refer to the new job's state, while
      # the method arguments remain visible as block-local captures.
      @steps << Step.new(
        action: Actions::APPEND,
        jobs: [jobs, @partitions].compact.min,
        partitions: @partitions,
        partitioner: @partitioner,
        worker: worker,
        before: before,
        block: block,
        dependency: other_job,
        # Index of the dependency's last step; its frame is what gets appended.
        options: { append_step_index: other_job.steps.size - 1 }
      )
    end
  end
end

def each_partition(jobs: nil, worker: @worker, before: nil, &block)
fresh.tap do |job|
job.instance_eval do
Expand Down
15 changes: 15 additions & 0 deletions lib/kraps/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,21 @@ def perform_combine
Frame.new(token: token, partitions: @step.partitions)
end

# Runs an append step: verifies the two jobs are partition-compatible, then
# schedules one background job payload per partition, each carrying the frame
# of the dependency job's step whose data is to be appended.
#
# Returns the Frame describing the step's output.
def perform_append
  dependency_step = @step.dependency.steps[@step.options[:append_step_index]]

  unless dependency_step.partitions == @step.partitions
    raise(IncompatibleFrame, "Incompatible number of partitions")
  end

  payloads = Array.new(@frame.partitions) do |partition|
    { partition: partition, append_frame: dependency_step.frame.to_h }
  end

  Frame.new(token: push_and_wait(job_count: @step.jobs, enum: payloads), partitions: @step.partitions)
end

def perform_each_partition
enum = (0...@frame.partitions).map { |partition| { partition: partition } }
push_and_wait(job_count: @step.jobs, enum: enum)
Expand Down
2 changes: 1 addition & 1 deletion lib/kraps/version.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module Kraps
VERSION = "0.9.0"
VERSION = "0.10.0"
end
51 changes: 51 additions & 0 deletions lib/kraps/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,57 @@ def perform_map(payload)
temp_paths&.delete
end

# Executes an append step for a single partition: downloads the chunk files
# of both the current frame and the appended job's frame, feeds every
# key-value pair through a mapper so the combined data gets re-partitioned
# and shuffled for the next step, and uploads the resulting chunks.
#
# payload - job payload hash with "partition" and "append_frame" keys
def perform_append(payload)
  temp_paths1 = download_all(token: @args["frame"]["token"], partition: payload["partition"])
  temp_paths2 = download_all(token: payload["append_frame"]["token"], partition: payload["partition"])

  # The append itself is an identity map over both inputs.
  implementation = Object.new
  implementation.define_singleton_method(:map) do |key, value, &block|
    block.call(key, value)
  end

  subsequent_step = next_step

  # When the next step is a reduce, pre-reduce while shuffling to cut the
  # amount of data written per chunk.
  if subsequent_step&.action == Actions::REDUCE
    implementation.define_singleton_method(:reduce) do |key, value1, value2|
      subsequent_step.block.call(key, value1, value2)
    end
  end

  mapper = MapReduce::Mapper.new(implementation, partitioner: partitioner, memory_limit: @memory_limit)

  # Both frames use the same chunk format (one JSON-encoded [key, value]
  # pair per line), so a single loop over both path sets replaces the two
  # previously duplicated read loops.
  [temp_paths1, temp_paths2].each do |temp_paths|
    temp_paths.each do |temp_path|
      File.open(temp_path.path) do |stream|
        stream.each_line do |line|
          key, value = JSON.parse(line)

          mapper.map(key, value)
        end
      end
    end
  end

  mapper.shuffle(chunk_limit: @chunk_limit) do |partitions|
    Parallelizer.each(partitions.to_a, @concurrency) do |partition, path|
      File.open(path) do |stream|
        Kraps.driver.store(Kraps.driver.with_prefix("#{@args["token"]}/#{partition}/chunk.#{payload["partition"]}.json"), stream)
      end
    end
  end
ensure
  # Clean up downloaded chunk files even when mapping/shuffling raises.
  temp_paths1&.delete
  temp_paths2&.delete
end

def perform_map_partitions(payload)
temp_paths = download_all(token: @args["frame"]["token"], partition: payload["partition"])

Expand Down
80 changes: 78 additions & 2 deletions spec/kraps/job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,81 @@ module Kraps
end
end

describe "#append" do
  it "appends the corresponding step" do
    # First job ends in a map step (2 steps total), so the appended step
    # index recorded on the APPEND step is expected to be 1.
    job1 = described_class.new(worker: TestJobWorker1)
    job1 = job1.parallelize(partitions: 8) do |collector|
      collector.call("key1", 1)
      collector.call("key2", 2)
      collector.call("key3", 3)
    end
    job1 = job1.map do |key, value, collector|
      collector.call(key, value + 1)
    end

    # Second job uses the same partition count, so the append is compatible.
    job2 = described_class.new(worker: TestJobWorker1)
    job2 = job2.parallelize(partitions: 8) do |collector|
      collector.call("key1", 3)
      collector.call("key2", 2)
      collector.call("key3", 1)
    end
    job2 = job2.append(job1)

    # The APPEND step inherits partitions/partitioner/worker from the
    # receiver and records job1 as its dependency; no block is stored.
    expect(job2.steps).to match(
      [
        an_object_having_attributes(action: Actions::PARALLELIZE),
        an_object_having_attributes(
          action: Actions::APPEND,
          partitions: 8,
          partitioner: kind_of(HashPartitioner),
          worker: TestJobWorker1,
          before: nil,
          block: nil,
          dependency: job1,
          options: { append_step_index: 1 }
        )
      ]
    )
  end

  it "respects the passed jobs, worker and before" do
    before = -> {}

    job1 = described_class.new(worker: TestJobWorker1)
    job1 = job1.parallelize(partitions: 8) do |collector|
      collector.call("key1", 1)
      collector.call("key2", 2)
      collector.call("key3", 3)
    end
    job1 = job1.map do |key, value, collector|
      collector.call(key, value + 1)
    end

    job2 = described_class.new(worker: TestJobWorker1)
    job2 = job2.parallelize(partitions: 8) do |collector|
      collector.call("key1", 3)
      collector.call("key2", 2)
      collector.call("key3", 1)
    end
    # Explicit overrides for jobs, worker and before must be stored on the
    # step instead of the receiver's defaults.
    job2 = job2.append(job1, jobs: 4, worker: TestJobWorker2, before: before)

    expect(job2.steps).to match(
      [
        an_object_having_attributes(action: Actions::PARALLELIZE),
        an_object_having_attributes(
          action: Actions::APPEND,
          jobs: 4,
          partitions: 8,
          partitioner: kind_of(HashPartitioner),
          worker: TestJobWorker2,
          before: before,
          block: nil
        )
      ]
    )
  end
end

describe "#combine" do
it "adds a corresponding step" do
block = -> {}
Expand All @@ -226,9 +301,10 @@ module Kraps
collector.call("key2", 2)
collector.call("key3", 1)
end
job2 = job2.combine(job1, &block)

expect(job2.steps).to match(
job3 = job2.combine(job1, &block)

expect(job3.steps).to match(
[
an_object_having_attributes(action: Actions::PARALLELIZE),
an_object_having_attributes(
Expand Down
32 changes: 32 additions & 0 deletions spec/kraps/runner_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,38 @@ module Kraps
)
end

it "allows to append steps" do
  store = []

  allow_any_instance_of(TestRunner).to receive(:call) do
    # job1 produces key1..key3 with value 2 (1 incremented by the map step).
    job1 = Kraps::Job.new(worker: TestRunnerWorker)

    job1 = job1.parallelize(partitions: 8) { |collector| collector.call(1) }.map do |_, _, collector|
      ("key1".."key3").each { |item| collector.call(item, 1) }
    end

    job1 = job1.map { |key, value, collector| collector.call(key, value + 1) }

    # job2 produces key3..key6 with value 3 (2 incremented by the map step);
    # key3 therefore appears in both jobs.
    job2 = Kraps::Job.new(worker: TestRunnerWorker)

    job2 = job2.parallelize(partitions: 8) { |collector| collector.call(1) }.map do |_, _, collector|
      ("key3".."key6").each { |item| collector.call(item, 2) }
    end

    job2 = job2.map { |key, value, collector| collector.call(key, value + 1) }

    # Collect the appended result; duplicated keys (key3) must be kept.
    job1.append(job2).each_partition do |_, pairs|
      pairs.each do |key, value|
        store << [key, value]
      end
    end
  end

  described_class.new(TestRunner).call

  # All pairs of both jobs are present, including both key3 entries.
  expect(store.sort).to eq([["key1", 2], ["key2", 2], ["key3", 2], ["key3", 3], ["key4", 3], ["key5", 3], ["key6", 3]])
end

it "correctly resolves the job dependencies even when recursive" do
store = {}

Expand Down
Loading

0 comments on commit 8cc4b67

Please sign in to comment.