
Memory leak using kafka-python with PyPy

Open · joein opened this issue Jun 29, 2020 · 1 comment

I am probably observing a memory leak when using the kafka-python (2.0.1) package with PyPy (Python 3.6.9 (1608da62bfc7, Dec 23 2019, 10:50:04) [PyPy 7.3.0 with GCC 7.3.1 20180303 (Red Hat 7.3.1-5)] on linux). When I ran the same script on CPython 3.6 (Python 3.6.4 (default, Mar 19 2019, 21:01:45) [GCC 4.9.2] on linux), there was no memory problem: consumption stopped at 20.1 MiB and appeared to stay there.

The attached script consumes data from an empty topic, and its memory consumption keeps increasing for a long time (I tested it for at least 3 hours, and it kept eating more and more memory the whole time). To be precise, it starts at roughly 100 MiB and within 1-2 hours grows to ~300-400 MiB.

I tried to profile specific internal methods of KafkaConsumer, such as KafkaConsumer.poll. I saw the memory growth inside KafkaConsumer._poll_once, and then inside KafkaConsumer._client.poll and KafkaConsumer._fetcher.fetched_records, but I couldn't find anything useful beyond that. I also tested without the memory_profiler module, just watching htop, and the result was the same.

I attached three screenshots: the first at script startup, the second after one minute of work, and the third after 12 minutes. As you can see, some poll calls increase memory by 0.1-0.6 MiB (0.3 MiB is the most frequent value; it can be several MiB at startup, but I think that is ok).
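For anyone who wants to reproduce the internal profiling, here is a minimal sketch of the monkey-patching approach (the module paths below are private kafka-python 2.0.1 internals, so treat them as assumptions that may change between versions):

from memory_profiler import profile

from kafka import KafkaConsumer
from kafka.client_async import KafkaClient
from kafka.consumer.fetcher import Fetcher

# Wrap the internal methods so memory_profiler prints a line-by-line
# memory report for each of them on every call.
KafkaConsumer._poll_once = profile(KafkaConsumer._poll_once)
KafkaClient.poll = profile(KafkaClient.poll)
Fetcher.fetched_records = profile(Fetcher.fetched_records)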

import time

from kafka import KafkaConsumer
from kafka.structs import TopicPartition
from memory_profiler import profile


_bootstrap_servers = ["172.172.172.2:9092"]
_group_id = "memoryLeakGroup"
_auto_offset_reset = "earliest"
_enable_auto_commit = True
_timeout_ms_max = 5000  # poll timeout in milliseconds
_batch_max_size = 10000  # max records per poll
_assignment = [TopicPartition(topic="29_06_2020", partition=0)]

_origin_consumer = KafkaConsumer(
    bootstrap_servers=_bootstrap_servers,
    group_id=_group_id,
    auto_offset_reset=_auto_offset_reset,
    enable_auto_commit=_enable_auto_commit,
)
_origin_consumer.assign(_assignment)


@profile  # memory_profiler prints a line-by-line memory report per call
def polling():
    # The topic is empty, so poll() should always return an empty dict here.
    data = _origin_consumer.poll(
        timeout_ms=_timeout_ms_max, max_records=_batch_max_size
    )

    if not data:
        print(f"There is no more data {_assignment}")
    else:
        print(f"There is some data {data}")


if __name__ == "__main__":
    while True:
        try:
            polling()
            time.sleep(0.5)
        except Exception as exc:
            print(f"wow exception {exc}")
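For the htop-style check, the growth can also be logged from inside the loop itself; a minimal sketch (assuming Linux, where getrusage reports ru_maxrss in KiB):

import resource
import time


def peak_rss_mib():
    # Peak resident set size of this process so far; the value only ever
    # grows, so a steady increase over hours points at a leak.
    return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024


if __name__ == "__main__":
    while True:
        polling()  # the polling() function from the script above
        print(f"peak RSS so far: {peak_rss_mib():.1f} MiB")
        time.sleep(0.5)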

[Screenshots: 2020_06_29_14 58 26 (at startup), 2020_06_29_14 58 53 (after one minute), 2020_06_29_15 10 51 (after 12 minutes)]

I run my scripts in Docker; the PyPy and Kafka images can be built from the attached files (remove the .txt extension from boot.sh and Dockerfile): boot.sh.txt kafka_Dockerfile.txt pypy_requirements.txt pypy_Dockerfile.txt

I've also created an issue on the PyPy repo.

joein avatar Jun 29 '20 15:06 joein

I see similar behavior. The container dies at the peak of memory consumption.

[Screenshot: memory consumption graph, 2021-10-13 17:53]

frutik avatar Oct 13 '21 15:10 frutik