
Add support for job arrays #196

Closed
sntgluca opened this issue Nov 15, 2018 · 70 comments

Comments

@sntgluca

Dear dask community,

I am taking my first steps with Dask and distributed, and I wish to bring up a couple of ideas and suggestions.
The first one is about enhancing the integration with the schedulers.

Dask workers are usually requested in bulk.
When the scheduler allows it, e.g. with SGE, I think it's good practice to submit those as job (task) arrays rather than as multiple independent jobs.

On systems with heavy loads this would make the scheduler's work much easier.

I don't know the code well enough yet to come up with a complete solution, and I think there are multiple places where the code would need changes. So I would like to be sure it's a good idea before working on it.

What do you think?

@guillaumeeb
Member

The first thought that comes to my mind here is that we've designed dask-jobqueue with simplicity in mind, as discussed and argued by @jhamman here: #84 (comment).

The discussion was about multi-node reservations at first, but I feel this applies, even if to a lesser extent, to job arrays. I'd be curious to have @jhamman's advice here.

What goes in your direction @sntgluca is that I believe job arrays have an implementation in almost every job queue software. Still, I'm not sure this is an easy thing to implement in dask-jobqueue, and we certainly don't want to make it too complex.

One question: if we implemented this, how would we be able to easily implement the scale_up and scale_down functions with arrays? Is it always easy to qdel (or equivalent) only part of an array of jobs? Would this be compatible with the internal state (running_jobs, pending_jobs, ...) we store?
I think if we wanted to use arrays, we should do it systematically, to keep only simple job templates...

This looks like quite a heavy change.

@jhamman
Member

jhamman commented Nov 15, 2018

@sntgluca - in the early days of dask on HPC, we explored using job arrays as a tool for deploying dask (dask/distributed#1260 (comment)). IIRC, we found this approach was generally difficult to abstract to generic array jobs. It might be worth reading https://docs.dask.org/en/latest/setup/hpc.html if you have not already.

I think there is probably a use case for array jobs with dask but I don't think dask-jobqueue is the right place. While I do see how they may lighten the load on the (SGE) scheduler, they are far less capable of dynamic scaling than what we have now.

@mrocklin
Member

mrocklin commented Nov 16, 2018 via email

@belledon

belledon commented Nov 19, 2018

Not sure if this will help, as I am relatively new to using dask, but I have played around with using job arrays. Right now you can force job arrays with something like:

        n = 100
        params = {
            'cores' : 2,
            'memory' : '1000MB',
            'processes' : 1,
            'walltime' : '180',
            'job_extra' : [
                '--array 0-{0:d}'.format(n - 1), # submit array of size n
                '--requeue',
            ],
            'env_extra' : [
                'JOB_ID=${SLURM_ARRAY_JOB_ID%;*}_${SLURM_ARRAY_TASK_ID%;*}',
            ],
        }

which will result in each worker having a unique id by overwriting JOB_ID. The generated job script looks like this:

#!/usr/bin/env bash

#SBATCH -n 1
#SBATCH --cpus-per-task=2
#SBATCH --mem=954M
#SBATCH -t 180
#SBATCH --array 0-99
#SBATCH --requeue
JOB_ID=${SLURM_JOB_ID%;*}

JOB_ID=${SLURM_ARRAY_JOB_ID%;*}_${SLURM_ARRAY_TASK_ID%;*}

This works just fine for simple setups, since each worker gets added to the pool even if the scheduler complains that it is unknown. However, each call to scale will result in n new workers, so cluster.scale(i) means n * i workers.
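For completeness, here is a minimal sketch (an untested assumption on my side) of how the params dict above could be passed in; note that scale() is called only once, since every call submits the full array:

from dask.distributed import Client
from dask_jobqueue import SLURMCluster

# every keyword in params is simply forwarded to SLURMCluster
cluster = SLURMCluster(**params)

# one scale() call: the single submitted job script is expanded by SLURM into n array tasks
cluster.scale(1)

client = Client(cluster)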

@guillaumeeb
Member

Some IT groups prefer job arrays over many single jobs because it reduces stress of the resource manager (SLURM, SGE, etc..) They view submitting thousands of independent jobs as an anti-pattern.

Yep, this is clearly the case at CNES, but it is when one doesn't use Dask :). We have a limitation of 200 concurrent individual jobs, or 10 job arrays, but with up to 100,000 jobs each. But 200 dask-jobqueue jobs with 24 cores is already 4800 cores, which is 2/3 of our cluster... More importantly, from my point of view, dask-jobqueue can play a role similar to job arrays: once you've booked a number of resources, you can start computing a lot of individual tasks, and you also don't have to wait for all resources to come up.

However, I don't know if my understanding here is correct, and even if so I don't know if this would generalize.

I think your understanding is correct, but as you say we first need to check whether this generalizes to all job queue systems. It seems to be the case for PBS, Slurm and SGE at least.

I think that the general answer here is that "if this can be made to work without adding much complexity, and if it demonstrably improves the experience for some users or administrators then we'd be happy to have it".

I'm perfectly fine with that, though I don't know if it is worth the effort.

@mrocklin
Member

mrocklin commented Nov 19, 2018 via email

@sntgluca
Author

Hi all,

thanks for all the follow-up and sorry for the delayed answer. I personally find this an important asset; I speak from my personal experience as an HPC user working in close contact with my system administrators.

One thing to keep in mind is that dask is pretty new, and I am not sure how many sysadmins are actually following your progress :), so it's not easy to measure the real impact there.

One question: if we implemented this, how would we be able to easily implement the scale_up and scale_down functions with arrays? Is it always easy to qdel (or equivalent) only part of an array of jobs? Would this be compatible with the internal state (running_jobs, pending_jobs, ...) we store?
I think if we wanted to use arrays, we should do it systematically, to keep only simple job templates...

I am positive this can be done in scale_up and scale_down, but there are other parts of the code requiring attention. For instance, submitting a task array returns a single job id, not the whole list of jobs, which messes up the pending_jobs list.

Before you decide to close this, is it ok if I give it a try? What would you expect a newcomer to do ;)?

@mrocklin
Member

mrocklin commented Nov 27, 2018 via email

@sntgluca
Author

Thanks for the guidelines @mrocklin,
will follow up as soon as I have something to say!

@sntgluca
Author

Hi, I have a working proof of concept to use job arrays for the SGECluster class.
I can achieve it by rewriting the start_cluster() method and modifying the self._command_template attribute.

Before moving further, I have a design question.

How do you think the change should be hooked into the current code? I suggest that the use of job arrays be optional, for instance activated by a boolean:

dask_jobqueue.SGECluster(use_task_array=True)

This idea seems consistent with the rest of the code and should be robust, but it might make the design a bit dull: if use_task_array do this, else do that...

@guillaumeeb
Member

This idea seems consistent with the rest of the code and should be robust, but it might make the design a bit dull: if use_task_array do this, else do that...

I'm a bit afraid of this, yes... But defaulting to job arrays sounds bad too. Are there a lot of places where the if statement would be used? Can we limit them? If it is only in start_cluster() or above, this should be okay.

So I'm slightly in favor of using the boolean activator you propose.

@lesteve
Member

lesteve commented Jan 30, 2019

I commented on #217 but I'll comment on the issue too with more complete thoughts. My feeling echoes that of others in this issue: task array support is not worth the effort in dask-jobqueue at this point in time.

IMO task array support is a "nice to have" feature, but I am not convinced it is one of the main bottlenecks for dask-jobqueue users in practice. I may be wrong about this, and I would be happy to get more details about your particular use case if you cannot use dask-jobqueue because it does not support task arrays. If we get many reports like this, I would be very happy to reprioritise task array support.

My experience with open-source projects is the following: there is quite a long (and ever-growing) list of "nice to have" features but not that much time and energy on people's hands to actually implement them, review PRs, test them thoroughly, etc. Deciding what to focus on is key.

@halsafar

halsafar commented Sep 11, 2019

We come from a compute center with 30,000 SGE nodes, and job arrays are a must: the head node cannot handle the load without them. In one of our software products we were debating a migration to Dask (from our in-house solution), and this is the only feature missing, but it is enough to prevent our migration.

@lesteve
Member

lesteve commented Sep 11, 2019

Full disclosure: I am not very familiar with big clusters. 30,000 nodes seems like a lot though; is it the number of nodes or the number of cores? I know dask-jobqueue is used on Cheyenne, which is ~40 in the TOP500 and has roughly ~4000 nodes according to this.

Other than this:

  • what is an acceptable number of jobs to launch on your cluster with qsub, in other words what is the threshold on the number of jobs beyond which you have to use job arrays?
  • what is the typical size of your job arrays?
  • what is the typical time a single job takes?

Something that may be worth noting: dask can be more flexible than job arrays, so this may allow you to do more with the limited number of qsub submissions you are allowed (as hinted in #196 (comment)). Typically there is not a 1-to-1 mapping between "chunk of work" and job, and you can run many "chunks of work" inside the same job.

Here is a snippet to demonstrate what I mean:

from dask_jobqueue import SGECluster
cluster = SGECluster(cores=1, memory='1GB')
# this will launch 4 jobs
cluster.scale(cores=4)

from dask.distributed import Client
client = Client(cluster)

def my_function(i):
    return i

argument_list = range(1000)

# the number of times you call my_function is not related to the number of jobs
futures = [client.submit(my_function, arg) for arg in argument_list]
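And, as a small follow-up sketch building on the snippet above, results can then be collected as they become ready:

from dask.distributed import as_completed

# process results in completion order; the same 4 jobs are reused for all 1000 chunks of work
for future in as_completed(futures):
    print(future.result())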

@jmichel-otb

Just popping in this long thread because I am a user of @guillaumeeb's cluster and I have started to use dask a lot, and I currently run into the limit on the maximum number of jobs in the queue. Our maximum number of jobs in queue is 100, which is not much.

I understand that using job arrays is off the table, but maybe we could make dask-jobqueue aware of those limits, and tell it that it needs to start, say, 200 jobs while ensuring that there are never more than 100 queued?

I am not familiar with the jobqueue code, but I expect this to be a much smaller patch than enabling job arrays.

@willirath
Collaborator

If it is really about having only 100 jobs waiting to start and being able to have a lot more jobs running, you could test along the following lines:

current_target_size = 0
eventual_target_size = 999
queue_limit = 100
while current_target_size < eventual_target_size:
    # grow the target in steps of the queue limit, capped at the eventual target
    current_target_size = min(
        eventual_target_size,
        queue_limit + current_target_size)
    cluster.scale(jobs=current_target_size)
    # block until the requested workers are up (assumes one worker per job)
    client.wait_for_workers(current_target_size)

@jmichel-otb

Of course I could do that, but then I would wait for all my resources to be allocated before moving on, whereas my main script uses for res in as_completed(...) later on to collect and merge results as they become ready.

I really think this is something that should be handled by jobqueue one way or the other, otherwise it does not offer a full abstraction of PBS and all the rest.

@jhamman
Member

jhamman commented Oct 31, 2019

otherwise it does not offer a full abstraction of PBS and all the rest.

I don't think we're targeting a full abstraction of the functionality of the common job queueing systems. I think a better way to describe the target of dask-jobqueue is the intersection of functionality among the core queueing systems (PBS, Slurm, SGE, etc).

While I am hesitant to commit to job array implementation in the core cluster classes, I'm not opposed to seeing a new cluster class, e.g. PBSArrayCluster. Or a new job class PBSArrayJob. I think these would be useful features to build into the package and PRs that demonstrate how this can be done (and work) would help us test out the pattern.

[EDIT]: Sorry for the accidental issue close below. Clumsy fingers!

@jhamman jhamman closed this as completed Oct 31, 2019
@jhamman jhamman reopened this Oct 31, 2019
@lesteve
Member

lesteve commented Oct 31, 2019

I agree with @jhamman. Having a PBSArrayCluster (or any FooArrayCluster) could be a way to experiment with this. Full disclosure: I am not going to have time to look at this in the foreseeable future. I suspect the only way this can happen is that someone who is affected by this limitation is willing to put the time and effort to work on this and make a proof of concept.

I have to admit that I don't understand what the limitations are in practice @jmichel-otb (or someone else who understands this more than I do), can you elaborate a bit more:

  • why can you not make your jobs "bigger", i.e. have each job book more cores (as hinted by @guillaumeeb in Add support for job arrays #196 (comment))? Is it because "smaller" jobs tend to get into the running state quicker?
  • is the limitation on the number of jobs submitted? Do you get an error if you submit one more job?
  • if you run multiple dask-jobqueue main scripts at the same time, should a single script be aware of what the other scripts are doing and how they are scaling up or down, so that the total number of submitted jobs never goes above the maximum allowed number of jobs? (This seems very tricky to achieve.)

@jmichel-otb

For the record, this is what happens when I hit the limit on the number of jobs in the queue:

distributed.utils - ERROR - Command exited with non-zero exit code.
Exit code: 38
Command:
qsub /tmp/tmpj_59it3y.sh
stdout:

stderr:
qsub: would exceed complex's per-user limit of jobs in 'Q' state

Traceback (most recent call last):
  File "/xxx/python3.5/site-packages/distributed/utils.py", line 666, in log_errors
    yield
  File "/xxx/python3.5/site-packages/dask_jobqueue/deploy/cluster_manager.py", line 174, in _scale
    self.scale_up(n)
  File "/xxx/python3.5/site-packages/dask_jobqueue/core.py", line 444, in scale_up
    self.start_workers(n - active_and_pending)
  File "/xxx/python3.5/site-packages/dask_jobqueue/core.py", line 362, in start_workers
    self.pending_jobs[job] = {}
  File "/xxx/python3.5/contextlib.py", line 77, in __exit__
    self.gen.throw(type, value, traceback)
  File "/xxx/python3.5/site-packages/dask_jobqueue/core.py", line 346, in job_file
    yield fn
  File "/xxx/python3.5/contextlib.py", line 77, in __exit__
    self.gen.throw(type, value, traceback)
  File "/xxx/python3.5/site-packages/distributed/utils.py", line 903, in tmpfile
    yield filename
  File "/xxx/python3.5/site-packages/dask_jobqueue/core.py", line 346, in job_file
    yield fn
  File "/xxx/python3.5/site-packages/dask_jobqueue/core.py", line 357, in start_workers
    out = self._submit_job(fn)
  File "/xxx/python3.5/site-packages/dask_jobqueue/core.py", line 349, in _submit_job
    return self._call(shlex.split(self.submit_command) + [script_filename])
  File "/xxx/python3.5/site-packages/dask_jobqueue/core.py", line 410, in _call
    "stderr:\n{}\n".format(proc.returncode, cmd_str, out, err)

Cluster is still running, though.

@lesteve
Member

lesteve commented Nov 5, 2019

Thanks for this @jmichel-otb, this is helpful! If you have the time to answer my questions above, that would be great!

Here are some general rough directions, if someone has the bandwidth to investigate:

  • job_extra could be used to add the job array option to the job script. Ideally the size of the job array could change over time, but I guess a static job array size might be simpler to start with and experiment with. A way to vary the job array size would be to pass it on the command line (i.e. adding to submit_command) when scaling up.
  • scaling up (probably in PBSArrayCluster.scale): you would need some logic to account for the fact that executing one qsub can result in multiple jobs, and to parse multiple job ids (also, the job ids you get with job arrays may look a bit different, not sure); see the sketch after this list.
  • scaling down: not sure whether you would want to qdel all the jobs of a given job array or remove them one by one
  • I don't know SpecCluster well enough, but it may help with some of this
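To illustrate the bookkeeping needed for the scaling-up point above, here is a small self-contained sketch (the helper name and the jobid_taskindex format are illustrative assumptions, not existing dask-jobqueue API):

# Hypothetical helper: expand the single id returned by an array submission into one
# id per array task, so that pending/running jobs can still be tracked individually.
def expand_array_job_ids(array_job_id, array_size):
    return ["{}_{}".format(array_job_id, task_index) for task_index in range(array_size)]

# one qsub of a 4-task array -> four trackable ids
print(expand_array_job_ids("12345", 4))  # ['12345_0', '12345_1', '12345_2', '12345_3']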

@guillaumeeb
Member

Sorry @jmichel-otb for the delay in my answer...

I am a user of @guillaumeeb's cluster

Wow, it's as much yours as it's mine 🙂 !

I propose you open another issue about your suggestion to limit the number of queued jobs. Be aware that there may not be an easy solution to this, and I'll have limited time to investigate. I also stand by my suggestion of making individual jobs bigger, and if that does not solve your problem I'd be curious, as would @lesteve, to know why.

I think there are two different issues here. Eventually both could be solved by implementing job arrays in dask-jobqueue. I think it would be a good idea to work on such an implementation in the end, but I probably won't be able to any time soon. Could @sntgluca try to work on this again with the new dask-jobqueue based on SpecCluster?

@jmichel-otb

@guillaumeeb the answer is that I cannot get it to work with more workers per PBS job.

My current setup is 1 worker per job, with 1 process, 8 threads and 40 GB of memory. The maximum processing power I can reach within the cluster's limit is 200*8 threads (which is a lot already), but as you may know multithreaded code never reaches 100% efficiency, and there are parts of the code that are not multithreaded.

For the past week I have been trying to change this setup to jobs with 8 single-threaded worker processes of 5 GB each. This never reaches the end of the processing, with various errors ranging from worker restarts by the nanny, to tornado errors, to jobs killed by PBS...

Part of the issue might be that dask does not order tasks in an optimal way for our graph (it leaves many intermediate results open instead of processing leaf nodes quickly so as to release resources). I posted an issue about this here: dask/dask#5584

I must say that it is really hard to figure out what is going on when a computation fails with this software stack (PBS, dask-jobqueue, dask). Logs are filled with cryptic entries such as tornado errors, and it seems that the only option is trial and error (and a bit of prayer).

@sntgluca
Author

Dear @guillaumeeb
I'd happily try to give it another go, since it seems there's a broader interest now.
Just a heads-up: it will take time before I can get my head around it. The Christmas holidays seem a good time for me to look into it.

If anyone has time before then, that'd be welcome.

@jakirkham
Member

Did you wind up exploring this further @sntgluca? 🙂

@sntgluca
Author

sntgluca commented May 19, 2020

if you could push the code that allows you to use job arrays in a branch

I prepared it but I don't have rights to push a new branch into the repository.
Any suggestions? A patch, posting it here, a notebook, reaching out via mail?

@lesteve
Member

lesteve commented May 19, 2020

I prepared it but I don't have rights to push a new branch into the repository.

Push it to your fork, i.e. sntgluca/dask-jobqueue, into a branch, e.g. job-array-support, and post the link here so I can have a look.

@sntgluca
Author

LOL I really need some practice...

Here it is:
https://github.com/sntgluca/dask-jobqueue/tree/job-array-support

@lesteve
Member

lesteve commented May 19, 2020

Yeah, so if this is the only thing you need to make job arrays work, I would hope that there is a way to have name at the cluster level passed down to name at the job level for your use case. As I hinted above, I think there is clearly a bug here, and if you fix it then you can hopefully use something like:

cluster = SGECluster(name='whatever_$SGE_TASK_ID', cores=1, memory='1GB')

Also, can you post the output of print(cluster.job_script()), just to make sure that we do get --name whatever_$SGE_TASK_ID as I was expecting?

Also, I don't think we need to get too involved in distributed's SpecCluster (I don't think the keys of the spec dict are related to the names of the workers, but maybe they are...). I may of course be completely wrong and too naive on this one...

@guillaumeeb
Member

Just to state that I still agree with Matt's comment above (from a long time ago) about implementing job arrays in dask-jobqueue:

I think that the general answer here is that "if this can be made to work
without adding much complexity, and if it demonstrably improves the
experience for some users or administrators then we'd be happy to have
it".

I share @lesteve's concerns about maintainability, but as I'm in charge of an HPC cluster, I also know that scheduler load is really important, and job arrays are a way to reduce it. If all this can be solved by documentation, that's great. If a few careful lines of code can improve it, that's great too.

@sntgluca
Author

Dear @lesteve

Also can you post the output of print(cluster.job_script()) just to make sure that we do get --name whatever_$SGE_TASK_ID as I was expecting.

Here's the output of cluster.job_script():

#!/usr/bin/env bash

#$ -N dask-worker
#$ -l m_mem_free=2G
#$ -l h_rt=3600
#$ -cwd
#$ -j y

/home/santagi4/.conda/envs/dask/bin/python -m distributed.cli.dask_worker tcp://10.168.16.163:39551 --nthreads 1 --memory-limit 2.00GB --name name --nanny --death-timeout 60

As you can see, the name passed is the literal string "name".

I don't think this is very useful for you but... if you wish to follow along, job_script is defined in JobQueueCluster, and calls JobQueueCluster.dummy_job:

def job_script(self):

The latter in turn hardcodes the name as the string "name".

name="name",

@sntgluca
Author

@lesteve ,
note that the comment above is only partially connected to the task array problem. The workers' names are determined when the scale method is called, which is unfortunately implemented in distributed.

We have been thinking about this ticket for a while now. I think we have both spent too much time on it for what it's worth. I feel we have three options:

  1. give up, as it's not worth pursuing. It might not be my favourite idea but if you are not convinced about the other options this would be the right thing to do.
  2. modify the code to inject "task" information into SGEJob and the other Job classes, as in the example in Add support for job arrays #196 (comment)
  3. in JobQueueCluster, override the code setting up the workers described in Add support for job arrays #196 (comment) (currently in distributed), to pass cluster.name into the workers' names.
    All the code necessary would be:
def new_worker_spec(self):
    while self._i in self.worker_spec:
        self._i += 1
    name = f'{self._i}_{self._name}'
    return {name: self.new_spec}

This would be in line with your approach of passing 'cluster_$SGE_TASK_ID' as the cluster's name.

If neither option 2 nor option 3 is convincing, I'd say we can pick option 1 and move on to something more interesting.
What do you think?

@lesteve
Member

lesteve commented May 26, 2020

I still have some hope of finding a work-around and documenting it with minimal code change. Although I agree we both probably spent more time on this than we would have liked in an ideal world, I feel we at least understand the issue a bit better.

After having looked at this more closely, I am reasonably convinced that the "right way" is to use group in the SpecCluster class. From the SpecCluster class doc:

If a single entry in the spec will generate multiple dask workers then
please provide a `"group"` element to the spec, that includes the suffixes
that will be added to each name (this should be handled by your worker
class).

>>> cluster.worker_spec
{
    0: {"cls": MultiWorker, "options": {"processes": 3}, "group": ["-0", "-1", -2"]}
    1: {"cls": MultiWorker, "options": {"processes": 2}, "group": ["-0", "-1"]}
}

Now we need to give the user a way to provide group at the cluster level. To be honest, this is definitely not going to be very user-friendly but at least this would be a work-around ...

@lesteve
Member

lesteve commented May 26, 2020

I was being too optimistic and completely missed this in the docstring:

(this should be handled by your worker class)

This SpecCluster group may still be part of making this a bit more robust (actually maybe used in adaptive mode only?) but this is definitely not the main story here 😩 ...

@sntgluca
Author

Dear @lesteve ,

This was good input! By checking how groups are already used in JobQueueCluster with multiple processes I came up with an alternative approach:

A user can "mimic" the behaviour of a task array by setting up a cluster with multiple processes, leading to fewer calls to the scheduler.

# cores is the total per job, so with processes=<PROCESSES> each worker gets one thread
cluster = SGECluster(processes=<PROCESSES>, cores=<PROCESSES>, memory='1GB',
                     job_extra=["-pe smp <PROCESSES>"])

when scale(n=<WORKERS>) is called, the number of jobs submitted to the cluster is reduced by a factor equal to the number of processes.
As an example, if <PROCESSES> = 10, scale(n=200) would start only 20 jobs, each of which
allocates 10 cores on the cluster.
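To make this concrete, here is a minimal sketch with the numbers filled in (assuming the parallel environment is called smp; the name varies between SGE sites):

from dask_jobqueue import SGECluster

cluster = SGECluster(
    processes=10,              # 10 single-threaded dask workers per job
    cores=10,                  # total cores booked per job
    memory='10GB',             # split across processes, i.e. 1GB per worker
    job_extra=['-pe smp 10'],  # request 10 slots from SGE (site-specific PE name)
)

cluster.scale(200)             # 200 workers requested -> only 20 jobs submitted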

This is not the same as using a task array, and it would have a number of drawbacks, but it

  • would decrease the amount of requests to the scheduler (which was our original goal)
  • would work without any change in the code and
  • could be documented as best practices.

I honestly think that some of the implementation suggestions in this thread would be good for the library, and would have minimal maintenance impact.
But since we don't all think the same, maybe this is a valid alternative.
Does it make sense?

@lesteve
Member

lesteve commented Jun 1, 2020

As an example, if <PROCESSES> = 10, scale(n=200) would start only 20 jobs, each of which
allocates 10 cores on the cluster.

Sorry, I assumed you knew about this approach from the beginning 😅 ... it was implicitly mentioned in #196 (comment) for example, but the discussion is so long that it is hard to keep track of everything, of course. It would be great if you could add it to the documentation! Not sure of the best place to put it; I would say put it in the newly created "Advanced tips and tricks" section for now.

In my mind this is one of the reasons why job arrays are not a "must-have at all costs" thing in Dask-Jobqueue. Another key reason is that you can easily reuse the same job for multiple parameters. When you submit each job directly with qsub, the easiest thing to do is to launch one job per parameter (and then you gradually move to job arrays because that's best practice). If you could add that to the documentation as well, that would be great too, maybe under "How to reduce the number of jobs" or a better title? Note that having smaller jobs may also allow you to get through the queue quicker, so there is a compromise here that will depend a lot on each cluster's configuration.

Note that there is something a bit special about SGE in that you have to use job_extra. For other job schedulers, simply setting processes=4 will be enough. IIRC with SGE-like schedulers, the name of the parallel environment can be different on each cluster (similar to how the memory constraint can use a different name; we have some doc on it added by @ericmjl somewhere).

But since we don't all think the same, maybe this is a valid alternative.

I think we are slowly converging 😄. I now think that your new_worker_spec approach is the most reasonable one (maybe with a few tweaks) and that eventually something along these lines should be part of distributed (my current thinking is that self._name on the SpecCluster should be used in the names of the workers, so that we can pass FooCluster(name='whatever_$ENV_VARIABLE_FOR_TASK_ID', ...), or maybe there should be a way to pass a worker_name_func).

@lesteve
Member

lesteve commented Jun 1, 2020

Maybe a better name for the section to create would be "How do I do the equivalent of job array with Dask-Jobqueue?". This is a question that a lot of people who are familiar with HPC clusters will be asking themselves so that would be very useful!

@halsafar

halsafar commented Jun 2, 2020

We mandate the use of job arrays because our SGE head node cannot keep up if everyone submits single jobs.

When you submit each job directly with qsub the easiest thing to do is to launch one job per parameter (and then you gradually move to job array because that's best practice).

Exactly the opposite. There is no 'gradually move to job arrays' for us. We start with a large job array request and then, as nodes die/crash/etc., we respawn them with single qsub requests.

@lesteve
Member

lesteve commented Jun 3, 2020

@halsafar interesting, I would be curious to learn a bit more about your cluster's best practices. IIUC you can submit single jobs (e.g. when a single job crashed and you need to resubmit it), so what is the threshold on the number of jobs below which it is acceptable to launch a few very similar single jobs? Say you launch a job array of 100 jobs and 10 crash. Can you relaunch those 10 as single jobs, or do you have to use a job array? Say 5 jobs crash, same question.

Also, just a small but rather important comment. You probably did not mean it that way, but "Exactly the opposite" could easily be interpreted as "How could you possibly say this you damn moron" (obviously I am exaggerating a little bit here to make my point, but on this particular one I was indeed a bit annoyed by your "Exactly the opposite" and it took me 5-10 minutes before I was ready to compose a calm reply). IMO terms like this can easily be removed or toned down, which makes for a happier and more productive discussion. A good recent related blog post from @GaelVaroquaux: http://gael-varoquaux.info/programming/technical-discussions-are-hard-a-few-tips.html.

Something that could also be mentioned somewhere in the doc is that Dask-Jobqueue may not be the right tool for you if you need a lot more flexibility than what it provides. An alternative, while still using Dask on an HPC cluster, is https://docs.dask.org/en/latest/setup/hpc.html?highlight=hpc#using-a-shared-network-file-system-and-a-job-scheduler. It is more manual in the sense that you submit the dask-worker jobs yourself, but you have more control: for example, you can choose to use job arrays when you submit the dask-worker jobs (as shown in the documentation).
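As a rough illustration of that manual approach: you start the scheduler and the dask-worker jobs yourself (possibly as a job array), pointing them all at the same scheduler file on the shared file system, and then connect from Python (the path below is only a placeholder):

from dask.distributed import Client

# the scheduler and workers were started outside of Python (e.g. inside a job array),
# all reading/writing the same scheduler file on the shared file system
client = Client(scheduler_file='/path/on/shared/filesystem/scheduler.json')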

@ericmjl
Contributor

ericmjl commented Jun 4, 2020

IMO terms like this can easily be removed or toned down which makes for a more happy and productive discussion.

@lesteve thank you for pointing this out. I noticed many on GitHub do not practice communicating "neutrally" and/or "effusively", and simply write out what's on their mind. This is a wonderful reminder.

@lesteve
Member

lesteve commented Jun 5, 2020

I opened dask/distributed#3855 to allow the SpecCluster name to be used in the worker names. That would allow using job arrays with something like this:

cluster = SGECluster(
    cores=1, memory='1GB',
    job_extra=['-t 1-3'],
    name='test_$SGE_TASK_ID')

Notes:

  • this is a work-around with caveats as noted in this issue: .scale(jobs=1) would actually launch 3 jobs (the length of the job array) and create 3 Dask workers (one for each job inside the job array)
  • this PR is not at all a replacement for the documentation improvements mentioned above, which would still be extremely useful.

I tested dask/distributed#3855 and it seems to be working as intended. There could still be weird things with the spec group; I have to say I don't really understand it and don't have the time to look at it more in the near future.

@sntgluca
Author

Hi @lesteve ,
I apologise, I could not find time to follow up.

So, given the implementation in dask/distributed, do you think there's now a reasonable work-around, and that it would be best to work on the documentation?

@lesteve
Member

lesteve commented Jan 15, 2021

That was the kind of work-around I had in mind for job arrays: dask/dask#7070 (comment).

The known caveats are:

  • scaling is weird: .scale(jobs=1) would actually submit the full job array, so you get length(job-array) jobs
  • adaptive very likely doesn't behave correctly because of the scaling quirk above

Of course there may be unknown caveats ...

@lesteve lesteve closed this as completed Jan 15, 2021
@jmuchovej
Contributor

jmuchovej commented Jan 15, 2021

@lesteve If I'm understanding "adaptive" – I was actually using .adapt(minimum_jobs=2, maximum_jobs=8) in my script, and it scaled up with no hiccups. I never had to scale down, so I can't speak to it, but this work-around could be clearly labeled with "use at your own risk, it's minimally supported, but it works". (I'm sure you had this in mind already, just reiterating for clarity.)

There might also be some logic that could be user-implemented to adapt the size of the array with .scale/.adapt, but it probably needs a clearer implementation/more thought.

@lesteve
Member

lesteve commented Jan 15, 2021

With my maintainer hat on, I would rather be even more conservative and say "use at your own risk, this may explode in your face at any moment" 😉

@lesteve
Member

lesteve commented Jan 20, 2021

In the development version it is a bit easier to use job arrays thanks to #480; see dask/dask#7070 (reply in thread)

@shaohao99

shaohao99 commented Apr 21, 2021

@guillaumeeb For this comment,

We have a limitation of 200 concurrent individual jobs,
or 10 job arrays, but with up to 100,000 jobs each.

do you use Slurm or another job scheduler? If it is Slurm, would you mind sharing how you implement that? Thank you.

We use Slurm on our cluster and would like to implement this: N concurrent individual jobs and M concurrent jobs across all job arrays (e.g. m job arrays with n jobs each, so M = n*m), where M is much larger than N. I have read the Slurm documentation, but it seems there is no way to treat individual jobs and jobs in arrays separately; we can only set the maximum number of jobs overall. Not sure if I am correct.

@guillaumeeb
Member

do you use Slurm or another job scheduler? If it is Slurm, would you mind sharing how you implement that? Thank you.

No, we are currently using PBS Pro, and since a recent update we cannot do that anymore. We now just have a limit of 100,000 jobs in the queue (which is huge), be they standard jobs or jobs in an array. So it's the same limitation you seem to have.

I cannot help with Slurm, but surely you can find some channel to ask the Slurm developers? Maybe on GitHub: https://github.com/SchedMD/slurm?
