
Full support for multiindex in dataframes

Open dirkbike opened this issue 8 years ago • 53 comments

Dask can load a dataframe from a PyTables HDF5 file, and PyTables already supports a hierarchy of tables. Why not simulate a MultiIndex (like in pandas) by loading all tables from an HDF5 file into one dask dataframe with nested column indices?
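
For reference, a minimal pandas-only sketch of the kind of nested column index meant here (the frames and keys are made up for illustration):

import numpy as np
import pandas as pd

# Two per-table frames, e.g. one per HDF5 node.
msft = pd.DataFrame(np.random.rand(3, 2), columns=['Open', 'Close'])
aapl = pd.DataFrame(np.random.rand(3, 2), columns=['Open', 'Close'])

# Concatenating with keys gives a two-level column index.
df = pd.concat({'msft': msft, 'aapl': aapl}, axis=1)
df['msft']  # everything under the 'msft' key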

dirkbike avatar Aug 20 '16 14:08 dirkbike

I encourage you to prototype this, perhaps with dask.delayed. http://dask.readthedocs.io/en/latest/delayed-collections.html

mrocklin avatar Aug 20 '16 19:08 mrocklin

I was originally thinking of doing this as a dict that wraps a bunch of dask.dataframes, but as you recommended I'm trying this with dask.delayed. I am using pandas rather than pytables to read/write the HDF data, with these functions:

import pandas as pd

def custom_load(key):
    return pd.read_hdf('test.hdf', key)

def custom_save(df, key):
    df.to_hdf('test.hdf', key)

Unfortunately, when I try to use them to build a dask.dataframe I get a TypeError exception:

import dask.dataframe as dd
from dask.delayed import delayed

dfs = [delayed(custom_load)(key) for key in ['msft','aapl']]
df = dd.from_delayed(dfs)
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-19-59138932b0db> in <module>()
----> 1 df = dd.from_delayed(dfs)

C:\Python34\lib\site-packages\dask\dataframe\io.py in from_delayed(dfs, metadata, divisions, columns, prefix)
    670         return Series(merge(dsk, dsk2), name, metadata, divisions)
    671     else:
--> 672         return DataFrame(merge(dsk, dsk2), name, metadata, divisions)
    673
    674

C:\Python34\lib\site-packages\dask\dataframe\core.py in __new__(cls, dask, name, columns, divisions)
   1322         result._name = name
   1323
-> 1324         result._pd, result._known_dtype = cls._build_pd(columns)
   1325         result.divisions = tuple(divisions)
   1326         return result

C:\Python34\lib\site-packages\dask\dataframe\core.py in _build_pd(cls, metadata)
    201         else:
    202             if np.isscalar(metadata) or metadata is None:
--> 203                 _pd = cls._partition_type([], name=metadata)
    204             else:
    205                 _pd = cls._partition_type(columns=metadata)

TypeError: __init__() got an unexpected keyword argument 'name'

I don't have much experience with dask.delayed so I'm not sure what the problem is. For reference, this is how I built test.hdf:

from pandas_datareader import data as web

df1 = web.get_data_yahoo('msft', '2000-01-01', '2016-01-01')
df2 = web.get_data_yahoo('aapl', '2000-01-01', '2016-01-01')

df1.to_hdf('test.hdf', 'msft', format='table', complevel=9, complib='blosc')
df2.to_hdf('test.hdf', 'aapl', format='table', complevel=9, complib='blosc')

dirkbike avatar Aug 23 '16 00:08 dirkbike

Can you do me a favor and try this from git master?

mrocklin avatar Aug 23 '16 11:08 mrocklin

That worked, thanks. What happened is that all of the 'aapl' data was concatenated to the end of the 'msft' data in one large dataframe. However, in this case it would be more desirable to have a top-level column index that uses the key from custom_load, similar to this:

df1 = pd.read_hdf('test.hdf', 'msft')
df2 = pd.read_hdf('test.hdf', 'aapl')
df = pd.concat([df1, df2], keys=['msft','aapl'], axis=1)
df['msft'].head() # first key
                Open     High       Low     Close    Volume  Adj Close
Date
2000-01-03  117.3750  118.625  112.0000  116.5625  53228400  39.840438
2000-01-04  113.5625  117.125  112.2500  112.6250  54119000  38.494621
2000-01-05  111.1250  116.375  109.3750  113.8125  64059600  38.900502
2000-01-06  112.1875  113.875  108.3750  110.0000  54976600  37.597410
2000-01-07  108.6250  112.250  107.3125  111.4375  62013600  38.088740

df['aapl'].head() # second key
                  Open        High         Low       Close     Volume  Adj Close
Date
2000-01-03  104.874997  112.499998  101.687501  111.937502  133949200   3.660058
2000-01-04  108.250001  110.625002  101.187503  102.500003  128094400   3.351477
2000-01-05  103.749998  110.562497  103.000001  103.999997  194580400   3.400523
2000-01-06  106.124999  106.999999   94.999998   94.999998  191993200   3.106247
2000-01-07   96.499999  101.000002   95.500003   99.500001  115183600   3.253385

dirkbike avatar Aug 24 '16 02:08 dirkbike

Then perhaps you're right that your dict-of-dataframes idea would suit better

mrocklin avatar Aug 24 '16 11:08 mrocklin

Just curious, but why can't the dask.dataframe object support these kinds of keys internally? I think that is all that's necessary to simulate a pandas multiindex (at least across columns). Would modifying the _Frame class be the best place for this?

dirkbike avatar Aug 25 '16 01:08 dirkbike

Eventually yes, it would be nice for DataFrame to support multiindices. It's non-trivial to change all functions within dask.dataframe to support this. I budget this task at somewhere between a week and a month of developer time, though I am often pessimistic in things like this.

Have you read through the design documentation of dask.dataframe? http://dask.readthedocs.io/en/latest/dataframe-partitions.html

mrocklin avatar Aug 25 '16 02:08 mrocklin

So if I'm understanding correctly, it seems that the best way to support a multiindex would be to map it to multiple dimensions of partitions, since the multiindex itself provides a natural place to create a partition. My example above would only add a second dimension to the partitions (partitions would span the time index and then the first-level column keys). It would be a lot easier to maintain these partitions outside of dask by managing multiple dask.dataframe objects, but you would lose the ability to slice across them and couldn't guarantee the data stays aligned with the index. I agree that this is non-trivial, since it looks like it would require changing how all of the blocked algorithms are handled and might add undesired overhead.

dirkbike avatar Aug 26 '16 15:08 dirkbike

Yes, that seems like a reasonable synopsis. We would choose some depth of the multi-index along which to partition. For example we might partition along the second or third level of the multi-index. Partitions would then hold a list of tuples of values rather than a list of single values. Many of the operations can probably be changed in bulk, by changing some of the heavier functions, like elemwise and reduction, but I would expect groupbys, joins, etc. to take a fair amount of finesse. I don't yet see a way to do this incrementally.
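
As a rough illustration of tuple-valued divisions (this is not an existing dask API, just a sketch of the idea under discussion, with made-up values):

import pandas as pd

# A frame indexed by a two-level MultiIndex: (ticker, date).
idx = pd.MultiIndex.from_product(
    [['aapl', 'msft'], pd.date_range('2000-01-03', periods=3)],
    names=['ticker', 'date'])
pdf = pd.DataFrame({'Close': range(6)}, index=idx)

# Today a dask.dataframe's divisions are scalars.  Partitioning along the
# first level of this MultiIndex, divisions would instead be tuples:
divisions = (('aapl', pd.Timestamp('2000-01-03')),
             ('msft', pd.Timestamp('2000-01-03')),
             ('msft', pd.Timestamp('2000-01-05')))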

mrocklin avatar Aug 26 '16 15:08 mrocklin

This may be a bit of a stretch, but maybe it's worth considering more abstract partitioning. I got some inspiration from this paper A Hierarchical Aggregation Framework for Efficient Multilevel Visual Exploration and Analysis that breaks data into hierarchical chunks of smaller and smaller index-slices to make data exploration faster. Partitioning the data would be an expensive operation, but can be done as data is collected. You would need at least one hierarchy for the index and possibly one for the columns (column groupings can come from an existing storage hierarchy or can be made dynamically using groupbys). The index hierarchy would define the partitions similar to dask's current structure except using multiple levels (i.e. days are grouped into months, which are grouped into years, etc.). Columns could use a pseudo-index to map to the main index (i.e. a range or years, months, or specific days) to keep the data dense (no filler NaNs) and allow calculations to quickly skip regions with no data. Index and column groupings would be exposed to the end user via indexing and slicing methods and would provide natural partition boundaries for applied computation. A column hierarchy also provides an organized structure for caching intermediate computation results.

dirkbike avatar Aug 26 '16 21:08 dirkbike

Sounds very cool. I encourage you to explore that further.

mrocklin avatar Aug 26 '16 21:08 mrocklin

I started a prototype using basic python structures (dicts, and subclasses of lists) and realized that data columns either need to use a sequence of index labels to identify each element (because of the hierarchical index), or the columns can map to a flat representation of the hierarchical index (using a pseudo-index). I couldn't think of another way to do this, and mapping each element individually with labels would be very wasteful. The problem with using a pseudo-index is that when data is appended to the data set, the pseudo-index needs to be recalculated. I'm starting to re-think the use of hierarchies at all. Relational databases can represent hierarchical structures by referencing keys between tables of data, and joining tables on a specific column already aligns tables to each other. Perhaps it's better to treat every chunk of data (of n-columns) as a regular 2D dataframe, and use a relational representation to tie all of the dataframes together. Each dataframe would have its own independent index, avoiding the pseudo-index problem, and only when chunks are joined would the index need to be adjusted. The end user could still reference a specific subset of data using slices and labels, but chunks of data would be dynamically joined (or split if necessary) behind the scenes. I'm going to try and prototype something using sqlite and pandas with some more stock data and see how that might work.

dirkbike avatar Aug 27 '16 15:08 dirkbike

Any update on this issue?

liorshk avatar May 05 '17 15:05 liorshk

I ended up using a regular SQL database to track chunks of data and assembling them as necessary into Pandas DataFrames.
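
For anyone curious, a rough sketch of that pattern, assuming a SQLite file with one table per chunk and a small catalogue table mapping a label to its table name (all names here are hypothetical):

import sqlite3
import pandas as pd

conn = sqlite3.connect('chunks.db')

# Look up which chunk tables belong to the labels we want ...
labels = ('msft', 'aapl')
rows = conn.execute(
    'SELECT label, table_name FROM catalogue WHERE label IN (?, ?)', labels
).fetchall()

# ... read each chunk into a pandas DataFrame and stitch them together
# under a top-level column key, as in the pd.concat example earlier.
chunks = {label: pd.read_sql(f'SELECT * FROM {table}', conn, index_col='Date')
          for label, table in rows}
df = pd.concat(chunks, axis=1)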

dirkbike avatar May 05 '17 21:05 dirkbike

Does dask support reading multi-level indices yet? I'm particularly interested in reading a table written to parquet with a multi-level column index, and I'm getting the following traceback when I try to do this:

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<timed exec> in <module>()

/ssd/lsstsw/stack3_20171021/python/miniconda3-4.3.21/lib/python3.6/site-packages/dask/dataframe/io/parquet.py in read_parquet(path, columns, filters, categories, index, storage_options, engine)
    797 
    798     return read(fs, fs_token, paths, columns=columns, filters=filters,
--> 799                 categories=categories, index=index)
    800 
    801 

/ssd/lsstsw/stack3_20171021/python/miniconda3-4.3.21/lib/python3.6/site-packages/dask/dataframe/io/parquet.py in _read_pyarrow(fs, fs_token, paths, columns, filters, categories, index)
    557     dtypes = {storage_name_mapping.get(k, k): v for k, v in dtypes.items()}
    558 
--> 559     meta = _meta_from_dtypes(all_columns, dtypes, index_names, column_index_names)
    560 
    561     if out_type == Series:

/ssd/lsstsw/stack3_20171021/python/miniconda3-4.3.21/lib/python3.6/site-packages/dask/dataframe/io/parquet.py in _meta_from_dtypes(to_read_columns, file_dtypes, index_cols, column_index_names)
    140         df.columns.name = column_index_names[0]
    141     else:
--> 142         df.columns.names = column_index_names
    143     return df
    144 

~/.local/lib/python3.6/site-packages/pandas/core/indexes/base.py in _set_names(self, values, level)
   1116         if len(values) != 1:
   1117             raise ValueError('Length of new names must be 1, got %d' %
-> 1118                              len(values))
   1119         self.name = values[0]
   1120 

ValueError: Length of new names must be 1, got 3

If multi-level indices are generally supported, but not in read_parquet, then perhaps this should be a new issue?

timothydmorton avatar Mar 22 '18 17:03 timothydmorton

To add my 5 cents: the absence of MultiIndex support is the show-stopper for me in terms of doing anything with Dask beyond poking around a bit. It is the most important missing feature. Please do consider implementing it in some form sooner.

vss888 avatar Jun 01 '18 21:06 vss888

I definitely agree, @vss888. If this is something that you'd like to contribute, that would be very welcome!

mrocklin avatar Jun 04 '18 14:06 mrocklin

This is maybe a simple hack with a less than ideal resolution: if there were a way to do element-wise concatenation on the two indexes, you could create a (sort of) unique multi-index value.

The issue that I am running into is that I can't figure out how to do an element-wise concat on two dask arrays. Is there any way to do the following?

import numpy as np
from itertools import repeat

index = np.array([x1 + x2 + x3
                  for x1, x2, x3 in zip(index_id1.astype('str'),
                                        repeat('-', len(index_id1)),
                                        index_id2.astype('str'))])
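
One possible dask-native way to do this (just a sketch; it assumes the two ids are equal-length, identically chunked 1-D dask arrays) is to push the string concatenation into each block with map_blocks:

import dask.array as da
import numpy as np

# Stand-ins for index_id1 / index_id2 above.
index_id1 = da.from_array(np.array([1, 2, 3]), chunks=2)
index_id2 = da.from_array(np.array([10, 20, 30]), chunks=2)

def join_ids(x1, x2):
    # Element-wise '<id1>-<id2>' within one block.
    return np.char.add(np.char.add(x1.astype(str), '-'), x2.astype(str))

index = da.map_blocks(join_ids, index_id1, index_id2, dtype=str)
index.compute()  # array(['1-10', '2-20', '3-30'], dtype='<U...')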

mmann1123 avatar Apr 26 '19 15:04 mmann1123

Hi there. I'm trying to get a handle on what all might be involved in supporting development on this. It sounds like a few options were previously explored, but the method discussed by @dirkbike and @mrocklin above is the preferred path forward, although the main blocker to that is the amount of work and inability to implement such a change incrementally. @mrocklin do you have a ballpark on the number of functions in the DataFrame API that would be affected by this? I see that it's a complex issue, but I'd like to at least look into supporting this or breaking it down and finding some people to help chip away at it.

mUtterberg avatar Jun 28 '19 22:06 mUtterberg

I don't personally have a ballpark estimate, no. Others might though.

mrocklin avatar Jun 29 '19 08:06 mrocklin

@TomAugspurger, do you have any thoughts on this one?

jakirkham avatar Aug 08 '19 15:08 jakirkham

Still open, still worth doing.

TomAugspurger avatar Aug 08 '19 15:08 TomAugspurger

In the interim, does anyone have a workaround? I don't actually need the multi-index, but all of the intermediate operations I want to use output a multi-index dataframe (pivot_table, or groupby with first().unstack(), before I can call reset_index).

These two operations produce the same result for my data, but both produce a multi-index as an intermediate step:

# First, pivot_table
df.pivot_table(index=['Query', 'Target'], columns='Path', values='Percent_similar').reset_index()
# groupby with unstack
df.groupby(['Query', 'Target', 'Path'])['Percent_similar'].first().unstack().reset_index()

My only workaround is to iterate over the unique values of one of the desired indices (in this case, Query), perform the pivot_table, assign a new column containing the value I'm iterating over, and finally concatenate the pieces back together.
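
A rough sketch of that workaround, assuming df is a dask DataFrame with the columns used above and that each per-Query subset fits in memory:

import pandas as pd

pieces = []
for query in df['Query'].unique().compute():
    sub = df[df['Query'] == query].compute()       # materialize one Query at a time
    wide = sub.pivot_table(index='Target', columns='Path',
                           values='Percent_similar').reset_index()
    wide['Query'] = query                          # re-attach the iterated key
    pieces.append(wide)

result = pd.concat(pieces, ignore_index=True)      # flat frame, no MultiIndex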

jvivian-atreca avatar Aug 26 '19 21:08 jvivian-atreca

In the interim, does anyone have a workaround? @jvivian-atreca have you found any?

victor-ab avatar Apr 07 '20 02:04 victor-ab

Small status update: I worked on this a bit. We need to find a good (approximate) quantile algorithm to discover the partitions. Everything we have right now works with 1-D arrays. I think the primary difficulty is that the algorithm needs to work with heterogeneous data if we want to avoid casting to object dtype.

Alternatively, we can do just that: cast to object dtype and treat the MultiIndex as a big list of tuples. With some slight modifications, that becomes 1-D enough for the purposes of quantiling and discovering partitions. Casting to object dtype is expensive though, so I'd like to avoid that if possible.
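
As an illustration of that object-dtype fallback (just the idea, not dask code):

import numpy as np
import pandas as pd

mi = pd.MultiIndex.from_product([['aapl', 'msft'], [2000, 2001]])

# Flatten the MultiIndex into a 1-D object array of tuples so existing
# 1-D quantile/partitioning machinery can operate on it.
flat = np.empty(len(mi), dtype=object)
flat[:] = list(mi)          # each element is a tuple like ('aapl', 2000)

# Tuples compare lexicographically, so ordinary sorting/quantile logic applies.
np.sort(flat)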

A secondary issue is that pandas' indexing API isn't well specified with a MultiIndex. Some behaviors like df.loc['A', 'B'] are ambiguous with a MultiIndex, since it's unclear whether that's specifying a slice into a 2-level MultiIndex, or a depth-1 slice on the index and a column selection. But that's a limitation we'll just have to document and accept.

TomAugspurger avatar Apr 07 '20 11:04 TomAugspurger

@rjzamora and the RAPIDS folks have been playing with multi-column indexing I think

One of the nice things about approximate quantiles is that they're decently robust to sampling. I imagine that if we collected 100 samples per partition and then turned those into objects/tuples, we would still get a decent solution most of the time. cc @eriknw

mrocklin avatar Apr 07 '20 14:04 mrocklin

To determine the partitions, I would have few reservations about creating a 1-D array of dtype object of tuples and then using the existing partition_quantiles method. I realize this isn't the most performant (speed or memory), but I bet it would be "good enough" for a while, and partition_quantiles is written to work on any dtype. This operation should be compared to what happens next: shuffling the data into new partitions, which is much more costly.

eriknw avatar Apr 07 '20 16:04 eriknw

That's good to hear. I'll dust off my branch that started down this path and push something up, hopefully later this week.

TomAugspurger avatar Apr 07 '20 17:04 TomAugspurger

Related code suggestion in https://github.com/dask/dask/issues/6074

martindurant avatar Apr 08 '20 13:04 martindurant

@TomAugspurger - I support any effort to support a MultiIndex in dask.dataframe (including multi-column sort_values/set_index operations).

Just a note (for your information) - In dask_cudf we currently support a multi-column sort_values. This method uses the same logic as upstream dask to calculate quantiles. However, it works on multiple columns, because we have a DataFrame-level quantiles method, as well as a DataFrame-level searchsorted method (neither of which is available in pandas, if I understand/remember correctly). Our general approach is to follow the same logic that dask already uses, but to allow divisions to be passed in or calculated/represented as a cudf DataFrame.

rjzamora avatar Apr 08 '20 13:04 rjzamora