DataFrames.jl

Thread Safety

Open clintonTE opened this issue 5 years ago • 24 comments

Not sure if this is a feature request or a bug report. It is unclear to me what operations pertaining to DataFrames, if any, are thread-safe. For example, I would have expected the below code to be thread-safe since each thread is operating on different parts of the memory, yet it explodes in a cloud of corruption:

using DataFrames
#WARNING: DO NOT RUN THIS

function tsmwecorrupt(N=100_000)
  df = DataFrame(rand(N,100))
  df.grpcol = (i->i%50).(1:N)

  Threads.@threads for sdf ∈ groupby(df, :grpcol)
    sdf.x3 .= -1.
  end

  println(sum(df.x3))
end

tsmwecorrupt()

OUTPUT: 
(many pages of garbage)

On the other hand, this code seems fine:

function tsmwe(N=100_000)
  df = DataFrame(rand(N,100))
  df.grpcol = (i->i%50).(1:N)

  Threads.@threads for r ∈ eachrow(df)
    r.x3 = -1.
  end

  println(sum(df.x3))
end

tsmwe()

OUTPUT: 
-100000.0

and so does this:

function tsmwe2(N=100_000)
  df = DataFrame(rand(N,100))
  df.grpcol = (i->i%50).(1:N)

  Threads.@threads for sdf ∈ collect(groupby(df, :grpcol))
    sdf.x3 .= -1.
  end

  println(sum(df.x3))
end


tsmwe2()

OUTPUT: 
-100000.0

If this is not a bug, then perhaps this issue can serve as a feature request for thread safety under a wider variety of use cases.

EDIT: Also see #1896

clintonTE avatar Jul 29 '19 20:07 clintonTE

This is a problem with Threads.@threads which assumes that it iterates over a range or an array and nothing else is allowed.

Check out this code as an example:

julia> Threads.@threads for c in "ssf"
       end

Error thrown in threaded loop on thread 0: MethodError(f=typeof(Base.unsafe_getindex)(), args=("ssf", 1), world=0x000000000000643a)

it fails for the same reason.

The way you should fix the code is to write:

  g = groupby(df, :grpcol)
  Threads.@threads for i in 1:length(g)
    sdf = g[i]
    sdf.x3 .= -1.
  end

as now you iterate over a range.
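For reference, a complete version of this workaround might look like the sketch below. It assumes a recent DataFrames.jl release (1.x), where the matrix constructor needs `:auto` to generate the column names `x1`, `x2`, and so on. The function name `tsmwefixed` is made up for this example, and the sketch relies on the groups covering disjoint rows.

```julia
using DataFrames

# Hypothetical name; mirrors the MWE above but loops over group indices.
function tsmwefixed(N=100_000)
    # `:auto` generates column names x1, x2, ... (assumed DataFrames.jl 1.x)
    df = DataFrame(rand(N, 100), :auto)
    df.grpcol = (i -> i % 50).(1:N)

    g = groupby(df, :grpcol)
    # @threads receives a plain range; each group is fetched by index.
    Threads.@threads for i in 1:length(g)
        sdf = g[i]
        # Groups cover disjoint rows, so tasks write to different elements.
        sdf[:, :x3] .= -1.0
    end

    return sum(df.x3)
end

total = tsmwefixed(10_000)
```

Returning the sum instead of printing it makes the result easy to check: every row of `x3` should have been overwritten with `-1.0`.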

I am leaving this open as @nalimilan, who is working on the groupby infrastructure, might consider:

  1. adding axes to GroupedDataFrame object (now we do not define it)
  2. and even making a GroupedDataFrame an abstract array (then your code would work); but this is unlikely - I assume that his intention is to rather make it an AbstractDataFrame in the future (but we can discuss it)

bkamins avatar Jul 29 '19 21:07 bkamins

Got it, makes sense- thanks.

clintonTE avatar Jul 29 '19 22:07 clintonTE

This is a problem with Threads.@threads which assumes that it iterates over a range or an array and nothing else is allowed.

I'm not familiar with @threads, so can you explain why it crashes in the provided example? If we defined axes, would it use it? We should probably define it anyway (but not make GroupedDataFrame an AbstractVector since we want e.g. map to return another GroupedDataFrame, not a vector).

EDIT: cross-ref https://discourse.julialang.org/t/threaded-for-loop-over-arbitrary-indexable-objects/26957/1

nalimilan avatar Jul 30 '19 14:07 nalimilan

If we defined axes, would it use it? We should probably define it anyway

It would not use it, but I think we should define axes as then it would be possible to broadcast over GroupedDataFrame.

so can you explain why it crashes in the provided example?

Threads.@threads expands to use Base.unsafe_getindex internally, and you have:

julia> methods(Base.unsafe_getindex)
# 4 methods for generic function "unsafe_getindex":
[1] unsafe_getindex(r::StepRangeLen{T,#s72,#s71} where #s71<:Base.TwicePrecision where #s72<:Base.TwicePrecision, i::Integer) where T in Base at twiceprecision.jl:447

[2] unsafe_getindex(r::StepRangeLen{T,R,S} where S where R, i::Integer) where T in Base at range.jl:644
[3] unsafe_getindex(r::LinRange, i::Integer) in Base at range.jl:654
[4] unsafe_getindex(A::AbstractArray, I...) in Base at abstractarray.jl:930

and its definition is simply:

function unsafe_getindex(A::AbstractArray, I...)
    @_inline_meta
    @inbounds r = getindex(A, I...)
    r
end

(for the most common case) so essentially we could define unsafe_getindex ourselves to make it work, but first I want to clarify what core devs want in Base (that is why I have asked on Discourse)

we want e.g. map to return another GroupedDataFrame, not a vector

Actually it would be easy to override map and moreover it would be very simple to override broadcasting to return a GroupedDataFrame (so that transformation.(grouped_data_frame) returns a GroupedDataFrame). This is a more complex decision though so I would defer it (but we can safely add axes now if we want).

bkamins avatar Jul 30 '19 15:07 bkamins

OK. Actually I've realized that if we define size we will get axes for free. But I'm not sure whether we should do that, as it means GroupedDataFrame would not be just a collection, but a kind of array, which isn't really the case. We could support axes anyway since tuples also support it (even if they don't support size), and it's associated with getindex. Anyway, it looks like @threads should really allow for any indexable, not just arrays.

Regarding broadcast, I guess we could allow it, but it would basically just be a shorthand for map. So maybe better wait until we're sure we don't want to do something else with it (no idea what).

nalimilan avatar Jul 30 '19 15:07 nalimilan

And I just realized that somehow broadcasting does not require axes 😄:

julia> describe.(gdf)
3-element Array{DataFrame,1}:
 4×8 DataFrame. Omitted printing of 2 columns
│ Row │ variable │ mean     │ min      │ median   │ max      │ nunique │
│     │ Symbol   │ Float64  │ Float64  │ Float64  │ Float64  │ Nothing │
├─────┼──────────┼──────────┼──────────┼──────────┼──────────┼─────────┤
│ 1   │ x1       │ 0.669556 │ 0.669556 │ 0.669556 │ 0.669556 │         │
│ 2   │ x2       │ 0.375771 │ 0.375771 │ 0.375771 │ 0.375771 │         │
│ 3   │ x3       │ 0.978432 │ 0.978432 │ 0.978432 │ 0.978432 │         │
│ 4   │ x4       │ 0.786694 │ 0.786694 │ 0.786694 │ 0.786694 │         │
 4×8 DataFrame. Omitted printing of 2 columns
│ Row │ variable │ mean      │ min       │ median    │ max       │ nunique │
│     │ Symbol   │ Float64   │ Float64   │ Float64   │ Float64   │ Nothing │
├─────┼──────────┼───────────┼───────────┼───────────┼───────────┼─────────┤
│ 1   │ x1       │ 0.450037  │ 0.450037  │ 0.450037  │ 0.450037  │         │
│ 2   │ x2       │ 0.0699309 │ 0.0699309 │ 0.0699309 │ 0.0699309 │         │
│ 3   │ x3       │ 0.900284  │ 0.900284  │ 0.900284  │ 0.900284  │         │
│ 4   │ x4       │ 0.72257   │ 0.72257   │ 0.72257   │ 0.72257   │         │
 4×8 DataFrame. Omitted printing of 2 columns
│ Row │ variable │ mean     │ min      │ median   │ max      │ nunique │
│     │ Symbol   │ Float64  │ Float64  │ Float64  │ Float64  │ Nothing │
├─────┼──────────┼──────────┼──────────┼──────────┼──────────┼─────────┤
│ 1   │ x1       │ 0.934747 │ 0.934747 │ 0.934747 │ 0.934747 │         │
│ 2   │ x2       │ 0.43983  │ 0.43983  │ 0.43983  │ 0.43983  │         │
│ 3   │ x3       │ 0.753521 │ 0.753521 │ 0.753521 │ 0.753521 │         │
│ 4   │ x4       │ 0.34014  │ 0.34014  │ 0.34014  │ 0.34014  │         │

(although the docs say it should)

bkamins avatar Jul 30 '19 16:07 bkamins

Funny. It works because it falls back to collect(gdf). I don't really like that, since it means any future change will be breaking, even if we didn't intend this to work. Maybe we should just throw an error for now?

nalimilan avatar Aug 02 '19 16:08 nalimilan

See https://github.com/JuliaData/DataFrames.jl/pull/1907

bkamins avatar Aug 02 '19 18:08 bkamins

Reopening.

bkamins avatar Sep 09 '19 11:09 bkamins

You mean that we should disallow collect for GroupedDataFrame? What would be a benefit of this? We support iterate on it.

bkamins avatar Feb 12 '20 11:02 bkamins

I think I meant disallowing broadcasting on GroupedDataFrame, which #1907 did.
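With broadcasting out of the picture, the per-group results can still be collected explicitly. A minimal sketch, assuming only that a GroupedDataFrame supports iteration (as mentioned earlier in this thread); the data and the variable names are made up for illustration:

```julia
using DataFrames

# Toy data: three groups of sizes 2, 3 and 1.
df = DataFrame(g = [1, 1, 2, 2, 2, 3], v = 1:6)
gdf = groupby(df, :g)

# Iterating a GroupedDataFrame yields one SubDataFrame per group,
# so a comprehension returns a plain Vector of per-group results.
sizes = [nrow(sdf) for sdf in gdf]
sums = [sum(sdf.v) for sdf in gdf]
```

This makes it explicit that the result is an ordinary vector rather than another GroupedDataFrame.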

Though we still need to get @threads in Julia to work on custom types?

nalimilan avatar Feb 13 '20 21:02 nalimilan

Though we still need to get @threads in Julia to work on custom types?

Probably yes, but if I understand things correctly the API for threading might still change a bit in the future so I would not change anything in DataFrames.jl for now.

So I am marking it as a non-breaking feature and removing the milestone (but leaving it open so we can come back to it in the future).

bkamins avatar Feb 13 '20 22:02 bkamins

i'm having trouble following along here. do you mean to say that DataFrames.jl is not thread safe?

i ask, because i'm getting an ERROR: LoadError: TaskFailedException ... nested task error: StackOverflowError when putting a select!(df, ...) inside a Threads.@threads loop. i can try to get a MWE if you like.

bjarthur avatar Jun 08 '23 16:06 bjarthur

when putting a select!(df, ...)

But is df thread local? If yes - then can you please give an example?

If df is not thread local then this is expected and select! cannot be thread safe (as it is an in-place operation).
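To illustrate what "thread local" means here, a minimal sketch in which every iteration creates and mutates its own data frame, so no DataFrame is shared between tasks. The column names and the `results` vector are made up for this example:

```julia
using DataFrames

# One result slot per iteration; each task writes only to its own slot.
results = Vector{Vector{String}}(undef, 8)

Threads.@threads for i in 1:8
    df = DataFrame(y = [1], a = [i], b = [2i])  # created inside the loop body
    select!(df, [:b, :a, :y])                   # in-place, but only on the local df
    results[i] = names(df)
end
```

If `df` were instead created once outside the loop and mutated by `select!` from several tasks, the tasks would race on the same object.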

bkamins avatar Jun 08 '23 21:06 bkamins

df is thread local. here's a MWE.

seems to be a problem only when using >10_000 columns, one of whose names is a String and the rest String15:

julia> using CSV, DataFrames

julia> cols = [String15(string('x',x)) for x in 1:20_000]
20000-element Vector{String15}:
 "x1"
 "x2"
 "x3"
 "x4"
 "x5"
 "x6"
 "x7"
 "x8"
 "x9"
 "x10"
 ⋮
 "x19991"
 "x19992"
 "x19993"
 "x19994"
 "x19995"
 "x19996"
 "x19997"
 "x19998"
 "x19999"
 "x20000"

julia> Threads.@threads for _=1:10
           df = DataFrame("y"=>1)
           for col in cols
               df[!,col] .= 0
           end 
           select!(df, ["y", reverse(cols)...])
       end
ERROR: TaskFailedException
Stacktrace:
 [1] wait
   @ ./task.jl:345 [inlined]
 [2] threading_run(fun::var"#422#threadsfor_fun#53"{var"#422#threadsfor_fun#52#54"{UnitRange{Int64}}}, static::Bool)
   @ Base.Threads ./threadingconstructs.jl:38
 [3] top-level scope
   @ ./threadingconstructs.jl:89

    nested task error: StackOverflowError:
    Stacktrace:
     [1] promote_typeof(::String15, ::String15, ::Vararg{String15}) (repeats 18570 times)
       @ Base ./promotion.jl:339
     [2] vect(::String, ::Vararg{Any})
       @ Base ./array.jl:144
     [3] macro expansion
       @ ./REPL[85]:6 [inlined]
     [4] (::var"#422#threadsfor_fun#53"{var"#422#threadsfor_fun#52#54"{UnitRange{Int64}}})(tid::Int64; onethread::Bool)
       @ Main ./threadingconstructs.jl:84
     [5] #422#threadsfor_fun
       @ ./threadingconstructs.jl:51 [inlined]
     [6] (::Base.Threads.var"#1#2"{var"#422#threadsfor_fun#53"{var"#422#threadsfor_fun#52#54"{UnitRange{Int64}}}, Int64})()
       @ Base.Threads ./threadingconstructs.jl:30

no problem with 10_000 columns with one String and the rest String15:

julia> cols = [String15(string('x',x)) for x in 1:10_000]
10000-element Vector{String15}:
 "x1"
 "x2"
 "x3"
 "x4"
 "x5"
 "x6"
 "x7"
 "x8"
 "x9"
 "x10"
 ⋮
 "x9991"
 "x9992"
 "x9993"
 "x9994"
 "x9995"
 "x9996"
 "x9997"
 "x9998"
 "x9999"
 "x10000"

julia> Threads.@threads for _=1:10
           df = DataFrame("y"=>1)
           for col in cols
               df[!,col] .= 0
           end 
           select!(df, ["y", reverse(cols)...])
       end

julia> 

also no problem with 20_000 columns all of whose names are String:

julia> cols = [String(string('x',x)) for x in 1:20_000]
20000-element Vector{String}:
 "x1"
 "x2"
 "x3"
 "x4"
 "x5"
 "x6"
 "x7"
 "x8"
 "x9"
 "x10"
 ⋮
 "x19991"
 "x19992"
 "x19993"
 "x19994"
 "x19995"
 "x19996"
 "x19997"
 "x19998"
 "x19999"
 "x20000"

julia> Threads.@threads for _=1:10
           df = DataFrame("y"=>1)
           for col in cols
               df[!,col] .= 0
           end 
           select!(df, ["y", reverse(cols)...])
       end

julia> 

also no problem if all 20_000 columns are String15:

julia> cols = [String15(string('x',x)) for x in 1:20_000]
20000-element Vector{String15}:
 "x1"
 "x2"
 "x3"
 "x4"
 "x5"
 "x6"
 "x7"
 "x8"
 "x9"
 "x10"
 ⋮
 "x19991"
 "x19992"
 "x19993"
 "x19994"
 "x19995"
 "x19996"
 "x19997"
 "x19998"
 "x19999"
 "x20000"

julia> Threads.@threads for _=1:10
           df = DataFrame()
           for col in cols
               df[!,col] .= 0
           end 
           select!(df, reverse(cols))
       end

julia> 

also no problem for 20_000 columns with one String and the rest String15 without Threads.@threads:

julia> cols = [String15(string('x',x)) for x in 1:20_000]
20000-element Vector{String15}:
 "x1"
 "x2"
 "x3"
 "x4"
 "x5"
 "x6"
 "x7"
 "x8"
 "x9"
 "x10"
 ⋮
 "x19991"
 "x19992"
 "x19993"
 "x19994"
 "x19995"
 "x19996"
 "x19997"
 "x19998"
 "x19999"
 "x20000"

julia> for _=1:10
           df = DataFrame("y"=>1)
           for col in cols
               df[!,col] .= 0
           end 
           select!(df, ["y", reverse(cols)...])
       end

julia>

so promoting a large number of mixed types in a thread is not being handled correctly. maybe a Julia or CSV.jl problem and not a DataFrames.jl problem?

EDIT: this is with Julia v1.9.1, CSV v0.10.11, and DataFrames v1.5.0

bjarthur avatar Jun 09 '23 13:06 bjarthur

  1. I cannot reproduce it on Win11.
  2. I assume if you run the code with 1 thread enabled it does not error?
  3. What happens if you do Any["y", reverse(cols)...]?
  4. What happens if you drop select! and just try to evaluate ["y", reverse(cols)...]?

In general it seems to be a Julia issue as it happens in ["y", reverse(cols)...] part that is executed before select! is called. But before reporting it let us try to narrow down the issue.

bkamins avatar Jun 09 '23 16:06 bkamins

  1. I cannot reproduce it on Win11.

wow! neither can i!! but i do have the problem on both ubuntu 22.04 and mac os x 13.4. (unrelatedly, i just learned that prompt pasting doesn't work for me in powershell.)

  2. I assume if you run the code with 1 thread enabled it does not error?

nope! even with just one thread it doesn't work

  3. What happens if you do Any["y", reverse(cols)...]?

that fixes it!

  4. What happens if you drop select! and just try to evaluate ["y", reverse(cols)...]?

evaluating just the array is not a problem:

julia> cols
20000-element Vector{String15}:
 "x1"
 "x2"
 "x3"
 "x4"
 "x5"
 "x6"
 "x7"
 "x8"
 "x9"
 "x10"
 ⋮
 "x19991"
 "x19992"
 "x19993"
 "x19994"
 "x19995"
 "x19996"
 "x19997"
 "x19998"
 "x19999"
 "x20000"

julia> ["y", reverse(cols)...]
20001-element Vector{String}:
 "y"
 "x20000"
 "x19999"
 "x19998"
 "x19997"
 "x19996"
 "x19995"
 "x19994"
 "x19993"
 "x19992"
 "x19991"
 "x19990"
 ⋮
 "x10"
 "x9"
 "x8"
 "x7"
 "x6"
 "x5"
 "x4"
 "x3"
 "x2"
 "x1"

julia> 

bjarthur avatar Jun 09 '23 17:06 bjarthur

It seems to be an issue with promotion in InlineStrings.jl. I opened https://github.com/JuliaStrings/InlineStrings.jl/issues/65.

bkamins avatar Jun 09 '23 20:06 bkamins

This is enough to reproduce the bug so it's unrelated to DataFrames:

Threads.@threads for _=1:10
    ["y", reverse(cols)...]
end

Interestingly, I cannot reproduce with a local Julia build of 1.9.0-rc2, but I can with official binaries for the same release. So it seems really dependent on compilation details.

nalimilan avatar Jun 11 '23 14:06 nalimilan

The fact that this is OS/version dependent seems to suggest this is a Base issue, not InlineStrings, right?

quinnj avatar Jun 13 '23 03:06 quinnj

In general yes, but I thought that maybe some promotion rule in InlineStrings.jl could trigger problematic behavior of Base.

bkamins avatar Jun 13 '23 06:06 bkamins

Actually we don't even need InlineStrings: https://github.com/JuliaLang/julia/issues/50284

@bkamins Are you able to reproduce this on Windows using a larger number of values, like 200_000? I suspect the stack is just larger there.

nalimilan avatar Jun 24 '23 20:06 nalimilan

The problem seems to boil down to the fact that threads use a smaller stack. Maybe this can be improved, but in general splatting so many values isn't a good idea and ["y"; reverse(cols)] should be preferred.

nalimilan avatar Jun 24 '23 20:06 nalimilan

Yes, even non-threaded:

julia> cols = [x for x in 1:200_000];

julia> ["y", reverse(cols)...]
ERROR: StackOverflowError:

overflows on Windows. In conclusion ["y"; reverse(cols)] should be used.
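A minimal sketch of the recommended form, using plain String names as a stand-in for the String15 names from the earlier example (the variable name `selected` is made up here):

```julia
# Plain String names stand in for the String15 names used earlier.
cols = [string('x', i) for i in 1:20_000]

# `[a; b]` lowers to `vcat(a, b)`: the vector is passed as a single
# argument instead of being splatted into 20_000 separate arguments.
selected = ["y"; reverse(cols)]
```

The result has the same element order as the splatted version, so it can be passed to select! in the same way.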

bkamins avatar Jun 25 '23 08:06 bkamins