
OrientPy vs. concurrent.futures' ProcessPoolExecutor (pickling?)

Open songololo opened this issue 10 years ago • 5 comments

I am using a concurrent.futures instance of a ProcessPoolExecutor in a script. In the same script, I am using OrientPy to make queries to a database.

The script instantiates a class. Inside it, the query is made first, the results are retrieved and sorted into a list, and then a ProcessPoolExecutor instance is created to process the results in parallel.

However, this error message occurs, presumably due to pickling issues, even though OrientPy itself is not needed for the ensuing function that processes the results:

Traceback (most recent call last):
  File "/Users/shongololo/anaconda/lib/python3.4/site-packages/pyorient-1.4-py3.4.egg/pyorient/orient.py", line 331, in get_message
    if command is not None and self._Messages[command]:
KeyError: 'GetstateMessage'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/shongololo/anaconda/lib/python3.4/multiprocessing/queues.py", line 242, in _feed
    obj = ForkingPickler.dumps(obj)
  File "/Users/shongololo/anaconda/lib/python3.4/multiprocessing/reduction.py", line 50, in dumps
    cls(buf, protocol).dump(obj)
  File "/Users/shongololo/anaconda/lib/python3.4/site-packages/pyorient-1.4-py3.4.egg/pyorient/orient.py", line 188, in __getattr__
    _Message = self.get_message(_names + "Message")
  File "/Users/shongololo/anaconda/lib/python3.4/site-packages/pyorient-1.4-py3.4.egg/pyorient/orient.py", line 349, in get_message
    "Unable to find command " + str(e), []
pyorient.exceptions.PyOrientBadMethodCallException: Unable to find command 'GetstateMessage'

Since this happens inside a class, I suspect that ProcessPoolExecutor is pickling the entire class instance (or at least parts of it) while pickling the function.

One solution would be to make the function fully stand-alone, outside of the class (initial tests indicate that this might work). However, I'm wondering whether OrientPy could be made more generally compatible with pickling, for cases where this is not possible?
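For reference, the stand-alone approach could look roughly like the sketch below. The names `Analysis`, `fetch_rows` and `summarise` are hypothetical, and the query is replaced by placeholder data. The point is only that the worker is a module-level function receiving plain data, so the object holding the database client never needs to be pickled.

```python
from concurrent.futures import ProcessPoolExecutor


def summarise(row):
    """Module-level worker: receives plain data only, never the DB client."""
    return {"name": row["name"], "name_length": len(row["name"])}


class Analysis:
    """Hypothetical class that owns the database client."""

    def __init__(self, client):
        self.client = client  # stays in the parent process

    def fetch_rows(self):
        # Placeholder for the real query: convert the results to plain
        # dicts before they go anywhere near the process pool.
        return [{"name": "alpha"}, {"name": "beta"}]

    def run(self):
        rows = self.fetch_rows()
        with ProcessPoolExecutor() as pool:
            # Only a reference to `summarise` and each dict are pickled;
            # `self` (and therefore `self.client`) is not.
            return list(pool.map(summarise, rows))


if __name__ == "__main__":
    print(Analysis(client=None).run())
```

Submitting a bound method such as `self.process` instead would pickle `self`, and with it the client.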

songololo avatar Apr 30 '15 09:04 songololo

I think that

  File "/Users/shongololo/anaconda/lib/python3.4/multiprocessing/queues.py", line 242, in _feed
    obj = ForkingPickler.dumps(obj)

tried to call the driver in this manner:

OrientDB.get_message("GetstateMessage")

# or

getattr( OrientDB , 'Getstate' )

Can you provide a simplified snippet of code to reproduce the issue?

Thank you in advance.
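To illustrate the suspected mechanism, here is a minimal sketch using a hypothetical stand-in class, not pyorient's actual code. It assumes, as the traceback suggests, that the driver's `__getattr__` turns any unknown attribute name into a command name and raises its own exception when the command does not exist. Pickle treats an `AttributeError` as "this optional hook is not defined", but any other exception propagates and aborts the pickling.

```python
class BadMethodCall(Exception):
    """Hypothetical stand-in for a driver's 'unknown command' error."""


class StrictClient:
    """Hypothetical client mapping every unknown attribute to a command."""

    _commands = {"QueryMessage"}

    def __getattr__(self, name):
        # Only reached when normal attribute lookup has already failed.
        command = name.strip("_").capitalize() + "Message"
        if command not in self._commands:
            raise BadMethodCall("Unable to find command " + repr(command))
        return lambda *args: (command, args)


class LenientClient(StrictClient):
    """Same client, but dunder lookups fail the way pickle expects."""

    def __getattr__(self, name):
        if name.startswith("__") and name.endswith("__"):
            raise AttributeError(name)
        return super().__getattr__(name)


def lookup(client, name):
    """Invoke the fallback hook directly, as a failed lookup would."""
    try:
        return client.__getattr__(name)
    except (AttributeError, BadMethodCall) as exc:
        return type(exc).__name__ + ": " + str(exc)


if __name__ == "__main__":
    print(lookup(StrictClient(), "__getstate__"))
    # BadMethodCall: Unable to find command 'GetstateMessage'
    print(lookup(LenientClient(), "__getstate__"))
    # AttributeError: __getstate__
```

The hook is called directly here because which dunder names pickle looks up on the instance varies between Python versions. Python 3.11 and later define `__getstate__` on `object`, so that particular lookup no longer reaches `__getattr__` there.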

Ostico avatar Apr 30 '15 14:04 Ostico

It seems the problem is that I was passing in a queue.Queue() object, which is designed to work with threads rather than process pools and therefore can't share its locks across separate processes. After removing the queue object, the ProcessPoolExecutor works fine as long as I am not using PyOrient inside the ProcessPoolExecutor.
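The queue part can be checked in isolation. The sketch below uses only the standard library, and the helper names `is_picklable` and `put_square` are made up for illustration. It shows that a `queue.Queue` cannot be pickled because it holds thread locks, and uses a `multiprocessing.Manager` queue as one possible replacement that can be handed to pool workers.

```python
import pickle
import queue
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager


def is_picklable(obj):
    """Return True if obj survives pickle.dumps, False otherwise."""
    try:
        pickle.dumps(obj)
    except Exception:
        return False
    return True


def put_square(shared_queue, n):
    """Module-level worker that writes its result to the shared queue."""
    shared_queue.put(n * n)


if __name__ == "__main__":
    print(is_picklable(queue.Queue()))  # False: it contains thread locks

    with Manager() as manager:
        shared_queue = manager.Queue()  # a proxy object, which is picklable
        with ProcessPoolExecutor() as pool:
            futures = [pool.submit(put_square, shared_queue, n) for n in range(3)]
            for future in futures:
                future.result()  # re-raises any worker exception
        print(sorted(shared_queue.get() for _ in range(3)))
```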

If I do use PyOrient inside the process pool, then I get the error message, presumably because PyOrient uses queue internally?

Thanks.

songololo avatar May 01 '15 14:05 songololo

Hi @shongololo,

pyorient doesn't use queue. There is some strange conflict between the internal pyorient methods and the ProcessPoolExecutor.

Please send me a small snippet of code that raises the exception if you can, and I'll try to investigate.

Ostico avatar May 01 '15 20:05 Ostico

Hi, here is a snippet. It would be great if there is a way to resolve this:

import pyorient

db_name = 'db_name'
db_port = 2424
db_host = 'localhost'
db_client = pyorient.OrientDB(db_host, db_port)
db_client.db_open(db_name, 'root', 'pw')
print('...connected to database: {0}, {1}, {2}'.format(db_name, db_host, db_port))

from concurrent.futures import ProcessPoolExecutor

with ProcessPoolExecutor() as process_pool:
    a_query = 'select from my_class'
    data = process_pool.submit(db_client.query, a_query)  # passes a_query to db_client.query()
    print(data)

Here is the console output / error message:

...connected to database: db_name, localhost, 2424
<Future at 0x104393be0 state=running>
Traceback (most recent call last):
  File "/anaconda/lib/python3.4/site-packages/pyorient/orient.py", line 332, in get_message
    if command is not None and self._Messages[command]:
KeyError: 'GetstateMessage'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/anaconda/lib/python3.4/multiprocessing/queues.py", line 242, in _feed
    obj = ForkingPickler.dumps(obj)
  File "/anaconda/lib/python3.4/multiprocessing/reduction.py", line 50, in dumps
    cls(buf, protocol).dump(obj)
  File "/anaconda/lib/python3.4/site-packages/pyorient/orient.py", line 189, in __getattr__
    _Message = self.get_message(_names + "Message")
  File "/anaconda/lib/python3.4/site-packages/pyorient/orient.py", line 350, in get_message
    "Unable to find command " + str(e), []
pyorient.exceptions.PyOrientBadMethodCallException: Unable to find command 'GetstateMessage'
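A possible workaround, sketched under assumptions rather than tested against a live server: submitting `db_client.query` pickles the bound method and therefore `db_client` itself, so the idea is to submit a module-level function that opens its own connection inside the worker. The helper names `open_client` and `run_query` are made up. The `db_close()` call and the `oRecordData` attribute on returned records are assumed from the pyorient API and should be checked against the installed version.

```python
from concurrent.futures import ProcessPoolExecutor

DB_HOST, DB_PORT, DB_NAME = "localhost", 2424, "db_name"


def open_client():
    """Open a fresh connection; called inside the worker, not the parent."""
    import pyorient  # imported lazily, only where a connection is needed

    client = pyorient.OrientDB(DB_HOST, DB_PORT)
    client.db_open(DB_NAME, "root", "pw")
    return client


def run_query(query, connect=open_client):
    """Module-level worker: only the query string crosses the process boundary."""
    client = connect()
    try:
        records = client.query(query)
        # Assumption: each record exposes its fields as a dict via
        # `oRecordData`; plain dicts are safe to send back to the parent.
        return [dict(record.oRecordData) for record in records]
    finally:
        client.db_close()  # assumed to exist on the client


if __name__ == "__main__":
    with ProcessPoolExecutor() as process_pool:
        future = process_pool.submit(run_query, "select from my_class")
        print(future.result())  # blocks until the worker has finished
```

The `connect` parameter exists only so the function can be exercised with a fake client. Opening a connection per task is wasteful for many small queries, so a per-worker connection may be preferable in that case.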

songololo avatar May 23 '15 20:05 songololo

@shongololo @Ostico is this still an issue?

mogui avatar Jun 23 '16 19:06 mogui