Skip to content

Commit

Permalink
Change to dedicated worker processes
Browse files Browse the repository at this point in the history
  • Loading branch information
mbj committed Dec 19, 2020
1 parent fc8c82c commit 4050bf9
Show file tree
Hide file tree
Showing 9 changed files with 241 additions and 49 deletions.
6 changes: 6 additions & 0 deletions lib/mutant.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ module Mutant
require 'mutant/bootstrap'
require 'mutant/version'
require 'mutant/env'
require 'mutant/pipe'
require 'mutant/util'
require 'mutant/registry'
require 'mutant/ast'
Expand Down Expand Up @@ -260,3 +261,8 @@ def self.traverse(action, values)
)
end
end # Mutant

def d(value)
$stderr.puts("#{Thread.current.name}: #{value.inspect}")
value
end
14 changes: 8 additions & 6 deletions lib/mutant/env.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,21 @@ def self.empty(world, config)
end
# rubocop:enable Metrics/MethodLength

# Kill mutation
# Cover mutation with specific index
#
# @param [Mutation] mutation
# @param [Fixnum] mutationindex
#
# @return [Result::Mutation]
def kill(mutation)
# @return [Result::MutationIndex]
def cover_index(mutation_index)
mutation = mutations.fetch(mutation_index)

start = timer.now

tests = selections.fetch(mutation.subject)

Result::Mutation.new(
Result::MutationIndex.new(
isolation_result: run_mutation_tests(mutation, tests),
mutation: mutation,
mutation_index: mutation_index,
runtime: timer.now - start
)
end
Expand Down
63 changes: 37 additions & 26 deletions lib/mutant/parallel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,45 +6,57 @@ module Parallel

# Run async computation returning driver
#
# @param [World] world
# @param [Config] config
#
# @return [Driver]
def self.async(config)
def self.async(world, config)
shared = {
var_active_jobs: shared(Variable::IVar, config, value: Set.new),
var_final: shared(Variable::IVar, config),
var_sink: shared(Variable::IVar, config, value: config.sink)
var_active_jobs: shared(Variable::IVar, world, value: Set.new),
var_final: shared(Variable::IVar, world),
var_running: shared(Variable::MVar, world, value: config.jobs),
var_sink: shared(Variable::IVar, world, value: config.sink),
var_source: shared(Variable::IVar, world, value: config.source)
}

workers = workers(world, config, shared)

Driver.new(
threads: threads(config, worker(config, **shared)),
workers: workers,
threads: threads(world, config, workers),
**shared
)
end

# The worker
#
# @param [Config] config
#
# @return [Worker]
def self.worker(config, **shared)
Worker.new(
processor: config.processor,
var_running: shared(Variable::MVar, config, value: config.jobs),
var_source: shared(Variable::IVar, config, value: config.source),
**shared
)
def self.workers(world, config, shared)
Array.new(config.jobs) do |index|
Worker.start(
block: config.block,
index: index,
process_name: "#{config.process_name}-#{index}",
world: world,
**shared
)
end
end
private_class_method :workers

def self.threads(world, config, workers)
thread = world.thread

def self.threads(config, worker)
Array.new(config.jobs) { config.thread.new(&worker.method(:call)) }
workers.map do |worker|
thread.new do
thread.current.name = "#{config.thread_name}-#{worker.index}"
worker.call
end
end
end
private_class_method :threads

def self.shared(klass, config, **attributes)
def self.shared(klass, world, **attributes)
klass.new(
condition_variable: config.condition_variable,
mutex: config.mutex,
condition_variable: world.condition_variable,
mutex: world.mutex,
**attributes
)
end
Expand Down Expand Up @@ -75,13 +87,12 @@ class Sink
# Parallel run configuration
class Config
include Adamantium::Flat, Anima.new(
:condition_variable,
:block,
:jobs,
:mutex,
:processor,
:process_name,
:sink,
:source,
:thread
:thread_name
)
end # Config

Expand Down
12 changes: 9 additions & 3 deletions lib/mutant/parallel/driver.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ class Driver
:threads,
:var_active_jobs,
:var_final,
:var_sink
:var_running,
:var_sink,
:var_source,
:workers
)

private(*anima.attribute_names)
Expand All @@ -29,7 +32,10 @@ def wait_timeout(timeout)

def finalize(status)
status.tap do
threads.each(&:join) if status.done?
if status.done?
workers.each(&:term)
threads.each(&:join)
end
end
end

Expand All @@ -38,7 +44,7 @@ def status
var_sink.with do |sink|
Status.new(
active_jobs: active_jobs.dup.freeze,
done: threads.all? { |thread| !thread.alive? },
done: threads.all? { |worker| !worker.alive? },
payload: sink.status
)
end
Expand Down
61 changes: 59 additions & 2 deletions lib/mutant/parallel/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ module Mutant
module Parallel
class Worker
include Adamantium::Flat, Anima.new(
:processor,
:handle,
:index,
:var_active_jobs,
:var_final,
:var_running,
Expand All @@ -14,6 +15,12 @@ class Worker

private(*anima.attribute_names)

public :index

def self.start(world:, block:, process_name:, **attributes)
new(handle: Child.start(world, process_name, block), **attributes)
end

# Run worker payload
#
# @return [self]
Expand All @@ -23,7 +30,7 @@ def call

job_start(job)

result = processor.call(job.payload)
result = handle.execute(job.payload)

job_done(job)

Expand All @@ -35,6 +42,10 @@ def call
self
end

def term
handle.term
end

private

def next_job
Expand Down Expand Up @@ -66,6 +77,52 @@ def finalize
var_final.put(nil) if var_running.modify(&:pred).zero?
end

class Handle
include Anima.new(:process, :pid, :connection)

def execute(payload)
connection.send(payload).receive
end

def term
process.kill('TERM', pid)
process.wait(pid)
end
end

class Child
include Anima.new(:block, :connection)

def call
loop do
connection.send(block.call(connection.receive))
end
end

def self.start(world, process_name, block)
io = world.io
process = world.process

request = Pipe.from_io(io)
response = Pipe.from_io(io)

pid = process.fork do
world.thread.current.name = process_name
world.process.setproctitle(process_name)

Child.new(
block: block,
connection: Pipe::Connection.from_pipes(reader: request, writer: response)
).call
end

Handle.new(
pid: pid,
process: process,
connection: Pipe::Connection.from_pipes(reader: response, writer: request)
)
end
end
end # Worker
end # Parallel
end # Mutant
97 changes: 97 additions & 0 deletions lib/mutant/pipe.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# frozen_string_literal: true

module Mutant
# Pipe abstraction
class Pipe
include Adamantium::Flat, Anima.new(:reader, :writer)

# Run block with pipe in binmode
#
# @return [undefined]
def self.with(io)
io.pipe(binmode: true) do |(reader, writer)|
yield new(reader: reader, writer: writer)
end
end

def self.from_io(io)
reader, writer = io.pipe(binmode: true)
new(reader: reader, writer: writer)
end

# Writer end of the pipe
#
# @return [IO]
def to_writer
reader.close
writer
end

# Parent reader end of the pipe
#
# @return [IO]
def to_reader
writer.close
reader
end

class Encode
include Concord.new(:io)

def receive
Marshal.load(io.receive)
end

def send(value)
io.send(Marshal.dump(value))
end

def self.from_io(io)
self.new(Frame.new(io))
end
end

class Frame
include Concord.new(:io)

HEADER_FORMAT = 'N'
MAX_BYTES = (2**32).pred
HEADER_SIZE = 4

def receive
header = io.read(HEADER_SIZE) or fail 'Unexpected EOF'
io.read(Mutant::Util.one(header.unpack(HEADER_FORMAT)))
end

def send(body)
bytesize = body.bytesize

fail 'message to big' if bytesize > MAX_BYTES

io.write([bytesize].pack(HEADER_FORMAT))
io.write(body)
self
end
end

class Connection
include Anima.new(:reader, :writer)

def receive
reader.receive
end

def send(value)
writer.send(value)
self
end

def self.from_pipes(reader:, writer:)
new(
reader: Encode.from_io(reader.to_reader),
writer: Encode.from_io(writer.to_writer)
)
end
end
end # Pipe
end # Mutant
8 changes: 8 additions & 0 deletions lib/mutant/result.rb
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,14 @@ def success?
end
end

class MutationIndex
include Anima.new(
:isolation_result,
:mutation_index,
:runtime
)
end # MutationIndex

# Mutation result
class Mutation
include Result, Anima.new(
Expand Down
Loading

0 comments on commit 4050bf9

Please sign in to comment.