Additional modes of parallelism #42

Closed
wlandau-lilly opened this issue Jun 7, 2017 · 41 comments

@wlandau-lilly
Collaborator

zeromq.org

@wlandau-lilly wlandau-lilly changed the title make(..., parallelism = "zeromq") Other modes of parallelism Aug 11, 2017
@wlandau-lilly
Collaborator Author

Also Hadoop and sparklyr.

@wlandau-lilly wlandau-lilly changed the title Other modes of parallelism Additional modes of parallelism Aug 11, 2017
@wlandau-lilly
Collaborator Author

From @mrchypark, see airflow and luigi.

@mrchypark

There are others in this repo.

@wlandau-lilly
Collaborator Author

For Spark, it seems like sparklyr::spark_apply() would be ideal. We could wrap it into a worker for parallel_stage() like worker_parLapply() or worker_mclapply().

@wlandau-lilly
Collaborator Author

wlandau-lilly commented Sep 6, 2017

I just talked to a colleague, and Spark may not be the best option here. sparklyr::spark_apply() might come into play for #77, but it does not need to. Spark is apparently not designed for general parallel workloads; it is more about parallelism when you are splitting up a large data file. Drake is far more general.

On the other hand, Docker may be like Makefile parallelism, but more reproducible and robust. It may even help solve #76 with deployment hooks. @mrchypark, what do you think?

@mrchypark

@wlandau-lilly Good point. I agree that Docker is a good choice because you can monitor and kill processes. How can I help?

@wlandau-lilly
Collaborator Author

wlandau-lilly commented Sep 7, 2017

It would be great to strategize with you because there are multiple ways to approach this. Today, a colleague suggested that we could use a Dockerfile like the current Makefile, constructing it from the DAG and using it to spawn a separate R session for each target. Alternatively, we might define a Docker-specific parallel worker like I did for the parallel::mclapply() and parallel::parLapply() backends.

I am probably missing other options and caveats here because I have never actually used docker before. Do you know of any tutorials specific to this situation?

@mrchypark

batchtools has a Docker parallel mode, and dockerflow controls Docker with Dataflow on Google Cloud. I'll keep in touch if I find out about any others.

@AlexAxthelm
Collaborator

I would suggest making Docker an option, but not the default one. An issue I've run into is that it requires admin rights to run (not just install) on Windows, so in a heavily controlled IT environment, Docker is a non-starter for some users 😢 Make, however, comes with Rtools and is more likely to be installed.

@wlandau-lilly
Collaborator Author

wlandau-lilly commented Sep 7, 2017

@AlexAxthelm I totally agree. Even if everyone had docker, I would probably still keep the current default: parLapply for Windows and mclapply for all other platforms. These are the most R-focused options, and drake is trying to be the most R-focused pipeline toolkit.
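As a minimal sketch (the helper below is hypothetical, not drake's actual internals), that default boils down to a platform check:

# Hypothetical helper; not drake's actual code.
default_parallelism <- function() {
  if (.Platform$OS.type == "windows") "parLapply" else "mclapply"
}
default_parallelism()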

@HenrikBengtsson

HenrikBengtsson commented Sep 10, 2017

May I suggest using futures from the future package (I'm the author) for parallelism / distributed processing? It might provide a generic solution to what you're aiming for here.

The idea is that you write code once and then the user can choose whatever backend they'd like, e.g. any backend that parallel provides (built into future), or HPC-scheduler backends via batchtools (in future.batchtools). Implementing the Future API for additional backends is not that hard.
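A minimal sketch of the "write once, pick the backend later" idea, using only the future package:

library(future)
plan(multisession)      # could equally be sequential, multicore, or a batchtools plan
f <- future(sum(1:10))  # the code stays the same regardless of the backend
value(f)                # 55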

Regarding Docker-based workers: there are some examples of how to launch those in ?future::makeClusterPSOCK (originally mistyped as ?parallel::makeClusterPSOCK).

@wlandau-lilly
Collaborator Author

wlandau-lilly commented Sep 10, 2017

Absolutely! We can connect future (and by extension, batchtools) and Docker, all in an R-focused way!

I heard that future is taking the R world by storm. I have not yet used it myself, but it sounds ideal here. Do you think any existing parallel backends should be overwritten, or could new ones just be added?

I would love to launch Docker jobs with parallel::makePSOCKcluster() because the code is already set up for this sort of approach.

The changes may not happen immediately, but this is a high priority for the future development of drake.

@HenrikBengtsson

Cool!

Do you think any existing parallel backends should be overwritten, or could new ones just be added?

I would add support for a "future" option first, and only later, once it's shown to work, make it the default. With that in place, one could deprecate the existing implementations that are already covered by "future", i.e. lapply covered by sequential, mclapply by multicore, and parLapply by multisession (the latter two by multiprocess). I'm conservative when it comes to API changes, so I wouldn't do that in a single release. If that was your question.
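For reference, a sketch of that mapping expressed as plan() calls (each call replaces the previous plan; pick one):

library(future)
plan(sequential)    # covers what the "lapply" backend does
plan(multicore)     # covers "mclapply" (forked processes)
plan(multisession)  # covers "parLapply" (background R sessions)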

I should admit I haven't run drake yet, so I'm shooting from the hip here, but it could be that the following will work out of the box (quick tweak of worker_lapply()):

worker_future_lapply <- function(targets, hash_list, config){
  # Prepare the environment for these targets (drake internal).
  prune_envir(targets = targets, config = config)
  # Build each target via a future; the backend is whatever future::plan() selects.
  values <- future::future_lapply(X = targets, FUN = build,
    hash_list = hash_list, config = config)
  # Store the built values back in the working environment (drake internal).
  assign_to_envir(target = targets, value = values, config = config)
}

This of course assumes that build() doesn't modify global variables etc. As long as it's a true functional call it should work.

@wlandau-lilly
Collaborator Author

wlandau-lilly commented Sep 10, 2017

I agree. For a first step, it seems wise to append a future backend without modifying the other backends. I was just thinking that it would be really slick to totally replace lapply.R, mclapply.R, and parLapply.R with something like worker_future_lapply(), plus an additional argument to select the backend within future. If future covers the current non-Makefile functionality, the API will stay fully backward-compatible: "mclapply" and "parLapply" will still be elements of parallelism_choices() (i.e. possible values of the parallelism argument to make()), but they will be accompanied by synonyms such as "future", "sequential", "multicore", "multisession", etc. No deprecation required.
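A hypothetical sketch of that expanded choice list (the future-flavored names are assumptions, not the final API):

# Hypothetical sketch; the new values are placeholders, not drake's final API.
parallelism_choices <- function() {
  c("parLapply", "mclapply", "Makefile",                  # existing values
    "future", "sequential", "multicore", "multisession")  # possible future-powered synonyms
}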

In case you are interested and get to this before I do (see #21), I will gladly review pull requests to add a new future.R with run_*() and worker_*() functions, along with additions to parallelism_choices() and possibly documentation.

For anyone else reading this thread who is new to drake's internals, the run_*() functions are called near the end of make(). Since I use get(), appearances of run_lapply(), for example, may be difficult to spot.

@wlandau-lilly
Collaborator Author

Also, global variables are generally protected from build(), but the user could theoretically write workflow commands that insist on modifying globalenv(). I routinely test typical use cases for make(..., envir = globalenv()) vs make(..., envir = some_custom_environment) for both parLapply and Makefile parallelism, and it will be straightforward to see what happens for future.

@wlandau-lilly
Collaborator Author

Also: @emittman and @nachalca are having trouble with the advertised SLURM functionality. I think we could get a Docker image of a SLURM setup and use it to reproduce the setbacks and add an example, maybe a vignette too.

@wlandau-lilly
Collaborator Author

Relevant: #99. I am hoping Docker parallelism turns out to be just Makefile parallelism with the right prepend and recipe_command arguments to make(). That would be super easy and slick.

Historically, Makefile parallelism has always built targets with Rscript -e 'drake::mk(...)'. But as of 9904c03, you can supply a custom recipe_command argument, for example make(..., recipe_command = "R -q -e"):

library(drake)
load_basic_example()
# Find "Rscript -e" in the Makefile generated below.
make(my_plan, parallelism = "Makefile", jobs = 4)
# Find "R -q -e" in the Makefile generated below.
make(my_plan, parallelism = "Makefile", jobs = 4,
  recipe_command = "R -q -e")

@wlandau-lilly
Collaborator Author

@emittman and @nachalca, do you think recipe_command could also help solve the difficulties you have been having with SLURM? Maybe you could try something like make(..., parallelism = "Makefile", jobs = 2, recipe_command = "sbatch ...").

@wlandau-lilly
Collaborator Author

After some more investigation, I think it would be unwise to go through Makefile parallelism to leverage Docker as a job scheduler.

My next actions on this issue will be to learn @HenrikBengtsson's future and @mllg's batchtools. Depending on what I learn, I will decide whether to leverage one or both of these packages directly. The eventual solution should completely unlock batchtools, address the retry/timeout functionality that @mrchypark brought up in #76, and hopefully leverage Docker somehow. At a glance, I see that batchtools supports SLURM, which will help the stat grad students at Iowa State. @emittman, my day job is taking a turn for the strange, so I may not have time to solve this before you defend and graduate.

@HenrikBengtsson

I'm here if you need guidance; since I'm not familiar with the inner details of drake, I cannot give exact suggestions, but if you've already got an mclapply() backend, just try a cut-and-paste of that setup, replacing mclapply() with future_lapply(). Then all futures "should" work out of the box.

A first test would be to run with plan(multicore), which uses forked processes. A second test would be to run with plan(multisession), which uses background R sessions on the same machine. The latter adds a bit of a challenge when it comes to globals, but the future framework automatically takes care of the most common use cases, and you don't have to worry about manually exporting globals (although that's possible). If you've already got a parLapply() backend implemented, you can borrow from there which globals to export. Once future_lapply(), or individual future() calls, work with the above backends, then plan(future.batchtools::batchtools_local) should work out of the box. After that, testing with one of the HPC schedulers should also work once you have a working batchtools template, e.g. plan(future.batchtools::batchtools_torque).
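The suggested progression, as a minimal sketch:

library(future)
library(future.batchtools)
# Test one plan at a time, in order of increasing complexity:
plan(multicore)            # forked processes on the same machine
plan(multisession)         # background R sessions; globals are exported automatically
plan(batchtools_local)     # batchtools, still on the local machine
# plan(batchtools_torque)  # a real HPC scheduler, given a working batchtools template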

FYI, the Future API does not have a concept of restarting/retrying, but it does forward exceptions of various error classes, meaning it provides the basic building blocks for a "restart" wrapper. It might be that the concept of a restart will be added to the Future API at some point, but I'm quite conservative when it comes to adding features to the API since they must work for all backends etc. It could also be that a restart feature belongs in a higher-level API on top of the Future API.

About Docker

In one of my previous comments I had a typo in my reference to ?future::makeClusterPSOCK. Its example contains:

## Setup of Docker worker running rocker/r-base
## (requires installation of future package)
cl <- makeClusterPSOCK(
  "localhost",
  ## Launch Rscript inside Docker container
  rscript = c(
    "docker", "run", "--net=host", "rocker/r-base",
    "Rscript"
  ),
  ## Install future package
  rscript_args = c(
    "-e", shQuote("install.packages('future')")
  ),
  dryrun = TRUE
)

Note how that sets up a regular PSOCK cluster, so there is nothing specific to futures there (except the illustration of how to install a package on the fly); you can use it with parLapply() if you'd like.

@wlandau-lilly
Collaborator Author

Thank you @HenrikBengtsson, this is exactly what I need to start tinkering! I did not realize that future::plan() could set the backend for future_lapply(). That's a really cool design choice. (I now regret choosing plan() as the name of a prominent drake function.) Would I use future::plan() to tell future_lapply() to use the PSOCK cluster like the cl you defined?

@HenrikBengtsson

Yes, you can use plan(cluster, workers = cl) where cl is any cluster object. With "cluster" futures, the backend is effectively utilizing the same framework as parallel::parLapply() would do.
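A quick sketch (here the cluster is a plain local PSOCK cluster; the Docker-based one from the example above would work the same way, provided it is created without dryrun = TRUE):

library(future)
cl <- future::makeClusterPSOCK(1L)     # stand-in for the Docker-backed cluster
plan(cluster, workers = cl)
f <- future(Sys.info()[["nodename"]])  # evaluated on one of the cluster workers
value(f)
parallel::stopCluster(cl)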

@wlandau-lilly
Collaborator Author

So much power for such a small change in the code base! When I get the chance, I will definitely implement future_lapply() as the very next parallel backend extension. If enough future/future.batchtools backends work, I will consider this issue solved. The other backends will stay as they are.

@wlandau-lilly
Collaborator Author

By the way: in prose, how does future::plan() work? Does it set global options? Where does it store the workers?

@HenrikBengtsson

Yes, it keeps track of it in a "global" part of its local environment, cf. https://github.com/HenrikBengtsson/future/blob/master/R/zzz.plan.R#L78-L85. PS. Note that you can have nested "plans" (strategies), e.g. plan(list(batchjobs_sge, multiprocess)), so that if you "drake" via an SGE scheduler, then each job launched will be configured to use plan(multiprocess). If nested strategies are not specified, the default is always plan(sequential).
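A small sketch of nested plans, using local strategies so it runs anywhere (an HPC setup would swap in a batchtools plan at the outer level):

library(future)
plan(list(sequential, multisession))
outer <- future({
  inner <- future(Sys.getpid())  # picks up the second strategy (multisession)
  value(inner)
})
value(outer)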

@wlandau-lilly
Collaborator Author

It just occurred to me: each make(..., parallelism = "parLapply") creates a new PSOCK cluster, but a single future::plan(cluster, workers = cl) should work across multiple make()s. This could easily quadruple the speed of the package checks. I cannot tell you how much easier this will make development!
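A sketch of that reuse pattern; the make() calls are illustrative placeholders:

library(future)
cl <- parallel::makePSOCKcluster(2L)
plan(cluster, workers = cl)
# Hypothetical drake calls that would all reuse the same two workers:
# make(plan_A, parallelism = "future_lapply")
# make(plan_B, parallelism = "future_lapply")
parallel::stopCluster(cl)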

@wlandau-lilly
Collaborator Author

wlandau-lilly commented Oct 22, 2017

I really like the sound of nested parallelism. #28 is no longer a moonshot!

I started to sketch some code on the 'future' branch of a separate fork. plan(multicore) seems to work mostly okay. Many console messages disappear, but the targets seem to build just fine. On the other hand, plan(multisession) fails. I could fix it easily with future equivalents of clusterCall() and clusterExport(), but they need to generalize beyond just PSOCK clusters. Is there such functionality?

As you anticipated, the trouble is all around global exports. It is the exact same scenario as parLapply parallelism, so I think I understand what to do. I need to export the user's globals to all the workers, and I need to run do_prework(), prune_envir(), and assign_to_envir() on all the workers before starting any real jobs. For efficiency's sake, I have a strong preference for running do_prework() once per make() and running prune_envir() and assign_to_envir() once per future_lapply().

By the way, is there a way to set the maximum number of parallel workers independently of the parallel backend? I am hoping to protect the less computationally-inclined users from accidentally spawning too many jobs and crashing computer systems.

For the record: just like Makefile parallelism, we also should process all the imports locally with the default backend before touching any of the targets. No sense in submitting a separate SLURM or TORQUE job to import every single relevant user-defined function.

@HenrikBengtsson

I started to sketch some code on the 'future' branch of a separate fork. plan(multicore) seems to work mostly okay. Many console messages disappear, but the targets seem to build just fine. On the other hand, plan(multisession) fails.

If you know what the globals are, you can always export them explicitly by specifying the globals argument (or future.globals if you use future_lapply()). Ideally the future + globals framework should be able to identify these globals automatically, but there are cases where this is very hard or very costly/time-consuming. I would need to spend time with drake to give a better answer. Do you have a sense of which globals are missing? If you run with options(future.debug = TRUE), you'll be able to see what globals the future framework finds. Runtime error messages will of course show the first global that is missing.
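A minimal sketch of both suggestions (the global name is made up):

library(future)
plan(multisession)
options(future.debug = TRUE)   # log which globals the framework identifies

big_lookup <- c(a = 1, b = 2)  # a made-up global that the worker needs
f <- future(big_lookup[["a"]] + 1,
            globals = list(big_lookup = big_lookup))  # explicit export
value(f)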

I could fix it easily with future equivalents of clusterCall() and clusterExport(), but they need to generalize beyond just PSOCK clusters. Is there such functionality?

What you're getting at here is the concept of "persistent" workers. The basics of a future (in programming terms) do not support the concept of "persistence". Futures don't have a memory of the past, and when resolved they conceptually turn into a fixed value. Having said this, cluster/multisession futures have experimental support for persistency, but please note that it is indeed experimental (futureverse/future#60) because it only makes sense for certain types of backend. Much more work and thought needs to go into this concept first.

As you anticipated, the trouble is all around global exports. It is the exact same scenario as parLapply parallelism, so I think I understand what to do. I need to export the user's globals to all the workers, and I need to run do_prework(), prune_envir(), and assign_to_envir() on all the workers before starting any real jobs. For efficiency's sake, I have a strong preference for running do_prework() once per make() and running prune_envir() and assign_to_envir() once per future_lapply().

Putting aside the current drake framework/implementation, do you really need persistent workers? Is it because you pass very large globals that would be too expensive to pass multiple times? Or is persistence an essential part of the drake API/concept? Note that it does not really make sense to think of a persistent worker when you submit tasks as individual jobs on an HPC scheduler.

By the way, is there a way to set the maximum number of parallel workers independently of the parallel backend? I am hoping to protect the less computationally-inclined users from accidentally spawning too many jobs and crashing computer systems.

I believe you're referring to the default number of workers used when specifying plan(multicore), plan(multiprocess), etc. The default is future::availableCores(). It acknowledges several known environment variables and R options. For instance, if you set options(mc.cores = 2L), it will not use more than 2 cores. See also ?future.options; e.g. there are ways for a sysadmin to protect against overload/misuse via site-wide environment variables.
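For example:

options(mc.cores = 2L)              # respected by future::availableCores()
future::availableCores()            # now reports 2
future::plan(future::multisession)  # backends default to availableCores() workers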

@wlandau-lilly
Collaborator Author

Passing globals and loading packages adds a lot of overhead. I did some tinkering this weekend and confirmed that this is why drake's parLapply backend is unavoidably so much slower than the mclapply one. However, this is a small price to pay to fully embrace the future paradigm and unlock its full power. Now that I know how important non-persistence is, I will take the time to prepare each worker for each target individually (and process the imports beforehand in a separate step). The implementation will be almost identical to Makefile parallelism, and the behavior will be similar: high overhead, but able to deploy on a variety of distributed systems. With #76 already solved to the extent that R.utils::withTimeout() allows, I think our bases will be covered.

@wlandau-lilly
Collaborator Author

wlandau-lilly commented Oct 24, 2017

Good news: I just implemented future_lapply as a drake backend, and it works great on multisession, multicore, and batchtools_local! I already did all the heavy lifting with Makefile parallelism. Will merge soon.

Just one sticking point: For plan(multisession) and plan(batchtools_local), I lose the console output that tells me when I'm building a target. @HenrikBengtsson, is there any way to preserve stdout()?

Remaining things to do before I consider this issue solved:

  • Write a tutorial for the "future_lapply" backend in the parallelism vignette.
  • Demo "future_lapply" backend in the basic example.
  • Add future_lapply to the long testing scenarios with each of the multicore, multisession, and batchtools_local backends, making sure to hit both the local and global environments for each.
  • Deprecate drake::plan() in favor of workflow() due to the name conflict with future::plan().
  • Add .onLoad() tip about the future backends.

@wlandau-lilly
Collaborator Author

Now implemented! @HenrikBengtsson, thank you so much for your generous help!

I documented the new "future_lapply" backend in the parallelism vignette. @jarad, @emittman, @nachalca, and @raymondkww, you may be interested in a new way to run things on SLURM. We may want to update @nachalca's SLURM array example if this works.

library(future)
library(future.batchtools)
library(drake)

backend(batchtools_slurm) # same as future::plan()

# Alternatively, plan to send targets to SLURM and then give each target multiple cores.
# This should work if you use `future_lapply()` somewhere within a target's command.
# backend(list(batchtools_slurm, multicore))

# Cap the max jobs with options(mc.cores = 2) or something similar from ?future.options
load_basic_example()
make(my_plan, parallelism = "future_lapply")

@wlandau-lilly
Collaborator Author

Note: for job schedulers, we will need template files like the one for TORQUE. See futureverse/future.batchtools#9.
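As a sketch, a scheduler plan would point at such a template file (the file name here is hypothetical):

library(future.batchtools)  # also attaches future
plan(batchtools_slurm, template = "slurm.tmpl")  # "slurm.tmpl" adapted from the batchtools templates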

@wlandau-lilly
Collaborator Author

Update: I found a trove of template files at https://github.com/mllg/batchtools/tree/master/inst/templates. I put together built-in quickstart examples for SLURM and the Sun/Univa Grid Engine, but neither is working. I do not have any real experience with batchtools, so any help would be appreciated.

@HenrikBengtsson

Awesome. Great to hear you made such progress so quickly. Hopefully the road will be smooth for users testing this out. From the perspective of globals, future, future.batchtools, this will add valuable real-world testing to those frameworks.

@wlandau-lilly
Collaborator Author

Another thing: @HenrikBengtsson, in your example of a PSOCK cluster for Docker, what does the dryrun argument do?

@HenrikBengtsson

HenrikBengtsson commented Oct 25, 2017

You're referring to example("makeClusterPSOCK", package = "future"), where all example snippets use makeClusterPSOCK(..., dryrun = TRUE) (not just the one for Docker). It's there so that the example is actually runnable: dryrun = TRUE does everything but launch the cluster/workers. It's most informative if you also use verbose = TRUE at the same time. (Maybe verbose = dryrun should be the default.) UPDATE: dryrun = TRUE does indeed output a message.

@HenrikBengtsson

My last sentence was incorrect; dryrun=TRUE outputs an informative message, e.g.

> cl <- makeClusterPSOCK(2L, dryrun = TRUE)
----------------------------------------------------------------------
Manually start worker #1 on 'localhost' with:
  '/usr/lib/R/bin/Rscript' --default-packages=datasets,utils,grDevices,graphics,stats,methods -e 'parallel:::.slaveRSOCK()' MASTER=localhost PORT=11235 OUT=/dev/null TIMEOUT=2592000 XDR=TRUE
----------------------------------------------------------------------
Manually start worker #2 on 'localhost' with:
  '/usr/lib/R/bin/Rscript' --default-packages=datasets,utils,grDevices,graphics,stats,methods -e 'parallel:::.slaveRSOCK()' MASTER=localhost PORT=11235 OUT=/dev/null TIMEOUT=2592000 XDR=TRUE
> cl
socket cluster with 2 nodes on host 'NULL'

(But OTOH, I see that the returned "cluster" object contains NULL nodes.)

@wlandau-lilly
Collaborator Author

Thanks, I see. I just realized I mistook future::makeClusterPSOCK() for parallel::makePSOCKcluster(), which is why I did not find dryrun to begin with.

@wlandau-lilly
Collaborator Author

FYI: at some point, future::future_lapply() will move to an upcoming future.apply package (futureverse/future#159). When future.apply is on CRAN and all the builds complete, I will make the adjustment to drake.
