Rewrite parallel DD based on futures
Use thread-based futures for parallelism. In theory, the GIL could
cause a performance hit if tests were executed in-process, but in
practice tests usually run in subprocesses anyway. It is therefore
superfluous to start a worker subprocess only to have it start yet
another subprocess.
akosthekiss committed Nov 15, 2023
1 parent 842e7cf commit 9106b04
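To illustrate the reasoning above with a minimal standalone sketch (not taken from the commit; run_test and the inline python -c test are placeholders): worker threads spend their time blocked on child processes and release the GIL while waiting, so a thread pool parallelizes subprocess-based tests without an extra layer of worker processes.

import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor

def run_test(arg):
    # The GIL is released while this thread waits for the child process,
    # so up to max_workers tests really run concurrently.
    proc = subprocess.run([sys.executable, '-c', 'import sys; sys.exit(int(sys.argv[1]) % 2)', str(arg)])
    return proc.returncode == 0

with ThreadPoolExecutor(max_workers=4) as pool:
    print(list(pool.map(run_test, range(8))))  # [True, False, True, ...]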
Showing 7 changed files with 83 additions and 333 deletions.
picire/__init__.py (1 change: 0 additions & 1 deletion)
@@ -17,6 +17,5 @@
 from .limit_reduction import LimitReduction
 from .outcome import Outcome
 from .parallel_dd import ParallelDD
-from .shared_cache import shared_cache_decorator
 from .splitter import SplitterRegistry
 from .subprocess_test import ConcatTestBuilder, SubprocessTest
picire/cli.py (8 changes: 1 addition & 7 deletions)
Expand Up @@ -34,7 +34,6 @@
 from .iterator import CombinedIterator, IteratorRegistry
 from .limit_reduction import LimitReduction
 from .parallel_dd import ParallelDD
-from .shared_cache import shared_cache_decorator
 from .splitter import SplitterRegistry
 from .subprocess_test import ConcatTestBuilder, SubprocessTest

@@ -76,8 +75,6 @@ def int_or_inf(value):
                         help='run DD in parallel')
     parser.add_argument('-j', '--jobs', metavar='N', type=int, default=cpu_count(),
                         help='maximum number of test commands to execute in parallel (has effect in parallel mode only; default: %(default)s)')
-    parser.add_argument('-u', '--max-utilization', metavar='N', type=int, default=100,
-                        help='maximum CPU utilization allowed; don\'t start new parallel jobs until utilization is higher (has effect in parallel mode only; default: %(default)s)')

     # Tweaks how to walk through the chunk lists.
     parser.add_argument('--complement-first', dest='subset_first', action='store_false', default=True,
@@ -153,8 +150,6 @@ def process_args(args):
                           'cleanup': args.cleanup}

     args.cache_class = CacheRegistry.registry[args.cache]
-    if args.parallel:
-        args.cache_class = shared_cache_decorator(args.cache_class)
     args.cache_config = {'cache_fail': args.cache_fail,
                          'evict_after_fail': args.evict_after_fail}

@@ -175,8 +170,7 @@
         args.reduce_class = DD
     else:
         args.reduce_class = ParallelDD
-        args.reduce_config.update(proc_num=args.jobs,
-                                  max_utilization=args.max_utilization)
+        args.reduce_config.update(proc_num=args.jobs)

     logger.info('Input loaded from %s', args.input)

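A hypothetical usage sketch of the pared-down configuration (my_test and the literal values are placeholders, not from the commit): after this change, parallel reduction only needs proc_num, with no shared-cache decoration and no utilization cap.

from picire import Outcome, ParallelDD

def my_test(config, config_id):
    # Placeholder test: a configuration is "interesting" iff it keeps item 3.
    return Outcome.FAIL if 3 in config else Outcome.PASS

reduce_config = {'proc_num': 8}  # the removed max_utilization key is gone
dd = ParallelDD(test=my_test, **reduce_config)
# DD objects are callable on the initial (interesting) configuration:
print(dd(list(range(8))))  # expected to reduce the configuration towards [3]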
picire/parallel_dd.py (134 changes: 81 additions & 53 deletions)
@@ -8,22 +8,53 @@

 import logging

-from multiprocessing import Value
+from concurrent.futures import ALL_COMPLETED, FIRST_COMPLETED, ThreadPoolExecutor, wait
+from os import cpu_count
+from threading import Lock

-from . import parallel_loop
-from .cache import ConfigCache
+from .cache import OutcomeCache
 from .dd import DD
 from .outcome import Outcome
-from .shared_cache import shared_cache_decorator

 logger = logging.getLogger(__name__)


+class SharedCache(OutcomeCache):
+    """
+    Thread-safe cache representation that stores the evaluated configurations
+    and their outcome.
+    """
+
+    def __init__(self, cache):
+        self._cache = cache
+        self._lock = Lock()
+
+    def set_test_builder(self, test_builder):
+        with self._lock:
+            self._cache.set_test_builder(test_builder)
+
+    def add(self, config, result):
+        with self._lock:
+            self._cache.add(config, result)
+
+    def lookup(self, config):
+        with self._lock:
+            return self._cache.lookup(config)
+
+    def clear(self):
+        with self._lock:
+            self._cache.clear()
+
+    def __str__(self):
+        with self._lock:
+            return self._cache.__str__()
+

 class ParallelDD(DD):

     def __init__(self, test, *, split=None, cache=None, id_prefix=None,
                  config_iterator=None, dd_star=False, stop=None,
-                 proc_num=None, max_utilization=None):
+                 proc_num=None):
         """
         Initialize a ParallelDD object.
@@ -36,29 +67,11 @@ def __init__(self, test, *, split=None, cache=None, id_prefix=None,
         :param dd_star: Boolean to enable the DD star algorithm.
         :param stop: A callable invoked before the execution of every test.
         :param proc_num: The level of parallelization.
-        :param max_utilization: The maximum CPU utilization accepted.
         """
-        cache = cache or shared_cache_decorator(ConfigCache)()
         super().__init__(test=test, split=split, cache=cache, id_prefix=id_prefix, config_iterator=config_iterator, dd_star=dd_star, stop=stop)
+        self._cache = SharedCache(self._cache)

-        self._proc_num = proc_num
-        self._max_utilization = max_utilization
-        self._fail_index = Value('i', 0, lock=False)
-
-    def _loop_body(self, config, index, config_id):
-        """
-        The function that will be run in parallel.
-        :param config: The list of entries of the current configuration.
-        :param index: The index of the current configuration.
-        :param config_id: The unique ID of the current configuration.
-        :return: True if the test is not interesting, False otherwise.
-        """
-        if self._test_config(config, config_id) is Outcome.FAIL:
-            self._fail_index.value = index
-            return False
-
-        return True
+        self._proc_num = proc_num or cpu_count()

     def _reduce_config(self, run, subsets, complement_offset):
         """
@@ -73,37 +86,49 @@ def _reduce_config(self, run, subsets, complement_offset):
             next complement_offset).
         """
         n = len(subsets)
-        self._fail_index.value = n
-        ploop = parallel_loop.Loop(self._proc_num, self._max_utilization)
-        for i in self._config_iterator(n):
-            if i >= 0:
-                config_id = (f'r{run}', f's{i}')
-                config_set = subsets[i]
-            else:
-                i = (-i - 1 + complement_offset) % n
-                config_id = (f'r{run}', f'c{i}')
-                config_set = [c for si, s in enumerate(subsets) for c in s if si != i]
-                i = -i - 1
-
-            # If we checked this test before, return its result
-            outcome = self._lookup_cache(config_set, config_id)
-            if outcome is Outcome.PASS:
-                continue
-            if outcome is Outcome.FAIL:
-                self._fail_index.value = i
-                ploop.brk()
-                break
-
-            self._check_stop()
-            # Break if we found a FAIL either in the cache or by testing it now.
-            if not ploop.do(self._loop_body, (config_set, i, config_id)):
-                # if do() returned False, the test was not started
-                break
-        ploop.join()
+        fvalue = n
+        tests = set()
+        with ThreadPoolExecutor(self._proc_num) as pool:
+            for i in self._config_iterator(n):
+                results, tests = wait(tests, timeout=0 if len(tests) < self._proc_num else None, return_when=FIRST_COMPLETED)
+                for result in results:
+                    index, outcome = result.result()
+                    if outcome is Outcome.FAIL:
+                        fvalue = index
+                        break
+                if fvalue < n:
+                    break
+
+                if i >= 0:
+                    config_id = (f'r{run}', f's{i}')
+                    config_set = subsets[i]
+                else:
+                    i = (-i - 1 + complement_offset) % n
+                    config_id = (f'r{run}', f'c{i}')
+                    config_set = [c for si, s in enumerate(subsets) for c in s if si != i]
+                    i = -i - 1
+
+                # If we checked this test before, return its result
+                outcome = self._lookup_cache(config_set, config_id)
+                if outcome is Outcome.PASS:
+                    continue
+                if outcome is Outcome.FAIL:
+                    fvalue = i
+                    break
+
+                self._check_stop()
+                tests.add(pool.submit(self._test_config_with_index, i, config_set, config_id))
+
+            results, _ = wait(tests, return_when=ALL_COMPLETED)
+            if fvalue == n:
+                for result in results:
+                    index, outcome = result.result()
+                    if outcome is Outcome.FAIL:
+                        fvalue = index
+                        break

         # fvalue contains the index of the cycle in the previous loop
         # which was found interesting. Otherwise it's n.
-        fvalue = self._fail_index.value
         if fvalue < 0:
             # Interesting complement is found.
             # In next run, start removing the following subset
@@ -114,3 +139,6 @@ def _reduce_config(self, run, subsets, complement_offset):
             return [subsets[fvalue]], 0

         return None, complement_offset
+
+    def _test_config_with_index(self, index, config, config_id):
+        return index, self._test_config(config, config_id)
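The submission logic above can be distilled into a standalone sketch (check and the constants are stand-ins, not from the commit): keep at most proc_num futures in flight, harvest finished ones without blocking while there is spare capacity (timeout=0), block for the first completion only when the pool is saturated, and stop at the first interesting result.

import random
import time
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait

def check(index):
    time.sleep(random.random() / 10)  # stand-in for running one test
    return index, index == 7          # (index, is it interesting?)

proc_num, n = 4, 16
found = None
pending = set()
with ThreadPoolExecutor(proc_num) as pool:
    for i in range(n):
        # Poll without blocking while below capacity; block for the first
        # completion once proc_num futures are in flight.
        done, pending = wait(pending, timeout=0 if len(pending) < proc_num else None,
                             return_when=FIRST_COMPLETED)
        for future in done:
            index, interesting = future.result()
            if interesting:
                found = index
                break
        if found is not None:
            break
        pending.add(pool.submit(check, i))
    # Drain in-flight futures before concluding that nothing was found.
    if found is None:
        for future in wait(pending)[0]:
            index, interesting = future.result()
            if interesting:
                found = index
                break
print(found)  # 7, once the check for index 7 has run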