
How can roll_time_series deal with big data?

Open lk1983823 opened this issue 3 years ago • 6 comments

I have a 7M-row dataframe to deal with and want to use roll_time_series in tsfresh. Whether I use dask or not, I find it impossible to make it work, because when using dask, the input to the roll_time_series function has to be materialized with df.compute(), which occupies a lot of memory while running. Are there any helpful suggestions? Thanks!

lk1983823 avatar Nov 02 '20 11:11 lk1983823

The easiest way is to order a compute instance with a lot of memory on Google Cloud or AWS ;)

madpower2000 avatar Nov 02 '20 12:11 madpower2000

The easiest way is to order a compute instance with a lot of memory on Google Cloud or AWS ;)

But I want to make it run on my machine.

lk1983823 avatar Nov 02 '20 12:11 lk1983823

What do you mean by the input needing df.compute()?

If you use dask as input to the rolling function, you can also hand over a non-evaluated dask data frame (not a pandas data frame). If you pass this directly to the feature extraction function, you in principle never need to leave dask.

However - of course - that is still a lot of data and I assume the feature extraction on 7M time series will also take a long time...

nils-braun avatar Nov 02 '20 13:11 nils-braun

What do you mean by the input needing df.compute()?

If you use dask as input to the rolling function, you can also hand over a non-evaluated dask data frame (not a pandas data frame). If you pass this directly to the feature extraction function, you in principle never need to leave dask.

However - of course - that is still a lot of data and I assume the feature extraction on 7M time series will also take a long time...

Here is my code:

import dask.dataframe as dd
from tsfresh.utilities.dataframe_functions import roll_time_series

windowSize = 100
dfSH05FillNan = dd.read_parquet('./testdf.parquet.gzip')
dfSH05FillNanDaskRolled = roll_time_series(dfSH05FillNan,
                                           column_id="timeContinuity", column_sort="timestamp", chunksize=50,
                                           max_timeshift=windowSize-1, min_timeshift=windowSize-1)

It shows the following error:

Traceback (most recent call last):
  File "tsProcess.py", line 169, in <module>
    max_timeshift=windowSize-1, min_timeshift=windowSize-1)
  File "/home/lk/.pyenv/versions/anaconda3-2020.02/envs/ai/lib/python3.7/site-packages/tsfresh/utilities/dataframe_functions.py", line 576, in roll_time_series
    if df[column_sort].isnull().any():
  File "/home/lk/.pyenv/versions/anaconda3-2020.02/envs/ai/lib/python3.7/site-packages/dask/dataframe/core.py", line 199, in __bool__
    "a conditional statement.".format(self)
TypeError: Trying to convert dd.Scalar<series-..., dtype=bool> to a boolean value. Because Dask objects are lazily evaluated, they cannot be converted to a boolean value or used in boolean conditions like if statements. Try calling .compute() to force computation prior to converting to a boolean value or using in a conditional statement.

My tsfresh version is 0.17.0

lk1983823 avatar Nov 02 '20 14:11 lk1983823

Oh - I am very sorry. I somehow assumed #731 was already finished and implemented, but I have just remembered that this is not the case. Hm, well, then you are completely correct - so far the rolling only accepts pandas data.

I think you can either wait for the PR to be finished (although it is kind of stuck currently) and/or help there - or you could try a staged approach where you roll only the first part of your data, extract features from it and store them, then move on to the second part, and so on... Maybe you can automate this with e.g. luigi.
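In case it helps, here is a minimal sketch of that staged approach (assuming the same parquet file and column names as in your snippet; the batch size, the output paths, and the "id" column name used in the extraction call are assumptions, not tested):

import pandas as pd

from tsfresh import extract_features
from tsfresh.utilities.dataframe_functions import roll_time_series

windowSize = 100
batch_size = 50  # assumption: tune this to your memory budget

df = pd.read_parquet('./testdf.parquet.gzip')
all_ids = df["timeContinuity"].unique()

for start in range(0, len(all_ids), batch_size):
    # Rolling never crosses id boundaries, so batching by id is safe.
    id_batch = all_ids[start:start + batch_size]
    part = df[df["timeContinuity"].isin(id_batch)]

    rolled = roll_time_series(part,
                              column_id="timeContinuity", column_sort="timestamp",
                              max_timeshift=windowSize - 1, min_timeshift=windowSize - 1)

    features = extract_features(rolled,
                                # assumption: the rolled window ids end up in an "id" column;
                                # check the columns of `rolled` for your tsfresh version
                                column_id="id", column_sort="timestamp")

    # Store each batch before moving on, so memory is freed in between.
    features.to_parquet('./features_part_{}.parquet'.format(start))

Only the rolled windows and features of one batch are held in memory at a time (the raw 7M-row frame still has to fit, though).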

If you want, I guess working on #731 would be very nice.

nils-braun avatar Nov 02 '20 16:11 nils-braun

Dask support for roll_time_series() would be highly appreciated.

With larger-than-memory data, I assume the following would be an ideal workflow:

  1. Using Dask, read the data for the different ids (a large number of CSVs or parquet files) stored on a hard drive or in cloud storage.
  2. roll_time_series() accepts a Dask DataFrame (the source) and outputs the rolled data as another Dask DataFrame. This resulting Dask DataFrame, if computed, will be larger than the source DataFrame. Hence, it should be passed as-is (as a Dask DataFrame, without computation) to the next step.
  3. dask_feature_extraction_on_chunk() accepts the output from the previous step (in some shape and form) and computes the features for the different ids in the form of a Dask DataFrame (see the sketch after this list).
  4. The features computed at step 3 can be stored back to hard drive/cloud storage, segregated by id. Alternatively, the Dask DataFrame from step 3 can be passed as-is to machine learning libraries for training or prediction directly.
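For steps 3 and 4, which are already possible with the dask bindings shipped in recent tsfresh versions, a rough sketch could look like the following (assuming the rolled data were already available as a Dask DataFrame in long format with illustrative column names id/kind/time/value; the paths are made up):

import dask.dataframe as dd
from tsfresh.convenience.bindings import dask_feature_extraction_on_chunk
from tsfresh.feature_extraction.settings import MinimalFCParameters

# Hypothetical input: rolled data persisted in long format
# (one row per id, kind, time and value) by an earlier step.
df_rolled = dd.read_parquet('./rolled_long_format.parquet')

# The dask binding expects the data grouped by id and kind.
df_grouped = df_rolled.groupby(["id", "kind"])

features = dask_feature_extraction_on_chunk(df_grouped,
                                            column_id="id", column_kind="kind",
                                            column_sort="time", column_value="value",
                                            default_fc_parameters=MinimalFCParameters())

# Step 4: the result is still a lazy Dask DataFrame, so it can be written
# back to storage without materializing everything in memory at once.
features.to_parquet('./features_dask.parquet')

As far as I know, the result comes back in long format (one row per id and feature), so a pivot is still needed to get a wide feature matrix, but that step can also stay within Dask.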

arnabbiswas1 avatar Jan 03 '21 15:01 arnabbiswas1