zmq_ctx_term() blocks if a socket bind failed!

[Open] EgalYue opened this issue 4 years ago • 4 comments

Issue description

I set up communication between two RK3399 chips using ZMQ. I already set ZMQ_LINGER to 0 and call zmq_close() from the same thread that uses the sockets. I interrupt my program with Ctrl+C. However, zmq_ctx_term() sometimes still blocks even after the sockets have been closed.

In my program the subscriber socket uses zmq_bind() rather than zmq_connect(), and when no message arrives it "reconnects" every 0.5 s, i.e. it creates a new socket and binds it again.

Through many experiments, I found that this problem occurs only when "socket bind failed!" is printed (address already in use). I understand that, because of the short (500 ms) reconnect interval, the previous socket is still in the TIME_WAIT state. Could it be that the socket handed over to the ZMQbg/Reaper thread is not actually closed?


  • libzmq version (commit hash if unreleased): 4.3
  • OS: Docker on Ubuntu 18.04, or an RK3399 chip (Linaro)

Minimal test code / Steps to reproduce the issue

I can reproduce this issue in Docker or on the RK3399, and I wrote the following code to reproduce it. On my laptop the socket always re-binds successfully every 500 ms, so the problem never occurs there.

```cpp
#include <zmq.h>
#include <atomic>
#include <chrono>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <string>
#include <string.h>
#include <signal.h>
#include <thread>
#include <unistd.h>

std::string randstr(const int len)
{
    char str[20];   // len must be smaller than 20
    int i;
    for (i = 0; i < len; ++i)
    {
        switch ((rand() % 3))
        {
        case 1:
            str[i] = 'A' + rand() % 26;
            break;
        case 2:
            str[i] = 'a' + rand() % 26;
            break;
        default:
            str[i] = '0' + rand() % 10;
            break;
        }
    }
    str[i] = '\0';
    std::string res = str;
    return res;
}

struct nbZmqSockets{
    zmq_pollitem_t items [2];
};

class SimpleSuber{
    public:
        SimpleSuber(void* context, const std::string& ip):
                    m_context(context),
                    m_ip(ip),
                    m_Sockets(new nbZmqSockets()) {   // value-initialized: both socket pointers start as NULL
            if (init() < 0){
                std::cout<<"[ZMQ Subscriber]: Init failed!"<<std::endl;   // assumed error message
            }
        }

        ~SimpleSuber(){
            closeSocket();   // close any sockets that are still open
        }

        int init(){
            //-------- 1st socket: used to sub ------------
            std::string port = "33567";
            if (port.empty()){   // assumed check: the port was originally looked up by message type
                std::cout<<"[ZMQ Subscriber]: Error msgType does not match a port! Please check defined ports in 'common.h' !"<<std::endl;
                return -1;
            }

            void* receiver = zmq_socket(m_context, ZMQ_SUB);
            std::string endpoint = m_ip + ":" + port;

            if (zmq_bind(receiver, endpoint.c_str()) < 0){
                std::cout<<"[ZMQ subscriber]: Bind failed! "<< endpoint.c_str()<<std::endl;
                return -1;   // receiver is not closed on this path
            }
            std::cout<<"[ZMQ Subscriber]: Bind ok! " << endpoint.c_str()<<std::endl;
            int linger = 0;
            zmq_setsockopt (receiver, ZMQ_LINGER, &linger, sizeof(linger)); // important!!!
            //-------- 2nd socket: used to terminate Blocking sub ------------
            void *ctrlPuller = zmq_socket (m_context, ZMQ_REP);
            std::string IPC_CTRL_CHANNEL_SUB = "inproc:///sdcard/ctrlChannel_sub";
            m_endpointCtrl = IPC_CTRL_CHANNEL_SUB + randstr(8);
            if (zmq_bind (ctrlPuller, m_endpointCtrl.c_str()) < 0){
                std::cout<<"[ZMQ Subscriber]: [ctrlPuller] Bind failed!!! "<< m_endpointCtrl.c_str()<<std::endl;
                return -1;   // neither receiver nor ctrlPuller is closed on this path
            }
            zmq_setsockopt (ctrlPuller, ZMQ_LINGER, &linger, sizeof(linger)); // important!!!

            m_Sockets->items[0] = { receiver, 0, ZMQ_POLLIN, 0 };
            m_Sockets->items[1] = { ctrlPuller, 0, ZMQ_POLLIN, 0 };
            std::cout<<"[ZMQ Subscriber]: [ctrlPuller] Bind ok! "<< m_endpointCtrl.c_str()<<std::endl;
            return 1;
        }

        int reconnect(){
            closeSocket();   // assumed: release the old sockets before re-binding
            if (init() < 0){
                std::cout<<"[ZMQ Subscriber]: Reconnect failed !!!"<<std::endl;
                return -1;
            } else{
                std::cout<<"[ZMQ Subscriber]: Reconnect ok !!!"<<std::endl;
                return 1;
            }
        }

        void closeSocket(){
            for (auto &x: m_Sockets->items){
                if (x.socket != nullptr){
                    zmq_close(x.socket);
                    x.socket = nullptr;   // never close the same socket twice
                }
            }
        }

        int subscribe(std::string & str_msg, long timeout=500){
            std::cout << "subscribe" << std::endl;
            void* receiver = m_Sockets->items[0].socket;
            std::string topicStr = "str_msg";
            const char* topic = topicStr.c_str(); // Topic filter
            zmq_setsockopt (receiver, ZMQ_SUBSCRIBE, topic, strlen(topic));

            int poll_flag = zmq_poll (m_Sockets->items, 2, timeout);
            if (0 == poll_flag){ // blocking point!
                std::cout<<"[ZMQ Subscriber]: No Ts2WithID msg received for " << timeout << " ms! Reconnecting now..." << std::endl;
                while (1){
                    if (reconnect() > 0){
                        return 0;
                    }
                    std::this_thread::sleep_for (std::chrono::milliseconds(timeout)); //TODO important!!!
                }
                return 0;
            } else if (-1 == poll_flag){
                std::cout<<"[ZMQ Subscriber]: Error occur by zmq_poll Ts2WithID!" << std::endl;
                return -1;
            }

            //-------------------- 1st socket: used to Sub ---------------------------------
            if (m_Sockets->items[0].revents & ZMQ_POLLIN){
                zmq_msg_t recvMsg;
                int rc_env = zmq_msg_init(&recvMsg);
                rc_env = zmq_msg_recv(&recvMsg, receiver, 0);
                if (-1 == rc_env){
                    std::cout<<"[ZMQ Subscriber]: Recv Stop Signal or error occurs when receiving msg..."<<std::endl;
                    zmq_msg_close(&recvMsg);
                    return -1;
                }
                // Copy the payload into the caller's string, then release the message
                str_msg.assign(static_cast<const char*>(zmq_msg_data(&recvMsg)), zmq_msg_size(&recvMsg));
                zmq_msg_close(&recvMsg);
            }
            // ----------------2nd socket: used to terminate Blocking ----------------------
            if (m_Sockets->items[1].revents & ZMQ_POLLIN){
                return executeTerminateCmd();
            }
            return 1;
        }

        int terminateBlocking(){
            void* ctrlPusher = zmq_socket(m_context, ZMQ_REQ);

            if (zmq_connect (ctrlPusher, m_endpointCtrl.c_str()) < 0){
                std::cout<<"[ZMQ Subscriber]: [ctrlPusher] Connect failed!!!  " << m_endpointCtrl.c_str()<<std::endl;
                std::abort(); //TODO...
            }
            int linger = 0;
            zmq_setsockopt (ctrlPusher, ZMQ_LINGER, &linger, sizeof(linger)); // important!!!
            std::cout<<"[ZMQ Subscriber]: [ctrlPusher] Connect ok!!!  "<< m_endpointCtrl.c_str()<<std::endl;

            const char* ctrlCmd = "KILL";
            zmq_msg_t ctrlMsg;
            zmq_msg_init_size(&ctrlMsg, strlen(ctrlCmd));
            memcpy(zmq_msg_data(&ctrlMsg), ctrlCmd, strlen(ctrlCmd));
            int rc = zmq_msg_send(&ctrlMsg, ctrlPusher, 0); // TODO...
            if (-1 == rc){
                std::cout<<"[ZMQ Subscriber]: [ctrlPusher] Send '"<< ctrlCmd <<"' as ctrlCmd failed!!!"<<std::endl;
                zmq_msg_close(&ctrlMsg);
                zmq_close(ctrlPusher);   // assumed
                return -1;
            }
            std::cout<<"[ZMQ Subscriber]: [ctrlPusher] Send ctrlCmd successfully, the blocked subscriber will be closed..."<<std::endl;

            //  waiting for a reply, with timeout
            zmq_pollitem_t items[] = {{static_cast<void*>(ctrlPusher), 0, ZMQ_POLLIN, 0}};
            int poll_rc = zmq_poll(&items[0], 1, 500); //TODO timeout ms
            if (poll_rc <= 0){
                std::cout<<"[ZMQ Subscriber]: [ctrlPusher] Time out when receiving reply from control..."<<std::endl;
                zmq_close(ctrlPusher);   // assumed
                return -1; //TODO return 1?
            }
            //  If we got a reply, process it
            if (items[0].revents & ZMQ_POLLIN) {
                zmq_msg_t replyMsg;
                rc = zmq_msg_init(&replyMsg);
                rc = zmq_msg_recv(&replyMsg, ctrlPusher, 0);
                zmq_msg_close(&replyMsg);
                if (-1 == rc){
                    std::cout<<"[ZMQ Subscriber]: [ctrlPusher] Error occurs when receiving Reply from server"<<std::endl;
                    zmq_close(ctrlPusher);   // assumed
                    return -1;
                }
                std::cout<<"[ZMQ Subscriber]: [ctrlPusher] Succ get reply from control"<<std::endl;
            }

            zmq_close(ctrlPusher);   // assumed
            std::cout<<"[ZMQ Subscriber]: [ctrlPusher] Finish terminateBlocking task successfully!"<<std::endl;
            return 1;
        }

        /** Receive Terminate signal */
        int executeTerminateCmd(){
            //ALOGD("[ZMQ Subscriber]: [ctrlPuller] Running executeTerminateCmd, begin:");
            void* ctrlPuller = m_Sockets->items[1].socket;
            zmq_msg_t ctrlMsg;
            int rc = zmq_msg_init(&ctrlMsg);
            rc = zmq_msg_recv(&ctrlMsg, ctrlPuller, 0);
            if (-1 == rc) {
                std::cout<<"[ZMQ Subscriber]: [ctrlPuller] pull ctrlCmd failed!!!"<<std::endl;
                zmq_msg_close(&ctrlMsg);
                return -1;
            }

            std::string recvMsg(static_cast<const char*>(zmq_msg_data(&ctrlMsg)), zmq_msg_size(&ctrlMsg));
            zmq_msg_close(&ctrlMsg);
            std::cout<<"[ZMQ Subscriber]: [ctrlPuller] recv '" << recvMsg << "' as ctrlCmd." << std::endl;

            // Reply info: "OK"
            const char* ans = "OK"; // can be anything
            zmq_msg_t replyMsg;
            zmq_msg_init_size(&replyMsg, strlen(ans));
            memcpy(zmq_msg_data(&replyMsg), ans, strlen(ans));
            rc = zmq_msg_send(&replyMsg, ctrlPuller, 0);
            if (-1 == rc){
                std::cout<<"[ZMQ Subscriber]:[ctrlPuller] Send 'OK' as reply failed!"<<std::endl;
                zmq_msg_close(&replyMsg);
                return -1;
            }
            std::cout<<"[ZMQ Subscriber]: [ctrlPuller] Reply 'OK' successfully!"<<std::endl;

            // call close sockets
            closeSocket();
            std::cout<<"[ZMQ Subscriber]: [ctrlPuller] Terminate Blocking sub successfully! Sockets are already closed!"<<std::endl;
            return -2; // TODO... return 0?
        }

    private:
        void* m_context;
        std::string m_ip;
        std::shared_ptr<nbZmqSockets> m_Sockets;
        std::string m_endpointCtrl;
};

std::atomic<bool> app_stopped(false);   // atomic, so it may be set from the signal handler
void sigint_handler(int sig){
    if(sig == SIGINT){
        // Code executed when exiting with Ctrl+C
        const char msg[] = "ctrl+c pressed!\n";
        ssize_t n = write(STDOUT_FILENO, msg, sizeof(msg) - 1);   // write() is async-signal-safe, std::cout is not
        (void)n;
        app_stopped = true;
    }
}

void t_sub(std::shared_ptr<SimpleSuber> suber){
    std::string msg_str = "";
    while (true){   // assumed loop
        int sleep_t = 100000;
        std::cout << ">>> loop begin after "<< sleep_t << " us"<<std::endl;
        usleep(sleep_t);   // assumed
        if (suber->subscribe(msg_str) <= -1){
            std::cout << ">> loop finish"<<std::endl;
            break;
        }
        std::cout << ">>> loop end"<<std::endl;
    }
}

int main(int argc, char **argv){
    if (argc < 2){
        std::cout<< "Usage: simple_sub [ip]" << std::endl;
        std::cout<< "Ex: simple_sub tcp://" << std::endl;
        return -1;
    }
    std::cout << "=== This is simple_sub ... ===" << std::endl;
    std::string ip = argv[1]; // "tcp://";
    signal(SIGINT, sigint_handler); // ctrl + c

    void* m_context = zmq_ctx_new();
    std::shared_ptr<SimpleSuber> suber = std::make_shared<SimpleSuber>(m_context, ip);
    std::thread thread_sub(t_sub, suber);

    int sleep_t = 50000000;
    std::cout<<">>> After sleep: " << sleep_t << "us"<<std::endl;
    int cnt = 0;
    while (true){   // assumed polling loop
        if (cnt % 10000 == 0){
            std::cout<< "cnt=" << cnt << std::endl;
        }
        if (app_stopped){
            suber->terminateBlocking(); // terminate socket, stop socket in the same thread
            break;
        }
        ++cnt;
        usleep(1000);   // assumed polling interval
    }

    if (thread_sub.joinable()){
        thread_sub.join();
    }
    zmq_ctx_term(m_context);   // the call that blocks after a failed bind

    std::cout<< " --- end ---" << std::endl;
    return 0;
}
```

What's the actual result? (include assertion message & call stack if applicable)

It blocks inside zmq_ctx_term()! [screenshot]

When the bind never fails, the program exits successfully! [screenshot]

What's the expected result?

Even if a socket bind sometimes fails, calling zmq_ctx_term() should release all resources without blocking!

EgalYue, Dec 02 '20 09:12

It's hard to tell from the code included, but there is a race condition setting linger that is documented here:

Our workaround has been to set the linger option immediately after creating the socket (with zmq_socket).

bill-torpey, Jan 05 '21 18:01

Thanks @bill-torpey

> Our workaround has been to set the linger option immediately after creating the socket (with zmq_socket).

I modified my code and tested it in Docker, and it seems to work fine now! I will run some more tests on this problem.

EgalYue, Jan 06 '21 04:01

stale[bot], Apr 16 '22 17:04

I think the root cause of zmq_ctx_term() blocking is a socket leak when zmq_bind() fails:

```cpp
void* receiver = zmq_socket(m_context, ZMQ_SUB);
std::string endpoint = m_ip + ":" + port;

if (zmq_bind(receiver, endpoint.c_str()) < 0){
    std::cout<<"[ZMQ subscriber]: Bind failed! "<< endpoint.c_str()<<std::endl;
    return -1;
}
// ...
void *ctrlPuller = zmq_socket (m_context, ZMQ_REP);
std::string IPC_CTRL_CHANNEL_SUB = "inproc:///sdcard/ctrlChannel_sub";
m_endpointCtrl = IPC_CTRL_CHANNEL_SUB + randstr(8);
if (zmq_bind (ctrlPuller, m_endpointCtrl.c_str()) < 0){
    std::cout<<"[ZMQ Subscriber]: [ctrlPuller] Bind failed!!! "<< m_endpointCtrl.c_str()<<std::endl;
    return -1;
}
```

You should call zmq_close() on receiver and ctrlPuller when zmq_bind() fails.

NanXiao, Oct 24 '23 03:10