diff --git a/DESCRIPTION b/DESCRIPTION index c860b245..3e508659 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,5 +1,5 @@ Package: future -Version: 1.34.0-9003 +Version: 1.34.0-9093 Title: Unified Parallel and Distributed Processing in R for Everyone Imports: digest, diff --git a/R/ClusterFuture-class.R b/R/ClusterFuture-class.R index 83f1caee..3668c113 100644 --- a/R/ClusterFuture-class.R +++ b/R/ClusterFuture-class.R @@ -606,7 +606,7 @@ requestNode <- function(await, workers, timeout = getOption("future.wait.timeout #' @export getExpression.ClusterFuture <- local({ - tmpl_expr_conditions <- bquote_compile({ + tmpl_expr_conditions <- future:::bquote_compile({ ...future.makeSendCondition <- base::local({ sendCondition <- NULL diff --git a/R/Future-class.R b/R/Future-class.R index f700b47b..ea50c0f0 100644 --- a/R/Future-class.R +++ b/R/Future-class.R @@ -658,7 +658,7 @@ getExpression <- function(future, ...) UseMethod("getExpression") #' @export getExpression.Future <- local({ - tmpl_enter <- bquote_compile({ + tmpl_enter <- future:::bquote_compile({ base::local({ ## covr: skip=4 ## If 'future' is not installed on the worker, or a too old version @@ -693,20 +693,20 @@ getExpression.Future <- local({ }) }) - tmpl_enter_mccores <- bquote_compile({ + tmpl_enter_mccores <- future:::bquote_compile({ ## covr: skip=3 .(enter) ...future.mc.cores.old <- base::getOption("mc.cores") base::options(mc.cores = .(mc.cores)) }) - tmpl_exit_mccores <- bquote_compile({ + tmpl_exit_mccores <- future:::bquote_compile({ ## covr: skip=2 base::options(mc.cores = ...future.mc.cores.old) .(exit) }) - tmpl_enter_rng <- bquote_compile({ + tmpl_enter_rng <- future:::bquote_compile({ ## covr: skip=2 .(enter) ## NOTE: It is not needed to call RNGkind("L'Ecuyer-CMRG") here @@ -715,7 +715,7 @@ getExpression.Future <- local({ base::assign(".Random.seed", .(future$seed), envir = base::globalenv(), inherits = FALSE) }) - tmpl_enter_packages <- bquote_compile({ + tmpl_enter_packages <- future:::bquote_compile({ ## covr: skip=3 .(enter) ## TROUBLESHOOTING: If the package fails to load, then library() @@ -733,7 +733,7 @@ getExpression.Future <- local({ }) }) - tmpl_enter_plan <- bquote_compile({ + tmpl_enter_plan <- future:::bquote_compile({ ## covr: skip=2 .(enter) @@ -749,7 +749,7 @@ getExpression.Future <- local({ }) ## Reset future strategies when done - tmpl_exit_plan <- bquote_compile({ + tmpl_exit_plan <- future:::bquote_compile({ ## covr: skip=2 .(exit) ## Reset option 'future.plan' and env var 'R_FUTURE_PLAN' @@ -765,7 +765,7 @@ getExpression.Future <- local({ ## .(exit) }) - function(future, expr = future$expr, local = future$local, stdout = future$stdout, conditionClasses = future$conditions, split = future$split, mc.cores = NULL, exit = NULL, ...) { + function(future, expr = future$expr, local = future$local, stdout = future$stdout, conditionClasses = future$conditions, seed = future$seed, split = future$split, mc.cores = NULL, exit = NULL, ...) { debug <- getOption("future.debug", FALSE) ## mdebug("getExpression() ...") @@ -835,11 +835,11 @@ getExpression.Future <- local({ exit <- bquote_apply(tmpl_exit_plan) ## Set RNG seed? - if (is.numeric(future$seed)) { + if (is.numeric(seed)) { enter <- bquote_apply(tmpl_enter_rng) } - - expr <- makeExpression(expr = expr, local = local, stdout = stdout, conditionClasses = conditionClasses, split = split, enter = enter, exit = exit, ..., version = version) + + expr <- makeExpression(expr = expr, local = local, stdout = stdout, conditionClasses = conditionClasses, split = split, enter = enter, exit = exit, ..., seed = seed, packages = pkgs, mc.cores = mc.cores, version = version) if (getOption("future.debug", FALSE)) mprint(expr) ## mdebug("getExpression() ... DONE") diff --git a/R/MulticoreFuture-class.R b/R/MulticoreFuture-class.R index 79c65028..73752677 100644 --- a/R/MulticoreFuture-class.R +++ b/R/MulticoreFuture-class.R @@ -299,7 +299,7 @@ result.MulticoreFuture <- function(future, ...) { #' @export getExpression.MulticoreFuture <- local({ - tmpl_expr_disable_multithreading <- bquote_compile({ + tmpl_expr_disable_multithreading <- future:::bquote_compile({ ## Force single-threaded OpenMP, iff needed old_omp_threads <- RhpcBLASctl::omp_get_max_threads() if (old_omp_threads > 1L) { diff --git a/R/UniprocessFuture-class.R b/R/UniprocessFuture-class.R index d34ea7bf..602571bf 100644 --- a/R/UniprocessFuture-class.R +++ b/R/UniprocessFuture-class.R @@ -101,13 +101,13 @@ resolved.UniprocessFuture <- function(x, ...) { #' @export getExpression.UniprocessFuture <- local({ - tmpl_exit_rng_remove <- bquote_compile({ + tmpl_exit_rng_remove <- future:::bquote_compile({ .(exit) RNGkind(.(okind)) base::rm(list = ".Random.seed", envir = base::globalenv(), inherits = FALSE) }) - tmpl_exit_rng_undo <- bquote_compile({ + tmpl_exit_rng_undo <- future:::bquote_compile({ base::assign(".Random.seed", .(oseed), envir = base::globalenv(), inherits = FALSE) .(exit) }) diff --git a/R/expressions.R b/R/expressions.R index f7745806..b01cb147 100644 --- a/R/expressions.R +++ b/R/expressions.R @@ -1,24 +1,323 @@ makeExpression <- local({ skip <- skip.local <- NULL - tmpl_expr_local <- bquote_compile(base::local(.(expr))) + tmpl_expr_local <- future:::bquote_compile(base::local(.(expr))) - tmpl_enter_optenvar <- bquote_compile({ - ## Start time for future evaluation - ...future.startTime <- base::Sys.time() - - ## Required packages are loaded and attached here - .(enter) - - ## Record R options and environment variables - ## Note, we do this _after_ loading and attaching packages, in - ## case they set options/env vars needed for the session, e.g. - ## https://github.com/Rdatatable/data.table/issues/5375 - ...future.oldOptions <- base::as.list(base::.Options) - ...future.oldEnvVars <- base::Sys.getenv() + tmpl_expr_evaluate2 <- future:::bquote_compile({ + ## Evaluate future + future:::evalFuture(expr = quote(.(expr)), stdout = .(stdout), conditionClasses = .(conditionClasses), split = .(split), immediateConditions = .(immediateConditions), immediateConditionClasses = .(immediateConditionClasses), globals.onMissing = .(globals.onMissing), globalenv = .(globalenv), skip = .(skip), packages = .(packages), seed = .(seed), strategiesR = .(strategiesR), mc.cores = .(mc.cores)) }) - tmpl_exit_optenvar <- bquote_compile({ + + function(expr, local = TRUE, immediateConditions = FALSE, stdout = TRUE, conditionClasses = NULL, split = FALSE, globals.onMissing = getOption("future.globals.onMissing", NULL), globalenv = (getOption("future.globalenv.onMisuse", "ignore") != "ignore"), enter = NULL, exit = NULL, version = "1.8", packages = NULL, seed = NULL, mc.cores = NULL) { + if (version != "1.8") { + stop(FutureError("Internal error: Non-supported future expression version: ", version)) + } + + conditionClassesExclude <- attr(conditionClasses, "exclude", exact = TRUE) + muffleInclude <- attr(conditionClasses, "muffleInclude", exact = TRUE) + if (is.null(muffleInclude)) muffleInclude <- "^muffle" + + if (immediateConditions && !is.null(conditionClasses)) { + immediateConditionClasses <- getOption("future.relay.immediate", "immediateCondition") + conditionClasses <- unique(c(conditionClasses, immediateConditionClasses)) + attr(conditionClasses, "exclude") <- conditionClassesExclude + attr(conditionClasses, "muffleInclude") <- muffleInclude + } else { + immediateConditionClasses <- character(0L) + } + + if (is.null(skip)) { + ## WORKAROUND: skip = c(7/12, 3) makes assumption about withCallingHandlers() + ## and local(). In case this changes, provide internal options to adjust this. + ## /HB 2018-12-28 + skip <<- getOption("future.makeExpression.skip", c(6L, 3L)) + skip.local <<- getOption("future.makeExpression.skip.local", c(12L, 3L)) + } + + ## Evaluate expression in a local() environment? + if (local) { + expr <- bquote_apply(tmpl_expr_local) + skip <- skip.local + } + + strategies <- plan("list") + strategiesR <- strategies[-1] + if (length(strategiesR) == 0L) { + strategiesR <- getOption("future.plan", sequential) + } else { + ## Identify package namespaces needed for strategies + pkgsS <- lapply(strategiesR, FUN = environment) + pkgsS <- lapply(pkgsS, FUN = environmentName) + pkgsS <- unique(unlist(pkgsS, use.names = FALSE)) + ## CLEANUP: Only keep those that are loaded in the current session + pkgsS <- intersect(pkgsS, loadedNamespaces()) + packages <- unique(c(packages, pkgsS)) + } + + expr <- bquote_apply(tmpl_expr_evaluate2) + + expr + } +}) ## makeExpression() + + + + + + +evalFuture <- function(expr, stdout = TRUE, conditionClasses = character(0L), split = FALSE, immediateConditions = NULL, immediateConditionClasses = character(0L), globals.onMissing = getOption("future.globals.onMissing", NULL), globalenv = (getOption("future.globalenv.onMisuse", "ignore") != "ignore"), skip = NULL, packages = NULL, seed = NULL, mc.cores = NULL, strategiesR = future::sequential, envir = parent.frame()) { + stop_if_not( + length(stdout) == 1L && is.logical(stdout), + length(split) == 1L && is.logical(split) && !is.na(split), + is.null(conditionClasses) || (is.character(conditionClasses) && !anyNA(conditionClasses) && all(nzchar(conditionClasses))), + length(immediateConditions) == 1L && is.logical(immediateConditions) && !is.na(immediateConditions), + is.character(immediateConditionClasses) && !anyNA(immediateConditionClasses) && all(nzchar(immediateConditionClasses)), + length(globalenv) == 1L && is.logical(globalenv) && !is.na(globalenv), + length(skip) == 2L && is.integer(skip) && !anyNA(skip) && all(skip >= 0L), + !is.null(strategiesR), + is.null(seed) || is_lecyer_cmrg_seed(seed) || (is.logical(seed) && !is.na(seed) || !seed), + is.null(mc.cores) || (is.numeric(mc.cores) && length(mc.cores) == 1L && !is.na(mc.cores) && mc.cores >= 1) + ) + +# packages <- c(packages, "future") + + conditionClassesExclude <- attr(conditionClasses, "exclude", exact = TRUE) + muffleInclude <- attr(conditionClasses, "muffleInclude", exact = TRUE) + if (is.null(muffleInclude)) muffleInclude <- "^muffle" + + ## Capture standard output? + if (is.na(stdout)) { ## stdout = NA + ## Don't capture, but also don't block any output + } else { + if (stdout) { ## stdout = TRUE + ## Capture all output + ## NOTE: Capturing to a raw connection is much more efficient + ## than to a character connection, cf. + ## https://www.jottr.org/2014/05/26/captureoutput/ + ...future.stdout <- base::rawConnection(base::raw(0L), open = "w") + } else { ## stdout = FALSE + ## Silence all output by sending it to the void + ...future.stdout <- base::file( + base::switch(.Platform$OS.type, windows = "NUL", "/dev/null"), + open = "w" + ) + } + base::sink(...future.stdout, type = "output", split = split) + base::on.exit(if (!base::is.null(...future.stdout)) { + base::sink(type = "output", split = split) + base::close(...future.stdout) + }, add = TRUE) + } + + ...future.frame <- base::sys.nframe() + ...future.conditions <- base::list() + ...future.rng <- base::globalenv()$.Random.seed + + if (is.numeric(seed)) { + genv <- base::globalenv() + genv$.Random.seed <- seed + } + + ## Temporarily limit R option 'mc.cores'? + if (!is.null(mc.cores)) { + ...future.mc.cores.old <- base::getOption("mc.cores") + base::options(mc.cores = mc.cores) + } + + ## Record R options and environment variables + ## Note, we do this _after_ loading and attaching packages, in + ## case they set options/env vars needed for the session, e.g. + ## https://github.com/Rdatatable/data.table/issues/5375 + ...future.oldOptions <- base::as.list(base::.Options) + ...future.oldEnvVars <- base::Sys.getenv() + + ## covr: skip=7 + base::options( + ## Prevent .future.R from being source():d when future is attached + future.startup.script = FALSE, + + ## Assert globals when future is created (or at run time)? + future.globals.onMissing = globals.onMissing, + + ## Pass down other future.* options + future.globals.maxSize = getOption("future.globals.maxSize"), + future.globals.method = getOption("future.globals.method"), + future.globals.onReference = getOption("future.globals.onReference"), + future.globals.resolve = getOption("future.globals.resolve"), + future.resolve.recursive = getOption("future.resolve.recursive"), + future.rng.onMisuse = getOption("future.rng.onMisuse"), + future.rng.onMisuse.keepFuture = getOption("future.rng.onMisuse.keepFuture"), + future.stdout.windows.reencode = getOption("future.stdout.windows.reencode"), + + ## Other options relevant to making futures behave consistently + ## across backends + width = getOption("width") + ) + + base::options( + ## Prevent .future.R from being source():d when future is attached + future.startup.script = FALSE, + + ## Assert globals when future is created (or at run time)? + future.globals.onMissing = globals.onMissing + ) + + ## Record above future options + ...future.futureOptionsAdded <- base::setdiff(base::names(base::.Options), base::names(...future.oldOptions)) + + ## Record workding directory + ...future.workdir <- getwd() + + if (globalenv) { + ## Record names of variables in the global environment + ...future.globalenv.names <- c(base::names(base::.GlobalEnv), "...future.value", "...future.globalenv.names", ".Random.seed") + } + + ## Record the original future strategy set on this worker + ...future.plan.old <- getOption("future.plan") + ...future.plan.old.envvar <- Sys.getenv("R_FUTURE_PLAN", NA_character_) + ...future.strategy.old <- future::plan("list") + + ## Prevent 'future.plan' / R_FUTURE_PLAN settings from being nested + options(future.plan = NULL) + Sys.unsetenv("R_FUTURE_PLAN") + + ## Use the next-level-down ("popped") future strategy + ...future.oldPlan <- future::plan(strategiesR, .cleanup = FALSE, .init = FALSE) + + ## Start time for future evaluation + ...future.startTime <- Sys.time() + + ## TROUBLESHOOTING: If the package fails to load, then library() + ## suppress that error and generates a generic much less + ## informative error message. Because of this, we load the + ## namespace first (to get a better error message) and then + ## calls library(), which attaches the package. /HB 2016-06-16 + ## NOTE: We use local() here such that 'pkg' is not assigned + ## to the future environment. /HB 2016-07-03 + if (length(packages) > 0L) { + base::local({ + for (pkg in packages) { + base::loadNamespace(pkg) + base::library(pkg, character.only = TRUE) + } + }) + } + + ## NOTE: We don't want to use local(body) w/ on.exit() because + ## evaluation in a local is optional, cf. argument 'local'. + ## If this was mandatory, we could. Instead we use + ## a tryCatch() statement. /HB 2016-03-14 + ...future.result <- base::tryCatch({ + base::withCallingHandlers({ + ...future.value <- base::withVisible(eval(expr, envir = envir)) + future::FutureResult( + value = ...future.value$value, + visible = ...future.value$visible, + rng = !identical(base::globalenv()$.Random.seed, ...future.rng), + globalenv = if (globalenv) list(added = base::setdiff(base::names(base::.GlobalEnv), ...future.globalenv.names)) else NULL, + started = ...future.startTime, + version = "1.8" + ) + }, condition = base::local({ + ## WORKAROUND: If the name of any of the below objects/functions + ## coincides with a promise (e.g. a future assignment) then we + ## we will end up with a recursive evaluation resulting in error: + ## "promise already under evaluation: recursive default argument + ## reference or earlier problems?" + ## To avoid this, we make sure to import the functions explicitly + ## /HB 2018-12-22 + c <- base::c + inherits <- base::inherits + invokeRestart <- base::invokeRestart + length <- base::length + list <- base::list + seq.int <- base::seq.int + signalCondition <- base::signalCondition + sys.calls <- base::sys.calls + `[[` <- base::`[[` + `+` <- base::`+` + `<<-` <- base::`<<-` + + sysCalls <- function(calls = sys.calls(), from = 1L) { + calls[seq.int(from = from + skip[1L], to = length(calls) - skip[2L])] + } + + function(cond) { + is_error <- inherits(cond, "error") + + ## Ignore condition? + ignore <- !is_error && + !is.null(conditionClassesExclude) && + inherits(cond, conditionClassesExclude) + + ## Handle error:s specially + if (is_error) { + sessionInformation <- function() { + list( + r = base::R.Version(), + locale = base::Sys.getlocale(), + rngkind = base::RNGkind(), + namespaces = base::loadedNamespaces(), + search = base::search(), + system = base::Sys.info() + ) + } + ## Record condition + ...future.conditions[[length(...future.conditions) + 1L]] <<- list( + condition = cond, + calls = c(sysCalls(from = ...future.frame), cond$call), + session = sessionInformation(), + timestamp = base::Sys.time(), + signaled = 0L + ) + + signalCondition(cond) + } else if (!ignore && + !is.null(conditionClasses) && + inherits(cond, conditionClasses) + ) { + ## Relay 'immediateCondition' conditions immediately? + ## If so, then do not muffle it and flag it as signalled + ## already here. + signal <- immediateConditions && inherits(cond, immediateConditionClasses) + ## Record condition + ...future.conditions[[length(...future.conditions) + 1L]] <<- list( + condition = cond, + signaled = base::as.integer(signal) + ) + if (immediateConditions && !split && !signal) { + muffleCondition(cond, pattern = muffleInclude) + } + } else { + if (!split && !is.null(conditionClasses)) { + ## Muffle all non-captured conditions + muffleCondition(cond, pattern = muffleInclude) + } + } + } ## function(cond) + })) ## local() + withCallingHandlers() + }, error = function(ex) { + base::structure(base::list( + value = NULL, + visible = NULL, + conditions = ...future.conditions, + rng = !identical(base::globalenv()$.Random.seed, ...future.rng), + started = ...future.startTime, + finished = Sys.time(), + session_uuid = NA_character_, + version = "1.8" + ), class = "FutureResult") + }, finally = { + ## Reset working directory + if (!identical(...future.workdir, getwd())) setwd(...future.workdir) + + ## Reset R option 'mc.cores' + if (!is.null(mc.cores)) { + base::options(mc.cores = ...future.mc.cores.old) + } + ## (a) Reset options ## WORKAROUND: Do not reset 'nwarnings' unless changed, because ## that will, as documented, trigger any warnings collected @@ -127,274 +426,41 @@ makeExpression <- local({ ## (d) Remove any environment variables added ## diff <- base::setdiff(base::names(base::Sys.getenv()), base::names(...future.oldEnvVars)) ## base::Sys.unsetenv(diff) - - .(exit) - }) - - - tmpl_enter_future_opts <- bquote_compile({ - .(enter) - ## covr: skip=7 - base::options( - ## Prevent .future.R from being source():d when future is attached - future.startup.script = FALSE, - - ## Assert globals when future is created (or at run time)? - future.globals.onMissing = .(globals.onMissing), - - ## Pass down other future.* options - future.globals.maxSize = .(getOption("future.globals.maxSize")), - future.globals.method = .(getOption("future.globals.method")), - future.globals.onMissing = .(getOption("future.globals.onMissing")), - future.globals.onReference = .(getOption("future.globals.onReference")), - future.globals.resolve = .(getOption("future.globals.resolve")), - future.resolve.recursive = .(getOption("future.resolve.recursive")), - future.rng.onMisuse = .(getOption("future.rng.onMisuse")), - future.rng.onMisuse.keepFuture = .(getOption("future.rng.onMisuse.keepFuture")), - future.stdout.windows.reencode = .(getOption("future.stdout.windows.reencode")), - - ## Other options relevant to making futures behave consistently - ## across backends - width = .(getOption("width")) - ) - - ## Record above future options - ...future.futureOptionsAdded <- base::setdiff(base::names(base::.Options), base::names(...future.oldOptions)) - }) - - - tmpl_exit_future_opts <- bquote_compile({ ## Remove any "future" options added if (base::length(...future.futureOptionsAdded) > 0L) { opts <- base::vector("list", length = base::length(...future.futureOptionsAdded)) base::names(opts) <- ...future.futureOptionsAdded base::options(opts) } - - .(exit) - }) - - - tmpl_enter_workdir <- bquote_compile({ - .(enter) - ...future.workdir <- getwd() - }) - tmpl_exit_workdir <- bquote_compile({ - if (!identical(...future.workdir, getwd())) setwd(...future.workdir) - .(exit) - }) - - - tmpl_expr_evaluate <- bquote_compile({ - ## covr: skip=6 - .(enter) - - ## Capture standard output? - if (.(is.na(stdout))) { ## stdout = NA - ## Don't capture, but also don't block any output + ## Revert to the original future strategy + ## Reset option 'future.plan' and env var 'R_FUTURE_PLAN' + options(future.plan = ...future.plan.old) + future::plan(...future.strategy.old, .cleanup = FALSE, .init = FALSE) + if (is.na(...future.plan.old.envvar)) { + Sys.unsetenv("R_FUTURE_PLAN") } else { - if (.(stdout)) { ## stdout = TRUE - ## Capture all output - ## NOTE: Capturing to a raw connection is much more efficient - ## than to a character connection, cf. - ## https://www.jottr.org/2014/05/26/captureoutput/ - ...future.stdout <- base::rawConnection(base::raw(0L), open = "w") - } else { ## stdout = FALSE - ## Silence all output by sending it to the void - ...future.stdout <- base::file( - base::switch(.Platform$OS.type, windows = "NUL", "/dev/null"), - open = "w" - ) - } - base::sink(...future.stdout, type = "output", split = .(split)) - base::on.exit(if (!base::is.null(...future.stdout)) { - base::sink(type = "output", split = .(split)) - base::close(...future.stdout) - }, add = TRUE) + Sys.setenv(R_FUTURE_PLAN = ...future.plan.old.envvar) } - - ...future.frame <- base::sys.nframe() - ...future.conditions <- base::list() - ...future.rng <- base::globalenv()$.Random.seed - - if (.(globalenv)) { - ## Record names of variables in the global environment - ...future.globalenv.names <- c(base::names(base::.GlobalEnv), "...future.value", "...future.globalenv.names", ".Random.seed") - } - - ## NOTE: We don't want to use local(body) w/ on.exit() because - ## evaluation in a local is optional, cf. argument 'local'. - ## If this was mandatory, we could. Instead we use - ## a tryCatch() statement. /HB 2016-03-14 - ...future.result <- base::tryCatch({ - base::withCallingHandlers({ - ...future.value <- base::withVisible(.(expr)) - future::FutureResult( - value = ...future.value$value, - visible = ...future.value$visible, - rng = !identical(base::globalenv()$.Random.seed, ...future.rng), - globalenv = if (.(globalenv)) list(added = base::setdiff(base::names(base::.GlobalEnv), ...future.globalenv.names)) else NULL, - started = ...future.startTime, - version = "1.8" - ) - }, condition = base::local({ - ## WORKAROUND: If the name of any of the below objects/functions - ## coincides with a promise (e.g. a future assignment) then we - ## we will end up with a recursive evaluation resulting in error: - ## "promise already under evaluation: recursive default argument - ## reference or earlier problems?" - ## To avoid this, we make sure to import the functions explicitly - ## /HB 2018-12-22 - c <- base::c - inherits <- base::inherits - invokeRestart <- base::invokeRestart - length <- base::length - list <- base::list - seq.int <- base::seq.int - signalCondition <- base::signalCondition - sys.calls <- base::sys.calls - `[[` <- base::`[[` - `+` <- base::`+` - `<<-` <- base::`<<-` - - sysCalls <- function(calls = sys.calls(), from = 1L) { - calls[seq.int(from = from + .(skip[1L]), to = length(calls) - .(skip[2L]))] - } - - function(cond) { - is_error <- inherits(cond, "error") - - ## Ignore condition? - ignore <- !is_error && - !is.null(.(conditionClassesExclude)) && - inherits(cond, .(conditionClassesExclude)) - - ## Handle error:s specially - if (is_error) { - sessionInformation <- function() { - list( - r = base::R.Version(), - locale = base::Sys.getlocale(), - rngkind = base::RNGkind(), - namespaces = base::loadedNamespaces(), - search = base::search(), - system = base::Sys.info() - ) - } - ## Record condition - ...future.conditions[[length(...future.conditions) + 1L]] <<- list( - condition = cond, - calls = c(sysCalls(from = ...future.frame), cond$call), - session = sessionInformation(), - timestamp = base::Sys.time(), - signaled = 0L - ) - - signalCondition(cond) - } else if (!ignore && - .(!is.null(conditionClasses)) && - inherits(cond, .(conditionClasses)) - ) { - ## Relay 'immediateCondition' conditions immediately? - ## If so, then do not muffle it and flag it as signalled - ## already here. - signal <- .(immediateConditions) && inherits(cond, .(immediateConditionClasses)) - ## Record condition - ...future.conditions[[length(...future.conditions) + 1L]] <<- list( - condition = cond, - signaled = base::as.integer(signal) - ) - if (.(immediateConditions && !split) && !signal) { - ## muffleCondition <- future:::muffleCondition() - muffleCondition <- .(muffleCondition) - muffleCondition(cond, pattern = .(muffleInclude)) - } - } else { - if (.(!split && !is.null(conditionClasses))) { - ## Muffle all non-captured conditions - ## muffleCondition <- future:::muffleCondition() - muffleCondition <- .(muffleCondition) - muffleCondition(cond, pattern = .(muffleInclude)) - } - } - } ## function(cond) - })) ## local() + withCallingHandlers() - }, error = function(ex) { - base::structure(base::list( - value = NULL, - visible = NULL, - conditions = ...future.conditions, - rng = !identical(base::globalenv()$.Random.seed, ...future.rng), - started = ...future.startTime, - finished = Sys.time(), - session_uuid = NA_character_, - version = "1.8" - ), class = "FutureResult") - }, finally = .(exit)) - - if (.(!base::is.na(stdout))) { - base::sink(type = "output", split = .(split)) - if (.(stdout)) { - ...future.result$stdout <- base::rawToChar( - base::rawConnectionValue(...future.stdout) - ) - } else { - ...future.result["stdout"] <- base::list(NULL) - } - base::close(...future.stdout) - ...future.stdout <- NULL - } - - ...future.result$conditions <- ...future.conditions - ...future.result$finished <- base::Sys.time() - - ...future.result - }) - - - function(expr, local = TRUE, immediateConditions = FALSE, stdout = TRUE, conditionClasses = NULL, split = FALSE, globals.onMissing = getOption("future.globals.onMissing", NULL), globalenv = (getOption("future.globalenv.onMisuse", "ignore") != "ignore"), enter = NULL, exit = NULL, version = "1.8") { - conditionClassesExclude <- attr(conditionClasses, "exclude", exact = TRUE) - muffleInclude <- attr(conditionClasses, "muffleInclude", exact = TRUE) - if (is.null(muffleInclude)) muffleInclude <- "^muffle" - - if (immediateConditions && !is.null(conditionClasses)) { - immediateConditionClasses <- getOption("future.relay.immediate", "immediateCondition") - conditionClasses <- unique(c(conditionClasses, immediateConditionClasses)) - } else { - immediateConditionClasses <- character(0L) - } - - if (is.null(skip)) { - ## WORKAROUND: skip = c(7/12, 3) makes assumption about withCallingHandlers() - ## and local(). In case this changes, provide internal options to adjust this. - ## /HB 2018-12-28 - skip <<- getOption("future.makeExpression.skip", c(6L, 3L)) - skip.local <<- getOption("future.makeExpression.skip.local", c(12L, 3L)) - } - - ## Evaluate expression in a local() environment? - if (local) { - expr <- bquote_apply(tmpl_expr_local) - skip <- skip.local - } - - ## Set and reset certain properties and states - enter <- bquote_apply(tmpl_enter_workdir) - enter <- bquote_apply(tmpl_enter_optenvar) - enter <- bquote_apply(tmpl_enter_future_opts) - - exit <- bquote_apply(tmpl_exit_future_opts) - exit <- bquote_apply(tmpl_exit_optenvar) - exit <- bquote_apply(tmpl_exit_workdir) + }) ## tryCatch(..., finally = { ... }) - if (version == "1.8") { - expr <- bquote_apply(tmpl_expr_evaluate) + if (!base::is.na(stdout)) { + base::sink(type = "output", split = split) + if (stdout) { + ...future.result$stdout <- base::rawToChar( + base::rawConnectionValue(...future.stdout) + ) } else { - stop(FutureError("Internal error: Non-supported future expression version: ", version)) + ...future.result["stdout"] <- base::list(NULL) } - - expr + base::close(...future.stdout) + ...future.stdout <- NULL } -}) ## makeExpression() + + ...future.result$conditions <- ...future.conditions + ...future.result$finished <- base::Sys.time() + + ...future.result +} ## evalFuture() + diff --git a/R/utils-immediateCondition.R b/R/utils-immediateCondition.R index 99852508..35758ba0 100644 --- a/R/utils-immediateCondition.R +++ b/R/utils-immediateCondition.R @@ -170,7 +170,7 @@ save_rds <- function(object, pathname, ...) { -tmpl_expr_send_immediateConditions_via_file <- bquote_compile({ +tmpl_expr_send_immediateConditions_via_file <- future:::bquote_compile({ withCallingHandlers({ .(expr) }, immediateCondition = function(cond) { diff --git a/tests/globals,NSE.R b/tests/globals,NSE.R index ad60d6a9..4c4f56e5 100644 --- a/tests/globals,NSE.R +++ b/tests/globals,NSE.R @@ -44,6 +44,8 @@ for (strategy in supportedStrategies()) { void %<-% { plan(sequential) } print(void) + options(future.globals.onMissing = NULL) + message(sprintf("- Strategy: %s ... DONE", strategy)) }