summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_router.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/nxt_router.c116
1 files changed, 101 insertions, 15 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c
index 3ff048c5..a913284c 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -1361,6 +1361,12 @@ static nxt_conf_map_t nxt_router_http_conf[] = {
NXT_CONF_MAP_MSEC,
offsetof(nxt_socket_conf_t, send_timeout),
},
+
+ {
+ nxt_string("body_temp_path"),
+ NXT_CONF_MAP_STR,
+ offsetof(nxt_socket_conf_t, body_temp_path),
+ },
};
@@ -1397,6 +1403,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_int_t ret;
nxt_str_t name, path;
nxt_app_t *app, *prev;
+ nxt_str_t *t;
nxt_router_t *router;
nxt_app_joint_t *app_joint;
nxt_conf_value_t *conf, *http, *value, *websocket;
@@ -1634,6 +1641,11 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
tmcf->router_conf->routes = routes;
}
+ ret = nxt_upstreams_create(task, tmcf, conf);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ return ret;
+ }
+
http = nxt_conf_get_path(conf, &http_path);
#if 0
if (http == NULL) {
@@ -1693,6 +1705,8 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
skcf->websocket_conf.read_timeout = 60 * 1000;
skcf->websocket_conf.keepalive_interval = 30 * 1000;
+ nxt_str_null(&skcf->body_temp_path);
+
if (http != NULL) {
ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
nxt_nitems(nxt_router_http_conf),
@@ -1714,6 +1728,13 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
}
}
+ t = &skcf->body_temp_path;
+
+ if (t->length == 0) {
+ t->start = (u_char *) task->thread->runtime->tmp;
+ t->length = nxt_strlen(t->start);
+ }
+
#if (NXT_TLS)
value = nxt_conf_get_path(listener, &certificate_path);
@@ -1904,8 +1925,9 @@ nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
}
-nxt_app_t *
-nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, nxt_str_t *name)
+void
+nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, nxt_str_t *name,
+ nxt_http_action_t *action)
{
nxt_app_t *app;
@@ -1915,7 +1937,8 @@ nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, nxt_str_t *name)
app = nxt_router_app_find(&tmcf->previous, name);
}
- return app;
+ action->u.application = app;
+ action->handler = nxt_http_application_handler;
}
@@ -2524,6 +2547,7 @@ nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
nxt_work_handler_t handler)
{
+ nxt_int_t ret;
nxt_joint_job_t *job;
nxt_queue_link_t *qlk;
nxt_socket_conf_t *skcf;
@@ -2557,6 +2581,11 @@ nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
job->work.data = joint;
+ ret = nxt_upstreams_joint_create(tmcf, &joint->upstreams);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ return ret;
+ }
+
joint->count = 1;
skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
@@ -4152,6 +4181,13 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
state.req_app_link = re_ra;
state.app = app;
+ /*
+ * Need to increment use count "in advance" because
+ * nxt_router_port_select() will remove re_ra from lists
+ * and decrement use count.
+ */
+ nxt_request_app_link_inc_use(re_ra);
+
nxt_router_port_select(task, &state);
goto re_ra_cancelled;
@@ -4217,16 +4253,18 @@ re_ra_cancelled:
if (re_ra != NULL) {
if (nxt_router_port_post_select(task, &state) == NXT_OK) {
/*
- * There should be call nxt_request_app_link_inc_use(re_ra),
- * because of one more link in the queue.
- * Corresponding decrement is in nxt_router_app_process_request().
+ * Reference counter already incremented above, this will
+ * keep re_ra while nxt_router_app_process_request()
+ * task is in queue. Reference counter decreased in
+ * nxt_router_app_process_request() after processing.
*/
- nxt_request_app_link_inc_use(re_ra);
-
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
nxt_router_app_process_request,
&task->thread->engine->task, app, re_ra);
+
+ } else {
+ nxt_request_app_link_use(task, re_ra, -1);
}
}
@@ -4234,15 +4272,14 @@ re_ra_cancelled:
/*
* There should be call nxt_request_app_link_inc_use(req_app_link),
* because of one more link in the queue. But one link was
- * recently removed from app->requests link.
+ * recently removed from app->requests linked list.
+ * Corresponding decrement is in nxt_router_app_process_request().
*/
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
nxt_router_app_process_request,
&task->thread->engine->task, app, req_app_link);
- /* ... skip nxt_request_app_link_use(task, req_app_link, -1) too. */
-
goto adjust_use;
}
@@ -4684,6 +4721,21 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
return;
}
+ /*
+ * At this point we have request req_rpc_data allocated and registered
+ * in port handlers. Need to fixup request memory pool. Counterpart
+ * release will be called via following call chain:
+ * nxt_request_rpc_data_unlink() ->
+ * nxt_router_http_request_done() ->
+ * nxt_router_http_request_release()
+ */
+ nxt_mp_retain(r->mem_pool);
+
+ r->timer.task = &engine->task;
+ r->timer.work_queue = &engine->fast_work_queue;
+ r->timer.log = engine->task.log;
+ r->timer.bias = NXT_TIMER_DEFAULT_BIAS;
+
req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data);
req_rpc_data->app = app;
@@ -4722,7 +4774,8 @@ static void
nxt_router_app_prepare_request(nxt_task_t *task,
nxt_request_app_link_t *req_app_link)
{
- nxt_buf_t *buf;
+ nxt_fd_t fd;
+ nxt_buf_t *buf, *body;
nxt_int_t res;
nxt_port_t *port, *c_port, *reply_port;
nxt_apr_action_t apr_action;
@@ -4781,8 +4834,14 @@ nxt_router_app_prepare_request(nxt_task_t *task,
goto release_port;
}
- res = nxt_port_socket_twrite(task, port, NXT_PORT_MSG_REQ_HEADERS,
- -1, req_app_link->stream, reply_port->id, buf,
+ body = req_app_link->request->body;
+ fd = (body != NULL && nxt_buf_is_file(body)) ? body->file->fd : -1;
+
+ res = nxt_port_socket_twrite(task, port,
+ NXT_PORT_MSG_REQ_HEADERS
+ | NXT_PORT_MSG_CLOSE_FD,
+ fd,
+ req_app_link->stream, reply_port->id, buf,
&req_app_link->msg_info.tracking);
if (nxt_slow_path(res != NXT_OK)) {
@@ -4791,6 +4850,10 @@ nxt_router_app_prepare_request(nxt_task_t *task,
goto release_port;
}
+ if (fd != -1) {
+ body->file->fd = -1;
+ }
+
release_port:
nxt_router_app_port_release(task, port, apr_action);
@@ -5115,6 +5178,10 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r,
}
}
+ if (r->body != NULL && nxt_buf_is_file(r->body)) {
+ lseek(r->body->file->fd, 0, SEEK_SET);
+ }
+
return out;
}
@@ -5185,6 +5252,13 @@ nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
state.req_app_link = pending_ra;
state.app = app;
+ /*
+ * Need to increment use count "in advance" because
+ * nxt_router_port_select() will remove pending_ra from lists
+ * and decrement use count.
+ */
+ nxt_request_app_link_inc_use(pending_ra);
+
nxt_router_port_select(task, &state);
} else {
@@ -5196,7 +5270,19 @@ nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
if (pending_ra != NULL) {
if (nxt_router_port_post_select(task, &state) == NXT_OK) {
- nxt_router_app_prepare_request(task, pending_ra);
+ /*
+ * Reference counter already incremented above, this will
+ * keep pending_ra while nxt_router_app_process_request()
+ * task is in queue. Reference counter decreased in
+ * nxt_router_app_process_request() after processing.
+ */
+
+ nxt_work_queue_add(&task->thread->engine->fast_work_queue,
+ nxt_router_app_process_request,
+ &task->thread->engine->task, app, pending_ra);
+
+ } else {
+ nxt_request_app_link_use(task, pending_ra, -1);
}
}