When py-amqp publishes to a nonexistent exchange, closing the connection crashes with AttributeError: 'NoneType' object has no attribute 'drain_events'
Steps to reproduce:
- Ensure that the `test_exc` exchange does not exist.
- Execute the code below.
Example code:
# Reproduction script: publish to an exchange that does not exist, then
# close the connection.  Assumes a broker is reachable on localhost and
# that the 'test_exc' exchange has NOT been declared.
import amqp
conn = amqp.Connection(host='localhost')
conn.connect()
ch = conn.channel(channel_id=1)
# The publish call itself returns without error; the broker's channel
# Close for the missing exchange is only handled later, inside
# conn.close(), while it waits for the connection's Close-Ok.
ch.basic_publish(msg=amqp.Message(body=b'Hello world'), exchange='test_exc', routing_key='rkey')
conn.close()
Executing the script ends with the following stack trace:
File "test.py", line 7, in <module>
conn.close()
File "/home/matus/dev/py-amqp/amqp/connection.py", line 575, in close
wait=spec.Connection.CloseOk,
File "/home/matus/dev/py-amqp/amqp/abstract_channel.py", line 59, in send_method
return self.wait(wait, returns_tuple=returns_tuple)
File "/home/matus/dev/py-amqp/amqp/abstract_channel.py", line 79, in wait
self.connection.drain_events(timeout=timeout)
File "/home/matus/dev/py-amqp/amqp/connection.py", line 500, in drain_events
while not self.blocking_read(timeout):
File "/home/matus/dev/py-amqp/amqp/connection.py", line 506, in blocking_read
return self.on_inbound_frame(frame)
File "/home/matus/dev/py-amqp/amqp/method_framing.py", line 55, in on_frame
callback(channel, method_sig, buf, None)
File "/home/matus/dev/py-amqp/amqp/connection.py", line 510, in on_inbound_method
method_sig, payload, content,
File "/home/matus/dev/py-amqp/amqp/abstract_channel.py", line 124, in dispatch_method
listener(*args)
File "/home/matus/dev/py-amqp/amqp/channel.py", line 277, in _on_close
self._do_revive()
File "/home/matus/dev/py-amqp/amqp/channel.py", line 164, in _do_revive
self.open()
File "/home/matus/dev/py-amqp/amqp/channel.py", line 434, in open
spec.Channel.Open, 's', ('',), wait=spec.Channel.OpenOk,
File "/home/matus/dev/py-amqp/amqp/abstract_channel.py", line 59, in send_method
return self.wait(wait, returns_tuple=returns_tuple)
File "/home/matus/dev/py-amqp/amqp/abstract_channel.py", line 79, in wait
self.connection.drain_events(timeout=timeout)
AttributeError: 'NoneType' object has no attribute 'drain_events'
I was able to replicate this issue on RabbitMQ 3.7.8 with the master branch of py-amqp.
I found that the connection was set to None on the following line: https://github.com/celery/py-amqp/blob/0e793de205e447d57214b32ea121ff2078c2819d/amqp/connection.py#L464
After inserting a breakpoint, I got the following traceback:
(Pdb) w
/home/matus/dev/py-amqp/test.py(7)<module>()
-> conn.close()
/home/matus/dev/py-amqp/amqp/connection.py(577)close() # Client sends Close method, waits for CloseOk method
-> wait=spec.Connection.CloseOk,
/home/matus/dev/py-amqp/amqp/abstract_channel.py(59)send_method()
-> return self.wait(wait, returns_tuple=returns_tuple)
/home/matus/dev/py-amqp/amqp/abstract_channel.py(79)wait()
-> self.connection.drain_events(timeout=timeout)
/home/matus/dev/py-amqp/amqp/connection.py(502)drain_events()
-> while not self.blocking_read(timeout):
/home/matus/dev/py-amqp/amqp/connection.py(508)blocking_read()
-> return self.on_inbound_frame(frame)
/home/matus/dev/py-amqp/amqp/method_framing.py(55)on_frame()
-> callback(channel, method_sig, buf, None)
/home/matus/dev/py-amqp/amqp/connection.py(512)on_inbound_method()
-> method_sig, payload, content,
/home/matus/dev/py-amqp/amqp/abstract_channel.py(124)dispatch_method()
-> listener(*args)
/home/matus/dev/py-amqp/amqp/channel.py(279)_on_close() # Client receives Close Method
-> self._do_revive()
/home/matus/dev/py-amqp/amqp/channel.py(165)_do_revive()
-> self.open()
/home/matus/dev/py-amqp/amqp/channel.py(436)open()
-> spec.Channel.Open, 's', ('',), wait=spec.Channel.OpenOk, # Client sends Open method, waits for OpenOk method
/home/matus/dev/py-amqp/amqp/abstract_channel.py(59)send_method()
-> return self.wait(wait, returns_tuple=returns_tuple)
/home/matus/dev/py-amqp/amqp/abstract_channel.py(79)wait()
-> self.connection.drain_events(timeout=timeout)
/home/matus/dev/py-amqp/amqp/connection.py(502)drain_events()
-> while not self.blocking_read(timeout):
/home/matus/dev/py-amqp/amqp/connection.py(508)blocking_read()
-> return self.on_inbound_frame(frame)
/home/matus/dev/py-amqp/amqp/method_framing.py(55)on_frame()
-> callback(channel, method_sig, buf, None)
/home/matus/dev/py-amqp/amqp/connection.py(512)on_inbound_method()
-> method_sig, payload, content,
/home/matus/dev/py-amqp/amqp/abstract_channel.py(124)dispatch_method()
-> listener(*args)
/home/matus/dev/py-amqp/amqp/connection.py(666)_on_close_ok() # Client receives CloseOK method
-> self.collect()
> /home/matus/dev/py-amqp/amqp/connection.py(454)collect() # Client is setting self.channels to None
-> try:
(Pdb) c
Traceback (most recent call last):
File "test.py", line 7, in <module>
conn.close()
File "/home/matus/dev/py-amqp/amqp/connection.py", line 577, in close
wait=spec.Connection.CloseOk,
File "/home/matus/dev/py-amqp/amqp/abstract_channel.py", line 59, in send_method
return self.wait(wait, returns_tuple=returns_tuple)
File "/home/matus/dev/py-amqp/amqp/abstract_channel.py", line 79, in wait
self.connection.drain_events(timeout=timeout)
File "/home/matus/dev/py-amqp/amqp/connection.py", line 502, in drain_events
while not self.blocking_read(timeout):
File "/home/matus/dev/py-amqp/amqp/connection.py", line 508, in blocking_read
return self.on_inbound_frame(frame)
File "/home/matus/dev/py-amqp/amqp/method_framing.py", line 55, in on_frame
callback(channel, method_sig, buf, None)
File "/home/matus/dev/py-amqp/amqp/connection.py", line 512, in on_inbound_method
method_sig, payload, content,
File "/home/matus/dev/py-amqp/amqp/abstract_channel.py", line 124, in dispatch_method
listener(*args)
File "/home/matus/dev/py-amqp/amqp/channel.py", line 279, in _on_close
self._do_revive()
File "/home/matus/dev/py-amqp/amqp/channel.py", line 165, in _do_revive
self.open()
File "/home/matus/dev/py-amqp/amqp/channel.py", line 436, in open
spec.Channel.Open, 's', ('',), wait=spec.Channel.OpenOk,
File "/home/matus/dev/py-amqp/amqp/abstract_channel.py", line 59, in send_method
return self.wait(wait, returns_tuple=returns_tuple)
File "/home/matus/dev/py-amqp/amqp/abstract_channel.py", line 79, in wait
self.connection.drain_events(timeout=timeout)
File "/home/matus/dev/py-amqp/amqp/connection.py", line 502, in drain_events
while not self.blocking_read(timeout):
File "/home/matus/dev/py-amqp/amqp/connection.py", line 508, in blocking_read # Client still waits for OpenOK. Receives some method but crashes, since it cleared channels
return self.on_inbound_frame(frame)
File "/home/matus/dev/py-amqp/amqp/method_framing.py", line 55, in on_frame
callback(channel, method_sig, buf, None)
File "/home/matus/dev/py-amqp/amqp/connection.py", line 511, in on_inbound_method
return self.channels[channel_id].dispatch_method(
TypeError: 'NoneType' object is not subscriptable
From the tracebacks it can be seen that:
- The client sends the `Close` method to the server.
- The client waits for `CloseOk`. It starts a `drain_events` loop.
- The client receives a `Close` method instead of `CloseOk`.
- The client sends an `Open` method (part of the `_do_revive()` method) and waits for `OpenOk`. It starts another (!) `drain_events` loop.
- The client receives the `CloseOk` method. It clears the connection (sets `self.channels = None`).
- The client is still waiting in the second `drain_events` loop for the `OpenOk` method. It receives some method but crashes, since `Connection.channels` is `None`.
In general, the problem is that the client executes the `Channel._do_revive()` method even when the connection is closing:
https://github.com/celery/py-amqp/blob/0e793de205e447d57214b32ea121ff2078c2819d/amqp/channel.py#L276-L280
A possible solution would be roughly as follows:
- Mark the `Connection` as closing when the `Connection.close()` method is called:
def close(self, reply_code=0, reply_text='', method_sig=(0, 0),
          argsig='BsBB'):
    """Request a connection close and wait for the broker's Close-Ok.

    While the Close / Close-Ok exchange is in flight, ``self.closing`` is
    True, so channel-level ``Close`` methods handled during that wait can
    avoid reviving (re-opening) their channel on a connection that is
    going away.

    :param reply_code: reply code sent in ``Connection.Close``.
    :param reply_text: reply text sent in ``Connection.Close``.
    :param method_sig: ``(class_id, method_id)`` pair sent in
        ``Connection.Close``; ``(0, 0)`` by default.
    :param argsig: argument signature passed to ``send_method`` for
        serializing ``Connection.Close``.
    :returns: whatever ``send_method`` returns, or None when the transport
        is already gone.
    :raises OSError, IOError, SSLError: re-raised after ``self.collect()``
        if one of them occurs while sending Close or waiting for Close-Ok.
    """
    if self._transport is None:
        # already closed
        return
    # Set the flag only once Close is really going to be sent; setting it
    # before the early return above would leave an already-closed
    # connection flagged as "closing" forever.
    self.closing = True
    try:
        return self.send_method(
            spec.Connection.Close, argsig,
            (reply_code, reply_text, method_sig[0], method_sig[1]),
            wait=spec.Connection.CloseOk,
        )
    except (OSError, IOError, SSLError):
        # close connection
        self.collect()
        raise
    finally:
        # Clear the flag on success and on error alike, so it only ever
        # describes a close that is currently in progress.
        self.closing = False
- Check whether the connection is closing before calling `Channel._do_revive()`:
def _on_close(self, reply_code, reply_text, class_id, method_id):
    """Handle a ``Channel.Close`` method sent by the broker.

    Replies with ``Channel.CloseOk``, revives the channel unless the
    owning connection is in the middle of closing, and then raises the
    error built by ``error_for_code`` from the broker's reply.

    :param reply_code: reply code from the broker's ``Channel.Close``.
    :param reply_text: reply text from the broker's ``Channel.Close``.
    :param class_id: class id of the method that caused the close.
    :param method_id: method id of the method that caused the close.
    :raises: the exception returned by ``error_for_code`` (``ChannelError``
        is passed as its fallback class).
    """
    self.send_method(spec.Channel.CloseOk)
    # Reviving sends Channel.Open and blocks waiting for Open-Ok.  If
    # Connection.close() is in progress, the connection's Close-Ok can be
    # processed during that nested wait and tear the connection down
    # underneath us, so skip the revive in that case.  getattr() with a
    # default keeps this working for a connection whose close() has never
    # been called and which therefore may have no ``closing`` attribute.
    if not getattr(self.connection, 'closing', False):
        self._do_revive()
    raise error_for_code(
        reply_code, reply_text, (class_id, method_id), ChannelError,
    )
After this fix, the correct exception is raised:
# python test.py
Traceback (most recent call last):
File "test.py", line 7, in <module>
conn.close()
File "/home/matus/dev/py-amqp/amqp/connection.py", line 578, in close
wait=spec.Connection.CloseOk,
File "/home/matus/dev/py-amqp/amqp/abstract_channel.py", line 59, in send_method
return self.wait(wait, returns_tuple=returns_tuple)
File "/home/matus/dev/py-amqp/amqp/abstract_channel.py", line 79, in wait
self.connection.drain_events(timeout=timeout)
File "/home/matus/dev/py-amqp/amqp/connection.py", line 502, in drain_events
while not self.blocking_read(timeout):
File "/home/matus/dev/py-amqp/amqp/connection.py", line 508, in blocking_read
return self.on_inbound_frame(frame)
File "/home/matus/dev/py-amqp/amqp/method_framing.py", line 55, in on_frame
callback(channel, method_sig, buf, None)
File "/home/matus/dev/py-amqp/amqp/connection.py", line 512, in on_inbound_method
method_sig, payload, content,
File "/home/matus/dev/py-amqp/amqp/abstract_channel.py", line 124, in dispatch_method
listener(*args)
File "/home/matus/dev/py-amqp/amqp/channel.py", line 282, in _on_close
reply_code, reply_text, (class_id, method_id), ChannelError,
amqp.exceptions.NotFound: Basic.publish: (404) NOT_FOUND - no exchange 'test_exc' in vhost '/'
Moreover, does it make sense to revive the channel just before raising an exception? https://github.com/celery/py-amqp/blob/0e793de205e447d57214b32ea121ff2078c2819d/amqp/channel.py#L276-L280
Another improvement could be to ensure that once the connection is closed (i.e., after the `Connection.collect()` method is called), the `drain_events` loop ends immediately:
https://github.com/celery/py-amqp/blob/0e793de205e447d57214b32ea121ff2078c2819d/amqp/connection.py#L498-L501
@auvipy you can close this issue now...
thanks!!
Running 2.4.0, we are still getting the same exception:
'NoneType' object has no attribute 'drain_events'
From amqp/abstract_channel.py in wait at line 80
while not p.ready:
self.connection.drain_events(timeout=timeout)
Is anyone else still getting it?
Any thoughts on what is causing it?
@AvnerCohen I was not able to reproduce the situation that raises your exception. Could you provide a simple failing example?
:( I'm afraid not. We get it randomly in production, and random workers crash.
@matusvalo What's interesting is that your suggested fix basically exposes the "correct" exception:
amqp.exceptions.NotFound: Basic.publish: (404) NOT_FOUND - no exchange 'test_exc' in vhost '/'
In our case, we seem to be getting both of these errors:
Basic.consume: (404) NOT_FOUND - no queue
Unrecoverable error: NotFound(404, u"NOT_FOUND - no queue ' [email protected]' in vhost '/'", (60, 20), u'Basic.consume')
From amqp/channel.py in _on_close at line 282:
raise error_for_code(
reply_code, reply_text, (class_id, method_id), ChannelError,
)
The two errors happen in parallel, and random workers fail on them.
Maybe this is no longer an amqp issue but a core Celery issue?
@AvnerCohen I was afraid that it is a random issue. I am not sure about a problem on the kombu/celery side, because the exception is coming from py-amqp. Even if there is an issue on the celery/kombu side, in the worst case py-amqp must be able to handle incorrect usage correctly. Could you post the full tracebacks for the NotFound errors and for the AttributeError here? And how often do you get these errors?
At least I know about one problem on the py-amqp side: it does not fully follow the AMQP standard, because it keeps processing incoming messages in drain_events() even when a close() method is received from the broker... This should be fixed, but we need to be careful not to break something else :-)
@matusvalo thanks so much for the insights and time.
We are getting this during deploys of new code changes or any maintenance that involves a large-scale stop/start of workers (btw, this item could be relevant: https://github.com/celery/celery/issues/4618 — but it looks like there is no extra info there). So it very much depends on the number of deploys we make.
Here are the two raw stack traces:
AttributeError: 'NoneType' object has no attribute 'drain_events'
File "celery/worker/worker.py", line 205, in start
self.blueprint.start(self)
File "celery/bootsteps.py", line 119, in start
step.start(parent)
File "celery/bootsteps.py", line 369, in start
return self.obj.start()
File "celery/worker/consumer/consumer.py", line 318, in start
blueprint.start(self)
File "celery/bootsteps.py", line 119, in start
step.start(parent)
File "celery/worker/consumer/consumer.py", line 596, in start
c.loop(*c.loop_args())
File "celery/worker/loops.py", line 91, in asynloop
next(loop)
File "kombu/asynchronous/hub.py", line 354, in create_loop
cb(*cbargs)
File "kombu/transport/base.py", line 236, in on_readable
reader(loop)
File "kombu/transport/base.py", line 218, in _read
drain_events(timeout=0)
File "amqp/connection.py", line 500, in drain_events
while not self.blocking_read(timeout):
File "amqp/connection.py", line 506, in blocking_read
return self.on_inbound_frame(frame)
File "amqp/method_framing.py", line 79, in on_frame
callback(channel, msg.frame_method, msg.frame_args, msg)
File "amqp/connection.py", line 510, in on_inbound_method
method_sig, payload, content,
File "amqp/abstract_channel.py", line 126, in dispatch_method
listener(*args)
File "amqp/channel.py", line 1616, in _on_basic_deliver
fun(msg)
File "kombu/messaging.py", line 624, in _receive_callback
return on_m(message) if on_m else self.receive(decoded, message)
File "kombu/messaging.py", line 590, in receive
[callback(body, message) for callback in callbacks]
File "celery/worker/pidbox.py", line 51, in on_message
self.reset()
File "celery/worker/pidbox.py", line 66, in reset
self.stop(self.c)
File "celery/worker/pidbox.py", line 63, in stop
self.consumer = self._close_channel(c)
File "celery/worker/pidbox.py", line 71, in _close_channel
ignore_errors(c, self.node.channel.close)
File "kombu/common.py", line 298, in ignore_errors
return fun(*args, **kwargs)
File "amqp/channel.py", line 226, in close
wait=spec.Channel.CloseOk,
File "amqp/abstract_channel.py", line 60, in send_method
return self.wait(wait, returns_tuple=returns_tuple)
File "amqp/abstract_channel.py", line 80, in wait
self.connection.drain_events(timeout=timeout)
File "amqp/connection.py", line 500, in drain_events
while not self.blocking_read(timeout):
File "amqp/connection.py", line 506, in blocking_read
return self.on_inbound_frame(frame)
File "amqp/method_framing.py", line 79, in on_frame
callback(channel, msg.frame_method, msg.frame_args, msg)
File "amqp/connection.py", line 510, in on_inbound_method
method_sig, payload, content,
File "amqp/abstract_channel.py", line 126, in dispatch_method
listener(*args)
File "amqp/channel.py", line 1616, in _on_basic_deliver
fun(msg)
File "kombu/messaging.py", line 624, in _receive_callback
return on_m(message) if on_m else self.receive(decoded, message)
File "kombu/messaging.py", line 590, in receive
[callback(body, message) for callback in callbacks]
File "celery/worker/pidbox.py", line 51, in on_message
self.reset()
File "celery/worker/pidbox.py", line 67, in reset
self.start(self.c)
File "celery/worker/pidbox.py", line 54, in start
self.node.channel = c.connection.channel()
File "kombu/connection.py", line 266, in channel
chan = self.transport.create_channel(self.connection)
File "kombu/transport/pyamqp.py", line 100, in create_channel
return connection.channel()
File "amqp/connection.py", line 491, in channel
channel.open()
File "amqp/channel.py", line 437, in open
spec.Channel.Open, 's', ('',), wait=spec.Channel.OpenOk,
File "amqp/abstract_channel.py", line 60, in send_method
return self.wait(wait, returns_tuple=returns_tuple)
File "amqp/abstract_channel.py", line 80, in wait
self.connection.drain_events(timeout=timeout)
And:
NotFound: Basic.consume: (404) NOT_FOUND - no queue ' [email protected]' in vhost '/'
File "celery/worker/worker.py", line 205, in start
self.blueprint.start(self)
File "celery/bootsteps.py", line 119, in start
step.start(parent)
File "celery/bootsteps.py", line 370, in start
return self.obj.start()
File "celery/worker/consumer/consumer.py", line 316, in start
blueprint.start(self)
File "celery/bootsteps.py", line 119, in start
step.start(parent)
File "celery/worker/pidbox.py", line 55, in start
self.consumer = self.node.listen(callback=self.on_message)
File "kombu/pidbox.py", line 91, in listen
consumer.consume()
File "kombu/messaging.py", line 477, in consume
self._basic_consume(T, no_ack=no_ack, nowait=False)
File "kombu/messaging.py", line 598, in _basic_consume
no_ack=no_ack, nowait=nowait)
File "kombu/entity.py", line 737, in consume
arguments=self.consumer_arguments)
File "amqp/channel.py", line 1572, in basic_consume
returns_tuple=True
File "amqp/abstract_channel.py", line 60, in send_method
return self.wait(wait, returns_tuple=returns_tuple)
File "amqp/abstract_channel.py", line 80, in wait
self.connection.drain_events(timeout=timeout)
File "amqp/connection.py", line 500, in drain_events
while not self.blocking_read(timeout):
File "amqp/connection.py", line 506, in blocking_read
return self.on_inbound_frame(frame)
File "amqp/method_framing.py", line 55, in on_frame
callback(channel, method_sig, buf, None)
File "amqp/connection.py", line 510, in on_inbound_method
method_sig, payload, content,
File "amqp/abstract_channel.py", line 126, in dispatch_method
listener(*args)
File "amqp/channel.py", line 282, in _on_close
reply_code, reply_text, (class_id, method_id), ChannelError,
@matusvalo Anything we can help or provide information additional info on that?
Thank you @AvnerCohen. I need to find some spare time to have a look at that... For now, no.
Anything new on this issue?
@thedrow For now, no. I am not able to reproduce the issue. There are multiple open issues that point to a closed connection/channel being used - e.g. another issue: https://github.com/celery/kombu/issues/1027
There are only two places where connection or channel is set to None:
https://github.com/celery/py-amqp/blob/756d60d225f92c47f1aae3f38f8e9190c619dbcd/amqp/channel.py#L145-L156
https://github.com/celery/py-amqp/blob/756d60d225f92c47f1aae3f38f8e9190c619dbcd/amqp/connection.py#L452-L464
`Connection.collect()` is called when:
- `Connection.connect()` fails — this is not the case in these issues.
- `Close-OK` is received from the broker. But this should be a reply to a `Close` message sent by the client.
- `Close` is received from the broker, but this case raises an exception, so it should not yield the tracebacks we have.
`Channel.collect()` is called only when `Close-OK` is received. This also happens only after the client sends `Close`.
One possible cause could be multiple threads sharing one single connection...
One possibility came to my mind: From stacktrace:
File "celery/worker/pidbox.py", line 63, in stop
self.consumer = self._close_channel(c)
File "celery/worker/pidbox.py", line 71, in _close_channel
ignore_errors(c, self.node.channel.close)
File "kombu/common.py", line 298, in ignore_errors
return fun(*args, **kwargs)
File "amqp/channel.py", line 226, in close
wait=spec.Channel.CloseOk,
The other possibility is that when the client sends the Close method, it waits for Close-OK, but in the meantime another message arrives whose handling starts another wait, which yields another message with another wait, etc... In general, the AMQP spec requires that after the close method is sent, the client must process only Close and Close-OK methods [1]:
After sending this method, any received methods except Close and Close-OK MUST be discarded. The response to receiving a Close after sending Close must be to send Close-Ok.
Unfortunately, the py-amqp library does not conform to the spec in this case.
[1] https://www.rabbitmq.com/amqp-0-9-1-reference.html
Hmm so how do we fix that?
@thedrow see #280.
@AvnerCohen could you please check master branch if your problem still occurs?
@matusvalo On our end, we have seen this behavior when starting some 80 celery workers (with anywhere between 2 to 8 concurrency) in parallel. To work around the issue, we broke this down and are no longer doing that (at max we start 20 workers at the same time) and this seems to have solved the issue for us.
Hi everyone, I found this issue after looking for a solution for this problem.
Env: Celery 5.1.2 Rabbitmq: 3.9.4
Starting with RabbitMQ 3.8.15, they introduced this new feature: https://www.rabbitmq.com/consumers.html#acknowledgement-timeout
When the timeout is triggered, the 'NoneType' object has no attribute 'drain_events' error appears again, causing the celery worker to stop working
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/celery/worker/worker.py", line 203, in start
self.blueprint.start(self)
File "/usr/local/lib/python3.8/site-packages/celery/bootsteps.py", line 116, in start
step.start(parent)
File "/usr/local/lib/python3.8/site-packages/celery/bootsteps.py", line 365, in start
return self.obj.start()
File "/usr/local/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 326, in start
blueprint.start(self)
File "/usr/local/lib/python3.8/site-packages/celery/bootsteps.py", line 116, in start
step.start(parent)
File "/usr/local/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 618, in start
c.loop(*c.loop_args())
File "/usr/local/lib/python3.8/site-packages/celery/worker/loops.py", line 81, in asynloop
next(loop)
File "/usr/local/lib/python3.8/site-packages/kombu/asynchronous/hub.py", line 361, in create_loop
cb(*cbargs)
File "/usr/local/lib/python3.8/site-packages/kombu/transport/base.py", line 235, in on_readable
reader(loop)
File "/usr/local/lib/python3.8/site-packages/kombu/transport/base.py", line 217, in _read
drain_events(timeout=0)
File "/usr/local/lib/python3.8/site-packages/amqp/connection.py", line 523, in drain_events
while not self.blocking_read(timeout):
File "/usr/local/lib/python3.8/site-packages/amqp/connection.py", line 529, in blocking_read
return self.on_inbound_frame(frame)
File "/usr/local/lib/python3.8/site-packages/amqp/method_framing.py", line 53, in on_frame
callback(channel, method_sig, buf, None)
File "/usr/local/lib/python3.8/site-packages/amqp/connection.py", line 535, in on_inbound_method
return self.channels[channel_id].dispatch_method(
File "/usr/local/lib/python3.8/site-packages/amqp/abstract_channel.py", line 143, in dispatch_method
listener(*args)
File "/usr/local/lib/python3.8/site-packages/amqp/channel.py", line 277, in _on_close
raise error_for_code(
amqp.exceptions.PreconditionFailed: (0, 0): (406) PRECONDITION_FAILED - delivery acknowledgement on channel 1 timed out. Timeout value used: 1800000 ms. This timeout value can be configured, see consumers doc guide to learn more
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/bin/celery", line 8, in <module>
sys.exit(main())
File "/usr/local/lib/python3.8/site-packages/celery/__main__.py", line 15, in main
sys.exit(_main())
File "/usr/local/lib/python3.8/site-packages/celery/bin/celery.py", line 213, in main
return celery(auto_envvar_prefix="CELERY")
File "/usr/local/lib/python3.8/site-packages/click/core.py", line 829, in __call__
return self.main(*args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/click/core.py", line 782, in main
rv = self.invoke(ctx)
File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1066, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/usr/local/lib/python3.8/site-packages/click/core.py", line 610, in invoke
return callback(*args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/click/decorators.py", line 21, in new_func
return f(get_current_context(), *args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/celery/bin/base.py", line 133, in caller
return f(ctx, *args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/celery/bin/worker.py", line 346, in worker
worker.start()
File "/usr/local/lib/python3.8/site-packages/celery/worker/worker.py", line 208, in start
self.stop(exitcode=EX_FAILURE)
File "/usr/local/lib/python3.8/site-packages/celery/worker/worker.py", line 251, in stop
self._shutdown(warm=True)
File "/usr/local/lib/python3.8/site-packages/celery/worker/worker.py", line 266, in _shutdown
self.blueprint.stop(self, terminate=not warm)
File "/usr/local/lib/python3.8/site-packages/celery/bootsteps.py", line 174, in stop
self.on_stopped()
File "/usr/local/lib/python3.8/site-packages/celery/worker/worker.py", line 162, in on_stopped
self.consumer.shutdown()
File "/usr/local/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 370, in shutdown
self.blueprint.shutdown(self)
File "/usr/local/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 168, in shutdown
self.send_all(parent, 'shutdown')
File "/usr/local/lib/python3.8/site-packages/celery/bootsteps.py", line 148, in send_all
fun(parent, *args)
File "/usr/local/lib/python3.8/site-packages/celery/worker/consumer/connection.py", line 29, in shutdown
ignore_errors(connection, connection.close)
File "/usr/local/lib/python3.8/site-packages/kombu/common.py", line 325, in ignore_errors
return fun(*args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/kombu/connection.py", line 375, in release
self._close()
File "/usr/local/lib/python3.8/site-packages/kombu/connection.py", line 341, in _close
self._do_close_self()
File "/usr/local/lib/python3.8/site-packages/kombu/connection.py", line 331, in _do_close_self
self.maybe_close_channel(self._default_channel)
File "/usr/local/lib/python3.8/site-packages/kombu/connection.py", line 323, in maybe_close_channel
channel.close()
File "/usr/local/lib/python3.8/site-packages/amqp/channel.py", line 219, in close
return self.send_method(
File "/usr/local/lib/python3.8/site-packages/amqp/abstract_channel.py", line 66, in send_method
return self.wait(wait, returns_tuple=returns_tuple)
File "/usr/local/lib/python3.8/site-packages/amqp/abstract_channel.py", line 86, in wait
self.connection.drain_events(timeout=timeout)
File "/usr/local/lib/python3.8/site-packages/amqp/connection.py", line 523, in drain_events
while not self.blocking_read(timeout):
File "/usr/local/lib/python3.8/site-packages/amqp/connection.py", line 529, in blocking_read
return self.on_inbound_frame(frame)
File "/usr/local/lib/python3.8/site-packages/amqp/method_framing.py", line 53, in on_frame
callback(channel, method_sig, buf, None)
File "/usr/local/lib/python3.8/site-packages/amqp/connection.py", line 535, in on_inbound_method
return self.channels[channel_id].dispatch_method(
File "/usr/local/lib/python3.8/site-packages/amqp/abstract_channel.py", line 143, in dispatch_method
listener(*args)
File "/usr/local/lib/python3.8/site-packages/amqp/channel.py", line 276, in _on_close
self._do_revive()
File "/usr/local/lib/python3.8/site-packages/amqp/channel.py", line 161, in _do_revive
self.open()
File "/usr/local/lib/python3.8/site-packages/amqp/channel.py", line 432, in open
return self.send_method(
File "/usr/local/lib/python3.8/site-packages/amqp/abstract_channel.py", line 66, in send_method
return self.wait(wait, returns_tuple=returns_tuple)
File "/usr/local/lib/python3.8/site-packages/amqp/abstract_channel.py", line 86, in wait
self.connection.drain_events(timeout=timeout)
AttributeError: 'NoneType' object has no attribute 'drain_events'
Do you know how to prevent this? Celery should not break for timeout errors
For me, missing permissions on the exchange (or virtual host) end up with the behavior above.
@matusvalo I'm able to reproduce the NotFound: Basic.consume: (404) NOT_FOUND - no queue issue with https://github.com/povilasb/celery-issues/ . Hope that helps :)
I've also created a py-amqp only example to reproduce the issue: https://github.com/povilasb/celery-issues/#py-amqp-example
What happens, I believe, is that RabbitMQ responds with a "Channel Close" message when we try to publish to a non-existent queue. Then [`Channel._on_close()` is called](https://github.com/celery/py-amqp/blob/98f6d364188215c2973693a79e461c7e9b54daef/amqp/channel.py#L142).
Upon reading the docs I tend to think that this may be expected from the AMQP library:
Certain scenarios are assumed to be recoverable ("soft") errors in the protocol. They render the channel closed but applications can open another one and try to recover or retry a number of times. Most common examples are:
* Consuming from a queue that does not exist will fail with a 404 NOT_FOUND error
* Publishing to an exchange that does not exist will fail with a 404 NOT_FOUND error
Anyway, I'm gonna stay away from this issue now since I guess this is more of a Kombu/Celery issue for not reopening a new channel. My issue may not be related to the original one as well.