luigi icon indicating copy to clipboard operation
luigi copied to clipboard

can not use MockTarget with dynamic dependencies

Open nesies opened this issue 2 years ago • 0 comments

The follow code throws:

Traceback (most recent call last):
  File "lib/python3.9/site-packages/luigi/worker.py", line 429, in check_complete
    is_complete = check_complete_cached(task, completion_cache)
  File "lib/python3.9/site-packages/luigi/worker.py", line 414, in check_complete_cached
    is_complete = task.complete()
  File "lib/python3.9/site-packages/luigi/task.py", line 594, in complete
    return all(map(lambda output: output.exists(), outputs))
  File "lib/python3.9/site-packages/luigi/task.py", line 594, in <lambda>
    return all(map(lambda output: output.exists(), outputs))
  File "lib/python3.9/site-packages/luigi/mock.py", line 111, in exists
    return self.path in self.fs.get_all_data()
  File "lib/python3.9/site-packages/luigi/mock.py", line 50, in get_all_data
    MockFileSystem._data = multiprocessing.Manager().dict()
  File "/usr/local/lib/python3.9/multiprocessing/context.py", line 57, in Manager
    m.start()
  File "/usr/local/lib/python3.9/multiprocessing/managers.py", line 554, in start
    self._process.start()
  File "/usr/local/lib/python3.9/multiprocessing/process.py", line 118, in start
    assert not _current_process._config.get('daemon'), \
AssertionError: daemonic processes are not allowed to have children
#!/usr/bin/python3
# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4
# -*- coding: utf-8 -*-
import luigi
from luigi.mock import MockTarget


class Square(luigi.Task):
    number = luigi.IntParameter()

    def run(self):
        square = self.number * self.number
        with self.output().open("w") as fd:
            fd.write("{}\n".format(square))
        fd.close()

    def output(self):
        return MockTarget("square_{}".format(self.number))


class MainTask(luigi.Task):
    def run(self):
        data = []
        for number in range(0, 5):
            data.append(Square(number=number))
        yield data
        with self.output().open("w") as fd:
            fd.write("end")
        fd.close()

    def output(self):
        return MockTarget("MainTask")


if __name__ == '__main__':
    luigi.build(
        [
            MainTask()
        ],
        workers=4,
        parallel_scheduling=True,
        local_scheduler=True)

nesies avatar Oct 28 '22 11:10 nesies