confluent-kafka-python
Multiple calls to list_offsets() may cause memory leaks
Description
When AdminClient.list_offsets() is called repeatedly, memory leaks: the memory usage of the process keeps increasing.
How to reproduce
import time
import traceback

from confluent_kafka import TopicPartition
from confluent_kafka.admin import AdminClient, OffsetSpec


class OffsetPoller:  # illustrative wrapper name; the report shows only the fragments below
    def __init__(self):
        self.admin_client = AdminClient({'bootstrap.servers': 'xxx:xxx',
                                         'security.protocol': 'SSL',
                                         'ssl.ca.location': 'xxx',
                                         'ssl.certificate.location': 'xxx',
                                         'ssl.key.location': 'xxx',
                                         'ssl.key.password': 'xxx',
                                         'ssl.endpoint.identification.algorithm': 'none'
                                         })

    def run(self):
        while True:
            try:
                # Request the latest offset for each of the three partitions.
                topic_partition_offsets = {}
                for partition in [0, 1, 2]:
                    topic_partition = TopicPartition("testtopic", partition)
                    topic_partition_offsets[topic_partition] = OffsetSpec.latest()
                list_offset_futmap = self.admin_client.list_offsets(
                    topic_partition_offsets, request_timeout=30)
                # One future per partition; result() blocks until the broker replies.
                for partition, fut in list_offset_futmap.items():
                    try:
                        print(f"topic: {partition.topic}, partition: {partition.partition}, offset {fut.result().offset}")
                    except Exception:
                        print(f"get offset error. occurred with {partition.topic}:[{partition.partition}]")
                        continue
            except Exception as exc:
                print(f"got error. {exc}")
                print(traceback.format_exc())
            finally:
                time.sleep(0.1)
The leak appears when list_offsets() is called repeatedly, as in the loop above. testtopic is an existing topic with three partitions and only a few messages.
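To put numbers on the growth, one option is to sample the peak resident set size of the process while the call is repeated. The following is a minimal sketch, not part of confluent-kafka: the helper names `peak_rss_kib` and `rss_growth_kib` and the `call` parameter are hypothetical, and it assumes Linux, where `ru_maxrss` is reported in KiB. It uses only the standard-library `resource` module, so it also sees allocations made inside the C extension.

```python
import resource


def peak_rss_kib():
    """Peak resident set size of this process (KiB on Linux; an assumption)."""
    return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss


def rss_growth_kib(call, iterations=1000, sample_every=100):
    """Invoke `call` repeatedly and record peak-RSS growth since the start.

    Returns a list of (iteration, growth_kib) samples. `call` is any
    zero-argument callable, e.g. one list_offsets() round trip.
    """
    baseline = peak_rss_kib()
    samples = []
    for i in range(1, iterations + 1):
        call()
        if i % sample_every == 0:
            samples.append((i, peak_rss_kib() - baseline))
    return samples
```

For this report, `call` would wrap a single list_offsets() request plus `fut.result()` on each returned future. Samples that keep climbing over many iterations would support the leak, whereas a curve that flattens early would point to allocator warm-up instead.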
Checklist
Please provide the following information:
- [x] confluent-kafka-python and librdkafka version (confluent_kafka.version('2.3.0') and confluent_kafka.libversion('2.3.0')):
- [x] Apache Kafka broker version: 3.5.0
- [ ] Client configuration: {...}
- [x] Operating system: linux
- [ ] Provide client logs (with 'debug': '..' as necessary)
- [ ] Provide broker log excerpts
- [ ] Critical issue