confluent-kafka-python

VALUE_DESERIALIZATION - Connection reset by peer

Status: Open. mugx-fc opened this issue 3 years ago • 2 comments

Hello 👋 I'm responsible for two app containers consuming from two different Kafka topics: one with an Avro schema, the other schema-less. The schema-less consumer has never crashed; however, the one with the schema crashes (and auto-restarts) periodically, roughly every two weeks. This is noisy, and it also causes an unwanted downtime of about 30 seconds.

Now, let's assume the crashing container has a temporary networking issue connecting to its Schema Registry. How would the library (in particular SchemaRegistryClient / DeserializingConsumer) behave in this case? Would it retry for a little while, or just crash outright?

I have the following setup:

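from confluent_kafka import DeserializingConsumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import StringDeserializer
schema_registry_client = SchemaRegistryClient(...)
avro_deserializer = AvroDeserializer(schema_registry_client)
string_deserializer = StringDeserializer("utf_8")
consumer = DeserializingConsumer(...)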

The exception being raised is:

Traceback (most recent call last):
  File "/urllib3/connectionpool.py", line 703, in urlopen
    httplib_response = self._make_request(
  File "/urllib3/connectionpool.py", line 449, in _make_request
    six.raise_from(e, None)
  File "<string>", line 3, in raise_from
  File "/urllib3/connectionpool.py", line 444, in _make_request
    httplib_response = conn.getresponse()
  File "/usr/local/lib/python3.9/http/client.py", line 1377, in getresponse
    response.begin()
  File "/usr/local/lib/python3.9/http/client.py", line 320, in begin
    version, status, reason = self._read_status()
  File "/usr/local/lib/python3.9/http/client.py", line 281, in _read_status
    line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
  File "/usr/local/lib/python3.9/socket.py", line 704, in readinto
    return self._sock.recv_into(b)
  File "/usr/local/lib/python3.9/ssl.py", line 1241, in recv_into
    return self.read(nbytes, buffer)
  File "/usr/local/lib/python3.9/ssl.py", line 1099, in read
    return self._sslobj.read(len, buffer)
ConnectionResetError: [Errno 104] Connection reset by peer
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/requests/adapters.py", line 440, in send
    resp = conn.urlopen(
  File "/urllib3/connectionpool.py", line 785, in urlopen
    retries = retries.increment(
  File "/urllib3/util/retry.py", line 550, in increment
    raise six.reraise(type(error), error, _stacktrace)
  File "/urllib3/packages/six.py", line 769, in reraise
    raise value.with_traceback(tb)
  File "/urllib3/connectionpool.py", line 703, in urlopen
    httplib_response = self._make_request(
  File "/urllib3/connectionpool.py", line 449, in _make_request
    six.raise_from(e, None)
  File "<string>", line 3, in raise_from
  File "/urllib3/connectionpool.py", line 444, in _make_request
    httplib_response = conn.getresponse()
  File "/usr/local/lib/python3.9/http/client.py", line 1377, in getresponse
    response.begin()
  File "/usr/local/lib/python3.9/http/client.py", line 320, in begin
    version, status, reason = self._read_status()
  File "/usr/local/lib/python3.9/http/client.py", line 281, in _read_status
    line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
  File "/usr/local/lib/python3.9/socket.py", line 704, in readinto
    return self._sock.recv_into(b)
  File "/usr/local/lib/python3.9/ssl.py", line 1241, in recv_into
    return self.read(nbytes, buffer)
  File "/usr/local/lib/python3.9/ssl.py", line 1099, in read
    return self._sslobj.read(len, buffer)
urllib3.exceptions.ProtocolError: ('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/confluent_kafka/deserializing_consumer.py", line 137, in poll
    value = self._value_deserializer(value, ctx)
  File "/confluent_kafka/schema_registry/avro.py", line 348, in __call__
    schema = self._registry.get_schema(schema_id)
  File "/confluent_kafka/schema_registry/schema_registry_client.py", line 368, in get_schema
    response = self._rest_client.get('schemas/ids/{}'.format(schema_id))
  File "/confluent_kafka/schema_registry/schema_registry_client.py", line 124, in get
    return self.send_request(url, method='GET', query=query)
  File "/confluent_kafka/schema_registry/schema_registry_client.py", line 167, in send_request
    response = self.session.request(
  File "/requests/sessions.py", line 529, in request
    resp = self.send(prep, **send_kwargs)
  File "/requests/sessions.py", line 645, in send
    r = adapter.send(request, **kwargs)
  File "/requests/adapters.py", line 501, in send
    raise ConnectionError(err, request=request)
requests.exceptions.ConnectionError: ('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/src/kafka_consumer.py", line 88, in <module>
    kafka_consumer()
  File "/src/kafka_consumer.py", line 62, in kafka_consumer
    raise e
  File "/src/kafka_consumer.py", line 48, in kafka_consumer
    message_raw = consumer.poll(1.0)
  File "/confluent_kafka/deserializing_consumer.py", line 139, in poll
    raise ValueDeserializationError(exception=se, kafka_message=msg)
confluent_kafka.error.ValueDeserializationError: KafkaError{code=_VALUE_DESERIALIZATION,val=-159,str="('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))"}

Thank you for any assistance or suggestions 🙏

Posted by mugx-fc, May 05 '22 12:05
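The traceback above is a chain of implicitly linked exceptions: each "During handling of the above exception" line means the next exception's __context__ points at the previous one, with a ConnectionResetError at the root and ValueDeserializationError on top. A minimal sketch, assuming the goal is to let the consumer loop tell a transient network failure apart from a genuinely undecodable payload, is to walk that chain. iter_exception_chain and is_transient_network_error are hypothetical helper names, not part of confluent-kafka-python; the check relies only on the standard-library fact that ConnectionResetError is a subclass of the builtin ConnectionError.

```python
from typing import Iterator, Optional


def iter_exception_chain(exc: Optional[BaseException]) -> Iterator[BaseException]:
    """Yield exc, then each exception it was chained from (newest to oldest)."""
    seen = set()  # guard against cycles in pathological chains
    while exc is not None and id(exc) not in seen:
        seen.add(id(exc))
        yield exc
        # Explicit chaining ("raise X from Y") sets __cause__; the "During
        # handling of the above exception" form sets __context__ instead.
        exc = exc.__cause__ if exc.__cause__ is not None else exc.__context__


def is_transient_network_error(exc: BaseException) -> bool:
    """True if any link in the chain is a builtin ConnectionError
    (ConnectionResetError, errno 104, is one of its subclasses)."""
    return any(isinstance(e, ConnectionError) for e in iter_exception_chain(exc))
```

In a loop like the one in kafka_consumer.py, the except clause around consumer.poll(1.0) could then re-raise only when is_transient_network_error(e) is false, and handle the transient case separately (for example by retrying, or by restarting as suggested in the reply below).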

It looks like failed requests to Schema Registry (SR) are not retried. In the consume case, this is actually pretty inconvenient: calling consume again will give you the next message, whereas you want to get the one that resulted in the SR call failure again. Since this happens infrequently, I would suggest that an appropriate way of dealing with it is simply to restart the process, or the consumer.

Posted by mhowlett, May 06 '22 14:05
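Since the Schema Registry request is not retried by the client, the retry can be done by the application instead. A minimal sketch, assuming a plain exponential backoff with full jitter is acceptable: call_with_retries is a hypothetical helper, not a confluent-kafka-python API. The default retry_on=(OSError,) is chosen because requests.exceptions.ConnectionError derives from IOError (an alias of OSError) and ConnectionResetError is itself an OSError; errors unrelated to the network, such as a bad payload, are raised immediately.

```python
import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_retries(
    fn: Callable[[], T],
    attempts: int = 5,
    base_delay: float = 0.5,
    max_delay: float = 10.0,
    retry_on: Tuple[Type[BaseException], ...] = (OSError,),
) -> T:
    """Call fn(), retrying on the given exception types with exponential backoff.

    The last exception is re-raised once all attempts are used up, so a
    persistent outage still fails loudly and the process can be restarted.
    """
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(attempts):
        try:
            return fn()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: propagate the original error
            # Full jitter: sleep a random time in [0, min(max_delay, base * 2**attempt)].
            time.sleep(random.uniform(0, min(max_delay, base_delay * 2 ** attempt)))
    raise AssertionError("unreachable")
```

With the setup from the issue, the raw message is still available on the exception, since DeserializingConsumer.poll raises ValueDeserializationError(exception=se, kafka_message=msg). Assuming a client version where SerializationContext and MessageField are importable from confluent_kafka.serialization, the failed value could be re-deserialized with call_with_retries(lambda: avro_deserializer(msg.value(), SerializationContext(msg.topic(), MessageField.VALUE))), where msg = e.kafka_message. If every attempt fails, the exception still propagates, and restarting the process remains the fallback.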

Marking as an enhancement, as a reminder to look into doing something better.

Posted by mhowlett, May 06 '22 14:05
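The offset problem described in the first reply (polling again returns the next message rather than the one whose Schema Registry lookup failed) can be illustrated with a toy, in-memory model. ToyPartition and consume_all are hypothetical stand-ins written for illustration, not confluent-kafka-python classes. With the real client, the analogous rewind would presumably be consumer.seek(TopicPartition(msg.topic(), msg.partition(), msg.offset())); that mapping is an assumption to check against the client version in use.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional, Tuple


@dataclass
class ToyPartition:
    """Hypothetical single-partition log. Like a real consumer, poll() moves the
    position past a record as soon as it is returned, before the application
    has deserialized it."""

    records: List[bytes]
    position: int = 0

    def poll(self) -> Optional[Tuple[int, bytes]]:
        if self.position >= len(self.records):
            return None  # end of the toy log
        offset = self.position
        self.position += 1
        return offset, self.records[offset]

    def seek(self, offset: int) -> None:
        self.position = offset


def consume_all(
    part: ToyPartition,
    deserialize: Callable[[bytes], object],
    rewind_on_error: bool,
    max_rewinds: int = 3,
) -> List[object]:
    """Drain the toy log, optionally rewinding after a transient failure."""
    out: List[object] = []
    rewinds = 0  # consecutive rewinds without a successful record
    while (item := part.poll()) is not None:
        offset, raw = item
        try:
            value = deserialize(raw)
        except ConnectionError:
            if not rewind_on_error:
                continue  # record is skipped: the next poll() moves on
            rewinds += 1
            if rewinds > max_rewinds:
                raise  # give up and let the process crash/restart
            part.seek(offset)  # re-deliver the same record on the next poll()
        else:
            out.append(value)
            rewinds = 0
    return out
```

The rewind budget matters: without max_rewinds, a registry that stays unreachable would make the loop re-deliver the same record forever, whereas here the error eventually propagates and the process can be restarted.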