pbapply
Futures: output and conditions are now relayed, but with a glitch
With the support for futures, you now automatically get relaying of standard output, messages, warnings, and other conditions. For example, with
```r
library(pbapply)
future::plan("multisession")

my_sqrt <- function(x) {
  if (x %% 10 == 0) message("x = ", x)
  sqrt(x)
}
```
we get:
```r
> pboptions(type = "none")
> y <- pbsapply(1:100, FUN = my_sqrt, cl = "future")
x = 10
x = 20
x = 30
x = 40
x = 50
x = 60
x = 70
x = 80
x = 90
x = 100
> str(y)
 num [1:100] 1 1.41 1.73 2 2.24 ...
```
However, it looks like your progress bar is not prepared for output being generated between the 0% step and completion, e.g.
```r
> pboptions(type = "txt")
> y <- pbsapply(1:100, FUN = my_sqrt, cl = "future")
|++++ | 8%x = 10
|++++++++ | 15%x = 20
|++++++++++++ | 23%x = 30
|+++++++++++++++ | 31%x = 40
|+++++++++++++++++++++++ | 46%x = 50
|+++++++++++++++++++++++++++ | 54%x = 60
|+++++++++++++++++++++++++++++++ | 62%x = 70
|+++++++++++++++++++++++++++++++++++ | 69%x = 80
|++++++++++++++++++++++++++++++++++++++++++ | 85%x = 90
|++++++++++++++++++++++++++++++++++++++++++++++ | 92%x = 100
|++++++++++++++++++++++++++++++++++++++++++++++++++| 100%
>
```
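The glitch can be reproduced without futures. The sketch below assumes that pbapply's `"txt"` bar behaves like `utils::txtProgressBar(style = 3)`: the bar is redrawn in place with a carriage return (`"\r"`) and no newline is written until the bar is closed, so anything output in between is appended right after the `NN%` label.

```r
## Stand-in for pbapply's "txt" bar (assumption): utils::txtProgressBar(), style 3
pb <- utils::txtProgressBar(min = 0, max = 5, style = 3, char = "+")
for (i in 1:5) {
  ## Redraws the bar in place using "\r"; no trailing newline is written
  utils::setTxtProgressBar(pb, i)
  ## ... so this message ends up at the end of the current bar line
  message("x = ", i)
}
close(pb)  # the newline ending the bar line is only written here
```

In a terminal this prints lines such as `|++++++++++ |  20%x = 1`, i.e. the same pattern as above.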
The quick fix is to disable relaying of stdout and conditions in the futures, e.g.
```r
rval[i] <- list(future.apply::future_lapply(X[Split[[i]]], FUN, ...,
                                            future.stdout = FALSE,
                                            future.conditions = character(0L)))
```
That is how the other parallel frameworks work, i.e. they silently swallow any output.
To actually relay them, which is more useful, you have to buffer the standard output and all conditions in:
https://github.com/psolymos/pbapply/blob/53aa5415157b709f2474a8e61f2cc472bb8eb575/R/pblapply.R#L73-L76
I'm thinking something like:
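```r
## Evaluate 'expr' while buffering its standard output and all conditions
captureStdoutAndConditions <- function(expr, envir = parent.frame()) {
  expr <- substitute(expr)
  conditions <- list()
  withCallingHandlers({
    stdout <- utils::capture.output({
      value <- eval(expr, envir = envir)
    })
  }, condition = function(cond) {
    conditions <<- c(conditions, list(cond))
    ## Muffle messages and warnings; otherwise they are output right away
    ## (and would then be output a second time when relayed)
    if (inherits(cond, "message")) {
      invokeRestart("muffleMessage")
    } else if (inherits(cond, "warning")) {
      invokeRestart("muffleWarning")
    }
  })
  list(value = value, stdout = stdout, conditions = conditions)
}

## Output the buffered stdout and re-signal the buffered conditions, in order
relayStdoutAndConditions <- function(res) {
  ## capture.output() returns one element per line, without newlines
  writeLines(res$stdout)
  for (condition in res$conditions) {
    if (inherits(condition, "warning")) {
      warning(condition)
    } else if (inherits(condition, "message")) {
      message(condition)
    } else if (inherits(condition, "condition")) {
      signalCondition(condition)
    }
  }
  invisible(res)
}
```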
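The two helpers can be checked in isolation, without futures or a progress bar. The snippet below repeats them so that it runs on its own; `noisy()` is a made-up function for illustration. Muffling messages and warnings during capture, and writing stdout back with `writeLines()`, are my assumptions about the intended behavior (note that `writeLines()` always ends with a newline, since `capture.output()` does not record whether the last line had one).

```r
captureStdoutAndConditions <- function(expr, envir = parent.frame()) {
  expr <- substitute(expr)
  conditions <- list()
  withCallingHandlers({
    stdout <- utils::capture.output({
      value <- eval(expr, envir = envir)
    })
  }, condition = function(cond) {
    conditions <<- c(conditions, list(cond))
    ## Muffle so nothing is output while capturing
    if (inherits(cond, "message")) invokeRestart("muffleMessage")
    if (inherits(cond, "warning")) invokeRestart("muffleWarning")
  })
  list(value = value, stdout = stdout, conditions = conditions)
}

relayStdoutAndConditions <- function(res) {
  writeLines(res$stdout)
  for (condition in res$conditions) {
    if (inherits(condition, "warning")) warning(condition)
    else if (inherits(condition, "message")) message(condition)
    else signalCondition(condition)
  }
  invisible(res)
}

## Hypothetical function that produces output, a message, and a warning
noisy <- function() {
  cat("some output\n")
  message("a message")
  warning("a warning")
  42
}

## Nothing is output here; everything is buffered in 'res'
res <- captureStdoutAndConditions(noisy())

## Relay later, recording which condition classes arrive and in what order
relayed <- character(0)
withCallingHandlers(
  relayStdoutAndConditions(res),  # prints "some output"
  message = function(m) {
    relayed <<- c(relayed, "message")
    invokeRestart("muffleMessage")
  },
  warning = function(w) {
    relayed <<- c(relayed, "warning")
    invokeRestart("muffleWarning")
  }
)
```

The point of the design is that the value, stdout, and conditions travel together in one list, so the caller decides *when* they are emitted, e.g. only after the progress bar has been moved out of the way.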
and then use them in the loop:
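```r
for (i in seq_len(B)) {
  ## Evaluate this chunk while buffering its stdout and conditions
  res <- captureStdoutAndConditions({
    future.apply::future_lapply(X[Split[[i]]], FUN, ...)
  })
  rval[i] <- list(res$value)
  ## Get the progress bar out of the way, relay the buffered output,
  ## then bring the bar back and advance it
  hide_pb(pb)
  relayStdoutAndConditions(res)
  unhide_pb(pb)
  setpb(pb, i)
}
```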
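To see the effect on the bar, here is a small self-contained sketch of the same loop that does not need pbapply or futures. Everything in it is an assumption for illustration: `pb_lapply_sketch()` is a hypothetical name, `utils::txtProgressBar()` stands in for pbapply's bar, plain `lapply()` stands in for `future_lapply()`, only messages are buffered, and `hide_pb()` is one possible implementation that simply blanks the current bar line. No separate `unhide_pb()` is needed here because `setTxtProgressBar()` redraws the bar on the fresh line when its value changes.

```r
## Hypothetical hide_pb(): blank the current bar line and return to column 1,
## so relayed output does not get appended after "NN%"
hide_pb <- function(pb, width = getOption("width")) {
  cat("\r", strrep(" ", width - 1L), "\r", sep = "")
  utils::flush.console()
}

pb_lapply_sketch <- function(X, FUN, B = 4L) {
  B <- min(B, length(X))
  ## Split the indices into B contiguous, non-empty chunks
  Split <- split(seq_along(X), ceiling(seq_along(X) * B / length(X)))
  pb <- utils::txtProgressBar(min = 0, max = B, style = 3, char = "+")
  on.exit(close(pb))
  rval <- vector("list", B)
  for (i in seq_len(B)) {
    buffered <- list()
    ## Run the chunk, buffering (and muffling) its messages
    rval[i] <- list(withCallingHandlers(
      lapply(X[Split[[i]]], FUN),
      message = function(m) {
        buffered <<- c(buffered, list(m))
        invokeRestart("muffleMessage")
      }
    ))
    if (length(buffered) > 0L) {
      hide_pb(pb)
      for (m in buffered) message(m)  # each message ends with a newline
    }
    utils::setTxtProgressBar(pb, i)   # redraw the bar at its new value
  }
  do.call(c, rval)
}

my_sqrt <- function(x) {
  if (x %% 10 == 0) message("x = ", x)
  sqrt(x)
}
y <- pb_lapply_sketch(1:100, my_sqrt)
```

Each `x = ...` message now starts on its own line and the bar is redrawn below it. Since the bar goes to stdout and the messages to stderr, the exact interleaving on screen can still depend on how the console buffers the two streams.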
PS. I've got future.mapreduce on the roadmap. One goal is to provide an API that others can build on, so that they don't have to do the above manually.