`@distributed` fails on loops over iterators
Distributed for loops over iterators fail, seemingly because @distributed expects the iterator to implement getindex. It makes sense that @distributed needs getindex, but I don't see that requirement in the docs. The possible solutions seem to be:
- Update the docs for `@distributed` to mention this requirement.
- Update the implementation of `@distributed` to work for iterators.
We could do the first until the second is finished.
Here's an example:
using Distributed

@distributed (*) for (i, j) in Base.Iterators.product([1, 2], [3, 4])
    i + j
end
ERROR: LoadError: MethodError: no method matching getindex(::Base.Iterators.ProductIterator{Tuple{Array{Int64,1},Array{Int64,1}}}, ::Int64)
(::getfield(Main, Symbol("##7#8")))(::typeof(+), ::Base.Iterators.ProductIterator{Tuple{Array{Int64,1},Array{Int64,1}}}, ::Int64, ::Int64) at /home/dan/julia/usr/share/julia/stdlib/v1.0/Distributed/src/macros.jl:275
(::getfield(Distributed, Symbol("##143#144")){getfield(Main, Symbol("##7#8")),Tuple{typeof(+),Base.Iterators.ProductIterator{Tuple{Array{Int64,1},Array{Int64,1}}},Int64,Int64},Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}})() at /home/dan/julia/usr/share/julia/stdlib/v1.0/Distributed/src/remotecall.jl:339
run_work_thunk(::getfield(Distributed, Symbol("##143#144")){getfield(Main, Symbol("##7#8")),Tuple{typeof(+),Base.Iterators.ProductIterator{Tuple{Array{Int64,1},Array{Int64,1}}},Int64,Int64},Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}}, ::Bool) at /home/dan/julia/usr/share/julia/stdlib/v1.0/Distributed/src/process_messages.jl:56
#remotecall_fetch#148(::Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}, ::Function, ::Function, ::Distributed.LocalProcess, ::Function, ::Vararg{Any,N} where N) at /home/dan/julia/usr/share/julia/stdlib/v1.0/Distributed/src/remotecall.jl:364
remotecall_fetch(::Function, ::Distributed.LocalProcess, ::Function, ::Vararg{Any,N} where N) at /home/dan/julia/usr/share/julia/stdlib/v1.0/Distributed/src/remotecall.jl:364
#remotecall_fetch#152(::Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}, ::Function, ::Function, ::Int64, ::Function, ::Vararg{Any,N} where N) at /home/dan/julia/usr/share/julia/stdlib/v1.0/Distributed/src/remotecall.jl:406
remotecall_fetch at /home/dan/julia/usr/share/julia/stdlib/v1.0/Distributed/src/remotecall.jl:406 [inlined]
(::getfield(Distributed, Symbol("##167#168")){typeof(+),getfield(Main, Symbol("##7#8")),Base.Iterators.ProductIterator{Tuple{Array{Int64,1},Array{Int64,1}}},Array{UnitRange{Int64},1},Int64,Int64})() at /home/dan/julia/usr/share/julia/stdlib/v1.0/Distributed/src/macros.jl:259
Stacktrace:
[1] try_yieldto(::typeof(Base.ensure_rescheduled), ::Base.RefValue{Task}) at ./event.jl:196
[2] wait() at ./event.jl:255
[3] wait(::Condition) at ./event.jl:46
[4] wait(::Task) at ./task.jl:188
[5] fetch at ./task.jl:202 [inlined]
[6] iterate at ./generator.jl:47 [inlined]
[7] collect(::Base.Generator{Array{Task,1},typeof(fetch)}) at ./array.jl:619
[8] preduce(::Function, ::Function, ::Base.Iterators.ProductIterator{Tuple{Array{Int64,1},Array{Int64,1}}}) at /home/dan/julia/usr/share/julia/stdlib/v1.0/Distributed/src/macros.jl:263
[9] top-level scope at /home/dan/julia/usr/share/julia/stdlib/v1.0/Distributed/src/macros.jl:274
[10] include at ./boot.jl:317 [inlined]
[11] include_relative(::Module, ::String) at ./loading.jl:1044
[12] include(::Module, ::String) at ./sysimg.jl:29
[13] include(::String) at ./client.jl:392
[14] top-level scope at none:0
in expression starting at /home/dan/julia/test.jl:3
This is surprising, since the following non-parallel reduction works:
julia> reduce((*), [i + j for (i, j) = Base.Iterators.product([1, 2], [3, 4])])
600
Adding a collect to the distributed version fixes it:
@distributed (*) for (i, j) in collect(Base.Iterators.product([1, 2], [3, 4]))
    i + j
end
Output: 600
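If materializing the whole product with collect is too expensive, one possible alternative is to loop over a plain integer range, which is indexable, and decode each linear index into a pair of positions. This is only a sketch: the function name `product_reduce` is made up for illustration, and it assumes both inputs are ordinary 1-based vectors.

```julia
using Distributed

# Hypothetical helper: reduce over all (i, j) pairs without collecting them.
function product_reduce(a, b)
    # Lazy, indexable description of the index space; no array of pairs is allocated.
    inds = CartesianIndices((length(a), length(b)))
    @distributed (*) for k in 1:length(inds)
        idx = inds[k]              # decode linear index k into a pair of positions
        a[idx[1]] + b[idx[2]]
    end
end

product_reduce([1, 2], [3, 4])     # same reduction as the collect version: 600
```

The range `1:length(inds)` supports getindex, so `@distributed` can split it into chunks, while the pairs themselves are computed on demand.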
Julia Version 1.0.2
Commit d789231e99* (2018-11-08 20:11 UTC)
Platform Info:
OS: Linux (x86_64-linux-gnu)
CPU: Intel(R) Core(TM) i7-6820HQ CPU @ 2.70GHz
WORD_SIZE: 64
LIBM: libopenlibm
LLVM: libLLVM-6.0.0 (ORCJIT, skylake)
Environment:
JULIA_REVISE_INCLUDE = 1
JULIA_DEBUG = all
Can anyone tell me whether the "@distributed" macro works on iterators now? I have a very large set of combinations (more than 900,000,000 elements), and calling collect() on it would exhaust all memory.
Yes, @distributed still requires indexing. It would indeed be good to document this requirement.
Note that distributing such a large array to your workers may swamp the computation time with communication overhead. You may want to consider "peeling off" one of your combinatorial elements, distributing that amongst your workers, and then having each worker compute the combinations of its share of that element with the remainder.
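A rough sketch of the "peeling off" idea, reusing the small example from the original report: the outer collection is an indexable vector that `@distributed` can split, and each worker loops over the remainder itself. The function name `peeled_reduce` and the inputs are illustrative assumptions, not anything from the Distributed API.

```julia
using Distributed

# Hypothetical helper: distribute only the outer element `xs`; every worker
# combines its share of `xs` with all of `ys` locally.
function peeled_reduce(xs, ys)
    @distributed (*) for i in xs
        partial = 1
        for j in ys                # inner combinations are never materialized
            partial *= i + j
        end
        partial                    # per-element result handed to the (*) reducer
    end
end

peeled_reduce([1, 2], [3, 4])      # (4 * 5) * (5 * 6) = 600
```

Only `xs` and `ys` are sent to the workers, not the full set of combinations, which should keep communication small compared with the computation.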
Thanks, mbauman. I will try your suggestions.
Hello Matt,
If I understand correctly, is this indexing requirement the reason eachrow and eachcol do not work with @distributed? In 1.2, Julia reports a syntax error saying 'invalid assignment location'.
Yes, that's correct. Just as a breadcrumb, with https://github.com/JuliaLang/julia/pull/32310 we could possibly add indexing support.
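In the meantime, a workaround along these lines might do: loop over the row indices, which form an indexable range, and take a view of each row inside the body. This is a sketch only; `sum_of_row_maxima` is a made-up example function, and note that the whole matrix is still sent to every worker.

```julia
using Distributed

# Hypothetical example: reduce over rows by index instead of using eachrow.
function sum_of_row_maxima(A)
    @distributed (+) for r in 1:size(A, 1)
        maximum(view(A, r, :))     # view avoids copying the row
    end
end

A = [1 2; 3 4]
sum_of_row_maxima(A)               # 2 + 4 = 6
```

The same pattern with `1:size(A, 2)` and `view(A, :, c)` would stand in for eachcol.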