Long tasks are executed multiple times
Hi,
We are using celery with tasks that may take a lot of time (hours or even days). The problem is that after a couple of hours the task is sent back to the queue and run by another worker (or the same one after it finishes).
Looks like the task is not acknowledged, and after that 2h limit it is re-added (by RabbitMQ?). If that is the problem, is there a way to acknowledge tasks at the beginning of the task, or to disable this behaviour?
We are using celery 4.0.0 and RabbitMQ 3.6.3 with the following celery config:
CELERY_ACCEPT_CONTENT = ['pickle', 'json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_ENABLE_UTC = True
CELERY_TIMEZONE = 'Etc/UTC'
CELERY_DISABLE_RATE_LIMITS = True
CELERY_IGNORE_RESULT = True
CELERYD_PREFETCH_MULTIPLIER = 1
CELERY_SEND_EVENTS = False
Thanks
Celery acknowledges tasks before they execute by default, are you sure you don't have CELERY_ACKS_LATE or @task(acks_late=True) enabled?
Yeah, when a new task starts I can see an unacked message in the rabbit queue until it finishes. This should be the behaviour with CELERY_ACKS_LATE, right? But I don't have it enabled. Just tested with this code:
tests_ack.py:

import time

from celery import Celery

celery = Celery('PyFaster', broker="xxxx")
celery.config_from_object('celeryconfig')

@celery.task(queue='tests_ack', name="test")
def test():
    print "Doing task .. waiting 20s"
    time.sleep(20)

if __name__ == "__main__":
    test.apply_async()

celeryconfig.py:

CELERY_ACCEPT_CONTENT = ['pickle', 'json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_ENABLE_UTC = True
CELERY_TIMEZONE = 'Etc/UTC'
launching worker with: celery worker -A task -Q tests_ack -c 1 --pool=solo -n tests_ack --loglevel debug
some debug log:
[2016-09-07 18:07:10,083: DEBUG/MainProcess] Start from server, version: 0.9, properties: {'information': 'Licensed under the MPL. See http://www.rabbitmq.com/', 'product': 'RabbitMQ', 'copyright': 'Copyright (C) 2007-2016 Pivotal Software, Inc.', 'capabilities': {'exchange_exchange_bindings': True, 'connection.blocked': True, 'authentication_failure_close': True, 'direct_reply_to': True, 'basic.nack': True, 'per_consumer_qos': True, 'consumer_priorities': True, 'consumer_cancel_notify': True, 'publisher_confirms': True}, 'cluster_name': 'rabbit@CVC95', 'platform': 'Erlang/OTP', 'version': '3.6.3'}, mechanisms: [u'AMQPLAIN', u'PLAIN'], locales: [u'en_US']
[2016-09-07 18:07:58,018: DEBUG/MainProcess] TaskPool: Apply <function _fast_trace_task at 0x7fd20f8bd398> (args:('test', '13fe31b9-e1e6-47cf-82f3-eb0caa33c7ff', {'origin': 'gen18378@CVC95', 'lang': 'py', 'task': 'test', 'group': None, 'root_id': None, u'delivery_info': {u'priority': None, u'redelivered': False, u'routing_key': u'tests_ack', u'exchange': u''}, 'expires': None, u'correlation_id': '13fe31b9-e1e6-47cf-82f3-eb0caa33c7ff', 'retries': 0, 'timelimit': [None, None], 'argsrepr': '()', 'eta': None, 'parent_id': None, u'reply_to': '69bf79be-b51b-3a39-ad67-45863d7c3004', 'id': '13fe31b9-e1e6-47cf-82f3-eb0caa33c7ff', 'kwargsrepr': '{}'}, u'[[], {}, {"chord": null, "callbacks": null, "errbacks": null, "chain": null}]', 'application/json', 'utf-8') kwargs:{})
[2016-09-07 18:07:58,019: DEBUG/MainProcess] Task accepted: test[13fe31b9-e1e6-47cf-82f3-eb0caa33c7ff] pid:17017
I think that after 2h the same task came back with the redelivered flag set to true. I'm running it now with a sleep of 3h to verify.
Is there anything in the logs to suggest the ack operation failed? It should be logged at error severity and say something like "connection lost".
Yes, just checked the worker. It also looks like the ack was sent at the end of the task instead of the beginning?
[2016-09-07 18:31:06,274: WARNING/MainProcess] Doing task ... waiting 3h
[2016-09-07 21:31:06,363: INFO/MainProcess] Task test[53591cbc-53f2-4639-82a9-cb834020c7cb] succeeded in 10800.0896712s: None
[2016-09-07 21:31:06,364: CRITICAL/MainProcess] Couldn't ack 4, reason:error(104, 'Connection reset by peer')
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/kombu-4.0.0rc3-py2.7.egg/kombu/message.py", line 96, in ack_log_error
    self.ack(multiple=multiple)
  File "/usr/local/lib/python2.7/dist-packages/kombu-4.0.0rc3-py2.7.egg/kombu/message.py", line 91, in ack
    self.channel.basic_ack(self.delivery_tag, multiple=multiple)
  File "/usr/local/lib/python2.7/dist-packages/amqp/channel.py", line 1425, in basic_ack
    spec.Basic.Ack, argsig, (delivery_tag, multiple),
  File "/usr/local/lib/python2.7/dist-packages/amqp/abstract_channel.py", line 62, in send_method
    conn.frame_writer.send((1, self.channel_id, sig, args, content))
  File "/usr/local/lib/python2.7/dist-packages/amqp/method_framing.py", line 165, in frame_writer
    write(view[:offset])
  File "/usr/local/lib/python2.7/dist-packages/amqp/transport.py", line 253, in write
    self._write(s)
  File "/usr/lib/python2.7/socket.py", line 224, in meth
    return getattr(self._sock,name)(*args)
error: [Errno 104] Connection reset by peer
[2016-09-07 21:31:06,387: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/celery-4.0.0rc3-py2.7.egg/celery/worker/consumer/consumer.py", line 310, in start
    blueprint.start(self)
  File "/usr/local/lib/python2.7/dist-packages/celery-4.0.0rc3-py2.7.egg/celery/bootsteps.py", line 120, in start
    step.start(parent)
  File "/usr/local/lib/python2.7/dist-packages/celery-4.0.0rc3-py2.7.egg/celery/worker/consumer/consumer.py", line 576, in start
    c.loop(*c.loop_args())
  File "/usr/local/lib/python2.7/dist-packages/celery-4.0.0rc3-py2.7.egg/celery/worker/loops.py", line 88, in asynloop
    next(loop)
  File "/usr/local/lib/python2.7/dist-packages/kombu-4.0.0rc3-py2.7.egg/kombu/async/hub.py", line 280, in create_loop
    item()
  File "/usr/local/lib/python2.7/dist-packages/vine/promises.py", line 134, in __call__
    return self.throw()
  File "/usr/local/lib/python2.7/dist-packages/vine/promises.py", line 131, in __call__
    retval = self.fun(*final_args, **final_kwargs)
  File "/usr/local/lib/python2.7/dist-packages/kombu-4.0.0rc3-py2.7.egg/kombu/transport/base.py", line 153, in _read
    raise RecoverableConnectionError('Socket was disconnected')
RecoverableConnectionError: Socket was disconnected
[2016-09-07 21:31:06,387: DEBUG/MainProcess] | Consumer: Restarting event loop...
[2016-09-07 21:31:06,388: DEBUG/MainProcess] | Consumer: Restarting Heart...
[2016-09-07 21:31:06,388: DEBUG/MainProcess] | Consumer: Restarting Control...
[2016-09-07 21:31:06,388: DEBUG/MainProcess] | Consumer: Restarting Tasks...
[2016-09-07 21:31:06,388: DEBUG/MainProcess] Canceling task consumer...
[2016-09-07 21:31:06,388: DEBUG/MainProcess] | Consumer: Restarting Gossip...
[2016-09-07 21:31:06,388: DEBUG/MainProcess] | Consumer: Restarting Mingle...
[2016-09-07 21:31:06,388: DEBUG/MainProcess] | Consumer: Restarting Events...
[2016-09-07 21:31:06,389: DEBUG/MainProcess] | Consumer: Restarting Connection...
[2016-09-07 21:31:06,389: DEBUG/MainProcess] | Consumer: Starting Connection
[2016-09-07 21:31:06,396: DEBUG/MainProcess] Start from server, version: 0.9, properties: {'information': 'Licensed under the MPL. See http://www.rabbitmq.com/', 'product': 'RabbitMQ', 'copyright': 'Copyright (C) 2007-2016 Pivotal Software, Inc.', 'capabilities': {'exchange_exchange_bindings': True, 'connection.blocked': True, 'authentication_failure_close': True, 'direct_reply_to': True, 'basic.nack': True, 'per_consumer_qos': True, 'consumer_priorities': True, 'consumer_cancel_notify': True, 'publisher_confirms': True}, 'cluster_name': 'rabbit@CVC95', 'platform': 'Erlang/OTP', 'version': '3.6.3'}, mechanisms: [u'AMQPLAIN', u'PLAIN'], locales: [u'en_US']
Rabbit and the celery worker are in the same machine
I've tried the same code with celery 3.1.23 and kombu 3.0.35 and it worked as expected.
@MarcSalvat you mean that this problem is new in celery 4.0.0 and kombu 4.0? And if we use celery 3.1.23 and kombu 3.0.35, the problem does not occur?
@Sunrry Correct. Last time I checked was with v4 rc3
I am seeing the same issue. Based on my testing, it appears to be caused by the solo setting. Does celery ack late automatically with the solo option, or is the ack delayed by the task itself?
Probably a dupe of #3768
Yup, having the same issue with solo.
The task takes a few minutes to complete, but in the meantime the connection is reset, so the task is never acked and simply reruns.
[2017-05-28 19:57:43,988: INFO/MainProcess] Task perform_ssd_detection_by_id[2bd234fe-d7bb-426b-bda5-7e8f05f2a7dd] succeeded in 229.236948594s: 0
[2017-05-28 19:57:43,989: CRITICAL/MainProcess] Couldn't ack 6, reason:error(104, 'Connection reset by peer')
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/kombu/message.py", line 130, in ack_log_error
self.ack(multiple=multiple)
File "/usr/local/lib/python2.7/dist-packages/kombu/message.py", line 125, in ack
self.channel.basic_ack(self.delivery_tag, multiple=multiple)
File "/usr/local/lib/python2.7/dist-packages/amqp/channel.py", line 1408, in basic_ack
spec.Basic.Ack, argsig, (delivery_tag, multiple),
File "/usr/local/lib/python2.7/dist-packages/amqp/abstract_channel.py", line 64, in send_method
conn.frame_writer(1, self.channel_id, sig, args, content)
File "/usr/local/lib/python2.7/dist-packages/amqp/method_framing.py", line 174, in write_frame
write(view[:offset])
File "/usr/local/lib/python2.7/dist-packages/amqp/transport.py", line 269, in write
self._write(s)
File "/usr/lib/python2.7/socket.py", line 224, in meth
return getattr(self._sock,name)(*args)
error: [Errno 104] Connection reset by peer
Can anyone provide a test case to start working towards a fix?
This issue happens in both celery 4.1.0 and 4.2.0rc2 using all default settings except for the solo mode (Python 3.6.4).
It seems to me that:
- in the solo mode, task execution prevents heartbeats and the connection times out when the task takes too long (about 3 minutes with the default config), and
- the default early-ack behaviour is not respected in solo mode: the task is not acked before it is executed (as if acks_late were enabled).
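The first point can be illustrated with a toy sketch (plain Python, not Celery internals): in a single-threaded worker, a blocking task also blocks whatever periodic work the same thread is responsible for, such as AMQP heartbeats.

```python
import time

# Toy sketch: a single thread that is supposed to "heartbeat" regularly
# cannot do so while a blocking task runs -- which is what happens to the
# solo pool during a long task.
beats = []

def heartbeat():
    beats.append(time.monotonic())

heartbeat()        # last heartbeat before the "task" starts
time.sleep(3)      # a long blocking task monopolizes the only thread
heartbeat()        # the next heartbeat is only possible afterwards

gap = beats[1] - beats[0]
print("gap between heartbeats: %.1fs" % gap)  # well past any 1s schedule
```

With a real broker, a gap longer than the negotiated heartbeat timeout is exactly what makes RabbitMQ drop the connection.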
The code needed to reproduce the issue:
import time
from celery import Celery
app = Celery('test', broker='amqp://test:test@rabbitmq/test')
@app.task
def foo():
print('sleep time!')
time.sleep(180)
print('all good')
Starting worker with:
celery worker -l INFO -A tasks -P solo
Then, schedule the task and watch the worker get stuck in a permanent loop.
Log from 4.1.0:
[2018-04-26 15:20:36,954: INFO/MainProcess] Received task: tasks.foo[1d8b142d-92dc-4cce-9d4b-20decb3d006e]
[2018-04-26 15:20:36,954: WARNING/MainProcess] sleep time!
[2018-04-26 15:23:36,960: WARNING/MainProcess] all good
[2018-04-26 15:23:36,960: INFO/MainProcess] Task tasks.foo[1d8b142d-92dc-4cce-9d4b-20decb3d006e] succeeded in 180.0062870910001s: None
[2018-04-26 15:23:36,961: CRITICAL/MainProcess] Couldn't ack 1, reason:ConnectionResetError(104, 'Connection reset by peer')
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/kombu/message.py", line 130, in ack_log_error
self.ack(multiple=multiple)
File "/usr/local/lib/python3.6/site-packages/kombu/message.py", line 125, in ack
self.channel.basic_ack(self.delivery_tag, multiple=multiple)
File "/usr/local/lib/python3.6/site-packages/amqp/channel.py", line 1394, in basic_ack
spec.Basic.Ack, argsig, (delivery_tag, multiple),
File "/usr/local/lib/python3.6/site-packages/amqp/abstract_channel.py", line 50, in send_method
conn.frame_writer(1, self.channel_id, sig, args, content)
File "/usr/local/lib/python3.6/site-packages/amqp/method_framing.py", line 166, in write_frame
write(view[:offset])
File "/usr/local/lib/python3.6/site-packages/amqp/transport.py", line 258, in write
self._write(s)
ConnectionResetError: [Errno 104] Connection reset by peer
[2018-04-26 15:23:36,986: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 320, in start
blueprint.start(self)
File "/usr/local/lib/python3.6/site-packages/celery/bootsteps.py", line 119, in start
step.start(parent)
File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 596, in start
c.loop(*c.loop_args())
File "/usr/local/lib/python3.6/site-packages/celery/worker/loops.py", line 88, in asynloop
next(loop)
File "/usr/local/lib/python3.6/site-packages/kombu/async/hub.py", line 354, in create_loop
cb(*cbargs)
File "/usr/local/lib/python3.6/site-packages/kombu/transport/base.py", line 236, in on_readable
reader(loop)
File "/usr/local/lib/python3.6/site-packages/kombu/transport/base.py", line 216, in _read
raise RecoverableConnectionError('Socket was disconnected')
amqp.exceptions.RecoverableConnectionError: Socket was disconnected
[2018-04-26 15:23:36,999: INFO/MainProcess] Connected to amqp://test:**@rabbitmq:5672/test
[2018-04-26 15:23:37,019: INFO/MainProcess] mingle: searching for neighbors
[2018-04-26 15:23:38,030: INFO/MainProcess] mingle: all alone
[2018-04-26 15:23:38,071: INFO/MainProcess] Received task: tasks.foo[1d8b142d-92dc-4cce-9d4b-20decb3d006e]
[2018-04-26 15:23:38,073: WARNING/MainProcess] sleep time!
Log from 4.2.0rc2:
[2018-04-26 15:26:52,350: INFO/MainProcess] Received task: tasks.foo[f1964479-31b6-45b6-b7e9-d5a01bd7397a]
[2018-04-26 15:26:52,350: WARNING/MainProcess] sleep time!
[2018-04-26 15:29:52,431: WARNING/MainProcess] all good
[2018-04-26 15:29:52,432: INFO/MainProcess] Task tasks.foo[f1964479-31b6-45b6-b7e9-d5a01bd7397a] succeeded in 180.0813092110002s: None
[2018-04-26 15:29:52,432: CRITICAL/MainProcess] Couldn't ack 1, reason:ConnectionResetError(104, 'Connection reset by peer')
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/kombu/message.py", line 130, in ack_log_error
self.ack(multiple=multiple)
File "/usr/local/lib/python3.6/site-packages/kombu/message.py", line 125, in ack
self.channel.basic_ack(self.delivery_tag, multiple=multiple)
File "/usr/local/lib/python3.6/site-packages/amqp/channel.py", line 1394, in basic_ack
spec.Basic.Ack, argsig, (delivery_tag, multiple),
File "/usr/local/lib/python3.6/site-packages/amqp/abstract_channel.py", line 50, in send_method
conn.frame_writer(1, self.channel_id, sig, args, content)
File "/usr/local/lib/python3.6/site-packages/amqp/method_framing.py", line 166, in write_frame
write(view[:offset])
File "/usr/local/lib/python3.6/site-packages/amqp/transport.py", line 258, in write
self._write(s)
ConnectionResetError: [Errno 104] Connection reset by peer
[2018-04-26 15:29:52,444: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 322, in start
blueprint.start(self)
File "/usr/local/lib/python3.6/site-packages/celery/bootsteps.py", line 119, in start
step.start(parent)
File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 598, in start
c.loop(*c.loop_args())
File "/usr/local/lib/python3.6/site-packages/celery/worker/loops.py", line 91, in asynloop
next(loop)
File "/usr/local/lib/python3.6/site-packages/kombu/async/hub.py", line 354, in create_loop
cb(*cbargs)
File "/usr/local/lib/python3.6/site-packages/kombu/transport/base.py", line 236, in on_readable
reader(loop)
File "/usr/local/lib/python3.6/site-packages/kombu/transport/base.py", line 216, in _read
raise RecoverableConnectionError('Socket was disconnected')
amqp.exceptions.RecoverableConnectionError: Socket was disconnected
[2018-04-26 15:29:52,453: INFO/MainProcess] Connected to amqp://test:**@rabbitmq:5672/test
[2018-04-26 15:29:52,473: INFO/MainProcess] mingle: searching for neighbors
[2018-04-26 15:29:52,477: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 322, in start
blueprint.start(self)
File "/usr/local/lib/python3.6/site-packages/celery/bootsteps.py", line 119, in start
step.start(parent)
File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/mingle.py", line 40, in start
self.sync(c)
File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/mingle.py", line 44, in sync
replies = self.send_hello(c)
File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/mingle.py", line 57, in send_hello
replies = inspect.hello(c.hostname, our_revoked._data) or {}
File "/usr/local/lib/python3.6/site-packages/celery/app/control.py", line 143, in hello
return self._request('hello', from_node=from_node, revoked=revoked)
File "/usr/local/lib/python3.6/site-packages/celery/app/control.py", line 95, in _request
timeout=self.timeout, reply=True,
File "/usr/local/lib/python3.6/site-packages/celery/app/control.py", line 454, in broadcast
limit, callback, channel=channel,
File "/usr/local/lib/python3.6/site-packages/kombu/pidbox.py", line 315, in _broadcast
serializer=serializer)
File "/usr/local/lib/python3.6/site-packages/kombu/pidbox.py", line 290, in _publish
serializer=serializer,
File "/usr/local/lib/python3.6/site-packages/kombu/messaging.py", line 181, in publish
exchange_name, declare,
File "/usr/local/lib/python3.6/site-packages/kombu/messaging.py", line 203, in _publish
mandatory=mandatory, immediate=immediate,
File "/usr/local/lib/python3.6/site-packages/amqp/channel.py", line 1734, in _basic_publish
(0, exchange, routing_key, mandatory, immediate), msg
File "/usr/local/lib/python3.6/site-packages/amqp/abstract_channel.py", line 50, in send_method
conn.frame_writer(1, self.channel_id, sig, args, content)
File "/usr/local/lib/python3.6/site-packages/amqp/method_framing.py", line 166, in write_frame
write(view[:offset])
File "/usr/local/lib/python3.6/site-packages/amqp/transport.py", line 258, in write
self._write(s)
ConnectionResetError: [Errno 104] Connection reset by peer
[2018-04-26 15:29:52,490: INFO/MainProcess] Connected to amqp://test:**@rabbitmq:5672/test
[2018-04-26 15:29:52,497: INFO/MainProcess] mingle: searching for neighbors
[2018-04-26 15:29:53,512: INFO/MainProcess] mingle: all alone
[2018-04-26 15:29:53,523: INFO/MainProcess] Received task: tasks.foo[f1964479-31b6-45b6-b7e9-d5a01bd7397a]
[2018-04-26 15:29:53,524: WARNING/MainProcess] sleep time!
Please wait until we release the latest py-amqp and kombu to be checked with celery master.
Can anyone let us know its status with kombu 4.2 and celery 4.2rc4?
@auvipy I did not notice any difference compared to 4.2rc2. RabbitMQ admin still shows the task as unacked while it is being executed by worker.
Using the same task:
import time
from celery import Celery
app = Celery('test', broker='amqp://test:test@rabbitmq/test')
@app.task
def foo():
print('sleep time!')
time.sleep(180)
print('all good')
and
root@2e6a9b563d25:/home/app# pip freeze | grep -E "celery|kombu"
celery==4.2.0rc4
kombu==4.2.0
worker still hangs in an infinite loop of processing a single task:
root@2e6a9b563d25:/home/app# celery worker -l INFO -A tracker.tasks -P solo
(...)
[2018-05-27 10:50:39,678: INFO/MainProcess] celery@2e6a9b563d25 ready.
[2018-05-27 10:50:50,422: INFO/MainProcess] Received task: tracker.tasks.foo[3d30a02f-fb05-452f-97f0-75dedac8d8ae]
[2018-05-27 10:50:50,423: WARNING/MainProcess] sleep time!
[2018-05-27 10:53:50,522: WARNING/MainProcess] all good
[2018-05-27 10:53:50,522: INFO/MainProcess] Task tracker.tasks.foo[3d30a02f-fb05-452f-97f0-75dedac8d8ae] succeeded in 180.09944431299982s: None
[2018-05-27 10:53:50,523: CRITICAL/MainProcess] Couldn't ack 1, reason:ConnectionResetError(104, 'Connection reset by peer')
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/kombu/message.py", line 130, in ack_log_error
self.ack(multiple=multiple)
File "/usr/local/lib/python3.6/site-packages/kombu/message.py", line 125, in ack
self.channel.basic_ack(self.delivery_tag, multiple=multiple)
File "/usr/local/lib/python3.6/site-packages/amqp/channel.py", line 1394, in basic_ack
spec.Basic.Ack, argsig, (delivery_tag, multiple),
File "/usr/local/lib/python3.6/site-packages/amqp/abstract_channel.py", line 50, in send_method
conn.frame_writer(1, self.channel_id, sig, args, content)
File "/usr/local/lib/python3.6/site-packages/amqp/method_framing.py", line 166, in write_frame
write(view[:offset])
File "/usr/local/lib/python3.6/site-packages/amqp/transport.py", line 258, in write
self._write(s)
ConnectionResetError: [Errno 104] Connection reset by peer
[2018-05-27 10:53:50,549: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 322, in start
blueprint.start(self)
File "/usr/local/lib/python3.6/site-packages/celery/bootsteps.py", line 119, in start
step.start(parent)
File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 598, in start
c.loop(*c.loop_args())
File "/usr/local/lib/python3.6/site-packages/celery/worker/loops.py", line 91, in asynloop
next(loop)
File "/usr/local/lib/python3.6/site-packages/kombu/asynchronous/hub.py", line 354, in create_loop
cb(*cbargs)
File "/usr/local/lib/python3.6/site-packages/kombu/transport/base.py", line 236, in on_readable
reader(loop)
File "/usr/local/lib/python3.6/site-packages/kombu/transport/base.py", line 216, in _read
raise RecoverableConnectionError('Socket was disconnected')
amqp.exceptions.RecoverableConnectionError: Socket was disconnected
[2018-05-27 10:53:50,577: INFO/MainProcess] Connected to amqp://pprof:**@rabbitmq:5672/pprof
[2018-05-27 10:53:50,602: INFO/MainProcess] mingle: searching for neighbors
[2018-05-27 10:53:50,610: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 322, in start
blueprint.start(self)
File "/usr/local/lib/python3.6/site-packages/celery/bootsteps.py", line 119, in start
step.start(parent)
File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/mingle.py", line 40, in start
self.sync(c)
File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/mingle.py", line 44, in sync
replies = self.send_hello(c)
File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/mingle.py", line 57, in send_hello
replies = inspect.hello(c.hostname, our_revoked._data) or {}
File "/usr/local/lib/python3.6/site-packages/celery/app/control.py", line 143, in hello
return self._request('hello', from_node=from_node, revoked=revoked)
File "/usr/local/lib/python3.6/site-packages/celery/app/control.py", line 95, in _request
timeout=self.timeout, reply=True,
File "/usr/local/lib/python3.6/site-packages/celery/app/control.py", line 454, in broadcast
limit, callback, channel=channel,
File "/usr/local/lib/python3.6/site-packages/kombu/pidbox.py", line 315, in _broadcast
serializer=serializer)
File "/usr/local/lib/python3.6/site-packages/kombu/pidbox.py", line 290, in _publish
serializer=serializer,
File "/usr/local/lib/python3.6/site-packages/kombu/messaging.py", line 181, in publish
exchange_name, declare,
File "/usr/local/lib/python3.6/site-packages/kombu/messaging.py", line 203, in _publish
mandatory=mandatory, immediate=immediate,
File "/usr/local/lib/python3.6/site-packages/amqp/channel.py", line 1734, in _basic_publish
(0, exchange, routing_key, mandatory, immediate), msg
File "/usr/local/lib/python3.6/site-packages/amqp/abstract_channel.py", line 50, in send_method
conn.frame_writer(1, self.channel_id, sig, args, content)
File "/usr/local/lib/python3.6/site-packages/amqp/method_framing.py", line 166, in write_frame
write(view[:offset])
File "/usr/local/lib/python3.6/site-packages/amqp/transport.py", line 258, in write
self._write(s)
ConnectionResetError: [Errno 104] Connection reset by peer
[2018-05-27 10:53:50,634: INFO/MainProcess] Connected to amqp://pprof:**@rabbitmq:5672/pprof
[2018-05-27 10:53:50,657: INFO/MainProcess] mingle: searching for neighbors
[2018-05-27 10:53:51,694: INFO/MainProcess] mingle: all alone
[2018-05-27 10:53:51,734: INFO/MainProcess] Received task: tracker.tasks.foo[3d30a02f-fb05-452f-97f0-75dedac8d8ae]
[2018-05-27 10:53:51,736: WARNING/MainProcess] sleep time!
Do you have any fix to apply as a PR?
Is there any workaround for this issue before it is fixed? It really hurts. Thank you.
Any news on this issue, please? Having the same issue here with the solo worker… Maybe we should add a warning in the documentation?
Is there any update?
Does this issue need a PR?
A test with a Fix, yes.
As far as I understand the issue is caused by the solo pool blocking behavior, which prevents heartbeats, as @vanzi noted:
in the solo mode task execution prevents heartbeats and the connection times out when the task takes too long (about 3 minutes with default config), ...
Related:
- https://github.com/celery/py-amqp/issues/265
- https://www.rabbitmq.com/heartbeats.html#disabling
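Following the second link, one workaround is disabling heartbeats altogether, so the broker never times out the blocked solo worker. A sketch, assuming the modern lowercase setting names (older uppercase configs use BROKER_HEARTBEAT); this works around the symptom, it does not fix the underlying issue:

```python
# celeryconfig.py -- workaround sketch, not a fix.
# broker_heartbeat = 0 disables AMQP heartbeats, so RabbitMQ will not drop
# the connection while the solo worker is blocked inside a long task.
# Trade-off: dead connections are then detected much later, only via
# TCP/OS keepalives.
broker_heartbeat = 0
```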
Hi, we are still facing this issue with a longer-running task and it is quite blocking. Do you have any possible updates/workarounds for it?
Any updates? If I run celery without the -P solo arg, it doesn't execute my tasks or stops in the middle of the task. With the -P solo arg, it keeps re-running the tasks and ends with a Broken pipe (errno 32), and the task never gets acked.
please feel free to debug and provide a fix as you hit the issue on production.
I debugged celery in a docker container and I'm sharing the debug logs here. Maybe someone who knows more about the underlying code of celery and rabbitmq might be able to figure it out.
Versions:
celery==4.4.6
RabbitMQ 3.8.5
celery docker entrypoint: celery -A proj worker -B -P solo --loglevel=DEBUG
Re-Running task debug log: https://gist.github.com/shenoy-anurag/bb723684df37c82c85c7b44cbc7b3c7c
Although this task is only 30s long, the process of debugging caused it to go beyond 120s, which is approximately the amount of time required for RabbitMQ to report missed heartbeats, after which celery is unable to get the task acknowledged with the broker.
rabbitmq_1 | closing AMQP connection <0.28344.0> (172.18.0.5:46918 -> 172.18.0.3:5672):
rabbitmq_1 | missed heartbeats from client, timeout: 60s
I hope this helps.
I am currently able to get my long running tasks to run only once without re-running in a loop by setting the
BROKER_HEARTBEAT = None
configuration variable.
I think my scheduled periodic tasks aren't running anymore but am currently testing things.
Edit:
Setting
BROKER_HEARTBEAT = 600
causes the celery connection to reset once and thus repeat the long task once.
Second time around however, the long task is acknowledged and there is no
missed heartbeats; timeout 60s
message.
Configuration is same as my previous comment.
Other config variables:
CELERYD_PREFETCH_MULTIPLIER = 1  # Doesn't seem to work. Flower shows the multiplier as 4.
BROKER_CONNECTION_TIMEOUT = 720
BROKER_POOL_LIMIT = None
debug logs: https://gist.github.com/shenoy-anurag/783e78896e4888c118e40222c63b3354
@shenoy-anurag Regarding the CELERYD_PREFETCH_MULTIPLIER = 1 setting: you also have to set CELERY_TASK_ACKS_LATE = True. Please check https://docs.celeryproject.org/en/latest/userguide/optimizing.html#reserve-one-task-at-a-time
If you use django and app.config_from_object('django.conf:settings', namespace='CELERY'), you must use CELERY_WORKER_PREFETCH_MULTIPLIER as parameter name.
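For reference, a minimal sketch of that reserve-one-task-at-a-time combination, using the lowercase setting names (with the CELERY_ namespace these become CELERY_TASK_ACKS_LATE and CELERY_WORKER_PREFETCH_MULTIPLIER):

```python
# Sketch of the "reserve one task at a time" settings from the Celery
# optimizing guide: a prefetch limit of 1 only behaves as expected
# when combined with late acknowledgements.
task_acks_late = True            # ack after the task finishes, not before
worker_prefetch_multiplier = 1   # reserve at most one message per process
```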
I've had similar issues with this. Depending on why you're running -P solo, you might consider running -P gevent --concurrency 1: that allows using gevent within the task to yield execution so heartbeats can be sent (while keeping the task execution context "single-threaded"). Our large tasks were loops, with each iteration taking up to 10 seconds. Adding gevent.sleep(0) allows sending heartbeats mid-task if needed:
import gevent

@app.task(track_started=True, acks_late=True, bind=True)
def process_upload(self, rows):
    for row in rows:
        # allow a context switch to send a heartbeat if necessary
        gevent.sleep(0)
        # ... process the row, ~10s
Other than this, I'm not sure this will ever be truly "solved": celery is doing exactly what it's told, running in a single-threaded context, and can't send heartbeats mid-task execution.
@andytumelty is this the solution for error: [Errno 104] Connection reset by peer even if we have disabled heartbeats and prefetch (which, by the way, doesn't seem to be working)?