
Load big CSV files in smaller chunks

Open iblislin opened this issue 7 years ago • 18 comments

I read the manual: http://juliadb.org/latest/manual/out-of-core.html, but it doesn't help in my case: I have a single huge CSV file (~500 GB).

I want to read one chunk (maybe 10K lines per chunk) per iteration and then do some preprocessing. Is this possible with JuliaDB?

iblislin avatar Feb 02 '18 04:02 iblislin

No, not yet possible... You could split the file into 256 MB or half-GB chunks... I think it would be great to have this, though! I'll turn this issue into a feature request.

shashi avatar Feb 02 '18 07:02 shashi

Splitting the file is infeasible for me, since it requires doubling the disk space.

iblislin avatar Feb 02 '18 07:02 iblislin

Why not use the IterableTables/Query machinery?

If you do:

using CSVFiles, FileIO
itr = load(filename)

you get an iterable table of NamedTuples (lazy: you haven't read the data yet).

You can either get the iterator explicitly with getiterator and take from it (not too sure why you need the getiterator call; I had hoped the output of load would be iterable already, but apparently not):

Iterators.take(TableTraits.getiterator(itr), 10000) |> table

Or do a Query:

using Query
@from i in itr begin
    @select i
    @take 10000
    @collect table
end

I thought the standalone Query syntax would also work, but for some reason it doesn't; maybe a bug?

julia> itr |> @take 10000 |> table
ERROR: ArgumentError: iter cannot be turned into a NextTable.

piever avatar Feb 02 '18 10:02 piever

That's a good approach! However, you also want to be able to load the file in parallel. This may be possible with CSVFiles in the future; I'm not sure if it is already.

I have been thinking about this feature a bit. It's common to have files of different sizes, say 4 files of 10 MB, 200 MB, 500 MB, and 1 GB; in this case you want to be able to load them using many processes into chunks of approximately equal size. This might be possible by reading ~400 MB of CSV at a time, i.e. files 1 and 2 and part of file 3 would be read by process 1, the rest of file 3 by process 2, and processes 3 and 4 might share the 4th file... There's a caveat, though: the size of the CSV data is not proportional to the size of the data in Julia (e.g. if many NA fields are present, they still take up memory in Julia but not in the CSV).

One has to be careful to give TextParse the first line as the header in each case; otherwise we may end up with incorrect column order.

shashi avatar Feb 02 '18 10:02 shashi

A useful utility (maybe @iblis17 was going for this with ChunkIter) is BlockIO https://github.com/JuliaParallel/Dagger.jl/blob/master/src/lib/block-io.jl#L26 written by Tanmay. It lets you treat a part (range) of a file as a single file, and also allows splitting the file on newlines.

shashi avatar Feb 02 '18 10:02 shashi
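
(For illustration, here is a minimal sketch of that idea in plain Base Julia. It is not Dagger's actual BlockIO API, just a way to compute newline-aligned byte ranges of one file so each range can later be parsed as its own chunk of CSV rows.)

# Split `path` into `n` byte ranges whose boundaries fall on newlines.
function newline_aligned_ranges(path::AbstractString, n::Integer)
    sz = filesize(path)
    cuts = [0]                               # byte offsets where blocks start/end
    open(path) do io
        for i in 1:n-1
            seek(io, min((sz * i) ÷ n, sz))  # jump near the i-th approximate boundary
            readline(io)                     # advance to the next newline
            push!(cuts, max(position(io), cuts[end]))
        end
    end
    push!(cuts, sz)
    # block i covers bytes cuts[i]+1 through cuts[i+1]
    [(cuts[i] + 1):cuts[i+1] for i in 1:n]
end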

Right now loadtable takes a vector of filenames (if a directory is provided, it will look for all files in it) and does these steps:

  1. split the vector of files into p equal (by number of files) groups -- where p is the number of worker processes (or the chunks argument to loadtable)
  2. call _loadtable_serial with each group in an async parallel task (using Dagger.delayed); _loadtable_serial in turn calls the TextParse.csvread(files::AbstractArray{<:AbstractString}...) method, which can read multiple files at a time. This loads the files in a group as a single table.

I think the changes required for this feature would be:

  1. Find the size of each file, add them up, and divide by the number of chunks required.
  2. Instead of creating groups of files (a vector of vectors of file names), create a vector of vectors of (filename, range, header) tuples, where header is the header read from that file (a rough sketch of steps 1 and 2 follows below).
  3. Create a csvread(::AbstractArray{Tuple{String, <:Range, <:AbstractVector}}) function which takes an element of the above grouping (a vector of these tuples) and reads those parts into a single table.
  4. Call this method from _loadtable_serial.

shashi avatar Feb 02 '18 10:02 shashi
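
(As an illustrative aside, here is a rough, untested sketch of steps 1 and 2 of the plan above, using only Base Julia. nchunks plays the role of loadtable's chunks argument, the header is taken naively as the comma-split first line of each file, and newline alignment of the byte ranges is ignored for brevity; a BlockIO-style split like the one sketched earlier would handle that.)

# Group (filename, byte range, header) tuples so each group holds roughly the
# same number of bytes; one group would go to each _loadtable_serial call.
function plan_chunks(files::Vector{String}, nchunks::Integer)
    sizes  = filesize.(files)
    budget = cld(sum(sizes), nchunks)                          # target bytes per chunk
    headers = [split(open(readline, f), ',') for f in files]   # naive header of each file
    groups, cur, used = [], [], 0
    for (f, sz, h) in zip(files, sizes, headers)
        offset = 0
        while offset < sz
            take = min(budget - used, sz - offset)
            push!(cur, (f, (offset + 1):(offset + take), h))   # (filename, byte range, header)
            offset += take; used += take
            if used >= budget                                  # chunk is full, start a new group
                push!(groups, cur); cur, used = [], 0
            end
        end
    end
    isempty(cur) || push!(groups, cur)
    groups
end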

I just tried CSVFiles (I didn't know it existed before).

julia> i = TableTraits.getiterator(iter)                                                                                                          
ERROR: OutOfMemoryError()                                                                                                                         
Stacktrace:
 [1] #csvread#28(::Array{Any,1}, ::Function, ::IOStream, ::Char) at /home/iblis/.julia/v0.6/TextParse/src/csv.jl:77                               
 [2] (::TextParse.#kw##csvread)(::Array{Any,1}, ::TextParse.#csvread, ::IOStream, ::Char) at ./<missing>:0                                        
 [3] (::TextParse.##26#27{Array{Any,1},String,Char})(::IOStream) at /home/iblis/.julia/v0.6/TextParse/src/csv.jl:71                               
 [4] open(::TextParse.##26#27{Array{Any,1},String,Char}, ::String, ::String) at ./iostream.jl:152                                                 
 [5] getiterator(::CSVFiles.CSVFile) at /home/iblis/.julia/v0.6/CSVFiles/src/CSVFiles.jl:31

It seems TextParse doesn't do chunking?

iblislin avatar Feb 02 '18 11:02 iblislin

The Query version failed as well:

julia> @from i in iter begin
              @select i
              @take 10000
       end                                                                                                                                        
ERROR: OutOfMemoryError()                                                                                                                         
Stacktrace:                                                                                                                                       
 [1] #csvread#28(::Array{Any,1}, ::Function, ::IOStream, ::Char) at /home/iblis/.julia/v0.6/TextParse/src/csv.jl:77                               
 [2] (::TextParse.#kw##csvread)(::Array{Any,1}, ::TextParse.#csvread, ::IOStream, ::Char) at ./<missing>:0                                        
 [3] (::TextParse.##26#27{Array{Any,1},String,Char})(::IOStream) at /home/iblis/.julia/v0.6/TextParse/src/csv.jl:71                               
 [4] open(::TextParse.##26#27{Array{Any,1},String,Char}, ::String, ::String) at ./iostream.jl:152                                                 
 [5] getiterator(::CSVFiles.CSVFile) at /home/iblis/.julia/v0.6/CSVFiles/src/CSVFiles.jl:31                                                       
 [6] query(::CSVFiles.CSVFile) at /home/iblis/.julia/v0.6/QueryOperators/src/source_iterable.jl:7

iblislin avatar Feb 02 '18 11:02 iblislin

I think CSVFiles is based on TextParse, but probably isn't using its ability to read rows line-by-line.

shashi avatar Feb 02 '18 11:02 shashi

My bad, I somehow assumed that CSVFiles would read line by line to produce the NamedTuple iterator, but apparently not: it tries to read everything at once. Still, it would be nice to have a simple way to go from a CSV file to a NamedTuple iterator.

piever avatar Feb 02 '18 11:02 piever
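
(As a stopgap, a naive NamedTuple-per-row iterator can be written with Base alone; a sketch follows. It splits on commas with no quoting, escaping, or type inference -- every field stays a String -- so it is not a substitute for TextParse's parser, and the file name is just a placeholder.)

# Lazy, Base-only iterator of NamedTuples over a CSV file (Julia 1.x NamedTuple syntax).
function csvrows(path::AbstractString)
    io = open(path)                              # note: left open for the iterator's lifetime
    names = Symbol.(split(readline(io), ','))    # column names from the header line
    (NamedTuple{Tuple(names)}(Tuple(split(line, ','))) for line in eachline(io))
end

rows = csvrows("big.csv")
first10k = Iterators.take(rows, 10_000) |> collect   # materialize only the first 10 000 rows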

I think CSVFiles is based on TextParse, but probably isn't using its ability to read rows line-by-line.

@shashi I read the docs for csvread, but I don't get how TextParse reads a file line-by-line.

iblislin avatar Feb 02 '18 12:02 iblislin

I also found https://github.com/JuliaParallel/Blocks.jl, but it looks unmaintained. Should I devote my effort to Blocks.jl?

iblislin avatar Feb 02 '18 12:02 iblislin

No; if you're planning to contribute to this, I'd suggest just copying the block-io.jl file I linked above from Dagger.jl into JuliaDB and working with it...

shashi avatar Feb 02 '18 12:02 shashi

By the way, please review my PR on Dagger.jl :)

iblislin avatar Feb 02 '18 12:02 iblislin

Yes, CSVFiles.jl doesn't read line-by-line right now. It really should, though :) Every time I look into implementing that, I start to realize that I would have to copy much of _csvread_internal, and then it doesn't get done...

@shashi, there isn't some extra piece hidden somewhere in TextParse.jl that takes an AbstractString and just returns an iterator over the rows, right?

davidanthoff avatar Feb 04 '18 04:02 davidanthoff

Yes, there is something...

Basically, you can call tryparsenext with a Record object, which represents the parser for a file. An example of creating a Record is here: https://github.com/JuliaComputing/TextParse.jl/blob/master/src/csv.jl#L215 (it's created after guessing what the parser should be).

shashi avatar Feb 04 '18 05:02 shashi

Ah, ok!

Could we factor all the stuff above that line 215 into a separate function that I can call from CSVFiles.jl? Then I could reuse all the guessing logic as well and wouldn't have to copy it. Roughly, that would be a function that takes str and all the options as arguments, and returns guess and anything else I might need that I don't understand yet ;)

Then both _csvread_internal and CSVFiles.jl could call that function and be based on the same detection machinery.

davidanthoff avatar Feb 04 '18 05:02 davidanthoff

I think this is a good request, because sometimes our datasets have just one or two big data files. Assuming the input format is CSV, an easy helper function for loading the data from a big file would let you set how many lines go into each chunk, instead of the byte size of each chunk or the total number of chunks.

By the way, if the data is stored on an ordinary user's disk, parallel reading or writing won't speed things up at all unless the file lives on a distributed file system. That doesn't matter for this request, though.

(quoting @shashi's outline of loadtable and the proposed changes from earlier in the thread)

bearxu83 avatar Feb 23 '19 05:02 bearxu83
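
(To illustrate the lines-per-chunk idea, here is a rough, untested sketch that feeds fixed-size batches of lines to TextParse. It assumes csvread accepts an IO argument (as the stack traces above suggest) and the header_exists / colnames keywords of that era's TextParse; the exact keywords, the return shape, and the per-chunk type guessing should all be treated as assumptions, not a verified recipe.)

using TextParse

# Parse a big CSV `chunksize` lines at a time and hand each parsed chunk of
# columns to a user-supplied `process` function (e.g. your preprocessing step).
function foreach_chunk(process, path::AbstractString; chunksize::Integer = 10_000)
    open(path) do io
        colnames = String.(split(readline(io), ','))      # header line, reused for every chunk
        for lines in Iterators.partition(eachline(io), chunksize)
            buf = IOBuffer(join(lines, '\n'))
            # NOTE: assumed keyword names; column types are re-guessed per chunk,
            # so they may come out differently from one chunk to the next.
            cols, _ = csvread(buf, ','; header_exists = false, colnames = colnames)
            process(cols)
        end
    end
end

# usage sketch: foreach_chunk(chunk -> preprocess(chunk), "huge.csv")
# where `preprocess` is whatever you want to run on each 10K-line chunk.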