Fix RecursionError because of repeated channel reconnections.
Traceback (most recent call last):
File "/usr/local/lib/python3.8/dist-packages/celery/app/trace.py", line 441, in trace_task
task.backend.store_result(
File "/usr/local/lib/python3.8/dist-packages/celery/backends/rpc.py", line 202, in store_result
producer.publish(
File "/usr/local/lib/python3.8/dist-packages/kombu/messaging.py", line 177, in publish
return _publish(
File "/usr/local/lib/python3.8/dist-packages/kombu/connection.py", line 524, in _ensured
return fun(*args, **kwargs)
File "/usr/local/lib/python3.8/dist-packages/kombu/messaging.py", line 199, in _publish
return channel.basic_publish(
File "/usr/local/lib/python3.8/dist-packages/amqp/channel.py", line 1775, in _basic_publish
self.connection.drain_events(timeout=0)
File "/usr/local/lib/python3.8/dist-packages/amqp/connection.py", line 523, in drain_events
while not self.blocking_read(timeout):
File "/usr/local/lib/python3.8/dist-packages/amqp/connection.py", line 529, in blocking_read
return self.on_inbound_frame(frame)
File "/usr/local/lib/python3.8/dist-packages/amqp/method_framing.py", line 53, in on_frame
callback(channel, method_sig, buf, None)
File "/usr/local/lib/python3.8/dist-packages/amqp/connection.py", line 535, in on_inbound_method
return self.channels[channel_id].dispatch_method(
File "/usr/local/lib/python3.8/dist-packages/amqp/abstract_channel.py", line 143, in dispatch_method
listener(*args)
File "/usr/local/lib/python3.8/dist-packages/amqp/channel.py", line 276, in _on_close
self._do_revive()
File "/usr/local/lib/python3.8/dist-packages/amqp/channel.py", line 161, in _do_revive
self.open()
File "/usr/local/lib/python3.8/dist-packages/amqp/channel.py", line 432, in open
return self.send_method(
File "/usr/local/lib/python3.8/dist-packages/amqp/abstract_channel.py", line 66, in send_method
return self.wait(wait, returns_tuple=returns_tuple)
File "/usr/local/lib/python3.8/dist-packages/amqp/abstract_channel.py", line 86, in wait
self.connection.drain_events(timeout=timeout)
File "/usr/local/lib/python3.8/dist-packages/amqp/connection.py", line 523, in drain_events
while not self.blocking_read(timeout):
File "/usr/local/lib/python3.8/dist-packages/amqp/connection.py", line 529, in blocking_read
return self.on_inbound_frame(frame)
File "/usr/local/lib/python3.8/dist-packages/amqp/method_framing.py", line 53, in on_frame
callback(channel, method_sig, buf, None)
File "/usr/local/lib/python3.8/dist-packages/amqp/connection.py", line 535, in on_inbound_method
return self.channels[channel_id].dispatch_method(
File "/usr/local/lib/python3.8/dist-packages/amqp/abstract_channel.py", line 143, in dispatch_method
listener(*args)
File "/usr/local/lib/python3.8/dist-packages/amqp/channel.py", line 276, in _on_close
self._do_revive()
File "/usr/local/lib/python3.8/dist-packages/amqp/channel.py", line 161, in _do_revive
self.open()
Finally, a RecursionError is raised:
File "/usr/local/lib/python3.8/dist-packages/amqp/connection.py", line 529, in blocking_read
return self.on_inbound_frame(frame)
File "/usr/local/lib/python3.8/dist-packages/amqp/method_framing.py", line 53, in on_frame
callback(channel, method_sig, buf, None)
File "/usr/local/lib/python3.8/dist-packages/amqp/connection.py", line 535, in on_inbound_method
return self.channels[channel_id].dispatch_method(
File "/usr/local/lib/python3.8/dist-packages/amqp/abstract_channel.py", line 131, in dispatch_method
one_shot = self._pending.pop(method_sig)
RecursionError: maximum recursion depth exceeded while calling a Python object
@pawl @michael-lazar can you guys please try this patch?
@auvipy Thanks. But I think this patch may cause other problems: when I use it in my project, the celery worker can't consume tasks. So I think my solution is wrong.
I don't know much about this project. I think this is a real problem because it keeps recurring in my production environment, but I don't know how to reproduce it or how to really solve it.
@liuyaqiu Is this a recent issue for you or has this been happening for a while?
This has been happening for a while, but it now recurs consistently in my production environment. I think the sequence is:
- The celery worker (client) wants to publish a message.
- The client finds that the channel is closing, but its connection is not closed. It has received a S:CLOSE frame.
- The client calls on_close() and sends a spec.Channel.CloseOk. Then, because the connection is not closed, it wants to revive the channel and calls _do_revive().
- During _do_revive(), it calls open(), sends spec.Channel.Open, and waits for spec.Channel.OpenOk.
- But the next frame is still a S:CLOSE frame, so it goes back to step 3 (the on_close() call).
I think RabbitMQ may be in a bad state, so the client receives too many S:CLOSE frames, which leads to on_close() being called too many times.
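The loop described above can be illustrated with a toy model. This is only a sketch under stated assumptions: `ToyConnection`, `ToyChannel`, and `simulate` are hypothetical names invented for this example and are not py-amqp classes; frames are represented as plain strings. It shows how waiting for the open reply from inside the close handler nests one level deeper for every extra close frame.

```python
from collections import deque


class ToyConnection:
    """Hypothetical stand-in for a connection; NOT py-amqp's Connection."""

    def __init__(self, frames):
        self.frames = deque(frames)
        self.channel = None

    def drain_events(self):
        # Deliver the next inbound frame to the channel, like a blocking read.
        frame = self.frames.popleft()
        return self.channel.dispatch(frame)


class ToyChannel:
    """Hypothetical channel that revives itself whenever it sees a close frame."""

    def __init__(self, connection):
        self.connection = connection
        connection.channel = self
        self.is_open = False
        self.revive_attempts = 0

    def dispatch(self, frame):
        if frame == "close":
            return self.on_close()
        if frame == "open-ok":
            self.is_open = True
            return True
        raise ValueError(f"unexpected frame: {frame!r}")

    def on_close(self):
        # Reply with close-ok (omitted), then revive: the connection is still up.
        self.is_open = False
        self.revive_attempts += 1
        return self.open()

    def open(self):
        # Send open (omitted) and wait for open-ok.
        # The wait re-enters drain_events, so each close frame nests deeper.
        return self.connection.drain_events()


def simulate(close_frames):
    """Feed `close_frames` close frames followed by one open-ok."""
    conn = ToyConnection(["close"] * close_frames + ["open-ok"])
    chan = ToyChannel(conn)
    try:
        conn.drain_events()
    except RecursionError:
        return "RecursionError", chan.revive_attempts
    return "recovered", chan.revive_attempts
```

With a handful of close frames the toy channel recovers (`simulate(3)` returns `("recovered", 3)`). With a very large number, such as `simulate(100000)`, the nested calls exhaust the interpreter's recursion limit before the open-ok frame is reached.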
Then the RecursionError is not caught by the celery framework. Celery thinks it is a runtime error in the task, so it reports the task as failed. In fact, the task didn't even start. (The failure happens when the task's status is published to the RabbitMQ backend.)
I think my solution is only a quick fix for this problem. When the client sees too many on_close() calls, it should stop reviving the channel and raise ChannelError, rather than repeating until it causes a RecursionError that can't be caught by the downstream application.
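The quick fix can be sketched roughly as follows. This is not the actual diff of this PR; it is a minimal illustration that assumes a counter of consecutive revive attempts. `GuardedChannel`, `MAX_REVIVES`, and the locally defined `ChannelError` are hypothetical stand-ins for this example only.

```python
from collections import deque


class ChannelError(Exception):
    """Local stand-in for the library's channel error, defined for this sketch."""


class GuardedChannel:
    """Hypothetical channel that gives up after too many consecutive closes."""

    MAX_REVIVES = 5  # assumed limit, chosen arbitrarily for the example

    def __init__(self, frames):
        self.frames = deque(frames)
        self.revives = 0
        self.is_open = False

    def drain_events(self):
        frame = self.frames.popleft()
        if frame == "close":
            return self.on_close()
        if frame == "open-ok":
            self.is_open = True
            self.revives = 0  # a successful open resets the counter
            return True
        raise ValueError(f"unexpected frame: {frame!r}")

    def on_close(self):
        self.is_open = False
        self.revives += 1
        if self.revives > self.MAX_REVIVES:
            # Stop reviving and surface an error the caller can catch.
            raise ChannelError(
                f"channel closed {self.revives} times in a row; giving up"
            )
        # Send open (omitted) and wait for the reply.
        return self.drain_events()
```

The design trade-off is the one raised in this thread: the caller gets a catchable `ChannelError` instead of a `RecursionError`, but the underlying cause of the repeated close frames is not addressed.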
A better idea may be:
While a channel is reviving, ignore all frames other than S:OPEN-OK. In addition, the channel should stop auto-reviving after too many open operations within a period, and exit to avoid an infinite loop.
Personally, I would prefer a final fix. This PR is honestly just a dirty fix that can lead to other hidden problems.
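One possible shape of that idea, as a sketch only: wait for the open reply in a loop instead of re-entering the close handler, and cap the number of opens in a sliding time window. `RevivingChannel`, `max_opens`, `period`, and `clock` are hypothetical names for this example, `ChannelError` is a local stand-in, and frames are plain strings rather than real AMQP frames.

```python
import time
from collections import deque


class ChannelError(Exception):
    """Local stand-in for the library's channel error, defined for this sketch."""


class RevivingChannel:
    """Hypothetical channel: iterative revive with a cap on opens per period."""

    def __init__(self, frames, max_opens=3, period=60.0, clock=time.monotonic):
        self.frames = deque(frames)
        self.max_opens = max_opens
        self.period = period
        self.clock = clock
        self.open_times = deque()  # timestamps of recent open attempts
        self.is_open = False

    def revive(self):
        """Send open, then skip frames until open-ok; return the number skipped."""
        now = self.clock()
        # Forget open attempts that fall outside the sliding window.
        while self.open_times and now - self.open_times[0] > self.period:
            self.open_times.popleft()
        if len(self.open_times) >= self.max_opens:
            raise ChannelError("too many channel opens within the period")
        self.open_times.append(now)

        ignored = 0
        # Loop instead of recursing: extra close frames are simply skipped.
        while self.frames:
            frame = self.frames.popleft()
            if frame == "open-ok":
                self.is_open = True
                return ignored
            ignored += 1
        # A real client would block for more frames; the toy queue is just empty.
        raise ChannelError("no open-ok received")
```

Because the wait is a loop, the stack depth stays constant no matter how many close frames arrive, and the window check bounds how often the channel is reopened.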
Thanks. I will try to solve it in a better way.
@pawl if you have time in coming days
@auvipy @liuyaqiu @matusvalo Hello guys, I had this issue too. Any updates on it?
I don't know the context of your problem. Previously, I called a subtask synchronously from a parent task and used the rpc result backend to store task state and results; the parent task then fetched the subtask's state and result. My error occurred when fetching the subtask's state and result from the rpc result backend. Now I use the mongodb result backend instead of the rpc one, with RabbitMQ only as the broker, and the error no longer occurs.
Also, you should not use the rpc result backend in a production environment, because it creates a unique queue for every task to store its state and result. This leads to too many result queues in RabbitMQ, which wastes RabbitMQ's resources and hurts its performance.
@liuyaqiu What you're describing is the old AMQP backend. The RPC backend uses RabbitMQ's Pub/Sub capabilities.
What I am describing is still the case in v5.2.1. Has this changed in master?
No, you are right, that hasn't changed.