python-mysql-replication

event can't be added to multiprocessing.Queue

daiguadaidai opened this issue 8 years ago • 2 comments

Hello! I want to compute real-time statistics with multiprocessing, but I can't put the event into a multiprocessing.Queue. I think this is because the event cannot be pickled.

>>> import multiprocessing
>>> q = multiprocessing.Queue(maxsize = -1)
>>>
>>> q.put(binlogevent)
Traceback (most recent call last):
  File "/usr/lib64/python2.7/multiprocessing/queues.py", line 266, in _feed
    send(obj)
PicklingError: Can't pickle <type 'NoneType'>: attribute lookup __builtin__.NoneType failed

>>> import pickle
>>> 
>>> pickle.dumps(binlogevent)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib64/python2.7/pickle.py", line 1374, in dumps
    Pickler(file, protocol).dump(obj)
  File "/usr/lib64/python2.7/pickle.py", line 224, in dump
    self.save(obj)
  File "/usr/lib64/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/lib64/python2.7/pickle.py", line 419, in save_reduce
    save(state)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.7/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib64/python2.7/pickle.py", line 663, in _batch_setitems
    save(v)
  File "/usr/lib64/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/lib64/python2.7/pickle.py", line 419, in save_reduce
    save(state)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.7/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib64/python2.7/pickle.py", line 663, in _batch_setitems
    save(v)
  File "/usr/lib64/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/lib64/python2.7/pickle.py", line 419, in save_reduce
    save(state)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.7/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib64/python2.7/pickle.py", line 663, in _batch_setitems
    save(v)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.7/pickle.py", line 600, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib64/python2.7/pickle.py", line 615, in _batch_appends
    save(x)
  File "/usr/lib64/python2.7/pickle.py", line 306, in save
    rv = reduce(self.proto)
  File "/usr/lib64/python2.7/copy_reg.py", line 77, in _reduce_ex
    raise TypeError("a class that defines __slots__ without "
TypeError: a class that defines __slots__ without defining __getstate__ cannot be pickled
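The root cause can be reproduced without the library. A minimal sketch, assuming a stand-in class (not the real BinLogEvent): any object that holds a live connection is unpicklable, and because multiprocessing.Queue serializes items in a background feeder thread, the traceback only appears after q.put() has already returned.

```python
import pickle
import socket

# Hypothetical stand-in for a binlog event: the attribute name
# _ctl_connection mirrors the library, but the socket is just a
# placeholder for the unpicklable MySQL connection.
class Event:
    def __init__(self):
        self._ctl_connection = socket.socket()

event = Event()
try:
    pickle.dumps(event)
    picklable = True
except TypeError as err:
    picklable = False
    print("not picklable:", err)
event._ctl_connection.close()
```

Deleting or replacing such attributes before queueing is therefore the natural direction for a fix, which is what the comments below explore.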

daiguadaidai avatar Dec 09 '16 01:12 daiguadaidai

To add a little more info to this, I tried to get around it by removing the database connection attributes from the event:

del event._ctl_connection, event.packet

But this makes the rows property empty on the other side of the queue. I assume this is because the rows are parsed lazily from the packet when first accessed, and by then the connection and packet are gone.

The database I'm working with has high volume continuously coming through replication, so I need to send the events off to other processes as quickly as possible.

matt-schwartz avatar May 20 '17 19:05 matt-schwartz

Workaround

I was able to pass events through the multiprocessing queue by letting the library evaluate the rows, then removing the database connection. I also remove the packet since I no longer need it.

assert event.rows
del event._ctl_connection, event.packet

matt-schwartz avatar May 21 '17 20:05 matt-schwartz
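The two-line workaround can be shown end to end with a stand-in class. FakeRowsEvent is hypothetical; only the _ctl_connection, packet, and rows names mirror the library, and the lazy rows property imitates its parse-on-first-access behaviour.

```python
import pickle
import socket

# Hypothetical stand-in for the library's row events: rows are parsed
# lazily on first access, and the control connection is unpicklable.
class FakeRowsEvent:
    def __init__(self):
        self._ctl_connection = socket.socket()  # unpicklable stand-in
        self.packet = b"\x00\x01"               # raw packet stand-in
        self._rows = None

    @property
    def rows(self):
        if self._rows is None:                  # lazy evaluation
            self._rows = [{"values": {"id": 1}}]
        return self._rows

event = FakeRowsEvent()

# Force the lazy rows to be parsed while the connection is still attached...
assert event.rows
event._ctl_connection.close()
# ...then drop the unpicklable attributes before putting it on the queue.
del event._ctl_connection, event.packet

clone = pickle.loads(pickle.dumps(event))
print(clone.rows)  # the materialised rows survive the round trip
```

The key point is the ordering: accessing rows before the del forces the parse, so the pickled copy carries the materialised data rather than a reference to the connection.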