More efficient chunking
require(parallel)
require(doSNOW)
require(foreach)
require(future)
require(future.apply)
nodes <- 5L
cl <- future::makeClusterPSOCK(nodes)
plan(cluster, workers=cl, persistent=TRUE)
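## foo() simulates a workload in which elements 5:8 are slow (3s sleep each)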
foo <- function(i) {
if (i %in% 5:8) Sys.sleep(3L)
i
}
x <- 1:20
system.time(ans <- future_lapply(x, function(i) {
msg <- sprintf("[%s] [%5.0f] ans=%2.0f", as.character(Sys.time()), Sys.getpid(), i)
cat(msg, sep="\n")
ans <- foo(i)
}))
# [2019-04-18 13:17:08] [17620] ans= 1
# [2019-04-18 13:17:08] [17620] ans= 2
# [2019-04-18 13:17:08] [17620] ans= 3
# [2019-04-18 13:17:08] [17620] ans= 4
# [2019-04-18 13:17:08] [16016] ans= 5 # <~~ predefined chunk node = wait until previous one completes
# [2019-04-18 13:17:11] [16016] ans= 6 # <~~ predefined chunk node = wait until previous one completes
# [2019-04-18 13:17:14] [16016] ans= 7 # <~~ predefined chunk node = wait until previous one completes
# [2019-04-18 13:17:17] [16016] ans= 8 # <~~ predefined chunk node = wait until previous one completes
# [2019-04-18 13:17:08] [ 3992] ans= 9
# [2019-04-18 13:17:08] [ 3992] ans=10
# [2019-04-18 13:17:08] [ 3992] ans=11
# [2019-04-18 13:17:08] [ 3992] ans=12
# [2019-04-18 13:17:08] [ 3836] ans=13
# [2019-04-18 13:17:08] [ 3836] ans=14
# [2019-04-18 13:17:08] [ 3836] ans=15
# [2019-04-18 13:17:08] [ 3836] ans=16
# [2019-04-18 13:17:08] [ 8856] ans=17
# [2019-04-18 13:17:08] [ 8856] ans=18
# [2019-04-18 13:17:08] [ 8856] ans=19
# [2019-04-18 13:17:08] [ 8856] ans=20
# user system elapsed
# 0.03 0.02 12.14 # <~~ 4x more time
registerDoSNOW(cl)
system.time(ans <- foreach(i=x) %dopar% {
msg <- sprintf("[%s] [%5.0f] ans=%2.0f", as.character(Sys.time()), Sys.getpid(), i)
foo_i <- foo(i)
return(msg)
})
# [2019-04-18 13:17:20] [17620] ans= 1
# [2019-04-18 13:17:20] [16016] ans= 2
# [2019-04-18 13:17:20] [ 3992] ans= 3
# [2019-04-18 13:17:20] [ 3836] ans= 4
# [2019-04-18 13:17:20] [ 8856] ans= 5 # <~~ runs on next available free node
# [2019-04-18 13:17:20] [17620] ans= 6 # <~~
# [2019-04-18 13:17:20] [16016] ans= 7 # <~~
# [2019-04-18 13:17:20] [ 3992] ans= 8 # <~~
# [2019-04-18 13:17:20] [ 3836] ans= 9
# [2019-04-18 13:17:20] [ 3836] ans=10
# [2019-04-18 13:17:20] [ 3836] ans=11
# [2019-04-18 13:17:20] [ 3836] ans=12
# [2019-04-18 13:17:20] [ 3836] ans=13
# [2019-04-18 13:17:20] [ 3836] ans=14
# [2019-04-18 13:17:20] [ 3836] ans=15
# [2019-04-18 13:17:20] [ 3836] ans=16
# [2019-04-18 13:17:20] [ 3836] ans=17
# [2019-04-18 13:17:20] [ 3836] ans=18
# [2019-04-18 13:17:20] [ 3836] ans=19
# [2019-04-18 13:17:20] [ 3836] ans=20
# user system elapsed
# 0.01 0.00 3.07 # <~~ roughly 4x less runtime
The point is, even if the assignment is randomized (I understand there's ordering="random"), a chunk can still get stuck behind one big job while other nodes are potentially free. I think it's much more efficient to look for free nodes and assign jobs on the fly than to determine chunk sizes / entries upfront.
What do you think?
Did you forget a registerDoFuture()? At its inner core, the design of cluster futures is to use first available (=free) worker.
Scratch that. My brain is tired. I was just over at the doFuture repository and my brain stayed there. Anyway, the part:
At its inner core, the design of cluster futures is to use first available (=free) worker.
is true. So, if what you're claiming is correct, then there's a bug. I'll add it to the bug list to go into the details on this and try to reproduce.
At this point, each chunk (ii) is run entirely on the same node, since the entries within a chunk never get a chance to see whether other nodes are free. You can look at the pid values to see that all the entries with the 3s sleep run on the same node; that's because they all get assigned to the same chunk.
Ah... now I think I understand what you're getting at (disclaimer: I'm busy with other things so haven't had time to focus on this). The default chunking strategy is to split up the elements to be processed into equal-sized chunks upfront, where the number of chunks corresponds to the number of workers (=nbrOfWorkers()). This will cause each worker to process one and only one chunk (=future).
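For intuition, here is a small sketch of what that upfront split looks like for this thread's example (20 elements, 5 workers); parallel::splitIndices() is used purely as an illustration and is not necessarily what future.apply calls internally:
## Illustration only: equal-sized, contiguous chunks determined upfront
parallel::splitIndices(20L, 5L)
# -> list(1:4, 5:8, 9:12, 13:16, 17:20)
# The four slow elements (5:8, where foo() sleeps 3s) all land in the second
# chunk, so one worker runs all four sleeps back to back.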
You can tell it to use smaller chunks such that each worker processes more than one chunk (=future). The easiest is to set argument future.scheduling to a number larger than the default 1.0. If you use +Inf, you'll get one element per chunk (=future). An alternative argument is future.chunk.size, which specifies the number of elements per chunk directly. Do those do what you want?
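For concreteness, a small sketch of those two knobs, reusing the x and foo defined in the example above (timings will of course differ):
## One future (chunk) per element: fully dynamic dispatch, maximum per-future overhead
ans <- future_lapply(x, foo, future.scheduling = Inf)
## The same effect expressed via the chunk size instead
ans <- future_lapply(x, foo, future.chunk.size = 1L)
## A middle ground: on average three chunks (futures) per worker
ans <- future_lapply(x, foo, future.scheduling = 3.0)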
The problem is that as you increase the future.scheduling value, the overhead increases substantially as well. In the same example as above, having x <- 1:100 instead of x <- 1:20 and adding future.scheduling=Inf takes:
require(parallel)
require(doSNOW)
require(foreach)
require(future)
require(future.apply)
nodes <- 5L
cl <- future::makeClusterPSOCK(nodes)
plan(cluster, workers=cl, persistent=TRUE)
foo <- function(i) {
if (i %in% 5:8) Sys.sleep(3L)
i
}
x <- 1:100
system.time(ans <- future_lapply(x, function(i) {
msg <- sprintf("[%s] [%5.0f] ans=%2.0f", as.character(Sys.time()), Sys.getpid(), i)
cat(msg, sep="\n")
ans <- foo(i)
}, future.scheduling=Inf))
# Takes 21 seconds
And with future.scheduling=3.0 it takes 9.14 seconds. foreach still takes 3.08s.
Let me put it this way. With x <- 1:100, how can I run future_lapply(...) in a way that it runs in 3.1s, assuming I do not know beforehand that i=5:8 will have a 3s sleep?
Got it. So, now we've narrowed the problem down to a performance difference between future.apply::future_lapply() with plan(cluster, workers = cl) and foreach::foreach() with doSNOW::registerDoSNOW(cl), where both are using cl <- future::makeClusterPSOCK(nodes).
This I can investigate further, because there's no reason there should be a major difference given the same amount of chunking. One way to rule out some differences is to compare the same foreach::foreach() call with doSNOW::registerDoSNOW(cl) versus doFuture::registerDoFuture() with plan(cluster, workers = cl).
(I noticed that I didn't have doFuture installed. Just installed CRAN version)
Aha... good catch. The one with doFuture::registerDoFuture() takes 12s.
registerDoSNOW()
require(parallel)
require(doSNOW)
require(foreach)
require(future)
require(future.apply)
nodes <- 5L
cl <- future::makeClusterPSOCK(nodes)
foo <- function(i) {
if (i %in% 5:8) Sys.sleep(3L)
i
}
x <- 1:100
registerDoSNOW(cl)
system.time(ans <- foreach(i=x) %dopar% {
msg <- sprintf("[%s] [%5.0f] ans=%2.0f", as.character(Sys.time()), Sys.getpid(), i)
foo_i <- foo(i)
return(msg)
})
# user system elapsed
# 0.05 0.00 3.05
Restarted the session, but I also checked within the same session and the timings are identical.
registerDoFuture()
require(parallel)
require(doSNOW)
require(foreach)
require(future)
require(future.apply)
nodes <- 5L
cl <- future::makeClusterPSOCK(nodes)
foo <- function(i) {
if (i %in% 5:8) Sys.sleep(3L)
i
}
x <- 1:100
doFuture::registerDoFuture()
plan(cluster, workers=cl, persistent=TRUE)
system.time(ans <- foreach(i=x) %dopar% {
msg <- sprintf("[%s] [%5.0f] ans=%2.0f", as.character(Sys.time()), Sys.getpid(), i)
foo_i <- foo(i)
return(msg)
})
# user system elapsed
# 0.04 0.00 12.15
Thanks a bunch for doing this. Now I have enough to investigate - the difference is surprising, but there might be (=probably is) an easy explanation.
PS. The underlying code for future_lapply and doFuture is similar (cut'n'paste-ish history), so this is in one sense what I hoped for.
@HenrikBengtsson is there a timeline on when this would be done?
The problem is this: the chunks are pre-assigned to the node they'll run on.
Imagine there are about 1000 files (corresponding to 1000 dates) to be updated. Using foreach, the next file to be processed is handed to the next available node, which means the files are updated in the order provided. When the chunks of files are instead assigned to each node upfront, it's possible that one node finishes its tasks earlier, and we end up with the files being updated out of order. And if something breaks, it becomes much harder to tell which files have been updated, since it's no longer a contiguous sequence. With foreach, if date x broke, then we know to update from x up to the most recent date.
Just setting options(future.wait.interval=0L) reduces the runtime from 21s to 3.8s (approximately the same as that of foreach):
require(parallel)
require(doSNOW)
require(foreach)
require(future)
require(future.apply)
options(future.wait.interval=0L) ## <~~~ newly added line
nodes <- 5L
cl <- future::makeClusterPSOCK(nodes)
plan(cluster, workers=cl, persistent=TRUE)
foo <- function(i) {
if (i %in% 5:8) Sys.sleep(3L)
i
}
x <- 1:100
system.time(ans <- future_lapply(x, function(i) {
msg <- sprintf("[%s] [%5.0f] ans=%2.0f", as.character(Sys.time()), Sys.getpid(), i)
cat(msg, sep="\n")
ans <- foo(i)
}, future.scheduling=Inf))
# 3.8s
I think this more or less solves the issue I reported. Will write back if I encounter issues.
Yes, it clearly seems that the polling wait time (option future.wait.interval) is the culprit. It's quite obvious - sorry for not realizing/mentioning this earlier.
The reason for future.wait.interval not defaulting to zero is that we don't want to induce too-frequent polling/querying on other backends, e.g. job schedulers, but also because it induces some extra CPU load. Having said this, it should not be too big of a deal for parallelization on the local machine.
I'll add it to the todo list to support different defaults for future.wait.interval for different types of future backends. Ideally, it should also be possible to control this via the plan() setup, e.g.
plan(multisession, workers = 4L, wait.interval = 0.01)
There are other types of settings that we also want to be backend specific, so this one falls nicely into that plan (no pun intended).
FWIW, the default value for option future.wait.interval was decreased 20-fold (from 0.2 to 0.01 seconds) in future 1.25.0.
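For reference, a minimal sketch of inspecting and overriding that option in a session, using base R's getOption()/options() (the 0.01 value just mirrors the default mentioned above):
getOption("future.wait.interval") # NULL unless explicitly set; future then falls back to its built-in default
options(future.wait.interval = 0.01) # override for the current session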