confluent-kafka-python

Unable to catch KafkaError Exception

Open immortel32 opened this issue 4 years ago • 10 comments

Description

The team responsible for the Kafka broker had a problem, and it generated a KafkaError within my application:

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/app/service/kafka_consumer_services.py", line 49, in consume
    self._treat_message(msg=msg)
  File "/app/service/kafka_consumer_services.py", line 107, in _treat_message
    self.kafka_consumer.commit(asynchronous=False)
cimpl.KafkaException: KafkaError{code=_TRANSPORT,val=-195,str="Commit failed: Local: Broker transport failure"}

What I don't understand is that my code is supposed to catch this error:

    def _treat_message(self, msg: Message):
        try:
            payload_str = msg.value().decode("utf-8")
            payload = json.loads(payload_str)
            logger.info(f"Treating this event")
            result = Result(**payload)
            self._add_update_test_result(result=result)
            self._save_result_to_file(result=result)
            self.kafka_consumer.commit(asynchronous=False)
        except (ValueError, KeyError, pydantic.error_wrappers.ValidationError,) as ve:
            logger.warning(f"Message is invalid, ignoring : {ve}")
            self.kafka_consumer.commit(asynchronous=False)
        except KafkaError as ke:
            logger.error(f"Kafka Error happened - {ke}... ")

Can someone help me understand what went wrong, please?

How to reproduce

Commit a message synchronously while the broker is having problems.

Checklist

Please provide the following information:

  • [ ] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): confluent_kafka-1.6.1.dist-info
  • [ ] Apache Kafka broker version: N/A
  • [ ] Client configuration:

        consumer_configuration = {
            "bootstrap.servers": f"{server_name}:{communication_port}",
            "group.id": f"{consumer_group}",
            "auto.offset.reset": "earliest",
            "enable.auto.commit": "false",
            "security.protocol": "SASL_SSL",
            "sasl.mechanism": "SCRAM-SHA-512",
            "enable.ssl.certificate.verification": "false",
            "sasl.username": f"{consumer_username}",
            "sasl.password": f"{consumer_password}",
        }
  • [ ] Operating system: Container python:3.8-slim (Debian)
  • [ ] Provide client logs (with 'debug': '..' as necessary)
  • [ ] Provide broker log excerpts N/A
  • [ ] Critical issue

immortel32, Jun 21 '21 10:06

I just noticed these messages in the logs.

I will try to address them first:

%4|1624165645.301|SESSTMOUT|rdkafka#consumer-1| [thrd:main]: Consumer group session timed out (in join-state steady) after 10493 ms without a successful response from the group coordinator (broker 6, last error was Broker: Not coordinator): revoking assignment and rejoining group

%4|1624165936.814|MAXPOLL|rdkafka#consumer-1| [thrd:main]: Application maximum poll interval (300000ms) exceeded by 173ms (adjust max.poll.interval.ms for long-running message processing): leaving group
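The MAXPOLL line names the setting to adjust. A minimal sketch of raising max.poll.interval.ms above the 300000 ms reported in the log, assuming a config dict shaped like the one in the checklist; the helper name with_longer_poll_interval, the placeholder broker and group values, and the 600000 ms figure are illustrative assumptions, not a recommendation:

```python
def with_longer_poll_interval(config: dict, max_poll_interval_ms: int = 600_000) -> dict:
    """Return a copy of a consumer config with a larger max.poll.interval.ms.

    Hypothetical helper: 600000 ms is an arbitrary example value, chosen only
    because it exceeds the 300000 ms limit reported in the MAXPOLL log line.
    """
    updated = dict(config)  # copy so the caller's dict is left untouched
    updated["max.poll.interval.ms"] = max_poll_interval_ms
    return updated


base = {
    "bootstrap.servers": "localhost:9092",  # placeholder, not the reporter's broker
    "group.id": "example-group",            # placeholder group name
    "enable.auto.commit": "false",
}
config = with_longer_poll_interval(base)
print(config["max.poll.interval.ms"])
```

The returned dict can be passed to confluent_kafka.Consumer as usual; raising the limit only gives the processing loop more time between poll() calls.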

immortel32, Jun 21 '21 10:06

Based on this, it may be a genuine issue; see https://github.com/confluentinc/confluent-kafka-go/issues/598

immortel32, Jun 21 '21 10:06

Can anyone help me with this?

immortel32, Jun 29 '21 17:06

Same issue here.

rogaha, Jul 01 '21 03:07

%4|1625095123.813|SESSTMOUT|rdkafka#consumer-1| [thrd:main]: Consumer group session timed out (in join-state started) after 10499 ms without a successful response from the group coordinator (broker 1, last error was Broker: Not coordinator): revoking assignment and rejoining group

rogaha, Jul 01 '21 03:07

@immortel32 KafkaError is not an exception; it's an object that contains information about the error, as shown here:

cimpl.KafkaException: KafkaError{code=_TRANSPORT,val=-195,str="Commit failed: Local: Broker transport failure"}

The actual exception is KafkaException, and you can extract the KafkaError instance from it like this:

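from confluent_kafka import KafkaException

try:
    raise_kafka_exception()  # placeholder for any client call that can raise
except KafkaException as err:
    kafka_error = err.args[0]  # the KafkaError carried by the exception
    assert kafka_error.code() is not None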

rvinzent, Jul 22 '21 17:07

@rvinzent I completely agree with you, and that's why I opened this issue. If you look at the code I shared, I have the except KafkaException clause, and the code did not catch the exception.

immortel32, Jul 23 '21 10:07

The code you shared catches KafkaError, not KafkaException.

rvinzent, Jul 23 '21 16:07

The example in our test suite is similar to what @rvinzent provided: https://github.com/confluentinc/confluent-kafka-python/blob/master/tests/test_Consumer.py#L106

@immortel32 do you still have any questions related to this one?

jliunyu, Mar 15 '22 05:03

It never enters the except block.

DoubtMail, Feb 07 '23 02:02

@immortel32 can this be closed?

nhaq-confluent, Mar 06 '24 01:03

Yes, you can.

immortel32, Mar 09 '24 15:03