stream.py
stream.py copied to clipboard
Updates
This library is awesome and deserves to be used in the python3 ecosystem. Tests pass with these updates (at least on my python 3.10.12 version).
Thank you for the PR!
I got pickle errors running the doctest and test suite in 3.10.12.
Doctest
**********************************************************************
File "/Users/aht/src/aht/stream.py/stream.py", line 820, in __main__.ProcessPool
Failed example:
range(10) >> ProcessPool(map(lambda x: x*x)) >> sum
Exception raised:
Traceback (most recent call last):
File "/Users/aht/.pyenv/versions/3.10.12/lib/python3.10/doctest.py", line 1350, in __run
exec(compile(example.source, filename, "single",
File "<doctest __main__.ProcessPool[0]>", line 1, in <module>
range(10) >> ProcessPool(map(lambda x: x*x)) >> sum
File "/Users/aht/src/aht/stream.py/stream.py", line 857, in __init__
p.start()
File "/Users/aht/.pyenv/versions/3.10.12/lib/python3.10/multiprocessing/process.py", line 121, in start
self._popen = self._Popen(self)
File "/Users/aht/.pyenv/versions/3.10.12/lib/python3.10/multiprocessing/context.py", line 224, in _Popen
return _default_context.get_context().Process._Popen(process_obj)
File "/Users/aht/.pyenv/versions/3.10.12/lib/python3.10/multiprocessing/context.py", line 288, in _Popen
return Popen(process_obj)
File "/Users/aht/.pyenv/versions/3.10.12/lib/python3.10/multiprocessing/popen_spawn_posix.py", line 32, in __init__
super().__init__(process_obj)
File "/Users/aht/.pyenv/versions/3.10.12/lib/python3.10/multiprocessing/popen_fork.py", line 19, in __init__
self._launch(process_obj)
File "/Users/aht/.pyenv/versions/3.10.12/lib/python3.10/multiprocessing/popen_spawn_posix.py", line 47, in _launch
reduction.dump(process_obj, fp)
File "/Users/aht/.pyenv/versions/3.10.12/lib/python3.10/multiprocessing/reduction.py", line 60, in dump
ForkingPickler(file, protocol).dump(obj)
AttributeError: Can't pickle local object 'ProcessPool.__init__.<locals>.work'
**********************************************************************
1 items had failures:
1 of 1 in __main__.ProcessPool
***Test Failed*** 1 failures.
Full test suite
$ pytest test/*.py
FAILED test/asyncpool.py::test_ProcessPool - AttributeError: Can't pickle local object 'ProcessPool.__init__.<locals>.work'
FAILED test/collector.py::test_PCollector - AttributeError: Can't pickle local object 'ForkedFeeder.__init__.<locals>.feed'
FAILED test/executor.py::test_ProcessPool_submit - AttributeError: Can't pickle local object 'ProcessPool.__init__.<locals>.work'
FAILED test/executor.py::test_ProcessPool_cancel - AttributeError: Can't pickle local object 'ProcessPool.__init__.<locals>.work'
FAILED test/executor.py::test_ProcessPool_shutdown - AttributeError: Can't pickle local object 'ProcessPool.__init__.<locals>.work'
FAILED test/feeder.py::test_ForkedFeeder - AttributeError: Can't pickle local object 'ForkedFeeder.__init__.<locals>.feed'
FAILED test/sorter.py::test_PSorter - AttributeError: Can't pickle local object 'ForkedFeeder.__init__.<locals>.feed'
Right - I think I forgot about the pickle errors because this pickle-based aspect of parallelism in python gives me PTSD. I'll see if I can flatten things a bit so as not to confuse it.
I am not getting the same error on range(10) >> ProcessPool(map(lambda x: x*x)) >> sum
I used
python3 stream.py
pip install -e .
pytest test/*
These all pass. Note: nose seems to throw internal errors.
I'll add a .github/workflows file to check this with multiple python versions.
As you can see from the activity above, the final results show that parallelism just doesn't work on OSX or Windows because those platforms can't properly use closures in multiprocessing:
https://github.com/frobnitzem/stream.py/actions/runs/11696956004