Rewrite parallel DD based on futures #44

Merged 1 commit on Nov 21, 2023
picire/__init__.py: 1 change (0 additions & 1 deletion)
@@ -17,6 +17,5 @@
 from .limit_reduction import LimitReduction
 from .outcome import Outcome
 from .parallel_dd import ParallelDD
-from .shared_cache import shared_cache_decorator
 from .splitter import SplitterRegistry
 from .subprocess_test import ConcatTestBuilder, SubprocessTest
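
The only change here is that shared_cache_decorator leaves the package's public exports; as the parallel_dd.py diff below shows, ParallelDD now wraps its cache in a thread-safe SharedCache internally, so callers no longer decorate the cache class themselves. A before/after sketch of hypothetical caller code, not taken from this repository:

    # Before this PR (hypothetical caller):
    #     from picire import ConfigCache, shared_cache_decorator
    #     cache = shared_cache_decorator(ConfigCache)()
    # After it, a plain cache (or none) is enough; ParallelDD wraps it itself:
    from picire import ParallelDD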
picire/cli.py: 8 changes (1 addition & 7 deletions)
@@ -34,7 +34,6 @@
 from .iterator import CombinedIterator, IteratorRegistry
 from .limit_reduction import LimitReduction
 from .parallel_dd import ParallelDD
-from .shared_cache import shared_cache_decorator
 from .splitter import SplitterRegistry
 from .subprocess_test import ConcatTestBuilder, SubprocessTest

@@ -76,8 +75,6 @@ def int_or_inf(value):
                         help='run DD in parallel')
     parser.add_argument('-j', '--jobs', metavar='N', type=int, default=cpu_count(),
                         help='maximum number of test commands to execute in parallel (has effect in parallel mode only; default: %(default)s)')
-    parser.add_argument('-u', '--max-utilization', metavar='N', type=int, default=100,
-                        help='maximum CPU utilization allowed; don\'t start new parallel jobs until utilization is higher (has effect in parallel mode only; default: %(default)s)')

     # Tweaks how to walk through the chunk lists.
     parser.add_argument('--complement-first', dest='subset_first', action='store_false', default=True,
@@ -153,8 +150,6 @@ def process_args(args):
                    'cleanup': args.cleanup}

     args.cache_class = CacheRegistry.registry[args.cache]
-    if args.parallel:
-        args.cache_class = shared_cache_decorator(args.cache_class)
     args.cache_config = {'cache_fail': args.cache_fail,
                          'evict_after_fail': args.evict_after_fail}
@@ -175,8 +170,7 @@
         args.reduce_class = DD
     else:
         args.reduce_class = ParallelDD
-        args.reduce_config.update(proc_num=args.jobs,
-                                  max_utilization=args.max_utilization)
+        args.reduce_config.update(proc_num=args.jobs)

     logger.info('Input loaded from %s', args.input)
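With -u/--max-utilization gone, parallelism is bounded only by -j/--jobs, which flows into ParallelDD's proc_num and, as the parallel_dd.py diff below shows, caps the thread pool directly, so a separate CPU-utilization throttle is no longer needed. A typical invocation after this change might look like the following; --parallel and --jobs appear in this diff, while --input and --test are assumptions about picire's usual CLI:

    picire --input bug.txt --test ./interesting.sh --parallel --jobs 8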
picire/parallel_dd.py: 134 changes (81 additions & 53 deletions)
@@ -8,22 +8,53 @@

 import logging

-from multiprocessing import Value
+from concurrent.futures import ALL_COMPLETED, FIRST_COMPLETED, ThreadPoolExecutor, wait
+from os import cpu_count
+from threading import Lock

-from . import parallel_loop
-from .cache import ConfigCache
+from .cache import OutcomeCache
 from .dd import DD
 from .outcome import Outcome
-from .shared_cache import shared_cache_decorator

 logger = logging.getLogger(__name__)


+class SharedCache(OutcomeCache):
+    """
+    Thread-safe cache representation that stores the evaluated configurations
+    and their outcome.
+    """
+
+    def __init__(self, cache):
+        self._cache = cache
+        self._lock = Lock()
+
+    def set_test_builder(self, test_builder):
+        with self._lock:
+            self._cache.set_test_builder(test_builder)
+
+    def add(self, config, result):
+        with self._lock:
+            self._cache.add(config, result)
+
+    def lookup(self, config):
+        with self._lock:
+            return self._cache.lookup(config)
+
+    def clear(self):
+        with self._lock:
+            self._cache.clear()
+
+    def __str__(self):
+        with self._lock:
+            return self._cache.__str__()
+
+
 class ParallelDD(DD):

     def __init__(self, test, *, split=None, cache=None, id_prefix=None,
                  config_iterator=None, dd_star=False, stop=None,
-                 proc_num=None, max_utilization=None):
+                 proc_num=None):
         """
         Initialize a ParallelDD object.

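SharedCache makes any OutcomeCache thread-safe by delegation: every method takes the same lock before forwarding to the wrapped cache, which suffices now that the workers are threads in a single process rather than separate processes (note the switch from multiprocessing.Value to threading.Lock in the imports). A standalone sketch of the same wrapper pattern, with stand-in classes rather than picire's real ones:

    from threading import Lock

    class PlainCache:
        # Stand-in for an OutcomeCache implementation (not thread-safe itself).
        def __init__(self):
            self._store = {}

        def add(self, config, result):
            self._store[tuple(config)] = result

        def lookup(self, config):
            return self._store.get(tuple(config))

    class LockingCache:
        # Same idea as SharedCache: serialize every access with one shared lock.
        def __init__(self, cache):
            self._cache = cache
            self._lock = Lock()

        def add(self, config, result):
            with self._lock:
                self._cache.add(config, result)

        def lookup(self, config):
            with self._lock:
                return self._cache.lookup(config)

    cache = LockingCache(PlainCache())
    cache.add(['a', 'b'], 'FAIL')
    assert cache.lookup(['a', 'b']) == 'FAIL'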
Expand All @@ -36,29 +67,11 @@ def __init__(self, test, *, split=None, cache=None, id_prefix=None,
:param dd_star: Boolean to enable the DD star algorithm.
:param stop: A callable invoked before the execution of every test.
:param proc_num: The level of parallelization.
:param max_utilization: The maximum CPU utilization accepted.
"""
cache = cache or shared_cache_decorator(ConfigCache)()
super().__init__(test=test, split=split, cache=cache, id_prefix=id_prefix, config_iterator=config_iterator, dd_star=dd_star, stop=stop)
self._cache = SharedCache(self._cache)

self._proc_num = proc_num
self._max_utilization = max_utilization
self._fail_index = Value('i', 0, lock=False)

def _loop_body(self, config, index, config_id):
"""
The function that will be run in parallel.

:param config: The list of entries of the current configuration.
:param index: The index of the current configuration.
:param config_id: The unique ID of the current configuration.
:return: True if the test is not interesting, False otherwise.
"""
if self._test_config(config, config_id) is Outcome.FAIL:
self._fail_index.value = index
return False

return True
self._proc_num = proc_num or cpu_count()

def _reduce_config(self, run, subsets, complement_offset):
"""
@@ -73,37 +86,49 @@ def _reduce_config(self, run, subsets, complement_offset):
             next complement_offset).
         """
         n = len(subsets)
-        self._fail_index.value = n
-        ploop = parallel_loop.Loop(self._proc_num, self._max_utilization)
-        for i in self._config_iterator(n):
-            if i >= 0:
-                config_id = (f'r{run}', f's{i}')
-                config_set = subsets[i]
-            else:
-                i = (-i - 1 + complement_offset) % n
-                config_id = (f'r{run}', f'c{i}')
-                config_set = [c for si, s in enumerate(subsets) for c in s if si != i]
-                i = -i - 1
-
-            # If we checked this test before, return its result
-            outcome = self._lookup_cache(config_set, config_id)
-            if outcome is Outcome.PASS:
-                continue
-            if outcome is Outcome.FAIL:
-                self._fail_index.value = i
-                ploop.brk()
-                break
-
-            self._check_stop()
-            # Break if we found a FAIL either in the cache or by testing it now.
-            if not ploop.do(self._loop_body, (config_set, i, config_id)):
-                # if do() returned False, the test was not started
-                break
-        ploop.join()
+        fvalue = n
+        tests = set()
+        with ThreadPoolExecutor(self._proc_num) as pool:
+            for i in self._config_iterator(n):
+                results, tests = wait(tests, timeout=0 if len(tests) < self._proc_num else None, return_when=FIRST_COMPLETED)
+                for result in results:
+                    index, outcome = result.result()
+                    if outcome is Outcome.FAIL:
+                        fvalue = index
+                        break
+                if fvalue < n:
+                    break
+
+                if i >= 0:
+                    config_id = (f'r{run}', f's{i}')
+                    config_set = subsets[i]
+                else:
+                    i = (-i - 1 + complement_offset) % n
+                    config_id = (f'r{run}', f'c{i}')
+                    config_set = [c for si, s in enumerate(subsets) for c in s if si != i]
+                    i = -i - 1
+
+                # If we checked this test before, return its result
+                outcome = self._lookup_cache(config_set, config_id)
+                if outcome is Outcome.PASS:
+                    continue
+                if outcome is Outcome.FAIL:
+                    fvalue = i
+                    break
+
+                self._check_stop()
+                tests.add(pool.submit(self._test_config_with_index, i, config_set, config_id))
+
+            results, _ = wait(tests, return_when=ALL_COMPLETED)
+            if fvalue == n:
+                for result in results:
+                    index, outcome = result.result()
+                    if outcome is Outcome.FAIL:
+                        fvalue = index
+                        break

         # fvalue contains the index of the cycle in the previous loop
         # which was found interesting. Otherwise it's n.
-        fvalue = self._fail_index.value
         if fvalue < 0:
             # Interesting complement is found.
             # In next run, start removing the following subset
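
The rewritten loop above implements a bounded task pool directly on concurrent.futures: wait() with timeout=0 polls non-blockingly while fewer than proc_num futures are in flight, blocks on FIRST_COMPLETED once the pool is saturated, stops submitting as soon as any test reports FAIL, and the trailing ALL_COMPLETED wait drains tasks that were still running when the loop exited, since one of them may also have found a failing configuration. A standalone sketch of the same scheduling idiom, with a stand-in test function in place of picire's test runner:

    from concurrent.futures import ALL_COMPLETED, FIRST_COMPLETED, ThreadPoolExecutor, wait
    import random
    import time

    POOL_SIZE = 4

    def fake_test(index):
        # Simulate a test run; index 13 is the "interesting" (failing) one.
        time.sleep(random.uniform(0.01, 0.05))
        return index, index == 13

    found = None
    pending = set()
    with ThreadPoolExecutor(POOL_SIZE) as pool:
        for i in range(100):
            # Poll without blocking while the pool has free slots; once it is
            # full, block until at least one in-flight test finishes.
            done, pending = wait(pending, timeout=0 if len(pending) < POOL_SIZE else None,
                                 return_when=FIRST_COMPLETED)
            for future in done:
                index, failed = future.result()
                if failed:
                    found = index
                    break
            if found is not None:
                break
            pending.add(pool.submit(fake_test, i))

        # Drain the stragglers: an earlier submission may also have failed.
        done, _ = wait(pending, return_when=ALL_COMPLETED)
        if found is None:
            for future in done:
                index, failed = future.result()
                if failed:
                    found = index
                    break

    print('first failing index observed:', found)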
@@ -114,3 +139,6 @@ def _reduce_config(self, run, subsets, complement_offset):
             return [subsets[fvalue]], 0

         return None, complement_offset
+
+    def _test_config_with_index(self, index, config, config_id):
+        return index, self._test_config(config, config_id)
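
The new helper at the end exists because a future carries back only its callable's return value: tagging each outcome with the index of the configuration it tested lets the scheduling loop recover which subset or complement failed, replacing the shared _fail_index Value of the old implementation. A minimal illustration with a stand-in test function:

    from concurrent.futures import ThreadPoolExecutor

    def run_test(config):  # stand-in for ParallelDD._test_config
        return 'FAIL' if len(config) > 2 else 'PASS'

    def run_test_with_index(index, config):  # mirrors _test_config_with_index
        return index, run_test(config)

    with ThreadPoolExecutor(2) as pool:
        futures = [pool.submit(run_test_with_index, i, cfg)
                   for i, cfg in enumerate([[1, 2], [3, 4, 5]])]
        for future in futures:
            index, outcome = future.result()  # the index travels with the outcome
            print(index, outcome)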