Error with remotecall_fetch called on locally defined closures
Ref: discourse link
In cases where parallel data movement is a bottleneck, it would be useful to be able to use closures with pre-computed internal state parameters that are computed only once on each worker rather than serialized from the master node.
Here are two examples. The first is a simple case where gen_foo returns a closure generated on all workers; however, remotecall_fetch gives an error. @amitmurthy gave a workaround in the link above, but it would be nice to have a more direct approach.
The second example tries to use planned FFTs as internal state in the closures.
First example
julia> addprocs(2)
2-element Array{Int64,1}:
2
3
julia> @everywhere function gen_foo(local_state)
           foo(x) = x * sum(local_state)
           return foo::Function
       end
julia> @everywhere foo = gen_foo(rand(100,100))
julia> remotecall_fetch(foo, 1, 10) #<-- works
50008.9259374688
julia> remotecall_fetch(foo, 2, 10) #<-- UndefVarError: #foo#9 not defined
ERROR: On worker 2:
UndefVarError: #foo#9 not defined
deserialize_datatype at ./serialize.jl:968
handle_deserialize at ./serialize.jl:674
deserialize at ./serialize.jl:634
handle_deserialize at ./serialize.jl:681
deserialize_msg at ./distributed/messages.jl:98
message_handler_loop at ./distributed/process_messages.jl:161
process_tcp_streams at ./distributed/process_messages.jl:118
#99 at ./event.jl:73
Stacktrace:
[1] #remotecall_fetch#141(::Array{Any,1}, ::Function, ::Function, ::Base.Distributed.Worker, ::Int64, ::Vararg{Int64,N} where N) at ./distributed/remotecall.jl:354
[2] remotecall_fetch(::Function, ::Base.Distributed.Worker, ::Int64, ::Vararg{Int64,N} where N) at ./distributed/remotecall.jl:346
[3] #remotecall_fetch#144(::Array{Any,1}, ::Function, ::Function, ::Int64, ::Int64, ::Vararg{Int64,N} where N) at ./distributed/remotecall.jl:367
[4] remotecall_fetch(::Function, ::Int64, ::Int64, ::Vararg{Int64,N} where N) at ./distributed/remotecall.jl:367
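A minimal sketch of one way to sidestep the error, assuming Julia 1.x (where this API lives in the Distributed standard library). Named functions defined at top level with @everywhere are sent to workers by name rather than by value, so the idea is to keep each process's closure in a worker-local global and only ever pass a named wrapper to remotecall_fetch. FOO and call_foo are hypothetical names introduced for illustration.

```julia
using Distributed
nprocs() == 1 && addprocs(2)   # start two local workers if none exist yet

# Same generator as in the example above: returns a closure over `local_state`.
@everywhere function gen_foo(local_state)
    foo(x) = x * sum(local_state)
    return foo
end

# Hypothetical worker-local slot: every process gets its own FOO.
@everywhere const FOO = Ref{Any}(nothing)

# Each process evaluates this expression itself, so the 100x100 matrix is
# generated locally and never sent over the wire.
@everywhere FOO[] = gen_foo(rand(100, 100))

# Named top-level wrapper: it is sent by name, so on each process it calls
# that process's own closure stored in FOO.
@everywhere call_foo(x) = FOO[](x)

results = [remotecall_fetch(call_foo, p, 10) for p in procs()]
```

Because only the name call_foo is transmitted, each entry of results should come from that process's own random matrix.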
Second example
julia> addprocs(2)
2-element Array{Int64,1}:
2
3
julia> @everywhere function gen_bar(local_state, Δx, n)
           FFT = Δx / (2π) * plan_rfft(rand(n); flags=FFTW.PATIENT, timelimit=4)
           function bar(x)
               y = FFT * (x .* local_state)
               return y[1]
           end
           return bar::Function
       end
julia> @everywhere Δx, n = 0.1, 1000
julia> @everywhere bar = gen_bar(rand(n), Δx, n)
julia> x = rand(n);
julia> @everywhere x=$x
julia> @everywhere println(bar(x)) # <-- works
3.9819631247716307 + 0.0im
From worker 2: 3.8079619702395155 + 0.0im
From worker 3: 4.008786805675184 + 0.0im
julia> remotecall_fetch(bar, 1, x) # <-- works
3.9819631247716307 + 0.0im
However, I get UndefVarError: #bar#9 not defined for each of the following:
remotecall_fetch(bar, 2, x) # <-- UndefVarError: #bar#9 not defined
out = @parallel (vcat) for i = 1:nprocs() # <-- UndefVarError: #bar#9 not defined
    bar(x)
end
out = @parallel (vcat) for i = 1:nprocs() # <-- UndefVarError: #bar#9 not defined
    remotecall_fetch(bar, i, x)
end
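A sketch of a lazily initialized variant, assuming Julia 1.x, where @parallel has been renamed @distributed. gen_bar_demo is a hypothetical, FFTW-free stand-in for gen_bar (its "expensive" state is just a random weight vector), and BAR and get_bar are hypothetical names. The state is built on a process the first time get_bar is called there and reused afterwards, and the loop body only refers to the named function get_bar plus the plain data n and x.

```julia
using Distributed
nprocs() == 1 && addprocs(2)   # start two local workers if none exist yet

# Hypothetical stand-in for gen_bar: the pre-computed state is a weight vector.
@everywhere function gen_bar_demo(n)
    w = rand(n)                # internal state, built on the calling process
    bar(x) = sum(w .* x)
    return bar
end

# Worker-local cache, empty until first use.
@everywhere const BAR = Ref{Any}(nothing)

# Named accessor: builds the closure on the first call in each process,
# then returns the cached one.
@everywhere function get_bar(n)
    if BAR[] === nothing
        BAR[] = gen_bar_demo(n)
    end
    return BAR[]
end

n = 1000
x = rand(n)

# One iteration per worker; each worker applies its own cached closure to x.
out = @distributed (vcat) for i = 1:nworkers()
    get_bar(n)(x)
end
```

The cache check is not synchronized, so this assumes get_bar is not called concurrently from several tasks on the same process.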
For this particular use case, remote references can be used to store the locally defined closures on each node and fetch them locally before execution. Something like:
# create
foo_refs = Dict()
for p in procs()
    foo_refs[p] = RemoteChannel(p)
end
# initialize on all nodes
for p in procs()
    remote_do(x->put!(x, gen_foo(rand(100,100))), p, foo_refs[p])
end
# access on worker 2
remotecall_fetch(2, foo_refs[2], 10) do rr, v
    fetch(rr)(v)
end
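For reference, a sketch of the same RemoteChannel workaround under Julia 1.x (an assumption; the code above targets Julia 0.6). The one deliberate change is remotecall_wait in place of remote_do, so that initialization has finished before the closures are used; with remote_do the later fetch would simply block until the value arrives. Using first(workers()) instead of a hard-coded 2 is also an illustrative choice.

```julia
using Distributed
nprocs() == 1 && addprocs(2)   # start two local workers if none exist yet

@everywhere function gen_foo(local_state)
    foo(x) = x * sum(local_state)
    return foo
end

# One single-slot RemoteChannel per process, each owned by that process.
foo_refs = Dict(p => RemoteChannel(p) for p in procs())

# Initialize on all nodes; remotecall_wait blocks until each put! is done.
for p in procs()
    remotecall_wait(rr -> put!(rr, gen_foo(rand(100, 100))), p, foo_refs[p])
end

# Access on one worker: the anonymous function runs on the process that owns
# foo_refs[w], so fetch(rr) reads the closure locally and it is never serialized.
w = first(workers())
v2 = remotecall_fetch(w, foo_refs[w], 10) do rr, v
    fetch(rr)(v)
end
```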
You can use the approach described at https://github.com/JuliaParallel/DistributedArrays.jl#working-with-distributed-non-array-data-requires-julia-06 in DistributedArrays to do the same more efficiently and in fewer lines of code.
One issue I see is the difference in how we treat a global function definition versus a closure bound to a global variable.
With julia -p 4
julia> @everywhere foov = rand()
julia> @everywhere func_foo() = foov
julia> @everywhere glb_var_foo = ()->foov
julia> [remotecall_fetch(func_foo, p) for p in procs()]
5-element Array{Float64,1}:
0.0138725
0.451707
0.41358
0.36171
0.331541
julia>
julia> [remotecall_fetch(glb_var_foo, p) for p in procs()]
5-element Array{Float64,1}:
0.0138725
0.0138725
0.0138725
0.0138725
0.0138725
In the second case, it is actually the closure defined on node 1 that is serialized and executed on the remote node, whereas one might intuitively expect the remote node's own locally defined glb_var_foo to be called.
At this time I think we just need to better highlight this difference and add examples to the documentation.
The original issue is also related to gen_foo defining a local foo which is then bound to a global variable foo. Consider the following two cases:
The first one errors as in the original issue:
julia> gen_foo() = (foo() = 1; foo)
gen_foo (generic function with 1 method)
julia> foo = gen_foo()
foo (generic function with 1 method)
julia> remotecall_fetch(foo, 2)
ERROR: On worker 2:
UndefVarError: #foo#1 not defined
deserialize_datatype at ./serialize.jl:975
However, if we change gen_foo to return a locally defined bar instead, it works.
julia> gen_foo() = (bar() = 1; bar)
gen_foo (generic function with 1 method)
julia> foo = gen_foo()
(::bar) (generic function with 1 method)
julia> remotecall_fetch(foo, 2)
1
How is the name of the locally defined function affecting the way in which global variable foo is serialized? @vtjnash / @JeffBezanson ? Anyone?
Bump...
Hi, thanks for the helpful workaround. I'd also much appreciate the ability to call remote-specific functions. I'm currently computing a likelihood in parallel and am using sets of pre-allocated temporary variables on each worker. Using @everywhere gets me part of the way there. My workaround for calling remote-specific closures extends your code, @amitmurthy:
# to zero-out all tmpvars
@everywhere zero_tmp() = zero!(remote_tmp::TmpVar)
function call_remote_fun(f::Symbol)
    for p in workers()
        remotecall_fetch(() -> eval(Main, :($(f)())), p)
    end
end
call_remote_fun(:zero_tmp)
# to sum over remote tmpvars
@everywhere remote_f() = get_from_remote_tmpvars(remote_tmp::TmpVar)
function mapreduce_remotevars(f::Symbol, R::Function)
    mapreduce((p::Int) -> remotecall_fetch(() -> eval(Main, :($(f)())), p), R, workers())
end
mapreduce_remotevars(:remote_f, +)
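Since named functions defined with @everywhere are resolved by name on the receiving process (which is why func_foo above returns a different value on each worker), the eval indirection may not be needed: passing the function itself to remotecall_fetch or remotecall_wait should run each worker's own definition. A sketch assuming Julia 1.x, with a plain Vector REMOTE_TMP as a hypothetical stand-in for the poster's remote_tmp::TmpVar:

```julia
using Distributed
nprocs() == 1 && addprocs(2)   # start two local workers if none exist yet

# Hypothetical stand-in for the pre-allocated temporaries: one buffer per process.
@everywhere const REMOTE_TMP = zeros(3)

@everywhere zero_tmp() = fill!(REMOTE_TMP, 0.0)   # reset this process's buffer
@everywhere remote_f() = sum(REMOTE_TMP)          # read this process's buffer

# `f` is a named top-level function, so it is sent by reference and each worker
# runs its own definition against its own REMOTE_TMP.
call_remote_fun(f::Function) = foreach(p -> remotecall_wait(f, p), workers())

mapreduce_remotevars(f::Function, R::Function) =
    mapreduce(p -> remotecall_fetch(f, p), R, workers())

# Usage: pretend each worker accumulated something, sum it, then reset.
@everywhere fill!(REMOTE_TMP, 1.0)
total = mapreduce_remotevars(remote_f, +)   # 3.0 per worker
call_remote_fun(zero_tmp)
after = mapreduce_remotevars(remote_f, +)   # 0.0 after the reset
```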