leevis.com
leevis.com copied to clipboard
ngx buf 和 buf_chain
概述
ngx_buf 和 buf_chain 在nginx 被广泛使用,主要用在网络读。
struct ngx_buf_s {
u_char *pos; // 未处理内存开始
u_char *last; // 未处理内存结束
off_t file_pos;
off_t file_last;
u_char *start; /* start of buffer */ // 内存开始
u_char *end; /* end of buffer */ // 内存结束
ngx_buf_tag_t tag; // 那个操作分配的,归还的时候使用
ngx_file_t *file;
ngx_buf_t *shadow;
/* the buf's content could be changed */
unsigned temporary:1; // 临时内存
/*
* the buf's content is in a memory cache or in a read only memory
* and must not be changed
*/
unsigned memory:1;
/* the buf's content is mmap()ed and must not be changed */
unsigned mmap:1;
unsigned recycled:1;
unsigned in_file:1;
unsigned flush:1; // 是否需要刷新
unsigned sync:1;
unsigned last_buf:1;
unsigned last_in_chain:1;
unsigned last_shadow:1;
unsigned temp_file:1;
/* STUB */ int num;
};
struct ngx_chain_s {
ngx_buf_t *buf;
ngx_chain_t *next;
};
还有没用于网络写的ngx_output_chain_ctx_t
typedef struct {
ngx_int_t num; // 分配内存的个数
size_t size; // 分配内存的大小 clcf->client_body_buffer_size
} ngx_bufs_t;
struct ngx_output_chain_ctx_s {
ngx_buf_t *buf;
ngx_chain_t *in;
ngx_chain_t *free; //
ngx_chain_t *busy;
unsigned sendfile:1;
unsigned directio:1;
unsigned unaligned:1;
unsigned need_in_memory:1;
unsigned need_in_temp:1;
unsigned aio:1;
#if (NGX_HAVE_FILE_AIO || NGX_COMPAT)
ngx_output_chain_aio_pt aio_handler;
#if (NGX_HAVE_AIO_SENDFILE || NGX_COMPAT)
ssize_t (*aio_preload)(ngx_buf_t *file);
#endif
#endif
#if (NGX_THREADS || NGX_COMPAT)
ngx_int_t (*thread_handler)(ngx_thread_task_t *task,
ngx_file_t *file);
ngx_thread_task_t *thread_task;
#endif
off_t alignment;
ngx_pool_t *pool;
ngx_int_t allocated;
ngx_bufs_t bufs;
ngx_buf_tag_t tag;
ngx_output_chain_filter_pt output_filter; // 回调函数 ngx_chain_writer
void *filter_ctx; // u->writer
};
#define ngx_alloc_buf(pool) ngx_palloc(pool, sizeof(ngx_buf_t))
#define ngx_calloc_buf(pool) ngx_pcalloc(pool, sizeof(ngx_buf_t))
#define ngx_buf_size(b) \
(ngx_buf_in_memory(b) ? (off_t) (b->last - b->pos): \
(b->file_last - b->file_pos))
ngx_buf_t *
ngx_create_temp_buf(ngx_pool_t *pool, size_t size)
{
ngx_buf_t *b;
b = ngx_calloc_buf(pool);
if (b == NULL) {
return NULL;
}
b->start = ngx_palloc(pool, size);
if (b->start == NULL) {
return NULL;
}
b->pos = b->start;
b->last = b->start;
b->end = b->last + size;
b->temporary = 1;
return b;
}
ngx_chain_t *
ngx_alloc_chain_link(ngx_pool_t *pool)
{
ngx_chain_t *cl;
cl = pool->chain;
if (cl) {
pool->chain = cl->next;
return cl;
}
cl = ngx_palloc(pool, sizeof(ngx_chain_t));
if (cl == NULL) {
return NULL;
}
return cl;
}
ngx_chain_t *
ngx_chain_get_free_buf(ngx_pool_t *p, ngx_chain_t **free)
{
ngx_chain_t *cl;
if (*free) {
cl = *free;
*free = cl->next;
cl->next = NULL;
return cl;
}
cl = ngx_alloc_chain_link(p);
if (cl == NULL) {
return NULL;
}
cl->buf = ngx_calloc_buf(p);
if (cl->buf == NULL) {
return NULL;
}
cl->next = NULL;
return cl;
}
// 重新分配chain 把in链表的buf挂到chain后面。
ngx_int_t
ngx_chain_add_copy(ngx_pool_t *pool, ngx_chain_t **chain, ngx_chain_t *in)
{
ngx_chain_t *cl, **ll;
ll = chain;
// 循环遍历,找到单链表的最后一个元素
for (cl = *chain; cl; cl = cl->next) {
ll = &cl->next;
}
while (in) {
// 为什么要重新分配chain
// buf没重新分配
cl = ngx_alloc_chain_link(pool);
if (cl == NULL) {
return NGX_ERROR;
}
cl->buf = in->buf;
*ll = cl;
ll = &cl->next;
in = in->next;
}
*ll = NULL;
return NGX_OK;
}
void
ngx_chain_update_chains(ngx_pool_t *p, ngx_chain_t **free, ngx_chain_t **busy,
ngx_chain_t **out, ngx_buf_tag_t tag)
{
ngx_chain_t *cl;
if (*out) {
if (*busy == NULL) {
*busy = *out;
} else {
for (cl = *busy; cl->next; cl = cl->next) { /* void */ }
cl->next = *out;
}
*out = NULL;
}
while (*busy) {
cl = *busy;
// buf 中的内容还未释放
// ngx_http_write_request_body函数会释放
if (ngx_buf_size(cl->buf) != 0) {
break;
}
if (cl->buf->tag != tag) {
*busy = cl->next;
ngx_free_chain(p, cl);
continue;
}
// 释放内存重新使用
cl->buf->pos = cl->buf->start;
cl->buf->last = cl->buf->start;
*busy = cl->next;
cl->next = *free;
*free = cl;
}
}
写出调用函数 向上游发送请求,ngx_http_upstream_send_request_body函数调用 向下游发送响应,ngx_http_copy_filter函数调用
ngx_int_t
ngx_output_chain(ngx_output_chain_ctx_t *ctx, ngx_chain_t *in)
{
off_t bsize;
ngx_int_t rc, last;
ngx_chain_t *cl, *out, **last_out;
if (ctx->in == NULL && ctx->busy == NULL) {
/*
* the short path for the case when the ctx->in and ctx->busy chains
* are empty, the incoming chain is empty too or has the single buf
* that does not require the copy
*/
if (in == NULL) {
return ctx->output_filter(ctx->filter_ctx, in);
}
if (in->next == NULL && ngx_output_chain_as_is(ctx, in->buf)) {
return ctx->output_filter(ctx->filter_ctx, in);
}
}
/* add the incoming buf to the chain ctx->in */
if (in) {
if (ngx_output_chain_add_copy(ctx->pool, &ctx->in, in) == NGX_ERROR) {
return NGX_ERROR;
}
}
out = NULL;
last_out = &out;
last = NGX_NONE;
for ( ;; ) {
while (ctx->in) {
/*
* cycle while there are the ctx->in bufs
* and there are the free output bufs to copy in
*/
bsize = ngx_buf_size(ctx->in->buf);
if (bsize == 0 && !ngx_buf_special(ctx->in->buf)) {
ngx_log_error(NGX_LOG_ALERT, ctx->pool->log, 0,
"zero size buf in output "
"t:%d r:%d f:%d %p %p-%p %p %O-%O",
ctx->in->buf->temporary,
ctx->in->buf->recycled,
ctx->in->buf->in_file,
ctx->in->buf->start,
ctx->in->buf->pos,
ctx->in->buf->last,
ctx->in->buf->file,
ctx->in->buf->file_pos,
ctx->in->buf->file_last);
ngx_debug_point();
ctx->in = ctx->in->next;
continue;
}
if (ngx_output_chain_as_is(ctx, ctx->in->buf)) {
/* move the chain link to the output chain */
cl = ctx->in;
ctx->in = cl->next;
*last_out = cl;
last_out = &cl->next;
cl->next = NULL;
continue;
}
if (ctx->buf == NULL) {
rc = ngx_output_chain_align_file_buf(ctx, bsize);
if (rc == NGX_ERROR) {
return NGX_ERROR;
}
if (rc != NGX_OK) {
if (ctx->free) {
/* get the free buf */
cl = ctx->free;
ctx->buf = cl->buf;
ctx->free = cl->next;
ngx_free_chain(ctx->pool, cl);
} else if (out || ctx->allocated == ctx->bufs.num) {
break;
} else if (ngx_output_chain_get_buf(ctx, bsize) != NGX_OK) {
// 分配内存
return NGX_ERROR;
}
}
}
// 内存拷贝
rc = ngx_output_chain_copy_buf(ctx);
if (rc == NGX_ERROR) {
return rc;
}
if (rc == NGX_AGAIN) {
if (out) {
break;
}
return rc;
}
/* delete the completed buf from the ctx->in chain */
if (ngx_buf_size(ctx->in->buf) == 0) {
ctx->in = ctx->in->next;
}
cl = ngx_alloc_chain_link(ctx->pool);
if (cl == NULL) {
return NGX_ERROR;
}
cl->buf = ctx->buf;
cl->next = NULL;
*last_out = cl;
last_out = &cl->next;
ctx->buf = NULL;
}
if (out == NULL && last != NGX_NONE) {
if (ctx->in) {
return NGX_AGAIN;
}
return last;
}
last = ctx->output_filter(ctx->filter_ctx, out);
if (last == NGX_ERROR || last == NGX_DONE) {
return last;
}
ngx_chain_update_chains(ctx->pool, &ctx->free, &ctx->busy, &out,
ctx->tag);
last_out = &out;
}
}
通过ngx_alloc_buf 和 ngx_calloc_buf 宏来分配ngx_buf_s结构体。 ngx_buf_size 计算buf的大小,也就是说存了多少数据。待处理内容的长度。 通过ngx_create_temp_buf在内存创建buf,其中start指向内存起始位置,end指向内存结束位置。last一般用来指示内存使用位置,pos表示内存使用开始位置,也就是说pos前的内容是已经处理过的了。 ngx_alloc_chain_link 可以分配一个链的结构。 ngx_create_chain_of_bufs 分配一组buf chain,ngx_bufs_t 的num为buf的个数,size为buf的大小。 ngx_chain_add_copy(ngx_pool_t *pool, ngx_chain_t **chain, ngx_chain_t *in)把in这个chain上的结构复制到chain上,实际buf的内容没有复制。 ngx_chain_get_free_buf(ngx_pool_t *p, ngx_chain_t **free) 从free这个chain链上取一个chain,如果没有就分配一个。 ngx_chain_update_chains(ngx_pool_t *p, ngx_chain_t **free, ngx_chain_t **busy, ngx_chain_t **out, ngx_buf_tag_t tag) 把busy和out上没有内容的buf放回p上货free上,tag相同的就放回free上。 ngx_output_chain_add_copy(ngx_pool_t *pool, ngx_chain_t chain, ngx_chain_t *in) 把in这个chain挂到chain的结尾,重新分配chain。
使用
buf chain 在ngx_http_request_body_filter函数中大量使用,如果想要搞清楚这个函数的就必须先了解了buf 和chain。同时看这个函数的实现也可以更加清晰的理解buf 和 chain。 我们只看该函数调用的一个函数ngx_http_request_body_length_filter,该函数处理非chunked类型的body。
先看下函数的调用和传值, 在ngx_http_read_client_request_body函数中
out.buf = r->header_in; // 这个buf直接指向ngx_create_temp_buf返回
out.next = NULL;
rc = ngx_http_request_body_filter(r, &out);
在ngx_http_do_read_client_request_body中,
// rb->buf = ngx_create_temp_buf(r->pool, size); 在ngx_http_read_client_request_body函数中。
out.buf = rb->buf;
out.next = NULL;
rc = ngx_http_request_body_filter(r, &out);
综上,在调用ngx_http_request_body_filter传递的值都直接指向recv设置的buf。在ngx_http_request_body_filter 函数所调用的ngx_http_request_body_length_filter函数中,会调用ngx_chain_get_free_buf函数分配chain和buf,看下该函数的完整实现。
static ngx_int_t
ngx_http_request_body_length_filter(ngx_http_request_t *r, ngx_chain_t *in)
{
size_t size;
ngx_int_t rc;
ngx_buf_t *b;
ngx_chain_t *cl, *tl, *out, **ll;
ngx_http_request_body_t *rb;
rb = r->request_body;
if (rb->rest == -1) {
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"http request body content length filter");
rb->rest = r->headers_in.content_length_n;
}
out = NULL;
ll = &out;
for (cl = in; cl; cl = cl->next) {
if (rb->rest == 0) {
break;
}
// 分配了新的chain和buf
tl = ngx_chain_get_free_buf(r->pool, &rb->free);
if (tl == NULL) {
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
// 新的buf
b = tl->buf;
ngx_memzero(b, sizeof(ngx_buf_t));
// 新的buf赋值
b->temporary = 1;
b->tag = (ngx_buf_tag_t) &ngx_http_read_client_request_body;
b->start = cl->buf->pos;
b->pos = cl->buf->pos;
b->last = cl->buf->last;
b->end = cl->buf->end;
b->flush = r->request_body_no_buffering;
size = cl->buf->last - cl->buf->pos;
// 还有req body没有读到入buff
if ((off_t) size < rb->rest) {
// 理解的关键点
// 操作的原buf,recv也是操作的该buf
cl->buf->pos = cl->buf->last;
rb->rest -= size;
} else {
cl->buf->pos += (size_t) rb->rest;
rb->rest = 0;
b->last = cl->buf->pos;
b->last_buf = 1;
}
*ll = tl;
ll = &tl->next;
}
// 调用该函数传递的是复制过的buf。
// ngx_http_top_request_body_filter指向ngx_http_request_body_save_filter函数
// 该函数主要调用了ngx_chain_add_copy和ngx_http_write_request_body
rc = ngx_http_top_request_body_filter(r, out);
ngx_chain_update_chains(r->pool, &rb->free, &rb->busy, &out,
(ngx_buf_tag_t) &ngx_http_read_client_request_body);
return rc;
}
看下该函数ngx_chain_add_copy,在该函数中又重新分配了chain,添加到rb->bufs这个队列结尾。此时out没有被清空。
如果r->request_body_no_buffering为真就直接返回调用ngx_chain_update_chains了,该函数把out赋值给rb->busy,out赋值为空。这就是为什么ngx_chain_add_copy函数又分配了chain的原因。 现在rb->bufs和rb->busy的chain都指向相同的buf。
如果r->request_body_no_buffering不为真,则调用ngx_http_write_request_body函数,把bufs链表中的buf指向的内存写入到文件中,然后更新bufs中的buf,因bufs和busy链表的buf是相同的。所以在该函数返回再调用ngx_chain_update_chains。
ngx_event_pipe
ngx_event_pipe 带缓存区的管道,在upstream模块使用,把上游的响应body通过pipe发送到下游。
ngx_http_upstream_t结构体引用了pipe结构体,ngx_event_pipe_t *pipe;
pipe结构体定义如下:
struct ngx_event_pipe_s {
ngx_connection_t *upstream; // 在ngx_http_upstream_send_response中赋为:u->peer.connection;
ngx_connection_t *downstream; // 赋为 r->connection;
ngx_chain_t *free_raw_bufs;
ngx_chain_t *in; // 读到的所有resp body
ngx_chain_t **last_in;
ngx_chain_t *writing;
ngx_chain_t *out;
ngx_chain_t *free;
ngx_chain_t *busy;
/*
* the input filter i.e. that moves HTTP/1.1 chunks
* from the raw bufs to an incoming chain
*/
// 从上游读入数据到管道缓冲区的回调处理函数
ngx_event_pipe_input_filter_pt input_filter; // 在ngx_http_proxy_handler函数中赋为:ngx_http_proxy_copy_filter
void *input_ctx; // 赋为:r
// 把数据从管道缓冲区写入下游的回调处理函数
ngx_event_pipe_output_filter_pt output_filter; // 在ngx_http_upstream_send_response函数中被赋为:ngx_http_upstream_output_filter
void *output_ctx; // 赋为:r
#if (NGX_THREADS || NGX_COMPAT)
ngx_int_t (*thread_handler)(ngx_thread_task_t *task,
ngx_file_t *file);
void *thread_ctx;
ngx_thread_task_t *thread_task;
#endif
unsigned read:1; // 标识读到了内容
unsigned cacheable:1;
unsigned single_buf:1;
unsigned free_bufs:1;
unsigned upstream_done:1; // 读完
unsigned upstream_error:1;
unsigned upstream_eof:1; // read返回0,读到结束了
unsigned upstream_blocked:1;
unsigned downstream_done:1;
unsigned downstream_error:1;
unsigned cyclic_temp_file:1;
unsigned aio:1;
ngx_int_t allocated; // 已经分配缓存块个数
ngx_bufs_t bufs; // 指令proxy_buffers的结果,缓存resp body
ngx_buf_tag_t tag;
ssize_t busy_size;
off_t read_length; // 已读到的resp body长度
off_t length; // resp body总长度。input_filter_init中赋值。
off_t max_temp_file_size;
ssize_t temp_file_write_size;
ngx_msec_t read_timeout; // 读超时
ngx_msec_t send_timeout; // 写超时
ssize_t send_lowat; //
ngx_pool_t *pool;
ngx_log_t *log;
ngx_chain_t *preread_bufs; // 读响应头时读到的resp body。
size_t preread_size; // 读响应头读到的resp body的长度。
ngx_buf_t *buf_to_file;
size_t limit_rate;
time_t start_sec;
ngx_temp_file_t *temp_file;
/* STUB */ int num;
};