Additional modes of parallelism #42
Comments
From @mrchypark: see airflow and luigi, as well as others mentioned in this repo.
For Spark, it seems like sparklyr::spark_apply() would be ideal. We could wrap it into a worker for parallel_stage(), like worker_parLapply() or worker_mclapply().
I just talked to a colleague, and Spark may not be the best option here. sparklyr::spark_apply() might come into play for #77, but it does not need to. Spark is apparently not designed for general parallelism; it is geared toward parallelism over a large, partitioned data file. Drake is far more general. On the other hand, Docker may be worth exploring.
@wlandau-lilly good point. I agree Docker is a good choice, because you can monitor and kill processes. How can I help you?
It would be great to strategize with you because there are multiple ways to approach this. Today, a colleague suggested that we could use a Dockerfile like the current Makefile, constructing it based on the DAG and using it to spawn a separate R session for each target. Alternatively, we might define a Docker-specific parallel worker like I did for the parallel::mclapply() and parallel::parLapply() backends. I am probably missing other options and caveats here because I have never actually used Docker before. Do you know of any tutorials specific to this situation?
batchtools has a Docker parallel mode, and dockerflow controls Docker with Dataflow on Google Cloud. I'll keep in touch if I find out anything else.
I would suggest making Docker an option, but not the default one. An issue that I've run into is that it requires admin rights to run (not just install) on Windows. So in a heavily controlled IT environment, Docker is a non-starter for some users 😢
@AlexAxthelm I totally agree. Even if everyone had Docker, I would probably still keep the current default.
May I suggest using futures from the future package (I'm the author) for parallelism / distributed processing? It might provide a generic solution to what you're shooting for here. The idea is that you write code once, and then the user can choose whatever backend they'd like, e.g. any backend that parallel provides (built into future), or HPC-scheduler backends via batchtools (in future.batchtools). Implementing the Future API for additional backends is not that hard. Regarding Docker-based workers: there are some examples of how to launch those in the future package.
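To make the "write once, choose the backend later" idea concrete, here is a minimal sketch using only functions from the future package itself (no drake integration assumed):

```r
library(future)

## The user chooses the backend up front...
plan(sequential)  # could be multisession, cluster, batchtools_slurm, ...

## ...and the analysis code, written once, runs unchanged on any of them.
f <- future(sum(1:10))
value(f)  # blocks until the future is resolved
```

Swapping `plan(sequential)` for `plan(multisession)` parallelizes the same code with no other changes.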
Absolutely! I would love to launch Docker jobs this way. The changes may not happen immediately, but this is a high priority for the future development of drake.
Cool!
I would add support for a future-based worker. I should admit I haven't run drake yet, so I'm shooting from the hip here, but it could be that the following will work out of the box (a quick tweak of the existing workers):

```r
worker_future_lapply <- function(targets, hash_list, config){
  prune_envir(targets = targets, config = config)
  values <- future::future_lapply(X = targets, FUN = build,
    hash_list = hash_list, config = config)
  assign_to_envir(target = targets, value = values, config = config)
}
```

This of course assumes that the necessary globals reach the workers.
I agree. For a first step, it seems wise to append a new future-based backend alongside the existing ones. In case you are interested and get to this before I do (see #21), I will gladly review pull requests to add it. For anyone else reading this thread who is new to the future package, its documentation is a good introduction.
Also, global variables are generally protected from changes made on the workers.
Also: @emittman and @nachalca are having trouble with the advertised SLURM functionality. I think we could get a Docker image of a SLURM setup and use it to reproduce the setbacks and add an example, maybe a vignette too.
Relevant: #99. I am hoping Docker parallelism turns out to be just Makefile parallelism with a custom recipe command. Historically in drake:

```r
library(drake)
load_basic_example()
# Find "Rscript -e" in the Makefile generated below.
make(my_plan, parallelism = "Makefile", jobs = 4)
# Find "R -q -e" in the Makefile generated below.
make(my_plan, parallelism = "Makefile", jobs = 4,
  recipe_command = "R -q -e")
```
After some more investigation, I think it would be unwise to go down that road. My next actions on this issue will be to learn @HenrikBengtsson's future and @mllg's batchtools. Depending on what I learn, I will decide whether to leverage one or both of these packages directly. The eventual solution should completely unlock distributed computing for drake.
I'm here if you need guidance; since I'm not familiar with the inner details of drake I cannot give exact suggestions, but if you already have an lapply()-style backend, a first test would be to run with future::future_lapply() swapped in.

FYI, the Future API does not have a concept of restarting/retrying, but it does forward exceptions of various error classes, meaning it provides the basic needs for a "restart" wrapper. It might be that the concept of restarts will be added to the Future API at some point, but I'm quite conservative when it comes to adding features to the API, since those must work for all backends. It could also be that a restart feature belongs in a higher-level API on top of the Future API.

About Docker

In one of my previous comments I had a typo in my reference to makeClusterPSOCK(); here is a corrected example:

```r
## Setup of a Docker worker running rocker/r-base
## (requires installation of the future package)
cl <- makeClusterPSOCK(
  "localhost",
  ## Launch Rscript inside a Docker container
  rscript = c(
    "docker", "run", "--net=host", "rocker/r-base",
    "Rscript"
  ),
  ## Install the future package
  rscript_args = c(
    "-e", shQuote("install.packages('future')")
  ),
  dryrun = TRUE
)
```

Note how that sets up a regular PSOCK cluster, so there is nothing specific to futures there (except the illustration of how to install a package on the fly); you can use that cluster with plain parallel functions as well.
Thank you @HenrikBengtsson, this is exactly what I need to start tinkering! I did not realize that makeClusterPSOCK() was this flexible.
Yes, you can use the cluster with plan(cluster, workers = cl), and the futures will then run on those Docker workers.
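As a sketch of how those pieces might fit together (assuming `cl` is the cluster object from the makeClusterPSOCK() example earlier in the thread, created without `dryrun = TRUE`):

```r
library(future)

## Point futures at the Docker-backed PSOCK cluster.
plan(cluster, workers = cl)

## This expression now evaluates inside a Docker worker.
f <- future(Sys.info()[["nodename"]])
value(f)

## The same cluster also works with base parallel directly:
## parallel::parLapply(cl, 1:4, function(i) i^2)
```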
So much power for such a small change in the code base! When I get the chance, I will definitely implement a future-powered backend.
By the way: in prose, how does future keep track of the plan the user selects?
Yes, it keeps track of it in a "global" part of its local environment, cf. https://github.com/HenrikBengtsson/future/blob/master/R/zzz.plan.R#L78-L85. PS. Note that you can have nested "plans" (strategies), e.g. plan(list(batchtools_slurm, multicore)).
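A small sketch of nested plans, using only local backends so it runs anywhere:

```r
library(future)

## Outer level: parallel R sessions; inner level: sequential.
## On a real cluster, the outer level might be batchtools_slurm instead.
plan(list(multisession, sequential))

f <- future({
  ## Futures created here use the second (inner) strategy.
  g <- future(Sys.getpid())
  value(g)
})
value(f)
```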
It just occurred to me: each future can itself create futures, so nested parallelism should fall out naturally.
I really like the sound of nested parallelism. #28 is no longer a moonshot! I started to sketch some code on the 'future' branch of a separate fork. As you anticipated, the trouble is all around global exports. It is the exact same scenario as parLapply parallelism, so I think I understand what to do. I need to export the user's globals to all the workers, and I need to run do_prework(), prune_envir(), and assign_to_envir() on all the workers before starting any real jobs. For efficiency's sake, I have a strong preference for running these setup steps only once per worker. By the way, is there a way to set the maximum number of parallel workers independently of the parallel backend? I am hoping to protect the less computationally-inclined users from accidentally spawning too many jobs and crashing computer systems. For the record: the same consideration applies to the existing parLapply() backend.
If you know what the globals are, you can always explicitly export them by specifying the globals argument.
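For reference, the globals argument of future() looks roughly like this (automatic detection is the default):

```r
library(future)
plan(multisession)

x <- 42

## Automatic: the future package detects that x is needed and exports it.
f1 <- future(x + 1)

## Explicit: name exactly which globals to export to the worker.
f2 <- future(x + 1, globals = "x")

value(f1)  # 43
value(f2)  # 43
```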
What you're getting at here is the concept of "persistent" workers. The basics of a future (in programming terms) do not support the concept of persistence: futures don't have a memory of the past, and when resolved they conceptually turn into a fixed value. Having said this, cluster/multisession futures have experimental support for persistency, but please note that it is indeed experimental (futureverse/future#60) because it only makes sense for certain types of backends. Much more work and thought needs to go into this concept first.
Putting aside the current drake framework/implementation, do you really need persistent workers? Is it because you pass very large globals, which would be too expensive to pass multiple times? Or is it such an essential part of the drake API/concept? Note that it does not really make sense to think of a persistent worker when you submit tasks as individual jobs on an HPC scheduler.
I believe you're referring to the default number of workers used when a plan is set. Have a look at future::availableCores(), which respects options(mc.cores) as well as the environment variables set by common HPC schedulers.
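A quick illustration of how availableCores() can cap the worker count independently of the backend (the exact default return value depends on the machine):

```r
library(future)

## availableCores() consults options(mc.cores) and scheduler
## environment variables before falling back to the detected count.
availableCores()

options(mc.cores = 2)
availableCores()  # now capped at 2
```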
Passing globals and loading packages takes a lot of overhead. I did some tinkering this weekend and confirmed that this is the reason for the slowdown I was seeing.
Good news: I just implemented the new backend. There is just one sticking point left, and a few remaining things to do before I consider this issue solved.
Now implemented! @HenrikBengtsson, thank you so much for your generous help! I documented the new future_lapply parallelism in the package. Example:

```r
library(future)
library(future.batchtools)
library(drake)
backend(batchtools_slurm) # same as future::plan()
# Alternatively, plan to send targets to SLURM and then give each target multiple cores.
# This should work if you use `future_lapply()` somewhere within a target's command.
# backend(list(batchtools_slurm, multicore))
# Cap the max jobs with options(mc.cores = 2) or something similar from ?future.options
load_basic_example()
make(my_plan, parallelism = "future_lapply")
```
Note: for job schedulers, we will need template files like the one for TORQUE. See futureverse/future.batchtools#9.
Update: I found a trove of template files at https://github.com/mllg/batchtools/tree/master/inst/templates. I put together built-in quickstart examples for SLURM and the Sun/Univa Grid Engine, but neither is working. I do not have any real experience with these schedulers, so help here would be much appreciated.
Awesome. Great to hear you made such progress so quickly. Hopefully the road will be smooth for users testing this out. From the perspective of globals, future, and future.batchtools, this will add valuable real-world testing to those frameworks.
Another thing: @HenrikBengtsson, in your example of a PSOCK cluster for Docker, what does the "localhost" argument accomplish?
You're referring to the first (workers) argument; it specifies the host names on which to launch the workers.
My last sentence was incorrect; a numeric value also works:

```r
> cl <- makeClusterPSOCK(2L, dryrun = TRUE)
----------------------------------------------------------------------
Manually start worker #1 on 'localhost' with:
  '/usr/lib/R/bin/Rscript' --default-packages=datasets,utils,grDevices,graphics,stats,methods -e 'parallel:::.slaveRSOCK()' MASTER=localhost PORT=11235 OUT=/dev/null TIMEOUT=2592000 XDR=TRUE
----------------------------------------------------------------------
Manually start worker #2 on 'localhost' with:
  '/usr/lib/R/bin/Rscript' --default-packages=datasets,utils,grDevices,graphics,stats,methods -e 'parallel:::.slaveRSOCK()' MASTER=localhost PORT=11235 OUT=/dev/null TIMEOUT=2592000 XDR=TRUE
> cl
socket cluster with 2 nodes on host 'NULL'
```

(Though on the other hand, I see that the returned "cluster" object contains the host information internally.)
Thanks, I see. I just realized I had mistaken the meaning of the first argument.
FYI: at some point, it may be worth having a look at ZeroMQ (zeromq.org).