confluent-kafka-python

Multiple calls to list_offsets() may cause memory leaks

Open whuwangjj opened this issue 11 months ago • 0 comments

Description

When the list_offsets() method of AdminClient is called repeatedly, memory leaks and the process's memory usage keeps increasing.


How to reproduce

                  
        import time
        import traceback

        from confluent_kafka import TopicPartition
        from confluent_kafka.admin import AdminClient, OffsetSpec


        class OffsetPoller:  # illustrative name; the original snippet omitted the enclosing class
            def __init__(self):
                self.admin_client = AdminClient({'bootstrap.servers': 'xxx:xxx',
                                                 'security.protocol': 'SSL',
                                                 'ssl.ca.location': 'xxx',
                                                 'ssl.certificate.location': 'xxx',
                                                 'ssl.key.location': 'xxx',
                                                 'ssl.key.password': 'xxx',
                                                 'ssl.endpoint.identification.algorithm': 'none'
                                                 })

            def run(self):
                while True:
                    try:
                        # Request the latest offset of each of the three partitions.
                        topic_partition_offsets = {}
                        for partition in [0, 1, 2]:
                            topic_partition = TopicPartition("testtopic", partition)
                            topic_partition_offsets[topic_partition] = OffsetSpec.latest()

                        list_offset_futmap = self.admin_client.list_offsets(
                            topic_partition_offsets, request_timeout=30)
                        # Each future resolves to the result for one partition.
                        for partition, fut in list_offset_futmap.items():
                            try:
                                print(f"topic: {partition.topic}, partition: {partition.partition}, offset {fut.result().offset}")
                            except Exception:
                                print(f"get offset error. occurred with {partition.topic}:[{partition.partition}]")
                    except Exception as exc:
                        print(f"got error: {exc}")
                        print(traceback.format_exc())
                    finally:
                        time.sleep(0.1)

The problem appears when list_offsets() is called repeatedly. testtopic is an existing topic with three partitions and only a few messages.
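To put numbers on the growth, a small helper along the following lines could wrap one round of the loop. This is only a sketch: `rss_bytes` and `measure_growth` are hypothetical names, not part of confluent-kafka-python, and the RSS reading assumes a Linux `/proc` filesystem.

```python
import gc
import os
import tracemalloc


def rss_bytes():
    """Current resident set size in bytes, or None where /proc is unavailable."""
    try:
        with open("/proc/self/statm") as statm:
            # Second field of statm is the resident size in pages.
            resident_pages = int(statm.read().split()[1])
    except OSError:
        return None
    return resident_pages * os.sysconf("SC_PAGE_SIZE")


def measure_growth(call, iterations=1000):
    """Run `call` repeatedly and report how much memory is still held afterwards."""
    gc.collect()
    rss_before = rss_bytes()
    tracemalloc.start()
    heap_before, _ = tracemalloc.get_traced_memory()
    for _ in range(iterations):
        call()
    gc.collect()  # drop collectable garbage so only retained memory is counted
    heap_after, _ = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    rss_after = rss_bytes()
    rss_growth = None if rss_before is None else rss_after - rss_before
    return {"python_heap_growth": heap_after - heap_before, "rss_growth": rss_growth}
```

Here `call` would be one round of list_offsets() plus `fut.result()` on each returned future. tracemalloc only sees memory allocated through Python's allocators, so if `python_heap_growth` stays flat while `rss_growth` keeps rising, that would suggest the retained memory sits outside the Python heap, although it does not prove where.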


Checklist

Please provide the following information:

  • [x] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): 2.3.0 and 2.3.0
  • [x] Apache Kafka broker version: 3.5.0
  • [ ] Client configuration: {...}
  • [x] Operating system: linux
  • [ ] Provide client logs (with 'debug': '..' as necessary)
  • [ ] Provide broker log excerpts
  • [ ] Critical issue
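For the version item in the checklist, the two values can be collected with a short snippet such as the one below. `client_versions` is a hypothetical helper name; `confluent_kafka.version()` and `confluent_kafka.libversion()` each return a (version string, version int) tuple.

```python
def client_versions():
    """Return the client and librdkafka version strings, or None if the package is missing."""
    try:
        import confluent_kafka
    except ImportError:
        return None
    # Both functions return a (version_string, version_int) tuple.
    client_version, _ = confluent_kafka.version()
    librdkafka_version, _ = confluent_kafka.libversion()
    return {"confluent-kafka-python": client_version, "librdkafka": librdkafka_version}
```

Printing the returned dictionary gives the two strings to paste into the report.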

whuwangjj · Jan 09 '25 07:01