Distributed.jl
Different remote copying behaviour with one process vs multiple
using Base.Test

function main()
    @testset begin
        rng = MersenneTwister(3742)

        numbers = pmap(1:1000) do _
            rand(rng)
        end
        @test all(numbers .== numbers[1])

        spawn_numbers = [(@fetch rand(rng)) for i = 1:1000]
        @test all(spawn_numbers .== spawn_numbers[1])

        pfor_numbers = @parallel vcat for i = 1:1000
            [rand(rng)]
        end
        @test all(pfor_numbers .== pfor_numbers[1])
    end
end

main()
There are three cases here to compare the behaviour of the different parallel methods, but I really just care about the @fetch/@spawn case.
ericdavies@whitacre> julia -p 1 ~/rng_parallel_test.jl
test set: Test Failed
Expression: all(pfor_numbers .== pfor_numbers[1])
in record(::Base.Test.DefaultTestSet, ::Base.Test.Fail) at test.jl:431
in do_test(::Base.Test.Returned, ::Expr) at test.jl:281
in macro expansion at rng_parallel_test.jl:20 [inlined]
in macro expansion at test.jl:674 [inlined]
in main() at rng_parallel_test.jl:4
in include_from_node1(::String) at loading.jl:488
in include_from_node1(::String) at sys.dylib:?
in process_options(::Base.JLOptions) at client.jl:265
in _start() at client.jl:321
in _start() at sys.dylib:?
Test Summary: | Pass Fail Total
test set | 2 1 3
ERROR: LoadError: Some tests did not pass: 2 passed, 1 failed, 0 errored, 0 broken.
in finish(::Base.Test.DefaultTestSet) at test.jl:498
in macro expansion at test.jl:681 [inlined]
in main() at rng_parallel_test.jl:4
in include_from_node1(::String) at loading.jl:488
in include_from_node1(::String) at sys.dylib:?
in process_options(::Base.JLOptions) at client.jl:265
in _start() at client.jl:321
in _start() at sys.dylib:?
while loading /Users/ericdavies/rng_parallel_test.jl, in expression starting on line 24
ericdavies@whitacre> julia ~/rng_parallel_test.jl
test set: Test Failed
Expression: all(numbers .== numbers[1])
in record(::Base.Test.DefaultTestSet, ::Base.Test.Fail) at test.jl:431
in do_test(::Base.Test.Returned, ::Expr) at test.jl:281
in macro expansion at rng_parallel_test.jl:10 [inlined]
in macro expansion at test.jl:674 [inlined]
in main() at rng_parallel_test.jl:4
in include_from_node1(::String) at loading.jl:488
in include_from_node1(::String) at sys.dylib:?
in process_options(::Base.JLOptions) at client.jl:265
in _start() at client.jl:321
in _start() at sys.dylib:?
test set: Test Failed
Expression: all(spawn_numbers .== spawn_numbers[1])
in record(::Base.Test.DefaultTestSet, ::Base.Test.Fail) at test.jl:431
in do_test(::Base.Test.Returned, ::Expr) at test.jl:281
in macro expansion at rng_parallel_test.jl:14 [inlined]
in macro expansion at test.jl:674 [inlined]
in main() at rng_parallel_test.jl:4
in include_from_node1(::String) at loading.jl:488
in include_from_node1(::String) at sys.dylib:?
in process_options(::Base.JLOptions) at client.jl:265
in _start() at client.jl:321
in _start() at sys.dylib:?
test set: Test Failed
Expression: all(pfor_numbers .== pfor_numbers[1])
in record(::Base.Test.DefaultTestSet, ::Base.Test.Fail) at test.jl:431
in do_test(::Base.Test.Returned, ::Expr) at test.jl:281
in macro expansion at rng_parallel_test.jl:20 [inlined]
in macro expansion at test.jl:674 [inlined]
in main() at rng_parallel_test.jl:4
in include_from_node1(::String) at loading.jl:488
in include_from_node1(::String) at sys.dylib:?
in process_options(::Base.JLOptions) at client.jl:265
in _start() at client.jl:321
in _start() at sys.dylib:?
Test Summary: | Fail Total
test set | 3 3
ERROR: LoadError: Some tests did not pass: 0 passed, 3 failed, 0 errored, 0 broken.
in finish(::Base.Test.DefaultTestSet) at test.jl:498
in macro expansion at test.jl:681 [inlined]
in main() at rng_parallel_test.jl:4
in include_from_node1(::String) at loading.jl:488
in include_from_node1(::String) at sys.dylib:?
in process_options(::Base.JLOptions) at client.jl:265
in _start() at client.jl:321
in _start() at sys.dylib:?
while loading /Users/ericdavies/rng_parallel_test.jl, in expression starting on line 24
When there are one or more additional processes, rng is copied. When there is only one process, it is not.
The @parallel for case fails in both runs because of the use of CachingPool, which may also be used for pmap in the future via https://github.com/JuliaLang/julia/issues/21946.
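For illustration, here is a minimal sketch of how the caching behaviour can be reproduced explicitly with pmap, assuming the 0.5/0.6-era Base API in which pmap takes the worker pool as its first argument (variable names are illustrative only):

addprocs(1)   # or start julia with -p N

let rng = MersenneTwister(3742)
    # With a CachingPool the closure (and the rng it captures) is shipped to
    # each worker once and then reused, so every iteration on a given worker
    # keeps advancing that worker's cached copy rather than starting fresh.
    wp = CachingPool(workers())
    cached_numbers = pmap(wp, i -> rand(rng), 1:1000)
    all(cached_numbers .== cached_numbers[1])   # false, unlike plain pmap with -p 1
end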
Ideally I would like the behaviours to be consistent (though I don't need them to be deterministic).
This would be a tricky situation to address if the RNG is being serialized/deserialized every time.
On 0.6, global const rng = MersenneTwister(3742) would ensure consistency with/without additional workers.
The behaviour we're aiming for in our use case is actually to have the RNG copied each time, but that's outside this issue. You can replace the RNG with any local mutable for the purposes of this issue.
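A minimal sketch of one way to get that copy-each-time behaviour regardless of the worker count, under the same 0.5/0.6-era API as the script above, is to deepcopy the captured RNG inside the spawned expression:

let rng = MersenneTwister(3742)
    # rand operates on a fresh copy every call, so the captured rng is never
    # advanced and the result is identical with or without extra workers
    spawn_numbers = [(@fetch rand(deepcopy(rng))) for i = 1:1000]
    all(spawn_numbers .== spawn_numbers[1])   # true for nprocs == 1 and nprocs > 1
end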
Mutables are not synchronized across processes in any manner; there is no cluster-wide shared state for mutables referenced in closures. A common data store external to the Julia processes can address such a requirement. Alternatively, keep the data on one of the processes and have workers do an atomic update-and-fetch against that data-store process.
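A minimal sketch of that data-store pattern, assuming the 0.5/0.6-era Base API (RemoteChannel, pmap); the name rng_store is illustrative only:

addprocs(1)   # or start julia with -p N

# The single RNG lives in a size-1 RemoteChannel owned by process 1; take!
# gives a worker exclusive access and put! returns the advanced state, so all
# processes draw from one shared stream.
rng_store = RemoteChannel(() -> Channel{MersenneTwister}(1), 1)
put!(rng_store, MersenneTwister(3742))

numbers = pmap(1:1000) do _
    r = take!(rng_store)   # no other worker can hold the RNG while we do
    x = rand(r)            # advance the single shared state
    put!(rng_store, r)     # hand the updated RNG back to the store
    x
end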
But does that mean that mutating mutables captured with @spawn has undefined behaviour and might or might not mutate the same variable in other @spawn calls?
The docs currently say (emphasis mine):
Note that although parallel for loops look like serial for loops, their behavior is dramatically different. In particular, the iterations do not happen in a specified order, and writes to variables or arrays will not be globally visible since iterations run on different processes. Any variables used inside the parallel loop will be copied and broadcast to each process.
This is the only mention of the behaviour of captured mutables.
I think one of these should happen:
- Mutables are either copied for each @spawn call, even if the only worker is the host, or cached and shared among processes based on some trait (sketched after this list) such as:
  remote_move(::Type{MyType}) = Copy()
  remote_move(::Type{MyOtherType}) = Cache()
- Mutables are always copied for each @spawn call, even if the only worker is the host
- The cache-for-each-worker behaviour is used for @spawn and pmap and documented in the @spawn and pmap docstrings as well as in the Parallel Computing section of the manual
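A purely hypothetical sketch of what such a trait could look like; none of these names exist in Base, they only illustrate the first option:

immutable Copy end    # hypothetical: copy the value on every remote call
immutable Cache end   # hypothetical: serialize once per worker and reuse

remote_move(::Type) = Cache()                    # a possible default
remote_move(::Type{MersenneTwister}) = Copy()    # opt a type into copy-per-call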
For performance reasons, remote calls executing locally short-circuit the entire serialization-deserialization cycle of the closure. I get your point about wanting consistent results between nprocs == 1 and nprocs > 1 in absolutely all cases.
The easier way to ensure similar behavior in all cases would be to have remote calls to myid() go through a loopback connection, but at the cost of inefficiency in remote calls to the local process.
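For reference, a minimal sketch of the round trip that a genuinely remote call performs and that the local short-circuit skips, using the 0.5/0.6-era Base serialize/deserialize:

rng = MersenneTwister(3742)

# This is roughly what shipping the closure to another process does to the
# captured state: the receiver ends up with an independent copy.
buf = IOBuffer()
serialize(buf, rng)
seekstart(buf)
rng_copy = deserialize(buf)

rand(rng_copy) == rand(rng)   # true: both start from the same state
rng_copy === rng              # false: mutating one never affects the other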