tsfresh
Better support/integration for custom functions
Hi!
First of all congrats for the great package, it's really well made.
I think extract_features() would highly benefit from an easier way to add custom feature calculation functions on the fly, without having to modify the feature_calculators source file. Having to clone the repository and change the source code is not very handy, especially when the code might need to be used in production environments. At the same time, I've been needing to use very project-specific functions which are not worth adding to the package through a PR.
My suggestion is therefore to add an optional parameter to extract_features, which could be a dictionary such as {'function_name': foo}, where foo is the actual function to be called. Then each function in default_fc_parameters is fetched from feature_calculators, if it's not found, it's called from the new dictionary. The flow would look something like this:
# Custom function to add
def count(x):
    return len(x)

# Dictionary of custom functions
custom_functions = {
    'count': count
}

# The settings dictionary looks exactly the same
settings = {
    'mean': None,
    'count': None
}

# The extract_features call just has an extra parameter
extract_features(settings, custom_functions=custom_functions, ...)
In the backend, feature_extraction.extraction._do_extraction_on_chunk (line 286) would be modified to something like:
for function_name, parameter_list in fc_parameters.items():
    if hasattr(feature_calculators, function_name):
        func = getattr(feature_calculators, function_name)
    else:
        func = custom_functions.get(function_name)
        # Maybe raise an informative error here if it's not found
It doesn't need to be exactly like this, but this way would be pretty straightforward to implement. If there is no reason why we shouldn't do this, I'm happy to make a PR and continue the technical discussion there.
Thanks!
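The lookup proposed above, including the informative error, could be sketched roughly as follows. This is only an illustration and does not touch tsfresh: builtin_calculators is a hypothetical stand-in for the feature_calculators module, and resolve_calculator is a made-up helper name.

```python
from types import SimpleNamespace


def mean(x):
    return sum(x) / len(x)


def count(x):
    return len(x)


# Hypothetical stand-in for tsfresh's feature_calculators module.
builtin_calculators = SimpleNamespace(mean=mean)


def resolve_calculator(function_name, custom_functions=None):
    """Look the name up in the built-in namespace first, then in the custom dict."""
    if hasattr(builtin_calculators, function_name):
        return getattr(builtin_calculators, function_name)
    custom_functions = custom_functions or {}
    if function_name in custom_functions:
        return custom_functions[function_name]
    raise KeyError(
        f"Unknown feature calculator {function_name!r}: not built in and "
        "not present in custom_functions"
    )


settings = {"mean": None, "count": None}
custom_functions = {"count": count}

# Resolve every configured name and apply it to a toy series.
results = {
    name: resolve_calculator(name, custom_functions)([1, 2, 3, 4])
    for name in settings
}
print(results)
```

Raising on an unknown name, instead of letting custom_functions.get() return None, surfaces a typo in the settings immediately rather than as a "NoneType is not callable" error later.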
Sorry for the late response. That sounds like a very interesting idea. Do you want to submit a PR for that?
@MaxBenChrist Sure! Will come soon.
@MaxBenChrist sadly the approach of passing a dictionary with the functions does not work, because the pickling that occurs during the multiprocessing cannot deal with locally defined functions. The alternatives I came up with are:
- Pass the path to a separate module containing the custom functions (but it's not very pretty)
- Pass a class containing the custom functions (which might not be very intuitive to make for a "regular" user)
Thoughts?
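For context, the limitation can be reproduced with the standard library alone. The sketch below is not tsfresh code, and make_local_count and can_pickle are hypothetical names. The standard pickle module serializes a function as a reference to its importable name, so a function defined inside another function, or a lambda, has nothing it can be referenced by.

```python
import pickle


def make_local_count():
    # A function defined inside another function has no importable name.
    def count(x):
        return len(x)
    return count


def can_pickle(obj):
    """Return True if the standard pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError):
        return False


print(can_pickle(len))                 # built-in, pickled by reference to its name
print(can_pickle(make_local_count()))  # locally defined function
print(can_pickle(lambda x: len(x)))    # lambda
```

Functions defined at the top level of an importable module are pickled by name and therefore survive the trip to a worker process, which is why moving the custom functions into a separate module is one of the alternatives listed above.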
Ah yeah, well that is unfortunate.
1.) will break if tsfresh is run on a cluster, right? 2.) I have no idea how this looks, do you have an exemplary code snippet?
Another option would be to replace the multiprocessing with something like pathos that uses dill which in turn seems to be able to pickle functions, see http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization
We had to deal with this when implementing custom primitives in Featuretools. We got around it by using cloudpickle.
We first turn the features into a string
features_string = cloudpickle.dumps(features)
Then, distribute that string. In each process, we then deserialize back to the correct objects
features = cloudpickle.loads(features_string)
Is there anyone willing to implement this as a PR?
In principle I see the point why you would want to have custom feature calculators in tsfresh as simple as an additional argument. However, if it really means we need to start using cloudpickle etc. etc. I would argue against it. If you have a feature engineering pipeline you could just have your custom feature extractor as an additional step after tsfresh and add an additional column. All tsfresh would give you is the "groupby" as far as I can see. I guess I am wrong, so please correct me :-)
@nils-braun well, having a double feature engineering (half of which in pandas, for instance) kinda goes against the speed and scalability advantages of tsfresh! For now, my solution is to have a forked tsfresh repo in my own Github, which has a custom feature_calculators module and a couple of other tweaks, and which I use in my projects. It's not ideal though – and I feel like being able to easily add functions would considerably improve the usability/flexibility of the package.
@michetonu I could not find these changes in your repository, can you please tell which commits implement what you want? BTW I played with the solution discussed above and it seems to work fine with multiprocessing (tested only on Linux), see https://github.com/dbarbier/tsfresh/commit/1c04a14492401b189f4a67c15bcc23e88c266e8f
Ah yes sorry it's on my company's repo: https://github.com/Pacmed/tsfresh
Here's the diff, ignore the changes not in feature_calculators, as I don't think the code in there is used currently: https://github.com/blue-yonder/tsfresh/compare/master...Pacmed:master
@michetonu You wrote in a comment above
sadly the approach of passing a dictionary with the functions does not work, because the pickling that occurs during the multiprocessing cannot deal with locally defined functions.
Please have a look at branch https://github.com/dbarbier/tsfresh/tree/db/experiment-482, it seems to work just fine with multiprocessing, can you please confirm? See the test-user-defined.py file in the top-level directory.
The only issue I see is with functions returning multiple values.
@michetonu @dbarbier is there still work on this? Or this is not an issue anymore because you can solve it as a post-processing step?
@nils-braun I've been meaning to try @dbarbier 's solution but I've been very busy! I'll give it a shot hopefully this week and will let you know.
@nils-braun @dbarbier sorry for the massive delay, but I'm happy to confirm that @dbarbier's code for custom functions works indeed. Great stuff! Shall we proceed with a PR?
@michetonu Great, I will rebase this branch to main and submit a PR
While working on the PR I had another idea; in fact _do_extraction_on_chunk looks for functions in tsfresh.feature_extraction.feature_calculators, so I wondered if one could add their own functions to this package, and this is indeed feasible.
Here is a full working example:
import tsfresh
from tsfresh.feature_extraction import feature_calculators
import pandas as pd
import numpy as np
from collections import Counter
np.random.seed(42)
df = pd.DataFrame(np.concatenate([np.repeat(range(200),10)[:,np.newaxis],
                                  np.tile(range(10),200)[:,np.newaxis],
                                  np.random.randn(2000, 1)], axis=1), columns=["id", "time", "x"])

# Custom functions to add
@feature_calculators.set_property("fctype", "combiner")
def value_count_all(x, param):
    """
    Returns the number of values in x
    :param x: the time series on which to calculate the feature.
    :type x: pandas.Series
    :param param: None
    :return: the value of this feature
    :return type: list
    """
    values, counts = np.unique(x, return_counts=True)
    return [("value_count__value_\"{}\"".format(value), feature_calculators.value_count(x, value))
            for value in values]

@feature_calculators.set_property("fctype", "simple")
def last(x):
    """Return the last value of x.
    :param x: the time series on which to calculate the feature.
    :type x: pandas.Series
    :return: the value of this feature
    :return type: list
    """
    return x[-1]

@feature_calculators.set_property("fctype", "simple")
def first(x):
    """Return the first value of x.
    :param x: the time series on which to calculate the feature.
    :type x: pandas.Series
    :return: the value of this feature
    :return type: list
    """
    return x[0]

@feature_calculators.set_property("fctype", "simple")
def is_measured(x):
    """
    Check if a variable has been measured – i.e. if the series is not empty.
    :param x: the time series to calculate the feature of
    :type x: np.ndarray
    :return: the different feature values
    :return type: float
    """
    return float(bool(len(x)))

@feature_calculators.set_property("fctype", "simple")
def mode(x):
    """Return the mode of the parameter (i.e. most common value)
    :param x: the time series to calculate the feature of
    :type x: np.ndarray
    :return: the different feature values
    :return type: tuple
    """
    c = Counter(x)
    return tuple(x for x, count in c.items() if count == c.most_common(1)[0][1])

@feature_calculators.set_property("fctype", "simple")
@feature_calculators.set_property("minimal", True)
def count(x):
    """
    Returns the number of elements in x
    :param x: the time series to calculate the feature of
    :type x: np.ndarray
    :return: the value of this feature
    :return type: int
    """
    return len(x)

custom_functions = [value_count_all, last, first, is_measured, mode, count]
for func in custom_functions:
    setattr(feature_calculators, func.__name__, func)
params = tsfresh.feature_extraction.EfficientFCParameters()
params.update({func.__name__: None for func in custom_functions})
ts = tsfresh.extract_features(df, column_id="id", default_fc_parameters=params)
print(ts)
So IMO there is nothing to do within tsfresh (and this issue can be closed); my previous solution was not simpler than this one.
This didn't use to work (see my comment above https://github.com/blue-yonder/tsfresh/issues/482#issuecomment-465652297), has it been fixed/changed in recent updates?
No idea, I just checked and it works on Linux.
You could borrow a simple concept from genetic programming tools such as DEAP. They have an abstraction called a PrimitiveSet, which is a specialized container for function metadata. You call addPrimitive and pass in a reference to your function, and lists of the input and output types. This works very well for strongly typed GP. The type and arity metadata also assists greatly in automated selection of functions that return / accept a certain type, etc.
This is how I maintain my poor version of handmade time series extraction functions.
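To make the idea concrete, here is a minimal sketch of such a container. It is loosely inspired by the description above and is not DEAP's API: Primitive, FunctionRegistry, add and returning are all hypothetical names chosen for this illustration.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Tuple


@dataclass(frozen=True)
class Primitive:
    """A function together with its declared input and output types."""
    func: Callable
    input_types: Tuple[type, ...]
    output_type: type

    @property
    def arity(self) -> int:
        return len(self.input_types)


class FunctionRegistry:
    """Hypothetical container of functions plus type metadata."""

    def __init__(self) -> None:
        self._primitives: Dict[str, Primitive] = {}

    def add(self, func, input_types, output_type) -> None:
        # Register the function under its own name.
        self._primitives[func.__name__] = Primitive(
            func, tuple(input_types), output_type
        )

    def returning(self, output_type) -> List[str]:
        """Names of all registered functions declared to return output_type."""
        return sorted(
            name for name, p in self._primitives.items()
            if p.output_type is output_type
        )

    def __getitem__(self, name) -> Primitive:
        return self._primitives[name]


def count(x):
    return len(x)


def last(x):
    return x[-1]


registry = FunctionRegistry()
registry.add(count, [list], int)
registry.add(last, [list], float)

print(registry.returning(int))
print(registry["last"].arity)
```

Keeping the metadata next to the function makes it possible to select calculators by declared return type or arity without inspecting the functions themselves.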
The example posted by @dbarbier threw the following error on my Macbook (OS: Big Sur, Python 3.8.5, tsfresh: 0.18.0), but worked on my Linux (Ubuntu 20.04.2 LTS, Python 3.8.5, tsfresh: 0.18.0).
So, there still seem to be some (OS-specific?) problems in the parallelization part... (When I pass n_jobs=0 to extract_features(), it works without an error).
AttributeError Traceback (most recent call last)
<ipython-input-15-95f18b28a331> in <module>
98 params.update({func.__name__: None for func in custom_functions})
99
--> 100 ts = tsfresh.extract_features(df, column_id="id", default_fc_parameters=params)
101 print(ts)
~/.pyenv/versions/anaconda3-5.3.1/envs/Test/lib/python3.8/site-packages/tsfresh/feature_extraction/extraction.py in extract_features(timeseries_container, default_fc_parameters, kind_to_fc_parameters, column_id, column_sort, column_kind, column_value, chunksize, n_jobs, show_warnings, disable_progressbar, impute_function, profile, profiling_filename, profiling_sorting, distributor, pivot)
150 warnings.simplefilter("default")
151
--> 152 result = _do_extraction(df=timeseries_container,
153 column_id=column_id, column_value=column_value,
154 column_kind=column_kind,
~/.pyenv/versions/anaconda3-5.3.1/envs/Test/lib/python3.8/site-packages/tsfresh/feature_extraction/extraction.py in _do_extraction(df, column_id, column_value, column_kind, column_sort, default_fc_parameters, kind_to_fc_parameters, n_jobs, chunk_size, disable_progressbar, show_warnings, distributor, pivot)
253 show_warnings=show_warnings)
254
--> 255 result = distributor.map_reduce(_do_extraction_on_chunk, data=data,
256 chunk_size=chunk_size,
257 function_kwargs=kwargs)
~/.pyenv/versions/anaconda3-5.3.1/envs/Test/lib/python3.8/site-packages/tsfresh/utilities/distribution.py in map_reduce(self, map_function, data, function_kwargs, chunk_size, data_length)
211 result = self.distribute(_function_with_partly_reduce, chunk_generator, map_kwargs),
212
--> 213 result = list(itertools.chain.from_iterable(result))
214
215 self.close()
~/.pyenv/versions/anaconda3-5.3.1/envs/Test/lib/python3.8/site-packages/tqdm/std.py in __iter__(self)
1176
1177 try:
-> 1178 for obj in iterable:
1179 yield obj
1180 # Update and possibly print the progressbar.
~/.pyenv/versions/anaconda3-5.3.1/envs/Test/lib/python3.8/multiprocessing/pool.py in next(self, timeout)
866 if success:
867 return value
--> 868 raise value
869
870 __next__ = next # XXX
~/.pyenv/versions/anaconda3-5.3.1/envs/Test/lib/python3.8/multiprocessing/pool.py in worker()
123 job, i, func, args, kwds = task
124 try:
--> 125 result = (True, func(*args, **kwds))
126 except Exception as e:
127 if wrap_exception and func is not _helper_reraises_exception:
~/.pyenv/versions/anaconda3-5.3.1/envs/Test/lib/python3.8/site-packages/tsfresh/utilities/distribution.py in _function_with_partly_reduce()
41 kwargs = kwargs or {}
42 results = (map_function(chunk, **kwargs) for chunk in chunk_list)
---> 43 results = list(itertools.chain.from_iterable(results))
44 return results
45
~/.pyenv/versions/anaconda3-5.3.1/envs/Test/lib/python3.8/site-packages/tsfresh/utilities/distribution.py in <genexpr>()
40 """
41 kwargs = kwargs or {}
---> 42 results = (map_function(chunk, **kwargs) for chunk in chunk_list)
43 results = list(itertools.chain.from_iterable(results))
44 return results
~/.pyenv/versions/anaconda3-5.3.1/envs/Test/lib/python3.8/site-packages/tsfresh/feature_extraction/extraction.py in _do_extraction_on_chunk()
335 warnings.simplefilter("default")
336
--> 337 return list(_f())
~/.pyenv/versions/anaconda3-5.3.1/envs/Test/lib/python3.8/site-packages/tsfresh/feature_extraction/extraction.py in _f()
294 def _f():
295 for function_name, parameter_list in fc_parameters.items():
--> 296 func = getattr(feature_calculators, function_name)
297
298 # If the function uses the index, pass is at as a pandas Series.
AttributeError: module 'tsfresh.feature_extraction.feature_calculators' has no attribute 'last'
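A possible explanation, not confirmed anywhere in this thread: since Python 3.8 the default multiprocessing start method on macOS is "spawn", which starts each worker as a fresh interpreter that imports feature_calculators again, without the setattr() patches applied in the parent process. Under "fork", the default on Linux for that Python version, workers are copies of the parent and so see the patched module. The sketch below only reports which situation applies; patches_reach_workers is a hypothetical helper name.

```python
import multiprocessing


def patches_reach_workers(start_method=None):
    """Return True if workers inherit runtime changes made to imported modules.

    Only the "fork" start method copies the parent's memory, including
    attributes added with setattr(); "spawn" and "forkserver" workers
    import modules afresh.
    """
    if start_method is None:
        start_method = multiprocessing.get_start_method()
    return start_method == "fork"


print(multiprocessing.get_start_method(), patches_reach_workers())
```

This would also be consistent with n_jobs=0 working on macOS, since no worker processes are involved in that case.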
Thanks for all your input. After some thought and reading your ideas, I have now tried to implement a possible solution in #845. Could any of you have a look, @dbarbier, @kmax12, @nobuyukioishi, @michetonu? I am now using cloudpickle under the hood to make the settings pickle-able and support user-defined functions in the settings dictionary directly.
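The idea of a settings dictionary whose keys are either names or callables can be illustrated with a small stand-alone sketch. It is not the code from #845 and leaves out the cloudpickle part entirely: builtin_calculators is a hypothetical stand-in for feature_calculators, and resolve is a made-up helper.

```python
from types import SimpleNamespace


def mean(x):
    return sum(x) / len(x)


def last(x):
    return x[-1]


# Hypothetical stand-in for tsfresh's feature_calculators module.
builtin_calculators = SimpleNamespace(mean=mean)


def resolve(key):
    """A callable key is used directly; a string key is looked up by name."""
    if callable(key):
        return key
    return getattr(builtin_calculators, key)


# Built-in features are named by string, user-defined ones are passed as callables.
settings = {"mean": None, last: None}

features = {}
for key in settings:
    func = resolve(key)
    features[func.__name__] = func([3, 1, 4, 1, 5])
print(features)
```

Functions are hashable, so they can sit in the same dictionary as string keys; the price is a dictionary with mixed key types.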
@nils-braun amazing! That seems to be working perfectly :)
It's slightly convoluted having to create a dictionary with both function names and callables as keys. For a slightly more elegant solution (as a future improvement), maybe we could think about passing the module containing the functions as an extra parameter to extract_features instead, and fetch the functions under the hood. Not a huge problem, and I'm not sure if this would create problems with the pickling, so for now this is fine I'd say.
Good to go from my end, nice job 👍
@nils-braun looks great, but I can only test on Linux; maybe you could run Github actions on MacOS and/or Windows?
@dbarbier I tested it on both MacOS and Linux and it seems to work fine on both!
@nils-braun Thank you for your quick and great work! (Sorry for the late response)
Closed in #845 and described in https://tsfresh.readthedocs.io/en/latest/text/how_to_add_custom_feature.html.