confluent-kafka-python
VALUE_DESERIALIZATION - Connection reset by peer
Hello 👋 I'm responsible for two app containers consuming from two different Kafka topics: one with an Avro schema, the other schema-less. The schema-less consumer has never crashed; however, the one with the schema crashes (and auto-restarts) periodically, roughly every two weeks (a bit noisy, and it's not nice to have an unwanted downtime of ~30 seconds).
Now, assuming the crashing container hits a temporary networking issue while connecting to its Schema Registry, how would the library (in particular SchemaRegistryClient / DeserializingConsumer) behave in this case? Would it retry for a little while, or just crash outright?
Consider that I have the following setup:
schema_registry_client = SchemaRegistryClient(...)
avro_deserializer = AvroDeserializer(schema_registry_client)
string_deserializer = StringDeserializer("utf_8")
consumer = DeserializingConsumer(...)
The exception being raised is:
Traceback (most recent call last):
File "/urllib3/connectionpool.py", line 703, in urlopen
httplib_response = self._make_request(
File "/urllib3/connectionpool.py", line 449, in _make_request
six.raise_from(e, None)
File "<string>", line 3, in raise_from
File "/urllib3/connectionpool.py", line 444, in _make_request
httplib_response = conn.getresponse()
File "/usr/local/lib/python3.9/http/client.py", line 1377, in getresponse
response.begin()
File "/usr/local/lib/python3.9/http/client.py", line 320, in begin
version, status, reason = self._read_status()
File "/usr/local/lib/python3.9/http/client.py", line 281, in _read_status
line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
File "/usr/local/lib/python3.9/socket.py", line 704, in readinto
return self._sock.recv_into(b)
File "/usr/local/lib/python3.9/ssl.py", line 1241, in recv_into
return self.read(nbytes, buffer)
File "/usr/local/lib/python3.9/ssl.py", line 1099, in read
return self._sslobj.read(len, buffer)
ConnectionResetError: [Errno 104] Connection reset by peer
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/requests/adapters.py", line 440, in send
resp = conn.urlopen(
File "/urllib3/connectionpool.py", line 785, in urlopen
retries = retries.increment(
File "/urllib3/util/retry.py", line 550, in increment
raise six.reraise(type(error), error, _stacktrace)
File "/urllib3/packages/six.py", line 769, in reraise
raise value.with_traceback(tb)
File "/urllib3/connectionpool.py", line 703, in urlopen
httplib_response = self._make_request(
File "/urllib3/connectionpool.py", line 449, in _make_request
six.raise_from(e, None)
File "<string>", line 3, in raise_from
File "/urllib3/connectionpool.py", line 444, in _make_request
httplib_response = conn.getresponse()
File "/usr/local/lib/python3.9/http/client.py", line 1377, in getresponse
response.begin()
File "/usr/local/lib/python3.9/http/client.py", line 320, in begin
version, status, reason = self._read_status()
File "/usr/local/lib/python3.9/http/client.py", line 281, in _read_status
line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
File "/usr/local/lib/python3.9/socket.py", line 704, in readinto
return self._sock.recv_into(b)
File "/usr/local/lib/python3.9/ssl.py", line 1241, in recv_into
return self.read(nbytes, buffer)
File "/usr/local/lib/python3.9/ssl.py", line 1099, in read
return self._sslobj.read(len, buffer)
urllib3.exceptions.ProtocolError: ('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/confluent_kafka/deserializing_consumer.py", line 137, in poll
value = self._value_deserializer(value, ctx)
File "/confluent_kafka/schema_registry/avro.py", line 348, in __call__
schema = self._registry.get_schema(schema_id)
File "/confluent_kafka/schema_registry/schema_registry_client.py", line 368, in get_schema
response = self._rest_client.get('schemas/ids/{}'.format(schema_id))
File "/confluent_kafka/schema_registry/schema_registry_client.py", line 124, in get
return self.send_request(url, method='GET', query=query)
File "/confluent_kafka/schema_registry/schema_registry_client.py", line 167, in send_request
response = self.session.request(
File "/requests/sessions.py", line 529, in request
resp = self.send(prep, **send_kwargs)
File "/requests/sessions.py", line 645, in send
r = adapter.send(request, **kwargs)
File "/requests/adapters.py", line 501, in send
raise ConnectionError(err, request=request)
requests.exceptions.ConnectionError: ('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/src/kafka_consumer.py", line 88, in <module>
kafka_consumer()
File "/src/kafka_consumer.py", line 62, in kafka_consumer
raise e
File "/src/kafka_consumer.py", line 48, in kafka_consumer
message_raw = consumer.poll(1.0)
File "/confluent_kafka/deserializing_consumer.py", line 139, in poll
raise ValueDeserializationError(exception=se, kafka_message=msg)
confluent_kafka.error.ValueDeserializationError: KafkaError{code=_VALUE_DESERIALIZATION,val=-159,str="('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))"}
Thank you for any assistance or suggestions 🙏
It looks like failed requests to the Schema Registry are not retried. In the consume case this is actually pretty inconvenient, since calling consume again will give you the next message (whereas you want to fetch again the one whose SR call failed). Since this happens infrequently, I would suggest an appropriate way of dealing with it would be to just restart the process, or the consumer.
Marking as enhancement, as a reminder to look into doing something better.
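As an application-level workaround until the library retries these requests itself, one option is a small generic retry helper with exponential backoff around any call that can fail transiently with a connection reset. This is a hedged sketch, not part of confluent-kafka-python; `call_with_retries` and its parameters are hypothetical names:

```python
import time


def call_with_retries(fn, retries=3, backoff_s=0.5,
                      retryable=(ConnectionError, OSError)):
    """Call fn(); on a transient (retryable) error, back off exponentially
    and try again, re-raising once the retry budget is exhausted."""
    for attempt in range(retries + 1):
        try:
            return fn()
        except retryable:
            if attempt == retries:
                raise  # out of retries; let the caller restart or crash
            time.sleep(backoff_s * (2 ** attempt))
```

Note that wrapping `consumer.poll()` in such a helper is not enough on its own: as described above, a retried poll() would hand back the *next* message rather than the one whose deserialization failed. An application would instead need to seek back to the failed message's offset (the raw message is attached to the `ValueDeserializationError` in the traceback above) before polling again, or simply restart the consumer as suggested.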