zmq_ctx_term() blocks, if socket bind failed!

Open EgalYue opened this issue 4 years ago • 4 comments

Issue description

I set up communication between two RK3399 chips using ZMQ. I already set the linger=0 option and call zmq_close in the same thread. I use ctrl+c to interrupt my program. But I found that zmq_ctx_term() sometimes still blocks, even though the sockets were closed.

In my program the subscriber socket uses zmq_bind, not zmq_connect, and the socket reconnects every 0.5 s when no message arrives.

Through many experiments, I found that this problem only occurs after a socket bind has failed (address already in use). I know that because of the short (500 ms) reconnect interval, the previous socket is still in the TIME_WAIT state. Maybe the socket in ZMQbg/Reaper is not actually closed?

Environment

  • libzmq version (commit hash if unreleased): 4.3
  • OS: Docker in ubuntu18.04 or RK3399 chip (linaro)

Minimal test code / Steps to reproduce the issue

I can reproduce this issue in Docker or on the RK3399, and I wrote a piece of code to reproduce it. On my laptop the socket can always reconnect every 500 ms, so the issue never occurs there.

#include <zmq.h>
#include <iostream>
#include <string>
#include <memory>
#include <functional>
#include <map>
#include <string>
#include <string.h>
#include <sys/socket.h>
#include <sys/ioctl.h>
#include <net/if.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <vector>
#include <unistd.h>
#include <signal.h>
#include <thread>
#include <chrono>
#include <cstdlib>
#include <ctime>

std::string randstr(const int len)
{
    char str[20];
    srand(time(NULL));
    int i;
    for (i = 0; i < len; ++i)
    {
        switch ((rand() % 3))
        {
        case 1:
            str[i] = 'A' + rand() % 26;
            break;
        case 2:
            str[i] = 'a' + rand() % 26;
            break;
        default:
            str[i] = '0' + rand() % 10;
            break;
        }
    }
    str[i] = '\0';
    std::string res = str;
    return res;
}


struct nbZmqSockets{
    zmq_pollitem_t items [2];
};

class SimpleSuber{
    public:
        SimpleSuber(void* context, const std::string& ip): 
                    m_Sockets(new nbZmqSockets()), 
                    m_context(context), 
                    m_ip(ip){
            if (init() < 0){
                std::abort();
            }
        }

        ~SimpleSuber(){
            for (auto x: m_Sockets->items){
                zmq_close(x.socket);
            }
        }

        int init(){
            //-------- 1st socket: used to sub ------------
            std::string port = "33567";
            if(port.empty()){
                std::cout<<"[ZMQ Subscriber]: Error msgType does not match a port! Please check defined ports in 'common.h' !"<<std::endl;
                return -1;
            }

            void* receiver = zmq_socket(m_context, ZMQ_SUB);
            std::string endpoint = m_ip + ":" + port;

            if (zmq_bind(receiver, endpoint.c_str()) < 0){
                std::cout<<"[ZMQ subscriber]: Bind failed! "<< endpoint.c_str()<<std::endl;
                return -1;
            }
            std::cout<<"[ZMQ Subscriber]: Bind ok! " << endpoint.c_str()<<std::endl;
            int linger = 0;
            zmq_setsockopt (receiver, ZMQ_LINGER, &linger, sizeof(linger)); // important!!!
            
            //-------- 2nd socket: used to terminate Blocking sub ------------
            void *ctrlPuller = zmq_socket (m_context, ZMQ_REP);
            std::string IPC_CTRL_CHANNEL_SUB = "inproc:///sdcard/ctrlChannel_sub";
            m_endpointCtrl = IPC_CTRL_CHANNEL_SUB + randstr(8);
            if (zmq_bind (ctrlPuller, m_endpointCtrl.c_str()) < 0){
                std::cout<<"[ZMQ Subscriber]: [ctrlPuller] Bind failed!!! "<< m_endpointCtrl.c_str()<<std::endl;
                return -1;
            }
            zmq_setsockopt (ctrlPuller, ZMQ_LINGER, &linger, sizeof(linger)); // important!!!

            m_Sockets->items[0] = { receiver, 0, ZMQ_POLLIN, 0 };
            m_Sockets->items[1] = { ctrlPuller, 0, ZMQ_POLLIN, 0 };
            std::cout<<"[ZMQ Subscriber]: [ctrlPuller] Bind ok! ."<< m_endpointCtrl.c_str()<<std::endl;
            return 1;
        }

        int reconnect(){
            closeSocket();
            if (init() < 0){
                std::cout<<"[ZMQ Subscriber]: Reconnect failed !!!"<<std::endl;  
                return -1; 
            } else{
                std::cout<<"[ZMQ Subscriber]: Reconnect ok !!!"<<std::endl;  
                return 1;
            }

        }

        void closeSocket(){
            for (auto x: m_Sockets->items){
                zmq_close(x.socket);
            }
        }

        int subscribe(std::string & str_msg, long timeout=500){
            std::cout << "subscribe" << std::endl;
            void* receiver = m_Sockets->items[0].socket;
            std::string topicStr = "str_msg";
            const char* topic = topicStr.c_str(); // Topic filter
            zmq_setsockopt (receiver, ZMQ_SUBSCRIBE, topic, strlen(topic));

            int poll_flag = zmq_poll (m_Sockets->items, 2, timeout);
            if (0 == poll_flag){ // blocking point!
                std::cout<<"[ZMQ Subscriber]: No Ts2WithID msg received within " << timeout << " [ms]! Reconnecting now..." << std::endl;
                while (1){
                    if (reconnect() > 0){
                        return 0; 
                    }
                    std::this_thread::sleep_for (std::chrono::milliseconds(timeout)); //TODO important!!!  
                }
                return 0; 
            } else if (-1 == poll_flag){
                std::cout<<"[ZMQ Subscriber]: Error occur by zmq_poll Ts2WithID!" << std::endl;
                return -1; 
            }

            //-------------------- 1st socket: used to Sub ---------------------------------
            if (m_Sockets->items[0].revents & ZMQ_POLLIN){
                {
                    zmq_msg_t recvMsg;
                    int rc_env = zmq_msg_init(&recvMsg);
                    rc_env = zmq_msg_recv(&recvMsg, receiver, 0);
                    if (-1 == rc_env){
                        std::cout<<"[ZMQ Subscriber]: Recv Stop Signal or error occurs when receiving msg..."<<std::endl;
                        return -1;
                    }
                    size_t size = zmq_msg_size(&recvMsg);
                    // Copy the payload into the caller's output string (avoids the leaked malloc buffer)
                    str_msg.assign(static_cast<const char*>(zmq_msg_data(&recvMsg)), size);
                    zmq_msg_close(&recvMsg);
                }
            }
            // ----------------2nd socket: used to terminate Blocking ----------------------
            if (m_Sockets->items[1].revents & ZMQ_POLLIN){
                return executeTerminateCmd();
            }
            return 1;
        }


        int terminateBlocking(){
            void* ctrlPusher = zmq_socket(m_context, ZMQ_REQ);

            if (zmq_connect (ctrlPusher, m_endpointCtrl.c_str()) < 0){
                std::cout<<"[ZMQ Subscriber]: [ctrlPusher] Connect failed!!!  " << m_endpointCtrl.c_str()<<std::endl;
                std::abort(); //TODO...
            }
            int linger = 0;
            zmq_setsockopt (ctrlPusher, ZMQ_LINGER, &linger, sizeof(linger)); // important!!!
            std::cout<<"[ZMQ Subscriber]: [ctrlPusher] Connect ok!!!  (%s)"<< m_endpointCtrl.c_str()<<std::endl;

            const char* ctrlCmd = "KILL";
            zmq_msg_t ctrlMsg;
            zmq_msg_init_size(&ctrlMsg, strlen(ctrlCmd));
            memcpy(zmq_msg_data(&ctrlMsg), ctrlCmd, strlen(ctrlCmd));
            int rc = zmq_msg_send(&ctrlMsg, ctrlPusher, 0); // TODO...
            if (-1 == rc){
                std::cout<<"[ZMQ Subscriber]: [ctrlPusher] Send '%s' as ctrlCmd failed!!!"<< ctrlCmd<<std::endl;
                zmq_close(ctrlPusher);
                return -1;
            }
            zmq_msg_close(&ctrlMsg);
            std::cout<<"[ZMQ Subscriber]: [ctrlPusher] Send ctrlCmd successfully, the blocked subscriber will be closed..."<<std::endl;

            //  waiting for a reply, with timeout
            zmq_pollitem_t items[] = {{static_cast<void*>(ctrlPusher), 0, ZMQ_POLLIN, 0}};
            int poll_rc = zmq_poll(&items[0], 1, 500); //TODO timeout ms 
            if (poll_rc <= 0){
                std::cout<<"[ZMQ Subscriber]: [ctrlPusher] Time out when receiving reply from control..."<<std::endl;
                zmq_close(ctrlPusher);
                return -1; //TODO return 1?
            }
            //  If we got a reply, process it
            if (items[0].revents & ZMQ_POLLIN) {
                zmq_msg_t replyMsg;
                rc = zmq_msg_init(&replyMsg);
                rc = zmq_msg_recv(&replyMsg, ctrlPusher, 0);
                if (-1 == rc){
                    std::cout<<"[ZMQ Subscriber]: [ctrlPusher] Error occurs when receiving Reply from server"<<std::endl;
                    zmq_close(ctrlPusher);
                    return -1;
                }
                zmq_msg_close(&replyMsg);
                std::cout<<"[ZMQ Subscriber]: [ctrlPusher] Succ get reply from control"<<std::endl;
            }

            zmq_close(ctrlPusher);
            std::cout<<"[ZMQ Subscriber]: [ctrlPusher] Finish terminateBlocking task successfully!"<<std::endl;
            return 1;
        }
    private:
        /**
         * Receive Terminate signal
         */
        int executeTerminateCmd(){
            //ALOGD("[ZMQ Subscriber]: [ctrlPuller] Running executeTerminateCmd, begin:");
            void* ctrlPuller = m_Sockets->items[1].socket;
                
            zmq_msg_t ctrlMsg;
            int rc = zmq_msg_init(&ctrlMsg);
            rc = zmq_msg_recv(&ctrlMsg, ctrlPuller, 0);
            if (-1 == rc) {
                std::cout<<"[ZMQ Subscriber]: [ctrlPuller] pull ctrlCmd failed!!!"<<std::endl;
                return -1;
            }

            size_t msgSize = zmq_msg_size(&ctrlMsg);
            char *recvMsg = (char*)malloc(msgSize + 1);
            memcpy(recvMsg, zmq_msg_data(&ctrlMsg), msgSize);
            zmq_msg_close(&ctrlMsg);
            recvMsg[msgSize] = 0;
            std::cout<<"[ZMQ Subscriber]: [ctrlPuller] recv '%s' as ctrlCmd." <<  recvMsg << std::endl;

            // Reply info: "OK"    
            const char* ans = "OK"; // can be anything
            zmq_msg_t replyMsg;
            zmq_msg_init_size(&replyMsg, strlen(ans));
            memcpy(zmq_msg_data(&replyMsg), ans, strlen(ans));
            rc = zmq_msg_send(&replyMsg, ctrlPuller, 0);
            if (-1 == rc){
                std::cout<<"[ZMQ Subscriber]:[ctrlPuller] Send 'OK' as reply failed!"<<std::endl;
                return -1;
            }
            std::cout<<"[ZMQ Subscriber]: [ctrlPuller] Reply 'OK' successfully!"<<std::endl;
            zmq_msg_close(&replyMsg);

            // call close sockets
            closeSocket();
            std::cout<<"[ZMQ Subscriber]: [ctrlPuller] Terminate Blocking sub successfully! Sockets (%s) are already closed!"<<std::endl;
            return -2; // TODO... return 0? 
        }

        void* m_context;
        std::string m_ip;
        std::shared_ptr<nbZmqSockets> m_Sockets;
        std::string m_endpointCtrl;
};

bool app_stopped = false;
void sigint_handler(int sig){
	if(sig == SIGINT){
		// code executed when exiting via ctrl+c
		std::cout << "ctrl+c pressed!" << std::endl;
		app_stopped = true;
	}
}

void t_sub(std::shared_ptr<SimpleSuber> suber){
    std::string msg_str = "";
    while(1){
        int sleep_t = 100000;
        usleep(sleep_t);
        std::cout << ">>> loop begin after "<< sleep_t << " us"<<std::endl;
        if (suber->subscribe(msg_str) <= -1){
            std::cout << ">> loop finish"<<std::endl;
            break;
        }
        std::cout << ">>> loop end"<<std::endl;
    }
}

int main(int argc, char **argv){
    if (argc < 2){
        std::cout<< "Usage: simple_sub [ip]" << std::endl;
        std::cout<< "Ex: simple_sub tcp://127.0.0.1" << std::endl;
        return -1;
    }
    std::cout << "=== This is simple_sub ... ===" << std::endl;
    std::string ip = argv[1]; // "tcp://127.0.0.1";
    signal(SIGINT, sigint_handler); // ctrl + c

    void* m_context = zmq_ctx_new();
    std::shared_ptr<SimpleSuber> suber = std::make_shared<SimpleSuber>(m_context, ip);
    std::thread thread_sub(t_sub, suber);
    

    int sleep_t = 50000000;
    usleep(sleep_t);
    std::cout<<">>> After sleep: " << sleep_t << "us"<<std::endl;
   
    int cnt = 0;
    while(1){
        cnt++;
        if (cnt % 10000 == 0){
            std::cout<< "cnt=" << cnt << std::endl;
        }
        if (app_stopped){
            suber->terminateBlocking(); // terminate socket, stop socket in the same thread
			break;
		}
    }

    if (thread_sub.joinable()){
        thread_sub.join();
    }

    zmq_ctx_destroy(m_context);
    std::cout<< " --- end ---" << std::endl;
    return 0;
}

What's the actual result? (include assertion message & call stack if applicable)

After a bind has failed, the program blocks inside zmq_ctx_term()!

When bind never fails, the program exits successfully!

What's the expected result?

Even if a socket bind sometimes fails, we should be able to release all resources by calling zmq_ctx_term(), and it should not block!

EgalYue, Dec 02 '20 09:12

It's hard to tell from the code included, but there is a race condition setting linger that is documented here: https://github.com/zeromq/libzmq/issues/3252.

Our workaround has been to set the linger option immediately after creating the socket (with zmq_socket).
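
A minimal sketch of that ordering, written against a thin adapter so it runs without a network. The names `open_with_linger_first`, `Api` and `RecordingApi` are hypothetical and not part of libzmq; a real adapter would presumably forward `socket` to zmq_socket, `set_linger` to zmq_setsockopt with ZMQ_LINGER, `bind` to zmq_bind and `close` to zmq_close.

```cpp
#include <string>
#include <vector>

// Hypothetical helper, not a libzmq function. `Api` is any type offering
// socket/set_linger/bind/close; a real one would forward to zmq_socket,
// zmq_setsockopt(..., ZMQ_LINGER, ...), zmq_bind and zmq_close.
template <typename Api>
void* open_with_linger_first(Api& api, const std::string& endpoint) {
    void* s = api.socket();
    if (s == nullptr) {
        return nullptr;
    }
    // Linger is set right after creation, before the socket is bound.
    if (api.set_linger(s, 0) != 0 || api.bind(s, endpoint) != 0) {
        api.close(s);  // do not leave the socket open on failure
        return nullptr;
    }
    return s;
}

// Stand-in adapter (assumption for illustration): it only records which
// calls were made, in order, and can be told to fail the bind step.
struct RecordingApi {
    std::vector<std::string> calls;
    bool bind_ok = true;
    int handle = 0;  // its address serves as a dummy socket handle

    void* socket() { calls.push_back("socket"); return &handle; }
    int set_linger(void*, int) { calls.push_back("set_linger"); return 0; }
    int bind(void*, const std::string&) {
        calls.push_back("bind");
        return bind_ok ? 0 : -1;
    }
    int close(void*) { calls.push_back("close"); return 0; }
};
```

With the recording adapter, a successful open logs socket, set_linger, bind in that order, and a failed bind ends with close, which is the ordering the workaround describes.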

bill-torpey, Jan 05 '21 18:01

Thanks @bill-torpey

> Our workaround has been to set the linger option immediately after creating the socket (with zmq_socket).

I modified my code and tested it in Docker; it seems to work fine now! I will run some more tests for this problem.

EgalYue, Jan 06 '21 04:01

This issue has been automatically marked as stale because it has not had activity for 365 days. It will be closed if no further activity occurs within 56 days. Thank you for your contributions.

stale[bot], Apr 16 '22 17:04

I think the root cause of zmq_ctx_term() blocking is a socket leak when zmq_bind() fails:

            void* receiver = zmq_socket(m_context, ZMQ_SUB);
            std::string endpoint = m_ip + ":" + port;

            if (zmq_bind(receiver, endpoint.c_str()) < 0){
                std::cout<<"[ZMQ subscriber]: Bind failed! "<< endpoint.c_str()<<std::endl;
                return -1;
            }
            ......
            void *ctrlPuller = zmq_socket (m_context, ZMQ_REP);
            std::string IPC_CTRL_CHANNEL_SUB = "inproc:///sdcard/ctrlChannel_sub";
            m_endpointCtrl = IPC_CTRL_CHANNEL_SUB + randstr(8);
            if (zmq_bind (ctrlPuller, m_endpointCtrl.c_str()) < 0){
                std::cout<<"[ZMQ Subscriber]: [ctrlPuller] Bind failed!!! "<< m_endpointCtrl.c_str()<<std::endl;
                return -1;
            }

You should call zmq_close() on receiver and ctrlPuller when zmq_bind() fails.
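
One way to make that cleanup hard to forget is a small scope guard that closes the socket on every early return. The sketch below is an illustration under assumptions, not code from this thread: `HandleGuard` and `open_bound` are hypothetical names, and the create/bind/close steps are passed in as callables so the block compiles without libzmq.

```cpp
#include <utility>

// Hypothetical helper (not part of libzmq): owns a raw handle and runs
// `closer` on it when the guard goes out of scope, unless release() was called.
template <typename Closer>
class HandleGuard {
public:
    HandleGuard(void* handle, Closer closer)
        : m_handle(handle), m_closer(std::move(closer)) {}
    ~HandleGuard() {
        if (m_handle != nullptr) {
            m_closer(m_handle);
        }
    }
    HandleGuard(const HandleGuard&) = delete;
    HandleGuard& operator=(const HandleGuard&) = delete;

    // Give up ownership once setup succeeded; the caller closes it later.
    void* release() {
        void* h = m_handle;
        m_handle = nullptr;
        return h;
    }

private:
    void* m_handle;
    Closer m_closer;
};

// Creates a socket with `create`, binds it with `bind`, and returns it.
// If bind reports failure (< 0), the guard closes the socket and nullptr
// is returned, so no socket stays open behind the caller's back.
template <typename Create, typename Bind, typename Closer>
void* open_bound(Create create, Bind bind, Closer closer) {
    void* s = create();
    if (s == nullptr) {
        return nullptr;
    }
    HandleGuard<Closer> guard(s, closer);
    if (bind(s) < 0) {
        return nullptr;  // guard destructor closes the socket here
    }
    return guard.release();
}

// Assumed usage with libzmq (requires <zmq.h>; ctx and endpoint as in init()):
//   void* receiver = open_bound(
//       [&] { return zmq_socket(ctx, ZMQ_SUB); },
//       [&](void* s) { return zmq_bind(s, endpoint.c_str()); },
//       [](void* s) { zmq_close(s); });
```

In init() from the original post, the same idea would also have to cover the case where the second bind fails after receiver was already bound, since receiver is not yet stored in m_Sockets at that point.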

NanXiao, Oct 24 '23 03:10