kombu
Connection.collect() is not thread safe
https://github.com/celery/kombu/blob/dba85e2d9515b9ce202bd30e8690131aa055e6bf/kombu/connection.py#L334-L350
If any other thread creates a socket while `Connection.collect()` is mid-execution, the new socket will inherit the temporary timeout.
Example here: https://gist.github.com/mattbennett/888e42af88c2733918ad1810956abbde
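For readers who don't want to open the gist, here is a minimal single-threaded sketch of the same effect. It is not the gist's code and it does not call kombu at all; the helper `timeout_of_new_socket` is a made-up name, and the "other thread" is simulated by simply creating a socket while the default timeout is temporarily changed.

```python
import socket


def timeout_of_new_socket():
    """Create a throwaway socket and report the timeout it starts with."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        return sock.gettimeout()


original = socket.getdefaulttimeout()   # usually None (blocking)
before = timeout_of_new_socket()

socket.setdefaulttimeout(0.1)           # the temporary, process-wide change
try:
    # Stands in for an unrelated thread creating a socket at this moment.
    during = timeout_of_new_socket()
finally:
    socket.setdefaulttimeout(original)  # restore the previous default

after = timeout_of_new_socket()
print(before, during, after)
```

The socket created in the middle keeps the short timeout for its whole lifetime, even after the default has been restored, because the default is only read at creation time.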
@mattbennett I think this is expected, there are many shared objects and thread-safety is not guaranteed, as far as I know. Can you please explain here how you encountered the issue in the first place? Was it during the use of a thread pool in a Celery worker, for example?
There are some comments in the issue here: https://github.com/nameko/nameko/pull/521. Essentially, there is a background thread closing pools while other threads are creating sockets.
I understand that kombu objects may not be safe for use in multiple threads concurrently, but it doesn't seem right that side-effects leak into any other thread using the socket library. As a user of kombu there's no way I can protect against it.
This problem will happen whenever `socket.setdefaulttimeout` is used in a program with more than one thread. I don't know whether best practice is to not make assumptions about the timeout on new sockets, as in my workaround, or to refrain from using `socket.setdefaulttimeout` in multi-threaded environments. I think it's the latter though.
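The first option (not assuming anything about the timeout on new sockets) could look roughly like the sketch below. This is not necessarily the workaround referred to above; `new_socket` is a hypothetical helper that sets the timeout explicitly right after creation, so whatever default happened to be active at that instant is overwritten before any I/O takes place.

```python
import socket


def new_socket(timeout=None, family=socket.AF_INET, kind=socket.SOCK_STREAM):
    """Create a socket whose timeout never depends on the global default.

    timeout=None means blocking mode, which is Python's normal default.
    """
    sock = socket.socket(family, kind)
    sock.settimeout(timeout)  # overwrite whatever default was inherited
    return sock
```

For outgoing connections, `socket.create_connection()` also accepts an explicit `timeout` argument, which is used instead of the global default when it is passed.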
There was a similar bug reported against boto here: https://github.com/boto/boto/issues/1935
Indeed this is very annoying.
It took me hours of debugging to understand why some sockets used to send mail were sometimes abruptly closed after 100 ms: kombu was calling `socket.setdefaulttimeout` in another thread just before the mail socket was created.
Why aren't we using `settimeout()` instead?
Would that be an appropriate fix?
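To make the `settimeout()` idea concrete, here is a rough sketch of a per-socket version of the save/set/restore pattern. It assumes the caller can get hold of the actual socket object, which may not be straightforward inside kombu depending on the transport; `temporary_timeout` is a hypothetical helper, not an existing kombu or stdlib function.

```python
import contextlib
import socket


@contextlib.contextmanager
def temporary_timeout(sock, timeout):
    """Apply a timeout to one socket only, then put the old value back."""
    previous = sock.gettimeout()
    sock.settimeout(timeout)
    try:
        yield sock
    finally:
        # The socket may have been closed inside the block; restoring the
        # timeout on a closed socket raises OSError, which is safe to ignore.
        with contextlib.suppress(OSError):
            sock.settimeout(previous)
```

Because only the given socket is touched, sockets created by other threads in the meantime keep whatever default they would have had anyway.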
I'm affected by this issue. The problem occurs if I use multiple threads and call `collect` in each thread:
| Thread 01 | Thread 02 |
|---|---|
| `_timeo = socket.getdefaulttimeout()  # stores None (default value)` | |
| `socket.setdefaulttimeout(socket_timeout)  # sets to 0.1` | |
| | `_timeo = socket.getdefaulttimeout()  # stores 0.1 (set by thread 01)` |
| `socket.setdefaulttimeout(_timeo)  # restores to None` | |
| | `socket.setdefaulttimeout(socket_timeout)  # sets to 0.1` |
| | `socket.setdefaulttimeout(_timeo)  # restores to 0.1` |
After this, the default socket timeout for the whole process stays at 0.1 seconds.
I'm using a multi-process/multi-thread web server (4 threads per process). When this issue occurs, all socket operations in that process may fail because the timeout is so low (0.1 seconds).
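The interleaving in the table can be replayed deterministically without real threads, which makes the end state easy to check. This is only an illustration of the sequence above, not kombu code: `t1_saved` and `t2_saved` stand in for each thread's local `_timeo`, and `SOCKET_TIMEOUT` for `socket_timeout`.

```python
import socket

SOCKET_TIMEOUT = 0.1
original = socket.getdefaulttimeout()      # normally None

t1_saved = socket.getdefaulttimeout()      # thread 01 stores None
socket.setdefaulttimeout(SOCKET_TIMEOUT)   # thread 01 sets 0.1
t2_saved = socket.getdefaulttimeout()      # thread 02 stores 0.1
socket.setdefaulttimeout(t1_saved)         # thread 01 restores None
socket.setdefaulttimeout(SOCKET_TIMEOUT)   # thread 02 sets 0.1
socket.setdefaulttimeout(t2_saved)         # thread 02 "restores" 0.1

leaked = socket.getdefaulttimeout()        # the value every new socket now gets
print("default timeout after both threads finish:", leaked)

socket.setdefaulttimeout(original)         # clean up after the demonstration
```

Thread 02 faithfully restores the value it saw, but that value was thread 01's temporary one, so the short timeout becomes the permanent default until something resets it.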