DataFrames.jl
Thread Safety
Not sure if this is a feature request or a bug report. It is unclear to me what operations pertaining to DataFrames, if any, are thread-safe. For example, I would have expected the below code to be thread-safe since each thread is operating on different parts of the memory, yet it explodes in a cloud of corruption:
using DataFrames
#WARNING: DO NOT RUN THIS
function tsmwecorrupt(N=100_000)
    df = DataFrame(rand(N,100))
    df.grpcol = (i->i%50).(1:N)
    Threads.@threads for sdf ∈ groupby(df, :grpcol)
        sdf.x3 .= -1.
    end
    println(sum(df.x3))
end
tsmwecorrupt()
OUTPUT:
(many pages of garbage)
On the other hand, this code seems fine:
function tsmwe(N=100_000)
    df = DataFrame(rand(N,100))
    df.grpcol = (i->i%50).(1:N)
    Threads.@threads for r ∈ eachrow(df)
        r.x3 = -1.
    end
    println(sum(df.x3))
end
tsmwe()
OUTPUT:
-100000.0
and so does this:
function tsmwe2(N=100_000)
    df = DataFrame(rand(N,100))
    df.grpcol = (i->i%50).(1:N)
    Threads.@threads for sdf ∈ collect(groupby(df, :grpcol))
        sdf.x3 .= -1.
    end
    println(sum(df.x3))
end
tsmwe2()
OUTPUT:
-100000.0
If this is not a bug, then perhaps this issue can serve as a feature request for thread safety under a wider variety of use cases.
EDIT: Also see 1896
This is a problem with Threads.@threads, which assumes that it iterates over a range or an array; nothing else is allowed.
Check out this code as an example:
julia> Threads.@threads for c in "ssf"
end
Error thrown in threaded loop on thread 0: MethodError(f=typeof(Base.unsafe_getindex)(), args=("ssf", 1), world=0x000000000000643a)
It fails for the same reason: a String is neither a range nor an array.
The way you should fix the code is to write:
g = groupby(df, :grpcol)
Threads.@threads for i in 1:length(g)
    sdf = g[i]
    sdf.x3 .= -1.
end
as now you iterate over a range.
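A dependency-free sketch of the same pattern, for readers without DataFrames.jl at hand. Groups and fill_groups! are hypothetical names invented for illustration (a stand-in for any indexable collection that is not an AbstractArray); the only point is that the loop runs over the range 1:length(g) and the indexing happens inside the body:

```julia
# Hypothetical stand-in for an indexable, non-array collection of groups.
struct Groups
    parts::Vector{Vector{Float64}}
end
Base.length(g::Groups) = length(g.parts)
Base.getindex(g::Groups, i::Integer) = g.parts[i]

# Loop over an index range and fetch each group inside the loop body.
function fill_groups!(g::Groups, value)
    Threads.@threads for i in 1:length(g)
        part = g[i]      # each iteration touches a different vector
        part .= value
    end
    return g
end

g = Groups([rand(4) for _ in 1:8])
fill_groups!(g, -1.0)
total = sum(sum, g.parts)
```

Because every iteration writes to a different underlying vector, no locking is needed in this sketch.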
I am leaving this open as @nalimilan, who is working on the groupby infrastructure, might consider:
- adding axes to the GroupedDataFrame object (we do not define it now)
- and even making GroupedDataFrame an abstract array (then your code would work); but this is unlikely. I assume that his intention is rather to make it an AbstractDataFrame in the future (but we can discuss it)
Got it, makes sense- thanks.
This is a problem with Threads.@threads which assumes that it iterates over a range or an array and nothing else is allowed.
I'm not familiar with @threads, so can you explain why it crashes in the provided example? If we defined axes, would it use it? We should probably define it anyway (but not make GroupedDataFrame an AbstractVector, since we want e.g. map to return another GroupedDataFrame, not a vector).
EDIT: cross-ref https://discourse.julialang.org/t/threaded-for-loop-over-arbitrary-indexable-objects/26957/1
If we defined axes, would it use it? We should probably define it anyway
It would not use it, but I think we should define axes, as then it would be possible to broadcast over GroupedDataFrame.
so can you explain why it crashes in the provided example?
Threads.@threads expands to use Base.unsafe_getindex internally, and you have:
julia> methods(Base.unsafe_getindex)
# 4 methods for generic function "unsafe_getindex":
[1] unsafe_getindex(r::StepRangeLen{T,#s72,#s71} where #s71<:Base.TwicePrecision where #s72<:Base.TwicePrecision, i::Integer) where T in Base at twiceprecision.jl:447
[2] unsafe_getindex(r::StepRangeLen{T,R,S} where S where R, i::Integer) where T in Base at range.jl:644
[3] unsafe_getindex(r::LinRange, i::Integer) in Base at range.jl:654
[4] unsafe_getindex(A::AbstractArray, I...) in Base at abstractarray.jl:930
and its definition is simply:
function unsafe_getindex(A::AbstractArray, I...)
    @_inline_meta
    @inbounds r = getindex(A, I...)
    r
end
(for the most common case)
So essentially we could define unsafe_getindex ourselves to make it work, but first I want to clarify what core devs want in Base (that is why I have asked on Discourse).
we want e.g. map to return another GroupedDataFrame, not a vector
Actually it would be easy to override map, and moreover it would be very simple to override broadcasting to return a GroupedDataFrame (so that transformation.(grouped_data_frame) returns a GroupedDataFrame). This is a more complex decision though, so I would defer it (but we can safely add axes now if we want).
OK. Actually I've realized that if we define size we will get axes for free. But I'm not sure whether we should do that, as it means GroupedDataFrame would not be just a collection, but a kind of array, which isn't really the case. We could support axes anyway, since tuples also support it (even if they don't support size), and it's associated with getindex. Anyway, it looks like @threads should really allow for any indexable, not just arrays.
Regarding broadcast, I guess we could allow it, but it would basically just be a shorthand for map. So maybe better wait until we're sure we don't want to do something else with it (no idea what).
And I just realized that somehow broadcasting does not require axes 😄:
julia> describe.(gdf)
3-element Array{DataFrame,1}:
4×8 DataFrame. Omitted printing of 2 columns
│ Row │ variable │ mean │ min │ median │ max │ nunique │
│ │ Symbol │ Float64 │ Float64 │ Float64 │ Float64 │ Nothing │
├─────┼──────────┼──────────┼──────────┼──────────┼──────────┼─────────┤
│ 1 │ x1 │ 0.669556 │ 0.669556 │ 0.669556 │ 0.669556 │ │
│ 2 │ x2 │ 0.375771 │ 0.375771 │ 0.375771 │ 0.375771 │ │
│ 3 │ x3 │ 0.978432 │ 0.978432 │ 0.978432 │ 0.978432 │ │
│ 4 │ x4 │ 0.786694 │ 0.786694 │ 0.786694 │ 0.786694 │ │
4×8 DataFrame. Omitted printing of 2 columns
│ Row │ variable │ mean │ min │ median │ max │ nunique │
│ │ Symbol │ Float64 │ Float64 │ Float64 │ Float64 │ Nothing │
├─────┼──────────┼───────────┼───────────┼───────────┼───────────┼─────────┤
│ 1 │ x1 │ 0.450037 │ 0.450037 │ 0.450037 │ 0.450037 │ │
│ 2 │ x2 │ 0.0699309 │ 0.0699309 │ 0.0699309 │ 0.0699309 │ │
│ 3 │ x3 │ 0.900284 │ 0.900284 │ 0.900284 │ 0.900284 │ │
│ 4 │ x4 │ 0.72257 │ 0.72257 │ 0.72257 │ 0.72257 │ │
4×8 DataFrame. Omitted printing of 2 columns
│ Row │ variable │ mean │ min │ median │ max │ nunique │
│ │ Symbol │ Float64 │ Float64 │ Float64 │ Float64 │ Nothing │
├─────┼──────────┼──────────┼──────────┼──────────┼──────────┼─────────┤
│ 1 │ x1 │ 0.934747 │ 0.934747 │ 0.934747 │ 0.934747 │ │
│ 2 │ x2 │ 0.43983 │ 0.43983 │ 0.43983 │ 0.43983 │ │
│ 3 │ x3 │ 0.753521 │ 0.753521 │ 0.753521 │ 0.753521 │ │
│ 4 │ x4 │ 0.34014 │ 0.34014 │ 0.34014 │ 0.34014 │ │
(although the docs say it should)
Funny. It works because it falls back to collect(gdf). I don't really like that, since that means any future change will be breaking, even if we didn't intend this to work. Maybe we should just throw an error for now?
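The fallback described above can be reproduced with a plain iterable and no DataFrames.jl. Countdown and StrictCountdown are hypothetical types written for this sketch. The second one shows one possible way to reserve broadcasting by throwing from Base.broadcastable; whether that matches what the linked pull request does in detail is an assumption:

```julia
# Hypothetical iterable: supports iterate/length/eltype, but is not an array.
struct Countdown
    n::Int
end
Base.iterate(c::Countdown, state=c.n) = state < 1 ? nothing : (state, state - 1)
Base.length(c::Countdown) = c.n
Base.eltype(::Type{Countdown}) = Int

# Broadcasting works because Base collects generic iterables first.
doubled = (x -> 2x).(Countdown(3))

# Hypothetical variant that opts out of broadcasting explicitly.
struct StrictCountdown
    n::Int
end
Base.broadcastable(::StrictCountdown) =
    throw(ArgumentError("broadcasting over StrictCountdown is reserved"))

# true if broadcasting was rejected with an ArgumentError
rejected = try
    identity.(StrictCountdown(3))
    false
catch err
    err isa ArgumentError
end
```

Throwing early keeps the door open for giving broadcasting a different meaning later without silently changing results.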
See https://github.com/JuliaData/DataFrames.jl/pull/1907
Reopening.
You mean that we should disallow collect for GroupedDataFrame? What would be the benefit of this? We support iterate on it.
I think I meant disallowing broadcasting on GroupedDataFrame, which #1907 did.
Though we still need to get @threads in Julia to work on custom types?
Though we still need to get @threads in Julia to work on custom types?
Probably yes, but if I understand things correctly the API for threading might still change a bit in the future so I would not change anything in DataFrames.jl for now.
So I am marking it as a non-breaking feature and removing the milestone (but leaving it open so we can come back to it in the future).
i'm having trouble following along here. do you mean to say that DataFrames.jl is not thread safe?
i ask, because i'm getting an ERROR: LoadError: TaskFailedException ... nested task error: StackOverflowError: when putting a select!(df, ...) inside a Threads.@threads. i can try to get a MWE if you like.
when putting a select!(df, ...)
But is df thread local? If yes, then can you please give an example?
If df is not thread local then this is expected and select! cannot be thread safe (as it is an in-place operation).
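The distinction drawn above can be sketched with plain vectors instead of data frames. collect_squares is a hypothetical helper: local_result is created inside each iteration and can be mutated freely, while shared is visible to every task, so in-place changes to it are serialized with a lock. This only illustrates the general rule and says nothing about how DataFrames.jl is implemented:

```julia
function collect_squares(n)
    shared = Int[]               # visible to all tasks, mutated in place
    lk = ReentrantLock()
    Threads.@threads for i in 1:n
        local_result = [i, i^2]  # created inside the iteration: task-local
        lock(lk) do
            # in-place mutation of shared state must be guarded
            append!(shared, local_result)
        end
    end
    return shared
end

result = collect_squares(100)
```

The order of elements in result depends on scheduling, but its length and contents do not.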
df is thread local. here's a MWE.
seems to be a problem only when using >10_000 columns, one of whose names is a String and the rest are String15:
julia> using CSV, DataFrames
julia> cols = [String15(string('x',x)) for x in 1:20_000]
20000-element Vector{String15}:
"x1"
"x2"
"x3"
"x4"
"x5"
"x6"
"x7"
"x8"
"x9"
"x10"
⋮
"x19991"
"x19992"
"x19993"
"x19994"
"x19995"
"x19996"
"x19997"
"x19998"
"x19999"
"x20000"
julia> Threads.@threads for _=1:10
           df = DataFrame("y"=>1)
           for col in cols
               df[!,col] .= 0
           end
           select!(df, ["y", reverse(cols)...])
       end
ERROR: TaskFailedException
Stacktrace:
[1] wait
@ ./task.jl:345 [inlined]
[2] threading_run(fun::var"#422#threadsfor_fun#53"{var"#422#threadsfor_fun#52#54"{UnitRange{Int64}}}, static::Bool)
@ Base.Threads ./threadingconstructs.jl:38
[3] top-level scope
@ ./threadingconstructs.jl:89
nested task error: StackOverflowError:
Stacktrace:
[1] promote_typeof(::String15, ::String15, ::Vararg{String15}) (repeats 18570 times)
@ Base ./promotion.jl:339
[2] vect(::String, ::Vararg{Any})
@ Base ./array.jl:144
[3] macro expansion
@ ./REPL[85]:6 [inlined]
[4] (::var"#422#threadsfor_fun#53"{var"#422#threadsfor_fun#52#54"{UnitRange{Int64}}})(tid::Int64; onethread::Bool)
@ Main ./threadingconstructs.jl:84
[5] #422#threadsfor_fun
@ ./threadingconstructs.jl:51 [inlined]
[6] (::Base.Threads.var"#1#2"{var"#422#threadsfor_fun#53"{var"#422#threadsfor_fun#52#54"{UnitRange{Int64}}}, Int64})()
@ Base.Threads ./threadingconstructs.jl:30
no problem with 10_000 columns with one String and the rest String15:
julia> cols = [String15(string('x',x)) for x in 1:10_000]
10000-element Vector{String15}:
"x1"
"x2"
"x3"
"x4"
"x5"
"x6"
"x7"
"x8"
"x9"
"x10"
⋮
"x9991"
"x9992"
"x9993"
"x9994"
"x9995"
"x9996"
"x9997"
"x9998"
"x9999"
"x10000"
julia> Threads.@threads for _=1:10
           df = DataFrame("y"=>1)
           for col in cols
               df[!,col] .= 0
           end
           select!(df, ["y", reverse(cols)...])
       end
julia>
also no problem with 20_000 columns all of whose names are String:
julia> cols = [String(string('x',x)) for x in 1:20_000]
20000-element Vector{String}:
"x1"
"x2"
"x3"
"x4"
"x5"
"x6"
"x7"
"x8"
"x9"
"x10"
⋮
"x19991"
"x19992"
"x19993"
"x19994"
"x19995"
"x19996"
"x19997"
"x19998"
"x19999"
"x20000"
julia> Threads.@threads for _=1:10
           df = DataFrame("y"=>1)
           for col in cols
               df[!,col] .= 0
           end
           select!(df, ["y", reverse(cols)...])
       end
julia>
also no problem if all 20_000 columns are String15:
julia> cols = [String15(string('x',x)) for x in 1:20_000]
20000-element Vector{String15}:
"x1"
"x2"
"x3"
"x4"
"x5"
"x6"
"x7"
"x8"
"x9"
"x10"
⋮
"x19991"
"x19992"
"x19993"
"x19994"
"x19995"
"x19996"
"x19997"
"x19998"
"x19999"
"x20000"
julia> Threads.@threads for _=1:10
           df = DataFrame()
           for col in cols
               df[!,col] .= 0
           end
           select!(df, reverse(cols))
       end
julia>
also no problem for 20_000 columns with one String and the rest String15 without Threads.@threads:
julia> cols = [String15(string('x',x)) for x in 1:20_000]
20000-element Vector{String15}:
"x1"
"x2"
"x3"
"x4"
"x5"
"x6"
"x7"
"x8"
"x9"
"x10"
⋮
"x19991"
"x19992"
"x19993"
"x19994"
"x19995"
"x19996"
"x19997"
"x19998"
"x19999"
"x20000"
julia> for _=1:10
           df = DataFrame("y"=>1)
           for col in cols
               df[!,col] .= 0
           end
           select!(df, ["y", reverse(cols)...])
       end
julia>
so promoting a large number of mixed types in a thread is not being handled correctly. maybe a Julia or CSV.jl problem and not a DataFrames.jl problem?
EDIT: this is with Julia v1.9.1, CSV v0.10.11, and DataFrames v1.5.0
- I cannot reproduce it on Win11.
- I assume if you run the code with 1 thread enabled it does not error?
- What happens if you do Any["y", reverse(cols)...]?
- What happens if you drop select! and just try to evaluate ["y", reverse(cols)...]?
In general it seems to be a Julia issue, as it happens in the ["y", reverse(cols)...] part, which is executed before select! is called. But before reporting it, let us try to narrow down the issue.
- I cannot reproduce it on Win11.
wow! neither can i!! but i do have the problem on both ubuntu 22.04 and mac os x 13.4. (unrelatedly, i just learned that prompt pasting doesn't work for me in powershell.)
- I assume if you run the code with 1 thread enabled it does not error?
nope! even with just one thread it doesn't work
- What happens if you do Any["y", reverse(cols)...]?
that fixes it!
- What happens if you drop select! and just try to evaluate ["y", reverse(cols)...]?
evaluating just the array is not a problem:
julia> cols
20000-element Vector{String15}:
"x1"
"x2"
"x3"
"x4"
"x5"
"x6"
"x7"
"x8"
"x9"
"x10"
⋮
"x19991"
"x19992"
"x19993"
"x19994"
"x19995"
"x19996"
"x19997"
"x19998"
"x19999"
"x20000"
julia> ["y", reverse(cols)...]
20001-element Vector{String}:
"y"
"x20000"
"x19999"
"x19998"
"x19997"
"x19996"
"x19995"
"x19994"
"x19993"
"x19992"
"x19991"
"x19990"
⋮
"x10"
"x9"
"x8"
"x7"
"x6"
"x5"
"x4"
"x3"
"x2"
"x1"
julia>
It seems to be an issue with promotion in InlineStrings.jl. I opened https://github.com/JuliaStrings/InlineStrings.jl/issues/65.
This is enough to reproduce the bug so it's unrelated to DataFrames:
Threads.@threads for _=1:10
    ["y", reverse(cols)...]
end
Interestingly, I cannot reproduce with a local Julia build of 1.9.0-rc2, but I can with official binaries for the same release. So it seems really dependent on compilation details.
The fact that this is OS/version dependent seems to suggest this is a Base issue, not InlineStrings, right?
In general yes, but I thought that maybe some promotion rule in InlineStrings.jl could trigger problematic behavior of Base.
Actually we don't even need InlineStrings: https://github.com/JuliaLang/julia/issues/50284
@bkamins Are you able to reproduce this on Windows using a larger number of values, like 200_000 ? I suspect the stack is just larger there.
The problem seems to boil down to the fact that threads use a smaller stack. Maybe this can be improved, but in general splatting so many values isn't a good idea and ["y"; reverse(cols)] should be preferred.
Yes, even non-threaded:
julia> cols = [x for x in 1:200_000];
julia> ["y", reverse(cols)...]
ERROR: StackOverflowError:
overflows on Windows. In conclusion, ["y"; reverse(cols)] should be used.
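For reference, a minimal sketch of the recommended form. It uses plain String column names rather than String15 so that it runs without any packages; the variable names are only illustrative:

```julia
cols = ["x$i" for i in 1:20_000]

# Vertical concatenation passes reverse(cols) as a single argument,
# so no call with 20_000 separate arguments is generated.
names_vcat = ["y"; reverse(cols)]

# The same thing written as an explicit function call.
names_call = vcat("y", reverse(cols))
```

Both forms give a 20_001-element vector that starts with "y" followed by the column names in reverse order.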