Dagger.jl
Various errors working with DTables.jl
I tried to create a MWE that was closer to the actual workflow I'm working with. I'm guessing the errors occurring here are related to #437 (one of the four reported errors below is the same as the linked issue). I hope this is helpful and not just extra noise!
Contents of mwe.jl:
using Distributed
nworkers = 1
addprocs(nworkers - nprocs() + 1)
@everywhere using DTables, DataFrames, CSV

@everywhere job_channel = Channel(100)

remotecall(2) do
    while true
        job = take!(job_channel)
        try
            func = job[1]
            args = job[2:end]
            func(args...)
        catch ex
            @info "error $ex"
            @info "stacktrace: $(stacktrace(catch_backtrace()))"
        end
    end
end

remotecall_fetch(2) do
    dt = DTable(x -> CSV.File(x), ["file.csv"]; tabletype = DataFrame)
    df = fetch(dt)
    cols1 = [df[!, c] for c in 1:48]
    cols2 = [df[!, c] for c in 49:102]
    cols = (cols1, cols2)
    cols_appended = (cols1, (cols2..., rand(length(cols2[1]))))
    df = DataFrame(
        (names(df)[1:48] .=> cols_appended[1])...,
        ((names(df)[49:102]..., "appended") .=> cols_appended[2])...;
        copycols = false,
    )
    dt = DTable(df)
    @info "$(length(dt))"
    @info "$(length(dt))"
    df = fetch(dt)
    cols1 = [df[!, c] for c in 1:48]
    cols2 = [df[!, c] for c in 49:102]
    cols = (cols1, cols2)
    df = fetch(dt)
    foreach((:new1, :new2), (rand(length(dt)), rand(length(dt)))) do name, val
        setproperty!(df, name, val)
    end
    dt = DTable(df)
    i = [6, 12, 48, 93, 94]
    dt = select(dt, i...; copycols = false)
    gdt = groupby(dt, Symbol.(names(df)[[6, 12, 48]]))
    gkeys = sort!(collect(keys(gdt)))
    sums = map(gkeys) do key
        reduce(+, gdt[key]; cols = Symbol.(names(df)[[93, 94]]))
    end .|> fetch
end
I included mwe.jl in a fresh Julia session multiple times (meaning each include occurred in its own fresh Julia session) and recorded the following errors. Note that nothing changed in mwe.jl from run to run.
Error 1:
julia> include("mwe.jl")
From worker 2: [ Info: 233930
From worker 2: [ Info: 233930
ERROR: LoadError: On worker 2:
MethodError: Cannot `convert` an object of type
Vector{Any} to an object of type
Union{Dagger.Thunk, Dagger.Chunk}
Closest candidates are:
convert(::Type{T}, ::T) where T
@ Base Base.jl:64
Stacktrace:
[1] get!
@ ./dict.jl:455
[2] reschedule_syncdeps!
@ ~/.julia/packages/Dagger/ZOt9H/src/sch/util.jl:116
[3] reschedule_syncdeps!
@ ~/.julia/packages/Dagger/ZOt9H/src/sch/util.jl:100 [inlined]
[4] #eager_submit_internal!#96
@ ~/.julia/packages/Dagger/ZOt9H/src/submission.jl:93
[5] eager_submit_internal!
@ ~/.julia/packages/Dagger/ZOt9H/src/submission.jl:11 [inlined]
[6] eager_submit_internal!
@ ~/.julia/packages/Dagger/ZOt9H/src/submission.jl:9
[7] #invokelatest#2
@ ./essentials.jl:819
[8] invokelatest
@ ./essentials.jl:816
[9] #110
@ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:285
[10] run_work_thunk
@ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:70
[11] macro expansion
@ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:285 [inlined]
[12] #109
@ ./task.jl:514
Stacktrace:
[1] #remotecall_fetch#159
@ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:465
[2] remotecall_fetch
@ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:454
[3] #remotecall_fetch#162
@ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492 [inlined]
[4] remotecall_fetch
@ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492
[5] eager_submit!
@ ~/.julia/packages/Dagger/ZOt9H/src/submission.jl:128
[6] eager_launch!
@ ~/.julia/packages/Dagger/ZOt9H/src/submission.jl:195
[7] enqueue!
@ ~/.julia/packages/Dagger/ZOt9H/src/queue.jl:12 [inlined]
[8] #spawn#88
@ ~/.julia/packages/Dagger/ZOt9H/src/thunk.jl:322
[9] spawn
@ ~/.julia/packages/Dagger/ZOt9H/src/thunk.jl:286 [inlined]
[10] #39
@ ~/.julia/packages/DTables/bA4g3/src/operations/operations.jl:35 [inlined]
[11] iterate
@ ./generator.jl:47 [inlined]
[12] _collect
@ ./array.jl:802
[13] collect_similar
@ ./array.jl:711
[14] map
@ ./abstractarray.jl:3263
[15] map
@ ~/.julia/packages/DTables/bA4g3/src/operations/operations.jl:35
[16] _manipulate
@ ~/.julia/packages/DTables/bA4g3/src/operations/dataframes_interface.jl:89
[17] #manipulate#247
@ ~/.julia/packages/DTables/bA4g3/src/operations/dataframes_interface.jl:48
[18] #select#258
@ ~/.julia/packages/DTables/bA4g3/src/operations/dataframes_interface.jl:171
[19] #7
@ ~/tmp/mwe.jl:47
[20] #invokelatest#2
@ ./essentials.jl:819 [inlined]
[21] invokelatest
@ ./essentials.jl:816
[22] #110
@ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:285
[23] run_work_thunk
@ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:70
[24] macro expansion
@ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:285 [inlined]
[25] #109
@ ./task.jl:514
Stacktrace:
[1] remotecall_fetch(::Function, ::Distributed.Worker; kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
@ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:465
[2] remotecall_fetch(::Function, ::Distributed.Worker)
@ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:454
[3] #remotecall_fetch#162
@ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492 [inlined]
[4] remotecall_fetch(::Function, ::Int64)
@ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492
[5] top-level scope
@ ~/tmp/mwe.jl:22
[6] include(fname::String)
@ Base.MainInclude ./client.jl:478
[7] top-level scope
@ REPL[1]:1
in expression starting at /home/steven/tmp/mwe.jl:22
Error 2:
julia> include("mwe.jl")
From worker 2: [ Info: 233930
From worker 2: [ Info: 233930
ERROR: LoadError: On worker 2:
UndefRefError: access to undefined reference
Stacktrace:
[1] getindex
@ ./essentials.jl:13 [inlined]
[2] get!
@ ./dict.jl:465
[3] reschedule_syncdeps!
@ ~/.julia/packages/Dagger/ZOt9H/src/sch/util.jl:116
[4] reschedule_syncdeps!
@ ~/.julia/packages/Dagger/ZOt9H/src/sch/util.jl:100 [inlined]
[5] #eager_submit_internal!#96
@ ~/.julia/packages/Dagger/ZOt9H/src/submission.jl:93
[6] eager_submit_internal!
@ ~/.julia/packages/Dagger/ZOt9H/src/submission.jl:11 [inlined]
[7] eager_submit_internal!
@ ~/.julia/packages/Dagger/ZOt9H/src/submission.jl:9
[8] #invokelatest#2
@ ./essentials.jl:819
[9] invokelatest
@ ./essentials.jl:816
[10] #110
@ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:285
[11] run_work_thunk
@ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:70
[12] macro expansion
@ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:285 [inlined]
[13] #109
@ ./task.jl:514
Stacktrace:
[1] #remotecall_fetch#159
@ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:465
[2] remotecall_fetch
@ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:454
[3] #remotecall_fetch#162
@ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492 [inlined]
[4] remotecall_fetch
@ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492
[5] eager_submit!
@ ~/.julia/packages/Dagger/ZOt9H/src/submission.jl:128
[6] eager_launch!
@ ~/.julia/packages/Dagger/ZOt9H/src/submission.jl:195
[7] enqueue!
@ ~/.julia/packages/Dagger/ZOt9H/src/queue.jl:12 [inlined]
[8] #spawn#88
@ ~/.julia/packages/Dagger/ZOt9H/src/thunk.jl:322
[9] spawn
@ ~/.julia/packages/Dagger/ZOt9H/src/thunk.jl:286 [inlined]
[10] #15
@ ~/.julia/packages/Dagger/ZOt9H/src/thunk.jl:401 [inlined]
[11] iterate
@ ./generator.jl:47 [inlined]
[12] collect
@ ./array.jl:782
[13] chunk_lengths
@ ~/.julia/packages/DTables/bA4g3/src/table/dtable.jl:254
[14] length
@ ~/.julia/packages/DTables/bA4g3/src/table/dtable.jl:258
[15] #7
@ ~/tmp/mwe.jl:42
[16] #invokelatest#2
@ ./essentials.jl:819 [inlined]
[17] invokelatest
@ ./essentials.jl:816
[18] #110
@ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:285
[19] run_work_thunk
@ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:70
[20] macro expansion
@ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:285 [inlined]
[21] #109
@ ./task.jl:514
Stacktrace:
[1] remotecall_fetch(::Function, ::Distributed.Worker; kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
@ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:465
[2] remotecall_fetch(::Function, ::Distributed.Worker)
@ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:454
[3] #remotecall_fetch#162
@ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492 [inlined]
[4] remotecall_fetch(::Function, ::Int64)
@ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492
[5] top-level scope
@ ~/tmp/mwe.jl:22
[6] include(fname::String)
@ Base.MainInclude ./client.jl:478
[7] top-level scope
@ REPL[1]:1
in expression starting at /home/steven/tmp/mwe.jl:22
Error 3:
julia> include("mwe.jl")
From worker 2: [ Info: 233930
From worker 2: [ Info: 233930
[7463] signal (11.1): Segmentation fault
in expression starting at /home/steven/tmp/mwe.jl:22
jl_object_id__cold at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/builtins.c:417
type_hash at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/jltypes.c:1332
typekey_hash at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/jltypes.c:1344
jl_precompute_memoized_dt at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/jltypes.c:1409
inst_datatype_inner at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/jltypes.c:1731
jl_inst_arg_tuple_type at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/jltypes.c:1826
arg_type_tuple at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/gf.c:2100 [inlined]
jl_lookup_generic_ at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/gf.c:2884 [inlined]
ijl_apply_generic at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/gf.c:2936
collect_task_inputs at /home/steven/.julia/packages/Dagger/ZOt9H/src/sch/util.jl:392
signature at /home/steven/.julia/packages/Dagger/ZOt9H/src/sch/util.jl:256
#99 at /home/steven/.julia/packages/Dagger/ZOt9H/src/sch/Sch.jl:680
lock at ./lock.jl:229
schedule! at /home/steven/.julia/packages/Dagger/ZOt9H/src/sch/Sch.jl:642 [inlined]
schedule! at /home/steven/.julia/packages/Dagger/ZOt9H/src/sch/Sch.jl:642 [inlined]
scheduler_run at /home/steven/.julia/packages/Dagger/ZOt9H/src/sch/Sch.jl:508
#compute_dag#82 at /home/steven/.julia/packages/Dagger/ZOt9H/src/sch/Sch.jl:449
compute_dag at /home/steven/.julia/packages/Dagger/ZOt9H/src/sch/Sch.jl:414 [inlined]
#compute#141 at /home/steven/.julia/packages/Dagger/ZOt9H/src/compute.jl:23
compute at /home/steven/.julia/packages/Dagger/ZOt9H/src/compute.jl:22 [inlined]
macro expansion at /home/steven/.julia/packages/Dagger/ZOt9H/src/sch/eager.jl:28 [inlined]
#50 at ./threadingconstructs.jl:410
unknown function (ip: 0x7efbf8213f8f)
_jl_invoke at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/gf.c:2758 [inlined]
ijl_apply_generic at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/gf.c:2940
jl_apply at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/julia.h:1880 [inlined]
start_task at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/task.c:1092
Allocations: 34071935 (Pool: 34043867; Big: 28068); GC: 39
zsh: segmentation fault julia --project
Error 3b: Occasionally the segfault was preceded by one or more occurrences of:
Unhandled Task ERROR: ArgumentError: destination has fewer elements than required
Stacktrace:
[1] copyto!(dest::Vector{Dagger.Sch.ProcessorState}, src::Base.ValueIterator{Dict{Dagger.Processor, Dagger.Sch.ProcessorState}})
@ Base ./abstractarray.jl:949
[2] _collect
@ ./array.jl:713 [inlined]
[3] collect
@ ./array.jl:707 [inlined]
[4] macro expansion
@ ~/.julia/packages/Dagger/ZOt9H/src/sch/Sch.jl:1189 [inlined]
[5] (::Dagger.Sch.var"#126#133"{Dagger.Sch.ProcessorInternalState, UInt64, RemoteChannel{Channel{Any}}, Dagger.ThreadProc})()
@ Dagger.Sch ./task.jl:134
Error 4:
julia> include("mwe.jl")
From worker 2: [ Info: 233930
From worker 2: [ Info: 233930
ERROR: LoadError: On worker 2:
AssertionError: Multiple concurrent writes to Dict detected!
Stacktrace:
[1] rehash!
@ ./dict.jl:208
[2] _setindex!
@ ./dict.jl:355 [inlined]
[3] get!
@ ./dict.jl:477
[4] reschedule_syncdeps!
@ ~/.julia/packages/Dagger/ZOt9H/src/sch/util.jl:116
[5] reschedule_syncdeps!
@ ~/.julia/packages/Dagger/ZOt9H/src/sch/util.jl:100 [inlined]
[6] #eager_submit_internal!#96
@ ~/.julia/packages/Dagger/ZOt9H/src/submission.jl:93
[7] eager_submit_internal!
@ ~/.julia/packages/Dagger/ZOt9H/src/submission.jl:11 [inlined]
[8] eager_submit_internal!
@ ~/.julia/packages/Dagger/ZOt9H/src/submission.jl:9
[9] #invokelatest#2
@ ./essentials.jl:819
[10] invokelatest
@ ./essentials.jl:816
[11] #110
@ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:285
[12] run_work_thunk
@ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:70
[13] macro expansion
@ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:285 [inlined]
[14] #109
@ ./task.jl:514
Stacktrace:
[1] #remotecall_fetch#159
@ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:465
[2] remotecall_fetch
@ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:454
[3] #remotecall_fetch#162
@ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492 [inlined]
[4] remotecall_fetch
@ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492
[5] eager_submit!
@ ~/.julia/packages/Dagger/ZOt9H/src/submission.jl:128
[6] eager_launch!
@ ~/.julia/packages/Dagger/ZOt9H/src/submission.jl:195
[7] enqueue!
@ ~/.julia/packages/Dagger/ZOt9H/src/queue.jl:12 [inlined]
[8] #spawn#88
@ ~/.julia/packages/Dagger/ZOt9H/src/thunk.jl:322
[9] spawn
@ ~/.julia/packages/Dagger/ZOt9H/src/thunk.jl:286 [inlined]
[10] #48
@ ~/.julia/packages/Dagger/ZOt9H/src/thunk.jl:401 [inlined]
[11] iterate
@ ./generator.jl:47 [inlined]
[12] collect_to!
@ ./array.jl:840 [inlined]
[13] collect_to_with_first!
@ ./array.jl:818 [inlined]
[14] collect
@ ./array.jl:792
[15] #reduce#42
@ ~/.julia/packages/DTables/bA4g3/src/operations/operations.jl:111
[16] #14
@ ~/tmp/mwe.jl:51
[17] iterate
@ ./generator.jl:47 [inlined]
[18] collect_to!
@ ./array.jl:840 [inlined]
[19] collect_to_with_first!
@ ./array.jl:818 [inlined]
[20] _collect
@ ./array.jl:812
[21] collect_similar
@ ./array.jl:711
[22] map
@ ./abstractarray.jl:3263
[23] #7
@ ~/tmp/mwe.jl:50
[24] #invokelatest#2
@ ./essentials.jl:819 [inlined]
[25] invokelatest
@ ./essentials.jl:816
[26] #110
@ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:285
[27] run_work_thunk
@ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:70
[28] macro expansion
@ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:285 [inlined]
[29] #109
@ ./task.jl:514
Stacktrace:
[1] remotecall_fetch(::Function, ::Distributed.Worker; kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
@ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:465
[2] remotecall_fetch(::Function, ::Distributed.Worker)
@ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:454
[3] #remotecall_fetch#162
@ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492 [inlined]
[4] remotecall_fetch(::Function, ::Int64)
@ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492
[5] top-level scope
@ ~/tmp/mwe.jl:22
[6] include(fname::String)
@ Base.MainInclude ./client.jl:478
[7] top-level scope
@ REPL[1]:1
in expression starting at /home/steven/tmp/mwe.jl:22
Comments:
- The segfault was by far the most common error; the others occurred just once each (over the 10-15 trials I ran).
- In my actual work, I don't think I've ever come across the MethodError with convert (error 1). I most commonly run into the error mentioned in https://github.com/JuliaParallel/Dagger.jl/issues/437#issuecomment-1739631443, which I did not see with mwe.jl.
- "file.csv" is a 157 MB table with 233930 rows and 102 columns of String and Float64 values.
- The remotecall probably isn't necessary for reproducing the bugs, but I included it because that is how my actual work is set up.
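For reference, a stripped-down variant of the pipeline without the remotecall wrapper might look like the sketch below. This is an untested simplification (not from the original report), and whether it still reproduces the bugs is unknown:

# Hypothetical simplification: run the same DTables operations on the main
# process instead of inside remotecall_fetch(2).
using Distributed
addprocs(1)
@everywhere using DTables, DataFrames, CSV

dt = DTable(x -> CSV.File(x), ["file.csv"]; tabletype = DataFrame)
df = fetch(dt)
dt = DTable(df)
dt = select(dt, 6, 12, 48, 93, 94; copycols = false)
gdt = groupby(dt, Symbol.(names(df)[[6, 12, 48]]))
gkeys = sort!(collect(keys(gdt)))
sums = map(k -> fetch(reduce(+, gdt[k]; cols = Symbol.(names(df)[[93, 94]]))), gkeys)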
@StevenWhitaker can you try reproducing these again on Dagger master?
Thanks for getting a patch released!
The issues are different now, so that's something ;)
Now I observe the following behavior (EDIT: when running Julia with multiple threads):
- Sometimes the code runs to completion (yay!).
- Sometimes the code hangs when computing sums (not sure if it was in map or reduce or fetch).
- Sometimes the following error occurs when computing sums. After this error, most of the time it hangs, sometimes it runs to completion:
Unhandled Task ERROR: ArgumentError: destination has fewer elements than required
Stacktrace:
[1] copyto!(dest::Vector{Dagger.Sch.ProcessorState}, src::Base.ValueIterator{Dict{Dagger.Processor, Dagger.Sch.ProcessorState}})
@ Base ./abstractarray.jl:949
[2] _collect
@ ./array.jl:713 [inlined]
[3] collect
@ ./array.jl:707 [inlined]
[4] macro expansion
@ ~/.julia/packages/Dagger/M13n0/src/sch/Sch.jl:1189 [inlined]
[5] (::Dagger.Sch.var"#128#135"{Dagger.Sch.ProcessorInternalState, UInt64, RemoteChannel{Channel{Any}}, Dagger.ThreadProc})()
@ Dagger.Sch ./task.jl:134
I realized that I start Julia with multiple threads by default, so I also ran the code with a single thread (julia --project -t1). In this case, I saw the Unhandled Task ERROR once (incidentally, the first time), and every time I ran the code (including the first time) it ran to completion.
So, besides the one sporadic error, this issue seems to be addressed, assuming the issues I observed with multiple threads are due to the interplay between Distributed and Threads.
Edit to my previous comment:
I'm running my actual code with a single thread now, and it also hangs, so there might be something else still at play.
I can reproduce the hangs - I'll keep investigating! Thanks for your patience :slightly_smiling_face:
Running through your example with Dagger's logging enabled, I find that we spend a good bit of time (about 0.3-0.5 s for me) in the reduce calls at the end, which are running in serial over 233K keys - at this pace, I can see why it looks like it's hanging :laughing:
A large portion of the time is spent in the GC (about 40% of the time, over ~80K allocations totaling ~500MB), so I suspect allocations are what's killing performance. If I can figure out how to reduce those allocations, it would also be reasonable to parallelize the reduce calls (by doing two maps, one to launch a task per key, and one to fetch the results), and that should give us much better runtimes.
Additionally, the other calls that took a while are select and groupby, so we could probably look into improving those a bit.
EDIT: Those timings and allocations are so high because of logging - they drop significantly when logging is disabled, although then I see a ton of long-lived allocations that threaten to crash Julia. I still need to see if some of those allocations can be reduced.
EDIT 2: Silly me, these reductions are already asynchronous :smile: I guess the task completes before we return from reduce anyway, since we're only running with 1 thread.
Ok, something that I would recommend is, instead of the map -> reduce pattern, just use a single reduce call: reduce(+, gdt; cols = Symbol.(names(df)[[93, 94]])). This appears to be much more memory- and time-efficient, which makes sense because it can internally do more optimizations (it already knows that you intend to reduce over each key in the group).
Can you test that and confirm whether it speeds your script up sufficiently for it to complete in a reasonable amount of time?
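To make the suggested change concrete, here is a rough sketch using the names from the MWE (illustrative, not a tested patch):

# Pattern in the MWE: one reduce per key, each fetched individually.
sums = map(gkeys) do key
    reduce(+, gdt[key]; cols = Symbol.(names(df)[[93, 94]]))
end .|> fetch

# Suggested pattern: a single reduce over the grouped DTable, fetched once,
# letting DTables plan the per-key reductions internally.
sums = fetch(reduce(+, gdt; cols = Symbol.(names(df)[[93, 94]])))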
Thanks for the tip. I tried it out on my actual project (not the exact example in the OP), and it does seem to help, but I still see the code hang occasionally. I'm pretty sure it's not just taking forever, because when the code does complete, it doesn't take that long, and when it hangs the cpu utilization drops to 0.
It actually seems to be the case that my code hangs only when calling my main function again after a successful run. Or at least the chances of hanging are higher in that case. I'm not really sure why that would be the case, though.
I also saw a new error (when calling fetch on a DTable, with Dagger v0.18.4 and DTables v0.4.2):
Dagger.ThunkFailedException{Dagger.ThunkFailedException{CapturedException}}(Thunk[3](isnonempty, Any[Thunk[2](_file_load, Any["path/to/file.csv", NRBS.var"#1#2"(), DataFrames.DataFrame])]), Thunk[2](_file_load, Any["path/to/file.csv", NRBS.var"#1#2"(), DataFrames.DataFrame]), Dagger.ThunkFailedException{CapturedException}(Thunk[2](_file_load, Any["path/to/file.csv", NRBS.var"#1#2"(), DataFrames.DataFrame]), Thunk[2](_file_load, Any["path/to/file.csv", NRBS.var"#1#2"(), DataFrames.DataFrame]), CapturedException(UndefRefError(), Any[(getindex at essentials.jl:13 [inlined], 1), (get! at dict.jl:465, 1), (OSProc at processor.jl:109 [inlined], 2), (do_task at Sch.jl:1368, 1), (macro expansion at Sch.jl:1243 [inlined], 1), (#132 at task.jl:134, 1)])))
It looks like it has to do with file loading, so this is the code I use to load .csv files:
DTable(x -> CSV.File(x), [filepath]; tabletype = DataFrame)
I only saw the error once, though.
And another one-time error (in the function with the reduce call):
From worker 4: ┌ 2023-10-24T13:00:07.238 ] pid: 20516 proc: 4 Error: Error on 4 while connecting to peer 3, exiting
From worker 4: │ exception =
From worker 4: │ ConcurrencyViolationError("lock must be held")
From worker 4: │ Stacktrace:
From worker 4: │ [1] concurrency_violation()
From worker 4: │ @ Base ./condition.jl:8
From worker 4: │ [2] assert_havelock
From worker 4: │ @ ./condition.jl:25 [inlined]
From worker 4: │ [3] assert_havelock
From worker 4: │ @ ./condition.jl:48 [inlined]
From worker 4: │ [4] assert_havelock
From worker 4: │ @ ./condition.jl:72 [inlined]
From worker 4: │ [5] notify(c::Condition, arg::Any, all::Bool, error::Bool)
From worker 4: │ @ Base ./condition.jl:150
From worker 4: │ [6] #notify#622
From worker 4: │ @ ./condition.jl:148 [inlined]
From worker 4: │ [7] notify (repeats 2 times)
From worker 4: │ @ ./condition.jl:148 [inlined]
From worker 4: │ [8] set_worker_state
From worker 4: │ @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/cluster.jl:148 [inlined]
From worker 4: │ [9] Distributed.Worker(id::Int, r_stream::Sockets.TCPSocket, w_stream::Sockets.TCPSocket, manager::Distributed.DefaultClusterManager; version::Nothing, config::WorkerConfig)
From worker 4: │ @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/cluster.jl:126
From worker 4: │ [10] Worker
From worker 4: │ @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/cluster.jl:116 [inlined]
From worker 4: │ [11] connect_to_peer(manager::Distributed.DefaultClusterManager, rpid::Int, wconfig::WorkerConfig)
From worker 4: │ @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:363
From worker 4: │ [12] (::Distributed.var"#121#123"{Int, WorkerConfig})()
From worker 4: │ @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:349
From worker 4: │ [13] exec_conn_func(w::Distributed.Worker)
From worker 4: │ @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/cluster.jl:181
From worker 4: │ [14] (::Distributed.var"#21#24"{Distributed.Worker})()
From worker 4: └ @ Distributed ./task.jl:514
The above errors occurred when calling my main function the first time.