Skip to content

Commit

Permalink
Fix block allocation with two or more workers hanging on failed function (#532)
Browse files: Browse the repository at this point in the history

* errors with local backend

* The errors are not raised with multiple processes

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* fix

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* fix parameter

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
jan-janssen and pre-commit-ci[bot] authored Dec 24, 2024
1 parent 95c9480 commit 403d51f
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 1 deletion.
6 changes: 5 additions & 1 deletion executorlib/interactive/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ def __init__(
super().__init__(max_cores=executor_kwargs.get("max_cores", None))
executor_kwargs["future_queue"] = self._future_queue
executor_kwargs["spawner"] = spawner
executor_kwargs["queue_join_on_shutdown"] = False
self._set_process(
process=[
RaisingThread(
Expand Down Expand Up @@ -209,6 +210,7 @@ def execute_parallel_tasks(
hostname_localhost: Optional[bool] = None,
init_function: Optional[Callable] = None,
cache_directory: Optional[str] = None,
queue_join_on_shutdown: bool = True,
**kwargs,
) -> None:
"""
Expand All @@ -227,6 +229,7 @@ def execute_parallel_tasks(
option to true
init_function (Callable): optional function to preset arguments for functions which are submitted later
cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
queue_join_on_shutdown (bool): Join communication queue when thread is closed. Defaults to True.
"""
interface = interface_bootup(
command_lst=_get_backend_path(
Expand All @@ -244,7 +247,8 @@ def execute_parallel_tasks(
if "shutdown" in task_dict.keys() and task_dict["shutdown"]:
interface.shutdown(wait=task_dict["wait"])
future_queue.task_done()
future_queue.join()
if queue_join_on_shutdown:
future_queue.join()
break
elif "fn" in task_dict.keys() and "future" in task_dict.keys():
if cache_directory is None:
Expand Down
30 changes: 30 additions & 0 deletions tests/test_dependencies_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ def merge(lst):
return sum(lst)


def raise_error():
    """Unconditionally raise a ``RuntimeError``.

    Takes no arguments and never returns; it serves as a task that is
    guaranteed to fail when submitted to an executor.

    Raises:
        RuntimeError: always.
    """
    raise RuntimeError()


class TestExecutorWithDependencies(unittest.TestCase):
def test_executor(self):
with Executor(max_cores=1, backend="local") as exe:
Expand Down Expand Up @@ -227,3 +231,29 @@ def test_many_to_one_plot(self):
)
self.assertEqual(len(nodes), 18)
self.assertEqual(len(edges), 21)


class TestExecutorErrors(unittest.TestCase):
    """Verify that a failing submitted function surfaces as ``RuntimeError``.

    Each test opens a local-backend ``Executor`` with a given number of cores
    and block-allocation mode, submits ``raise_error`` and expects the
    ``RuntimeError`` to propagate out of the executor context.
    """

    def _expect_runtime_error(self, max_cores, block_allocation):
        """Submit ``raise_error`` and assert that ``RuntimeError`` propagates.

        Args:
            max_cores (int): value forwarded to ``Executor(max_cores=...)``.
            block_allocation (bool): value forwarded to
                ``Executor(block_allocation=...)``.
        """
        with self.assertRaises(RuntimeError):
            with Executor(
                max_cores=max_cores,
                backend="local",
                block_allocation=block_allocation,
            ) as exe:
                cloudpickle_register(ind=1)
                # The future is intentionally not awaited; the error is
                # expected to be raised while the executor context is active
                # or being closed.
                _ = exe.submit(raise_error)

    def test_block_allocation_false_one_worker(self):
        self._expect_runtime_error(max_cores=1, block_allocation=False)

    def test_block_allocation_true_one_worker(self):
        self._expect_runtime_error(max_cores=1, block_allocation=True)

    def test_block_allocation_false_two_workers(self):
        self._expect_runtime_error(max_cores=2, block_allocation=False)

    def test_block_allocation_true_two_workers(self):
        self._expect_runtime_error(max_cores=2, block_allocation=True)

0 comments on commit 403d51f

Please sign in to comment.