Improve parallelisation of evaluations
After discussions at the braindecode code sprint and following up on #460, I think we should break down the evaluations into something like this:
```python
import abc
from copy import deepcopy

from joblib import Parallel, delayed


class BaseEvaluation(abc.ABC):
    def __init__(
        self,
        ...,
        n_nodes=1,  # number of data chunks to load in memory in parallel
        n_jobs=1,  # number of jobs per data chunk; one job fits one pipeline on one fold
    ):
        self.n_nodes = n_nodes
        self.n_jobs = n_jobs

    @abc.abstractmethod
    def get_splits(self) -> list[tuple[dict, list[tuple[dict, list[int], list[int]]]]]:
        """
        Return a list of pairs with:

        * a dict of arguments to pass to self.paradigm.get_data to load a minimal data chunk,
        * a list of splits for this data chunk, i.e. triplets with:
            - a dict describing the split,
            - a list of train indices,
            - a list of test indices.
        """
        pass

    def process(self, pipelines):
        splits = self.get_splits()
        splits_todo = []
        for datachunk_args, chunk_splits in splits:
            missing_results = self.results.not_yet_computed(datachunk_args, chunk_splits, pipelines)
            if missing_results:
                splits_todo.append((datachunk_args, chunk_splits, missing_results))
        # outer level of parallelism: one worker per data chunk
        Parallel(n_jobs=self.n_nodes)(
            delayed(self.process_datachunk)(pipelines, *args) for args in splits_todo
        )
        return self.results.to_dataframe(pipelines=pipelines, ...)

    def process_datachunk(self, pipelines, datachunk_args, chunk_splits, missing_results):
        X, y, metadata = self.paradigm.get_data(**datachunk_args)
        # inner level of parallelism: one worker per (pipeline, fold) pair
        Parallel(n_jobs=self.n_jobs)(
            delayed(self.process_split)(p, X, y, metadata, datachunk_args, *split)
            for split in chunk_splits
            for p in pipelines
        )

    def process_split(self, clf, X, y, metadata, datachunk_args, split_args, train_idx, test_idx):
        clf = deepcopy(clf)
        clf.fit(X[train_idx], y[train_idx])
        score = clf.score(X[test_idx], y[test_idx])
        self.results.add(datachunk_args, split_args, clf, score)
```
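For illustration (all names and values below are made up), `get_splits` for a within-session evaluation could return something like this: one entry per minimal data chunk (here, one subject), pairing the loading arguments with the folds defined on that chunk.

```python
# Hypothetical return value of get_splits(); the indices are illustrative only.
[
    (
        {"dataset": dataset, "subjects": [1]},  # kwargs for self.paradigm.get_data
        [
            ({"subject": 1, "session": "0", "fold": 0}, [4, 5, 6, 7], [0, 1, 2, 3]),
            ({"subject": 1, "session": "0", "fold": 1}, [0, 1, 2, 3], [4, 5, 6, 7]),
        ],
    ),
    (
        {"dataset": dataset, "subjects": [2]},
        [
            ({"subject": 2, "session": "0", "fold": 0}, [4, 5, 6, 7], [0, 1, 2, 3]),
        ],
    ),
]
```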
This would remove all the for loops we have in the different evaluations and allow for larger parallelisation.
Ping @tomMoral to join the conversation.
The proposed pattern couples the code that performs the evaluation (running the pipelines + parallelisation) with the process that decides the splits. I would recommend decoupling them further, in light of what scikit-learn does, so that the API is similar, making it easy to grasp the various concepts.
Basically, `get_splits` serves the same purpose as the `BaseCrossValidator` object in scikit-learn.
The API works with three methods:

- `__init__`: sets up the parameters of the split, if any.
- `get_n_splits`: takes a dataset and returns the number of splits (for instance, with leave-one-subject-out, the number of subjects).
- `split`: a generator which takes the dataset as input and, when iterated on, yields the `train_idx, test_idx` pairs.
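For instance, a leave-one-subject-out splitter following that API could look roughly like the sketch below (only `dataset.subject_list` is an actual attribute of MOABB datasets; the class itself is hypothetical and splits at the subject level, since epoch-level indices are only known once the data is loaded):

```python
class LeaveOneSubjectOut:
    """Sketch of a splitter mirroring the scikit-learn cross-validator API,
    but taking a MOABB dataset as input and splitting at the subject level."""

    def get_n_splits(self, dataset):
        # one split per subject
        return len(dataset.subject_list)

    def split(self, dataset):
        # generator yielding (train_idx, test_idx) over dataset.subject_list
        n_subjects = len(dataset.subject_list)
        for i in range(n_subjects):
            yield [j for j in range(n_subjects) if j != i], [i]
```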
Coming back to the Evaluation object, you would have a single one, I guess, such that:
```python
from copy import deepcopy

import joblib
import pandas as pd
from joblib import Parallel, delayed

memory = joblib.Memory(location="__cache__")


class Evaluation:
    def __init__(
        self,
        ...,
        n_nodes=1,  # number of data chunks to load in memory in parallel
        n_jobs=1,  # number of jobs; one job fits one pipeline on one fold
        cv="intersubject",
    ):
        self.n_nodes = n_nodes
        self.n_jobs = n_jobs
        if isinstance(cv, str):  # make it easy if you want default parameters for cv
            cv = CV_CLASSES[cv]()  # registry mapping names to default splitter classes
        self.cv = cv

    def process(self, pipelines, datasets):
        # single, flat level of parallelism over (pipeline, dataset, fold)
        results = Parallel(n_jobs=self.n_jobs)(
            delayed(self.process_split)(p, d, train_idx, test_idx)
            for p in pipelines
            for d in datasets
            for train_idx, test_idx in self.cv.split(d)
        )
        return pd.DataFrame(results)

    @memory.cache
    def process_split(self, clf, dataset, train_idx, test_idx):
        clf = deepcopy(clf)
        X_train, X_test, y_train, y_test, metadata = self.paradigm.get_data(
            dataset, train_idx, test_idx
        )
        clf.fit(X_train, y_train)
        score = clf.score(X_test, y_test)
        return {"metadata": metadata, "clf": clf, "score": score}
```
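Hypothetical usage, assuming the elided constructor arguments include the paradigm and that such a `CV_CLASSES` registry of default splitters exists:

```python
evaluation = Evaluation(paradigm=paradigm, cv="intersubject", n_jobs=8)
results_df = evaluation.process(pipelines, datasets)
```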
Note that I changed the manual caching to use `joblib.Memory`, which is meant for caching calls to a function, and I flattened the parallelism (joblib is bad with nested parallelism).
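As a minimal illustration of what `joblib.Memory` does (the `expensive_load` function is a made-up stand-in for a call like `paradigm.get_data`):

```python
import time

from joblib import Memory

memory = Memory(location="__cache__", verbose=0)


@memory.cache
def expensive_load(subject):
    # stand-in for an expensive, deterministic loading / pre-processing step
    time.sleep(2)
    return {"subject": subject, "X": list(range(10))}


expensive_load(1)  # runs the function and stores the result under __cache__/
expensive_load(1)  # same arguments: read back from the on-disk cache, no recomputation
expensive_load(2)  # new arguments: the function runs again
```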
Thanks @tomMoral for your feedback!! But I am not sure this would completely work because we have some quite specific constraints:
- One of the most expensive steps is the call to `paradigm.get_data` because it loads from disk and pre-processes the data, so we would like to call it only once for all the splits and pipelines. Do you think this could be achieved through `joblib.Memory`?
- Additionally, some datasets are quite large (more than 50 GB), so we need to be able to:
  - only load the minimal amount of data, i.e. one subject (except for the cross-subject case),
  - do all the evaluations on it,
  - free the memory,
  - load the next minimal data chunk...
- Finally, I don't think we can define `test_idx` and `train_idx` before loading the data because the only info we have about the datasets is the number of subjects they contain. We don't know the number of sessions or the number of examples per session before loading the data. Maybe we should try to change that? @bruAristimunha @sylvchev
This is why I proposed this nested parallelism. Maybe an in-between would be to implement `BaseCrossValidator`-like objects that would receive only the data of one subject as input instead of a whole dataset?
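A rough sketch of that in-between option, assuming the splitter only receives the per-epoch `metadata` returned by `paradigm.get_data` for one already-loaded chunk (the class name and the leave-one-session-out scheme are just examples):

```python
import numpy as np


class SingleSubjectSessionSplitter:
    """Hypothetical splitter operating on one loaded data chunk (e.g. a single
    subject) rather than a whole dataset: indices are computed from the
    per-epoch metadata, so they only exist after the data has been loaded."""

    def get_n_splits(self, metadata):
        # one split per session: test on one session, train on the others
        return metadata["session"].nunique()

    def split(self, metadata):
        sessions = metadata["session"].values
        for session in np.unique(sessions):
            test_mask = sessions == session
            yield np.where(~test_mask)[0], np.where(test_mask)[0]
```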