rabbitmq-message-deduplication
rabbitmq-message-deduplication copied to clipboard
Celery lost connection to broker
Hi, I'm using rabbitmq 3.13.0 Erlang 26.2.2 rabbitmq-message-deduplication 0.6.2
celery 5.3.6 django-celery-beat 2.6.0
When the the argument x-message-deduplication
is set to true
for the celery queue, then all the messages sent to the queue causes celery to lost the connection:
consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
File "/home/ale/PycharmProjects/venv_cargo/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 340, in start
blueprint.start(self)
File "/home/ale/PycharmProjects/venv_cargo/lib/python3.8/site-packages/celery/bootsteps.py", line 116, in start
step.start(parent)
File "/home/ale/PycharmProjects/venv_cargo/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 742, in start
c.loop(*c.loop_args())
File "/home/ale/PycharmProjects/venv_cargo/lib/python3.8/site-packages/celery/worker/loops.py", line 97, in asynloop
next(loop)
File "/home/ale/PycharmProjects/venv_cargo/lib/python3.8/site-packages/kombu/asynchronous/hub.py", line 373, in create_loop
cb(*cbargs)
File "/home/ale/PycharmProjects/venv_cargo/lib/python3.8/site-packages/kombu/transport/base.py", line 248, in on_readable
reader(loop)
File "/home/ale/PycharmProjects/venv_cargo/lib/python3.8/site-packages/kombu/transport/base.py", line 230, in _read
drain_events(timeout=0)
File "/home/ale/PycharmProjects/venv_cargo/lib/python3.8/site-packages/amqp/connection.py", line 526, in drain_events
while not self.blocking_read(timeout):
File "/home/ale/PycharmProjects/venv_cargo/lib/python3.8/site-packages/amqp/connection.py", line 532, in blocking_read
return self.on_inbound_frame(frame)
File "/home/ale/PycharmProjects/venv_cargo/lib/python3.8/site-packages/amqp/method_framing.py", line 53, in on_frame
callback(channel, method_sig, buf, None)
File "/home/ale/PycharmProjects/venv_cargo/lib/python3.8/site-packages/amqp/connection.py", line 538, in on_inbound_method
return self.channels[channel_id].dispatch_method(
File "/home/ale/PycharmProjects/venv_cargo/lib/python3.8/site-packages/amqp/abstract_channel.py", line 156, in dispatch_method
listener(*args)
File "/home/ale/PycharmProjects/venv_cargo/lib/python3.8/site-packages/amqp/channel.py", line 1467, in _on_basic_cancel
raise ConsumerCancelled(consumer_tag, spec.Basic.Cancel)
amqp.exceptions.ConsumerCancelled: Basic.cancel: (0) None8
What it could be?