Deadlock after few workers take all tasks #5635

Open
chrisroat opened this issue Jan 3, 2022 · 11 comments
@chrisroat
Contributor

What happened:

I have a dask-gateway cluster on k8s, and a 63k-task graph. Processing has essentially stalled: five workers have 375 tasks each, while the remaining 75 have zero. The scheduler logs contain a bunch of timeout errors. The workers holding all the tasks show the "unresponsive... long-running ... GIL" errors -- though it looks like the running tasks are stack.

[Screenshot: dashboard, 2022-01-03]

I attempted to use dump_cluster_state, but hit a timeout error.

Anything else we need to know?:

After deleting the pods with the 5 wedged workers, the graph completed.
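
(Not part of the original report: a minimal sketch of how such wedged workers can be identified from the client before deleting their pods. It assumes a connected Client named client; on dask-gateway/k8s the worker name usually corresponds to the pod name.)

# Sketch only: find workers with a disproportionate number of assigned tasks.
processing = client.processing()  # {worker address: [task keys assigned by the scheduler]}
names = {addr: info.get("name") for addr, info in client.scheduler_info()["workers"].items()}

for addr, keys in sorted(processing.items(), key=lambda kv: len(kv[1]), reverse=True)[:10]:
    print(len(keys), addr, names.get(addr))  # task count, address, worker/pod name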

Environment:

  • Dask version: 2021.12.0 (distributed was from a few commits after 2021.12.0)
  • Python version: 3.8.12
  • Operating System: Ubuntu 20.04
  • Install method (conda, pip, source): pip
Cluster Dump State:
---------------------------------------------------------------------------
TimeoutError                              Traceback (most recent call last)
/opt/conda/lib/python3.8/site-packages/distributed/comm/core.py in connect()
    318         # write, handshake = await asyncio.gather(comm.write(local_info), comm.read())
--> 319         handshake = await asyncio.wait_for(comm.read(), time_left())
    320         await asyncio.wait_for(comm.write(local_info), time_left())

/opt/conda/lib/python3.8/asyncio/tasks.py in wait_for()
    500             await _cancel_and_wait(fut, loop=loop)
--> 501             raise exceptions.TimeoutError()
    502     finally:

TimeoutError: 

The above exception was the direct cause of the following exception:

OSError                                   Traceback (most recent call last)
/tmp/ipykernel_894/3630066435.py in <module>
----> 1 client.dump_cluster_state(f"dump_{name}_{date}")

/opt/conda/lib/python3.8/site-packages/distributed/client.py in dump_cluster_state(self, filename, exclude, format)
   3592 
   3593         """
-> 3594         return self.sync(
   3595             self._dump_cluster_state,
   3596             filename=filename,

/opt/conda/lib/python3.8/site-packages/distributed/utils.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    308             return future
    309         else:
--> 310             return sync(
    311                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    312             )

/opt/conda/lib/python3.8/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    362     if error[0]:
    363         typ, exc, tb = error[0]
--> 364         raise exc.with_traceback(tb)
    365     else:
    366         return result[0]

/opt/conda/lib/python3.8/site-packages/distributed/utils.py in f()
    347             if callback_timeout is not None:
    348                 future = asyncio.wait_for(future, callback_timeout)
--> 349             result[0] = yield future
    350         except Exception:
    351             error[0] = sys.exc_info()

/opt/conda/lib/python3.8/site-packages/tornado/gen.py in run(self)
    760 
    761                     try:
--> 762                         value = future.result()
    763                     except Exception:
    764                         exc_info = sys.exc_info()

/opt/conda/lib/python3.8/site-packages/distributed/client.py in _dump_cluster_state(self, filename, exclude, format)
   3495         )
   3496         versions = self._get_versions()
-> 3497         scheduler_info, worker_info, versions_info = await asyncio.gather(
   3498             scheduler_info, worker_info, versions
   3499         )

/opt/conda/lib/python3.8/site-packages/distributed/core.py in send_recv_from_rpc(**kwargs)
    884             name, comm.name = comm.name, "ConnectionPool." + key
    885             try:
--> 886                 result = await send_recv(comm=comm, op=key, **kwargs)
    887             finally:
    888                 self.pool.reuse(self.addr, comm)

/opt/conda/lib/python3.8/site-packages/distributed/core.py in send_recv(comm, reply, serializers, deserializers, **kwargs)
    677         if comm.deserialize:
    678             typ, exc, tb = clean_exception(**response)
--> 679             raise exc.with_traceback(tb)
    680         else:
    681             raise Exception(response["exception_text"])

/opt/conda/lib/python3.8/site-packages/distributed/core.py in handle_comm()
    519                             result = asyncio.ensure_future(result)
    520                             self._ongoing_coroutines.add(result)
--> 521                             result = await result
    522                     except (CommClosedError, CancelledError):
    523                         if self.status in (Status.running, Status.paused):

/opt/conda/lib/python3.8/site-packages/distributed/scheduler.py in broadcast()
   6018             return resp
   6019 
-> 6020         results = await All(
   6021             [send_message(address) for address in addresses if address is not None]
   6022         )

/opt/conda/lib/python3.8/site-packages/distributed/utils.py in All()
    206     while not tasks.done():
    207         try:
--> 208             result = await tasks.next()
    209         except Exception:
    210 

/opt/conda/lib/python3.8/site-packages/distributed/scheduler.py in send_message()
   6010 
   6011         async def send_message(addr):
-> 6012             comm = await self.rpc.connect(addr)
   6013             comm.name = "Scheduler Broadcast"
   6014             try:

/opt/conda/lib/python3.8/site-packages/distributed/core.py in connect()
   1069         except Exception as exc:
   1070             self.semaphore.release()
-> 1071             raise exc
   1072         finally:
   1073             self._connecting.discard(fut)

/opt/conda/lib/python3.8/site-packages/distributed/core.py in connect()
   1053             )
   1054             self._connecting.add(fut)
-> 1055             comm = await fut
   1056             comm.name = "ConnectionPool"
   1057             comm._pool = weakref.ref(self)

/opt/conda/lib/python3.8/site-packages/distributed/comm/core.py in connect()
    322         with suppress(Exception):
    323             await comm.close()
--> 324         raise OSError(
    325             f"Timed out during handshake while connecting to {addr} after {timeout} s"
    326         ) from exc

OSError: Timed out during handshake while connecting to tls://10.16.184.3:46165 after 30 s
@fjetter
Member

fjetter commented Jan 14, 2022

After deleting the pods with the 5 wedged workers, the graph completed.

I assume these are the five workers which have so much data spilled on the dashboard? (See bytes stored by worker)

"unresponsive... long-running ... GIL" errors -- though it look like the running tasks are stack.

Anything interesting happening in this function? E.g. is there native code running which may not release the GIL?

After deleting the pods with the 5 wedged workers, the graph completed.

That strongly indicates another state machine problem.

@chrisroat
Contributor Author

I think the stack task is just a da.stack call, though it's possible that optimization fused in some other operations. CPU usage on those workers has dropped to zero.

I'm not sure how to better help diagnose the state machine issues.

I run into most problems with autoscaling clusters. I could set the cluster size to a very high fixed number, but the workloads vary a lot and that would be wasteful. I already waste a lot because my autoscaling lower bound needs to be high enough to prevent KilledWorker errors while waiting for new workers to come online... so the tail end of the graph, where only a few tasks are left, has very low utilization.
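
(For reference, a rough sketch of the adaptive-scaling setup described above, assuming a dask-gateway cluster object; the bounds are illustrative, not the actual values used.)

from dask_gateway import Gateway

gateway = Gateway()                    # gateway address/auth omitted
cluster = gateway.new_cluster()
# Keep the lower bound high enough that tasks don't hit KilledWorker while
# waiting for new workers to come online; raise the upper bound for large graphs.
cluster.adapt(minimum=20, maximum=80)  # hypothetical bounds
client = cluster.get_client()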

@fjetter
Member

fjetter commented Jan 18, 2022

I attempted to use dump_cluster_state, but hit a timeout error.

These timeouts are not unexpected in such an auto-scaling situation. The latest release (2022.01.0) includes a fix to the cluster dump function so that it no longer fails when individual workers time out.

#5590

@chrisroat
Contributor Author

I just encountered a new failure mode, where the dashboard says 3 tasks are processing but the task graph in the lower left shows 0 (and the Info page shows no workers with anything in "processing"). I typically have to delete a pod to unstick things, but this time I don't know which pod!

[Screenshot: dashboard, 2022-01-20]

I searched through the logs for "overlap-getitem", since that is the most upstream task prefix that is wedged. In the scheduler logs, I see 12 lines in succession (always the same key, but several different workers):

Unexpected worker completed task. Expected: <WorkerState 'tls://10.18.96.2:43711', name: dask-worker-5ed6bca7ff4e4dfdb88a823c2b5e540b-jcxzh, status: running, memory: 809, processing: 205>, Got: <WorkerState 'tls://10.16.244.2:36389', name: dask-worker-5ed6bca7ff4e4dfdb88a823c2b5e540b-trg9s, status: running, memory: 139, processing: 28>, Key: ('overlap-getitem-666998704fa0ead8d1aecce3291ebb76', 1, 2, 18)

I tried deleting the workers mentioned, some of which are still around, but that did not bring back processing of the missing keys.

About 25 seconds before those log entries, I see:

distributed.core - INFO - Event loop was unresponsive in Scheduler for 31.66s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability

Some 50 minutes later:

File "/opt/conda/lib/python3.8/site-packages/tornado/ioloop.py", line 905, in _run
    return self.callback()
  File "/opt/conda/lib/python3.8/site-packages/distributed/stealing.py", line 477, in balance
    maybe_move_task(
  File "/opt/conda/lib/python3.8/site-packages/distributed/stealing.py", line 378, in maybe_move_task
    self.move_task_request(ts, sat, idl)
  File "/opt/conda/lib/python3.8/site-packages/distributed/stealing.py", line 258, in move_task_request
    self.scheduler.stream_comms[victim.address].send(
KeyError: 'tls://10.18.96.2:43711'

That worker address shows up in the logs as being retired just a few seconds before the "Unexpected worker completed task" messages, and the same address is re-registered in the middle of those messages.

The cluster dump would not upload to GitHub, so I put it here:

https://drive.google.com/file/d/1okQ6M8M8smP0oBhvlrdbQLJJbWp7_32U/view?usp=sharing
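
(Aside, not from the original comment: the same log search can also be done through the client rather than by grepping pod logs, assuming a connected client. A sketch:)

# Sketch only: pull recent scheduler/worker logs and search for the suspicious message.
scheduler_logs = client.get_scheduler_logs(n=10000)      # sequence of (level, message) pairs
matches = [msg for _, msg in scheduler_logs if "Unexpected worker completed task" in msg]
print(len(matches))

worker_logs = client.get_worker_logs(n=1000)             # {worker address: [(level, message), ...]}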

@bnaul
Contributor

bnaul commented Jan 22, 2022

We have recently started seeing this behavior as well, currently on dask+distributed 2022.1.0 but I think it's been happening for a couple of releases. Same pattern as @chrisroat:

  • we observe a worker (or sometimes a few) hogging a huge number of tasks even though there are many other idle workers (stealing behaving normally otherwise); looking at the scheduler logs we see this worker being "removed" many hours before:

    2022-01-21 04:02:49.930 PST
    distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://10.107.43.20:34655', name: hardisty-f5562fc3-daskworkers-6c8f6dbdf4-68zs4, status: running, memory: 9, processing: 256>

  • killing that worker redistributes the tasks; some of them succeed but then many seem to disappear: client.processing() shows nothing in progress but the Status page still shows the correct progress value

@crusaderky @fjetter any chance #5665 could be related given that this worker was left for dead but then kept going...?

@crusaderky
Collaborator

crusaderky commented Jan 24, 2022

@bnaul #5665 mitigates, but does not fix, the issue.
Everything in the opening post, particularly the memory usage graph of the stuck workers, seems to indicate that this may be a duplicate of #3761:

I can see from your dashboard that 4 workers are paused with extremely high amounts of spilled memory.
They never get out of paused state and they hold a bunch of queued tasks, which are not returned to the scheduler.
The reason why they don't get out of paused state may be (educated guess) that you ran out of disk space and they can't swap out anymore. As of the latest release, the error handling for this use case is fragile - see #5364 for improvement.

I currently have two changes in my pipeline:

  1. as soon as a worker pauses, it should return its queued tasks to the scheduler
  2. if a worker stays paused for too long, it should restart automatically

While you wait for those, I would suggest you try to figure out why you have 180 GiB worth of managed data per worker on 4 workers while the rest of the cluster is relatively free.

Do those workers contain replicas of data that already exists elsewhere? In that case, the new Active Memory Manager could help you: https://coiled.io/blog/introducing-the-dask-active-memory-manager/

You can also try running rebalance() periodically. Word of warning - the method is not safe to run on a busy scheduler. Making it safe and automatic is also in my pipeline.
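
(A rough sketch of what "running rebalance() periodically" could look like from the client side, with the caveat above that it is not safe on a busy scheduler; this is illustrative, not a recommended production pattern.)

import time

def rebalance_loop(client, interval=300):
    # Call client.rebalance() every `interval` seconds; tolerate failures,
    # since rebalance is not robust while computations are running.
    while True:
        time.sleep(interval)
        try:
            client.rebalance()
        except Exception as exc:
            print(f"rebalance failed: {exc!r}")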

@chrisroat
Contributor Author

I see this consolidation of tasks onto a few workers when I use auto-scaling and spot instances. I think the addition and deletion of workers throws the scheduler a curveball. I tried to reproduce it in #4471 (no deadlock -- just bad scheduling), where you can see a small number of workers get all the tasks as the cluster scales up.

The graph above is a map_overlap on a 3d-chunked array with 2×17×20 = 680 chunks, reading from and writing to the zarr format. This leads to 82k tasks(!). I notice in an issue you linked that one can adjust fuse parameters. Is that worth trying -- could it reduce the number of tasks and perhaps lower the rate of wonky scheduling & deadlocks?

FWIW, I did attempt the AMM at one point. If I set the env var DASK_DISTRIBUTED__SCHEDULER__ACTIVE_MEMORY-MANAGER__START to "true", it turns on -- do I need additional settings?

@crusaderky
Collaborator

I see this consolidation of tasks a few workers when I use auto-scaling and spot instances. I think the addition/deletion of workers is throwing a curveball to the scheduler.

Do you mean that the 4 workers that I see saturated in the dashboard are the initial ones and the rest of the cluster only goes online when they are already very full?

The graph above is a map_overlap on a 3d-chunked array with 2×17×20 = 680 chunks, reading from and writing to the zarr format. This leads to 82k tasks(!). I notice in an issue you linked that one can adjust fuse parameters. Is that worth trying -- could it reduce the number of tasks and perhaps lower the rate of wonky scheduling & deadlocks?

fuse parameters can reduce the number of tasks, which in turn translates into a reduction in CPU load on the scheduler. If your scheduler CPU is frequently near 100%, this should in turn translate into fewer random timeouts and disconnects. I don't think this is your case - unless you're frequently seeing them in the logs.
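
(For reference, a sketch of the fuse knobs being discussed, using dask's optimization.fuse configuration keys; the values are illustrative only.)

import dask

dask.config.set({
    "optimization.fuse.active": True,
    "optimization.fuse.ave-width": 4,  # allow wider fusion -> fewer, larger tasks
})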

FWIW, I did attempt the AMM at one point. If I set the env var DASK_DISTRIBUTED__SCHEDULER__ACTIVE_MEMORY-MANAGER__START to "true", it turns on -- do I need additional settings?

Should be enough. Please run client.amm.running() to double check.
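
(For reference, the environment variable above corresponds to the config key below; note it must take effect in the process where the scheduler starts, not just on the client. A sketch of the check, assuming a connected client:)

import dask

# Equivalent config key; set this in the scheduler's environment/config at startup.
dask.config.set({"distributed.scheduler.active-memory-manager.start": True})

# From the client, confirm the AMM is actually running on the scheduler:
client.amm.running()  # -> True if the Active Memory Manager is running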

Could you run client.rebalance() every once in a while? I wouldn't recommend it as a production fix (it's not robust in the main branch while computations are running), but if it does help you it will give me a case for raising the priority of the rework I'm doing on it.

@chrisroat
Contributor Author

Do you mean that the 4 workers that I see saturated in the dashboard are the initial ones and the rest of the cluster only goes online when they are already very full?

I don't think so. In the issue I referenced (#4471), I scale from 1 to 4 to 20 workers. It looks like 6 workers get the bulk of the load, and a few others get some load.

fuse parameters can reduce the number of tasks, which in turn translates into a reduction in CPU load on the scheduler. If your scheduler CPU is frequently near 100%, this should in turn translate into fewer random timeouts and disconnects. I don't think this is your case - unless you're frequently seeing them in the logs.

I do see a lot of comm errors in the logs.

FWIW, I did attempt the AMM at one point. If I set the env var DASK_DISTRIBUTED__SCHEDULER__ACTIVE_MEMORY-MANAGER__START to "true", it turns on -- do I need additional settings?

Should be enough. Please run client.amm.running() to double check.

Could you run client.rebalance() every once in a while? I wouldn't recommend it as a production fix (it's not robust in the main branch while computations are running), but if it does help you it will give me a case for raising the priority of the rework I'm doing on it.

I could try that on some future run. I do see tons of GIL errors, which I guess must come from the reads and writes to disk. It does seem like distributing the work better would mitigate things.

@fjetter
Member

fjetter commented Feb 15, 2022

FYI, I recently merged a PR which might help in this situation; at least the failure scenario I fixed there is likely to happen in scaling situations, particularly when workers are removed: #5786
Note that we already got another deadlock report (off GitHub) after this fix, so we're not out of the woods yet.

@mrocklin
Member

The "few workers taking all the tasks" thing is explainable and fixable by #6115 .

This doesn't explain things halting entirely, though, unless you've possibly run out of disk space (is that possible?)
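
(Not part of the original comment: one quick way to check this, assuming a connected client. Spilled data goes to each worker's local_directory, so free disk space can be queried on every worker.)

import shutil

def free_gib():
    # Runs on each worker; reports free space of the worker's spill directory in GiB.
    from distributed import get_worker
    return shutil.disk_usage(get_worker().local_directory).free / 2**30

client.run(free_gib)  # {worker address: free GiB}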
