Reuse all later tasks to keep the DRY principle?
I have a few tasks of the following nature, which are pretty standard: import a processed dataset, set up the input data, split it, then pass it to a model.
```python
# _tasks.py
import d6tflow

class TaskLoadDataframe(d6tflow.tasks.TaskCachePandas):
    # loads a processed dataframe (probably pickled)
    ...

@d6tflow.requires(TaskLoadDataframe)
class TaskSetupExogEndogData(d6tflow.tasks.TaskCache):
    # do stuff. Saves data and labels
    ...

@d6tflow.requires({'inputs': TaskSetupExogEndogData})
class TaskSplitData(d6tflow.tasks.TaskCache):
    # do more stuff. Splits data and labels and saves them to a dictionary
    ...
```
```python
# _tasks_sklearn.py
import _tasks
import d6tflow
from sklearn import svm

@d6tflow.requires(_tasks.TaskSplitData)
class TaskTrainSklearnSVM(d6tflow.tasks.TaskCache):
    kernel = d6tflow.Parameter(default='linear')

    def run(self):
        data = self.inputLoad()
        model_svm = svm.SVR(kernel=self.kernel)
        model_svm.fit(data['train_data'], data['train_labels'])
        model_svm.score(data['valid_data'], data['valid_labels'])
        # TODO: self.save model artifacts
```
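For context, this is how I currently kick the chain off (a minimal sketch; the `kernel` value is just an example):

```python
import d6tflow
import _tasks_sklearn

# preview the DAG, then run the terminal task; upstream tasks run as needed
d6tflow.preview(_tasks_sklearn.TaskTrainSklearnSVM(kernel='rbf'))
d6tflow.run(_tasks_sklearn.TaskTrainSklearnSVM(kernel='rbf'))
```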
Context: I would obviously want to reuse this as much as possible.
Question 1: Is it possible to create several independent dataset-processing tasks that I can set as the "initial task" of this workflow?
Question 2: If yes, how would I call that as a dynamic requirement in TaskLoadDataframe? A rough sketch of what I mean follows.
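Purely illustrative: `TaskLoadFromPickle`, `TaskLoadFromSQL`, and the `source` parameter are made-up names, and I'm assuming a plain luigi-style `requires()` override works on d6tflow tasks in place of the `@d6tflow.requires` decorator:

```python
import d6tflow

class TaskLoadFromPickle(d6tflow.tasks.TaskCachePandas):
    # hypothetical initial task no. 1
    ...

class TaskLoadFromSQL(d6tflow.tasks.TaskCachePandas):
    # hypothetical initial task no. 2
    ...

class TaskSetupExogEndogData(d6tflow.tasks.TaskCache):
    source = d6tflow.Parameter(default='pickle')

    def requires(self):
        # pick the "initial task" at runtime instead of hard-coding it
        loaders = {'pickle': TaskLoadFromPickle, 'sql': TaskLoadFromSQL}
        return loaders[self.source]()

    def run(self):
        data = self.inputLoad()
        ...
```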
Solution A: It seems the best way to handle this within the scope of this package is to not create a TaskA at all and just do the following (see the sketch after this list):
- Preprocess the dataframe.
- Export it to a pickle (or CSV).
- Read the path of the exported file in as a parameter to TaskLoadDataframe, so I can run the workflow and continue on.
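A minimal sketch of Solution A, assuming a plain `d6tflow.Parameter` carries the path (the name `path_data` is made up):

```python
import d6tflow
import pandas as pd

class TaskLoadDataframe(d6tflow.tasks.TaskCachePandas):
    path_data = d6tflow.Parameter()  # path to the pre-exported pickle

    def run(self):
        # preprocessing already happened outside the workflow;
        # this task only re-hydrates the exported file
        self.save(pd.read_pickle(self.path_data))

# if @d6tflow.requires cascades parameters like luigi.util.requires does,
# the path can be supplied at the end of the chain:
# d6tflow.run(TaskTrainSklearnSVM(path_data='data/processed.pkl'))
```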
Solution B: I know Luigi doesn't allow passing dataframes as parameters, but could I pass a dataframe into a task's run() as a way of reducing, or completely removing, the file I/O in step 2?
```python
class TaskLoadDataframe(d6tflow.tasks.TaskCachePandas):
    def run(self, dataframe):
        self.save(dataframe)
```
I don't think the source code allows for this; what would the syntax be to run that as a workflow?
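The closest workaround I can think of is calling `save()` on a task instance from outside `run()`, so the task gets marked complete without my code ever touching the filesystem. This is an assumption about how the cache targets behave, not something I've verified:

```python
import d6tflow
import _tasks
import _tasks_sklearn

df_processed = ...  # stand-in for the dataframe built in memory

# ASSUMPTION: save() called outside run() writes the cache target and
# marks the task complete, the same way self.save() does inside run()
_tasks.TaskLoadDataframe().save(df_processed)

# if that holds, downstream tasks should see TaskLoadDataframe as done
d6tflow.run(_tasks_sklearn.TaskTrainSklearnSVM())
```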
Solution B (reprise): Alternatively, I could save the processed dataframe in a dictionary and pass that dictionary into TaskLoadDataframe as a d6tflow-defined parameter.
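Roughly like this, assuming luigi's `DictParameter` works on a d6tflow task (d6tflow builds on luigi, but I haven't verified this, and I'd worry the whole dataframe ends up hashed into the task id):

```python
import d6tflow
import luigi
import pandas as pd

class TaskLoadDataframe(d6tflow.tasks.TaskCachePandas):
    # e.g. df.to_dict(orient='list'); luigi freezes the dict internally
    data_dict = luigi.DictParameter()

    def run(self):
        self.save(pd.DataFrame(dict(self.data_dict)))

# d6tflow.run(TaskLoadDataframe(data_dict=df.to_dict(orient='list')))
```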
Thoughts? Great work on this by the way.