confluent-kafka-python icon indicating copy to clipboard operation
confluent-kafka-python copied to clipboard

"Handle is terminating" exception when calling `delete_topics`

Open mhowlett opened this issue 3 years ago • 3 comments

It has been reported that:

AdminClient({...}).delete_topics(
    topics=topics, operation_timeout=30.0
)

gives the exception:

cimpl.KafkaException: KafkaError{code=_DESTROY,val=-197,str="Handle is terminating: Success"}

whereas:

client: AdminClient = AdminClient({...})

client.delete_topics(
    topics=topics, operation_timeout=30.0
)

does not.

stack trace:

 File "<redacted>/topics.py", line 26, in delete_topics
    future.result()
  File "/usr/local/lib/python3.7/concurrent/futures/_base.py", line 428, in result
    return self.__get_result()
  File "/usr/local/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
  File "/home/airflow/.local/lib/python3.7/site-packages/confluent_kafka/admin/__init__.py", line 235, in _make_topics_result
    result = f.result()
  File "/usr/local/lib/python3.7/concurrent/futures/_base.py", line 428, in result
    return self.__get_result()
  File "/usr/local/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
cimpl.KafkaException: KafkaError{code=_DESTROY,val=-197,str="Handle is terminating: Success"}

Seems like it might be a bug related to reference counting of the AdminClient instance.

mhowlett avatar Mar 13 '22 17:03 mhowlett

I don't think that's valid use; delete_topics() is asynchronous and returns a list of futures which the application should wait on. Even if we put a reference to the client in the futures (which we may already do, not sure), it wouldn't help here since the futures are lost immediately anyway since they're not assigned to a variable.

Please try assigning the delete_topics() return value (futures) to a variable and wait on the futures to complete to see if that helps.

edenhill avatar Mar 13 '22 19:03 edenhill

Ideally, i think it would be good if fire-and-forget usage like this worked, however, I think it's very low priority compared other open issues. And it's kind of a feature in the sense that it pushes you towards writing code to handle the result.

I'm not sure of the practicalities in achieving this without digging in.

mhowlett avatar Mar 13 '22 22:03 mhowlett

Please try assigning the delete_topics() return value (futures) to a variable and wait on the futures to complete to see if that helps.

Using the return value does not help either. Especially for the admin client, where the only use might be creating a topic it does seem like a bug to me

from confluent_kafka.admin import AdminClient, NewTopic


# this raises the exception mentioned in this issue
for topic, future in (
    AdminClient({"bootstrap.servers": "localhost:9092"})
    .create_topics([NewTopic("test-topic", 1, 1)])
    .items()
):
    print(topic)
    future.result()


# this works
client = AdminClient({"bootstrap.servers": "localhost:9092"})

for topic, future in client.create_topics([NewTopic("test-topic", 1, 1)]).items():
    print(topic)
    future.result()

fr-ser avatar Nov 03 '22 18:11 fr-ser