How should I handle TimeoutError?
TimeoutError: [Errno 110] Connection timed out
...
ch.connection.drain_events(timeout=1)
File "amqp/connection.py", line 471, in drain_events
while not self.blocking_read(timeout):
File "amqp/connection.py", line 476, in blocking_read
frame = self.transport.read_frame()
File "amqp/transport.py", line 226, in read_frame
frame_header = read(7, True)
File "amqp/transport.py", line 401, in _read
s = recv(n - len(rbuf))
The drain_events function normally raises socket.timeout() when no event arrives within the timeout period, but sometimes I see the traceback above in the logs. AFAIK, the Python socket module also raises socket.timeout() when there is no data to receive, and I can reproduce that with a simple script.
However, I can't reproduce the case where drain_events throws TimeoutError.
Does this exception need to be handled by the library or by my application code?
Using Python 3.5 on Linux.
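For comparison, the "no data within the timeout" case can be reproduced locally with a connected socket pair. This is a minimal sketch, not the original poster's script; `idle_recv_timeout` is a hypothetical helper name. The exception it returns carries no errno, which is what separates it from the errno-110 TimeoutError in the traceback.

```python
import socket


def idle_recv_timeout(wait: float = 0.2) -> BaseException:
    """Reproduce the 'no data within the timeout' case with a local socket pair."""
    reader, writer = socket.socketpair()
    try:
        reader.settimeout(wait)
        try:
            reader.recv(1)  # nothing was sent, so this blocks until the timeout expires
        except socket.timeout as exc:
            return exc
        raise AssertionError("recv() unexpectedly returned data")
    finally:
        reader.close()
        writer.close()
```

On Python 3.10+ `socket.timeout` is an alias of `TimeoutError`, so the class name printed for this exception differs between versions; its `errno` attribute is `None` in both cases.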
After some investigation I found that TimeoutError is specifically the subclass of OSError that represents errno ETIMEDOUT: https://bugs.python.org/issue21376#msg217402
On Linux, ETIMEDOUT is set by the operating system when, for example, the TCP keep-alive mechanism detects that the connection is dead: http://man7.org/linux/man-pages/man7/tcp.7.html
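Because socket.timeout and TimeoutError are both OSError subclasses, checking `errno` is a reliable way to tell an idle read timeout from a dead connection. A minimal sketch, assuming a py-amqp-style `connection` object whose `drain_events(timeout=...)` behaves as described in this thread; `is_dead_connection` and `drain_once` are hypothetical helpers, not py-amqp API.

```python
import errno
import socket


def is_dead_connection(exc: BaseException) -> bool:
    """True for a kernel-reported ETIMEDOUT (connection considered dead),
    False for an ordinary 'no data within the timeout' socket.timeout."""
    return isinstance(exc, OSError) and exc.errno == errno.ETIMEDOUT


def drain_once(connection, timeout: float = 1.0) -> bool:
    """Call connection.drain_events() once.

    Returns True if drain_events() returned normally and False on an idle
    timeout. Re-raises ETIMEDOUT (and any other OSError) so the caller can
    reconnect.
    """
    try:
        connection.drain_events(timeout=timeout)
        return True
    except OSError as exc:
        # Check errno first: on Python 3.10+ socket.timeout is an alias of
        # TimeoutError, so the isinstance check below cannot tell them apart.
        if is_dead_connection(exc):
            raise
        if isinstance(exc, socket.timeout):
            return False
        raise
```

Under this split, the library keeps raising both, and the application decides that an idle timeout means "loop again" while ETIMEDOUT means "reconnect".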
I found that, starting from version 2.1.4, some default TCP keep-alive options are set on the socket: https://amqp.readthedocs.io/en/latest/changelog.html#version-2-1-4
These parameters are defined here in code: https://github.com/celery/py-amqp/blob/068223d25167ab63735158a7083946971308fce2/amqp/transport.py#L36-L42
I think the TCP_USER_TIMEOUT parameter here is set to a very low value (1 second). I propose increasing it to at least 10 seconds. What do you think?
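For reference, this is roughly what a 10-second TCP_USER_TIMEOUT looks like at the plain-socket level. A minimal sketch, assuming Linux and Python 3.6+ (where `socket.TCP_USER_TIMEOUT` is exposed); the option value is in milliseconds, and `apply_user_timeout` is a hypothetical helper, not py-amqp code.

```python
import socket


def apply_user_timeout(sock: socket.socket, user_timeout_ms: int = 10000) -> int:
    """Enable keep-alive and set TCP_USER_TIMEOUT (milliseconds) on a TCP socket.

    Returns the TCP_USER_TIMEOUT value read back from the kernel, or -1 if the
    platform does not expose the option.
    """
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    if not hasattr(socket, "TCP_USER_TIMEOUT"):
        return -1
    # 1000 would mean 1 second; 10000 is the proposed 10 seconds.
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_USER_TIMEOUT, user_timeout_ms)
    return sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_USER_TIMEOUT)
```

Per tcp(7), TCP_USER_TIMEOUT bounds how long transmitted data may stay unacknowledged before the kernel closes the connection with ETIMEDOUT, so a 1-second value leaves very little room for a briefly stalled broker or network.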
@cenkalti, I see a similar timeout problem when using a Celery GroupResult with several tasks and then calling revoke() on the group.
File "/…/celery/tasks.py", line 386, in revoke_group_tasks
group_result.revoke()
File "/…/lib/python3.6/site-packages/celery/result.py", line 589, in revoke
terminate=terminate, signal=signal, reply=wait)
File "/…/lib/python3.6/site-packages/celery/app/control.py", line 210, in revoke
}, **kwargs)
File "/…/lib/python3.6/site-packages/celery/app/control.py", line 436, in broadcast
limit, callback, channel=channel,
File "/…/lib/python3.6/site-packages/kombu/pidbox.py", line 315, in _broadcast
serializer=serializer)
File "/…/lib/python3.6/site-packages/kombu/pidbox.py", line 290, in _publish
serializer=serializer,
File "/…/lib/python3.6/site-packages/kombu/messaging.py", line 181, in publish
exchange_name, declare,
File "/…/lib/python3.6/site-packages/kombu/messaging.py", line 203, in _publish
mandatory=mandatory, immediate=immediate,
File "/…/lib/python3.6/site-packages/amqp/channel.py", line 1734, in _basic_publish
(0, exchange, routing_key, mandatory, immediate), msg
File "/…/lib/python3.6/site-packages/amqp/abstract_channel.py", line 50, in send_method
conn.frame_writer(1, self.channel_id, sig, args, content)
File "/…/lib/python3.6/site-packages/amqp/method_framing.py", line 166, in write_frame
write(view[:offset])
File "/…/lib/python3.6/site-packages/amqp/transport.py", line 258, in write
self._write(s)
TimeoutError: [Errno 110] Connection timed out
Did changing the TCP timeout solve this?
A “workaround” for this problem might be to revoke all tasks of a group one-by-one in a loop, although I have not yet tried that.
@jenstroeger No, changing the TCP_USER_TIMEOUT didn't solve the problem, we keep seeing TimeoutError exceptions.
@cenkalti, hmmm… ok 🤔 I guess I’ll switch to plan B then and revoke the group’s tasks individually, while digging for the root cause of the timeout on our side. Thanks!
@cenkalti, in my case the TimeoutError seems to be caused by calling revoke() on a task that has already executed: the group contains six tasks scheduled over a period of two days, so revoking the group at any time during those two days attempts to revoke a task that has already executed. Revoking the tasks one by one instead:
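import logging

_log = logging.getLogger(__name__)

# Previously: group_results.revoke()
# Revoke the group's tasks one at a time instead, logging any failure.
for result in group_results:
    try:
        _log.info("Revoking %s", result.id)
        result.revoke()
    except Exception:
        _log.exception("Failed to revoke %s!", result.id)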
The task that has already executed is the one that causes the TimeoutError; all the other tasks are revoked successfully.
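If an already-executed task is the trigger, one option is to skip tasks that have already finished. A minimal sketch, assuming each item is a Celery AsyncResult (which provides `id`, `ready()` and `revoke()`) and that a result backend is configured so `ready()` can report completion; `revoke_pending` is a hypothetical helper. A task can still finish between the check and the revoke, so the exception handling stays.

```python
import logging

_log = logging.getLogger(__name__)


def revoke_pending(results) -> list:
    """Revoke only results that have not finished yet; return the revoked ids."""
    revoked = []
    for result in results:
        if result.ready():  # True once the task has finished (success or failure)
            _log.info("Skipping %s: already finished", result.id)
            continue
        try:
            result.revoke()
        except Exception:  # e.g. TimeoutError from the broker connection
            _log.exception("Failed to revoke %s", result.id)
        else:
            revoked.append(result.id)
    return revoked
```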
@georgepsarakis, is it possible to shed some light on that?
I'm seeing the same error here:
Traceback (most recent call last):
File "/home/emarcozzi/env/lib64/python3.6/site-packages/kombu/messaging.py", line 181, in publish
exchange_name, declare,
File "/home/emarcozzi/env/lib64/python3.6/site-packages/kombu/messaging.py", line 203, in _publish
mandatory=mandatory, immediate=immediate,
File "/home/emarcozzi/env/lib64/python3.6/site-packages/amqp/channel.py", line 1766, in _basic_publish
(0, exchange, routing_key, mandatory, immediate), msg
File "/home/emarcozzi/env/lib64/python3.6/site-packages/amqp/abstract_channel.py", line 59, in send_method
conn.frame_writer(1, self.channel_id, sig, args, content)
File "/home/emarcozzi/env/lib64/python3.6/site-packages/amqp/method_framing.py", line 172, in write_frame
write(view[:offset])
File "/home/emarcozzi/env/lib64/python3.6/site-packages/amqp/transport.py", line 305, in write
self._write(s)
TimeoutError: [Errno 110] Connection timed out
I'm using amqp==2.6.0, and here is the code that raises the exception (after publishing approximately 1 million messages):
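from kombu import Connection, Exchange, Producer, Queue

# rabbit_url (the broker URL) is defined elsewhere.


def run():
    with Connection(rabbit_url, read_timeout=30000, write_timeout=30000) as conn:
        with open("foo.csv", "r") as fp:
            exchange = Exchange("example-exchange-1", type="direct")
            channel = conn.channel()
            producer = Producer(exchange=exchange, channel=channel, routing_key="test")
            queue = Queue(name="test-1", exchange=exchange, routing_key="test")
            queue.maybe_bind(conn)
            queue.declare()
            # Publish one message per CSV line.
            # Note: max_retries/errback are not named parameters of Producer.publish();
            # kombu applies them only via retry=True, retry_policy={...}.
            for line in fp:
                producer.publish(line, max_retries=3, errback=lambda x: print(x))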
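One way to handle this in application code is to catch the connection-level error around each publish, re-establish the connection, and retry a bounded number of times. A minimal, library-agnostic sketch: `publish` and `reconnect` are hypothetical callables you would supply (for example, one wrapping `producer.publish` and one re-creating the connection and producer), and `publish_with_retry` is not a kombu API. kombu's own `retry=True` / `retry_policy` mechanism may be preferable where it fits.

```python
import time
from typing import Callable


def publish_with_retry(
    publish: Callable[[str], None],
    reconnect: Callable[[], None],
    body: str,
    max_retries: int = 3,
    interval: float = 1.0,
) -> int:
    """Publish body, reconnecting and retrying on OSError (incl. TimeoutError).

    Returns the number of retries that were needed and re-raises the last
    error once max_retries is exhausted.
    """
    if max_retries < 0:
        raise ValueError("max_retries must be >= 0")
    for attempt in range(max_retries + 1):
        try:
            publish(body)
            return attempt
        except OSError:
            if attempt == max_retries:
                raise
            time.sleep(interval * (attempt + 1))  # linear backoff between attempts
            reconnect()
```

A write that failed with TimeoutError may still have reached the broker, so retrying gives at-least-once delivery and consumers may see duplicates.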