
bad scaling in `map`

Open · JeffBezanson opened this issue 5 years ago • 8 comments

julia> using Distributed

julia> addprocs(2);

julia> @everywhere using DistributedArrays

julia> a = fill(1000,10);

julia> da = distribute(a);

julia> @time map(x->rand(x,x)^2, a);
  0.903241 seconds (63.61 k allocations: 155.698 MiB, 29.39% gc time)

julia> @time map(x->rand(x,x)^2, da);
  0.967328 seconds (776.84 k allocations: 38.713 MiB)

(first time compilation omitted)

Even though this is embarrassingly parallel, the distributed version is consistently about the same speed or slower. I tried this in Julia 0.3 and the distributed time is around 0.5 seconds, close to the expected ~2x speedup.
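
For comparison, a pmap sketch over the same data (not timed here) should show roughly the expected speedup, since all of the work runs on the workers:

# sketch: the same workload via pmap instead of a DArray map
@time pmap(x->rand(x,x)^2, a);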

JeffBezanson avatar Oct 25 '18 03:10 JeffBezanson

It looks like it actually scales

julia> @time map(x->sleep(0.005), a);
  0.098876 seconds (58.11 k allocations: 2.854 MiB)

julia> @time map(x->sleep(0.005), da);
  0.282395 seconds (689.12 k allocations: 35.036 MiB, 3.20% gc time)

julia> @time map(x->sleep(0.5), a);
  5.066354 seconds (58.11 k allocations: 2.854 MiB)

julia> @time map(x->sleep(0.5), da);
  2.763649 seconds (688.98 k allocations: 34.847 MiB, 0.15% gc time)

but that the latency is huge. I might try to bisect this later.

andreasnoack avatar Oct 25 '18 08:10 andreasnoack

Yes, latency has been increasing, and I haven't figured out where it is coming from...

vchuravy avatar Nov 01 '18 12:11 vchuravy

I think this is related to matrix multiplication using multi-threaded BLAS on the master but single-threaded BLAS on the workers.

julia> using Distributed

julia> addprocs(4)
4-element Array{Int64,1}:
 2
 3
 4
 5

julia> @everywhere map(x->rand(x,x)^2, fill(1000,10));

julia> @time map(x->rand(x,x)^2, fill(1000,10));
  0.308907 seconds (97.94 k allocations: 157.404 MiB, 18.32% gc time)

julia> @sync begin
           for p in workers()
               @spawnat p @time map(x->rand(x,x)^2, fill(1000,10))
           end
       end
      From worker 3:	  0.709352 seconds (43 allocations: 152.590 MiB, 18.87% gc time)
      From worker 5:	  0.699126 seconds (43 allocations: 152.590 MiB, 19.28% gc time)
      From worker 4:	  0.721240 seconds (43 allocations: 152.590 MiB, 18.96% gc time)
      From worker 2:	  0.704395 seconds (43 allocations: 152.590 MiB, 18.28% gc time)

Julia 0.3 used to set the BLAS thread count to 1 on the master too when multi was invoked - https://github.com/JuliaLang/julia/blob/v0.3.0/base/multi.jl#L1233. Somewhere along the way this was changed so that the master process keeps the default BLAS settings.
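
For reference, a minimal sketch of restoring the old behaviour by hand - pinning BLAS to a single thread on every process, master included, before benchmarking (assumes LinearAlgebra is loaded everywhere):

using Distributed, LinearAlgebra
addprocs(4)
@everywhere using LinearAlgebra
# one BLAS thread per process, as 0.3 used to enforce
@everywhere LinearAlgebra.BLAS.set_num_threads(1)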

amitmurthy avatar Nov 24 '18 17:11 amitmurthy

Setting the number of BLAS threads to 1 on the master shows the distributed map scaling reasonably well for larger workloads.

julia> using Distributed

julia> addprocs(4);

julia> @everywhere using DistributedArrays

julia> using LinearAlgebra

julia> LinearAlgebra.BLAS.set_num_threads(1)

julia> a = fill(1000,100);

julia> da = distribute(a);

julia> @time map(x->rand(x,x)^2, a);
  6.314533 seconds (61.50 k allocations: 1.493 GiB, 8.63% gc time)

julia> @time map(x->rand(x,x)^2, da);
  1.779780 seconds (790.24 k allocations: 39.784 MiB)

Compilation runs omitted.

amitmurthy avatar Nov 25 '18 04:11 amitmurthy

It doesn't explain my sleep example though. I just tried the same example in Julia 0.3 and got

julia> @time map(x->(sleep(0.005);x), a);
elapsed time: 0.071501629 seconds (5472 bytes allocated)

julia> @time map(x->(sleep(0.005);x), da);
elapsed time: 0.035031453 seconds (24552 bytes allocated)

so the latency is now much higher. In Julia 0.6, I'm getting

julia> @time map(x->(sleep(0.005);x), a);
  0.085523 seconds (9.70 k allocations: 530.142 KiB)

julia> @time map(x->(sleep(0.005);x), da);
  0.117121 seconds (66.14 k allocations: 3.522 MiB)

Initially, I thought this might just be overhead from broadcast, since map now calls broadcast to simplify the code. Back in Julia 0.3 we didn't have broadcast, so map was a much simpler function. However, that doesn't seem to be the actual issue here. Switching back to 1.0.2, I'm getting

julia> function mymap(f, da::DArray)
         DArray(size(da), procs(da)) do I
           map(f, Array(da[I...]))
         end
       end
mymap (generic function with 1 method)

julia> @time mymap(x->(sleep(0.005);x), da);
  0.173570 seconds (434.61 k allocations: 21.887 MiB, 2.71% gc time)

andreasnoack avatar Nov 25 '18 09:11 andreasnoack

Timing map on a DArray from within a function is vastly faster than timing it at top level, though this is not the behavior for a local map.

julia> using Distributed

julia> addprocs(4);

julia> @everywhere using DistributedArrays

julia> a = fill(1000,10);

julia> da = distribute(a);

julia> function f()
           @time map(x->(sleep(0.005);1), a);
           @time map(x->(sleep(0.005);1), da);
       end
f (generic function with 1 method)

julia> f();  # Precompile
  0.123318 seconds (158.19 k allocations: 8.089 MiB)
  2.017093 seconds (3.42 M allocations: 172.407 MiB, 4.86% gc time)

julia> f();
  0.074737 seconds (52 allocations: 2.047 KiB)
  0.028171 seconds (635 allocations: 29.703 KiB)

julia> @time map(x->(sleep(0.005);1), a); # Direct map call is similar
  0.100029 seconds (56.04 k allocations: 2.764 MiB)

julia> @time map(x->(sleep(0.005);1), da); # Direct dmap call is much slower
  0.337232 seconds (1.02 M allocations: 50.974 MiB, 3.73% gc time)

julia> @time map(x->(sleep(0.005);1), da); # Direct dmap is much slower
  0.248714 seconds (761.22 k allocations: 38.618 MiB, 4.34% gc time)

The latency is in the REPL, though I don't understand why it is so much larger only for the DArray map case - look at the number of allocations in the latter...
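
One way to take the REPL cost out of the picture (a sketch using BenchmarkTools) is to benchmark the call repeatedly, so compilation is only paid in the warm-up and not in the reported time:

using BenchmarkTools
# $da interpolates the global, and @btime's warm-up absorbs the compilation
# of the anonymous function before any sample is recorded
@btime map(x->(sleep(0.005); 1), $da);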

amitmurthy avatar Nov 25 '18 11:11 amitmurthy

Good observation. After setting the number of threads to one on the master process, the original example becomes

julia> function f()
         a  = fill(1000,10)
         da = distribute(a)
         @time map(t -> rand(t,t)^2, a)
         @time map(t -> rand(t,t)^2, da)
         return nothing
       end
f (generic function with 1 method)

julia> f()
  0.401364 seconds (42 allocations: 152.590 MiB)
  0.206807 seconds (371 allocations: 25.438 KiB)

after warm up.

So the extra latency seems to be associated with compilation.
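
To illustrate that reading (a sketch, not from the timings above): each `x->...` literal typed at the prompt is a new anonymous function type, so `map` - and, for a DArray, the closure shipped to each worker - presumably has to be re-specialized on every top-level call. Reusing one named function avoids paying that repeatedly:

g(x) = (sleep(0.005); 1)  # defined once, so its type stays fixed
@time map(g, da);         # first call compiles locally and on the workers
@time map(g, da);         # later calls reuse the compiled specializations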

andreasnoack avatar Nov 25 '18 13:11 andreasnoack

@JeffBezanson What are your thoughts here? It seems that the main difference between the 0.3 and 1.0 timings is due to more/slower compilation associated with the top-level execution of map.

andreasnoack avatar Nov 26 '18 20:11 andreasnoack