
printing of worker information not correct

Open affans opened this issue 6 months ago • 8 comments

Using Distributed on a Slurm cluster, I am unable to connect to the workers. The reason is that SlurmClusterManager requires the output from start_worker to be of the form julia_worker:PORT#IP.IP.IP.IP. However, when using srun to launch workers across the allocated resources, this is what I get:

┌ Debug: srun command: `srun -D /home/affans /home/affans/.julia/juliaup/julia-1.10.3+0.x64.linux.gnu/bin/julia --worker`
└ @ SlurmClusterManager REPL[7]:25


julia_worker:julia_worker:julia_worker:julia_worker:julia_worker:julia_worker:julia_worker:julia_worker:julia_worker:julia_worker:9427#172.16.1.26
9431#172.1.1.26
9425#172.1.1.26
9428#172.1.1.26
9430#172.1.1.26
9429#172.1.1.26
9423#172.1.1.26
9432#172.1.1.26
9426#172.1.1.26
9424#172.1.1.26

So somehow the print statements (to stdout) are racing each other? I asked for 10 workers here, and all 10 julia_worker headers were printed on the same line. Here is another example of the output:

[ Info: Worker 1 output: julia_worker:julia_worker:julia_worker:julia_worker:julia_worker:julia_worker:julia_worker:julia_wor$[ Info: Worker 2 output: julia_worker:julia_worker:julia_worker:julia_worker:julia_worker:julia_worker:julia_worker:julia_wor$[ Info: Worker 3 output: julia_worker:julia_worker:julia_worker:julia_worker:julia_worker:julia_worker:julia_worker:julia_wor$[ Info: Worker 4 output: 9369#172.16.1.41
[ Info: Worker 5 output: julia_worker:julia_worker:julia_worker:julia_worker:julia_worker:julia_worker:julia_worker:julia_wor$[ Info: Worker 6 output: 9361#172.16.1.41
[ Info: Worker 7 output: julia_worker:9360#172.16.1.41
[ Info: Worker 8 output: 9365#172.16.1.41

Any idea what could be causing this?
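For reference, here is roughly what a well-formed line should look like to a parser (an illustrative regex only, not SlurmClusterManager's actual parsing code), which is why the mangled lines above can't yield a port/address pair:

    # Illustrative sketch of the expected julia_worker:PORT#IP format; not real
    # SlurmClusterManager code.
    worker_re = r"^julia_worker:(\d+)#(.+)$"

    m = match(worker_re, "julia_worker:9427#172.16.1.26")
    port = parse(Int, m.captures[1])    # 9427
    addr = m.captures[2]                # "172.16.1.26"

    # The lines actually produced above do not match:
    match(worker_re, "9431#172.1.1.26") === nothing                             # true
    match(worker_re, "julia_worker:julia_worker:9427#172.16.1.26") === nothing  # true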

Version info:

julia> versioninfo()
Julia Version 1.10.3
Commit 0b4590a5507 (2024-04-30 10:59 UTC)
Build Info:
  Official https://julialang.org/ release
Platform Info:
  OS: Linux (x86_64-linux-gnu)
  CPU: 32 × Intel(R) Xeon(R) CPU E5-2620 v4 @ 2.10GHz
  WORD_SIZE: 64
  LIBM: libopenlibm
  LLVM: libLLVM-15.0.7 (ORCJIT, broadwell)
Threads: 32 default, 0 interactive, 16 GC (on 32 virtual cores)
Environment:
  LD_LIBRARY_PATH = /cm/shared/apps/slurm/16.05.8/lib64/slurm:/cm/shared/apps/slurm/16.05.8/lib64:/cm/shared/apps/openmpi/gcc/64/1.10.1/lib64
  JULIA_NUM_THREADS = 32
  LD_RUN_PATH = /cm/shared/apps/openmpi/gcc/64/1.10.1/lib64

julia> Distributed.VERSION
v"1.10.3"

affans avatar Jun 23 '25 21:06 affans

Barring a race condition in SlurmClusterManager.jl, I suspect this is because Distributed is not thread-safe in 1.10. Could you try with 1.12? The version of Distributed in 1.12 should be threadsafe: #73.

JamesWrigley avatar Jun 23 '25 22:06 JamesWrigley

I don't know about 1.12, but it works on 1.11 (so it presumably works on 1.12 too). Could I ask what exactly is meant by saying Distributed is not thread-safe in 1.10? Doesn't the function start_worker() in distributed/src/cluster.jl run independently for each launched Julia worker process?

Also, isn't the fix easy? Couldn't we replace

    print(out, "julia_worker:")  # print header
    print(out, "$(string(LPROC.bind_port))#") # print port
    print(out, LPROC.bind_addr)
    print(out, '\n')
    flush(out)

with just

print(out, "julia_worker:$(string(LPROC.bind_port))#LPROC.bind_addr")  # print header

Why is it split across multiple print statements in the first place?

Is there a way to fix this in 1.10?

affans avatar Jun 23 '25 22:06 affans

I am confused about how it's working on 1.11; maybe it's some other problem 🤔 Can you try starting Julia with 1 thread and see if the problem still happens on 1.10?

Could I ask what exactly is meant by saying Distributed is not thread-safe in 1.10?

Basically, Distributed is quite an old library and was written before Julia supported threads, so some parts of the codebase did not support work happening on threads other than thread 1. This caused various issues whenever multiple threads were used, some of which are described in #73. We finally made it thread-safe for 1.12 so that 1.12 can safely start with an interactive thread by default.

Doesn't the function start_worker() in distributed/src/cluster.jl run independently for each launched julia worker process?

It does, but I wonder if the problem might be somewhere in the handler for each worker running on the head node. As for a potential fix for 1.10: if this is indeed caused by Distributed's lack of thread-safety, then we could in theory backport all the thread-safety fixes, but that's rather a lot of changes. An alternative would be DistributedNext.jl, but SlurmClusterManager.jl doesn't support that yet (though I don't think it would be difficult to add support).

JamesWrigley avatar Jun 23 '25 22:06 JamesWrigley

In 1.11, my Julia is configured to start with one thread:

(base) odinuser02@podin:~$ julia
               _
   _       _ _(_)_     |  Documentation: https://docs.julialang.org
  (_)     | (_) (_)    |
   _ _   _| |_  __ _   |  Type "?" for help, "]?" for Pkg help.
  | | | | | | |/ _` |  |
  | | |_| | | | (_| |  |  Version 1.11.5 (2025-04-14)
 _/ |\__'_|_|_|\__'_|  |  Official https://julialang.org/ release
|__/                   |

julia> Threads.nthreads()
1

julia> using Distributed, SlurmClusterManager; addprocs(SlurmManager()) # works fine

On 1.10, I was starting with 32 threads, but even after setting JULIA_NUM_THREADS = 1 I get:

julia> Threads.nthreads()
1

julia> addprocs(SlurmManager())
nested task error: could not parse 9820#172.16.1.26
        Stacktrace:
         [1] error(s::String)
           @ Base ./error.jl:35
         [2] (::SlurmClusterManager.var"#13#18"{SlurmManager, Vector{WorkerConfig}, Condition})()
           @ SlurmClusterManager ./REPL[4]:36

I did make sure the environment variable JULIA_NUM_THREADS was set to 1 so that all the workers have the same environment, but it still didn't work.

I am just very confused about this. The workers are launched via srun with the --worker flag, which I believe hits the start_worker() entry point, and the only async line there is:

    init_worker(cookie)
    interface = IPv4(LPROC.bind_addr)
    if LPROC.bind_port == 0
        port_hint = 9000 + (getpid() % 1000)
        (port, sock) = listenany(interface, UInt16(port_hint))
        LPROC.bind_port = port
    else
        sock = listen(interface, LPROC.bind_port)
    end
    errormonitor(@async while isopen(sock)           # <- can this be causing the issue?
        client = accept(sock)
        process_messages(client, client, true)
    end)
    print(out, "julia_worker:")  # print header
    print(out, "$(string(LPROC.bind_port))#") # print port
    print(out, LPROC.bind_addr)
    print(out, '\n')
    flush(out)

affans avatar Jun 23 '25 23:06 affans

Are you by any chance using more than one slurm task? I see that SlurmClusterManager is reading the output from srun concurrently: https://github.com/JuliaParallel/SlurmClusterManager.jl/blob/11d491c67b4439a2c1bf2afcb56b2fa078019daf/src/slurmmanager.jl#L202

With multiple tasks that definitely won't be safe 😅 In that case, changing Distributed to print everything in a single call would probably fix it.
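To illustrate the failure mode, here is a minimal standalone sketch (tasks in one process, not the real setup of separate worker processes sharing srun's output stream, and not actual Distributed/SlurmClusterManager code): when several writers emit a record using multiple print calls, the pieces can interleave at scheduling points, whereas building the whole line and printing it in one call keeps each record intact.

    # Illustrative only: reproduces the "julia_worker:julia_worker:..." pattern.
    buf = IOBuffer()

    function chatty_writer(io, id)
        print(io, "julia_worker:")            # header in one call
        yield()                               # another task may run here
        print(io, "$(9000 + id)#127.0.0.1")   # port/address in another call
        yield()
        print(io, '\n')
    end

    tasks = map(1:4) do i
        @async chatty_writer(buf, i)
    end
    foreach(wait, tasks)
    print(String(take!(buf)))   # headers and payloads come out interleaved

    # Building each record first and writing it with a single print keeps lines intact:
    buf2 = IOBuffer()
    tasks2 = map(1:4) do i
        @async print(buf2, "julia_worker:$(9000 + i)#127.0.0.1\n")
    end
    foreach(wait, tasks2)
    print(String(take!(buf2)))  # one well-formed line per "worker"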

JamesWrigley avatar Jun 24 '25 20:06 JamesWrigley

@JamesWrigley After a few hours of working on this, I just edited Distributed to:

#print(out, "hello from distributed: ")
#print(out, "julia_worker:")  # print header
#print(out, "$(string(LPROC.bind_port))#") # print port
#print(out, LPROC.bind_addr)
#print(out, '\n')
print(out, "julia_worker:$(string(LPROC.bind_port))#$(LPROC.bind_addr)\n")

This fixed the race between all the print statements to stdout. Do you think this would be an acceptable PR? Why was it broken up into multiple print statements anyway?

affans avatar Jul 12 '25 02:07 affans

Just to confirm, are you using more than one slurm task? I don't think your fix will make things worse but I'm not sure it's a complete fix given how SlurmClusterManager works.

JamesWrigley avatar Jul 12 '25 08:07 JamesWrigley

Yes, in my tests I was allocating around 300 CPUs (or "tasks"). I didn't understand the race condition before, but I think I get it now. Basically, srun launches n Julia worker processes, one per task. As they launch, they all print their information to stdout, which causes the race. I believe ClusterManagers.jl didn't read from stdout; instead it had each worker process write its port and IP to a separate file per task, and then read them back from those files. It made the working directory messy, but I never had this sort of issue with it.
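For comparison, here is a rough sketch of that file-per-task handshake (hypothetical helper names and file layout, not the actual ClusterManagers.jl code): each worker writes its own file, so no two processes ever share an output stream, and the head node reads the files afterwards.

    # Sketch only; names and file layout are made up for illustration.

    # On each worker (e.g. keyed by the SLURM_PROCID environment variable):
    function write_worker_info(dir, port, addr)
        taskid = get(ENV, "SLURM_PROCID", string(getpid()))
        open(joinpath(dir, "worker-$taskid.info"), "w") do io
            print(io, "julia_worker:$port#$addr\n")   # whole record in a private file
        end
    end

    # On the head node, once all the expected files exist:
    function read_worker_info(dir)
        infos = Tuple{Int,String}[]
        for f in filter(endswith(".info"), readdir(dir; join=true))
            m = match(r"julia_worker:(\d+)#(.+)", read(f, String))
            m === nothing && continue
            push!(infos, (parse(Int, m.captures[1]), String(m.captures[2])))
        end
        return infos
    end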

I am still very confused that this isn't an issue on one cluster but is an issue on another (even with --threads 1). I even set both machines to the same Julia and Distributed versions.

affans avatar Jul 12 '25 14:07 affans