Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix block allocation with two or more workers hanging on failed function #532

Merged
merged 8 commits into from
Dec 24, 2024
8 changes: 7 additions & 1 deletion executorlib/interactive/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ def __init__(
super().__init__(max_cores=executor_kwargs.get("max_cores", None))
executor_kwargs["future_queue"] = self._future_queue
executor_kwargs["spawner"] = spawner
executor_kwargs["queue_join_on_shutdown"] = (
False # The same queue is shared over multiple threads
)
self._set_process(
process=[
RaisingThread(
Expand Down Expand Up @@ -205,6 +208,7 @@ def execute_parallel_tasks(
hostname_localhost: Optional[bool] = None,
init_function: Optional[Callable] = None,
cache_directory: Optional[str] = None,
queue_join_on_shutdown: bool = True,
**kwargs,
) -> None:
"""
Expand All @@ -223,6 +227,7 @@ def execute_parallel_tasks(
option to true
init_function (callable): optional function to preset arguments for functions which are submitted later
cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
queue_join_on_shutdown (bool): Join communication queue when thread is closed. Defaults to True.
"""
interface = interface_bootup(
command_lst=_get_backend_path(
Expand All @@ -240,7 +245,8 @@ def execute_parallel_tasks(
if "shutdown" in task_dict.keys() and task_dict["shutdown"]:
interface.shutdown(wait=task_dict["wait"])
future_queue.task_done()
future_queue.join()
if queue_join_on_shutdown:
future_queue.join()
break
elif "fn" in task_dict.keys() and "future" in task_dict.keys():
if cache_directory is None:
Expand Down
30 changes: 30 additions & 0 deletions tests/test_dependencies_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ def merge(lst):
return sum(lst)


def raise_error():
raise RuntimeError


class TestExecutorWithDependencies(unittest.TestCase):
def test_executor(self):
with Executor(max_cores=1, backend="local") as exe:
Expand Down Expand Up @@ -227,3 +231,29 @@ def test_many_to_one_plot(self):
)
self.assertEqual(len(nodes), 18)
self.assertEqual(len(edges), 21)


class TestExecutorErrors(unittest.TestCase):
def test_block_allocation_false_one_worker(self):
with self.assertRaises(RuntimeError):
with Executor(max_cores=1, backend="local", block_allocation=False) as exe:
cloudpickle_register(ind=1)
_ = exe.submit(raise_error)

def test_block_allocation_true_one_worker(self):
with self.assertRaises(RuntimeError):
with Executor(max_cores=1, backend="local", block_allocation=True) as exe:
cloudpickle_register(ind=1)
_ = exe.submit(raise_error)

def test_block_allocation_false_two_workers(self):
with self.assertRaises(RuntimeError):
with Executor(max_cores=2, backend="local", block_allocation=False) as exe:
cloudpickle_register(ind=1)
_ = exe.submit(raise_error)

def test_block_allocation_true_two_workers(self):
with self.assertRaises(RuntimeError):
with Executor(max_cores=2, backend="local", block_allocation=True) as exe:
cloudpickle_register(ind=1)
_ = exe.submit(raise_error)
Loading