Skip to content

Commit

Permalink
Introducing internal evalFuture()
Browse files Browse the repository at this point in the history
  • Loading branch information
HenrikBengtsson committed Dec 29, 2024
1 parent a5a3248 commit a8162f5
Show file tree
Hide file tree
Showing 8 changed files with 357 additions and 289 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Package: future
Version: 1.34.0-9003
Version: 1.34.0-9093
Title: Unified Parallel and Distributed Processing in R for Everyone
Imports:
digest,
Expand Down
2 changes: 1 addition & 1 deletion R/ClusterFuture-class.R
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,7 @@ requestNode <- function(await, workers, timeout = getOption("future.wait.timeout

#' @export
getExpression.ClusterFuture <- local({
tmpl_expr_conditions <- bquote_compile({
tmpl_expr_conditions <- future:::bquote_compile({
...future.makeSendCondition <- base::local({
sendCondition <- NULL

Expand Down
22 changes: 11 additions & 11 deletions R/Future-class.R
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,7 @@ getExpression <- function(future, ...) UseMethod("getExpression")
#' @export
getExpression.Future <- local({

tmpl_enter <- bquote_compile({
tmpl_enter <- future:::bquote_compile({
base::local({
## covr: skip=4
## If 'future' is not installed on the worker, or a too old version
Expand Down Expand Up @@ -693,20 +693,20 @@ getExpression.Future <- local({
})
})

tmpl_enter_mccores <- bquote_compile({
tmpl_enter_mccores <- future:::bquote_compile({
## covr: skip=3
.(enter)
...future.mc.cores.old <- base::getOption("mc.cores")
base::options(mc.cores = .(mc.cores))
})

tmpl_exit_mccores <- bquote_compile({
tmpl_exit_mccores <- future:::bquote_compile({
## covr: skip=2
base::options(mc.cores = ...future.mc.cores.old)
.(exit)
})

tmpl_enter_rng <- bquote_compile({
tmpl_enter_rng <- future:::bquote_compile({
## covr: skip=2
.(enter)
## NOTE: It is not needed to call RNGkind("L'Ecuyer-CMRG") here
Expand All @@ -715,7 +715,7 @@ getExpression.Future <- local({
base::assign(".Random.seed", .(future$seed), envir = base::globalenv(), inherits = FALSE)
})

tmpl_enter_packages <- bquote_compile({
tmpl_enter_packages <- future:::bquote_compile({
## covr: skip=3
.(enter)
## TROUBLESHOOTING: If the package fails to load, then library()
Expand All @@ -733,7 +733,7 @@ getExpression.Future <- local({
})
})

tmpl_enter_plan <- bquote_compile({
tmpl_enter_plan <- future:::bquote_compile({
## covr: skip=2
.(enter)

Expand All @@ -749,7 +749,7 @@ getExpression.Future <- local({
})

## Reset future strategies when done
tmpl_exit_plan <- bquote_compile({
tmpl_exit_plan <- future:::bquote_compile({
## covr: skip=2
.(exit)
## Reset option 'future.plan' and env var 'R_FUTURE_PLAN'
Expand All @@ -765,7 +765,7 @@ getExpression.Future <- local({
## .(exit)
})

function(future, expr = future$expr, local = future$local, stdout = future$stdout, conditionClasses = future$conditions, split = future$split, mc.cores = NULL, exit = NULL, ...) {
function(future, expr = future$expr, local = future$local, stdout = future$stdout, conditionClasses = future$conditions, seed = future$seed, split = future$split, mc.cores = NULL, exit = NULL, ...) {
debug <- getOption("future.debug", FALSE)
## mdebug("getExpression() ...")

Expand Down Expand Up @@ -835,11 +835,11 @@ getExpression.Future <- local({
exit <- bquote_apply(tmpl_exit_plan)

## Set RNG seed?
if (is.numeric(future$seed)) {
if (is.numeric(seed)) {
enter <- bquote_apply(tmpl_enter_rng)
}
expr <- makeExpression(expr = expr, local = local, stdout = stdout, conditionClasses = conditionClasses, split = split, enter = enter, exit = exit, ..., version = version)

expr <- makeExpression(expr = expr, local = local, stdout = stdout, conditionClasses = conditionClasses, split = split, enter = enter, exit = exit, ..., seed = seed, packages = pkgs, mc.cores = mc.cores, version = version)
if (getOption("future.debug", FALSE)) mprint(expr)

## mdebug("getExpression() ... DONE")
Expand Down
2 changes: 1 addition & 1 deletion R/MulticoreFuture-class.R
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ result.MulticoreFuture <- function(future, ...) {

#' @export
getExpression.MulticoreFuture <- local({
tmpl_expr_disable_multithreading <- bquote_compile({
tmpl_expr_disable_multithreading <- future:::bquote_compile({
## Force single-threaded OpenMP, iff needed
old_omp_threads <- RhpcBLASctl::omp_get_max_threads()
if (old_omp_threads > 1L) {
Expand Down
4 changes: 2 additions & 2 deletions R/UniprocessFuture-class.R
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,13 @@ resolved.UniprocessFuture <- function(x, ...) {

#' @export
getExpression.UniprocessFuture <- local({
tmpl_exit_rng_remove <- bquote_compile({
tmpl_exit_rng_remove <- future:::bquote_compile({
.(exit)
RNGkind(.(okind))
base::rm(list = ".Random.seed", envir = base::globalenv(), inherits = FALSE)
})

tmpl_exit_rng_undo <- bquote_compile({
tmpl_exit_rng_undo <- future:::bquote_compile({
base::assign(".Random.seed", .(oseed), envir = base::globalenv(), inherits = FALSE)
.(exit)
})
Expand Down
Loading

0 comments on commit a8162f5

Please sign in to comment.