
Redis's Event-Driven Model

junnplus opened this issue 5 years ago · 4 comments

The Redis server is an event-driven program built on the reactor pattern.

Put simply, "event-driven" means that an event loop listens for events and triggers the corresponding handling whenever one occurs. Other common programming paradigms are (single-threaded) synchronous programming and multi-threaded or multi-process programming.

So what is the reactor pattern?

The Reactor design pattern handles service requests that are delivered concurrently to an application by one or more clients. Each service in an application may consist of several methods and is represented by a separate event handler that is responsible for dispatching service-specific requests. Dispatching of event handlers is performed by an initiation dispatcher, which manages the registered event handlers. Demultiplexing of service requests is performed by a synchronous event demultiplexer. (Source: http://www.dre.vanderbilt.edu/~schmidt/PDF/reactor-siemens.pdf)

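An event-driven program blocks while listening on several event sources at once, waiting for events to occur. Events may arrive from several sources at the same time, and a single source may have several events pending at once. The reactor pattern can handle all of these arriving events, but it demultiplexes only one event at a time and dispatches it to the corresponding event handler.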
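To make the roles concrete, here is a minimal reactor sketch, assuming a POSIX system with poll(). It is not Redis code: reactor, handler_entry, reactor_register, reactor_run_once and drain_and_count are hypothetical names. poll() plays the synchronous event demultiplexer, the entries table plays the initiation dispatcher, and each registered function is a concrete event handler; ready events are dispatched one at a time.

```c
#include <poll.h>
#include <unistd.h>

#define MAX_HANDLES 16

/* Hypothetical handler signature for this sketch (not Redis's API). */
typedef void handler_fn(int fd, void *ctx);

typedef struct {
    int fd;                  /* the handle being watched */
    handler_fn *on_readable; /* concrete event handler */
    void *ctx;
} handler_entry;

typedef struct {
    handler_entry entries[MAX_HANDLES];
    int count;
} reactor;

/* Register a handler for "fd is readable"; returns 0, or -1 if the table is full. */
int reactor_register(reactor *r, int fd, handler_fn *fn, void *ctx) {
    if (r->count >= MAX_HANDLES) return -1;
    r->entries[r->count].fd = fd;
    r->entries[r->count].on_readable = fn;
    r->entries[r->count].ctx = ctx;
    r->count++;
    return 0;
}

/* One loop iteration: block (up to timeout_ms) on all handles at once, then
 * dispatch each ready event to its handler, one at a time.
 * Returns the number of handlers called, 0 on timeout, or -1 on error. */
int reactor_run_once(reactor *r, int timeout_ms) {
    struct pollfd pfds[MAX_HANDLES];
    for (int i = 0; i < r->count; i++) {
        pfds[i].fd = r->entries[i].fd;
        pfds[i].events = POLLIN;
        pfds[i].revents = 0;
    }
    int n = poll(pfds, (nfds_t)r->count, timeout_ms); /* synchronous demultiplexer */
    if (n <= 0) return n;
    int dispatched = 0;
    for (int i = 0; i < r->count; i++) {
        if (pfds[i].revents & POLLIN) {
            r->entries[i].on_readable(r->entries[i].fd, r->entries[i].ctx);
            dispatched++;
        }
    }
    return dispatched;
}

/* Example concrete handler: consume one byte and bump the int counter in ctx. */
void drain_and_count(int fd, void *ctx) {
    char byte;
    if (read(fd, &byte, 1) == 1) (*(int *)ctx)++;
}
```

For example, after registering the read ends of two pipes and writing one byte into each, a single reactor_run_once call dispatches both handlers in turn; a second call with a zero timeout finds nothing ready.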

Redis manages its event loop through aeEventLoop.

// 5.0 src/ae.h

/* State of an event based program */
typedef struct aeEventLoop {
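    // largest file descriptor currently registered
    int maxfd;   /* highest file descriptor currently registered */
    // maximum number of file descriptors that can be watched
    int setsize; /* max number of file descriptors tracked */
    // ID to assign to the next time event
    long long timeEventNextId;
    // used to correct for changes to the system clock
    time_t lastTime;     /* Used to detect system clock skew */
    // array of registered file events (indexed by fd)
    aeFileEvent *events; /* Registered events */
    // array of file events that are ready (fired)
    aeFiredEvent *fired; /* Fired events */
    // head of the time event linked list
    aeTimeEvent *timeEventHead;
    // flag that stops the event loop
    int stop;
    // state for the platform-specific I/O multiplexing API
    void *apidata; /* This is used for polling API specific data */
    // hook called before the loop blocks waiting for events
    aeBeforeSleepProc *beforesleep;
    // hook called after the multiplexing call returns
    aeBeforeSleepProc *aftersleep;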
} aeEventLoop;

[Figure: structure of the reactor pattern]

Below, Redis's event-driven design is walked through using the four parts of the reactor pattern:

  • Handles
  • Synchronous Event Demultiplexer
  • Initiation Dispatcher
  • Event Handler (Concrete Event Handler)

junnplus commented on Apr 25 '20 09:04

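Handles

Identify resources that are managed by an OS.

A handle can be an open file, a connection (socket), a timer, and so on. Handles are what the Synchronous Event Demultiplexer waits on in order to detect the events that occur on them.

Redis supports two kinds of handles, file descriptors and timers, corresponding to file events and time events.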

// 5.0 src/ae.h

/* File event structure */
typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
    aeFileProc *rfileProc;
    aeFileProc *wfileProc;
    void *clientData;
} aeFileEvent;

/* Time event structure */
typedef struct aeTimeEvent {
    long long id; /* time event identifier. */
    long when_sec; /* seconds */
    long when_ms; /* milliseconds */
    aeTimeProc *timeProc;
    aeEventFinalizerProc *finalizerProc;
    void *clientData;
    struct aeTimeEvent *prev;
    struct aeTimeEvent *next;
} aeTimeEvent;

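File events are stored in aeEventLoop.events. In an aeFileEvent, mask records which event types are being watched, and rfileProc and wfileProc are the handlers for AE_READABLE and AE_WRITABLE respectively. The file descriptor fd associated with a file event is its index into aeEventLoop.events.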
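A trimmed-down sketch of this layout, in standard C. The mini_* names and MINI_* values are hypothetical stand-ins for aeFileEvent and the AE_* masks. The point is that the fd itself is the array index, so finding the event for a ready fd is O(1), and registering a second event type on the same fd just ORs it into mask. Like aeCreateFileEvent, it rejects fds beyond the table size (Redis also sets errno to ERANGE there; the sketch just returns -1).

```c
#include <stddef.h>

/* Hypothetical, trimmed-down copies of aeFileEvent / AE_* for illustration. */
#define MINI_NONE     0
#define MINI_READABLE 1
#define MINI_WRITABLE 2
#define MINI_SETSIZE  64   /* plays the role of aeEventLoop.setsize */

typedef void mini_file_proc(int fd, void *client_data, int mask);

typedef struct {
    int mask;               /* events currently watched on this fd */
    mini_file_proc *rproc;  /* called when the fd is readable */
    mini_file_proc *wproc;  /* called when the fd is writable */
    void *client_data;
} mini_file_event;

typedef struct {
    mini_file_event events[MINI_SETSIZE]; /* slot i belongs to fd i */
    int maxfd;                            /* highest registered fd, -1 if none */
} mini_loop;

void mini_init(mini_loop *l) {
    for (int i = 0; i < MINI_SETSIZE; i++) {
        l->events[i].mask = MINI_NONE;
        l->events[i].rproc = NULL;
        l->events[i].wproc = NULL;
        l->events[i].client_data = NULL;
    }
    l->maxfd = -1;
}

/* Register proc for the events in mask on fd; the fd is used directly
 * as the array index, so no search is needed. */
int mini_register(mini_loop *l, int fd, int mask, mini_file_proc *proc, void *data) {
    if (fd < 0 || fd >= MINI_SETSIZE) return -1;
    mini_file_event *fe = &l->events[fd];
    fe->mask |= mask;
    if (mask & MINI_READABLE) fe->rproc = proc;
    if (mask & MINI_WRITABLE) fe->wproc = proc;
    fe->client_data = data;
    if (fd > l->maxfd) l->maxfd = fd;
    return 0;
}

/* Look up the readable handler for fd, or NULL if none is registered. */
mini_file_proc *mini_read_handler(const mini_loop *l, int fd) {
    if (fd < 0 || fd >= MINI_SETSIZE) return NULL;
    const mini_file_event *fe = &l->events[fd];
    return (fe->mask & MINI_READABLE) ? fe->rproc : NULL;
}

/* Example handler for the usage below; it does nothing. */
void mini_noop(int fd, void *client_data, int mask) {
    (void)fd;
    (void)client_data;
    (void)mask;
}
```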

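Time events are kept in a linked list headed by aeEventLoop.timeEventHead. Each node is an aeTimeEvent, which mainly holds the event's ID (monotonically increasing), the time at which it becomes due, its handler and finalizer, and client data.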
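A minimal sketch of the time event list, assuming malloc() and a simplified node (the tev types and functions are hypothetical and carry no handler fields). New nodes go on the head, as aeCreateTimeEvent does; because the list is not sorted by deadline, finding the nearest timer takes a linear scan, which is what aeSearchNearestTimer does in 5.0.

```c
#include <stdlib.h>

/* Hypothetical, simplified aeTimeEvent: id, deadline, and list links only. */
typedef struct tev {
    long long id;
    long when_sec, when_ms;
    struct tev *prev, *next;
} tev;

typedef struct {
    tev *head;
    long long next_id;
} tev_list;

/* Push a new event on the head of the list; returns its id, or -1 on OOM. */
long long tev_add(tev_list *l, long when_sec, long when_ms) {
    tev *t = malloc(sizeof(*t));
    if (t == NULL) return -1;
    t->id = l->next_id++;
    t->when_sec = when_sec;
    t->when_ms = when_ms;
    t->prev = NULL;
    t->next = l->head;
    if (t->next) t->next->prev = t;
    l->head = t;
    return t->id;
}

/* The list is unsorted, so the nearest deadline needs an O(N) scan. */
tev *tev_nearest(tev_list *l) {
    tev *best = NULL;
    for (tev *t = l->head; t; t = t->next) {
        if (!best || t->when_sec < best->when_sec ||
            (t->when_sec == best->when_sec && t->when_ms < best->when_ms))
            best = t;
    }
    return best;
}

void tev_free_all(tev_list *l) {
    tev *t = l->head;
    while (t) {
        tev *next = t->next;
        free(t);
        t = next;
    }
    l->head = NULL;
}
```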


Synchronous Event Demultiplexer

Blocks awaiting events to occur on a set of Handles.

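Blocking while waiting for events is usually implemented with I/O multiplexing, which lets a program check the I/O readiness of many file descriptors at the same time.

Each platform has its own I/O multiplexing facility:

  • ae_epoll.c: Linux
  • ae_kqueue.c: BSD platforms (including macOS)
  • ae_evport.c: Solaris
  • ae_select.c: other Unix platforms (select-based fallback)

Redis implements the same API on top of each of these I/O multiplexing libraries: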

static int aeApiCreate(aeEventLoop *eventLoop);

static int aeApiResize(aeEventLoop *eventLoop, int setsize);

static void aeApiFree(aeEventLoop *eventLoop);

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask);

static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask);

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp);
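To illustrate what one backend behind this API has to do, here is a select()-based sketch loosely modeled on ae_select.c. The sk_* names, SK_* constants and sk_loop layout are hypothetical; Redis splits the same state between aeEventLoop and a backend-private aeApiState. The key detail is that select() overwrites its fd sets, so each poll works on copies of the registered sets and then converts the result into (fd, mask) pairs like aeFiredEvent.

```c
#include <string.h>
#include <sys/select.h>
#include <sys/time.h>
#include <unistd.h> /* pipe() in the usage example */

#define SK_READABLE 1
#define SK_WRITABLE 2
#define SK_SETSIZE  64

typedef struct { int fd; int mask; } sk_fired;   /* like aeFiredEvent */

typedef struct {
    fd_set rfds, wfds;          /* registered interest */
    int masks[SK_SETSIZE];      /* per-fd mask, indexed by fd */
    int maxfd;
    sk_fired fired[SK_SETSIZE]; /* filled in by sk_poll */
} sk_loop;

void sk_create(sk_loop *l) {
    FD_ZERO(&l->rfds);
    FD_ZERO(&l->wfds);
    memset(l->masks, 0, sizeof(l->masks));
    l->maxfd = -1;
}

int sk_add_event(sk_loop *l, int fd, int mask) {
    if (fd < 0 || fd >= SK_SETSIZE || fd >= FD_SETSIZE) return -1;
    if (mask & SK_READABLE) FD_SET(fd, &l->rfds);
    if (mask & SK_WRITABLE) FD_SET(fd, &l->wfds);
    l->masks[fd] |= mask;
    if (fd > l->maxfd) l->maxfd = fd;
    return 0;
}

void sk_del_event(sk_loop *l, int fd, int mask) {
    if (fd < 0 || fd >= SK_SETSIZE) return;
    if (mask & SK_READABLE) FD_CLR(fd, &l->rfds);
    if (mask & SK_WRITABLE) FD_CLR(fd, &l->wfds);
    l->masks[fd] &= ~mask;   /* maxfd left as-is; scanning extra fds is harmless */
}

/* Block for at most *tvp (NULL = forever), then record ready fds in l->fired.
 * Returns the number of fired entries (0 on timeout or error). */
int sk_poll(sk_loop *l, struct timeval *tvp) {
    fd_set r, w;
    /* select() overwrites its sets, so pass copies of the registered ones. */
    memcpy(&r, &l->rfds, sizeof(fd_set));
    memcpy(&w, &l->wfds, sizeof(fd_set));
    int numevents = 0;
    if (select(l->maxfd + 1, &r, &w, NULL, tvp) <= 0) return 0;
    for (int fd = 0; fd <= l->maxfd; fd++) {
        int mask = 0;
        if ((l->masks[fd] & SK_READABLE) && FD_ISSET(fd, &r)) mask |= SK_READABLE;
        if ((l->masks[fd] & SK_WRITABLE) && FD_ISSET(fd, &w)) mask |= SK_WRITABLE;
        if (mask) {
            l->fired[numevents].fd = fd;
            l->fired[numevents].mask = mask;
            numevents++;
        }
    }
    return numevents;
}
```

With an empty pipe registered (read end for reading, write end for writing), a zero-timeout sk_poll reports only the write end; after one byte is written, both ends are reported.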

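Redis combines #include with preprocessor conditionals (#ifdef), so at compile time it picks the highest-performing I/O multiplexing library available on the system as the underlying implementation of its multiplexing layer: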

// 5.0 src/ae.c

/* Include the best multiplexing layer supported by this system.
 * The following should be ordered by performances, descending. */
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif
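The same compile-time selection can be reproduced in a few lines. In this sketch the HAVE_* macros are derived from common platform macros as a simplified assumption (Redis's own config.h uses more detailed tests, and HAVE_EVPORT is never set here); the #if chain keeps ae.c's order of preference. MULTIPLEX_BACKEND and multiplex_backend() are hypothetical names.

```c
/* Simplified stand-in for the feature tests in Redis's config.h (assumption). */
#if defined(__linux__)
#define HAVE_EPOLL 1
#endif
#if defined(__APPLE__) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__)
#define HAVE_KQUEUE 1
#endif

/* Same precedence as ae.c: evport, then epoll, then kqueue, then select. */
#if defined(HAVE_EVPORT)
#define MULTIPLEX_BACKEND "evport"
#elif defined(HAVE_EPOLL)
#define MULTIPLEX_BACKEND "epoll"
#elif defined(HAVE_KQUEUE)
#define MULTIPLEX_BACKEND "kqueue"
#else
#define MULTIPLEX_BACKEND "select"
#endif

/* Report which backend this translation unit was compiled against. */
const char *multiplex_backend(void) {
    return MULTIPLEX_BACKEND;
}
```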

epoll implementation

writing...


Initiation Dispatcher

Defines an interface for registering, removing, and dispatching Event Handlers.

aeCreateEventLoop creates and initializes an event loop instance.

// 5.0 src/ae.c

aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *eventLoop;
    int i;

    if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
    if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
    eventLoop->setsize = setsize;
    eventLoop->lastTime = time(NULL);
    eventLoop->timeEventHead = NULL;
    eventLoop->timeEventNextId = 0;
    eventLoop->stop = 0;
    eventLoop->maxfd = -1;
    eventLoop->beforesleep = NULL;
    eventLoop->aftersleep = NULL;
    if (aeApiCreate(eventLoop) == -1) goto err;
    /* Events with mask == AE_NONE are not set. So let's initialize the
     * vector with it. */
    for (i = 0; i < setsize; i++)
        eventLoop->events[i].mask = AE_NONE;
    return eventLoop;

err:
    if (eventLoop) {
        zfree(eventLoop->events);
        zfree(eventLoop->fired);
        zfree(eventLoop);
    }
    return NULL;
}

File events

aeCreateFileEvent registers a file event listener.

// 5.0 src/ae.c

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    aeFileEvent *fe = &eventLoop->events[fd];

    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}

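It adds the given event(s) on file descriptor fd to the set watched by the I/O multiplexing layer and associates them with the given event handler.

aeDeleteFileEvent removes a file event listener.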

// 5.0 src/ae.c

void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
{
    if (fd >= eventLoop->setsize) return;
    aeFileEvent *fe = &eventLoop->events[fd];
    if (fe->mask == AE_NONE) return;

    /* We want to always remove AE_BARRIER if set when AE_WRITABLE
     * is removed. */
    if (mask & AE_WRITABLE) mask |= AE_BARRIER;

    aeApiDelEvent(eventLoop, fd, mask);
    fe->mask = fe->mask & (~mask);
    if (fd == eventLoop->maxfd && fe->mask == AE_NONE) {
        /* Update the max fd */
        int j;

        for (j = eventLoop->maxfd-1; j >= 0; j--)
            if (eventLoop->events[j].mask != AE_NONE) break;
        eventLoop->maxfd = j;
    }
}

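It makes the I/O multiplexing layer stop watching the given event(s) on the given file descriptor and removes the association between those events and their handler.

Time events

aeCreateTimeEvent registers a time event that becomes due `milliseconds` from now; the new node is inserted at the head of the list.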

// 5.0 src/ae.c

long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc)
{
    long long id = eventLoop->timeEventNextId++;
    aeTimeEvent *te;

    te = zmalloc(sizeof(*te));
    if (te == NULL) return AE_ERR;
    te->id = id;
    aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
    te->timeProc = proc;
    te->finalizerProc = finalizerProc;
    te->clientData = clientData;
    te->prev = NULL;
    te->next = eventLoop->timeEventHead;
    if (te->next)
        te->next->prev = te;
    eventLoop->timeEventHead = te;
    return id;
}

static void aeAddMillisecondsToNow(long long milliseconds, long *sec, long *ms) {
    long cur_sec, cur_ms, when_sec, when_ms;

    aeGetTime(&cur_sec, &cur_ms);
    when_sec = cur_sec + milliseconds/1000;
    when_ms = cur_ms + milliseconds%1000;
    if (when_ms >= 1000) {
        when_sec ++;
        when_ms -= 1000;
    }
    *sec = when_sec;
    *ms = when_ms;
}
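aeAddMillisecondsToNow reads the clock itself, which makes its carry logic awkward to check. Below is a sketch of the same arithmetic with the current time passed in instead (add_ms is a hypothetical name, and non-negative milliseconds are assumed). Since cur_ms is below 1000 and milliseconds % 1000 is below 1000, their sum is below 2000, so a single carry is always enough: for example, 100 s 800 ms plus 1500 ms gives 101 s 1300 ms, which normalizes to 102 s 300 ms.

```c
/* Pure variant of aeAddMillisecondsToNow: "now" is a parameter instead of
 * being read from the clock. Assumes milliseconds >= 0 and 0 <= cur_ms < 1000. */
void add_ms(long cur_sec, long cur_ms, long long milliseconds, long *sec, long *ms) {
    long when_sec = cur_sec + (long)(milliseconds / 1000);
    long when_ms = cur_ms + (long)(milliseconds % 1000);
    if (when_ms >= 1000) {   /* at most one carry: both addends are < 1000 */
        when_sec++;
        when_ms -= 1000;
    }
    *sec = when_sec;
    *ms = when_ms;
}
```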

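aeDeleteTimeEvent deletes a time event. It does not unlink the node right away: it only sets the node's id to AE_DELETED_EVENT_ID, and the node is actually removed (and its finalizerProc called) the next time processTimeEvents walks the list.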

// 5.0 src/ae.c

int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id)
{
    aeTimeEvent *te = eventLoop->timeEventHead;
    while(te) {
        if (te->id == id) {
            te->id = AE_DELETED_EVENT_ID;
            return AE_OK;
        }
        te = te->next;
    }
    return AE_ERR; /* NO event with the specified ID found */
}

Event processing

Redis has both time events and file events, and aeProcessEvents is responsible for scheduling the processing of both.

// 5.0  src/ae.c
/* Process every pending time event, then every pending file event
 * (that may be registered by time event callbacks just processed).
 * Without special flags the function sleeps until some file event
 * fires, or when the next time event occurs (if any).
 *
 * If flags is 0, the function does nothing and returns.
 * if flags has AE_ALL_EVENTS set, all the kind of events are processed.
 * if flags has AE_FILE_EVENTS set, file events are processed.
 * if flags has AE_TIME_EVENTS set, time events are processed.
 * if flags has AE_DONT_WAIT set the function returns ASAP until all
 * the events that's possible to process without to wait are processed.
 * if flags has AE_CALL_AFTER_SLEEP set, the aftersleep callback is called.
 *
 * The function returns the number of events processed. */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

If there are file events to watch, or a time event we may need to wait for, first compute how long to block.

    /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            long now_sec, now_ms;

            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;

            /* How many milliseconds we need to wait for the next
             * time event to fire? */
            long long ms =
                (shortest->when_sec - now_sec)*1000 +
                shortest->when_ms - now_ms;

            if (ms > 0) {
                tvp->tv_sec = ms/1000;
                tvp->tv_usec = (ms % 1000)*1000;
            } else {
                tvp->tv_sec = 0;
                tvp->tv_usec = 0;
            }
        } else {
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }
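  • If there is a time event to wait for, compute how long until the nearest one fires (clamped to 0 if it is already due).
  • If there is no time event, decide whether to wait at all: a tv whose fields are both 0 means return immediately, and a NULL tvp means block indefinitely.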
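The three timeout cases above can be isolated into a pure function. This is a sketch: compute_timeout and its parameters are hypothetical, and the caller is assumed to have already looked up the nearest timer (has_timer, when_sec, when_ms) and the current time. A NULL return means block indefinitely; otherwise the returned struct timeval is the maximum time to block.

```c
#include <stddef.h>
#include <sys/time.h>

/* Hypothetical helper mirroring the timeout logic in aeProcessEvents.
 * Returns tv (filled in) or NULL for "wait forever". */
struct timeval *compute_timeout(int has_timer, long when_sec, long when_ms,
                                long now_sec, long now_ms, int dont_wait,
                                struct timeval *tv) {
    if (has_timer) {
        /* Milliseconds until the nearest timer fires. */
        long long ms = (long long)(when_sec - now_sec) * 1000 + (when_ms - now_ms);
        if (ms > 0) {
            tv->tv_sec = ms / 1000;
            tv->tv_usec = (ms % 1000) * 1000;
        } else {
            /* Already due: poll without blocking. */
            tv->tv_sec = 0;
            tv->tv_usec = 0;
        }
        return tv;
    }
    if (dont_wait) {
        /* No timer, but the caller asked not to block. */
        tv->tv_sec = 0;
        tv->tv_usec = 0;
        return tv;
    }
    return NULL;   /* no timer and blocking allowed: wait for a file event */
}
```

For instance, a timer due at 10 s 250 ms seen at 9 s 900 ms yields a 350 ms timeout (tv_sec 0, tv_usec 350000).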

Next comes file event processing: the loop blocks in the multiplexing API for at most tvp.

        /* Call the multiplexing API, will return only on timeout or when
         * some event fires. */
        numevents = aeApiPoll(eventLoop, tvp);

        /* After sleep callback. */
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);

        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int fired = 0; /* Number of events fired for current fd. */

Ready file events are recorded in eventLoop->fired; each entry's fd is used to find the matching registered event in eventLoop->events.
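The handler-calling step elided in the listing decides the order in which a ready fd's read and write handlers run. The sketch below is modeled on that step in Redis 5.0's ae.c, with hypothetical names (b_dispatch, the B_* constants, b_log). Readable runs before writable, unless the registration includes the AE_BARRIER flag, in which case the order is inverted so that work done in the beforesleep hook (such as an fsync) can finish before a reply is written. A function registered for both directions is not invoked twice for one event.

```c
/* Mask bits; the values match Redis 5.0's ae.h but are illustrative here. */
#define B_READABLE 1
#define B_WRITABLE 2
#define B_BARRIER  4

typedef void b_proc(int fd, void *client_data, int mask);

typedef struct {
    int mask;          /* registered interest, may include B_BARRIER */
    b_proc *rproc;
    b_proc *wproc;
    void *client_data;
} b_file_event;

/* Dispatch one fired (fd, fired_mask) pair; returns the number of calls made. */
int b_dispatch(const b_file_event *fe, int fd, int fired_mask) {
    int fired = 0;
    int invert = fe->mask & B_BARRIER;
    /* Normal order: readable first. */
    if (!invert && (fe->mask & fired_mask & B_READABLE)) {
        fe->rproc(fd, fe->client_data, fired_mask);
        fired++;
    }
    if (fe->mask & fired_mask & B_WRITABLE) {
        /* Skip if the same function already ran for the readable event. */
        if (!fired || fe->wproc != fe->rproc) {
            fe->wproc(fd, fe->client_data, fired_mask);
            fired++;
        }
    }
    /* Barrier: readable runs after writable. */
    if (invert && (fe->mask & fired_mask & B_READABLE)) {
        if (!fired || fe->wproc != fe->rproc) {
            fe->rproc(fd, fe->client_data, fired_mask);
            fired++;
        }
    }
    return fired;
}

/* Example handlers that record the call order as 'r' / 'w'. */
typedef struct { char buf[8]; int len; } b_log;

void b_on_read(int fd, void *client_data, int mask) {
    (void)fd;
    (void)mask;
    b_log *lg = client_data;
    lg->buf[lg->len++] = 'r';
}

void b_on_write(int fd, void *client_data, int mask) {
    (void)fd;
    (void)mask;
    b_log *lg = client_data;
    lg->buf[lg->len++] = 'w';
}
```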

            // ... run the file event handlers (elided here)

            processed++;
        }
    }
    /* Check time events */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}

Once file events have been handled, time events are processed by processTimeEvents.
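processTimeEvents itself is not listed here, so below is a simplified sketch of what it does in 5.0. Assumptions: each deadline is a single millisecond count, the current time is passed in, and the clock-skew handling and finalizerProc calls are left out. The pt_* and PT_* names are hypothetical stand-ins (PT_NOMORE for AE_NOMORE, PT_DELETED_ID for AE_DELETED_EVENT_ID). Nodes marked as deleted are unlinked during the walk, events created during the current pass are skipped via the id bound, and a handler's return value decides whether the event is rescheduled (that many milliseconds from now) or marked for deletion.

```c
#include <stdlib.h>

#define PT_NOMORE     -1   /* handler return value: do not reschedule */
#define PT_DELETED_ID -1   /* id given to events awaiting removal */

/* Handler: returns ms until the next run, or PT_NOMORE. */
typedef int pt_proc(long long id, void *client_data);

typedef struct pt_ev pt_ev;
struct pt_ev {
    long long id;
    long long when_ms;     /* absolute deadline (simplified from sec + ms) */
    pt_proc *proc;
    void *client_data;
    pt_ev *prev, *next;
};

typedef struct { pt_ev *head; long long next_id; } pt_loop;

long long pt_add(pt_loop *l, long long when_ms, pt_proc *proc, void *cd) {
    pt_ev *te = malloc(sizeof(*te));
    if (te == NULL) return -1;
    te->id = l->next_id++;
    te->when_ms = when_ms;
    te->proc = proc;
    te->client_data = cd;
    te->prev = NULL;
    te->next = l->head;
    if (te->next) te->next->prev = te;
    l->head = te;
    return te->id;
}

/* Run every due event once; returns how many handlers were called. */
int pt_process(pt_loop *l, long long now_ms) {
    int processed = 0;
    long long max_id = l->next_id - 1;   /* ignore events added during this pass */
    pt_ev *te = l->head;
    while (te) {
        if (te->id == PT_DELETED_ID) {   /* lazily deleted: unlink and free now */
            pt_ev *next = te->next;
            if (te->prev) te->prev->next = te->next; else l->head = te->next;
            if (te->next) te->next->prev = te->prev;
            free(te);
            te = next;
            continue;
        }
        if (te->id > max_id) { te = te->next; continue; }
        if (now_ms >= te->when_ms) {
            int ret = te->proc(te->id, te->client_data);
            processed++;
            if (ret != PT_NOMORE) te->when_ms = now_ms + ret;  /* periodic */
            else te->id = PT_DELETED_ID;                       /* one-shot */
        }
        te = te->next;
    }
    return processed;
}

/* Example handlers: a 100 ms periodic tick and a one-shot timer. */
int pt_every_100ms(long long id, void *cd) { (void)id; (*(int *)cd)++; return 100; }
int pt_once(long long id, void *cd) { (void)id; (*(int *)cd)++; return PT_NOMORE; }
```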


Event Handler

Specifies an interface consisting of a hook method that abstractly represents the dispatching operation for service-specific events.

The handler types for Redis's file events and time events are defined as follows:

// 5.0  src/ae.h

typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);

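File event handlers

Redis implements several file event handlers, each serving a different networking need, for example:

  • To answer clients connecting to the server, the server associates the listening socket with the connection accept handler.
  • To receive command requests sent by clients, the server associates each client socket with the command request handler.
  • To return command results to clients, the server associates the client socket with the command reply handler.

Quoted from 《Redis设计与实现》 (Redis Design and Implementation) by 黄健宏 (Huang Jianhong).

Time event handlers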

In normal operation, serverCron is the only time event Redis runs.
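A sketch of handlers matching the two typedefs shown earlier. The typedefs are copied from ae.h; cronState, exampleCron and exampleReadHandler are hypothetical. The time handler imitates how serverCron keeps itself alive: it returns the number of milliseconds until its next run (serverCron returns 1000/server.hz, i.e. every 100 ms with the default hz of 10), and any return value other than AE_NOMORE keeps the event registered.

```c
#include <stddef.h>

/* The two handler types from ae.h; the loop itself stays opaque here. */
struct aeEventLoop;
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);

/* Hypothetical state for a serverCron-like job that runs hz times per second. */
typedef struct { int hz; long runs; } cronState;

/* Hypothetical cron handler: does its periodic work, then returns the delay
 * in ms until the next run, the way serverCron returns 1000/server.hz. */
int exampleCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    (void)eventLoop;
    (void)id;
    cronState *st = clientData;
    st->runs++;              /* stand-in for the real periodic work */
    return 1000 / st->hz;
}

/* Hypothetical file handler with the aeFileProc shape; it only counts calls. */
void exampleReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask) {
    (void)eventLoop;
    (void)fd;
    (void)mask;
    (*(long *)clientData)++;
}

/* These assignments only compile if the handlers match the typedefs. */
aeTimeProc *exampleTimeProc = exampleCron;
aeFileProc *exampleFileProc = exampleReadHandler;
```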

junnplus commented on Apr 25 '20 10:04