confluent-kafka-python
Improper handling of Python C-API exceptions
Description
From its librdkafka log callback, the library calls into Python (specifically Logger.log) without checking whether the Python error indicator is already set.
https://github.com/confluentinc/confluent-kafka-python/blob/5a87879681d28375a55203c4839338b13b668046/src/confluent_kafka/src/confluent_kafka.c#L1834
As the Python docs put it:
If the error is not handled or carefully propagated, additional calls into the Python/C API may not behave as intended and may fail in mysterious ways.
Consider a case where Logger.log raises an exception, which sets the Python error indicator. On the next log_cb invocation from librdkafka, the library calls Logger.log again without handling the previous error. As a result, method calls return a misleading SystemError, which is difficult to catch in a Python application.
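The sequence described above can be illustrated with a toy, pure-Python model. Everything in it is hypothetical: `ErrorIndicator`, `call_from_c`, `poll` and `InterruptOnce` are illustrative names, not confluent-kafka or CPython APIs, and the model only approximates CPython's behavior. `check_indicator=True` models the check this issue asks for (do not call into Python again while an error is pending). In the real traceback, the check trips on the nested `KeyboardInterrupt()` instantiation inside the second `log` call; the model simplifies this by letting the second `log` call succeed.

```python
class ErrorIndicator:
    """Toy stand-in for CPython's per-thread error indicator (hypothetical)."""

    def __init__(self):
        self.pending = None  # an exception that is "set" but not yet handled


def call_from_c(indicator, func, *args):
    """Roughly mimic a C-level call such as PyObject_CallMethod.

    A failing call stores its exception in the indicator and returns None,
    standing in for a NULL return value. A call that succeeds while a stale
    exception is still pending raises SystemError chained to that exception,
    loosely like CPython's check on C function results.
    """
    try:
        result = func(*args)
    except BaseException as exc:
        indicator.pending = exc
        return None
    if indicator.pending is not None:
        stale, indicator.pending = indicator.pending, None
        raise SystemError(f"{func!r} returned a result with an exception set") from stale
    return result


def poll(logger, log_lines, check_indicator):
    """Hypothetical poll(): run the log callback once per librdkafka log line."""
    indicator = ErrorIndicator()
    for line in log_lines:
        if check_indicator and indicator.pending is not None:
            break  # do not call into Python again while an error is pending
        call_from_c(indicator, logger.log, line)  # return value ignored, as in the bug
    if indicator.pending is not None:
        raise indicator.pending  # hand the original exception back to the caller
    return None


class InterruptOnce:
    """Mock logger whose first log() call is interrupted, like SIGINT arriving mid-call."""

    def __init__(self):
        self.calls = 0

    def log(self, line):
        self.calls += 1
        if self.calls == 1:
            raise KeyboardInterrupt
```

In this model, `poll(InterruptOnce(), ["a", "b"], check_indicator=False)` raises a SystemError whose `__cause__` is the KeyboardInterrupt, while `check_indicator=True` lets the original KeyboardInterrupt reach the caller.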
How to reproduce
We see this issue consistently when SIGINT is sent to a process that is polling the Consumer: Logger.log raises a KeyboardInterrupt, which isn't handled before Logger.log is called again. We can't share our proprietary application, so here is a minimal reproducer that uses a mock Logger class to trigger the issue easily (we also see the issue with the default Logger class):
from confluent_kafka import Consumer

bootstrap_servers = "<bootstrap servers>"
consumer_config = {
    'bootstrap.servers': bootstrap_servers,
    'group.id': 'NONE',
    'debug': 'all',
}

print("creating consumer")


class Logger:
    """
    Mock Logger class to easily reproduce the issue
    """
    def log(self, *args, **kwargs):
        raise KeyboardInterrupt


logger = Logger()
consumer = Consumer(consumer_config, logger=logger)
print("consumer created")
consumer.subscribe(["some_topic"])
while True:
    print("polling")
    r = consumer.poll(2)
Running this produces the following output:
[nix-shell:/codemill/krishan/maestro-api]$ python cf_gh_issue.py |& head -100
creating consumer
consumer created
polling
Traceback (most recent call last):
  File "/codemill/krishan/maestro-api/cf_gh_issue.py", line 21, in log
    raise KeyboardInterrupt
KeyboardInterrupt

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/codemill/krishan/maestro-api/cf_gh_issue.py", line 21, in log
    raise KeyboardInterrupt
SystemError: <class 'KeyboardInterrupt'> returned a result with an exception set
This SystemError is misleading: the underlying cause is the KeyboardInterrupt left unhandled by the previous Logger.log call.
Checklist
Please provide the following information:
- [x] confluent-kafka-python and librdkafka version: 2.3.0
- [x] Apache Kafka broker version: 3.0.0
- [x] Client configuration: debug=all
- [x] Operating system: Red Hat Enterprise Linux 8.9
- [ ] Provide client logs (with 'debug': '..' as necessary)
- [ ] Provide broker log excerpts
- [ ] Critical issue