cypari2
a PariThreadPool to handle multithreading via Python
Create a helper class PariThreadPool that makes it possible to use the multithreading capabilities of concurrent.futures.
Fix #107
Currently not compatible with Python 3.6: https://github.com/kliem/cypari2/actions/runs/2362588414
We can of course raise the required Python version, but this needs to be addressed.
(Btw, the current github workflow needs to be modified such that cysignals links correctly to pari, otherwise lots of tests have a time out.)
Sage requires python >= 3.7, so there is nothing wrong with requiring this for the next release here.
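For reference, the PariThreadPool example further down passes an initializer to ThreadPoolExecutor, and that argument (together with initargs) only exists from Python 3.7 on. Whether this is what breaks the 3.6 run linked above is an assumption, since the CI log is not quoted here. A minimal standard-library sketch of the per-thread initializer pattern, with hypothetical names init_worker and task and no cypari2 involved:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Per-thread storage; each worker thread sees its own attributes.
_local = threading.local()

def init_worker(prefix):
    # Runs once in every worker thread before that thread takes any task.
    _local.label = f"{prefix}-{threading.get_ident()}"

def task(n):
    # Reads the state that init_worker set up for the current thread.
    return _local.label, n * n

# `initializer` and `initargs` were added to ThreadPoolExecutor in Python 3.7.
with ThreadPoolExecutor(max_workers=2, initializer=init_worker,
                        initargs=("worker",)) as executor:
    results = list(executor.map(task, range(5)))
```

On Python 3.6 the same constructor call raises a TypeError for the unexpected keyword argument.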
Sorry for the delay: I was at a conference last week. I just checked, and this PR solves our issue in the LMFDB. Thank you very much @videlec!
When I tried the original test in #107, it still segfaults
sage: import cypari2
sage: pari = cypari2.Pari()
sage: from concurrent.futures import ThreadPoolExecutor
sage: with ThreadPoolExecutor() as e:
....:     j = e.submit(pari.issquarefree, 15)
....:
sage: j.result()
...
SignalError: Segmentation fault
but the LMFDB debugger seems to work (I'm not sure where the difference is coming from).
For me, the debugger still failed.
When I tried the original test in #107, it still segfaults
What did you expect?
My fault for the confusion I think. We'd taken some measures to avoid the bug, and I tried a test that seemed to work and thought it was fixed in a very simple way. Let me try again for real and I'll report back.
I tried the example in the documentation of PariThreadPool and got the following error:
sage: from concurrent.futures import ThreadPoolExecutor, as_completed
sage: from cypari2 import Pari, PariThreadPool
sage: pari = Pari()
sage: pari.default('nbthreads', 1)
sage: max_workers = 4
sage: pari_pool = PariThreadPool(max_workers)
sage: square_free = []
sage: with ThreadPoolExecutor(max_workers=max_workers, initializer=pari_pool.initializer) as executor:
....:     futures = {executor.submit(pari.issquarefree, n): n for n in range(10**6, 10**6 + 1000)}
....:     for future in as_completed(futures):
....:         n = futures[future]
....:         if future.result():
....:             square_free.append(n)
....:
---------------------------------------------------------------------------
SystemError Traceback (most recent call last)
<ipython-input-8-8089aed0cb94> in <module>
3 for future in as_completed(futures):
4 n = futures[future]
----> 5 if future.result():
6 square_free.append(n)
7
/usr/local/Cellar/python@3.9/3.9.13_1/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py in result(self, timeout)
437 raise CancelledError()
438 elif self._state == FINISHED:
--> 439 return self.__get_result()
440
441 self._condition.wait(timeout)
/usr/local/Cellar/python@3.9/3.9.13_1/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py in __get_result(self)
389 if self._exception:
390 try:
--> 391 raise self._exception
392 finally:
393 # Break a reference cycle with the exception in self._exception
/usr/local/Cellar/python@3.9/3.9.13_1/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/thread.py in run(self)
56
57 try:
---> 58 result = self.fn(*self.args, **self.kwargs)
59 except BaseException as exc:
60 self.future.set_exception(exc)
cypari2/auto_instance.pxi in cypari2.pari_instance.Pari_auto.issquarefree()
cypari2/gen.pyx in cypari2.gen.objtogen()
cypari2/stack.pyx in cypari2.stack.new_gen_noclear()
cypari2/stack.pyx in cypari2.stack.Gen_stack_new()
SystemError: objects on PARI stack in invalid order (first: 0x2a223bfe8; next: 0x196b23000)
I think I reinstalled cypari into Sage correctly (I added a multithreading.patch file with the contents of this PR into $SAGE_ROOT/build/pkgs/cypari/patches before building Sage 9.7-beta1), but apparently whatever I'm doing with homebrew screwed up my 9.6 install so I can't check whether the error message is different from an unmodified Sage.
I have to work on some other projects today, but will return to this. Thanks for the help Vincent, and sorry for my cluelessness yesterday.
I fixed my Sage 9.6 installation but, of course, can't run this exact code there since the PariThreadPool is new. It looks like the modification to new_gen_noclear of adding x = gcopy(x) isn't enough, but I don't know what constraint Pari has for objects on the stack that is being violated.
Working with @videlec at Sage Days 117, we modified the code above that failed by changing n to pari(n) in a couple places, and now it works:
sage: from concurrent.futures import ThreadPoolExecutor, as_completed
sage: from cypari2 import Pari, PariThreadPool
sage: pari = Pari()
sage: pari.default('nbthreads', 1)
sage: max_workers = 4
sage: pari_pool = PariThreadPool(max_workers)
sage: square_free = []
sage: with ThreadPoolExecutor(max_workers=max_workers, initializer=pari_pool.initializer) as executor:
....:     futures = {executor.submit(pari.issquarefree, pari(n)): pari(n) for n in range(10**6, 10**6 + 1000)}
....:     for future in as_completed(futures):
....:         n = futures[future]
....:         if future.result():
....:             square_free.append(n)
sage: len(square_free)
606
The hypothesis is that the threads are manipulating the main pari stack since there is no lock implemented.
What does pari(n) achieve?
It means that the individual threads aren't responsible for converting python ints into pari integers, and thus don't touch the main pari stack.
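The pattern behind that workaround can be sketched without cypari2: do every conversion in the submitting thread and hand only the converted objects to the workers. In the sketch below, convert and is_squarefree are hypothetical pure-Python stand-ins for pari(n) and pari.issquarefree. They only illustrate which thread does which step and say nothing about how PARI manages its stack.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed

# Names of the threads that performed a conversion (for illustration only).
conversion_threads = set()

def convert(n):
    """Hypothetical stand-in for pari(n); records the calling thread."""
    conversion_threads.add(threading.current_thread().name)
    return ("converted", n)

def is_squarefree(x):
    """Hypothetical stand-in for pari.issquarefree, by trial division."""
    _, n = x
    d = 2
    while d * d <= n:
        if n % (d * d) == 0:
            return False
        d += 1
    return True

def run(values, max_workers=4):
    # Convert in the submitting thread, so workers never call convert().
    converted = [convert(n) for n in values]
    square_free = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(is_squarefree, x): x for x in converted}
        for future in as_completed(futures):
            if future.result():
                square_free.append(futures[future][1])
    return sorted(square_free)

square_free = run(range(1, 11))
```

After the run, conversion_threads contains a single thread name, the one that called run, which is the property the pari(n) change relies on.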
As another data point, when running on Linux both versions work (with and without pari(n)). The earlier failure was on Mac OS.
I happened to run doctests on number_field.py with this patch merged, and got the following warning before a giant gdb traceback.
doctest:warning
  File "/usr/lib/python3.8/threading.py", line 890, in _bootstrap
    self._bootstrap_inner()
  File "/usr/lib/python3.8/threading.py", line 921, in _bootstrap_inner
    self._started.set()
  File "/usr/lib/python3.8/threading.py", line 528, in set
    self._cond.notify_all()
  File "/usr/lib/python3.8/threading.py", line 371, in notify_all
    self.notify(len(self._waiters))
  File "/usr/lib/python3.8/threading.py", line 354, in notify
    waiters_to_notify = _deque(_islice(all_waiters, n))
  File "/usr/lib/python3.8/warnings.py", line 109, in _showwarnmsg
    sw(msg.message, msg.category, msg.filename, msg.lineno,
:
RuntimeWarning: cypari2 leaked 140374405617552 bytes on the PARI stack