
Lazy fitting

Open TomAugspurger opened this issue 6 years ago • 14 comments

Should we add a compute keyword to all the fit / partial_fit on estimators we implement?

This would aid with

  • debugging the graphs we build
  • Better scheduling (any examples here?)
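
For concreteness, a minimal sketch of how such a keyword might read at the call site; the compute= parameter is hypothetical and is not part of the current API:

import dask.array as da
from sklearn.linear_model import SGDClassifier
from dask_ml.wrappers import Incremental

X = da.random.random((1000, 10), chunks=(100, 10))
y = da.random.randint(0, 2, size=(1000,), chunks=(100,))

inc = Incremental(SGDClassifier(max_iter=5, tol=1e-3))

# Hypothetical keyword: build the graph for the fit without executing it,
# then run the whole thing in one scheduler pass.
lazy = inc.fit(X, y, classes=[0, 1], compute=False)
fitted = lazy.compute()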

This is somewhat difficult. As @jcrist summarized in https://github.com/dask/dask-searchcv/issues/19

Lazy evaluation conflicts with scikit-learn's api, and is tricky to support here in a way that is both intuitive and robust. For some configurations there is also the need to call get multiple times, which prevents lazy evaluation.

xref https://github.com/dask/dask-ml/pull/259

TomAugspurger avatar Jul 03 '18 20:07 TomAugspurger

Better scheduling (any examples here?)

Incremental with many parameters. We'd prefer to build one large graph that gets a bit of data, passes it around to all of the models, gets the next bit of data, etc. I think that we (you) did this manually using a compute=False bit of code while at BIDS.
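
A sketch of how that might look with a hypothetical compute=False keyword (the actual code at BIDS may have differed):

import dask
import dask.array as da
from sklearn.linear_model import SGDClassifier
from dask_ml.wrappers import Incremental

X = da.random.random((1000, 10), chunks=(100, 10))
y = da.random.randint(0, 2, size=(1000,), chunks=(100,))

params = [{"alpha": 1e-4}, {"alpha": 1e-3}, {"alpha": 1e-2}]
models = [Incremental(SGDClassifier(max_iter=5, **p)) for p in params]

# Hypothetical: each fit only adds tasks to one large graph, so the scheduler
# can stream every block of data through all of the models.
lazy_fits = [m.fit(X, y, classes=[0, 1], compute=False) for m in models]
models = dask.compute(*lazy_fits)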

mrocklin avatar Jul 03 '18 20:07 mrocklin

Better scheduling (any examples here?)

I fit and score serially in https://github.com/dask/dask-examples/pull/15:

inc = Incremental(...)

data = []
for i in range(40):
    inc.fit(X, y)
    data += [{'score': inc.score(X, y), ...}]

There's no reason for this to be serial – it can start training the next iteration while scoring the previous iteration.
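
Continuing that snippet, a lazy version might read roughly as follows (again with a hypothetical compute=False keyword on both fit and score), so the scheduler is free to overlap training one iteration with scoring the previous one:

import dask

scores = []
for i in range(40):
    inc.fit(X, y, compute=False)                   # hypothetical keyword
    scores.append(inc.score(X, y, compute=False))  # hypothetical keyword

# One compute call walks the whole graph; nothing forces the iterations
# to run strictly one after the other.
results = dask.compute(*scores)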

stsievert avatar Jul 04 '18 14:07 stsievert

So it looks like the current fit functions tend both to do some computation and to finalize things back onto the collection. This finalization step seems to be what makes this problem hard if we also want to maintain the scikit-learn API.

I wonder if we might use the full Dask collections API here and make use of __dask_finalize__.

with dask.config.set({'ml.lazy': True}):
    inc = Incremental(sgd)
    for i in range(5):
        inc = inc.fit(X, y)
    inc = inc.compute()

cc @jcrist

mrocklin avatar Jul 12 '18 12:07 mrocklin

Looks like I meant __dask_postcompute__ rather than __dask_finalize__

mrocklin avatar Jul 12 '18 12:07 mrocklin

I'll play with that a bit today to see how things feel.

What would you expect the output of Incremental(sgd).fit(X, y, compute=False) to be? A Delayed object or an Incremental (which implements the dask collections API)? I'm leaning towards an Incremental.

TomAugspurger avatar Jul 12 '18 14:07 TomAugspurger

Although, thinking a bit further, when you try to do anything with a delayed `Incremental.fit`, it feels a lot like a `dask.Delayed` object.

inc = Incremental(sgd)
inc.fit(X, y, compute=False)

inc.coef_  # Delayed?
inc.score(X, y)  # Delayed?

TomAugspurger avatar Jul 12 '18 14:07 TomAugspurger

I think that it would be another Incremental object. coef_ and score are probably dask delayed objects, but in some cases they might also be dask arrays. I don't think that these strongly affect what we choose to make the result of .fit()

mrocklin avatar Jul 12 '18 14:07 mrocklin

Probably getting ahead of myself, but dask-ml's grid search CV will likely also need to be modified to allow for delayed fit methods. This could be desirable for the new bagging estimator.

js3711 avatar Jul 19 '18 00:07 js3711

I would like to build a high level graph which performs several RandomizedSearchCVs across different estimator types in parallel, possibly with different data sets, using a distributed system. What I find is that any attempt to client.submit() a graph containing a delayed .fit() hangs on the worker with no callstack or with this callstack:

File "lib/python3.6/threading.py", line 884, in _bootstrap self._bootstrap_inner()
File "lib/python3.6/threading.py", line 916, in _bootstrap_inner self.run()
File "lib/python3.6/threading.py", line 864, in run self._target(*self._args, **self._kwargs)
File "lib/python3.6/site-packages/distributed/threadpoolexecutor.py", line 55, in _worker task.run()
File "lib/python3.6/site-packages/distributed/_concurrent_futures_thread.py", line 65, in run result = self.fn(*self.args, **self.kwargs)
File "lib/python3.6/site-packages/distributed/worker.py", line 3206, in apply_function result = function(*args, **kwargs)
File "lib/python3.6/site-packages/dask_ml/model_selection/_search.py", line 1251, in fit for batch in ac.batches():
File "lib/python3.6/site-packages/distributed/client.py", line 4251, in batches yield self.next_batch(block=True)
File "lib/python3.6/site-packages/distributed/client.py", line 4223, in next_batch batch = [next(self)]
File "lib/python3.6/site-packages/distributed/client.py", line 4185, in __next__ self.thread_condition.wait(timeout=0.100)
File "lib/python3.6/threading.py", line 299, in wait gotit = waiter.acquire(True, timeout)

Reading this (and other) issues, I think the behavior I see is expected and that this feature would fix it. If I am wrong and this is expected to work I'm happy to produce a concise test example.

markkoob avatar Aug 29 '19 14:08 markkoob

@markkoob that may be a bit surprising. Are you passing dask objects to the delayed fit, or NumPy / pandas objects?

TomAugspurger avatar Aug 30 '19 14:08 TomAugspurger

I'm not certain I've tried fitting with dask.DataFrame types on the distributed client with a delayed fit, but I'm pretty sure I've tried most other combinations. I'll give that one a shot and try building a testcase that shows the behavior on a local cluster object.

markkoob avatar Aug 30 '19 15:08 markkoob

Okay I've been able to reproduce a couple of things with the LocalCluster. Hopefully that means I'm just doing something wrong!

import numpy as np
import pandas as pd

from dask import delayed
import dask.dataframe as dd
from distributed import Client, LocalCluster
from dask_ml.model_selection import RandomizedSearchCV

from sklearn.tree import DecisionTreeClassifier

def start_local_cluster():
    cluster = LocalCluster(n_workers=1)

    client = Client(cluster, asynchronous=True)
    return client, cluster

def load_data(x_size, y_size):
    data = pd.DataFrame(np.random.rand(x_size, y_size))
    return data

def load_labels(x_size, y_size):
    data = pd.DataFrame(np.random.randint(0,2,size=(x_size, y_size)))
    return data

client, cluster = start_local_cluster()

hp_dist = dict(class_weight=['balanced'],
               random_state=[0],
               criterion=['gini', 'entropy'],
               splitter=['best', 'random'],
               max_depth=[5, 10, 15, None],
               max_features=['auto', 'log2', 100, None])

est = DecisionTreeClassifier()

search = RandomizedSearchCV(estimator=est,
                            param_distributions=hp_dist,
                            n_iter=1,
                            cv=3)

# hangs, interrupt trace looks like it's tokenizing a bunch of stuff
# dfx = dd.from_pandas(load_data(1000, 10), npartitions=1)
# dfy = dd.from_pandas(load_labels(1000, 1), npartitions=1)
# task = delayed(search.fit)(dfx, dfy)
# future = client.submit(task)

# assertion failure, assert not is_dask_collection(x)
# dfx = dd.from_pandas(load_data(100, 10), npartitions=1)
# dfy = dd.from_pandas(load_labels(100, 1), npartitions=1)
# search.fit(dfx, dfy)

# Evaluates immediately despite delayed operands
# dfx = delayed(load_data)(1000, 100)
# dfy = delayed(load_labels)(1000, 1)
# search.fit(dfx, dfy)

I'm still trying to figure out how to get the acquire hang to happen predictably, but I'm starting to suspect it might be specific to delayed unpickling which was a shortcut I was taking for testing purposes and not necessarily a part of my use case.

markkoob avatar Aug 30 '19 20:08 markkoob

Hey all,

IMO, lazy fit would be a fantastic addition. I had some real pain using StandardScaler with ColumnTransformer because it called compute for each column. I have built a rough prototype here that implements scalers as a custom collection, as hinted at by @mrocklin, and perhaps an approach like this may be the way forward. I would love to hear your thoughts. It would also have the advantage that ColumnTransformer could be built on top of it in the same way, reusing the HighLevelGraph from each of the scalers.


from dask import array, dataframe
import dask

from dask.base import DaskMethodsMixin, replace_name_in_key, is_dask_collection
from dask.optimization import cull

class StandardScaler(DaskMethodsMixin):
    def __init__(self, mean=None, std=None):
        self._mean = mean
        self._std = std
    @property
    def mean(self):
        if self._mean is not None:
            return self._mean
        raise ValueError("Mean was none - Have you fit?")
    @mean.setter
    def mean(self, input):
        self._mean = input
        
    @property 
    def std(self):
        if self._std is not None:
            return self._std
        raise ValueError("Std was none - Have you fit?")
    
    @std.setter
    def std(self,input):
        self._std = input
    
    def fit(self, data):
        self.mean = data.mean()
        self.std = data.std()
        return self
    
    def transform(self, data):
        return (data-self.mean)/self.std
    
    def inverse_transform(self, data):
        return data*self.std + self.mean 
        
    @property 
    def _graph(self):
        # Only compute in the case that both the mean and std are dask collections 
        if is_dask_collection(self.mean) and is_dask_collection(self.std):
            return dask.highlevelgraph.HighLevelGraph.merge(self.mean.__dask_graph__(),self.std.__dask_graph__())
        
    def __dask_graph__(self):
        return self._graph
    
    def __dask_keys__(self):
        if self._graph:
            return self.mean.__dask_keys__() + self.std.__dask_keys__()
        else:
            return []

    # Use the threaded scheduler by default.
    __dask_scheduler__ = staticmethod(dask.threaded.get)

    def __dask_postcompute__(self):
        # The post compute rebuilds a StandardScaler with evaluated versions
        # of the parameters
        return StandardScaler._rebuild, ()

    @staticmethod
    def _rebuild(results):
        # ``results`` follows the structure of __dask_keys__: the computed
        # mean comes first, followed by the computed std.
        mean, std = results
        return StandardScaler(mean=mean, std=std)
    
    def __repr__(self):
        return repr(f"Mean - {self.mean}, Std - {self.std}")

# Quick demonstration of the prototype:
da = array.from_array([1, 2, 3])
b = StandardScaler().fit(da)
i = b.transform(da)
i = i.compute()
b = b.compute()
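
A small extension of that example shows the motivating win: two lazily fitted scalers can share a single scheduler pass. This is a rough sketch, reusing the dask and dataframe imports from the top of this snippet, and it assumes _rebuild returns a concrete StandardScaler as above:

import pandas as pd

pdf = pd.DataFrame({"a": [1.0, 2.0, 3.0], "b": [10.0, 20.0, 30.0]})
ddf = dataframe.from_pandas(pdf, npartitions=1)

scaler_a = StandardScaler().fit(ddf["a"])  # lazy: mean/std are dask scalars
scaler_b = StandardScaler().fit(ddf["b"])

# Both graphs are merged and evaluated together, rather than one compute per column.
scaler_a, scaler_b = dask.compute(scaler_a, scaler_b)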

MatthewLennie avatar Apr 22 '21 12:04 MatthewLennie

Further to my previous comment.

A rough idea of a ColumnTransformer implementation could look like the following. It only handles data in the form of dataframes, but that can obviously be extended easily enough. The scalers could also be turned into a property, etc.; this is just a quick and dirty prototype, but I think a good start for discussion.

class ColumnTransformer(DaskMethodsMixin):
    def __init__(self, scalers=None):

        self._scalers = scalers
        self.non_dask_scalers = {}
        self.dask_scalers = {}


    def fit(self, data):
        for key,series in data.items():
            self._scalers[key] = self._scalers[key].fit(series)
        return self
    
    def transform(self, data):
        for key, series in data.items():
            data[key] = self._scalers[key].transform(series)
        return data

    def _graph(self):
        # Split the fitted scalers into dask collections and already-concrete scalers
        self.dask_scalers = {}
        self.non_dask_scalers = {}
        for key, scaler in self._scalers.items():
            if is_dask_collection(scaler):
                self.dask_scalers[key]= scaler
            else:
                self.non_dask_scalers[key] = scaler
                
        
    def __dask_graph__(self):
        self._graph()
        graphs = [x.__dask_graph__() for x in self.dask_scalers.values()]
        return dask.highlevelgraph.HighLevelGraph.merge(*graphs)
    
    def __dask_keys__(self):
        self._graph()
        keys = [x.__dask_keys__() for x in self.dask_scalers.values()]
        return keys

    # Use the threaded scheduler by default.
    __dask_scheduler__ = staticmethod(dask.threaded.get)

    def __dask_postcompute__(self):
        # The post compute rebuilds a ColumnTransformer with evaluated versions
        # of each dask-backed scaler
        return ColumnTransformer._rebuild, (self.dask_scalers, self.non_dask_scalers)

    @staticmethod
    def _rebuild(results, dask_scalers, non_dask_scalers):
        # ``results`` mirrors __dask_keys__: one entry per dask-backed scaler,
        # in the same order as ``dask_scalers``.
        scalers = {}
        for computed, (key, scaler) in zip(results, dask_scalers.items()):
            scalers[key] = scaler._rebuild(computed)
        scalers = {**scalers, **non_dask_scalers}
        return ColumnTransformer(scalers)
    
    def __repr__(self):
        return repr(self._scalers)
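
And a rough, untested usage sketch of how the two prototypes might compose; it assumes dask's DataFrame exposes a pandas-style items(), and that StandardScaler from the previous comment is in scope:

import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({"a": [1.0, 2.0, 3.0], "b": [10.0, 20.0, 30.0]})
ddf = dd.from_pandas(pdf, npartitions=1)

ct = ColumnTransformer(scalers={"a": StandardScaler(), "b": StandardScaler()})
ct = ct.fit(ddf)      # lazy: every column's mean/std lands in one merged graph
ct = ct.compute()     # a single compute materializes all of the statistics
transformed = ct.transform(ddf)  # still a lazy dask DataFrame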

MatthewLennie avatar Apr 26 '21 11:04 MatthewLennie