DistributedArrays.jl

Allowing a DArray to do as standard Array does

Open jwscook opened this issue 6 years ago • 4 comments

Dear All,

I'm considering the following MWE, which creates a matrix and a vector of a struct and does a calculation on them. I import the relevant mathematical operators from Base, and it just works. Next I call addprocs and try it with DArray, and the same calculation fails. Is there a way to enable this kind of functionality? It's definitely possible that I'm barking up the wrong tree and there's a much better way of achieving this, so please let me know!

using Distributed

import Base.+, Base.*, Base.zero

let
  struct Foo{T<:Number}
    x::T
  end
  Base.:+(a::Foo, b::Foo) = Foo(a.x + b.x)
  Base.:*(a::Foo, b::Foo) = Foo(a.x * b.x)
  Base.:zero(a::Foo{T}) where {T} = Foo(zero(T))
  function runlocal()
    A = [Foo(rand()) for i in 1:3, j in 1:3]
    b = [Foo(rand()) for j in 1:3]
    @show A * b
  end
  runlocal()
end

addprocs(2)
@everywhere using DistributedArrays
@everywhere let
  struct Foo{T<:Number}
    x::T
  end
  Base.:+(a::Foo, b::Foo) = Foo(a.x + b.x)
  Base.:*(a::Foo, b::Foo) = Foo(a.x * b.x)
  Base.:zero(a::Foo{T}) where {T} = Foo(zero(T))
  function rundistributed()
    A = @DArray [Foo(rand()) for i in 1:3, j in 1:3]
    b = @DArray [Foo(rand()) for j in 1:3]
    @show A * b
  end
  rundistributed()
end

Thanks, James

Output:

ERROR: On worker 2:
MethodError: Cannot `convert` an object of type Int64 to an object of type Foo{Float64}
Closest candidates are:
  convert(::Type{T}, ::T) where T at essentials.jl:154
  Foo{Float64}(::Any) where T<:Number at REPL[6]:3 (method too new to be called from this world context.)
fill! at ./array.jl:283
#253 at /home/cookj/.julia/packages/DistributedArrays/f3HAT/src/linalg.jl:111
run_work_thunk at /home/cookj/builds/julia/usr/share/julia/stdlib/v1.0/Distributed/src/process_messages.jl:56
#remotecall_fetch#148 at /home/cookj/builds/julia/usr/share/julia/stdlib/v1.0/Distributed/src/remotecall.jl:364
remotecall_fetch at /home/cookj/builds/julia/usr/share/julia/stdlib/v1.0/Distributed/src/remotecall.jl:364
#remotecall_fetch#152 at /home/cookj/builds/julia/usr/share/julia/stdlib/v1.0/Distributed/src/remotecall.jl:406
remotecall_fetch at /home/cookj/builds/julia/usr/share/julia/stdlib/v1.0/Distributed/src/remotecall.jl:406 [inlined]
#252 at /home/cookj/.julia/packages/DistributedArrays/f3HAT/src/linalg.jl:107 [inlined]
#672 at ./asyncmap.jl:100
macro expansion at ./asyncmap.jl:235 [inlined]
#688 at ./task.jl:259
#remotecall_wait#154(::Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}, ::Function, ::Function, ::Distributed.Worker, ::Module, ::Vararg{Any,N} where N) at /home/cookj/builds/julia/usr/share/julia/stdlib/v1.0/Distributed/src/remotecall.jl:421
remotecall_wait(::Function, ::Distributed.Worker, ::Module, ::Vararg{Any,N} where N) at /home/cookj/builds/julia/usr/share/julia/stdlib/v1.0/Distributed/src/remotecall.jl:412
#remotecall_wait#157(::Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}, ::Function, ::Function, ::Int64, ::Module, ::Vararg{Any,N} where N) at /home/cookj/builds/julia/usr/share/julia/stdlib/v1.0/Distributed/src/remotecall.jl:433
remotecall_wait(::Function, ::Int64, ::Module, ::Vararg{Any,N} where N) at /home/cookj/builds/julia/usr/share/julia/stdlib/v1.0/Distributed/src/remotecall.jl:433
(::getfield(Distributed, Symbol("##163#165")){Module,Expr})() at ./task.jl:259

...and 2 more exception(s).

Stacktrace:
 [1] sync_end(::Array{Any,1}) at ./task.jl:226
 [2] macro expansion at ./task.jl:245 [inlined]
 [3] remotecall_eval(::Module, ::Array{Int64,1}, ::Expr) at /home/cookj/builds/julia/usr/share/julia/stdlib/v1.0/Distributed/src/macros.jl:206
 [4] top-level scope at /home/cookj/builds/julia/usr/share/julia/stdlib/v1.0/Distributed/src/macros.jl:190

jwscook, May 03 '19 12:05

What you are seeing is an unfortunate interaction between how distributed Julia is implemented and how struct definitions work.

Note the:

Foo{Float64}(::Any) where T<:Number at REPL[6]:3 (method too new to be called from this world context.)

Lift the call to the rundistributed function into its own @everywhere block.

Generally I try to avoid @everywhere as much as possible, but that requires method and struct definitions to live in their own module/file that is loaded on all workers at the beginning of the program.

vchuravy, May 03 '19 12:05

Lift the call to the rundistributed function into its own @everywhere block.

Sorry, can you explain exactly what you mean, please? I put rundistributed in its own @everywhere, but it failed again with the same error (see below). Also, is there a resource on best practices? Thanks!

using Distributed
import Base.+, Base.*, Base.zero
addprocs(2)
@everywhere using DistributedArrays
@everywhere begin

  struct Foo{T<:Number}
    x::T
  end
  Base.:+(a::Foo, b::Foo) = Foo(a.x + b.x)
  Base.:*(a::Foo, b::Foo) = Foo(a.x * b.x)
  Base.:zero(a::Foo{T}) where {T} = Foo(zero(T))
  function rundistributed()
    A = @DArray [Foo(rand()) for i in 1:3, j in 1:3]
    b = @DArray [Foo(rand()) for j in 1:3]
    @show A * b
  end
end
@everywhere  rundistributed()

[edit - I was so distracted making this reply - see amended quote]

jwscook, May 03 '19 18:05

For the record

Module.jl contains

struct Foo{T<:Number}
  x::T
end

Base.:+(a::Foo, b::Foo) = Foo(a.x + b.x)
Base.:*(a::Foo, b::Foo) = Foo(a.x * b.x)
Base.:zero(a::Foo{T}) where {T} = Foo(zero(T))

function run()
  A = @DArray [Foo(rand()) for i in 1:3, j in 1:3]
  b = @DArray [Foo(rand()) for j in 1:3]
  @show A * b
end

and Main.jl contains

using Distributed
addprocs(2)
@everywhere using DistributedArrays
@everywhere include("Module.jl")
@time run()

Results in:

$ julia -p 2 Main.jl
ERROR: LoadError: On worker 2:
MethodError: Cannot `convert` an object of type Int64 to an object of type Foo{Float64}
Closest candidates are:
  convert(::Type{T}, !Matched::T) where T at essentials.jl:154
  Foo{Float64}(::Any) where T<:Number at /Users/james/Documents/code/julia/parallel/distributedarrays/Module.jl:3
fill! at ./array.jl:283
#237 at /Users/james/.julia/packages/DistributedArrays/XV7NS/src/linalg.jl:116
#112 at /Users/james/Documents/code/builds/julia1.0/usr/share/julia/stdlib/v1.0/Distributed/src/process_messages.jl:269
run_work_thunk at /Users/james/Documents/code/builds/julia1.0/usr/share/julia/stdlib/v1.0/Distributed/src/process_messages.jl:56
macro expansion at /Users/james/Documents/code/builds/julia1.0/usr/share/julia/stdlib/v1.0/Distributed/src/process_messages.jl:269 [inlined]
#111 at ./task.jl:259
Stacktrace:
 [1] (::getfield(Base, Symbol("##682#684")))(::Task) at ./asyncmap.jl:178
 [2] foreach(::getfield(Base, Symbol("##682#684")), ::Array{Any,1}) at ./abstractarray.jl:1835
 [3] maptwice(::Function, ::Channel{Any}, ::Array{Any,1}, ::Array{Int64,1}) at ./asyncmap.jl:178
 [4] wrap_n_exec_twice at ./asyncmap.jl:154 [inlined]
 [5] #async_usemap#667(::Int64, ::Nothing, ::Function, ::getfield(DistributedArrays, Symbol("##236#241")){DArray{Foo{Float64},1,Array{Foo{Float64},1}},Int64}, ::Array{Int64,1}) at ./asyncmap.jl:103
 [6] #async_usemap at ./sysimg.jl:0 [inlined]
 [7] #asyncmap#666 at ./asyncmap.jl:81 [inlined]
 [8] asyncmap at ./asyncmap.jl:81 [inlined]
 [9] mul!(::DArray{Foo{Float64},1,Array{Foo{Float64},1}}, ::DArray{Foo{Float64},2,Array{Foo{Float64},2}}, ::DArray{Foo{Float64},1,Array{Foo{Float64},1}}, ::Int64, ::Int64) at /Users/james/.julia/packages/DistributedArrays/XV7NS/src/linalg.jl:111
 [10] mul!(::DArray{Foo{Float64},1,Array{Foo{Float64},1}}, ::DArray{Foo{Float64},2,Array{Foo{Float64},2}}, ::DArray{Foo{Float64},1,Array{Foo{Float64},1}}) at /Users/james/.julia/packages/DistributedArrays/XV7NS/src/linalg.jl:91
 [11] *(::DArray{Foo{Float64},2,Array{Foo{Float64},2}}, ::DArray{Foo{Float64},1,Array{Foo{Float64},1}}) at /Users/james/.julia/packages/DistributedArrays/XV7NS/src/linalg.jl:276
 [12] macro expansion at ./show.jl:555 [inlined]
 [13] run() at /Users/james/Documents/code/julia/parallel/distributedarrays/Module.jl:13
 [14] top-level scope at util.jl:156
 [15] include at ./boot.jl:317 [inlined]
 [16] include_relative(::Module, ::String) at ./loading.jl:1044
 [17] include(::Module, ::String) at ./sysimg.jl:29
 [18] exec_options(::Base.JLOptions) at ./client.jl:266
 [19] _start() at ./client.jl:425
in expression starting at /Users/james/Documents/code/julia/parallel/distributedarrays/Main.jl:8

jwscook, May 09 '19 18:05
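Note that this second trace no longer mentions a method that is too new. It fails inside fill!, which converts the fill value to the array's element type. The following is a minimal sketch that reproduces that conversion failure locally, without any workers, and then tries one possible workaround. The convert method is hypothetical: it does not appear in the thread and is not a confirmed fix for DistributedArrays. The Int64 in the error message suggests that the library fills the result with an integer zero, but that is an assumption read off the trace.

```julia
struct Foo{T<:Number}
    x::T
end

v = [Foo(rand()) for _ in 1:3]

# fill! converts its value to eltype(v). There is no convert method from
# Int to Foo{Float64}, so this is expected to throw a MethodError.
failed = try
    fill!(copy(v), 0)
    false
catch err
    err isa MethodError
end

# Hypothetical workaround (an assumption, not taken from the thread):
# tell Julia how to turn a plain number into a Foo.
Base.convert(::Type{Foo{T}}, x::Number) where {T<:Number} = Foo{T}(convert(T, x))

# With the convert method defined, the same fill! call goes through.
filled = fill!(copy(v), 0)
```

If this approach is used with workers, the convert method would need to be defined on every process, alongside the struct itself.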

I am currently low on bandwidth. If you have the time and inclination to look into this yourself, here is my hunch.

It seems to me that the method we want to call is too new. So the question is: why? handle_msg is being called from a task (tasks freeze their world age). So maybe calling msg.f in https://github.com/JuliaLang/julia/blob/829a4a665a749efd109ac5dd857b979ec7dd944d/stdlib/Distributed/src/process_messages.jl#L292 with invokelatest might sidestep this issue.

vchuravy, May 10 '19 02:05
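The world-age behavior behind this hunch can be seen without any workers. The following is a minimal sketch; the names answer and demo are hypothetical and do not come from Distributed or DistributedArrays. A method defined while a function is already running is too new to be called directly from that function, whereas Base.invokelatest dispatches in the latest world.

```julia
# Declare a generic function with no methods yet (hypothetical name).
function answer end

function demo()
    # Add a method while demo is already running; it lives in a newer world
    # than the one demo was invoked in.
    @eval answer() = 42

    # A direct call still dispatches in demo's original world.
    direct = try
        answer()
        :ok
    catch err
        err isa MethodError ? :too_new : rethrow()
    end

    # invokelatest looks the method up in the latest world instead.
    latest = Base.invokelatest(answer)
    return direct, latest
end

result = demo()
```

On the first call, the direct call should fail and only the invokelatest call should see the new method. Whether wrapping msg.f this way actually resolves the DArray error is exactly the open question in this thread.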