confluent-kafka-python
"Handle is terminating" exception when calling `delete_topics`
It has been reported that:
AdminClient({...}).delete_topics(
    topics=topics, operation_timeout=30.0
)
gives the exception:
cimpl.KafkaException: KafkaError{code=_DESTROY,val=-197,str="Handle is terminating: Success"}
whereas:
client: AdminClient = AdminClient({...})
client.delete_topics(
    topics=topics, operation_timeout=30.0
)
does not.
Stack trace:
  File "<redacted>/topics.py", line 26, in delete_topics
    future.result()
  File "/usr/local/lib/python3.7/concurrent/futures/_base.py", line 428, in result
    return self.__get_result()
  File "/usr/local/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
  File "/home/airflow/.local/lib/python3.7/site-packages/confluent_kafka/admin/__init__.py", line 235, in _make_topics_result
    result = f.result()
  File "/usr/local/lib/python3.7/concurrent/futures/_base.py", line 428, in result
    return self.__get_result()
  File "/usr/local/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
cimpl.KafkaException: KafkaError{code=_DESTROY,val=-197,str="Handle is terminating: Success"}
Seems like it might be a bug related to reference counting of the AdminClient instance.
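One way to picture the reference-counting hypothesis is a toy model in plain Python. `ToyHandle` is hypothetical and is not how confluent-kafka-python is implemented; it only mimics a handle that fails its pending operations when it is destroyed. The sketch also assumes CPython, which frees a temporary object as soon as its last reference disappears.

```python
from concurrent.futures import Future


class ToyHandle:
    """Hypothetical stand-in for a client handle that owns pending operations."""

    def __init__(self):
        self._pending = []

    def submit(self, name):
        # Return a dict of futures keyed by name. The futures do NOT keep
        # a reference back to the handle.
        future = Future()
        self._pending.append(future)
        return {name: future}

    def complete_all(self):
        # Simulate the background work finishing while the handle is alive.
        for future in self._pending:
            if not future.done():
                future.set_result(None)
        self._pending.clear()

    def __del__(self):
        # When the handle is destroyed, fail whatever is still pending.
        for future in self._pending:
            if not future.done():
                future.set_exception(RuntimeError("Handle is terminating"))


# Temporary handle: destroyed right after submit() returns (CPython),
# so the future already carries an exception.
lost = ToyHandle().submit("test-topic")

# Named handle: it stays alive until the work has completed.
handle = ToyHandle()
kept = handle.submit("test-topic")
handle.complete_all()
```

In this model `lost["test-topic"].exception()` is a `RuntimeError`, while `kept["test-topic"].result()` returns normally, which mirrors the difference between the two call styles reported in this issue.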
I don't think that's valid use: delete_topics() is asynchronous and returns a dict of futures, keyed by topic name, which the application should wait on. Even if we put a reference to the client in the futures (which we may already do, not sure), it wouldn't help here, because the futures are not assigned to a variable and are lost immediately anyway.
Please try assigning the delete_topics() return value (futures) to a variable and wait on the futures to complete to see if that helps.
Ideally, I think it would be good if fire-and-forget usage like this worked; however, I think it's very low priority compared to other open issues. It's also kind of a feature, in the sense that it pushes you towards writing code to handle the result.
I'm not sure of the practicalities in achieving this without digging in.
Assigning the return value (futures) to a variable and waiting on the futures does not help either. Especially for the admin client, where the only use might be creating a topic, this does seem like a bug to me.
from confluent_kafka.admin import AdminClient, NewTopic

# this raises the exception mentioned in this issue
for topic, future in (
    AdminClient({"bootstrap.servers": "localhost:9092"})
    .create_topics([NewTopic("test-topic", 1, 1)])
    .items()
):
    print(topic)
    future.result()

# this works
client = AdminClient({"bootstrap.servers": "localhost:9092"})
for topic, future in client.create_topics([NewTopic("test-topic", 1, 1)]).items():
    print(topic)
    future.result()
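The working variant can be wrapped in a small helper so that the client is guaranteed to stay referenced until every future has resolved. This is only a sketch: `create_topics_and_wait` and its `timeout` parameter are hypothetical names, not part of confluent-kafka-python, and it assumes `create_topics()` returns a dict of futures keyed by topic, as the loop above relies on.

```python
def create_topics_and_wait(client, new_topics, timeout=60.0):
    """Create topics and block until each per-topic future resolves.

    `client` is expected to be an AdminClient (or anything with a compatible
    create_topics method). Because the caller passes it in, the client stays
    referenced for the whole call and cannot be destroyed while futures
    are still pending.

    Returns a dict mapping each topic to None on success or to the
    exception that was raised for it.
    """
    futures = client.create_topics(new_topics)
    outcomes = {}
    for topic, future in futures.items():
        try:
            future.result(timeout=timeout)  # raises if the operation failed
            outcomes[topic] = None
        except Exception as exc:  # e.g. KafkaException or a futures timeout
            outcomes[topic] = exc
    return outcomes


# Usage sketch (needs confluent_kafka installed and a reachable broker):
# from confluent_kafka.admin import AdminClient, NewTopic
# client = AdminClient({"bootstrap.servers": "localhost:9092"})
# print(create_topics_and_wait(client, [NewTopic("test-topic", 1, 1)]))
```

Collecting the per-topic outcome instead of raising on the first failure is a design choice for the sketch; re-raising inside the loop would work just as well if one failure should abort the rest.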