uv_async_send aborted in iOS
- Version: 1.17
- Platform: iOS
I create and run a uv loop in thread A. Then, from another thread B, I send an async event to stop the loop, and afterwards close the async handle and the loop. This works most of the time, but I get one or two crashes a day. I cannot find out what is wrong; any help would be appreciated.
The Crash Log:
| Frame | Image | Address and symbol |
|---|---|---|
| 0 | libsystem_kernel.dylib | 0x0000000193d660dc ___pthread_kill + 8 |
| 1 | libsystem_c.dylib | 0x0000000193cbfea8 _abort + 140 |
| 2 | LiveShow | 0x00000001007737dc _uv__async_send + 196 |
| 3 | LiveShow | 0x00000001007736b8 _uv_async_send + 76 |
The code I use:
Thread A:
uv_loop_init(&_pd->_loop);
_pd->_loop.data = this;
uv_async_init(&_pd->_loop, &_pd->_cleanup_async, ALAKcpSession::_static_cleanup_async_cb);
_pd->_cleanup_async.data = this;
uv_run(&(this->_pd->_loop), UV_RUN_DEFAULT);
Thread B:
uv_async_send(&_pd->_cleanup_async);
The async handle callback _static_cleanup_async_cb and the release method:
void Session::_static_cleanup_async_cb(uv_async_t *a)
{
Session* sess = (Session*) a->data;
if (sess && &(sess->_pd->_cleanup_async) == a) {
uv_stop(&(sess->_pd->_loop));
} else {
log_d("ERR -------- Session::_static_cleanup_async_cb");
}
}
void Session::_release_private_data()
{
#define RELEASE_UV_HANDLE(x) \
if (x) { \
uv_close((uv_handle_t*)(x), NULL); \
}
RELEASE_UV_HANDLE(&_pd->_cleanup_async);
int r = uv_loop_close(&_pd->_loop);
}
Please help me find what I did wrong, or give me some tips for tracking it down.
uv_async_t is implemented in terms of pipe(2). If you hit an abort while sending, that means writing to the pipe failed for some reason.
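To make the mechanism concrete, here is a minimal sketch of a pipe-based wakeup in plain POSIX C. It is not libuv's code: the names wakeup_pipe_open, wakeup_send and wakeup_drain are hypothetical. The sketch assumes only that a send is a one-byte write to a non-blocking pipe, and that a full pipe (EAGAIN) can be ignored because a wakeup is already pending.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>

/* Creates a pipe and makes both ends non-blocking.
 * fds[0] is the read end, fds[1] the write end. Returns 0 or -1. */
static int wakeup_pipe_open(int fds[2]) {
  int i;

  assert(fds != NULL);
  if (pipe(fds) != 0)
    return -1;

  for (i = 0; i < 2; i++) {
    int flags = fcntl(fds[i], F_GETFL);
    if (flags == -1 || fcntl(fds[i], F_SETFL, flags | O_NONBLOCK) == -1) {
      close(fds[0]);
      close(fds[1]);
      return -1;
    }
  }
  return 0;
}

/* Sends a wakeup by writing a single NUL byte.
 * Returns 0 if the byte was written or the pipe was already full,
 * otherwise the errno value of the failed write. */
static int wakeup_send(int wfd) {
  ssize_t r;

  do
    r = write(wfd, "", 1);
  while (r == -1 && errno == EINTR);

  if (r == 1)
    return 0;
  if (r == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
    return 0;  /* a wakeup is already pending */
  return r == -1 ? errno : EIO;
}

/* Reads everything currently in the pipe (what the loop thread would do
 * before running callbacks). Returns the number of bytes drained. */
static long wakeup_drain(int rfd) {
  char buf[1024];
  long total = 0;

  for (;;) {
    ssize_t r = read(rfd, buf, sizeof(buf));
    if (r > 0) {
      total += r;
      continue;
    }
    if (r == -1 && errno == EINTR)
      continue;
    break;  /* empty pipe (EAGAIN), end of file, or another error */
  }
  return total;
}
```

In this sketch, any non-zero return from wakeup_send corresponds to the situation in which the write to the pipe failed and the real code aborts.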
Can you apply this patch and see what it prints?
diff --git a/src/unix/async.c b/src/unix/async.c
index a5c47bca..e9b30e8c 100644
--- a/src/unix/async.c
+++ b/src/unix/async.c
@@ -179,6 +179,7 @@ static void uv__async_send(uv_loop_t* loop) {
if (errno == EAGAIN || errno == EWOULDBLOCK)
return;
+ printf("r=%d errno=%d\n", r, errno);
abort();
}
Forgot to mention: the behavior you describe sounds very much like one thread closing the handle while the other thread is still working with it, but the patch will probably catch that. I expect errno=9 (EBADF, bad file descriptor).
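The EBADF case can be reproduced outside libuv. The following sketch, with hypothetical helper names, writes one byte to the write end of a pipe after both ends have been closed and returns the errno that the late write observes. It stands in for a send that arrives after the loop's descriptors have been released; it does not use libuv itself.

```c
#include <assert.h>
#include <errno.h>
#include <unistd.h>

/* Writes one NUL byte to fd.
 * Returns 0 on success or the errno value of the failed write. */
static int try_wakeup_write(int fd) {
  ssize_t r;

  assert(fd >= 0);
  do
    r = write(fd, "", 1);
  while (r == -1 && errno == EINTR);

  if (r == 1)
    return 0;
  return r == -1 ? errno : EIO;
}

/* Opens a pipe, checks that a write works, closes both ends, then writes
 * again through the stale descriptor number.
 * Returns the errno of that last write, or -1 if the setup failed. */
static int write_after_close(void) {
  int fds[2];
  int wfd;

  if (pipe(fds) != 0)
    return -1;
  wfd = fds[1];

  if (try_wakeup_write(wfd) != 0) {  /* pipe still open: must succeed */
    close(fds[0]);
    close(wfd);
    return -1;
  }

  close(fds[0]);
  close(wfd);

  return try_wakeup_write(wfd);      /* descriptor is gone by now */
}
```

In a single-threaded run, write_after_close() returns EBADF. In a real multi-threaded program the descriptor number may already have been reused by another open file or socket, in which case the late write can land in an unrelated descriptor instead of failing.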
bnoordhuis, thanks for your reply. Maybe I did not describe it clearly. In my case a uv loop runs in Thread A, and Thread B calls uv_async_send to tell the loop to stop. The callback _static_cleanup_async_cb runs in Thread A and calls uv_stop to stop the loop. After uv_stop has been called, the async handle _pd->_cleanup_async is closed in Thread A.
So my case is not one thread closing the handle while another thread is still working with it. Most of the time it works perfectly, but our online crash collection reports a small number of crashes. I cannot reproduce the crash myself and do not know how to trigger it, which is why I am asking for your help.
Okay, can you try the patch and see what it prints?
Thanks for your reply. I cannot reproduce this myself; it only happens for users of our iOS app. The only way I could get r and errno from uv__async_send is to write them to a file and then upload that file to our server. That requires modifying the libuv source, and shipping an update of our iOS app through the App Store takes a long time.
Can I just remove the abort() call in uv__async_send?
Is there a reason abort() must be called in uv__async_send, or would anything bad happen if I simply removed the call?
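For reference, persisting r and errno to a file before aborting could look roughly like the sketch below. The function name record_write_failure and the line format are assumptions made for illustration; nothing like this exists in libuv. The caller would have to pass the errno captured immediately after the failed write, since later calls may overwrite it.

```c
#include <assert.h>
#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

/* Appends one line of the form "r=<r> errno=<err>\n" to the file at path,
 * creating the file if needed. Returns 0 on success, -1 on failure. */
static int record_write_failure(const char* path, long r, int err) {
  char line[64];
  size_t len;
  ssize_t w;
  int fd;
  int n;

  assert(path != NULL);

  n = snprintf(line, sizeof(line), "r=%ld errno=%d\n", r, err);
  if (n < 0 || (size_t) n >= sizeof(line))
    return -1;
  len = strlen(line);

  fd = open(path, O_WRONLY | O_CREAT | O_APPEND, 0600);
  if (fd == -1)
    return -1;

  w = write(fd, line, len);
  close(fd);

  return w == (ssize_t) len ? 0 : -1;
}
```

The file could then be uploaded on the next launch, since the process is about to abort when the line is written.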
It's there for a reason ("the impossible happened") so remove at your own peril.
You're saying this only happened for one user of your app and you have no way to reproduce locally? Then there isn't much I can do either; the code snippet you posted is too little to go on.
Below is the whole code, with the business logic removed. Please help me find what I did wrong.
Session.h
enum SessionState
{
kSessionStateInit = 1 << 0,
kSessionStateHandsShake = 1 << 1,
kSessionStateAvailable = 1 << 2,
kSessionStateTimeout = 1 << 3,
kSessionStateClose = 1 << 4,
};
typedef struct SessionConf
{
/**
* server domain host or IP
*/
uint8_t server_host[256];
/**
* server port
*/
uint16_t server_port;
/**
* heart beat interval milliseconds
*/
int hb_interval;
/**
* heart beat timeout milliseconds
*/
int hb_timeout;
/**
* hands shake timeout milliseconds
*/
int hs_timeout;
/**
* close timeout milliseconds
*/
int close_timeout;
/**
* enable speed or not, 0:disable, 1:enable
*/
int enable_speed;
/**
* kconvid
*/
uint32_t convid;
} SessionConf;
class SessionHandle;
/**
* @desc class Session
*/
class Session : public ThreadHandle
{
public:
Session(SessionHandle* handle);
~Session();
public:
// interface ThreadHandle
virtual void on_thread_start();
virtual int on_before_cycle();
virtual int cycle();
virtual int on_end_cycle();
virtual void on_thread_stop();
public:
int start_session(const char*url);
void stop_session();
// list is a std::list of packets; for confidentiality reasons it is represented as void* here.
// typedef std::list<RtpPacket *> RtpPacketList;
// int send_packet_buffers(RtpPacketList* list);
int send_packet_buffers(void* list);
private:
typedef struct private_data_s
{
uv_loop_t _loop;
uv_udp_t* _udp_sock;
uv_async_t _stop_async;
uv_async_t _write_async;
uv_async_t _cleanup_async;
uint32_t _state; // SessionState
SessionConf _conf;
struct sockaddr_in _saddr;
uint8_t* _socket_recv_buf;
uint32_t _socket_recv_buf_len;
} private_data_t;
private:
void _init_private_data();
void _release_private_data();
int _init_uv_udp_socket();
int _init_uv_udp_socket_with_ip(const char* ip);
void _send_heart_beat_packet();
void _send_close_pakcet();
private:
// uv callback
static void _static_stop_async_cb(uv_async_t* a);
static void _static_write_async_cb(uv_async_t* a);
static void _static_cleanup_async_cb(uv_async_t* a);
static void _static_udp_socket_recv_cb(uv_udp_t* handle,
ssize_t nread,
const uv_buf_t* buf,
const struct sockaddr* addr,
unsigned flags);
private:
static void _static_uv_alloc_buffer(uv_handle_t* handle, size_t size, uv_buf_t* buf);
private:
ThreadImp* _pth;
SessionHandle* _sess_handle;
private_data_t* _pd;
};
Session.cpp
int is_ip_address_string(const char* s)
{
if (s == NULL) {
return 0;
}
struct in_addr addr;
bool is_ip = inet_aton(s, &addr) == 0 ? false : true;
return is_ip ? 1 : 0;
}
void on_uv_close_socket(uv_handle_t* h)
{
free(h);
}
Session::Session(SessionHandle* handle) : _sess_handle(handle), _pd(NULL), _pth(NULL)
{
_pd = new private_data_t();
memset(_pd, 0, sizeof(private_data_t));
_pth = new ThreadImp("session.thread", this, 10, true);
}
Session::~Session()
{
/// FIX: uv_run may exit before uv_stop is called, because all uv handles are inactive.
uv_async_send(&_pd->_cleanup_async);
_pth->stop();
if (_pth) {
delete _pth;
_pth = NULL;
}
if (_pd) {
delete _pd;
_pd = NULL;
}
}
void Session::on_thread_start()
{
this->_init_private_data();
// do other create job here and send handshake packet using udp
// !!! Logic removed
}
int Session::on_before_cycle()
{
return 0;
}
int Session::cycle()
{
uv_run(&(this->_pd->_loop), UV_RUN_DEFAULT);
return 0;
}
int Session::on_end_cycle()
{
return 0;
}
void Session::on_thread_stop()
{
this->_release_private_data();
// do other destroy job
// !!! Logic removed
}
int Session::start_session(const char*url)
{
char *p, *col;
if (NULL == url) {
return -1;
}
#define UDP_SCHEMA "rtc://"
if (strncasecmp(url, UDP_SCHEMA, strlen(UDP_SCHEMA)) != 0) {
return -1;
}
p = (char*)(url + strlen(UDP_SCHEMA));
col = strchr(p, ':');
if (!col) {
return -1;
}
#define RTP_SERVER_HOST (256)
int hostlen = (int)(col - p);
if (hostlen < RTP_SERVER_HOST) {
strncpy((char*)_pd->_conf.server_host,p,hostlen);
_pd->_conf.server_host[hostlen] = '\0';
} else {
return -1;
}
col++;
unsigned int port = atoi(col);
if (port > 65535) {
return -1;
} else {
_pd->_conf.server_port = port;
}
_pth->start();
return 0;
}
void Session::stop_session()
{
_pth->stop_loop();
if (_pd->_state == 0) {
return ;
}
if (_pd->_stop_async.data == NULL) {
return ;
}
int r = uv_async_send(&_pd->_stop_async);
if (r < 0) {
}
}
//// WAR: the thread may not have started yet when send_packet_buffers is invoked!
int Session::send_packet_buffers(void* list)
{
if (list == NULL) {
return -1;
}
// if (list->size() == 0) {
// return 0;
// }
if (!_pth->can_loop()) {
return -1;
}
// !!! Logic comment for compile
// while (list->size() > 0) {
// RtpPacket* p = list->front();
// list->pop_front();
// _queue->push_back(p);
// }
//
// if (_pd->_state != kSessionStateAvailable) {
// return 0;
// }
//
// // WAR: aysn write handle not inited!!
// if (_pd->_write_async.data) {
//
// int r = uv_async_send(&_pd->_write_async);
// if (r < 0) {
// }
//
// } else {
// }
return 0;
}
// --------------------------------------
// private function
// --------------------------------------
void Session::_init_private_data()
{
if (_pd == NULL) {
return ;
}
int ret = 1;
// loop
uv_loop_init(&_pd->_loop);
_pd->_loop.data = this;
// udp sock
ret = this->_init_uv_udp_socket();
// stop async
ret = uv_async_init(&_pd->_loop, &_pd->_stop_async, Session::_static_stop_async_cb);
_pd->_stop_async.data = this;
// write async
ret = uv_async_init(&_pd->_loop, &_pd->_write_async, Session::_static_write_async_cb);
_pd->_write_async.data = this;
// _cleanup_async
ret = uv_async_init(&_pd->_loop, &_pd->_cleanup_async, Session::_static_cleanup_async_cb);
_pd->_cleanup_async.data = this;
this->_pd->_state = kSessionStateInit;
}
void Session::_release_private_data()
{
#define RELEASE_UV_HANDLE(x) \
if (x) { \
uv_close((uv_handle_t*)(x), NULL); \
}
RELEASE_UV_HANDLE(_pd->_udp_sock);
RELEASE_UV_HANDLE(&_pd->_stop_async);
RELEASE_UV_HANDLE(&_pd->_write_async);
RELEASE_UV_HANDLE(&_pd->_cleanup_async);
uv_run(&(this->_pd->_loop), UV_RUN_ONCE);
int r = uv_loop_close(&_pd->_loop);
#undef RELEASE_UV_HANDLE
if (_pd->_udp_sock) {
free(_pd->_udp_sock);
_pd->_udp_sock = NULL;
}
}
int Session::_init_uv_udp_socket()
{
// init udp socket
int ret = -1;
if (_pd->_udp_sock && _pd->_udp_sock->data) {
uv_close((uv_handle_t*) _pd->_udp_sock, on_uv_close_socket);
_pd->_udp_sock->data = NULL;
}
_pd->_udp_sock = (uv_udp_t*) malloc(sizeof(uv_udp_t));
ret = uv_udp_init(&_pd->_loop, _pd->_udp_sock);
_pd->_udp_sock->data = this;
if (ret != 0) {
return ret;
}
if (is_ip_address_string((const char*) _pd->_conf.server_host)) {
// ip
ret = this->_init_uv_udp_socket_with_ip((const char*) _pd->_conf.server_host);
} else {
// host
// resolve host;;
struct addrinfo *result, *rp;
ret = getaddrinfo((const char*)_pd->_conf.server_host, NULL, NULL, &result);
if (ret != 0) {
return ret;
}
char buf[256] = {0};
for (rp = result; rp != NULL; rp = rp->ai_next) {
if (rp->ai_family == AF_INET) {
if (inet_ntop(rp->ai_family, &(((struct sockaddr_in*)(rp->ai_addr))->sin_addr), buf, sizeof(buf))) {
break;
}
}
} // end for
freeaddrinfo(result);
ret = this->_init_uv_udp_socket_with_ip(buf);
}
return ret;
}
int Session::_init_uv_udp_socket_with_ip(const char *ip)
{
int ret = -1;
ret = uv_ip4_addr(ip/*(const char*) _pd->_conf.server_host*/, _pd->_conf.server_port, (struct sockaddr_in*)&(_pd->_saddr));
if (ret != 0) {
return ret;
}
ret = uv_udp_recv_start(_pd->_udp_sock, _static_uv_alloc_buffer, Session::_static_udp_socket_recv_cb);
if (ret != 0) {
return ret;
}
return 0;
}
void Session::_send_heart_beat_packet()
{
if (_pd->_state != kSessionStateAvailable) {
return ;
}
/// !!! Logic removed
/// send handskake packet using udp
}
void Session::_send_close_pakcet()
{
if (_pd->_state == kSessionStateClose || _pd->_state == kSessionStateInit || _pd->_state == kSessionStateTimeout) {
return ;
}
/// !!! Logic removed
/// send close packet using udp
}
// --------------------------------------
// uv callback
// --------------------------------------
void Session::_static_write_async_cb(uv_async_t *a)
{
Session* sess = (Session*) a->data;
if (sess && &(sess->_pd->_write_async) == a) {
/// !!! Logic removed
// send RTPPacket using udp
} else {
}
}
void Session::_static_cleanup_async_cb(uv_async_t *a)
{
Session* sess = (Session*) a->data;
if (sess && &(sess->_pd->_cleanup_async) == a) {
uv_stop(&(sess->_pd->_loop));
} else {
}
}
void Session::_static_stop_async_cb(uv_async_t *a)
{
Session* sess = (Session*) a->data;
if (sess && &(sess->_pd->_stop_async) == a) {
sess->_send_close_pakcet();
} else {
}
}
void Session::_static_udp_socket_recv_cb(uv_udp_t *handle,
ssize_t nread,
const uv_buf_t *buf,
const struct sockaddr *addr,
unsigned int flags)
{
Session* sess = (Session*) handle->data;
if (sess->_pd->_udp_sock == handle) {
/// !!! Logic removed
// process data received
} else {
}
}
void Session::_static_uv_alloc_buffer(uv_handle_t *handle, size_t size, uv_buf_t *buf)
{
Session* sess = (Session*) handle->data;
if (sess) {
buf->base = (char*) sess->_pd->_socket_recv_buf;
buf->len = sess->_pd->_socket_recv_buf_len;
}
}
thread.cpp
ThreadImp::ThreadImp(const char* name, ThreadHandle* handle, int interval_ms, int joinable)
{
_name = name;
_handle = handle;
_interval_ms = interval_ms;
_loop = 0;
_pth = 0;
_joinable = joinable;
_status = THREAD_INIT;
_once = ONCE_STATE_UNINITIALIZED;
}
ThreadImp::~ThreadImp()
{
stop();
}
int ThreadImp::start()
{
if (_pth)
{
return 0;
}
_loop = 1;
if (ATOM_CAS(&_once, ONCE_STATE_UNINITIALIZED, ONCE_STATE_DONE)) {
pthread_create(&_pth, NULL, thread_func, this);
} else {
}
return 0;
}
int ThreadImp::stop()
{
if (_status == THREAD_INIT)
{
return 0;
}
_loop = 0;
if (_joinable && _pth)
{
pthread_join(_pth, NULL);
}
if (!_joinable && _pth)
{
pthread_detach(_pth);
}
_status = THREAD_INIT;
_pth = 0;
return 0;
}
void ThreadImp::stop_loop()
{
_loop = 0;
}
void* ThreadImp::thread_func(void* param)
{
ThreadImp* pthis = (ThreadImp*)param;
pthread_setname_np(pthis->_name);
pthis->thread_cycle();
return NULL;
}
void ThreadImp::thread_cycle()
{
assert(_handle);
_handle->on_thread_start();
_status = THREAD_RUNING;
while (_loop)
{
if (_handle->on_before_cycle() != 0)
{
break;
}
if (_handle->cycle() != 0)
{
break;
}
if (_handle->on_end_cycle() != 0)
{
break;
}
if (_interval_ms)
{
usleep(_interval_ms * 1000);
}
}
_status = THREAD_STOP;
_handle->on_thread_stop();
}
int ThreadImp::can_loop()
{
return _loop;
}
int ThreadImp::is_run()
{
return _status == THREAD_RUNING;
}
const char* ThreadImp::get_thread_name()
{
return _name;
}
Client.cpp
class Client : public SessionHandle
{
public:
Client();
~Client();
public:
int send_video_rtp_packet(const uint8_t* data, uint32_t len, uint32_t timestamp, uint32_t abs_ts);
int send_audio_rtp_packet(const uint8_t* data, uint32_t len, uint32_t timestamp);
int start_session(const char*url);
void stop_session();
Session* session();
public:
// ALAKcpSessionHandle interface
virtual void on_hands_shake(int errcode, int32_t time_delta, int32_t handshake_duration);
virtual void on_heart_beat_timeout(int errcode);
virtual void on_close(int errcode);
virtual void on_online(uint32_t room_id, uint8_t line_id, uint64_t uid);
virtual void on_payload_data(uint8_t* buff, int len);
private:
private:
Session* _sess;
};
Client::Client() : _sess(NULL)
{
_sess = new Session(this);
}
Client::~Client()
{
if (_sess) {
delete _sess;
_sess = NULL;
}
}
int Client::send_audio_rtp_packet(const uint8_t *data, uint32_t len, uint32_t timestamp)
{
// comment for compile reason
// if (data == NULL || len == 0) {
// return -1;
// }
// int ret = 0;
// RtpPacketList* list = NULL;
// ret = _packer->PacketizeAudio((uint8_t*) data, len, timestamp, &list);
// if (ret == 0 && list != NULL) {
// int r = _sess->send_packet_buffers(list);
// if (r == -1) {
// }
// } else {
// }
// ReleasePacketList(&list);
return 0;
}
int Client::send_video_rtp_packet(const uint8_t *data, uint32_t len, uint32_t timestamp, uint32_t abs_ts)
{
if (data == NULL || len == 0) {
return -1;
}
// comment for compile reason
// int ret = 0;
// RtpPacketList* list = NULL;
// ret = _packer->PacketizeVideo((uint8_t*) data, len, timestamp, abs_ts, &list);
// if (ret == 0 && list != NULL) {
// int r = _sess->send_packet_buffers(list);
// if (r == -1) {
// }
// } else {
// }
// ReleasePacketList(&list);
return 0;
}
int Client::start_session(const char*url)
{
return _sess->start_session(url);
}
void Client::stop_session()
{
_sess->stop_session();
}
Session* Client::session()
{
return _sess;
}
// --------------------------------------
// SessionHandle callback
// --------------------------------------
void Client::on_hands_shake(int errcode, int32_t time_delta, int32_t handshake_duration)
{
/// !!! Logic removed
}
void Client::on_close(int errcode)
{
/// !!! Logic removed
}
void Client::on_online(uint32_t room_id, uint8_t line_id, uint64_t uid)
{
/// !!! Logic removed
}
void Client::on_heart_beat_timeout(int errcode)
{
/// !!! Logic removed
}
void Client::on_payload_data(uint8_t* buff, int len)
{
/// !!! Logic removed
}
The Client instance lives in Thread B. It creates Thread A through the Session class's start_session method, and the uv loop runs in Thread A. Our app uses the Client instance to send video and audio data from Thread B through the uv loop in Session. When the app releases Thread B and the Client instance, the Session object held by the Client is released too; in the Session destructor I call uv_async_send to notify Thread A to stop the loop.
That is all of my code except some business logic. Did I do something wrong?
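The shutdown order described here (send the wakeup, let the loop thread finish, then release resources) can be illustrated without libuv. The sketch below is an analogy under stated assumptions, not the code from this thread: worker_t, worker_start and worker_stop are hypothetical names, a plain pipe and poll(2) stand in for the async handle and the loop, and the stopping thread closes the descriptors itself after joining, whereas the code above closes its handles on the loop thread.

```c
#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <pthread.h>
#include <unistd.h>

typedef struct {
  int fds[2];        /* fds[0]: read end polled by the worker; fds[1]: write end */
  pthread_t thread;
  int woke;          /* set by the worker once poll() has returned */
} worker_t;

/* Stand-in for the loop thread: blocks until the pipe becomes readable
 * or its write end is closed. */
static void* worker_main(void* arg) {
  worker_t* w = arg;
  struct pollfd p;
  int r;

  p.fd = w->fds[0];
  p.events = POLLIN;
  p.revents = 0;

  do
    r = poll(&p, 1, -1);
  while (r == -1 && errno == EINTR);

  w->woke = 1;
  return NULL;
}

static int worker_start(worker_t* w) {
  assert(w != NULL);
  w->woke = 0;
  if (pipe(w->fds) != 0)
    return -1;
  if (pthread_create(&w->thread, NULL, worker_main, w) != 0) {
    close(w->fds[0]);
    close(w->fds[1]);
    return -1;
  }
  return 0;
}

/* Order matters: 1. signal while the pipe is certainly still open,
 * 2. wait for the worker to exit, 3. only then close the descriptors. */
static int worker_stop(worker_t* w) {
  int wfd_open = 1;
  ssize_t r;

  do
    r = write(w->fds[1], "", 1);
  while (r == -1 && errno == EINTR);

  if (r != 1) {
    /* Could not signal: closing the write end also wakes poll(). */
    close(w->fds[1]);
    wfd_open = 0;
  }

  pthread_join(w->thread, NULL);

  close(w->fds[0]);
  if (wfd_open)
    close(w->fds[1]);
  return r == 1 ? 0 : -1;
}
```

With this ordering no write can reach a descriptor that has already been closed, because the only writer is also the only thread that closes it, and it does so after the join.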
I skimmed your code and I didn't see anything obviously wrong but I'm also not paid to fix people's bugs, only libuv's bugs. :-)
If you have a reduced test case, I'm happy to look at that. Otherwise, I'll move this over to libuv/help.
That is my situation: we cannot make the abort happen ourselves, so a reduced test case would be useless. Thanks for your help anyway.
Okay, thanks. I've moved the issue, maybe someone else has an idea.
Hi bnoordhuis, I got a hint from the crashes we collected: whenever uv_async_send hit the abort, the user's phone was low on memory.
So I wonder whether the write to the pipe fd can fail with an out-of-memory error, i.e. with errno set to ENOMEM.
@frostfeng Sorry, missed your comment. ENOMEM sounds plausible.
I'm inclined to say aborting is the right course of action here because the alternative is that the application gets stuck because the async event is lost.
I suppose libuv could back off and retry but I don't know if that's really an improvement. Can you try this patch and see if it helps?
diff --git a/src/unix/async.c b/src/unix/async.c
index a5c47bca..36efaf09 100644
--- a/src/unix/async.c
+++ b/src/unix/async.c
@@ -168,18 +168,25 @@ static void uv__async_send(uv_loop_t* loop) {
}
#endif
- do
- r = write(fd, buf, len);
- while (r == -1 && errno == EINTR);
+ for (;;) {
+ do
+ r = write(fd, buf, len);
+ while (r == -1 && errno == EINTR);
- if (r == len)
- return;
+ if (r == len)
+ return;
+
+ if (r != -1)
+ abort();
- if (r == -1)
if (errno == EAGAIN || errno == EWOULDBLOCK)
return;
- abort();
+ if (errno != ENOMEM)
+ abort();
+
+ usleep(1);
+ }
}
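The same back-off idea can be tried outside libuv. The sketch below is a standalone variant under stated assumptions, not the patch itself: send_with_backoff is a hypothetical name, it returns an errno value instead of calling abort(), and it gives up after max_retries ENOMEM failures instead of retrying forever. The write_fn indirection and the flaky_write fake exist only so that the ENOMEM path can be exercised, since a real ENOMEM from write() is hard to provoke on demand.

```c
#define _DEFAULT_SOURCE 1  /* for usleep() on glibc in strict -std= modes */

#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <sys/types.h>
#include <unistd.h>

typedef ssize_t (*write_fn)(int fd, const void* buf, size_t len);

/* Writes one wakeup byte through do_write.
 * Returns 0 if the byte was written or a wakeup was already pending (EAGAIN),
 * otherwise the errno value of the write that ended the attempt. */
static int send_with_backoff(write_fn do_write, int fd, int max_retries) {
  int attempt;

  assert(do_write != NULL);

  for (attempt = 0; ; attempt++) {
    ssize_t r;

    do
      r = do_write(fd, "", 1);
    while (r == -1 && errno == EINTR);

    if (r == 1)
      return 0;

    if (r != -1)
      return EIO;  /* a one-byte write returned something other than 1 or -1 */

    if (errno == EAGAIN || errno == EWOULDBLOCK)
      return 0;

    if (errno != ENOMEM || attempt >= max_retries)
      return errno;

    usleep(1);     /* same minimal back-off as in the patch */
  }
}

/* Test double: fails with ENOMEM fake_failures_left times, then forwards
 * to the real write(). */
static int fake_failures_left = 0;

static ssize_t flaky_write(int fd, const void* buf, size_t len) {
  if (fake_failures_left > 0) {
    fake_failures_left--;
    errno = ENOMEM;
    return -1;
  }
  return write(fd, buf, len);
}
```

Passing write itself as do_write gives the production behavior; passing flaky_write with fake_failures_left set simulates a device that is briefly out of memory.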