nginx http pipeline 处理代码分析
背景
近期遇到一个问题,源站接受请求后无响应或者响应时间非常长,客户端长连接测试。 一个连接发了多个请求,等不到响应时关闭连接。查看access日志时只有第一个请求的一条日志4,error日志也没详细的错误。 接下来就看看nginx是怎么处理http的pipeline请求的。
代码分析
先看个测试例子:
use Test::Nginx::Socket::Lua;
use Cwd qw(cwd);
log_level('debug');
repeat_each(1);
plan tests => repeat_each() * (7 * blocks());
no_long_string();
run_tests();
__DATA__
=== TEST 1: test
--- http_config
variables_hash_max_size 2048;
# server {
# listen 127.0.0.1:8082;
# access_log off;
# location / {
# content_by_lua_block {
# -- ngx.sleep(10)
# ngx.print("OK!")
# ngx.exit(ngx.HTTP_OK)
# }
# }
# }
--- config
location /t/ {
proxy_read_timeout 500s;
proxy_send_timeout 500s;
proxy_pass http://127.0.0.1:8082;
}
--- pipelined_requests eval
["GET /t/label/1", "GET /t/label/2", "GET /t/label/3"]
--- more_headers
Connection: keep-alive
--- no_error_log
[error]
--- timeout: 20
http请求到达时先调用ngx_http_init_connection函数初始化请求结构体,
接着接受请求行,调用ngx_http_process_request_line函数处理请求行。
接着接受请求头,调用ngx_http_process_request_headers函数处理请求头。注意到,请求头有个长连接标志的头Connection: keep-alive ,调用了下面的回调函数处理。
static ngx_int_t
ngx_http_process_connection(ngx_http_request_t *r, ngx_table_elt_t *h,
ngx_uint_t offset)
{
if (ngx_strcasestrn(h->value.data, "close", 5 - 1)) {
r->headers_in.connection_type = NGX_HTTP_CONNECTION_CLOSE;
} else if (ngx_strcasestrn(h->value.data, "keep-alive", 10 - 1)) {
r->headers_in.connection_type = NGX_HTTP_CONNECTION_KEEP_ALIVE;
}
return NGX_OK;
}
处理完请求头后,调用ngx_http_process_request函数,在该函数中有如下代码:
c->read->handler = ngx_http_request_handler;
c->write->handler = ngx_http_request_handler;
r->read_event_handler = ngx_http_block_reading;
可读函数应用层回调函数被赋值为ngx_http_block_reading,从字面意思就是阻止再读,就是不读请求内容了。
接着调用了ngx_http_handler函数,该函数中根据r->headers_in.connection_type的值赋值r->keepalive = 1;
接着调用了ngx_http_core_run_phases函数处理nginx的10个阶段,CONTENT阶段会调用ngx_http_core_content_phase回调函数,该函数有如下代码:
if (r->content_handler) {
r->write_event_handler = ngx_http_request_empty_handler;
ngx_http_finalize_request(r, r->content_handler(r));
return NGX_OK;
}
而r->content_handler是在FIND_CONFIG阶段的回调函数ngx_http_core_find_config_phase中调用了ngx_http_update_location_config函数,该函数中会根据ua浏览器和配置决定是否保持长连接,并赋值r->content_handler = clcf->handler;反向代理模式,该handler被赋值为ngx_http_proxy_handler函数,该函数有下面调用:
rc = ngx_http_read_client_request_body(r, ngx_http_upstream_init);
先调用ngx_http_read_client_request_body读取请求body,然后调用ngx_http_upstream_init初始化上游连接调用ngx_http_upstream_send_request发送请求,向上游发送完请求后,上游连接可读时,传输层回调函数ngx_http_upstream_handler被调用,从而应用层函数ngx_http_upstream_process_header被调用,处理完相应头后,调用 ngx_http_upstream_send_response发送响应。 发送完毕后返回到了ngx_http_core_content_phase函数中的,ngx_http_finalize_request(r, r->content_handler(r));代码处。 然后调用了ngx_http_finalize_connection,该函数有如下代码:
if (!ngx_terminate
&& !ngx_exiting
&& r->keepalive
&& clcf->keepalive_timeout > 0)
{
ngx_http_set_keepalive(r);
return;
}
接下来,重点看下ngx_http_set_keepalive函数:
static void
ngx_http_set_keepalive(ngx_http_request_t *r)
{
int tcp_nodelay;
ngx_buf_t *b, *f;
ngx_chain_t *cl, *ln;
ngx_event_t *rev, *wev;
ngx_connection_t *c;
ngx_http_connection_t *hc;
ngx_http_core_loc_conf_t *clcf;
c = r->connection;
rev = c->read;
clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, "set http keepalive handler");
if (r->discard_body) {
r->write_event_handler = ngx_http_request_empty_handler;
r->lingering_time = ngx_time() + (time_t) (clcf->lingering_time / 1000);
ngx_add_timer(rev, clcf->lingering_timeout);
return;
}
c->log->action = "closing request";
hc = r->http_connection;
b = r->header_in;
// 读回到pipeline下一个请求的内容
if (b->pos < b->last) {
/* the pipelined request */
if (b != c->buffer) { // 处理当前请求时重新分配过内存
/*
* If the large header buffers were allocated while the previous
* request processing then we do not use c->buffer for
* the pipelined request (see ngx_http_create_request()).
*
* Now we would move the large header buffers to the free list.
*/
for (cl = hc->busy; cl; /* void */) {
ln = cl;
cl = cl->next;
if (ln->buf == b) {
ngx_free_chain(c->pool, ln);
continue;
}
f = ln->buf;
f->pos = f->start;
f->last = f->start;
ln->next = hc->free;
hc->free = ln;
}
cl = ngx_alloc_chain_link(c->pool);
if (cl == NULL) {
ngx_http_close_request(r, 0);
return;
}
cl->buf = b;
cl->next = NULL;
hc->busy = cl;
hc->nbusy = 1;
}
}
/* guard against recursive call from ngx_http_finalize_connection() */
r->keepalive = 0;
ngx_http_free_request(r, 0);
c->data = hc;
// 传输层可读事件加入epoll监听起来
if (ngx_handle_read_event(rev, 0) != NGX_OK) {
ngx_http_close_connection(c);
return;
}
wev = c->write;
wev->handler = ngx_http_empty_handler;
if (b->pos < b->last) {
// 当前已读到了下个请求的内容了
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, "pipelined request");
c->log->action = "reading client pipelined request line";
r = ngx_http_create_request(c);
if (r == NULL) {
ngx_http_close_connection(c);
return;
}
r->pipeline = 1;
c->data = r;
c->sent = 0;
c->destroyed = 0;
if (rev->timer_set) {
ngx_del_timer(rev);
}
rev->handler = ngx_http_process_request_line;
ngx_post_event(rev, &ngx_posted_events);
return;
}
/*
* To keep a memory footprint as small as possible for an idle keepalive
* connection we try to free c->buffer's memory if it was allocated outside
* the c->pool. The large header buffers are always allocated outside the
* c->pool and are freed too.
*/
b = c->buffer;
if (ngx_pfree(c->pool, b->start) == NGX_OK) {
/*
* the special note for ngx_http_keepalive_handler() that
* c->buffer's memory was freed
*/
b->pos = NULL;
} else {
b->pos = b->start;
b->last = b->start;
}
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0, "hc free: %p",
hc->free);
if (hc->free) {
for (cl = hc->free; cl; /* void */) {
ln = cl;
cl = cl->next;
ngx_pfree(c->pool, ln->buf->start);
ngx_free_chain(c->pool, ln);
}
hc->free = NULL;
}
ngx_log_debug2(NGX_LOG_DEBUG_HTTP, c->log, 0, "hc busy: %p %i",
hc->busy, hc->nbusy);
if (hc->busy) {
for (cl = hc->busy; cl; /* void */) {
ln = cl;
cl = cl->next;
ngx_pfree(c->pool, ln->buf->start);
ngx_free_chain(c->pool, ln);
}
hc->busy = NULL;
hc->nbusy = 0;
}
#if (NGX_HTTP_SSL)
if (c->ssl) {
ngx_ssl_free_buffer(c);
}
#endif
// 下个请求还没收到
rev->handler = ngx_http_keepalive_handler;
if (wev->active && (ngx_event_flags & NGX_USE_LEVEL_EVENT)) {
if (ngx_del_event(wev, NGX_WRITE_EVENT, 0) != NGX_OK) {
ngx_http_close_connection(c);
return;
}
}
c->log->action = "keepalive";
if (c->tcp_nopush == NGX_TCP_NOPUSH_SET) {
if (ngx_tcp_push(c->fd) == -1) {
ngx_connection_error(c, ngx_socket_errno, ngx_tcp_push_n " failed");
ngx_http_close_connection(c);
return;
}
c->tcp_nopush = NGX_TCP_NOPUSH_UNSET;
tcp_nodelay = ngx_tcp_nodelay_and_tcp_nopush ? 1 : 0;
} else {
tcp_nodelay = 1;
}
if (tcp_nodelay && clcf->tcp_nodelay && ngx_tcp_nodelay(c) != NGX_OK) {
ngx_http_close_connection(c);
return;
}
#if 0
/* if ngx_http_request_t was freed then we need some other place */
r->http_state = NGX_HTTP_KEEPALIVE_STATE;
#endif
c->idle = 1;
ngx_reusable_connection(c, 1);
// 和客户端的长连接保持的时间,keepalive_timeout配置指定。
ngx_add_timer(rev, clcf->keepalive_timeout);
if (rev->ready) {
ngx_post_event(rev, &ngx_posted_events);
}
}
总结
一条tcp连接上,发送多个请求,只有在当前请求处理完毕正确返回后,保持住长连接,才会处理下一个请求。