leevis.com icon indicating copy to clipboard operation
leevis.com copied to clipboard

nginx读取body

Open vislee opened this issue 7 years ago • 0 comments

nginx作为代理服务器,需要读取body后转发。以proxy_pass这个指令为例说明。如下:

ngx_http_read_client_request_body(r, ngx_http_upstream_init)

读取body后向上游发起请求。在此暂时不介绍upstream相关的,仅仅介绍读取body。

首先会为保存body分配如下一个结构体:

typedef struct {
    ngx_temp_file_t                  *temp_file; // 临时文件
    ngx_chain_t                      *bufs;   //  保存了读取到的body位置,可能是在内存,也可能在文件
    ngx_buf_t                        *buf;    // 读取body缓存区
    off_t                             rest;  //  未读取body大小
    off_t                             received; //
    ngx_chain_t                      *free;  //  分配buf时使用,已经回收可以再分配的buf。
    ngx_chain_t                      *busy;  // 分配buf时使用,待回收的buf,buf指向的内存都是对应r->pool上的。
    ngx_http_chunked_t               *chunked;  //
    ngx_http_client_body_handler_pt   post_handler; // 读body后的回调函数。
} ngx_http_request_body_t;

判断r->header_in 这个buf是否已经有读到的body。

  • 如果有

    • 调用ngx_http_request_body_filter复制,该函数判断如果不是chunked类型的body,则调用ngx_http_request_body_length_filter函数,该函数把读取回来的body的buf chain再复制一份出来(body本身的内容不复制,复制的是指向内容的指针),调用ngx_http_top_request_body_filter。 ngx_http_top_request_body_filter是个函数指针,指向ngx_http_request_body_save_filter。 该函数会把buf chain再复制到rb->bufs这个chain后面,如果rb->buf已经用完,则调用ngx_http_write_request_body函数,该函数会创建一个临时文件,把rb->bufs这个chain的内容写到文件中。
    • 如果body不是chunked类型,且还有body未读取完毕,且header_in所指向的缓存区 够保存 剩余未读取的body,则让rb->buf指向剩余缓存区。
  • 如果没有,则调用ngx_http_request_body_filter更新rb->rest。如果rb->rest 是0则没有body或body读取完毕,则更新request_body_no_buffering为0,调用设置的回调函数返回。 rb->rest小于0,则返回错误。 否则,根据配置的client_body_buffer_size分配一块内存,地址存到rb->buf。然后调用ngx_http_do_read_client_request_body处理。

ngx_http_read_client_request_body 函数

ngx_int_t
ngx_http_read_client_request_body(ngx_http_request_t *r,
    ngx_http_client_body_handler_pt post_handler)
{
    size_t                     preread;
    ssize_t                    size;
    ngx_int_t                  rc;
    ngx_buf_t                 *b;
    ngx_chain_t                out;
    ngx_http_request_body_t   *rb;
    ngx_http_core_loc_conf_t  *clcf;

    r->main->count++;  // 请求结束时候会判单

    if (r != r->main || r->request_body || r->discard_body) {
        r->request_body_no_buffering = 0;
        post_handler(r);
        return NGX_OK;
    }

    if (ngx_http_test_expect(r) != NGX_OK) {
        rc = NGX_HTTP_INTERNAL_SERVER_ERROR;
        goto done;
    }

    rb = ngx_pcalloc(r->pool, sizeof(ngx_http_request_body_t));
    if (rb == NULL) {
        rc = NGX_HTTP_INTERNAL_SERVER_ERROR;
        goto done;
    }

    /*
     * set by ngx_pcalloc():
     *
     *     rb->bufs = NULL;
     *     rb->buf = NULL;
     *     rb->free = NULL;
     *     rb->busy = NULL;
     *     rb->chunked = NULL;
     */

    rb->rest = -1;
    rb->post_handler = post_handler;

    r->request_body = rb;

    // 没有请求body
    if (r->headers_in.content_length_n < 0 && !r->headers_in.chunked) {
        r->request_body_no_buffering = 0;
        post_handler(r);
        return NGX_OK;
    }

#if (NGX_HTTP_V2)
    if (r->stream) {
        rc = ngx_http_v2_read_request_body(r);
        goto done;
    }
#endif

    preread = r->header_in->last - r->header_in->pos;

    if (preread) {
        // 已经读了一部分body
        /* there is the pre-read part of the request body */

        ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                       "http client request body preread %uz", preread);

        out.buf = r->header_in;
        out.next = NULL;
        // 主要是调用ngx_http_request_body_save_filter保存到rb->bufs 或者文件中
        rc = ngx_http_request_body_filter(r, &out);

        if (rc != NGX_OK) {
            goto done;
        }

        r->request_length += preread - (r->header_in->last - r->header_in->pos);

        // 请求缓冲区够用
        if (!r->headers_in.chunked
            && rb->rest > 0
            && rb->rest <= (off_t) (r->header_in->end - r->header_in->last))
        {
            /* the whole request body may be placed in r->header_in */

            b = ngx_calloc_buf(r->pool);
            if (b == NULL) {
                rc = NGX_HTTP_INTERNAL_SERVER_ERROR;
                goto done;
            }

            b->temporary = 1;
            b->start = r->header_in->pos;
            b->pos = r->header_in->pos;
            b->last = r->header_in->last;
            b->end = r->header_in->end;

            rb->buf = b;

            r->read_event_handler = ngx_http_read_client_request_body_handler;
            r->write_event_handler = ngx_http_request_empty_handler;

            rc = ngx_http_do_read_client_request_body(r);
            goto done;
        }

    } else {
        /* set rb->rest */

        if (ngx_http_request_body_filter(r, NULL) != NGX_OK) {
            rc = NGX_HTTP_INTERNAL_SERVER_ERROR;
            goto done;
        }
    }

    if (rb->rest == 0) {
        /* the whole request body was pre-read */
        r->request_body_no_buffering = 0;
        post_handler(r);
        return NGX_OK;
    }

    if (rb->rest < 0) {
        ngx_log_error(NGX_LOG_ALERT, r->connection->log, 0,
                      "negative request body rest");
        rc = NGX_HTTP_INTERNAL_SERVER_ERROR;
        goto done;
    }

    // 还有未处理完的body
    clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);

    size = clcf->client_body_buffer_size;
    size += size >> 2;

    /* TODO: honor r->request_body_in_single_buf */

    if (!r->headers_in.chunked && rb->rest < size) {
        size = (ssize_t) rb->rest;

        if (r->request_body_in_single_buf) {
            size += preread;
        }

    } else {
        // client_body_buffer_size 还不够存剩余的body
        size = clcf->client_body_buffer_size;
    }

    rb->buf = ngx_create_temp_buf(r->pool, size);
    if (rb->buf == NULL) {
        rc = NGX_HTTP_INTERNAL_SERVER_ERROR;
        goto done;
    }

    r->read_event_handler = ngx_http_read_client_request_body_handler;
    r->write_event_handler = ngx_http_request_empty_handler;

    rc = ngx_http_do_read_client_request_body(r);

done:

    if (r->request_body_no_buffering
        && (rc == NGX_OK || rc == NGX_AGAIN))
    {
        if (rc == NGX_OK) {
            r->request_body_no_buffering = 0;

        } else {
            /* rc == NGX_AGAIN */
            r->reading_body = 1;
        }

        r->read_event_handler = ngx_http_block_reading;
        post_handler(r);
    }

    if (rc >= NGX_HTTP_SPECIAL_RESPONSE) {
        r->main->count--;
    }

    return rc;
}

ngx_http_request_body_save_filter函数 把bufs的body写到临时文件,如果rest==0时,也就是全部body写到文件中,则把保存body的临时文件赋到bufs。

ngx_int_t
ngx_http_request_body_save_filter(ngx_http_request_t *r, ngx_chain_t *in)
{
    ngx_buf_t                 *b;
    ngx_chain_t               *cl;
    ngx_http_request_body_t   *rb;

    rb = r->request_body;

    // in 这个chain的实际的内容就是r->header_in这个buf指向的内存
    //     或者是根据client_body_buffer_size分配的大块内存(ngx_http_read_client_request_body 函数中)
    if (ngx_chain_add_copy(r->pool, &rb->bufs, in) != NGX_OK) {
        return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }


    // 不缓存body内容,缓存区满了就调用处理函数处理body腾出缓存区
    if (r->request_body_no_buffering) {
        return NGX_OK;
    }

    // 还未读取完body
    if (rb->rest > 0) {
        // 已经分配了读取body所需的内存,且内存也用完了
        // 则会调用ngx_http_write_request_body把内存中的内容写到临时文件中
        if (rb->buf && rb->buf->last == rb->buf->end
            && ngx_http_write_request_body(r) != NGX_OK)
        {
            return NGX_HTTP_INTERNAL_SERVER_ERROR;
        }

        return NGX_OK;
    }

    /* rb->rest == 0 */
    // body已经读取完毕
    // 已经有部分body写入临时文件或配置要求body写入临时文件。
    if (rb->temp_file || r->request_body_in_file_only) {
        // bufs 链上的内容写入临时文件
        if (ngx_http_write_request_body(r) != NGX_OK) {
            return NGX_HTTP_INTERNAL_SERVER_ERROR;
        }

        if (rb->temp_file->file.offset != 0) {

            cl = ngx_chain_get_free_buf(r->pool, &rb->free);
            if (cl == NULL) {
                return NGX_HTTP_INTERNAL_SERVER_ERROR;
            }

            b = cl->buf;

            ngx_memzero(b, sizeof(ngx_buf_t));

            b->in_file = 1;
            b->file_last = rb->temp_file->file.offset;
            b->file = &rb->temp_file->file;

            rb->bufs = cl;
        }
    }

    return NGX_OK;
}

ngx_http_write_request_body 把body内容写入临时文件,释放rb->bufs。


static ngx_int_t
ngx_http_write_request_body(ngx_http_request_t *r)
{
    ssize_t                    n;
    ngx_chain_t               *cl, *ln;
    ngx_temp_file_t           *tf;
    ngx_http_request_body_t   *rb;
    ngx_http_core_loc_conf_t  *clcf;

    rb = r->request_body;

    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                   "http write client request body, bufs %p", rb->bufs);

    if (rb->temp_file == NULL) {
        // 创建临时文件结构体
        tf = ngx_pcalloc(r->pool, sizeof(ngx_temp_file_t));
        if (tf == NULL) {
            return NGX_ERROR;
        }

        clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);

        tf->file.fd = NGX_INVALID_FILE;
        tf->file.log = r->connection->log;
        tf->path = clcf->client_body_temp_path;
        tf->pool = r->pool;
        tf->warn = "a client request body is buffered to a temporary file";
        tf->log_level = r->request_body_file_log_level;
        tf->persistent = r->request_body_in_persistent_file;
        tf->clean = r->request_body_in_clean_file;

        if (r->request_body_file_group_access) {
            tf->access = 0660;
        }

        rb->temp_file = tf;

        if (rb->bufs == NULL) {
            /* empty body with r->request_body_in_file_only */

            if (ngx_create_temp_file(&tf->file, tf->path, tf->pool,
                                     tf->persistent, tf->clean, tf->access)
                != NGX_OK)
            {
                return NGX_ERROR;
            }

            return NGX_OK;
        }
    }

    if (rb->bufs == NULL) {
        return NGX_OK;
    }

    // 把buf chain中的内容写到临时文件
    n = ngx_write_chain_to_temp_file(rb->temp_file, rb->bufs);

    /* TODO: n == 0 or not complete and level event */

    if (n == NGX_ERROR) {
        return NGX_ERROR;
    }
    // 记录了临时文件偏移大小
    rb->temp_file->offset += n;

    /* mark all buffers as written */

    for (cl = rb->bufs; cl; /* void */) {

        cl->buf->pos = cl->buf->last;  // 标注已写到临时文件中

        ln = cl;
        cl = cl->next;
        ngx_free_chain(r->pool, ln);
    }

    rb->bufs = NULL;

    return NGX_OK;
}


ngx_http_do_read_client_request_body 是真正调用recv接受客户端发送内容的函数。 会被可读事件回调函数ngx_http_read_client_request_body_handler触发执行。


static ngx_int_t
ngx_http_do_read_client_request_body(ngx_http_request_t *r)
{
    off_t                      rest;
    size_t                     size;
    ssize_t                    n;
    ngx_int_t                  rc;
    ngx_chain_t                out;
    ngx_connection_t          *c;
    ngx_http_request_body_t   *rb;
    ngx_http_core_loc_conf_t  *clcf;

    c = r->connection;
    rb = r->request_body;

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
                   "http read client request body");

    for ( ;; ) {
        for ( ;; ) {
            // buf缓存区已用完
            if (rb->buf->last == rb->buf->end) {
                // buf缓存区没有处理完
                if (rb->buf->pos != rb->buf->last) {

                    /* pass buffer to request body filter chain */
                    // out的buf是个指针,并没有分配新的ngx_buf_t
                    // 因此ngx_http_request_body_filter操作的就是原来分配的这个buf。
                    out.buf = rb->buf;
                    out.next = NULL;

                    rc = ngx_http_request_body_filter(r, &out);

                    if (rc != NGX_OK) {
                        return rc;
                    }

                } else {

                    /* update chains */

                    rc = ngx_http_request_body_filter(r, NULL);

                    if (rc != NGX_OK) {
                        return rc;
                    }
                }

                // ngx_chain_update_chains函数会更新busy这个chain。
                if (rb->busy != NULL) {
                    // 缓存区内容被使用
                    if (r->request_body_no_buffering) {
                        // 不设置缓存,缓存区写满了也不会写入临时文件
                        if (c->read->timer_set) {
                            ngx_del_timer(c->read);
                        }

                        if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
                            return NGX_HTTP_INTERNAL_SERVER_ERROR;
                        }

                        return NGX_AGAIN;
                    }

                    return NGX_HTTP_INTERNAL_SERVER_ERROR;
                }
                // 缓存区的内容已经被写到临时文件,缓存区内容可以覆盖了。所以重置缓存区
                rb->buf->pos = rb->buf->start;
                rb->buf->last = rb->buf->start;
            }

            // size 是缓存区大小,rest是剩余body大小
            size = rb->buf->end - rb->buf->last;
            // rb->buf->last - rb->buf->pos 的内容是已经读取到缓存区未被处理的
            rest = rb->rest - (rb->buf->last - rb->buf->pos);

            if ((off_t) size > rest) {
                size = (size_t) rest;
            }

            // linux环境是调用了ngx_unix_recv函数
            n = c->recv(c, rb->buf->last, size);

            ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
                           "http client request body recv %z", n);

            if (n == NGX_AGAIN) {
                break;
            }

            if (n == 0) {
                ngx_log_error(NGX_LOG_INFO, c->log, 0,
                              "client prematurely closed connection");
            }

            if (n == 0 || n == NGX_ERROR) {
                c->error = 1;
                return NGX_HTTP_BAD_REQUEST;
            }

            rb->buf->last += n;
            r->request_length += n;

            if (n == rest) {
                // 读取完所有的body内容
                /* pass buffer to request body filter chain */

                out.buf = rb->buf;
                out.next = NULL;

                rc = ngx_http_request_body_filter(r, &out);

                if (rc != NGX_OK) {
                    return rc;
                }
            }

            if (rb->rest == 0) {
                break;
            }

            if (rb->buf->last < rb->buf->end) {
                break;
            }
        }

        ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
                       "http client request body rest %O", rb->rest);

        if (rb->rest == 0) {
            break;
        }

        // body未读完,且socket不可读
        if (!c->read->ready) {
            
            if (r->request_body_no_buffering
                && rb->buf->pos != rb->buf->last)
            {
                /* pass buffer to request body filter chain */

                out.buf = rb->buf;
                out.next = NULL;

                rc = ngx_http_request_body_filter(r, &out);

                if (rc != NGX_OK) {
                    return rc;
                }
            }

            // 设置body超时事件
            clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);
            ngx_add_timer(c->read, clcf->client_body_timeout);

            if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
                return NGX_HTTP_INTERNAL_SERVER_ERROR;
            }

            return NGX_AGAIN;
        }
    }

    // body 读取完毕
    if (c->read->timer_set) {
        ngx_del_timer(c->read);
    }

    // body 有缓冲区
    if (!r->request_body_no_buffering) {
        r->read_event_handler = ngx_http_block_reading;
        rb->post_handler(r);
    }

    return NGX_OK;
}

buf缓存区参考

request 相关变量的意义

// 不缓存body内容,如果缓存区满了,会直接调用处理函数处理,不会写入临时文件。
r->request_body_no_buffering
// 指向接收body相关的结构体
r->request_body
// 包含请求头在内的长度
r->request_length

// body剩余读取长度
rb->rest

总结

  • body 可能会缓存到两块内存上,也可能在一块内存上。或者可能会被全部写入临时文件。
  • client_body_in_single_buffer 强制body写入到一块内存上,如果一块内存写不下就写到临时文件。如果body太大,大于(client_body_buffer_size+client_body_buffer_size/4)则会把body写入文件。则request_body变量也将获取不到body了。
  • client_body_in_file_only 如果缓存body,则强制body写入临时文件。
  • client_body_buffer_size 为缓存body分配的内存大小,如果body大于该大小则会把body写入临时文件。

大部分情况下都会缓存body,例如在配置了proxy_request_buffering为off,且proxy_pass_request_body为on,body不是chunked类型或proxy_http_version是1.1才可以不缓存body直接转发到上游。

附录:

事件驱动赋值

ngx_io = ngx_os_io;

ngx_event_actions = ngx_epoll_module_ctx.actions;

ngx_event_flags = NGX_USE_CLEAR_EVENT    // 边沿触发

请求处理回调函数:

rev->handler = (c->type == SOCK_STREAM) ? ngx_event_accept
                                        : ngx_event_recvmsg;

ls->handler = ngx_http_init_connection;

rev->handler = ngx_http_wait_request_handler;
c->write->handler = ngx_http_empty_handler;

size = cscf->client_header_buffer_size;
c->buffer = ngx_create_temp_buf(c->pool, size);

rev->handler = ngx_http_process_request_line;

r->read_event_handler = ngx_http_block_reading;
        r->header_in = hc->busy ? hc->busy->buf : c->buffer;
        r->main = r;
        r->count = 1;

rev->handler = ngx_http_process_request_headers;

// 处理请求头
rc = ngx_http_process_request_header(r);
// 请求头处理结束以后调用该函数处理请求
ngx_http_process_request(r);

// 底层事件驱动函数,处理请求行、请求头一直再改底层读写回调函数
// 读取请求body时一直改为该函数 
c->read->handler = ngx_http_request_handler;
c->write->handler = ngx_http_request_handler;
r->read_event_handler = ngx_http_block_reading;
// 处理完请求头以后
ngx_http_handler(r);

r->write_event_handler = ngx_http_core_run_phases;
// 执行ngx http的11个阶段
ngx_http_core_run_phases(r);

ngx_http_core_find_config_phase(r, ph);
ngx_http_update_location_config(r);
    if (r->content_handler) {
        r->write_event_handler = ngx_http_request_empty_handler;
        ngx_http_finalize_request(r, r->content_handler(r));
        return NGX_OK;
    }


ngx_http_proxy_handler(r);

rc = ngx_http_read_client_request_body(r, ngx_http_upstream_init);

vislee avatar Mar 23 '17 06:03 vislee