summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_router.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nxt_router.c')
-rw-r--r--src/nxt_router.c77
1 files changed, 48 insertions, 29 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c
index a913284c..93b750a0 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -303,7 +303,7 @@ nxt_router_start(nxt_task_t *task, void *data)
}
#endif
- ret = nxt_http_init(task, rt);
+ ret = nxt_http_init(task);
if (nxt_slow_path(ret != NXT_OK)) {
return ret;
}
@@ -462,6 +462,7 @@ nxt_inline void
nxt_request_app_link_init(nxt_task_t *task,
nxt_request_app_link_t *req_app_link, nxt_request_rpc_data_t *req_rpc_data)
{
+ nxt_buf_t *body;
nxt_event_engine_t *engine;
engine = task->thread->engine;
@@ -480,6 +481,17 @@ nxt_request_app_link_init(nxt_task_t *task,
req_app_link->work.task = &engine->task;
req_app_link->work.obj = req_app_link;
req_app_link->work.data = engine;
+
+ body = req_rpc_data->request->body;
+
+ if (body != NULL && nxt_buf_is_file(body)) {
+ req_app_link->body_fd = body->file->fd;
+
+ body->file->fd = -1;
+
+ } else {
+ req_app_link->body_fd = -1;
+ }
}
@@ -513,6 +525,10 @@ nxt_request_app_link_alloc(nxt_task_t *task,
nxt_request_app_link_init(task, req_app_link, req_rpc_data);
+ if (ra_src != NULL) {
+ req_app_link->body_fd = ra_src->body_fd;
+ }
+
req_app_link->mem_pool = mp;
return req_app_link;
@@ -654,6 +670,12 @@ nxt_request_app_link_release(nxt_task_t *task,
req_app_link->app_port = NULL;
}
+ if (req_app_link->body_fd != -1) {
+ nxt_fd_close(req_app_link->body_fd);
+
+ req_app_link->body_fd = -1;
+ }
+
nxt_router_msg_cancel(task, &req_app_link->msg_info, req_app_link->stream);
mp = req_app_link->mem_pool;
@@ -713,12 +735,15 @@ nxt_request_app_link_use(nxt_task_t *task, nxt_request_app_link_t *req_app_link,
nxt_inline void
-nxt_request_app_link_error(nxt_request_app_link_t *req_app_link, int code,
- const char *str)
+nxt_request_app_link_error(nxt_task_t *task, nxt_app_t *app,
+ nxt_request_app_link_t *req_app_link, const char *str)
{
req_app_link->app_port = NULL;
- req_app_link->err_code = code;
+ req_app_link->err_code = 500;
req_app_link->err_str = str;
+
+ nxt_alert(task, "app \"%V\" internal error: %s on #%uD",
+ &app->name, str, req_app_link->stream);
}
@@ -3887,7 +3912,7 @@ nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_debug(task, "app '%V' %p abort next stream #%uD",
&app->name, app, req_app_link->stream);
- nxt_request_app_link_error(req_app_link, 500,
+ nxt_request_app_link_error(task, app, req_app_link,
"Failed to start application process");
nxt_request_app_link_use(task, req_app_link, -1);
}
@@ -4643,7 +4668,7 @@ nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state)
nxt_port_use(task, state->port, -1);
}
- nxt_request_app_link_error(state->req_app_link, 500,
+ nxt_request_app_link_error(task, app, state->req_app_link,
"Failed to allocate shared req<->app link");
return NXT_ERROR;
@@ -4671,7 +4696,7 @@ nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state)
res = nxt_router_start_app_process(task, app);
if (nxt_slow_path(res != NXT_OK)) {
- nxt_request_app_link_error(req_app_link, 500,
+ nxt_request_app_link_error(task, app, req_app_link,
"Failed to start app process");
return NXT_ERROR;
@@ -4774,8 +4799,7 @@ static void
nxt_router_app_prepare_request(nxt_task_t *task,
nxt_request_app_link_t *req_app_link)
{
- nxt_fd_t fd;
- nxt_buf_t *buf, *body;
+ nxt_buf_t *buf;
nxt_int_t res;
nxt_port_t *port, *c_port, *reply_port;
nxt_apr_action_t apr_action;
@@ -4787,14 +4811,15 @@ nxt_router_app_prepare_request(nxt_task_t *task,
apr_action = NXT_APR_REQUEST_FAILED;
- c_port = nxt_process_connected_port_find(port->process, reply_port->pid,
- reply_port->id);
+ c_port = nxt_process_connected_port_find(port->process, reply_port);
+
if (nxt_slow_path(c_port != reply_port)) {
res = nxt_port_send_port(task, port, reply_port, 0);
if (nxt_slow_path(res != NXT_OK)) {
- nxt_request_app_link_error(req_app_link, 500,
+ nxt_request_app_link_error(task, port->app, req_app_link,
"Failed to send reply port to application");
+
goto release_port;
}
@@ -4805,7 +4830,7 @@ nxt_router_app_prepare_request(nxt_task_t *task,
nxt_app_msg_prefix[port->app->type]);
if (nxt_slow_path(buf == NULL)) {
- nxt_request_app_link_error(req_app_link, 500,
+ nxt_request_app_link_error(task, port->app, req_app_link,
"Failed to prepare message for application");
goto release_port;
}
@@ -4829,31 +4854,29 @@ nxt_router_app_prepare_request(nxt_task_t *task,
&req_app_link->msg_info.tracking,
req_app_link->stream);
if (nxt_slow_path(res != NXT_OK)) {
- nxt_request_app_link_error(req_app_link, 500,
+ nxt_request_app_link_error(task, port->app, req_app_link,
"Failed to get tracking area");
goto release_port;
}
- body = req_app_link->request->body;
- fd = (body != NULL && nxt_buf_is_file(body)) ? body->file->fd : -1;
+ if (req_app_link->body_fd != -1) {
+ nxt_debug(task, "stream #%uD: send body fd %d", req_app_link->stream,
+ req_app_link->body_fd);
+
+ lseek(req_app_link->body_fd, 0, SEEK_SET);
+ }
- res = nxt_port_socket_twrite(task, port,
- NXT_PORT_MSG_REQ_HEADERS
- | NXT_PORT_MSG_CLOSE_FD,
- fd,
+ res = nxt_port_socket_twrite(task, port, NXT_PORT_MSG_REQ_HEADERS,
+ req_app_link->body_fd,
req_app_link->stream, reply_port->id, buf,
&req_app_link->msg_info.tracking);
if (nxt_slow_path(res != NXT_OK)) {
- nxt_request_app_link_error(req_app_link, 500,
+ nxt_request_app_link_error(task, port->app, req_app_link,
"Failed to send message to application");
goto release_port;
}
- if (fd != -1) {
- body->file->fd = -1;
- }
-
release_port:
nxt_router_app_port_release(task, port, apr_action);
@@ -5178,10 +5201,6 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r,
}
}
- if (r->body != NULL && nxt_buf_is_file(r->body)) {
- lseek(r->body->file->fd, 0, SEEK_SET);
- }
-
return out;
}