Skip to content

Commit

Permalink
All parallel backends now prevent nested parallelization, unless expl…
Browse files Browse the repository at this point in the history
…icitly allowed.

This allowed us to drop getExpression() for MultisessionFuture, and argument
'mc.cores' from getExpression() for MulticoreFuture.
  • Loading branch information
HenrikBengtsson committed Jan 2, 2025
1 parent ad85fb7 commit a03539f
Show file tree
Hide file tree
Showing 8 changed files with 66 additions and 44 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Package: future
Version: 1.34.0-9005
Version: 1.34.0-9006
Title: Unified Parallel and Distributed Processing in R for Everyone
Imports:
digest,
Expand Down
1 change: 0 additions & 1 deletion NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ S3method(futures,listenv)
S3method(getExpression,ClusterFuture)
S3method(getExpression,Future)
S3method(getExpression,MulticoreFuture)
S3method(getExpression,MultisessionFuture)
S3method(getExpression,UniprocessFuture)
S3method(globals,Future)
S3method(journal,Future)
Expand Down
8 changes: 7 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
# Version (development version)

* ...
## New Features

* All parallel backends now prevent nested parallelization, unless
explicitly allowed, e.g. settings recognized by
`parallelly::availableCores()` or set by the future
`plan()`. Previously, this had to be implemented by each backend,
but now it's handled automatically by the future framework.


# Version 1.34.0 [2024-07-29]
Expand Down
35 changes: 35 additions & 0 deletions R/Future-class.R
Original file line number Diff line number Diff line change
Expand Up @@ -744,10 +744,45 @@ getExpression.Future <- local({
options(future.plan = NULL)
Sys.unsetenv("R_FUTURE_PLAN")

## Limit nested parallelization
## (a) Identify default number of cores - ignoring plan settings
...future.ncores <- base::local({
ans <- NA_integer_

base::options(parallelly.availableCores.fallback = 1L)
ncores <- parallelly::availableCores(which = "all")
ncores <- ncores[ncores != ncores["system"]]
ncores <- ncores[base::setdiff(base::names(ncores), base::c("_R_CHECK_LIMIT_CORES_", "Bioconductor"))]
if (base::length(ncores) > 0) {
if (base::length(ncores) > 1) {
ncores <- ncores[base::setdiff(base::names(ncores), "fallback")]
}
if (base::length(ncores) > 0) {
ans <- base::min(ncores, na.rm = TRUE)
}
}
ans
})

## Use the next-level-down ("popped") future strategy
future::plan(.(strategiesR), .cleanup = FALSE, .init = FALSE)

if (!is.na(...future.ncores)) {
## (b) Identify default number of cores - acknowledging plan settings
...future.ncores <- base::local({
nworkers <- future::nbrOfWorkers()
base::min(base::c(nworkers, ...future.ncores), na.rm = TRUE)
})
}

if (!is.na(...future.ncores)) {
...future.options.ncores <- base::options(mc.cores = ...future.ncores)
base::on.exit(base::options(...future.options.ncores), add = TRUE)
}

})


## Reset future strategies when done
tmpl_exit_plan <- bquote_compile({
## covr: skip=2
Expand Down
12 changes: 8 additions & 4 deletions R/MulticoreFuture-class.R
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,11 @@ run.MulticoreFuture <- function(future, ...) {
FutureRegistry(reg, action = "add", future = future, earlySignal = TRUE)

future.args <- list(expr)
job <- do.call(parallel::mcparallel, args = future.args, envir = envir)
job <- local({
oopts <- options(mc.cores = NULL)
on.exit(options(oopts))
do.call(parallel::mcparallel, args = future.args, envir = envir)
})

future$job <- job
future$state <- "running"
Expand Down Expand Up @@ -333,7 +337,7 @@ getExpression.MulticoreFuture <- local({
.(expr)
})

function(future, expr = future$expr, mc.cores = 1L, immediateConditions = TRUE, conditionClasses = future$conditions, resignalImmediateConditions = getOption("future.multicore.relay.immediate", immediateConditions), ...) {
function(future, expr = future$expr, immediateConditions = TRUE, conditionClasses = future$conditions, resignalImmediateConditions = getOption("future.multicore.relay.immediate", immediateConditions), ...) {
## Assert that no arguments but the first is passed by position
assert_no_positional_args_but_first()

Expand Down Expand Up @@ -367,7 +371,7 @@ getExpression.MulticoreFuture <- local({
## Set condition classes to be ignored in case changed
attr(conditionClasses, "exclude") <- exclude
} ## if (resignalImmediateConditions && immediateConditions)
NextMethod(expr = expr, mc.cores = mc.cores, immediateConditions = immediateConditions, conditionClasses = conditionClasses)

NextMethod(expr = expr, immediateConditions = immediateConditions, conditionClasses = conditionClasses)
}
})
22 changes: 0 additions & 22 deletions R/MultisessionFuture-class.R
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,3 @@ MultisessionFuture <- function(expr = NULL, substitute = TRUE, envir = parent.fr
future <- structure(future, class = c("MultisessionFuture", class(future)))
future
}


#' @export
getExpression.MultisessionFuture <- function(future, mc.cores = 1L, ...) {
## NOTE: In order to override the default 'mc.cores = NULL' of
## getExpression.Future(), we have to pass it as a named argument to
## NextMethod(). If not done, that is, if we just call NextMethod(), then
## 'mc.cores' will resolve to the default (= NULL). If we don't name the
## argument - NextMethod("getExpression", mc.cores) - then the default
## will still be NULL.
## The problem with using NextMethod(mc.cores = mc.cores) is that if we
## call getExpression(f, 2L) instead of getExpression(f, mc.cores = 2L),
## then the call will become getExpression.Future(f, 2L, mc.cores = 2L).
## I don't think there is a solution here, except to enforce that all
## arguments to a method that uses NextMethod() must be named.
## See also https://github.com/HenrikBengtsson/Wishlist-for-R/issues/44
## /HB 2018-06-13

## Assert that no arguments but the first is passed by position
assert_no_positional_args_but_first()
NextMethod(mc.cores = mc.cores)
}
28 changes: 14 additions & 14 deletions R/UniprocessFuture-class.R
Original file line number Diff line number Diff line change
Expand Up @@ -112,21 +112,21 @@ getExpression.UniprocessFuture <- local({
.(exit)
})

function(future, immediateConditions = TRUE, exit = NULL, ...) {
## Assert that no arguments but the first is passed by position
assert_no_positional_args_but_first()

## Preserve RNG state?
oseed <- get_random_seed()
if (is.null(oseed)) {
okind <- RNGkind()[1]
exit <- bquote_apply(tmpl_exit_rng_remove)
} else {
exit <- bquote_apply(tmpl_exit_rng_undo)
function(future, immediateConditions = TRUE, exit = NULL, ...) {
## Assert that no arguments but the first is passed by position
assert_no_positional_args_but_first()

## Preserve RNG state?
oseed <- get_random_seed()
if (is.null(oseed)) {
okind <- RNGkind()[1]
exit <- bquote_apply(tmpl_exit_rng_remove)
} else {
exit <- bquote_apply(tmpl_exit_rng_undo)
}

NextMethod(immediateConditions = immediateConditions, exit = exit)
}

NextMethod(immediateConditions = immediateConditions, exit = exit)
}
})


Expand Down
2 changes: 1 addition & 1 deletion R/expressions.R
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ makeExpression <- local({
base::options(
## Prevent .future.R from being source():d when future is attached
future.startup.script = FALSE,

## Assert globals when future is created (or at run time)?
future.globals.onMissing = .(globals.onMissing),

Expand Down

0 comments on commit a03539f

Please sign in to comment.