python-mysql-replication
event can't be added to multiprocessing.Queue
Hello!
I want to compute real-time statistics with multiprocessing, but I can't put an event onto a multiprocessing.Queue. I think this is because the event cannot be pickled.
>>> import multiprocessing
>>> q = multiprocessing.Queue(maxsize = -1)
>>>
>>> q.put(binlogevent)
>>> Traceback (most recent call last):
File "/usr/lib64/python2.7/multiprocessing/queues.py", line 266, in _feed
send(obj)
PicklingError: Can't pickle <type 'NoneType'>: attribute lookup __builtin__.NoneType failed
>>> import pickle
>>>
>>> pickle.dumps(binlogevent)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/lib64/python2.7/pickle.py", line 1374, in dumps
Pickler(file, protocol).dump(obj)
File "/usr/lib64/python2.7/pickle.py", line 224, in dump
self.save(obj)
File "/usr/lib64/python2.7/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/usr/lib64/python2.7/pickle.py", line 419, in save_reduce
save(state)
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib64/python2.7/pickle.py", line 663, in _batch_setitems
save(v)
File "/usr/lib64/python2.7/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/usr/lib64/python2.7/pickle.py", line 419, in save_reduce
save(state)
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib64/python2.7/pickle.py", line 663, in _batch_setitems
save(v)
File "/usr/lib64/python2.7/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/usr/lib64/python2.7/pickle.py", line 419, in save_reduce
save(state)
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib64/python2.7/pickle.py", line 663, in _batch_setitems
save(v)
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 600, in save_list
self._batch_appends(iter(obj))
File "/usr/lib64/python2.7/pickle.py", line 615, in _batch_appends
save(x)
File "/usr/lib64/python2.7/pickle.py", line 306, in save
rv = reduce(self.proto)
File "/usr/lib64/python2.7/copy_reg.py", line 77, in _reduce_ex
raise TypeError("a class that defines __slots__ without "
TypeError: a class that defines __slots__ without defining __getstate__ cannot be pickled
To add a little more info to this: I tried to get around it by removing the database-connection attributes from the event:
del event._ctl_connection, event.packet
But this makes the rows property empty on the other side of the queue. I assume this is because the rows are computed lazily, on first access.
The database I'm working with has a high volume of changes continuously coming through replication, so I need to hand events off to other processes as quickly as possible.
Workaround
I was able to pass events through the multiprocessing queue by first letting the library evaluate the rows, then removing the database connection. I also remove the packet, since I no longer need it.
assert event.rows
del event._ctl_connection, event.packet
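For reference, the pattern behind this workaround can be sketched with a self-contained analogue (FakeConnection and FakeEvent below are hypothetical stand-ins, not the library's actual classes): an object whose rows are computed lazily from an unpicklable handle pickles cleanly once the rows are materialized and the handle is removed.

```python
import pickle
import threading


class FakeConnection:
    """Stand-in for the event's _ctl_connection: holds an unpicklable resource."""

    def __init__(self):
        self._lock = threading.Lock()  # thread locks cannot be pickled

    def fetch_rows(self):
        # Pretend to decode rows from the binlog stream.
        return [{"id": 1, "name": "alice"}, {"id": 2, "name": "bob"}]


class FakeEvent:
    """Stand-in for a binlog event whose rows are computed on first access."""

    def __init__(self, conn):
        self._ctl_connection = conn
        self._rows = None

    @property
    def rows(self):
        if self._rows is None:
            # Lazy evaluation: the connection is needed the first time only.
            self._rows = self._ctl_connection.fetch_rows()
        return self._rows


event = FakeEvent(FakeConnection())

# Pickling now fails: the event still drags the connection (and its lock) along.
try:
    pickle.dumps(event)
except TypeError as exc:
    print("before:", exc)

# The workaround: force the rows to be evaluated while the connection is
# still attached, then drop the connection before queueing the event.
assert event.rows
del event._ctl_connection

restored = pickle.loads(pickle.dumps(event))
print("after:", restored.rows)
```

Anything placed on a multiprocessing.Queue goes through pickle the same way, so once the stripped event round-trips through pickle it will pass through the queue as well.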