
Reuse all later tasks to keep the DRY principle?

DOH-Manada opened this issue on Aug 22, 2021 · 0 comments

I have a few tasks of the following nature, which are pretty standard: import a processed dataset, set up the input data, split it, then pass it to a model.

# _tasks.py

import d6tflow

class TaskLoadDataframe(d6tflow.tasks.TaskCachePandas):
    # loads a processed dataframe (probably pickled)
    pass

@d6tflow.requires(TaskLoadDataframe)
class TaskSetupExogEndogData(d6tflow.tasks.TaskCache):
    # do stuff. Saves data and labels
    pass

@d6tflow.requires({'inputs': TaskSetupExogEndogData})
class TaskSplitData(d6tflow.tasks.TaskCache):
    # do more stuff. Splits data and labels and saves to a dictionary
    pass

# _tasks_sklearn.py

import _tasks

import d6tflow
from sklearn import svm

@d6tflow.requires(_tasks.TaskSplitData)
class TaskTrainSklearnSVM(d6tflow.tasks.TaskCache):
    kernel = d6tflow.Parameter(default='linear')

    def run(self):
        # load the dictionary saved by TaskSplitData
        data = self.inputLoad()
        model_svm = svm.SVR(kernel=self.kernel)
        model_svm.fit(data['train_data'], data['train_labels'])
        score = model_svm.score(data['valid_data'], data['valid_labels'])
        # TODO: self.save model artifacts
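
For the TODO, I imagine something like the lines below, assuming self.save() on a TaskCache accepts an arbitrary Python object (the dict keys are just my own naming):

        # persist the fitted model and its validation score together
        self.save({'model': model_svm, 'score': score})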

Context: I would obviously want to reuse as much of this pipeline as possible.

Question 1: Is it possible to create several independent dataset-processing tasks, any of which I can set as the "initial task" of this workflow?

Question 2: If yes, how would I call that as a dynamic requirement in TaskLoadDataframe?
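
To make Question 1 concrete, here is a sketch of what I am imagining, assuming a task's requires() can be overridden at runtime the way plain Luigi allows (TaskLoadFromCsv, TaskLoadFromPickle, and the source/path parameters are hypothetical names, with TaskSetupExogEndogData reworked accordingly):

import d6tflow
import pandas as pd

class TaskLoadFromCsv(d6tflow.tasks.TaskCachePandas):
    path = d6tflow.Parameter()

    def run(self):
        self.save(pd.read_csv(self.path))

class TaskLoadFromPickle(d6tflow.tasks.TaskCachePandas):
    path = d6tflow.Parameter()

    def run(self):
        self.save(pd.read_pickle(self.path))

class TaskSetupExogEndogData(d6tflow.tasks.TaskCache):
    source = d6tflow.Parameter(default='csv')
    path = d6tflow.Parameter()

    def requires(self):
        # choose the "initial task" at runtime instead of hardcoding it
        if self.source == 'csv':
            return TaskLoadFromCsv(path=self.path)
        return TaskLoadFromPickle(path=self.path)

    def run(self):
        df = self.inputLoad()
        self.save(df)  # placeholder: set up exog/endog data as before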

Solution A: It seems the best way to handle this within the scope of this package is to not create a TaskA and just do the following:

  1. Preprocess the dataframe
  2. Export it to a pickle (or CSV)
  3. Pass the path of the exported file as a parameter to TaskLoadDataframe so I can run the workflow and continue on (sketched below)
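
A sketch of step 3, assuming the path can be declared as a d6tflow.Parameter (the file path is an example, and I am assuming parameters propagate up the @d6tflow.requires chain so the whole workflow can be kicked off from the last task):

import d6tflow
import pandas as pd

class TaskLoadDataframe(d6tflow.tasks.TaskCachePandas):
    path = d6tflow.Parameter()

    def run(self):
        # read the dataframe that was preprocessed and exported in steps 1-2
        self.save(pd.read_pickle(self.path))

# if parameters propagate through @d6tflow.requires as I expect,
# the path can be supplied when running the final task
d6tflow.run(TaskTrainSklearnSVM(path='data/processed.pkl'))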

Solution B: I know Luigi doesn't allow passing dataframes as parameters, but could I hand a dataframe to a task's run() as a way of reducing, or completely removing, the file IO in step 2?

class TaskLoadDataframe(d6tflow.tasks.TaskCachePandas):

    def run(self, dataframe):
        self.save(dataframe)

I don't think the source code allows for this; if it does, what would the syntax be to run that as a workflow?
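
What I am hoping for is something like the following, assuming save() can be called on a task instance outside of run() to seed its output (preprocess_raw_data is a hypothetical stand-in for my preprocessing code; I have not verified any of this against the source):

import d6tflow
import _tasks

df = preprocess_raw_data()  # hypothetical: dataframe produced outside the workflow

# seed the task's cache directly so downstream tasks see it as complete,
# skipping the pickle/csv round-trip entirely
_tasks.TaskLoadDataframe().save(df)
d6tflow.run(TaskTrainSklearnSVM())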

Solution B (reprise): Alternatively, I could wrap the processed dataframe in a dictionary and pass it into TaskLoadDataframe as a d6tflow-defined parameter.
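
Concretely, something like the following with luigi.DictParameter, though I suspect it breaks because dict parameter values must be JSON-serializable and a DataFrame is not:

import luigi
import d6tflow

class TaskLoadDataframe(d6tflow.tasks.TaskCachePandas):
    payload = luigi.DictParameter()  # hypothetical: {'df': <dataframe>}

    def run(self):
        # likely fails: DictParameter serializes its value to JSON,
        # and a pandas DataFrame is not JSON-serializable
        self.save(self.payload['df'])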

Thoughts? Great work on this by the way.

DOH-Manada · Aug 22 '21