
Scaling distributed dask to run 27,648 dask workers on the Summit supercomputer #3691

markcoletti opened this issue Apr 9, 2020 · 61 comments

@markcoletti

This distributed dask GitHub issue is to track progress and resolve problems associated with scaling dask to run on all the nodes of Oak Ridge National Laboratory (ORNL)’s Summit supercomputer. Benjamin Zaitlen of Nvidia suggested that I create this GitHub issue when I first shared on the RAPIDS-GoAi Slack what we were trying to do; he communicated with the dask development team, and they proposed tracking the effort in the project’s GitHub issue tracker.

At ORNL we are using distributed dask to evaluate unique driving scenarios using the CARLA driving simulator to improve a deep-learner model for driving a virtual autonomous vehicle. As part of this effort, we are stress testing dask on Summit by gradually adding more Summit nodes to submitted jobs, fixing problems at the point where a given number of nodes has caused issues, and then increasing the number of nodes to address the next set of problems.

As of April 9th, the maximum number of Summit nodes from which we’ve gotten a successful run is 402 nodes with six dask workers each, for a total of 2,412 dask workers. We have a job pending with 921 nodes that will push the total number of dask workers to 5,526 dask workers. Eventually we’d like dask running on all 4,608 Summit nodes, which would have 27,648 dask workers.

In parallel, we’re developing an open-source evolutionary algorithm framework, LEAP, with colleagues at George Mason University and MITRE that uses dask to concurrently evaluate individuals. We have used LEAP as the vehicle for stress testing dask on Summit. The first test problem was a MAX ONES instance where the fitness was the total number of ones in a given bit string. Since this problem was too trivial for stress testing, we created a new problem that had a single integer as the individual's phenome (that is, genes that have been decoded into something meaningful to the problem, in this case an integer) that would correspond to how many seconds to sleep during evaluation. This means an individual that had a genome of 10 bits of all ones would cause the dask worker to sleep for about 17 minutes during evaluation. Moreover, this was a maximization problem, so the longer the sleep periods, the fitter the individual. (We could probably call this test the UNDERGRAD problem. 😉 )
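For the curious, here is a rough sketch of what that sleep-based evaluation amounts to (the function name is hypothetical and not LEAP's actual API):

import time

def sleep_fitness(phenome_seconds):
    # Hypothetical stand-in for the real evaluation: sleep for the decoded
    # number of seconds and report that as the fitness, so longer sleeps
    # are fitter (a maximization problem).
    time.sleep(phenome_seconds)
    return phenome_seconds

# A 10-bit genome of all ones decodes to 1023 seconds, roughly 17 minutes.
print(sleep_fitness(3))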

In the comments below, I’ll detail what I’ve done so far, and relate some of the problems I’ve encountered on this journey. I welcome comments, tips, suggestions, criticisms, and help with the next inevitable set of problems associated with this non-trivial task. 🙂

Cheers,

Mark Coletti,
ORNL

@markcoletti
Author

I'll be posting a follow-up shortly that shares what we've done, what we're doing now, and possible strategies for resolving future problems in scaling up.

@mrocklin
Member

mrocklin commented Apr 9, 2020

I'm very excited to track this issue. Thank you @markcoletti for recording your efforts (and of course, for the efforts themselves).

@markcoletti
Author

You should not use dask_jobqueue on Summit

Of course, it made sense to read up on what others may have done with regards to getting dask to work on Summit, and @mrocklin had described a journey to get a small number of dask workers going on Summit that would connect to a Jupyter notebook. His approach centered on using dask_jobqueue to submit jobs corresponding to dask workers. Unfortunately we had to discard this approach because the OLCF policy for Summit was a maximum of five running jobs, which meant there’d be an explicit upper bound of five dask workers. (And that presumes the scheduler and the actual main computational task share one of those batch jobs; otherwise the limit would be three dask workers. That is, one batch job for the scheduler, one each for three dask workers, and the remaining batch job for the dask client.)

As an aside, @mrocklin’s article did mention turning off file locking in the dask configuration file to compensate for the fact that Summit mounts home directories as read-only during runs. (Otherwise the lock attempts would fail, which would obviously cause subpar performance.)

In previous work, we had used the python package schwimmbad to implement a kind of map/reduce approach for evaluating individuals in an evolutionary algorithm (EA). To support that, we had a single LSF job that had separate jsrun commands — one for the scheduler and another for the EA, itself. The underlying communication for schwimmbad between the workers and the EA was done via MPI calls.

Inspired by this, we similarly created a single LSF batch script that had a jsrun for the scheduler, another to spawn the workers, and finally one jsrun for the client itself. Here is an example of such a script, cribbed from one that I actually used on Summit.

The first jsrun is this:

jsrun --smpiargs="none" --gpu_per_rs 0 --nrs 1 --tasks_per_rs 1 --cpu_per_rs 1 --rs_per_host 1 dask-scheduler --interface ib0 --no-dashboard --no-show --scheduler-file $SCHEDULER_FILE &

This runs the scheduler on a single arbitrary core and advises the scheduler to use the available high-speed InfiniBand connection. We also don’t need the bokeh dashboard. (Though @mrocklin shows how to do that in the aforementioned article if you’re interested in spinning one up.) The --smpiargs argument specifies that we are not using MPI, since dask uses cloudpickle over the wire to slosh things between the scheduler, workers, and client. (Which is a relief because MPI can get a little hairy to deal with on Summit.) Note the closing & to run this task in the background.

The second jsrun fans out the dask workers.

jsrun --smpiargs="none" --nrs $NUM_WORKERS -e individual --stdio_stdout ${RUN_DIR}/worker_out.%h.%j.%t.%p --stdio_stderr ${RUN_DIR}/worker_error.%h.%j.%t.%p --tasks_per_rs 1 --cpu_per_rs 1 --gpu_per_rs 1 --rs_per_host 6 --latency_priority gpu-cpu dask-worker --nthreads 1 --nprocs 1 --interface ib0 --no-dashboard --reconnect --scheduler-file $SCHEDULER_FILE &

This fans out as many workers as there are Summit “resource sets”, which in the example is six resource sets per node, essentially giving each worker a single GPU. Obviously you’ll want to tailor the allocation to a configuration that makes sense for your problem. The -e individual, --stdio_stdout, and --stdio_stderr options capture the stdout and stderr for each worker in a separate file. The default is to throw everything into a single file, which makes poring over the output challenging. It’s far better to find the corresponding stdout/stderr for a given worker and look directly at that specific file when analyzing run results.

Resource sets don’t come up instantly

Workers don’t come up instantly; they can actually take several minutes. In fact, I’ve had runs take 10 to 20 minutes to spin up all the dask workers.

This means you should delay a bit before starting the client. You can take a best guess at how long to sleep, or you can take the guesswork out with a simple script that waits until a certain number of workers have registered. With that in mind, I wrote wait_for_workers.py, which waits for the scheduler to register a target number of workers and then exits gracefully so that the jsrun for the dask client script can execute.

One niggling detail: don’t have that script wait for all the workers to register with the scheduler, because if any fail for any reason at all, you will never reach the total you were looking for, and the script will hang for the entire run duration. Instead, shoot for a much lower number, say, half the total workers. That’s enough for dask to start working on your tasks, and you’ll be sure to always start your client.

So, this is how we wait for workers:

jsrun --smpiargs="off" --nrs 1 --cpu_per_rs 1 --gpu_per_rs 0 --rs_per_host 1 --latency_priority cpu-cpu python3 -u /ccs/proj/csc363/may/carla_imitation_learning/gremlin/wait_for_workers.py --verbose --target-workers $target_num_workers --pause-time 5 --maximum-wait-time 5 --scheduler-timeout 60 $SCHEDULER_FILE

Note there is no trailing & since we want to block until the target number of workers are registered with the scheduler. Also, this uses the same resource allocation as the scheduler — one CPU and no GPUs.
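In case it helps anyone, here is a stripped-down sketch of the idea behind wait_for_workers.py (not the actual script; it just polls the scheduler through the scheduler file until a target worker count or a deadline is reached):

import argparse
import sys
import time

from distributed import Client

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('scheduler_file')
    parser.add_argument('--target-workers', type=int, default=1)
    parser.add_argument('--pause-time', type=float, default=5.0,
                        help='seconds between polls of the scheduler')
    parser.add_argument('--maximum-wait-time', type=float, default=5.0,
                        help='minutes to wait before giving up and starting anyway')
    args = parser.parse_args()

    client = Client(scheduler_file=args.scheduler_file)
    deadline = time.time() + args.maximum_wait_time * 60

    while time.time() < deadline:
        # scheduler_info() returns a dict whose 'workers' entry maps
        # worker addresses to their current state
        n_workers = len(client.scheduler_info()['workers'])
        print(f'{n_workers} workers registered')
        if n_workers >= args.target_workers:
            break
        time.sleep(args.pause_time)

    client.close()

if __name__ == '__main__':
    sys.exit(main())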

The last jsrun is for the actual dask client and similarly doesn’t have a trailing & because otherwise the batch job would finish right away without giving a chance for the client to do its thing.

The final jskill is to ensure that we kill off the scheduler and workers when the client is finished.

In subsequent comments for this issue I’ll share some tuning tips and gotchas I’ve learned along the way.

@markcoletti
Author

I'll need to update the referenced GitHub LSF script and make some minor edits, but I've got a meeting, so that will have to wait. :(

@jrbourbeau
Member

Thanks @markcoletti for opening up this issue and documenting your process on Summit!

This means you should delay a bit before starting the client

Just as a side note, there's a Client.wait_for_workers method which may be useful when workers take a while to arrive
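For example, something along these lines (a sketch; the scheduler file name and worker count are placeholders):

from distributed import Client

expected_workers = 5526              # hypothetical total for this example
client = Client(scheduler_file='scheduler_file.json')

# Wait for half of the expected workers rather than all of them, so that a
# handful of failed workers cannot hang the job indefinitely.
client.wait_for_workers(n_workers=expected_workers // 2)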

@mrocklin
Member

mrocklin commented Apr 9, 2020

You should not use dask_jobqueue on Summit

Are you saying that you shouldn't launch tens of thousands of jobs? Hrm, who knew? :)

I'm curious about why MPI would not have been a good choice here.

I'm also curious if there is some modification of dask-jobqueue that would make sense for very large jobs, but my guess is that when we get to the scale of Summit, every machine will be custom enough so that custom scripts provided by the HPC admin staff may be best. Would you agree with this? Or is there something that the Dask development community can do to make this job easier?

@markcoletti
Author

Thanks @markcoletti for opening up this issue and documenting your process on Summit!

This means you should delay a bit before starting the client

Just as a side note, there's a Client.wait_for_workers method which may be useful when workers take a while to arrive

Well, now I feel foolish as that's exactly the kind of functionality I was looking for! Thank you! (So, this issue is already paying a dividend!)

The general guidance should still stand that you can wait for a subset of the total target workers to begin work. I've been recently bit by waiting for all the workers when some failed, which means waiting indefinitely. And, the larger the number of nodes allocated, the likelier some of those will fail.

@markcoletti
Author

You should not use dask_jobqueue on Summit

Are you saying that you shouldn't launch tens of thousands of jobs? Hrm, who knew? :)

Well, yeah, the folks in the OLCF would not be your biggest fan. ;)

That said, I've been thinking of what I call a "slow cooker" approach that would use dask_jobqueue in that you can have a long running evolutionary algorithm that just submits jobs for each evaluation. When the evaluation is done, the dask worker spins down to free up the limited cluster resource. Then when the EA wants to evaluate another individual, it will submit another job for that task. That way you can share the cluster with other users while still getting results from the EA.

I've shared this idea with the folks running the local cluster here at ORNL, and they are eager to give that a go.

I'm curious about why MPI would not have been a good choice here.

I was bit on Titan in that it was difficult to get MPI to work well with schwimmbad. And then it happened again on Summit. Let's just say that I had to work with the OLCF support staff to find the correct magic environment variables to set to appease MPI. This is what we get when IBM decides to just write its own MPI implementation from scratch instead of porting over one of the well-established MPI libraries to the PowerPC architecture.

Mind, my last tussle with MPI on Summit was over a year ago when the machine still had its new car smell. It should (hopefully) be a lot easier to deal with MPI on Summit now.

I'm also curious if there is some modification of dask-jobqueue that would make sense for very large jobs, but my guess is that when we get to the scale of Summit, every machine will be custom enough so that custom scripts provided by the HPC admin staff may be best. Would you agree with this? Or is there something that the Dask development community can do to make this job easier?

Using the single batch script works for up to 402 nodes, but it will take some tuning of configuration variables to get things to work at larger numbers of nodes. For example, I'm having to play with bumping up timeouts and heartbeats to keep things together during runs. (And I fear I have more tweaking to do down that road.) I also have the scheduler running on just a single CPU, and am wondering if it would take advantage of one or two more CPUs. I'm thinking not, since the scheduler is single-threaded (I think). What do you think?

There might be other things the dask community can do, which hopefully we can explore here.

I'll be sharing more later, but I do have to revert to my other duties. I can't believe how much I've already written! Ha!

@mrocklin
Member

mrocklin commented Apr 9, 2020

For example, I'm having to play with bumping up timeouts and heartbeats to keep things together during runs

To be clear, these are Dask settings? Ideally we would make this easier, either by publishing a more robust configuration that users could drop in as necessary, or maybe even by learning automatically what situation we're in and changing things around. For example, Dask heartbeats today will increase their delay automatically based on how many other workers there are (more workers, longer heartbeats). There are probably other things we can do to reduce the number of knobs that you had to turn.

Mind, my last tussle with MPI on Summit was over a year ago when the machine still had its new car smell. It should (hopefully) be a lot easier to deal with MPI on Summit now.

The nice thing about MPI is that then we share these issues with every other project. Probably OLCF folks have decent docs about how to run MPI well.

I also have the scheduler running on just a single CPU, and am wondering if it would take advantage of one or two more CPUs. I'm thinking not since the scheduler is single-threaded (I think). What do you think?

Yes, today the scheduler will mostly use just one CPU (you might throw on one more, just for compression/serialization etc, which can be offloaded to another thread). We could look at multi-core scheduling. That would be a fun project I think, but probably also a larger engagement than anyone is going to do in nights-and-weekends time. The other option here would be to start a major profiling effort around the scheduler, and see if there are hotspots. I wouldn't be surprised if there is some decent speedup still left to be found there.

@markcoletti
Author

markcoletti commented Apr 9, 2020

Thanks @markcoletti for opening up this issue and documenting your process on Summit!

This means you should delay a bit before starting the client

Just as a side note, there's a Client.wait_for_workers method which may be useful when workers take a while to arrive

I just checked here and didn't see that listed in the summary of member functions for the Client API at the top of that page. (Which is how I missed it the first go! :( ). Oddly enough, it is described in detail later on the same page.

@jrbourbeau
Member

Thanks for the heads up, I've added to the table at the top of the page over in #3692

@markcoletti
Author

markcoletti commented Apr 9, 2020

For example, I'm having to play with bumping up timeouts and heartbeats to keep things together during runs

To be clear, these are Dask settings? Ideally we would make this easier, either by publishing a more robust configuration that users could drop in as necessary, or maybe even by learning automatically what situation we're in and changing things around. For example, Dask heartbeats today will increase their delay automatically based on how many other workers there are (more workers, longer heartbeats). There are probably other things we can do to reduce the number of knobs that you had to turn.

I now worry that by manually setting things I've therefore disabled dynamic behavior that would have helped. Here is the distributed.yaml.gz I used.

It may be beneficial to reset to the default configuration, run with that, and then get feedback from you folks and suggested settings.

Do you guys have a Slack dedicated to dask distributed? It may be easier to do this interactively.

(Though it's nearly 5pm, so I'll be switching over to other tasking.)

@mrocklin
Member

mrocklin commented Apr 9, 2020 via email

@mrocklin
Member

mrocklin commented Apr 9, 2020

There is https://gitter.im/dask/dask , but you'll almost certainly get better response times on github (few of the devs track the real-time chat channel). If you want to go high bandwidth, I'm happy to engage with a video chat.

@mrocklin
Member

mrocklin commented Apr 9, 2020

Also, in case you want to read a screed on Slack: http://matthewrocklin.com/blog/2019/02/28/slack-github :)

@rcthomas

rcthomas commented Apr 9, 2020

Hi @markcoletti, thank you for posting this; it's been very interesting. A couple of short comments:

(1) We at NERSC think of dask-jobqueue as a solution for people who don't want to write batch scripts. We think some contributions there might make it more useful, but basically that's it. If you want all your workers at once this is just the wrong thing, and it makes no sense to me to sit there and wait for the resources to build up. Ideally you can start feeding work to it and new jobs just add workers. Some people can use that, but sometimes you just want everything at once and to get going. A blanket statement like "you shouldn't use it" isn't what we would say, though. But I've also seen the reflexive "I want to run Dask on a Cray" answer of "oh you need dask-jobqueue", and that is just not right either. :)

(2) Lately we have been really thinking that for "I need all my workers now, and I need them at scale" at NERSC the answer is dask-mpi and containers (Shifter). We're trying to package this up and reconcile that against our queues. This seems to be good for the 1K-worker class of Dask job. I've tried 8K sized clusters and I'm not able to keep the entire cluster alive but amazingly all the work got done.

Would love to get together and compare notes and learn from your large scale experiences.

@mrocklin
Member

mrocklin commented Apr 10, 2020 via email

@rcthomas

@mrocklin The 8K run was a scaling experiment in the earlier days of Cori where I pulled out all the stops, used containers, and upped all the timeouts I could find. During the run, at no given instant were all the workers alive; I don't think it ever achieved 8K, but all tasks completed. I did not expect this really to work, and it's against all the guidance I had to that point about whether what I was doing made sense. <4K workers seemed more reliable for the simple test problem I was doing.

This was a couple years ago now, it was a Friday afternoon experiment, and Dask was more youthful. If we were to repeat that kind of experiment today I don't know what we would find. Most experiments I've tried, or demo'd, or showed you (thanks) were at the hundreds-to-thousand worker level and this is mostly just fine these days and seems appropriate to the problems we're looking at.

On community: what has worked for Jupyter in HPC, I think, is (1) we had a workshop that helped us learn who was doing what at which facility and (2) out of that we were able to put together a few interest networks of people who periodically get together to chase issues, discuss developments, share lessons learned and best practices, and plan future events. What has not worked well is mailing lists (they just go silent); a Slack channel was set up but also doesn't see much traffic. Vendor engagement is a plus.

The Dask developer workshop held a couple months ago went some distance and I was able to make connections and build understanding of what the community is doing but HPC representation was smallish and there is (for now, but my guess is it won't last forever) a gap between enterprise and HPC Dask. Seeing what's going on in enterprise is very exciting and educational but not always immediately applicable.

@jakirkham
Member

@rabernat @jhamman , I’m curious if either of you explored this upper bound of workers and how many you were able to keep running at a time 🙂

@markcoletti
Author

I'm chuffed that this issue has spawned a lively conversation, and hopefully one that can be continued, if not here, then perhaps at a more appropriate venue. I'm definitely curious about distributed dask conferences or workshops; hopefully I can participate in a future one, post-pandemic.

A future possibility is UCAR's Software Engineering Assembly's annual Improving Scientific Software Conference and Tutorials. It's canceled this year because of COVID, of course, but it would be an excellent opportunity next year to talk about distributed dask to the scientific community, and to offer distributed dask related tutorials.

https://sea.ucar.edu/conference/2020

@markcoletti
Author

Latest 921 node issues on Summit

And now to share the results of the latest batch job on Summit where I am struggling to get six workers per node on a total of 921 nodes to do some work.

The first diagnostic is this:

distributed.scheduler - INFO - Receive client connection: Client-8231f578-7d52-11ea-ac20-70e284145d13
distributed.core - INFO - Starting established connection
distributed.core - INFO - Event loop was unresponsive in Scheduler for 33.24s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.

What's interesting is that this happens at the start of the run, and the Client in question is my "wait for workers" script that doesn't submit tasks; it just waits for a certain number of workers to spin up, or for a certain amount of time to go by, whichever comes first, and then gracefully exits so that the actual Client that has real tasks can get started. So I dunno what the scheduler is doing for 33 seconds.

Then there are a lot of these, which is normal and to be expected:
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register worker <Worker 'tcp://10.41.13.30:43583', name: tcp://10.41.13.30:43583, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.41.13.30:43583
In the middle of all the worker registration messages, I see this:
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register worker <Worker 'tcp://10.41.16.205:37069', name: tcp://10.41.16.205:37069, memory: 0, processing: 0>
task: <Task pending coro=<Client._update_scheduler_info() running at /autofs/nccs-svm1_proj/csc363/may/LEAP/venv/lib/python3.7/site-packages/distributed/client.py:1098> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x200002ee6df8>()]> cb=[IOLoop.add_future.<locals>.<lambda>() at /autofs/nccs-svm1_proj/csc363/may/LEAP/venv/lib/python3.7/site-packages/tornado/ioloop.py:690]>
Task was destroyed but it is pending!
task: <Task pending coro=<Client._update_scheduler_info() running at /autofs/nccs-svm1_proj/csc363/may/LEAP/venv/lib/python3.7/site-packages/distributed/client.py:1098> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x200002f22c48>()]> cb=[IOLoop.add_future.<locals>.<lambda>() at /autofs/nccs-svm1_proj/csc363/may/LEAP/venv/lib/python3.7/site-packages/tornado/ioloop.py:690]>
Task was destroyed but it is pending!
task: <Task pending coro=<BaseTCPConnector.connect() running at /autofs/nccs-svm1_proj/csc363/may/LEAP/venv/lib/python3.7/site-packages/distributed/comm/tcp.py:349> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x20001003a1f8>()]> cb=[_release_waiter(<Future pendi...002f22e88>()]>)() at /autofs/nccs-svm1_sw/summit/.swci/0-core/opt/spack/20180914/linux-rhel7-ppc64le/gcc-4.8.5/python-3.7.0-ei3mpdncii74xsn55t5kxpuc46i3oezn/lib/python3.7/asyncio/tasks.py:362]>
Task was destroyed but it is pending!
task: <Task pending coro=<BaseTCPConnector.connect() running at /autofs/nccs-svm1_proj/csc363/may/LEAP/venv/lib/python3.7/site-packages/distributed/comm/tcp.py:349> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x20001003a9d8>()]> cb=[_release_waiter(<Future pendi...002ee6df8>()]>)() at /autofs/nccs-svm1_sw/summit/.swci/0-core/opt/spack/20180914/linux-rhel7-ppc64le/gcc-4.8.5/python-3.7.0-ei3mpdncii74xsn55t5kxpuc46i3oezn/lib/python3.7/asyncio/tasks.py:362]>
Task was destroyed but it is pending!
task: <Task pending coro=<BaseTCPConnector.connect() running at /autofs/nccs-svm1_proj/csc363/may/LEAP/venv/lib/python3.7/site-packages/distributed/comm/tcp.py:349> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x20001003aaf8>()]> cb=[_release_waiter(<Future pendi...002f22c48>()]>)() at /autofs/nccs-svm1_sw/summit/.swci/0-core/opt/spack/20180914/linux-rhel7-ppc64le/gcc-4.8.5/python-3.7.0-ei3mpdncii74xsn55t5kxpuc46i3oezn/lib/python3.7/asyncio/tasks.py:362]>
distributed.core - INFO - Event loop was unresponsive in Scheduler for 13.06s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.

Then I see signs that the wait for worker process has ended successfully, and the actual Client that's to do real work, which is an evolutionary algorithm, is spinning up:
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register worker <Worker 'tcp://10.41.19.232:38707', name: tcp://10.41.19.232:38707, memory: 0, processing: 0>
INFO:__main__:workers: 176 init pop size: 5526 max births: 27630, bag size: 5526
INFO:root:Using a remote distributed model
DEBUG:asyncio:Using selector: EpollSelector
DEBUG:asyncio:Using selector: EpollSelector
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.41.19.232:38707
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register worker <Worker 'tcp://10.41.13.94:42167', name: tcp://10.41.13.94:42167, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.41.13.94:42167

Mind, workers are still getting registered, which is OK. Since dask is essentially a producer/consumer queuing system, we can already give existing workers something to do while the rest register and warm up.

distributed.scheduler - INFO - Register worker <Worker 'tcp://10.41.19.232:38707', name: tcp://10.41.19.232:38707, memory: 0, processing: 0>
INFO:__main__:workers: 176 init pop size: 5526 max births: 27630, bag size: 5526
INFO:root:Using a remote distributed model
DEBUG:asyncio:Using selector: EpollSelector
DEBUG:asyncio:Using selector: EpollSelector
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.41.19.232:38707
distributed.core - INFO - Starting established connection

And this is the EA flaming out, and which suggests that the Client threw an exception (hence the client being undefined):
distributed.scheduler - INFO - Register worker <Worker 'tcp://10.41.10.121:34115', name: tcp://10.41.10.121:34115, memory: 0, processing: 0>
CRITICAL:__main__:Timed out trying to connect to 'tcp://10.41.5.66:8786' after 30 s: Timed out trying to connect to 'tcp://10.41.5.66:8786' after 30 s: connect() didn't finish in time
Traceback (most recent call last):
  File "/ccs/proj/csc363//may/carla_imitation_learning/gremlin/summit/sleeper.py", line 132, in <module>
    client.close()
NameError: name 'client' is not defined

All told, 3,935 workers were registered, according to a grep of the output, when we wanted 5,526. However, what likely happened is that the client failed and then killed the whole job while the balance of the workers were still coming up. I suspect the problem was that the client couldn't connect to the scheduler within 30 seconds and failed.

To resolve this issue, I'm going to bump the scheduler timeout to something ridiculous, like 10 minutes, which I hope resolves the immediate problem of the EA failing. However, I am concerned about the other warnings that cropped up during the run. Are those things I should be concerned about? And, if so, does anyone have good guidance on how to resolve them?
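Concretely, the knob I have in mind is the comm connect timeout; a minimal sketch of setting it programmatically in the client script (the same key can also go in ~/.config/dask/distributed.yaml):

import dask

# Raise the connect timeout well past the 30 s it failed at, before the
# Client is created. The equivalent YAML path is
# distributed -> comm -> timeouts -> connect.
dask.config.set({'distributed.comm.timeouts.connect': '600s'})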

@mrocklin
Member

Thank you for the update.

Bumping up timeouts generally would be good.

I don't actually have strong intuition on what would be causing the slowdown here. In principle I would love to see profiling information if we can get it, although profiling in this context might be hard.

You could try running the scheduler with the builtin cProfile profiler. You could also try using the performance_report functionality built into Dask. I'm not sure that either one will be very accurate here. It may also be interesting to profile this as one would profile a compiled C/C++ program. That would avoid any issues that might arise from Python itself, which is probably under some strain.
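For reference, performance_report is a context manager wrapped around the client-side workload; a minimal sketch (the scheduler file path is a placeholder):

from distributed import Client, performance_report

client = Client(scheduler_file='scheduler_file.json')   # placeholder path

# Wrap the client-side computation; an HTML report with the task stream and
# scheduler/worker profiles is written when the block exits.
with performance_report(filename='dask-report.html'):
    futures = client.map(lambda x: x + 1, range(1000))
    client.gather(futures)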

@rabernat

Following with interest.

We have done runs with ~1000 workers on Cheyenne with no major issues. For that, we used dask-mpi, which allowed us to do it all in a single big MPI job.

My impression is that a central challenge you will have in scaling up to the level you describe is giving the workers something to do. In our experience, the dask scheduler starts to really choke when you have > 1_000_000 tasks in your graph. This central chokepoint provides a hard limit on the absolute size of computations you can do with dask, regardless of the number of workers in your cluster.

@mrocklin
Member

the dask scheduler starts to really choke when you have > 1_000_000 tasks in your graph.

In what way did it choke? If my memory serves in your case it wasn't able to respond at interactive time scales (things would sit still for minutes). This is certainly an issue and something that we would love help in fixing. I think that the thing to do here is to start doing some more intense profiling. If anyone has experience profiling networking applications like this, that would be of great help.

However, this limit may also not be as bad if we're talking about non-interactive jobs, which may be the case for the folks on Summit. (It's still important there as well, but we probably have another order of magnitude to play with).

@markcoletti
Author

markcoletti commented Apr 15, 2020

Thank you for the update.

No problem! And sorry for being silent for a week as I was wrapped up with other tasking.

Bumping up timeouts generally would be good.

Duly noted. And I'll share whether the 10 minutes works. (Which may be overkill; however, we could probably use these results to inform a dask "autotune" feature that dynamically adjusts timeouts based on the number of workers.)

[...] In principle I would love to see profiling information if we can get it, although profiling in this context might be hard.

You could try running the scheduler with the builtin cProfile profiler. You could also try using the performance_report functionality built into Dask. [...]

I've used that for a different project, and I'll do the same for this one. I'll be happy to share the results once I get them.

Thanks again for chiming in!

@rabernat

Yes good points Matt, and apologies for the non-technical term "choke". What I really mean is the following...

Each computation (containing N tasks) effectively has two parts, which occur in serial:

  1. the time it takes for the scheduler to prepare to execute the graph, and
  2. the time it takes the workers to actually execute it.

For small jobs, part 1 is very small, almost negligible. However, I have the impression that it scales very poorly, like O(N^2). I have not measured this, but someone should. Because of this very different scaling between parts 1 and 2, it places, for us, a hard limit on the number of tasks we can run at once. For example, 10_000_000 tasks take about 30 minutes to finish step 1, and I have never run 100_000_000.
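A crude way to measure it would be to time batches of no-op tasks of increasing size, where essentially all of the elapsed time is scheduling overhead rather than useful work (a sketch, not a careful benchmark; the scheduler file path is a placeholder):

import time
from distributed import Client, wait

client = Client(scheduler_file='scheduler_file.json')   # placeholder path

def noop(i):
    return None

for n in (10_000, 100_000, 1_000_000):
    start = time.time()
    futures = client.map(noop, range(n))   # build and hand the graph to the scheduler
    wait(futures)                          # block until every task has run
    elapsed = time.time() - start
    print(f'{n} tasks: {elapsed:.1f} s total, {elapsed / n * 1e6:.0f} us/task')
    del futures                            # release the results before the next round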

@mrocklin
Member

Thanks Ryan. You've brought this up several times before. I think that in each time I've suggested that it would be great to reduce the per-task overhead (which I put at about 200-500us). The next step here is probably profiling, both at a Python level, and then probably at a lower C/C++ level.

@rabernat you're one of the people that could help to resolve this problem with your Pangeo resources. Maybe this is something that you could direct some of your funding or collaborators towards?

@jakirkham
Member

We have done runs with ~1000 worker on Cheyenne with no major issues. For that, we used dask-mpi, which allowed us to do it all in a single big MPI job.

This is an interesting observation. Thanks for sharing, Ryan! May be worth exploring if you have time @markcoletti 😉

It's also worth noting that using job arrays in dask-jobqueue probably would address this same need ( dask/dask-jobqueue#196 ).

@markcoletti
Author

I'm trying to avoid adding the MPI dependency. I've had bad prior experiences on Titan and Summit with MPI support. Moreover, we are having to rely on singularity containers to run some of the code, and getting MPI to work from within the container is going to be a little tricky. So, if we can have code that doesn't rely on MPI, as is the case with basic distributed dask, then all the better.

However, if I continue to struggle scaling up to more nodes on Summit, then I will definitely give MPI-aware distributed dask a look. I'll let you know if I do!

@markcoletti
Author

The 2,764 node job failed, and I may need help reading the tea leaves on this one.

First, the job ran for the fully allotted time, but was hung and killed by the system:

User defined signal 2
Could not read jskill result from pmix server
Could not read jskill result from pmix server
Could not read jskill result from pmix server

I look in the output and see that it had only gotten so far in allocating workers:

Waiting for workers
Deadline set for 5 minutes
Target workers: 8292
Scheduler timeout: 600 secs
Pause time: 5 secs
Scheduler file: /gpfs/alpine/csc363/scratch/mcoletti/sleeper/2764_node/scheduler_file.json
seconds number of workers
0.00    7
130.17  50
135.18  136
140.20  209
145.21  259
150.24  352
155.28  536
160.32  657
165.72  862
172.03  1418
180.27  1912

Yes, I could have used Client.wait_for_workers(), but my wait_for_workers.py script gives a little more detail on worker allocation. In any case, the maximum total number of workers was 16584, but I had set the wait for workers to allow the EA to start after only half of those were allocated, or 8292, or five minutes, whichever came first. However, it is in that process that execution hung, so it was during a call to client.scheduler_info() that things got wedged.

I had the job configured so that each individual worker saved its respective stdout and stderr to its own file, so I went to the stderr for some, and this is what I saw:

Traceback (most recent call last):
  File "/autofs/nccs-svm1_sw/summit/.swci/0-core/opt/spack/20180914/linux-rhel7-ppc64le/gcc-4.8.5/python-3.7.0-ei3mpdncii74xsn55t5kxpuc46i3oezn/lib/python3.7/multiprocessing/util.py", line 265, in _run_finalizers
    finalizer()
  File "/autofs/nccs-svm1_sw/summit/.swci/0-core/opt/spack/20180914/linux-rhel7-ppc64le/gcc-4.8.5/python-3.7.0-ei3mpdncii74xsn55t5kxpuc46i3oezn/lib/python3.7/multiprocessing/util.py", line 189, in __call__
    res = self._callback(*self._args, **self._kwargs)
  File "/autofs/nccs-svm1_sw/summit/.swci/0-core/opt/spack/20180914/linux-rhel7-ppc64le/gcc-4.8.5/python-3.7.0-ei3mpdncii74xsn55t5kxpuc46i3oezn/lib/python3.7/multiprocessing/synchronize.py", line 88, in _cleanup
    sem_unlink(name)
FileNotFoundError: [Errno 2] No such file or directory

and:

FileNotFoundError: [Errno 2] No such file or directory
Exception ignored in: <Finalize object, dead>
Traceback (most recent call last):
  File "/autofs/nccs-svm1_sw/summit/.swci/0-core/opt/spack/20180914/linux-rhel7-ppc64le/gcc-4.8.5/python-3.7.0-ei3mpdncii74xsn55t5kxpuc46i3oezn/lib/python3.7/multiprocessing/util.py", line 189, in __call__
    res = self._callback(*self._args, **self._kwargs)
  File "/autofs/nccs-svm1_sw/summit/.swci/0-core/opt/spack/20180914/linux-rhel7-ppc64le/gcc-4.8.5/python-3.7.0-ei3mpdncii74xsn55t5kxpuc46i3oezn/lib/python3.7/multiprocessing/synchronize.py", line 88, in _cleanup
    sem_unlink(name)
FileNotFoundError: [Errno 2] No such file or directory
distributed.nanny - INFO - Closing Nanny at 'tcp://10.41.0.101:38369'
distributed.dask_worker - INFO - End worker

I have no idea what file it was looking for, but that would appear to be the ultimate source of the problem. Any ideas on how to proceed?

@mrocklin
Member

mrocklin commented May 4, 2020 via email

@markcoletti
Author

That seems reasonable, so I've resubmitted the job sans nannies. I'll let you know the outcome as soon as the job runs, which may not be for several days.

(We're wayyyy over quota for that Summit account, and are rightfully being dinged for that for submitted jobs.)

@mrocklin
Member

mrocklin commented May 4, 2020 via email

@markcoletti
Author

I have plans to do some post mortem analysis of the runs, and one of the things I will be looking for will be "Did all the workers do work?" Given that some of the workers will still be coming up when the main program runs, I expect there to be a little non-uniformity in the load distribution, but hopefully all workers will have done something. I also plan on doing later runs with performance reports toggled on to see if that yields anything useful, but I've not done so now for fear that may trigger problems.

@markcoletti
Author

2,764 node job failed again, which is getting tiresome.

The wait-for-workers utility died with this error:

Raised CommClosedError('in <closed TCP>: TimeoutError: [Errno 110] Connection timed out') during execution
Wait for workers failed. Aborting.

The only thing it's connecting with is the scheduler, and the scheduler timeout was ten minutes. I ... guess ... I could up that? :-\

And looking through the error logs I saw 3,043 instances of this error:

distributed.scheduler - WARNING - Worker failed to heartbeat within 300 seconds. Closing: <Worker 'tcp://10.41.10.240:35107', name: tcp://10.41.10.240:35107, memory: 0, processing: 0>
distributed.scheduler - INFO - Remove worker <Worker 'tcp://10.41.10.240:35107', name: tcp://10.41.10.240:35107, memory: 0, processing: 0>

So, I made these changes to ~/.config/dask/distributed.yaml:

@@ -23,7 +23,7 @@ distributed:
     transition-log-length: 100000
     work-stealing: True     # workers should steal tasks from each other
     work-stealing-interval: 100ms  # Callback time for work stealing
-    worker-ttl: 300s        # like '60s'. Time to live for workers.  They must heartbeat faster than this
+    worker-ttl: 900s        # like '60s'. Time to live for workers.  They must heartbeat faster than this
     pickle: True            # Is the scheduler allowed to deserialize arbitrary bytestrings
     preload: []
     preload-argv: []
@@ -94,8 +94,8 @@ distributed:
       threads: -1    # Threads to use. 0 for single-threaded, -1 to infer from cpu count.

     timeouts:
-      connect: 60s          # time before connecting fails
-      tcp: 90s              # time before calling an unresponsive connection dead
+      connect: 600s          # time before connecting fails
+      tcp: 900s              # time before calling an unresponsive connection dead
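(As a sanity check that these overrides are actually being picked up, the live config can be queried from Python before kicking off a run, e.g.:)

import dask
import distributed   # make sure distributed's config defaults are loaded too

# These should echo back the overridden values from ~/.config/dask/distributed.yaml
print(dask.config.get('distributed.scheduler.worker-ttl'))   # expect '900s'
print(dask.config.get('distributed.comm.timeouts.connect'))  # expect '600s'
print(dask.config.get('distributed.comm.timeouts.tcp'))      # expect '900s'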

@mrocklin
Member

mrocklin commented May 6, 2020 via email

@markcoletti
Author

Well, it doesn't cost me anything to jigger some things and resubmit the jobs. I can carry on other tasking while the jobs are pending. However, I'd be interested in pointers in trying to dig a little deeper into what some of the bottlenecks might be. I suspect the biggest one is going to be the scheduler, though I've given that 2 CPUs to crank on.

Honestly, for the stuff we're trying to do, I think even grabbing 900 nodes, which I've proved works, is probably adequate for most of our problems. However, our flagship distributed EA, MENNDL, can run on the whole machine, so I'd like to at least demonstrate similar capability for the distributed dask based code.

I also haven't tried the MPI-based distributed dask, which may be a direction I go next, even though it's only responsible for setting up workers.

@mrocklin
Member

mrocklin commented May 6, 2020 via email

@jakirkham
Member

There's already UCX-Py. Maybe try that before writing a new comm? 😉

cc @quasiben @pentschev

@markcoletti
Author

Here's an overdue update: I'm still struggling to get the 2,764 node runs to work with dask. Progress is slow because we're well over our allotment of Summit hours; we can still run jobs, it just might take a few days for them to do so.

In any case, the last run had some new errors, which I count as a sort of progress.

First were the usual unresponsive Scheduler and Worker failed to heartbeat errors:

distributed.core - INFO - Event loop was unresponsive in Scheduler for 6.13s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.scheduler - WARNING - Worker failed to heartbeat within 900 seconds. Closing: <Worker 'tcp://10.41.1.217:43433', name: tcp://10.41.1.217:43433, memory: 0, processing: 0>

Fair enough, but there were some new errors afoot this go round.


distributed.core - INFO - Event loop was unresponsive in Scheduler for 3.77s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
Future exception was never retrieved
future: <Future finished exception=OSError("Timed out trying to connect to 'tcp://10.41.0.221:32929' after 600 s: Timed out trying to connect to 'tcp://10.41.0.221:32929' after 600 s: connect() didn't finish in time")>
Traceback (most recent call last):
  File "/autofs/nccs-svm1_proj/csc363/may/LEAP/venv/lib/python3.7/site-packages/distributed/comm/core.py", line 232, in connect
    _raise(error)
  File "/autofs/nccs-svm1_proj/csc363/may/LEAP/venv/lib/python3.7/site-packages/distributed/comm/core.py", line 213, in _raise
    raise IOError(msg)
OSError: Timed out trying to connect to 'tcp://10.41.0.221:40215' after 600 s: connect() didn't finish in time

I'm guessing a Future finished, but had difficulty propagating back to the Client. If so, this is troubling.

I also observed this error:


ERROR:asyncio:Task was destroyed but it is pending!
task: <Task pending coro=<Client._update_scheduler_info() running at /autofs/nccs-svm1_proj/csc363/may/LEAP/venv/lib/python3.7/site-packages/distributed/client.py:1098> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x200003d4de88>()]> cb=[IOLoop.add_future.<locals>.<lambda>() at /autofs/nccs-svm1_proj/csc363/may/LEAP/venv/lib/python3.7/site-packages/tornado/ioloop.py:690]>
ERROR:asyncio:Task was destroyed but it is pending!

I'm not entirely sure what's up with that.

Moving forward, I've removed a call to my wait_for_workers.py in the batch submission script, and added a call to Client.wait_for_workers(). My hypothesis is that having two back-to-back client connections may be causing problems, especially if workers are still coming on line. I confess that's a flimsy model for what's going on, but this was something I was going to implement anyway, and I may as well give it a go before proceeding on to the next attempt.

If this fails, I'll begrudgingly try the MPI-based distributed dask implementation to see if that shakes things loose.

(Again, we're avoiding MPI because on Summit we must use IBM's proprietary Spectrum MPI library that's a fork of OpenMPI burdened by DRM garbage that's not exactly a joy to get to work inside a Singularity container. But, if we must suffer that option to get whole machine runs, then suffer we shall.)

@jakirkham
Member

Are you using array jobs to submit the workers or is each worker its own job?

@benjha

benjha commented May 22, 2020

There's already UCX-Py. Maybe try that before writing a new comm? 😉

cc @quasiben @pentschev

@jakirkham So, as far as I understand, the UCX-Py efforts are for dask-cuda, and while compiling it we can explicitly set the UCX dependencies; but for Dask, which is already available for ppc64le, the question is whether the latest Dask versions have already been built with UCX.

@markcoletti
Author

markcoletti commented May 22, 2020

Are you using array jobs to submit the workers or is each worker its own job?

We chose to use a single job approach because there is a tight limit on the number of concurrent jobs on Summit, which is five. Since we're going for full machine runs, that means we need to do everything within a single job since 27,648 jobs (4,608 nodes x six workers per node) running simultaneously is wellll over that five job limit. ;)

And, just to be clear, we're currently shooting to get just half the machine to run, or 2,764 nodes. Once we get distributed dask to work at that level, we'll shoot for the whole enchilada.

For those familiar with Summit's batch queueing system, LSF, we use a single jsrun statement to fan out the dask workers. Here's an example:

jsrun --smpiargs="off" --nrs $NUM_WORKERS -e individual --stdio_stdout ${RUN_DIR}/worker_out.%h.%j.%t.%p --stdio_stderr ${RUN_DIR}/worker_error.%h.%j.%t.%p --tasks_per_rs 1 --cpu_per_rs 1 --gpu_per_rs 1 --rs_per_host 6 --latency_priority gpu-cpu dask-worker --nthreads 1 --nprocs 1 --interface ib0 --no-dashboard --no-nanny --reconnect --scheduler-file $SCHEDULER_FILE &

@mrocklin
Member

I'll begrudgingly try the MPI-based distributed dask implementation to see if that shakes things loose

I don't think that this will help you. All that dask-mpi does is use MPI to communicate the address of the scheduler, and then it connects as normal. We could make a proper Dask MPI communication system, but that would require a bit of work. I'm guessing a week or two of a full-time dev who had done this before.

cc'ing @fjetter who might have thoughts on stability with large numbers of workers

Regarding UCX, I think that it would be interesting to try, mostly because it would help to isolate the problem away from TCP connections.

If I were to try to isolate the problem here I would probably start by running this on a smaller set of workers (merely 1000 or so :) ) and do some profiling. I would hope that that would highlight some bottleneck that a small dev investment could resolve. Then I would increase from there. Going directly for thousands of nodes without profiling seems potentially wasteful to me. This could be any of a hundred different issues. I think that we'll need to figure out what's going on before we try to push harder.

@markcoletti
Author

I'll begrudgingly try the MPI-based distributed dask implementation to see if that shakes things loose

I don't think that this will help you. All that dask-mpi does is use MPI to communicate the address of the scheduler, and then it connects as normal. [...]

Ok, thanks for the heads up, Matthew.

[...]
If I were to try to isolate the problem here I would probably start by running this on a smaller set of workers (merely 1000 or so :) ) and do some profiling. I would hope that that would highlight some bottleneck that a small dev investment could resolve. Then I would increase from there. Going directly for thousands of nodes without profiling seems potentially wasteful to me. This could be any of a hundred different issues. I think that we'll need to figure out what's going on before we try to push harder.

I'm not a distributed dask developer, but I'm willing to share profiling information here. I suspect any bottlenecks identified on Summit are likely to manifest, albeit likely in lesser form, for others.

I suspect that the bottlenecks are going to be in the scheduler. For example, just monitoring all the worker heartbeats is a burdensome task, I imagine.

@mrocklin
Member

I suspect that the bottlenecks are going to be in the scheduler. For example, just monitoring all the worker heartbeats is a burdensome task, I imagine.

Yes, I suspect that that's exactly the kind of thing that we'll run into. Actually, we already did with previous systems. Today worker heartbeats slow down based on the number of workers. There are probably other issues that will arise though that we haven't yet accounted for.

@jacobtomlinson
Member

@benjha UCX-py is a generic UCX library for Python. It is useful in dask-cuda as UCX supports NVLINK but it is also useful for pure Infiniband setups.

@jakirkham
Member

Yeah to complement what @jacobtomlinson already said @benjha. Dask already has UCX-Py support baked in.

Dask-CUDA is really designed for single node deployments atm. Though there may be pieces we pull from it.

That all being said, I would be curious what @markcoletti is able to accomplish just by adding UCX-Py.

@mmohrhard
Contributor

Just in case you are not aware, the heartbeat interval is limited to a max of 5 seconds. With the 10k+ dask workers you are trying to run, your scheduler has less than 0.5 ms per heartbeat before heartbeats alone start causing delays for all scheduler actions.

In our fork we have limited the number of heartbeats per second to 50, which has helped quite a bit in large jobs. You can find the code for the heartbeat interval in the heartbeat_interval(n) function. Note that not limiting the heartbeat interval requires that you also update code that relies on it (e.g. check_worker_ttl()).
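The back-of-the-envelope arithmetic behind that figure looks roughly like this (using the 16,584-worker target from earlier in this thread):

# Rough arithmetic, assuming every worker heartbeats at the 5 second cap.
n_workers = 16_584                  # the half-machine worker target in this thread
heartbeat_interval = 5.0            # seconds between heartbeats, per worker
heartbeats_per_second = n_workers / heartbeat_interval
budget = 1.0 / heartbeats_per_second

print(f'{heartbeats_per_second:.0f} heartbeats/s arriving at the scheduler')
print(f'{budget * 1000:.2f} ms to handle each one before they alone saturate it')
# roughly 3317 heartbeats/s and ~0.30 ms each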

@TomAugspurger
Member

Thanks for pointing that out. I opened #3877 to make configuring the heartbeat easier.

@mrocklin
Member

mrocklin commented Jun 10, 2020 via email

@markcoletti
Author

This is a very useful feature, and I'll be happy to test it on Summit, though it may be a little while before I can do so. This does address my suspicion that it was the scheduler's heartbeat management that was causing us problems.

@aamirshafi

aamirshafi commented Jun 30, 2020

I don't think that this will help you. All that dask-mpi does is use MPI to communicate the address of the scheduler, and then it connects as normal. We could make a proper Dask MPI communication system, but that would require a bit of work. I'm guessing a week or two of a full-time dev who had done this before.

Could you please explain such a Dask MPI communication system? Do you have something similar to the UCX-Py/TCP kind of backends that already exist in mind, or is this going to be a complete revamp of the Dask Distributed package?

Thanks.

@mrocklin
Copy link
Member

mrocklin commented Jun 30, 2020 via email

@aamirshafi
Copy link

Thanks for the response.

Dask's communication layer has connect() and listen() functions that do not have direct parallels in the MPI world. Do you have any thoughts on how to handle connection establishment with MPI? I am assuming that something like dask-mpi will be used to start schedulers and workers but in that case, MPI and all processes will already be initialized and there will be no need for connect() and listen().

Also in Dask Distributed, workers create new connections amongst themselves while the client code is executing. Is there a way to configure Dask Distributed to not make dynamic connections between workers on the fly and instead create them in the beginning only?

@mrocklin
Member

mrocklin commented Jun 30, 2020 via email
