Full support for multiindex in dataframes
Dask can load a dataframe from a pytables HDF5 file, and pytables already supports a hierarchy of tables. Why not simulate a multiindex (like in pandas) by loading all tables from an HDF5 file into one dask dataframe with nested column indices?
I encourage you to prototype this, perhaps with dask.delayed: http://dask.readthedocs.io/en/latest/delayed-collections.html
I was originally thinking of doing this as a dict that wraps a bunch of dask.dataframes, but as you recommended I'm trying this with dask.delayed. I am using pandas to read/write the HDF data rather than pytables, using these functions:
import pandas as pd
def custom_load(key):
    return pd.read_hdf('test.hdf', key)

def custom_save(df, key):
    df.to_hdf('test.hdf', key)
Unfortunately, when I try to use them to build a dask.dataframe I get a TypeError exception:
import dask.dataframe as dd
from dask.delayed import delayed
dfs = [delayed(custom_load)(key) for key in ['msft','aapl']]
df = dd.from_delayed(dfs)
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-19-59138932b0db> in <module>()
----> 1 df = dd.from_delayed(dfs)
C:\Python34\lib\site-packages\dask\dataframe\io.py in from_delayed(dfs, metadata, divisions, columns, prefix)
670 return Series(merge(dsk, dsk2), name, metadata, divisions)
671 else:
--> 672 return DataFrame(merge(dsk, dsk2), name, metadata, divisions)
673
674
C:\Python34\lib\site-packages\dask\dataframe\core.py in __new__(cls, dask, name, columns, divisions)
1322 result._name = name
1323
-> 1324 result._pd, result._known_dtype = cls._build_pd(columns)
1325 result.divisions = tuple(divisions)
1326 return result
C:\Python34\lib\site-packages\dask\dataframe\core.py in _build_pd(cls, metadata)
201 else:
202 if np.isscalar(metadata) or metadata is None:
--> 203 _pd = cls._partition_type([], name=metadata)
204 else:
205 _pd = cls._partition_type(columns=metadata)
TypeError: __init__() got an unexpected keyword argument 'name'
I don't have much experience with dask.delayed so I'm not sure what the problem is. For reference, this is how I built test.hdf:
from pandas_datareader import data as web
df1 = web.get_data_yahoo('msft', '2000-01-01', '2016-01-01')
df2 = web.get_data_yahoo('aapl', '2000-01-01', '2016-01-01')
df1.to_hdf('test.hdf', 'msft', format='table', complevel=9, complib='blosc')
df2.to_hdf('test.hdf', 'aapl', format='table', complevel=9, complib='blosc')
Can you do me a favor and try this from git master?
That worked, thanks. So, what happened is that all of the 'aapl' data was concatenated to the end of the 'msft' data in one large dataframe. However, in this case it would be more desirable to have a top-level index that uses the key from custom_load, in a way similar to this:
df1 = pd.read_hdf('test.hdf', 'msft')
df2 = pd.read_hdf('test.hdf', 'aapl')
df = pd.concat([df1, df2], keys=['msft','aapl'], axis=1)
df['msft'].head() # first key
Open High Low Close Volume Adj Close
Date
2000-01-03 117.3750 118.625 112.0000 116.5625 53228400 39.840438
2000-01-04 113.5625 117.125 112.2500 112.6250 54119000 38.494621
2000-01-05 111.1250 116.375 109.3750 113.8125 64059600 38.900502
2000-01-06 112.1875 113.875 108.3750 110.0000 54976600 37.597410
2000-01-07 108.6250 112.250 107.3125 111.4375 62013600 38.088740
df['aapl'].head() # second key
Open High Low Close Volume Adj Close
Date
2000-01-03 104.874997 112.499998 101.687501 111.937502 133949200 3.660058
2000-01-04 108.250001 110.625002 101.187503 102.500003 128094400 3.351477
2000-01-05 103.749998 110.562497 103.000001 103.999997 194580400 3.400523
2000-01-06 106.124999 106.999999 94.999998 94.999998 191993200 3.106247
2000-01-07 96.499999 101.000002 95.500003 99.500001 115183600 3.253385
Then perhaps you're right that your dict-of-dataframes idea would suit better.
Just curious, but why can't the dask.dataframe object support these kinds of keys internally? I think that is all that's necessary to simulate a pandas multiindex (at least across columns). Would modifying the _Frame class be the best place for this?
Eventually yes, it would be nice for DataFrame to support multiindices. It's non-trivial to change all functions within dask.dataframe to support this. I budget this task at somewhere between a week and a month of developer time, though I am often pessimistic in things like this.
Have you read through the design documentation of dask.dataframe? http://dask.readthedocs.io/en/latest/dataframe-partitions.html
So if I'm understanding correctly, it seems that the best way to support a multiindex would be to map it to multiple dimensions of partitions, since the multiindex itself provides a natural place to create a partition. My example above would only add a second dimension to the partitions (partitions would span the time index and then the first-level column keys). It would be a lot easier to maintain these partitions outside of dask by managing multiple dask.dataframe objects, but you would lose the ability to slice across multiple of these partitions and couldn't guarantee that the data stays aligned with the index. I agree that this is non-trivial, since it looks like it would require changing how all of the blocked algorithms are handled and might add undesired overhead.
Yes, that seems like a reasonable synopsis. We would choose some depth of the multi-index along which to partition. For example, we might partition along the second or third level of the multi-index. Partitions would then hold a list of tuples of values rather than a list of single values. Many of the operations can probably be changed in bulk, by changing some of the heavier functions, like elemwise and reduction, but I would expect groupbys, joins, etc. to take a fair amount of finesse. I don't yet see a way to do this incrementally.
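To make the boundary idea concrete, here is a purely illustrative sketch (not dask's actual API or internal data structures) of what division boundaries might look like once they hold tuples instead of single values:

# Hypothetical illustration only. Today, divisions for a single-level index
# are a sorted sequence of boundary values:
single_level_divisions = ('2000-01-03', '2008-01-02', '2016-01-01')

# With a 2-level multi-index partitioned at the second level, each boundary
# would itself be a tuple of values:
multi_level_divisions = (('aapl', '2000-01-03'),
                         ('aapl', '2016-01-01'),
                         ('msft', '2000-01-03'),
                         ('msft', '2016-01-01'))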
This may be a bit of a stretch, but maybe it's worth considering more abstract partitioning. I got some inspiration from this paper A Hierarchical Aggregation Framework for Efficient Multilevel Visual Exploration and Analysis that breaks data into hierarchical chunks of smaller and smaller index-slices to make data exploration faster. Partitioning the data would be an expensive operation, but can be done as data is collected. You would need at least one hierarchy for the index and possibly one for the columns (column groupings can come from an existing storage hierarchy or can be made dynamically using groupbys). The index hierarchy would define the partitions similar to dask's current structure except using multiple levels (i.e. days are grouped into months, which are grouped into years, etc.). Columns could use a pseudo-index to map to the main index (i.e. a range or years, months, or specific days) to keep the data dense (no filler NaNs) and allow calculations to quickly skip regions with no data. Index and column groupings would be exposed to the end user via indexing and slicing methods and would provide natural partition boundaries for applied computation. A column hierarchy also provides an organized structure for caching intermediate computation results.
Sounds very cool. I encourage you to explore that further.
I started a prototype using basic python structures (dicts, and subclasses of lists) and realized that data columns either need to use a sequence of index labels to identify each element (because of the hierarchical index), or the columns can map to a flat representation of the hierarchical index (using a pseudo-index). I couldn't think of another way to do this, and mapping each element individually with labels would be very wasteful. The problem with using a pseudo-index is that when data is appended to the data set, the pseudo-index needs to be recalculated. I'm starting to re-think the use of hierarchies at all. Relational databases can represent hierarchical structures by referencing keys between tables of data, and joining tables on a specific column already aligns tables to each other. Perhaps it's better to treat every chunk of data (of n-columns) as a regular 2D dataframe, and use a relational representation to tie all of the dataframes together. Each dataframe would have its own independent index, avoiding the pseudo-index problem, and only when chunks are joined would the index need to be adjusted. The end user could still reference a specific subset of data using slices and labels, but chunks of data would be dynamically joined (or split if necessary) behind the scenes. I'm going to try and prototype something using sqlite and pandas with some more stock data and see how that might work.
Any update on this issue?
I ended up using a regular SQL database to track chunks of data and assembling them as necessary into Pandas DataFrames.
Does dask support reading multi-level indices yet? I'm particularly interested in reading a table written to parquet with a multi-level column index, and I'm getting the following traceback when I try to do this:
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
<timed exec> in <module>()
/ssd/lsstsw/stack3_20171021/python/miniconda3-4.3.21/lib/python3.6/site-packages/dask/dataframe/io/parquet.py in read_parquet(path, columns, filters, categories, index, storage_options, engine)
797
798 return read(fs, fs_token, paths, columns=columns, filters=filters,
--> 799 categories=categories, index=index)
800
801
/ssd/lsstsw/stack3_20171021/python/miniconda3-4.3.21/lib/python3.6/site-packages/dask/dataframe/io/parquet.py in _read_pyarrow(fs, fs_token, paths, columns, filters, categories, index)
557 dtypes = {storage_name_mapping.get(k, k): v for k, v in dtypes.items()}
558
--> 559 meta = _meta_from_dtypes(all_columns, dtypes, index_names, column_index_names)
560
561 if out_type == Series:
/ssd/lsstsw/stack3_20171021/python/miniconda3-4.3.21/lib/python3.6/site-packages/dask/dataframe/io/parquet.py in _meta_from_dtypes(to_read_columns, file_dtypes, index_cols, column_index_names)
140 df.columns.name = column_index_names[0]
141 else:
--> 142 df.columns.names = column_index_names
143 return df
144
~/.local/lib/python3.6/site-packages/pandas/core/indexes/base.py in _set_names(self, values, level)
1116 if len(values) != 1:
1117 raise ValueError('Length of new names must be 1, got %d' %
-> 1118 len(values))
1119 self.name = values[0]
1120
ValueError: Length of new names must be 1, got 3
If multi-level indices are generally supported, but not in read_parquet, then perhaps this should be a new issue?
To add my 5 cents: the absence of MultiIndex support is the show-stopper for me in terms of doing anything with Dask beyond poking around a bit. It is the most important missing feature. Please do consider implementing it in some form sooner rather than later.
I definitely agree @vss888. If this is something that you'd like to contribute, that would be very welcome!
This is maybe a simple, but less than ideal, hack. If there were a way to do element-wise concatenation on the two indexes, you could create a unique multi-index value (sort of).
The issue that I am running into is that I can't figure out how to do an element-wise concat on two dask arrays. Is there any way to do the following?
import numpy as np
from itertools import repeat  # assuming the repeat used here is itertools.repeat
index = np.array([x1 + x2 + x3 for x1, x2, x3 in zip(index_id1.astype('str'),
                                                     repeat('-', len(index_id1)),
                                                     index_id2.astype('str'))])
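One possible workaround, sketched here under the assumption that the two id arrays can live as columns of a dask.dataframe (the column names id1/id2 are hypothetical stand-ins for index_id1/index_id2 above), is to use ordinary lazy column arithmetic for the string concatenation:

import pandas as pd
import dask.dataframe as dd

# Hypothetical data standing in for index_id1 / index_id2 above
pdf = pd.DataFrame({'id1': [1, 2, 3], 'id2': [10, 20, 30]})
ddf = dd.from_pandas(pdf, npartitions=2)

# Element-wise string concatenation via lazy element-wise operations
combined = ddf['id1'].astype(str) + '-' + ddf['id2'].astype(str)
print(combined.compute())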
Hi there. I'm trying to get a handle on what all might be involved in supporting development on this. It sounds like a few options were previously explored, but the method discussed by @dirkbike and @mrocklin above is the preferred path forward, although the main blocker to that is the amount of work and inability to implement such a change incrementally. @mrocklin do you have a ballpark on the number of functions in the DataFrame API that would be affected by this? I see that it's a complex issue, but I'd like to at least look into supporting this or breaking it down and finding some people to help chip away at it.
I don't personally have a ballpark estimate, no. Others might though.
@TomAugspurger, do you have any thoughts on this one?
Still open, still worth doing.
In the interim, does anyone have a workaround? I don't actually need the multi-index, but all of the intermediate operations I want to use output a multi-index dataframe (pivot_table, or groupby with first().unstack(), before I can call reset_index).
These two operations produce the same result for my data, but both produce a multi-index as an intermediate step:
# First, pivot_table
df.pivot_table(index=['Query', 'Target'], columns='Path', values='Percent_similar').reset_index()
# groupby with unstack
df.groupby(['Query', 'Target', 'Path'])['Percent_similar'].first().unstack().reset_index()
My only workaround is to iterate over the unique values of one of the desired indices (in this case, Query), perform pivot_table, assign a new column holding the value I'm iterating over, and finally concatenate them all back together.
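For reference, a rough sketch of that workaround might look like the following (the column names Query, Target, Path, and Percent_similar are taken from the snippets above; each per-query piece is computed to pandas before pivoting, so this only helps if every single Query fits in memory):

import pandas as pd

def pivot_one_query(ddf, query):
    # Pull one Query's rows into pandas so pivot_table stays a plain pandas operation
    sub = ddf[ddf['Query'] == query].compute()
    wide = sub.pivot_table(index='Target', columns='Path',
                           values='Percent_similar').reset_index()
    wide['Query'] = query
    return wide

# queries = ddf['Query'].unique().compute()
# result = pd.concat([pivot_one_query(ddf, q) for q in queries], ignore_index=True)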
In the interim, does anyone have a workaround? @jvivian-atreca have you found any?
Small status update: I worked on this a bit. We need to find a good (approximate) quantile algorithm to discover the partitions. Everything we have right now works with 1-D arrays. I think the primary difficulty is that the algorithm needs to work with heterogeneous data if we want to avoid casting to object dtype.
Alternatively, we can do just that: cast to object dtype and treat the MultiIndex as a big list of tuples. With some slight modifications, that becomes 1-D enough for the purposes of quantiling and discovering partitions. Casting to object dtype is expensive though, so I'd like to avoid that if possible.
A secondary issue is that pandas' indexing API isn't well specified with a MultiIndex. Some behaviors like df.loc['A', 'B'] are ambiguous with a MultiIndex, since it's unclear whether that's specifying a slice into a 2-level MultiIndex, or a depth-1 slice on the index and a column selection. But that's a limitation we'll just have to document and accept.
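A small pandas example of that ambiguity, for illustration:

import pandas as pd

# On a Series with a 2-level MultiIndex, the two labels form a single key:
s = pd.Series([1, 2],
              index=pd.MultiIndex.from_tuples([('A', 'B'), ('A', 'C')]))
s.loc['A', 'B']    # the entry keyed by ('A', 'B')

# On a DataFrame with a flat index, the same spelling means (row, column):
df = pd.DataFrame({'B': [10]}, index=['A'])
df.loc['A', 'B']   # row 'A', column 'B'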
@rjzamora and the RAPIDS folks have been playing with multi-column indexing I think
One of the nice things about approximate quantiles is that they're decently robust to sampling. I imagine that if we collected 100 samples per partition and then turned those into objects/tuples, we would still provide a decent solution most of the time. cc @eriknw
To determine the partitions, I would have few reservations about creating a 1-D array of dtype object containing tuples and then using the existing partition_quantiles method. I realize this isn't the most performant (in speed or memory), but I bet it would be "good enough" for a while, and partition_quantiles is written to work on any dtype. This operation should be compared to what happens next: shuffling the data into new partitions, which is much more costly.
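As a rough illustration of that approach (a sketch, not the actual implementation), a MultiIndex can already be flattened into a 1-D object array of tuples, and tuples sort lexicographically, so existing 1-D quantile logic has something to work with:

import pandas as pd

mi = pd.MultiIndex.from_product([['aapl', 'msft'], [2000, 2001]])

# MultiIndex.values is a 1-D ndarray of dtype object whose elements are tuples
flat = mi.values
print(flat.dtype)       # object
print(sorted(flat)[0])  # ('aapl', 2000) -- lexicographic tuple ordering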
That's good to hear. I'll dust off my branch that started down this path and push something up, hopefully later this week.
Related code suggestion in https://github.com/dask/dask/issues/6074
@TomAugspurger - I support any effort to support a MultiIndex in dask.dataframe (including multi-column sort_values/set_index operations).
Just a note (for your information): in dask_cudf we currently support a multi-column sort_values. This method uses the same logic as upstream dask to calculate quantiles. However, it works on multiple columns, because we have a DataFrame-level quantiles method, as well as a DataFrame-level searchsorted method (neither of which is available in pandas, if I understand/remember correctly). Our general approach is to follow the same logic that dask already uses, but to allow divisions to be passed in or calculated/represented as a cudf DataFrame.
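For context, a minimal usage sketch of that dask_cudf multi-column sort (the column names here are made up, and this assumes a dask_cudf version where sort_values accepts a list of columns):

import cudf
import dask_cudf

gdf = cudf.DataFrame({'ticker': ['msft', 'aapl', 'msft'],
                      'date': [2, 1, 1],
                      'close': [110.0, 100.0, 105.0]})
ddf = dask_cudf.from_cudf(gdf, npartitions=2)

# Multi-column sort; divisions are derived from DataFrame-level quantiles
ddf_sorted = ddf.sort_values(by=['ticker', 'date'])
print(ddf_sorted.compute())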