diff options
Diffstat (limited to '')
-rw-r--r-- | src/nxt_router.c | 77 |
1 files changed, 48 insertions, 29 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c index a913284c..93b750a0 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -303,7 +303,7 @@ nxt_router_start(nxt_task_t *task, void *data) } #endif - ret = nxt_http_init(task, rt); + ret = nxt_http_init(task); if (nxt_slow_path(ret != NXT_OK)) { return ret; } @@ -462,6 +462,7 @@ nxt_inline void nxt_request_app_link_init(nxt_task_t *task, nxt_request_app_link_t *req_app_link, nxt_request_rpc_data_t *req_rpc_data) { + nxt_buf_t *body; nxt_event_engine_t *engine; engine = task->thread->engine; @@ -480,6 +481,17 @@ nxt_request_app_link_init(nxt_task_t *task, req_app_link->work.task = &engine->task; req_app_link->work.obj = req_app_link; req_app_link->work.data = engine; + + body = req_rpc_data->request->body; + + if (body != NULL && nxt_buf_is_file(body)) { + req_app_link->body_fd = body->file->fd; + + body->file->fd = -1; + + } else { + req_app_link->body_fd = -1; + } } @@ -513,6 +525,10 @@ nxt_request_app_link_alloc(nxt_task_t *task, nxt_request_app_link_init(task, req_app_link, req_rpc_data); + if (ra_src != NULL) { + req_app_link->body_fd = ra_src->body_fd; + } + req_app_link->mem_pool = mp; return req_app_link; @@ -654,6 +670,12 @@ nxt_request_app_link_release(nxt_task_t *task, req_app_link->app_port = NULL; } + if (req_app_link->body_fd != -1) { + nxt_fd_close(req_app_link->body_fd); + + req_app_link->body_fd = -1; + } + nxt_router_msg_cancel(task, &req_app_link->msg_info, req_app_link->stream); mp = req_app_link->mem_pool; @@ -713,12 +735,15 @@ nxt_request_app_link_use(nxt_task_t *task, nxt_request_app_link_t *req_app_link, nxt_inline void -nxt_request_app_link_error(nxt_request_app_link_t *req_app_link, int code, - const char *str) +nxt_request_app_link_error(nxt_task_t *task, nxt_app_t *app, + nxt_request_app_link_t *req_app_link, const char *str) { req_app_link->app_port = NULL; - req_app_link->err_code = code; + req_app_link->err_code = 500; req_app_link->err_str = str; + + nxt_alert(task, "app \"%V\" internal error: %s on #%uD", + &app->name, str, req_app_link->stream); } @@ -3887,7 +3912,7 @@ nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_debug(task, "app '%V' %p abort next stream #%uD", &app->name, app, req_app_link->stream); - nxt_request_app_link_error(req_app_link, 500, + nxt_request_app_link_error(task, app, req_app_link, "Failed to start application process"); nxt_request_app_link_use(task, req_app_link, -1); } @@ -4643,7 +4668,7 @@ nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state) nxt_port_use(task, state->port, -1); } - nxt_request_app_link_error(state->req_app_link, 500, + nxt_request_app_link_error(task, app, state->req_app_link, "Failed to allocate shared req<->app link"); return NXT_ERROR; @@ -4671,7 +4696,7 @@ nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state) res = nxt_router_start_app_process(task, app); if (nxt_slow_path(res != NXT_OK)) { - nxt_request_app_link_error(req_app_link, 500, + nxt_request_app_link_error(task, app, req_app_link, "Failed to start app process"); return NXT_ERROR; @@ -4774,8 +4799,7 @@ static void nxt_router_app_prepare_request(nxt_task_t *task, nxt_request_app_link_t *req_app_link) { - nxt_fd_t fd; - nxt_buf_t *buf, *body; + nxt_buf_t *buf; nxt_int_t res; nxt_port_t *port, *c_port, *reply_port; nxt_apr_action_t apr_action; @@ -4787,14 +4811,15 @@ nxt_router_app_prepare_request(nxt_task_t *task, apr_action = NXT_APR_REQUEST_FAILED; - c_port = nxt_process_connected_port_find(port->process, reply_port->pid, - reply_port->id); + c_port = nxt_process_connected_port_find(port->process, reply_port); + if (nxt_slow_path(c_port != reply_port)) { res = nxt_port_send_port(task, port, reply_port, 0); if (nxt_slow_path(res != NXT_OK)) { - nxt_request_app_link_error(req_app_link, 500, + nxt_request_app_link_error(task, port->app, req_app_link, "Failed to send reply port to application"); + goto release_port; } @@ -4805,7 +4830,7 @@ nxt_router_app_prepare_request(nxt_task_t *task, nxt_app_msg_prefix[port->app->type]); if (nxt_slow_path(buf == NULL)) { - nxt_request_app_link_error(req_app_link, 500, + nxt_request_app_link_error(task, port->app, req_app_link, "Failed to prepare message for application"); goto release_port; } @@ -4829,31 +4854,29 @@ nxt_router_app_prepare_request(nxt_task_t *task, &req_app_link->msg_info.tracking, req_app_link->stream); if (nxt_slow_path(res != NXT_OK)) { - nxt_request_app_link_error(req_app_link, 500, + nxt_request_app_link_error(task, port->app, req_app_link, "Failed to get tracking area"); goto release_port; } - body = req_app_link->request->body; - fd = (body != NULL && nxt_buf_is_file(body)) ? body->file->fd : -1; + if (req_app_link->body_fd != -1) { + nxt_debug(task, "stream #%uD: send body fd %d", req_app_link->stream, + req_app_link->body_fd); + + lseek(req_app_link->body_fd, 0, SEEK_SET); + } - res = nxt_port_socket_twrite(task, port, - NXT_PORT_MSG_REQ_HEADERS - | NXT_PORT_MSG_CLOSE_FD, - fd, + res = nxt_port_socket_twrite(task, port, NXT_PORT_MSG_REQ_HEADERS, + req_app_link->body_fd, req_app_link->stream, reply_port->id, buf, &req_app_link->msg_info.tracking); if (nxt_slow_path(res != NXT_OK)) { - nxt_request_app_link_error(req_app_link, 500, + nxt_request_app_link_error(task, port->app, req_app_link, "Failed to send message to application"); goto release_port; } - if (fd != -1) { - body->file->fd = -1; - } - release_port: nxt_router_app_port_release(task, port, apr_action); @@ -5178,10 +5201,6 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r, } } - if (r->body != NULL && nxt_buf_is_file(r->body)) { - lseek(r->body->file->fd, 0, SEEK_SET); - } - return out; } |