Skip to content

Commit

Permalink
Account for bpiterate returning a list of remote_error objects
Browse files Browse the repository at this point in the history
Apparently bpiterate can sometimes return a list, but one or more
elements of the list is a remote error object. The code now checks for
this case using BiocParallel::bpok.
  • Loading branch information
Ryan C. Thompson committed Apr 30, 2021
1 parent f4f547d commit 59217be
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 16 deletions.
23 changes: 18 additions & 5 deletions R/fitMixedModelDE.R
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ getContrast = function( exprObj, formula, data, coefficient){
# @docType methods
#' @rdname dream-method
#' @importFrom pbkrtest get_SigmaG
#' @importFrom BiocParallel bpiterate bpparam
#' @importFrom BiocParallel bpiterate bpparam bpok
#' @importFrom lme4 VarCorr
#' @importFrom stats hatvalues as.formula
#' @importFrom foreach foreach
Expand Down Expand Up @@ -744,10 +744,23 @@ dream <- function( exprObj, formula, data, L, ddf = c("Satterthwaite", "Kenward-

if( !quiet ) message(paste0("Dividing work into ",attr(it, "n_chunks")," chunks..."))

resList <- do.call(c, bpiterate( it, .eval_master,
data2=data2, form=form, REML=REML, theta=fitInit@theta, control=control,...,
BPPARAM=BPPARAM))

resList <- bpiterate( it, .eval_master,
data2=data2, form=form, REML=REML, theta=fitInit@theta, control=control,...,
BPPARAM=BPPARAM)

# if there is an error in evaluating fxn (usually in parallel backend)
if( !bpok(list(resList)) ){
stop("Error evaluating fxn:\n\n", varPart)
}
# It can also return a list of errors, or a list where only some elements are errors
if( !all(bpok(resList)) ){
first_error <- resList[[which(!bpok(resList))[1]]]
stop("Error evaluating fxn:\n\n", first_error)
}

# If no errors, then it's safe to concatenate all the results together.
resList <- do.call(c, resList)

names(resList) = seq_len(length(resList))

if( !quiet ) message("\nTotal:", paste(format((proc.time() - timeStart)[3], digits = 0, scientific = FALSE), "s"))
Expand Down
44 changes: 33 additions & 11 deletions R/fitModels.R
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,14 @@ setGeneric("fitVarPartModel", signature="exprObj",
)

# internal driver function
#' @importFrom BiocParallel bpparam bpiterate bplapply
#' @importFrom BiocParallel bpparam bpiterate bplapply bpok
#' @importFrom methods is new
#' @importFrom lme4 lmer
#' @importFrom progress progress_bar
#' @importFrom utils object.size
#' @import foreach
.fitVarPartModel <- function( exprObj, formula, data, REML=FALSE, useWeights=TRUE, weightsMatrix=NULL, showWarnings=TRUE,fxn=identity, colinearityCutoff=.999,control = lme4::lmerControl(calc.derivs=FALSE, check.rankX="stop.deficient" ), quiet=quiet, BPPARAM=bpparam(), ...){
#' @import lme4
.fitVarPartModel <- function( exprObj, formula, data, REML=FALSE, useWeights=TRUE, weightsMatrix=NULL, showWarnings=TRUE,fxn=identity, colinearityCutoff=.999,control = lme4::lmerControl(calc.derivs=FALSE, check.rankX="stop.deficient" ), quiet=quiet, BPPARAM=bpparam(), ...){

# if( ! is(exprObj, "sparseMatrix")){
# exprObj = as.matrix( exprObj )
Expand Down Expand Up @@ -271,14 +272,22 @@ setGeneric("fitVarPartModel", signature="exprObj",

if( !quiet ) message(paste0("Dividing work into ",attr(it, "n_chunks")," chunks..."))

res <- do.call(c, bpiterate( it, .eval_master,
data2=data2, form=form, REML=REML, theta=fitInit@theta, fxn=fxn, control=control,...,
BPPARAM=BPPARAM))
res <- bpiterate( it, .eval_master,
data2=data2, form=form, REML=REML, theta=fitInit@theta, fxn=fxn, control=control,...,
BPPARAM=BPPARAM)

# if there is an error in evaluating fxn (usually in parallel backend)
if( is(res, 'remote_error') ){
stop("Error evaluating fxn:\n\n", res)
if( !bpok(list(res)) ){
stop("Error evaluating fxn:\n\n", varPart)
}
# It can also return a list of errors, or a list where only some elements are errors
if( !all(bpok(res)) ){
first_error <- res[[which(!bpok(res))[1]]]
stop("Error evaluating fxn:\n\n", first_error)
}

# If no errors, then it's safe to concatenate all the results together.
res <- do.call(c, res)

method = "lmer"
}
Expand Down Expand Up @@ -431,7 +440,7 @@ setMethod("fitVarPartModel", "sparseMatrix",
#' @export
#' @docType methods
#' @rdname fitExtractVarPartModel-method
#' @importFrom BiocParallel bpparam bpiterate bplapply
#' @importFrom BiocParallel bpparam bpiterate bplapply bpok
setGeneric("fitExtractVarPartModel", signature="exprObj",
function(exprObj, formula, data, REML=FALSE, useWeights=TRUE, weightsMatrix=NULL, showWarnings=TRUE, control = lme4::lmerControl(calc.derivs=FALSE, check.rankX="stop.deficient" ), quiet=FALSE, BPPARAM=bpparam(), ...)
standardGeneric("fitExtractVarPartModel")
Expand Down Expand Up @@ -573,9 +582,22 @@ setGeneric("fitExtractVarPartModel", signature="exprObj",

if( !quiet) message(paste0("Dividing work into ",attr(it, "n_chunks")," chunks..."))

varPart <- do.call(c, bpiterate( it, .eval_master,
data=data, form=form, REML=REML, theta=fitInit@theta, control=control,...,
BPPARAM=BPPARAM))
varPart <- bpiterate( it, .eval_master,
data=data, form=form, REML=REML, theta=fitInit@theta, control=control,...,
BPPARAM=BPPARAM)

# if there is an error in evaluating fxn (usually in parallel backend)
if( !bpok(list(varPart)) ){
stop("Error evaluating fxn:\n\n", varPart)
}
# It can also return a list of errors, or a list where only some elements are errors
if( !all(bpok(varPart)) ){
first_error <- varPart[[which(!bpok(varPart))[1]]]
stop("Error evaluating fxn:\n\n", first_error)
}

# If no errors, then it's safe to concatenate all the results together.
varPart <- do.call(c, varPart)

modelType = "linear mixed model"
}
Expand Down

0 comments on commit 59217be

Please sign in to comment.