confluent-kafka-python icon indicating copy to clipboard operation
confluent-kafka-python copied to clipboard

Attempting to call list_topics() on a closed Kafka Consumer object results in a segmentation fault.

Open prashant-kumar-27-06 opened this issue 8 months ago • 1 comments

Description:

When calling the list_topics() method on a Kafka Consumer object after the close() method has already been invoked, a segmentation fault occurs, causing the Python interpreter to crash and shut down the terminal. This issue was encountered in the following situation:

Steps to Reproduce:

  1. Create and configure a Kafka Consumer object.
  2. Call the close() method on the Consumer object.
  3. Attempt to call list_topics() on the closed Consumer object.

Observed Behavior:

The terminal crashes and a segmentation fault error is raised. The following error log details the issue:

In [2]: from confluent_kafka import Consumer as ConfluentKafkaConsumer

In [5]: from datetime import datetime, timedelta

In [6]: base_prefix = "testkafka.kuprasha.testing.confluent.upgrade."

In [7]: def get_a_group_name():
   ...:     return f"{base_prefix}{datetime.now().strftime('%Y%m%d%H%M%S')}.group"
   ...: 

In [8]: def on_commit(err, topics_partitions):
   ...:     if err:
   ...:         print(f"ERROR _ON_COMMIT: {err}")
   ...:     else:
   ...:         print(f"Offset commit succeeded: {topics_partitions}")
   ...: 
   ...: def on_error(err):
   ...:     print(f"Encountered error {err.name()}: {err}")
   ...: 

In [10]: my_server = ",".join(get_kafka_server_host_port('qa'))

In [11]: configs = {'bootstrap.servers': my_server,
    ...:            'group.id': get_a_group_name(),
    ...:            'on_commit': on_commit,
    ...:            'error_cb': on_error,
    ...:            'auto.offset.reset': 'earliest',
    ...:            "enable.auto.commit": False,
    ...:           }
    ...: configs = deep_merge(get_sasl_kerberos_config(), configs)
    ...: 
    ...: my_consumer = ConfluentKafkaConsumer(configs)

In [12]: my_consumer.assignment()
Out[12]: []

In [13]: topic_info = my_consumer.list_topics()

In [14]: type(topic_info)
Out[14]: confluent_kafka.admin._metadata.ClusterMetadata

In [15]: my_consumer.close()

In [16]: my_consumer.assignment()
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
Cell In[16], line 1
----> 1 my_consumer.assignment()

RuntimeError: Consumer closed

In [17]: my_consumer.list_topics()
Fatal Python error: Segmentation fault

Thread 0x00007f5c78f8e700 (most recent call first):
  File "/usr/local/python/python-3.11/std/lib64/python3.11/site-packages/traitlets/traitlets.py", line 0 in get
  File "kwargs", line 0 in sig
Segmentation fault (core dumped)

Impact:

This issue results in the abrupt termination of the application and loss of any unsaved work in the IPython environment.

prashant-kumar-27-06 avatar Mar 13 '25 12:03 prashant-kumar-27-06