confluent-kafka-python
Unable to catch KafkaError Exception
Description
The team responsible for the Kafka broker had a problem on their side, and it surfaced as a KafkaError inside my application:
Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/app/service/kafka_consumer_services.py", line 49, in consume
    self._treat_message(msg=msg)
  File "/app/service/kafka_consumer_services.py", line 107, in _treat_message
    self.kafka_consumer.commit(asynchronous=False)
cimpl.KafkaException: KafkaError{code=_TRANSPORT,val=-195,str="Commit failed: Local: Broker transport failure"}
What I don't understand is that my code is supposed to catch this error:
    def _treat_message(self, msg: Message):
        try:
            payload_str = msg.value().decode("utf-8")
            payload = json.loads(payload_str)
            logger.info(f"Treating this event")
            result = Result(**payload)
            self._add_update_test_result(result=result)
            self._save_result_to_file(result=result)
            self.kafka_consumer.commit(asynchronous=False)
        except (ValueError, KeyError, pydantic.error_wrappers.ValidationError,) as ve:
            logger.warning(f"Message is invalid, ignoring : {ve}")
            self.kafka_consumer.commit(asynchronous=False)
        except KafkaError as ke:
            logger.error(f"Kafka Error happened - {ke}... ")
Can someone help me understand what went wrong please?
How to reproduce
Commit a message while there is a problem with the Broker
Checklist
Please provide the following information:
- [ ] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): confluent_kafka-1.6.1.dist-info
- [ ] Apache Kafka broker version: N/A
- [ ] Client configuration:
    consumer_configuration = {
        "bootstrap.servers": f"{server_name}:{communication_port}",
        "group.id": f"{consumer_group}",
        "auto.offset.reset": "earliest",
        "enable.auto.commit": "false",
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "SCRAM-SHA-512",
        "enable.ssl.certificate.verification": "false",
        "sasl.username": f"{consumer_username}",
        "sasl.password": f"{consumer_password}",
    }
- [ ] Operating system: Container python:3.8-slim (Debian)
- [ ] Provide client logs (with 'debug': '..' as necessary)
- [ ] Provide broker log excerpts: N/A
- [ ] Critical issue
I just saw those messages.
I will try to work with these first:
%4|1624165645.301|SESSTMOUT|rdkafka#consumer-1| [thrd:main]: Consumer group session timed out (in join-state steady) after 10493 ms without a successful response from the group coordinator (broker 6, last error was Broker: Not coordinator): revoking assignment and rejoining group
%4|1624165936.814|MAXPOLL|rdkafka#consumer-1| [thrd:main]: Application maximum poll interval (300000ms) exceeded by 173ms (adjust max.poll.interval.ms for long-running message processing): leaving group
Based on this, I may have a valid issue. https://github.com/confluentinc/confluent-kafka-go/issues/598
Can anyone help me with this?
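The MAXPOLL warning above itself suggests adjusting max.poll.interval.ms for long-running message processing. As a rough sketch only, the two timeouts named in the logs could be raised in the client configuration like this. The numeric values, the broker address, and the group name are illustrative assumptions, not recommendations, and the base settings are abbreviated from the checklist.

```python
# Base settings abbreviated from the client configuration in the checklist.
base_configuration = {
    "bootstrap.servers": "broker.example:9093",  # placeholder address (assumption)
    "group.id": "example-group",                 # placeholder group name (assumption)
    "enable.auto.commit": "false",
}

# Illustrative overrides (assumed values): tune them to how long one message
# actually takes to process and to how long the coordinator may be unreachable.
timeout_overrides = {
    "session.timeout.ms": 30000,     # the log shows a session timeout after ~10.5 s
    "max.poll.interval.ms": 600000,  # the log shows the 300000 ms limit being exceeded
}

# Later keys win, so the overrides replace any matching base settings.
consumer_configuration = {**base_configuration, **timeout_overrides}
```

The resulting dict would be passed to confluent_kafka.Consumer(consumer_configuration) as before. Larger timeouts only give the consumer more slack; they do not explain why the group coordinator stopped responding.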
same issue here
%4|1625095123.813|SESSTMOUT|rdkafka#consumer-1| [thrd:main]: Consumer group session timed out (in join-state started) after 10499 ms without a successful response from the group coordinator (broker 1, last error was Broker: Not coordinator): revoking assignment and rejoining group
@immortel32 KafkaError is not an exception; it's an object that contains information about the error. As shown here:
cimpl.KafkaException: KafkaError{code=_TRANSPORT,val=-195,str="Commit failed: Local: Broker transport failure"}
The actual exception is KafkaException, and you can extract the KafkaError instance from the exception with:
    try:
        raise_kafka_exception()  # placeholder for any call that raises KafkaException
    except KafkaException as err:
        kafka_error = err.args[0]
        assert kafka_error.code() is not None
@rvinzent I completely agree with you and that's why I opened an issue. If you look at the code I shared, I have the except KafkaException and the code did not catch it.
The code you shared catches KafkaError, not KafkaException.
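Applied to the synchronous commit from the original post, the suggestion could look roughly like the sketch below. commit_safely and FailingConsumer are hypothetical names introduced here for illustration, and the fallback classes are stand-ins that exist only so the snippet runs without confluent-kafka installed; they are not the library's implementation.

```python
import logging

try:
    from confluent_kafka import KafkaError, KafkaException
except ImportError:
    # Stand-ins (NOT the real library) so this sketch runs without
    # confluent-kafka installed; they mimic only what is used below.
    class KafkaError:
        _TRANSPORT = -195  # value taken from the traceback in this thread

        def __init__(self, code, reason=""):
            self._code, self._reason = code, reason

        def code(self):
            return self._code

        def str(self):
            return self._reason

    class KafkaException(Exception):
        pass

logger = logging.getLogger(__name__)


def commit_safely(consumer) -> bool:
    """Commit synchronously; return False instead of raising on a Kafka error."""
    try:
        consumer.commit(asynchronous=False)
        return True
    except KafkaException as exc:  # the exception type shown in the traceback
        kafka_error = exc.args[0]  # the KafkaError carried by the exception
        if kafka_error.code() == KafkaError._TRANSPORT:
            logger.error("Broker unreachable during commit: %s", kafka_error.str())
        else:
            logger.error("Commit failed: %s", kafka_error.str())
        return False


class FailingConsumer:
    """Hypothetical test double whose commit always fails like the one above."""

    def commit(self, asynchronous=True):
        raise KafkaException(
            KafkaError(KafkaError._TRANSPORT, "Broker transport failure")
        )
```

With this in place, commit_safely(FailingConsumer()) logs the error and returns False rather than letting the exception end the consumer thread. Whether to retry, skip, or shut down after a failed commit is a separate decision that depends on the application.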
The example in our code is similar to what @rvinzent provided: https://github.com/confluentinc/confluent-kafka-python/blob/master/tests/test_Consumer.py#L106
@immortel32 do you still have any questions related to this one?
It never enters the exception block.
@immortel32 can this be closed?
Yes, you can