
support for working on collections of data files

Open behrica opened this issue 3 years ago • 37 comments

see here for initial proposal / discussion: https://github.com/techascent/tech.ml.dataset/issues/145

behrica avatar Oct 22 '20 16:10 behrica

Thanks, will read it tomorrow.

genmeblog avatar Oct 22 '20 20:10 genmeblog

Let's do this! I wonder which path it should take. Can we pack it into the dataset call? Or a specialized function? I can imagine that multiple CSVs could also be interesting.

I have some code which constructs the correct "grouped data frame" by iterating over arrow files on disk which are mmapped.

Can you share it? Or maybe you want to make a PR?

genmeblog avatar Oct 23 '20 08:10 genmeblog

Continuing: write! could also store multiple files (CSVs, nippy) when a grouped dataset is saved.

genmeblog avatar Oct 23 '20 09:10 genmeblog

Maybe I can come up with a PR. But we should maybe think together about the path. I liked the idea of a separate function, as done in R:

ds <- open_dataset("nyc-taxi", partitioning = c("year", "month"))

behrica avatar Oct 23 '20 14:10 behrica

there are several things to consider:

  • The R code does not return a data_frame, but another kind of object, on which most dplyr methods can be called

In my code I was creating a "grouped dataset".

Is this the best thing to do?

behrica avatar Oct 23 '20 14:10 behrica

My code exclusively handles the case of "arrow" + "mmapped" files (mmap only works for arrow). This results in very nice behaviour (no GC pressure) even for large (larger than memory) data.

BUT: all data stays on disk... each calculation reads from disk. Having an SSD helps a lot, I suppose. Of course, we hope the OS caches the mmapped files in RAM.
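
For reference, this is roughly how such an mmapped read looks with tech.v3.libs.arrow (a sketch only; the file path is a placeholder, and the function name is the one used later in this thread, which may differ in newer releases):

(require '[tech.v3.libs.arrow :as arrow]
         '[tech.v3.dataset :as ds])

;; mmap the arrow file; column data stays off-heap / on disk until accessed
;; "nyc-taxi/2019-01.arrow" is just a placeholder path
(def taxi (arrow/read-stream-dataset-inplace "nyc-taxi/2019-01.arrow"))

(ds/row-count taxi)
(ds/head taxi)   ;; touches only a small part of the mapping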

behrica avatar Oct 23 '20 14:10 behrica

I am planning to "try" the concrete case with nyc-taxi data as described here: https://arrow.apache.org/docs/r/articles/dataset.html

behrica avatar Oct 23 '20 14:10 behrica

What about a "non-grouped" flat list of arrow files? Can I have a "grouped dataset" without any grouping, i.e. something like by = None?

behrica avatar Oct 23 '20 14:10 behrica

The principle could work on non-arrow files as well. But does it make sense if the files are not mmapped?

The approach would need the concept of a "lazy" dataset, which we don't have in tech.ml.dataset, do we?

It works with the mmapped arrow files, because they are not lazy, but not on-heap either.

Maybe @cnuernber can comment on this as well.

behrica avatar Oct 23 '20 14:10 behrica

Let's do this!

I see this approach of working with large data as "the future" for big data analysis, namely the combination of:

  • fast SSDs
  • data in nested folders on local disk (implementing some levels of group-by, using directories)
  • arrow file format (fast)
  • mmapped (allows data larger than the JVM heap, cached by the OS)
  • data stays originally in some form of data lake, maybe even as single, gigantic files; the local storage is temporary, with the folder structure tailored to the analytics case at hand

behrica avatar Oct 23 '20 14:10 behrica

Let's do this! I wonder which path it should take. Can we pack it into the dataset call? Or a specialized function? I can imagine that multiple CSVs could also be interesting.

I tend to think that we should only look at "mmapped arrow files".

Everything else would need to be transformed to that first. I was searching for, but did not find, simple, fast CLI tools which can do:

parquet2arrow, csv2arrow

ideally allowing to write the data into "grouped-by" folder hierarchies.

Maybe we can write one, using Clojure with GraalVM native image...
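
A minimal sketch of such a converter in Clojure (paths are placeholders; the arrow writer is spelled dataset->stream! in recent tech.ml.dataset releases, but the exact name varies across versions):

(require '[tech.v3.dataset :as ds]
         '[tech.v3.libs.arrow :as arrow])

;; csv2arrow: parse a CSV into a dataset, then write it back out as an arrow stream file
(defn csv->arrow [csv-path arrow-path]
  (-> (ds/->dataset csv-path)
      (arrow/dataset->stream! arrow-path)))

(csv->arrow "taxi/2019-01.csv" "taxi/2019-01.arrow")

Compiled with GraalVM native image, something like this could become the standalone csv2arrow/parquet2arrow CLI mentioned above.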

behrica avatar Oct 23 '20 14:10 behrica

Let's do this! I wonder which path it should take. Can we pack it into the dataset call? Or a specialized function? I can imagine that multiple CSVs could also be interesting.

Lately I got bitten so much by CSV issues that I'm starting to think we should stop supporting it as input.

Mmapped arrow files only, plus a tool for transforming xxx2arrow!

(Maybe the tool is there already: using R with dplyr + the R package "arrow". A few lines of R, packaged in Docker, and the tool is done.)

Parquet and CSV should be seen as "formats for storage and exchange", not as formats to be "opened for analysis purposes".

behrica avatar Oct 23 '20 15:10 behrica

Another idea, mixed datasets:

# instead of using a confusing factory function
dataset([
     factory("s3://old-taxi-data", format="parquet"),
     factory("local/path/to/new/data", format="csv")
])

# let the user construct a new dataset directly from dataset objects
dataset([
    dataset("s3://old-taxi-data", format="parquet"),
    dataset("local/path/to/new/data", format="csv")
])

behrica avatar Oct 23 '20 15:10 behrica

Maybe I can come up with a PR. But we should maybe think together about the path. I liked the idea of a separate function, as done in R:

ds <- open_dataset("nyc-taxi", partitioning = c("year", "month"))

If the partitions are in separate files - it's relatively easy. If not, some parsing/updating (like you've written in #7) is necessary.

genmeblog avatar Oct 23 '20 20:10 genmeblog

What about a "non-grouped" flat list of arrow files? Can I have a "grouped dataset" without any grouping, i.e. something like by = None?

Yes, you can produce a grouped dataset without group-by. It's pretty straightforward, you need three columns:

  • :name - containing the grouping value; the best here is a map (ungrouping will create proper columns), e.g. {"year" 2002 "month" 11}
  • :group-id - just a consecutive unique group number
  • :data - a (sub)dataset

When such a dataset is created you need to mark it as grouped (https://github.com/scicloj/tablecloth/blob/master/src/tablecloth/api/group_by.clj#L20 <- I will make this function public)
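
A rough sketch of that shape (tiny in-memory sub-datasets instead of file chunks; the :grouped? metadata flag is my assumption about what the linked private function sets):

(require '[tablecloth.api :as tc])

(def part-a (tc/dataset {:x [1 2 3]}))
(def part-b (tc/dataset {:x [4 5]}))

(def grouped
  (-> (tc/dataset {:name     [{"year" 2002 "month" 11} {"year" 2002 "month" 12}]
                   :group-id [0 1]
                   :data     [part-a part-b]})
      ;; mark the dataset as grouped - assumed to be roughly what the
      ;; linked group_by.clj function does
      (vary-meta assoc :grouped? true)))

(tc/grouped? grouped)   ;; => true, if the metadata assumption holds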

So we need two paths on the tablecloth level:

  • ability to read a chunked dataset and store it in a grouped dataset (I hope mmapped datasets will work without any problem - to be tested)
  • ability to write a grouped dataset to chunks (see the sketch below)

I wish to have IO operations on the tech.ml.dataset side (@cnuernber). The converter of known formats can be left to clojisr for example, or maybe Spark/Geni? I don't know here...
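
For the write direction (grouped dataset -> one file per group), a rough sketch; the chunk naming scheme is made up, and tc/groups->seq / tc/write! are assumed to be available as in current tablecloth:

(require '[tablecloth.api :as tc])

;; write every group of a grouped dataset into its own nippy chunk
(defn write-chunks! [grouped-ds out-dir]
  (doseq [[idx sub-ds] (map-indexed vector (tc/groups->seq grouped-ds))]
    (tc/write! sub-ds (str out-dir "/chunk-" idx ".nippy"))))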

genmeblog avatar Oct 23 '20 20:10 genmeblog

Maybe I can come up with a PR. But we should maybe think together about the path. I liked the idea of a separate function, as done in R:

ds <- open_dataset("nyc-taxi", partitioning = c("year", "month"))

If the partitions are in separate files - it's relatively easy. If not, some parsing/updating (like you've written in #7) is necessary.

Yes, I would assume this for this case. The existing folder structure on disk needs to match the partitioning information. So we just give a "name" to the directory levels.

behrica avatar Oct 24 '20 19:10 behrica

I did some thinking on this feature, and it is clear that I want to support "big data" use cases as well, namely:

  • The size of all files together is more than RAM
  • The size of a single file is more than RAM

This means, if I am right, that I cannot adopt the "grouped dataset" way 1:1, correct? Currently a grouped dataset contains the "list of Dataset instances", right?

If this is true, then the Dataset instances themselves will require more heap than we have.

We would need something more lazy, which instantiates the Dataset instances only as needed. I think the pieces for this are present in the namespaces tech.v3.libs.arrow, tech.v3.datatype.mmap and tech.v3.datatype.native-buffer.
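
A cheap way to get that laziness, as a sketch (file names are placeholders): keep only the file names and wrap each load in a delay, so a Dataset instance is realized only when its group is actually touched:

(require '[tech.v3.libs.arrow :as arrow])

(def arrow-files ["part-0001.arrow" "part-0002.arrow"])   ;; placeholder names

;; nothing is read/mmapped until a delay is dereferenced
(def lazy-groups
  (mapv #(delay (arrow/read-stream-dataset-inplace %)) arrow-files))

@(first lazy-groups)   ;; realizes only this one group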

behrica avatar Oct 25 '20 14:10 behrica

@cnuernber @genmeblog

Maybe the correct starting point for this would be to first make an implementation of the PColumnarDataset protocol which is "lazy", in the sense that it does not store any data, but has a "pointer" to the mmapped arrow files (or maybe just the filenames).

behrica avatar Oct 25 '20 14:10 behrica

Hmm, then does that correspond to how we manipulate columns? Do we need a '+' operator that simply creates a node in a DAG and then performs it on demand? That is getting closer to the spark Dataframe design for sure.

cnuernber avatar Oct 25 '20 14:10 cnuernber

Maybe we could get rather far by doing 2 new implementations of the protocol PColumnarDataset, namely:

  1. one which works on an arrow native buffer (or even just on the arrow file)

    • on every call to it, it first calls arrow/read-stream-dataset-inplace and
    • delegates to the dataset created that way

  2. another implementation which takes a "folder" or "a seq of arrow files in a hierarchy"

    • and calls the methods on each file and joins the results together

This could probably start "simple" and implement very few methods - roughly the minimum needed to select rows and columns from the potentially very large underlying arrow files.

Then we need a kind of "collect" method, which then produces a normal in-memory dataset.

This would then follow the idea of first selecting a window on a very large (mmapped) dataset and then running the analysis on this window (which is supposed to fit in the heap) as a normal Dataset.
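
A sketch of that "select a window, then collect" flow under these assumptions (collect-window is a made-up name; it leans on ds/select-columns, ds/filter and ds/concat from tech.v3.dataset):

(require '[tech.v3.dataset :as ds]
         '[tech.v3.libs.arrow :as arrow])

;; narrow each mmapped file down to the requested columns/rows first,
;; then concatenate the already-small pieces into one dataset to continue
;; with the usual tablecloth / tech.ml.dataset operations
(defn collect-window [fnames columns row-pred]
  (->> fnames
       (map arrow/read-stream-dataset-inplace)
       (map #(ds/select-columns % columns))
       (map #(ds/filter % row-pred))
       (apply ds/concat)))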

behrica avatar Oct 25 '20 15:10 behrica

Hmm, so load enough of the file to know the schema and then stop at that point? Or assume the first file's schema applies to the rest of the files?

There is the select-a-window-and-then-operate pattern, and there are also giant group-by-type operations for aggregations, such as Anthony's benchmark: https://github.com/zero-one-group/geni/blob/develop/docs/simple_performance_benchmark.md

cnuernber avatar Oct 25 '20 15:10 cnuernber

Hmm, so load enough of the file to know the schema and then stop at that point? Or assume the first file's schema applies to the rest of the files?

I would assume that in this scenario the schema is exactly the same in all arrow files. So having different schemas across the files would be an invalid situation.

behrica avatar Oct 25 '20 19:10 behrica

If you can know your pipeline a priori you can do a lot of optimizations by combining steps or reorganizing operations. The Spark Dataframe system has these types of optimizations, at the cost of a whole lot of engineering and not being able to see partial results, especially in the REPL, without an explicit collect call. Thinking that through for a second, it seems to me that toString could implicitly imply a collect call, and then you would know some things like row-count and you would be able to print out the head of the table or something.

It is very interesting how closely the Geni API matches the tablecloth API.

cnuernber avatar Oct 25 '20 19:10 cnuernber

It could be implemented in this way:


(require '[tech.v3.dataset :as ds]
         '[tech.v3.libs.arrow :as arrow])
;; ds-proto below stands for the namespace that defines PColumnarDataset
;; (its exact location depends on the tech.ml.dataset version)

(defn- load-all
  "Read every arrow file in-place (mmapped)."
  [fnames]
  (map arrow/read-stream-dataset-inplace fnames))

(deftype DirectoryDS [fnames]
  ds-proto/PColumnarDataset
  (columns [this]
    ;; concatenate all files and return the columns of the result
    (->> (load-all fnames)
         (apply ds/concat)
         (ds/columns)))
  ;; ... implement the rest of PColumnarDataset here
  clojure.lang.Counted
  (count [this]
    ;; total row count = sum of the per-file row counts
    (->> (load-all fnames)
         (map ds/row-count)
         (reduce +))))

;; usage

(def dss (DirectoryDS.
          ["screenings/AA_Procurement_Demo/screenings.arrow"
           "screenings/AHAW_EFSA-Q-2011-00853_SVD/screenings.arrow"]))

(ds/row-count dss)
(ds/columns dss)

I would like to add support for "partitions", so that nested directory levels are handled like additional columns of the dataset.

behrica avatar Oct 25 '20 19:10 behrica

You would need a more efficient version of ds/concat. Specifically, it concats data at the reader level, incurring a cost at each index access. If, for instance, the definition of a column supported more than one reader/missing set, then you could begin to construct operations (like adding 2 columns elemwise) that were efficient.

cnuernber avatar Oct 25 '20 19:10 cnuernber

If you can know your pipeline a priori you can do a lot of optimizations by combining steps or reorganizing operations. The Spark Dataframe system has these types of optimizations, at the cost of a whole lot of engineering and not being able to see partial results, especially in the REPL, without an explicit collect call.

I did not think about this level of complexity and optimisation...

behrica avatar Oct 25 '20 19:10 behrica

If you have the atomic operations right then some of that can be done by the programmer for sure. It isn't necessary to start there.

cnuernber avatar Oct 25 '20 19:10 cnuernber

Or rather, perhaps it is easier to start there? What if your dataset definition is a schema and a list of files, and then you have a function that, given a data structure, produces a new dataset - where this data structure contains a simple representation of your data pipeline? Then it would be easy to optimize pipelines.
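
A tiny sketch of that idea (the op names, the :files vector and run-pipeline are all invented for illustration): the pipeline is plain data, so it could be inspected, reordered or fused before any file is touched:

(require '[tech.v3.dataset :as ds]
         '[tech.v3.libs.arrow :as arrow])

;; dataset definition: a schema plus a list of files (placeholders)
(def ds-def {:schema {"x" :int64 "y" :float64}
             :files  ["part-0001.arrow" "part-0002.arrow"]})

;; the pipeline itself is plain data
(def pipeline [{:op :select-columns :args [["x" "y"]]}
               {:op :head           :args [100]}])

;; naive interpreter: run the ops per file, then concat the results.
;; an optimizing version could rewrite `pipeline` before executing it.
(defn run-pipeline [{:keys [files]} pipeline]
  (->> files
       (map arrow/read-stream-dataset-inplace)
       (map (fn [d]
              (reduce (fn [d {:keys [op args]}]
                        (case op
                          :select-columns (apply ds/select-columns d args)
                          :head           (apply ds/head d args)))
                      d pipeline)))
       (apply ds/concat)))

(run-pipeline ds-def pipeline)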

cnuernber avatar Oct 25 '20 19:10 cnuernber

And potentially only somewhat harder to construct them.

cnuernber avatar Oct 25 '20 19:10 cnuernber

My main use case is to handle the > 50 GB datasets in Clojure. The steps for this should be:

  • have/make a tool to transform the input from whatever format into directories of arrow files (all having the same schema)
  • use something like DirectoryDS to "point" to this directory and use tablecloth operations to analyse it (eventually this requires only a subset of PColumnarDataset implemented, namely subset / filter by rows and columns)
  • this should return an "in-memory" dataset, and then I stay in the usual tablecloth world

behrica avatar Oct 25 '20 20:10 behrica