Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix multiprocess del guard #784

Merged
merged 4 commits into from
Feb 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/orion/executor/dask_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,10 @@ def submit(self, function, *args, **kwargs):
raise

def __del__(self):
self.client.close()
# This is necessary because if the factory constructor fails
# __del__ is executed right away but client might not be set
if hasattr(self, "client"):
self.client.close()

def __enter__(self):
return self
Expand Down
13 changes: 10 additions & 3 deletions src/orion/executor/multiprocess_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,18 +165,25 @@ class PoolExecutor(BaseExecutor):
loky=Pool, # TODO: For compatibility with joblib backend. Remove in v0.4.0.
)

def __init__(self, n_workers, backend="multiprocess", **kwargs):
self.pool = PoolExecutor.BACKENDS.get(backend, ThreadPool)(n_workers)
def __init__(self, n_workers=-1, backend="multiprocess", **kwargs):
super().__init__(n_workers, **kwargs)

if n_workers <= 0:
n_workers = multiprocessing.cpu_count()

self.pool = PoolExecutor.BACKENDS.get(backend, ThreadPool)(n_workers)

def __enter__(self):
return self

def __exit__(self, exc_type, exc_value, traceback):
self.pool.shutdown()

def __del__(self):
self.pool.shutdown()
# This is necessary because if the factory constructor fails
# __del__ is executed right away but pool might not be set
if hasattr(self, "pool"):
self.pool.shutdown()

def __getstate__(self):
state = super(PoolExecutor, self).__getstate__()
Expand Down
3 changes: 2 additions & 1 deletion src/orion/executor/single_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ def __init__(self, n_workers=1, **config):
self.nested = 0

def __del__(self):
self.close()
if hasattr(self, "closed"):
self.close()

def __enter__(self):
self.nested += 1
Expand Down
20 changes: 19 additions & 1 deletion tests/unittests/executor/test_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import pytest

from orion.executor.base import AsyncException, ExecutorClosed
from orion.executor.base import AsyncException, ExecutorClosed, executor_factory
from orion.executor.dask_backend import Dask
from orion.executor.multiprocess_backend import PoolExecutor
from orion.executor.single_backend import SingleExecutor
Expand All @@ -18,6 +18,8 @@ def thread(n):
return PoolExecutor(n, "threading")


executors = ["joblib", "poolexecutor", "dask", "singleexecutor"]

backends = [thread, multiprocess, Dask, SingleExecutor]


Expand Down Expand Up @@ -188,3 +190,19 @@ def test_multisubprocess(backend):
# access the results to make sure no exception is being
# suppressed
r.value


@pytest.mark.parametrize("executor", executors)
def test_executors_have_default_args(executor):

with executor_factory.create(executor):
pass


@pytest.mark.parametrize("backend", backends)
def test_executors_del_does_not_raise(backend):
# if executor init fails you can get very weird error messages,
# because of the deleter trying to close unallocated resources.

klass = type(backend(1))
klass.__del__(object())