9.6.3 ngx_epoll_module模块的实现

本节主要介绍事件驱动模块接口与epoll用法是如何结合起来发挥作用的。首先看一下ngx_epoll_module模块究竟对哪些配置项感兴趣,其中ngx_epoll_commands数组指明了影响其可定制性的两个配置项。


static ngx_command_t ngx_epoll_commands[]={

/在调用epoll_wait时,将由第2和第3个参数告诉Linux内核一次最多可返回多少个事件。这个配置项表示调用一次epoll_wait时最多可以返回的事件数,当然,它也会预分配那么多epoll_event结构体用于存储事件/

{ngx_string("epoll_events"),

NGX_EVENT_CONF|NGX_CONF_TAKE1,

ngx_conf_set_num_slot,

0,

offsetof(ngx_epoll_conf_t,events),

NULL},

/指明在开启异步I/O且使用io_setup系统调用初始化异步I/O上下文环境时,初始分配的异步I/O事件个数,详见9.9节/

{ngx_string("worker_aio_requests"),

NGX_EVENT_CONF|NGX_CONF_TAKE1,

ngx_conf_set_num_slot,

0,

offsetof(ngx_epoll_conf_t,aio_requests),

NULL},

ngx_null_command

};


上面使用了预分配的ngx_conf_set_num_slot方法来解析这两个配置项,下面看一下存储配置项的结构体ngx_epoll_conf_t。


typedef struct{

ngx_uint_t events;

ngx_uint_t aio_requests;

}ngx_epoll_conf_t;


其中,events是调用epoll_wait方法时传入的第3个参数maxevents,而第2个参数events数组的大小也是由它决定的,下面将在ngx_epoll_init方法中初始化这个数组。

接下来看一下epoll是如何定义ngx_event_module_t事件模块接口的,代码如下。


static ngx_str_t epoll_name=ngx_string("epoll");

ngx_event_module_t ngx_epoll_module_ctx={

&epoll_name,

ngx_epoll_create_conf,

ngx_epoll_init_conf,

{

//对应于ngx_event_actions_t中的add方法

ngx_epoll_add_event,

//对应于ngx_event_actions_t中的del方法

ngx_epoll_del_event,

//对应于ngx_event_actions_t中的enable方法,与add方法一致

ngx_epoll_add_event,

//对应于ngx_event_actions_t中的disable方法,与del方法一致

ngx_epoll_del_event,

//对应于ngx_event_actions_t中的add_conn方法

ngx_epoll_add_connection,

//对应于ngx_event_actions_t中的del_conn方法

ngx_epoll_del_connection,

//未实现ngx_event_actions_t中的process_changes方法

NULL,

//对应于ngx_event_actions_t中的process_events方法

ngx_epoll_process_events,

//对应于ngx_event_actions_t中的init方法

ngx_epoll_init,

//对应于ngx_event_actions_t中的done方法

ngx_epoll_done,

}

};


其中,ngx_epoll_create_conf方法和ngx_epoll_init_conf方法只是为了解析配置项,略过不提,下面重点看一下ngx_event_actions_t中的10个接口是如何实现的。

首先从实现init接口的ngx_epoll_init方法讲起。ngx_epoll-init方法是在什么时候被调用的呢?在图9-4的第4步中它会被调用,也就是Nginx的启动过程中。ngx_epoll_init方法主要做了两件事:

1)调用epoll_create方法创建epoll对象。

2)创建event_list数组,用于进行epoll_wait调用时传递内核态的事件。

event_list数组就是用于在epoll_wait调用中接收事件的参数,如下所示。


static int ep=-1;

static struct epoll_event*event_list;

static ngx_uint_t nevents;


其中,ep是epoll对象的描述符,nevents是上面说到的epoll_events配置项参数,它既指明了epoll_wait一次返回的最大事件数,也告诉了event_list应该分配的数组大小。ngx_epoll_init方法代码如下所示。


static ngx_int_t ngx_epoll_init(ngx_cycle_t*cycle,ngx_msec_t timer)

{

ngx_epoll_conf_t*epcf;

/获取create_conf中生成的ngx_epoll_conf_t结构体,它已经被赋予解析完配置文件后的值。详细内容可参见9.4.1节中关于ngx_event_get_conf宏的用法/

epcf=ngx_event_get_conf(cycle->conf_ctx,ngx_epoll_module);

if(ep==-1){

/调用epoll_create在内核中创建epoll对象。上文已经讲过,参数size不是用于指明epoll能够处理的最大事件个数,因为在许多Linux内核版本中,epoll是不处理这个参数的,所以设为cycle->connection_n/2(而不是cycle->connection_n)也不要紧/

ep=epoll_create(cycle->connection_n/2);

if(ep==-1){

ngx_log_error(NGX_LOG_EMERG,cycle->log,ngx_errno,

"epoll_create()failed");

return NGX_ERROR;

}

if(NGX_HAVE_FILE_AIO)

//异步I/O内容可参见9.9节

ngx_epoll_aio_init(cycle,epcf);

endif

}

if(nevents<epcf->events){

if(event_list){

ngx_free(event_list);

}

//初始化event_list数组。数组的个数是配置项epoll_events的参数

event_list=ngx_alloc(sizeof(struct epoll_event)*epcf->events,cycle->log);

if(event_list==NULL){

return NGX_ERROR;

}

}

//nevents也是配置项epoll_events的参数

nevents=epcf->events;

//指明读写I/O的方法,本章不做具体说明

ngx_io=ngx_os_io;

//设置ngx_event_actions接口

ngx_event_actions=ngx_epoll_module_ctx.actions;

if(NGX_HAVE_CLEAR_EVENT)

/默认是采用ET模式来使用epoll的,NGX_USE_CLEAR_EVENT宏实际上就是在告诉Nginx使用ET模式/

ngx_event_flags=NGX_USE_CLEAR_EVENT

else

ngx_event_flags=NGX_USE_LEVEL_EVENT

endif

|NGX_USE_GREEDY_EVENT

|NGX_USE_EPOLL_EVENT;

return NGX_OK;

}


ngx_event_actions是在Nginx事件框架处理事件时封装的接口,我们会在9.8节中说明它的用法。

对于epoll而言,并没有enable事件和disable事件的概念,另外,从ngx_epoll_module_ctx结构体中可以看出,enable和add接口都是使用ngx_epoll_add_event方法实现的,而disable和del接口都是使用ngx_epoll_del_event方法实现的。下面以ngx_epoll_add_event方法为例介绍一下它们是如何调用epoll_ctl向epoll中添加事件或从epoll中删除事件的。


static ngx_int_t

ngx_epoll_add_event(ngx_event_t*ev,ngx_int_t event,ngx_uint_t flags)

{

int

op;

uint32_t

events,prev;

ngx_event_t

*e;

ngx_connection_t

*c;

struct epoll_event ee;

//每个事件的data成员都存放着其对应的ngx_connection_t连接

c=ev->data;

/下面会根据event参数确定当前事件是读事件还是写事件,这会决定events是加上EPOLLIN标志位还是EPOLLOUT标志位/

events=(uint32_t)event;

……

//根据active标志位确定是否为活跃事件,以决定到底是修改还是添加事件

if(e->active){

op=EPOLL_CTL_MOD;

……

}else{

op=EPOLL_CTL_ADD;

}

//加入flags参数到events标志位中

ee.events=events|(uint32_t)flags;

/ptr成员存储的是ngx_connection_t连接,可参见9.6.2节中epoll的使用方式。在9.2节中曾经提到过事件的instance标志位,下面就配合ngx_epoll_process_events方法说明它的用法/

ee.data.ptr=(void*)((uintptr_t)c|ev->instance);

//调用epoll_ctl方法向epoll中添加事件或者在epoll中修改事件

if(epoll_ctl(ep,op,c->fd,&ee)==-1){

ngx_log_error(NGX_LOG_ALERT,ev->log,ngx_errno,

"epoll_ctl(%d,%d)failed",op,c->fd);

return NGX_ERROR;

}

//将事件的active标志位置为1,表示当前事件是活跃的

ev->active=1;

return NGX_OK;

}


ngx_epoll_del_event方法也通过epoll_ctl删除epoll中的事件,具体代码这里不再罗列,读者可参照ngx_epoll_add_event的实现理解其意义。

对于ngx_epoll_add_connection方法和ngx_epoll_del_connection方法,也是调用epoll_ctl方法向epoll中添加事件或者在epoll中删除事件的,只是每一个连接都对应读/写事件。因此,ngx_epoll_add_connection方法和ngx_epoll_del_connection方法在每次执行时也都是同时将每个连接对应的读、写事件active标志位置为1的,这里将不再给出其代码。

对于事件的instance标志位,已经在9.2节中简单地介绍了它的意义,下面将结合ngx_epoll_process_events方法具体说明其意义。ngx_epoll_process_events是实现了收集、分发事件的process_events接口的方法,其主要代码如下所示。


static ngx_int_t ngx_epoll_process_events(ngx_cycle_t*cycle,ngx_msec_t timer,ngx_uint_t flags)

{

int

events;

uint32_t

revents;

ngx_int_t

instance,i;

ngx_event_t

rev,wev,**queue;

ngx_connection_t*c;

/调用epoll_wait获取事件。注意,timer参数是在process_events调用时传入的,在9.7和9.8节中会提到这个参数/

events=epoll_wait(ep,event_list,(int)nevents,timer);

……

/在9.7节中会介绍Nginx对时间的缓存和管理。当flags标志位指示要更新时间时,就是在这里更新的/

if(flags&NGX_UPDATE_TIME||ngx_event_timer_alarm){

//更新时间,参见9.7.1节

ngx_time_update();

}

……

//遍历本次epoll_wait返回的所有事件

for(i=0;i<events;i++){

/对照着上面提到的ngx_epoll_add_event方法,可以看到ptr成员就是ngx_connection_t连接的地址,但最后1位有特殊含义,需要把它屏蔽掉/

c=event_list[i].data.ptr;

//将地址的最后一位取出来,用instance变量标识

instance=(uintptr_t)c&1;

/无论是32位还是64位机器,其地址的最后1位肯定是0,可以用下面这行语句把ngx_connection_t的地址还原到真正的地址值/

c=(ngx_connection_t*)((uintptr_t)c&(uintptr_t)~1);

//取出读事件

rev=c->read;

//判断这个读事件是否为过期事件

if(c->fd==-1||rev->instance!=instance){

/当fd套接字描述符为-1或者instance标志位不相等时,表示这个事件已经过期了,不用处理/

continue;

}

//取出事件类型

revents=event_list[i].events;

……

//如果是读事件且该事件是活跃的

if((revents&EPOLLIN)&&rev->active){

……

//flags参数中含有NGX_POST_EVENTS表示这批事件要延后处理

if(flags&NGX_POST_EVENTS){

/如果要在post队列中延后处理该事件,首先要判断它是新连接事件还是普通事件,以决定把它加入到ngx_posted_accept_events队列或者ngx_posted_events队列中。关于post队列中的事件何时执行,可参见9.8节内容/

queue=(ngx_event_t**)(rev->accept?&ngx_posted_accept_events:&ngx_posted_events);

//将这个事件添加到相应的延后执行队列中

ngx_locked_post_event(rev,queue);

}else{

//立即调用读事件的回调方法来处理这个事件

rev->handler(rev);

}

}

//取出写事件

wev=c->write;

if((revents&EPOLLOUT)&&wev->active){

//判断这个读事件是否为过期事件

if(c->fd==-1||wev->instance!=instance){

/当fd套接字描述符为-1或者instance标志位不相等时,表示这个事件已经过期了,不用处理/

continue;

}

……

if(flags&NGX_POST_EVENTS){

//将这个事件添加到post队列中延后处理

ngx_locked_post_event(wev,&ngx_posted_events);

}else{

//立即调用这个写事件的回调方法来处理这个事件

wev->handler(wev);

}

}

}

……

return NGX_OK;

}


ngx_epoll_process_events方法会收集当前触发的所有事件,对于不需要加入到post队列延后处理的事件,该方法会立刻执行它们的回调方法,这其实是在做分发事件的工作,只是它会在自己的进程中调用这些回调方法而已,因此,每一个回调方法都不能导致进程休眠或者消耗太多的时间,以免epoll不能即时地处理其他事件。

instance标志位为什么可以判断事件是否过期?从上面的代码可以看出,instance标志位的使用其实很简单,它利用了指针的最后一位一定是0这一特性。既然最后一位始终都是0,那么不如用来表示instance。这样,在使用ngx_epoll_add_event方法向epoll中添加事件时,就把epoll_event中联合成员data的ptr成员指向ngx_connection_t连接的地址,同时把最后一位置为这个事件的instance标志。而在ngx_epoll_process_events方法中取出指向连接的ptr地址时,先把最后一位instance取出来,再把ptr还原成正常的地址赋给ngx_connection_t连接。这样,instance究竟放在何处的问题也就解决了。

那么,过期事件又是怎么回事呢?举个例子,假设epoll_wait一次返回3个事件,在第1个事件的处理过程中,由于业务的需要,所以关闭了一个连接,而这个连接恰好对应第3个事件。这样的话,在处理到第3个事件时,这个事件就已经是过期事件了,一旦处理必然出错。既然如此,把关闭的这个连接的fd套接字置为-1能解决问题吗?答案是不能处理所有情况。

下面先来看看这种貌似不可能发生的场景到底是怎么发生的:假设第3个事件对应的ngx_connection_t连接中的fd套接字原先是50,处理第1个事件时把这个连接的套接字关闭了,同时置为-1,并且调用ngx_free_connection将该连接归还给连接池。在ngx_epoll_process_events方法的循环中开始处理第2个事件,恰好第2个事件是建立新连接事件,调用ngx_get_connection从连接池中取出的连接非常可能就是刚刚释放的第3个事件对应的连接。由于套接字50刚刚被释放,Linux内核非常有可能把刚刚释放的套接字50又分配给新建立的连接。因此,在循环中处理第3个事件时,这个事件就是过期的了!它对应的事件是关闭的连接,而不是新建立的连接。

如何解决这个问题?依靠instance标志位。当调用ngx_get_connection从连接池中获取一个新连接时,instance标志位就会置反,代码如下所示。


ngx_connection_t*

ngx_get_connection(ngx_socket_t s,ngx_log_t*log)

{

……

//从连接池中获取一个连接

ngx_connection_t*c;

c=ngx_cycle->free_connections;

……

rev=c->read;

wev=c->write;

……

instance=rev->instance;

//将instance标志位置为原来的相反值

rev->instance=!instance;

wev->instance=!instance;

……

return c;

}


这样,当这个ngx_connection_t连接重复使用时,它的instance标志位一定是不同的。因此,在ngx_epoll_process_events方法中一旦判断instance发生了变化,就认为这是过期事件而不予处理。这种设计方法是非常值得读者学习的,因为它几乎没有增加任何成本就很好地解决了服务器开发时一定会出现的过期事件问题。

目前,在ngx_event_actions_t接口中,所有事件模块都没有实现process_changes方法。done接口是由ngx_epoll_done方法实现的,在Nginx退出服务时它会得到调用。ngx_epoll_done主要是关闭epoll描述符ep,同时释放event_list数组。

了解了ngx_epoll_module_ctx中所有接口的实现后,ngx_epoll_module模块的定义就非常简单了,如下所示。


ngx_module_t ngx_epoll_module={

NGX_MODULE_V1,

&ngx_epoll_module_ctx,

/module context/

ngx_epoll_commands,

/module directives/

NGX_EVENT_MODULE,

/module type/

NULL,

/init master/

NULL,

/init module/

NULL,

/init process/

NULL,

/init thread/

NULL,

/exit thread/

NULL,

/exit process/

NULL,

/exit master/

NGX_MODULE_V1_PADDING

};


这里不需要再实现ngx_module_t接口中的7个回调方法了。

至此,我们完整地介绍了ngx_epoll_module模块是如何实现事件驱动机制的内容的。事实上,其他事件驱动模块的实现与ngx_epoll_module模块的差别并不是很大,读者可以参照本节内容阅读其他事件模块的源代码。