implement `future.apply` in `forecast` as it is in `model`
Using `fable` at any kind of scale gets painful if computation of forecasts isn't also parallelized. I've got a possible implementation I'll PR tomorrow.
A PR for this would be great, thanks!
My suggestion for this would be to drop the use of `mutate_at()` and replace it with an internal util function which iterates over the mable with parallel support. There is a common pattern throughout the package where functions applied to a mable need to operate over each model, and a more general (and parallelisable) helper function for this would be great (for example, `generate()` should also be possible in parallel).
I've got to pull a couple of errant commits out, but I'll send it along momentarily. I reversed the order of the `pivot_longer` and changed the `mutate_at` to `mapply` to mirror the `future.apply` implementation.
I'm happy to make a util function that does the iteration piece. It would essentially start from the point at which the data has been `pivot_longer`'ed and iterate across it with either `mapply` or `future_mapply`, given that they take the same arguments (other than the `future`-specific ones). Take everything that's passed in additionally and place it in `MoreArgs`, then select the method based on the availability of `future`.
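A minimal sketch of that dispatch idea (the function name and signature here are placeholders; the actual implementations later in this thread supersede it):

```r
# Sketch only: collect extra arguments into MoreArgs and pick the backend
# based on whether {future} is attached.
iterate_models <- function(.f, ..., MoreArgs = list()) {
  if ("package:future" %in% search()) {
    future.apply::future_mapply(.f, ..., MoreArgs = MoreArgs, SIMPLIFY = FALSE)
  } else {
    mapply(.f, ..., MoreArgs = MoreArgs, SIMPLIFY = FALSE)
  }
}
```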
This is an 80-core machine trying to churn through forecasts for the models it fit between 5:45pm and 6pm. It's still at it, 4 hours later. This is a pretty strong motivator, as I'll be explaining the bill later. :)
I think it could be safer / more efficient if the internal util also handled the `pivot_longer` bit, to avoid possible variable name conflicts arising from the pivoting process.
Makes sense, and shouldn't be any tougher. It won't `pivot_wider` to return to the previous structure, because `forecast()` specifically needs it not to do so. I can't think of an instance where it would need to, but I can add a param to return it to the original structure, if you'd like.
An example of returning to the previous structure is when the operation returns a `<mable>`. For example, `refit(<mable>, <tsibble>)` returns a `<mable>` of the same structure. The same applies for `stream(<mable>, <tsibble>)`, which adds/extends the underlying data.
Makes sense. Should be straightforward enough.
To make modeling and forecasting more memory efficient for `tsibble`s with many keys (and this should probably go for any `fabletools` process), it might make sense to shrink the environment of each worker in the `future_mapply` calls. This means we'll need to figure out which packages, like `fable.prophet`, might be attached and will need to be available on the workers. Obviously, if we can generally rely on the idea that the namespace will begin with `fable.`, then it's pretty easy. I don't know if that's something you've thought about and feel strongly about, but it seems reasonable enough. Otherwise, we might need to crawl across the functions within the call to find all of their namespaces and add them that way, which seems a bit more fraught with peril.
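As a rough sketch of the namespace-prefix idea (`fable_packages()` and `parallel_over_models()` are hypothetical names; `future.packages` is a real argument of the `future.apply` functions):

```r
# Sketch only: find attached packages whose names begin with "fable" and
# ship just those to the workers.
fable_packages <- function() {
  attached <- sub("^package:", "", grep("^package:", search(), value = TRUE))
  grep("^fable", attached, value = TRUE)
}

parallel_over_models <- function(.f, ...) {
  future.apply::future_mapply(
    FUN = .f, ...,
    SIMPLIFY = FALSE,
    future.globals = FALSE,
    future.packages = fable_packages()  # load fable extensions on each worker
  )
}
```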
Just following along as I use fable for millions of time series. If this is in the newest version of the GitHub release, I'd be happy to help test; otherwise, just saying I appreciate the work here!
On further thought and a bug report, converting to longer before parallelising isn't appropriate because the column structure defines the way in which the forecasts are handled (such as reconciliation). I'll have a go at adding the conditionally parallel `mapply` function.
Look at what's in my branch right now. There was an issue with how `new_data` was working, but I have a version that I have been testing all afternoon and is working for it. I didn't understand how `key_data` was being used, and had messed that portion up in terms of how it got pushed to `forecast` along with the `new_data`.
Using what's currently in the `parallel_forecasting` branch, here's an example of parallel modeling and forecasting. I'm sorry it's so long, but I needed something with regressors, and I wanted to make sure I included `fable.prophet` in there to make sure the `capacity` and `floor` for logistic growth work properly.
```r
devtools::install_github("davidtedfordholt/fabletools", ref = "parallel_forecasting")

library(tsibbledata)
library(tsibble)
library(dplyr)
library(tidyr)
library(fable)
library(fabletools)
library(fable.prophet)
library(future)
library(future.apply)

options(future.fork.enable = TRUE, future.rng.onMisuse = "ignore")

data <- gafa_stock %>%
  select(Volume, Open) %>%
  fill_gaps(Volume = 0, .full = TRUE) %>%
  group_by_key() %>%
  fill(Open) %>%
  ungroup()

train <- data %>%
  filter(Date < as.Date("2018-01-01")) %>%
  mutate(capacity = max(Open) * 4,
         floor = min(Open) * .25) %>%
  ungroup()

test <- train %>%
  new_data(n = 365) %>%
  left_join(select(data, -Open), by = c("Date", "Symbol")) %>%
  left_join(train %>%
              as_tibble() %>%
              group_by(Symbol) %>%
              summarise(capacity = last(capacity),
                        floor = last(floor)),
            by = "Symbol")

# SEQUENTIAL --------------------------
plan(sequential)

start_sequential_modeling <- Sys.time()
models <-
  train %>%
  model(
    arima = ARIMA(Open),
    tslm = TSLM(Open ~ Volume + season()),
    naive = NAIVE(Open),
    prophet = prophet(
      Open ~
        Volume +
        growth(type = "logistic", capacity = capacity, floor = floor)))
end_sequential_modeling <- Sys.time()
print(paste("Sequential modeling took", format(end_sequential_modeling - start_sequential_modeling)))

forecasts <- forecast(models, new_data = test)
print(paste("Sequential forecasting took", format(Sys.time() - end_sequential_modeling)))

# MULTICORE --------------------------
plan(multicore)

start_multicore_modeling <- Sys.time()
models_multicore <-
  train %>%
  model(
    arima = ARIMA(Open),
    tslm = TSLM(Open ~ Volume + season()),
    naive = NAIVE(Open),
    prophet = prophet(
      Open ~
        Volume +
        growth(type = "logistic", capacity = capacity, floor = floor)))
end_multicore_modeling <- Sys.time()
print(paste("Multicore modeling took", format(end_multicore_modeling - start_multicore_modeling)))

forecasts_multicore <- forecast(models_multicore, new_data = test)
print(paste("Multicore forecasting took", format(Sys.time() - end_multicore_modeling)))

# MULTISESSION --------------------------
plan(multisession)

start_multisession_modeling <- Sys.time()
models_multisession <-
  train %>%
  model(
    arima = ARIMA(Open),
    tslm = TSLM(Open ~ Volume + season()),
    naive = NAIVE(Open),
    prophet = prophet(
      Open ~
        Volume +
        growth(type = "logistic", capacity = capacity, floor = floor)))
end_multisession_modeling <- Sys.time()
print(paste("Multisession modeling took", format(end_multisession_modeling - start_multisession_modeling)))

forecasts_multisession <- forecast(models_multisession, new_data = test)
print(paste("Multisession forecasting took", format(Sys.time() - end_multisession_modeling)))
```
Output for me was:
[1] "Sequential modeling took 6.105713 secs"
[1] "Sequential forecasting took 3.273358 secs"
[1] "Multicore modeling took 2.228881 secs"
[1] "Multicore forecasting took 1.384231 secs"
[1] "Multisession modeling took 14.54192 secs"
[1] "Multisession forecasting took 1.457515 secs"
Obviously, the `multisession` overhead for modeling is a bit painful, but that should drop on a larger dataset. I just wanted to make sure it works even when the workers aren't sharing the current session's environment.
Thanks, looks promising. Do you know if the data transfer overhead for multicore is lower than multisession? I think substantial gains will be had by reducing object sizes here.
I get similar speed-up:
[1] "Sequential modeling took 8.717845 secs"
[1] "Sequential forecasting took 3.696366 secs"
[1] "Multicore modeling took 3.74861 secs"
[1] "Multicore forecasting took 2.048967 secs"
[1] "Multisession modeling took 11.08729 secs"
[1] "Multisession forecasting took 2.403781 secs"
So the underlying issue with the overhead is that `multisession` actually creates another R session, copies the environment into it, then launches the process. `multicore` works within the current session. `multicore` is a bit unstable inside RStudio (hence me setting the option to allow forked processes), but works far better when you're running it in a session from the command line. You can reduce the overhead for `multisession` by scoping what has to be shipped to each worker, including which packages have to be loaded. That's the slowest thing, most likely.
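To illustrate the scoping point (a toy example, not from the branch):

```r
library(future)
library(future.apply)

plan(multisession, workers = 2)

# With future.globals = FALSE, the calling environment is not searched,
# serialized, and copied to each worker, which is where much of the
# multisession cost hides for large sessions.
res <- future_mapply(
  function(x, y) x + y, 1:8, 9:16,
  SIMPLIFY = FALSE,
  future.globals = FALSE
)

plan(sequential)
```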
Don't hate me for being super hacky, but throw this in after you build `data`.
```r
data <- data %>%
  bind_rows(mutate(data, Symbol = paste(Symbol, "_1"))) %>%
  bind_rows(mutate(data, Symbol = paste(Symbol, "_2"))) %>%
  bind_rows(mutate(data, Symbol = paste(Symbol, "_3"))) %>%
  bind_rows(mutate(data, Symbol = paste(Symbol, "_4"))) %>%
  bind_rows(mutate(data, Symbol = paste(Symbol, "_5"))) %>%
  bind_rows(mutate(data, Symbol = paste(Symbol, "_6"))) %>%
  bind_rows(mutate(data, Symbol = paste(Symbol, "_7")))
```
My results were:
[1] "Sequential modeling took 46.61608 secs"
[1] "Sequential forecasting took 21.95314 secs"
[1] "Multicore modeling took 14.79503 secs"
[1] "Multicore forecasting took 26.5846 secs"
[1] "Multisession modeling took 1.087287 mins"
[1] "Multisession forecasting took 7.000597 secs"
It's an interesting question about the overhead. I need to run it on something much larger.
I extended the data out to 60 keys. Also, I killed off Chrome and a bunch of other things to free up more cores.
[1] "Sequential modeling took 1.396766 mins"
[1] "Sequential forecasting took 37.79279 secs"
[1] "Multicore modeling took 26.57042 secs"
[1] "Multicore forecasting took 14.76845 secs"
[1] "Multisession modeling took 38.57623 secs"
[1] "Multisession forecasting took 11.76835 secs"
I added a second key to them and it still worked just fine. Tomorrow morning, I'm going to test it on the process for work, which takes about 2 hours sequentially, but the machine has 80 cores that are used during modeling but not during forecasting. Yet. It's a bear.
Sounds great. I think there is some low-hanging fruit for performance improvements which I'll try to get today.
Here's the base of an `mapply` util function. I'm interested in extending it to use `progressr`, if available, to give a progress bar.
```r
versatile_mapply <- function(...) {
  if (is_attached("package:future")) {
    require_package("future.apply")
    # require_package() only checks availability, so call via the namespace
    # rather than assuming future.apply is attached
    future.apply::future_mapply(..., future.globals = FALSE)
  } else {
    mapply(...)
  }
}
```
Anything written as an `mapply` should be able to have this swapped in.
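For example, a toy mapply-style call would run sequentially by default and in parallel once a `future` plan is set (note that `is_attached()` and `require_package()` are fabletools internals, so run this in that context or swap in `"package:future" %in% search()` and `requireNamespace()`):

```r
# Assumes versatile_mapply() from above is defined.
slow_add <- function(x, y) { Sys.sleep(0.01); x + y }

versatile_mapply(slow_add, 1:100, 101:200)   # sequential: future not attached

library(future)
plan(multisession)
versatile_mapply(slow_add, 1:100, 101:200)   # now dispatched to workers
plan(sequential)
```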
I agree that adding `progressr` reporting here is appropriate.
This is what I'm working with at the moment.
```r
mapply_maybe_parallel <- function(.f, ..., MoreArgs = list(), SIMPLIFY = FALSE) {
  if (is_attached("package:future")) {
    require_package("future.apply")
    future.apply::future_mapply(
      FUN = .f,
      ...,
      MoreArgs = MoreArgs,
      SIMPLIFY = SIMPLIFY,
      future.globals = FALSE
    )
  } else {
    mapply(
      FUN = .f,
      ...,
      MoreArgs = MoreArgs,
      SIMPLIFY = SIMPLIFY
    )
  }
}
```
```r
mable_apply <- function(.data, .f, ..., names_to = ".model") {
  mv <- mable_vars(.data)
  kv <- key_vars(.data)

  # Compute .f for all models in the mable
  result <- lapply(
    as_tibble(.data)[mv],
    mapply_maybe_parallel,
    .f = .f,
    .data,
    MoreArgs = dots_list(...)
  )
  num_rows <- lapply(result, vapply, nrow, integer(1L))

  # Assume the same tsibble structure for all outputs
  first_result <- result[[1]][[1]]
  result <- vec_rbind(!!!lapply(result, function(x) vec_rbind(!!!x)))

  # Compute the .model label
  model <- rep.int(mv, vapply(num_rows, sum, integer(1L)))

  # Repeat the key structure as needed
  .data <- .data[rep.int(seq_along(num_rows[[1]]), num_rows[[1]]), kv]

  # Combine into a single table
  .data <- bind_cols(.data, !!names_to := model, result)
  if (is_tsibble(first_result)) {
    .data <- build_tsibble(
      .data, key = c(kv, names_to, key_vars(first_result)),
      index = index_var(first_result), index2 = !!index2(first_result),
      ordered = is_ordered(first_result),
      interval = interval(first_result))
  }
  return(.data)
}
```
I like it. Here's something for the progress bar (because sleep is silly). Obviously, you're paying a lot more attention to the additional arguments getting passed through, which is great.
```r
possibly_fast_mapply <- function(FUN, ...) {
  if (is_attached("package:future")) {
    require_package("future.apply")
    future.apply::future_mapply(
      FUN = FUN, ...,
      SIMPLIFY = FALSE,
      future.globals = FALSE)
  } else {
    mapply(
      FUN = FUN, ...,
      SIMPLIFY = FALSE)
  }
}

versatile_mapply <- function(FUN, ...) {
  if (is_attached("package:progressr")) {
    require_package("progressr")
    # check to find the longest iterant, to define the progress bar
    # (names2() treats unnamed iterants as "", so they are kept)
    dots <- dots_list(...)
    iterants <- dots[!names2(dots) %in% c("MoreArgs", "SIMPLIFY", "USE.NAMES")]
    len <- 1:max(unlist(lapply(iterants, length)), na.rm = TRUE)
    with_progress({
      p <- progressor(along = len)
      out <- possibly_fast_mapply(
        FUN = function(...) {
          p()  # tick the progress bar on each iteration
          FUN(...)
        }, ...)
    })
  } else {
    out <- possibly_fast_mapply(FUN = FUN, ...)  # FUN must be passed on here
  }
  out
}
```
I was trying it out on `nothing_in_particular <- versatile_mapply(FUN = sum, x = 1:10000, y = 2:10001)`. I really like progress bars. Steal and adapt, if you like.
I definitely appreciate your refusal to play their `FUN` games, though. `.f` is so much more pleasant.
As a note, the issue in mitchelloharawild/fable.prophet#17 continues. It doesn't happen with `multicore`, since the environment is shared, but does with `multisession`. This means that running a `fable.prophet` model in RStudio (without that option change) will likely fail to fit.
One way to solve this would be to attach the holidays to the `tsibble` being modeled. Another would be to codify the way holidays work in general, rather than specifically for `fable.prophet`, and build in a check for them in the environment, ensuring that they are passed through to the workers. A third is, as you suggest in the other issue, carefully parsing through the formula for objects that might need to be added to the `future.globals` argument in the `future_mapply()` call.
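A sketch of that third option might look something like this (`fit_one`, `specs`, and `holidays` are hypothetical stand-ins for the per-model worker, the model specifications, and an object referenced in a formula but absent from the data):

```r
# Sketch only: explicitly ship a formula-referenced object to the workers
# instead of relying on automatic global detection.
fit_models_parallel <- function(fit_one, specs, holidays) {
  future.apply::future_mapply(
    FUN = fit_one,
    specs,
    SIMPLIFY = FALSE,
    future.globals = list(holidays = holidays)  # named list of extra globals
  )
}
```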
It's an edge case and can be handled through a warning, but it seems like `TSLM` and `fasster`, as well as causal and econometric models, might benefit from a standardized holiday framework that is consistent across models.
Long term, I hope for a better holiday framework, likely powered by {almanac} and representing holidays as part of the data object. However, I do expect the need for non-data variables in model specifications, and so detecting and appropriately passing them through to the worker nodes is essential.
I assume that integrating `almanac` would look like a function that takes in a recurrence bundle and creates columns on the `tsibble` corresponding to the bundle, perhaps dragging things forward and backward in time?
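Something like this, perhaps (a rough sketch; `add_schedule_column()` is a hypothetical helper, and the `recur_on_*()` builder names have changed between almanac versions, so treat them as illustrative):

```r
library(almanac)
library(dplyr)

# Build a recurrence schedule for Christmas Day.
on_christmas <- yearly() %>%
  recur_on_month_of_year(12) %>%
  recur_on_day_of_month(25)

# Hypothetical helper: flag index dates that fall on the schedule as a new
# column, which models could then pick up as an ordinary regressor.
add_schedule_column <- function(.data, index, rschedule, name) {
  mutate(.data, !!name := alma_in({{ index }}, rschedule))
}

# e.g. train <- add_schedule_column(train, Date, on_christmas, "is_christmas")
```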
Perhaps, when parsing `specials`, object specifications that aren't variables in the `tsibble` need to be stored in a common place. That way they can be passed as a named list to the `future.globals` argument of `future_mapply`.
I don't know if you're using it, @mitchelloharawild, but `options(future.globals.onReference = "warning")` is helpful.