leevis.com

ngx buf 和 buf_chain

Open · vislee opened this issue on Feb 28, 2017 · 0 comments

概述

ngx_buf 和 buf_chain 在 nginx 中被广泛使用,主要用于网络读写。

/*
 * ngx_buf_t describes a piece of data held either in memory or in a file.
 *
 * In memory, [start, end) is the whole allocated area and [pos, last) is the
 * part still waiting to be processed: bytes before pos have already been
 * consumed, and the space between last and end is room for more data.
 * In a file, [file_pos, file_last) is the range still to be processed.
 * ngx_buf_size() reports the size of whichever range applies.
 */
struct ngx_buf_s {
    u_char          *pos;    /* start of the not-yet-processed data */
    u_char          *last;    /* end of the not-yet-processed data */
    off_t            file_pos;
    off_t            file_last;

    u_char          *start;         /* start of buffer */
    u_char          *end;           /* end of buffer */
    ngx_buf_tag_t    tag;    /* owner of the buf (who allocated it); compared by
                              * ngx_chain_update_chains() before recycling */
    ngx_file_t      *file;
    ngx_buf_t       *shadow;


    /* the buf's content could be changed */
    unsigned         temporary:1;    /* writable memory, e.g. from ngx_create_temp_buf() */

    /*
     * the buf's content is in a memory cache or in a read only memory
     * and must not be changed
     */
    unsigned         memory:1;

    /* the buf's content is mmap()ed and must not be changed */
    unsigned         mmap:1;

    unsigned         recycled:1;
    unsigned         in_file:1;
    unsigned         flush:1;    /* whether the output should be flushed */
    unsigned         sync:1;
    unsigned         last_buf:1;
    unsigned         last_in_chain:1;

    unsigned         last_shadow:1;
    unsigned         temp_file:1;

    /* STUB */ int   num;
};

/* One node of a singly linked list that strings ngx_buf_t objects together. */
struct ngx_chain_s {
    ngx_buf_t    *buf;    /* the buf this link refers to (may be shared by several links) */
    ngx_chain_t  *next;   /* next link; NULL terminates the chain */
};

还有用于网络写的 ngx_output_chain_ctx_t:


/* Describes a group of equally sized buffers to allocate (count and size). */
typedef struct {
    ngx_int_t    num; /* number of buffers */
    size_t       size;    /* size of each buffer; in the request-body path the
                           * original note says this is clcf->client_body_buffer_size */
} ngx_bufs_t;

/* State kept by ngx_output_chain() between calls. */
struct ngx_output_chain_ctx_s {
    ngx_buf_t                   *buf;     /* buf currently being filled by a copy; NULL if none */
    ngx_chain_t                 *in;      /* input links still waiting to be sent or copied */
    ngx_chain_t                 *free;    /* ctx-owned bufs that are empty and ready for reuse */
    ngx_chain_t                 *busy;    /* bufs handed to output_filter but not yet fully sent */

    unsigned                     sendfile:1;
    unsigned                     directio:1;
    unsigned                     unaligned:1;
    unsigned                     need_in_memory:1;
    unsigned                     need_in_temp:1;
    unsigned                     aio:1;

#if (NGX_HAVE_FILE_AIO || NGX_COMPAT)
    ngx_output_chain_aio_pt      aio_handler;
#if (NGX_HAVE_AIO_SENDFILE || NGX_COMPAT)
    ssize_t                    (*aio_preload)(ngx_buf_t *file);
#endif
#endif

#if (NGX_THREADS || NGX_COMPAT)
    ngx_int_t                  (*thread_handler)(ngx_thread_task_t *task,
                                                 ngx_file_t *file);
    ngx_thread_task_t           *thread_task;
#endif

    off_t                        alignment;

    ngx_pool_t                  *pool;    /* pool used for chain links */
    ngx_int_t                    allocated;  /* bufs allocated so far; no new ones once it equals bufs.num */
    ngx_bufs_t                   bufs;    /* number and size of the copy bufs */
    ngx_buf_tag_t                tag;     /* owner tag passed to ngx_chain_update_chains() */

    ngx_output_chain_filter_pt   output_filter;  /* output callback, e.g. ngx_chain_writer */
    void                        *filter_ctx;     /* first argument passed to output_filter, e.g. u->writer */
};
/* Allocate an ngx_buf_t header from `pool` (ngx_palloc / ngx_pcalloc variant). */
#define ngx_alloc_buf(pool)  ngx_palloc(pool, sizeof(ngx_buf_t))
#define ngx_calloc_buf(pool) ngx_pcalloc(pool, sizeof(ngx_buf_t))

/*
 * Number of bytes still to be processed in buf `b`: last - pos when
 * ngx_buf_in_memory(b) is true, otherwise file_last - file_pos.
 * Every use of `b` is parenthesized so that arguments such as
 * `bufs + i` or `&buf` expand correctly.
 */
#define ngx_buf_size(b)                                                      \
    (ngx_buf_in_memory(b) ? (off_t) ((b)->last - (b)->pos):                  \
                            ((b)->file_last - (b)->file_pos))

/*
 * Create a writable ("temporary") buf with a `size`-byte data area.
 *
 * Both the ngx_buf_t header and the data area come from `pool`. The buf is
 * returned empty: pos == last == start, and end == start + size.
 * Returns NULL if either allocation fails; nothing already taken from the
 * pool is given back here.
 */
ngx_buf_t *
ngx_create_temp_buf(ngx_pool_t *pool, size_t size)
{
    ngx_buf_t  *buf;
    u_char     *mem;

    buf = ngx_calloc_buf(pool);
    if (buf == NULL) {
        return NULL;
    }

    mem = ngx_palloc(pool, size);
    if (mem == NULL) {
        return NULL;
    }

    buf->start = mem;
    buf->pos = mem;
    buf->last = mem;
    buf->end = mem + size;
    buf->temporary = 1;

    return buf;
}

/*
 * Get a chain link for `pool`.
 *
 * A link sitting on the pool's own free list (pool->chain) is reused first;
 * otherwise a new one is allocated with ngx_palloc(). The link's fields are
 * NOT initialized, and a reused link keeps its old `next`. Returns NULL on
 * allocation failure.
 */
ngx_chain_t *
ngx_alloc_chain_link(ngx_pool_t *pool)
{
    ngx_chain_t  *link = pool->chain;

    if (link != NULL) {
        /* pop the head of the pool's free-link list */
        pool->chain = link->next;
        return link;
    }

    return ngx_palloc(pool, sizeof(ngx_chain_t));
}

/*
 * Take a chain link (together with its buf) from the head of *free. If
 * *free is empty, allocate a new link from `p` and attach a zero-filled
 * ngx_buf_t to it.
 *
 * The returned link is detached (next == NULL). A reused link's buf is
 * returned as-is; callers that need a clean buf must reset it themselves.
 * Returns NULL on allocation failure.
 */
ngx_chain_t *
ngx_chain_get_free_buf(ngx_pool_t *p, ngx_chain_t **free)
{
    ngx_chain_t  *link = *free;

    if (link != NULL) {
        *free = link->next;

    } else {
        link = ngx_alloc_chain_link(p);
        if (link == NULL) {
            return NULL;
        }

        link->buf = ngx_calloc_buf(p);
        if (link->buf == NULL) {
            return NULL;
        }
    }

    link->next = NULL;

    return link;
}


/*
 * Append the bufs of `in` to the end of *chain.
 *
 * Only chain links are allocated (from `pool`). The ngx_buf_t objects are
 * shared, not copied, so afterwards `in` and *chain reference the same bufs.
 * New links are needed because the caller may keep moving the `in` links
 * elsewhere (e.g. onto a busy list via ngx_chain_update_chains()), and that
 * must not disturb *chain.
 *
 * Returns NGX_OK on success. On allocation failure it returns NGX_ERROR;
 * the links appended so far stay on *chain, and *chain is still
 * NULL-terminated.
 */
ngx_int_t
ngx_chain_add_copy(ngx_pool_t *pool, ngx_chain_t **chain, ngx_chain_t *in)
{
    ngx_chain_t  *cl, **ll;

    ll = chain;

    /* walk to the terminating NULL `next` pointer of *chain */
    for (cl = *chain; cl; cl = cl->next) {
        ll = &cl->next;
    }

    while (in) {
        cl = ngx_alloc_chain_link(pool);
        if (cl == NULL) {
            /*
             * links from ngx_alloc_chain_link() carry a stale or
             * uninitialized `next`, so terminate the partial chain
             * before bailing out
             */
            *ll = NULL;
            return NGX_ERROR;
        }

        cl->buf = in->buf;    /* share the buf; its data is not copied */
        *ll = cl;
        ll = &cl->next;
        in = in->next;
    }

    *ll = NULL;

    return NGX_OK;
}

/*
 * Recycle bufs after an output pass.
 *
 * First the chain just handed to the output (*out) is appended to the tail
 * of *busy, and *out is reset to NULL. Then links are taken from the head of
 * *busy as long as their bufs are fully consumed (ngx_buf_size() == 0):
 *   - a buf with a different tag (owned by someone else) only has its chain
 *     link handed to ngx_free_chain(p, ...);
 *   - a buf with the caller's tag is reset to empty (pos = last = start) and
 *     its link is pushed onto *free for reuse.
 * The scan stops at the first buf that still holds data, so *busy keeps
 * its order.
 */
void
ngx_chain_update_chains(ngx_pool_t *p, ngx_chain_t **free, ngx_chain_t **busy,
    ngx_chain_t **out, ngx_buf_tag_t tag)
{
    ngx_chain_t  *cl;

    /* move *out onto the tail of *busy */
    if (*out) {
        if (*busy == NULL) {
            *busy = *out;

        } else {
            for (cl = *busy; cl->next; cl = cl->next) { /* void */ }

            cl->next = *out;
        }

        *out = NULL;
    }

    while (*busy) {
        cl = *busy;

        /*
         * the buf still holds unprocessed data, so it cannot be reused yet;
         * in the request-body path the original note says
         * ngx_http_write_request_body() consumes it
         */
        if (ngx_buf_size(cl->buf) != 0) {
            break;
        }

        /* not our buf: drop only the chain link, leave the buf alone */
        if (cl->buf->tag != tag) {
            *busy = cl->next;
            ngx_free_chain(p, cl);
            continue;
        }
        /* our buf: mark its memory empty and move it to *free for reuse */
        cl->buf->pos = cl->buf->start;
        cl->buf->last = cl->buf->start;

        *busy = cl->next;
        cl->next = *free;
        *free = cl;
    }
}

下面是输出(写)函数 ngx_output_chain:向上游发送请求体时由 ngx_http_upstream_send_request_body 调用,向下游发送响应时由 ngx_http_copy_filter 调用。

/*
 * Send the chain `in` through ctx->output_filter. Bufs that
 * ngx_output_chain_as_is() rejects are first copied into ctx-owned bufs.
 *
 * Short path: when nothing is pending (ctx->in and ctx->busy are empty) and
 * `in` is empty or a single buf usable as-is, `in` goes straight to the
 * output filter.
 *
 * Otherwise the links of `in` are appended to ctx->in, and the loop
 * repeatedly does the following:
 *   - moves as-is bufs to `out`;
 *   - copies the rest into ctx->buf, which is taken from ctx->free or newly
 *     allocated while ctx->allocated < ctx->bufs.num;
 *   - calls output_filter(filter_ctx, out);
 *   - recycles sent bufs via ngx_chain_update_chains().
 * Zero-size bufs that are not special (see ngx_buf_special) are logged as
 * an alert and skipped.
 *
 * Returns:
 *   - NGX_ERROR on failure;
 *   - NGX_ERROR or NGX_DONE from the filter, immediately;
 *   - NGX_AGAIN when a copy cannot proceed and nothing is queued, or when
 *     input remains but a round produced no output;
 *   - otherwise the output filter's last result.
 */
ngx_int_t
ngx_output_chain(ngx_output_chain_ctx_t *ctx, ngx_chain_t *in)
{
    off_t         bsize;
    ngx_int_t     rc, last;
    ngx_chain_t  *cl, *out, **last_out;

    if (ctx->in == NULL && ctx->busy == NULL) {
        /*
         * the short path for the case when the ctx->in and ctx->busy chains
         * are empty, the incoming chain is empty too or has the single buf
         * that does not require the copy
         */

        if (in == NULL) {
            return ctx->output_filter(ctx->filter_ctx, in);
        }

        if (in->next == NULL && ngx_output_chain_as_is(ctx, in->buf)) {
            return ctx->output_filter(ctx->filter_ctx, in);
        }
    }

    /* add the incoming buf to the chain ctx->in */

    if (in) {
        if (ngx_output_chain_add_copy(ctx->pool, &ctx->in, in) == NGX_ERROR) {
            return NGX_ERROR;
        }
    }

    out = NULL;
    last_out = &out;
    last = NGX_NONE;    /* no output_filter() call has been made yet */

    for ( ;; ) {
        while (ctx->in) {

            /*
             * cycle while there are the ctx->in bufs
             * and there are the free output bufs to copy in
             */

            bsize = ngx_buf_size(ctx->in->buf);

            if (bsize == 0 && !ngx_buf_special(ctx->in->buf)) {

                /* an empty data buf is unexpected: report it and drop it */

                ngx_log_error(NGX_LOG_ALERT, ctx->pool->log, 0,
                              "zero size buf in output "
                              "t:%d r:%d f:%d %p %p-%p %p %O-%O",
                              ctx->in->buf->temporary,
                              ctx->in->buf->recycled,
                              ctx->in->buf->in_file,
                              ctx->in->buf->start,
                              ctx->in->buf->pos,
                              ctx->in->buf->last,
                              ctx->in->buf->file,
                              ctx->in->buf->file_pos,
                              ctx->in->buf->file_last);

                ngx_debug_point();

                ctx->in = ctx->in->next;

                continue;
            }

            if (ngx_output_chain_as_is(ctx, ctx->in->buf)) {

                /* move the chain link to the output chain */

                cl = ctx->in;
                ctx->in = cl->next;

                *last_out = cl;
                last_out = &cl->next;
                cl->next = NULL;

                continue;
            }

            /* the buf must be copied: make sure there is a destination buf */
            if (ctx->buf == NULL) {

                rc = ngx_output_chain_align_file_buf(ctx, bsize);

                if (rc == NGX_ERROR) {
                    return NGX_ERROR;
                }

                /* presumably: no aligned buf was set up, so obtain one here */
                if (rc != NGX_OK) {

                    if (ctx->free) {

                        /* get the free buf */

                        cl = ctx->free;
                        ctx->buf = cl->buf;
                        ctx->free = cl->next;

                        ngx_free_chain(ctx->pool, cl);

                    } else if (out || ctx->allocated == ctx->bufs.num) {

                        /*
                         * no free buf: send what is already queued in out
                         * first, or stop because the bufs.num limit is hit
                         */

                        break;

                    } else if (ngx_output_chain_get_buf(ctx, bsize) != NGX_OK) {
                        /* allocating a new copy buf failed */
                        return NGX_ERROR;
                    }
                }
            }
            /* copy (part of) ctx->in->buf into ctx->buf */
            rc = ngx_output_chain_copy_buf(ctx);

            if (rc == NGX_ERROR) {
                return rc;
            }

            if (rc == NGX_AGAIN) {
                if (out) {
                    break;
                }

                return rc;
            }

            /* delete the completed buf from the ctx->in chain */

            if (ngx_buf_size(ctx->in->buf) == 0) {
                ctx->in = ctx->in->next;
            }

            /* queue the filled copy buf for output */
            cl = ngx_alloc_chain_link(ctx->pool);
            if (cl == NULL) {
                return NGX_ERROR;
            }

            cl->buf = ctx->buf;
            cl->next = NULL;
            *last_out = cl;
            last_out = &cl->next;
            ctx->buf = NULL;
        }

        /* a previous round already called the filter and nothing new was produced */
        if (out == NULL && last != NGX_NONE) {

            if (ctx->in) {
                return NGX_AGAIN;
            }

            return last;
        }

        last = ctx->output_filter(ctx->filter_ctx, out);

        if (last == NGX_ERROR || last == NGX_DONE) {
            return last;
        }

        /* move out onto busy and recycle fully sent ctx-owned bufs into free */
        ngx_chain_update_chains(ctx->pool, &ctx->free, &ctx->busy, &out,
                                ctx->tag);
        last_out = &out;
    }
}

通过ngx_alloc_buf和ngx_calloc_buf宏来分配ngx_buf_s结构体。ngx_buf_size计算buf中待处理内容的长度,也就是buf里还存了多少数据。ngx_create_temp_buf在内存中创建buf:start指向内存起始位置,end指向内存结束位置;pos表示待处理数据的开始位置,pos之前的内容是已经处理过的;last表示已写入数据的结束位置。ngx_alloc_chain_link分配一个链表节点,优先复用pool->chain上空闲的节点。ngx_create_chain_of_bufs分配一组buf chain,ngx_bufs_t的num为buf的个数,size为每个buf的大小。ngx_chain_add_copy(ngx_pool_t *pool, ngx_chain_t **chain, ngx_chain_t *in)把in链上的buf追加到chain的末尾,只新分配链表节点,buf本身及其内容并没有复制。ngx_chain_get_free_buf(ngx_pool_t *p, ngx_chain_t **free)从free链上取一个节点,如果没有就新分配一个节点和buf。ngx_chain_update_chains(ngx_pool_t *p, ngx_chain_t **free, ngx_chain_t **busy, ngx_chain_t **out, ngx_buf_tag_t tag)先把out挂到busy的末尾,然后从busy头部开始回收已经没有内容的buf,遇到仍有数据的buf即停止:tag相同的buf被重置后放回free上,tag不同的只把链表节点归还给p。ngx_output_chain_add_copy(ngx_pool_t *pool, ngx_chain_t **chain, ngx_chain_t *in)把in这个chain挂到chain的结尾,同样会重新分配链表节点。

使用

buf和chain在ngx_http_request_body_filter函数中被大量使用,如果想要搞清楚这个函数,就必须先了解buf和chain;反过来,看这个函数的实现也能更清晰地理解buf和chain。这里只看该函数调用的一个函数ngx_http_request_body_length_filter,它处理非chunked类型的body。

先看下函数的调用和传值, 在ngx_http_read_client_request_body函数中

out.buf = r->header_in;    // per the original note: this is the buf returned by ngx_create_temp_buf itself, not a copy
out.next = NULL;
rc = ngx_http_request_body_filter(r, &out);

在ngx_http_do_read_client_request_body中,

// rb->buf was created in ngx_http_read_client_request_body via ngx_create_temp_buf(r->pool, size).
out.buf = rb->buf;
out.next = NULL;
rc = ngx_http_request_body_filter(r, &out);

综上,调用ngx_http_request_body_filter时传入的buf都直接指向recv写入数据的那块内存。ngx_http_request_body_filter所调用的ngx_http_request_body_length_filter函数中,会调用ngx_chain_get_free_buf函数分配chain和buf。下面看一下该函数的完整实现。

/*
 * Request body filter for bodies delimited by Content-Length (non-chunked).
 *
 * On the first call rb->rest (== -1) is initialized from
 * r->headers_in.content_length_n.
 *
 * For each input buf, while body bytes remain (rb->rest != 0):
 *   - a chain link and ngx_buf_t are taken from rb->free, or allocated;
 *   - the new buf is pointed at the same memory as the input buf, so no
 *     data is copied;
 *   - the input buf's pos is advanced past the bytes taken;
 *   - the buf that completes the body is trimmed to the remaining rb->rest
 *     bytes and marked last_buf.
 *
 * The new chain is passed to ngx_http_top_request_body_filter. After that,
 * ngx_chain_update_chains() moves it onto rb->busy and recycles consumed
 * bufs into rb->free.
 *
 * Returns the next filter's result, or NGX_HTTP_INTERNAL_SERVER_ERROR if a
 * link/buf cannot be allocated.
 */
static ngx_int_t
ngx_http_request_body_length_filter(ngx_http_request_t *r, ngx_chain_t *in)
{
    size_t                     size;
    ngx_int_t                  rc;
    ngx_buf_t                 *b;
    ngx_chain_t               *cl, *tl, *out, **ll;
    ngx_http_request_body_t   *rb;

    rb = r->request_body;

    if (rb->rest == -1) {
        ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                       "http request body content length filter");

        rb->rest = r->headers_in.content_length_n;
    }

    out = NULL;
    ll = &out;

    for (cl = in; cl; cl = cl->next) {

        if (rb->rest == 0) {
            break;
        }
        // get a (possibly recycled) chain link and buf
        tl = ngx_chain_get_free_buf(r->pool, &rb->free);
        if (tl == NULL) {
            return NGX_HTTP_INTERNAL_SERVER_ERROR;
        }
        // the new buf; a recycled one must be cleared before reuse
        b = tl->buf;

        ngx_memzero(b, sizeof(ngx_buf_t));
        // point the new buf at the input buf's unprocessed data (no copy)
        b->temporary = 1;
        b->tag = (ngx_buf_tag_t) &ngx_http_read_client_request_body;
        b->start = cl->buf->pos;
        b->pos = cl->buf->pos;
        b->last = cl->buf->last;
        b->end = cl->buf->end;
        b->flush = r->request_body_no_buffering;

        size = cl->buf->last - cl->buf->pos;

        // more body bytes are still expected after this buf
        if ((off_t) size < rb->rest) {
            // key point: this modifies the ORIGINAL input buf; per the
            // original note, recv() also operates on this same buf
            cl->buf->pos = cl->buf->last;
            rb->rest -= size;

        } else {
            // this buf completes the body: take only rb->rest bytes
            cl->buf->pos += (size_t) rb->rest;
            rb->rest = 0;
            b->last = cl->buf->pos;
            b->last_buf = 1;
        }

        *ll = tl;
        ll = &tl->next;
    }

    // the chain passed on consists of the new links/bufs built above.
    // Per the original note, ngx_http_top_request_body_filter points to
    // ngx_http_request_body_save_filter, which mainly calls
    // ngx_chain_add_copy and ngx_http_write_request_body.
    rc = ngx_http_top_request_body_filter(r, out);

    ngx_chain_update_chains(r->pool, &rb->free, &rb->busy, &out,
                            (ngx_buf_tag_t) &ngx_http_read_client_request_body);

    return rc;
}

再看ngx_chain_add_copy函数:它又重新分配了chain(链表节点),并把它们添加到rb->bufs链表的结尾。此时out并没有被清空。

如果r->request_body_no_buffering为真,save filter会直接返回,随后调用ngx_chain_update_chains。ngx_chain_update_chains把out挂到rb->busy上,并把out置为空。这就是ngx_chain_add_copy要重新分配chain的原因:out上的链表节点会被移到busy链表上,rb->bufs必须使用自己的节点。此时rb->bufs和rb->busy的chain都指向相同的buf。

如果r->request_body_no_buffering不为真,则调用ngx_http_write_request_body函数,把bufs链表中buf指向的内存写入文件,然后更新bufs中的buf。因为bufs和busy链表指向的是同一组buf,所以要等该函数返回后再调用ngx_chain_update_chains:此时这些buf已经没有待处理的数据,可以被回收到free链表上。

ngx_event_pipe

ngx_event_pipe是带缓冲区的管道,在upstream模块中使用,用于把上游的响应body经过pipe发送到下游。ngx_http_upstream_t结构体通过ngx_event_pipe_t *pipe;字段引用pipe结构体,其定义如下:

/*
 * Buffered pipe used by the upstream module to pass the upstream response
 * body to the downstream (client) connection.
 */
struct ngx_event_pipe_s {
    ngx_connection_t  *upstream;  /* set to u->peer.connection in ngx_http_upstream_send_response */
    ngx_connection_t  *downstream;    /* set to r->connection */

    ngx_chain_t       *free_raw_bufs;
    ngx_chain_t       *in;   /* all response body bufs read so far */
    ngx_chain_t      **last_in;

    ngx_chain_t       *writing;

    ngx_chain_t       *out;
    ngx_chain_t       *free;
    ngx_chain_t       *busy;

    /*
     * the input filter i.e. that moves HTTP/1.1 chunks
     * from the raw bufs to an incoming chain
     */

    /* callback that processes data read from upstream into the pipe's buffers */
    ngx_event_pipe_input_filter_pt    input_filter;     /* set to ngx_http_proxy_copy_filter in ngx_http_proxy_handler */
    void                             *input_ctx;                         /* set to r */

    /* callback that writes data from the pipe's buffers to downstream */
    ngx_event_pipe_output_filter_pt   output_filter;  /* set to ngx_http_upstream_output_filter in ngx_http_upstream_send_response */
    void                             *output_ctx;     /* set to r */

#if (NGX_THREADS || NGX_COMPAT)
    ngx_int_t                       (*thread_handler)(ngx_thread_task_t *task,
                                                      ngx_file_t *file);
    void                             *thread_ctx;
    ngx_thread_task_t                *thread_task;
#endif

    unsigned           read:1;    /* something has been read */
    unsigned           cacheable:1;
    unsigned           single_buf:1;
    unsigned           free_bufs:1;
    unsigned           upstream_done:1;   /* reading from upstream finished */
    unsigned           upstream_error:1;
    unsigned           upstream_eof:1;  /* read returned 0: end of the upstream stream */
    unsigned           upstream_blocked:1;
    unsigned           downstream_done:1;
    unsigned           downstream_error:1;
    unsigned           cyclic_temp_file:1;
    unsigned           aio:1;

    ngx_int_t          allocated;  /* number of buffers allocated so far */
    ngx_bufs_t         bufs;    /* from the proxy_buffers directive; buffers for the response body */
    ngx_buf_tag_t      tag;

    ssize_t            busy_size;

    off_t              read_length;  /* length of the response body read so far */
    off_t              length;      /* total response body length; set in input_filter_init */

    off_t              max_temp_file_size;
    ssize_t            temp_file_write_size;

    ngx_msec_t         read_timeout; /* read timeout */
    ngx_msec_t         send_timeout; /* write (send) timeout */
    ssize_t            send_lowat;

    ngx_pool_t        *pool;
    ngx_log_t         *log;

    ngx_chain_t       *preread_bufs;  /* response body bytes already read together with the response header */
    size_t             preread_size;   /* length of those pre-read response body bytes */
    ngx_buf_t         *buf_to_file;

    size_t             limit_rate;
    time_t             start_sec;

    ngx_temp_file_t   *temp_file;

    /* STUB */ int     num;
};

vislee avatar Feb 28 '17 12:02 vislee