libzmq
Memory leak connecting a gazillion times to a pubsub
I am currently using a small suite of software that uses zeromq to distribute realtime data.
https://github.com/StichtingOpenGeo/universal/blob/master/universal-pubsub.c
After some data downtime, we noticed that the pubsub processes had sucked up memory. Our clients typically reconnect every 60s if no data was received, to overcome other network issues. I created a small test tool to figure out whether there might be an issue with ZeroMQ.
https://github.com/StichtingOpenGeo/universal/blob/master/universal-sub-test.c
This shows up in ZeroMQ thus it makes me wonder: when should some destroys fly in?
==31474== 1,180,296 bytes in 1,521 blocks are possibly lost in loss record 45 of 47
==31474== at 0x4C2A790: operator new(unsigned long, std::nothrow_t const&) (in /usr/lib64/valgrind/vgpreload_memcheck-amd64-linux.so)
==31474== by 0x4E60B98: zmq::session_base_t::create(zmq::io_thread_t*, bool, zmq::socket_base_t*, zmq::options_t const&, zmq::address_t const*) (in /usr/lib64/libzmq.so.3.1.0)
==31474== by 0x4E7042E: zmq::tcp_listener_t::in_event() (in /usr/lib64/libzmq.so.3.1.0)
==31474== by 0x4E4EEED: zmq::epoll_t::loop() (in /usr/lib64/libzmq.so.3.1.0)
==31474== by 0x4E70B89: thread_routine (in /usr/lib64/libzmq.so.3.1.0)
==31474== by 0x588D313: start_thread (in /lib64/libpthread-2.20.so)
==31474== by 0x517843C: clone (in /lib64/libc-2.20.so)
Quite separately, if you're working in C I'd strongly recommend using CZMQ, it will make your life much easier.
Quite separately, if you're working in C I'd strongly recommend using CZMQ, it will make your life much easier.
Sadly not the life of my users, getting the right dependencies on CZMQ distributions is hell.
It is? Surely it's just one additional library.
Yes it is. Try to find it on some "stable" binary distribution such as Debian or Red Hat. Anyway, not quite the bikeshed I want to get into here. There is a memory leak inside libzmq, unrelated to my programming skills.
Ah, it usually works better with github, indeed.
Sorry for bikeshedding. The code without CZMQ is just harder to understand... actually the test case is opaque and I'm not sure what it's supposed to be showing. You're looping on closing/opening sockets, and this will always create lots of TCP timewait sockets which will take a while to leave the system. Where is the memory leak exactly?
Where is the memory leak exactly?
Please Pieter, the bug is not difficult to understand. The pubsub is leaking memory for each connect.
As far as I can understand it you're creating and destroying sub sockets. Here is a minimal CZMQ program that does this:
#include <czmq.h>
int main (void)
{
zsock_t *pub = zsock_new_pub ("tcp:127.0.0.1:5678");
int busy = 100;
while (busy--) {
zsock_t *sub = zsock_new_sub ("tcp:127.0.0.1:5678", "");
zclock_sleep (10);
zsock_destroy (&sub);
}
zsock_destroy (&pub);
return 0;
}
When I run this under valgrind, it reports:
==30032==
==30032== HEAP SUMMARY:
==30032== in use at exit: 0 bytes in 0 blocks
==30032== total heap usage: 443 allocs, 443 frees, 140,997 bytes allocated
==30032==
==30032== All heap blocks were freed -- no leaks are possible
==30032==
==30032== For counts of detected and suppressed errors, rerun with: -v
==30032== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 2 from 2)
So I'm left asking what it is you're doing different. Your code is not self-evident, or else I'm stupid. Anyhow my example works, it is certainly leaking data for each connect, afaict.
The above example shows that even with a source distribution such as Gentoo, which installs czmq-2.2.0, the example doesn't compile. So while I know for a fact that czmq makes the life of the programmer easy (I have used it), in practice it is too unstable to share code.
test.c: In function ‘main’:
test.c:5:5: error: unknown type name ‘zsock_t’
zsock_t *pub = zsock_new_pub ("tcp:127.0.0.1:5678");
^
test.c:5:20: warning: initialization makes pointer from integer without a cast
zsock_t *pub = zsock_new_pub ("tcp:127.0.0.1:5678");
^
test.c:8:9: error: unknown type name ‘zsock_t’
zsock_t *sub = zsock_new_sub ("tcp:127.0.0.1:5678", "");
^
test.c:8:24: warning: initialization makes pointer from integer without a cast
zsock_t *sub = zsock_new_sub ("tcp:127.0.0.1:5678", "");
Indeed, the github czmq version works. And in your example I am unable to find a memory leak. I am currently trying to spot the difference.
Adding this will get you the first two memory leaks:
#include <czmq.h>
int main (void)
{
zsock_t *pub = zsock_new_pub ("tcp://127.0.0.1:5678");
zsock_set_sndhwm (pub, 8192);
zsock_set_rcvhwm (pub, 8192);
int busy = 100;
while (busy--) {
zsock_t *sub = zsock_new_sub ("tcp://127.0.0.1:5678", "");
zclock_sleep (10);
zsock_destroy (&sub);
}
zsock_destroy (&pub);
return 0;
}
I won't doubt that czmq is doing a good job in memory management. But do observe the following examples:
#include <czmq.h>
int main (void) {
zsock_t *pubsub = zsock_new_pub ("tcp://127.0.0.1:1234");
int busy = 100;
while (busy--) {
sleep(1);
}
zsock_destroy (&pubsub);
return 0;
}
#include <czmq.h>
int main (void)
{
int busy = 100;
while (busy--) {
zsock_t *sub = zsock_new_sub ("tcp://127.0.0.1:1234", "");
zclock_sleep (10);
zsock_destroy (&sub);
}
return 0;
}
With the czmq testers, valgrind reports "there is no memory leak" after the application closes. But look at the memory use of the first application after the second tester has finished. The memory keeps increasing and is only freed because of zsock_destroy (&pubsub).
Sorry, there was a bug in my example, should have been tcp:// instead of tcp. It asserts in zsock_set_sndhwm() otherwise (pub is NULL), no memory leaks though.
So you're saying that the pub socket allocates memory for each sub connection, and this memory is not freed until the pub socket is destroyed. It's possible the pub socket doesn't see the TCP connection closing, yes.
I noticed the problem with tcp:// while splitting up. Even in your example you can observe that the memory usage is increasing. So even in your own example, the question is still there: why isn't the memory being released?
skinkie 21426 5.0 0.9 239120 77304 pts/1 Sl+ 00:35 0:03 ./test
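To watch this growth from inside a test program rather than with ps, a minimal Linux-only sketch is shown below. It assumes the output above is in `ps aux` format, where the sixth column is the resident set size in kB. The helper name `resident_kb` is hypothetical and not part of libzmq or CZMQ; it reads the same figure from the `VmRSS` line of `/proc/self/status`.

```c
#include <stdio.h>
#include <string.h>

/* Hypothetical helper: return this process's resident set size in kB,
 * parsed from the "VmRSS:" line of /proc/self/status (Linux only).
 * Returns -1 if the file or the field is unavailable. */
static long resident_kb (void)
{
    FILE *status = fopen ("/proc/self/status", "r");
    if (!status)
        return -1;

    char line[256];
    long kb = -1;
    while (fgets (line, sizeof (line), status)) {
        if (strncmp (line, "VmRSS:", 6) == 0) {
            /* %ld skips the whitespace between the label and the number */
            if (sscanf (line + 6, "%ld", &kb) != 1)
                kb = -1;
            break;
        }
    }
    fclose (status);
    return kb;
}
```

Calling `resident_kb ()` before and after a burst of subscriber reconnects, while the publisher stays alive, would show whether memory is being held even though valgrind reports no leak at exit.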
I've no idea how this works internally in libzmq. All I'm concerned with here is reproducing the test to a point where others know what is going on. You may want to continue this discussion on zeromq-dev.
I guess it is quite related to the topic "PUSH does not seem to release PULL connection message memory".
Sounds like the same thing, yes.
Digging into the push/pull leak I found that the pipes are never destroyed; they are just marked as inactive and moved to the back of the list. They are still tracked and destroyed with the context, but every time a socket is reconnected there is a new set of pipes.
session_base_t::process_term() is called and it calls pipe_t::terminate(). The pipe_t term command is sent and received, and the delimiter is sent but never received, so one pipe end is waiting on the delimiter and the other is waiting on the term ack. It then stays in this state until the context is destroyed.
@greroger thanks a lot, just working on this one and your comment helps a lot
I was about to submit an issue; I have the exact same leak with PUB/SUB sockets after multiple connects/disconnects.
==6187== 6,088 bytes in 1 blocks are definitely lost in loss record 1 of 1
==6187== at 0x4C29F90: malloc (in /usr/lib/valgrind/vgpreload_memcheck-amd64-linux.so)
==6187== by 0x4E518C0: ??? (in /usr/lib/libzmq.so.3.0.0)
==6187== by 0x4E6F74B: ??? (in /usr/lib/libzmq.so.3.0.0)
==6187== by 0x4E6FB97: ??? (in /usr/lib/libzmq.so.3.0.0)
==6187== by 0x4E66419: ??? (in /usr/lib/libzmq.so.3.0.0)
==6187== by 0x4E4DBED: ??? (in /usr/lib/libzmq.so.3.0.0)
==6187== by 0x4E6AABF: ??? (in /usr/lib/libzmq.so.3.0.0)
==6187== by 0x5EA9313: start_thread (in /usr/lib/libpthread-2.20.so)
==6187== by 0x59DF5BC: clone (in /usr/lib/libc-2.20.so)
@hurtonm Do you have any update on this ticket?
@skinkie I feel very strange about this mem leak issue but I have never been able to reproduce it.
I am actively reproducing it and it causes much downtime.
@skinkie That's the mystery. I tried three different examples, and still no leak detected. One test was running inside a docker image someone prepared for this. Could you prepare a simple C client and server that I can compile and run so that the leak shows up? I really want to understand this problem and fix it.
https://github.com/zeromq/libzmq/issues/1256#issuecomment-62639664 I can't make it simpler than that.
@skinkie thanks, I managed to reproduce the leak reported. The problem is that in that example the publisher does not make any calls to the library, so the library gets no chance to do its internal cleanup. Calling
zsocket_events (zsock_resolve (pubsub));
periodically solves that leak for that example. Not sure about other reported leaks.
@hurtonm could you give a hint how to do something similar in https://github.com/StichtingOpenGeo/universal/blob/master/universal-pubsub.c
@skinkie Is there traffic on the pubsub socket when you experience the memory growth?
The problem is actually that the clients use this tool, which at line 89 reconnects on "no data" to overcome other network issues. Hence the publisher at the top keeps growing when there is no data to distribute, because of the reconnects.
https://github.com/StichtingOpenGeo/universal/blob/master/universal-sub-pubsub.c#L89
@skinkie Just call zmq_getsockopt(pubsub, ZMQ_EVENTS, &events, ) before the goto statement. This should invoke internal bookkeeping, which cleans up retired data structures. Please let me know if that helps.
Your suggestion implemented as below does not fix it.
} else {
int fd = 0;
size_t fd_size = sizeof(fd);
zmq_close (items[0].socket);
sleep (1);
zmq_getsockopt (pubsub, ZMQ_EVENTS, &fd, &fd_size);
goto init;
}
@skinkie Could you please simplify that program a bit so that I could easily reproduce the problem?
#include <stdio.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <unistd.h>
#include <zmq.h>
int main (int argc, char *argv[]) {
void *context = zmq_init (1);
zmq_pollitem_t items[1];
init:
items[0].socket = zmq_socket (context, ZMQ_SUB);
items[0].events = ZMQ_POLLIN;
zmq_connect (items[0].socket, "tcp://127.0.0.1:7827");
int rc;
size_t more_size = sizeof(int);
/* Reconnect whenever no data arrives within the poll timeout (10 ms in this test) */
while ((rc = zmq_poll (items, 1, 10)) >= 0) {
if (rc > 0) {
int more;
do {
/* Create an empty 0MQ message to hold the message part */
zmq_msg_t part;
rc = zmq_msg_init (&part);
assert (rc == 0);
/* Block until a message is available to be received from the socket */
rc = zmq_msg_recv (&part, items[0].socket, 0);
assert (rc != -1);
/* Determine if more message parts are to follow */
rc = zmq_getsockopt (items[0].socket, ZMQ_RCVMORE, &more, &more_size);
assert (rc == 0);
zmq_msg_close (&part);
} while (more);
} else {
zmq_close (items[0].socket);
goto init;
}
}
zmq_close (items[0].socket);
zmq_ctx_destroy (context);
return rc;
}
#include <pwd.h>
#include <stdio.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <zmq.h>
#include <unistd.h>
int main (int argc, char *argv[]) {
void *context = zmq_init (1);
void *pubsub = zmq_socket (context, ZMQ_XPUB);
/* Apply a high water mark at the PubSub (the option value is an int in libzmq 3.x and later) */
int hwm = 8192;
zmq_setsockopt (pubsub, ZMQ_SNDHWM, &hwm, sizeof(hwm));
zmq_setsockopt (pubsub, ZMQ_RCVHWM, &hwm, sizeof(hwm));
zmq_bind (pubsub, "tcp://127.0.0.1:7827");
zmq_pollitem_t items[1];
init:
items[0].socket = zmq_socket (context, ZMQ_SUB);
items[0].events = ZMQ_POLLIN;
zmq_connect (items[0].socket, "tcp://127.0.0.1:7817");
int rc;
size_t more_size = sizeof(int);
/* Ensure that every 60s there is data */
while ((rc = zmq_poll (items, 1, 60 * 1000L)) >= 0) {
if (rc > 0) {
int more;
do {
/* Create an empty 0MQ message to hold the message part */
zmq_msg_t part;
rc = zmq_msg_init (&part);
assert (rc == 0);
/* Block until a message is available to be received from the socket */
rc = zmq_msg_recv (&part, items[0].socket, 0);
assert (rc != -1);
/* Determine if more message parts are to follow */
rc = zmq_getsockopt (items[0].socket, ZMQ_RCVMORE, &more, &more_size);
assert (rc == 0);
/* Send the message, when more is set, apply the flag, otherwise don't */
zmq_msg_send (&part, pubsub, (more ? ZMQ_SNDMORE : 0));
zmq_msg_close (&part);
} while (more);
} else {
int fd = 0;
size_t fd_size = sizeof(fd);
zmq_close (items[0].socket);
sleep (1);
zmq_getsockopt (pubsub, ZMQ_EVENTS, &fd, &fd_size);
goto init;
}
}
zmq_close (items[0].socket);
zmq_close (pubsub);
zmq_ctx_destroy (context);
return 0;
}