luigi
luigi copied to clipboard
can not use MockTarget with dynamic dependencies
The follow code throws:
Traceback (most recent call last):
File "lib/python3.9/site-packages/luigi/worker.py", line 429, in check_complete
is_complete = check_complete_cached(task, completion_cache)
File "lib/python3.9/site-packages/luigi/worker.py", line 414, in check_complete_cached
is_complete = task.complete()
File "lib/python3.9/site-packages/luigi/task.py", line 594, in complete
return all(map(lambda output: output.exists(), outputs))
File "lib/python3.9/site-packages/luigi/task.py", line 594, in <lambda>
return all(map(lambda output: output.exists(), outputs))
File "lib/python3.9/site-packages/luigi/mock.py", line 111, in exists
return self.path in self.fs.get_all_data()
File "lib/python3.9/site-packages/luigi/mock.py", line 50, in get_all_data
MockFileSystem._data = multiprocessing.Manager().dict()
File "/usr/local/lib/python3.9/multiprocessing/context.py", line 57, in Manager
m.start()
File "/usr/local/lib/python3.9/multiprocessing/managers.py", line 554, in start
self._process.start()
File "/usr/local/lib/python3.9/multiprocessing/process.py", line 118, in start
assert not _current_process._config.get('daemon'), \
AssertionError: daemonic processes are not allowed to have children
#!/usr/bin/python3
# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4
# -*- coding: utf-8 -*-
import luigi
from luigi.mock import MockTarget
class Square(luigi.Task):
number = luigi.IntParameter()
def run(self):
square = self.number * self.number
with self.output().open("w") as fd:
fd.write("{}\n".format(square))
fd.close()
def output(self):
return MockTarget("square_{}".format(self.number))
class MainTask(luigi.Task):
def run(self):
data = []
for number in range(0, 5):
data.append(Square(number=number))
yield data
with self.output().open("w") as fd:
fd.write("end")
fd.close()
def output(self):
return MockTarget("MainTask")
if __name__ == '__main__':
luigi.build(
[
MainTask()
],
workers=4,
parallel_scheduling=True,
local_scheduler=True)