libzmq
zmq_ctx_term() blocks, if socket bind failed!
Issue description
I set up communication between two RK3399 chips using ZMQ. I already set the linger=0 flag and call zmq_close in the same thread.
I use ctrl+c to interrupt my program, but I found that zmq_ctx_term() sometimes still blocks even though the sockets were closed.
In my program the subscriber socket uses zmq_bind, not zmq_connect, and the socket reconnects every 0.5 s when no message arrives.
Through many experiments, I found that this problem only occurs when the socket bind fails ("address is already in use").
I know that, because of the short (500 ms) reconnect interval, the previous socket is still in the TIME_WAIT state. Maybe the socket in ZMQbg/Reaper is not actually closed?
Environment
- libzmq version (commit hash if unreleased): 4.3
- OS: Docker in ubuntu18.04 or RK3399 chip (linaro)
Minimal test code / Steps to reproduce the issue
I can reproduce this issue in Docker or on the RK3399, and I wrote a piece of code to reproduce it. On my laptop the socket can always reconnect every 500 ms, so the issue never occurs there.
#include <zmq.h>
#include <iostream>
#include <string>
#include <memory>
#include <functional>
#include <map>
#include <vector>
#include <thread>
#include <chrono>   // std::chrono::milliseconds
#include <cstdlib>  // std::abort, rand, srand
#include <ctime>    // time
#include <string.h>
#include <sys/socket.h>
#include <sys/ioctl.h>
#include <net/if.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <signal.h>
std::string randstr(const int len)
{
std::string res; // grows as needed; the original fixed char[20] overflowed for len >= 20
srand(time(NULL));
for (int i = 0; i < len; ++i)
{
switch ((rand() % 3))
{
case 1:
res += static_cast<char>('A' + rand() % 26);
break;
case 2:
res += static_cast<char>('a' + rand() % 26);
break;
default:
res += static_cast<char>('0' + rand() % 10);
break;
}
}
return res;
}
struct nbZmqSockets{
zmq_pollitem_t items [2];
};
class SimpleSuber{
public:
SimpleSuber(void* context, const std::string& ip):
m_Sockets(new nbZmqSockets()),
m_context(context),
m_ip(ip){
if (init() < 0){
std::abort();
}
}
~SimpleSuber(){
for (auto x: m_Sockets->items){
zmq_close(x.socket);
}
}
int init(){
//-------- 1st socket: used to sub ------------
std::string port = "33567";
if(port.empty()){
std::cout<<"[ZMQ Subscriber]: Error msgType does not match a port! Please check defined ports in 'common.h' !"<<std::endl;
return -1;
}
void* receiver = zmq_socket(m_context, ZMQ_SUB);
std::string endpoint = m_ip + ":" + port;
if (zmq_bind(receiver, endpoint.c_str()) < 0){
std::cout<<"[ZMQ subscriber]: Bind failed! "<< endpoint.c_str()<<std::endl;
return -1;
}
std::cout<<"[ZMQ Subscriber]: Bind ok! " << endpoint.c_str()<<std::endl;
int linger = 0;
zmq_setsockopt (receiver, ZMQ_LINGER, &linger, sizeof(linger)); // important!!!
//-------- 2nd socket: used to terminate Blocking sub ------------
void *ctrlPuller = zmq_socket (m_context, ZMQ_REP);
std::string IPC_CTRL_CHANNEL_SUB = "inproc:///sdcard/ctrlChannel_sub";
m_endpointCtrl = IPC_CTRL_CHANNEL_SUB + randstr(8);
if (zmq_bind (ctrlPuller, m_endpointCtrl.c_str()) < 0){
std::cout<<"[ZMQ Subscriber]: [ctrlPuller] Bind failed!!! "<< m_endpointCtrl.c_str()<<std::endl;
return -1;
}
zmq_setsockopt (ctrlPuller, ZMQ_LINGER, &linger, sizeof(linger)); // important!!!
m_Sockets->items[0] = { receiver, 0, ZMQ_POLLIN, 0 };
m_Sockets->items[1] = { ctrlPuller, 0, ZMQ_POLLIN, 0 };
std::cout<<"[ZMQ Subscriber]: [ctrlPuller] Bind ok! ."<< m_endpointCtrl.c_str()<<std::endl;
return 1;
}
int reconnect(){
closeSocket();
if (init() < 0){
std::cout<<"[ZMQ Subscriber]: Reconnect failed !!!"<<std::endl;
return -1;
} else{
std::cout<<"[ZMQ Subscriber]: Reconnect ok !!!"<<std::endl;
return 1;
}
}
void closeSocket(){
for (auto x: m_Sockets->items){
zmq_close(x.socket);
}
}
int subscribe(std::string & str_msg, long timeout=500){
std::cout << "subscribe" << std::endl;
void* receiver = m_Sockets->items[0].socket;
std::string topicStr = "str_msg";
const char* topic = topicStr.c_str(); // Topic filter
zmq_setsockopt (receiver, ZMQ_SUBSCRIBE, topic, strlen(topic));
int poll_flag = zmq_poll (m_Sockets->items, 2, timeout);
if (0 == poll_flag){ // zmq_poll timed out without any event
std::cout<<"[ZMQ Subscriber]: No Ts2WithID msg received within " << timeout << " ms! Reconnecting now..." << std::endl;
while (1){
if (reconnect() > 0){
return 0;
}
std::this_thread::sleep_for (std::chrono::milliseconds(timeout)); //TODO important!!!
}
return 0;
} else if (-1 == poll_flag){
std::cout<<"[ZMQ Subscriber]: Error occur by zmq_poll Ts2WithID!" << std::endl;
return -1;
}
//-------------------- 1st socket: used to Sub ---------------------------------
if (m_Sockets->items[0].revents & ZMQ_POLLIN){
zmq_msg_t recvMsg;
zmq_msg_init(&recvMsg);
int rc_env = zmq_msg_recv(&recvMsg, receiver, 0);
if (-1 == rc_env){
std::cout<<"[ZMQ Subscriber]: Recv Stop Signal or error occurs when receiving msg..."<<std::endl;
zmq_msg_close(&recvMsg);
return -1;
}
// Copy the payload into the caller's string. The original malloc'd a local
// buffer that shadowed str_msg and was never freed.
str_msg.assign(static_cast<const char*>(zmq_msg_data(&recvMsg)), zmq_msg_size(&recvMsg));
zmq_msg_close(&recvMsg);
}
// ----------------2nd socket: used to terminate Blocking ----------------------
if (m_Sockets->items[1].revents & ZMQ_POLLIN){
return executeTerminateCmd();
}
return 1;
}
int terminateBlocking(){
void* ctrlPusher = zmq_socket(m_context, ZMQ_REQ);
if (zmq_connect (ctrlPusher, m_endpointCtrl.c_str()) < 0){
std::cout<<"[ZMQ Subscriber]: [ctrlPusher] Connect failed!!! " << m_endpointCtrl.c_str()<<std::endl;
std::abort(); //TODO...
}
int linger = 0;
zmq_setsockopt (ctrlPusher, ZMQ_LINGER, &linger, sizeof(linger)); // important!!!
std::cout<<"[ZMQ Subscriber]: [ctrlPusher] Connect ok!!! ("<< m_endpointCtrl.c_str()<<")"<<std::endl;
const char* ctrlCmd = "KILL";
zmq_msg_t ctrlMsg;
zmq_msg_init_size(&ctrlMsg, strlen(ctrlCmd));
memcpy(zmq_msg_data(&ctrlMsg), ctrlCmd, strlen(ctrlCmd));
int rc = zmq_msg_send(&ctrlMsg, ctrlPusher, 0); // TODO...
if (-1 == rc){
std::cout<<"[ZMQ Subscriber]: [ctrlPusher] Send '"<< ctrlCmd<<"' as ctrlCmd failed!!!"<<std::endl;
zmq_msg_close(&ctrlMsg); // after a failed send the message still belongs to the caller
zmq_close(ctrlPusher);
return -1;
}
zmq_msg_close(&ctrlMsg);
std::cout<<"[ZMQ Subscriber]: [ctrlPusher] Send ctrlCmd successfully, the blocked subscriber will be closed..."<<std::endl;
// watting for a reply, with timeout
zmq_pollitem_t items[] = {{static_cast<void*>(ctrlPusher), 0, ZMQ_POLLIN, 0}};
int poll_rc = zmq_poll(&items[0], 1, 500); //TODO timeout ms
if (poll_rc <= 0){
std::cout<<"[ZMQ Subscriber]: [ctrlPusher] Time out when receiving reply from control..."<<std::endl;
zmq_close(ctrlPusher);
return -1; //TODO return 1?
}
// If we got a reply, process it
if (items[0].revents & ZMQ_POLLIN) {
zmq_msg_t replyMsg;
rc = zmq_msg_init(&replyMsg);
rc = zmq_msg_recv(&replyMsg, ctrlPusher, 0);
if (-1 == rc){
std::cout<<"[ZMQ Subscriber]: [ctrlPusher] Error occurs when receiving Reply from server"<<std::endl;
zmq_close(ctrlPusher);
return -1;
}
zmq_msg_close(&replyMsg);
std::cout<<"[ZMQ Subscriber]: [ctrlPusher] Succ get reply from control"<<std::endl;
}
zmq_close(ctrlPusher);
std::cout<<"[ZMQ Subscriber]: [ctrlPusher] Finish terminateBlocking task successfully!"<<std::endl;
return 1;
}
private:
/**
* Receive Terminate signal
*/
int executeTerminateCmd(){
//ALOGD("[ZMQ Subscriber]: [ctrlPuller] Running executeTerminateCmd, begin:");
void* ctrlPuller = m_Sockets->items[1].socket;
zmq_msg_t ctrlMsg;
int rc = zmq_msg_init(&ctrlMsg);
rc = zmq_msg_recv(&ctrlMsg, ctrlPuller, 0);
if (-1 == rc) {
std::cout<<"[ZMQ Subscriber]: [ctrlPuller] pull ctrlCmd failed!!!"<<std::endl;
return -1;
}
// std::string owns its copy of the payload, so nothing is leaked (the original malloc'd buffer was never freed)
std::string recvMsg(static_cast<const char*>(zmq_msg_data(&ctrlMsg)), zmq_msg_size(&ctrlMsg));
zmq_msg_close(&ctrlMsg);
std::cout<<"[ZMQ Subscriber]: [ctrlPuller] recv '" << recvMsg << "' as ctrlCmd." << std::endl;
// Reply info: "OK"
const char* ans = "OK"; // can be anything
zmq_msg_t replyMsg;
zmq_msg_init_size(&replyMsg, strlen(ans));
memcpy(zmq_msg_data(&replyMsg), ans, strlen(ans));
rc = zmq_msg_send(&replyMsg, ctrlPuller, 0);
if (-1 == rc){
std::cout<<"[ZMQ Subscriber]:[ctrlPuller] Send 'OK' as reply failed!"<<std::endl;
return -1;
}
std::cout<<"[ZMQ Subscriber]: [ctrlPuller] Reply 'OK' successfully!"<<std::endl;
zmq_msg_close(&replyMsg);
// call close sockets
closeSocket();
std::cout<<"[ZMQ Subscriber]: [ctrlPuller] Terminate Blocking sub successfully! Sockets are already closed!"<<std::endl;
return -2; // TODO... return 0?
}
void* m_context;
std::string m_ip;
std::shared_ptr<nbZmqSockets> m_Sockets;
std::string m_endpointCtrl;
};
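volatile sig_atomic_t app_stopped = 0; // written by the signal handler, polled in main()
void sigint_handler(int sig){
if(sig == SIGINT){
// Code executed when exiting via ctrl+c.
// write() is async-signal-safe, unlike std::cout.
const char msg[] = "ctrl+c pressed!\n";
ssize_t ignored = write(STDOUT_FILENO, msg, sizeof(msg) - 1);
(void)ignored;
app_stopped = 1;
}
}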
void t_sub(std::shared_ptr<SimpleSuber> suber){
std::string msg_str = "";
while(1){
int sleep_t = 100000;
usleep(sleep_t);
std::cout << ">>> loop begin after "<< sleep_t << " us"<<std::endl;
if (suber->subscribe(msg_str) <= -1){
std::cout << ">> loop finish"<<std::endl; // print before break, otherwise this line is unreachable
break;
}
std::cout << ">>> loop end"<<std::endl;
}
}
int main(int argc, char **argv){
if (argc < 2){
std::cout<< "Usage: simple_sub [ip]" << std::endl;
std::cout<< "Ex: simple_sub tcp://127.0.0.1" << std::endl;
return -1;
}
std::cout << "=== This is simple_sub ... ===" << std::endl;
std::string ip = argv[1]; // "tcp://127.0.0.1";
signal(SIGINT, sigint_handler); // ctrl + c
void* m_context = zmq_ctx_new();
std::shared_ptr<SimpleSuber> suber = std::make_shared<SimpleSuber>(m_context, ip);
std::thread thread_sub(t_sub, suber);
int sleep_t = 50000000;
usleep(sleep_t);
std::cout<<">>> After sleep: " << sleep_t << "us"<<std::endl;
int cnt = 0;
while(1){
cnt++;
if (cnt % 10000 == 0){
std::cout<< "cnt=" << cnt << std::endl;
}
if (app_stopped){
suber->terminateBlocking(); // terminate socket, stop socket in the same thread
break;
}
}
if (thread_sub.joinable()){
thread_sub.join();
}
zmq_ctx_term(m_context); // zmq_ctx_destroy() is the deprecated name for this call
std::cout<< " --- end ---" << std::endl;
return 0;
}
What's the actual result? (include assertion message & call stack if applicable)
The program blocks inside zmq_ctx_term().
When the bind never fails, the program exits successfully.
What's the expected result?
Even though the socket bind sometimes fails, we should be able to release all resources by calling zmq_ctx_term(), and it should not block.
It's hard to tell from the code included, but there is a race condition setting linger that is documented here: https://github.com/zeromq/libzmq/issues/3252.
Our workaround has been to set the linger option immediately after creating the socket (with zmq_socket).
Thanks @bill-torpey
Following your workaround of setting the linger option immediately after creating the socket (with zmq_socket), I modified my code and tested it in Docker, and it seems to work fine now. I will run some more tests for this problem.
I think the root cause of zmq_ctx_term() blocking is a socket leak when zmq_bind() fails:
void* receiver = zmq_socket(m_context, ZMQ_SUB);
std::string endpoint = m_ip + ":" + port;
if (zmq_bind(receiver, endpoint.c_str()) < 0){
std::cout<<"[ZMQ subscriber]: Bind failed! "<< endpoint.c_str()<<std::endl;
return -1;
}
......
void *ctrlPuller = zmq_socket (m_context, ZMQ_REP);
std::string IPC_CTRL_CHANNEL_SUB = "inproc:///sdcard/ctrlChannel_sub";
m_endpointCtrl = IPC_CTRL_CHANNEL_SUB + randstr(8);
if (zmq_bind (ctrlPuller, m_endpointCtrl.c_str()) < 0){
std::cout<<"[ZMQ Subscriber]: [ctrlPuller] Bind failed!!! "<< m_endpointCtrl.c_str()<<std::endl;
return -1;
}
You should call zmq_close() for receiver and ctrlPuller when zmq_bind() fails.