Queuing calls inside other functions
Love this package, as I am a big fan of the Python rq package and this feels like the closest I can get in R at the moment. I notice that you have been doing a fair amount of work in this space, so thanks very much and kudos.
I have come across a problem that yields an error:
[2018-04-10 13:30:49] STOP ERROR
worker_1 | This is an uncaught error in rrq, probably a bug!
worker_1 | Error in storr_copy(dest, self, list, namespace, skip_missing) :
worker_1 | Missing values; can't copy:
worker_1 | - from namespace 'objects', key: '6717f2823d3202449301145073ab8719'
worker_1 | Calls: rrq_worker ... tryCatch -> tryCatchList -> tryCatchOne -> <Anonymous>
worker_1 | Execution halted
As a reproducible example, I have a script file "myfuns.R":
slowdouble <- function(x) {
  Sys.sleep(x)
  print(x * 2)
  x * 2
}
I create a context and save it separately, so that it can be loaded in both the master and the worker, with:
context_holder = "context"
dir.create(context_holder)
root = paste0(context_holder, "/test_context")
out = paste0(context_holder,"/context.RData")
orig_source = "myfuns.R"
new_source = paste0(context_holder,"/",orig_source)
file.copy(orig_source,new_source)
context = context::context_save(root,sources = new_source)
context = context::context_load(context, new.env(parent = .GlobalEnv))
saveRDS(context, out)
I run the worker in one R process:
library(rrq)
context = readRDS("context/context.RData")
rrq_worker(context, redux::hiredis(host = host, port = port))
Then, in the master, I run
library(rrq)
context = readRDS("context/context.RData")
obj = rrq_controller(context,redux::hiredis(host = host,port = port))
to create the controller object.
It works fine if I run
obj$enqueue(slowdouble(1))
but breaks if I do
f = function(x) {
  obj$enqueue(slowdouble(x))
}
f(1)
Any ideas on why this might be, or what I might do to fix it?
Hi @jamieRowen - sorry for missing this; I don't see most GitHub notifications as they get a bit overwhelming!
This slightly modified version of your script works for me:
library(rrq)
Sys.setenv(R_TESTS = "")
root <- tempfile()
context <- context::context_save(root, sources = "myfuns.R")
context <- context::context_load(context, new.env(parent = .GlobalEnv))
obj <- rrq_controller(context, redux::hiredis())
on.exit(obj$destroy())
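## For testing, use: worker_command(obj)
## PROGRESS is not defined in this snippet (presumably a flag set elsewhere);
## define it before running.
wid <- workers_spawn(obj, timeout = 5, progress = PROGRESS)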
t <- obj$enqueue(slowdouble(1))
obj$task_wait(t)
f <- function(x) {
  obj$enqueue(slowdouble(x))
}
t <- f(1)
obj$task_wait(t)
I think the issue is that the use of saveRDS/readRDS is messing up your environments.
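To illustrate how a saveRDS/readRDS round trip can affect environments, here is a minimal base R sketch. It does not use rrq, context, or Redis, and it is not a description of what those packages do internally; the variable names (env_original, env_restored, added_later) are made up for the example. It only shows that an environment read back with readRDS is a new copy, not the environment that was saved, so anything that relies on the two being the same object may break.

```r
# Base R only: no rrq, context, or Redis needed.
# All names below are hypothetical and chosen for illustration.
env_original <- new.env(parent = .GlobalEnv)
assign("value", 1, envir = env_original)

# Round-trip the environment through an RDS file.
path <- tempfile(fileext = ".rds")
saveRDS(env_original, path)
env_restored <- readRDS(path)

# The restored environment holds the same contents at the time of saving...
same_contents <- identical(get("value", envir = env_restored), 1)

# ...but it is a distinct object (environments are compared by reference).
same_object <- identical(env_original, env_restored)

# Changes made to the original after saving are not visible in the copy.
assign("added_later", 2, envir = env_original)
seen_in_copy <- exists("added_later", envir = env_restored, inherits = FALSE)

print(c(same_contents = same_contents,
        same_object = same_object,
        seen_in_copy = seen_in_copy))
```

If this is indeed the cause, building the context in each session with context_save/context_load, as in the script above, rather than passing a loaded context between processes via an .RData/.rds file, avoids the round trip.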