leevis.com
leevis.com copied to clipboard
nginx stream 模块代码解析
概述
2015年4月发布的nginx 1.9.0版本中,nginx以stream模块支持了4层代理的功能。
代码分析
启动阶段
stream模块定义在ngx_stream.c文件中。在ngx_init_cycle函数中会调用ngx_conf_parse解析配置文件。 stream 的block会由核心模块的ngx_stream_block函数来解析。 在函数中会分配一个ngx_stream_conf_ctx_t类型的上下文,来保存stream模块的配置结构体。 接着会调用stream类型模块的create_main_conf回调和create_srv_conf回调创建相关模块的配置结构体。 接着还会调用stream类型模块的preconfiguration回调来添加变量等。接着调用ngx_conf_parse函数解析stream模块的配置。解析完配置会调用stream模块的init_main_conf和merge_srv_conf初始化配置并合并一些配置。 解析完配置文件中stream中的配置后,调用ngx_stream_init_phases开始初始化执行阶段。 调用stream模块的postconfiguration回调函数添加执行阶段回调函数。
接着调用ngx_stream_init_phase_handlers函数初始化执行阶段。在该函数中,首先会根据stream模块安装到各阶段的所有回调函数的总个数分配一个数组,赋值到cmcf->phase_engine.handlers,其元素类型为:ngx_stream_phase_handler_t
/*
 * ngx_stream_init_phase_handlers() flattens the per-phase handler arrays
 * collected in cmcf->phases[] into one linear array of
 * ngx_stream_phase_handler_t entries, stored in cmcf->phase_engine.handlers.
 * At run time ngx_stream_core_run_phases() walks this array via
 * s->phase_handler.  Returns NGX_OK, or NGX_ERROR on allocation failure.
 */
static ngx_int_t
ngx_stream_init_phase_handlers(ngx_conf_t *cf,
ngx_stream_core_main_conf_t *cmcf)
{
ngx_int_t j;
ngx_uint_t i, n;
ngx_stream_handler_pt *h;
ngx_stream_phase_handler_t *ph;
ngx_stream_phase_handler_pt checker;
/* one slot is always reserved for the content phase (see below) */
n = 1 /* content phase */;
// Count the handlers registered in the six phases that precede
// NGX_STREAM_LOG_PHASE.  No handlers can be registered for
// NGX_STREAM_CONTENT_PHASE itself: that phase only runs the single
// cscf->handler callback installed by a content module
// (e.g. the stream proxy module or the stream return module).
for (i = 0; i < NGX_STREAM_LOG_PHASE; i++) {
n += cmcf->phases[i].handlers.nelts;
}
// The extra sizeof(void *) (zeroed by ngx_pcalloc) leaves a NULL
// checker after the last entry, which terminates the run-phases loop.
ph = ngx_pcalloc(cf->pool,
n * sizeof(ngx_stream_phase_handler_t) + sizeof(void *));
if (ph == NULL) {
return NGX_ERROR;
}
cmcf->phase_engine.handlers = ph;
n = 0;
for (i = 0; i < NGX_STREAM_LOG_PHASE; i++) {
h = cmcf->phases[i].handlers.elts;
switch (i) {
case NGX_STREAM_PREREAD_PHASE:
checker = ngx_stream_core_preread_phase;
break;
case NGX_STREAM_CONTENT_PHASE:
// The single reserved content slot: checker only, no handler.
ph->checker = ngx_stream_core_content_phase;
n++;
ph++;
continue;
default:
checker = ngx_stream_core_generic_phase;
}
// After this, n indexes the first entry of the NEXT phase;
// it becomes ph->next so checkers can jump over the current phase.
n += cmcf->phases[i].handlers.nelts;
// Copy in reverse registration order: the handler registered last
// occupies the earliest slot and therefore runs first in its phase.
for (j = cmcf->phases[i].handlers.nelts - 1; j >= 0; j--) {
ph->checker = checker;
ph->handler = h[j];
ph->next = n;
ph++;
}
}
return NGX_OK;
}
调用ngx_stream_add_ports和ngx_stream_optimize_servers添加端口监听的回调函数。在ngx_stream_optimize_servers函数中,把绑定的ip和端口以及对应的配置赋值到所监听结构体的servers的变量。
添加的目的主要是为了能够在请求到达的时候找到对应的配置,也就是为了根据五元组中目的ip和端口来获取监听该ip和端口所对应的配置。在ngx_stream_init_connection函数有: s->main_conf = addr_conf->ctx->main_conf; s->srv_conf = addr_conf->ctx->srv_conf;
/*
 * ngx_stream_optimize_servers() walks the collected ports and creates the
 * actual ngx_listening_t objects.  For every port it decides which
 * addresses really need their own socket, and stores an ngx_stream_port_t
 * in ls->servers so that ngx_stream_init_connection() can later map the
 * destination address of an accepted connection back to its configuration.
 * Returns NGX_CONF_OK, or NGX_CONF_ERROR on failure.
 */
static char *
ngx_stream_optimize_servers(ngx_conf_t *cf, ngx_array_t *ports)
{
ngx_uint_t i, p, last, bind_wildcard;
ngx_listening_t *ls;
ngx_stream_port_t *stport;
ngx_stream_conf_port_t *port;
ngx_stream_conf_addr_t *addr;
ngx_stream_core_srv_conf_t *cscf;
port = ports->elts;
for (p = 0; p < ports->nelts; p++) {
// Sort this port's addresses: addresses that carry an explicit
// "bind" flag first, plain addresses in the middle, and wildcard
// ("*:port") addresses last.
ngx_sort(port[p].addrs.elts, (size_t) port[p].addrs.nelts,
sizeof(ngx_stream_conf_addr_t), ngx_stream_cmp_conf_addrs);
addr = port[p].addrs.elts;
last = port[p].addrs.nelts;
/*
 * if there is the binding to the "*:port" then we need to bind()
 * to the "*:port" only and ignore the other bindings
 */
// If the last address is the wildcard, only the wildcard itself and
// addresses explicitly marked "bind" get their own socket; all other
// addresses are served through the wildcard socket.
if (addr[last - 1].opt.wildcard) {
addr[last - 1].opt.bind = 1;
bind_wildcard = 1;
} else {
bind_wildcard = 0;
}
i = 0;
while (i < last) {
// Served through the wildcard socket: skip, no separate bind().
if (bind_wildcard && !addr[i].opt.bind) {
i++;
continue;
}
// Without a wildcard every address gets its own listening socket.
// With a wildcard only explicitly bound addresses and the wildcard
// are bound; the skipped addresses end up in this socket's address
// list (ls->servers->addrs, filled by ngx_stream_add_addrs below).
ls = ngx_create_listening(cf, &addr[i].opt.sockaddr.sockaddr,
addr[i].opt.socklen);
if (ls == NULL) {
return NGX_CONF_ERROR;
}
ls->addr_ntop = 1;
ls->handler = ngx_stream_init_connection;
ls->pool_size = 256;
ls->type = addr[i].opt.type;
// NOTE(review): the error log is taken from addr-> (the first
// address of the current group), not addr[i]; this mirrors the
// upstream nginx source -- confirm there before "fixing" it.
cscf = addr->opt.ctx->srv_conf[ngx_stream_core_module.ctx_index];
ls->logp = cscf->error_log;
ls->log.data = &ls->addr_text;
ls->log.handler = ngx_accept_log_error;
ls->backlog = addr[i].opt.backlog;
ls->rcvbuf = addr[i].opt.rcvbuf;
ls->sndbuf = addr[i].opt.sndbuf;
ls->wildcard = addr[i].opt.wildcard;
ls->keepalive = addr[i].opt.so_keepalive;
#if (NGX_HAVE_KEEPALIVE_TUNABLE)
ls->keepidle = addr[i].opt.tcp_keepidle;
ls->keepintvl = addr[i].opt.tcp_keepintvl;
ls->keepcnt = addr[i].opt.tcp_keepcnt;
#endif
#if (NGX_HAVE_INET6)
ls->ipv6only = addr[i].opt.ipv6only;
#endif
#if (NGX_HAVE_REUSEPORT)
ls->reuseport = addr[i].opt.reuseport;
#endif
stport = ngx_palloc(cf->pool, sizeof(ngx_stream_port_t));
if (stport == NULL) {
return NGX_CONF_ERROR;
}
ls->servers = stport;
// This socket answers for the i addresses skipped above plus its own.
stport->naddrs = i + 1;
switch (ls->sockaddr->sa_family) {
#if (NGX_HAVE_INET6)
case AF_INET6:
if (ngx_stream_add_addrs6(cf, stport, addr) != NGX_OK) {
return NGX_CONF_ERROR;
}
break;
#endif
default: /* AF_INET */
if (ngx_stream_add_addrs(cf, stport, addr) != NGX_OK) {
return NGX_CONF_ERROR;
}
break;
}
if (ngx_clone_listening(cf, ls) != NGX_OK) {
return NGX_CONF_ERROR;
}
// Consume one address from the front of the group and continue.
addr++;
last--;
}
}
return NGX_CONF_OK;
}
执行阶段
4层的stream模块不像7层的http模块有明确的http协议的实现,4层就是流式的传输。7层的http模块围绕ngx_http_request_t结构体,而4层围绕ngx_stream_session_s结构体。stream模块建立连接的回调函数为ngx_stream_init_connection,http模块的是ngx_http_init_connection。我们从ngx_stream_init_connection函数开始,跟踪一次stream请求。当tcp握手请求到达时,会调用事件模块的ngx_event_accept回调函数建立连接。当建立起连接后,如果监听的是stream模块的端口,则会调用回调函数ngx_stream_init_connection建立4层会话,主要是创建一个ngx_stream_session_t结构体来保存会话。建立起会话后会调用会话处理回调函数ngx_stream_session_handler。在该函数中调用了ngx_stream_core_run_phases函数,运行stream模块7个执行阶段中的前6个。
/*
 * ngx_stream_session_s is the per-connection session object of the stream
 * (layer-4) subsystem -- the counterpart of ngx_http_request_t in http.
 * It is allocated from the connection pool in ngx_stream_init_connection().
 */
struct ngx_stream_session_s {
uint32_t signature; /* "STRM" */
ngx_connection_t *connection; /* the underlying client connection */
off_t received; /* bytes received from the client so far */
time_t start_sec; /* session start time, seconds part */
ngx_msec_t start_msec; /* session start time, milliseconds part */
ngx_log_handler_pt log_handler; /* formats session info for error-log lines */
void **ctx; /* per-module contexts, ngx_stream_max_module slots */
void **main_conf; /* main_conf array of the matched listen address */
void **srv_conf; /* srv_conf array of the matched listen address */
ngx_stream_upstream_t *upstream; /* proxied-upstream state, set by content modules */
ngx_array_t *upstream_states;
/* of ngx_stream_upstream_state_t */
ngx_stream_variable_value_t *variables; /* one slot per cmcf->variables entry */
#if (NGX_PCRE)
ngx_uint_t ncaptures; /* capture count of the last regex match -- TODO confirm */
int *captures; /* PCRE capture offsets */
u_char *captures_data; /* subject string the captures point into */
#endif
ngx_int_t phase_handler; /* index into cmcf->phase_engine.handlers */
ngx_uint_t status; /* session status code, presumably reported at log time -- confirm */
unsigned ssl:1; /* listen address has ssl enabled */
unsigned stat_processing:1;
unsigned health_check:1;
};
/*
 * ngx_stream_init_connection() is the ngx_listening_t handler for stream
 * listeners: it finds the server configuration for the connection's local
 * address:port, allocates the ngx_stream_session_t, sets up logging and
 * variables, and hands the connection to ngx_stream_session_handler()
 * (after reading the PROXY protocol header first, when configured).
 */
void
ngx_stream_init_connection(ngx_connection_t *c)
{
u_char text[NGX_SOCKADDR_STRLEN];
size_t len;
ngx_uint_t i;
ngx_time_t *tp;
ngx_event_t *rev;
struct sockaddr *sa;
ngx_stream_port_t *port;
struct sockaddr_in *sin;
ngx_stream_in_addr_t *addr;
ngx_stream_session_t *s;
ngx_stream_addr_conf_t *addr_conf;
#if (NGX_HAVE_INET6)
struct sockaddr_in6 *sin6;
ngx_stream_in6_addr_t *addr6;
#endif
ngx_stream_core_srv_conf_t *cscf;
ngx_stream_core_main_conf_t *cmcf;
/* find the server configuration for the address:port */
port = c->listening->servers;
if (port->naddrs > 1) {
/*
 * There are several addresses on this port and one of them
 * is the "*:port" wildcard so getsockname() is needed to determine
 * the server address.
 *
 * AcceptEx() and recvmsg() already gave this address.
 */
if (ngx_connection_local_sockaddr(c, NULL, 0) != NGX_OK) {
ngx_stream_close_connection(c);
return;
}
sa = c->local_sockaddr;
switch (sa->sa_family) {
#if (NGX_HAVE_INET6)
case AF_INET6:
sin6 = (struct sockaddr_in6 *) sa;
addr6 = port->addrs;
/* the last address is "*" */
// Linear scan; falling through the loop (i == naddrs - 1)
// selects the trailing wildcard entry.
for (i = 0; i < port->naddrs - 1; i++) {
if (ngx_memcmp(&addr6[i].addr6, &sin6->sin6_addr, 16) == 0) {
break;
}
}
addr_conf = &addr6[i].conf;
break;
#endif
default: /* AF_INET */
sin = (struct sockaddr_in *) sa;
addr = port->addrs;
/* the last address is "*" */
for (i = 0; i < port->naddrs - 1; i++) {
if (addr[i].addr == sin->sin_addr.s_addr) {
break;
}
}
addr_conf = &addr[i].conf;
break;
}
} else {
/* single address on this port: no getsockname() needed */
switch (c->local_sockaddr->sa_family) {
#if (NGX_HAVE_INET6)
case AF_INET6:
addr6 = port->addrs;
addr_conf = &addr6[0].conf;
break;
#endif
default: /* AF_INET */
addr = port->addrs;
addr_conf = &addr[0].conf;
break;
}
}
s = ngx_pcalloc(c->pool, sizeof(ngx_stream_session_t));
if (s == NULL) {
ngx_stream_close_connection(c);
return;
}
s->signature = NGX_STREAM_MODULE;
// Configuration of the matched address becomes the session's view;
// ngx_stream_get_module_{main,srv}_conf() read these below.
s->main_conf = addr_conf->ctx->main_conf;
s->srv_conf = addr_conf->ctx->srv_conf;
#if (NGX_STREAM_SSL)
s->ssl = addr_conf->ssl;
#endif
// Bytes already sitting in c->buffer at accept time count as received.
if (c->buffer) {
s->received += c->buffer->last - c->buffer->pos;
}
s->connection = c;
c->data = s;
cscf = ngx_stream_get_module_srv_conf(s, ngx_stream_core_module);
ngx_set_connection_log(c, cscf->error_log);
len = ngx_sock_ntop(c->sockaddr, c->socklen, text, NGX_SOCKADDR_STRLEN, 1);
ngx_log_error(NGX_LOG_INFO, c->log, 0, "*%uA %sclient %*s connected to %V",
c->number, c->type == SOCK_DGRAM ? "udp " : "",
len, text, &addr_conf->addr_text);
c->log->connection = c->number;
c->log->handler = ngx_stream_log_error;
c->log->data = s;
c->log->action = "initializing session";
c->log_error = NGX_ERROR_INFO;
s->ctx = ngx_pcalloc(c->pool, sizeof(void *) * ngx_stream_max_module);
if (s->ctx == NULL) {
ngx_stream_close_connection(c);
return;
}
cmcf = ngx_stream_get_module_main_conf(s, ngx_stream_core_module);
// One zeroed value slot per variable declared at configuration time.
s->variables = ngx_pcalloc(s->connection->pool,
cmcf->variables.nelts
* sizeof(ngx_stream_variable_value_t));
if (s->variables == NULL) {
ngx_stream_close_connection(c);
return;
}
tp = ngx_timeofday();
s->start_sec = tp->sec;
s->start_msec = tp->msec;
rev = c->read;
rev->handler = ngx_stream_session_handler;
// The PROXY protocol header, if configured, must be consumed before
// the phase engine runs, so a dedicated read handler is installed.
if (addr_conf->proxy_protocol) {
c->log->action = "reading PROXY protocol";
rev->handler = ngx_stream_proxy_protocol_handler;
if (!rev->ready) {
ngx_add_timer(rev, cscf->proxy_protocol_timeout);
if (ngx_handle_read_event(rev, 0) != NGX_OK) {
ngx_stream_finalize_session(s,
NGX_STREAM_INTERNAL_SERVER_ERROR);
}
return;
}
}
// Under the accept mutex, defer processing to the posted-events queue
// so the mutex is released before the session's handler runs.
if (ngx_use_accept_mutex) {
ngx_post_event(rev, &ngx_posted_events);
return;
}
rev->handler(rev);
}
/*
 * Read-event handler installed by ngx_stream_init_connection():
 * recovers the session stored in the connection's data field and
 * starts (or resumes) the stream phase engine.
 */
void
ngx_stream_session_handler(ngx_event_t *rev)
{
    ngx_connection_t  *conn = rev->data;

    ngx_stream_core_run_phases(conn->data);
}
/*
 * Drives the stream phase engine: invokes the checker of the entry
 * indexed by s->phase_handler, repeatedly, until either a checker
 * returns NGX_OK (processing is suspended or finished for now) or the
 * NULL checker terminating the handler array is reached.  Checkers
 * advance s->phase_handler themselves.
 */
void
ngx_stream_core_run_phases(ngx_stream_session_t *s)
{
    ngx_stream_phase_handler_t    *engine, *ph;
    ngx_stream_core_main_conf_t   *cmcf;

    cmcf = ngx_stream_get_module_main_conf(s, ngx_stream_core_module);
    engine = cmcf->phase_engine.handlers;

    for ( ;; ) {
        ph = &engine[s->phase_handler];

        if (ph->checker == NULL) {
            return;
        }

        if (ph->checker(s, ph) == NGX_OK) {
            return;
        }
    }
}
总结
分析的很棒
分析的很棒 我也在看