confluent-kafka-python

Potential memory leaks?

Open Dmitry-k42 opened this issue 3 years ago • 4 comments

Description

I am trying out this package and have noticed that it leaks memory in certain cases. When I produce a message that is too large, I catch the exception, split the payload recursively, and retry producing the halves. This leads to very high memory consumption: on my notebook I watched free memory drop rapidly, roughly 1.5 GB per minute.

How to reproduce

Here is a script (not real production code, just a minimal example):

import time
import uuid

from confluent_kafka import SerializingProducer, KafkaException, KafkaError
from confluent_kafka.serialization import StringSerializer

str_serializer = StringSerializer()


def send(key, value, deep=0):
    config = {
        'api.version.request': True,
        'bootstrap.servers': 'localhost:9092',
        'compression.type': 'gzip',
        'key.serializer': str_serializer,
        'value.serializer': str_serializer,
        'acks': 'all',
    }
    try:
        # note: a new producer is created on every call to send()
        p = SerializingProducer(config)
        p.produce('sandbox_test', key=key, value=value)
    except KafkaException as e:
        error: KafkaError = e.args[0]
        if error.code() == KafkaError.MSG_SIZE_TOO_LARGE and len(value) > 1:
            # message too large: split it in half and retry, recursing from inside the except block
            mid = int(len(value) / 2)
            send(key, value[:mid], deep + 1)
            send(key, value[mid:], deep + 1)
        else:
            raise e
    p.flush()


def main():
    for i in range(int(10e6)):
        try:
            print(f'Try #{i:06d}')
            send(key=uuid.uuid4().hex, value=uuid.uuid4().hex * 100000)
            time.sleep(0.1)
        except KeyboardInterrupt:
            break


if __name__ == '__main__':
    main()

I can avoid the memory leak if I rewrite the send procedure like this (the splitting is moved out of the except block):

def send(key, value, deep=0):
    config = {
        'api.version.request': True,
        'bootstrap.servers': 'localhost:9092',
        'compression.type': 'gzip',
        'key.serializer': str_serializer,
        'value.serializer': str_serializer,
        'acks': 'all',
    }
    need_to_split = False
    try:
        p = SerializingProducer(config)
        p.produce('sandbox_test', key=key, value=value)
    except KafkaException as e:
        error: KafkaError = e.args[0]
        if error.code() == KafkaError.MSG_SIZE_TOO_LARGE and len(value) > 1:
            need_to_split = True
        else:
            raise e
    # the splitting now happens here, after the except block has exited
    if need_to_split:
        mid = int(len(value) / 2)
        send(key, value[:mid], deep + 1)
        send(key, value[mid:], deep + 1)
    p.flush()

Can you help me figure out what causes this side effect? I assumed it was because I create a local producer inside send, but even after I made the producer global, memory still runs out.

Checklist

Please provide the following information:

  • [x] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): ('1.8.2', 17302016) ('1.8.2', 17302271)
  • [x] Apache Kafka broker version: 3.0.0
  • [x] Client configuration: { 'api.version.request': True, 'bootstrap.servers': 'localhost:9092', 'compression.type': 'gzip', 'key.serializer': StringSerializer(), 'value.serializer': StringSerializer(), 'acks': 'all', }
  • [x] Operating system: Debian GNU/Linux 11 (bullseye) (official python:3.9 docker image)
  • [ ] Provide client logs (with 'debug': '..' as necessary)
  • [ ] Provide broker log excerpts
  • [ ] Critical issue

Dmitry-k42 avatar Jan 15 '22 16:01 Dmitry-k42

You're recreating the SerializingProducer every time you send a message. Don't do that; create it once and reuse it. I'm not sure that is the cause of your issue, but it seems problematic to me.

i.e.:

def main():
    config = {
        'api.version.request': True,
        'bootstrap.servers': 'localhost:9092',
        'compression.type': 'gzip',
        'key.serializer': str_serializer,
        'value.serializer': str_serializer,
        'acks': 'all',
    }
    p = SerializingProducer(config)
    for i in range(int(10e6)):
        try:
            print(f'Try #{i:06d}')
            send(producer=p, key=uuid.uuid4().hex, value=uuid.uuid4().hex * 100000)
            time.sleep(0.1)
        except KeyboardInterrupt:
            break
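
For the snippet above to run, send would also need to take the producer as an argument instead of creating its own. A minimal, untested sketch, keeping the original splitting logic:

def send(producer, key, value, deep=0):
    try:
        producer.produce('sandbox_test', key=key, value=value)
    except KafkaException as e:
        error: KafkaError = e.args[0]
        if error.code() == KafkaError.MSG_SIZE_TOO_LARGE and len(value) > 1:
            # split in half and retry with the same, shared producer
            mid = int(len(value) / 2)
            send(producer, key, value[:mid], deep + 1)
            send(producer, key, value[mid:], deep + 1)
        else:
            raise e
    producer.flush()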

Atheuz avatar Jan 16 '22 01:01 Atheuz

@Atheuz, thanks for your reply, but as I already mentioned above, that is not the cause of the problem. If I create the producer only once it still leaks; a little less, but it still leaks. Here is an edited version of the proof-of-concept script:

import time
import uuid

from confluent_kafka import SerializingProducer, KafkaException, KafkaError
from confluent_kafka.serialization import StringSerializer

str_serializer = StringSerializer()
config = {
    'api.version.request': True,
    'bootstrap.servers': 'localhost:9092',
    'compression.type': 'gzip',
    'key.serializer': str_serializer,
    'value.serializer': str_serializer,
    'acks': 'all',
}
# single module-level producer, reused by every send() call
p = SerializingProducer(config)


def send(key, value, deep=0):
    try:
        p.produce('sandbox_test', key=key, value=value)
    except KafkaException as e:
        error: KafkaError = e.args[0]
        if error.code() == KafkaError.MSG_SIZE_TOO_LARGE and len(value) > 1:
            mid = int(len(value) / 2)
            send(key, value[:mid], deep + 1)
            send(key, value[mid:], deep + 1)
        else:
            raise e
    p.flush()


def main():
    for i in range(int(10e6)):
        try:
            print(f'Try #{i:06d}')
            send(key=uuid.uuid4().hex, value=uuid.uuid4().hex * 100000)
            time.sleep(0.1)
        except KeyboardInterrupt:
            break


if __name__ == '__main__':
    main()

Dmitry-k42 avatar Jan 17 '22 11:01 Dmitry-k42

Try a non-recursive implementation and see if that helps.
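
For example, the splitting could be driven by a worklist instead of recursion. A rough, untested sketch based on the global-producer script above:

def send(key, value):
    # process chunks from a stack instead of recursing
    pending = [value]
    while pending:
        chunk = pending.pop()
        try:
            p.produce('sandbox_test', key=key, value=chunk)
        except KafkaException as e:
            error: KafkaError = e.args[0]
            if error.code() == KafkaError.MSG_SIZE_TOO_LARGE and len(chunk) > 1:
                # too large: split in half and push both halves back onto the stack
                mid = len(chunk) // 2
                pending.append(chunk[mid:])
                pending.append(chunk[:mid])
            else:
                raise
    p.flush()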

edenhill avatar Jan 17 '22 11:01 edenhill

It seems to me that recursion isn't the point. I can work around the problem if I make the recursive call outside the except block (please see my first message).

Dmitry-k42 avatar Jan 17 '22 12:01 Dmitry-k42

I am closing this because I believe it is unlikely to be an issue with the underlying library (more likely a lazy garbage-collection policy somewhere, or normal memory use from librdkafka buffering), and we have a limited time budget for community issues. We'd consider it in more detail given further evidence.
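
If librdkafka buffering is the suspect on your side, one way to check is to cap the client's internal queue and watch its length. An illustrative, untested sketch with example values:

from confluent_kafka import SerializingProducer
from confluent_kafka.serialization import StringSerializer

config = {
    'bootstrap.servers': 'localhost:9092',
    'key.serializer': StringSerializer(),
    'value.serializer': StringSerializer(),
    # example caps; librdkafka's defaults are roughly 1 GB / 100000 messages
    'queue.buffering.max.kbytes': 65536,
    'queue.buffering.max.messages': 10000,
}
p = SerializingProducer(config)
p.produce('sandbox_test', key='k', value='v' * 1000)
print(len(p))  # messages and requests currently queued inside the client
p.flush()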

mhowlett avatar Oct 25 '22 16:10 mhowlett