Handle OSError to properly recycle SSL connection, fix infinite loop
Here's a stack trace that flooded our logs:
[07/15/2020 08:51:14.799: ERROR/kafka.producer.sender] Uncaught error in kafka producer I/O thread
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/kafka/producer/sender.py", line 60, in run
    self.run_once()
  File "/usr/local/lib/python3.6/site-packages/kafka/producer/sender.py", line 160, in run_once
    self._client.poll(timeout_ms=poll_timeout_ms)
  File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 580, in poll
    self._maybe_connect(node_id)
  File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 390, in _maybe_connect
    conn.connect()
  File "/usr/local/lib/python3.6/site-packages/kafka/conn.py", line 426, in connect
    if self._try_handshake():
  File "/usr/local/lib/python3.6/site-packages/kafka/conn.py", line 505, in _try_handshake
    self._sock.do_handshake()
  File "/usr/local/lib/python3.6/ssl.py", line 1077, in do_handshake
    self._sslobj.do_handshake()
  File "/usr/local/lib/python3.6/ssl.py", line 689, in do_handshake
    self._sslobj.do_handshake()
OSError: [Errno 0] Error
The problem is that Python 3.6 raises a bare OSError here, which the handshake code does not expect. The exception propagates to the caller, so the code that recycles the connection never runs. As a result, the producer is guaranteed to hit the same exception on the next call to poll().
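To make the failure mode concrete, here is a toy model of the loop described above. ToyConnection, poll and poll_with_recycle are hypothetical names, not kafka-python's client; the sketch only assumes, as described above, that recycling happens in a close() call that the escaping exception skips.

```python
# Hypothetical toy model, not kafka-python code: a connection whose
# handshake always fails with a bare OSError(0), as in the trace above.
class ToyConnection:
    def __init__(self):
        self.state = "handshake"
        self.attempts = 0

    def try_handshake(self):
        self.attempts += 1
        raise OSError(0, "Error")  # str() is "[Errno 0] Error"

    def close(self):
        # The "recycling" step: drop the half-open socket so a later
        # poll() can start a fresh connection attempt.
        self.state = "disconnected"


def poll(conn):
    # A connection stuck mid-handshake retries the handshake on every poll.
    if conn.state == "handshake":
        conn.try_handshake()
        conn.state = "connected"


def poll_with_recycle(conn):
    # What the fix does: treat the error as a closed connection.
    try:
        poll(conn)
    except OSError:
        conn.close()


# Without recycling: the exception escapes before close() runs, so every
# iteration of the I/O loop hits the same error again.
stuck = ToyConnection()
errors = 0
for _ in range(5):
    try:
        poll(stuck)
    except OSError:
        errors += 1
print(errors, stuck.state)  # 5 handshake

# With recycling: one failure, then the connection is reset.
recycled = ToyConnection()
for _ in range(5):
    poll_with_recycle(recycled)
print(recycled.attempts, recycled.state)  # 1 disconnected
```

In the real client a disconnected node would be reconnected on a later poll; the toy simply stops, which is enough to show that the error no longer repeats.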
Raising OSError from do_handshake() doesn't seem to be documented, even in the latest Python docs (see the 3.8 docs), but there are signs of it in the 3.8 source code.
Anything else needed here, @dpkp and @jeffwidman?
OSError is a very broad exception and has different coverage in different Python versions. Can we narrow this down at all?
@dpkp, we can't. I have seen a bare OSError (not one of its subclasses) raised in my application, and the Python source code also suggests it can be raised in certain situations. I'm afraid we have to catch it because of the way Python is developed.
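For reference, a quick check of why a plain "except OSError" is broad: the SSL and connection exceptions the handshake code already handles are all OSError subclasses. If narrowing by type is wanted, type(ex) is OSError matches only a bare OSError. The helper name is_bare_oserror is hypothetical.

```python
import ssl

# Exceptions the existing handshake code already handles are all OSError
# subclasses, so a plain "except OSError" would also catch them.
for cls in (ssl.SSLError, ssl.SSLEOFError, ssl.SSLZeroReturnError,
            ssl.SSLWantReadError, ConnectionError, TimeoutError):
    assert issubclass(cls, OSError), cls


def is_bare_oserror(ex):
    """Hypothetical helper: True only for OSError itself, not subclasses."""
    return type(ex) is OSError


bare = OSError(0, "Error")
print(bare)                   # [Errno 0] Error
print(is_bare_oserror(bare))  # True
print(is_bare_oserror(ssl.SSLEOFError(8, "EOF occurred")))  # False
```

Constructing OSError with a mapped errno (ENOENT, for example) yields a subclass such as FileNotFoundError; errno 0 is unmapped, which is why the trace shows a bare OSError.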
It looks like this is a Python bug that has been resolved in recent versions of Python: https://bugs.python.org/issue31122
I'd prefer to limit this to known buggy versions of Python, and also to errno 0 (note OSError: [Errno 0] Error at the end of the stack trace).
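Limiting the workaround as suggested could look like the sketch below. The (3, 8) cut-off is an assumption taken from the msg375481 comment later in this thread, and FIXED_IN and is_known_ssl_oserror_bug are hypothetical names. Inside _try_handshake, the OSError clause would then close the connection only when this returns True and re-raise otherwise.

```python
import sys

# Assumption: the bpo-31122 fix is present from Python 3.8 onwards,
# per https://bugs.python.org/msg375481; adjust if that proves wrong.
FIXED_IN = (3, 8)  # hypothetical constant


def is_known_ssl_oserror_bug(ex, version_info=sys.version_info):
    """Hypothetical helper: True only for the bare OSError with errno 0
    that affected Pythons raise when the peer drops the handshake."""
    return (
        tuple(version_info) < FIXED_IN
        and type(ex) is OSError
        and ex.errno == 0
    )


print(is_known_ssl_oserror_bug(OSError(0, "Error"), (3, 6, 9)))  # True
print(is_known_ssl_oserror_bug(OSError(0, "Error"), (3, 9, 6)))  # False
```

The version_info parameter exists only so the check can be exercised against other versions; in production the default (the running interpreter) would be used.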
Could we just add a case to the exception handler in _try_handshake() in conn.py (line 513)? I think the self.close() call here is the key to handling this case properly. Otherwise, it appears that the connection error gets recycled rather than the connection itself, leaving us in an endless loop of logging the error.
except OSError as ex:
    if ex.errno == 0:
        log.warning('SSL connection closed by server during handshake: OSError 0.')
        self.close(Errors.KafkaConnectionError('SSL connection closed by server during handshake'))
    else:
        raise
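A self-contained sketch of that handler, runnable outside kafka-python. FakeSock, SketchConnection and the local KafkaConnectionError class are stand-ins (assumptions), not the library's real classes; only the except clause mirrors the snippet above.

```python
import logging

log = logging.getLogger(__name__)


class KafkaConnectionError(Exception):
    """Stand-in for kafka.errors.KafkaConnectionError (assumption)."""


class FakeSock:
    """Hypothetical socket whose do_handshake() raises a preset error."""
    def __init__(self, exc):
        self.exc = exc

    def do_handshake(self):
        raise self.exc


class SketchConnection:
    """Hypothetical stand-in for BrokerConnection, showing the proposal."""
    def __init__(self, sock):
        self._sock = sock
        self.closed_with = None

    def close(self, error=None):
        # The real close() also tears down the socket and resets state.
        self.closed_with = error

    def _try_handshake(self):
        try:
            self._sock.do_handshake()
            return True
        except OSError as ex:
            if ex.errno == 0:
                log.warning('SSL connection closed by server during handshake: OSError 0.')
                self.close(KafkaConnectionError('SSL connection closed by server during handshake'))
            else:
                raise  # any other OSError still reaches the caller
        return False


conn = SketchConnection(FakeSock(OSError(0, "Error")))
print(conn._try_handshake())            # False
print(type(conn.closed_with).__name__)  # KafkaConnectionError
```

The warning itself goes to stderr through logging's fallback handler unless logging is configured.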
I tried to fix the OSError issue by moving my project from Python 3.6.x to 3.9.6, and that seems to fix it. Where it previously got stuck in the endless OSError loop, I now see the trio of log entries below. I see many of these trios in a row, typically less than 100 ms apart. I take this to mean the network became unhealthy for some reason.
INFO:2021-09-03 07:14:05,210:kafka.conn:380:<BrokerConnection node_id=2 host=myKafkaUrl:443 <connecting> [IPv4 ('x.x.x.x', 443)]>: connecting to myKafkaUrl:443 [('x.x.x.x', 443) IPv4]
WARNING:2021-09-03 07:14:05,225:kafka.conn:514:SSL connection closed by server during handshake.
INFO:2021-09-03 07:14:05,225:kafka.conn:919:<BrokerConnection node_id=2 host=myKafkaUrl:443 <handshake> [IPv4 ('x.x.x.x', 443)]>: Closing connection. KafkaConnectionError: SSL connection closed by server during handshake
WARNING:2021-09-03 07:14:05,226:kafka.client:331:Node 2 connection failed -- refreshing metadata
I think this means I'm now seeing an SSLEOFError instead of OSError 0, which is good, but I wish the logs were a bit more descriptive.
So maybe something like this (new error case and more descriptive logging)?
# old ssl in python2.6 will swallow all SSLErrors here...
except (SSLWantReadError, SSLWantWriteError):
    pass
except (SSLZeroReturnError, ConnectionError, TimeoutError, SSLEOFError) as ex:
    log.warning('SSL connection closed by server during handshake: %s', type(ex).__name__)
    self.close(Errors.KafkaConnectionError('SSL connection closed by server during handshake'))
except OSError as ex:
    if ex.errno == 0:
        log.warning('SSL connection closed by server during handshake: OSError 0.')
        self.close(Errors.KafkaConnectionError('SSL connection closed by server during handshake'))
    else:
        raise
# Other SSLErrors will be raised to user
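To check which clause each exception lands in, here is a sketch that mirrors the proposed except chain but returns (action, message) instead of logging and closing; classify_handshake_error is a hypothetical helper. Order matters: every exception named in the earlier clauses is itself an OSError subclass, so the OSError clause must come last.

```python
import ssl

PREFIX = "SSL connection closed by server during handshake"


def classify_handshake_error(ex):
    """Hypothetical helper: re-raise ex inside the proposed except chain
    and report (action, log message) instead of acting on it."""
    try:
        raise ex
    # Non-blocking socket not ready yet: try the handshake again later.
    except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
        return ("retry", None)
    # Known "peer went away" errors: close, naming the exception type.
    except (ssl.SSLZeroReturnError, ConnectionError, TimeoutError,
            ssl.SSLEOFError) as e:
        return ("close", "%s: %s" % (PREFIX, type(e).__name__))
    # Bare OSError(0) from affected Pythons: also treated as a close.
    except OSError as e:
        if e.errno == 0:
            return ("close", "%s: OSError 0." % PREFIX)
        return ("raise", None)  # the real handler re-raises here


for exc in (ssl.SSLWantReadError(2, "want read"),
            ssl.SSLEOFError(8, "EOF occurred"),
            ConnectionResetError(104, "Connection reset by peer"),
            OSError(0, "Error"),
            ssl.SSLError(1, "certificate verify failed")):
    print(type(exc).__name__, classify_handshake_error(exc))
```

One consequence worth noting: ssl.SSLError is also an OSError subclass, so an SSL error not listed in the earlier clauses (a certificate failure, say) falls through to the OSError clause and is re-raised only as long as its errno is not 0, which keeps the "Other SSLErrors will be raised to user" comment true.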
https://bugs.python.org/msg375481 notes that the aforementioned bug had a patch propagated into Python 3.8+. Assuming we resume supporting the library for Python 3.8+, we could revisit this if it's still encountered in newer Python versions. Otherwise, we can close this PR.