Add delivery_info key to message to fix sending tasks over Redis
Hi,
I want to use rusty-celery with a Redis broker. When I run a minimal example (like the one in /examples, but with a Redis broker instead of RabbitMQ), the Python worker fails on each received task with KeyError: 'exchange'.
With the patch in this PR, the error disappears.
I tried to dig into why this key is needed, but did not get far because the Celery docs and internals are hard to grasp.
With this PR, though, I can successfully send tasks from Rust to Python over Redis.
Here's the log of a Python worker without this patch:
[2021-07-13 15:58:45,406: WARNING/MainProcess] (b'celery', b'{"body":"W1tdL[...]XQ==","content-encoding":"utf-8","content-type":"application/json","headers":{"argsrepr":null,"eta":null,"expires":null,"group":null,"id":"14232fc9-38ac-4cf0-bacb-a14b44b5fce3","kwargsrepr":null,"lang":null,"meth":null,"origin":"gen163934@tok","parent_id":null,"retries":null,"root_id":null,"shadow":null,"task":"transcribe","timelimit":[null,null]},"properties":{"body_encoding":"base64","correlation_id":"14232fc9-38ac-4cf0-bacb-a14b44b5fce3","delivery_tag":"1c5c4efd-19ad-488f-9f07-0c2ab69d8a32","reply_to":null}}')
[2021-07-13 15:58:45,406: WARNING/MainProcess]
[2021-07-13 15:58:45,406: WARNING/MainProcess] <Message object at 0x7fc2766d2ee0 with details {'state': 'RECEIVED', 'content_type': 'application/json', 'delivery_tag': '1c5c4efd-19ad-488f-9f07-0c2ab69d8a32', 'body_length': 172, 'properties': {'correlation_id': '14232fc9-38ac-4cf0-bacb-a14b44b5fce3'}, 'delivery_info': {}}>
[2021-07-13 15:58:45,406: WARNING/MainProcess]
[2021-07-13 15:58:45,406: CRITICAL/MainProcess] Unrecoverable error: KeyError('exchange')
Traceback (most recent call last):
  File "/home/bit/.cache/pypoetry/virtualenvs/oas-core-rJpEbOGy-py3.8/lib/python3.8/site-packages/celery/worker/worker.py", line 203, in start
    self.blueprint.start(self)
  File "/home/bit/.cache/pypoetry/virtualenvs/oas-core-rJpEbOGy-py3.8/lib/python3.8/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/home/bit/.cache/pypoetry/virtualenvs/oas-core-rJpEbOGy-py3.8/lib/python3.8/site-packages/celery/bootsteps.py", line 365, in start
    return self.obj.start()
  File "/home/bit/.cache/pypoetry/virtualenvs/oas-core-rJpEbOGy-py3.8/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 326, in start
    blueprint.start(self)
  File "/home/bit/.cache/pypoetry/virtualenvs/oas-core-rJpEbOGy-py3.8/lib/python3.8/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/home/bit/.cache/pypoetry/virtualenvs/oas-core-rJpEbOGy-py3.8/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 618, in start
    c.loop(*c.loop_args())
  File "/home/bit/.cache/pypoetry/virtualenvs/oas-core-rJpEbOGy-py3.8/lib/python3.8/site-packages/celery/worker/loops.py", line 81, in asynloop
    next(loop)
  File "/home/bit/.cache/pypoetry/virtualenvs/oas-core-rJpEbOGy-py3.8/lib/python3.8/site-packages/kombu/asynchronous/hub.py", line 361, in create_loop
    cb(*cbargs)
  File "/home/bit/.cache/pypoetry/virtualenvs/oas-core-rJpEbOGy-py3.8/lib/python3.8/site-packages/kombu/transport/redis.py", line 1130, in on_readable
    self.cycle.on_readable(fileno)
  File "/home/bit/.cache/pypoetry/virtualenvs/oas-core-rJpEbOGy-py3.8/lib/python3.8/site-packages/kombu/transport/redis.py", line 399, in on_readable
    chan.handlers[type]()
  File "/home/bit/.cache/pypoetry/virtualenvs/oas-core-rJpEbOGy-py3.8/lib/python3.8/site-packages/kombu/transport/redis.py", line 791, in _brpop_read
    self.connection._deliver(loads(bytes_to_str(item)), dest)
  File "/home/bit/.cache/pypoetry/virtualenvs/oas-core-rJpEbOGy-py3.8/lib/python3.8/site-packages/kombu/transport/virtual/base.py", line 980, in _deliver
    callback(message)
  File "/home/bit/.cache/pypoetry/virtualenvs/oas-core-rJpEbOGy-py3.8/lib/python3.8/site-packages/kombu/transport/virtual/base.py", line 629, in _callback
    self.qos.append(message, message.delivery_tag)
  File "/home/bit/.cache/pypoetry/virtualenvs/oas-core-rJpEbOGy-py3.8/lib/python3.8/site-packages/kombu/transport/redis.py", line 194, in append
    EX, RK = delivery['exchange'], delivery['routing_key']
KeyError: 'exchange'
[2021-07-13 15:58:45,412: DEBUG/MainProcess] | Worker: Closing Hub...
[2021-07-13 15:58:45,412: DEBUG/MainProcess] | Worker: Closing Pool...
[2021-07-13 15:58:45,412: DEBUG/MainProcess] | Worker: Closing Consumer...
[2021-07-13 15:58:45,412: DEBUG/MainProcess] | Worker: Stopping Consumer...
Here's the log of the Python worker with this PR applied:
[2021-07-13 16:06:06,359: WARNING/MainProcess] dest__item
[2021-07-13 16:06:06,359: WARNING/MainProcess]
[2021-07-13 16:06:06,359: WARNING/MainProcess] (b'celery', b'{"body":"W1t[...]9XQ==","content-encoding":"utf-8","content-type":"application/json","headers":{"argsrepr":null,"eta":null,"expires":null,"group":null,"id":"2b640023-7225-46e0-b3a8-80058f6cbb3e","kwargsrepr":null,"lang":null,"meth":null,"origin":"gen168169@tok","parent_id":null,"retries":null,"root_id":null,"shadow":null,"task":"transcribe","timelimit":[null,null]},"properties":{"body_encoding":"base64","correlation_id":"2b640023-7225-46e0-b3a8-80058f6cbb3e","delivery_info":{"exchange":"","routing_key":"celery"},"delivery_tag":"9c27e275-4273-4da1-8e94-705ce401a84c","reply_to":null}}')
[2021-07-13 16:06:06,360: WARNING/MainProcess]
[2021-07-13 16:06:06,360: WARNING/MainProcess] <Message object at 0x7f5d87ae1ee0 with details {'state': 'RECEIVED', 'content_type': 'application/json', 'delivery_tag': '9c27e275-4273-4da1-8e94-705ce401a84c', 'body_length': 172, 'properties': {'correlation_id': '2b640023-7225-46e0-b3a8-80058f6cbb3e'}, 'delivery_info': {'exchange': '', 'routing_key': 'celery'}}>
[2021-07-13 16:06:06,360: WARNING/MainProcess]
[2021-07-13 16:06:06,362: INFO/MainProcess] Task transcribe[2b640023-7225-46e0-b3a8-80058f6cbb3e] received
[2021-07-13 16:06:06,362: DEBUG/MainProcess] TaskPool: Apply <function fast_trace_task at 0x7f5d8b6f9670> (args:('transcribe', '2b640023-7225-46e0-b3a8-80058f6cbb3e', {'argsrepr': None, 'eta': None, 'expires': None, 'group': None, 'id': '2b640023-7225-46e0-b3a8-80058f6cbb3e', 'kwargsrepr': None, 'lang': None, 'meth': None, 'origin': 'gen168169@tok', 'parent_id': None, 'retries': None, 'root_id': None, 'shadow': None, 'task': 'transcribe', 'timelimit': [None, None], 'reply_to': None, 'correlation_id': '2b640023-7225-46e0-b3a8-80058f6cbb3e', 'hostname': 'celery@tok', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': None, 'redelivered': None}, 'args': [], 'kwargs': {'args': {'media_id': 'b2tt0d0zaytk3nq7efpmad4ezw', 'media_url': 'http://localhost:8101/bela1.mp3'}, 'opts': {}}}, b'[[],{"args":{"media_id":"b2tt0d0zaytk3nq7efpmad4ezw","media_url":"http://localhost:8101/bela1.mp3"},"opts":{}},{"callbacks":null,"errbacks":null,"chain":null,"chord":null}]', 'application/json', 'utf-8') kwargs:{})
[2021-07-13 16:06:06,364: WARNING/ForkPoolWorker-7] start transcribe task with args
[2021-07-13 16:06:06,365: WARNING/ForkPoolWorker-7]
[2021-07-13 16:06:06,365: WARNING/ForkPoolWorker-7] {'media_id': 'b2tt0d0zaytk3nq7efpmad4ezw', 'media_url': 'http://localhost:8101/bela1.mp3'}
[2021-07-13 16:06:06,365: WARNING/ForkPoolWorker-7]
[2021-07-13 16:06:06,366: WARNING/ForkPoolWorker-7] opts
[2021-07-13 16:06:06,366: WARNING/ForkPoolWorker-7]
[2021-07-13 16:06:06,366: WARNING/ForkPoolWorker-7] {}
[2021-07-13 16:06:06,366: WARNING/ForkPoolWorker-7]
[2021-07-13 16:06:06,420: WARNING/MainProcess] dest__item
[2021-07-13 16:06:06,421: WARNING/MainProcess]
[2021-07-13 16:06:06,421: INFO/ForkPoolWorker-7] Task transcribe[2b640023-7225-46e0-b3a8-80058f6cbb3e] succeeded in 0.056548241002019495s: <AsyncResult: 1b2c6e22-ae0f-491e-9f62-e1b310ac2667>
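The difference between the two logs can be reproduced in isolation with a small Python sketch. This is not kombu's code: `make_properties` and `lookup_route` are hypothetical helpers that only mimic the `delivery['exchange'], delivery['routing_key']` lookup from the last traceback frame, and the fallback to an empty dict is an assumption based on the `'delivery_info': {}` shown in the first log. The property values are copied from the second log.

```python
import json


def make_properties(with_delivery_info: bool) -> dict:
    """Build message properties shaped like the ones in the worker logs."""
    props = {
        "body_encoding": "base64",
        "correlation_id": "2b640023-7225-46e0-b3a8-80058f6cbb3e",
        "delivery_tag": "9c27e275-4273-4da1-8e94-705ce401a84c",
        "reply_to": None,
    }
    if with_delivery_info:
        # The key this PR adds to the message sent from Rust.
        props["delivery_info"] = {"exchange": "", "routing_key": "celery"}
    return props


def lookup_route(properties: dict) -> tuple:
    """Hypothetical stand-in for the lookup on the last traceback line."""
    # Assumption: a missing delivery_info ends up as an empty dict,
    # as in the Message details printed in the first log.
    delivery = properties.get("delivery_info", {})
    return delivery["exchange"], delivery["routing_key"]


for patched in (False, True):
    # Round-trip through JSON, since the message travels through Redis as JSON.
    payload = json.loads(json.dumps(make_properties(patched)))
    try:
        print("patched" if patched else "unpatched", lookup_route(payload))
    except KeyError as err:
        print("patched" if patched else "unpatched", "KeyError:", err)
```

Without `delivery_info` the lookup raises `KeyError: 'exchange'`, matching the first log. With it, the lookup returns the empty exchange name and the `celery` routing key seen in the second log.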
Hey @Frando, thank you so much for figuring this out, and I apologize for the delay in getting back to you! I'm reviewing now.
I'm not able to update this branch for some reason. Can you do it when you get a chance? That should fix the CI errors.