tablecloth
support for working on collections of data files
see here for initial proposal / discussion: https://github.com/techascent/tech.ml.dataset/issues/145
Thanks, will read it tomorrow.
Let's do this! I wonder which path it should go down. Can we pack it into the dataset call? Or a specialized function? I can imagine that multiple CSVs could also be interesting.
I have some code which constructs the correct "grouped data frame" by iterating over arrow files on disk which are mmapped.
Can you share it? Or maybe you want to make a PR?
Continuing: write! can also store multiple files (CSVs, nippy) when a grouped dataset is saved.
Maybe I can come up with a PR. But we should maybe think together about the path. I liked the idea of another method, as done in R:
ds <- open_dataset("nyc-taxi", partitioning = c("year", "month"))
There are several things to consider:
- The R code does not return a data_frame, but another object, on which most dplyr methods can be called.
In my code I was creating a "grouped data set".
Is this the best thing to do?
- My code exclusively handles the case of "arrow" and "mmapped" files (mmap only works for arrow). This results in very nice behaviour (no GC pressure) even for large (larger than memory) data.
BUT: all data stays on disk... Each calculation reads from disk. Having an SSD helps a lot, I suppose. Of course, we hope the OS caches the mmapped files in RAM.
I am planning to "try" the concrete case with nyc-taxi data as described here: https://arrow.apache.org/docs/r/articles/dataset.html
Whats about a "Non-grouped" flat list of arrow files ? Can I have a "grouped dataset", without grouping by = None ....
The principle could work on non-arrow files as well. But does it make sense if the files are not mmapped?
The approach would need the concept of a "lazy" dataset, which we don't have in tech.ml.dataset, do we?
It works with the mmapped arrow files because they are not "lazy", but not "on-heap" either.
Maybe @cnuernber can comment on this as well.
Let's do this!
I see this approach of working with large data as "the future" for big data analysis, namely the combination of:
- fast SSD
- data in nested folders on local disk (implementing some levels of group-by, using directories)
- arrow file format (fast)
- mmapped (allows data larger than the JVM heap, cached by the OS)
- data originally stays in some form of data lake, maybe even as single, gigantic files; the local storage is temporary, with the folder structure built for the analytics case at hand
Let's do this! I wonder which path it should go down. Can we pack it into the dataset call? Or a specialized function? I can imagine that multiple CSVs could also be interesting.
I tend to think that we should only look at "mmapped arrow files".
Everything else would need to be transformed into that first. I was searching for, but did not find, simple and fast CLI tools which can do:
parquet2arrow, csv2arrow
ideally allowing to write the data in "grouped-by" folder hierarchies.
Maybe we can write one, using Clojure with GraalVM native-image...
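Such a converter could be only a few lines of Clojure on top of tech.ml.dataset. A rough sketch (the arrow writer function name is an assumption and differs between tech.ml.dataset versions; the paths are illustrative):
(require '[tech.v3.dataset :as ds]
         '[tech.v3.libs.arrow :as arrow])

;; read any format tech.ml.dataset understands (csv here) and write it back
;; out as an arrow stream; the writer fn name is an assumption, e.g.
;; write-dataset-to-stream! vs. dataset->stream! depending on the version
(defn csv->arrow!
  [csv-path arrow-path]
  (-> (ds/->dataset csv-path)
      (arrow/write-dataset-to-stream! arrow-path)))

(csv->arrow! "nyc-taxi/2019-01.csv" "nyc-taxi/2019-01.arrow")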
Let's do this! I wonder which path it should go down. Can we pack it into the dataset call? Or a specialized function? I can imagine that multiple CSVs could also be interesting.
I got so bitten by CSV issues lately that I start to think we should stop supporting it as input...
Mmapped arrow files only, plus a tool for transforming xxx2arrow!
(Maybe the tool is already there: using R with dplyr + the R package "arrow".) A few lines of R, packaged in Docker, and the tool is done.
Parquet and CSV should be seen as "formats for storage and exchange", not as formats to be "opened for analysis purposes"
Another idea, mixed datasets:
# instead of using a confusing factory function
dataset([
    factory("s3://old-taxi-data", format="parquet"),
    factory("local/path/to/new/data", format="csv")
])

# let the user construct a new dataset directly from dataset objects
dataset([
    dataset("s3://old-taxi-data", format="parquet"),
    dataset("local/path/to/new/data", format="csv")
])
Maybe I can come up with a PR. But we should maybe think together about the path. I liked the idea of another method, as done in R:
ds <- open_dataset("nyc-taxi", partitioning = c("year", "month"))
If the partitions are in separate files - it's relatively easy. If not, some parsing/updating (like you've written in #7) is necessary.
Whats about a "Non-grouped" flat list of arrow files ? Can I have a "grouped dataset", without grouping by = None ....
Yes, you can produce a grouped dataset without grouping by anything. It's pretty straightforward; you need three columns:
- :name - containing the grouping value; the best here is a map (ungrouping will create proper columns), eg. {"year" 2002 "month" 11}
- :group_id - just a consecutive unique group number
- :data - a (sub)dataset
When such a dataset is created, you need to mark it as grouped (https://github.com/scicloj/tablecloth/blob/master/src/tablecloth/api/group_by.clj#L20 <- I will make this function public)
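A minimal sketch of constructing such a dataset by hand (the column names follow the description above; tablecloth's internal naming and the marking function may differ slightly):
(require '[tablecloth.api :as tc])

;; two illustrative sub-datasets, e.g. one per month
(def sub-nov (tc/dataset {:x [1 2 3]}))
(def sub-dec (tc/dataset {:x [4 5]}))

;; the grouped dataset is itself just a dataset with these three columns
(def grouped
  (tc/dataset {:name     [{"year" 2002 "month" 11} {"year" 2002 "month" 12}]
               :group_id [0 1]
               :data     [sub-nov sub-dec]}))

;; ...and is then marked as grouped with the (to-be-public) function linked above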
So we need two paths on the tablecloth level:
- the ability to read a chunked dataset and store it in a grouped dataset (I hope mmapped datasets will work without any problem - to be tested)
- the ability to write a grouped dataset to chunks (a sketch follows below)
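A rough sketch of the second path, assuming tc/groups->seq and ds/write!, with an illustrative file naming scheme and output format:
(require '[tablecloth.api :as tc]
         '[tech.v3.dataset :as ds])

;; write every group of a grouped dataset to its own file; the csv extension
;; and the part-NNNNN naming are just assumptions for this sketch
(defn write-grouped->chunks!
  [grouped-ds out-dir]
  (doseq [[idx sub-ds] (map-indexed vector (tc/groups->seq grouped-ds))]
    (ds/write! sub-ds (format "%s/part-%05d.csv" out-dir idx))))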
I wish to have io operations on the tech.ml.dataset side (@cnuernber)
The converter of known formats can be left for clojisr for example or maybe Spark/Geni? I don't know here...
Maybe I can come up with a PR. But we should maybe think together about the path. I liked the idea of another method, as done in R:
ds <- open_dataset("nyc-taxi", partitioning = c("year", "month"))
If the partitions are in separate files - it's relatively easy. If not, some parsing/updating (like you've written in #7) is necessary.
Yes, I would assume this for this case. The existing folder structure on disk needs to match the partitioning information. So we just give a "name" to the directory levels.
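A minimal sketch of turning the directory levels into named partition values (the helper name and path layout are illustrative):
(require '[clojure.string :as str])

;; map the directory levels of a file path onto the given partition names,
;; e.g. "nyc-taxi/2019/1/part-0.arrow" with ["year" "month"]
(defn path->partition-map
  [path partition-names]
  (let [dirs (-> (str/split path #"/") rest butlast)]
    (zipmap partition-names dirs)))

(path->partition-map "nyc-taxi/2019/1/part-0.arrow" ["year" "month"])
;; => {"year" "2019", "month" "1"}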
I did some thinking on this feature, and it is clear that I want to support "big data" use cases as well, namely:
- the size of all files together is more than RAM
- the size of a single file is more than RAM
This means, if I am right, that I cannot adopt the "grouped dataset" way 1:1, correct? Currently a grouped dataset contains the "list of Dataset instances", right?
If this is true, then the Dataset instances themselves will require more heap than we have.
We would need something more lazy, which instantiates the Dataset instances only as needed. I think the pieces for this are present in the namespaces tech.v3.libs.arrow, tech.v3.datatype.mmap and tech.v3.datatype.native-buffer.
@cnuernber @genmeblog
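One possible direction, sketched: keep only file references in :data and realize each sub-dataset on demand via delay (aliases and paths as in the earlier examples, purely illustrative):
(def lazy-grouped
  (tc/dataset
   {:name     [{"year" 2019 "month" 1} {"year" 2019 "month" 2}]
    :group_id [0 1]
    ;; nothing on the heap yet - each entry is realized only when dereferenced
    :data     [(delay (arrow/read-stream-dataset-inplace "nyc-taxi/2019/1.arrow"))
               (delay (arrow/read-stream-dataset-inplace "nyc-taxi/2019/2.arrow"))]}))

;; mmap and read a single group only when it is actually needed
@(first (:data lazy-grouped))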
Maybe the correct starting point for this would be to first make an implementation of the PColumnarDataset protocol which is "lazy", in the sense that it does not store any data but has a "pointer" to the mmapped arrow files (or maybe just the filenames).
Hmm, then does that correspond to how we manipulate columns? Do we need a '+' operator that simply creates a node in a DAG and then performs it on demand? That is getting closer to the Spark DataFrame design for sure.
Maybe we could go rather far by doing 2 new implementations of the protocol PColumnarDataset, namely:
- one which works on an arrow native buffer (or even just on the arrow file), and
  - on every call to it, first calls arrow/read-stream-dataset-inplace and
  - delegates to the dataset created before
- another implementation which takes a "folder" or "a seq of arrow files in a hierarchy"
  - and calls the methods on each file and joins the results together
This could probably start "simple" and implement very few methods - kind of the minimum needed to select rows and columns from the potentially very large underlying arrow files.
Then we need a kind of "collect" method, which then produces a normal in-memory dataset.
This would follow the idea of first selecting a window on a very large (mmapped) data set and then running the analysis on this window (which is supposed to fit in the heap) as a normal Dataset.
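A minimal sketch of that "select a window first, then analyze in memory" idea, using plain concatenation of the mmapped files (file and column names are illustrative; ds and arrow aliases as before):
(def full
  (apply ds/concat
         (map arrow/read-stream-dataset-inplace
              ["nyc-taxi/2019/01.arrow" "nyc-taxi/2019/02.arrow"])))

;; the "window": a subset of columns and rows small enough for the heap
(def window
  (-> full
      (ds/select-columns ["passenger_count" "total_amount"])
      (ds/select-rows (range 1000000))))

;; from here on, window can be treated as a normal in-memory dataset
(ds/descriptive-stats window)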
Hmm, so load enough of the file to know the schema and then stop at that point? Or assume the first file's schema applies to the rest of the files?
There is the select-window-and-then-operate pattern, and there are also giant group-by-type operations for aggregations, such as Anthony's benchmark: https://github.com/zero-one-group/geni/blob/develop/docs/simple_performance_benchmark.md
Hmm, so load enough of the file to know the schema and then stop at that point? Or assume the first file's schema applies to the rest of the files?
I would assume that in this scenario, the schema is exactly the same in all arrow files. So having different schemas across the files would be an invalid situation
If you can know your pipeline a priori you can do a lot of optimizations by combining steps or reorganizing operations. The Spark DataFrame system has these types of optimizations at the cost of a whole lot of engineering and not being able to see partial results, especially in the REPL, without an explicit collect call. Thinking that through a second, it seems to me that toString could implicitly imply a collect call and then you would know some things like row-count and you would be able to print out the head of the table or something.
It is very interesting how closely the Geni API matches the tablecloth API.
It could be implemented in this way:
(require '[tech.v3.dataset :as ds]
         '[tech.v3.libs.arrow :as arrow])
;; ds-proto aliases the namespace that defines PColumnarDataset in tech.ml.dataset

(deftype DirectoryDS [fnames]
  ds-proto/PColumnarDataset
  (columns [this]
    ;; read every file in-place (mmapped), concatenate, return the columns
    (->> fnames
         (map arrow/read-stream-dataset-inplace)
         (apply ds/concat)
         (ds/columns)))
  ;;; implement the rest of PColumnarDataset here

  clojure.lang.Counted
  (count [this]
    ;; total row count = sum of the row counts of the individual files
    (->> fnames
         (map arrow/read-stream-dataset-inplace)
         (map ds/row-count)
         (reduce +))))

;; usage
(def dss (DirectoryDS.
          ["screenings/AA_Procurement_Demo/screenings.arrow"
           "screenings/AHAW_EFSA-Q-2011-00853_SVD/screenings.arrow"]))

(ds/row-count dss)
(ds/columns dss)
I would like to add support for "partitions", so that nested directory levels are handled like additional columns of the dataset.
You would need a more efficient version of ds/concat. Specifically, it concats data at the reader level, incurring a cost at each index access. If, for instance, the definition of a column supported more than one reader/missing set, then you could begin to construct operations (like adding 2 columns elemwise) that were efficient.
If you can know your pipeline a priori you can do a lot of optimizations by combining steps or reorganizing operations. The Spark DataFrame system has these types of optimizations at the cost of a whole lot of engineering and not being able to see partial results, especially in the REPL, without an explicit collect call.
I did not think about this level of complexity and optimisation...
If you have the atomic operations right then some of that can be done by the programmer for sure. It isn't necessary to start there.
Or rather, perhaps it is easier to start there? What if your dataset definition is a schema and a list of files, and then you have a function that, given a data structure, produces a new dataset, where this data structure contains a simple representation of your data pipeline? Then it would be easy to optimize pipelines.
And potentially only somewhat harder to construct them.
My main use case is to handle > 50 GB datasets in Clojure. The steps for this should be:
- have/make a tool and transform the input from whatever format into directories of arrow files (all having the same schema)
- use something like DirectoryDS to "point" to this directory and use tablecloth operations to analyse it (eventually this requires only a subset of PColumnarDataset to be implemented, namely subset / filter by rows and columns)
- this should return an "in-memory" dataset, and then I stay in the usual tablecloth world