cypari2

a PariThreadPool to handle multithreading via Python

Open videlec opened this issue 2 years ago • 14 comments

Create a helper class PariThreadPool that makes it possible to use the multithreading capabilities of concurrent.futures.

Fix #107
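
For readers unfamiliar with the mechanism the helper plugs into: ThreadPoolExecutor accepts an initializer callable (available since Python 3.7) that runs once in each worker thread before that thread executes any task. The sketch below uses only the standard library. The names init_worker, work and _local are hypothetical, and the per-thread list merely stands in for whatever per-thread state PariThreadPool sets up; it does not reproduce cypari2's implementation.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Hypothetical per-thread state; NOT cypari2's actual implementation.
_local = threading.local()


def init_worker():
    """Run once in each worker thread before it executes any task."""
    _local.scratch = []  # each worker thread gets its own private list


def work(n):
    # Touch only thread-local state, never anything shared between threads.
    _local.scratch.append(n)
    return n * n, threading.get_ident()


with ThreadPoolExecutor(max_workers=4, initializer=init_worker) as executor:
    results = list(executor.map(work, range(20)))

squares = [sq for sq, _ in results]
worker_ids = {tid for _, tid in results}
```

Every task runs in a worker thread whose private state was prepared by the initializer, and the submitting thread never touches that state.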

videlec avatar May 19 '22 12:05 videlec

Currently not compatible with Python 3.6: https://github.com/kliem/cypari2/actions/runs/2362588414

We can of course bump the required Python version, but this needs to be addressed.

(Btw, the current GitHub workflow needs to be modified so that cysignals links correctly to PARI; otherwise lots of tests time out.)

kliem avatar May 21 '22 20:05 kliem

Sage requires Python >= 3.7, so there is nothing wrong with requiring this for the next release here.

kliem avatar May 31 '22 04:05 kliem

Sorry for the delay: I was at a conference last week. I just checked, and this PR solves our issue in the LMFDB. Thank you very much @videlec!

roed314 avatar Jun 01 '22 18:06 roed314

When I tried the original test in #107, it still segfaults:

sage: import cypari2
sage: pari = cypari2.Pari()
sage: from concurrent.futures import ThreadPoolExecutor
sage: with ThreadPoolExecutor() as e:
....:     j = e.submit(pari.issquarefree, 15)
....: 
sage: j.result()
...
SignalError: Segmentation fault

but the LMFDB debugger seems to work (I'm not sure where the difference is coming from).

roed314 avatar Jun 01 '22 22:06 roed314

For me, the debugger still failed.

edgarcosta avatar Jun 02 '22 01:06 edgarcosta

> When I tried the original test in #107, it still segfaults

What did you expect?

videlec avatar Jun 02 '22 09:06 videlec

My fault for the confusion, I think. We had taken some measures to avoid the bug, and when a test I tried seemed to work, I assumed the bug had been fixed in a very simple way. Let me try again for real and I'll report back.

roed314 avatar Jun 02 '22 12:06 roed314

I tried the example in the documentation of PariThreadPool and got the following error:

sage: from concurrent.futures import ThreadPoolExecutor, as_completed
sage: from cypari2 import Pari, PariThreadPool
sage: pari = Pari()
sage: pari.default('nbthreads', 1)
sage: max_workers = 4
sage: pari_pool = PariThreadPool(max_workers)
sage: square_free = []
sage: with ThreadPoolExecutor(max_workers=max_workers, initializer=pari_pool.initializer) as executor:
....:     futures = {executor.submit(pari.issquarefree, n): n for n in range(10**6, 10**6 + 1000)}
....:     for future in as_completed(futures):
....:         n = futures[future]
....:         if future.result():
....:             square_free.append(n)
....: 
---------------------------------------------------------------------------
SystemError                               Traceback (most recent call last)
<ipython-input-8-8089aed0cb94> in <module>
      3     for future in as_completed(futures):
      4         n = futures[future]
----> 5         if future.result():
      6             square_free.append(n)
      7 

/usr/local/Cellar/python@3.9/3.9.13_1/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py in result(self, timeout)
    437                     raise CancelledError()
    438                 elif self._state == FINISHED:
--> 439                     return self.__get_result()
    440 
    441                 self._condition.wait(timeout)

/usr/local/Cellar/python@3.9/3.9.13_1/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py in __get_result(self)
    389         if self._exception:
    390             try:
--> 391                 raise self._exception
    392             finally:
    393                 # Break a reference cycle with the exception in self._exception

/usr/local/Cellar/python@3.9/3.9.13_1/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/thread.py in run(self)
     56 
     57         try:
---> 58             result = self.fn(*self.args, **self.kwargs)
     59         except BaseException as exc:
     60             self.future.set_exception(exc)

cypari2/auto_instance.pxi in cypari2.pari_instance.Pari_auto.issquarefree()

cypari2/gen.pyx in cypari2.gen.objtogen()

cypari2/stack.pyx in cypari2.stack.new_gen_noclear()

cypari2/stack.pyx in cypari2.stack.Gen_stack_new()

SystemError: objects on PARI stack in invalid order (first: 0x2a223bfe8; next: 0x196b23000)

I think I reinstalled cypari into Sage correctly (I added a multithreading.patch file with the contents of this PR to $SAGE_ROOT/build/pkgs/cypari/patches before building Sage 9.7-beta1), but apparently whatever I'm doing with Homebrew screwed up my 9.6 install, so I can't check whether the error message differs from the one an unmodified Sage gives.

I have to work on some other projects today, but will return to this. Thanks for the help Vincent, and sorry for my cluelessness yesterday.

roed314 avatar Jun 02 '22 14:06 roed314

I fixed my Sage 9.6 installation but, of course, can't run this exact code there since PariThreadPool is new. It looks like the modification to new_gen_noclear of adding x = gcopy(x) isn't enough, but I don't know which of PARI's constraints on objects on the stack is being violated.

roed314 avatar Jun 06 '22 17:06 roed314

Working with @videlec at Sage Days 117, we modified the failing code above by changing n to pari(n) in a couple of places, and now it works:

sage: from concurrent.futures import ThreadPoolExecutor, as_completed
sage: from cypari2 import Pari, PariThreadPool
sage: pari = Pari()
sage: pari.default('nbthreads', 1)
sage: max_workers = 4
sage: pari_pool = PariThreadPool(max_workers)
sage: square_free = []
sage: with ThreadPoolExecutor(max_workers=max_workers, initializer=pari_pool.initializer) as executor:
....:     futures = {executor.submit(pari.issquarefree, pari(n)): pari(n) for n in range(10**6, 10**6 + 1000)}
....:     for future in as_completed(futures):
....:         n = futures[future]
....:         if future.result():
....:             square_free.append(n)
sage: len(square_free)
606

The hypothesis is that the threads are manipulating the main PARI stack, since there is no lock implemented.
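
To make the hypothesis concrete, here is a toy, stdlib-only sketch of what a lock would protect. SharedStack is a hypothetical bump allocator standing in for a single stack shared by all threads. It is not PARI's allocator and not a proposed fix for cypari2. The only point is that the read-modify-write of the stack pointer has to be serialized if several threads allocate from the same stack.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class SharedStack:
    """Toy bump allocator standing in for one stack shared by all threads."""

    def __init__(self, top):
        self.top = top
        self.lock = threading.Lock()

    def alloc(self, size):
        # Updating `top` is a read-modify-write; the lock makes it atomic
        # with respect to the other threads.
        with self.lock:
            self.top -= size
            return self.top


stack = SharedStack(top=1_000_000)

with ThreadPoolExecutor(max_workers=4) as executor:
    addresses = list(executor.map(lambda _: stack.alloc(8), range(1000)))
```

With the lock in place every allocation receives a distinct address and the final stack pointer accounts for all 1000 allocations. Without it, neither property is guaranteed.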

roed314 avatar Feb 08 '23 22:02 roed314

What does pari(n) achieve?

edgarcosta avatar Feb 08 '23 23:02 edgarcosta

It means that the individual threads aren't responsible for converting Python ints into PARI integers, and thus don't touch the main PARI stack.
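
The reason this works is that arguments to executor.submit are evaluated in the submitting thread before the task is queued. The stdlib-only sketch below records which thread performs a conversion in each variant. Converted and is_even are hypothetical stand-ins for pari(n) and a PARI function that converts plain ints on demand; nothing here calls cypari2.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

main_id = threading.get_ident()
conversion_threads = []  # ids of the threads that performed a conversion


class Converted:
    """Hypothetical stand-in for an already-converted value such as pari(n)."""

    def __init__(self, n):
        conversion_threads.append(threading.get_ident())
        self.value = n


def is_even(x):
    # Mimics a function that converts plain ints on demand, in the calling thread.
    if not isinstance(x, Converted):
        x = Converted(x)
    return x.value % 2 == 0


with ThreadPoolExecutor(max_workers=2) as executor:
    # Converted(n) is evaluated here, in the submitting thread.
    pre = [executor.submit(is_even, Converted(n)) for n in range(5)]
    pre_results = [f.result() for f in pre]
pre_threads = list(conversion_threads)

conversion_threads.clear()
with ThreadPoolExecutor(max_workers=2) as executor:
    # A raw int is passed through, so the conversion happens in a worker thread.
    raw = [executor.submit(is_even, n) for n in range(5)]
    raw_results = [f.result() for f in raw]
raw_threads = list(conversion_threads)
```

In the first variant every conversion is recorded under the main thread's id; in the second, every conversion is recorded under a worker thread's id.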

roed314 avatar Feb 08 '23 23:02 roed314

As another data point, when running on Linux both versions work (with and without pari(n)). The earlier failure was on macOS.

roed314 avatar Feb 09 '23 00:02 roed314

I happened to run doctests on number_field.py with this patch merged, and got the following warning before a giant gdb traceback.

doctest:warning
  File "/usr/lib/python3.8/threading.py", line 890, in _bootstrap
    self._bootstrap_inner()
  File "/usr/lib/python3.8/threading.py", line 921, in _bootstrap_inner
    self._started.set()
  File "/usr/lib/python3.8/threading.py", line 528, in set
    self._cond.notify_all()
  File "/usr/lib/python3.8/threading.py", line 371, in notify_all
    self.notify(len(self._waiters))
  File "/usr/lib/python3.8/threading.py", line 354, in notify
    waiters_to_notify = _deque(_islice(all_waiters, n))
  File "/usr/lib/python3.8/warnings.py", line 109, in _showwarnmsg
    sw(msg.message, msg.category, msg.filename, msg.lineno,
:
RuntimeWarning: cypari2 leaked 140374405617552 bytes on the PARI stack

roed314 avatar Feb 09 '23 22:02 roed314