hiredis icon indicating copy to clipboard operation
hiredis copied to clipboard

async.c:451: redisProcessCallbacks: Assertion `(c->flags & 0x20 || c->flags & 0x40)' failed.

Open Nightaway opened this issue 7 years ago • 8 comments

redis-adapter.h

#ifndef __DRAGON_REDIS_ADAPTER_H__
#define __DRAGON_REDIS_ADAPTER_H__

#include <stdlib.h>
#include <sys/types.h>
#include <sys/epoll.h>
#include <hiredis/hiredis.h>
#include <hiredis/async.h>

typedef struct redisEvents {
    redisAsyncContext *context;
    int epfd;
} redisEvents;

static void redisReadEvent(redisEvents *e) {
    redisAsyncHandleRead(e->context);
}

static void redisWriteEvent(redisEvents *e) {
    redisAsyncHandleWrite(e->context);
}

static void redisAddRead(void *privdata) {
	redisEvents *e = (redisEvents*)privdata;

	epoll_event event;
	event.data.fd = e->context->c.fd;
	event.events = EPOLLIN;
	int ret = epoll_ctl(e->epfd, EPOLL_CTL_ADD, e->context->c.fd, &event);
	if (ret == -1) {
		if (errno == EEXIST) {
			ret = epoll_ctl(e->epfd, EPOLL_CTL_MOD, e->context->c.fd, &event);
		}
		if (ret  == -1) {
			perror("redisAddRead epoll_ctl");
		}
		return ;
	}
}

static void redisDelRead(void *privdata) {
	redisEvents *e = (redisEvents*)privdata;

	int ret = epoll_ctl(e->epfd, EPOLL_CTL_DEL, e->context->c.fd, NULL);
	if (ret == -1) {
		perror("redisDelRead epoll_ctl");
		return ;
	}

}

static void redisAddWrite(void *privdata) {
	redisEvents *e = (redisEvents*)privdata;

	epoll_event event;
	event.data.fd = e->context->c.fd;
	event.events = EPOLLOUT;
	int ret = epoll_ctl(e->epfd, EPOLL_CTL_ADD, e->context->c.fd, &event);
	if (ret == -1) {
		if (errno == EEXIST) {
			ret = epoll_ctl(e->epfd, EPOLL_CTL_MOD, e->context->c.fd, &event);
		}
		if (ret  == -1) {
			perror("redisAddWrite epoll_ctl");
		}
		return ;
	}
}

static void redisDelWrite(void *privdata) {
	redisEvents *e = (redisEvents*)privdata;

	int ret = epoll_ctl(e->epfd, EPOLL_CTL_DEL, e->context->c.fd, NULL);
	if (ret == -1) {
		perror("redisDelWrite epoll_ctl");
		return ;
	}
}

static void redisCleanup(void *privdata) {
    redisEvents *e = (redisEvents*)privdata;
    redisDelRead(privdata);
    redisDelWrite(privdata);
    free(e);
}

static int redisAttach(redisAsyncContext *ac, int epfd) {
    redisContext *c = &(ac->c);
    redisEvents *e;

    /* Nothing should be attached when something is already attached */
    if (ac->ev.data != NULL)
        return REDIS_ERR;

    /* Create container for context and r/w events */
    e = (redisEvents*)malloc(sizeof(*e));
    e->context = ac;
    e->epfd = epfd;

    /* Register functions to start/stop listening for events */
    ac->ev.addRead = redisAddRead;
    ac->ev.delRead = redisDelRead;
    ac->ev.addWrite = redisAddWrite;
    ac->ev.delWrite = redisDelWrite;
    ac->ev.cleanup = redisCleanup;
    ac->ev.data = e;

    /* Initialize read/write events */
	epoll_event event;
	event.data.fd = c->fd;
	event.events = EPOLLIN|EPOLLOUT;
	int ret = epoll_ctl(epfd, EPOLL_CTL_ADD, c->fd, &event);
	if (ret == -1) {
		perror("redisAttach epoll_ctl");
		return REDIS_ERR;
	}
    return REDIS_OK;
}

#endif

server.cc

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <memory.h>
#include <errno.h>

#include "redis-adapter.h"

int sockfd;
int epfd;
epoll_event event;
redisAsyncContext *c_sub;

static void subCallback(redisAsyncContext *c, void *r, void *privdata) {
    redisReply *reply = (redisReply *)r;
    if (reply == NULL) return;
}

int CreateSocketAndBind(int port) {
	struct sockaddr_in servaddr;
	sockfd = socket(AF_INET, SOCK_STREAM, 0);
	if (sockfd == -1) {
		perror("socket:");
		return -1;
	}
	memset(&servaddr, 0, sizeof(servaddr));
	servaddr.sin_family = AF_INET;
	servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
	servaddr.sin_port = htons(port);
	int ret = bind(sockfd, (sockaddr *)&servaddr, sizeof(servaddr));
	if (ret == -1) {
		perror("bind:");
		return -1;
	}
	return 0;
}
int MakeSocketNonbloking(int fd) {
	int flags = fcntl(fd, F_GETFL, 0);
	if (flags == -1) {
		perror("fcntl");
		return -1;
	}
 	
 	flags|= O_NONBLOCK;
  	int s = fcntl(fd, F_SETFL, flags);
	if (s == -1) {
		perror("fcntl");
		return-1;
	}
	return 0;
}

int InitEpoll() {
	int ret = listen(sockfd, SOMAXCONN);
	if (ret != 0) {
		perror("listen:");
		exit(1);
	}
	ret = MakeSocketNonbloking(sockfd);
	if (ret != 0) {
		perror("MakeSocketNonbloking:");
		exit(1);
	}
	epfd = epoll_create(65535);
	if (epfd == -1) {
		perror("epoll_create:");
		exit(1);
	}

	event.data.fd = sockfd;
	event.events = EPOLLIN;
	ret = epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &event);
	if (ret == -1) {
		perror("epoll_ctl");
		exit(1);
	}

	c_sub = redisAsyncConnect("127.0.0.1", 6379);
	if (c_sub->err) {
	    printf("Error: %s\n", c_sub->errstr);
	    exit(1);
	}
	redisAttach(c_sub, epfd);
}

int Run() {
	epoll_event events[64];
	for (int z=0; z<10; ++z) {
		redisAsyncCommand(c_sub, subCallback, NULL, "SUBSCRIBE %d", z);
		redisAsyncCommand(c_sub, subCallback, NULL, "UNSUBSCRIBE %d", z);
	}
	while (1) {
		int count = epoll_wait(epfd, events, 64, 0);
		if (count == -1) {
			perror("epoll_wait:");
			exit(1);
		} 

		for (int i=0; i<count; i++) {
			 if (events[i].data.fd == c_sub->c.fd) {
				printf("Redis sub event %d\n", getpid());
				if (events[i].events & EPOLLIN) {
					redisAsyncHandleRead(c_sub);
				} else if (events[i].events & EPOLLOUT) {
					redisAsyncHandleWrite(c_sub);
				}
			}
		}
	}
	close(epfd);
}

int main() {
	CreateSocketAndBind(9090);
	InitEpoll();
	Run();
	return 0;
}

I am coding a IM Server using redis sub/pub pattern, server.cc crash on assert occasionally while client forking 100 client processes and use ctrl + c to close client processes. if u comment server.cc redisAsyncCommand(c_sub, subCallback, NULL, "SUBSCRIBE %s", name); and redisAsyncCommand(c_sub, subCallback, NULL, "UNSUBSCRIBE %s", c->id.c_str()); problem will go away.

Nightaway avatar Nov 26 '16 07:11 Nightaway

This is a huge example of code I absolutely don't know. Plus on first sight it's completely unclear what the client.cc is for, it doesn't touch Redis/Hiredis at all.

Please provide a reduced example triggering the error.

badboy avatar Nov 26 '16 11:11 badboy

code reduced. I use redis as a message broker.

Nightaway avatar Nov 28 '16 03:11 Nightaway

I find problem that is UNSUBSCRIBE command call. The problem gone away when comment redisAsyncCommand(c_sub, subCallback, NULL, "UNSUBSCRIBE %s", c->id.c_str());.

Nightaway avatar Nov 28 '16 07:11 Nightaway

There are still hundreds of lines of code. What exactly did you reduce?

badboy avatar Nov 28 '16 08:11 badboy

Reduced again

Nightaway avatar Nov 28 '16 08:11 Nightaway

I'm experiencing the same failed assertion. My code is nearly identical to the pub / sub sample in the wiki, with only an additional call to unsubscribe.

Once I call:

auto status = redisAsyncCommand(context_, nullptr, nullptr, "UNSUBSCRIBE %s", key.c_str());

After the onMessage function executes for the unsubscribe response, I get the failed assertion:

async.c:463: redisProcessCallbacks: Assertion (c->flags & REDIS_SUBSCRIBED || c->flags & REDIS_MONITORING)' failed.`

Is there anything I need to do before unsubscribing to prevent this?

kbirk avatar Oct 31 '18 23:10 kbirk

After some digging I found a potential cause of the issue. I found the issue occurs when my program does the following:

  • Subscribe to a channel
  • Poll libevent for messages, process the subscribe and message replies
  • Unsubscribe from the channel
  • Subscribe to a different channel
  • Poll libevent, assert fails after processing the unsubscribe reply

It seems the only way I can prevent the failed assertion is by not subscribing to a new channel until I've processed a unsubscribe reply from the previous.

Is this intended? I was under the impression that you could have multiple subscriptions on a single context.

kbirk avatar Nov 01 '18 22:11 kbirk

I also faced the same problem. After I have unsubscribed all the channel and then call redisAsyncHandleRead, hiredis assert failed at async.c: 478. I use hiredis 0.14.1 version. And I didn't use multithread.

ZezhongWang avatar Apr 27 '20 09:04 ZezhongWang

This issue seems to describe what is fixed on master by #1036.

bjosv avatar Sep 05 '22 14:09 bjosv

I think you're right. Going to close this but if the problem isn't actually solved feel free to reopen

michael-grunder avatar Sep 05 '22 22:09 michael-grunder