Nested scatter calls lead to KeyError (#3703)
Thank you for this. For others, a traceback. It looks like this might be caused by using `replicate` many times concurrently (`replicate` is notoriously not robust).

Traceback:

```
(base) mrocklin@carbon-7:~$ python foo.py
running outer task 3
running outer task 9
running inner task 0 of outer task 3
running inner task 1 of outer task 3
running inner task 0 of outer task 9
running inner task 1 of outer task 9
running inner task 2 of outer task 9
running inner task 2 of outer task 3
running inner task 3 of outer task 9
running inner task 3 of outer task 3
running inner task 4 of outer task 9
running inner task 4 of outer task 3
running inner task 5 of outer task 9
running inner task 5 of outer task 3
running inner task 6 of outer task 9
running inner task 6 of outer task 3
running inner task 7 of outer task 9
running inner task 7 of outer task 3
running inner task 8 of outer task 9
running inner task 8 of outer task 3
running inner task 9 of outer task 9
running inner task 9 of outer task 3
running outer task 8
running outer task 7
distributed.core - ERROR - 'ndarray-e13a0cd21557a3394c2c32361a932aea'
Traceback (most recent call last):
File "/home/mrocklin/workspace/distributed/distributed/core.py", line 408, in handle_comm
result = await result
File "/home/mrocklin/workspace/distributed/distributed/scheduler.py", line 3179, in replicate
tasks = {self.tasks[k] for k in keys}
File "/home/mrocklin/workspace/distributed/distributed/scheduler.py", line 3179, in <setcomp>
tasks = {self.tasks[k] for k in keys}
KeyError: 'ndarray-e13a0cd21557a3394c2c32361a932aea'
distributed.worker - WARNING - Compute Failed
Function: outer_function
args: (array([1., 1., 1., ..., 1., 1., 1.]), 7)
kwargs: {}
Exception: KeyError('ndarray-e13a0cd21557a3394c2c32361a932aea')
running inner task 0 of outer task 8
running inner task 1 of outer task 8
running inner task 2 of outer task 8
Traceback (most recent call last):
File "foo.py", line 52, in <module>
results = client.gather(futures)
File "/home/mrocklin/workspace/distributed/distributed/client.py", line 1892, in gather
running inner task 3 of outer task 8
asynchronous=asynchronous,
running inner task 4 of outer task 8
File "/home/mrocklin/workspace/distributed/distributed/client.py", line 778, in sync
self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
File "/home/mrocklin/workspace/distributed/distributed/utils.py", line 348, in sync
raise exc.with_traceback(tb)
File "/home/mrocklin/workspace/distributed/distributed/utils.py", line 332, in f
result[0] = yield future
File "/home/mrocklin/miniconda/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
running inner task 5 of outer task 8
value = future.result()
File "/home/mrocklin/workspace/distributed/distributed/client.py", line 1751, in _gather
running inner task 6 of outer task 8
raise exception.with_traceback(traceback)
File "foo.py", line 23, in outer_function
slices = client.scatter(slices, broadcast=True)
File "/home/mrocklin/workspace/distributed/distributed/client.py", line 2098, in scatter
running inner task 7 of outer task 8
hash=hash,
File "/home/mrocklin/workspace/distributed/distributed/client.py", line 778, in sync
self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
File "/home/mrocklin/workspace/distributed/distributed/utils.py", line 348, in sync
raise exc.with_traceback(tb)
File "/home/mrocklin/workspace/distributed/distributed/utils.py", line 332, in f
running inner task 8 of outer task 8
result[0] = yield future
File "/home/mrocklin/miniconda/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
running inner task 9 of outer task 8
value = future.result()
File "/home/mrocklin/workspace/distributed/distributed/client.py", line 1994, in _scatter
running outer task 6
await self._replicate(list(out.values()), workers=workers, n=n)
File "/home/mrocklin/workspace/distributed/distributed/client.py", line 3045, in _replicate
keys=list(keys), n=n, workers=workers, branching_factor=branching_factor
File "/home/mrocklin/workspace/distributed/distributed/core.py", line 755, in send_recv_from_rpc
result = await send_recv(comm=comm, op=key, **kwargs)
File "/home/mrocklin/workspace/distributed/distributed/core.py", line 554, in send_recv
raise exc.with_traceback(tb)
File "/home/mrocklin/workspace/distributed/distributed/core.py", line 408, in handle_comm
result = await result
File "/home/mrocklin/workspace/distributed/distributed/scheduler.py", line 3179, in replicate
tasks = {self.tasks[k] for k in keys}
File "/home/mrocklin/workspace/distributed/distributed/scheduler.py", line 3179, in <setcomp>
tasks = {self.tasks[k] for k in keys}
KeyError: 'ndarray-e13a0cd21557a3394c2c32361a932aea'
```

If we turn off `broadcast=True`, does the problem go away?
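For readers unfamiliar with the flag being discussed: `broadcast=True` is what routes a `scatter` call through the scheduler's `replicate` handler seen in the traceback above. A minimal sketch of the difference, written for illustration and not taken from the issue:

```python
from dask.distributed import Client

client = Client(processes=False)  # small in-process cluster, just for illustration

data = list(range(5))
# Default: the scheduler places the scattered pieces on some worker(s).
futures = client.scatter(data)
# broadcast=True additionally asks the scheduler to copy every piece to all
# workers -- that is the Scheduler.replicate call visible in the traceback.
futures_everywhere = client.scatter(data, broadcast=True)

client.close()
```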
My bad. Thanks for adding it.
If I turn off broadcasting, then I personally get a non-deterministic `CancelledError`. Here is the traceback:

```
running inner task 6 of outer task 7
running inner task 6 of outer task 8
running inner task 5 of outer task 8
distributed.worker - WARNING - Compute Failed
Function: outer_function
args: (array([1., 1., 1., ..., 1., 1., 1.]), 7)
kwargs: {}
Exception: CancelledError()
Traceback (most recent call last):
File "test_scatter_simplified.py", line 61, in <module>
results = client.gather(futures)
File "/home/pierreglaser/repos/distributed/distributed/client.py", line 1892, in gather
asynchronous=asynchronous,
File "/home/pierreglaser/repos/distributed/distributed/client.py", line 778, in sync
self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
File "/home/pierreglaser/repos/distributed/distributed/utils.py", line 348, in sync
raise exc.with_traceback(tb)
File "/home/pierreglaser/repos/distributed/distributed/utils.py", line 332, in f
result[0] = yield future
File "/home/pierreglaser/.virtualenvs/joblib_py36/lib/python3.6/site-packages/tornado/gen.py", line 735, in run
value = future.result()
concurrent.futures._base.CancelledError
```

PS: the use of ...
Hrm, I tried this and can reproduce. I don't have an immediate answer about what is going on. To debug this further, the next thing I would try is to ...
cc'ing @jrbourbeau @lr4d in case either of them have an interest in diving into scheduler internals.
Thank you for the hints. I'll post an update once I figure this out (if others don't do it before me).
if only I had the time...
Thanks for the ping @mrocklin, I will look into this issue over the next couple of days.
For general education, here is how I'm debugging this. I rewrote this using the `gen_cluster` testing utility:

```python
@gen_cluster(client=True)
async def test_nested_scatter(c, s, a, b):
    np = pytest.importorskip("numpy")
    from joblib import Parallel, delayed, parallel_backend

    NUM_INNER_TASKS = 10
    NUM_OUTER_TASKS = 10

    def my_sum(x, i, j):
        print(f"running inner task {j} of outer task {i}")
        return np.sum(x)

    def outer_function(array, i):
        print(f"running outer task {i}")
        client = get_client()
        slices = [array[i + j :] for j in range(NUM_INNER_TASKS)]
        # commenting this line makes the code run successfully
        slices = client.scatter(slices, broadcast=True)
        futures = client.map(
            my_sum, slices, [i] * NUM_INNER_TASKS, range(NUM_INNER_TASKS)
        )
        secede()
        results = client.gather(futures)
        rejoin()
        return sum(results)

    my_arrays = [np.ones(100000) for _ in range(10)]
    future_arrays = await c.scatter(my_arrays, direct=False)

    # using .map() instead of .submit() makes the code run successfully.
    # futures = client.map(outer_function, future_arrays, range(10))
    futures = []
    for i, arr in enumerate(future_arrays):
        future = c.submit(outer_function, arr, i)
        futures.append(future)

    results = await c.gather(futures)
    print(results)
```

We're getting a `CancelledError` on the `results = client.gather(futures)` line inside `outer_function`. Putting in a breakpoint there:

```python
before = str(futures)
secede()
try:
    results = client.gather(futures)
except Exception:
    breakpoint()
rejoin()
```

I see that indeed, some of the futures have been cancelled.
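As a hedged aside (not part of the original comment): one quick way to confirm this at such a breakpoint is to look at each future's `status` attribute. The snippet below assumes the `futures` list from the surrounding `outer_function`:

```python
# Future.status reports, among other values, 'pending', 'finished',
# 'error', or 'cancelled' for each inner future.
cancelled = [f.key for f in futures if f.status == "cancelled"]
print(f"{len(cancelled)} of {len(futures)} inner futures were cancelled")
```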
I want to get access to the scheduler within this function. However, I need to be a little bit clever about this: I don't want to include the scheduler in the function's scope, because then it'll get serialized when we move this from the client to the workers (even though they're both in the same process). So I attach the scheduler to a module as a global and then reference it in the function:

```python
@gen_cluster(client=True, timeout=None)
async def test_nested_scatter(c, s, a, b):
    print("main client", c)
    np = pytest.importorskip("numpy")
    from joblib import Parallel, delayed, parallel_backend
    import distributed

    distributed.s = s
    ...

    def outer_function(array, i):
        import distributed
        s = distributed.s
```

This is a bit of a hack, but it works. I find that, oddly, the scheduler never received any news about this task:

```
(Pdb) pp s.story(futures[0].key)  # this is what we expect
[('my_sum-07fd0280-4a8d-426a-b293-f03334e8015d-0',
'released',
'waiting',
OrderedDict([('my_sum-07fd0280-4a8d-426a-b293-f03334e8015d-0',
'processing')]),
1587833466.9215913),
('my_sum-07fd0280-4a8d-426a-b293-f03334e8015d-0',
'waiting',
'processing',
{},
1587833466.9216352),
('my_sum-07fd0280-4a8d-426a-b293-f03334e8015d-0',
'processing',
'memory',
OrderedDict(),
1587833466.950933)]
(Pdb) pp s.story(futures[-1].key)  # this is what we get
[]
```

This is odd. I check that the number of tasks is divisible by ten, and learn that it isn't:

```
(Pdb) pp len(s.tasks)
27
```

So I put this code in the scheduler's `update_graph`:

```python
if len(tasks) % 10 != 0 and len(tasks) != 1:
    breakpoint()
```

And this leads me to this section of code:

```python
while len(tasks) != n:  # walk through new tasks, cancel any bad deps
    n = len(tasks)
    for k, deps in list(dependencies.items()):
        if any(
            dep not in self.tasks and dep not in tasks for dep in deps
        ):  # bad key
            logger.info("User asked for computation on lost data, %s", k)
            del tasks[k]
            del dependencies[k]
            if k in keys:
                keys.remove(k)
            self.report({"op": "cancelled-key", "key": k}, client=client)
            self.client_releases_keys(keys=[k], client=client)
```

And indeed, we've had the "computation on lost data" message in the logs all this time. I should look at the logs more often. And we find that the scattered data on which this task depends has just recently been released:

```
distributed.scheduler - INFO - User asked for computation on lost data, my_sum-9007cd9a-b94f-407c-b700-f62aae1388aa-9
> /home/mrocklin/workspace/distributed/distributed/scheduler.py(1820)update_graph()
-> del tasks[k]
(Pdb) pp k
'my_sum-9007cd9a-b94f-407c-b700-f62aae1388aa-9'
(Pdb) pp deps
('ndarray-f4ddcddd3c5cfa043c2cfc1169a0ee51',)
(Pdb) pp self.story(*deps)
[('ndarray-f4ddcddd3c5cfa043c2cfc1169a0ee51',
'memory',
'released',
OrderedDict([('ndarray-f4ddcddd3c5cfa043c2cfc1169a0ee51', 'forgotten')]),
1587834183.6691344),
('ndarray-f4ddcddd3c5cfa043c2cfc1169a0ee51',
'released',
'forgotten',
{},
1587834183.6691456)]
```

This is, I think, a duplicate of https://github.com/dask/dask/issues/6027. See also #3641.

Fully worked test:

```python
from distributed.utils_test import gen_cluster
from dask.distributed import secede, rejoin, get_client
import pytest
import distributed


@gen_cluster(client=True, timeout=None)
async def test_nested_scatter(c, s, a, b):
    np = pytest.importorskip("numpy")
    distributed.s = s

    NUM_INNER_TASKS = 10
    NUM_OUTER_TASKS = 10

    def my_sum(x, i, j):
        print(f"running inner task {j} of outer task {i}")
        return np.sum(x)

    def outer_function(array, i):
        import distributed
        s = distributed.s
        print(f"running outer task {i}")
        client = get_client()
        slices = [array[i + j :] for j in range(NUM_INNER_TASKS)]
        # commenting this line makes the code run successfully
        slices = client.scatter(slices, direct=False)
        futures = client.map(
            my_sum, slices, [i] * NUM_INNER_TASKS, range(NUM_INNER_TASKS)
        )
        before = str(futures)
        secede()
        try:
            results = client.gather(futures)
        except Exception as e:
            f = e
            breakpoint()
        rejoin()
        return sum(results)

    my_arrays = [np.ones(100000) for _ in range(10)]
    future_arrays = await c.scatter(my_arrays, direct=False)

    # using .map() instead of .submit() makes the code run successfully.
    # futures = client.map(outer_function, future_arrays, range(10))
    futures = []
    for i, arr in enumerate(future_arrays):
        future = c.submit(outer_function, arr, i)
        futures.append(future)

    results = await c.gather(futures)
    print(results)
```
Short term these problems also just go away if you use the `hash=False` keyword to `scatter`.
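For concreteness, here is a minimal, condensed sketch of that workaround; this is my own rewrite of the reproducer above rather than code from the thread, and the relevant change is passing `hash=False` to the nested `scatter`:

```python
from dask.distributed import Client, get_client, secede, rejoin
import numpy as np


def outer_function(array, i):
    client = get_client()
    slices = [array[i + j:] for j in range(10)]
    # hash=False gives every scattered piece a fresh, unique key instead of a
    # key derived from hashing the data, sidestepping the key reuse implicated
    # in the errors above.
    slices = client.scatter(slices, hash=False)
    futures = client.map(np.sum, slices)
    secede()
    results = client.gather(futures)
    rejoin()
    return sum(results)


if __name__ == "__main__":
    client = Client()
    arrays = client.scatter([np.ones(100_000) * k for k in range(10)])
    futures = client.map(outer_function, arrays, range(10))
    print(client.gather(futures))
    client.close()
```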
Another approach for Dask + Joblib is for Joblib, when nested, to use the ThreadPoolExecutor on the worker:

```python
def my_func(...):
    w = dask.distributed.get_worker()
    futures = [w.executor.submit(func, x) for x in seq]
    secede()
    results = [future.result() for future in futures]
    rejoin()
    return aggregate(results)
```
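A self-contained version of the same idea, sketched here for illustration: the `inner`/`outer` helpers are made up, and it leans on the same `Worker.executor` thread pool attribute used in the snippet above.

```python
from dask.distributed import Client, get_worker, secede, rejoin


def inner(x):
    return x * x


def outer(seq):
    # Run the nested work on the worker's own thread pool instead of sending
    # it back through the scheduler.
    w = get_worker()
    futures = [w.executor.submit(inner, x) for x in seq]
    secede()                                 # free this worker slot while waiting
    results = [f.result() for f in futures]
    rejoin()                                 # reclaim a slot before returning
    return sum(results)


if __name__ == "__main__":
    client = Client()
    print(client.submit(outer, list(range(10))).result())  # 285
    client.close()
```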
There is a broader point here that one doesn't have to use Dask for everything. At some point it makes sense to switch out to more standard Python libraries.
If the outer parallel loop cannot saturate the nodes, using the ThreadPoolExecutor for the inner loop will lead to under-subscription of the dask cluster. For instance, the outer loop might be a 5-fold cross-validation and the inner loop an embarrassingly parallel model fit (e.g. a random forest with 100s of trees). But maybe this case is not so frequent.
Ah, that makes sense. I think that short term the solution of not hashing data in scatter is probably best. It's a little bit unclean, but I suspect that it actually has better performance because locally scattering data is entirely free.
@mrocklin Thank you for pointing out ...
Hi. I think I might be hitting this same race condition without even using nested scatters. The following minimal example fails reliably for me with either ...

I kind of agree with @mrocklin's earlier tone of urgency about this issue, since it seems so basic. I believe I've encountered this race condition a couple of times in my career using Dask (once at a previous employer and now again with this non-nested variant), and both times I googled it, added ...

```python
from dask import distributed

cluster = distributed.LocalCluster(
    n_workers=4, threads_per_worker=2, host="0.0.0.0",
    scheduler_port=0, dashboard_address=":0",
)
client = distributed.Client(cluster)


def nullity(x):
    return None


def identity(x, y):
    return x


for i in range(100):
    try:
        y = client.submit(
            nullity,
            client.scatter("smoochies"),
        ).result()
        client.submit(
            identity,
            client.scatter("smoochies"),
            y,
        ).result()
    finally:
        print(f"Iteration {i}")
```

After a few iterations it usually dies with ...

More rarely, it dies with ...
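One detail worth spelling out about why this non-nested example can race; this is my reading of the thread, not text from the comment above. With the default `hash=True`, `scatter` derives the key from the data, so scattering the same object twice yields futures that share a key, and releasing one while the other is still in flight can forget the key on the scheduler. A small check:

```python
from dask.distributed import Client

client = Client(processes=False)

f1 = client.scatter("smoochies")
f2 = client.scatter("smoochies")
assert f1.key == f2.key   # same data -> same hash-derived key

g = client.scatter("smoochies", hash=False)
assert g.key != f1.key    # hash=False always generates a fresh key

client.close()
```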
Hi All,

I am currently working on improving the `joblib`-`dask` integration. It turns out that nested `Parallel` calls in `joblib` using the `dask` backend tend to error out with either `KeyError` or `CancelledError`. I narrowed it down using only `dask` and `numpy`, and it seems that the issue comes from nested `scatter` calls.

Here is a reproducer: it consists of submitting functions that rely on scattered arrays. Each of these functions submits small arithmetic operations to be computed on scattered slices of their original input.

2 remarks:

- Using `client.map` instead of `client.submit` makes the code run successfully.
- Not scattering the `slices` in the `outer` function makes the code run successfully.

My guess as of now is that dynamically creating new compute resources through `secede`/`rejoin` calls might interact badly with the data locality logic of `distributed`. I'm investigating this on my own, but I'm not familiar enough with the `dask`/`distributed` codebase to trace this back efficiently.

Is this behavior supported? Is there a clear anti-pattern that I'm missing? Any pointer would be helpful.