Blog
Blog copied to clipboard
Linux epoll原理及内核实现
epoll技术简介
epoll是一种典型的I/O多路复用技术,epoll技术的最大特点是支持高并发。传统多路复用技术select,poll当并发量达到一两千的时候性能就会下降,而epoll能支持上万的并发连接,并且依然能快速稳定的响应。当然,并发连接每增加一个,必定要消耗一定的内存去保存这个连接相关的数据。
epoll_create()函数
int epoll_create(int size)
功能:创建一个epoll对象,返回该对象的描述符(文件描述符),这个描述符就代表这个epoll对象,后续会用到。从内核代码可以看出来,这个size貌似只需要大于0即可。
SYSCALL_DEFINE1(epoll_create, int, size)
{
if (size <= 0)
return -EINVAL;
return sys_epoll_create1(0);
}
再看看epoll_create1的实现
SYSCALL_DEFINE1(epoll_create1, int, flags)
{
//省略部分代码
struct eventpoll *ep = NULL;
...
error = ep_alloc(&ep);
fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));
if (fd < 0) {
error = fd;
goto out_free_ep;
}
file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,
O_RDWR | (flags & O_CLOEXEC));
if (IS_ERR(file)) {
error = PTR_ERR(file);
goto out_free_fd;
}
fd_install(fd, file);
ep->file = file;
return fd;
}
首先分配一个eventpoll
结构并初始化,这个结构有两个非常重要的数据结构struct list_head rdllist
和struct rb_root rbr
。
rdllist指向一个双向链表,用来指向已经准备好(有数据)的文件描述符。
rbr指向一棵红黑树的根,
像下面这个图这样,
接着,找到一个没有被使用的文件描述符
fd
,并设置file
结构,然后再将fd
和file
绑定起来。
总结:epoll_create()函数主要是创建一个eventpoll的结构,并把fd返回给用户空间。
epoll_ctl()函数
int epoll_ctl(int efpd,int op,int sockid,struct epoll_event *event);
功能:把一个socket以及这个socket相关的事件添加到这个epoll对象描述符中去,目的就是通过这个epoll对象来监视这个socke上数据的来往情况,当有数据来往时,系统会通知我们。 op:动作,添加/删除/修改 ,对应数字是1,2,3, 对应宏是EPOLL_CTL_ADD, EPOLL_CTL_DEL ,EPOLL_CTL_MOD。后面我们会分析每个动作内核具体做了什么。 sockid:表示某个文件句柄,这个是红黑树里边的key。 event:事件信息,EPOLL_CTL_ADD和EPOLL_CTL_MOD都要用到这个event参数里边的事件信息; 下面来看看当用户调用'epoll_ctl'函数时,内核做了哪些事情。
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
struct epoll_event __user *, event)
{
struct file *file, *tfile;
struct eventpoll *ep;
struct epitem *epi;
struct epoll_event epds;
...
/* Get the "struct file *" for the eventpoll file */
file = fget(epfd);
//监听的目标句柄的file_operatons结构中必须实现了poll函数
tfile = fget(fd);
if (!tfile->f_op || !tfile->f_op->poll)
goto error_tgt_fput;
// 如果将epoll自己的句柄加入进来会直接返回EINVAL错误
if (file == tfile || !is_file_epoll(file))
goto error_tgt_fput;
// 下面会判断是不是会进入一个死循环,这里没看明白,略过...
if (op == EPOLL_CTL_ADD) {
if (is_file_epoll(tfile)) {
error = -ELOOP;
if (ep_loop_check(ep, tfile) != 0)
goto error_tgt_fput;
} else
list_add(&tfile->f_tfile_llink, &tfile_check_list);
}
// 接下来根据不同的op来操作,下面分开讲解
....
return error;
}
EPOLL_CTL_ADD、EPOLL_CTL_DEL、EPOLL_CTL_MOD实现
// 通过传入的fd在红黑树中找到epitem结构
// 这个结构主要指向已经准备好(有数据)的rbn红黑树和rdllink链表
epi = ep_find(ep, tfile, fd);
epitem结构设计的高明之处:既能够作为红黑树中的节点,又能够作为双向链表中的节点。
红黑树这种数据结构可以简单的理解为通过
key
来找value
,且它的查找速度特别快,内核里面很多地方都用到了这种数据结构。下面看看如何在红黑树中查找epitem
结构
static struct epitem *ep_find(struct eventpoll *ep, struct file *file, int fd)
{
for (rbp = ep->rbr.rb_node; rbp; ) {
epi = rb_entry(rbp, struct epitem, rbn);
kcmp = ep_cmp_ffd(&ffd, &epi->ffd);
if (kcmp > 0)
rbp = rbp->rb_right;
else if (kcmp < 0)
rbp = rbp->rb_left;
else {
epir = epi;
break;
}
}
}
/* Compare RB tree keys */
static inline int ep_cmp_ffd(struct epoll_filefd *p1,
struct epoll_filefd *p2)
{
return (p1->file > p2->file ? +1:
(p1->file < p2->file ? -1 : p1->fd - p2->fd));
}
下面看具体的增删改操作
case EPOLL_CTL_ADD:
if (!epi) {
epds.events |= POLLERR | POLLHUP;
// 如果没有找到eip就将fd插入红黑树
error = ep_insert(ep, &epds, tfile, fd);
else
// 否则当前节点已经存在,返回错误
error = -EEXIST;
ep_insert函数首先会判断epoll是否超过了系统监听的最多数目max_user_watches。
这个值可以通过cat /proc/sys/fs/epoll/max_user_watches
获取,再我的系统上是1646305
,当然这个值是可以改的,后面有机会再说。
int error, revents, pwake = 0;
unsigned long flags;
long user_watches;
struct epitem *epi;
// 判断是否超过系统能监听的最大值
user_watches = atomic_long_read(&ep->user->epoll_watches);
if (unlikely(user_watches >= max_user_watches))
return -ENOSPC;
接下来是初始化`epitem`结构。
/* Item initialization follow here ... */
INIT_LIST_HEAD(&epi->rdllink);
INIT_LIST_HEAD(&epi->fllink);
INIT_LIST_HEAD(&epi->pwqlist);
epi->ep = ep;
ep_set_ffd(&epi->ffd, tfile, fd);
epi->event = *event;
epi->nwait = 0;
epi->next = EP_UNACTIVE_PTR;
// 下面很重要,也很复制,还没怎么看明白,以后在分析。。。
/* Initialize the poll table using the queue callback */
epq.epi = epi;
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
case EPOLL_CTL_DEL:
// 如果是删除操作,直接从红黑树中删除节点
error = ep_remove(ep, epi);
case EPOLL_CTL_MOD:
if (epi) {
epds.events |= POLLERR | POLLHUP;
// 如果是MOD,就相关相关节点的event数据
error = ep_modify(ep, epi, &epds);
总结一下: EPOLL_CTL_ADD:等价于往红黑树中增加节点 EPOLL_CTL_DEL:等价于从红黑树中删除节点 EPOLL_CTL_MOD:等价于修改已有的红黑树的节点
epoll_wait()函数
那么当事件发生时,我们如何拿到系统的通知呢?这就需要epoll_wait函数了。
int epoll_wait(int epfd,struct epoll_event *events,int maxevents,int timeout);
功能:阻塞一小段时间并等待事件发生,返回事件集合,也就是获取内核的事件通知。 epfd:是epoll_create()返回的epoll对象描述符; 参数events:是内存,也是数组,长度 是maxevents,表示此次epoll_wait调用可以收集到的maxevents个已经准备好的读写事件;
// 首先仍然是一些判断
/* The maximum number of event must be greater than zero */
if (maxevents <= 0 || maxevents > EP_MAX_EVENTS)
return -EINVAL;
/* Verify that the area passed by the user is writeable */
if (!access_ok(VERIFY_WRITE, events, maxevents * sizeof(struct epoll_event))) {
error = -EFAULT;
goto error_return;
}
// 然后进入最关键的ep_poll
/* Time to fish for events ... */
error = ep_poll(ep, events, maxevents, timeout);
而ep_poll最关键的是遍历链表
if (ep_events_available(ep) || timed_out)
..
static inline int ep_events_available(struct eventpoll *ep)
{
// 查看链表中是否有可以读的事件,有就返回
return !list_empty(&ep->rdllist) || ep->ovflist != EP_UNACTIVE_PTR;
}
epoll_wait主要是遍历这个双向链表,把这个双向链表里边的节点数据拷贝出去,拷贝完毕的就从双向链表里移除。
参考资料
https://www.cnblogs.com/sduzh/p/6714281.html https://idndx.com/2014/09/01/the-implementation-of-epoll-1/ https://idndx.com/2014/09/02/the-implementation-of-epoll-2/