
JSON Serializer to handle numpy/frozendict

Open jerr0328 opened this issue 6 years ago • 4 comments

I noticed that Kombu uses a custom JSON serializer to handle values that can be represented as strings (like UUIDs and Django-related objects). I would propose extending it to handle some other common types, such as NumPy numbers (which can be converted to a float, for instance) and frozendict (which can be converted back to a dict). This could be done similarly to the existing Django check, by substituting dummy objects when numpy/frozendict isn't available.
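To make the idea concrete, here is a minimal standalone sketch. It is not Kombu's actual encoder: the names ExtendedJSONEncoder, NumpyScalar, FrozenDict and dumps are hypothetical, and it assumes that NumPy scalars can be converted with .item() and that a frozendict can be passed to dict().

```python
import json
import uuid

# Optional dependencies: fall back to dummy classes when they are missing,
# so the isinstance() checks below simply never match.
try:
    import numpy as np
    NumpyScalar = np.generic  # base class of all NumPy scalar types
except ImportError:
    np = None

    class NumpyScalar:  # dummy placeholder (hypothetical name)
        pass

try:
    from frozendict import frozendict as FrozenDict
except ImportError:
    class FrozenDict:  # dummy placeholder (hypothetical name)
        pass


class ExtendedJSONEncoder(json.JSONEncoder):
    """Hypothetical encoder; only called for objects json cannot handle."""

    def default(self, o):
        if isinstance(o, uuid.UUID):
            return str(o)
        if isinstance(o, NumpyScalar):
            return o.item()  # convert to the matching plain Python scalar
        if isinstance(o, FrozenDict):
            return dict(o)   # convert back to a regular dict
        return super().default(o)  # raises TypeError for unknown types


def dumps(obj, **kwargs):
    return json.dumps(obj, cls=ExtendedJSONEncoder, **kwargs)
```

With this in place, something like dumps({"score": np.float32(1.5)}) should serialize instead of raising, while unknown types still fail with the usual TypeError.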

Let me know if this sounds good (or not), and any other suggestions, and I can open a PR.

jerr0328 avatar Jun 28 '19 14:06 jerr0328

Can you start a proof of concept outside of Celery core first?

auvipy avatar Jun 28 '19 18:06 auvipy

I've created a sample repo that shows this issue: https://github.com/jerr0328/celery-return-types

You can run docker-compose up --build to see the whole thing happen on a real Redis/RabbitMQ setup.

This results in the following:

[2019-07-01 09:36:19,635: ERROR/ForkPoolWorker-2] Task sample.tasks.do_work[56bea187-f873-41c5-8b2a-eca4bf635cac] raised unexpected: EncodeError(TypeError("Object of type 'float32' is not JSON serializable",),)
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/kombu/serialization.py", line 50, in _reraise_errors
    yield
  File "/usr/local/lib/python3.6/site-packages/kombu/serialization.py", line 221, in dumps
    payload = encoder(data)
  File "/usr/local/lib/python3.6/site-packages/kombu/utils/json.py", line 70, in dumps
    **dict(default_kwargs, **kwargs))
  File "/usr/local/lib/python3.6/json/__init__.py", line 238, in dumps
    **kw).encode(obj)
  File "/usr/local/lib/python3.6/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/usr/local/lib/python3.6/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "/usr/local/lib/python3.6/site-packages/kombu/utils/json.py", line 59, in default
    return super(JSONEncoder, self).default(o)
  File "/usr/local/lib/python3.6/json/encoder.py", line 180, in default
    o.__class__.__name__)
TypeError: Object of type 'float32' is not JSON serializable

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/celery/app/trace.py", line 449, in trace_task
    uuid, retval, task_request, publish_result,
  File "/usr/local/lib/python3.6/site-packages/celery/backends/base.py", line 149, in mark_as_done
    self.store_result(task_id, result, state, request=request)
  File "/usr/local/lib/python3.6/site-packages/celery/backends/base.py", line 342, in store_result
    request=request, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/celery/backends/base.py", line 714, in _store_result
    self.set(self.get_key_for_task(task_id), self.encode(meta))
  File "/usr/local/lib/python3.6/site-packages/celery/backends/base.py", line 293, in encode
    _, _, payload = self._encode(data)
  File "/usr/local/lib/python3.6/site-packages/celery/backends/base.py", line 297, in _encode
    return dumps(data, serializer=self.serializer)
  File "/usr/local/lib/python3.6/site-packages/kombu/serialization.py", line 221, in dumps
    payload = encoder(data)
  File "/usr/local/lib/python3.6/contextlib.py", line 99, in __exit__
    self.gen.throw(type, value, traceback)
  File "/usr/local/lib/python3.6/site-packages/kombu/serialization.py", line 54, in _reraise_errors
    reraise(wrapper, wrapper(exc), sys.exc_info()[2])
  File "/usr/local/lib/python3.6/site-packages/vine/five.py", line 194, in reraise
    raise value.with_traceback(tb)
  File "/usr/local/lib/python3.6/site-packages/kombu/serialization.py", line 50, in _reraise_errors
    yield
  File "/usr/local/lib/python3.6/site-packages/kombu/serialization.py", line 221, in dumps
    payload = encoder(data)
  File "/usr/local/lib/python3.6/site-packages/kombu/utils/json.py", line 70, in dumps
    **dict(default_kwargs, **kwargs))
  File "/usr/local/lib/python3.6/json/__init__.py", line 238, in dumps
    **kw).encode(obj)
  File "/usr/local/lib/python3.6/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/usr/local/lib/python3.6/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "/usr/local/lib/python3.6/site-packages/kombu/utils/json.py", line 59, in default
    return super(JSONEncoder, self).default(o)
  File "/usr/local/lib/python3.6/json/encoder.py", line 180, in default
    o.__class__.__name__)
kombu.exceptions.EncodeError: Object of type 'float32' is not JSON serializable

Seems like Kombu's json.py utility would be a good spot for this :)

jerr0328 avatar Jul 01 '19 09:07 jerr0328

But what about msgpack?

thedrow avatar Jul 02 '19 13:07 thedrow

msgpack also chokes on NumPy values:

[2019-07-03 07:23:36,985: ERROR/ForkPoolWorker-2] Task sample.tasks.do_work[f6184338-73e9-426c-99f0-c0d6ee5a976e] raised unexpected: EncodeError(TypeError("can not serialize 'numpy.float32' object",),)
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/kombu/serialization.py", line 50, in _reraise_errors
    yield
  File "/usr/local/lib/python3.6/site-packages/kombu/serialization.py", line 221, in dumps
    payload = encoder(data)
  File "/usr/local/lib/python3.6/site-packages/kombu/serialization.py", line 370, in pack
    return packb(s, use_bin_type=True)
  File "/usr/local/lib/python3.6/site-packages/msgpack/__init__.py", line 46, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack/_packer.pyx", line 282, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 288, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 285, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 232, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 232, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 261, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 279, in msgpack._cmsgpack.Packer._pack
TypeError: can not serialize 'numpy.float32' object

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/celery/app/trace.py", line 449, in trace_task
    uuid, retval, task_request, publish_result,
  File "/usr/local/lib/python3.6/site-packages/celery/backends/base.py", line 149, in mark_as_done
    self.store_result(task_id, result, state, request=request)
  File "/usr/local/lib/python3.6/site-packages/celery/backends/base.py", line 348, in store_result
    request=request, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/celery/backends/base.py", line 721, in _store_result
    self.set(self.get_key_for_task(task_id), self.encode(meta))
  File "/usr/local/lib/python3.6/site-packages/celery/backends/base.py", line 299, in encode
    _, _, payload = self._encode(data)
  File "/usr/local/lib/python3.6/site-packages/celery/backends/base.py", line 303, in _encode
    return dumps(data, serializer=self.serializer)
  File "/usr/local/lib/python3.6/site-packages/kombu/serialization.py", line 221, in dumps
    payload = encoder(data)
  File "/usr/local/lib/python3.6/contextlib.py", line 99, in __exit__
    self.gen.throw(type, value, traceback)
  File "/usr/local/lib/python3.6/site-packages/kombu/serialization.py", line 54, in _reraise_errors
    reraise(wrapper, wrapper(exc), sys.exc_info()[2])
  File "/usr/local/lib/python3.6/site-packages/vine/five.py", line 194, in reraise
    raise value.with_traceback(tb)
  File "/usr/local/lib/python3.6/site-packages/kombu/serialization.py", line 50, in _reraise_errors
    yield
  File "/usr/local/lib/python3.6/site-packages/kombu/serialization.py", line 221, in dumps
    payload = encoder(data)
  File "/usr/local/lib/python3.6/site-packages/kombu/serialization.py", line 370, in pack
    return packb(s, use_bin_type=True)
  File "/usr/local/lib/python3.6/site-packages/msgpack/__init__.py", line 46, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack/_packer.pyx", line 282, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 288, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 285, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 232, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 232, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 261, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 279, in msgpack._cmsgpack.Packer._pack
kombu.exceptions.EncodeError: can not serialize 'numpy.float32' object

It looks like there is some support for that if you also have msgpack-numpy installed and patch the msgpack serializers.
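As a rough illustration of the same idea without msgpack-numpy, msgpack's own default hook can convert NumPy scalars before packing. This is only a sketch under assumptions: encode_numpy_scalar, NumpyScalar and pack are hypothetical names, it is not what msgpack-numpy does internally, and it is not wired into Kombu's msgpack serializer.

```python
# Optional dependency: use a dummy class when NumPy is not installed.
try:
    import numpy as np
    NumpyScalar = np.generic  # base class of all NumPy scalar types
except ImportError:
    np = None

    class NumpyScalar:  # dummy placeholder (hypothetical name)
        pass


def encode_numpy_scalar(obj):
    """`default` hook: msgpack calls it for objects it cannot pack."""
    if isinstance(obj, NumpyScalar):
        return obj.item()  # plain Python int/float/bool, which msgpack can pack
    raise TypeError(f"can not serialize {type(obj).__name__!r} object")


def pack(data):
    # Imported lazily so the hook above can be used without msgpack installed.
    import msgpack
    return msgpack.packb(data, default=encode_numpy_scalar, use_bin_type=True)
```

Note that the value comes back as a plain float on the receiving side, so the float32 type itself is not preserved.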

jerr0328 avatar Jul 03 '19 07:07 jerr0328