Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
jan-janssen committed Dec 24, 2024
1 parent 57b7e2a commit fcb6c06
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 6 deletions.
6 changes: 5 additions & 1 deletion executorlib/interactive/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ def __init__(
super().__init__(max_cores=executor_kwargs.get("max_cores", None))
executor_kwargs["future_queue"] = self._future_queue
executor_kwargs["spawner"] = spawner
executor_kwargs["queue_join_on_shutdown"] = False # The same queue is shared over multiple threads
self._set_process(
process=[
RaisingThread(
Expand Down Expand Up @@ -205,6 +206,7 @@ def execute_parallel_tasks(
hostname_localhost: Optional[bool] = None,
init_function: Optional[Callable] = None,
cache_directory: Optional[str] = None,
queue_join_on_shutdown: bool = True,
**kwargs,
) -> None:
"""
Expand All @@ -223,6 +225,7 @@ def execute_parallel_tasks(
option to true
init_function (callable): optional function to preset arguments for functions which are submitted later
cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
queue_join_on_shutdown (bool): Join communication queue when thread is closed. Defaults to True.
"""
interface = interface_bootup(
command_lst=_get_backend_path(
Expand All @@ -240,7 +243,8 @@ def execute_parallel_tasks(
if "shutdown" in task_dict.keys() and task_dict["shutdown"]:
interface.shutdown(wait=task_dict["wait"])
future_queue.task_done()
future_queue.join()
if queue_join_on_shutdown:
future_queue.join()
break
elif "fn" in task_dict.keys() and "future" in task_dict.keys():
if cache_directory is None:
Expand Down
10 changes: 5 additions & 5 deletions tests/test_dependencies_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,8 @@ def test_block_allocation_false_two_workers(self):
cloudpickle_register(ind=1)
_ = exe.submit(raise_error)

# def test_block_allocation_true_two_workers(self):
# with self.assertRaises(RuntimeError):
# with Executor(max_cores=2, backend="local", block_allocation=True) as exe:
# cloudpickle_register(ind=1)
# _ = exe.submit(raise_error)
def test_block_allocation_true_two_workers(self):
with self.assertRaises(RuntimeError):
with Executor(max_cores=2, backend="local", block_allocation=True) as exe:
cloudpickle_register(ind=1)
_ = exe.submit(raise_error)

0 comments on commit fcb6c06

Please sign in to comment.