libhiredis with libev and custom event loop

Open gri6507 opened this issue 4 years ago • 3 comments

I posted this question on StackOverflow, but have not received an answer. Perhaps this is a better place to ask my question.

My application uses libhiredis with the libev backend. I need to send Redis async commands and process the resulting async callbacks. However, unlike the simple example from here, I cannot use the default event loop. The following code approximates that example with a custom event loop. When it is compiled so that the only libev watcher is the io watcher set up by redisLibevAttach(), the event loop thread terminates immediately. You can see this by running

g++ -g -std=c++11 -Wall -Wextra -Werror hiredis_ev.cpp -o hiredis_ev -lpthread -lhiredis -lev && gdb ./hiredis_ev

where GDB happily prints that a new thread is created and almost immediately terminates. This is further confirmed by running info thread in GDB which does not show my_ev_loop. However, if I change the code to add any other libev watcher, like a timer, then everything is good. You can see this by running

g++ -g -DTIMER -std=c++11 -Wall -Wextra -Werror hiredis_ev.cpp -o hiredis_ev -lpthread -lhiredis -lev && ./hiredis_ev

I should not need a dummy libev timer to keep the event loop running. What am I missing?

#include <chrono>
#include <cstdio>
#include <iostream>
#include <stdexcept>
#include <thread>
#include <pthread.h>
#include <hiredis/hiredis.h>
#include <hiredis/async.h>
#include <hiredis/adapters/libev.h>

static struct ev_loop *loop = nullptr;
static void redis_async_cb(redisAsyncContext *, void *, void *)
{
  std::cout << "Redis async callback" << std::endl;
  fflush(nullptr);
}

#ifdef TIMER
  static ev_timer timer_w;
  static void ev_timer_cb(EV_P_ ev_timer *, int)
  {
    std::cout << "EV timer callback" << std::endl;
    fflush(nullptr);
  }
#endif

int main()
{
  loop = ev_loop_new(EVFLAG_AUTO);

#ifdef TIMER
  ev_timer_init(&timer_w, ev_timer_cb, 0, 0.1);
  ev_timer_start(loop, &timer_w);
#endif

  redisAsyncContext* async_context = redisAsyncConnect("localhost", 6379);
  if (nullptr == async_context)
  {
    throw std::runtime_error("No redis async context");
  }
  
  redisLibevAttach(loop, async_context);
  std::thread ev_thread(ev_run, loop, 0); 
  pthread_setname_np(ev_thread.native_handle(), "my_ev_loop");
  ev_thread.detach();

  // Give the event loop time to start
  while (!ev_iteration(loop))
  {
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
  }

  // Send a SUBSCRIBE message which should generate an async callback
  if (REDIS_OK != redisAsyncCommand(async_context, redis_async_cb, nullptr, "SUBSCRIBE foo"))
  {
    throw std::runtime_error("Could not issue redis async command");
  }
  std::cout << "Waiting for async callback" << std::endl;
  fflush(nullptr);

  // Wait forever (use CTRL-C to terminate)
  while (true)
  {
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
  }

  return 0;
}

gri6507, May 10 '20 20:05

Move your SUBSCRIBE call before you spin off the thread and start the loop and it works

#include <chrono>
#include <cstdio>
#include <iostream>
#include <stdexcept>
#include <thread>
#include <pthread.h>
#include <hiredis/hiredis.h>
#include <hiredis/async.h>
#include <hiredis/adapters/libev.h>

static struct ev_loop *loop = nullptr;
static void redis_async_cb(redisAsyncContext *, void *, void *)
{
    std::cout << "Redis async callback" << std::endl;
    fflush(nullptr);
}

#ifdef TIMER
static ev_timer timer_w;
static void ev_timer_cb(EV_P_ ev_timer *, int) {
    std::cout << "EV timer callback" << std::endl;
    fflush(nullptr);
}
#endif

int main() {
    loop = ev_loop_new(EVFLAG_AUTO);

#ifdef TIMER
    ev_timer_init(&timer_w, ev_timer_cb, 0, 0.1);
    ev_timer_start(loop, &timer_w);
#endif

    redisAsyncContext* async_context = redisAsyncConnect("localhost", 6379);
    if (nullptr == async_context) {
        throw std::runtime_error("No redis async context");
    }

    redisLibevAttach(loop, async_context);
    
    // Send a SUBSCRIBE message which should generate an async callback
    if (REDIS_OK != redisAsyncCommand(async_context, redis_async_cb, nullptr, "SUBSCRIBE foo")) {
        throw std::runtime_error("Could not issue redis async command");
    }

    std::thread ev_thread(ev_run, loop, 0);
    pthread_setname_np(ev_thread.native_handle(), "my_ev_loop");
    ev_thread.detach();

    // Give the event loop time to start
    while (!ev_iteration(loop)) {
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }

    std::cout << "Waiting for async callback" << std::endl;
    fflush(nullptr);

    // Wait forever (use CTRL-C to terminate)
    while (true) {
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }

    return 0;
}

michael-grunder, May 10 '20 22:05

> Move your SUBSCRIBE call before you spin off the thread and start the loop and it works

@michael-grunder - Thank you! That does indeed work. But for posterity, can you please explain why it does? I was under the impression that simply starting an ev watcher, which is what I assumed happens when redisLibevAttach() is called, is sufficient. I think what this implies is that an io watcher must also have something to process before the loop starts.

gri6507, May 11 '20 12:05

It's a good question actually.

From a bit of digging, it looks like we're not starting the watchers in redisLibevAttach, just initializing them with ev_io_init.

Once a command is sent, we execute logic like:

static void redisLibevAddWrite(void *privdata) {
    redisLibevEvents *e = (redisLibevEvents*)privdata;
    struct ev_loop *loop = e->loop;
    ((void)loop);
    if (!e->writing) {
        e->writing = 1;
        ev_io_start(EV_A_ &e->wev);
    }
}

This starts the watcher. If I add a call to ev_io_start(EV_A_ &e->wev) in redisLibevAttach, I can put the SUBSCRIBE before or after starting the loop (or both) and it works.

I didn't write most of the async functionality so I'll need to spend a bit more time with it to see whether tweaking the logic is appropriate.
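To make the reasoning above concrete, here is a toy model of the behavior, not the real thing. libev documents that ev_run with flags 0 returns once no active watchers remain, so a loop whose only watcher was initialized but never started has nothing to wait for. ToyLoop, ToyWatcher, and the helper functions below are hypothetical names invented for this sketch; they only imitate the "active watcher count" idea and use no libev or hiredis calls.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Toy stand-ins for illustration only; these are NOT libev or hiredis types.
struct ToyWatcher {
    bool active = false;  // true only after ToyLoop::start(), like ev_io_start()
    int pending = 0;      // events this watcher still has to deliver
};

class ToyLoop {
public:
    // Register a watcher with the loop and mark it active.
    void start(ToyWatcher &w) {
        if (!w.active) {
            w.active = true;
            watchers_.push_back(&w);
        }
    }

    // Count the watchers that are currently active.
    std::size_t active_count() const {
        std::size_t n = 0;
        for (const ToyWatcher *w : watchers_) {
            if (w->active) ++n;
        }
        return n;
    }

    // Iterate while at least one watcher is active; return the iteration count.
    int run() {
        int iterations = 0;
        while (active_count() > 0) {
            ++iterations;
            for (ToyWatcher *w : watchers_) {
                if (!w->active) continue;
                if (w->pending > 0) --w->pending;        // "deliver" one event
                if (w->pending == 0) w->active = false;  // toy rule: stop when drained
            }
        }
        return iterations;
    }

private:
    std::vector<ToyWatcher *> watchers_;
};

// Attach only: the watcher exists but was never started, so run() exits at once.
int run_attach_only() {
    ToyLoop loop;
    ToyWatcher write_watcher;  // initialized, never started
    (void)write_watcher;
    return loop.run();
}

// Command queued first: the watcher is started before run(), so the loop iterates.
int run_command_first() {
    ToyLoop loop;
    ToyWatcher write_watcher;
    write_watcher.pending = 3;  // assumed: three events to deliver
    loop.start(write_watcher);
    return loop.run();
}

// Sanity check for the two cases above.
void check_model() {
    assert(run_attach_only() == 0);
    assert(run_command_first() == 3);
}
```

In this model the attach-only case performs zero iterations, which mirrors the thread exiting immediately, while starting a watcher first (as issuing the SUBSCRIBE or starting the dummy timer does) keeps the loop running. Unlike the toy, a real libev watcher stays active until it is explicitly stopped; the drain rule here exists only so the example terminates.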

michael-grunder, May 11 '20 17:05