Distributed.jl

`@distributed` fails on loops over iterators

Open dangirsh opened this issue 7 years ago • 5 comments

Distributed for loops over iterators fail, seemingly because @distributed expects the iterator to implement getindex. It makes sense that @distributed needs getindex, but I don't see that requirement in the docs. The possible solutions seem to be:

  1. Update the docs for @distributed to mention this requirement.
  2. Update the implementation of @distributed to work for iterators.

We could do (1) until (2) is finished.

Here's an example:

@distributed (*) for (i, j) in Base.Iterators.product([1, 2], [3, 4])
    i + j
end
ERROR: LoadError: MethodError: no method matching getindex(::Base.Iterators.ProductIterator{Tuple{Array{Int64,1},Array{Int64,1}}}, ::Int64)
(::getfield(Main, Symbol("##7#8")))(::typeof(+), ::Base.Iterators.ProductIterator{Tuple{Array{Int64,1},Array{Int64,1}}}, ::Int64, ::Int64) at /home/dan/julia/usr/share/julia/stdlib/v1.0/Distributed/src/macros.jl:275
(::getfield(Distributed, Symbol("##143#144")){getfield(Main, Symbol("##7#8")),Tuple{typeof(+),Base.Iterators.ProductIterator{Tuple{Array{Int64,1},Array{Int64,1}}},Int64,Int64},Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}})() at /home/dan/julia/usr/share/julia/stdlib/v1.0/Distributed/src/remotecall.jl:339
run_work_thunk(::getfield(Distributed, Symbol("##143#144")){getfield(Main, Symbol("##7#8")),Tuple{typeof(+),Base.Iterators.ProductIterator{Tuple{Array{Int64,1},Array{Int64,1}}},Int64,Int64},Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}}, ::Bool) at /home/dan/julia/usr/share/julia/stdlib/v1.0/Distributed/src/process_messages.jl:56
#remotecall_fetch#148(::Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}, ::Function, ::Function, ::Distributed.LocalProcess, ::Function, ::Vararg{Any,N} where N) at /home/dan/julia/usr/share/julia/stdlib/v1.0/Distributed/src/remotecall.jl:364
remotecall_fetch(::Function, ::Distributed.LocalProcess, ::Function, ::Vararg{Any,N} where N) at /home/dan/julia/usr/share/julia/stdlib/v1.0/Distributed/src/remotecall.jl:364
#remotecall_fetch#152(::Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}, ::Function, ::Function, ::Int64, ::Function, ::Vararg{Any,N} where N) at /home/dan/julia/usr/share/julia/stdlib/v1.0/Distributed/src/remotecall.jl:406
remotecall_fetch at /home/dan/julia/usr/share/julia/stdlib/v1.0/Distributed/src/remotecall.jl:406 [inlined]
(::getfield(Distributed, Symbol("##167#168")){typeof(+),getfield(Main, Symbol("##7#8")),Base.Iterators.ProductIterator{Tuple{Array{Int64,1},Array{Int64,1}}},Array{UnitRange{Int64},1},Int64,Int64})() at /home/dan/julia/usr/share/julia/stdlib/v1.0/Distributed/src/macros.jl:259
Stacktrace:
 [1] try_yieldto(::typeof(Base.ensure_rescheduled), ::Base.RefValue{Task}) at ./event.jl:196
 [2] wait() at ./event.jl:255
 [3] wait(::Condition) at ./event.jl:46
 [4] wait(::Task) at ./task.jl:188
 [5] fetch at ./task.jl:202 [inlined]
 [6] iterate at ./generator.jl:47 [inlined]
 [7] collect(::Base.Generator{Array{Task,1},typeof(fetch)}) at ./array.jl:619
 [8] preduce(::Function, ::Function, ::Base.Iterators.ProductIterator{Tuple{Array{Int64,1},Array{Int64,1}}}) at /home/dan/julia/usr/share/julia/stdlib/v1.0/Distributed/src/macros.jl:263
 [9] top-level scope at /home/dan/julia/usr/share/julia/stdlib/v1.0/Distributed/src/macros.jl:274
 [10] include at ./boot.jl:317 [inlined]
 [11] include_relative(::Module, ::String) at ./loading.jl:1044
 [12] include(::Module, ::String) at ./sysimg.jl:29
 [13] include(::String) at ./client.jl:392
 [14] top-level scope at none:0
in expression starting at /home/dan/julia/test.jl:3

This is surprising, since the following non-parallel reduction works:

julia> reduce((*), [i + j for (i, j) = Base.Iterators.product([1, 2], [3, 4])])  
600
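
The difference may come down to which interface each code path needs: the comprehension only iterates, while the error above points at getindex. A minimal sketch of how one might check this, assuming only Base functions (the variable names `it`, `has_index_lazy`, and `has_index_array` are illustrative, not from the issue):

```julia
# `it` is the same lazy iterator used in the failing example.
it = Base.Iterators.product([1, 2], [3, 4])

# Iteration works, which is all a comprehension or reduce needs.
first_pair = first(it)

# Integer indexing is not defined for the lazy iterator...
has_index_lazy = applicable(getindex, it, 1)

# ...but it is defined for the collected Array.
has_index_array = applicable(getindex, collect(it), 1)

println((first_pair, has_index_lazy, has_index_array))
```

If `has_index_lazy` is false and `has_index_array` is true, that matches the MethodError in the trace.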

Adding a collect to the distributed version fixes it:

@distributed (*) for (i, j) in collect(Base.Iterators.product([1, 2], [3, 4]))
    i + j
end

Output: 600
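
When collect is too expensive, one possible alternative is to loop over a plain index range and decode each index lazily. The sketch below assumes the inputs are indexable vectors and uses CartesianIndices, which supports integer indexing without materializing the pairs. The function name `product_of_sums` is hypothetical.

```julia
using Distributed

# Hypothetical helper: same reduction as above, without collect.
function product_of_sums(xs, ys)
    # Lazy, indexable description of all (i, j) index pairs.
    idx = CartesianIndices((length(xs), length(ys)))
    # The loop runs over a UnitRange, which @distributed can index and split.
    @distributed (*) for k in 1:length(idx)
        c = idx[k]              # decode the linear index into a pair of positions
        xs[c[1]] + ys[c[2]]     # value handed to the reducer
    end
end

result = product_of_sums([1, 2], [3, 4])
println(result)  # 600, the same value as the collect version
```

Note that `xs` and `ys` are still captured by the loop body and sent to each worker, so this only avoids materializing the product, not shipping the inputs.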

Julia Version 1.0.2
Commit d789231e99* (2018-11-08 20:11 UTC)
Platform Info:
  OS: Linux (x86_64-linux-gnu)
  CPU: Intel(R) Core(TM) i7-6820HQ CPU @ 2.70GHz
  WORD_SIZE: 64
  LIBM: libopenlibm
  LLVM: libLLVM-6.0.0 (ORCJIT, skylake)
Environment:
  JULIA_REVISE_INCLUDE = 1
  JULIA_DEBUG = all

dangirsh, Dec 11 '18 00:12

Can anyone tell me whether the @distributed macro works on iterators now? I have a very large data array (more than 900,000,000 elements after combinations), and calling collect() on it will exhaust all memory.

sunnsi, Feb 19 '19 06:02

Yes, @distributed still requires indexing — it'd indeed be good to document this requirement.

Note that distributing such a large array to your workers may swamp the computation time with communication overhead. You may want to consider "peeling off" one of your combinatorial elements, distributing that amongst your workers, and then having each of your workers compute the combinations of that one element with the remainder.
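
As a rough sketch of the "peeling off" idea, applied to the two-vector product from the original report rather than to real combinations: distribute the loop over one indexable factor and let each worker iterate the other factor locally. The function name `peeled_product` is hypothetical.

```julia
using Distributed

# Hypothetical helper: distribute only the outer factor `xs`.
function peeled_product(xs, ys)
    # `xs` is a Vector, so it satisfies the indexing requirement.
    @distributed (*) for i in xs
        # Each worker walks `ys` lazily; no product array is built.
        prod(i + j for j in ys)
    end
end

peeled = peeled_product([1, 2], [3, 4])
println(peeled)  # 600
```

Only the outer factor is split across workers, so the number of distributed chunks is bounded by its length.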

mbauman, Feb 20 '19 19:02

Thanks, mbauman. I will try your suggestions.

sunnsi, Feb 21 '19 08:02

Hello Matt,

If I understand correctly, this indexing requirement is the reason eachrow and eachcol don't work with @distributed? Julia 1.2 reports a syntax error saying 'invalid assignment location'.

bocc, Dec 16 '19 17:12

Yes, that's correct. Just as a breadcrumb, with https://github.com/JuliaLang/julia/pull/32310 we could possibly add indexing support.
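
In the meantime, a workaround along these lines might be to loop over row indices and take a view inside the body. This is only a sketch; the function name `sum_row_sq` and the per-row computation are made up for illustration.

```julia
using Distributed

# Hypothetical helper: per-row reduction without eachrow.
function sum_row_sq(A)
    # 1:size(A, 1) is an indexable range, so @distributed can split it.
    @distributed (+) for r in 1:size(A, 1)
        row = view(A, r, :)   # lazy view of row r, no copy
        sum(abs2, row)        # sum of squares of this row
    end
end

total = sum_row_sq([1 2; 3 4])
println(total)  # 30
```

The same pattern with `1:size(A, 2)` and `view(A, :, c)` would cover the eachcol case.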

mbauman, Dec 16 '19 18:12