diff options
author | Andrei Belov <defan@nginx.com> | 2020-03-12 18:40:48 +0300 |
---|---|---|
committer | Andrei Belov <defan@nginx.com> | 2020-03-12 18:40:48 +0300 |
commit | 4b7ca39903178e20ec7381205694cb01f0dec6bc (patch) | |
tree | 51afb9c7003b5927183e7ddecd766eb19e421233 /src/nxt_router.c | |
parent | 8414897527ed1616ea39a0cae4d1b8ee170d5cb8 (diff) | |
parent | b3c8a7b33a29208e75dfe4f670cf81dac7b99ccc (diff) | |
download | unit-4b7ca39903178e20ec7381205694cb01f0dec6bc.tar.gz unit-4b7ca39903178e20ec7381205694cb01f0dec6bc.tar.bz2 |
Merged with the default branch.1.16.0-1
Diffstat (limited to '')
-rw-r--r-- | src/nxt_router.c | 116 |
1 files changed, 101 insertions, 15 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c index 3ff048c5..a913284c 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -1361,6 +1361,12 @@ static nxt_conf_map_t nxt_router_http_conf[] = { NXT_CONF_MAP_MSEC, offsetof(nxt_socket_conf_t, send_timeout), }, + + { + nxt_string("body_temp_path"), + NXT_CONF_MAP_STR, + offsetof(nxt_socket_conf_t, body_temp_path), + }, }; @@ -1397,6 +1403,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_int_t ret; nxt_str_t name, path; nxt_app_t *app, *prev; + nxt_str_t *t; nxt_router_t *router; nxt_app_joint_t *app_joint; nxt_conf_value_t *conf, *http, *value, *websocket; @@ -1634,6 +1641,11 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, tmcf->router_conf->routes = routes; } + ret = nxt_upstreams_create(task, tmcf, conf); + if (nxt_slow_path(ret != NXT_OK)) { + return ret; + } + http = nxt_conf_get_path(conf, &http_path); #if 0 if (http == NULL) { @@ -1693,6 +1705,8 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, skcf->websocket_conf.read_timeout = 60 * 1000; skcf->websocket_conf.keepalive_interval = 30 * 1000; + nxt_str_null(&skcf->body_temp_path); + if (http != NULL) { ret = nxt_conf_map_object(mp, http, nxt_router_http_conf, nxt_nitems(nxt_router_http_conf), @@ -1714,6 +1728,13 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, } } + t = &skcf->body_temp_path; + + if (t->length == 0) { + t->start = (u_char *) task->thread->runtime->tmp; + t->length = nxt_strlen(t->start); + } + #if (NXT_TLS) value = nxt_conf_get_path(listener, &certificate_path); @@ -1904,8 +1925,9 @@ nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name) } -nxt_app_t * -nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, nxt_str_t *name) +void +nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, nxt_str_t *name, + nxt_http_action_t *action) { nxt_app_t *app; @@ -1915,7 +1937,8 @@ nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, nxt_str_t *name) app = nxt_router_app_find(&tmcf->previous, name); } - return app; + action->u.application = app; + action->handler = nxt_http_application_handler; } @@ -2524,6 +2547,7 @@ nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_work_handler_t handler) { + nxt_int_t ret; nxt_joint_job_t *job; nxt_queue_link_t *qlk; nxt_socket_conf_t *skcf; @@ -2557,6 +2581,11 @@ nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf, job->work.data = joint; + ret = nxt_upstreams_joint_create(tmcf, &joint->upstreams); + if (nxt_slow_path(ret != NXT_OK)) { + return ret; + } + joint->count = 1; skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); @@ -4152,6 +4181,13 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, state.req_app_link = re_ra; state.app = app; + /* + * Need to increment use count "in advance" because + * nxt_router_port_select() will remove re_ra from lists + * and decrement use count. + */ + nxt_request_app_link_inc_use(re_ra); + nxt_router_port_select(task, &state); goto re_ra_cancelled; @@ -4217,16 +4253,18 @@ re_ra_cancelled: if (re_ra != NULL) { if (nxt_router_port_post_select(task, &state) == NXT_OK) { /* - * There should be call nxt_request_app_link_inc_use(re_ra), - * because of one more link in the queue. - * Corresponding decrement is in nxt_router_app_process_request(). + * Reference counter already incremented above, this will + * keep re_ra while nxt_router_app_process_request() + * task is in queue. Reference counter decreased in + * nxt_router_app_process_request() after processing. */ - nxt_request_app_link_inc_use(re_ra); - nxt_work_queue_add(&task->thread->engine->fast_work_queue, nxt_router_app_process_request, &task->thread->engine->task, app, re_ra); + + } else { + nxt_request_app_link_use(task, re_ra, -1); } } @@ -4234,15 +4272,14 @@ re_ra_cancelled: /* * There should be call nxt_request_app_link_inc_use(req_app_link), * because of one more link in the queue. But one link was - * recently removed from app->requests link. + * recently removed from app->requests linked list. + * Corresponding decrement is in nxt_router_app_process_request(). */ nxt_work_queue_add(&task->thread->engine->fast_work_queue, nxt_router_app_process_request, &task->thread->engine->task, app, req_app_link); - /* ... skip nxt_request_app_link_use(task, req_app_link, -1) too. */ - goto adjust_use; } @@ -4684,6 +4721,21 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r, return; } + /* + * At this point we have request req_rpc_data allocated and registered + * in port handlers. Need to fixup request memory pool. Counterpart + * release will be called via following call chain: + * nxt_request_rpc_data_unlink() -> + * nxt_router_http_request_done() -> + * nxt_router_http_request_release() + */ + nxt_mp_retain(r->mem_pool); + + r->timer.task = &engine->task; + r->timer.work_queue = &engine->fast_work_queue; + r->timer.log = engine->task.log; + r->timer.bias = NXT_TIMER_DEFAULT_BIAS; + req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data); req_rpc_data->app = app; @@ -4722,7 +4774,8 @@ static void nxt_router_app_prepare_request(nxt_task_t *task, nxt_request_app_link_t *req_app_link) { - nxt_buf_t *buf; + nxt_fd_t fd; + nxt_buf_t *buf, *body; nxt_int_t res; nxt_port_t *port, *c_port, *reply_port; nxt_apr_action_t apr_action; @@ -4781,8 +4834,14 @@ nxt_router_app_prepare_request(nxt_task_t *task, goto release_port; } - res = nxt_port_socket_twrite(task, port, NXT_PORT_MSG_REQ_HEADERS, - -1, req_app_link->stream, reply_port->id, buf, + body = req_app_link->request->body; + fd = (body != NULL && nxt_buf_is_file(body)) ? body->file->fd : -1; + + res = nxt_port_socket_twrite(task, port, + NXT_PORT_MSG_REQ_HEADERS + | NXT_PORT_MSG_CLOSE_FD, + fd, + req_app_link->stream, reply_port->id, buf, &req_app_link->msg_info.tracking); if (nxt_slow_path(res != NXT_OK)) { @@ -4791,6 +4850,10 @@ nxt_router_app_prepare_request(nxt_task_t *task, goto release_port; } + if (fd != -1) { + body->file->fd = -1; + } + release_port: nxt_router_app_port_release(task, port, apr_action); @@ -5115,6 +5178,10 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r, } } + if (r->body != NULL && nxt_buf_is_file(r->body)) { + lseek(r->body->file->fd, 0, SEEK_SET); + } + return out; } @@ -5185,6 +5252,13 @@ nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data) state.req_app_link = pending_ra; state.app = app; + /* + * Need to increment use count "in advance" because + * nxt_router_port_select() will remove pending_ra from lists + * and decrement use count. + */ + nxt_request_app_link_inc_use(pending_ra); + nxt_router_port_select(task, &state); } else { @@ -5196,7 +5270,19 @@ nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data) if (pending_ra != NULL) { if (nxt_router_port_post_select(task, &state) == NXT_OK) { - nxt_router_app_prepare_request(task, pending_ra); + /* + * Reference counter already incremented above, this will + * keep pending_ra while nxt_router_app_process_request() + * task is in queue. Reference counter decreased in + * nxt_router_app_process_request() after processing. + */ + + nxt_work_queue_add(&task->thread->engine->fast_work_queue, + nxt_router_app_process_request, + &task->thread->engine->task, app, pending_ra); + + } else { + nxt_request_app_link_use(task, pending_ra, -1); } } |