Skip to content

Commit

Permalink
Change to dedicated worker processes
Browse files Browse the repository at this point in the history
* Reduce memory growth in killforks.
* Reduce chance of GC amplification
  • Loading branch information
mbj committed Dec 22, 2020
1 parent b1ed475 commit 4c0f371
Show file tree
Hide file tree
Showing 19 changed files with 895 additions and 222 deletions.
12 changes: 12 additions & 0 deletions Changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
# Unreleased

* [#1069](https://github.com/mbj/mutant/pull/1096)

* Add GIL scaling and memory optimization via intermediary sub-processes.
This architecture improves mutant performance slightly on the average (incremental)
case but has a significant increase for longer coverage runs.
Mostly this process model reduces the friction from forking from an ever
growing main process.
Also it reduces the chance of GC amplification, while enabling future
optimizations in the area.

# v0.10.20 2020-12-16

[#1159](https://github.com/mbj/mutant/pull/1159)
Expand Down
1 change: 1 addition & 0 deletions lib/mutant.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ module Mutant
require 'mutant/bootstrap'
require 'mutant/version'
require 'mutant/env'
require 'mutant/pipe'
require 'mutant/util'
require 'mutant/registry'
require 'mutant/ast'
Expand Down
14 changes: 8 additions & 6 deletions lib/mutant/env.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,21 @@ def self.empty(world, config)
end
# rubocop:enable Metrics/MethodLength

# Kill mutation
# Cover mutation with specific index
#
# @param [Mutation] mutation
# @param [Fixnum] mutationindex
#
# @return [Result::Mutation]
def kill(mutation)
# @return [Result::MutationIndex]
def cover_index(mutation_index)
mutation = mutations.fetch(mutation_index)

start = timer.now

tests = selections.fetch(mutation.subject)

Result::Mutation.new(
Result::MutationIndex.new(
isolation_result: run_mutation_tests(mutation, tests),
mutation: mutation,
mutation_index: mutation_index,
runtime: timer.now - start
)
end
Expand Down
71 changes: 43 additions & 28 deletions lib/mutant/parallel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,45 +6,61 @@ module Parallel

# Run async computation returning driver
#
# @param [World] world
# @param [Config] config
#
# @return [Driver]
def self.async(config)
shared = {
var_active_jobs: shared(Variable::IVar, config, value: Set.new),
var_final: shared(Variable::IVar, config),
var_sink: shared(Variable::IVar, config, value: config.sink)
}
def self.async(world, config)
shared = shared_state(world, config)
workers = workers(world, config, shared)

Driver.new(
threads: threads(config, worker(config, **shared)),
workers: workers,
threads: threads(world, config, workers),
**shared
)
end

# The worker
#
# @param [Config] config
#
# @return [Worker]
def self.worker(config, **shared)
Worker.new(
processor: config.processor,
var_running: shared(Variable::MVar, config, value: config.jobs),
var_source: shared(Variable::IVar, config, value: config.source),
**shared
)
def self.workers(world, config, shared)
Array.new(config.jobs) do |index|
Worker.start(
block: config.block,
index: index,
process_name: "#{config.process_name}-#{index}",
world: world,
**shared
)
end
end
private_class_method :workers

def self.shared_state(world, config)
{
var_active_jobs: shared(Variable::IVar, world, value: Set.new),
var_final: shared(Variable::IVar, world),
var_running: shared(Variable::MVar, world, value: config.jobs),
var_sink: shared(Variable::IVar, world, value: config.sink),
var_source: shared(Variable::IVar, world, value: config.source)
}
end
private_class_method :shared_state

def self.threads(world, config, workers)
thread = world.thread

def self.threads(config, worker)
Array.new(config.jobs) { config.thread.new(&worker.method(:call)) }
workers.map do |worker|
thread.new do
thread.current.name = "#{config.thread_name}-#{worker.index}"
worker.call
end
end
end
private_class_method :threads

def self.shared(klass, config, **attributes)
def self.shared(klass, world, **attributes)
klass.new(
condition_variable: config.condition_variable,
mutex: config.mutex,
condition_variable: world.condition_variable,
mutex: world.mutex,
**attributes
)
end
Expand Down Expand Up @@ -75,13 +91,12 @@ class Sink
# Parallel run configuration
class Config
include Adamantium::Flat, Anima.new(
:condition_variable,
:block,
:jobs,
:mutex,
:processor,
:process_name,
:sink,
:source,
:thread
:thread_name
)
end # Config

Expand Down
12 changes: 9 additions & 3 deletions lib/mutant/parallel/driver.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ class Driver
:threads,
:var_active_jobs,
:var_final,
:var_sink
:var_running,
:var_sink,
:var_source,
:workers
)

private(*anima.attribute_names)
Expand All @@ -29,7 +32,10 @@ def wait_timeout(timeout)

def finalize(status)
status.tap do
threads.each(&:join) if status.done?
if status.done?
workers.each(&:join)
threads.each(&:join)
end
end
end

Expand All @@ -38,7 +44,7 @@ def status
var_sink.with do |sink|
Status.new(
active_jobs: active_jobs.dup.freeze,
done: threads.all? { |thread| !thread.alive? },
done: threads.all? { |worker| !worker.alive? },
payload: sink.status
)
end
Expand Down
62 changes: 60 additions & 2 deletions lib/mutant/parallel/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ module Mutant
module Parallel
class Worker
include Adamantium::Flat, Anima.new(
:processor,
:connection,
:index,
:pid,
:process,
:var_active_jobs,
:var_final,
:var_running,
Expand All @@ -14,6 +17,45 @@ class Worker

private(*anima.attribute_names)

public :index

# rubocop:disable Metrics/MethodLength
# rubocop:disable Metrics/ParameterLists
def self.start(world:, block:, process_name:, **attributes)
io = world.io
process = world.process

request = Pipe.from_io(io)
response = Pipe.from_io(io)

pid = process.fork do
world.thread.current.name = process_name
world.process.setproctitle(process_name)

Child.new(
block: block,
connection: Pipe::Connection.from_pipes(
marshal: world.marshal,
reader: request,
writer: response
)
).call
end

new(
pid: pid,
process: process,
connection: Pipe::Connection.from_pipes(
marshal: world.marshal,
reader: response,
writer: request
),
**attributes
)
end
# rubocop:enable Metrics/MethodLength
# rubocop:enable Metrics/ParameterLists

# Run worker payload
#
# @return [self]
Expand All @@ -23,7 +65,7 @@ def call

job_start(job)

result = processor.call(job.payload)
result = connection.call(job.payload)

job_done(job)

Expand All @@ -35,6 +77,12 @@ def call
self
end

def join
process.kill('TERM', pid)
process.wait(pid)
self
end

private

def next_job
Expand Down Expand Up @@ -66,6 +114,16 @@ def finalize
var_final.put(nil) if var_running.modify(&:pred).zero?
end

class Child
include Anima.new(:block, :connection)

def call
loop do
connection.send_value(block.call(connection.receive_value))
end
end
end
private_constant :Child
end # Worker
end # Parallel
end # Mutant
94 changes: 94 additions & 0 deletions lib/mutant/pipe.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
# frozen_string_literal: true

module Mutant
# Pipe abstraction
class Pipe
include Adamantium::Flat, Anima.new(:reader, :writer)

# Run block with pipe in binmode
#
# @return [undefined]
def self.with(io)
io.pipe(binmode: true) do |(reader, writer)|
yield new(reader: reader, writer: writer)
end
end

def self.from_io(io)
reader, writer = io.pipe(binmode: true)
new(reader: reader, writer: writer)
end

# Writer end of the pipe
#
# @return [IO]
def to_writer
reader.close
writer
end

# Parent reader end of the pipe
#
# @return [IO]
def to_reader
writer.close
reader
end

class Connection
include Anima.new(:marshal, :reader, :writer)

Error = Class.new(RuntimeError)

class Frame
include Concord.new(:io)

HEADER_FORMAT = 'N'
MAX_BYTES = (2**32).pred
HEADER_SIZE = 4

def receive_value
header = read(HEADER_SIZE)
read(Util.one(header.unpack(HEADER_FORMAT)))
end

def send_value(body)
bytesize = body.bytesize

fail Error, 'message to big' if bytesize > MAX_BYTES

io.write([bytesize].pack(HEADER_FORMAT))
io.write(body)
end

private

def read(bytes)
io.read(bytes) or fail Error, 'Unexpected EOF'
end
end

def call(payload)
send_value(payload)
receive_value
end

def receive_value
marshal.load(reader.receive_value)
end

def send_value(value)
writer.send_value(marshal.dump(value))
self
end

def self.from_pipes(marshal:, reader:, writer:)
new(
marshal: marshal,
reader: Frame.new(reader.to_reader),
writer: Frame.new(writer.to_writer)
)
end
end
end # Pipe
end # Mutant
8 changes: 8 additions & 0 deletions lib/mutant/result.rb
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,14 @@ def success?
end
end

class MutationIndex
include Anima.new(
:isolation_result,
:mutation_index,
:runtime
)
end # MutationIndex

# Mutation result
class Mutation
include Result, Anima.new(
Expand Down
Loading

0 comments on commit 4c0f371

Please sign in to comment.