py-amqp icon indicating copy to clipboard operation
py-amqp copied to clipboard

How should I handle TimeoutError?

Open cenkalti opened this issue 7 years ago • 8 comments

TimeoutError: [Errno 110] Connection timed out
  ...
    ch.connection.drain_events(timeout=1)
  File "amqp/connection.py", line 471, in drain_events
    while not self.blocking_read(timeout):
  File "amqp/connection.py", line 476, in blocking_read
    frame = self.transport.read_frame()
  File "amqp/transport.py", line 226, in read_frame
    frame_header = read(7, True)
  File "amqp/transport.py", line 401, in _read
    s = recv(n - len(rbuf))

cenkalti avatar Mar 08 '18 11:03 cenkalti

drain_events function normally raises socket.timeout() when there is no event in timeout period. But sometimes I see the previous traceback in logs. AFAIK Python socket module raises socket.timeout() too when there is no data to be received. I can reproduce that with a simple script.

However, I can't reproduce the case that drain_events throwing TimeoutError.

Does this exception needs to be handled by the library or my application code?

Using Python 3.5 on Linux.

cenkalti avatar Mar 08 '18 11:03 cenkalti

After some investigation I found out that TimeoutError is specifically a subclass of OSError representing the case where errno is ETIMEDOUT: https://bugs.python.org/issue21376#msg217402

In Linux, ETIMEDOUT is set by operating system if TCP keep-alive mechanism detects that the connection is dead: http://man7.org/linux/man-pages/man7/tcp.7.html

I have found that there are some default options that is set on the socket about TCP keep-alive feature starting from version 2.1.4: https://amqp.readthedocs.io/en/latest/changelog.html#version-2-1-4

These parameters are defined here in code: https://github.com/celery/py-amqp/blob/068223d25167ab63735158a7083946971308fce2/amqp/transport.py#L36-L42

I think TCP_USER_TIMEOUT parameter here is set to very low value (1 second). I propose increasing this value to at least 10 seconds. What do you think?

cenkalti avatar Mar 08 '18 22:03 cenkalti

@cenkalti, I see a similar Timeout problem when using a Celery ResultGroup with several tasks, and then attempting to revoke() the group.

   File "/…/celery/tasks.py", line 386, in revoke_group_tasks
     group_result.revoke()
   File "/…/lib/python3.6/site-packages/celery/result.py", line 589, in revoke
     terminate=terminate, signal=signal, reply=wait)
   File "/…/lib/python3.6/site-packages/celery/app/control.py", line 210, in revoke
     }, **kwargs)
   File "/…/lib/python3.6/site-packages/celery/app/control.py", line 436, in broadcast
     limit, callback, channel=channel,
   File "/…/lib/python3.6/site-packages/kombu/pidbox.py", line 315, in _broadcast
     serializer=serializer)
   File "/…/lib/python3.6/site-packages/kombu/pidbox.py", line 290, in _publish
     serializer=serializer,
   File "/…/lib/python3.6/site-packages/kombu/messaging.py", line 181, in publish
     exchange_name, declare,
   File "/…/lib/python3.6/site-packages/kombu/messaging.py", line 203, in _publish
     mandatory=mandatory, immediate=immediate,
   File "/…/lib/python3.6/site-packages/amqp/channel.py", line 1734, in _basic_publish
     (0, exchange, routing_key, mandatory, immediate), msg
   File "/…/lib/python3.6/site-packages/amqp/abstract_channel.py", line 50, in send_method
     conn.frame_writer(1, self.channel_id, sig, args, content)
   File "/…/lib/python3.6/site-packages/amqp/method_framing.py", line 166, in write_frame
     write(view[:offset])
   File "/…/lib/python3.6/site-packages/amqp/transport.py", line 258, in write
     self._write(s)
TimeoutError: [Errno 110] Connection timed out

Did changing the TCP timeout solve this?

A “workaround” for this problem might be to revoke all tasks of a group one-by-one in a loop, although I have not yet tried that.

jenstroeger avatar Apr 28 '18 21:04 jenstroeger

@jenstroeger No, changing the TCP_USER_TIMEOUT didn't solve the problem, we keep seeing TimeoutError exceptions.

cenkalti avatar Apr 30 '18 10:04 cenkalti

@cenkalti, hmmm… ok 🤔I guess I’ll switch to plan B then and revoke all tasks of the group individually, while digging for the root cause of the timeout on our side. Thanks!

jenstroeger avatar Apr 30 '18 10:04 jenstroeger

@cenkalti, in my case the TimeoutError seems to be caused by revoke() on a task that has already executed: the group contains six tasks which are scheduled over the period of two days, and revoking that group at any time during these two days attempts to revoke a task that has already executed.

# group_results.revoke()
for result in group_results:
    try:
        _log.info("Revoking %s", result.id)
        result.revoke()
    except Exception as e:
        _log.exception(f"Failed to revoke {result.id}!")

It is the task which has already executed that causes the TimeoutError, and all other ones are being revoked successfully.

@georgepsarakis, is it possible to shed some light on that?

jenstroeger avatar May 28 '18 06:05 jenstroeger

Here having the same error:

Traceback (most recent call last):
  File "/home/emarcozzi/env/lib64/python3.6/site-packages/kombu/messaging.py", line 181, in publish
    exchange_name, declare,
  File "/home/emarcozzi/env/lib64/python3.6/site-packages/kombu/messaging.py", line 203, in _publish
    mandatory=mandatory, immediate=immediate,
  File "/home/emarcozzi/env/lib64/python3.6/site-packages/amqp/channel.py", line 1766, in _basic_publish
    (0, exchange, routing_key, mandatory, immediate), msg
  File "/home/emarcozzi/env/lib64/python3.6/site-packages/amqp/abstract_channel.py", line 59, in send_method
    conn.frame_writer(1, self.channel_id, sig, args, content)
  File "/home/emarcozzi/env/lib64/python3.6/site-packages/amqp/method_framing.py", line 172, in write_frame
    write(view[:offset])
  File "/home/emarcozzi/env/lib64/python3.6/site-packages/amqp/transport.py", line 305, in write
    self._write(s)
TimeoutError: [Errno 110] Connection timed out

Using amqp==2.6.0, and here is the code that (after pushing aprox 1 millon packages) raise the exception:

def run():
    with Connection(rabbit_url, read_timeout=30000, write_timeout=30000) as conn:
        with open("foo.csv", "r") as fp:
            exchange = Exchange("example-exchange-1", type="direct")
            channel = conn.channel()
            producer = Producer(exchange=exchange, channel=channel, routing_key="test")
            queue = Queue(name="test-1", exchange=exchange, routing_key="test")
            queue.maybe_bind(conn)
            queue.declare()
            for line in fp:
                producer.publish(line, max_retries=3, errback=lambda x: print(x))

edvm avatar Jul 22 '20 17:07 edvm