Memory leak connecting a gazillion times to a pubsub

Open skinkie opened this issue 11 years ago • 60 comments

I am currently using a small suite of software that uses zeromq to distribute realtime data.

https://github.com/StichtingOpenGeo/universal/blob/master/universal-pubsub.c

After some data downtime, we noticed that the pubsub processes had sucked up memory. Our clients typically reconnect every 60s if no data was received, to work around other network issues. I created a small test tool to figure out whether there might be an issue with ZeroMQ.

https://github.com/StichtingOpenGeo/universal/blob/master/universal-sub-test.c

The following shows up inside ZeroMQ, which makes me wonder: when should the destroys kick in?

==31474== 1,180,296 bytes in 1,521 blocks are possibly lost in loss record 45 of 47
==31474==    at 0x4C2A790: operator new(unsigned long, std::nothrow_t const&) (in /usr/lib64/valgrind/vgpreload_memcheck-amd64-linux.so)
==31474==    by 0x4E60B98: zmq::session_base_t::create(zmq::io_thread_t*, bool, zmq::socket_base_t*, zmq::options_t const&, zmq::address_t const*) (in /usr/lib64/libzmq.so.3.1.0)
==31474==    by 0x4E7042E: zmq::tcp_listener_t::in_event() (in /usr/lib64/libzmq.so.3.1.0)
==31474==    by 0x4E4EEED: zmq::epoll_t::loop() (in /usr/lib64/libzmq.so.3.1.0)
==31474==    by 0x4E70B89: thread_routine (in /usr/lib64/libzmq.so.3.1.0)
==31474==    by 0x588D313: start_thread (in /lib64/libpthread-2.20.so)
==31474==    by 0x517843C: clone (in /lib64/libc-2.20.so)

skinkie avatar Nov 11 '14 02:11 skinkie

Quite separately, if you're working in C I'd strongly recommend using CZMQ, it will make your life much easier.

hintjens avatar Nov 11 '14 09:11 hintjens

Quite separately, if you're working in C I'd strongly recommend using CZMQ, it will make your life much easier.

Sadly not the life of my users, getting the right dependencies on CZMQ distributions is hell.

skinkie avatar Nov 11 '14 13:11 skinkie

It is? Surely it's just one additional library.

hintjens avatar Nov 11 '14 14:11 hintjens

Yes it is. Try to find it on some "stable" binary distribution such as Debian or Red Hat. Anyway, that's not the bikeshed I want to get into here. There is a memory leak inside libzmq, unrelated to my programming skills.

skinkie avatar Nov 11 '14 16:11 skinkie

Ah, it usually works better with github, indeed.

Sorry for bikeshedding. The code without CZMQ is just harder to understand... actually the test case is opaque and I'm not sure what it's supposed to be showing. You're looping on closing/opening sockets, and this will always create lots of TCP timewait sockets which will take a while to leave the system. Where is the memory leak exactly?

hintjens avatar Nov 11 '14 18:11 hintjens

Where is the memory leak exactly?

Please Pieter, the bug is not difficult to understand. The pubsub is leaking memory for each connect.

skinkie avatar Nov 11 '14 18:11 skinkie

As far as I can understand it you're creating and destroying sub sockets. Here is a minimal CZMQ program that does this:

#include <czmq.h>
int main (void)
{
    zsock_t *pub = zsock_new_pub ("tcp:127.0.0.1:5678");
    int busy = 100;
    while (busy--) {
        zsock_t *sub = zsock_new_sub ("tcp:127.0.0.1:5678", "");
        zclock_sleep (10);
        zsock_destroy (&sub);
    }
    zsock_destroy (&pub);
    return 0;
}

When I run this under valgrind, it reports:

==30032==
==30032== HEAP SUMMARY:
==30032==     in use at exit: 0 bytes in 0 blocks
==30032==   total heap usage: 443 allocs, 443 frees, 140,997 bytes allocated
==30032==
==30032== All heap blocks were freed -- no leaks are possible
==30032==
==30032== For counts of detected and suppressed errors, rerun with: -v
==30032== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 2 from 2)

So I'm left asking what it is you're doing differently. Your code is not self-evident, or else I'm stupid. Anyhow, my example works; it is certainly not leaking data for each connect, afaict.

hintjens avatar Nov 11 '14 19:11 hintjens

The above example shows that even with a source distribution such as Gentoo, which installs czmq-2.2.0, the example doesn't compile. So while I know for a fact that CZMQ makes the programmer's life easy (I have used it), in practice it is too unstable to share code.

test.c: In function ‘main’:
test.c:5:5: error: unknown type name ‘zsock_t’
     zsock_t *pub = zsock_new_pub ("tcp:127.0.0.1:5678");
     ^
test.c:5:20: warning: initialization makes pointer from integer without a cast
     zsock_t *pub = zsock_new_pub ("tcp:127.0.0.1:5678");
                    ^
test.c:8:9: error: unknown type name ‘zsock_t’
         zsock_t *sub = zsock_new_sub ("tcp:127.0.0.1:5678", "");
         ^
test.c:8:24: warning: initialization makes pointer from integer without a cast
         zsock_t *sub = zsock_new_sub ("tcp:127.0.0.1:5678", "");

Indeed, the github czmq version works. And in your example I am unable to find a memory leak. I am currently trying to spot the difference.

skinkie avatar Nov 11 '14 22:11 skinkie

Adding this will get you the first two memory leaks:

#include <czmq.h>

int main (void)
{
    zsock_t *pub = zsock_new_pub ("tcp://127.0.0.1:5678");
    zsock_set_sndhwm (pub, 8192);
    zsock_set_rcvhwm (pub, 8192);

    int busy = 100;
    while (busy--) {
        zsock_t *sub = zsock_new_sub ("tcp://127.0.0.1:5678", "");
        zclock_sleep (10);
        zsock_destroy (&sub);
    }
    zsock_destroy (&pub);
    return 0;
}

I won't doubt that czmq is doing a good job in memory management. But do observe the following examples:

#include <czmq.h>

int main (void) {
    zsock_t *pubsub   = zsock_new_pub  ("tcp://127.0.0.1:1234");

    int busy = 100;
    while (busy--) {
        sleep(1);
    }

    zsock_destroy (&pubsub);
    return 0;
}

#include <czmq.h>

int main (void)
{
    int busy = 100;
    while (busy--) {
        zsock_t *sub = zsock_new_sub ("tcp://127.0.0.1:1234", "");
        zclock_sleep (10);
        zsock_destroy (&sub);
    }
    return 0;
}

With the CZMQ testers, valgrind reports "there is no memory leak" after the application closes. But look at the first application's memory use after the second tester has finished: the memory keeps increasing and is only freed by zsock_destroy (&pubsub).

skinkie avatar Nov 11 '14 23:11 skinkie

Sorry, there was a bug in my example, should have been tcp:// instead of tcp. It asserts in zsock_set_sndhwm() otherwise (pub is NULL), no memory leaks though.

So you're saying that the pub socket allocates memory for each sub connection, and this memory is not freed until the pub socket is destroyed. It's possible the pub socket doesn't see the TCP connection closing, yes.

hintjens avatar Nov 11 '14 23:11 hintjens

I noticed the problem with tcp:// while splitting the example up. Even in your example you can observe that the memory usage is increasing. So even with your own example, the question remains: why isn't the memory being released?

skinkie  21426  5.0  0.9 239120 77304 pts/1    Sl+  00:35   0:03 ./test
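
For anyone who would rather watch this from inside the publisher than with ps, here is a minimal sketch that samples the process's own resident set size. It assumes Linux and the /proc/self/statm file; resident_kib is a hypothetical helper name and is not part of libzmq or CZMQ.

```c
#include <assert.h>
#include <stdio.h>
#include <unistd.h>

/* Return this process's resident set size in KiB, or -1 if it cannot be
 * read. Assumption: Linux, where /proc/self/statm lists sizes in pages
 * and the second field is the resident page count. */
long resident_kib (void)
{
    FILE *statm = fopen ("/proc/self/statm", "r");
    if (statm == NULL)
        return -1;

    long total_pages = 0;
    long resident_pages = 0;
    int fields = fscanf (statm, "%ld %ld", &total_pages, &resident_pages);
    fclose (statm);
    if (fields != 2)
        return -1;

    long page_size = sysconf (_SC_PAGESIZE);
    assert (page_size >= 1024);
    return resident_pages * (page_size / 1024);
}
```

Logging the returned value once per reconnect cycle should make it visible whether the number keeps climbing while the subscribers come and go.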

skinkie avatar Nov 11 '14 23:11 skinkie

I've no idea how this works internally in libzmq. All I'm concerned with here is reproducing the test to a point where others know what is going on. You may want to continue this discussion on zeromq-dev.

hintjens avatar Nov 11 '14 23:11 hintjens

I guess it is quite related to the topic "PUSH does not seem to release PULL connection message memory".

skinkie avatar Nov 11 '14 23:11 skinkie

Sounds like the same thing, yes.

hintjens avatar Nov 12 '14 00:11 hintjens

Digging into the push/pull leak, I found that the pipes are never destroyed; they are just marked as inactive and moved to the back of the list. They are still tracked and are destroyed with the context, but every time a socket is reconnected a new set of pipes is created.

session_base_t::process_term() is called, and it calls pipe_t::terminate(). The pipe_t term command is sent and received, and the delimiter is sent but never received, so one sub-pipe is waiting on the delimiter and the other on the term ack. It then stays in this state until the context is destroyed.

greroger avatar Nov 24 '14 15:11 greroger

@greroger thanks a lot, just working on this one and your comment helps a lot

hurtonm avatar Nov 24 '14 15:11 hurtonm

I was about to submit an issue; I have the exact same leak with PUB/SUB sockets after multiple connects/disconnects.

==6187== 6,088 bytes in 1 blocks are definitely lost in loss record 1 of 1
==6187==    at 0x4C29F90: malloc (in /usr/lib/valgrind/vgpreload_memcheck-amd64-linux.so)
==6187==    by 0x4E518C0: ??? (in /usr/lib/libzmq.so.3.0.0)
==6187==    by 0x4E6F74B: ??? (in /usr/lib/libzmq.so.3.0.0)
==6187==    by 0x4E6FB97: ??? (in /usr/lib/libzmq.so.3.0.0)
==6187==    by 0x4E66419: ??? (in /usr/lib/libzmq.so.3.0.0)
==6187==    by 0x4E4DBED: ??? (in /usr/lib/libzmq.so.3.0.0)
==6187==    by 0x4E6AABF: ??? (in /usr/lib/libzmq.so.3.0.0)
==6187==    by 0x5EA9313: start_thread (in /usr/lib/libpthread-2.20.so)
==6187==    by 0x59DF5BC: clone (in /usr/lib/libc-2.20.so)

mwestphal avatar Nov 27 '14 09:11 mwestphal

@hurtonm Do you have any update on this ticket?

skinkie avatar Dec 06 '14 12:12 skinkie

@skinkie I find this memory leak issue very strange, and I have never been able to reproduce it.

hurtonm avatar Dec 06 '14 14:12 hurtonm

I am actively reproducing it and it causes much downtime.

skinkie avatar Dec 06 '14 14:12 skinkie

@skinkie That's the mystery. I tried three different examples, and still no leak detected. One test was running inside a Docker image someone prepared for this. Could you prepare a simple C client and server that I can compile and run so that the leak shows up? I really want to understand this problem and fix it.

hurtonm avatar Dec 06 '14 14:12 hurtonm

https://github.com/zeromq/libzmq/issues/1256#issuecomment-62639664 I can't make it any simpler than that.

skinkie avatar Dec 06 '14 14:12 skinkie

@skinkie thanks, I managed to reproduce the reported leak. The problem is that in that example the publisher does not make any calls to the library, so the library gets no chance to do its internal cleanup. Calling

  zsocket_events (zsock_resolve (pubsub));

periodically solves that leak for that example. Not sure about other reported leaks.

hurtonm avatar Dec 07 '14 18:12 hurtonm

@hurtonm could you give a hint how to do something similar in https://github.com/StichtingOpenGeo/universal/blob/master/universal-pubsub.c

skinkie avatar Dec 07 '14 18:12 skinkie

@skinkie Is there any traffic on the pubsub socket when you experience the memory growth?

hurtonm avatar Dec 07 '14 18:12 hurtonm

The problem is actually that the clients use this tool, which at line 89 reconnects on "no data" to work around other network issues. Hence the publisher at the top keeps growing when there is no data to distribute, because of the reconnects.

https://github.com/StichtingOpenGeo/universal/blob/master/universal-sub-pubsub.c#L89

skinkie avatar Dec 07 '14 19:12 skinkie

@skinkie Just call zmq_getsockopt(pubsub, ZMQ_EVENTS, &events, ) before the goto statement. This should invoke internal bookkeeping, which cleans up retired data structures. Please let me know if that helps.

hurtonm avatar Dec 07 '14 20:12 hurtonm

Your suggestion implemented as below does not fix it.

        } else {
            int fd = 0;
            size_t fd_size = sizeof(fd);

            zmq_close (items[0].socket);
            sleep (1);
            zmq_getsockopt (pubsub, ZMQ_EVENTS, &fd, &fd_size);

            goto init;
        }

skinkie avatar Dec 07 '14 21:12 skinkie

@skinkie Could you please simplify that program a bit so that I could easily reproduce the problem?

hurtonm avatar Dec 07 '14 21:12 hurtonm

#include <stdio.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <unistd.h>
#include <zmq.h>

int main (int argc, char *argv[]) {
    void *context  = zmq_init (1);

    zmq_pollitem_t items[1];

init:
    items[0].socket = zmq_socket (context, ZMQ_SUB);
    items[0].events = ZMQ_POLLIN;

    zmq_connect (items[0].socket, "tcp://127.0.0.1:7827");

    int rc;
    size_t more_size = sizeof(int);

    /* Ensure that every 60s there is data */
    while ((rc = zmq_poll (items, 1, 10)) >= 0) {
        if (rc > 0) {
            int more;
            do {
                /* Create an empty 0MQ message to hold the message part */
                zmq_msg_t part;
                rc = zmq_msg_init (&part);
                assert (rc == 0);

                /* Block until a message is available to be received from the socket */
                rc = zmq_msg_recv (&part, items[0].socket, 0);
                assert (rc != -1);

                /* Determine if more message parts are to follow */
                rc = zmq_getsockopt (items[0].socket, ZMQ_RCVMORE, &more, &more_size);
                assert (rc == 0);

                zmq_msg_close (&part);
            } while (more);
        } else {
            zmq_close (items[0].socket);
            goto init;
        }
    }

    zmq_close (items[0].socket);

    zmq_ctx_destroy (context);

    return rc;
}

#include <pwd.h>
#include <stdio.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <zmq.h>
#include <unistd.h>


int main (int argc, char *argv[]) {
    void *context  = zmq_init (1);
    void *pubsub   = zmq_socket (context, ZMQ_XPUB);

    /* Apply a high water mark at the PubSub */
    uint64_t hwm   = 8192;
    zmq_setsockopt (pubsub, ZMQ_SNDHWM, &hwm, sizeof(hwm));
    zmq_setsockopt (pubsub, ZMQ_RCVHWM, &hwm, sizeof(hwm));

    zmq_bind (pubsub, "tcp://127.0.0.1:7827");

    zmq_pollitem_t items[1];

init:
    items[0].socket = zmq_socket (context, ZMQ_SUB);
    items[0].events = ZMQ_POLLIN;

    zmq_connect (items[0].socket, "tcp://127.0.0.1:7817");

    int rc;
    size_t more_size = sizeof(int);

    /* Ensure that every 60s there is data */
    while ((rc = zmq_poll (items, 1, 60 * 1000L)) >= 0) {
        if (rc > 0) {
            int more;
            do {
                /* Create an empty 0MQ message to hold the message part */
                zmq_msg_t part;
                rc = zmq_msg_init (&part);
                assert (rc == 0);

                /* Block until a message is available to be received from the socket */
                rc = zmq_msg_recv (&part, items[0].socket, 0);
                assert (rc != -1);

                /* Determine if more message parts are to follow */
                rc = zmq_getsockopt (items[0].socket, ZMQ_RCVMORE, &more, &more_size);
                assert (rc == 0);

                /* Send the message, when more is set, apply the flag, otherwise don't */
                zmq_msg_send (&part, pubsub, (more ? ZMQ_SNDMORE : 0));

                zmq_msg_close (&part);
            } while (more);
        } else {
            int fd = 0;
            size_t fd_size = sizeof(fd);

            zmq_close (items[0].socket);
            sleep (1);
            zmq_getsockopt (pubsub, ZMQ_EVENTS, &fd, &fd_size);

            goto init;
        }
    }

    zmq_close (items[0].socket);

    zmq_close (pubsub);

    zmq_ctx_destroy (context);

    return 0;
}

skinkie avatar Dec 07 '14 21:12 skinkie