kombu
kombu copied to clipboard
JSON Serializer to handle numpy/frozendict
I noticed that Kombu uses a custom JSON Serializer to handle things that could be strings (like UUIDs and Django-related things). I would propose an extension to that to handle some other common things, such as Numpy numbers (which can be converted to a float, for instance) and frozendict (which can be converted back to dict). This could be achieved similar to how the Django check is done, putting dummy objects in case numpy/frozendict isn't available.
Let me know if this sounds good (or not), and any other suggestions, and I can open a PR.
can you start a proof of concept out of celery core first??
I've created a sample repo that shows this issue: https://github.com/jerr0328/celery-return-types
You can run docker-compose up --build to see the whole thing happen on a real redis/rabbitmq setup.
This results in the following:
Click to see exceptions
[2019-07-01 09:36:19,635: ERROR/ForkPoolWorker-2] Task sample.tasks.do_work[56bea187-f873-41c5-8b2a-eca4bf635cac] raised unexpected: EncodeError(TypeError("Object of type 'float32' is not JSON serializable",),)
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/kombu/serialization.py", line 50, in _reraise_errors
yield
File "/usr/local/lib/python3.6/site-packages/kombu/serialization.py", line 221, in dumps
payload = encoder(data)
File "/usr/local/lib/python3.6/site-packages/kombu/utils/json.py", line 70, in dumps
**dict(default_kwargs, **kwargs))
File "/usr/local/lib/python3.6/json/__init__.py", line 238, in dumps
**kw).encode(obj)
File "/usr/local/lib/python3.6/json/encoder.py", line 199, in encode
chunks = self.iterencode(o, _one_shot=True)
File "/usr/local/lib/python3.6/json/encoder.py", line 257, in iterencode
return _iterencode(o, 0)
File "/usr/local/lib/python3.6/site-packages/kombu/utils/json.py", line 59, in default
return super(JSONEncoder, self).default(o)
File "/usr/local/lib/python3.6/json/encoder.py", line 180, in default
o.__class__.__name__)
TypeError: Object of type 'float32' is not JSON serializable
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/celery/app/trace.py", line 449, in trace_task
uuid, retval, task_request, publish_result,
File "/usr/local/lib/python3.6/site-packages/celery/backends/base.py", line 149, in mark_as_done
self.store_result(task_id, result, state, request=request)
File "/usr/local/lib/python3.6/site-packages/celery/backends/base.py", line 342, in store_result
request=request, **kwargs)
File "/usr/local/lib/python3.6/site-packages/celery/backends/base.py", line 714, in _store_result
self.set(self.get_key_for_task(task_id), self.encode(meta))
File "/usr/local/lib/python3.6/site-packages/celery/backends/base.py", line 293, in encode
_, _, payload = self._encode(data)
File "/usr/local/lib/python3.6/site-packages/celery/backends/base.py", line 297, in _encode
return dumps(data, serializer=self.serializer)
File "/usr/local/lib/python3.6/site-packages/kombu/serialization.py", line 221, in dumps
payload = encoder(data)
File "/usr/local/lib/python3.6/contextlib.py", line 99, in __exit__
self.gen.throw(type, value, traceback)
File "/usr/local/lib/python3.6/site-packages/kombu/serialization.py", line 54, in _reraise_errors
reraise(wrapper, wrapper(exc), sys.exc_info()[2])
File "/usr/local/lib/python3.6/site-packages/vine/five.py", line 194, in reraise
raise value.with_traceback(tb)
File "/usr/local/lib/python3.6/site-packages/kombu/serialization.py", line 50, in _reraise_errors
yield
File "/usr/local/lib/python3.6/site-packages/kombu/serialization.py", line 221, in dumps
payload = encoder(data)
File "/usr/local/lib/python3.6/site-packages/kombu/utils/json.py", line 70, in dumps
**dict(default_kwargs, **kwargs))
File "/usr/local/lib/python3.6/json/__init__.py", line 238, in dumps
**kw).encode(obj)
File "/usr/local/lib/python3.6/json/encoder.py", line 199, in encode
chunks = self.iterencode(o, _one_shot=True)
File "/usr/local/lib/python3.6/json/encoder.py", line 257, in iterencode
return _iterencode(o, 0)
File "/usr/local/lib/python3.6/site-packages/kombu/utils/json.py", line 59, in default
return super(JSONEncoder, self).default(o)
File "/usr/local/lib/python3.6/json/encoder.py", line 180, in default
o.__class__.__name__)
kombu.exceptions.EncodeError: Object of type 'float32' is not JSON serializable
Seems like Kombu's json.py utility would be a good spot for this :)
But what about msgpack?
msgpack also chokes with numpy:
Click to see exceptions
[2019-07-03 07:23:36,985: ERROR/ForkPoolWorker-2] Task sample.tasks.do_work[f6184338-73e9-426c-99f0-c0d6ee5a976e] raised unexpected: EncodeError(TypeError("can not serialize 'numpy.float32' object",),)
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/kombu/serialization.py", line 50, in _reraise_errors
yield
File "/usr/local/lib/python3.6/site-packages/kombu/serialization.py", line 221, in dumps
payload = encoder(data)
File "/usr/local/lib/python3.6/site-packages/kombu/serialization.py", line 370, in pack
return packb(s, use_bin_type=True)
File "/usr/local/lib/python3.6/site-packages/msgpack/__init__.py", line 46, in packb
return Packer(**kwargs).pack(o)
File "msgpack/_packer.pyx", line 282, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 288, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 285, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 232, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 232, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 261, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 279, in msgpack._cmsgpack.Packer._pack
TypeError: can not serialize 'numpy.float32' object
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/celery/app/trace.py", line 449, in trace_task
uuid, retval, task_request, publish_result,
File "/usr/local/lib/python3.6/site-packages/celery/backends/base.py", line 149, in mark_as_done
self.store_result(task_id, result, state, request=request)
File "/usr/local/lib/python3.6/site-packages/celery/backends/base.py", line 348, in store_result
request=request, **kwargs)
File "/usr/local/lib/python3.6/site-packages/celery/backends/base.py", line 721, in _store_result
self.set(self.get_key_for_task(task_id), self.encode(meta))
File "/usr/local/lib/python3.6/site-packages/celery/backends/base.py", line 299, in encode
_, _, payload = self._encode(data)
File "/usr/local/lib/python3.6/site-packages/celery/backends/base.py", line 303, in _encode
return dumps(data, serializer=self.serializer)
File "/usr/local/lib/python3.6/site-packages/kombu/serialization.py", line 221, in dumps
payload = encoder(data)
File "/usr/local/lib/python3.6/contextlib.py", line 99, in __exit__
self.gen.throw(type, value, traceback)
File "/usr/local/lib/python3.6/site-packages/kombu/serialization.py", line 54, in _reraise_errors
reraise(wrapper, wrapper(exc), sys.exc_info()[2])
File "/usr/local/lib/python3.6/site-packages/vine/five.py", line 194, in reraise
raise value.with_traceback(tb)
File "/usr/local/lib/python3.6/site-packages/kombu/serialization.py", line 50, in _reraise_errors
yield
File "/usr/local/lib/python3.6/site-packages/kombu/serialization.py", line 221, in dumps
payload = encoder(data)
File "/usr/local/lib/python3.6/site-packages/kombu/serialization.py", line 370, in pack
return packb(s, use_bin_type=True)
File "/usr/local/lib/python3.6/site-packages/msgpack/__init__.py", line 46, in packb
return Packer(**kwargs).pack(o)
File "msgpack/_packer.pyx", line 282, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 288, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 285, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 232, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 232, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 261, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 279, in msgpack._cmsgpack.Packer._pack
kombu.exceptions.EncodeError: can not serialize 'numpy.float32' object
It looks like there is some support for that if you also have msgpack-numpy installed and patch the msgpack serializers.