arrow-julia icon indicating copy to clipboard operation
arrow-julia copied to clipboard

Do some advanced benchmarking on arrow data + common operations

Open quinnj opened this issue 3 years ago • 10 comments

Chatted briefly with @bkamins on this; we'd like to take some common data processing workflows and run benchmarks comparing arrow data vs. regular in-memory Julia data.

quinnj avatar Apr 23 '21 02:04 quinnj

I will prepare generators of test case files.

bkamins avatar Apr 23 '21 07:04 bkamins

OK. I attach files that are used to generate the test data sets (change the extension from txt to R, as GitHub disallows R as extension)

groupby-datagen.txt join-datagen.txt

The commands to generate the files used in H2O benchmarks are (I selected only 10^8 rows scenario - the middle one):

Rscript groupby-datagen.R 1e8 1e2 0 0
Rscript groupby-datagen.R 1e8 1e1 0 0
Rscript groupby-datagen.R 1e8 2e0 0 0
Rscript groupby-datagen.R 1e8 1e2 0 1
Rscript groupby-datagen.R 1e8 1e2 5 0

Rscript join-datagen.R 1e8 NA 0 0
Rscript join-datagen.R 1e8 NA 5 0
Rscript join-datagen.R 1e8 NA 0 1

it is probably better to first concentrate on joins (and it really does not matter which one, you can start with the Rscript join-datagen.R 1e8 NA 0 1 case)

This is the set to run join benchmarks (you need to insert the names of the files as they differ by the benchmark above; of course for Arrow.jl this would have to be tweaked accordingly but you have the data and the testsets):

using DataFrames
using CSV

# here I give what currently H2O benchmarks use, this should be changed as discussed
x = DataFrame(CSV.File("FILE_X", pool=true)); # here goes the file with NA in its name
small = DataFrame(CSV.File("FILE_Y_1", pool=true));
medium = DataFrame(CSV.File("FILE_Y_2", pool=true));
big = DataFrame(CSV.File("FILE3_Y_3", pool=true));

GC.gc();
@time innerjoin(x, small, on = :id1, makeunique=true, matchmissing=:equal);
GC.gc();
@time innerjoin(x, small, on = :id1, makeunique=true, matchmissing=:equal);

GC.gc();
@time innerjoin(x, medium, on = :id2, makeunique=true, matchmissing=:equal);
GC.gc();
@time innerjoin(x, medium, on = :id2, makeunique=true, matchmissing=:equal);

GC.gc();
@time leftjoin(x, medium, on = :id2, makeunique=true, matchmissing=:equal);
GC.gc();
@time leftjoin(x, medium, on = :id2, makeunique=true, matchmissing=:equal);

GC.gc();
@time innerjoin(x, medium, on = :id5, makeunique=true, matchmissing=:equal);
GC.gc();
@time innerjoin(x, medium, on = :id5, makeunique=true, matchmissing=:equal);

GC.gc();
@time innerjoin(x, big, on = :id3, makeunique=true, matchmissing=:equal);
GC.gc();
@time innerjoin(x, big, on = :id3, makeunique=true, matchmissing=:equal);

and these benchmarks are run on groupby:

using DataFrames
using CSV
using Statistics

# again - I am showing what they use now; this should be tweaked; the file name needs to be updated
x = DataFrame(CSV.File(
  "FILE_NAME", pool=true,
  types=[PooledString, PooledString, PooledString, Int, Int, Int, Int, Int, Float64]
));


GC.gc();
@time combine(groupby(x, :id1), :v1 => sum∘skipmissing => :v1);
GC.gc();
@time combine(groupby(x, :id1), :v1 => sum∘skipmissing => :v1);

GC.gc();
@time combine(groupby(x, [:id1, :id2]), :v1 => sum∘skipmissing => :v1);
GC.gc();
@time combine(groupby(x, [:id1, :id2]), :v1 => sum∘skipmissing => :v1);

GC.gc();
@time combine(groupby(x, :id3), :v1 => sum∘skipmissing => :v1, :v3 => mean∘skipmissing => :v3);
GC.gc();
@time combine(groupby(x, :id3), :v1 => sum∘skipmissing => :v1, :v3 => mean∘skipmissing => :v3);

GC.gc();
@time combine(groupby(x, :id4), :v1 => mean∘skipmissing => :v1, :v2 => mean∘skipmissing => :v2, :v3 => mean∘skipmissing => :v3);
GC.gc();
@time combine(groupby(x, :id4), :v1 => mean∘skipmissing => :v1, :v2 => mean∘skipmissing => :v2, :v3 => mean∘skipmissing => :v3);

GC.gc();
@time combine(groupby(x, :id6), :v1 => sum∘skipmissing => :v1, :v2 => sum∘skipmissing => :v2, :v3 => sum∘skipmissing => :v3);
GC.gc();
@time combine(groupby(x, :id6), :v1 => sum∘skipmissing => :v1, :v2 => sum∘skipmissing => :v2, :v3 => sum∘skipmissing => :v3);

GC.gc();
@time combine(groupby(x, [:id4, :id5]), :v3 => median∘skipmissing => :median_v3, :v3 => std∘skipmissing => :sd_v3);
GC.gc();
@time combine(groupby(x, [:id4, :id5]), :v3 => median∘skipmissing => :median_v3, :v3 => std∘skipmissing => :sd_v3);

GC.gc();
@time combine(groupby(x, :id3), [:v1, :v2] => ((v1, v2) -> maximum(skipmissing(v1))-minimum(skipmissing(v2))) => :range_v1_v2);
GC.gc();
@time combine(groupby(x, :id3), [:v1, :v2] => ((v1, v2) -> maximum(skipmissing(v1))-minimum(skipmissing(v2))) => :range_v1_v2);

GC.gc();
@time combine(groupby(dropmissing(x, :v3), :id6), :v3 => (x -> partialsort!(x, 1:min(2, length(x)), rev=true)) => :largest2_v3);
GC.gc();
@time combine(groupby(dropmissing(x, :v3), :id6), :v3 => (x -> partialsort!(x, 1:min(2, length(x)), rev=true)) => :largest2_v3);

function cor2(x, y)
    nm = @. !ismissing(x) & !ismissing(y)
    return count(nm) < 2 ? NaN : cor(view(x, nm), view(y, nm))
end

GC.gc();
@time combine(groupby(x, [:id2, :id4]), [:v1, :v2] => ((v1,v2) -> cor2(v1, v2)^2) => :r2);
GC.gc();
@time combine(groupby(x, [:id2, :id4]), [:v1, :v2] => ((v1,v2) -> cor2(v1, v2)^2) => :r2);

GC.gc();
@time combine(groupby(x, [:id1, :id2, :id3, :id4, :id5, :id6]), :v3 => sum∘skipmissing => :v3, :v3 => length => :count);
GC.gc();
@time combine(groupby(x, [:id1, :id2, :id3, :id4, :id5, :id6]), :v3 => sum∘skipmissing => :v3, :v3 => length => :count);

CC @nalimilan

bkamins avatar Apr 23 '21 08:04 bkamins

@quinnj is the info above enough for you?

bkamins avatar Apr 23 '21 23:04 bkamins

Yes, this is very helpful. I'm going to dive in to these early next week.

quinnj avatar Apr 24 '21 04:04 quinnj

Ah - following the recommendation by Julia Base devs above all GC.gc() should be also followed by GC.gc(false) (as in the PR to H2O benchmarks). But this should be a minor thing (mostly affecting the triggering of GC we discussed a few days ago).

bkamins avatar Apr 24 '21 08:04 bkamins

Unfortunately, it turns out I can't figure out how to get R + data.table installed on my macOS, so I'm having trouble running these exact benchmarks. But I'm at least using the innerjoin syntax on a local file I can read in. If there's a possibility of generating the data and allowing it to be downloaded from somewhere, I dont' mind a several GB tar.gz if possible.

quinnj avatar May 01 '21 07:05 quinnj

These files are very big. Here is a generator for groupby benchmark (without sorting but I think we can skip it for now):

using DataFrames, CSV

N = 100_000_000
K = 100 # also test 10 and 2
nas = 5 # also test 0

DT = DataFrame()
DT.id1 = rand("id" .* string.(1:K, pad=3), N)
DT.id2 = rand("id" .* string.(1:K, pad=3), N)
DT.id3 = rand("id" .* string.(1:N÷K, pad=10), N)
DT.id4 = rand(1:K, N)
DT.id5 = rand(1:K, N)
DT.id6 = rand(1:N÷K, N)
DT.v1 =  rand(1:5, N)
DT.v2 =  rand(1:15, N)
DT.v3 =  round.(100 .* rand(N), digits=6)
if nas > 0
    allowmissing!(DT)
    for i in 1:6
        DT[rand(axes(DT, 1), nrow(DT) * nas ÷ 100), i] .= missing
    end
end
CSV.write("G_$(N)_$(K)_$(nas).csv")

I will write the generator for join benchmark later today

bkamins avatar May 01 '21 10:05 bkamins

Here is a generator for join data (without sorting and missing imputation but for now it should be enough):

using DataFrames, CSV, Random

N = 100_000_000

function split_xlr(n)
  key = shuffle(1:n + n ÷ 10)
  return (lhs=key[1:n], rhs=key[[1:(n÷10*9); (n+1):length(key)]])
end

function sample_all(x, n)
    @assert length(x) <= n
      y = [x; rand(x, max(n-length(x), 0))]
    @assert length(y) == n
    return shuffle(y)
end

key1 = split_xlr(N ÷ 1_000_000)
key2 = split_xlr(N ÷ 1000)
key3 = split_xlr(N)

x = DataFrame()
x.id1 = sample_all(key1.lhs, N)
x.id2 = sample_all(key2.lhs, N)
x.id3 = sample_all(key3.lhs, N)
@assert length(Set(x.id1)) == N ÷ 1_000_000
@assert length(Set(x.id2)) == N ÷ 1_000
@assert length(Set(x.id3)) == N
x.id4 = "id" .* string.(x.id1)
x.id5 = "id" .* string.(x.id2)
x.id6 = "id" .* string.(x.id3)
x.v1 = round.(100 .* rand(N), digits=6)
CSV.write("J_X_$(N).csv", x)

n = N ÷ 1_000_000
small = DataFrame()
small.id1 = sample_all(key1.rhs, n)
small.id4 = "id" .* string.(small.id1)
small.v2 = round.(100 .* rand(n), digits=6)
CSV.write("J_small_$(N).csv", small)

n = N ÷ 1_000
medium = DataFrame()
medium.id1 = sample_all(key1.rhs, n)
medium.id2 = sample_all(key2.rhs, n)
medium.id4 = "id" .* string.(medium.id1)
medium.id5 = "id" .* string.(medium.id2)
medium.v2 = round.(100 .* rand(n), digits=6)
CSV.write("J_medium_$(N).csv", medium)

n = N
big = DataFrame()
big.id1 = sample_all(key1.rhs, n)
big.id2 = sample_all(key2.rhs, n)
big.id3 = sample_all(key3.rhs, n)
big.id4 = "id" .* string.(big.id1)
big.id5 = "id" .* string.(big.id2)
big.id6 = "id" .* string.(big.id3)
big.v2 = round.(100 .* rand(n), digits=6)
CSV.write("J_big_$(N).csv", big)

bkamins avatar May 01 '21 11:05 bkamins

@quinnj - I have edited a typo in big table creation

bkamins avatar May 07 '21 22:05 bkamins

c.f. https://github.com/h2oai/db-benchmark/issues/210

bkamins avatar May 13 '21 16:05 bkamins