confluent-kafka-python
Potential memory leaks?
Description
I am trying out this package and have noticed that it leaks memory in a specific case. I produce a message that is too large, and when I catch the resulting exception I split the value recursively and produce the halves again. This leads to very high memory consumption: on my notebook I observed free memory dropping rapidly, at roughly 1.5 GB per minute.
How to reproduce
Here is a script (not real production code, just a minimal example):
import time
import uuid

from confluent_kafka import SerializingProducer, KafkaException, KafkaError
from confluent_kafka.serialization import StringSerializer

str_serializer = StringSerializer()


def send(key, value, deep=0):
    config = {
        'api.version.request': True,
        'bootstrap.servers': 'localhost:9092',
        'compression.type': 'gzip',
        'key.serializer': str_serializer,
        'value.serializer': str_serializer,
        'acks': 'all',
    }
    try:
        p = SerializingProducer(config)
        p.produce('sandbox_test', key=key, value=value)
    except KafkaException as e:
        error: KafkaError = e.args[0]
        if error.code() == KafkaError.MSG_SIZE_TOO_LARGE and len(value) > 1:
            mid = int(len(value) / 2)
            send(key, value[:mid], deep + 1)
            send(key, value[mid:], deep + 1)
        else:
            raise e
    p.flush()


def main():
    for i in range(int(10e6)):
        try:
            print(f'Try #{i:06d}')
            send(key=uuid.uuid4().hex, value=uuid.uuid4().hex * 100000)
            time.sleep(0.1)
        except KeyboardInterrupt:
            break


if __name__ == '__main__':
    main()
I can avoid the memory leak if I rewrite the send procedure like this (the splitting moved out of the except block):
def send(key, value, deep=0):
    config = {
        'api.version.request': True,
        'bootstrap.servers': 'localhost:9092',
        'compression.type': 'gzip',
        'key.serializer': str_serializer,
        'value.serializer': str_serializer,
        'acks': 'all',
    }
    need_to_split = False
    try:
        p = SerializingProducer(config)
        p.produce('sandbox_test', key=key, value=value)
    except KafkaException as e:
        error: KafkaError = e.args[0]
        if error.code() == KafkaError.MSG_SIZE_TOO_LARGE and len(value) > 1:
            need_to_split = True
        else:
            raise e
    if need_to_split:
        mid = int(len(value) / 2)
        send(key, value[:mid], deep + 1)
        send(key, value[mid:], deep + 1)
    p.flush()
Can you help me figure out what causes this side effect? I assumed it was because I create a local producer, but even after I made the producer global, memory still runs out.
Checklist
Please provide the following information:
- [x] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): ('1.8.2', 17302016) and ('1.8.2', 17302271)
- [x] Apache Kafka broker version: 3.0.0
- [x] Client configuration: {'api.version.request': True, 'bootstrap.servers': 'localhost:9092', 'compression.type': 'gzip', 'key.serializer': StringSerializer(), 'value.serializer': StringSerializer(), 'acks': 'all'}
- [x] Operating system: Debian GNU/Linux 11 (bullseye) (official python:3.9 docker image)
- [ ] Provide client logs (with 'debug': '..' as necessary)
- [ ] Provide broker log excerpts
- [ ] Critical issue
You're recreating the SerializingProducer every time you send a message. Don't do that: create it once and reuse it. I'm not sure that's the cause of your issue, but it seems problematic to me.
i.e.:
def main():
    config = {
        'api.version.request': True,
        'bootstrap.servers': 'localhost:9092',
        'compression.type': 'gzip',
        'key.serializer': str_serializer,
        'value.serializer': str_serializer,
        'acks': 'all',
    }
    p = SerializingProducer(config)
    for i in range(int(10e6)):
        try:
            print(f'Try #{i:06d}')
            send(producer=p, key=uuid.uuid4().hex, value=uuid.uuid4().hex * 100000)
            time.sleep(0.1)
        except KeyboardInterrupt:
            break
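Presumably send would also be adapted to take the producer as a parameter instead of building its own; a minimal sketch of what that could look like, reusing the topic name and error handling from the original script:

def send(producer, key, value, deep=0):
    # Reuse the producer created once in main() instead of constructing a new one per call.
    try:
        producer.produce('sandbox_test', key=key, value=value)
    except KafkaException as e:
        error: KafkaError = e.args[0]
        if error.code() == KafkaError.MSG_SIZE_TOO_LARGE and len(value) > 1:
            mid = int(len(value) / 2)
            send(producer, key, value[:mid], deep + 1)
            send(producer, key, value[mid:], deep + 1)
        else:
            raise e
    producer.flush()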
@Atheuz, thanks for your reply, but as I already noted above, that is not the cause of the problem. If I create the producer only once, it still leaks; a little less, but it still leaks. Here is an edited version of the proof script:
import time
import uuid

from confluent_kafka import SerializingProducer, KafkaException, KafkaError
from confluent_kafka.serialization import StringSerializer

str_serializer = StringSerializer()

config = {
    'api.version.request': True,
    'bootstrap.servers': 'localhost:9092',
    'compression.type': 'gzip',
    'key.serializer': str_serializer,
    'value.serializer': str_serializer,
    'acks': 'all',
}
p = SerializingProducer(config)


def send(key, value, deep=0):
    try:
        p.produce('sandbox_test', key=key, value=value)
    except KafkaException as e:
        error: KafkaError = e.args[0]
        if error.code() == KafkaError.MSG_SIZE_TOO_LARGE and len(value) > 1:
            mid = int(len(value) / 2)
            send(key, value[:mid], deep + 1)
            send(key, value[mid:], deep + 1)
        else:
            raise e
    p.flush()


def main():
    for i in range(int(10e6)):
        try:
            print(f'Try #{i:06d}')
            send(key=uuid.uuid4().hex, value=uuid.uuid4().hex * 100000)
            time.sleep(0.1)
        except KeyboardInterrupt:
            break


if __name__ == '__main__':
    main()
Try a non-recursive implementation and see if that helps.
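For example, a minimal sketch of an iterative variant that uses an explicit work list instead of recursion (assuming the same global producer p, serializers, and topic as in the scripts above):

def send(key, value):
    # Split oversized values with an explicit stack instead of recursive calls,
    # so no exception object or call frame is held across the splitting.
    pending = [value]
    while pending:
        chunk = pending.pop()
        try:
            p.produce('sandbox_test', key=key, value=chunk)
        except KafkaException as e:
            error: KafkaError = e.args[0]
            if error.code() == KafkaError.MSG_SIZE_TOO_LARGE and len(chunk) > 1:
                mid = len(chunk) // 2
                pending.append(chunk[mid:])
                pending.append(chunk[:mid])
            else:
                raise e
    p.flush()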
It seems to me that recursion isn't the point. I can work around the problem if I make the recursive calls outside of the except block (please see my first message).
I am closing this because I believe it is unlikely to be an issue with the underlying library (more likely a lazy garbage-collection policy somewhere, or normal memory use from librdkafka buffering), and we have a limited time budget for community issues. We'd consider it in more detail with further evidence.
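For instance, to check whether librdkafka buffering accounts for the memory growth, one could cap the client's internal queues and watch how many messages are still waiting for delivery; a rough sketch (the limits and message counts here are illustrative, not recommendations):

from confluent_kafka import SerializingProducer
from confluent_kafka.serialization import StringSerializer

# Cap librdkafka's internal buffering so unbounded queue growth surfaces as
# produce() errors instead of silently growing memory use.
config = {
    'bootstrap.servers': 'localhost:9092',
    'key.serializer': StringSerializer(),
    'value.serializer': StringSerializer(),
    'queue.buffering.max.messages': 10000,  # illustrative cap on queued messages
    'queue.buffering.max.kbytes': 65536,    # illustrative cap on queued data (KB)
}
p = SerializingProducer(config)

for i in range(1000):
    p.produce('sandbox_test', key=str(i), value='x' * 1000)
    p.poll(0)       # serve delivery callbacks so completed messages are released
    print(len(p))   # messages and requests still awaiting delivery

p.flush()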