Skip to content

Commit

Permalink
evalFuture(): add 'globals' argument
Browse files Browse the repository at this point in the history
  • Loading branch information
HenrikBengtsson committed Dec 30, 2024
1 parent becfd04 commit 240e51d
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 10 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Package: future
Version: 1.34.0-9094
Version: 1.34.0-9095
Title: Unified Parallel and Distributed Processing in R for Everyone
Imports:
digest,
Expand Down
7 changes: 5 additions & 2 deletions R/Future-class.R
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,10 @@ getExpression.Future <- function(future, expr = future$expr, local = future$loca
warning(FutureWarning("Future version was not set. Using default %s",
sQuote(version)))
}


## Globals needed by the future
globals <- globals(future)

## Packages needed by the future
pkgs <- packages(future)
if (length(pkgs) > 0) {
Expand Down Expand Up @@ -700,7 +703,7 @@ getExpression.Future <- function(future, expr = future$expr, local = future$loca
pkgs <- unique(c(pkgs, pkgsS))
}

expr <- makeExpression(expr = expr, local = local, stdout = stdout, conditionClasses = conditionClasses, split = split, enter = NULL, exit = exit, ..., seed = seed, packages = pkgs, mc.cores = mc.cores, version = version)
expr <- makeExpression(expr = expr, local = local, stdout = stdout, conditionClasses = conditionClasses, split = split, globals = globals, enter = NULL, exit = exit, ..., seed = seed, packages = pkgs, mc.cores = mc.cores, version = version)
if (getOption("future.debug", FALSE)) mprint(expr)

## mdebug("getExpression() ... DONE")
Expand Down
72 changes: 65 additions & 7 deletions R/expressions.R
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ makeExpression <- local({
tmpl_expr_local <- future:::bquote_compile(base::local(.(expr)))

tmpl_expr_evaluate2 <- future:::bquote_compile({
## Evaluate future
future:::evalFuture(expr = quote(.(expr)), stdout = .(stdout), conditionClasses = .(conditionClasses), split = .(split), immediateConditions = .(immediateConditions), immediateConditionClasses = .(immediateConditionClasses), globals.onMissing = .(globals.onMissing), globalenv = .(globalenv), skip = .(skip), packages = .(packages), seed = .(seed), strategiesR = .(strategiesR), forwardOptions = .(forwardOptions), mc.cores = .(mc.cores))
## Evaluate future
future:::evalFuture(expr = quote(.(expr)), stdout = .(stdout), conditionClasses = .(conditionClasses), split = .(split), immediateConditions = .(immediateConditions), immediateConditionClasses = .(immediateConditionClasses), globals = .(globals), globals.onMissing = .(globals.onMissing), globalenv = .(globalenv), skip = .(skip), packages = .(packages), seed = .(seed), strategiesR = .(strategiesR), forwardOptions = .(forwardOptions), mc.cores = .(mc.cores))
})


function(expr, local = TRUE, immediateConditions = FALSE, stdout = TRUE, conditionClasses = NULL, split = FALSE, globals.onMissing = getOption("future.globals.onMissing", NULL), globalenv = (getOption("future.globalenv.onMisuse", "ignore") != "ignore"), enter = NULL, exit = NULL, version = "1.8", packages = NULL, seed = NULL, mc.cores = NULL) {
function(expr, local = TRUE, immediateConditions = FALSE, stdout = TRUE, conditionClasses = NULL, split = FALSE, globals = NULL, globals.onMissing = getOption("future.globals.onMissing", NULL), globalenv = (getOption("future.globalenv.onMisuse", "ignore") != "ignore"), enter = NULL, exit = NULL, version = "1.8", packages = NULL, seed = NULL, mc.cores = NULL) {
if (version != "1.8") {
stop(FutureError("Internal error: Non-supported future expression version: ", version))
}
Expand Down Expand Up @@ -55,6 +55,22 @@ makeExpression <- local({
packages <- unique(c(packages, pkgsS))
}

if (is.function(strategiesR)) {
if (!inherits(strategiesR, "future")) {
stop(FutureError(sprintf("Argument 'strategiesR' is a function, but does not inherit 'future': %s", paste(sQuote(class(strategiesR)), collapse = ", "))))
}
} else if (is.list(strategiesR)) {
for (kk in seq_along(strategiesR)) {
strategy <- strategiesR[[kk]]
if (!inherits(strategy, "future")) {
stop(FutureError(sprintf("Element #%d of list 'strategiesR' is a function, but does not inherit 'future': %s", kk, paste(sQuote(class(strategy)), collapse = ", "))))
}
}
} else if (is.character(strategiesR)) {
} else {
stop(FutureError(sprintf("Unknown value of argument 'strategiesR': %s", paste(sQuote(class(strategiesR)), collapse = ", "))))
}

forwardOptions <- list(
## Assert globals when future is created (or at run time)?
future.globals.onMissing = globals.onMissing,
Expand Down Expand Up @@ -82,7 +98,21 @@ makeExpression <- local({



evalFuture <- function(expr, stdout = TRUE, conditionClasses = character(0L), split = FALSE, immediateConditions = NULL, immediateConditionClasses = character(0L), globals.onMissing = getOption("future.globals.onMissing", NULL), globalenv = (getOption("future.globalenv.onMisuse", "ignore") != "ignore"), skip = NULL, packages = NULL, seed = NULL, mc.cores = NULL, forwardOptions = NULL, strategiesR = future::sequential, envir = parent.frame()) {
logme <- function(expr, envir = parent.frame()) {
expr <- substitute(expr)
stdout <- utils::capture.output(eval(expr, envir = envir))
stdout <- sprintf("[evalFuture()] %s\n", stdout)
stdout <- paste(stdout, collapse = "")
cat(stdout, file = "callr.log", append = TRUE)
}

FutureEvalError <- function(...) {
ex <- FutureError(...)
class(ex) <- c("FutureEvalError", class(ex))
ex
}

evalFuture <- function(expr, stdout = TRUE, conditionClasses = character(0L), split = FALSE, immediateConditions = NULL, immediateConditionClasses = character(0L), globals = NULL, globals.onMissing = getOption("future.globals.onMissing", NULL), globalenv = (getOption("future.globalenv.onMisuse", "ignore") != "ignore"), skip = NULL, packages = NULL, seed = NULL, mc.cores = NULL, forwardOptions = NULL, strategiesR = NULL, envir = parent.frame()) {
stop_if_not(
length(stdout) == 1L && is.logical(stdout),
length(split) == 1L && is.logical(split) && !is.na(split),
Expand All @@ -91,11 +121,25 @@ evalFuture <- function(expr, stdout = TRUE, conditionClasses = character(0L), sp
is.character(immediateConditionClasses) && !anyNA(immediateConditionClasses) && all(nzchar(immediateConditionClasses)),
length(globalenv) == 1L && is.logical(globalenv) && !is.na(globalenv),
length(skip) == 2L && is.integer(skip) && !anyNA(skip) && all(skip >= 0L),
!is.null(strategiesR),
is.null(seed) || is_lecyer_cmrg_seed(seed) || (is.logical(seed) && !is.na(seed) || !seed),
is.null(mc.cores) || (is.numeric(mc.cores) && length(mc.cores) == 1L && !is.na(mc.cores) && mc.cores >= 1)
)

if (is.function(strategiesR)) {
if (!inherits(strategiesR, "future")) {
stop(FutureEvalError(sprintf("Argument 'strategiesR' is a function, but does not inherit 'future': %s", paste(sQuote(class(strategiesR)), collapse = ", "))))
}
} else if (is.list(strategiesR)) {
for (kk in seq_along(strategiesR)) {
strategy <- strategiesR[[kk]]
if (!inherits(strategy, "future")) {
stop(FutureEvalError(sprintf("Element #%d of list 'strategiesR' is a function, but does not inherit 'future': %s", kk, paste(sQuote(class(strategy)), collapse = ", "))))
}
}
} else if (is.character(strategiesR)) {
} else {
stop(FutureEvalError(sprintf("Unknown value of argument 'strategiesR': %s", paste(sQuote(class(strategiesR)), collapse = ", "))))
}

## Start time for future evaluation
...future.startTime <- Sys.time()
Expand All @@ -106,7 +150,7 @@ evalFuture <- function(expr, stdout = TRUE, conditionClasses = character(0L), sp
## -----------------------------------------------------------------
## Current working directory
...future.workdir <- getwd()

## mc.cores
...future.mc.cores.old <- getOption("mc.cores")

Expand Down Expand Up @@ -326,6 +370,14 @@ evalFuture <- function(expr, stdout = TRUE, conditionClasses = character(0L), sp
...future.globalenv.names <- c(names(.GlobalEnv), "...future.value", "...future.globalenv.names", ".Random.seed")
}

if (length(globals) > 0) {
base_attach <- base::attach ## To please R CMD check
base_attach(globals, pos = 2L, name = "future:globals")
on.exit({
detach(name = "future:globals")
}, add = TRUE)
}

## Ignore, capture or discard standard output?
if (is.na(stdout)) { ## stdout = NA
## Don't capture, but also don't block any output
Expand Down Expand Up @@ -354,9 +406,15 @@ evalFuture <- function(expr, stdout = TRUE, conditionClasses = character(0L), sp
options(future.plan = NULL)
Sys.unsetenv("R_FUTURE_PLAN")

# logme("future:plan() ...")
# logme(utils::str(strategiesR))
# logme(print(strategiesR))

## Use the next-level-down ("popped") future strategy
future::plan(strategiesR, .cleanup = FALSE, .init = FALSE)


# logme("future:plan() ... done")

## Temporarily set R option 'mc.cores'?
if (!is.null(mc.cores)) {
options(mc.cores = mc.cores)
Expand Down

0 comments on commit 240e51d

Please sign in to comment.