
C++ consumer APIs have memory leaks under certain conditions

Open zhangyafeikimi opened this issue 2 years ago • 9 comments

Description

C++ consumer APIs have memory leaks under certain conditions.

How to reproduce

OS: CentOS Linux 7.9.2009
Version: librdkafka 2.1.0

I verified it with examples/rdkafka_example.cpp.

The command line is:

valgrind --leak-check=full --show-leak-kinds=all ./rdkafka_example -C -t test2 -p 0

When Kafka is not running or the topic does not exist, the output is:

==2446== Memcheck, a memory error detector
==2446== Copyright (C) 2002-2022, and GNU GPL'd, by Julian Seward et al.
==2446== Using Valgrind-3.21.0 and LibVEX; rerun with -h for copyright info
==2446== Command: ./rdkafka_example -C -t test2 -p 0
==2446==
% Created consumer rdkafka#consumer-1
^C==2446==
==2446== HEAP SUMMARY:
==2446==     in use at exit: 2,988 bytes in 10 blocks
==2446==   total heap usage: 337 allocs, 327 frees, 78,889 bytes allocated
==2446==
==2446== 20 bytes in 2 blocks are indirectly lost in loss record 1 of 9
==2446==    at 0x4C2B0D5: malloc (in /opt/p13n-valgrind/libexec/valgrind/vgpreload_memcheck-amd64-linux.so)
==2446==    by 0x5B08B89: strdup (in /usr/lib64/libc-2.17.so)
==2446==    by 0x4A6138: rd_strdup (rd.h:156)
==2446==    by 0x4A6138: rd_kafka_anyconf_set_prop0 (rdkafka_conf.c:1725)
==2446==    by 0x4A6138: rd_kafka_defaultconf_set (rdkafka_conf.c:2171)
==2446==    by 0x4A63D7: rd_kafka_topic_conf_new (rdkafka_conf.c:2191)
==2446==    by 0x4A68DE: rd_kafka_topic_conf_dup (rdkafka_conf.c:2623)
==2446==    by 0x40CF06: RdKafka::Topic::create(RdKafka::Handle*, std::string const&, RdKafka::Conf const*, std::string&) (TopicImpl.cpp:92)
==2446==    by 0x405FF4: main (rdkafka_example.cpp:531)
==2446==
==2446== 24 bytes in 1 blocks are indirectly lost in loss record 2 of 9
==2446==    at 0x4C2B0D5: malloc (in /opt/p13n-valgrind/libexec/valgrind/vgpreload_memcheck-amd64-linux.so)
==2446==    by 0x456274: rd_malloc (rd.h:139)
==2446==    by 0x456274: rd_kafkap_str_new (rdkafka_proto.h:304)
==2446==    by 0x456274: rd_kafka_topic_new0 (rdkafka_topic.c:335)
==2446==    by 0x4579CA: rd_kafka_topic_new (rdkafka_topic.c:512)
==2446==    by 0x40CF40: RdKafka::Topic::create(RdKafka::Handle*, std::string const&, RdKafka::Conf const*, std::string&) (TopicImpl.cpp:114)
==2446==    by 0x405FF4: main (rdkafka_example.cpp:531)
==2446==
==2446== 112 bytes in 1 blocks are indirectly lost in loss record 3 of 9
==2446==    at 0x4C2FF55: calloc (in /opt/p13n-valgrind/libexec/valgrind/vgpreload_memcheck-amd64-linux.so)
==2446==    by 0x4BA12A: rd_calloc (rd.h:133)
==2446==    by 0x4BA12A: rd_kafka_op_new0 (rdkafka_op.c:265)
==2446==    by 0x4ACCF8: rd_kafka_toppar_op_version_bump (rdkafka_partition.c:199)
==2446==    by 0x4B5E46: rd_kafka_toppar_fetch_stop (rdkafka_partition.c:1724)
==2446==    by 0x4B79F0: rd_kafka_toppar_op_serve (rdkafka_partition.c:2012)
==2446==    by 0x4A0DC1: rd_kafka_q_serve (rdkafka_queue.c:513)
==2446==    by 0x422C9B: rd_kafka_thread_main (rdkafka.c:2117)
==2446==    by 0x423F97: _thrd_wrapper_function (tinycthread.c:576)
==2446==    by 0x5867EA4: start_thread (in /usr/lib64/libpthread-2.17.so)
==2446==    by 0x5B7AB0C: clone (in /usr/lib64/libc-2.17.so)
==2446==
==2446== 128 bytes in 1 blocks are indirectly lost in loss record 4 of 9
==2446==    at 0x4C3015B: realloc (in /opt/p13n-valgrind/libexec/valgrind/vgpreload_memcheck-amd64-linux.so)
==2446==    by 0x4418C3: rd_realloc (rd.h:145)
==2446==    by 0x4418C3: rd_list_grow (rdlist.c:48)
==2446==    by 0x44190B: rd_list_init (rdlist.c:56)
==2446==    by 0x45658F: rd_kafka_topic_new0 (rdkafka_topic.c:459)
==2446==    by 0x4579CA: rd_kafka_topic_new (rdkafka_topic.c:512)
==2446==    by 0x40CF40: RdKafka::Topic::create(RdKafka::Handle*, std::string const&, RdKafka::Conf const*, std::string&) (TopicImpl.cpp:114)
==2446==    by 0x405FF4: main (rdkafka_example.cpp:531)
==2446==
==2446== 176 bytes in 1 blocks are indirectly lost in loss record 5 of 9
==2446==    at 0x4C2B0D5: malloc (in /opt/p13n-valgrind/libexec/valgrind/vgpreload_memcheck-amd64-linux.so)
==2446==    by 0x49E14C: rd_malloc (rd.h:139)
==2446==    by 0x49E14C: rd_kafka_q_new0 (rdkafka_queue.c:110)
==2446==    by 0x42065C: rd_kafka_new (rdkafka.c:2253)
==2446==    by 0x40A599: RdKafka::Consumer::create(RdKafka::Conf const*, std::string&) (ConsumerImpl.cpp:60)
==2446==    by 0x405F59: main (rdkafka_example.cpp:520)
==2446==
==2446== 176 bytes in 1 blocks are indirectly lost in loss record 6 of 9
==2446==    at 0x4C2B0D5: malloc (in /opt/p13n-valgrind/libexec/valgrind/vgpreload_memcheck-amd64-linux.so)
==2446==    by 0x49E14C: rd_malloc (rd.h:139)
==2446==    by 0x49E14C: rd_kafka_q_new0 (rdkafka_queue.c:110)
==2446==    by 0x4AD591: rd_kafka_toppar_new0 (rdkafka_partition.c:255)
==2446==    by 0x4ADEF1: rd_kafka_toppar_desired_add (rdkafka_partition.c:643)
==2446==    by 0x415AD6: rd_kafka_consume_start0 (rdkafka.c:2700)
==2446==    by 0x415AD6: rd_kafka_consume_start (rdkafka.c:2745)
==2446==    by 0x409CB6: RdKafka::ConsumerImpl::start(RdKafka::Topic*, int, long) (ConsumerImpl.cpp:85)
==2446==    by 0x40601C: main (rdkafka_example.cpp:540)
==2446==
==2446== 176 bytes in 1 blocks are indirectly lost in loss record 7 of 9
==2446==    at 0x4C2B0D5: malloc (in /opt/p13n-valgrind/libexec/valgrind/vgpreload_memcheck-amd64-linux.so)
==2446==    by 0x49E14C: rd_malloc (rd.h:139)
==2446==    by 0x49E14C: rd_kafka_q_new0 (rdkafka_queue.c:110)
==2446==    by 0x4AD5B1: rd_kafka_toppar_new0 (rdkafka_partition.c:256)
==2446==    by 0x4ADEF1: rd_kafka_toppar_desired_add (rdkafka_partition.c:643)
==2446==    by 0x415AD6: rd_kafka_consume_start0 (rdkafka.c:2700)
==2446==    by 0x415AD6: rd_kafka_consume_start (rdkafka.c:2745)
==2446==    by 0x409CB6: RdKafka::ConsumerImpl::start(RdKafka::Topic*, int, long) (ConsumerImpl.cpp:85)
==2446==    by 0x40601C: main (rdkafka_example.cpp:540)
==2446==
==2446== 1,176 bytes in 1 blocks are indirectly lost in loss record 8 of 9
==2446==    at 0x4C2FF55: calloc (in /opt/p13n-valgrind/libexec/valgrind/vgpreload_memcheck-amd64-linux.so)
==2446==    by 0x4AD39B: rd_calloc (rd.h:133)
==2446==    by 0x4AD39B: rd_kafka_toppar_new0 (rdkafka_partition.c:219)
==2446==    by 0x4ADEF1: rd_kafka_toppar_desired_add (rdkafka_partition.c:643)
==2446==    by 0x415AD6: rd_kafka_consume_start0 (rdkafka.c:2700)
==2446==    by 0x415AD6: rd_kafka_consume_start (rdkafka.c:2745)
==2446==    by 0x409CB6: RdKafka::ConsumerImpl::start(RdKafka::Topic*, int, long) (ConsumerImpl.cpp:85)
==2446==    by 0x40601C: main (rdkafka_example.cpp:540)
==2446==
==2446== 2,988 (1,000 direct, 1,988 indirect) bytes in 1 blocks are definitely lost in loss record 9 of 9
==2446==    at 0x4C2FF55: calloc (in /opt/p13n-valgrind/libexec/valgrind/vgpreload_memcheck-amd64-linux.so)
==2446==    by 0x456244: rd_calloc (rd.h:133)
==2446==    by 0x456244: rd_kafka_topic_new0 (rdkafka_topic.c:331)
==2446==    by 0x4579CA: rd_kafka_topic_new (rdkafka_topic.c:512)
==2446==    by 0x40CF40: RdKafka::Topic::create(RdKafka::Handle*, std::string const&, RdKafka::Conf const*, std::string&) (TopicImpl.cpp:114)
==2446==    by 0x405FF4: main (rdkafka_example.cpp:531)
==2446==
==2446== LEAK SUMMARY:
==2446==    definitely lost: 1,000 bytes in 1 blocks
==2446==    indirectly lost: 1,988 bytes in 9 blocks
==2446==      possibly lost: 0 bytes in 0 blocks
==2446==    still reachable: 0 bytes in 0 blocks
==2446==         suppressed: 0 bytes in 0 blocks
==2446==
==2446== For lists of detected and suppressed errors, rerun with: -s
==2446== ERROR SUMMARY: 1 errors from 1 contexts (suppressed: 0 from 0)

zhangyafeikimi avatar Oct 27 '23 12:10 zhangyafeikimi

Tried 2.3.0; it has the same leaks.

zhangyafeikimi avatar Nov 01 '23 07:11 zhangyafeikimi

We have the same issue using the C API directly. I did a bisect, and the problem was introduced with 8e20e1ee79b188ae610aac3a2d2517f7f12dd890. If I revert that commit, the leaks go away.

dholm avatar Nov 22 '23 10:11 dholm

Thanks for the report. In the provided stack trace it's just the example that is leaking memory, because the topic isn't destroyed.

emasab avatar Nov 24 '23 08:11 emasab

It's not just the examples. These leaks are fatal for long-running servers when the topic is unavailable.

zhangyafeikimi avatar Nov 24 '23 10:11 zhangyafeikimi

I got the same problem. As @dholm said, if I revert commit https://github.com/confluentinc/librdkafka/commit/8e20e1ee79b188ae610aac3a2d2517f7f12dd890, the leaks go away.

qingzhongli avatar Jan 14 '24 06:01 qingzhongli

My employer has also tentatively bisected an issue to 8e20e1ee79b188ae610aac3a2d2517f7f12dd890. We see that if we have a group consumer, and the broker goes down (that is, we shut down the Kafka broker) and then comes back up, and we then try to shut down the group consumer, it simply hangs forever. This doesn't reproduce in 1.9.3rc2 or earlier, but it does reproduce in 2.0.0rc1 through 2.3.0. In fact, it appears not to reproduce in a83cadf5eab3a43f7f0d3dd09f5b1a3e9e88312f but to start reproducing with 8e20e1ee79b188ae610aac3a2d2517f7f12dd890.

I see a lot of separate reporters pointing at #4117 (and the followup #4208), and things like "TODO" appearing in the changelog. Is it possible that those commits weren't reviewed as thoroughly as they should have been?

Quuxplusone avatar Mar 28 '24 21:03 Quuxplusone

Found and fixed the memory leak that happens in the examples rdkafka_example and rdkafka_example_cpp here. It happens since 2.0.2 when using rd_kafka_consume_stop on a partition unknown to the client, for example when the client hasn't received metadata yet or the partition number is higher than the maximum.

In these examples it doesn't hang in the destroy call; it only causes a memory leak. @Quuxplusone, in case it causes a hang, please try this fix, and if it's still happening, tell me how to reproduce it.

emasab avatar Apr 03 '24 07:04 emasab

This seems related: https://github.com/confluentinc/librdkafka/issues/4362

emasab avatar Apr 03 '24 07:04 emasab

> @Quuxplusone in case it causes a hang, please try this fix and in case it's still happening tell me how to reproduce it.

Replied on https://github.com/confluentinc/librdkafka/pull/4669#issuecomment-2037799703

Quuxplusone avatar Apr 04 '24 17:04 Quuxplusone