hiredis
hiredis copied to clipboard
async.c:451: redisProcessCallbacks: Assertion `(c->flags & 0x20 || c->flags & 0x40)' failed.
redis-adapter.h
#ifndef __DRAGON_REDIS_ADAPTER_H__
#define __DRAGON_REDIS_ADAPTER_H__
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/epoll.h>
#include <hiredis/hiredis.h>
#include <hiredis/async.h>
/* State handed to the hiredis event hooks as their privdata: ties one async
 * context to the epoll instance its socket is registered with. */
typedef struct redisEvents {
redisAsyncContext *context; /* async context whose socket (context->c.fd) is watched */
int epfd; /* epoll descriptor passed to every epoll_ctl() call */
} redisEvents;
/* Forwards a read-readiness notification to hiredis for the wrapped context. */
static void redisReadEvent(redisEvents *e) {
    redisAsyncContext *ac = e->context;

    redisAsyncHandleRead(ac);
}
/* Forwards a write-readiness notification to hiredis for the wrapped context. */
static void redisWriteEvent(redisEvents *e) {
    redisAsyncContext *ac = e->context;

    redisAsyncHandleWrite(ac);
}
static void redisAddRead(void *privdata) {
redisEvents *e = (redisEvents*)privdata;
epoll_event event;
event.data.fd = e->context->c.fd;
event.events = EPOLLIN;
int ret = epoll_ctl(e->epfd, EPOLL_CTL_ADD, e->context->c.fd, &event);
if (ret == -1) {
if (errno == EEXIST) {
ret = epoll_ctl(e->epfd, EPOLL_CTL_MOD, e->context->c.fd, &event);
}
if (ret == -1) {
perror("redisAddRead epoll_ctl");
}
return ;
}
}
static void redisDelRead(void *privdata) {
redisEvents *e = (redisEvents*)privdata;
int ret = epoll_ctl(e->epfd, EPOLL_CTL_DEL, e->context->c.fd, NULL);
if (ret == -1) {
perror("redisDelRead epoll_ctl");
return ;
}
}
static void redisAddWrite(void *privdata) {
redisEvents *e = (redisEvents*)privdata;
epoll_event event;
event.data.fd = e->context->c.fd;
event.events = EPOLLOUT;
int ret = epoll_ctl(e->epfd, EPOLL_CTL_ADD, e->context->c.fd, &event);
if (ret == -1) {
if (errno == EEXIST) {
ret = epoll_ctl(e->epfd, EPOLL_CTL_MOD, e->context->c.fd, &event);
}
if (ret == -1) {
perror("redisAddWrite epoll_ctl");
}
return ;
}
}
static void redisDelWrite(void *privdata) {
redisEvents *e = (redisEvents*)privdata;
int ret = epoll_ctl(e->epfd, EPOLL_CTL_DEL, e->context->c.fd, NULL);
if (ret == -1) {
perror("redisDelWrite epoll_ctl");
return ;
}
}
/*
 * hiredis cleanup hook: stops watching the connection's socket and releases
 * the redisEvents container.
 *
 * The socket is removed from the epoll set with a single EPOLL_CTL_DEL.
 * Calling redisDelRead() followed by redisDelWrite() would delete the same
 * descriptor twice, so the second call would always fail with ENOENT and
 * print a misleading error.
 *
 * privdata: the redisEvents installed as ac->ev.data; it is freed here and
 *           must not be used afterwards.
 */
static void redisCleanup(void *privdata) {
    redisEvents *e = (redisEvents *)privdata;

    if (epoll_ctl(e->epfd, EPOLL_CTL_DEL, e->context->c.fd, NULL) == -1) {
        /* ENOENT/EBADF mean there is nothing left to remove (the descriptor
         * is not registered or is no longer valid); only report real errors. */
        if (errno != ENOENT && errno != EBADF) {
            perror("redisCleanup epoll_ctl");
        }
    }
    free(e);
}
/*
 * Attaches an async hiredis context to an epoll instance.
 *
 * Allocates the redisEvents container, registers the context's socket with
 * epfd for both EPOLLIN and EPOLLOUT, and installs the add/del/cleanup hooks
 * on ac->ev.
 *
 * ac:   async context to attach; must not already have an adapter attached.
 * epfd: epoll descriptor the socket is registered with.
 *
 * Returns REDIS_OK on success. Returns REDIS_ERR if something is already
 * attached, if the container cannot be allocated, or if epoll_ctl() fails.
 * On failure ac->ev is left untouched, so the caller may retry.
 */
static int redisAttach(redisAsyncContext *ac, int epfd) {
    redisContext *c = &(ac->c);
    redisEvents *e;
    struct epoll_event event;

    /* Nothing should be attached when something is already attached */
    if (ac->ev.data != NULL)
        return REDIS_ERR;

    /* Create container for context and r/w events */
    e = (redisEvents *)malloc(sizeof(*e));
    if (e == NULL)
        return REDIS_ERR;
    e->context = ac;
    e->epfd = epfd;

    /* Register with epoll before touching ac->ev, so that a failure leaves
     * the context exactly as it was. */
    event.data.fd = c->fd;
    event.events = EPOLLIN | EPOLLOUT;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, c->fd, &event) == -1) {
        perror("redisAttach epoll_ctl");
        free(e);
        return REDIS_ERR;
    }

    /* Register functions to start/stop listening for events */
    ac->ev.addRead = redisAddRead;
    ac->ev.delRead = redisDelRead;
    ac->ev.addWrite = redisAddWrite;
    ac->ev.delWrite = redisDelWrite;
    ac->ev.cleanup = redisCleanup;
    ac->ev.data = e;

    return REDIS_OK;
}
#endif
server.cc
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <memory.h>
#include <errno.h>
#include "redis-adapter.h"
/* Listening TCP socket; assigned by CreateSocketAndBind(). */
int sockfd;
/* epoll instance created in InitEpoll(); watches both sockfd and the Redis socket. */
int epfd;
/* Scratch event used by InitEpoll() to register the listening socket. */
epoll_event event;
/* Async hiredis connection on which the SUBSCRIBE/UNSUBSCRIBE commands are issued. */
redisAsyncContext *c_sub;
/*
 * Reply callback passed to redisAsyncCommand() for the SUBSCRIBE/UNSUBSCRIBE
 * commands. It is a placeholder: the reply is not inspected beyond a NULL
 * check and no action is taken.
 */
static void subCallback(redisAsyncContext *c, void *r, void *privdata) {
    (void)c;
    (void)privdata;

    redisReply *reply = (redisReply *)r;
    if (!reply) {
        return;
    }
}
/*
 * Creates the listening TCP socket and binds it to INADDR_ANY:port.
 *
 * The descriptor is stored in the global sockfd. If bind() fails the socket
 * is closed and sockfd is reset to -1, so the descriptor does not leak and
 * later code cannot mistake it for a usable listener.
 *
 * port: TCP port in host byte order, 0..65535.
 *
 * Returns 0 on success, -1 on failure (an error is printed to stderr).
 */
int CreateSocketAndBind(int port) {
    struct sockaddr_in servaddr;

    /* htons() would silently truncate anything outside 16 bits. */
    if (port < 0 || port > 65535) {
        fprintf(stderr, "CreateSocketAndBind: invalid port %d\n", port);
        return -1;
    }

    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd == -1) {
        perror("socket:");
        return -1;
    }

    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons((unsigned short)port);

    if (bind(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) == -1) {
        /* Report first: close() may overwrite errno. */
        perror("bind:");
        close(sockfd);
        sockfd = -1;
        return -1;
    }
    return 0;
}
/*
 * Switches a descriptor to non-blocking mode by adding O_NONBLOCK to its
 * file status flags.
 *
 * fd: descriptor to modify.
 *
 * Returns 0 on success, or -1 if either fcntl() call fails (the failure is
 * reported with perror()).
 */
int MakeSocketNonbloking(int fd) {
    const int current = fcntl(fd, F_GETFL, 0);
    if (current == -1) {
        perror("fcntl");
        return -1;
    }

    if (fcntl(fd, F_SETFL, current | O_NONBLOCK) == -1) {
        perror("fcntl");
        return -1;
    }

    return 0;
}
int InitEpoll() {
int ret = listen(sockfd, SOMAXCONN);
if (ret != 0) {
perror("listen:");
exit(1);
}
ret = MakeSocketNonbloking(sockfd);
if (ret != 0) {
perror("MakeSocketNonbloking:");
exit(1);
}
epfd = epoll_create(65535);
if (epfd == -1) {
perror("epoll_create:");
exit(1);
}
event.data.fd = sockfd;
event.events = EPOLLIN;
ret = epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &event);
if (ret == -1) {
perror("epoll_ctl");
exit(1);
}
c_sub = redisAsyncConnect("127.0.0.1", 6379);
if (c_sub->err) {
printf("Error: %s\n", c_sub->errstr);
exit(1);
}
redisAttach(c_sub, epfd);
}
int Run() {
epoll_event events[64];
for (int z=0; z<10; ++z) {
redisAsyncCommand(c_sub, subCallback, NULL, "SUBSCRIBE %d", z);
redisAsyncCommand(c_sub, subCallback, NULL, "UNSUBSCRIBE %d", z);
}
while (1) {
int count = epoll_wait(epfd, events, 64, 0);
if (count == -1) {
perror("epoll_wait:");
exit(1);
}
for (int i=0; i<count; i++) {
if (events[i].data.fd == c_sub->c.fd) {
printf("Redis sub event %d\n", getpid());
if (events[i].events & EPOLLIN) {
redisAsyncHandleRead(c_sub);
} else if (events[i].events & EPOLLOUT) {
redisAsyncHandleWrite(c_sub);
}
}
}
}
close(epfd);
}
/*
 * Entry point: binds the listener to port 9090, initialises epoll and the
 * Redis connection, then runs the event loop.
 *
 * Returns 1 if the listening socket cannot be created or bound. Ignoring
 * that failure would let InitEpoll() carry on with a socket that is not
 * bound to port 9090.
 */
int main() {
    if (CreateSocketAndBind(9090) != 0) {
        return 1;
    }
    InitEpoll();
    Run();
    return 0;
}
I am writing an IM server using the Redis pub/sub pattern. server.cc occasionally crashes on this assert when a client program forks 100 client processes and I press Ctrl+C to close them. If you comment out redisAsyncCommand(c_sub, subCallback, NULL, "SUBSCRIBE %s", name); and redisAsyncCommand(c_sub, subCallback, NULL, "UNSUBSCRIBE %s", c->id.c_str()); in server.cc, the problem goes away.
This is a huge example of code I absolutely don't know. Plus on first sight it's completely unclear what the client.cc
is for, it doesn't touch Redis/Hiredis at all.
Please provide a reduced example triggering the error.
Code reduced. I use Redis as a message broker.
I found that the problem is the UNSUBSCRIBE command call. The problem goes away when I comment out redisAsyncCommand(c_sub, subCallback, NULL, "UNSUBSCRIBE %s", c->id.c_str());.
There are still hundreds of lines of code. What exactly did you reduce?
Reduced again
I'm experiencing the same failed assertion. My code is nearly identical to the pub / sub sample in the wiki, with only an additional call to unsubscribe.
Once I call:
auto status = redisAsyncCommand(context_, nullptr, nullptr, "UNSUBSCRIBE %s", key.c_str());
After the `onMessage` function executes for the unsubscribe response, I get the failed assertion:
async.c:463: redisProcessCallbacks: Assertion `(c->flags & REDIS_SUBSCRIBED || c->flags & REDIS_MONITORING)' failed.
Is there anything I need to do before unsubscribing to prevent this?
After some digging I found a potential cause of the issue. The issue occurs when my program does the following:
- Subscribe to a channel
- Poll `libevent` for messages, process the `subscribe` and `message` replies
- Unsubscribe from the channel
- Subscribe to a different channel
- Poll `libevent`; the assert fails after processing the `unsubscribe` reply
It seems the only way I can prevent the failed assertion is by not subscribing to a new channel until I've processed the `unsubscribe` reply from the previous one.
Is this intended? I was under the impression that you could have multiple subscriptions on a single context.
I also faced the same problem. After I have unsubscribed from all channels and then call redisAsyncHandleRead, a hiredis assertion fails at async.c:478. I use hiredis version 0.14.1, and I am not using multiple threads.
This issue seems to describe what is fixed on master by #1036.
I think you're right. Going to close this but if the problem isn't actually solved feel free to reopen