xingbofeng.github.io
xingbofeng.github.io copied to clipboard
Linux epoll & Node.js Event Loop & I / O复用
select
、poll
、epoll
都是I / O
复用的机制,在《UNIX网络编程》
里重点讲了select
、poll
的机制,但select
、poll
并不是现代高性能服务器的最佳选择。包括现在的Node.js
中的事件循环机制(event loop)
也是基于epoll
实现的。
select和poll的缺点
按照《UNIX网络编程》
中所述,poll
与select
类似,没有解决以下的问题:
- 每次调用
select
,都需要把fd
集合从用户态拷贝到内核态,这个开销在fd
很多时会很大 - 同时每次调用
select
都需要在内核遍历传递进来的所有fd
,这个开销在fd
很多时也很大 -
select
支持的文件描述符数量太小了,默认是1024
epoll对于上述缺点的改进
epoll
既然是对select
和poll
的改进,就应该能避免上述的三个缺点。那epoll
都是怎么解决的呢?在此之前,我们先看一下epoll
和select
和poll
的调用接口上的不同,select
和poll
都只提供了一个函数——select
或者poll
函数。而epoll
提供了三个函数,epoll_create
,epoll_ctl
和epoll_wait
,epoll_create
是创建一个epoll
句柄;epoll_ctl
是注册要监听的事件类型;epoll_wait
则是等待事件的产生。
对于第一个缺点,epoll
的解决方案在epoll_ctl
函数中。每次注册新的事件到epoll
句柄中时(在epoll_ctl
中指定EPOLL_CTL_ADD
),会把所有的fd
拷贝进内核,而不是在epoll_wait
的时候重复拷贝。epoll
保证了每个fd
在整个过程中只会拷贝一次。
对于第二个缺点,epoll
的解决方案不像select
或poll
一样每次都把current
轮流加入fd
对应的设备等待队列中,而只在epoll_ctl
时把current
挂一遍(这一遍必不可少)并为每个fd
指定一个回调函数,当设备就绪,唤醒等待队列上的等待者时,就会调用这个回调函数,而这个回调函数会把就绪的fd加入一个就绪链表)。epoll_wait
的工作实际上就是在这个就绪链表中查看有没有就绪的fd
(利用schedule_timeout()
实现睡一会,判断一会的效果,和select
实现中的第7步是类似的)。
对于第三个缺点,epoll
没有这个限制,它所支持的FD
上限是最大可以打开文件的数目,这个数字一般远大于2048
,举个例子,在1GB
内存的机器上大约是10万
左右,一般来说这个数目和系统内存关系很大。
epoll接口
epoll操作过程需要三个接口,分别如下:
#include <sys/epoll.h>
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
epoll_create
方法
#include <sys/epoll.h>
int epoll_create(int size);
创建一个epoll
的句柄,size
用来告诉内核这个监听的数目一共有多大,这个参数不同于select()
中的第一个参数,给出最大监听的fd+1
的值,参数size
并不是限制了epoll
所能监听的描述符最大个数,只是对内核初始分配内部数据结构的一个建议。
当创建好epoll
句柄后,它就会占用一个fd
值,在linux
下如果查看/proc/
进程id/fd/
,是能够看到这个fd
的,所以在使用完epoll
后,必须调用close()
关闭,否则可能导致fd
被耗尽。
#include <sys/epoll.h>
#define FDSIZE 1024
// ...
int main(int argc,char *argv[])
{
int epollfd = epoll_create(FDSIZE); // 这里并不是指最大文件描述符数量为1024,而是给内核初始化数据结构的一个建议。
return 0;
}
epoll_ctl
方法
#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
epoll_ctl
方法是epoll
的事件注册函数,它不同与select()
是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。
-
epfd
:是epoll_create()
的返回值。 -
op
:表示对对应的fd
文件描述符的操作,一般情况下表示想要监听事件、删除事件和修改事件处理函数,用三个宏来表示:-
EPOLL_CTL_ADD
,表示对于对应的fd
文件描述符添加一组事件监听; -
EPOLL_CTL_DEL
,表示对于对应的fd
文件描述符删除该组事件监听; -
EPOLL_CTL_MOD
,表示对于对应的fd
文件描述符修改该组事件监听为新的events
;
-
-
fd
:表示需要监听的fd
(文件描述符) -
event
:是告诉内核需要监听的事件集合,传入一个指针,指向事件集合的第一项,struct epoll_event
的结构如下:
struct epoll_event {
__uint32_t events; // 表示一类epoll事件
epoll_data_t data; // 用户传递的数据
}
因为events
表示一类epoll
事件,它可以是以下几个宏的集合:
-
EPOLLIN
:表示对应的文件描述符可以读(包括对端SOCKET正常关闭); -
EPOLLOUT
:表示对应的文件描述符可以写; -
EPOLLPRI
:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来); -
EPOLLERR
:表示对应的文件描述符发生错误; -
EPOLLHUP
:表示对应的文件描述符被挂断; -
EPOLLET
: 将EPOLL
设为边缘触发(Edge Triggered
)模式,这是相对于水平触发(Level Triggered
)来说的; -
EPOLLONESHOT
:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket
的话,需要再次把这个socket
加入到EPOLL
队列里;
例如,如果想让epoll
对于对应的文件描述符fd
添加一组事件,监听对应的文件描述符可读的情况:
static void add_event_epoll_in(int epollfd, int fd)
{
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = fd;
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev);
}
epoll_wait
方法
#include <sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
epoll_wait
方法等待事件的产生,类似于select()
调用,返回需要处理的事件数目。
-
epfd
:是epoll_create()
的返回值; -
events
:用来从内核得到事件的集合,我们一般把需要处理的事件对应的文件描述符fd
放到events
结构体下data
参数内,这样我们可以在事件处理函数中取到对应的文件描述符fd
,执行对应操作(例如对于TCP
套接字,我们调用read
,write
等); -
maxevents
:告之内核这个events
的数量,这个maxevents
的值不能大于创建epoll_create()
时的size
,否则会造成溢出的风险; -
timeout
:超时时间,以毫秒为单位,如果设置为-1
,表示一直等待,设置为0
表示不等待;
举例:应用程序一般阻塞与epoll_wait
调用,一旦events
中任意一事件触发,epoll_wait
执行,等待指定的timeout
超时时间,如果I / O
完成则立即返回需要处理的事件数目。
static void do_epoll(int listenfd)
{
int epollfd;
struct epoll_event events[EPOLLEVENTS];
int ret;
char buf[MAXSIZE];
memset(buf,0,MAXSIZE);
//创建一个描述符
epollfd = epoll_create(FDSIZE);
//添加监听描述符事件
add_event_epoll_in(epollfd, listenfd);
for ( ; ; )
{
// 获取已经准备好的描述符事件数目
ret = epoll_wait(epollfd,events,EPOLLEVENTS,-1);
handle_events(epollfd, events, ret, listenfd, buf);
}
close(epollfd);
}
static void
handle_events(int epollfd,struct epoll_event *events,int num,int listenfd,char *buf)
{
int i;
int fd;
//进行选好遍历
for (i = 0;i < num;i++)
{
// 在这里取到需要处理的文件描述符
fd = events[i].data.fd;
//根据描述符的类型和事件类型进行处理
if ((fd == listenfd) &&(events[i].events & EPOLLIN))
handle_accpet(epollfd,listenfd);
else if (events[i].events & EPOLLIN)
do_read(epollfd,fd,buf);
else if (events[i].events & EPOLLOUT)
do_write(epollfd,fd,buf);
}
}
使用epoll
重构服务器回射程序
服务端
// server.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <sys/types.h>
#define IPADDRESS "127.0.0.1"
#define PORT 8787
#define MAXSIZE 1024
#define LISTENQ 5
#define FDSIZE 1000
#define EPOLLEVENTS 100
//函数声明
//创建套接字并进行绑定
static int socket_bind(const char* ip,int port);
//IO多路复用epoll
static void do_epoll(int listenfd);
//事件处理函数
static void
handle_events(int epollfd,struct epoll_event *events,int num,int listenfd,char *buf);
//处理接收到的连接
static void handle_accpet(int epollfd,int listenfd);
//读处理
static void do_read(int epollfd,int fd,char *buf);
//写处理
static void do_write(int epollfd,int fd,char *buf);
//添加事件
static void add_event(int epollfd,int fd,int state);
//修改事件
static void modify_event(int epollfd,int fd,int state);
//删除事件
static void delete_event(int epollfd,int fd,int state);
int main(int argc,char *argv[])
{
int listenfd;
listenfd = socket_bind(IPADDRESS,PORT);
listen(listenfd,LISTENQ);
do_epoll(listenfd);
return 0;
}
static int socket_bind(const char* ip,int port)
{
int listenfd;
struct sockaddr_in servaddr;
listenfd = socket(AF_INET,SOCK_STREAM,0);
if (listenfd == -1)
{
perror("socket error:");
exit(1);
}
bzero(&servaddr,sizeof(servaddr));
servaddr.sin_family = AF_INET;
inet_pton(AF_INET,ip,&servaddr.sin_addr);
servaddr.sin_port = htons(port);
if (bind(listenfd,(struct sockaddr*)&servaddr,sizeof(servaddr)) == -1)
{
perror("bind error: ");
exit(1);
}
return listenfd;
}
static void do_epoll(int listenfd)
{
int epollfd;
struct epoll_event events[EPOLLEVENTS];
int ret;
char buf[MAXSIZE];
memset(buf,0,MAXSIZE);
//创建一个描述符
epollfd = epoll_create(FDSIZE);
//添加监听描述符事件
add_event(epollfd,listenfd,EPOLLIN);
for ( ; ; )
{
//获取已经准备好的描述符事件
ret = epoll_wait(epollfd,events,EPOLLEVENTS,-1);
handle_events(epollfd,events,ret,listenfd,buf);
}
close(epollfd);
}
static void
handle_events(int epollfd,struct epoll_event *events,int num,int listenfd,char *buf)
{
int i;
int fd;
//进行选好遍历
for (i = 0;i < num;i++)
{
fd = events[i].data.fd;
//根据描述符的类型和事件类型进行处理
if ((fd == listenfd) &&(events[i].events & EPOLLIN))
handle_accpet(epollfd,listenfd);
else if (events[i].events & EPOLLIN)
do_read(epollfd,fd,buf);
else if (events[i].events & EPOLLOUT)
do_write(epollfd,fd,buf);
}
}
static void handle_accpet(int epollfd,int listenfd)
{
int clifd;
struct sockaddr_in cliaddr;
socklen_t cliaddrlen;
clifd = accept(listenfd,(struct sockaddr*)&cliaddr,&cliaddrlen);
if (clifd == -1)
perror("accpet error:");
else
{
printf("accept a new client: %s:%d\n",inet_ntoa(cliaddr.sin_addr),cliaddr.sin_port);
//添加一个客户描述符和事件
add_event(epollfd,clifd,EPOLLIN);
}
}
static void do_read(int epollfd,int fd,char *buf)
{
int nread;
nread = read(fd,buf,MAXSIZE);
if (nread == -1)
{
perror("read error:");
close(fd);
delete_event(epollfd,fd,EPOLLIN);
}
else if (nread == 0)
{
fprintf(stderr,"client close.\n");
close(fd);
delete_event(epollfd,fd,EPOLLIN);
}
else
{
printf("read message is : %s",buf);
//修改描述符对应的事件,由读改为写
modify_event(epollfd,fd,EPOLLOUT);
}
}
static void do_write(int epollfd,int fd,char *buf)
{
int nwrite;
nwrite = write(fd,buf,strlen(buf));
if (nwrite == -1)
{
perror("write error:");
close(fd);
delete_event(epollfd,fd,EPOLLOUT);
}
else
modify_event(epollfd,fd,EPOLLIN);
memset(buf,0,MAXSIZE);
}
static void add_event(int epollfd,int fd,int state)
{
struct epoll_event ev;
ev.events = state;
ev.data.fd = fd;
epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&ev);
}
static void delete_event(int epollfd,int fd,int state)
{
struct epoll_event ev;
ev.events = state;
ev.data.fd = fd;
epoll_ctl(epollfd,EPOLL_CTL_DEL,fd,&ev);
}
static void modify_event(int epollfd,int fd,int state)
{
struct epoll_event ev;
ev.events = state;
ev.data.fd = fd;
epoll_ctl(epollfd,EPOLL_CTL_MOD,fd,&ev);
}
客户端
// client.c
#include <netinet/in.h>
#include <sys/socket.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <time.h>
#include <unistd.h>
#include <sys/types.h>
#include <arpa/inet.h>
#define MAXSIZE 1024
#define IPADDRESS "127.0.0.1"
#define SERV_PORT 8787
#define FDSIZE 1024
#define EPOLLEVENTS 20
static void handle_connection(int sockfd);
static void
handle_events(int epollfd,struct epoll_event *events,int num,int sockfd,char *buf);
static void do_read(int epollfd,int fd,int sockfd,char *buf);
static void do_read(int epollfd,int fd,int sockfd,char *buf);
static void do_write(int epollfd,int fd,int sockfd,char *buf);
static void add_event(int epollfd,int fd,int state);
static void delete_event(int epollfd,int fd,int state);
static void modify_event(int epollfd,int fd,int state);
int main(int argc,char *argv[])
{
int sockfd;
struct sockaddr_in servaddr;
sockfd = socket(AF_INET,SOCK_STREAM,0);
bzero(&servaddr,sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(SERV_PORT);
inet_pton(AF_INET,IPADDRESS,&servaddr.sin_addr);
connect(sockfd,(struct sockaddr*)&servaddr,sizeof(servaddr));
//处理连接
handle_connection(sockfd);
close(sockfd);
return 0;
}
static void handle_connection(int sockfd)
{
int epollfd;
struct epoll_event events[EPOLLEVENTS];
char buf[MAXSIZE];
int ret;
epollfd = epoll_create(FDSIZE);
add_event(epollfd,STDIN_FILENO,EPOLLIN);
for ( ; ; )
{
ret = epoll_wait(epollfd,events,EPOLLEVENTS,-1);
handle_events(epollfd,events,ret,sockfd,buf);
}
close(epollfd);
}
static void
handle_events(int epollfd,struct epoll_event *events,int num,int sockfd,char *buf)
{
int fd;
int i;
for (i = 0;i < num;i++)
{
fd = events[i].data.fd;
if (events[i].events & EPOLLIN)
do_read(epollfd,fd,sockfd,buf);
else if (events[i].events & EPOLLOUT)
do_write(epollfd,fd,sockfd,buf);
}
}
static void do_read(int epollfd,int fd,int sockfd,char *buf)
{
int nread;
nread = read(fd,buf,MAXSIZE);
if (nread == -1)
{
perror("read error:");
close(fd);
}
else if (nread == 0)
{
fprintf(stderr,"server close.\n");
close(fd);
}
else
{
if (fd == STDIN_FILENO)
add_event(epollfd,sockfd,EPOLLOUT);
else
{
delete_event(epollfd,sockfd,EPOLLIN);
add_event(epollfd,STDOUT_FILENO,EPOLLOUT);
}
}
}
static void do_write(int epollfd,int fd,int sockfd,char *buf)
{
int nwrite;
nwrite = write(fd,buf,strlen(buf));
if (nwrite == -1)
{
perror("write error:");
close(fd);
}
else
{
if (fd == STDOUT_FILENO)
delete_event(epollfd,fd,EPOLLOUT);
else
modify_event(epollfd,fd,EPOLLIN);
}
memset(buf,0,MAXSIZE);
}
static void add_event(int epollfd,int fd,int state)
{
struct epoll_event ev;
ev.events = state;
ev.data.fd = fd;
epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&ev);
}
static void delete_event(int epollfd,int fd,int state)
{
struct epoll_event ev;
ev.events = state;
ev.data.fd = fd;
epoll_ctl(epollfd,EPOLL_CTL_DEL,fd,&ev);
}
static void modify_event(int epollfd,int fd,int state)
{
struct epoll_event ev;
ev.events = state;
ev.data.fd = fd;
epoll_ctl(epollfd,EPOLL_CTL_MOD,fd,&ev);
}
运行结果:
Node.js的Event Loop
众所周知,Node.js
是单线程的,可用作高性能服务器。显然,若仅仅限定为1024
个描述符,对于高并发的请求显然是不支持的。Node.js
内部的Event Demultiplexer(事件多路分解器)
借助epoll
来实现事件循环机制。其基本步骤如下:
- 应用程序通过向
Event Demultiplexer(事件多路分解器)
提交请求来生成新的I / O
操作。应用程序还指定一个处理程序,当操作完成时将调用该处理程序。向Event Demultiplexer(事件多路分解器)
提交新请求是一种非阻塞调用,它立即将控制权返回给该应用程序。 - 当一组
I / O
操作完成时,事件多路分解器将新的事件推入Event Queue(事件队列)
。 - 此时
Event Loop
遍历Event Queue
的项目。 - 对于每个事件,调用关联的处理程序。
- 处理程序是应用程序代码的一部分,当它执行完成时将控制权返回给
Event Loop
。但是,在处理程序执行过程中可能会请求新的异步操作,从而导致新的操作被插入Event Demultiplexer(事件多路分解器)
- 当
Event Loop
中的所有项目被处理完时,循环将再次阻塞Event Demultiplexer(事件多路分解器)
,当有新事件可用时,Event Demultiplexer(事件多路分解器)
将触发另一个周期。
通过epoll
解析上述步骤操作:
- 事件多路分解器即为通过
epoll_create()
创建的epoll
句柄的抽象,在Node.js
启动时,事件多路分解器会阻塞于epoll_wait
调用。 - 对于第一步,注册事件处理程序时,调用
epoll_ctl()
方法,设定op
参数为EPOLL_CTL_ADD
,向事件多路分解器添加一组事件。 - 对于第二步,一旦
I / O
完成,又调用epoll_ctl()
方法,设定op
参数为EPOLL_CTL_DEL
,删除对应事件,此时把控制权返还给应用程序。 - 事件循环遍历事件队列,只要没有事件,就阻塞于
epoll_wait()
。 - 不断重复上述步骤,实现Node.js`的事件循环机制。