Scaling distributed dask to run 27,648 dask workers on the Summit supercomputer #3691
Comments
I'll be posting a follow-up shortly that shares what we've done, what we're doing now, and possible strategies for resolving future problems in scaling up. |
I'm very excited to track this issue. Thank you @piprrr for recording your efforts (and of course, for the efforts themselves). |
You should not use
|
I'll need to update the referenced GitHub LSF script and make some minor edits, but I've got a meeting, so that will have to wait. :( |
Thanks @piprrr for opening up this issue and documenting your process on Summit!
Just as a side note, there's a |
Are you saying that you shouldn't launch tens of thousands of jobs? Hrm, who knew? :) I'm curious about why MPI would not have been a good choice here. I'm also curious if there is some modification of dask-jobqueue that would make sense for very large jobs, but my guess is that when we get to the scale of Summit, every machine will be custom enough so that custom scripts provided by the HPC admin staff may be best. Would you agree with this? Or is there something that the Dask development community can do to make this job easier? |
Well, now I feel foolish as that's exactly the kind of functionality I was looking for! Thank you! (So, this issue is already paying a dividend!) The general guidance should still stand that you can wait for a subset of the total target workers to begin work. I've been recently bit by waiting for all the workers when some failed, which means waiting indefinitely. And, the larger the number of nodes allocated, the likelier some of those will fail. |
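The guidance above (start once a subset of workers has registered, and never wait indefinitely) can be sketched as a small polling loop. This is only an illustration under stated assumptions: `count_workers`, `clock`, and `sleep` are hypothetical injected callables, not part of Dask, and the registration counts are simulated.

```python
import time
from typing import Callable


def wait_for_workers(count_workers: Callable[[], int],
                     target: int,
                     deadline_secs: float,
                     pause_secs: float = 5.0,
                     clock: Callable[[], float] = time.monotonic,
                     sleep: Callable[[float], None] = time.sleep) -> int:
    """Poll until `target` workers are registered or the deadline passes.

    Returns the last observed worker count; the caller decides whether
    that count is enough to start submitting work.
    """
    start = clock()
    while True:
        n = count_workers()
        # Stop on success OR on timeout, so failed nodes cannot hang the job.
        if n >= target or clock() - start >= deadline_secs:
            return n
        sleep(pause_secs)


# Simulated registration counts standing in for a real scheduler query.
observed = iter([7, 50, 136, 209, 259])
n = wait_for_workers(lambda: next(observed), target=200,
                     deadline_secs=300, pause_secs=0)
print(n)  # 209
```

On a real cluster the counter would presumably wrap a scheduler query (for example, something built on `client.scheduler_info()`); the exact shape of that return value is an assumption here and should be checked against the installed version of distributed.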
Well, yeah, the folks in the OLCF would not be your biggest fan. ;) That said, I've been thinking of what I call a "slow cooker" approach that would use I've shared this idea with the folks running the local cluster here at ORNL, and they are eager to give that a go.
I was bit on Titan in that it was difficult to get MPI to work well with schwimmbad. And then it happened again on Summit. Let's just say that I had to work with the OLCF support staff to find the correct magic environment variables to set to appease MPI. This is what we get when IBM decides to just write its own MPI implementation from scratch instead of porting over one of the well-established MPI libraries to the PowerPC architecture. Mind, my last tussle with MPI on Summit was over a year ago when the machine still had its new car smell. It should (hopefully) be a lot easier to deal with MPI on Summit now.
Using the single batch script works for up to 402 nodes, but it will take some tuning of configuration variables to get things to work at larger numbers of nodes. For example, I'm having to play with bumping up timeouts and heartbeats to keep things together during runs. (And I fear I have more tweaking to do down that road.) I also have the scheduler running on just a single CPU, and am wondering if it would take advantage of one or two more CPUs. I'm thinking not since the scheduler is single-threaded (I think). What do you think? There might be other things the dask community can do, which hopefully we can explore here. I'll be sharing more later, but I do have to revert to my other duties. I can't believe how much I've already written! Ha! |
To be clear, these are Dask settings? Ideally we would make this easier, either by publishing a more robust configuration that users could drop in as necessary, or maybe even by learning automatically what situation we're in and changing things around. For example, Dask heartbeats today will increase their delay automatically based on how many other workers there are (more workers, longer heartbeats). There are probably other things we can do to reduce the number of knobs that you had to turn.
The nice thing about MPI is that then we share these issues with every other project. Probably OLCF folks have decent docs about how to run MPI well.
Yes, today the scheduler will mostly use just one CPU (you might throw on one more, just for compression/serialization etc, which can be offloaded to another thread). We could look at multi-core scheduling. That would be a fun project I think, but probably also a larger engagement than anyone is going to do in nights-and-weekends time. The other option here would be to start a major profiling effort around the scheduler, and see if there are hotspots. I wouldn't be surprised if there is some decent speedup still left to be found there. |
I just checked here and didn't see that listed in the summary of member functions for the |
Thanks for the heads up, I've added to the table at the top of the page over in #3692 |
I now worry that by manually setting things I've therefore disabled dynamic behavior that would have helped. Here is the distributed.yaml.gz I used. It may be beneficial to reset to the default configuration, run with that, and then get feedback from you folks and suggested settings. Do you guys have a Slack dedicated to dask distributed? It may be easier to do this interactively. (Though it's nearly 5pm, so I'll be switching over to other tasking.) |
And look at that, this issue is paying dividends for the Dask project
already too :)
…On Thu, Apr 9, 2020 at 1:46 PM James Bourbeau ***@***.***> wrote:
Thanks for the heads up, I've added to the table at the top of the page
over in #3692 <#3692>
|
There is https://gitter.im/dask/dask , but you'll almost certainly get better response times on github (few of the devs track the real-time chat channel). If you want to go high bandwidth, I'm happy to engage with a video chat. |
Also, in case you want to read a screed on Slack: http://matthewrocklin.com/blog/2019/02/28/slack-github :) |
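Hi @piprrr thank you for posting this, it's been very interesting. A couple of short comments:
(1) We at NERSC think of dask-jobqueue as a solution for people who don't want to write batch scripts. We think some contributions there might make it more useful, but basically that's it. If you want all your workers at once this is just the wrong thing, and it makes no sense to me to sit there and wait for the resources to build up. Ideally you can start feeding work to it and new jobs just add workers. Some people can use that, but sometimes you just want everything at once and to get going. We don't think a blanket statement like "you shouldn't use it" is what we would say. But I've also seen the reflexive "I want to run Dask on a Cray" answer "oh you need dask-jobqueue" --- that is just not right either. :)
(2) Lately we have been really thinking that for "I need all my workers now, and I need them at scale" at NERSC the answer is dask-mpi and containers (Shifter). We're trying to package this up and reconcile that against our queues. This seems to be good for the 1K-worker class of Dask job. I've tried 8K sized clusters and I'm not able to keep the entire cluster alive but amazingly all the work got done.
Would love to get together and compare notes and learn from your large scale experiences. |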
I've tried 8K sized clusters and I'm not able to keep the entire cluster
alive but amazingly all the work got done
I would be curious to hear more about this.
On another note, what is the right way to build some sense of community
among "HPC admins running Dask"? There are a few groups doing this; what
would help you all to share information?
…On Thu, Apr 9, 2020 at 2:44 PM R. C. Thomas ***@***.***> wrote:
Hi @piprrr <https://github.com/piprrr> thank you for posting this it's
been very interesting. A short couple comments:
(1) We at NERSC think of dask-jobqueue as a solution for people who don't
want to write batch scripts. We think some contributions there might make
it more useful, but basically that's it. If you want all your workers at
once this is just the wrong thing, and it makes no sense to me to sit there
and wait for the resources to build up. Ideally you can start feeding work
to it and new jobs just add workers. Some people can use that, but
sometimes you just want everything at once and to get going. We don't think
a blanket statement like "you shouldn't use it" is what we would say. But
I've also seen the reflexive "I want to run Dask on a Cray" answer "oh you
need dask-jobqueue" --- that is just not right either. :)
(2) Lately we have been really thinking that for "I need all my workers
now, and I need them at scale" at NERSC the answer is dask-mpi and
containers (Shifter). We're trying to package this up and reconcile that
against our queues. This seems to be good for the 1K-worker class of Dask
job. I've tried 8K sized clusters and I'm not able to keep the entire
cluster alive but amazingly all the work got done.
Would love to get together and compare notes and learn from your large
scale experiences.
|
@mrocklin The 8K run was a scaling experiment in the earlier days of Cori where I pulled out all stops, used containers, upped all timeouts I could find. During the run, at no given instant were all the workers alive, I don't think it ever achieved 8K, but all tasks completed. I did not expect this really to work, and it's against all the guidance I had to that point about whether what I was doing made sense. <4K workers seemed more reliable for the simple test problem I was doing. This was a couple years ago now, it was a Friday afternoon experiment, and Dask was more youthful. If we were to repeat that kind of experiment today I don't know what we would find. Most experiments I've tried, or demo'd, or showed you (thanks) were at the hundreds-to-thousand worker level and this is mostly just fine these days and seems appropriate to the problems we're looking at.
On community. What has worked for Jupyter in HPC, I think, is (1) we had a workshop that helped us learn who was doing what at what facility and (2) out of that we were able to put together a few interest networks of people who periodically get together to chase issues, discuss developments, share lessons learned and best practices, and plan future events. What has not worked well is mailing lists (they just go silent); a Slack channel was set up but also doesn't see much traffic. Vendor engagement is a plus.
The Dask developer workshop held a couple months ago went some distance and I was able to make connections and build understanding of what the community is doing, but HPC representation was smallish and there is (for now, but my guess is it won't last forever) a gap between enterprise and HPC Dask. Seeing what's going on in enterprise is very exciting and educational but not always immediately applicable. |
I'm chuffed that this issue has spawned a lively conversation, and hopefully one that can be continued, if not here, then perhaps at a more appropriate venue. I'm definitely curious about distributed dask conferences or workshops; hopefully I can participate in a future one, post-pandemic. A future possibility is UCAR's Software Engineering Assembly's annual Improving Scientific Software Conference and Tutorials. It's canceled this year because of COVID, of course, but it would be an excellent opportunity next year to talk about distributed dask to the scientific community, and to offer distributed dask related tutorials. |
Latest 921 node issues on Summit
And now to share the results of the latest batch job on Summit where I am struggling to get six workers per node on a total of 921 nodes to do some work. The first diagnostic is this:
What's interesting is that happens at the start of the run, and the Then there are a lot of these, which is normal and to be expected: Then I see signs that the wait for worker process has ended successfully, and the actual Mind, workers are still getting registered, which is OK. Since dask is essentially a producing/consumer queuing system, we can already give existing workers something to do while the rest register and warm up.
And this is the EA flaming out, and which suggests that the All told 3,935 workers were registered, according to a grep of the output, when we wanted 5,526. However, what likely happened is that the client failed, and then killed the whole job while the balance of workers were still coming up. I feel that the problem was that the client couldn't connect to the scheduler within 30 seconds and failed. To resolve this issue, I'm going to bump the scheduler timeout to something ridiculous, like 10 minutes, which I hope resolves the immediate problem of the EA failing. However, I am concerned about the other warnings that cropped up during the run. Are those things I should be concerned about? And, if so, does anyone have good guidance on how to resolve them? |
Thank you for the update. Bumping up timeouts generally would be good. I don't actually have strong intuition on what would be causing the slowdown here. In principle I would love to see profiling information if we can get it, although profiling in this context might be hard. You could try running the scheduler with the builtin |
Following with interest. We have done runs with ~1000 workers on Cheyenne with no major issues. For that, we used dask-mpi, which allowed us to do it all in a single big MPI job. My impression is that a central challenge you will have in scaling up to the level you describe is giving the workers something to do. In our experience, the dask scheduler starts to really choke when you have > 1_000_000 tasks in your graph. This central chokepoint provides a hard limit on the absolute size of computations you can do with dask, regardless of the number of workers in your cluster. |
In what way did it choke? If my memory serves in your case it wasn't able to respond at interactive time scales (things would sit still for minutes). This is certainly an issue and something that we would love help in fixing. I think that the thing to do here is to start doing some more intense profiling. If anyone has experience profiling networking applications like this, that would be of great help. However, this limit may also not be as bad if we're talking about non-interactive jobs, which may be the case for the folks on Summit. (It's still important there as well, but we probably have another order of magnitude to play with). |
No problem! And sorry for being silent for a week as I was wrapped up with other tasking.
Duly noted. And I'll share if the 10 minutes works. (And, which may be overkill. However, we could probably use these results to optimize the dask "autotune" feature to dynamically adjust timeouts based on the number of workers.)
I've used that for a different project, and I'll do the same for this one. I'll be happy to share the results once I get them. Thanks again for chiming in! |
Yes good points Matt, and apologies for the non-technical term "choke". What I really mean is the following... Each computation (containing N tasks) effectively has two parts, which occur in serial:
For small jobs, part 1 is very small, almost negligible. However, I have the impression that it scales very poorly, like O(N^2). I have not measured this, but someone should. Because of this very different scaling between parts 1 and 2, it places, for us, a hard limit on the number of tasks we can run at once. For example, 10_000_000 tasks takes about 30 minutes to finish step 1, and I have never run 100_000_000. |
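One way to check the suspected O(N^2) behaviour would be to time step 1 at several graph sizes and fit the slope on a log-log scale. The sketch below shows only the fitting step; the timings are synthetic values generated from a known quadratic law, NOT measurements, and `scaling_exponent` is a hypothetical helper name.

```python
import math


def scaling_exponent(sizes, seconds):
    """Least-squares slope of log(seconds) against log(sizes).

    A slope near 1 suggests linear scaling; near 2 suggests quadratic.
    """
    xs = [math.log(n) for n in sizes]
    ys = [math.log(t) for t in seconds]
    mean_x = sum(xs) / len(xs)
    mean_y = sum(ys) / len(ys)
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var = sum((x - mean_x) ** 2 for x in xs)
    return cov / var


# Synthetic timings from t = c * N**2, used only to show the fit works.
sizes = [10_000, 100_000, 1_000_000]
synthetic = [1e-9 * n ** 2 for n in sizes]
exponent = scaling_exponent(sizes, synthetic)
print(round(exponent, 3))
```

Real timings of step 1 at three or more task counts would replace `synthetic`; the fitted slope would then say whether the growth is closer to linear or quadratic.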
Thanks Ryan. You've brought this up several times before. I think that in each time I've suggested that it would be great to reduce the per-task overhead (which I put at about 200-500us). The next step here is probably profiling, both at a Python level, and then probably at a lower C/C++ level. @rabernat you're one of the people that could help to resolve this problem with your Pangeo resources. Maybe this is something that you could direct some of your funding or collaborators towards? |
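As a rough check on the numbers in this exchange, the per-task overhead quoted above (200-500us) can be multiplied out for a 10_000_000-task graph. This is plain arithmetic under the assumption that overhead is strictly linear in the number of tasks; the function name is made up for illustration.

```python
def graph_overhead_seconds(n_tasks: int, per_task_us: float) -> float:
    """Total scheduler overhead if each task costs a fixed number of microseconds."""
    return n_tasks * per_task_us / 1_000_000


n_tasks = 10_000_000
low = graph_overhead_seconds(n_tasks, 200)
high = graph_overhead_seconds(n_tasks, 500)
print(f"{low / 60:.1f} to {high / 60:.1f} minutes")  # 33.3 to 83.3 minutes

# Per-task cost implied by "10_000_000 tasks takes about 30 minutes".
implied_us = 30 * 60 * 1_000_000 / n_tasks
print(implied_us)  # 180.0
```

The reported 30 minutes works out to about 180us per task, which is close to the low end of the quoted range. A single data point cannot distinguish linear from quadratic growth, so this does not settle the scaling question.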
This is an interesting observation. Thanks for sharing Ryan! May be worth exploring if you have time @piprrr 😉 It's also worth noting that using job arrays in dask-jobqueue probably would address this same need ( dask/dask-jobqueue#196 ). |
I'm trying to avoid adding the MPI dependency. I've had bad prior experiences on Titan and Summit with MPI support. Moreover, we are having to rely on singularity containers to run some of the code, and getting MPI to work from within the container is going to be a little tricky. So, if we can have code that doesn't rely on MPI, as is the case with basic distributed dask, then all the better. However, if I continue to struggle scaling up to more nodes on Summit, then I will definitely give MPI-aware distributed dask a look. I'll let you know if I do! |
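The 2,764 node job failed, and I may need help reading the tea leaves on this one. First, the job ran for the fully allotted time, but was hung and killed by the system:
User defined signal 2
Could not read jskill result from pmix server
Could not read jskill result from pmix server
Could not read jskill result from pmix server
I look in the output and see that it had only gotten so far in allocating workers:
Waiting for workers
Deadline set for 5 minutes
Target workers: 8292
Scheduler timeout: 600 secs
Pause time: 5 secs
Scheduler file: /gpfs/alpine/csc363/scratch/mcoletti/sleeper/2764_node/scheduler_file.json
seconds number of workers
0.00 7
130.17 50
135.18 136
140.20 209
145.21 259
150.24 352
155.28 536
160.32 657
165.72 862
172.03 1418
180.27 1912
Yes, I could have used Client.wait_for_workers(), but my wait_for_workers.py script gives a little more detail on worker allocation. In any case, the maximum total number of workers was 16584, but I had set the wait for workers to allow the EA to start after only half of those were allocated, or 8292, or five minutes, whichever came first. However, it is in that process that execution hung, so it was during a call to client.scheduler_info() that things got wedged.
I had the job configured so that each individual worker saved its respective stdout and stderr to its own file, so I went to the stderr for some, and this is what I saw: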
and:
I have no idea what file it was looking for, but that would appear to be the ultimate source of the problem. Any ideas on how to proceed? |
I don't know exactly what is going on here, but as a workaround, you could
try passing the `--no-nanny` flag to the `dask-worker` command.
The context is that dask workers are started up by a Nanny, which is used
to check health and restart it if necessary. That Nanny uses
multiprocessing pipes and other functionality to ensure that it gets a
signal when the worker dies. My guess is that some of those mechanisms in
Python rely on a file system, which, presumably isn't a great idea at this
scale. The easiest thing to do is to just not use a Nanny process and see
if that gets you past things.
…On Mon, May 4, 2020 at 7:16 AM Mark Coletti ***@***.***> wrote:
The 2,764 node job failed, and I may need help reading the tea leaves on
this one.
First, the job ran for the fully allotted time, but was hung and killed by
the system:
User defined signal 2
Could not read jskill result from pmix server
Could not read jskill result from pmix server
Could not read jskill result from pmix server
I look in the output and see that it had only gotten so far in allocating
workers:
Waiting for workers
Deadline set for 5 minutes
Target workers: 8292
Scheduler timeout: 600 secs
Pause time: 5 secs
Scheduler file: /gpfs/alpine/csc363/scratch/mcoletti/sleeper/2764_node/scheduler_file.json
seconds number of workers
0.00 7
130.17 50
135.18 136
140.20 209
145.21 259
150.24 352
155.28 536
160.32 657
165.72 862
172.03 1418
180.27 1912
Yes, I could have used Client.wait_for_workers(), but my
wait_for_workers.py script gives a little more detail on worker
allocation. In any case, the maximum total number of workers was 16584, but
I had set the wait for workers to allow the EA to start after only half of
those were allocated, or 8292, or five minutes, whichever came first.
However, it is in that process that execution hung, so it was during a call
to client.scheduler_info() that things got wedged.
I had the job configured so that each individual worker saved its
respective stdout and stderrr to its own file, so I went to the stderr for
some, and this is what I saw:
Traceback (most recent call last):
File "/autofs/nccs-svm1_sw/summit/.swci/0-core/opt/spack/20180914/linux-rhel7-ppc64le/gcc-4.8.5/python-3.7.0-ei3mpdncii74xsn55t5kxpuc46i3oezn/lib/python3.7/multiprocessing/util.py", line 265, in _run_finalizers
finalizer()
File "/autofs/nccs-svm1_sw/summit/.swci/0-core/opt/spack/20180914/linux-rhel7-ppc64le/gcc-4.8.5/python-3.7.0-ei3mpdncii74xsn55t5kxpuc46i3oezn/lib/python3.7/multiprocessing/util.py", line 189, in __call__
res = self._callback(*self._args, **self._kwargs)
File "/autofs/nccs-svm1_sw/summit/.swci/0-core/opt/spack/20180914/linux-rhel7-ppc64le/gcc-4.8.5/python-3.7.0-ei3mpdncii74xsn55t5kxpuc46i3oezn/lib/python3.7/multiprocessing/synchronize.py", line 88, in _cleanup
sem_unlink(name)
FileNotFoundError: [Errno 2] No such file or directory
and:
FileNotFoundError: [Errno 2] No such file or directory
Exception ignored in: <Finalize object, dead>
Traceback (most recent call last):
File "/autofs/nccs-svm1_sw/summit/.swci/0-core/opt/spack/20180914/linux-rhel7-ppc64le/gcc-4.8.5/python-3.7.0-ei3mpdncii74xsn55t5kxpuc46i3oezn/lib/python3.7/multiprocessing/util.py", line 189, in __call__
res = self._callback(*self._args, **self._kwargs)
File "/autofs/nccs-svm1_sw/summit/.swci/0-core/opt/spack/20180914/linux-rhel7-ppc64le/gcc-4.8.5/python-3.7.0-ei3mpdncii74xsn55t5kxpuc46i3oezn/lib/python3.7/multiprocessing/synchronize.py", line 88, in _cleanup
sem_unlink(name)
FileNotFoundError: [Errno 2] No such file or directory
distributed.nanny - INFO - Closing Nanny at 'tcp://10.41.0.101:38369'
distributed.dask_worker - INFO - End worker
I have no idea *what* file it was looking for, but that would appear to
be the ultimate source of the problem. Any ideas on how to proceed?
|
That seems reasonable, so I've resubmitted the job sans nannies. I'll let you know the outcome as soon as the job runs, which may not be for several days. (We're wayyyy over quota for that Summit account, and are rightfully being dinged for that for submitted jobs.) |
Ha!
I also think that at some point it probably makes sense to test not whether
we can connect, but if things are performant at all. My guess is that even
if we can get tens of thousands of machines hooked up we'll still run into
performance issues at a scale that is far smaller.
…On Mon, May 4, 2020 at 10:39 AM Mark Coletti ***@***.***> wrote:
That seems reasonable, so I've resubmitted the job sans nannies. I'll let
you know the outcome as soon as the job runs, which may not be for several
days.
(We're wayyyy over quota for that Summit account, and are rightfully being
dinged for that for submitted jobs.)
|
I have plans to do some post mortem analysis of the runs, and one of the things I will be looking for will be "Did all the workers do work?" Given that some of the workers will still be coming up when the main program runs, I expect there to be a little non-uniformity in the load distribution, but hopefully all workers will have done something. I also plan on doing later runs with performance reports toggled on to see if that yields anything useful, but I've not done so now for fear that may trigger problems. |
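2,764 node job failed again, which is getting tiresome. The wait-for-workers utility died with this error:
Raised CommClosedError('in <closed TCP>: TimeoutError: [Errno 110] Connection timed out') during execution
Wait for workers failed. Aborting.
The only thing it's connecting with is the scheduler, and the scheduler timeout was ten minutes. I ... guess ... I could up that? :-\ And looking through the error logs I saw 3,043 instances of this error:
distributed.scheduler - WARNING - Worker failed to heartbeat within 300 seconds. Closing: <Worker 'tcp://10.41.10.240:35107', name: tcp://10.41.10.240:35107, memory: 0, processing: 0>
distributed.scheduler - INFO - Remove worker <Worker 'tcp://10.41.10.240:35107', name: tcp://10.41.10.240:35107, memory: 0, processing: 0>
So, I made these changes to ~/.config/dask/distributed.yaml: worker-ttl raised from 300s to 900s, timeouts connect from 60s to 600s, and timeouts tcp from 90s to 900s.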
|
At this point I personally would probably stop trying to push things as
high as they can go, and instead try to figure out what is causing friction
at lower scales. I understand that that may be challenging though.
…On Wed, May 6, 2020 at 7:36 AM Mark Coletti ***@***.***> wrote:
2,764 node job failed again, which is getting tiresome.
The wait-for-workers utility died with this error:
Raised CommClosedError('in <closed TCP>: TimeoutError: [Errno 110] Connection timed out') during execution
Wait for workers failed. Aborting.
The only thing it's connecting with is the scheduler, and the scheduler
timeout was *ten minutes*. I ... guess ... I could up that? :-\
And looking through the error logs I saw 3,043 instances of this error:
distributed.scheduler - WARNING - Worker failed to heartbeat within 300 seconds. Closing: <Worker 'tcp://10.41.10.240:35107', name: tcp://10.41.10.240:35107, memory: 0, processing: 0>
distributed.scheduler - INFO - Remove worker <Worker 'tcp://10.41.10.240:35107', name: tcp://10.41.10.240:35107, memory: 0, processing: 0>
So, made these changes to ~/.config/dask/distributed.yaml:
@@ -23,7 +23,7 @@ distributed:
transition-log-length: 100000
work-stealing: True # workers should steal tasks from each other
work-stealing-interval: 100ms # Callback time for work stealing
- worker-ttl: 300s # like '60s'. Time to live for workers. They must heartbeat faster than this
+ worker-ttl: 900s # like '60s'. Time to live for workers. They must heartbeat faster than this
pickle: True # Is the scheduler allowed to deserialize arbitrary bytestrings
preload: []
preload-argv: []
@@ -94,8 +94,8 @@ distributed:
threads: -1 # Threads to use. 0 for single-threaded, -1 to infer from cpu count.
timeouts:
- connect: 60s # time before connecting fails
- tcp: 90s # time before calling an unresponsive connection dead
+ connect: 600s # time before connecting fails
+ tcp: 900s # time before calling an unresponsive connection dead
|
Well, it doesn't cost me anything to jigger some things and resubmit the jobs. I can carry on other tasking while the jobs are pending. However, I'd be interested in pointers in trying to dig a little deeper into what some of the bottlenecks might be. I suspect the biggest one is going to be the scheduler, though I've given that 2 CPUs to crank on. Honestly, for the stuff we're trying to do, I think even grabbing 900 nodes, which I've proved works, is probably adequate for most of our problems. However, our flagship distributed EA, MENNDL, can run on the whole machine, so I'd like to at least demonstrate similar capability for the distributed dask based code. I also haven't tried the MPI-based distributed dask, which may be a direction I go next, even though it's only responsible for setting up workers. |
I also haven't tried the MPI-based distributed dask, which may be a
direction I go next, even though it's only responsible for setting up
workers.
Making an MPI based communication system for Dask is also in-scope if
anyone wanted to take it on. That would probably reduce setup woes. I
would estimate this at about a week for an experienced Dask dev, and maybe
a month for a decent dev who was not experienced with Dask comms.
…On Wed, May 6, 2020 at 7:56 AM Mark Coletti ***@***.***> wrote:
Well, it doesn't cost me anything to jigger some things and resubmit the
jobs. I can carry on other tasking while the jobs are pending. However, I'd
be interested in pointers in trying to dig a little deeper into what some
of the bottlenecks might be. I suspect the biggest one is going to be the
scheduler, though I've given that 2 CPUs to crank on.
Honestly, for the stuff we're trying to do, I think even grabbing 900
nodes, which I've proved works, is probably adequate for most of our
problems. However, our flagship distributed EA, MENNDL, can run on the
whole machine, so I'd like to at least demonstrate similar capability for
the distributed dask based code.
I also haven't tried the MPI-based distributed dask, which may be a
direction I go next, even though it's only responsible for setting up
workers.
|
There's already UCX-Py. Maybe try that before writing a new comm? 😉 |
Here's an overdue update: I'm still struggling to get the 2,764 node runs to work with In any case, the last run had some new errors, which I count as a sort of progress. First were the usual unresponsive
Fair enough, but there were some new errors afoot this go round.
I'm guessing a I also observed this error:
I'm not entirely sure what's up with that. Moving forward, I've removed a call to my If this fails, I'll begrudgingly try the MPI-based distributed dask implementation to see if that shakes things loose. (Again, we're avoiding MPI because on Summit we must use IBM's proprietary Spectrum MPI library that's a fork of OpenMPI burdened by DRM garbage that's not exactly a joy to get to work inside a Singularity container. But, if we must suffer that option to get whole machine runs, then suffer we shall.) |
Are you using array jobs to submit the workers or is each worker its own job? |
@jakirkham As far as I understand, the UCX-Py efforts are for dask-cuda, and while compiling it we can explicitly set UCX dependencies. But for Dask, which is already available for ppc64le, the question is whether the latest Dask versions have already been built with UCX. |
We chose to use a single job approach because there is a tight limit on the number of concurrent jobs on Summit, which is five. Since we're going for full machine runs, that means we need to do everything within a single job since 27,648 jobs (4,608 nodes x six workers per node) running simultaneously is wellll over that five job limit. ;) And, just to be clear, we're currently shooting to get just half the machine to run, or 2,764 nodes. Once we get distributed dask to work at that level, we'll shoot for the whole enchilada. For those familiar with Summit's batch queueing system, LSF, we use a single
|
I don't think that this will help you. All that dask-mpi does is use MPI to communicate the address of the scheduler, and then it connects as normal. We could make a proper Dask MPI communication system, but that would require a bit of work. I'm guessing a week or two of a full-time dev who had done this before. cc'ing @fjetter who might have thoughts on stability with large numbers of workers. Regarding UCX, I think that it would be interesting to try, mostly because it would help to isolate the problem away from TCP connections. If I were to try to isolate the problem here I would probably start by running this on a smaller set of workers (merely 1000 or so :) ) and do some profiling. I would hope that that would highlight some bottleneck that a small dev investment could resolve. Then I would increase from there. Going directly for thousands of nodes without profiling seems potentially wasteful to me. This could be any of a hundred different issues. I think that we'll need to figure out what's going on before we try to push harder. |
Ok, thanks for the heads up, Matthew.
I'm not a distributed dask developer, but I'm willing to share profiling information here. I suspect any identified bottlenecks on Summit are likely to manifest, albeit likelier in lesser form, for others. I suspect that the bottlenecks are going to be in the scheduler. For example, just monitoring all the worker heartbeats is a burdensome task, I imagine. |
Yes, I suspect that that's exactly the kind of thing that we'll run into. Actually, we already did with previous systems. Today worker heartbeats slow down based on the number of workers. There are probably other issues that will arise though that we haven't yet accounted for. |
@benjha UCX-py is a generic UCX library for Python. It is useful in dask-cuda as UCX supports NVLINK but it is also useful for pure Infiniband setups. |
Yeah to complement what @jacobtomlinson already said @benjha. Dask already has UCX-Py support baked in. Dask-CUDA is really designed for single node deployments atm. Though there may be pieces we pull from it. That all being said, would be curious what @piprrr is able to accomplish just by adding UCX-Py. |
Just in case you are not aware, the heartbeat interval is limited to a max of 5 seconds. With the 10k+ dask workers that you are trying to run, your scheduler has less than 0.5ms per heartbeat before heartbeats alone cause delays for all scheduler actions. In our fork we have limited the number of heartbeats per second to 50, which has helped quite a bit in large jobs. You can find the code for the heartbeat interval at distributed/distributed/scheduler.py Line 5561 in 6ee7f89
Note that not limiting the heartbeat interval requires that you also update code that relies on the heartbeat interval (e.g. distributed/distributed/scheduler.py Line 5301 in 6ee7f89
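The arithmetic behind those numbers can be checked with a short sketch. This is only an illustration of the reasoning in this comment, not the code in `scheduler.py`: the function names and the 0.5 second floor are assumptions made for the example.

```python
def per_heartbeat_budget_ms(n_workers, interval_s=5.0):
    """Scheduler time available per heartbeat, in milliseconds,
    if every worker reports once per `interval_s` seconds."""
    return interval_s / n_workers * 1000.0


def interval_for_rate_cap(n_workers, max_heartbeats_per_s=50.0, min_interval_s=0.5):
    """Per-worker heartbeat interval (seconds) that keeps the total rate
    seen by the scheduler at or below `max_heartbeats_per_s`.

    `min_interval_s` is a hypothetical floor so that small clusters
    do not heartbeat faster than necessary.
    """
    return max(min_interval_s, n_workers / max_heartbeats_per_s)


if __name__ == "__main__":
    for n in (2_412, 10_000, 27_648):
        print(
            f"{n:>6} workers: "
            f"{per_heartbeat_budget_ms(n):.3f} ms per heartbeat at a 5 s interval, "
            f"{interval_for_rate_cap(n):.1f} s interval for 50 heartbeats/s"
        )
```

With a 50 heartbeats per second cap, the interval grows linearly with the number of workers, which is why any code that assumes a short, fixed heartbeat interval has to be revisited as well.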
|
Thanks for pointing that out. I opened #3877 to make configuring the heartbeat easier. |
Oh yeah, that's super helpful. Thanks!
|
This is a very useful feature, and I'll be happy to test it on Summit, though it may be a little while before I can do so. This does address my suspicion that it was the scheduler's heartbeat management that was causing us problems. |
Could you please explain what such a Dask MPI communication system would look like? Do you have in mind something similar to the UCX-Py/TCP kind of backends that already exist, or would this be a complete revamp of the Dask Distributed package? Thanks. |
Dask has extensible Comms:
https://distributed.dask.org/en/latest/communications.html
This is what we implement when we add things like UCX. We would use mpi4py
to implement this API and then Dask would use MPI for communication.
However, it would still be Dask, we're just using send/recv for
point-to-point communication.
|
Thanks for the response. Dask's communication layer has connect() and listen() functions that do not have direct parallels in the MPI world. Do you have any thoughts on how to handle connection establishment with MPI? I am assuming that something like dask-mpi will be used to start schedulers and workers but in that case, MPI and all processes will already be initialized and there will be no need for connect() and listen(). Also in Dask Distributed, workers create new connections amongst themselves while the client code is executing. Is there a way to configure Dask Distributed to not make dynamic connections between workers on the fly and instead create them in the beginning only? |
Yes. We would just refer to the existing connections. Is this something
that you're planning on working on? If so, we should probably move this
conversation to a separate issue. We're drifting a little bit off topic
from Summit I think.
|
This issue tracks progress on, and problems with, scaling distributed dask to run on all the nodes of Oak Ridge National Laboratory (ORNL)’s Summit supercomputer. Benjamin Zaitlen of Nvidia suggested that I create this GitHub issue when I first shared what we were trying to do on the RAPIDS-GoAi Slack; he communicated with the dask development team, and they proposed creating this issue in the project’s GitHub issue tracker.
At ORNL we are using distributed dask to evaluate unique driving scenarios using the CARLA driving simulator to improve a deep-learner model for driving a virtual autonomous vehicle. As part of this effort, we are stress testing dask on Summit by gradually adding more Summit nodes to submitted jobs, fixing problems at the point where a given number of nodes has caused issues, and then increasing the number of nodes to address the next set of problems.
As of April 9th, the largest successful run has used 402 Summit nodes with six dask workers each, for a total of 2,412 dask workers. We have a job pending with 921 nodes that will push the total to 5,526 dask workers. Eventually we’d like dask running on all 4,608 Summit nodes, which would mean 27,648 dask workers.
In parallel, we’re developing an open-source evolutionary algorithm framework, LEAP, with colleagues at George Mason University and MITRE that uses dask to concurrently evaluate individuals. We have used LEAP as the vehicle for stress testing dask on Summit. The first test problem was a MAX ONES instance where the fitness was the total number of ones in a given bit string. Since this problem was too trivial for stress testing, we created a new problem that had a single integer as the individual's phenome (that is, genes that have been decoded into something meaningful to the problem, in this case an integer) that would correspond to how many seconds to sleep during evaluation. This means an individual that had a genome of 10 bits of all ones would cause the dask worker to sleep for about 17 minutes during evaluation. Moreover, this was a maximization problem, so the longer the sleep periods, the fitter the individual. (We could probably call this test the UNDERGRAD problem. 😉 )
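The sleep-based test problem can be sketched in a few lines. This is not LEAP's API: the function names are hypothetical, and the most-significant-bit-first decoding is an assumption made for illustration.

```python
import time


def decode(genome):
    """Decode a sequence of 0/1 bits into an integer (the phenome).

    Assumes the most significant bit comes first.
    """
    value = 0
    for bit in genome:
        value = (value << 1) | bit
    return value


def evaluate(genome, sleep=time.sleep):
    """Sleep for the decoded number of seconds and return it as the fitness.

    Because this is a maximization problem, longer sleeps are fitter.
    The `sleep` argument exists only so the example can be exercised
    without actually waiting.
    """
    seconds = decode(genome)
    sleep(seconds)
    return seconds


if __name__ == "__main__":
    all_ones = [1] * 10
    seconds = decode(all_ones)
    print(f"{seconds} s is about {seconds / 60:.1f} minutes")
```

Ten bits of all ones decode to 1,023 seconds, which is where the figure of about 17 minutes comes from.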
In the comments below, I’ll detail what I’ve done so far, and relate some of the problems I’ve encountered on this journey. I welcome comments, tips, suggestions, criticisms, and help with the next inevitable set of problems associated with this non-trivial task. 🙂
Cheers,
Mark Coletti,
ORNL