bad scaling in `map`
julia> using Distributed
julia> addprocs(2);
julia> @everywhere using DistributedArrays
julia> a = fill(1000,10);
julia> da = distribute(a);
julia> @time map(x->rand(x,x)^2, a);
0.903241 seconds (63.61 k allocations: 155.698 MiB, 29.39% gc time)
julia> @time map(x->rand(x,x)^2, da);
0.967328 seconds (776.84 k allocations: 38.713 MiB)
(first time compilation omitted)
Even though this is embarrassingly parallel, the distributed version is consistently about the same speed or slower. I tried this in Julia 0.3 and the distributed time is around 0.5 seconds, close to the expected ~2x speedup.
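For comparison, it may be worth timing the same workload through pmap, which distributes over the workers without involving DistributedArrays at all (a sketch, not timed here; pmap ships the closure to each worker, so it is only a rough baseline):
julia> using Distributed
julia> @time pmap(x->rand(x,x)^2, a);   # plain Distributed, no DArray involved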
It looks like it actually scales
julia> @time map(x->sleep(0.005), a);
0.098876 seconds (58.11 k allocations: 2.854 MiB)
julia> @time map(x->sleep(0.005), da);
0.282395 seconds (689.12 k allocations: 35.036 MiB, 3.20% gc time)
julia> @time map(x->sleep(0.5), a);
5.066354 seconds (58.11 k allocations: 2.854 MiB)
julia> @time map(x->sleep(0.5), da);
2.763649 seconds (688.98 k allocations: 34.847 MiB, 0.15% gc time)
but that the latency is huge. I might try to bisect this later.
Yes, the latency has been getting larger, and I haven't figured out where it is coming from...
I think this is related to matrix multiplication using multi-threaded BLAS on the master but single-threaded BLAS on the workers.
julia> using Distributed
julia> addprocs(4)
4-element Array{Int64,1}:
2
3
4
5
julia> @everywhere map(x->rand(x,x)^2, fill(1000,10));
julia> @time map(x->rand(x,x)^2, fill(1000,10));
0.308907 seconds (97.94 k allocations: 157.404 MiB, 18.32% gc time)
julia> @sync begin
           for p in workers()
               @spawnat p @time map(x->rand(x,x)^2, fill(1000,10))
           end
       end
From worker 3: 0.709352 seconds (43 allocations: 152.590 MiB, 18.87% gc time)
From worker 5: 0.699126 seconds (43 allocations: 152.590 MiB, 19.28% gc time)
From worker 4: 0.721240 seconds (43 allocations: 152.590 MiB, 18.96% gc time)
From worker 2: 0.704395 seconds (43 allocations: 152.590 MiB, 18.28% gc time)
Julia 0.3 used to set the number of BLAS threads to 1 on the master too when multiprocessing was in use - https://github.com/JuliaLang/julia/blob/v0.3.0/base/multi.jl#L1233 . Somewhere along the way this was changed so that the master process keeps the default BLAS settings.
Setting the number of BLAS threads to 1 on the master shows the distributed map scaling reasonably well for larger workloads.
julia> using Distributed
julia> addprocs(4);
julia> @everywhere using DistributedArrays
julia> using LinearAlgebra
julia> LinearAlgebra.BLAS.set_num_threads(1)
julia> a = fill(1000,100);
julia> da = distribute(a);
julia> @time map(x->rand(x,x)^2, a);
6.314533 seconds (61.50 k allocations: 1.493 GiB, 8.63% gc time)
julia> @time map(x->rand(x,x)^2, da);
1.779780 seconds (790.24 k allocations: 39.784 MiB)
Compilation runs omitted.
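To mimic the old 0.3 behavior on every process rather than just the master (the workers should already be single-threaded, but pinning them explicitly removes any doubt), something along these lines should work - a sketch:
julia> using Distributed, LinearAlgebra
julia> @everywhere using LinearAlgebra
julia> @everywhere LinearAlgebra.BLAS.set_num_threads(1)   # one BLAS thread on master and all workers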
It doesn't explain my sleep example though. I just tried the same example in Julia 0.3 and got
julia> @time map(x->(sleep(0.005);x), a);
elapsed time: 0.071501629 seconds (5472 bytes allocated)
julia> @time map(x->(sleep(0.005);x), da);
elapsed time: 0.035031453 seconds (24552 bytes allocated)
so the latency is now much higher. In Julia 0.6, I'm getting
julia> @time map(x->(sleep(0.005);x), a);
0.085523 seconds (9.70 k allocations: 530.142 KiB)
julia> @time map(x->(sleep(0.005);x), da);
0.117121 seconds (66.14 k allocations: 3.522 MiB)
Initially, I thought this might just be overhead from broadcast, since map now calls broadcast to simplify the code. Back in Julia 0.3 we didn't have broadcast, so map was a much simpler function. However, that doesn't seem to be the actual issue here. Now switching back to 1.0.2, I'm getting
julia> function mymap(f, da::DArray)
           DArray(size(da), procs(da)) do I
               map(f, Array(da[I...]))
           end
       end
mymap (generic function with 1 method)
julia> @time mymap(x->(sleep(0.005);x), da);
0.173570 seconds (434.61 k allocations: 21.887 MiB, 2.71% gc time)
Timing map on a DArray from within a function is vastly faster than timing it at top level, though this is not the behavior for a local map.
julia> using Distributed
julia> addprocs(4);
julia> @everywhere using DistributedArrays
julia> a = fill(1000,10);
julia> da = distribute(a);
julia> function f()
           @time map(x->(sleep(0.005);1), a);
           @time map(x->(sleep(0.005);1), da);
       end
f (generic function with 1 method)
julia> f(); # Precompile
0.123318 seconds (158.19 k allocations: 8.089 MiB)
2.017093 seconds (3.42 M allocations: 172.407 MiB, 4.86% gc time)
julia> f();
0.074737 seconds (52 allocations: 2.047 KiB)
0.028171 seconds (635 allocations: 29.703 KiB)
julia> @time map(x->(sleep(0.005);1), a); # Direct map call is similar
0.100029 seconds (56.04 k allocations: 2.764 MiB)
julia> @time map(x->(sleep(0.005);1), da); # Direct dmap call is much slower
0.337232 seconds (1.02 M allocations: 50.974 MiB, 3.73% gc time)
julia> @time map(x->(sleep(0.005);1), da); # Direct dmap is much slower
0.248714 seconds (761.22 k allocations: 38.618 MiB, 4.34% gc time)
The latency is in the REPL, though I don't understand why it is so much worse only for the DArray map case - look at the number of allocations in the latter...
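One way to take the REPL and global scope out of the measurement is BenchmarkTools, which wraps the expression in a function and lets you interpolate the globals (a sketch, assuming the BenchmarkTools package is available):
julia> using BenchmarkTools
julia> @btime map(x->(sleep(0.005);1), $a);    # local map, globals interpolated
julia> @btime map(x->(sleep(0.005);1), $da);   # DArray map under the same conditions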
Good observation. After setting the number of BLAS threads to one on the master process, the original example becomes
julia> function f()
           a = fill(1000,10)
           da = distribute(a)
           @time map(t -> rand(t,t)^2, a)
           @time map(t -> rand(t,t)^2, da)
           return nothing
       end
f (generic function with 1 method)
julia> f()
0.401364 seconds (42 allocations: 152.590 MiB)
0.206807 seconds (371 allocations: 25.438 KiB)
after warm-up.
So the extra latency seems to be associated with compilation.
@JeffBezanson What are your thoughts here? It seems that the main difference between the 0.3 and 1.0 timings is due to more/slower compilation associated with the top-level execution of map.
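If top-level compilation of the anonymous function is indeed the culprit, mapping a named function defined with @everywhere should pay that cost only once, since repeated calls reuse the same function type instead of shipping a fresh closure each time (a sketch, not timed here):
julia> @everywhere g(x) = (sleep(0.005); 1)
julia> map(g, da);          # first call pays the one-off compilation cost
julia> @time map(g, da);    # later calls should be dominated by the sleeps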