Skip to content

Commit

Permalink
Made checking for and testing defunct processes a bit more robust
Browse files Browse the repository at this point in the history
  • Loading branch information
sybrenjansen committed Apr 19, 2022
1 parent 7e6ac5c commit 202400a
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 16 deletions.
25 changes: 15 additions & 10 deletions mpire/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,25 @@ def _start_workers(self, progress_bar: bool) -> None:
def _check_worker_status(self) -> List[Any]:
"""
Checks the worker status:
- Restarts workers that need to be restarted.
- If the worker is supposed to be alive, but isn't, terminate.
- Restarts workers that need to be restarted.
:return: List of unordered results produces by workers
"""
# Check that workers that are supposed to be alive, are actually alive. If not, then a worker died unexpectedly.
# For extremely slow machines it is possible for the worker alive Event to be True, while the check for
# process.is_alive() right after that is False. This happens when the worker actually terminated in the mean
# time. To avoid this scenario, we check the Event object before and after. In theory, it could happen that a
# new process started again, but what are the odds??
for worker_id in range(self.pool_params.n_jobs):
if (self._worker_comms.is_worker_alive(worker_id) and not self._workers[worker_id].is_alive() and
self._worker_comms.is_worker_alive(worker_id)):
# We need to add an exception if we're using the progress bar handler
if self._worker_comms.has_progress_bar():
self._worker_comms.add_exception(RuntimeError, f"Worker-{worker_id} died unexpectedly")
self.terminate()
raise RuntimeError(f"Worker-{worker_id} died unexpectedly")

# Check restarts
obtained_results = []
for worker_id in self._worker_comms.get_worker_restarts():
Expand All @@ -181,15 +195,6 @@ def _check_worker_status(self) -> List[Any]:
# Start new worker
self._workers[worker_id] = self._start_worker(worker_id)

# Check that workers that are supposed to be alive, are actually alive. If not, then a worker died unexpectedly
for worker_id in range(self.pool_params.n_jobs):
if self._worker_comms.is_worker_alive(worker_id) and not self._workers[worker_id].is_alive():
# We need to add an exception if we're using the progress bar handler
if self._worker_comms.has_progress_bar():
self._worker_comms.add_exception(RuntimeError, f"Worker-{worker_id} died unexpectedly")
self.terminate()
raise RuntimeError(f"Worker-{worker_id} died unexpectedly")

return obtained_results

def _start_worker(self, worker_id: int) -> mp.Process:
Expand Down
20 changes: 14 additions & 6 deletions tests/test_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1161,9 +1161,11 @@ def test_defunct_processes_exit(self):
"""
Tests if MPIRE correctly shuts down after process becomes defunct using exit()
"""
for n_jobs, progress_bar, worker_lifespan in product([1, 2, 4], [False, True], [None, 1, 3]):
with self.subTest(n_jobs=n_jobs, progress_bar=progress_bar, worker_lifespan=worker_lifespan), \
self.assertRaises(SystemExit), WorkerPool(n_jobs=n_jobs) as pool:
for n_jobs, progress_bar, worker_lifespan, start_method in product([1, 4], [False, True], [None, 1],
TEST_START_METHODS):
with self.subTest(n_jobs=n_jobs, progress_bar=progress_bar, worker_lifespan=worker_lifespan,
start_method=start_method), self.assertRaises(SystemExit), \
WorkerPool(n_jobs=n_jobs, start_method=start_method) as pool:
pool.map(self._exit, range(100), progress_bar=progress_bar, worker_lifespan=worker_lifespan)

def test_defunct_processes_kill(self):
Expand All @@ -1174,9 +1176,15 @@ def test_defunct_processes_kill(self):
thread waits until the event is set and then kills the worker. The other workers are also ensured to have done
something so we can test what happens during restarts
"""
for n_jobs, progress_bar, worker_lifespan in product([1, 3], [False, True], [None, 1]):
with self.subTest(n_jobs=n_jobs, progress_bar=progress_bar, worker_lifespan=worker_lifespan), \
self.assertRaises(RuntimeError), WorkerPool(n_jobs=n_jobs, pass_worker_id=True) as pool:
for n_jobs, progress_bar, worker_lifespan, start_method in product([1, 3], [False, True], [None, 1],
TEST_START_METHODS):
# Can't kill threads
if start_method == 'threading':
continue

with self.subTest(n_jobs=n_jobs, progress_bar=progress_bar, worker_lifespan=worker_lifespan,
start_method=start_method), self.assertRaises(RuntimeError), \
WorkerPool(n_jobs=n_jobs, pass_worker_id=True, start_method=start_method) as pool:
events = [pool.ctx.Event() for _ in range(n_jobs)]
kill_thread = Thread(target=self._kill_process, args=(events[0], pool))
kill_thread.start()
Expand Down

0 comments on commit 202400a

Please sign in to comment.