Add support for job arrays #196
The first thought that comes to my mind here is that we've designed dask-jobqueue with simplicity in mind, as discussed and argued by @jhamman here: #84 (comment). The discussion was about multi-node reservations at first, but I feel this applies, even to a lesser extent, to job arrays. I'd be curious to have @jhamman's advice here. What goes in your direction @sntgluca is that I believe job arrays are implemented by almost every job queue software. Still, I'm not sure this is an easy thing to implement in dask-jobqueue, and we certainly don't want to make it too complex. One question: if we implemented this, how will we be able to easily implement scale_up and scale_down functions with arrays? Is it always easy to qdel (or equivalent) only part of an array of jobs? Will this be compatible with the internal state (running_jobs, pending_jobs...) we store? I think if we wanted to use arrays, we should do it systematically to keep only simple job templates... Looks like a quite heavy change.
@sntgluca - in the early days of dask on hpc, we explored using job arrays as a tool for deploying dask (dask/distributed#1260 (comment)). IIRC, we found this approach was generally difficult to abstract to generic array jobs. It might be worth reading https://docs.dask.org/en/latest/setup/hpc.html if you have not already. I think there is probably a use case for array jobs with dask, but I don't think dask-jobqueue is the right place. While I do see how they may lighten the load on the (SGE) scheduler, they are far less capable of dynamic scaling than what we have now.
Some IT groups prefer job arrays over many single jobs because it reduces
stress on the resource manager (SLURM, SGE, etc.). They view submitting
thousands of independent jobs as an anti-pattern. If there is an easy way
to achieve this while still maintaining the current design then I think
it would be welcome. The challenge is to find something that works
out well in both cases.
Sometimes with job arrays (my only experience here is with SGE) submission
just produces many new job IDs like 10.1, 10.2, 10.3, 10.4, and those jobs
proceed as though we had submitted them independently. You can still manage
them independently. In this situation we might consider submitting an array
job whenever we scale up by a large number at once. However, I don't know if
my understanding here is correct, and even if so I don't know if this would
generalize.
I think that the general answer here is "if this can be made to work
without adding much complexity, and if it demonstrably improves the
experience for some users or administrators, then we'd be happy to have
it". Proposing a design and implementing a proof of concept that achieves
this would be a good activity for anyone who wants to push forward on this
issue.
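As a starting point for such a proof of concept, here is a hedged sketch (hypothetical code, not part of dask-jobqueue) of the "array tasks remain individually manageable" observation above: scaling down could, in principle, delete one task of an SGE array job rather than the whole array.

```python
import subprocess

def delete_array_task(array_job_id: str, task_id: int) -> None:
    # SGE's 'qdel <job_id> -t <task_id>' removes a single task of an
    # array job, leaving its sibling tasks running.
    subprocess.run(["qdel", array_job_id, "-t", str(task_id)], check=True)

# e.g. drop task 3 of array job 10 (the task the scheduler shows as 10.3)
delete_array_task("10", 3)
```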
Not sure if this will help, as I am relatively new to using dask, but I have played around with using job arrays. Right now you can force an array submission with something like:

```python
n = 100
params = {
    'cores': 2,
    'memory': '1000MB',
    'processes': 1,
    'walltime': '180',
    'job_extra': [
        '--array 0-{0:d}'.format(n - 1),  # submit array of size n
        '--requeue',
    ],
    'env_extra': [
        'JOB_ID=${SLURM_ARRAY_JOB_ID%;*}_${SLURM_ARRAY_TASK_ID%;*}',
    ],
}
```
which results in each worker having a unique id by overwriting `JOB_ID` in the generated job script:

```bash
#!/usr/bin/env bash
#SBATCH -n 1
#SBATCH --cpus-per-task=2
#SBATCH --mem=954M
#SBATCH -t 180
#SBATCH --array 0-99
#SBATCH --requeue
JOB_ID=${SLURM_JOB_ID%;*}
JOB_ID=${SLURM_ARRAY_JOB_ID%;*}_${SLURM_ARRAY_TASK_ID%;*}
```

This works just fine for simple setups, since each worker gets added to the pool even if the scheduler complains that it is unknown. However, each call to the […]
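A hedged sketch of how the `params` dict above would be used (not part of the original comment; kwarg names assume the dask-jobqueue API of that era):

```python
from dask_jobqueue import SLURMCluster

# With '--array 0-99' in job_extra, a single sbatch submission should
# fan out into 100 array tasks, i.e. 100 single-process workers.
cluster = SLURMCluster(**params)
cluster.scale(1)  # one submission expands into the whole array
```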
Yep, this is clearly the case at CNES, but only when one doesn't use Dask :). We have a limit of 200 concurrent individual jobs, or 10 job arrays with up to 100,000 jobs each. But 200 dask-jobqueue jobs with 24 cores each is already 4800 cores, two thirds of our cluster... More importantly, in my view, dask-jobqueue can play a role similar to job arrays: once you've booked a number of resources, you can start computing a lot of individual tasks, and you also don't have to wait for all resources to come up.
I think your understanding is correct, but as you say we first need to check whether this generalizes to all job queue systems. It seems to be the case for PBS, Slurm and SGE at least.
I'm perfectly fine with that, though I don't know if it is worth the effort.
> though I don't know if it is worth the effort

I agree. For me personally it's not worth the effort. It sounds like it's
the same for you. If this is very important for someone else, though, then I
want them to know that they are welcome to do it.
Hi all, thanks for all the follow-up and sorry for delaying my answer. I personally find this an important asset; I speak from my personal experience as an HPC user working in close contact with my system administrators. One thing to keep in mind is that dask is pretty new, and I am not sure how many sysadmins are actually following your progress :) So it's not easy to measure the real impact there.
I am positive this can be done in scale_up and scale_down. But there are other parts of the code requiring attention. For instance, a task array is returned as a unique job, not the whole list of jobs, messing up the pending_jobs list. Before you decide to close this, is it ok if I give it a try? What would you expect a newcomer to do ;)?
Even though it's a single job (like 123) the individual components of that
job are sometimes addressable (like 123.456 for the 456th component of job
123).
> Before you decide to close this, is it ok if I give it a try?

Definitely

> What would you expect a newcomer do ;)?

Presumably you would modify the code to achieve the results that you want.
You would also provide tests that clearly communicate your desired change,
and demonstrate that the change has occurred. You should also
probably read through the general Dask developer guidelines here:
http://docs.dask.org/en/latest/develop.html#development-environment
Thanks for the guidelines @mrocklin, […]
Hi, I have a working proof of concept for using job arrays with the SGECluster class. Before moving further, I have a design question: how do you think the change should be hooked into the current code? I suggest the use of job arrays be optional, for instance activated by a boolean:

```python
dask_jobqueue.SGECluster(use_task_array=True)
```

This idea seems consistent with the rest of the code and should be robust, but might make the design a bit dull: […]
I'm a bit afraid of this, yes... But defaulting to job arrays sounds bad too. Are there many places where the if statement would be used? Can we limit this? If it is only in […] So I'm slightly in favor of using the boolean activator you propose.
I commented on #217, but I'll comment on the issue too with more complete thoughts. My feeling echoes that of others in this issue: task array support is not worth the effort at this point in time in […] IMO task array support is a "nice to have" feature, but I am not convinced it is one of the main bottlenecks for […] My experience with open-source projects is the following: there is quite a long (and ever growing) list of "nice to have" features but not that much time and energy on people's hands to actually implement them, review PRs, test them thoroughly, etc... Deciding what to focus on is key.
Coming from a compute center with 30,000 SGE nodes: job arrays are a must. The head node cannot handle the load without job arrays. In one of our software products we were debating a migration to Dask (from our in-house solution), and this is the only missing feature, but it is enough to prevent our migration.
Full disclosure: I am not very familiar with big clusters. 30_000 nodes seems like a lot though; is that the number of nodes or the number of cores? I know dask-jobqueue is used at Cheyenne, which is ~40 in the TOP500 and has roughly ~4000 nodes according to this […] Other than this: […]
Something that may be worth noting: dask can be more flexible than job arrays, so this may allow you to do more with the limited number of qsub submissions you are allowed (as hinted in #196 (comment)). Typically there is not a 1-to-1 mapping between "chunk of work" and job: you can run many "chunks of work" inside the same job. Here is a snippet to demonstrate what I mean:

```python
from dask_jobqueue import SGECluster

cluster = SGECluster(cores=1, memory='1GB')
# this will launch 4 jobs
cluster.scale(cores=4)

from dask.distributed import Client
client = Client(cluster)

def my_function(i):
    return i

argument_list = range(1000)
# the number of times you call my_function is not related to the number of jobs
futures = [client.submit(my_function, arg) for arg in argument_list]
```
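A hypothetical follow-up to the snippet above (not in the original comment), showing that the results come back normally even though 1000 task submissions were multiplexed over only 4 single-core workers:

```python
# Block until all 1000 results are back and check them.
results = client.gather(futures)
print(sum(results))  # sum(range(1000)) == 499500
```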
Just popping into this long thread because I am a user of @guillaumeeb's cluster. I have started to use dask a lot, and I currently run into the limit on the maximum number of jobs in the queue. Our maximum number of queued jobs is 100, which is not much. I understand that using job arrays is off the table, but maybe we could make dask-jobqueue aware of those limits, so we could tell it to start, say, 200 jobs while ensuring that no more than 100 are ever queued? I am not familiar with the jobqueue code, but I expect this to be a much smaller patch than enabling job arrays.
If it is really about having only 100 jobs waiting to start while being able to have a lot more jobs running, you could test something along the following lines:

```python
current_target_size = 0
eventual_target_size = 999
queue_limit = 100

while current_target_size < eventual_target_size:
    # grow the target by at most queue_limit jobs at a time
    current_target_size = min(
        eventual_target_size,
        queue_limit + current_target_size)
    cluster.scale(jobs=current_target_size)
    # block until the newly requested workers have arrived
    client.wait_for_workers(current_target_size)
```
Of course I could do that, but then I would wait for all my resources to be allocated before moving on, and my master node is […] I really think this is something that should be handled by jobqueue one way or the other; otherwise it does not offer a full abstraction of PBS and all the rest.
I don't think we're targeting a full abstraction of the functionality of the common job queueing systems. I think a better way to describe the target of dask-jobqueue is the union of functionality among the core queueing systems (PBS, Slurm, SGE, etc.). While I am hesitant to commit to a job array implementation in the core cluster classes, I'm not opposed to seeing a new cluster class, e.g. […]

[EDIT]: Sorry for the accidental issue close below. Clumsy fingers!
I agree with @jhamman. Having a […] I have to admit that I don't understand what the limitations are in practice. @jmichel-otb (or someone else who understands this more than I do), can you elaborate a bit more: […]
For the record, this is what happens when I hit the limit on the number of jobs in the queue: […]

Cluster is still running, though.
Thanks for this @jmichel-otb, this is helpful! If you have the time to answer my question above, it would be great!
Here are some general rough directions, if someone has the bandwidth to investigate: […]
Sorry @jmichel-otb for the delay in my answer...
Wow, it's as much yours as it is mine 🙂! I propose you open another issue about your suggestion to limit the number of queued jobs. Be aware that there may not be an easy solution to this, and I'll have limited time to investigate. I also stick to my suggestion of making individual jobs bigger, and if that does not solve your problem, I'd be as curious as @lesteve to know why. I think there are two different issues here; eventually both could be solved by implementing job arrays in dask-jobqueue. I think it would be a good idea to work on such an implementation in the end, but I probably won't be able to any time soon. Could @sntgluca try to work on this again with the new dask-jobqueue based on SpecCluster?
@guillaumeeb the answer is that I cannot get it to work with more workers per PBS job. My current setup is 1 worker per job, with 1 process, 8 threads and 40 GB of memory. The maximum processing power I can reach within the cluster's limit is 200*8 threads (which is a lot already), but as you may know multithreaded code never reaches 100% efficiency, and there are parts of the code that are not multithreaded. I have been trying to change this setup to jobs with 8 single-threaded worker processes, 5 GB each, for the past week. This never reaches the end of the processing, with various errors ranging from worker restarts by the nanny, tornado errors, jobs killed by PBS... Part of the issue might be that dask does not order tasks in an optimal way for our graph (it leaves many intermediate results open instead of processing leaf nodes quickly so as to release resources). I posted an issue about this here: dask/dask#5584. I must say that it is really hard to figure out what is going on when a computation fails with this software stack (PBS, dask-jobqueue, dask). Logs are filled with unhelpful entries such as tornado errors, and it seems that the only option is trial and error (and a bit of prayer).
Dear @guillaumeeb, […] If anyone has time before I do, that'd be welcome.
Did you wind up exploring this further @sntgluca? 🙂
I prepared it but I don't have rights to push a new branch into the repository.
Push it to your fork, i.e. […]
LOL, I really need some practice... Here it is: […]
Yeah, so if this is the only thing you need to make job arrays work, I would hope that there is a way to make something like this work (the idea being that `$SGE_TASK_ID` would be expanded by the shell when the generated job script runs):

```python
cluster = SGECluster(name='whatever_$SGE_TASK_ID', cores=1, memory='1GB')
```

Also, can you post the output of `cluster.job_script()`? Also, I don't think we need to get too involved in […]
Just to state that I still agree with Matt's comment above (a long time ago) about implementing job arrays in dask-jobqueue: […]

I share @lesteve's concerns about maintainability, but as I'm in charge of an HPC cluster, I also know that scheduler load is really important, and job arrays are a way to improve it. If all this can be solved by documentation, that's great. If some careful lines of code can improve it, that's great too.
Dear @lesteve,

Here's the output of `cluster.job_script()`:

```bash
#!/usr/bin/env bash
#$ -N dask-worker
#$ -l m_mem_free=2G
#$ -l h_rt=3600
#$ -cwd
#$ -j y

/home/santagi4/.conda/envs/dask/bin/python -m distributed.cli.dask_worker tcp://10.168.16.163:39551 --nthreads 1 --memory-limit 2.00GB --name name --nanny --death-timeout 60
```

As you see, the worker name is passed as the literal string "name", so nothing from the cluster's `name` argument reaches the `--name` option. I don't think this is very useful for you, but... if you wish to follow along: dask_jobqueue/core.py, line 548 in b2a5ac8 […] The latter in turn hardcodes the name as the string "name" (dask_jobqueue/core.py, line 540 in b2a5ac8).
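A purely hypothetical sketch (names and structure are guesses, not the actual dask-jobqueue internals) of the kind of change being discussed: if the rendered worker command used the real worker name instead of the hardcoded placeholder, the `$SGE_TASK_ID` trick above could reach the job script.

```python
# Hypothetical, simplified rendering of the dask-worker command line;
# the point is that worker_name flows through unescaped, so a value
# like 'whatever_$SGE_TASK_ID' is expanded by bash inside the job.
def worker_command(scheduler_address: str, worker_name: str, nthreads: int = 1) -> str:
    return (
        f"python -m distributed.cli.dask_worker {scheduler_address} "
        f"--nthreads {nthreads} --name {worker_name} --nanny"
    )

print(worker_command("tcp://10.168.16.163:39551", "whatever_$SGE_TASK_ID"))
```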
@lesteve, we have been thinking about this ticket for a while now. I think we both spent too much time for what it's worth. I feel we have three options: […]

```python
def new_worker_spec(self):
    # find the first unused index and derive a unique worker name from it
    while self._i in self.worker_spec:
        self._i += 1
    name = f'{self._i}_{self._name}'
    return {name: self.new_spec}
```

This would be in line with your approach of passing 'cluster_$SGE_TASK_ID' as the cluster's name. If neither option 2 nor 3 is convincing, I'd say we can pick option 1 and move on to something more interesting.
I still have some hope to find a work-around and document it with minimal code change. Although I agree we both probably spent more time on this than we would have liked in an ideal world, I feel we at least understand the issue a bit better. After having looked at this more closely, I am reasonably convinced that the "right way" is to use […]

Now we need to give the user a way to provide […]
I was being too optimistic and completely missed this in the docstring:

> This […]
Dear @lesteve,

This was good input! By checking how groups are already used in […] a user can "mimic" the behaviour of a task array by setting up a cluster with multiple processes, leading to fewer calls to the scheduler:

```python
cluster = SGECluster(processes=<PROCESSES>, cores=1, memory='1GB',
                     job_extra=["-pe smp <PROCESSES>"])
```

when <PROCESSES> is […] This is not the same as using a task array, and it would have a number of drawbacks, but it […]

I honestly think that some of the implementation suggestions in this thread would be good for the library, and would have minimal maintenance impact.
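A concrete, hedged instantiation of the placeholder snippet above (values are illustrative; note that in dask-jobqueue `cores` is the total core count per job, so eight single-threaded workers per job would look like this):

```python
from dask_jobqueue import SGECluster

# Illustrative values only: each qsub call books one 8-slot job hosting
# 8 single-threaded workers, so 200 workers cost just 25 scheduler entries.
cluster = SGECluster(cores=8, processes=8, memory='8GB',
                     job_extra=['-pe smp 8'])
cluster.scale(jobs=25)
```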
Sorry, I assumed you knew about this approach from the beginning 😅... this was implicitly mentioned in #196 (comment) for example, but the discussion is so long that it is hard to keep track of everything, of course. It would be great if you could add it to the documentation! Not sure of the best place to put it; I would say put it in the newly created "Advanced tips and tricks" for now. In my mind this is one of the reasons why job arrays are not a "must-have at all cost" thing in Dask-Jobqueue. Another key reason is that you can easily reuse the same job for multiple parameters. When you submit each job directly with […] Note that there is something a bit special about […]

I think we are slowly converging 😄; I now think that your […]
Maybe a better name for the section to create would be "How do I do the equivalent of a job array with Dask-Jobqueue?". This is a question that a lot of people who are familiar with HPC clusters will be asking themselves, so that would be very useful!
We mandate the use of job arrays because our SGE head node cannot keep up if everyone submits single jobs.

Exactly the opposite. There is no "gradually move to job arrays" for us. We start with a large job-array request and then, as nodes die/crash/etc., we spawn them with single qsub requests.
@halsafar interesting, I would be curious to learn a bit more about your cluster best practices. IIUC you can submit single jobs (e.g. when a single job crashed and you need to resubmit it), so what is the recommended threshold below which launching a few very similar single jobs is acceptable? Say you launch a job array of 100 jobs and 10 crash: can you relaunch the 10 as single jobs, or do you have to use a job array? Say 5 jobs crash, same question.

Also, just a small but rather important comment. You probably did not mean it that way, but "Exactly the opposite" could easily be interpreted as "How could you possibly say this you damn moron" (obviously I am exaggerating a little bit here to make my point, but on this particular one I was indeed a bit annoyed by your "Exactly the opposite" and it took me 5-10 minutes before I was ready to compose a calm reply). IMO terms like this can easily be removed or toned down, which makes for a happier and more productive discussion. A good recent related blog post from @GaelVaroquaux: http://gael-varoquaux.info/programming/technical-discussions-are-hard-a-few-tips.html.

Something that could also be mentioned somewhere in the doc is that Dask-Jobqueue may not be the right tool for you if you need a lot more flexibility than what it provides. An alternative while still using Dask on an HPC cluster is https://docs.dask.org/en/latest/setup/hpc.html?highlight=hpc#using-a-shared-network-file-system-and-a-job-scheduler. It is more manual in the sense that you submit the scheduler and worker jobs yourself.
@lesteve thank you for pointing this out. I noticed many on GitHub do not practice communicating "neutrally" and/or "effusively", and simply write out what's on their mind. This is a wonderful reminder.
I opened dask/distributed#3855 to allow the SpecCluster name to be used in the worker names. That would allow using job arrays with something like this:

```python
cluster = SGECluster(
    cores=1, memory='1GB',
    job_extra=['-t 1-3'],
    name='test_$SGE_TASK_ID')
```

Notes: […]
I tested dask/distributed#3855 and it seems to be working as intended. There could still be weird things with the spec group; I have to say I don't really understand this and don't have the time to look at it more in the near future.
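For readers following along, a hedged sketch (not from the original thread) of how the snippet above would behave, assuming each submission fans out into the three array tasks declared by `-t 1-3`:

```python
# Each qsub call should expand into 3 SGE array tasks, each running one
# worker whose name embeds its own $SGE_TASK_ID.
cluster.scale(jobs=2)  # 2 qsub calls -> up to 6 workers
```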
Hi @lesteve,

So, given the implementation on […]
That was the kind of work-around I had in mind for job arrays: dask/dask#7070 (comment). The known caveats are: […]

Of course there may be unknown caveats...
@lesteve If I'm understanding "adaptive" – I was actually using […] There might also be some logic that could be user-implemented to adapt the size of the array with […]
With my maintainer hat on, I would rather be even more conservative and say "use at your own risk, this may explode in your face at any moment" 😉
In the development version it is a bit easier to use job arrays thanks to #480; see dask/dask#7070 (reply in thread).
@guillaumeeb For this comment, do you use Slurm or another job scheduler? If it is Slurm, would you like to share how to implement that? Thank you.

We use Slurm on our cluster and would want to implement this: N concurrent individual jobs and M concurrent jobs across all job arrays (e.g. m job arrays with n jobs each, so M = n*m), where M is much larger than N. I have read the Slurm documentation, but it seems there is no way to separate individual jobs from jobs in arrays; we can only set the maximum number of all jobs. Not sure if I am correct.
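A side note not from the original thread: on the user side, Slurm's `--array` option accepts a `%` throttle that caps how many tasks of one array run simultaneously, which can help keep array load down even when submit-side limits cannot be split. A hedged sketch of passing it through dask-jobqueue's `job_extra` (illustrative values, and subject to the worker-name caveats discussed above):

```python
from dask_jobqueue import SLURMCluster

# '--array=0-999%50' asks Slurm to run at most 50 of the 1000 array
# tasks at any one time.
cluster = SLURMCluster(cores=1, memory='1GB',
                       job_extra=['--array=0-999%50'])
```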
No, we are currently using PBS Pro, and since a recent update we cannot do that anymore: we now just have a limit of 100,000 jobs in the queue (which is huge), be they standard jobs or jobs in an array. So, the same limitation you seem to have. I cannot help with Slurm, but surely you can find some channel to ask the Slurm developers? Maybe on GitHub: https://github.com/SchedMD/slurm?
Dear dask community,
I am moving my first steps with Dask and distributed, and I wish to bring up a couple of ideas and suggestions.
The first one is about enhancing the integration with the schedulers.
Dask workers are usually requested in bulk.
When the schedulers allow it, e.g. with SGE, I think it's good practice to submit those as job (task) arrays rather than as multiple independent jobs.
In systems with heavy loads it would make the work of the scheduler much easier.
I don't know the code well enough yet to come up with a complete solution, I think there are multiple places where the code needs changes. So I would like to be sure it's a good idea before working on it.
What do you think?
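To make the request concrete, a hypothetical illustration (placeholder script name `job.sh`; not from the original issue) of the difference between the two submission styles:

```python
import subprocess

# Many independent submissions, each a separate entry for the scheduler:
for _ in range(100):
    subprocess.run(["qsub", "job.sh"], check=True)

# versus one array submission that the scheduler tracks as a single job
# whose tasks are addressable individually (e.g. 10.1, 10.2, ...):
subprocess.run(["qsub", "-t", "1-100", "job.sh"], check=True)
```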