The upstream mechanism of nginx's http modules
Overview
upstream is the mechanism nginx uses to issue TCP requests to upstream servers. Many useful nginx modules are built on it, for example the proxy module and the memcached module.
The upstream module provides two configuration directives, upstream and server, for specifying upstream server addresses, e.g. upstream backend { server 127.0.0.1:8080; }.
Directive parsing
The upstream mechanism is implemented in ngx_http_upstream.h|c and ngx_http_upstream_round_robin.h|c. Modules that use it (such as proxy) also initialize and call some of the upstream module's variables and functions. Let's start with configuration parsing at startup; the calling logic is in the ngx_http_block function.
- First, ngx_http_upstream_create_main_conf, pointed to by the upstream module's ctx, is called to create a structure that holds the configuration:
typedef struct {
ngx_hash_t headers_in_hash;
ngx_array_t upstreams; // array of upstream groups, one element per upstream{} block
/* ngx_http_upstream_srv_conf_t */
} ngx_http_upstream_main_conf_t;
- At the same time, ngx_http_proxy_create_loc_conf from the proxy module's ctx is called to create a structure holding its configuration:
typedef struct {
// configuration related to the upstream mechanism, usually defined and filled in by the module that uses it.
ngx_http_upstream_conf_t upstream;
ngx_array_t *body_flushes;
ngx_array_t *body_lengths;
ngx_array_t *body_values;
ngx_str_t body_source;
ngx_http_proxy_headers_t headers;
#if (NGX_HTTP_CACHE)
ngx_http_proxy_headers_t headers_cache;
#endif
ngx_array_t *headers_source;
ngx_array_t *proxy_lengths;
ngx_array_t *proxy_values;
ngx_array_t *redirects;
ngx_array_t *cookie_domains;
ngx_array_t *cookie_paths;
ngx_http_complex_value_t *method;
ngx_str_t location;
ngx_str_t url;
#if (NGX_HTTP_CACHE)
ngx_http_complex_value_t cache_key;
#endif
ngx_http_proxy_vars_t vars;
ngx_flag_t redirect;
ngx_uint_t http_version;
ngx_uint_t headers_hash_max_size;
ngx_uint_t headers_hash_bucket_size;
#if (NGX_HTTP_SSL)
ngx_uint_t ssl;
ngx_uint_t ssl_protocols;
ngx_str_t ssl_ciphers;
ngx_uint_t ssl_verify_depth;
ngx_str_t ssl_trusted_certificate;
ngx_str_t ssl_crl;
ngx_str_t ssl_certificate;
ngx_str_t ssl_certificate_key;
ngx_array_t *ssl_passwords;
#endif
} ngx_http_proxy_loc_conf_t;
- Next, ngx_http_upstream_add_variables, another function pointer in the upstream module's ctx, is called to register variables.
static ngx_int_t
ngx_http_upstream_add_variables(ngx_conf_t *cf)
{
ngx_http_variable_t *var, *v;
for (v = ngx_http_upstream_vars; v->name.len; v++) {
var = ngx_http_add_variable(cf, &v->name, v->flags);
if (var == NULL) {
return NGX_ERROR;
}
var->get_handler = v->get_handler;
var->data = v->data;
}
return NGX_OK;
}
- Then the NGX_HTTP_MAIN_CONF level configuration is parsed; in the upstream module only the upstream directive lives at this level. Reading proceeds until an upstream name { ... } block is reached, which is handled by the ngx_http_upstream function. Proxy-module directives related to the upstream mechanism can also appear at this level, as well as at NGX_HTTP_LOC_CONF level; they are covered below when NGX_HTTP_LOC_CONF parsing is discussed.
// corresponds to one group of upstream servers, i.e. one upstream directive
struct ngx_http_upstream_srv_conf_s {
ngx_http_upstream_peer_t peer; // load-balancing strategy hooks
void **srv_conf;
ngx_array_t *servers; /* ngx_http_upstream_server_t */ // entries from server directives: the actual upstream server addresses
ngx_uint_t flags;
ngx_str_t host; // name of this group of upstream servers
u_char *file_name;
ngx_uint_t line;
in_port_t port;
ngx_uint_t no_port; /* unsigned no_port:1 */
#if (NGX_HTTP_UPSTREAM_ZONE)
ngx_shm_zone_t *shm_zone;
#endif
};
// one upstream server (a server directive)
typedef struct {
ngx_str_t name;
ngx_addr_t *addrs;
ngx_uint_t naddrs;
ngx_uint_t weight;
ngx_uint_t max_conns;
ngx_uint_t max_fails;
time_t fail_timeout;
ngx_msec_t slow_start;
unsigned down:1;
unsigned backup:1;
NGX_COMPAT_BEGIN(6)
NGX_COMPAT_END
} ngx_http_upstream_server_t;
static char *
ngx_http_upstream(ngx_conf_t *cf, ngx_command_t *cmd, void *dummy)
{
char *rv;
void *mconf;
ngx_str_t *value;
ngx_url_t u;
ngx_uint_t m;
ngx_conf_t pcf;
ngx_http_module_t *module;
ngx_http_conf_ctx_t *ctx, *http_ctx;
ngx_http_upstream_srv_conf_t *uscf;
ngx_memzero(&u, sizeof(ngx_url_t));
value = cf->args->elts;
u.host = value[1]; // name of the upstream group
u.no_resolve = 1;
u.no_port = 1;
// allocate an ngx_http_upstream_srv_conf_t and add it to the umcf->upstreams array
uscf = ngx_http_upstream_add(cf, &u, NGX_HTTP_UPSTREAM_CREATE
|NGX_HTTP_UPSTREAM_WEIGHT
|NGX_HTTP_UPSTREAM_MAX_CONNS
|NGX_HTTP_UPSTREAM_MAX_FAILS
|NGX_HTTP_UPSTREAM_FAIL_TIMEOUT
|NGX_HTTP_UPSTREAM_DOWN
|NGX_HTTP_UPSTREAM_BACKUP);
if (uscf == NULL) {
return NGX_CONF_ERROR;
}
// every configuration block under the http module gets a context like this,
// pointing to the enclosing block's module configs and to this block's own module configs
ctx = ngx_pcalloc(cf->pool, sizeof(ngx_http_conf_ctx_t));
if (ctx == NULL) {
return NGX_CONF_ERROR;
}
http_ctx = cf->ctx;
// main_conf points to the enclosing http{}
ctx->main_conf = http_ctx->main_conf;
/* the upstream{}'s srv_conf */
ctx->srv_conf = ngx_pcalloc(cf->pool, sizeof(void *) * ngx_http_max_module);
if (ctx->srv_conf == NULL) {
return NGX_CONF_ERROR;
}
// the upstream module has no srv-level create-conf callback, so the assignment below is not overwritten by the loop that follows
ctx->srv_conf[ngx_http_upstream_module.ctx_index] = uscf;
uscf->srv_conf = ctx->srv_conf;
/* the upstream{}'s loc_conf */
ctx->loc_conf = ngx_pcalloc(cf->pool, sizeof(void *) * ngx_http_max_module);
if (ctx->loc_conf == NULL) {
return NGX_CONF_ERROR;
}
for (m = 0; cf->cycle->modules[m]; m++) {
if (cf->cycle->modules[m]->type != NGX_HTTP_MODULE) {
continue;
}
module = cf->cycle->modules[m]->ctx;
if (module->create_srv_conf) {
mconf = module->create_srv_conf(cf);
if (mconf == NULL) {
return NGX_CONF_ERROR;
}
// the srv_conf array holds the srv-level configs of all http modules
ctx->srv_conf[cf->cycle->modules[m]->ctx_index] = mconf;
}
if (module->create_loc_conf) {
mconf = module->create_loc_conf(cf);
if (mconf == NULL) {
return NGX_CONF_ERROR;
}
// the loc_conf array holds the loc-level configs of all http modules
ctx->loc_conf[cf->cycle->modules[m]->ctx_index] = mconf;
}
}
uscf->servers = ngx_array_create(cf->pool, 4,
sizeof(ngx_http_upstream_server_t));
if (uscf->servers == NULL) {
return NGX_CONF_ERROR;
}
/* parse inside upstream{} */
pcf = *cf;
cf->ctx = ctx;
cf->cmd_type = NGX_HTTP_UPS_CONF;
// parse the NGX_HTTP_UPS_CONF level configuration;
// directives at this level include server (upstream server addresses) and ip_hash (load-balancing strategy)
rv = ngx_conf_parse(cf, NULL);
*cf = pcf;
if (rv != NGX_CONF_OK) {
return rv;
}
if (uscf->servers->nelts == 0) {
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"no servers are inside upstream");
return NGX_CONF_ERROR;
}
return rv;
}
- Parsing of the NGX_HTTP_UPS_CONF level server directive is done by ngx_http_upstream_server. Directive format: server 127.0.0.1:8080 weight=5 max_fails=3 fail_timeout=30s;
static char *
ngx_http_upstream_server(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
ngx_http_upstream_srv_conf_t *uscf = conf;
time_t fail_timeout;
ngx_str_t *value, s;
ngx_url_t u;
ngx_int_t weight, max_conns, max_fails;
ngx_uint_t i;
ngx_http_upstream_server_t *us;
us = ngx_array_push(uscf->servers);
if (us == NULL) {
return NGX_CONF_ERROR;
}
ngx_memzero(us, sizeof(ngx_http_upstream_server_t));
value = cf->args->elts;
weight = 1;
max_conns = 0;
max_fails = 1;
fail_timeout = 10;
for (i = 2; i < cf->args->nelts; i++) {
if (ngx_strncmp(value[i].data, "weight=", 7) == 0) {
if (!(uscf->flags & NGX_HTTP_UPSTREAM_WEIGHT)) {
goto not_supported;
}
weight = ngx_atoi(&value[i].data[7], value[i].len - 7);
if (weight == NGX_ERROR || weight == 0) {
goto invalid;
}
continue;
}
if (ngx_strncmp(value[i].data, "max_conns=", 10) == 0) {
if (!(uscf->flags & NGX_HTTP_UPSTREAM_MAX_CONNS)) {
goto not_supported;
}
max_conns = ngx_atoi(&value[i].data[10], value[i].len - 10);
if (max_conns == NGX_ERROR) {
goto invalid;
}
continue;
}
if (ngx_strncmp(value[i].data, "max_fails=", 10) == 0) {
if (!(uscf->flags & NGX_HTTP_UPSTREAM_MAX_FAILS)) {
goto not_supported;
}
max_fails = ngx_atoi(&value[i].data[10], value[i].len - 10);
if (max_fails == NGX_ERROR) {
goto invalid;
}
continue;
}
if (ngx_strncmp(value[i].data, "fail_timeout=", 13) == 0) {
if (!(uscf->flags & NGX_HTTP_UPSTREAM_FAIL_TIMEOUT)) {
goto not_supported;
}
s.len = value[i].len - 13;
s.data = &value[i].data[13];
fail_timeout = ngx_parse_time(&s, 1);
if (fail_timeout == (time_t) NGX_ERROR) {
goto invalid;
}
continue;
}
if (ngx_strcmp(value[i].data, "backup") == 0) {
if (!(uscf->flags & NGX_HTTP_UPSTREAM_BACKUP)) {
goto not_supported;
}
us->backup = 1;
continue;
}
if (ngx_strcmp(value[i].data, "down") == 0) {
if (!(uscf->flags & NGX_HTTP_UPSTREAM_DOWN)) {
goto not_supported;
}
us->down = 1;
continue;
}
goto invalid;
}
ngx_memzero(&u, sizeof(ngx_url_t));
u.url = value[1]; // upstream server IP or domain name
u.default_port = 80;
// resolve the host name
if (ngx_parse_url(cf->pool, &u) != NGX_OK) {
if (u.err) {
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"%s in upstream \"%V\"", u.err, &u.url);
}
return NGX_CONF_ERROR;
}
us->name = u.url;
us->addrs = u.addrs; // upstream server addresses
us->naddrs = u.naddrs; // number of addresses
us->weight = weight; // weight
us->max_conns = max_conns;
us->max_fails = max_fails;
us->fail_timeout = fail_timeout;
return NGX_CONF_OK;
invalid:
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"invalid parameter \"%V\"", &value[i]);
return NGX_CONF_ERROR;
not_supported:
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"balancing method does not support parameter \"%V\"",
&value[i]);
return NGX_CONF_ERROR;
}
- Next the NGX_HTTP_LOC_CONF level configuration is parsed. The directives related to the upstream mechanism are in the proxy module, e.g. proxy_pass, proxy_buffering, proxy_request_buffering, proxy_connect_timeout. Many of them, like proxy_buffering, simply fill fields of the ngx_http_upstream_conf_t structure, while proxy_pass is the directive that actually engages the upstream mechanism; its handler is ngx_http_proxy_pass.
// configuration of the upstream mechanism, usually defined and filled in by the module that uses it;
// the proxy module's ngx_http_proxy_loc_conf_t embeds one
typedef struct {
ngx_http_upstream_srv_conf_t *upstream;
ngx_msec_t connect_timeout;
ngx_msec_t send_timeout;
ngx_msec_t read_timeout;
ngx_msec_t next_upstream_timeout;
size_t send_lowat;
size_t buffer_size;
size_t limit_rate;
size_t busy_buffers_size;
size_t max_temp_file_size;
size_t temp_file_write_size;
size_t busy_buffers_size_conf;
size_t max_temp_file_size_conf;
size_t temp_file_write_size_conf;
ngx_bufs_t bufs;
ngx_uint_t ignore_headers;
ngx_uint_t next_upstream;
ngx_uint_t store_access;
ngx_uint_t next_upstream_tries;
ngx_flag_t buffering;
ngx_flag_t request_buffering;
ngx_flag_t pass_request_headers;
ngx_flag_t pass_request_body;
ngx_flag_t ignore_client_abort;
ngx_flag_t intercept_errors;
ngx_flag_t cyclic_temp_file;
ngx_flag_t force_ranges;
ngx_path_t *temp_path;
ngx_hash_t hide_headers_hash;
ngx_array_t *hide_headers;
ngx_array_t *pass_headers;
ngx_http_upstream_local_t *local;
#if (NGX_HTTP_CACHE)
ngx_shm_zone_t *cache_zone;
ngx_http_complex_value_t *cache_value;
ngx_uint_t cache_min_uses;
ngx_uint_t cache_use_stale;
ngx_uint_t cache_methods;
off_t cache_max_range_offset;
ngx_flag_t cache_lock;
ngx_msec_t cache_lock_timeout;
ngx_msec_t cache_lock_age;
ngx_flag_t cache_revalidate;
ngx_flag_t cache_convert_head;
ngx_flag_t cache_background_update;
ngx_array_t *cache_valid;
ngx_array_t *cache_bypass;
ngx_array_t *cache_purge;
ngx_array_t *no_cache;
#endif
ngx_array_t *store_lengths;
ngx_array_t *store_values;
#if (NGX_HTTP_CACHE)
signed cache:2;
#endif
signed store:2;
unsigned intercept_404:1;
unsigned change_buffering:1;
#if (NGX_HTTP_SSL || NGX_COMPAT)
ngx_ssl_t *ssl;
ngx_flag_t ssl_session_reuse;
ngx_http_complex_value_t *ssl_name;
ngx_flag_t ssl_server_name;
ngx_flag_t ssl_verify;
#endif
ngx_str_t module;
NGX_COMPAT_BEGIN(2)
NGX_COMPAT_END
} ngx_http_upstream_conf_t;
static char *
ngx_http_proxy_pass(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
ngx_http_proxy_loc_conf_t *plcf = conf;
size_t add;
u_short port;
ngx_str_t *value, *url;
ngx_url_t u;
ngx_uint_t n;
ngx_http_core_loc_conf_t *clcf;
ngx_http_script_compile_t sc;
if (plcf->upstream.upstream || plcf->proxy_lengths) {
return "is duplicate";
}
clcf = ngx_http_conf_get_module_loc_conf(cf, ngx_http_core_module);
// content-phase handler for a location with proxy_pass configured
clcf->handler = ngx_http_proxy_handler;
if (clcf->name.data[clcf->name.len - 1] == '/') {
clcf->auto_redirect = 1;
}
value = cf->args->elts;
// the URL configured by proxy_pass
url = &value[1];
n = ngx_http_script_variables_count(url);
if (n) {
ngx_memzero(&sc, sizeof(ngx_http_script_compile_t));
sc.cf = cf;
sc.source = url;
sc.lengths = &plcf->proxy_lengths;
sc.values = &plcf->proxy_values;
sc.variables = n;
sc.complete_lengths = 1;
sc.complete_values = 1;
if (ngx_http_script_compile(&sc) != NGX_OK) {
return NGX_CONF_ERROR;
}
#if (NGX_HTTP_SSL)
plcf->ssl = 1;
#endif
return NGX_CONF_OK;
}
if (ngx_strncasecmp(url->data, (u_char *) "http://", 7) == 0) {
add = 7;
port = 80;
} else if (ngx_strncasecmp(url->data, (u_char *) "https://", 8) == 0) {
#if (NGX_HTTP_SSL)
plcf->ssl = 1;
add = 8;
port = 443;
#else
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"https protocol requires SSL support");
return NGX_CONF_ERROR;
#endif
} else {
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, "invalid URL prefix");
return NGX_CONF_ERROR;
}
ngx_memzero(&u, sizeof(ngx_url_t));
u.url.len = url->len - add;
u.url.data = url->data + add;
u.default_port = port;
u.uri_part = 1;
u.no_resolve = 1;
// a group of upstream servers, i.e. an upstream name {...} block in the configuration
plcf->upstream.upstream = ngx_http_upstream_add(cf, &u, 0);
if (plcf->upstream.upstream == NULL) {
return NGX_CONF_ERROR;
}
plcf->vars.schema.len = add; // schema = http:// or https://
plcf->vars.schema.data = url->data;
plcf->vars.key_start = plcf->vars.schema;
// host_header = u->host, port = 80/443
// key_start covers schema + host
// the uri is set as well; used later in ngx_http_proxy_create_request
ngx_http_proxy_set_vars(&u, &plcf->vars);
plcf->location = clcf->name;
if (clcf->named
#if (NGX_PCRE)
|| clcf->regex
#endif
|| clcf->noname)
{
if (plcf->vars.uri.len) {
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"\"proxy_pass\" cannot have URI part in "
"location given by regular expression, "
"or inside named location, "
"or inside \"if\" statement, "
"or inside \"limit_except\" block");
return NGX_CONF_ERROR;
}
plcf->location.len = 0;
}
plcf->url = *url;
return NGX_CONF_OK;
}
- After ngx_http_block finishes parsing the configuration inside http{}, it calls each http module's init_main_conf; in the upstream module this pointer is ngx_http_upstream_init_main_conf.
static char *
ngx_http_upstream_init_main_conf(ngx_conf_t *cf, void *conf)
{
ngx_http_upstream_main_conf_t *umcf = conf;
ngx_uint_t i;
ngx_array_t headers_in;
ngx_hash_key_t *hk;
ngx_hash_init_t hash;
ngx_http_upstream_init_pt init;
ngx_http_upstream_header_t *header;
ngx_http_upstream_srv_conf_t **uscfp;
// each element corresponds to one upstream{} block
uscfp = umcf->upstreams.elts;
for (i = 0; i < umcf->upstreams.nelts; i++) {
// the load-balancing strategy hooks in here; the default is round robin
init = uscfp[i]->peer.init_upstream ? uscfp[i]->peer.init_upstream:
ngx_http_upstream_init_round_robin;
if (init(cf, uscfp[i]) != NGX_OK) {
return NGX_CONF_ERROR;
}
}
/* upstream_headers_in_hash */
if (ngx_array_init(&headers_in, cf->temp_pool, 32, sizeof(ngx_hash_key_t))
!= NGX_OK)
{
return NGX_CONF_ERROR;
}
for (header = ngx_http_upstream_headers_in; header->name.len; header++) {
hk = ngx_array_push(&headers_in);
if (hk == NULL) {
return NGX_CONF_ERROR;
}
hk->key = header->name;
hk->key_hash = ngx_hash_key_lc(header->name.data, header->name.len);
hk->value = header;
}
hash.hash = &umcf->headers_in_hash;
hash.key = ngx_hash_key_lc;
hash.max_size = 512;
hash.bucket_size = ngx_align(64, ngx_cacheline_size);
hash.name = "upstream_headers_in_hash";
hash.pool = cf->pool;
hash.temp_pool = NULL;
if (ngx_hash_init(&hash, headers_in.elts, headers_in.nelts) != NGX_OK) {
return NGX_CONF_ERROR;
}
return NGX_CONF_OK;
}
The initialization function of the default round-robin load-balancing strategy:
typedef struct {
ngx_http_upstream_init_pt init_upstream;
ngx_http_upstream_init_peer_pt init;
void *data;
} ngx_http_upstream_peer_t;
typedef struct ngx_http_upstream_rr_peer_s ngx_http_upstream_rr_peer_t;
struct ngx_http_upstream_rr_peer_s {
struct sockaddr *sockaddr;
socklen_t socklen;
ngx_str_t name;
ngx_str_t server;
ngx_int_t current_weight;
ngx_int_t effective_weight;
ngx_int_t weight;
// current number of connections; incremented in ngx_http_upstream_get_round_robin_peer
ngx_uint_t conns;
ngx_uint_t max_conns; // maximum number of connections
ngx_uint_t fails; // number of failures
time_t accessed;
time_t checked; // last time this peer was checked/selected
ngx_uint_t max_fails; // maximum number of failures
time_t fail_timeout; // window for counting fails and for keeping a failed peer down
ngx_msec_t slow_start;
ngx_msec_t start_time;
ngx_uint_t down; // marked down (server ... down)
#if (NGX_HTTP_SSL || NGX_COMPAT)
void *ssl_session;
int ssl_session_len;
#endif
#if (NGX_HTTP_UPSTREAM_ZONE)
ngx_atomic_t lock;
#endif
ngx_http_upstream_rr_peer_t *next;
NGX_COMPAT_BEGIN(32)
NGX_COMPAT_END
};
typedef struct ngx_http_upstream_rr_peers_s ngx_http_upstream_rr_peers_t;
struct ngx_http_upstream_rr_peers_s {
ngx_uint_t number; // number of upstream server addresses
#if (NGX_HTTP_UPSTREAM_ZONE)
ngx_slab_pool_t *shpool;
ngx_atomic_t rwlock;
ngx_http_upstream_rr_peers_t *zone_next;
#endif
ngx_uint_t total_weight;
unsigned single:1; // whether the group has only a single server (backup servers taken into account)
unsigned weighted:1; // whether weights are in effect
ngx_str_t *name; // name of this upstream group
ngx_http_upstream_rr_peers_t *next; // the backup servers
ngx_http_upstream_rr_peer_t *peer; // the actual upstream server addresses
};
ngx_int_t
ngx_http_upstream_init_round_robin(ngx_conf_t *cf,
ngx_http_upstream_srv_conf_t *us)
{
ngx_url_t u;
ngx_uint_t i, j, n, w;
ngx_http_upstream_server_t *server;
ngx_http_upstream_rr_peer_t *peer, **peerp;
ngx_http_upstream_rr_peers_t *peers, *backup;
us->peer.init = ngx_http_upstream_init_round_robin_peer;
if (us->servers) {
// the server entries under this upstream group
server = us->servers->elts;
n = 0; // total number of addresses in the group, excluding backup servers
w = 0; // total weight
for (i = 0; i < us->servers->nelts; i++) {
if (server[i].backup) {
continue;
}
n += server[i].naddrs;
w += server[i].naddrs * server[i].weight;
}
if (n == 0) {
ngx_log_error(NGX_LOG_EMERG, cf->log, 0,
"no servers in upstream \"%V\" in %s:%ui",
&us->host, us->file_name, us->line);
return NGX_ERROR;
}
// one upstream group
peers = ngx_pcalloc(cf->pool, sizeof(ngx_http_upstream_rr_peers_t));
if (peers == NULL) {
return NGX_ERROR;
}
// n upstream server addresses
peer = ngx_pcalloc(cf->pool, sizeof(ngx_http_upstream_rr_peer_t) * n);
if (peer == NULL) {
return NGX_ERROR;
}
peers->single = (n == 1);
peers->number = n;
peers->weighted = (w != n);
peers->total_weight = w;
peers->name = &us->host;
n = 0;
peerp = &peers->peer;
for (i = 0; i < us->servers->nelts; i++) {
// iterate over the upstream servers configured by server directives
if (server[i].backup) {
continue;
}
// if the server directive gives an IP, naddrs is 1;
// if it gives a domain name, naddrs may be several
for (j = 0; j < server[i].naddrs; j++) {
peer[n].sockaddr = server[i].addrs[j].sockaddr;
peer[n].socklen = server[i].addrs[j].socklen;
peer[n].name = server[i].addrs[j].name;
peer[n].weight = server[i].weight;
peer[n].effective_weight = server[i].weight;
peer[n].current_weight = 0;
peer[n].max_conns = server[i].max_conns;
peer[n].max_fails = server[i].max_fails;
peer[n].fail_timeout = server[i].fail_timeout;
peer[n].down = server[i].down;
peer[n].server = server[i].name;
*peerp = &peer[n];
peerp = &peer[n].next;
n++;
}
}
us->peer.data = peers;
/* backup servers */
n = 0;
w = 0;
for (i = 0; i < us->servers->nelts; i++) {
if (!server[i].backup) {
continue;
}
n += server[i].naddrs;
w += server[i].naddrs * server[i].weight;
}
if (n == 0) {
return NGX_OK;
}
backup = ngx_pcalloc(cf->pool, sizeof(ngx_http_upstream_rr_peers_t));
if (backup == NULL) {
return NGX_ERROR;
}
peer = ngx_pcalloc(cf->pool, sizeof(ngx_http_upstream_rr_peer_t) * n);
if (peer == NULL) {
return NGX_ERROR;
}
peers->single = 0;
backup->single = 0;
backup->number = n;
backup->weighted = (w != n);
backup->total_weight = w;
backup->name = &us->host;
n = 0;
peerp = &backup->peer;
for (i = 0; i < us->servers->nelts; i++) {
if (!server[i].backup) {
continue;
}
for (j = 0; j < server[i].naddrs; j++) {
peer[n].sockaddr = server[i].addrs[j].sockaddr;
peer[n].socklen = server[i].addrs[j].socklen;
peer[n].name = server[i].addrs[j].name;
peer[n].weight = server[i].weight;
peer[n].effective_weight = server[i].weight;
peer[n].current_weight = 0;
peer[n].max_conns = server[i].max_conns;
peer[n].max_fails = server[i].max_fails;
peer[n].fail_timeout = server[i].fail_timeout;
peer[n].down = server[i].down;
peer[n].server = server[i].name;
*peerp = &peer[n];
peerp = &peer[n].next;
n++;
}
}
peers->next = backup;
return NGX_OK;
}
/* an upstream implicitly defined by proxy_pass, etc. */
if (us->port == 0) {
ngx_log_error(NGX_LOG_EMERG, cf->log, 0,
"no port in upstream \"%V\" in %s:%ui",
&us->host, us->file_name, us->line);
return NGX_ERROR;
}
ngx_memzero(&u, sizeof(ngx_url_t));
u.host = us->host;
u.port = us->port;
if (ngx_inet_resolve_host(cf->pool, &u) != NGX_OK) {
if (u.err) {
ngx_log_error(NGX_LOG_EMERG, cf->log, 0,
"%s in upstream \"%V\" in %s:%ui",
u.err, &us->host, us->file_name, us->line);
}
return NGX_ERROR;
}
n = u.naddrs;
peers = ngx_pcalloc(cf->pool, sizeof(ngx_http_upstream_rr_peers_t));
if (peers == NULL) {
return NGX_ERROR;
}
peer = ngx_pcalloc(cf->pool, sizeof(ngx_http_upstream_rr_peer_t) * n);
if (peer == NULL) {
return NGX_ERROR;
}
peers->single = (n == 1);
peers->number = n;
peers->weighted = 0;
peers->total_weight = n;
peers->name = &us->host;
peerp = &peers->peer;
for (i = 0; i < u.naddrs; i++) {
peer[i].sockaddr = u.addrs[i].sockaddr;
peer[i].socklen = u.addrs[i].socklen;
peer[i].name = u.addrs[i].name;
peer[i].weight = 1;
peer[i].effective_weight = 1;
peer[i].current_weight = 0;
peer[i].max_conns = 0;
peer[i].max_fails = 1;
peer[i].fail_timeout = 10;
*peerp = &peer[i];
peerp = &peer[i].next;
}
us->peer.data = peers;
/* implicitly defined upstream has no backup servers */
return NGX_OK;
}
- At runtime, when a request reaches the content phase and its location has proxy_pass configured, the request is handled by ngx_http_proxy_handler.
struct ngx_peer_connection_s {
ngx_connection_t *connection;
struct sockaddr *sockaddr; // the upstream address chosen by the get callback
socklen_t socklen;
ngx_str_t *name;
// number of attempts allowed when upstreams fail, backup peers included; by default all backends may be tried
ngx_uint_t tries;
ngx_msec_t start_time;
// picks an upstream address; ngx_http_upstream_get_round_robin_peer by default
ngx_event_get_peer_pt get;
ngx_event_free_peer_pt free;
ngx_event_notify_peer_pt notify;
// per-request balancer data; its type varies, usually ngx_http_upstream_rr_peer_data_t,
// set by ngx_http_upstream_init_round_robin_peer
void *data;
#if (NGX_SSL || NGX_COMPAT)
ngx_event_set_peer_session_pt set_session;
ngx_event_save_peer_session_pt save_session;
#endif
ngx_addr_t *local;
int type;
int rcvbuf;
ngx_log_t *log;
unsigned cached:1;
unsigned transparent:1;
/* ngx_connection_log_error_e */
unsigned log_error:2;
NGX_COMPAT_BEGIN(2)
NGX_COMPAT_END
};
// records information about one attempt against an upstream server
typedef struct {
ngx_uint_t status; // response status from the upstream server
ngx_msec_t response_time;
ngx_msec_t connect_time;
ngx_msec_t header_time; // time to receive and parse the response header
off_t response_length;
off_t bytes_received; // bytes received from the upstream server
ngx_str_t *peer; // name of the upstream server
} ngx_http_upstream_state_t;
struct ngx_http_upstream_s {
ngx_http_upstream_handler_pt read_event_handler;
ngx_http_upstream_handler_pt write_event_handler;
// the upstream address plus the load-balancing callbacks
ngx_peer_connection_t peer;
ngx_event_pipe_t *pipe;
ngx_chain_t *request_bufs; // the request content to be sent to the upstream
ngx_output_chain_ctx_t output; // output-chain context used for sending to the upstream
ngx_chain_writer_ctx_t writer; // chain-writer context for sending to the upstream
ngx_http_upstream_conf_t *conf; // plcf->upstream
ngx_http_upstream_srv_conf_t *upstream; // the selected group of upstream servers
#if (NGX_HTTP_CACHE)
ngx_array_t *caches;
#endif
// headers returned by the upstream server;
// the fields of this struct are filled by the handlers in the ngx_http_upstream_headers_in array,
// invoked from ngx_http_proxy_process_header.
ngx_http_upstream_headers_in_t headers_in;
ngx_http_upstream_resolved_t *resolved;
ngx_buf_t from_client;
ngx_buf_t buffer; // buffer for receiving the upstream response
off_t length;
ngx_chain_t *out_bufs; // response body received from the upstream, to be sent downstream
ngx_chain_t *busy_bufs;
ngx_chain_t *free_bufs;
ngx_int_t (*input_filter_init)(void *data);
ngx_int_t (*input_filter)(void *data, ssize_t bytes); // callback for handling response body read from the upstream
void *input_filter_ctx;
#if (NGX_HTTP_CACHE)
ngx_int_t (*create_key)(ngx_http_request_t *r);
#endif
ngx_int_t (*create_request)(ngx_http_request_t *r); // builds the request content to send to the upstream
ngx_int_t (*reinit_request)(ngx_http_request_t *r);
ngx_int_t (*process_header)(ngx_http_request_t *r);
void (*abort_request)(ngx_http_request_t *r);
void (*finalize_request)(ngx_http_request_t *r,
ngx_int_t rc);
ngx_int_t (*rewrite_redirect)(ngx_http_request_t *r,
ngx_table_elt_t *h, size_t prefix);
ngx_int_t (*rewrite_cookie)(ngx_http_request_t *r,
ngx_table_elt_t *h);
ngx_msec_t timeout;
// points into the request's upstream_states array; reassigned for every upstream connection attempt
ngx_http_upstream_state_t *state;
ngx_str_t method;
ngx_str_t schema;
ngx_str_t uri;
#if (NGX_HTTP_SSL || NGX_COMPAT)
ngx_str_t ssl_name;
#endif
ngx_http_cleanup_pt *cleanup;
unsigned store:1;
unsigned cacheable:1;
unsigned accel:1;
unsigned ssl:1;
#if (NGX_HTTP_CACHE)
unsigned cache_status:3;
#endif
unsigned buffering:1;
unsigned keepalive:1;
unsigned upgrade:1;
unsigned request_sent:1; // sending of the request to the upstream has started
unsigned request_body_sent:1; // the request has been fully sent to the upstream
unsigned header_sent:1; // the response header has been sent downstream
};
static ngx_int_t
ngx_http_proxy_handler(ngx_http_request_t *r)
{
ngx_int_t rc;
ngx_http_upstream_t *u;
ngx_http_proxy_ctx_t *ctx;
ngx_http_proxy_loc_conf_t *plcf;
#if (NGX_HTTP_CACHE)
ngx_http_proxy_main_conf_t *pmcf;
#endif
// create the structure for the upstream request
if (ngx_http_upstream_create(r) != NGX_OK) {
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
ctx = ngx_pcalloc(r->pool, sizeof(ngx_http_proxy_ctx_t));
if (ctx == NULL) {
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
ngx_http_set_ctx(r, ctx, ngx_http_proxy_module);
plcf = ngx_http_get_module_loc_conf(r, ngx_http_proxy_module);
u = r->upstream;
if (plcf->proxy_lengths == NULL) {
ctx->vars = plcf->vars;
u->schema = plcf->vars.schema;
#if (NGX_HTTP_SSL)
u->ssl = (plcf->upstream.ssl != NULL);
#endif
} else {
if (ngx_http_proxy_eval(r, ctx, plcf) != NGX_OK) {
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
}
u->output.tag = (ngx_buf_tag_t) &ngx_http_proxy_module;
u->conf = &plcf->upstream;
#if (NGX_HTTP_CACHE)
pmcf = ngx_http_get_module_main_conf(r, ngx_http_proxy_module);
u->caches = &pmcf->caches;
u->create_key = ngx_http_proxy_create_key;
#endif
u->create_request = ngx_http_proxy_create_request;
u->reinit_request = ngx_http_proxy_reinit_request;
u->process_header = ngx_http_proxy_process_status_line;
u->abort_request = ngx_http_proxy_abort_request;
u->finalize_request = ngx_http_proxy_finalize_request;
r->state = 0;
if (plcf->redirects) {
u->rewrite_redirect = ngx_http_proxy_rewrite_redirect;
}
if (plcf->cookie_domains || plcf->cookie_paths) {
u->rewrite_cookie = ngx_http_proxy_rewrite_cookie;
}
// whether the upstream response body should be buffered
u->buffering = plcf->upstream.buffering;
// with buffering on, the response is relayed through an event pipe
u->pipe = ngx_pcalloc(r->pool, sizeof(ngx_event_pipe_t));
if (u->pipe == NULL) {
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
// body-processing callback in buffered mode
u->pipe->input_filter = ngx_http_proxy_copy_filter;
u->pipe->input_ctx = r;
// this callback checks the upstream response headers to see whether the body is chunked and assigns the matching input_filter accordingly.
u->input_filter_init = ngx_http_proxy_input_filter_init;
// body-processing callback in unbuffered mode
u->input_filter = ngx_http_proxy_non_buffered_copy_filter;
u->input_filter_ctx = r;
u->accel = 1;
if (!plcf->upstream.request_buffering
&& plcf->body_values == NULL && plcf->upstream.pass_request_body
&& (!r->headers_in.chunked
|| plcf->http_version == NGX_HTTP_VERSION_11))
{
// do not buffer the request body.
r->request_body_no_buffering = 1;
}
// read the request body and then start the upstream machinery
rc = ngx_http_read_client_request_body(r, ngx_http_upstream_init);
if (rc >= NGX_HTTP_SPECIAL_RESPONSE) {
return rc;
}
return NGX_DONE;
}
- ngx_http_upstream_init starts the upstream machinery: it removes the downstream read timeout and registers the write event, then calls ngx_http_upstream_init_request to initialize the upstream request. That in turn calls u->create_request(r) to build the content sent upstream, selects a group of upstream servers (an upstream name {...} block) based on the host in the proxy_pass URL, and finally calls ngx_http_upstream_connect to initiate the connection to the upstream server, registering the connection's read and write events with epoll (edge-triggered, via ngx_add_conn). This function is the heart of the mechanism; from here on a lot of HTTP machinery comes into play.
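ngx_http_upstream_init itself is short; a simplified sketch of it (abridged, epoll/clear-event path only, not the verbatim source) is shown here before the full listing of ngx_http_upstream_init_request that follows:
// simplified sketch of ngx_http_upstream_init
static void
ngx_http_upstream_init_sketch(ngx_http_request_t *r)
{
    ngx_connection_t *c = r->connection;
    // the downstream read timer is no longer needed once we start talking to the upstream
    if (c->read->timer_set) {
        ngx_del_timer(c->read);
    }
    // with clear (edge-triggered) event methods make sure the downstream write event is
    // registered, so a client abort can still be noticed
    if ((ngx_event_flags & NGX_USE_CLEAR_EVENT) && !c->write->active) {
        if (ngx_add_event(c->write, NGX_WRITE_EVENT, NGX_CLEAR_EVENT) != NGX_OK) {
            ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }
    }
    ngx_http_upstream_init_request(r);
}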
static void
ngx_http_upstream_init_request(ngx_http_request_t *r)
{
ngx_str_t *host;
ngx_uint_t i;
ngx_resolver_ctx_t *ctx, temp;
ngx_http_cleanup_t *cln;
ngx_http_upstream_t *u;
ngx_http_core_loc_conf_t *clcf;
ngx_http_upstream_srv_conf_t *uscf, **uscfp;
ngx_http_upstream_main_conf_t *umcf;
if (r->aio) {
return;
}
u = r->upstream;
#if (NGX_HTTP_CACHE)
// whether caching is enabled, per the proxy_cache directive.
if (u->conf->cache) {
ngx_int_t rc;
// look up the cache entry on disk
rc = ngx_http_upstream_cache(r, u);
if (rc == NGX_BUSY) {
r->write_event_handler = ngx_http_upstream_init_request;
return;
}
r->write_event_handler = ngx_http_request_empty_handler;
if (rc == NGX_ERROR) {
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
if (rc == NGX_OK) {
rc = ngx_http_upstream_cache_send(r, u);
if (rc == NGX_DONE) {
return;
}
if (rc == NGX_HTTP_UPSTREAM_INVALID_HEADER) {
rc = NGX_DECLINED;
r->cached = 0;
}
if (ngx_http_upstream_cache_background_update(r, u) != NGX_OK) {
rc = NGX_ERROR;
}
}
if (rc != NGX_DECLINED) {
ngx_http_finalize_request(r, rc);
return;
}
// NGX_DECLINED here means the cache is disabled or bypassed.
}
#endif
// from the proxy_store directive: whether the response should be saved to a file on disk.
u->store = u->conf->store;
// whether to watch for the client closing the connection prematurely
if (!u->store && !r->post_action && !u->conf->ignore_client_abort) {
r->read_event_handler = ngx_http_upstream_rd_check_broken_connection;
r->write_event_handler = ngx_http_upstream_wr_check_broken_connection;
}
if (r->request_body) {
// the request body
u->request_bufs = r->request_body->bufs;
}
// http_proxy_module: ngx_http_proxy_create_request
// builds the content to send upstream and writes it into u->request_bufs.
if (u->create_request(r) != NGX_OK) {
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
// set the local address/port to bind when connecting upstream; stored in u->peer.local
if (ngx_http_upstream_set_local(r, u, u->conf->local) != NGX_OK) {
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);
u->output.alignment = clcf->directio_alignment;
u->output.pool = r->pool;
u->output.bufs.num = 1;
u->output.bufs.size = clcf->client_body_buffer_size;
if (u->output.output_filter == NULL) {
// invoked when the request body is sent upstream, from ngx_output_chain in ngx_http_upstream_send_request_body
u->output.output_filter = ngx_chain_writer;
u->output.filter_ctx = &u->writer;
}
u->writer.pool = r->pool;
if (r->upstream_states == NULL) {
r->upstream_states = ngx_array_create(r->pool, 1,
sizeof(ngx_http_upstream_state_t));
if (r->upstream_states == NULL) {
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
} else {
// u->state records this request's state against one upstream
u->state = ngx_array_push(r->upstream_states);
if (u->state == NULL) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
ngx_memzero(u->state, sizeof(ngx_http_upstream_state_t));
}
cln = ngx_http_cleanup_add(r, 0);
if (cln == NULL) {
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
cln->handler = ngx_http_upstream_cleanup;
cln->data = r;
u->cleanup = &cln->handler;
if (u->resolved == NULL) {
// the configured group of upstream servers
uscf = u->conf->upstream;
} else {
#if (NGX_HTTP_SSL)
u->ssl_name = u->resolved->host;
#endif
host = &u->resolved->host;
umcf = ngx_http_get_module_main_conf(r, ngx_http_upstream_module);
uscfp = umcf->upstreams.elts;
for (i = 0; i < umcf->upstreams.nelts; i++) {
uscf = uscfp[i];
if (uscf->host.len == host->len
&& ((uscf->port == 0 && u->resolved->no_port)
|| uscf->port == u->resolved->port)
&& ngx_strncasecmp(uscf->host.data, host->data, host->len) == 0)
{
goto found;
}
}
if (u->resolved->sockaddr) {
if (u->resolved->port == 0
&& u->resolved->sockaddr->sa_family != AF_UNIX)
{
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"no port in upstream \"%V\"", host);
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
if (ngx_http_upstream_create_round_robin_peer(r, u->resolved)
!= NGX_OK)
{
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
ngx_http_upstream_connect(r, u);
return;
}
if (u->resolved->port == 0) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"no port in upstream \"%V\"", host);
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
temp.name = *host;
ctx = ngx_resolve_start(clcf->resolver, &temp);
if (ctx == NULL) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
if (ctx == NGX_NO_RESOLVER) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"no resolver defined to resolve %V", host);
ngx_http_upstream_finalize_request(r, u, NGX_HTTP_BAD_GATEWAY);
return;
}
ctx->name = *host;
ctx->handler = ngx_http_upstream_resolve_handler;
ctx->data = r;
ctx->timeout = clcf->resolver_timeout;
u->resolved->ctx = ctx;
if (ngx_resolve_name(ctx) != NGX_OK) {
u->resolved->ctx = NULL;
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
return;
}
found:
if (uscf == NULL) {
ngx_log_error(NGX_LOG_ALERT, r->connection->log, 0,
"no upstream configuration");
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
u->upstream = uscf;
#if (NGX_HTTP_SSL)
u->ssl_name = uscf->host;
#endif
// by default this is ngx_http_upstream_init_round_robin_peer;
// uscf is the chosen upstream group (one upstream xxx {} block),
// and the init callback attaches that group's server addresses to u->peer,
// including the callbacks for getting and freeing a single peer
if (uscf->peer.init(r, uscf) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
u->peer.start_time = ngx_current_msec;
if (u->conf->next_upstream_tries
&& u->peer.tries > u->conf->next_upstream_tries)
{
// cap on the number of backends to try; by default every backend is tried once
u->peer.tries = u->conf->next_upstream_tries;
}
// pick one upstream server and initiate the connection
ngx_http_upstream_connect(r, u);
}
// ngx_http_upstream_round_robin.c
// the structure u->peer.data points to
typedef struct {
ngx_uint_t config;
ngx_http_upstream_rr_peers_t *peers; // all addresses of the upstream group; a pointer, it may be switched (e.g. to the backup peers)
ngx_http_upstream_rr_peer_t *current; // the currently selected upstream server
uintptr_t *tried; // bitmap of peers already tried, used for marking failed attempts
uintptr_t data; // storage for the bitmap when it fits into a single word
} ngx_http_upstream_rr_peer_data_t;
ngx_int_t
ngx_http_upstream_init_round_robin_peer(ngx_http_request_t *r,
ngx_http_upstream_srv_conf_t *us)
{
ngx_uint_t n;
ngx_http_upstream_rr_peer_data_t *rrp;
rrp = r->upstream->peer.data;
if (rrp == NULL) {
rrp = ngx_palloc(r->pool, sizeof(ngx_http_upstream_rr_peer_data_t));
if (rrp == NULL) {
return NGX_ERROR;
}
r->upstream->peer.data = rrp;
}
rrp->peers = us->peer.data;
rrp->current = NULL;
rrp->config = 0;
n = rrp->peers->number;
if (rrp->peers->next && rrp->peers->next->number > n) {
n = rrp->peers->next->number;
}
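// if the peer count fits into one uintptr_t, rrp->data itself serves as the "tried" bitmap;
// otherwise allocate enough uintptr_t words to hold one bit per peer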
if (n <= 8 * sizeof(uintptr_t)) {
rrp->tried = &rrp->data;
rrp->data = 0;
} else {
n = (n + (8 * sizeof(uintptr_t) - 1)) / (8 * sizeof(uintptr_t));
rrp->tried = ngx_pcalloc(r->pool, n * sizeof(uintptr_t));
if (rrp->tried == NULL) {
return NGX_ERROR;
}
}
r->upstream->peer.get = ngx_http_upstream_get_round_robin_peer;
r->upstream->peer.free = ngx_http_upstream_free_round_robin_peer;
r->upstream->peer.tries = ngx_http_upstream_tries(rrp->peers);
#if (NGX_HTTP_SSL)
r->upstream->peer.set_session =
ngx_http_upstream_set_round_robin_peer_session;
r->upstream->peer.save_session =
ngx_http_upstream_save_round_robin_peer_session;
#endif
return NGX_OK;
}
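The tried bitmap set up above is consulted and updated whenever a peer is picked. The indexing pattern is the same one ngx_http_upstream_get_peer uses; the helper below is purely illustrative:
// hypothetical helper showing the bit layout of rrp->tried;
// i is the peer's position in the rrp->peers list
static ngx_uint_t
rr_peer_tried(ngx_http_upstream_rr_peer_data_t *rrp, ngx_uint_t i)
{
    ngx_uint_t n = i / (8 * sizeof(uintptr_t));                   // which word
    uintptr_t  m = (uintptr_t) 1 << i % (8 * sizeof(uintptr_t));  // which bit
    return (rrp->tried[n] & m) ? 1 : 0;
}
Once a peer has been chosen, the same bit is set with rrp->tried[n] |= m, so a retry after a failure skips it.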
static void
ngx_http_upstream_connect(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
ngx_int_t rc;
ngx_connection_t *c;
r->connection->log->action = "connecting to upstream";
if (u->state && u->state->response_time) {
u->state->response_time = ngx_current_msec - u->state->response_time;
}
u->state = ngx_array_push(r->upstream_states);
if (u->state == NULL) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
ngx_memzero(u->state, sizeof(ngx_http_upstream_state_t));
u->state->response_time = ngx_current_msec;
u->state->connect_time = (ngx_msec_t) -1;
u->state->header_time = (ngx_msec_t) -1;
// pick an upstream server and initiate the connection
// NGX_DECLINED: connect error
// NGX_AGAIN: the connection is not fully established yet, since connect() is on a non-blocking fd
// NGX_OK: the connection is established and writable.
rc = ngx_event_connect_peer(&u->peer);
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"http upstream connect: %i", rc);
if (rc == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
// the selected upstream server
u->state->peer = u->peer.name;
if (rc == NGX_BUSY) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "no live upstreams");
ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_NOLIVE);
return;
}
if (rc == NGX_DECLINED) {
ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
return;
}
/* rc == NGX_OK || rc == NGX_AGAIN || rc == NGX_DONE */
// the connection to the upstream server
c = u->peer.connection;
// the data pointer of the upstream connection
c->data = r;
c->write->handler = ngx_http_upstream_handler;
c->read->handler = ngx_http_upstream_handler;
// callbacks for timeout or read/write events
u->write_event_handler = ngx_http_upstream_send_request_handler;
u->read_event_handler = ngx_http_upstream_process_header;
c->sendfile &= r->connection->sendfile;
u->output.sendfile = c->sendfile;
if (c->pool == NULL) {
/* we need separate pool here to be able to cache SSL connections */
c->pool = ngx_create_pool(128, r->connection->log);
if (c->pool == NULL) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
}
c->log = r->connection->log;
c->pool->log = c->log;
c->read->log = c->log;
c->write->log = c->log;
/* init or reinit the ngx_output_chain() and ngx_chain_writer() contexts */
u->writer.out = NULL;
u->writer.last = &u->writer.out;
u->writer.connection = c;
u->writer.limit = 0;
if (u->request_sent) {
if (ngx_http_upstream_reinit(r, u) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
}
if (r->request_body
&& r->request_body->buf
&& r->request_body->temp_file
&& r == r->main)
{
/*
* the r->request_body->buf can be reused for one request only,
* the subrequests should allocate their own temporary bufs
*/
u->output.free = ngx_alloc_chain_link(r->pool);
if (u->output.free == NULL) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
u->output.free->buf = r->request_body->buf;
u->output.free->next = NULL;
u->output.allocated = 1;
r->request_body->buf->pos = r->request_body->buf->start;
r->request_body->buf->last = r->request_body->buf->start;
r->request_body->buf->tag = u->output.tag;
}
u->request_sent = 0;
u->request_body_sent = 0;
if (rc == NGX_AGAIN) {
// the non-blocking connect() returned EINPROGRESS;
// arm the write-event timer; epoll will report the socket writable once the connection completes
ngx_add_timer(c->write, u->conf->connect_timeout);
return;
}
#if (NGX_HTTP_SSL)
if (u->ssl && c->ssl == NULL) {
ngx_http_upstream_ssl_init_connection(r, u, c);
return;
}
#endif
ngx_http_upstream_send_request(r, u, 1);
}
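The write handler installed above, ngx_http_upstream_send_request_handler, is what later hands control to ngx_http_upstream_send_request once the connection becomes writable. Roughly (a simplified sketch; the SSL handshake and a few corner cases are omitted):
// simplified sketch of ngx_http_upstream_send_request_handler
static void
send_request_handler_sketch(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
    ngx_connection_t *c = u->peer.connection;
    // connect_timeout expired before the socket became writable
    if (c->write->timedout) {
        ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_TIMEOUT);
        return;
    }
    // (the real handler also starts the SSL handshake if needed and turns itself
    // into a dummy once the response header has already been sent downstream)
    ngx_http_upstream_send_request(r, u, 1);
}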
Initiating the connection to the upstream (code for event mechanisms other than epoll has been trimmed):
ngx_int_t
ngx_event_connect_peer(ngx_peer_connection_t *pc)
{
int rc, type;
ngx_int_t event;
ngx_err_t err;
ngx_uint_t level;
ngx_socket_t s;
ngx_event_t *rev, *wev;
ngx_connection_t *c;
// ngx_http_upstream_get_round_robin_peer
// the chosen upstream address is stored into pc->sockaddr
rc = pc->get(pc, pc->data);
if (rc != NGX_OK) {
return rc;
}
type = (pc->type ? pc->type : SOCK_STREAM);
s = ngx_socket(pc->sockaddr->sa_family, type, 0);
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0, "%s socket %d",
(type == SOCK_STREAM) ? "stream" : "dgram", s);
if (s == (ngx_socket_t) -1) {
ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
ngx_socket_n " failed");
return NGX_ERROR;
}
c = ngx_get_connection(s, pc->log);
if (c == NULL) {
if (ngx_close_socket(s) == -1) {
ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
ngx_close_socket_n "failed");
}
return NGX_ERROR;
}
c->type = type;
if (pc->rcvbuf) {
if (setsockopt(s, SOL_SOCKET, SO_RCVBUF,
(const void *) &pc->rcvbuf, sizeof(int)) == -1)
{
ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
"setsockopt(SO_RCVBUF) failed");
goto failed;
}
}
if (ngx_nonblocking(s) == -1) {
ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
ngx_nonblocking_n " failed");
goto failed;
}
if (pc->local) {
if (bind(s, pc->local->sockaddr, pc->local->socklen) == -1) {
ngx_log_error(NGX_LOG_CRIT, pc->log, ngx_socket_errno,
"bind(%V) failed", &pc->local->name);
goto failed;
}
}
if (type == SOCK_STREAM) {
c->recv = ngx_recv;
c->send = ngx_send;
c->recv_chain = ngx_recv_chain;
c->send_chain = ngx_send_chain;
c->sendfile = 1;
} else { /* type == SOCK_DGRAM */
c->recv = ngx_udp_recv;
c->send = ngx_send;
c->send_chain = ngx_udp_send_chain;
}
c->log_error = pc->log_error;
rev = c->read;
wev = c->write;
rev->log = pc->log;
wev->log = pc->log;
pc->connection = c;
c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);
if (ngx_add_conn) {
if (ngx_add_conn(c) == NGX_ERROR) {
goto failed;
}
}
ngx_log_debug3(NGX_LOG_DEBUG_EVENT, pc->log, 0,
"connect to %V, fd:%d #%uA", pc->name, s, c->number);
rc = connect(s, pc->sockaddr, pc->socklen);
if (rc == -1) {
err = ngx_socket_errno;
if (err != NGX_EINPROGRESS
#if (NGX_WIN32)
/* Winsock returns WSAEWOULDBLOCK (NGX_EAGAIN) */
&& err != NGX_EAGAIN
#endif
)
{
if (err == NGX_ECONNREFUSED
#if (NGX_LINUX)
/*
* Linux returns EAGAIN instead of ECONNREFUSED
* for unix sockets if listen queue is full
*/
|| err == NGX_EAGAIN
#endif
|| err == NGX_ECONNRESET
|| err == NGX_ENETDOWN
|| err == NGX_ENETUNREACH
|| err == NGX_EHOSTDOWN
|| err == NGX_EHOSTUNREACH)
{
level = NGX_LOG_ERR;
} else {
level = NGX_LOG_CRIT;
}
ngx_log_error(level, c->log, err, "connect() to %V failed",
pc->name);
ngx_close_connection(c);
pc->connection = NULL;
return NGX_DECLINED;
}
}
if (ngx_add_conn) {
if (rc == -1) {
/* NGX_EINPROGRESS */
return NGX_AGAIN;
}
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, pc->log, 0, "connected");
wev->ready = 1;
return NGX_OK;
}
failed:
ngx_close_connection(c);
pc->connection = NULL;
return NGX_ERROR;
}
// ngx_http_upstream_round_robin.c
ngx_int_t
ngx_http_upstream_get_round_robin_peer(ngx_peer_connection_t *pc, void *data)
{
ngx_http_upstream_rr_peer_data_t *rrp = data;
ngx_int_t rc;
ngx_uint_t i, n;
ngx_http_upstream_rr_peer_t *peer;
ngx_http_upstream_rr_peers_t *peers;
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0,
"get rr peer, try: %ui", pc->tries);
pc->cached = 0;
pc->connection = NULL;
// peers points to the upstream group's peer list
peers = rrp->peers;
// a lock is needed because peers may be taken down concurrently (e.g. in a shared-memory zone)
ngx_http_upstream_rr_peers_wlock(peers);
if (peers->single) {
peer = peers->peer;
if (peer->down) {
goto failed;
}
if (peer->max_conns && peer->conns >= peer->max_conns) {
goto failed;
}
rrp->current = peer;
} else {
/* there are several peers */
peer = ngx_http_upstream_get_peer(rrp);
if (peer == NULL) {
goto failed;
}
ngx_log_debug2(NGX_LOG_DEBUG_HTTP, pc->log, 0,
"get rr peer, current: %p %i",
peer, peer->current_weight);
}
pc->sockaddr = peer->sockaddr;
pc->socklen = peer->socklen;
pc->name = &peer->name;
peer->conns++;
ngx_http_upstream_rr_peers_unlock(peers);
return NGX_OK;
failed:
if (peers->next) {
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pc->log, 0, "backup servers");
rrp->peers = peers->next;
n = (rrp->peers->number + (8 * sizeof(uintptr_t) - 1))
/ (8 * sizeof(uintptr_t));
for (i = 0; i < n; i++) {
rrp->tried[i] = 0;
}
ngx_http_upstream_rr_peers_unlock(peers);
rc = ngx_http_upstream_get_round_robin_peer(pc, rrp);
if (rc != NGX_BUSY) {
return rc;
}
ngx_http_upstream_rr_peers_wlock(peers);
}
ngx_http_upstream_rr_peers_unlock(peers);
pc->name = peers->name;
return NGX_BUSY;
}
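ngx_http_upstream_get_peer, called above when the group has more than one peer, is not listed in full here. Its core is nginx's smooth weighted round robin; a simplified sketch (the checks against the tried bitmap, the down flag, max_fails/fail_timeout and max_conns are omitted) looks like this:
// simplified sketch of the smooth weighted round-robin selection
static ngx_http_upstream_rr_peer_t *
get_peer_sketch(ngx_http_upstream_rr_peer_data_t *rrp)
{
    ngx_http_upstream_rr_peer_t *peer, *best = NULL;
    ngx_int_t total = 0;
    for (peer = rrp->peers->peer; peer; peer = peer->next) {
        // (peers that are down, already tried, or over their limits are skipped here)
        peer->current_weight += peer->effective_weight;
        total += peer->effective_weight;
        // effective_weight recovers gradually after earlier failures
        if (peer->effective_weight < peer->weight) {
            peer->effective_weight++;
        }
        if (best == NULL || peer->current_weight > best->current_weight) {
            best = peer;
        }
    }
    if (best == NULL) {
        return NULL;
    }
    // push the winner back by the total weight so the other peers get their turn
    best->current_weight -= total;
    rrp->current = best;
    return best;
}
With weights 5 and 1, for example, this produces the repeating order a a a b a a rather than five a's in a row followed by b, which spreads the load more evenly in time.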
Sending the request to the upstream:
static void
ngx_http_upstream_send_request(ngx_http_request_t *r, ngx_http_upstream_t *u,
ngx_uint_t do_write)
{
ngx_int_t rc;
ngx_connection_t *c;
c = u->peer.connection;
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
"http upstream send request");
if (u->state->connect_time == (ngx_msec_t) -1) {
// connect time
u->state->connect_time = ngx_current_msec - u->state->response_time;
}
if (!u->request_sent && ngx_http_upstream_test_connect(c) != NGX_OK) {
ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
return;
}
c->log->action = "sending request to upstream";
// send the body
rc = ngx_http_upstream_send_request_body(r, u, do_write);
if (rc == NGX_ERROR) {
ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
return;
}
if (rc >= NGX_HTTP_SPECIAL_RESPONSE) {
ngx_http_upstream_finalize_request(r, u, rc);
return;
}
if (rc == NGX_AGAIN) {
if (!c->write->ready) {
ngx_add_timer(c->write, u->conf->send_timeout);
} else if (c->write->timer_set) {
ngx_del_timer(c->write);
}
// register the write event with epoll
if (ngx_handle_write_event(c->write, u->conf->send_lowat) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
return;
}
/* rc == NGX_OK */
// the body has been sent completely
u->request_body_sent = 1;
if (c->write->timer_set) {
// remove the send timeout
ngx_del_timer(c->write);
}
if (c->tcp_nopush == NGX_TCP_NOPUSH_SET) {
if (ngx_tcp_push(c->fd) == NGX_ERROR) {
ngx_log_error(NGX_LOG_CRIT, c->log, ngx_socket_errno,
ngx_tcp_push_n " failed");
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
c->tcp_nopush = NGX_TCP_NOPUSH_UNSET;
}
// the request has been fully sent upstream; the write handler becomes a no-op.
u->write_event_handler = ngx_http_upstream_dummy_handler;
if (ngx_handle_write_event(c->write, 0) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
// add the read timeout
ngx_add_timer(c->read, u->conf->read_timeout);
if (c->read->ready) {
// process the upstream response
ngx_http_upstream_process_header(r, u);
return;
}
}
// send the request content to the upstream:
// u->request_bufs is written to the upstream via ngx_output_chain,
// which in turn calls the u->output.output_filter callback.
// u->output.output_filter = ngx_chain_writer;
// u->output.filter_ctx = &u->writer;
static ngx_int_t
ngx_http_upstream_send_request_body(ngx_http_request_t *r,
ngx_http_upstream_t *u, ngx_uint_t do_write)
{
ngx_int_t rc;
ngx_chain_t *out, *cl, *ln;
ngx_connection_t *c;
ngx_http_core_loc_conf_t *clcf;
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"http upstream send request body");
// is the request body buffered?
if (!r->request_body_no_buffering) {
// the request body has already been read in full
/* buffered request body */
// the request has not been sent upstream yet
if (!u->request_sent) {
u->request_sent = 1;
// the whole request content, built by create_request in ngx_http_upstream_init_request
out = u->request_bufs;
} else {
out = NULL;
}
rc = ngx_output_chain(&u->output, out);
if (rc == NGX_AGAIN) {
u->request_body_blocked = 1;
} else {
u->request_body_blocked = 0;
}
return rc;
}
if (!u->request_sent) {
u->request_sent = 1;
out = u->request_bufs;
if (r->request_body->bufs) {
for (cl = out; cl->next; cl = cl->next) { /* void */ }
cl->next = r->request_body->bufs;
r->request_body->bufs = NULL;
}
c = u->peer.connection;
clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);
if (clcf->tcp_nodelay && ngx_tcp_nodelay(c) != NGX_OK) {
return NGX_ERROR;
}
r->read_event_handler = ngx_http_upstream_read_request_handler;
} else {
out = NULL;
}
// unbuffered request body: keep reading it and sending it upstream in a loop
for ( ;; ) {
if (do_write) {
// send to the upstream; out is the prepared content: request line + headers + body
rc = ngx_output_chain(&u->output, out);
if (rc == NGX_ERROR) {
return NGX_ERROR;
}
while (out) {
ln = out;
out = out->next;
ngx_free_chain(r->pool, ln);
}
if (rc == NGX_AGAIN) {
u->request_body_blocked = 1;
} else {
u->request_body_blocked = 0;
}
if (rc == NGX_OK && !r->reading_body) {
break;
}
}
if (r->reading_body) {
/* read client request body */
rc = ngx_http_read_unbuffered_request_body(r);
if (rc >= NGX_HTTP_SPECIAL_RESPONSE) {
return rc;
}
out = r->request_body->bufs;
r->request_body->bufs = NULL;
}
/* stop if there is nothing to send */
if (out == NULL) {
rc = NGX_AGAIN;
break;
}
do_write = 1;
}
if (!r->reading_body) {
if (!u->store && !r->post_action && !u->conf->ignore_client_abort) {
r->read_event_handler =
ngx_http_upstream_rd_check_broken_connection;
}
}
return rc;
}
The handler for the upstream response:
static void
ngx_http_upstream_process_header(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
ssize_t n;
ngx_int_t rc;
ngx_connection_t *c;
c = u->peer.connection;
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
"http upstream process header");
c->log->action = "reading response header from upstream";
if (c->read->timedout) {
ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_TIMEOUT);
return;
}
if (!u->request_sent && ngx_http_upstream_test_connect(c) != NGX_OK) {
ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
return;
}
if (u->buffer.start == NULL) {
// buffer for the upstream response header; sized by proxy_buffer_size (one memory page by default, often worth enlarging)
u->buffer.start = ngx_palloc(r->pool, u->conf->buffer_size);
if (u->buffer.start == NULL) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
u->buffer.pos = u->buffer.start;
u->buffer.last = u->buffer.start;
u->buffer.end = u->buffer.start + u->conf->buffer_size;
u->buffer.temporary = 1;
u->buffer.tag = u->output.tag;
// initialize the response header list
if (ngx_list_init(&u->headers_in.headers, r->pool, 8,
sizeof(ngx_table_elt_t))
!= NGX_OK)
{
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
#if (NGX_HTTP_CACHE)
if (r->cache) {
u->buffer.pos += r->cache->header_start;
u->buffer.last = u->buffer.pos;
}
#endif
}
for ( ;; ) {
// read the upstream response
n = c->recv(c, u->buffer.last, u->buffer.end - u->buffer.last);
if (n == NGX_AGAIN) {
#if 0
ngx_add_timer(rev, u->read_timeout);
#endif
if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
return;
}
if (n == 0) {
ngx_log_error(NGX_LOG_ERR, c->log, 0,
"upstream prematurely closed connection");
}
if (n == NGX_ERROR || n == 0) {
ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
return;
}
u->state->bytes_received += n;
u->buffer.last += n;
#if 0
u->valid_header_in = 0;
u->peer.cached = 0;
#endif
// in the proxy module u->process_header = ngx_http_proxy_process_status_line,
// which parses the status line of the upstream response
rc = u->process_header(r);
if (rc == NGX_AGAIN) {
if (u->buffer.last == u->buffer.end) {
ngx_log_error(NGX_LOG_ERR, c->log, 0,
"upstream sent too big header");
ngx_http_upstream_next(r, u,
NGX_HTTP_UPSTREAM_FT_INVALID_HEADER);
return;
}
continue;
}
break;
}
if (rc == NGX_HTTP_UPSTREAM_INVALID_HEADER) {
ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_INVALID_HEADER);
return;
}
if (rc == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
/* rc == NGX_OK */
// the upstream response header has been parsed successfully
u->state->header_time = ngx_current_msec - u->state->response_time;
if (u->headers_in.status_n >= NGX_HTTP_SPECIAL_RESPONSE) {
// decide from the upstream status code whether another upstream server should be tried;
// this ultimately goes through ngx_http_upstream_next as well
if (ngx_http_upstream_test_next(r, u) == NGX_OK) {
return;
}
// whether to intercept errors returned by the upstream, per proxy_intercept_errors;
// if it is on and error_page has an entry for this status code, the corresponding error page is returned instead of the upstream's page.
if (ngx_http_upstream_intercept_errors(r, u) == NGX_OK) {
return;
}
}
// act on the response headers (internal redirects, hidden headers, etc.)
// and copy the upstream response headers into the downstream response
if (ngx_http_upstream_process_headers(r, u) != NGX_OK) {
return;
}
if (!r->subrequest_in_memory) {
ngx_http_upstream_send_response(r, u);
return;
}
/* subrequest content in memory */
if (u->input_filter == NULL) {
u->input_filter_init = ngx_http_upstream_non_buffered_filter_init;
// u->out_bufs
u->input_filter = ngx_http_upstream_non_buffered_filter;
u->input_filter_ctx = r;
}
if (u->input_filter_init(u->input_filter_ctx) == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
n = u->buffer.last - u->buffer.pos;
if (n) {
u->buffer.last = u->buffer.pos;
u->state->response_length += n;
if (u->input_filter(u->input_filter_ctx, n) == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
}
if (u->length == 0) {
ngx_http_upstream_finalize_request(r, u, 0);
return;
}
u->read_event_handler = ngx_http_upstream_process_body_in_memory;
ngx_http_upstream_process_body_in_memory(r, u);
}
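ngx_http_upstream_next, which the code above falls back to whenever an attempt fails, implements retrying against another peer. A simplified sketch of its core logic (the real function also derives the response status from ft_type, e.g. 502 or 504, and has extra conditions such as a request body already partially sent):
// simplified sketch of ngx_http_upstream_next
static void
upstream_next_sketch(ngx_http_request_t *r, ngx_http_upstream_t *u,
    ngx_uint_t ft_type)
{
    // report the failure to the balancer so fails/effective_weight are updated
    // and u->peer.tries is decremented (ngx_http_upstream_free_round_robin_peer)
    if (u->peer.sockaddr) {
        u->peer.free(&u->peer, u->peer.data, NGX_PEER_FAILED);
        u->peer.sockaddr = NULL;
    }
    // no peer left to try, or this failure type is not covered by proxy_next_upstream
    if (u->peer.tries == 0 || !(u->conf->next_upstream & ft_type)) {
        ngx_http_upstream_finalize_request(r, u, NGX_HTTP_BAD_GATEWAY);
        return;
    }
    // close the failed connection and connect to the next peer
    if (u->peer.connection) {
        ngx_close_connection(u->peer.connection);
        u->peer.connection = NULL;
    }
    ngx_http_upstream_connect(r, u);
}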
// ngx_http_proxy_module.c: parse the status line and headers returned by the upstream
static ngx_int_t
ngx_http_proxy_process_status_line(ngx_http_request_t *r)
{
size_t len;
ngx_int_t rc;
ngx_http_upstream_t *u;
ngx_http_proxy_ctx_t *ctx;
ctx = ngx_http_get_module_ctx(r, ngx_http_proxy_module);
if (ctx == NULL) {
return NGX_ERROR;
}
u = r->upstream;
rc = ngx_http_parse_status_line(r, &u->buffer, &ctx->status);
if (rc == NGX_AGAIN) {
return rc;
}
if (rc == NGX_ERROR) {
#if (NGX_HTTP_CACHE)
if (r->cache) {
r->http_version = NGX_HTTP_VERSION_9;
return NGX_OK;
}
#endif
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"upstream sent no valid HTTP/1.0 header");
#if 0
if (u->accel) {
return NGX_HTTP_UPSTREAM_INVALID_HEADER;
}
#endif
r->http_version = NGX_HTTP_VERSION_9;
u->state->status = NGX_HTTP_OK;
u->headers_in.connection_close = 1;
return NGX_OK;
}
if (u->state && u->state->status == 0) {
u->state->status = ctx->status.code;
}
u->headers_in.status_n = ctx->status.code;
len = ctx->status.end - ctx->status.start;
u->headers_in.status_line.len = len;
u->headers_in.status_line.data = ngx_pnalloc(r->pool, len);
if (u->headers_in.status_line.data == NULL) {
return NGX_ERROR;
}
ngx_memcpy(u->headers_in.status_line.data, ctx->status.start, len);
ngx_log_debug2(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"http proxy status %ui \"%V\"",
u->headers_in.status_n, &u->headers_in.status_line);
if (ctx->status.http_version < NGX_HTTP_VERSION_11) {
u->headers_in.connection_close = 1;
}
// callback for parsing the remaining upstream response headers
u->process_header = ngx_http_proxy_process_header;
return ngx_http_proxy_process_header(r);
}
static ngx_int_t
ngx_http_proxy_process_header(ngx_http_request_t *r)
{
ngx_int_t rc;
ngx_table_elt_t *h;
ngx_http_upstream_t *u;
ngx_http_proxy_ctx_t *ctx;
ngx_http_upstream_header_t *hh;
ngx_http_upstream_main_conf_t *umcf;
umcf = ngx_http_get_module_main_conf(r, ngx_http_upstream_module);
for ( ;; ) {
rc = ngx_http_parse_header_line(r, &r->upstream->buffer, 1);
if (rc == NGX_OK) {
/* a header line has been parsed successfully */
h = ngx_list_push(&r->upstream->headers_in.headers);
if (h == NULL) {
return NGX_ERROR;
}
h->hash = r->header_hash;
h->key.len = r->header_name_end - r->header_name_start;
h->value.len = r->header_end - r->header_start;
h->key.data = ngx_pnalloc(r->pool,
h->key.len + 1 + h->value.len + 1 + h->key.len);
if (h->key.data == NULL) {
h->hash = 0;
return NGX_ERROR;
}
h->value.data = h->key.data + h->key.len + 1;
h->lowcase_key = h->key.data + h->key.len + 1 + h->value.len + 1;
ngx_memcpy(h->key.data, r->header_name_start, h->key.len);
h->key.data[h->key.len] = '\0';
ngx_memcpy(h->value.data, r->header_start, h->value.len);
h->value.data[h->value.len] = '\0';
if (h->key.len == r->lowcase_index) {
ngx_memcpy(h->lowcase_key, r->lowcase_header, h->key.len);
} else {
ngx_strlow(h->lowcase_key, h->key.data, h->key.len);
}
hh = ngx_hash_find(&umcf->headers_in_hash, h->hash,
h->lowcase_key, h->key.len);
if (hh && hh->handler(r, h, hh->offset) != NGX_OK) {
return NGX_ERROR;
}
ngx_log_debug2(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"http proxy header: \"%V: %V\"",
&h->key, &h->value);
continue;
}
if (rc == NGX_HTTP_PARSE_HEADER_DONE) {
/* a whole header has been parsed successfully */
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"http proxy header done");
/*
* if no "Server" and "Date" in header line,
* then add the special empty headers
*/
if (r->upstream->headers_in.server == NULL) {
h = ngx_list_push(&r->upstream->headers_in.headers);
if (h == NULL) {
return NGX_ERROR;
}
h->hash = ngx_hash(ngx_hash(ngx_hash(ngx_hash(
ngx_hash('s', 'e'), 'r'), 'v'), 'e'), 'r');
ngx_str_set(&h->key, "Server");
ngx_str_null(&h->value);
h->lowcase_key = (u_char *) "server";
}
if (r->upstream->headers_in.date == NULL) {
h = ngx_list_push(&r->upstream->headers_in.headers);
if (h == NULL) {
return NGX_ERROR;
}
h->hash = ngx_hash(ngx_hash(ngx_hash('d', 'a'), 't'), 'e');
ngx_str_set(&h->key, "Date");
ngx_str_null(&h->value);
h->lowcase_key = (u_char *) "date";
}
/* clear content length if response is chunked */
u = r->upstream;
if (u->headers_in.chunked) {
u->headers_in.content_length_n = -1;
}
/*
* set u->keepalive if response has no body; this allows to keep
* connections alive in case of r->header_only or X-Accel-Redirect
*/
ctx = ngx_http_get_module_ctx(r, ngx_http_proxy_module);
if (u->headers_in.status_n == NGX_HTTP_NO_CONTENT
|| u->headers_in.status_n == NGX_HTTP_NOT_MODIFIED
|| ctx->head
|| (!u->headers_in.chunked
&& u->headers_in.content_length_n == 0))
{
u->keepalive = !u->headers_in.connection_close;
}
// a 101 status from the upstream means the protocol is being switched (upgrade)
if (u->headers_in.status_n == NGX_HTTP_SWITCHING_PROTOCOLS) {
u->keepalive = 0;
if (r->headers_in.upgrade) {
u->upgrade = 1;
}
}
return NGX_OK;
}
if (rc == NGX_AGAIN) {
return NGX_AGAIN;
}
/* there was error while a header line parsing */
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"upstream sent invalid header");
return NGX_HTTP_UPSTREAM_INVALID_HEADER;
}
}
Parse the upstream response headers and copy them into the downstream request's headers_out:
static ngx_int_t
ngx_http_upstream_process_headers(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
ngx_str_t uri, args;
ngx_uint_t i, flags;
ngx_list_part_t *part;
ngx_table_elt_t *h;
ngx_http_upstream_header_t *hh;
ngx_http_upstream_main_conf_t *umcf;
umcf = ngx_http_get_module_main_conf(r, ngx_http_upstream_module);
if (u->headers_in.x_accel_redirect
&& !(u->conf->ignore_headers & NGX_HTTP_UPSTREAM_IGN_XA_REDIRECT))
{
// the X-Accel-Redirect header triggers an internal redirect
ngx_http_upstream_finalize_request(r, u, NGX_DECLINED);
part = &u->headers_in.headers.part;
h = part->elts;
for (i = 0; /* void */; i++) {
if (i >= part->nelts) {
if (part->next == NULL) {
break;
}
part = part->next;
h = part->elts;
i = 0;
}
hh = ngx_hash_find(&umcf->headers_in_hash, h[i].hash,
h[i].lowcase_key, h[i].key.len);
if (hh && hh->redirect) {
if (hh->copy_handler(r, &h[i], hh->conf) != NGX_OK) {
ngx_http_finalize_request(r,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return NGX_DONE;
}
}
}
uri = u->headers_in.x_accel_redirect->value;
if (uri.data[0] == '@') {
ngx_http_named_location(r, &uri);
} else {
ngx_str_null(&args);
flags = NGX_HTTP_LOG_UNSAFE;
if (ngx_http_parse_unsafe_uri(r, &uri, &args, &flags) != NGX_OK) {
ngx_http_finalize_request(r, NGX_HTTP_NOT_FOUND);
return NGX_DONE;
}
if (r->method != NGX_HTTP_HEAD) {
r->method = NGX_HTTP_GET;
r->method_name = ngx_http_core_get_method;
}
ngx_http_internal_redirect(r, &uri, &args);
}
ngx_http_finalize_request(r, NGX_DONE);
return NGX_DONE;
}
part = &u->headers_in.headers.part;
h = part->elts;
for (i = 0; /* void */; i++) {
if (i >= part->nelts) {
if (part->next == NULL) {
break;
}
part = part->next;
h = part->elts;
i = 0;
}
if (ngx_hash_find(&u->conf->hide_headers_hash, h[i].hash,
h[i].lowcase_key, h[i].key.len))
{
continue;
}
hh = ngx_hash_find(&umcf->headers_in_hash, h[i].hash,
h[i].lowcase_key, h[i].key.len);
if (hh) {
if (hh->copy_handler(r, &h[i], hh->conf) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return NGX_DONE;
}
continue;
}
if (ngx_http_upstream_copy_header_line(r, &h[i], 0) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return NGX_DONE;
}
}
if (r->headers_out.server && r->headers_out.server->value.data == NULL) {
r->headers_out.server->hash = 0;
}
if (r->headers_out.date && r->headers_out.date->value.data == NULL) {
r->headers_out.date->hash = 0;
}
r->headers_out.status = u->headers_in.status_n;
r->headers_out.status_line = u->headers_in.status_line;
r->headers_out.content_length_n = u->headers_in.content_length_n;
r->disable_not_modified = !u->cacheable;
if (u->conf->force_ranges) {
r->allow_ranges = 1;
r->single_range = 1;
#if (NGX_HTTP_CACHE)
if (r->cached) {
r->single_range = 0;
}
#endif
}
u->length = -1;
return NGX_OK;
}
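Both loops in ngx_http_upstream_process_headers walk u->headers_in.headers, an ngx_list_t whose elements are spread across fixed-size parts. The sketch below isolates that traversal idiom; it assumes the usual nginx headers and only shows the walk (logging each header), not the hide/copy handling.

```c
#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_http.h>

/* Visit every ngx_table_elt_t in an ngx_list_t. The list is a chain of
 * parts; when the index runs past the current part, advance to the next
 * part and reset the index. */
static void
walk_headers(ngx_list_t *headers, ngx_log_t *log)
{
    ngx_uint_t        i;
    ngx_list_part_t  *part;
    ngx_table_elt_t  *h;

    part = &headers->part;
    h = part->elts;

    for (i = 0; /* void */; i++) {
        if (i >= part->nelts) {
            if (part->next == NULL) {
                break;
            }
            part = part->next;
            h = part->elts;
            i = 0;
        }

        ngx_log_debug2(NGX_LOG_DEBUG_HTTP, log, 0,
                       "header: \"%V: %V\"", &h[i].key, &h[i].value);
    }
}
```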
// send the response to the downstream client
static void
ngx_http_upstream_send_response(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
ssize_t n;
ngx_int_t rc;
ngx_event_pipe_t *p;
ngx_connection_t *c;
ngx_http_core_loc_conf_t *clcf;
rc = ngx_http_send_header(r);
if (rc == NGX_ERROR || rc > NGX_OK || r->post_action) {
ngx_http_upstream_finalize_request(r, u, rc);
return;
}
u->header_sent = 1;
if (u->upgrade) {
#if (NGX_HTTP_CACHE)
if (r->cache) {
ngx_http_file_cache_free(r->cache, u->pipe->temp_file);
}
#endif
// protocol upgrade support (e.g. WebSocket)
ngx_http_upstream_upgrade(r, u);
return;
}
c = r->connection;
if (r->header_only) { // only headers go downstream (HEAD request, 204 or 304 response)
if (!u->buffering) {
ngx_http_upstream_finalize_request(r, u, rc);
return;
}
if (!u->cacheable && !u->store) {
ngx_http_upstream_finalize_request(r, u, rc);
return;
}
u->pipe->downstream_error = 1;
}
if (r->request_body && r->request_body->temp_file
&& r == r->main && !r->preserve_body)
{
// if the request body was spooled to a temporary file and there is no need to keep it
// (preserve_body marks the body as still needed), clean up that temporary file now
ngx_pool_run_cleanup_file(r->pool, r->request_body->temp_file->file.fd);
r->request_body->temp_file->file.fd = NGX_INVALID_FILE;
}
clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);
if (!u->buffering) { // proxy_buffering off;
#if (NGX_HTTP_CACHE)
if (r->cache) {
ngx_http_file_cache_free(r->cache, u->pipe->temp_file);
}
#endif
if (u->input_filter == NULL) {
u->input_filter_init = ngx_http_upstream_non_buffered_filter_init;
u->input_filter = ngx_http_upstream_non_buffered_filter;
u->input_filter_ctx = r;
}
u->read_event_handler = ngx_http_upstream_process_non_buffered_upstream;
r->write_event_handler =
ngx_http_upstream_process_non_buffered_downstream;
r->limit_rate = 0;
if (u->input_filter_init(u->input_filter_ctx) == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
if (clcf->tcp_nodelay && ngx_tcp_nodelay(c) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
n = u->buffer.last - u->buffer.pos;
if (n) {
u->buffer.last = u->buffer.pos;
u->state->response_length += n;
if (u->input_filter(u->input_filter_ctx, n) == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
ngx_http_upstream_process_non_buffered_downstream(r);
} else {
u->buffer.pos = u->buffer.start;
u->buffer.last = u->buffer.start;
if (ngx_http_send_special(r, NGX_HTTP_FLUSH) == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
if (u->peer.connection->read->ready || u->length == 0) {
ngx_http_upstream_process_non_buffered_upstream(r, u);
}
}
return;
}
/* TODO: preallocate event_pipe bufs, look "Content-Length" */
#if (NGX_HTTP_CACHE)
if (r->cache && r->cache->file.fd != NGX_INVALID_FILE) {
ngx_pool_run_cleanup_file(r->pool, r->cache->file.fd);
r->cache->file.fd = NGX_INVALID_FILE;
}
switch (ngx_http_test_predicates(r, u->conf->no_cache)) {
case NGX_ERROR:
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
case NGX_DECLINED:
u->cacheable = 0;
break;
default: /* NGX_OK */
if (u->cache_status == NGX_HTTP_CACHE_BYPASS) {
/* create cache if previously bypassed */
if (ngx_http_file_cache_create(r) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
}
break;
}
if (u->cacheable) {
time_t now, valid;
now = ngx_time();
valid = r->cache->valid_sec;
if (valid == 0) {
valid = ngx_http_file_cache_valid(u->conf->cache_valid,
u->headers_in.status_n);
if (valid) {
r->cache->valid_sec = now + valid;
}
}
if (valid) {
r->cache->date = now;
r->cache->body_start = (u_short) (u->buffer.pos - u->buffer.start);
if (u->headers_in.status_n == NGX_HTTP_OK
|| u->headers_in.status_n == NGX_HTTP_PARTIAL_CONTENT)
{
r->cache->last_modified = u->headers_in.last_modified_time;
if (u->headers_in.etag) {
r->cache->etag = u->headers_in.etag->value;
} else {
ngx_str_null(&r->cache->etag);
}
} else {
r->cache->last_modified = -1;
ngx_str_null(&r->cache->etag);
}
if (ngx_http_file_cache_set_header(r, u->buffer.start) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
} else {
u->cacheable = 0;
}
}
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
"http cacheable: %d", u->cacheable);
if (u->cacheable == 0 && r->cache) {
ngx_http_file_cache_free(r->cache, u->pipe->temp_file);
}
if (r->header_only && !u->cacheable && !u->store) {
ngx_http_upstream_finalize_request(r, u, 0);
return;
}
#endif
// buffered mode (proxy_buffering on): hand the body to the event pipe
p = u->pipe;
// ngx_http_upstream_output_filter runs the body filter chain when the response body is sent downstream
p->output_filter = ngx_http_upstream_output_filter;
p->output_ctx = r;
p->tag = u->output.tag;
p->bufs = u->conf->bufs; // size and number of buffers used to read the response
p->busy_size = u->conf->busy_buffers_size;
p->upstream = u->peer.connection; // connection to the upstream (origin) server
p->downstream = c; // connection to the downstream client
p->pool = r->pool;
p->log = c->log;
p->limit_rate = u->conf->limit_rate;
p->start_sec = ngx_time();
p->cacheable = u->cacheable || u->store;
p->temp_file = ngx_pcalloc(r->pool, sizeof(ngx_temp_file_t));
if (p->temp_file == NULL) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
p->temp_file->file.fd = NGX_INVALID_FILE;
p->temp_file->file.log = c->log;
p->temp_file->path = u->conf->temp_path;
p->temp_file->pool = r->pool;
if (p->cacheable) {
p->temp_file->persistent = 1;
#if (NGX_HTTP_CACHE)
if (r->cache && !r->cache->file_cache->use_temp_path) {
p->temp_file->path = r->cache->file_cache->path;
p->temp_file->file.name = r->cache->file.name;
}
#endif
} else {
p->temp_file->log_level = NGX_LOG_WARN;
p->temp_file->warn = "an upstream response is buffered "
"to a temporary file";
}
p->max_temp_file_size = u->conf->max_temp_file_size;
p->temp_file_write_size = u->conf->temp_file_write_size;
#if (NGX_THREADS)
if (clcf->aio == NGX_HTTP_AIO_THREADS && clcf->aio_write) {
p->thread_handler = ngx_http_upstream_thread_handler;
p->thread_ctx = r;
}
#endif
p->preread_bufs = ngx_alloc_chain_link(r->pool);
if (p->preread_bufs == NULL) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
p->preread_bufs->buf = &u->buffer; // note: this points at u->buffer, it is not a copy
p->preread_bufs->next = NULL;
u->buffer.recycled = 1;
// amount of response body that was already read in along with the response header
p->preread_size = u->buffer.last - u->buffer.pos;
if (u->cacheable) {
p->buf_to_file = ngx_calloc_buf(r->pool);
if (p->buf_to_file == NULL) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
p->buf_to_file->start = u->buffer.start;
p->buf_to_file->pos = u->buffer.start;
p->buf_to_file->last = u->buffer.pos;
p->buf_to_file->temporary = 1;
}
if (ngx_event_flags & NGX_USE_IOCP_EVENT) {
/* the posted aio operation may corrupt a shadow buffer */
p->single_buf = 1;
}
/* TODO: p->free_bufs = 0 if use ngx_create_chain_of_bufs() */
p->free_bufs = 1;
/*
* event_pipe would do u->buffer.last += p->preread_size
* as though these bytes were read
*/
// note this also affects p->preread_bufs (its buf points at u->buffer);
// without keeping that in mind, ngx_event_pipe_read_upstream is hard to follow
u->buffer.last = u->buffer.pos;
if (u->conf->cyclic_temp_file) {
/*
* we need to disable the use of sendfile() if we use cyclic temp file
* because the writing a new data may interfere with sendfile()
* that uses the same kernel file pages (at least on FreeBSD)
*/
p->cyclic_temp_file = 1;
c->sendfile = 0;
} else {
p->cyclic_temp_file = 0;
}
p->read_timeout = u->conf->read_timeout;
p->send_timeout = clcf->send_timeout;
p->send_lowat = clcf->send_lowat;
p->length = -1;
// the proxy module sets this to ngx_http_proxy_input_filter_init;
// based on whether the status code implies no body and whether the response is chunked,
// it sets u->pipe->input_filter = ngx_http_proxy_chunked_filter, or u->pipe->length and u->length
if (u->input_filter_init
&& u->input_filter_init(p->input_ctx) != NGX_OK)
{
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
// both event handlers below mainly end up calling ngx_event_pipe:
// ngx_event_pipe(p, 0)
u->read_event_handler = ngx_http_upstream_process_upstream;
// ngx_event_pipe(p, 1)
r->write_event_handler = ngx_http_upstream_process_downstream;
ngx_http_upstream_process_upstream(r, u);
}
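In the non-buffered branch above, the defaults ngx_http_upstream_non_buffered_filter_init/_filter are installed only when the using module has not set its own. Below is a rough sketch of how a hypothetical module could wire its non-buffered callbacks before handing the request to the upstream machinery. my_filter_init, my_filter and my_content_handler are made-up names; the u->input_filter* fields, ngx_http_upstream_create and ngx_http_read_client_request_body are the real interfaces, and the rest of the required setup (u->conf, create_request, process_header and so on) is omitted.

```c
#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_http.h>

static ngx_int_t
my_filter_init(void *data)
{
    /* called once, after the response header has been parsed */
    return NGX_OK;
}

static ngx_int_t
my_filter(void *data, ssize_t bytes)
{
    ngx_http_request_t   *r = data;
    ngx_http_upstream_t  *u = r->upstream;

    /* a real filter would wrap u->buffer.last .. last + bytes into a buf
     * on u->out_bufs, as ngx_http_upstream_non_buffered_filter does */

    if (u->length != -1) {
        u->length -= bytes;
    }

    return NGX_OK;
}

static ngx_int_t
my_content_handler(ngx_http_request_t *r)
{
    ngx_int_t             rc;
    ngx_http_upstream_t  *u;

    if (ngx_http_upstream_create(r) != NGX_OK) {
        return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }

    u = r->upstream;

    /* non-buffered callbacks used by ngx_http_upstream_send_response()
     * when u->buffering is 0 (proxy_buffering off) */
    u->input_filter_init = my_filter_init;
    u->input_filter = my_filter;
    u->input_filter_ctx = r;

    /* u->conf, u->create_request, u->process_header, u->finalize_request
     * etc. must also be set up here; omitted for brevity */

    rc = ngx_http_read_client_request_body(r, ngx_http_upstream_init);

    if (rc >= NGX_HTTP_SPECIAL_RESPONSE) {
        return rc;
    }

    return NGX_DONE;
}
```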
Select another upstream server and retry the connection:
```c
static void
ngx_http_upstream_next(ngx_http_request_t *r, ngx_http_upstream_t *u,
ngx_uint_t ft_type)
{
ngx_msec_t timeout;
ngx_uint_t status, state;
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"http next upstream, %xi", ft_type);
if (u->peer.sockaddr) {
// release the upstream peer on which the error occurred
if (ft_type == NGX_HTTP_UPSTREAM_FT_HTTP_403
|| ft_type == NGX_HTTP_UPSTREAM_FT_HTTP_404)
{
state = NGX_PEER_NEXT;
} else {
state = NGX_PEER_FAILED;
}
// ngx_http_upstream_free_round_robin_peer
u->peer.free(&u->peer, u->peer.data, state);
u->peer.sockaddr = NULL;
}
if (ft_type == NGX_HTTP_UPSTREAM_FT_TIMEOUT) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, NGX_ETIMEDOUT,
"upstream timed out");
}
if (u->peer.cached && ft_type == NGX_HTTP_UPSTREAM_FT_ERROR) {
/* TODO: inform balancer instead */
u->peer.tries++;
}
switch (ft_type) {
case NGX_HTTP_UPSTREAM_FT_TIMEOUT:
case NGX_HTTP_UPSTREAM_FT_HTTP_504:
status = NGX_HTTP_GATEWAY_TIME_OUT;
break;
case NGX_HTTP_UPSTREAM_FT_HTTP_500:
status = NGX_HTTP_INTERNAL_SERVER_ERROR;
break;
case NGX_HTTP_UPSTREAM_FT_HTTP_503:
status = NGX_HTTP_SERVICE_UNAVAILABLE;
break;
case NGX_HTTP_UPSTREAM_FT_HTTP_403:
status = NGX_HTTP_FORBIDDEN;
break;
case NGX_HTTP_UPSTREAM_FT_HTTP_404:
status = NGX_HTTP_NOT_FOUND;
break;
case NGX_HTTP_UPSTREAM_FT_HTTP_429:
status = NGX_HTTP_TOO_MANY_REQUESTS;
break;
/*
* NGX_HTTP_UPSTREAM_FT_BUSY_LOCK and NGX_HTTP_UPSTREAM_FT_MAX_WAITING
* never reach here
*/
default:
status = NGX_HTTP_BAD_GATEWAY;
}
if (r->connection->error) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_CLIENT_CLOSED_REQUEST);
return;
}
u->state->status = status;
timeout = u->conf->next_upstream_timeout;
// the request has already been sent upstream and the method is POST, LOCK or PATCH,
if (u->request_sent
&& (r->method & (NGX_HTTP_POST|NGX_HTTP_LOCK|NGX_HTTP_PATCH)))
{
// so the request is non-idempotent: record that in the failure type
ft_type |= NGX_HTTP_UPSTREAM_FT_NON_IDEMPOTENT;
}
if (u->peer.tries == 0
|| ((u->conf->next_upstream & ft_type) != ft_type)
|| (u->request_sent && r->request_body_no_buffering)
|| (timeout && ngx_current_msec - u->peer.start_time >= timeout))
{
#if (NGX_HTTP_CACHE)
if (u->cache_status == NGX_HTTP_CACHE_EXPIRED
&& ((u->conf->cache_use_stale & ft_type) || r->cache->stale_error))
{
ngx_int_t rc;
rc = u->reinit_request(r);
if (rc != NGX_OK) {
ngx_http_upstream_finalize_request(r, u, rc);
return;
}
u->cache_status = NGX_HTTP_CACHE_STALE;
rc = ngx_http_upstream_cache_send(r, u);
if (rc == NGX_DONE) {
return;
}
if (rc == NGX_HTTP_UPSTREAM_INVALID_HEADER) {
rc = NGX_HTTP_INTERNAL_SERVER_ERROR;
}
ngx_http_upstream_finalize_request(r, u, rc);
return;
}
#endif
ngx_http_upstream_finalize_request(r, u, status);
return;
}
if (u->peer.connection) {
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"close http upstream connection: %d",
u->peer.connection->fd);
#if (NGX_HTTP_SSL)
if (u->peer.connection->ssl) {
u->peer.connection->ssl->no_wait_shutdown = 1;
u->peer.connection->ssl->no_send_shutdown = 1;
(void) ngx_ssl_shutdown(u->peer.connection);
}
#endif
if (u->peer.connection->pool) {
ngx_destroy_pool(u->peer.connection->pool);
}
ngx_close_connection(u->peer.connection);
u->peer.connection = NULL;
}
ngx_http_upstream_connect(r, u);
}
```
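The conditions that make ngx_http_upstream_next give up rather than try another peer can be read as a single predicate. A plain-C sketch with hypothetical names mirroring the fields tested above:

```c
#include <stdbool.h>

/* Hypothetical mirror of the state consulted by ngx_http_upstream_next. */
typedef struct {
    unsigned tries;              /* u->peer.tries */
    unsigned next_upstream;      /* u->conf->next_upstream bitmask (proxy_next_upstream) */
    bool     request_sent;       /* u->request_sent */
    bool     body_no_buffering;  /* r->request_body_no_buffering */
    long     timeout_ms;         /* proxy_next_upstream_timeout, 0 = unlimited */
    long     elapsed_ms;         /* ngx_current_msec - u->peer.start_time */
} retry_state_t;

/* Finalize with the mapped status instead of retrying when:
 *  - no peers are left to try,
 *  - the failure type is not enabled in proxy_next_upstream,
 *  - the request body was streamed unbuffered and cannot be replayed, or
 *  - proxy_next_upstream_timeout has expired. */
static bool
give_up(const retry_state_t *s, unsigned ft_type)
{
    return s->tries == 0
           || (s->next_upstream & ft_type) != ft_type
           || (s->request_sent && s->body_no_buffering)
           || (s->timeout_ms && s->elapsed_ms >= s->timeout_ms);
}
```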
Finalize the upstream request:
static void
ngx_http_upstream_finalize_request(ngx_http_request_t *r,
ngx_http_upstream_t *u, ngx_int_t rc)
{
ngx_uint_t flush;
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"finalize http upstream request: %i", rc);
if (u->cleanup == NULL) {
// the upstream side has already been finalized
/* the request was already finalized */
ngx_http_finalize_request(r, NGX_DONE);
return;
}
*u->cleanup = NULL;
u->cleanup = NULL;
if (u->resolved && u->resolved->ctx) {
ngx_resolve_name_done(u->resolved->ctx);
u->resolved->ctx = NULL;
}
if (u->state && u->state->response_time) {
u->state->response_time = ngx_current_msec - u->state->response_time;
if (u->pipe && u->pipe->read_length) {
u->state->bytes_received += u->pipe->read_length
- u->pipe->preread_size;
u->state->response_length = u->pipe->read_length;
}
}
// for the proxy module this calls ngx_http_proxy_finalize_request
u->finalize_request(r, rc);
if (u->peer.free && u->peer.sockaddr) {
u->peer.free(&u->peer, u->peer.data, 0);
u->peer.sockaddr = NULL;
}
if (u->peer.connection) {
#if (NGX_HTTP_SSL)
/* TODO: do not shutdown persistent connection */
if (u->peer.connection->ssl) {
/*
* We send the "close notify" shutdown alert to the upstream only
* and do not wait its "close notify" shutdown alert.
* It is acceptable according to the TLS standard.
*/
u->peer.connection->ssl->no_wait_shutdown = 1;
(void) ngx_ssl_shutdown(u->peer.connection);
}
#endif
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"close http upstream connection: %d",
u->peer.connection->fd);
if (u->peer.connection->pool) {
ngx_destroy_pool(u->peer.connection->pool);
}
// close the upstream connection
ngx_close_connection(u->peer.connection);
}
u->peer.connection = NULL;
if (u->pipe && u->pipe->temp_file) {
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"http upstream temp fd: %d",
u->pipe->temp_file->file.fd);
}
if (u->store && u->pipe && u->pipe->temp_file
&& u->pipe->temp_file->file.fd != NGX_INVALID_FILE)
{
if (ngx_delete_file(u->pipe->temp_file->file.name.data)
== NGX_FILE_ERROR)
{
ngx_log_error(NGX_LOG_CRIT, r->connection->log, ngx_errno,
ngx_delete_file_n " \"%s\" failed",
u->pipe->temp_file->file.name.data);
}
}
#if (NGX_HTTP_CACHE)
if (r->cache) {
if (u->cacheable) {
if (rc == NGX_HTTP_BAD_GATEWAY || rc == NGX_HTTP_GATEWAY_TIME_OUT) {
time_t valid;
valid = ngx_http_file_cache_valid(u->conf->cache_valid, rc);
if (valid) {
r->cache->valid_sec = ngx_time() + valid;
r->cache->error = rc;
}
}
}
ngx_http_file_cache_free(r->cache, u->pipe->temp_file);
}
#endif
if (r->subrequest_in_memory
&& u->headers_in.status_n >= NGX_HTTP_SPECIAL_RESPONSE)
{
u->buffer.last = u->buffer.pos;
}
r->read_event_handler = ngx_http_block_reading;
if (rc == NGX_DECLINED) {
return;
}
r->connection->log->action = "sending to client";
if (!u->header_sent
|| rc == NGX_HTTP_REQUEST_TIME_OUT
|| rc == NGX_HTTP_CLIENT_CLOSED_REQUEST
|| (u->pipe && u->pipe->downstream_error))
{
ngx_http_finalize_request(r, rc);
return;
}
flush = 0;
if (rc >= NGX_HTTP_SPECIAL_RESPONSE) {
rc = NGX_ERROR;
flush = 1;
}
if (r->header_only) {
ngx_http_finalize_request(r, rc);
return;
}
if (rc == 0) {
rc = ngx_http_send_special(r, NGX_HTTP_LAST);
} else if (flush) {
r->keepalive = 0;
rc = ngx_http_send_special(r, NGX_HTTP_FLUSH);
}
ngx_http_finalize_request(r, rc);
}
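The `*u->cleanup = NULL; u->cleanup = NULL;` pair at the top works because u->cleanup points into the handler slot of the cleanup that was registered on the request, so nulling it both disarms the registered cleanup and marks the upstream as finalized (a second call becomes a no-op). A tiny plain-C illustration of that double-pointer idiom, with names that are illustrative only:

```c
#include <stdio.h>

typedef void (*cleanup_pt)(void *data);

/* A registered cleanup slot, as a request cleanup would be. */
typedef struct {
    cleanup_pt  handler;
    void       *data;
} cleanup_t;

/* The "upstream" keeps a pointer INTO the slot's handler field,
 * mirroring u->cleanup (a pointer to the handler pointer). */
typedef struct {
    cleanup_pt *cleanup;
} upstream_t;

static void upstream_cleanup(void *data) { (void) data; puts("abort upstream"); }

static void
finalize(upstream_t *u)
{
    if (u->cleanup == NULL) {
        puts("already finalized");      /* second call is a no-op */
        return;
    }
    *u->cleanup = NULL;                 /* disarm the registered handler */
    u->cleanup = NULL;                  /* mark the upstream finalized */
    puts("finalized");
}

int main(void)
{
    cleanup_t   cln = { upstream_cleanup, NULL };
    upstream_t  u = { &cln.handler };

    finalize(&u);   /* prints "finalized" */
    finalize(&u);   /* prints "already finalized" */

    if (cln.handler == NULL) {
        puts("cleanup handler disarmed");
    }
    return 0;
}
```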
ngx_event_pipe.c
ngx_int_t
ngx_event_pipe(ngx_event_pipe_t *p, ngx_int_t do_write)
{
ngx_int_t rc;
ngx_uint_t flags;
ngx_event_t *rev, *wev;
for ( ;; ) {
if (do_write) {
p->log->action = "sending to client";
rc = ngx_event_pipe_write_to_downstream(p);
if (rc == NGX_ABORT) {
return NGX_ABORT;
}
if (rc == NGX_BUSY) {
return NGX_OK;
}
}
p->read = 0;
p->upstream_blocked = 0;
p->log->action = "reading upstream";
if (ngx_event_pipe_read_upstream(p) == NGX_ABORT) {
return NGX_ABORT;
}
if (!p->read && !p->upstream_blocked) {
break;
}
do_write = 1;
}
if (p->upstream->fd != (ngx_socket_t) -1) {
rev = p->upstream->read;
flags = (rev->eof || rev->error) ? NGX_CLOSE_EVENT : 0;
if (ngx_handle_read_event(rev, flags) != NGX_OK) {
return NGX_ABORT;
}
if (!rev->delayed) {
if (rev->active && !rev->ready) {
ngx_add_timer(rev, p->read_timeout);
} else if (rev->timer_set) {
ngx_del_timer(rev);
}
}
}
if (p->downstream->fd != (ngx_socket_t) -1
&& p->downstream->data == p->output_ctx)
{
wev = p->downstream->write;
if (ngx_handle_write_event(wev, p->send_lowat) != NGX_OK) {
return NGX_ABORT;
}
if (!wev->delayed) {
if (wev->active && !wev->ready) {
ngx_add_timer(wev, p->send_timeout);
} else if (wev->timer_set) {
ngx_del_timer(wev);
}
}
}
return NGX_OK;
}
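Stripped of buffer management, timers and error paths, ngx_event_pipe is an alternation: flush what is buffered to the downstream, then try to read more from the upstream, and stop once a pass neither read anything new (p->read) nor left data waiting behind a busy downstream (p->upstream_blocked). A runnable toy model of just that loop shape; read_upstream/write_downstream are stand-ins for the real ngx_event_pipe_read_upstream/ngx_event_pipe_write_to_downstream:

```c
#include <stdbool.h>
#include <stdio.h>

typedef struct {
    int  chunks_left;       /* pretend body still waiting at the upstream */
    bool read;              /* p->read: this pass read something new */
    bool upstream_blocked;  /* p->upstream_blocked: data waits on a busy downstream */
} pipe_t;

static int read_upstream(pipe_t *p)        /* stand-in for ngx_event_pipe_read_upstream */
{
    if (p->chunks_left > 0) {
        p->chunks_left--;
        p->read = true;
        printf("read one chunk, %d left\n", p->chunks_left);
    }
    return 0;
}

static int write_downstream(pipe_t *p)     /* stand-in for ngx_event_pipe_write_to_downstream */
{
    (void) p;
    printf("flush buffered chunks downstream\n");
    return 0;
}

static int event_pipe(pipe_t *p, bool do_write)
{
    for ( ;; ) {
        if (do_write && write_downstream(p) != 0) {
            return -1;                     /* NGX_ABORT */
        }

        p->read = false;
        p->upstream_blocked = false;

        if (read_upstream(p) != 0) {
            return -1;
        }

        if (!p->read && !p->upstream_blocked) {
            break;                         /* nothing new: re-arm events and return */
        }

        do_write = true;                   /* something was read: flush it next pass */
    }
    return 0;                              /* NGX_OK */
}

int main(void)
{
    pipe_t p = { 3, false, false };
    return event_pipe(&p, false);
}
```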
Summary
u->pipe is the core of response body forwarding, and the name fits: it is a one-way pipe connecting the upstream and the downstream that moves the upstream body down to the client. Its input_filter is the handler that processes the response body read back from the upstream, and its output_filter is the handler that sends that body downstream.
u->output and u->input_filter are easy to confuse. u->output is the output-chain context used to send the request to the upstream; it has nothing to do with u->pipe. Starting from ngx_http_read_client_request_body(r, ngx_http_upstream_init), once the client body has been read the call chain is
ngx_http_upstream_init
-> ngx_http_upstream_init_request
-> ngx_http_upstream_connect
-> ngx_http_upstream_send_request
-> ngx_http_upstream_send_request_body
-> ngx_output_chain
with u->output.output_filter = ngx_chain_writer.
u->input_filter, by contrast, does belong to the response-body side that u->pipe serves, but it must in turn be distinguished from p->input_filter. The two callbacks play a similar role, both processing the upstream response body: the former is called when there is no buffering (proxy_buffering off;), the latter when buffering is enabled (proxy_buffering on;).
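To make the split concrete: with buffering the body is owned by the event pipe (p->input_filter reads it and p->output_filter pushes it through the body filter chain, driven by ngx_http_upstream_process_upstream/_downstream), while without buffering u->input_filter copies it straight to u->out_bufs under ngx_http_upstream_process_non_buffered_upstream/_downstream. The helper below is only an illustrative sketch (log_body_path is a made-up name) that logs which path a request will take, based on the u->buffering flag that drives the fork in ngx_http_upstream_send_response:

```c
#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_http.h>

/* Log which body-forwarding path ngx_http_upstream_send_response() will
 * pick for this request: the event pipe (buffered) or u->input_filter
 * (non-buffered). Illustrative only. */
static void
log_body_path(ngx_http_request_t *r)
{
    ngx_http_upstream_t  *u = r->upstream;

    if (u == NULL) {
        return;
    }

    ngx_log_error(NGX_LOG_INFO, r->connection->log, 0,
                  u->buffering
                  ? "body via event pipe (proxy_buffering on)"
                  : "body via u->input_filter (proxy_buffering off)");
}
```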
Appendix: key functions
ngx_http_top_request_body_filter = ngx_http_request_body_save_filter;
ngx_http_init_connection
rev->handler = ngx_http_wait_request_handler;
c->write->handler = ngx_http_empty_handler;
rev->handler = ngx_http_process_request_line;
rev->handler = ngx_http_process_request_headers;
c->read->handler = ngx_http_request_handler;
c->write->handler = ngx_http_request_handler;
r->read_event_handler = ngx_http_block_reading;
r->write_event_handler = ngx_http_core_run_phases;
ngx_http_core_run_phases(r);
ngx_http_core_content_phase
r->write_event_handler = ngx_http_request_empty_handler;
ngx_http_proxy_handler
u->pipe->input_filter = ngx_http_proxy_copy_filter;
u->pipe->input_ctx = r;
u->input_filter_init = ngx_http_proxy_input_filter_init;
u->input_filter = ngx_http_proxy_non_buffered_copy_filter;
u->input_filter_ctx = r;
r->read_event_handler = ngx_http_read_client_request_body_handler;
r->write_event_handler = ngx_http_request_empty_handler;
rc = ngx_http_do_read_client_request_body(r);
upstream:
c->write->handler = ngx_http_upstream_handler;
c->read->handler = ngx_http_upstream_handler;
u->write_event_handler = ngx_http_upstream_send_request_handler;
u->read_event_handler = ngx_http_upstream_process_header;
ngx_http_upstream_send_request ngx_http_upstream_send_request_body
ngx_http_upstream_send_response
p = u->pipe;
p->output_filter = ngx_http_upstream_output_filter;
p->output_ctx = r;
p->upstream = u->peer.connection;
p->downstream = c;
u->read_event_handler = ngx_http_upstream_process_upstream;
r->write_event_handler = ngx_http_upstream_process_downstream;
ngx_http_upstream_process_upstream(r, u);