`Channel._confirm_selected` should be reset to `False` when the channel is reopened
If the channel is reopened (for example after a channel error such as "exchange not found"), `_confirm_selected` should be set back to `False`. Otherwise the client keeps waiting for an ACK that the server will never send.
I encountered this issue with amqp==5.0.9, celery==5.2.3, kombu==5.2.4.
It can be reproduced with the following `run.py` script and `celeryconfig.py` file. The script (successfully) publishes a message, then (unsuccessfully) tries to purge a non-existent queue, then publishes a second message. Publishing the second message hangs forever.
```python
# celeryconfig.py
BROKER_URL = 'amqp://guest:guest@localhost:5672'
BROKER_TRANSPORT_OPTIONS = {'confirm_publish': True}
RESULT_BACKEND = None
RESULT_PERSISTENT = False
```
```python
# run.py
from amqp import NotFound
from celery import Celery, current_app

app = Celery('tasks')
app.config_from_object('celeryconfig')


@app.task
def dummy_task():
    pass


def purge_queue(queue_name):
    with current_app.connection_or_acquire() as connection:
        channel = connection.default_channel
        # Purging a missing queue raises NotFound and closes the channel.
        channel.queue_purge(queue_name)


def main():
    dummy_task.apply_async()  # first publish succeeds
    try:
        purge_queue('non-existent-queue')
    except NotFound:
        pass
    print('Before 2nd apply_async', flush=True)
    dummy_task.apply_async()  # hangs forever waiting for a confirm
    print('After 2nd apply_async', flush=True)


if __name__ == '__main__':
    main()
```
It seems like the problem is:

- `amqp.channel.Channel.basic_publish_confirm` calls `self.confirm_select()` when the first message is published, enabling publisher confirms on the channel.
- Purging a non-existent queue causes the channel to be closed and reopened (`amqp.channel.Channel._on_close` is called at some point). The channel is reopened without publisher confirms enabled.
- `apply_async`/`basic_publish_confirm` still use the same `amqp.channel.Channel` object, which has `self._confirm_selected = True`, even though the channel itself no longer has publisher confirms enabled.
- `basic_publish_confirm` waits forever for a publish confirmation that will never arrive.
This can be worked around by passing a `timeout` argument to the hanging `apply_async` call. That stops the call from hanging forever, but the message ends up being published twice, so it doesn't feel like the real fix. I think the real fix might be to reset `self._confirm_selected` back to `False` after a channel has been reopened. I'm not very familiar with this codebase, though, so I might be missing something.
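To make the suspected mismatch concrete, here is a small pure-Python model of it. `ModelChannel`, `_broker_confirms_on`, `confirm_select_calls` and the `reset_confirm_flag` parameter are hypothetical and do not mirror py-amqp's real internals; only the `_confirm_selected` name comes from the issue. The broker side is simulated by a flag, and the infinite wait is replaced by a `RuntimeError` so the sketch terminates.

```python
class ModelChannel:
    """Hypothetical model of the client/broker confirm-mode state; not amqp.channel.Channel."""

    def __init__(self):
        self._confirm_selected = False    # client-side cache (name taken from the issue)
        self._broker_confirms_on = False  # simulated broker-side confirm mode
        self.confirm_select_calls = 0

    def confirm_select(self):
        # Enabling confirms updates both the broker and the client cache.
        self.confirm_select_calls += 1
        self._broker_confirms_on = True
        self._confirm_selected = True

    def reopen(self, reset_confirm_flag=True):
        # A reopened channel starts without publisher confirms on the broker.
        self._broker_confirms_on = False
        # Proposed fix: forget the cached flag so confirm mode is re-enabled.
        if reset_confirm_flag:
            self._confirm_selected = False

    def basic_publish_confirm(self):
        # Only call confirm_select() if the client thinks it hasn't yet.
        if not self._confirm_selected:
            self.confirm_select()
        # The real client would block forever here; the model raises instead.
        if not self._broker_confirms_on:
            raise RuntimeError("would block forever waiting for basic.ack")
        return "ack"


buggy = ModelChannel()
buggy.basic_publish_confirm()
buggy.reopen(reset_confirm_flag=False)  # current behaviour: stale flag kept
try:
    buggy.basic_publish_confirm()
except RuntimeError as exc:
    print("buggy:", exc)

fixed = ModelChannel()
fixed.basic_publish_confirm()
fixed.reopen()  # flag reset on reopen
print("fixed:", fixed.basic_publish_confirm())
```

With the flag reset, the second publish calls `confirm_select()` again on the reopened channel instead of waiting for an acknowledgement the broker will never send.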