
Long tasks are executed multiple times

Open MarcSalvat opened this issue 9 years ago • 40 comments

Hi,

We are using celery with tasks that may take a lot of time (hours or even days). The problem is that after a couple of hours the task is sent back to the queue and run by another worker (or the same one after it finishes).

It looks like the task is not acknowledged, and after that 2h limit it is re-queued (by RabbitMQ?). If that is the problem, is there a way to acknowledge tasks at the beginning of execution, or to disable this behaviour?

We are using celery 4.0.0 and RabbitMQ 3.6.3 with the following celery config:

CELERY_ACCEPT_CONTENT = ['pickle', 'json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_ENABLE_UTC = True
CELERY_TIMEZONE = 'Etc/UTC'
CELERY_DISABLE_RATE_LIMITS = True
CELERY_IGNORE_RESULT = True
CELERYD_PREFETCH_MULTIPLIER = 1
CELERY_SEND_EVENTS = False

Thanks

MarcSalvat avatar Sep 06 '16 10:09 MarcSalvat

Celery acknowledges tasks before they execute by default. Are you sure you don't have CELERY_ACKS_LATE or @task(acks_late=True) enabled?

ask avatar Sep 06 '16 17:09 ask
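For readers landing on this thread: the difference between the two ack modes ask mentions can be sketched with a toy broker in plain Python (no celery involved; `ToyBroker` and all names are illustrative, not real kombu/amqp API):

```python
from collections import deque

class ToyBroker:
    """Minimal model of AMQP ack semantics: messages fetched but not
    yet acked are requeued when the connection drops."""
    def __init__(self):
        self.queue = deque()
        self.unacked = {}

    def publish(self, msg):
        self.queue.append(msg)

    def fetch(self):
        msg = self.queue.popleft()
        self.unacked[id(msg)] = msg  # held by the broker until acked
        return msg

    def ack(self, msg):
        self.unacked.pop(id(msg))

    def drop_connection(self):
        # The broker requeues everything still unacked on this connection.
        for msg in self.unacked.values():
            self.queue.append(msg)
        self.unacked.clear()

broker = ToyBroker()

# Early ack (celery's default): ack before running the task, so a
# connection lost mid-task does NOT cause redelivery.
broker.publish('long-task')
msg = broker.fetch()
broker.ack(msg)
broker.drop_connection()
assert len(broker.queue) == 0

# Late ack (acks_late=True): the ack only happens after the task
# finishes, so a connection lost mid-task requeues the message.
broker.publish('long-task')
msg = broker.fetch()
broker.drop_connection()  # connection resets while the task runs
assert list(broker.queue) == ['long-task']
```

The symptom reported in this issue is that, without acks_late, the worker still behaves like the second case: the ack is effectively delayed until the task returns, by which point the broker has already timed the connection out.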

Yeah, when a new task starts I can see an unacked message in the RabbitMQ queue until it finishes. That should be the behaviour with CELERY_ACKS_LATE, right? But I don't have it enabled. Just tested with this code:

tests_ack.py:

import time

from celery import Celery

celery = Celery('PyFaster', broker="xxxx")
celery.config_from_object('celeryconfig')

@celery.task(queue='tests_ack', name="test")
def test():
    print "Doing task .. waiting 20s"
    time.sleep(20)

if __name__ == "__main__":
    test.apply_async()

celeryconfig.py:

CELERY_ACCEPT_CONTENT = ['pickle', 'json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_ENABLE_UTC = True
CELERY_TIMEZONE = 'Etc/UTC'

launching worker with: celery worker -A task -Q tests_ack -c 1 --pool=solo -n tests_ack --loglevel debug

some debug log:

[2016-09-07 18:07:10,083: DEBUG/MainProcess] Start from server, version: 0.9, properties: {'information': 'Licensed under the MPL. See http://www.rabbitmq.com/', 'product': 'RabbitMQ', 'copyright': 'Copyright (C) 2007-2016 Pivotal Software, Inc.', 'capabilities': {'exchange_exchange_bindings': True, 'connection.blocked': True, 'authentication_failure_close': True, 'direct_reply_to': True, 'basic.nack': True, 'per_consumer_qos': True, 'consumer_priorities': True, 'consumer_cancel_notify': True, 'publisher_confirms': True}, 'cluster_name': 'rabbit@CVC95', 'platform': 'Erlang/OTP', 'version': '3.6.3'}, mechanisms: [u'AMQPLAIN', u'PLAIN'], locales: [u'en_US']

[2016-09-07 18:07:58,018: DEBUG/MainProcess] TaskPool: Apply <function _fast_trace_task at 0x7fd20f8bd398> (args:('test', '13fe31b9-e1e6-47cf-82f3-eb0caa33c7ff', {'origin': 'gen18378@CVC95', 'lang': 'py', 'task': 'test', 'group': None, 'root_id': None, u'delivery_info': {u'priority': None, u'redelivered': False, u'routing_key': u'tests_ack', u'exchange': u''}, 'expires': None, u'correlation_id': '13fe31b9-e1e6-47cf-82f3-eb0caa33c7ff', 'retries': 0, 'timelimit': [None, None], 'argsrepr': '()', 'eta': None, 'parent_id': None, u'reply_to': '69bf79be-b51b-3a39-ad67-45863d7c3004', 'id': '13fe31b9-e1e6-47cf-82f3-eb0caa33c7ff', 'kwargsrepr': '{}'}, u'[[], {}, {"chord": null, "callbacks": null, "errbacks": null, "chain": null}]', 'application/json', 'utf-8') kwargs:{})
[2016-09-07 18:07:58,019: DEBUG/MainProcess] Task accepted: test[13fe31b9-e1e6-47cf-82f3-eb0caa33c7ff] pid:17017

I think after 2h the same task came back with the redelivered flag set to true. I'm running it now with a sleep of 3h to verify.

MarcSalvat avatar Sep 07 '16 16:09 MarcSalvat

Is there anything in the logs to suggest the ack operation failed? It should be at error severity and say something like "connection lost".

ask avatar Sep 07 '16 19:09 ask

Yes, just checked the worker. It also looks like the ack was sent at the end of the task instead of the beginning?

[2016-09-07 18:31:06,274: WARNING/MainProcess] Doing task ... waiting 3h
[2016-09-07 21:31:06,363: INFO/MainProcess] Task test[53591cbc-53f2-4639-82a9-cb834020c7cb] succeeded in 10800.0896712s: None
[2016-09-07 21:31:06,364: CRITICAL/MainProcess] Couldn't ack 4, reason:error(104, 'Connection reset by peer')
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/kombu-4.0.0rc3-py2.7.egg/kombu/message.py", line 96, in ack_log_error
    self.ack(multiple=multiple)
  File "/usr/local/lib/python2.7/dist-packages/kombu-4.0.0rc3-py2.7.egg/kombu/message.py", line 91, in ack
    self.channel.basic_ack(self.delivery_tag, multiple=multiple)
  File "/usr/local/lib/python2.7/dist-packages/amqp/channel.py", line 1425, in basic_ack
    spec.Basic.Ack, argsig, (delivery_tag, multiple),
  File "/usr/local/lib/python2.7/dist-packages/amqp/abstract_channel.py", line 62, in send_method
    conn.frame_writer.send((1, self.channel_id, sig, args, content))
  File "/usr/local/lib/python2.7/dist-packages/amqp/method_framing.py", line 165, in frame_writer
    write(view[:offset])
  File "/usr/local/lib/python2.7/dist-packages/amqp/transport.py", line 253, in write
    self._write(s)
  File "/usr/lib/python2.7/socket.py", line 224, in meth
    return getattr(self._sock,name)(*args)
error: [Errno 104] Connection reset by peer
[2016-09-07 21:31:06,387: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/celery-4.0.0rc3-py2.7.egg/celery/worker/consumer/consumer.py", line 310, in start
    blueprint.start(self)
  File "/usr/local/lib/python2.7/dist-packages/celery-4.0.0rc3-py2.7.egg/celery/bootsteps.py", line 120, in start
    step.start(parent)
  File "/usr/local/lib/python2.7/dist-packages/celery-4.0.0rc3-py2.7.egg/celery/worker/consumer/consumer.py", line 576, in start
    c.loop(*c.loop_args())
  File "/usr/local/lib/python2.7/dist-packages/celery-4.0.0rc3-py2.7.egg/celery/worker/loops.py", line 88, in asynloop
    next(loop)
  File "/usr/local/lib/python2.7/dist-packages/kombu-4.0.0rc3-py2.7.egg/kombu/async/hub.py", line 280, in create_loop
    item()
  File "/usr/local/lib/python2.7/dist-packages/vine/promises.py", line 134, in __call__
    return self.throw()
  File "/usr/local/lib/python2.7/dist-packages/vine/promises.py", line 131, in __call__
    retval = self.fun(*final_args, **final_kwargs)
  File "/usr/local/lib/python2.7/dist-packages/kombu-4.0.0rc3-py2.7.egg/kombu/transport/base.py", line 153, in _read
    raise RecoverableConnectionError('Socket was disconnected')
RecoverableConnectionError: Socket was disconnected
[2016-09-07 21:31:06,387: DEBUG/MainProcess] | Consumer: Restarting event loop...
[2016-09-07 21:31:06,388: DEBUG/MainProcess] | Consumer: Restarting Heart...
[2016-09-07 21:31:06,388: DEBUG/MainProcess] | Consumer: Restarting Control...
[2016-09-07 21:31:06,388: DEBUG/MainProcess] | Consumer: Restarting Tasks...
[2016-09-07 21:31:06,388: DEBUG/MainProcess] Canceling task consumer...
[2016-09-07 21:31:06,388: DEBUG/MainProcess] | Consumer: Restarting Gossip...
[2016-09-07 21:31:06,388: DEBUG/MainProcess] | Consumer: Restarting Mingle...
[2016-09-07 21:31:06,388: DEBUG/MainProcess] | Consumer: Restarting Events...
[2016-09-07 21:31:06,389: DEBUG/MainProcess] | Consumer: Restarting Connection...
[2016-09-07 21:31:06,389: DEBUG/MainProcess] | Consumer: Starting Connection
[2016-09-07 21:31:06,396: DEBUG/MainProcess] Start from server, version: 0.9, properties: {'information': 'Licensed under the MPL. See http://www.rabbitmq.com/', 'product': 'RabbitMQ', 'copyright': 'Copyright (C) 2007-2016 Pivotal Software, Inc.', 'capabilities': {'exchange_exchange_bindings': True, 'connection.blocked': True, 'authentication_failure_close': True, 'direct_reply_to': True, 'basic.nack': True, 'per_consumer_qos': True, 'consumer_priorities': True, 'consumer_cancel_notify': True, 'publisher_confirms': True}, 'cluster_name': 'rabbit@CVC95', 'platform': 'Erlang/OTP', 'version': '3.6.3'}, mechanisms: [u'AMQPLAIN', u'PLAIN'], locales: [u'en_US']

Rabbit and the celery worker are in the same machine

MarcSalvat avatar Sep 07 '16 20:09 MarcSalvat

I've tried the same code with celery 3.1.23 and kombu 3.0.35 and it worked as expected.

MarcSalvat avatar Sep 08 '16 14:09 MarcSalvat

@MarcSalvat You mean this problem is new in celery 4.0.0 and kombu 4.0? And if we use celery 3.1.23 and kombu 3.0.35, the problem does not occur?

Sunrry avatar Dec 09 '16 01:12 Sunrry

@Sunrry Correct. Last time I checked was with v4 rc3

MarcSalvat avatar Dec 09 '16 10:12 MarcSalvat

I am seeing the same issue. Based on my testing, it appears to be caused by the solo pool. Does celery ack late automatically with the solo option, or is the ack delayed by the task itself?

taylor-cedar avatar Apr 24 '17 19:04 taylor-cedar

Probably a dupe of #3768

pnovotnak avatar Apr 25 '17 04:04 pnovotnak

Yup, having the same issue with solo.

The task takes a few minutes to complete, but in the meantime the connection is reset, the ack fails, and the task simply reruns.

[2017-05-28 19:57:43,988: INFO/MainProcess] Task perform_ssd_detection_by_id[2bd234fe-d7bb-426b-bda5-7e8f05f2a7dd] succeeded in 229.236948594s: 0
[2017-05-28 19:57:43,989: CRITICAL/MainProcess] Couldn't ack 6, reason:error(104, 'Connection reset by peer')
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/kombu/message.py", line 130, in ack_log_error
    self.ack(multiple=multiple)
  File "/usr/local/lib/python2.7/dist-packages/kombu/message.py", line 125, in ack
    self.channel.basic_ack(self.delivery_tag, multiple=multiple)
  File "/usr/local/lib/python2.7/dist-packages/amqp/channel.py", line 1408, in basic_ack
    spec.Basic.Ack, argsig, (delivery_tag, multiple),
  File "/usr/local/lib/python2.7/dist-packages/amqp/abstract_channel.py", line 64, in send_method
    conn.frame_writer(1, self.channel_id, sig, args, content)
  File "/usr/local/lib/python2.7/dist-packages/amqp/method_framing.py", line 174, in write_frame
    write(view[:offset])
  File "/usr/local/lib/python2.7/dist-packages/amqp/transport.py", line 269, in write
    self._write(s)
  File "/usr/lib/python2.7/socket.py", line 224, in meth
    return getattr(self._sock,name)(*args)
error: [Errno 104] Connection reset by peer

ghost avatar May 28 '17 20:05 ghost

Can anyone provide a test case to start work towards a fix?

auvipy avatar Jan 17 '18 11:01 auvipy

This issue happens in both celery 4.1.0 and 4.2.0rc2 using all default settings except for the solo mode (Python 3.6.4).

It seems to me that:

  • in the solo mode task execution prevents heart beats and the connection times out when the task takes too long (about 3 minutes with default config), and
  • the default early-ack behaviour is not respected in solo mode: the task is not acked before it is executed.
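The first point above can be illustrated without celery at all: in the solo pool the task blocks the only thread, so nothing services the heartbeat timer. A toy model (all names invented; the "close after ~2 missed heartbeats" rule mirrors the AMQP convention, with intervals shrunk for speed):

```python
import time

HEARTBEAT = 0.1  # toy heartbeat interval, seconds

class ToyConnection:
    """The broker side closes the link after ~2 missed heartbeats."""
    def __init__(self):
        self.last_beat = time.monotonic()
        self.closed = False

    def beat(self):
        # In a real worker the event loop calls this periodically;
        # in the solo pool the event loop is blocked by the task.
        self.last_beat = time.monotonic()

    def broker_tick(self):
        if time.monotonic() - self.last_beat > 2 * HEARTBEAT:
            self.closed = True

conn = ToyConnection()
conn.beat()

# The "task": a blocking call in the only thread, so beat() never runs.
time.sleep(3 * HEARTBEAT)

conn.broker_tick()
assert conn.closed  # connection dropped mid-task; the later ack will fail
```

With a default heartbeat of 60s and RabbitMQ's 2-missed-heartbeats rule, this matches the roughly 3-minute timeout observed above.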

The code needed to reproduce the issue:

import time

from celery import Celery

app = Celery('test', broker='amqp://test:test@rabbitmq/test')

@app.task
def foo():
    print('sleep time!')
    time.sleep(180)
    print('all good')

Starting worker with:

celery worker -l INFO -A tasks -P solo

Then, schedule the task and watch the worker get stuck in a permanent loop.

Log from 4.1.0:

[2018-04-26 15:20:36,954: INFO/MainProcess] Received task: tasks.foo[1d8b142d-92dc-4cce-9d4b-20decb3d006e]
[2018-04-26 15:20:36,954: WARNING/MainProcess] sleep time!
[2018-04-26 15:23:36,960: WARNING/MainProcess] all good
[2018-04-26 15:23:36,960: INFO/MainProcess] Task tasks.foo[1d8b142d-92dc-4cce-9d4b-20decb3d006e] succeeded in 180.0062870910001s: None
[2018-04-26 15:23:36,961: CRITICAL/MainProcess] Couldn't ack 1, reason:ConnectionResetError(104, 'Connection reset by peer')
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/kombu/message.py", line 130, in ack_log_error
    self.ack(multiple=multiple)
  File "/usr/local/lib/python3.6/site-packages/kombu/message.py", line 125, in ack
    self.channel.basic_ack(self.delivery_tag, multiple=multiple)
  File "/usr/local/lib/python3.6/site-packages/amqp/channel.py", line 1394, in basic_ack
    spec.Basic.Ack, argsig, (delivery_tag, multiple),
  File "/usr/local/lib/python3.6/site-packages/amqp/abstract_channel.py", line 50, in send_method
    conn.frame_writer(1, self.channel_id, sig, args, content)
  File "/usr/local/lib/python3.6/site-packages/amqp/method_framing.py", line 166, in write_frame
    write(view[:offset])
  File "/usr/local/lib/python3.6/site-packages/amqp/transport.py", line 258, in write
    self._write(s)
ConnectionResetError: [Errno 104] Connection reset by peer
[2018-04-26 15:23:36,986: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 320, in start
    blueprint.start(self)
  File "/usr/local/lib/python3.6/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 596, in start
    c.loop(*c.loop_args())
  File "/usr/local/lib/python3.6/site-packages/celery/worker/loops.py", line 88, in asynloop
    next(loop)
  File "/usr/local/lib/python3.6/site-packages/kombu/async/hub.py", line 354, in create_loop
    cb(*cbargs)
  File "/usr/local/lib/python3.6/site-packages/kombu/transport/base.py", line 236, in on_readable
    reader(loop)
  File "/usr/local/lib/python3.6/site-packages/kombu/transport/base.py", line 216, in _read
    raise RecoverableConnectionError('Socket was disconnected')
amqp.exceptions.RecoverableConnectionError: Socket was disconnected
[2018-04-26 15:23:36,999: INFO/MainProcess] Connected to amqp://test:**@rabbitmq:5672/test
[2018-04-26 15:23:37,019: INFO/MainProcess] mingle: searching for neighbors
[2018-04-26 15:23:38,030: INFO/MainProcess] mingle: all alone
[2018-04-26 15:23:38,071: INFO/MainProcess] Received task: tasks.foo[1d8b142d-92dc-4cce-9d4b-20decb3d006e]
[2018-04-26 15:23:38,073: WARNING/MainProcess] sleep time!

Log from 4.2.0rc2:

[2018-04-26 15:26:52,350: INFO/MainProcess] Received task: tasks.foo[f1964479-31b6-45b6-b7e9-d5a01bd7397a]
[2018-04-26 15:26:52,350: WARNING/MainProcess] sleep time!
[2018-04-26 15:29:52,431: WARNING/MainProcess] all good
[2018-04-26 15:29:52,432: INFO/MainProcess] Task tasks.foo[f1964479-31b6-45b6-b7e9-d5a01bd7397a] succeeded in 180.0813092110002s: None
[2018-04-26 15:29:52,432: CRITICAL/MainProcess] Couldn't ack 1, reason:ConnectionResetError(104, 'Connection reset by peer')
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/kombu/message.py", line 130, in ack_log_error
    self.ack(multiple=multiple)
  File "/usr/local/lib/python3.6/site-packages/kombu/message.py", line 125, in ack
    self.channel.basic_ack(self.delivery_tag, multiple=multiple)
  File "/usr/local/lib/python3.6/site-packages/amqp/channel.py", line 1394, in basic_ack
    spec.Basic.Ack, argsig, (delivery_tag, multiple),
  File "/usr/local/lib/python3.6/site-packages/amqp/abstract_channel.py", line 50, in send_method
    conn.frame_writer(1, self.channel_id, sig, args, content)
  File "/usr/local/lib/python3.6/site-packages/amqp/method_framing.py", line 166, in write_frame
    write(view[:offset])
  File "/usr/local/lib/python3.6/site-packages/amqp/transport.py", line 258, in write
    self._write(s)
ConnectionResetError: [Errno 104] Connection reset by peer
[2018-04-26 15:29:52,444: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 322, in start
    blueprint.start(self)
  File "/usr/local/lib/python3.6/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 598, in start
    c.loop(*c.loop_args())
  File "/usr/local/lib/python3.6/site-packages/celery/worker/loops.py", line 91, in asynloop
    next(loop)
  File "/usr/local/lib/python3.6/site-packages/kombu/async/hub.py", line 354, in create_loop
    cb(*cbargs)
  File "/usr/local/lib/python3.6/site-packages/kombu/transport/base.py", line 236, in on_readable
    reader(loop)
  File "/usr/local/lib/python3.6/site-packages/kombu/transport/base.py", line 216, in _read
    raise RecoverableConnectionError('Socket was disconnected')
amqp.exceptions.RecoverableConnectionError: Socket was disconnected
[2018-04-26 15:29:52,453: INFO/MainProcess] Connected to amqp://test:**@rabbitmq:5672/test
[2018-04-26 15:29:52,473: INFO/MainProcess] mingle: searching for neighbors
[2018-04-26 15:29:52,477: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 322, in start
    blueprint.start(self)
  File "/usr/local/lib/python3.6/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/mingle.py", line 40, in start
    self.sync(c)
  File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/mingle.py", line 44, in sync
    replies = self.send_hello(c)
  File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/mingle.py", line 57, in send_hello
    replies = inspect.hello(c.hostname, our_revoked._data) or {}
  File "/usr/local/lib/python3.6/site-packages/celery/app/control.py", line 143, in hello
    return self._request('hello', from_node=from_node, revoked=revoked)
  File "/usr/local/lib/python3.6/site-packages/celery/app/control.py", line 95, in _request
    timeout=self.timeout, reply=True,
  File "/usr/local/lib/python3.6/site-packages/celery/app/control.py", line 454, in broadcast
    limit, callback, channel=channel,
  File "/usr/local/lib/python3.6/site-packages/kombu/pidbox.py", line 315, in _broadcast
    serializer=serializer)
  File "/usr/local/lib/python3.6/site-packages/kombu/pidbox.py", line 290, in _publish
    serializer=serializer,
  File "/usr/local/lib/python3.6/site-packages/kombu/messaging.py", line 181, in publish
    exchange_name, declare,
  File "/usr/local/lib/python3.6/site-packages/kombu/messaging.py", line 203, in _publish
    mandatory=mandatory, immediate=immediate,
  File "/usr/local/lib/python3.6/site-packages/amqp/channel.py", line 1734, in _basic_publish
    (0, exchange, routing_key, mandatory, immediate), msg
  File "/usr/local/lib/python3.6/site-packages/amqp/abstract_channel.py", line 50, in send_method
    conn.frame_writer(1, self.channel_id, sig, args, content)
  File "/usr/local/lib/python3.6/site-packages/amqp/method_framing.py", line 166, in write_frame
    write(view[:offset])
  File "/usr/local/lib/python3.6/site-packages/amqp/transport.py", line 258, in write
    self._write(s)
ConnectionResetError: [Errno 104] Connection reset by peer
[2018-04-26 15:29:52,490: INFO/MainProcess] Connected to amqp://test:**@rabbitmq:5672/test
[2018-04-26 15:29:52,497: INFO/MainProcess] mingle: searching for neighbors
[2018-04-26 15:29:53,512: INFO/MainProcess] mingle: all alone
[2018-04-26 15:29:53,523: INFO/MainProcess] Received task: tasks.foo[f1964479-31b6-45b6-b7e9-d5a01bd7397a]
[2018-04-26 15:29:53,524: WARNING/MainProcess] sleep time!

vanzi avatar Apr 26 '18 15:04 vanzi

Please wait until we release the latest py-amqp and kombu to be checked against celery master.

auvipy avatar Apr 26 '18 17:04 auvipy

Can anyone let us know its status with kombu 4.2 and celery 4.2rc4?

auvipy avatar May 25 '18 19:05 auvipy

@auvipy I did not notice any difference compared to 4.2rc2. RabbitMQ admin still shows the task as unacked while it is being executed by the worker.

Using the same task:

import time

from celery import Celery

app = Celery('test', broker='amqp://test:test@rabbitmq/test')

@app.task
def foo():
    print('sleep time!')
    time.sleep(180)
    print('all good')

and

root@2e6a9b563d25:/home/app# pip freeze | grep -E "celery|kombu"
celery==4.2.0rc4
kombu==4.2.0

the worker still hangs in an infinite loop, processing the same single task:

root@2e6a9b563d25:/home/app# celery worker -l INFO -A tracker.tasks -P solo
(...)
[2018-05-27 10:50:39,678: INFO/MainProcess] celery@2e6a9b563d25 ready.
[2018-05-27 10:50:50,422: INFO/MainProcess] Received task: tracker.tasks.foo[3d30a02f-fb05-452f-97f0-75dedac8d8ae]
[2018-05-27 10:50:50,423: WARNING/MainProcess] sleep time!
[2018-05-27 10:53:50,522: WARNING/MainProcess] all good                                      
[2018-05-27 10:53:50,522: INFO/MainProcess] Task tracker.tasks.foo[3d30a02f-fb05-452f-97f0-75dedac8d8ae] succeeded in 180.09944431299982s: None
[2018-05-27 10:53:50,523: CRITICAL/MainProcess] Couldn't ack 1, reason:ConnectionResetError(104, 'Connection reset by peer')
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/kombu/message.py", line 130, in ack_log_error
    self.ack(multiple=multiple)
  File "/usr/local/lib/python3.6/site-packages/kombu/message.py", line 125, in ack      
    self.channel.basic_ack(self.delivery_tag, multiple=multiple)
  File "/usr/local/lib/python3.6/site-packages/amqp/channel.py", line 1394, in basic_ack 
    spec.Basic.Ack, argsig, (delivery_tag, multiple),
  File "/usr/local/lib/python3.6/site-packages/amqp/abstract_channel.py", line 50, in send_method
    conn.frame_writer(1, self.channel_id, sig, args, content)
  File "/usr/local/lib/python3.6/site-packages/amqp/method_framing.py", line 166, in write_frame 
    write(view[:offset])                                     
  File "/usr/local/lib/python3.6/site-packages/amqp/transport.py", line 258, in write           
    self._write(s)      
ConnectionResetError: [Errno 104] Connection reset by peer                           
[2018-05-27 10:53:50,549: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):                        
  File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 322, in start
    blueprint.start(self)                                                  
  File "/usr/local/lib/python3.6/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)                                                                                            
  File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 598, in start
    c.loop(*c.loop_args())
  File "/usr/local/lib/python3.6/site-packages/celery/worker/loops.py", line 91, in asynloop
    next(loop)
  File "/usr/local/lib/python3.6/site-packages/kombu/asynchronous/hub.py", line 354, in create_loop
    cb(*cbargs)
  File "/usr/local/lib/python3.6/site-packages/kombu/transport/base.py", line 236, in on_readable
    reader(loop)
  File "/usr/local/lib/python3.6/site-packages/kombu/transport/base.py", line 216, in _read
    raise RecoverableConnectionError('Socket was disconnected')
amqp.exceptions.RecoverableConnectionError: Socket was disconnected
[2018-05-27 10:53:50,577: INFO/MainProcess] Connected to amqp://pprof:**@rabbitmq:5672/pprof
[2018-05-27 10:53:50,602: INFO/MainProcess] mingle: searching for neighbors
[2018-05-27 10:53:50,610: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 322, in start
    blueprint.start(self)
  File "/usr/local/lib/python3.6/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/mingle.py", line 40, in start
    self.sync(c)
  File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/mingle.py", line 44, in sync
    replies = self.send_hello(c)
  File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/mingle.py", line 57, in send_hello
    replies = inspect.hello(c.hostname, our_revoked._data) or {}
  File "/usr/local/lib/python3.6/site-packages/celery/app/control.py", line 143, in hello
    return self._request('hello', from_node=from_node, revoked=revoked)
  File "/usr/local/lib/python3.6/site-packages/celery/app/control.py", line 95, in _request
    timeout=self.timeout, reply=True,
  File "/usr/local/lib/python3.6/site-packages/celery/app/control.py", line 454, in broadcast
    limit, callback, channel=channel,
  File "/usr/local/lib/python3.6/site-packages/kombu/pidbox.py", line 315, in _broadcast
    serializer=serializer)
  File "/usr/local/lib/python3.6/site-packages/kombu/pidbox.py", line 290, in _publish
    serializer=serializer,
  File "/usr/local/lib/python3.6/site-packages/kombu/messaging.py", line 181, in publish
    exchange_name, declare,
  File "/usr/local/lib/python3.6/site-packages/kombu/messaging.py", line 203, in _publish
    mandatory=mandatory, immediate=immediate,
  File "/usr/local/lib/python3.6/site-packages/amqp/channel.py", line 1734, in _basic_publish
    (0, exchange, routing_key, mandatory, immediate), msg
  File "/usr/local/lib/python3.6/site-packages/amqp/abstract_channel.py", line 50, in send_method
    conn.frame_writer(1, self.channel_id, sig, args, content)
  File "/usr/local/lib/python3.6/site-packages/amqp/method_framing.py", line 166, in write_frame
    write(view[:offset])
  File "/usr/local/lib/python3.6/site-packages/amqp/transport.py", line 258, in write
    self._write(s)
ConnectionResetError: [Errno 104] Connection reset by peer
[2018-05-27 10:53:50,634: INFO/MainProcess] Connected to amqp://pprof:**@rabbitmq:5672/pprof
[2018-05-27 10:53:50,657: INFO/MainProcess] mingle: searching for neighbors
[2018-05-27 10:53:51,694: INFO/MainProcess] mingle: all alone
[2018-05-27 10:53:51,734: INFO/MainProcess] Received task: tracker.tasks.foo[3d30a02f-fb05-452f-97f0-75dedac8d8ae]
[2018-05-27 10:53:51,736: WARNING/MainProcess] sleep time!

vanzi avatar May 27 '18 11:05 vanzi

do you have any fix to apply as pr?

auvipy avatar Aug 12 '18 05:08 auvipy

Is there any workaround for this issue before it is fixed? It really hurts. Thank you.

rrader avatar Dec 03 '18 10:12 rrader

Any news on this issue, please? Having the same here with the solo worker… Maybe we should add a warning in the documentation?

syenchuk avatar Jun 20 '19 09:06 syenchuk

Is there any update?

vicchien avatar Sep 29 '19 02:09 vicchien

Does this issue need a PR?

zubairalam avatar Oct 07 '19 06:10 zubairalam

A test case with a fix, yes.

auvipy avatar Oct 07 '19 12:10 auvipy

As far as I understand the issue is caused by the solo pool blocking behavior, which prevents heartbeats, as @vanzi noted:

in the solo mode task execution prevents heart beats and the connection times out when the task takes too long (about 3 minutes with default config), ...

Related:

  • https://github.com/celery/py-amqp/issues/265
  • https://www.rabbitmq.com/heartbeats.html#disabling
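Given that heartbeat timeouts are the trigger, one workaround discussed in the linked threads is disabling heartbeats entirely so the broker never times the connection out during a long task. The trade-off is that dead TCP connections are then detected much more slowly. A minimal config sketch (using celery's setting name; the old-style uppercase alias also exists):

```python
# celeryconfig.py -- sketch of disabling AMQP heartbeats entirely.
# broker_heartbeat = 0 means "do not negotiate heartbeats with the broker",
# so a long-running solo task cannot miss them.
broker_heartbeat = 0
# Old-style equivalent:
# BROKER_HEARTBEAT = 0
```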

georgepsarakis avatar Oct 14 '19 16:10 georgepsarakis

Hi, we are still facing this issue with a longer-running task and it is quite blocking. Do you have any updates or workarounds for it?

CatarauCorina avatar Jun 15 '20 13:06 CatarauCorina

Any updates? If I run celery without the -P solo arg, it doesn't execute my tasks / stops in the middle of the task. With the -P solo arg, it keeps re-running the tasks and ends with Broken Pipe Error no 32, and the task never gets ack'd.

shenoy-anurag avatar Jul 31 '20 09:07 shenoy-anurag

Any updates? If I run celery without the -P solo arg, it doesn't execute my tasks / stops in the middle of the task. With the -P solo arg, it keeps re-running the tasks and ends with Broken Pipe Error no 32, and the task never gets ack'd.

Please feel free to debug and provide a fix, since you hit the issue in production.

auvipy avatar Jul 31 '20 10:07 auvipy

I debugged celery in a docker container and I'm sharing the debug logs here. Maybe someone who knows more about the underlying code of celery and rabbitmq might be able to figure it out.

Versions: celery==4.4.6, RabbitMQ 3.8.5

celery docker entrypoint: celery -A proj worker -B -P solo --loglevel=DEBUG

Re-Running task debug log: https://gist.github.com/shenoy-anurag/bb723684df37c82c85c7b44cbc7b3c7c

Although this task is 30s long, the process of debugging caused it to run beyond 120s, which is approximately the amount of time required for rabbitmq to raise a missed-heartbeats error, after which celery is unable to get the task acknowledged with the broker:

rabbitmq_1 | closing AMQP connection <0.28344.0> (172.18.0.5:46918 -> 172.18.0.3:5672):
rabbitmq_1 | missed heartbeats from client, timeout: 60s

I hope this helps.

shenoy-anurag avatar Aug 04 '20 14:08 shenoy-anurag

I am currently able to get my long-running tasks to run only once, without re-running in a loop, by setting the BROKER_HEARTBEAT = None configuration variable.

I think my scheduled periodic tasks aren't running anymore but am currently testing things.

Edit: Setting BROKER_HEARTBEAT = 600 causes the celery connection to reset once and thus repeat the long task once. The second time around, however, the long task is acknowledged and there is no "missed heartbeats from client, timeout: 60s" message. Configuration is the same as in my previous comment.

Other config variables:

CELERYD_PREFETCH_MULTIPLIER = 1  # Doesn't seem to work. Flower shows the multiplier as 4.
BROKER_CONNECTION_TIMEOUT = 720
BROKER_POOL_LIMIT = None

debug logs: https://gist.github.com/shenoy-anurag/783e78896e4888c118e40222c63b3354

shenoy-anurag avatar Aug 04 '20 15:08 shenoy-anurag

@shenoy-anurag Regarding the CELERYD_PREFETCH_MULTIPLIER = 1 setting: you also have to set CELERY_TASK_ACKS_LATE = True. Please check https://docs.celeryproject.org/en/latest/userguide/optimizing.html#reserve-one-task-at-a-time

If you use django and app.config_from_object('django.conf:settings', namespace='CELERY'), you must use CELERY_WORKER_PREFETCH_MULTIPLIER as parameter name.
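A sketch of the namespaced Django settings crook describes (settings.py; the multiplier value is illustrative — with namespace='CELERY' every option is read from a CELERY_-prefixed name):

```python
# settings.py (Django project using
# app.config_from_object('django.conf:settings', namespace='CELERY'))

# Maps to the celery option worker_prefetch_multiplier:
CELERY_WORKER_PREFETCH_MULTIPLIER = 1

# Maps to task_acks_late; needed together with the multiplier
# for the reserve-one-task-at-a-time behaviour:
CELERY_TASK_ACKS_LATE = True
```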

crook avatar Aug 13 '20 15:08 crook

I've had similar issues with this. Depending on why you're running -P solo, you might consider running -P gevent --concurrency 1: that allows using gevent within the task to yield execution so heartbeats can be sent (while keeping the task execution context "single-threaded"). Our large tasks were loops, with each iteration taking up to 10 seconds. Adding gevent.sleep(0) allows sending heartbeats mid-task if needed:

import gevent

@app.task(track_started=True, acks_late=True, bind=True)
def process_upload(self, rows):
    for row in rows:
        # allow a context switch to send a heartbeat if necessary
        gevent.sleep(0)
        # ... process the row, ~10s

Other than this, I'm not sure this will ever be truly "solved": celery is doing exactly what it's told, running in a single-threaded context, and it can't send heartbeats mid-task.

andytumelty avatar Oct 08 '20 09:10 andytumelty

@andytumelty Is this the solution for error: [Errno 104] Connection reset by peer even if we have disabled heartbeats and prefetch (which, by the way, doesn't seem to work)?

davidmir avatar Oct 23 '20 17:10 davidmir