
Dask bag

Open ritchie46 opened this issue 2 years ago • 4 comments

I'm starting this issue to get some community insight/help on this. As I understand it, starting with a Dask bag is the most natural first step.

I understand a bag as a partitioned data structure on which we can apply functional combinators like map, filter, fold, etc. Given Polars' memory layout, it seems most natural to have columnar memory, or batches of columnar memory, in a partitioned data structure. Rows are very, very inefficient.

This might be a context switch, as Spark RDDs seem to work on rows, and pandas is also somewhat row-oriented, with C-ordered NumPy memory behind its block manager.

Something that might also be interesting is partitioning the lazy Polars queries and holding no data at all, although that is also something the Dask graph already does.

ritchie46 avatar Feb 12 '22 08:02 ritchie46

As I understand it, starting with a Dask bag is the most natural first step.

I would look at Dask bag in order to understand how Dask collections work, but I wouldn't base anything on them. If Dask were a project like Spark then yes, this would make sense, because the Spark RDD is the most basic Spark structure. That's not true for Dask, though; there are lower-level structures that make more sense.

A Dask bag is a bunch of lists, each of which contains Python objects. Using some rudimentary typing system it might look like this:

Bag[Any] :: List[List[Any]]
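
For concreteness, here's what that looks like with the real dask.bag API: each partition is a plain list of Python objects, and the combinators operate element-wise.

```python
import dask.bag as db

b = db.from_sequence(range(10), npartitions=2)  # two partitions of Python ints
# map/filter/fold-style combinators, evaluated lazily until .compute()
print(b.map(lambda x: x * 2).filter(lambda x: x > 5).sum().compute())  # -> 84
```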

What you propose is the following:

DaskPolarsDataFrame :: Bag[PolarsDataFrame] :: List[List[PolarsDataFrame]]

This is one level more nested than it should be, though. Instead we want this:

DaskPolarsDataFrame :: List[PolarsDataFrame]

Here a dask_polars.DataFrame is just a collection of individual polars.DataFrames, where one dataframe is one partition. We don't need to restrict ourselves to map/filter; we can do whatever we want with arbitrary Dask graphs. See https://docs.dask.org/en/stable/graphs.html
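
A minimal sketch of that shape, assuming nothing beyond the raw Dask graph spec (the class and method names here are illustrative, not the repo's actual code):

```python
import dask
import polars as pl
from dask.base import tokenize

class DataFrame:
    """One polars.DataFrame per partition; partition i lives at graph key (name, i)."""

    def __init__(self, dsk: dict, name: str, npartitions: int):
        self.dask = dsk
        self.name = name
        self.npartitions = npartitions

    def map_partitions(self, func):
        # Apply func(polars.DataFrame) -> polars.DataFrame to every partition by
        # adding one task per partition to the graph; nothing is computed yet.
        name = f"map-{tokenize(self.name, func)}"
        dsk = {(name, i): (func, (self.name, i)) for i in range(self.npartitions)}
        return DataFrame({**self.dask, **dsk}, name, self.npartitions)

    def compute(self) -> pl.DataFrame:
        # Evaluate every partition key and glue the results back together.
        parts = dask.get(self.dask, [(self.name, i) for i in range(self.npartitions)])
        return pl.concat(list(parts))
```

Usage would look something like:

```python
df = pl.DataFrame({"a": [1, 2, 3, 4]})
dsk = {("x", 0): df.slice(0, 2), ("x", 1): df.slice(2, 2)}  # two partitions
ddf = DataFrame(dsk, "x", 2)
print(ddf.map_partitions(lambda part: part.select(pl.col("a") * 2)).compute())
```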

mrocklin avatar Feb 12 '22 19:02 mrocklin

I've pushed up a minimal (and very bad) implementation to this repository. It would be worth having a look.

There are many things in this implementation that we shouldn't do. At this stage I'm intentionally not bringing in some of the more advanced parts of Dask, in order to keep this educational. When we're ready for more, though, there are things we can bring in, like adding all of the operators automatically, tree reductions, etc.

mrocklin avatar Feb 12 '22 19:02 mrocklin

Some good and fun next steps for someone who is not familiar with Dask would be ...

  1. Educational: support the npartitions= keyword in from_dataframe (this forces one to learn about task graphs; see the sketch after this list)
  2. Add in a simple from_dask_dataframe function, so that we can go back and forth
  3. Spin up a distributed cluster, make a large random dataset with dask dataframe (dask.datasets.timeseries), and convert over to dask_polars.DataFrame. Then do something big but very simple, like a large distributed sum.
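
For step 1, one possible shape of that exercise, reusing the toy DataFrame class sketched earlier (the repo's real from_dataframe may well differ):

```python
import polars as pl
from dask.base import tokenize

def from_dataframe(df: pl.DataFrame, npartitions: int = 1) -> DataFrame:
    # One graph key per partition: slice the source into roughly equal chunks.
    name = f"from-dataframe-{tokenize(df, npartitions)}"
    chunk = -(-df.height // npartitions)  # ceiling division
    dsk = {(name, i): df.slice(i * chunk, chunk) for i in range(npartitions)}
    return DataFrame(dsk, name, npartitions)
```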

For folks who are more familiar with Dask internals we would probably want to ...

  1. Add in all of the operators by pulling in the _bind_foo methods from dask.dataframe (maybe best done by someone who knows Dask)
  2. Add in the apply-concat-apply pattern from dask.dataframe, and use it to implement sum/count/max/... (maybe best done by someone who knows Dask; see the sketch after this list)
  3. Start using HighLevelGraphs to store all of the graph layers, even if the layers are just dicts for now (let's keep it simple)
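
Taking step 2 as an example, here is the apply-concat-apply idea in miniature, hand-rolled against the toy DataFrame class from earlier rather than dask.dataframe's real helper (function names are illustrative):

```python
import polars as pl
from dask.base import tokenize

def _chunk(part: pl.DataFrame) -> pl.DataFrame:
    return part.sum()                    # apply: one-row partial sum per partition

def _aggregate(parts) -> pl.DataFrame:
    return pl.concat(list(parts)).sum()  # concat the partials, then apply once more

def dataframe_sum(ddf: DataFrame) -> DataFrame:
    token = tokenize(ddf.name, "sum")
    chunk_name, agg_name = f"sum-chunk-{token}", f"sum-agg-{token}"
    dsk = {(chunk_name, i): (_chunk, (ddf.name, i)) for i in range(ddf.npartitions)}
    # The aggregate task depends on every chunk key; Dask resolves the key list.
    dsk[(agg_name, 0)] = (_aggregate, [(chunk_name, i) for i in range(ddf.npartitions)])
    return DataFrame({**ddf.dask, **dsk}, agg_name, 1)
```

The same chunk/aggregate split generalizes to count, max, and friends, and is also the natural place to later slot in tree reductions.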

mrocklin avatar Feb 12 '22 19:02 mrocklin

Hey! Let me know if you want help on this project. I'm learning Rust now and using Polars for data science, and I'm also starting to use Dask at work. If you have anything eventually, or anything on the smaller side, let me know :).

esadler-hbo avatar Feb 22 '22 17:02 esadler-hbo