Getting concurrent.futures._base.CancelledError from simple binary tree built from futures #4612

Comments
Thanks for the issue @cjnolet. I'm sorry to hear that you've had a frustrating time. If you can provide a minimal reproducible example I'd be happy to run through it and see if I can reproduce the problem. You're really close to this with your current code, but it looks like you have a custom function (`tree_reduce`) that isn't included. You mention that you are using two V100s. Is it necessary to use GPUs to reproduce the problem? If so, I might divert you to RAPIDS folks. |
Absolutely, @mrocklin. Actually, I am using the `LocalCUDACluster` from dask-cuda. Oddly enough, this issue seems exclusive to the use of `client.scatter`:

```python
from dask_cuda import LocalCUDACluster
from dask.distributed import Client

cluster = LocalCUDACluster(protocol="tcp")
client = Client(cluster)

def s(x):
    return x

n_parts = 15
a = [client.submit(s, i) for i in range(n_parts)]
b = tree_reduce(a)
b = b.result()
print(str(b))
assert(sum(range(n_parts)) == b)
```

This one seems to fail intermittently:

```python
from dask_cuda import LocalCUDACluster
from dask.distributed import Client

cluster = LocalCUDACluster(protocol="tcp")
client = Client(cluster)

n_parts = 15
a = client.scatter(range(n_parts))
b = tree_reduce(a)
b = b.result()
print(str(b))
assert(sum(range(n_parts)) == b)
```

I have also tried using a different reduction function (the default in the `tree_reduce` signature is `sum`):

```python
from dask_cuda import LocalCUDACluster
from dask.distributed import Client

cluster = LocalCUDACluster(protocol="tcp")
client = Client(cluster)

def add(parts): return sum(parts)

n_parts = 15
a = client.scatter(range(n_parts))
b = tree_reduce(a, func=add)
b = b.result()
print(str(b))
assert(sum(range(n_parts)) == b)
```
|
Thanks @cjnolet. In the future, if you're able to provide a copy-pasteable example that anyone can run, that would help. For example, if you don't need GPUs then an example built on `LocalCluster` is something any maintainer can run. I tried cobbling together a copy-pasteable example from your comments below:

```python
from dask.distributed import get_client, wait, Client, LocalCluster
from toolz import first

def tree_reduce(objs, func=sum):
    while len(objs) > 1:
        new_objs = []
        n_objs = len(objs)
        for i in range(0, n_objs, 2):
            inputs = objs[i:i + 2]
            obj = get_client().submit(func, inputs)
            new_objs.append(obj)
        wait(new_objs)
        objs = new_objs
    return first(objs)

cluster = LocalCluster()
client = Client(cluster)

def add(parts): return sum(parts)

n_parts = 15
a = client.scatter(range(n_parts))
b = tree_reduce(a, func=add)
b = b.result()
print(str(b))
assert(sum(range(n_parts)) == b)
```

Unfortunately it seems to work fine on my machine. I ran this ten times and each time it was fine. So I think that the next step here is for you to try to provide a copy-pasteable example that a maintainer can run that fails. (Sorry for the GitHub issue lecture. It's been a busy day.) |
My apologies, @mrocklin. Thank you for being patient with me. Here's a slightly modified script that fails for me every time. It seems the number of workers is important (in my tests I have only 2 workers, and in our environment it's not always trivial to increase the number of workers).

```python
from dask.distributed import get_client, wait, Client, LocalCluster
from toolz import first
# import pytest

def tree_reduce(objs, func=sum):
    while len(objs) > 1:
        new_objs = []
        n_objs = len(objs)
        for i in range(0, n_objs, 2):
            inputs = objs[i:i + 2]
            obj = get_client().submit(func, inputs)
            new_objs.append(obj)
        wait(new_objs)
        objs = new_objs
    return first(objs)

# @pytest.mark.parametrize("n_parts", [1, 2, 5, 10, 15])
def test(n_parts):
    client = Client(cluster)

    def add(parts): return sum(parts)

    a = client.scatter(range(n_parts))
    b = tree_reduce(a, func=add)
    b = b.result()
    print(str(b))
    assert(sum(range(n_parts)) == b)

if __name__ == "__main__":
    cluster = LocalCluster(n_workers=2)
    for i in range(25):
        for n_parts in [1, 2, 5, 10, 15]:
            test(n_parts)
```
|
Ah, that's perfect. Thank you @cjnolet . I'm able to reproduce. I'll try to take a look at this this weekend. |
Here is a simpler failure. I've removed the tree reduction.

```python
from dask.distributed import Client, LocalCluster

async def test(n_parts, client):
    a = await client.scatter(range(n_parts), hash=True)
    future = client.submit(sum, a)
    await future

async def f():
    async with Client(n_workers=2, processes=False, asynchronous=True) as client:
        for i in range(25):
            for n_parts in [1, 2, 5, 10, 15]:
                print(n_parts)
                await test(n_parts, client)

if __name__ == "__main__":
    import asyncio
    asyncio.get_event_loop().run_until_complete(f())
```

I think that the problem here has to do with the scattered futures, which get deleted and recreated in quick succession. There is probably some subtle race condition in how the client and scheduler are counting references. @jacobtomlinson, this might be something that you enjoy digging into. It should give some understanding of internals, and should help out other NVIDIA folks. |
Thanks @mrocklin! I verified that your new script fails every time.

Our CI always executes our tests against the bleeding-edge versions of dask and distributed, and it is currently blocked by the sudden presence of these errors.

This consistently has a cancelled error:

This errors less frequently:

Based on the example, I'm thinking this workaround is not fixing the issue, but just lowering the chances of hitting the potential race condition. |
The error that you have uncovered here happens when scattered data is deleted and then recreated in quick succession.

I don't have enough information on your other failing cases to know if they are the same, unfortunately. If you're not scattering things then it might be something different. In that case I would encourage you to raise another issue with a minimal reproducible example. |
I looked at this and there is a race condition, but I am not sure that we can easily resolve it inside of dask. The race condition is that as we are scattering, we are deleting at the same time. I think there are a few options:

1. when we scatter, wait a tick
2. when we scatter, call transitions directly (it's not this exact call, but something like "scheduler, complete your tasks")
3. ask the user to wait a tick in between scatter/submit calls (a rough sketch of this follows below)
4. uniqueness around scattered keys (this could be fairly limiting for other tasks but would avoid collisions)

|
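For illustration, here is a minimal sketch of what option 3 could look like from the user's side. It is not a fix, only a way to shrink the window for the race, and the sleep duration is an arbitrary assumption:

```python
import time
from dask.distributed import Client, LocalCluster, wait

cluster = LocalCluster(n_workers=2)
client = Client(cluster)

def add(parts):
    return sum(parts)

n_parts = 15
a = client.scatter(range(n_parts))
wait(a)           # make sure the scattered futures have actually landed on workers
time.sleep(0.1)   # "wait a tick" so any pending key releases can settle
b = client.submit(add, a)
print(b.result())
```

A sleep only lowers the probability of the collision; it does not remove the underlying ordering problem, which is why the rest of the discussion focuses on the reference counting itself.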
Dask should be robust to this. We shouldn't rely on users to be careful here.

I think that someone needs to look deeply at how we currently count references and send task-released messages around. I think that @jacobtomlinson would be a good person for this if he has time.
|
In other words, this isn't something that we want to work around or avoid. This is something that we should dive into. Diving into it is hard though and will require some previous experience with the internals of the distributed scheduler.
|
I am looking at this now, although I'm feeling a little in at the deep end with it. Some pointers to where I should start looking would be appreciated. |
Yeah, it's non-trivial. I would probably start by printing out or otherwise recording every time the client and scheduler think that the task has been created or cancelled. I think that by looking through that list you'll be able to eventually spot something fishy.
|
I've created another, further simplified reproducer.

```python
from dask.distributed import Client

async def f():
    async with Client(
        n_workers=1, processes=False, asynchronous=True, silence_logs=False
    ) as client:
        d = [0, 1]
        a = await client.scatter(d, hash=True)
        print(await client.submit(sum, a))

        del a

        d = [1, 2]
        a = await client.scatter(d, hash=True)
        print(await client.submit(sum, a))

if __name__ == "__main__":
    import asyncio
    asyncio.get_event_loop().run_until_complete(f())
```

If we scatter some data and delete it, but then immediately scatter some more data with a common value and try to operate on it, we see the `CancelledError`.

The issue seems to be a result of the time delay between the client deleting a future and the worker deleting the data; the data is recreated during that time. I feel like the logic here is reasonable: if the user deletes some data, it should be deleted. If they recreate it again, they pay a performance penalty compared with not deleting it.

Perhaps one solution would be to introduce some reference counting for keys on the worker, so that a key is only actually removed once every reference to it has been released. I'll start working on a PR for this. |
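As a rough illustration of the per-key reference counting described above, here is a toy sketch; it is not distributed's actual `Worker` internals, and the `KeyStore` class and its methods are invented for the example:

```python
from collections import defaultdict

class KeyStore:
    """Toy illustration of reference-counted key storage on a worker."""

    def __init__(self):
        self._data = {}
        self._refcount = defaultdict(int)

    def put(self, key, value):
        # Re-scattering the same key bumps the count instead of racing
        # with a pending delete.
        self._data[key] = value
        self._refcount[key] += 1

    def release(self, key):
        # A delete only removes the data once every creator has released it.
        self._refcount[key] -= 1
        if self._refcount[key] <= 0:
            self._data.pop(key, None)
            del self._refcount[key]

    def get(self, key):
        return self._data[key]

store = KeyStore()
store.put("x", 1)   # first scatter
store.put("x", 1)   # re-scatter of an equal value, same key
store.release("x")  # the first, late-arriving release -- data survives
assert store.get("x") == 1
store.release("x")  # last reference gone -- now it is actually removed
```

With counting like this, the late-arriving release from the first scatter no longer destroys data that a second scatter has just recreated.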
Getting the same `concurrent.futures._base.CancelledError` on `LocalCluster`.

13:03:59 Traceback (most recent call last):

This is with the latest dask release from conda. |
Moved to distributed. This seems to still be an issue. |
@jacobtomlinson thank you for looking into the bug. This seems like a really serious bug. Right now it's causing tests I'm adding to Modin on dask to fail often (but not always). Dask users should be able to delete data with a given value and re-store data with the same value.
Is there any workaround? What happened to #3641? |
It turns out I should have been scattering my objects with `hash=False`. Still, the bug with `hash=True` remains. |
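Assuming the workaround refers to `Client.scatter`'s `hash=` parameter (an assumption, though it matches the earlier suggestion of making scattered keys unique): with the default `hash=True`, equal values map to the same key, so deleting and immediately re-scattering the same value reuses a key that may still be mid-release, while `hash=False` generates a fresh key per call. A small sketch:

```python
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=2)
client = Client(cluster)

# hash=True (the default): the key is derived from the value, so both scatters
# share one key, which can collide with an in-flight release of the first copy.
a1 = client.scatter("smoochies", hash=True)
a2 = client.scatter("smoochies", hash=True)
print(a1.key == a2.key)  # True

# hash=False: every scatter gets a unique key, so releasing one copy cannot
# cancel work that depends on the other.
b1 = client.scatter("smoochies", hash=False)
b2 = client.scatter("smoochies", hash=False)
print(b1.key == b2.key)  # False
```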
Yeah, there hasn't been much activity here for a couple of years. #3641 wasn't an ideal solution and we ended up not going with it. It's interesting that not many folks are pinging the issue or giving a 👍; maybe most folks are working around it or not running into it. I'm guessing this is why it hasn't been resolved yet.

cc @grainger @gjoseph92 @crusaderky @hendrikmakait in case they have any interest in this |
I just tried @jacobtomlinson's reproducer from #4612 (comment) and it still fails.
Notice this code in the scheduler:

distributed/distributed/scheduler.py, lines 3885 to 3896 in 4b89e26

That's where the `CancelledError` is coming from as well. My gut feeling here is that this is mostly a client-side issue, not one with the scheduler or worker. I haven't looked at this for more than 2 minutes, so this is just a guess, but this feels familiar to me with how the client gets reference counting wrong in a concurrent setting, especially when passing from blocking to event-loop code. A guess:
|
I think I have another (simpler?) reproducer. Posted in #3703, but maybe I should have posted here instead since this issue has more recent activity and they seem to be duplicates.

```python
from dask import distributed

cluster = distributed.LocalCluster(
    n_workers=4, threads_per_worker=2, host="0.0.0.0",
    scheduler_port=0, dashboard_address=':0'
)
client = distributed.Client(cluster)

def nullity(x):
    return None

def identity(x, y):
    return x

for i in range(100):
    # after a few iterations, we'll die with either CancelledError or KilledWorker
    print(f"Iteration {i}")
    y = client.submit(
        nullity,
        client.scatter("smoochies")
    ).result()
    client.submit(
        identity,
        client.scatter("smoochies"),
        y
    ).result()
```
|
I can confirm that @gjoseph92's analysis is correct and this is an ordering issue. I see two options:
|
Decoupling `scatter_to_workers` from `update_data`, as suggested above in 1), indeed resolves the race that triggers the cancelled error, but it introduces a similar race condition on the worker side, resulting in data loss. |
@mvashishtha, Reproducer:
Erroring with:
My current workaround is to return the future, but it would be nice to avoid the return. |
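The workaround's code isn't shown, but its likely shape is to keep the scattered future referenced, for example by returning or storing it, so the client never releases the key while an equal value might be re-scattered. A hedged sketch under that assumption; `process`, `run_once`, and `keep` are made-up names:

```python
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=2)
client = Client(cluster)

def process(x):
    return x * 2

def run_once(client, value, keep):
    data = client.scatter(value)
    result = client.submit(process, data).result()
    # Storing the scattered future keeps its reference count alive, so a later
    # scatter of an equal value cannot collide with this key's release.
    keep.append(data)
    return result

keep = []  # references that outlive the individual calls
for i in range(10):
    print(run_once(client, 21, keep))
```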
I'm getting the following exception from a binary tree that I'm building from futures (based largely on an example in the dask docs that uses delayed instead of futures).
Here's the function in question:
And my reproducible test:
The exception is intermittent and happens about 50% of the time:
I've tried doing a `dask.distributed.wait` after each level of the tree. I've also tried waiting for `b` in the reproducible example before calling `result`. I'm completely stumped as to why this is happening. I'm running this on a workstation with 2x V100s. I believe I'm doing something very wrong but I can't figure out what it is.
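The delayed-based docs example mentioned at the top of this issue isn't reproduced here, but for contrast, here is a rough sketch of the same tree reduction written with `dask.delayed`; it is an illustration, not the exact example from the documentation:

```python
from dask import delayed

@delayed
def add(a, b):
    return a + b

def tree_reduce_delayed(objs):
    # Pairwise-combine until a single delayed object remains.
    while len(objs) > 1:
        next_level = []
        for i in range(0, len(objs), 2):
            if i + 1 < len(objs):
                next_level.append(add(objs[i], objs[i + 1]))
            else:
                next_level.append(objs[i])  # odd element carries over
        objs = next_level
    return objs[0]

parts = [delayed(i) for i in range(15)]
total = tree_reduce_delayed(parts)
print(total.compute())  # sum(range(15)) == 105
```

The futures version in the comments above mirrors this structure, swapping the `delayed` calls for `client.submit`.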