Better support/integration for custom functions

Open michetonu opened this issue 6 years ago • 24 comments

Hi!

First of all, congrats on the great package; it's really well made.

I think extract_features() would highly benefit from an easier way to add custom feature calculation functions on the fly, without having to modify the feature_calculators source file. Having to clone the repository and change the source code is not very handy, especially when the code might need to be used in production environments. At the same time, I've been needing to use very project-specific functions which are not worth adding to the package through a PR.

My suggestion is therefore to add an optional parameter to extract_features, which could be a dictionary such as {'function_name': foo}, where foo is the actual function to be called. Each function named in default_fc_parameters would then be looked up in feature_calculators first; if it is not found there, it would be taken from the new dictionary. The flow would look something like this:

# Custom function to add
def count(x):
    return len(x)

# Dictionary of custom functions
custom_functions = {
     'count': count
}

# The settings dictionary looks exactly the same
settings = {
    'mean': None,
    'count': None
}

# The extract_features call just has an extra parameter
extract_features(..., default_fc_parameters=settings, custom_functions=custom_functions)

In the backend, line 286 of feature_extraction.extraction._do_extraction_on_chunk would be modified to something like:

for function_name, parameter_list in fc_parameters.items():
    if hasattr(feature_calculators, function_name):
        # Built-in calculators take precedence
        func = getattr(feature_calculators, function_name)
    else:
        # Fall back to the user-supplied dictionary
        func = custom_functions.get(function_name)
    # Maybe raise an informative error here if it's not found

It doesn't need to be exactly like this, but this way would be pretty straightforward to implement. If there is no reason why we shouldn't do this, I'm happy to make a PR and continue the technical discussion there.
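To make the proposed lookup concrete, here is a minimal, self-contained sketch. It is only an illustration: `resolve_calculator` is a hypothetical helper, and a `SimpleNamespace` stands in for the `feature_calculators` module so the snippet runs without tsfresh. It also raises an informative error when a name is found in neither place.

```python
from types import SimpleNamespace


def resolve_calculator(name, builtin_module, custom_functions=None):
    """Return the calculator called `name`, preferring built-in ones.

    Hypothetical helper: `builtin_module` plays the role of
    tsfresh.feature_extraction.feature_calculators.
    """
    if hasattr(builtin_module, name):
        return getattr(builtin_module, name)
    custom_functions = custom_functions or {}
    if name in custom_functions:
        return custom_functions[name]
    raise KeyError(
        f"Unknown feature calculator {name!r}: not built in and not "
        f"among the custom functions {sorted(custom_functions)}"
    )


def count(x):
    return len(x)


# Stand-in for the built-in module, with a single calculator "mean"
builtins_stub = SimpleNamespace(mean=lambda x: sum(x) / len(x))

settings = {"mean": None, "count": None}
custom_functions = {"count": count}

# Resolve every requested calculator and apply it to a toy series
features = {
    name: resolve_calculator(name, builtins_stub, custom_functions)([1, 2, 3])
    for name in settings
}
```

Checking the built-in module first keeps existing settings dictionaries working unchanged; whether custom functions should be allowed to override built-ins is a design choice this sketch does not settle.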

Thanks!

michetonu avatar Jan 14 '19 11:01 michetonu

Sorry for the late response. That sounds like a very interesting idea. Do you want to submit a PR for that?

MaxBenChrist avatar Feb 15 '19 18:02 MaxBenChrist

@MaxBenChrist Sure! Will come soon.

michetonu avatar Feb 16 '19 11:02 michetonu

@MaxBenChrist sadly the approach of passing a dictionary with the functions does not work, because the pickling that occurs during the multiprocessing cannot deal with locally defined functions. The alternatives I came up with are:

  1. Pass the path to a separate module containing the custom functions (but it's not very pretty)
  2. Pass a class containing the custom functions (which might not be very intuitive for a "regular" user to write)
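For context, the limitation can be reproduced with the standard library alone. This is a minimal sketch, not tsfresh code: `can_pickle` and `make_local_function` are hypothetical names, and the exact exception type varies between Python versions, so the helper catches any exception.

```python
import pickle


def can_pickle(obj):
    """Return True if the standard pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
    except Exception:  # the exception type differs across Python versions
        return False
    return True


def make_local_function():
    # Defined inside another function, so pickle cannot look it up by
    # its qualified name when it tries to serialize it by reference.
    def count(x):
        return len(x)

    return count


importable_ok = can_pickle(len)                # built-in, importable by name
local_ok = can_pickle(make_local_function())   # nested (local) function
lambda_ok = can_pickle(lambda x: len(x))       # anonymous function
```

The standard pickle module stores functions by reference (module plus qualified name), which is why only functions importable by name survive the trip to a worker process.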

Thoughts?

michetonu avatar Feb 20 '19 16:02 michetonu

Ah yeah, well that is unfortunate.

1.) will break if tsfresh is run on a cluster, right? 2.) I have no idea what this would look like; do you have an example code snippet?

Another option would be to replace the multiprocessing with something like pathos that uses dill which in turn seems to be able to pickle functions, see http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization

MaxBenChrist avatar Feb 20 '19 17:02 MaxBenChrist

We had to deal with this when implementing custom primitives in Featuretools. We got around it by using cloudpickle.

We first serialize the features into a byte string:

features_string = cloudpickle.dumps(features)

Then we distribute that string. In each process, we deserialize it back into the correct objects:

features = cloudpickle.loads(features_string)

kmax12 avatar Feb 20 '19 19:02 kmax12

Is there anyone willing to implement this as a PR?

In principle I see the point why you would want to have custom feature calculators in tsfresh as simple as an additional argument. However, if it really means we need to start using cloudpickle etc. etc. I would argue against it. If you have a feature engineering pipeline you could just have your custom feature extractor as an additional step after tsfresh and add an additional column. All tsfresh would give you is the "groupby" as far as I can see. I guess I am wrong, so please correct me :-)

nils-braun avatar Nov 17 '19 16:11 nils-braun

@nils-braun well, having a double feature engineering (half of which in pandas, for instance) kinda goes against the speed and scalability advantages of tsfresh! For now, my solution is to have a forked tsfresh repo in my own Github, which has a custom feature_calculators module and a couple of other tweaks, and which I use in my projects. It's not ideal though – and I feel like being able to easily add functions would considerably improve the usability/flexibility of the package.

michetonu avatar Nov 17 '19 17:11 michetonu

@michetonu I could not find these changes in your repository, can you please tell which commits implement what you want? BTW I played with the solution discussed above and it seems to work fine with multiprocessing (tested only on Linux), see https://github.com/dbarbier/tsfresh/commit/1c04a14492401b189f4a67c15bcc23e88c266e8f

dbarbier avatar Nov 17 '19 18:11 dbarbier

Ah yes sorry it's on my company's repo: https://github.com/Pacmed/tsfresh

here's the diff, ignore the changes not in feature_calculators, as I don't think the code in there is used currently https://github.com/blue-yonder/tsfresh/compare/master...Pacmed:master

michetonu avatar Nov 17 '19 18:11 michetonu

@michetonu You wrote in a comment above

sadly the approach of passing a dictionary with the functions does not work, because the pickling that occurs during the multiprocessing cannot deal with locally defined functions.

Please have a look at branch https://github.com/dbarbier/tsfresh/tree/db/experiment-482, it seems to work just fine with multiprocessing, can you please confirm? See test-user-defined.py file in top-level directory.

The only issue I see is with functions returning multiple values.

dbarbier avatar Nov 17 '19 20:11 dbarbier

@michetonu @dbarbier is there still work on this? Or is this no longer an issue because you can solve it as a post-processing step?

nils-braun avatar Apr 11 '20 08:04 nils-braun

@nils-braun I've been meaning to try @dbarbier 's solution but I've been very busy! I'll give it a shot hopefully this week and will let you know.

michetonu avatar Apr 14 '20 07:04 michetonu

@nils-braun @dbarbier sorry for the massive delay, but I'm happy to confirm that @dbarbier's code for custom functions works indeed. Great stuff! Shall we proceed with a PR?

michetonu avatar Jul 31 '20 08:07 michetonu

@michetonu Great, I will rebase this branch to main and submit a PR

dbarbier avatar Jul 31 '20 09:07 dbarbier

While working on the PR I had another idea; in fact _do_extraction_on_chunk looks for functions in tsfresh.feature_extraction.feature_calculators, so I wondered if one could add their own functions to this package, and this is indeed feasible. Here is a full working example:

import tsfresh
from tsfresh.feature_extraction import feature_calculators
import pandas as pd
import numpy as np
from collections import Counter

np.random.seed(42)
df = pd.DataFrame(np.concatenate([np.repeat(range(200),10)[:,np.newaxis],
                                  np.tile(range(10),200)[:,np.newaxis],
                                  np.random.randn(2000, 1)], axis=1), columns=["id", "time", "x"])

# Custom functions to add
@feature_calculators.set_property("fctype", "combiner")
def value_count_all(x, param):
    """
    Returns the number of occurrences of each distinct value in x

    :param x: the time series on which to calculate the feature.
    :type x: pandas.Series
    :param param: None
    :return: the value of this feature
    :return type: list
    """
    values, counts = np.unique(x, return_counts=True)

    return [("value_count__value_\"{}\"".format(value), feature_calculators.value_count(x, value))
            for value in values]


@feature_calculators.set_property("fctype", "simple")
def last(x):
    """Return the last value of x.

    :param x: the time series on which to calculate the feature.
    :type x: np.ndarray
    :return: the value of this feature
    :return type: float
    """
    return x[-1]

@feature_calculators.set_property("fctype", "simple")
def first(x):
    """Return the first value of x.

    :param x: the time series on which to calculate the feature.
    :type x: np.ndarray
    :return: the value of this feature
    :return type: float
    """
    return x[0]


@feature_calculators.set_property("fctype", "simple")
def is_measured(x):
    """
    Check if a variable has been measured – i.e. if the series is not empty.

    :param x: the time series to calculate the feature of
    :type x: np.ndarray
    :return: the different feature values
    :return type: float
    """
    return float(bool(len(x)))


@feature_calculators.set_property("fctype", "simple")
def mode(x):
    """Return the mode of the parameter (i.e. most common value)

    :param x: the time series to calculate the feature of
    :type x: np.ndarray
    :return: the different feature values
    :return type: tuple
    """
    c = Counter(x)
    return tuple(x for x, count in c.items() if count == c.most_common(1)[0][1])


@feature_calculators.set_property("fctype", "simple")
@feature_calculators.set_property("minimal", True)
def count(x):
    """
    Returns the number of elements in x

    :param x: the time series to calculate the feature of
    :type x: np.ndarray
    :return: the value of this feature
    :return type: int
    """
    return len(x)

custom_functions = [value_count_all, last, first, is_measured, mode, count]
for func in custom_functions:
    setattr(feature_calculators, func.__name__, func)

params = tsfresh.feature_extraction.EfficientFCParameters()
params.update({func.__name__: None for func in custom_functions})

ts = tsfresh.extract_features(df, column_id="id", default_fc_parameters=params)
print(ts)

So IMO there is nothing to do within tsfresh (and this issue can be closed); my previous solution was not simpler than this one.

dbarbier avatar Jul 31 '20 16:07 dbarbier

This didn't use to work (see my comment above https://github.com/blue-yonder/tsfresh/issues/482#issuecomment-465652297), has it been fixed/changed in recent updates?

michetonu avatar Aug 01 '20 07:08 michetonu

No idea, I just checked and it works on Linux.

dbarbier avatar Aug 01 '20 09:08 dbarbier

You could borrow a simple concept from genetic programming tools such as DEAP. They have an abstraction called a PrimitiveSet, which is a specialized container for function metadata. You call addPrimitive and pass in a reference to your function, plus lists of the input and output types. This works very well for strongly typed GP. The type and arity metadata also helps greatly with automated selection of functions that return or accept a certain type, etc.

This is how I maintain my poor version of handmade time series extraction functions.
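A rough sketch of that idea, using only the standard library. The `FeatureRegistry` class and its method names are hypothetical and are neither DEAP's nor tsfresh's API; they only illustrate keeping type and arity metadata next to each function.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Tuple


@dataclass(frozen=True)
class Primitive:
    """A function together with its declared input and output types."""

    func: Callable
    in_types: Tuple[type, ...]
    ret_type: type

    @property
    def arity(self) -> int:
        return len(self.in_types)


class FeatureRegistry:
    """Hypothetical container for functions plus their metadata."""

    def __init__(self) -> None:
        self._primitives: Dict[str, Primitive] = {}

    def add_primitive(self, func, in_types, ret_type) -> None:
        self._primitives[func.__name__] = Primitive(func, tuple(in_types), ret_type)

    def get(self, name) -> Primitive:
        return self._primitives[name]

    def returning(self, ret_type) -> List[str]:
        """Names of all registered functions declared to return ret_type."""
        return sorted(
            name for name, p in self._primitives.items() if p.ret_type is ret_type
        )


def count(x):
    return len(x)


def last(x):
    return x[-1]


def scaled_last(x, factor):
    return x[-1] * factor


registry = FeatureRegistry()
registry.add_primitive(count, [list], int)
registry.add_primitive(last, [list], float)
registry.add_primitive(scaled_last, [list, float], float)

# Select functions by declared return type
float_features = registry.returning(float)
```

The declared types are plain metadata here; nothing checks them against what the functions actually return.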

gminorcoles avatar Nov 04 '20 12:11 gminorcoles

The example posted by @dbarbier threw the following error on my MacBook (OS: Big Sur, Python 3.8.5, tsfresh: 0.18.0), but worked on my Linux machine (Ubuntu 20.04.2 LTS, Python 3.8.5, tsfresh: 0.18.0).

So, there still seem to be some (OS-specific?) problems in the parallelization part... (When I pass n_jobs=0 to extract_features(), it works without an error).

AttributeError                            Traceback (most recent call last)
<ipython-input-15-95f18b28a331> in <module>
     98 params.update({func.__name__: None for func in custom_functions})
     99 
--> 100 ts = tsfresh.extract_features(df, column_id="id", default_fc_parameters=params)
    101 print(ts)

~/.pyenv/versions/anaconda3-5.3.1/envs/Test/lib/python3.8/site-packages/tsfresh/feature_extraction/extraction.py in extract_features(timeseries_container, default_fc_parameters, kind_to_fc_parameters, column_id, column_sort, column_kind, column_value, chunksize, n_jobs, show_warnings, disable_progressbar, impute_function, profile, profiling_filename, profiling_sorting, distributor, pivot)
    150             warnings.simplefilter("default")
    151 
--> 152         result = _do_extraction(df=timeseries_container,
    153                                 column_id=column_id, column_value=column_value,
    154                                 column_kind=column_kind,

~/.pyenv/versions/anaconda3-5.3.1/envs/Test/lib/python3.8/site-packages/tsfresh/feature_extraction/extraction.py in _do_extraction(df, column_id, column_value, column_kind, column_sort, default_fc_parameters, kind_to_fc_parameters, n_jobs, chunk_size, disable_progressbar, show_warnings, distributor, pivot)
    253                   show_warnings=show_warnings)
    254 
--> 255     result = distributor.map_reduce(_do_extraction_on_chunk, data=data,
    256                                     chunk_size=chunk_size,
    257                                     function_kwargs=kwargs)

~/.pyenv/versions/anaconda3-5.3.1/envs/Test/lib/python3.8/site-packages/tsfresh/utilities/distribution.py in map_reduce(self, map_function, data, function_kwargs, chunk_size, data_length)
    211             result = self.distribute(_function_with_partly_reduce, chunk_generator, map_kwargs),
    212 
--> 213         result = list(itertools.chain.from_iterable(result))
    214 
    215         self.close()

~/.pyenv/versions/anaconda3-5.3.1/envs/Test/lib/python3.8/site-packages/tqdm/std.py in __iter__(self)
   1176 
   1177         try:
-> 1178             for obj in iterable:
   1179                 yield obj
   1180                 # Update and possibly print the progressbar.

~/.pyenv/versions/anaconda3-5.3.1/envs/Test/lib/python3.8/multiprocessing/pool.py in next(self, timeout)
    866         if success:
    867             return value
--> 868         raise value
    869 
    870     __next__ = next                    # XXX

~/.pyenv/versions/anaconda3-5.3.1/envs/Test/lib/python3.8/multiprocessing/pool.py in worker()
    123         job, i, func, args, kwds = task
    124         try:
--> 125             result = (True, func(*args, **kwds))
    126         except Exception as e:
    127             if wrap_exception and func is not _helper_reraises_exception:

~/.pyenv/versions/anaconda3-5.3.1/envs/Test/lib/python3.8/site-packages/tsfresh/utilities/distribution.py in _function_with_partly_reduce()
     41     kwargs = kwargs or {}
     42     results = (map_function(chunk, **kwargs) for chunk in chunk_list)
---> 43     results = list(itertools.chain.from_iterable(results))
     44     return results
     45 

~/.pyenv/versions/anaconda3-5.3.1/envs/Test/lib/python3.8/site-packages/tsfresh/utilities/distribution.py in <genexpr>()
     40     """
     41     kwargs = kwargs or {}
---> 42     results = (map_function(chunk, **kwargs) for chunk in chunk_list)
     43     results = list(itertools.chain.from_iterable(results))
     44     return results

~/.pyenv/versions/anaconda3-5.3.1/envs/Test/lib/python3.8/site-packages/tsfresh/feature_extraction/extraction.py in _do_extraction_on_chunk()
    335             warnings.simplefilter("default")
    336 
--> 337         return list(_f())

~/.pyenv/versions/anaconda3-5.3.1/envs/Test/lib/python3.8/site-packages/tsfresh/feature_extraction/extraction.py in _f()
    294     def _f():
    295         for function_name, parameter_list in fc_parameters.items():
--> 296             func = getattr(feature_calculators, function_name)
    297 
    298             # If the function uses the index, pass is at as a pandas Series.

AttributeError: module 'tsfresh.feature_extraction.feature_calculators' has no attribute 'last'
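One possible explanation, which is an assumption and was not confirmed in this thread: since Python 3.8 the default multiprocessing start method on macOS is `spawn`, while Linux has traditionally defaulted to `fork`. Forked workers inherit the parent's memory, including attributes added to a module with `setattr`; spawned workers re-import modules from scratch and would not see them. The hypothetical helper below only reports which situation applies.

```python
import multiprocessing as mp


def workers_see_runtime_patches(start_method=None):
    """Return True if worker processes inherit the parent's in-memory state.

    Only the "fork" start method copies the parent process, so only then
    do attributes added at runtime (e.g. via setattr on a module) exist
    in the workers.
    """
    method = start_method or mp.get_start_method()
    return method == "fork"


# Report the situation for the current interpreter and platform
current_method = mp.get_start_method()
patches_visible = workers_see_runtime_patches()
```

If the result is `False`, running with n_jobs=0, as reported above, avoids worker processes entirely.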

nobuyukioishi avatar Apr 16 '21 16:04 nobuyukioishi

Thanks for all your input. After some thoughts and reading your ideas I have now tried to implement a possible solution in #845. Could anyone of you have a look, @dbarbier, @kmax12, @nobuyukioishi @michetonu? I am now using cloudpickle under the hood to make the settings pickle-able and support user-defined functions in the settings dictionary directly.

nils-braun avatar Apr 17 '21 19:04 nils-braun

@nils-braun amazing! That seems to be working perfectly :)

It's slightly convoluted to have to create a dictionary with both function names and callables as keys. For a slightly more elegant solution (as a future improvement), maybe we could think about passing the module containing the functions as an extra parameter to extract_features instead, and fetch the functions under the hood. It's not a huge problem, and I'm not sure whether this would create problems with the pickling, so for now this is fine I'd say.
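As a sketch of that possible improvement (not something this thread shows as implemented), the functions could be collected from a module by looking for the `fctype` attribute that `set_property` attaches. `collect_calculators` is a hypothetical helper, and a small stand-in module is built inline so the snippet runs without tsfresh.

```python
import inspect
import types


def collect_calculators(module):
    """Return {name: function} for functions in `module` tagged with `fctype`."""
    return {
        name: obj
        for name, obj in inspect.getmembers(module, inspect.isfunction)
        if hasattr(obj, "fctype")
    }


# Stand-in for a user module such as my_features.py (hypothetical)
my_features = types.ModuleType("my_features")


def count(x):
    return len(x)


def _helper(x):  # not tagged, so it should be ignored
    return x


count.fctype = "simple"  # what set_property("fctype", "simple") would set
my_features.count = count
my_features._helper = _helper

# Build a settings dictionary from whatever the module exposes
calculators = collect_calculators(my_features)
settings = {name: None for name in calculators}
```

Filtering on `fctype` means helper functions in the same module are skipped without any naming convention.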

Good to go from my end, nice job 👍

michetonu avatar Apr 19 '21 12:04 michetonu

@nils-braun looks great, but I can only test on Linux; maybe you could run GitHub Actions on macOS and/or Windows?

dbarbier avatar Apr 19 '21 17:04 dbarbier

@dbarbier I tested it on both MacOS and Linux and it seems to work fine on both!

michetonu avatar Apr 19 '21 17:04 michetonu

@nils-braun Thank you for your quick and great work! (Sorry for the late response)

nobuyukioishi avatar May 03 '21 09:05 nobuyukioishi

Closed in #845 and described in https://tsfresh.readthedocs.io/en/latest/text/how_to_add_custom_feature.html.

nils-braun avatar Feb 19 '23 16:02 nils-braun