d6tflow

Child tasks

Open: lyriccoder opened this issue 4 years ago • 1 comment

Guys, could you please tell me how to create child tasks? Suppose Task1 outputs a list (A). Task2 must then be executed for each value in A, and it also outputs a list (B). I then want to run Task3 for each element of B. How can I do this? I've created the aggregator:

import d6tflow
from pathlib import Path

class TaskAggregatorJavaFiles(d6tflow.tasks.TaskAggregator):
    dir_to_search = d6tflow.Parameter()
    dir_to_save = d6tflow.Parameter()

    def run(self):
        # Collect every .java file, then drop those whose name contains "Test"
        test_files = set(Path(self.dir_to_search).glob('**/*Test*.java'))
        all_java_files = set(Path(self.dir_to_search).glob('**/*.java'))
        files_without_tests = list(all_java_files.difference(test_files))
        for idx, file in enumerate(files_without_tests):
            print(idx)
            # Yield one child task per non-test Java file
            yield TaskPreprocessJavaFile(file=str(file))

I can see that it iterates over the files, but the child task is then not spawned:

class TaskPreprocessJavaFile(d6tflow.tasks.TaskPickle):
    file = d6tflow.Parameter()

    def run(self):
        # it is not executed in parallel
        self.save({'bla': 'bla', "text": self.file})

The problem is that they are not executed in parallel. How can I run the child tasks asynchronously (in parallel)?

lyriccoder, Feb 02 '21 11:02

Can you take a look at this and tell us if this works? https://d6tflow.readthedocs.io/en/latest/advtasksdyn.html

d6tdev, Apr 28 '21 02:04
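For reference, the data flow described in the question (Task1 produces list A, Task2 runs once per element of A and produces B, Task3 runs once per element of B) can be sketched with the Python standard library alone. This is only an illustration of the fan-out shape, not d6tflow code and not a statement about how d6tflow schedules tasks: `task1`, `task2`, `task3`, and `run_pipeline` are hypothetical stand-ins, and the thread pool is an assumption made to keep the example self-contained.

```python
from concurrent.futures import ThreadPoolExecutor


def task1():
    """Hypothetical stand-in for Task1: produce list A."""
    return ["a", "b", "c"]


def task2(item):
    """Hypothetical stand-in for Task2: turn one element of A into a list."""
    return [f"{item}{i}" for i in range(2)]


def task3(item):
    """Hypothetical stand-in for Task3: process one element of B."""
    return item.upper()


def run_pipeline(max_workers=4):
    list_a = task1()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # First fan-out: one task2 call per element of A, run concurrently.
        # pool.map yields results in the same order as its input.
        nested_b = list(pool.map(task2, list_a))
        # Flatten the per-element lists into a single list B.
        list_b = [b for part in nested_b for b in part]
        # Second fan-out: one task3 call per element of B.
        return list(pool.map(task3, list_b))


if __name__ == "__main__":
    print(run_pipeline())
```

The second fan-out can only start once list B is fully known, which is why the child tasks in the question have to be created at run time rather than declared up front.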