d6tflow
Child tasks
Guys, could you please tell me how I can create child tasks? Suppose Task1 outputs a list (A). Task2 must then be executed for each value of A, and each run outputs a list (B). Then I want to run Task3 for each element of B. How can I do this? I've created this aggregator:
from pathlib import Path
import d6tflow

class TaskAggregatorJavaFiles(d6tflow.tasks.TaskAggregator):
    dir_to_search = d6tflow.Parameter()
    dir_to_save = d6tflow.Parameter()

    def run(self):
        # every .java file under dir_to_search, minus the *Test*.java files
        test_files = set(Path(self.dir_to_search).glob('**/*Test*.java'))
        all_java_files = set(Path(self.dir_to_search).glob('**/*.java'))
        files_without_tests = list(all_java_files.difference(test_files))
        for i, file in enumerate(files_without_tests):
            print(i)
            # one child task is yielded per loop iteration
            yield TaskPreprocessJavaFile(file=str(file))
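The file selection in run() can be checked on its own, before any tasks are involved. Below is a minimal stdlib sketch that pulls the same set difference into a hypothetical helper, find_java_files_without_tests (not part of d6tflow), so the list of files the aggregator will fan out over can be inspected without running the workflow.

```python
import tempfile
from pathlib import Path


def find_java_files_without_tests(root):
    """Return .java files under root whose names do not match *Test*.java.

    Hypothetical helper: mirrors the set difference computed in
    TaskAggregatorJavaFiles.run, as a plain function.
    """
    root = Path(root)
    test_files = set(root.glob('**/*Test*.java'))
    all_java_files = set(root.glob('**/*.java'))
    # a set has no order; sorting makes the result reproducible
    return sorted(all_java_files - test_files)


if __name__ == '__main__':
    # build a throwaway directory tree and list the non-test sources
    with tempfile.TemporaryDirectory() as tmp:
        for name in ['src/Foo.java', 'src/FooTest.java', 'src/util/Bar.java', 'README.md']:
            path = Path(tmp, name)
            path.parent.mkdir(parents=True, exist_ok=True)
            path.touch()
        for f in find_java_files_without_tests(tmp):
            print(f.relative_to(tmp).as_posix())
```

Sorting is a design choice: iterating over a set gives an arbitrary order, so a sorted list yields the child tasks in the same order on every run.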
I can see that it iterates over the files, but then the child task is not spawned:
class TaskPreprocessJavaFile(d6tflow.tasks.TaskPickle):
    file = d6tflow.Parameter()

    def run(self):
        # it is not executed in parallel
        self.save({'bla': 'bla', 'text': self.file})
The problem is that these child tasks are not executed in parallel. How can I run them asynchronously (in parallel)?
Can you take a look at this and tell us if this works? https://d6tflow.readthedocs.io/en/latest/advtasksdyn.html
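To make the fan-out in the question concrete (Task2 for every element of A, then Task3 for every element of B), here is a framework-agnostic sketch using Python's concurrent.futures. It is not d6tflow's API: task1, task2, task3 and run_pipeline are hypothetical stand-ins. It only illustrates that each stage's children can overlap once all of them are submitted before any result is awaited. In the d6tflow/Luigi setting the analogous idea would presumably be to hand the scheduler all child tasks together (Luigi also accepts a list of tasks in a single yield) and to run with more than one worker; whether that gives the desired parallelism should be checked against the linked dynamic-task docs.

```python
from concurrent.futures import ThreadPoolExecutor


# Hypothetical stand-ins for Task1, Task2 and Task3 from the question.
def task1():
    return ['a1', 'a2', 'a3']  # list A


def task2(a):
    return [f'{a}-b{i}' for i in range(2)]  # list B produced from one element of A


def task3(b):
    return b.upper()  # per-element result for one element of B


def run_pipeline(max_workers=4):
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        list_a = task1()
        # Executor.map submits every task2 call up front, so they can run concurrently;
        # results come back in input order and are flattened into one list B
        list_b = [b for bs in pool.map(task2, list_a) for b in bs]
        # same fan-out for task3 over every element of B
        return list(pool.map(task3, list_b))


if __name__ == '__main__':
    print(run_pipeline())
```

Threads are used here only to keep the sketch short; for CPU-bound preprocessing of many Java files, ProcessPoolExecutor (with top-level, picklable functions) is usually the better fit.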