leevis.com
leevis.com copied to clipboard
nginx读取body
nginx作为代理服务器,需要读取body后转发。以proxy_pass这个指令为例说明。如下:
ngx_http_read_client_request_body(r, ngx_http_upstream_init)
读取body后向上游发起请求。在此暂时不介绍upstream相关的,仅仅介绍读取body。
首先会为保存body分配如下一个结构体:
typedef struct {
ngx_temp_file_t *temp_file; // 临时文件
ngx_chain_t *bufs; // 保存了读取到的body位置,可能是在内存,也可能在文件
ngx_buf_t *buf; // 读取body缓存区
off_t rest; // 未读取body大小
off_t received; //
ngx_chain_t *free; // 分配buf时使用,已经回收可以再分配的buf。
ngx_chain_t *busy; // 分配buf时使用,待回收的buf,buf指向的内存都是对应r->pool上的。
ngx_http_chunked_t *chunked; //
ngx_http_client_body_handler_pt post_handler; // 读body后的回调函数。
} ngx_http_request_body_t;
判断r->header_in 这个buf是否已经有读到的body。
-
如果有
- 调用ngx_http_request_body_filter复制,该函数判断如果不是chunked类型的body,则调用ngx_http_request_body_length_filter函数,该函数把读取回来的body的buf chain再复制一份出来(body本身的内容不复制,复制的是指向内容的指针),调用ngx_http_top_request_body_filter。 ngx_http_top_request_body_filter是个函数指针,指向ngx_http_request_body_save_filter。 该函数会把buf chain再复制到rb->bufs这个chain后面,如果rb->buf已经用完,则调用ngx_http_write_request_body函数,该函数会创建一个临时文件,把rb->bufs这个chain的内容写到文件中。
- 如果body不是chunked类型,且还有body未读取完毕,且header_in所指向的缓存区 够保存 剩余未读取的body,则让rb->buf指向剩余缓存区。
-
如果没有,则调用ngx_http_request_body_filter更新rb->rest。如果rb->rest 是0则没有body或body读取完毕,则更新request_body_no_buffering为0,调用设置的回调函数返回。 rb->rest小于0,则返回错误。 否则,根据配置的client_body_buffer_size分配一块内存,地址存到rb->buf。然后调用ngx_http_do_read_client_request_body处理。
ngx_http_read_client_request_body 函数
ngx_int_t
ngx_http_read_client_request_body(ngx_http_request_t *r,
ngx_http_client_body_handler_pt post_handler)
{
size_t preread;
ssize_t size;
ngx_int_t rc;
ngx_buf_t *b;
ngx_chain_t out;
ngx_http_request_body_t *rb;
ngx_http_core_loc_conf_t *clcf;
r->main->count++; // 请求结束时候会判单
if (r != r->main || r->request_body || r->discard_body) {
r->request_body_no_buffering = 0;
post_handler(r);
return NGX_OK;
}
if (ngx_http_test_expect(r) != NGX_OK) {
rc = NGX_HTTP_INTERNAL_SERVER_ERROR;
goto done;
}
rb = ngx_pcalloc(r->pool, sizeof(ngx_http_request_body_t));
if (rb == NULL) {
rc = NGX_HTTP_INTERNAL_SERVER_ERROR;
goto done;
}
/*
* set by ngx_pcalloc():
*
* rb->bufs = NULL;
* rb->buf = NULL;
* rb->free = NULL;
* rb->busy = NULL;
* rb->chunked = NULL;
*/
rb->rest = -1;
rb->post_handler = post_handler;
r->request_body = rb;
// 没有请求body
if (r->headers_in.content_length_n < 0 && !r->headers_in.chunked) {
r->request_body_no_buffering = 0;
post_handler(r);
return NGX_OK;
}
#if (NGX_HTTP_V2)
if (r->stream) {
rc = ngx_http_v2_read_request_body(r);
goto done;
}
#endif
preread = r->header_in->last - r->header_in->pos;
if (preread) {
// 已经读了一部分body
/* there is the pre-read part of the request body */
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"http client request body preread %uz", preread);
out.buf = r->header_in;
out.next = NULL;
// 主要是调用ngx_http_request_body_save_filter保存到rb->bufs 或者文件中
rc = ngx_http_request_body_filter(r, &out);
if (rc != NGX_OK) {
goto done;
}
r->request_length += preread - (r->header_in->last - r->header_in->pos);
// 请求缓冲区够用
if (!r->headers_in.chunked
&& rb->rest > 0
&& rb->rest <= (off_t) (r->header_in->end - r->header_in->last))
{
/* the whole request body may be placed in r->header_in */
b = ngx_calloc_buf(r->pool);
if (b == NULL) {
rc = NGX_HTTP_INTERNAL_SERVER_ERROR;
goto done;
}
b->temporary = 1;
b->start = r->header_in->pos;
b->pos = r->header_in->pos;
b->last = r->header_in->last;
b->end = r->header_in->end;
rb->buf = b;
r->read_event_handler = ngx_http_read_client_request_body_handler;
r->write_event_handler = ngx_http_request_empty_handler;
rc = ngx_http_do_read_client_request_body(r);
goto done;
}
} else {
/* set rb->rest */
if (ngx_http_request_body_filter(r, NULL) != NGX_OK) {
rc = NGX_HTTP_INTERNAL_SERVER_ERROR;
goto done;
}
}
if (rb->rest == 0) {
/* the whole request body was pre-read */
r->request_body_no_buffering = 0;
post_handler(r);
return NGX_OK;
}
if (rb->rest < 0) {
ngx_log_error(NGX_LOG_ALERT, r->connection->log, 0,
"negative request body rest");
rc = NGX_HTTP_INTERNAL_SERVER_ERROR;
goto done;
}
// 还有未处理完的body
clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);
size = clcf->client_body_buffer_size;
size += size >> 2;
/* TODO: honor r->request_body_in_single_buf */
if (!r->headers_in.chunked && rb->rest < size) {
size = (ssize_t) rb->rest;
if (r->request_body_in_single_buf) {
size += preread;
}
} else {
// client_body_buffer_size 还不够存剩余的body
size = clcf->client_body_buffer_size;
}
rb->buf = ngx_create_temp_buf(r->pool, size);
if (rb->buf == NULL) {
rc = NGX_HTTP_INTERNAL_SERVER_ERROR;
goto done;
}
r->read_event_handler = ngx_http_read_client_request_body_handler;
r->write_event_handler = ngx_http_request_empty_handler;
rc = ngx_http_do_read_client_request_body(r);
done:
if (r->request_body_no_buffering
&& (rc == NGX_OK || rc == NGX_AGAIN))
{
if (rc == NGX_OK) {
r->request_body_no_buffering = 0;
} else {
/* rc == NGX_AGAIN */
r->reading_body = 1;
}
r->read_event_handler = ngx_http_block_reading;
post_handler(r);
}
if (rc >= NGX_HTTP_SPECIAL_RESPONSE) {
r->main->count--;
}
return rc;
}
ngx_http_request_body_save_filter函数 把bufs的body写到临时文件,如果rest==0时,也就是全部body写到文件中,则把保存body的临时文件赋到bufs。
ngx_int_t
ngx_http_request_body_save_filter(ngx_http_request_t *r, ngx_chain_t *in)
{
ngx_buf_t *b;
ngx_chain_t *cl;
ngx_http_request_body_t *rb;
rb = r->request_body;
// in 这个chain的实际的内容就是r->header_in这个buf指向的内存
// 或者是根据client_body_buffer_size分配的大块内存(ngx_http_read_client_request_body 函数中)
if (ngx_chain_add_copy(r->pool, &rb->bufs, in) != NGX_OK) {
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
// 不缓存body内容,缓存区满了就调用处理函数处理body腾出缓存区
if (r->request_body_no_buffering) {
return NGX_OK;
}
// 还未读取完body
if (rb->rest > 0) {
// 已经分配了读取body所需的内存,且内存也用完了
// 则会调用ngx_http_write_request_body把内存中的内容写到临时文件中
if (rb->buf && rb->buf->last == rb->buf->end
&& ngx_http_write_request_body(r) != NGX_OK)
{
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
return NGX_OK;
}
/* rb->rest == 0 */
// body已经读取完毕
// 已经有部分body写入临时文件或配置要求body写入临时文件。
if (rb->temp_file || r->request_body_in_file_only) {
// bufs 链上的内容写入临时文件
if (ngx_http_write_request_body(r) != NGX_OK) {
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
if (rb->temp_file->file.offset != 0) {
cl = ngx_chain_get_free_buf(r->pool, &rb->free);
if (cl == NULL) {
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
b = cl->buf;
ngx_memzero(b, sizeof(ngx_buf_t));
b->in_file = 1;
b->file_last = rb->temp_file->file.offset;
b->file = &rb->temp_file->file;
rb->bufs = cl;
}
}
return NGX_OK;
}
ngx_http_write_request_body 把body内容写入临时文件,释放rb->bufs。
static ngx_int_t
ngx_http_write_request_body(ngx_http_request_t *r)
{
ssize_t n;
ngx_chain_t *cl, *ln;
ngx_temp_file_t *tf;
ngx_http_request_body_t *rb;
ngx_http_core_loc_conf_t *clcf;
rb = r->request_body;
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"http write client request body, bufs %p", rb->bufs);
if (rb->temp_file == NULL) {
// 创建临时文件结构体
tf = ngx_pcalloc(r->pool, sizeof(ngx_temp_file_t));
if (tf == NULL) {
return NGX_ERROR;
}
clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);
tf->file.fd = NGX_INVALID_FILE;
tf->file.log = r->connection->log;
tf->path = clcf->client_body_temp_path;
tf->pool = r->pool;
tf->warn = "a client request body is buffered to a temporary file";
tf->log_level = r->request_body_file_log_level;
tf->persistent = r->request_body_in_persistent_file;
tf->clean = r->request_body_in_clean_file;
if (r->request_body_file_group_access) {
tf->access = 0660;
}
rb->temp_file = tf;
if (rb->bufs == NULL) {
/* empty body with r->request_body_in_file_only */
if (ngx_create_temp_file(&tf->file, tf->path, tf->pool,
tf->persistent, tf->clean, tf->access)
!= NGX_OK)
{
return NGX_ERROR;
}
return NGX_OK;
}
}
if (rb->bufs == NULL) {
return NGX_OK;
}
// 把buf chain中的内容写到临时文件
n = ngx_write_chain_to_temp_file(rb->temp_file, rb->bufs);
/* TODO: n == 0 or not complete and level event */
if (n == NGX_ERROR) {
return NGX_ERROR;
}
// 记录了临时文件偏移大小
rb->temp_file->offset += n;
/* mark all buffers as written */
for (cl = rb->bufs; cl; /* void */) {
cl->buf->pos = cl->buf->last; // 标注已写到临时文件中
ln = cl;
cl = cl->next;
ngx_free_chain(r->pool, ln);
}
rb->bufs = NULL;
return NGX_OK;
}
ngx_http_do_read_client_request_body 是真正调用recv接受客户端发送内容的函数。 会被可读事件回调函数ngx_http_read_client_request_body_handler触发执行。
static ngx_int_t
ngx_http_do_read_client_request_body(ngx_http_request_t *r)
{
off_t rest;
size_t size;
ssize_t n;
ngx_int_t rc;
ngx_chain_t out;
ngx_connection_t *c;
ngx_http_request_body_t *rb;
ngx_http_core_loc_conf_t *clcf;
c = r->connection;
rb = r->request_body;
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
"http read client request body");
for ( ;; ) {
for ( ;; ) {
// buf缓存区已用完
if (rb->buf->last == rb->buf->end) {
// buf缓存区没有处理完
if (rb->buf->pos != rb->buf->last) {
/* pass buffer to request body filter chain */
// out的buf是个指针,并没有分配新的ngx_buf_t
// 因此ngx_http_request_body_filter操作的就是原来分配的这个buf。
out.buf = rb->buf;
out.next = NULL;
rc = ngx_http_request_body_filter(r, &out);
if (rc != NGX_OK) {
return rc;
}
} else {
/* update chains */
rc = ngx_http_request_body_filter(r, NULL);
if (rc != NGX_OK) {
return rc;
}
}
// ngx_chain_update_chains函数会更新busy这个chain。
if (rb->busy != NULL) {
// 缓存区内容被使用
if (r->request_body_no_buffering) {
// 不设置缓存,缓存区写满了也不会写入临时文件
if (c->read->timer_set) {
ngx_del_timer(c->read);
}
if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
return NGX_AGAIN;
}
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
// 缓存区的内容已经被写到临时文件,缓存区内容可以覆盖了。所以重置缓存区
rb->buf->pos = rb->buf->start;
rb->buf->last = rb->buf->start;
}
// size 是缓存区大小,rest是剩余body大小
size = rb->buf->end - rb->buf->last;
// rb->buf->last - rb->buf->pos 的内容是已经读取到缓存区未被处理的
rest = rb->rest - (rb->buf->last - rb->buf->pos);
if ((off_t) size > rest) {
size = (size_t) rest;
}
// linux环境是调用了ngx_unix_recv函数
n = c->recv(c, rb->buf->last, size);
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
"http client request body recv %z", n);
if (n == NGX_AGAIN) {
break;
}
if (n == 0) {
ngx_log_error(NGX_LOG_INFO, c->log, 0,
"client prematurely closed connection");
}
if (n == 0 || n == NGX_ERROR) {
c->error = 1;
return NGX_HTTP_BAD_REQUEST;
}
rb->buf->last += n;
r->request_length += n;
if (n == rest) {
// 读取完所有的body内容
/* pass buffer to request body filter chain */
out.buf = rb->buf;
out.next = NULL;
rc = ngx_http_request_body_filter(r, &out);
if (rc != NGX_OK) {
return rc;
}
}
if (rb->rest == 0) {
break;
}
if (rb->buf->last < rb->buf->end) {
break;
}
}
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
"http client request body rest %O", rb->rest);
if (rb->rest == 0) {
break;
}
// body未读完,且socket不可读
if (!c->read->ready) {
if (r->request_body_no_buffering
&& rb->buf->pos != rb->buf->last)
{
/* pass buffer to request body filter chain */
out.buf = rb->buf;
out.next = NULL;
rc = ngx_http_request_body_filter(r, &out);
if (rc != NGX_OK) {
return rc;
}
}
// 设置body超时事件
clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);
ngx_add_timer(c->read, clcf->client_body_timeout);
if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
return NGX_AGAIN;
}
}
// body 读取完毕
if (c->read->timer_set) {
ngx_del_timer(c->read);
}
// body 有缓冲区
if (!r->request_body_no_buffering) {
r->read_event_handler = ngx_http_block_reading;
rb->post_handler(r);
}
return NGX_OK;
}
buf缓存区参考
request 相关变量的意义
// 不缓存body内容,如果缓存区满了,会直接调用处理函数处理,不会写入临时文件。
r->request_body_no_buffering
// 指向接收body相关的结构体
r->request_body
// 包含请求头在内的长度
r->request_length
// body剩余读取长度
rb->rest
总结
- body 可能会缓存到两块内存上,也可能在一块内存上。或者可能会被全部写入临时文件。
- client_body_in_single_buffer 强制body写入到一块内存上,如果一块内存写不下就写到临时文件。如果body太大,大于(client_body_buffer_size+client_body_buffer_size/4)则会把body写入文件。则request_body变量也将获取不到body了。
- client_body_in_file_only 如果缓存body,则强制body写入临时文件。
- client_body_buffer_size 为缓存body分配的内存大小,如果body大于该大小则会把body写入临时文件。
大部分情况下都会缓存body,例如在配置了proxy_request_buffering为off,且proxy_pass_request_body为on,body不是chunked类型或proxy_http_version是1.1才可以不缓存body直接转发到上游。
附录:
事件驱动赋值
ngx_io = ngx_os_io;
ngx_event_actions = ngx_epoll_module_ctx.actions;
ngx_event_flags = NGX_USE_CLEAR_EVENT // 边沿触发
请求处理回调函数:
rev->handler = (c->type == SOCK_STREAM) ? ngx_event_accept
: ngx_event_recvmsg;
ls->handler = ngx_http_init_connection;
rev->handler = ngx_http_wait_request_handler;
c->write->handler = ngx_http_empty_handler;
size = cscf->client_header_buffer_size;
c->buffer = ngx_create_temp_buf(c->pool, size);
rev->handler = ngx_http_process_request_line;
r->read_event_handler = ngx_http_block_reading;
r->header_in = hc->busy ? hc->busy->buf : c->buffer;
r->main = r;
r->count = 1;
rev->handler = ngx_http_process_request_headers;
// 处理请求头
rc = ngx_http_process_request_header(r);
// 请求头处理结束以后调用该函数处理请求
ngx_http_process_request(r);
// 底层事件驱动函数,处理请求行、请求头一直再改底层读写回调函数
// 读取请求body时一直改为该函数
c->read->handler = ngx_http_request_handler;
c->write->handler = ngx_http_request_handler;
r->read_event_handler = ngx_http_block_reading;
// 处理完请求头以后
ngx_http_handler(r);
r->write_event_handler = ngx_http_core_run_phases;
// 执行ngx http的11个阶段
ngx_http_core_run_phases(r);
ngx_http_core_find_config_phase(r, ph);
ngx_http_update_location_config(r);
if (r->content_handler) {
r->write_event_handler = ngx_http_request_empty_handler;
ngx_http_finalize_request(r, r->content_handler(r));
return NGX_OK;
}
ngx_http_proxy_handler(r);
rc = ngx_http_read_client_request_body(r, ngx_http_upstream_init);