summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_router.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/nxt_router.c1280
1 files changed, 701 insertions, 579 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c
index 149a0ff3..b87f588f 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -14,7 +14,7 @@
#include <nxt_port_memory_int.h>
#include <nxt_unit_request.h>
#include <nxt_unit_response.h>
-
+#include <nxt_router_request.h>
typedef struct {
nxt_str_t type;
@@ -48,51 +48,6 @@ typedef struct {
#endif
-typedef struct nxt_msg_info_s {
- nxt_buf_t *buf;
- nxt_port_mmap_tracking_t tracking;
- nxt_work_handler_t completion_handler;
-} nxt_msg_info_t;
-
-
-typedef struct nxt_req_app_link_s nxt_req_app_link_t;
-
-
-typedef struct {
- uint32_t stream;
- nxt_app_t *app;
- nxt_port_t *app_port;
- nxt_http_request_t *request;
- nxt_msg_info_t msg_info;
- nxt_req_app_link_t *ra;
-
- nxt_queue_link_t link; /* for nxt_conn_t.requests */
-} nxt_req_conn_link_t;
-
-
-struct nxt_req_app_link_s {
- uint32_t stream;
- nxt_atomic_t use_count;
- nxt_port_t *app_port;
- nxt_port_t *reply_port;
- nxt_http_request_t *request;
- nxt_msg_info_t msg_info;
- nxt_req_conn_link_t *rc;
-
- nxt_nsec_t res_time;
-
- nxt_queue_link_t link_app_requests; /* for nxt_app_t.requests */
- nxt_queue_link_t link_port_pending; /* for nxt_port_t.pending_requests */
- nxt_queue_link_t link_app_pending; /* for nxt_app_t.pending */
-
- nxt_mp_t *mem_pool;
- nxt_work_t work;
-
- int err_code;
- const char *err_str;
-};
-
-
typedef struct {
nxt_socket_conf_t *socket_conf;
nxt_router_temp_conf_t *temp_conf;
@@ -106,15 +61,15 @@ typedef struct {
struct nxt_port_select_state_s {
- nxt_app_t *app;
- nxt_req_app_link_t *ra;
+ nxt_app_t *app;
+ nxt_request_app_link_t *req_app_link;
- nxt_port_t *failed_port;
- int failed_port_use_delta;
+ nxt_port_t *failed_port;
+ int failed_port_use_delta;
- uint8_t start_process; /* 1 bit */
- nxt_req_app_link_t *shared_ra;
- nxt_port_t *port;
+ uint8_t start_process; /* 1 bit */
+ nxt_request_app_link_t *shared_ra;
+ nxt_port_t *port;
};
typedef struct nxt_port_select_state_s nxt_port_select_state_t;
@@ -129,28 +84,32 @@ static nxt_int_t nxt_router_port_post_select(nxt_task_t *task,
nxt_port_select_state_t *state);
static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);
+static void nxt_request_app_link_update_peer(nxt_task_t *task,
+ nxt_request_app_link_t *req_app_link);
+
nxt_inline void
-nxt_router_ra_inc_use(nxt_req_app_link_t *ra)
+nxt_request_app_link_inc_use(nxt_request_app_link_t *req_app_link)
{
- nxt_atomic_fetch_add(&ra->use_count, 1);
+ nxt_atomic_fetch_add(&req_app_link->use_count, 1);
}
nxt_inline void
-nxt_router_ra_dec_use(nxt_req_app_link_t *ra)
+nxt_request_app_link_dec_use(nxt_request_app_link_t *req_app_link)
{
#if (NXT_DEBUG)
int c;
- c = nxt_atomic_fetch_add(&ra->use_count, -1);
+ c = nxt_atomic_fetch_add(&req_app_link->use_count, -1);
nxt_assert(c > 1);
#else
- (void) nxt_atomic_fetch_add(&ra->use_count, -1);
+ (void) nxt_atomic_fetch_add(&req_app_link->use_count, -1);
#endif
}
-static void nxt_router_ra_use(nxt_task_t *task, nxt_req_app_link_t *ra, int i);
+static void nxt_request_app_link_use(nxt_task_t *task,
+ nxt_request_app_link_t *req_app_link, int i);
static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
@@ -257,13 +216,14 @@ static void nxt_router_app_port_error(nxt_task_t *task,
nxt_port_recv_msg_t *msg, void *data);
static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app);
+
static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
- uint32_t request_failed, uint32_t got_response);
+ nxt_apr_action_t action);
static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_app_t *app,
- nxt_req_app_link_t *ra);
+ nxt_request_app_link_t *req_app_link);
static void nxt_router_app_prepare_request(nxt_task_t *task,
- nxt_req_app_link_t *ra);
+ nxt_request_app_link_t *req_app_link);
static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task,
nxt_http_request_t *r, nxt_port_t *port, const nxt_str_t *prefix);
@@ -287,6 +247,8 @@ static nxt_int_t nxt_router_http_request_done(nxt_task_t *task,
static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
void *data);
+const nxt_http_request_state_t nxt_http_websocket;
+
static nxt_router_t *nxt_router;
static const nxt_str_t http_prefix = nxt_string("HTTP_");
@@ -493,58 +455,63 @@ nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
nxt_inline void
-nxt_router_ra_init(nxt_task_t *task, nxt_req_app_link_t *ra,
- nxt_req_conn_link_t *rc)
+nxt_request_app_link_init(nxt_task_t *task,
+ nxt_request_app_link_t *req_app_link, nxt_request_rpc_data_t *req_rpc_data)
{
nxt_event_engine_t *engine;
engine = task->thread->engine;
- nxt_memzero(ra, sizeof(nxt_req_app_link_t));
+ nxt_memzero(req_app_link, sizeof(nxt_request_app_link_t));
- ra->stream = rc->stream;
- ra->use_count = 1;
- ra->rc = rc;
- rc->ra = ra;
- ra->reply_port = engine->port;
- ra->request = rc->request;
+ req_app_link->stream = req_rpc_data->stream;
+ req_app_link->use_count = 1;
+ req_app_link->req_rpc_data = req_rpc_data;
+ req_rpc_data->req_app_link = req_app_link;
+ req_app_link->reply_port = engine->port;
+ req_app_link->request = req_rpc_data->request;
+ req_app_link->apr_action = NXT_APR_GOT_RESPONSE;
- ra->work.handler = NULL;
- ra->work.task = &engine->task;
- ra->work.obj = ra;
- ra->work.data = engine;
+ req_app_link->work.handler = NULL;
+ req_app_link->work.task = &engine->task;
+ req_app_link->work.obj = req_app_link;
+ req_app_link->work.data = engine;
}
-nxt_inline nxt_req_app_link_t *
-nxt_router_ra_create(nxt_task_t *task, nxt_req_app_link_t *ra_src)
+nxt_inline nxt_request_app_link_t *
+nxt_request_app_link_alloc(nxt_task_t *task,
+ nxt_request_app_link_t *ra_src, nxt_request_rpc_data_t *req_rpc_data)
{
- nxt_mp_t *mp;
- nxt_req_app_link_t *ra;
+ nxt_mp_t *mp;
+ nxt_request_app_link_t *req_app_link;
- if (ra_src->mem_pool != NULL) {
+ if (ra_src != NULL && ra_src->mem_pool != NULL) {
return ra_src;
}
- mp = ra_src->request->mem_pool;
+ mp = req_rpc_data->request->mem_pool;
- ra = nxt_mp_alloc(mp, sizeof(nxt_req_app_link_t));
+ req_app_link = nxt_mp_alloc(mp, sizeof(nxt_request_app_link_t));
- if (nxt_slow_path(ra == NULL)) {
+ if (nxt_slow_path(req_app_link == NULL)) {
- ra_src->rc->ra = NULL;
- ra_src->rc = NULL;
+ req_rpc_data->req_app_link = NULL;
+
+ if (ra_src != NULL) {
+ ra_src->req_rpc_data = NULL;
+ }
return NULL;
}
nxt_mp_retain(mp);
- nxt_router_ra_init(task, ra, ra_src->rc);
+ nxt_request_app_link_init(task, req_app_link, req_rpc_data);
- ra->mem_pool = mp;
+ req_app_link->mem_pool = mp;
- return ra;
+ return req_app_link;
}
@@ -584,177 +551,189 @@ nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info,
static void
-nxt_router_ra_update_peer(nxt_task_t *task, nxt_req_app_link_t *ra);
-
-
-static void
-nxt_router_ra_update_peer_handler(nxt_task_t *task, void *obj, void *data)
+nxt_request_app_link_update_peer_handler(nxt_task_t *task, void *obj,
+ void *data)
{
- nxt_req_app_link_t *ra;
+ nxt_request_app_link_t *req_app_link;
- ra = obj;
+ req_app_link = obj;
- nxt_router_ra_update_peer(task, ra);
+ nxt_request_app_link_update_peer(task, req_app_link);
- nxt_router_ra_use(task, ra, -1);
+ nxt_request_app_link_use(task, req_app_link, -1);
}
static void
-nxt_router_ra_update_peer(nxt_task_t *task, nxt_req_app_link_t *ra)
+nxt_request_app_link_update_peer(nxt_task_t *task,
+ nxt_request_app_link_t *req_app_link)
{
- nxt_event_engine_t *engine;
- nxt_req_conn_link_t *rc;
+ nxt_event_engine_t *engine;
+ nxt_request_rpc_data_t *req_rpc_data;
- engine = ra->work.data;
+ engine = req_app_link->work.data;
if (task->thread->engine != engine) {
- nxt_router_ra_inc_use(ra);
+ nxt_request_app_link_inc_use(req_app_link);
- ra->work.handler = nxt_router_ra_update_peer_handler;
- ra->work.task = &engine->task;
- ra->work.next = NULL;
+ req_app_link->work.handler = nxt_request_app_link_update_peer_handler;
+ req_app_link->work.task = &engine->task;
+ req_app_link->work.next = NULL;
- nxt_debug(task, "ra stream #%uD post update peer to %p",
- ra->stream, engine);
+ nxt_debug(task, "req_app_link stream #%uD post update peer to %p",
+ req_app_link->stream, engine);
- nxt_event_engine_post(engine, &ra->work);
+ nxt_event_engine_post(engine, &req_app_link->work);
return;
}
- nxt_debug(task, "ra stream #%uD update peer", ra->stream);
+ nxt_debug(task, "req_app_link stream #%uD update peer",
+ req_app_link->stream);
- rc = ra->rc;
+ req_rpc_data = req_app_link->req_rpc_data;
- if (rc != NULL && ra->app_port != NULL) {
- nxt_port_rpc_ex_set_peer(task, engine->port, rc, ra->app_port->pid);
+ if (req_rpc_data != NULL && req_app_link->app_port != NULL) {
+ nxt_port_rpc_ex_set_peer(task, engine->port, req_rpc_data,
+ req_app_link->app_port->pid);
}
- nxt_router_ra_use(task, ra, -1);
+ nxt_request_app_link_use(task, req_app_link, -1);
}
static void
-nxt_router_ra_release(nxt_task_t *task, nxt_req_app_link_t *ra)
+nxt_request_app_link_release(nxt_task_t *task,
+ nxt_request_app_link_t *req_app_link)
{
nxt_mp_t *mp;
- nxt_req_conn_link_t *rc;
+ nxt_http_request_t *r;
+ nxt_request_rpc_data_t *req_rpc_data;
- nxt_assert(task->thread->engine == ra->work.data);
- nxt_assert(ra->use_count == 0);
+ nxt_assert(task->thread->engine == req_app_link->work.data);
+ nxt_assert(req_app_link->use_count == 0);
- nxt_debug(task, "ra stream #%uD release", ra->stream);
+ nxt_debug(task, "req_app_link stream #%uD release", req_app_link->stream);
- rc = ra->rc;
+ req_rpc_data = req_app_link->req_rpc_data;
- if (rc != NULL) {
- if (nxt_slow_path(ra->err_code != 0)) {
- nxt_http_request_error(task, rc->request, ra->err_code);
+ if (req_rpc_data != NULL) {
+ if (nxt_slow_path(req_app_link->err_code != 0)) {
+ nxt_http_request_error(task, req_rpc_data->request,
+ req_app_link->err_code);
} else {
- rc->app_port = ra->app_port;
- rc->msg_info = ra->msg_info;
-
- if (rc->app->timeout != 0) {
- rc->request->timer.handler = nxt_router_app_timeout;
- rc->request->timer_data = rc;
- nxt_timer_add(task->thread->engine, &rc->request->timer,
- rc->app->timeout);
+ req_rpc_data->app_port = req_app_link->app_port;
+ req_rpc_data->apr_action = req_app_link->apr_action;
+ req_rpc_data->msg_info = req_app_link->msg_info;
+
+ if (req_rpc_data->app->timeout != 0) {
+ r = req_rpc_data->request;
+
+ r->timer.handler = nxt_router_app_timeout;
+ r->timer_data = req_rpc_data;
+ nxt_timer_add(task->thread->engine, &r->timer,
+ req_rpc_data->app->timeout);
}
- ra->app_port = NULL;
- ra->msg_info.buf = NULL;
+ req_app_link->app_port = NULL;
+ req_app_link->msg_info.buf = NULL;
}
- rc->ra = NULL;
- ra->rc = NULL;
+ req_rpc_data->req_app_link = NULL;
+ req_app_link->req_rpc_data = NULL;
}
- if (ra->app_port != NULL) {
- nxt_router_app_port_release(task, ra->app_port, 0, 1);
+ if (req_app_link->app_port != NULL) {
+ nxt_router_app_port_release(task, req_app_link->app_port,
+ req_app_link->apr_action);
- ra->app_port = NULL;
+ req_app_link->app_port = NULL;
}
- nxt_router_msg_cancel(task, &ra->msg_info, ra->stream);
+ nxt_router_msg_cancel(task, &req_app_link->msg_info, req_app_link->stream);
- mp = ra->mem_pool;
+ mp = req_app_link->mem_pool;
if (mp != NULL) {
- nxt_mp_free(mp, ra);
+ nxt_mp_free(mp, req_app_link);
nxt_mp_release(mp);
}
}
static void
-nxt_router_ra_release_handler(nxt_task_t *task, void *obj, void *data)
+nxt_request_app_link_release_handler(nxt_task_t *task, void *obj, void *data)
{
- nxt_req_app_link_t *ra;
+ nxt_request_app_link_t *req_app_link;
- ra = obj;
+ req_app_link = obj;
- nxt_assert(ra->work.data == data);
+ nxt_assert(req_app_link->work.data == data);
- nxt_atomic_fetch_add(&ra->use_count, -1);
+ nxt_atomic_fetch_add(&req_app_link->use_count, -1);
- nxt_router_ra_release(task, ra);
+ nxt_request_app_link_release(task, req_app_link);
}
static void
-nxt_router_ra_use(nxt_task_t *task, nxt_req_app_link_t *ra, int i)
+nxt_request_app_link_use(nxt_task_t *task, nxt_request_app_link_t *req_app_link,
+ int i)
{
int c;
nxt_event_engine_t *engine;
- c = nxt_atomic_fetch_add(&ra->use_count, i);
+ c = nxt_atomic_fetch_add(&req_app_link->use_count, i);
if (i < 0 && c == -i) {
- engine = ra->work.data;
+ engine = req_app_link->work.data;
if (task->thread->engine == engine) {
- nxt_router_ra_release(task, ra);
+ nxt_request_app_link_release(task, req_app_link);
return;
}
- nxt_router_ra_inc_use(ra);
+ nxt_request_app_link_inc_use(req_app_link);
- ra->work.handler = nxt_router_ra_release_handler;
- ra->work.task = &engine->task;
- ra->work.next = NULL;
+ req_app_link->work.handler = nxt_request_app_link_release_handler;
+ req_app_link->work.task = &engine->task;
+ req_app_link->work.next = NULL;
- nxt_debug(task, "ra stream #%uD post release to %p",
- ra->stream, engine);
+ nxt_debug(task, "req_app_link stream #%uD post release to %p",
+ req_app_link->stream, engine);
- nxt_event_engine_post(engine, &ra->work);
+ nxt_event_engine_post(engine, &req_app_link->work);
}
}
nxt_inline void
-nxt_router_ra_error(nxt_req_app_link_t *ra, int code, const char *str)
+nxt_request_app_link_error(nxt_request_app_link_t *req_app_link, int code,
+ const char *str)
{
- ra->app_port = NULL;
- ra->err_code = code;
- ra->err_str = str;
+ req_app_link->app_port = NULL;
+ req_app_link->err_code = code;
+ req_app_link->err_str = str;
}
nxt_inline void
-nxt_router_ra_pending(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra)
+nxt_request_app_link_pending(nxt_task_t *task, nxt_app_t *app,
+ nxt_request_app_link_t *req_app_link)
{
- nxt_queue_insert_tail(&ra->app_port->pending_requests,
- &ra->link_port_pending);
- nxt_queue_insert_tail(&app->pending, &ra->link_app_pending);
+ nxt_queue_insert_tail(&req_app_link->app_port->pending_requests,
+ &req_app_link->link_port_pending);
+ nxt_queue_insert_tail(&app->pending, &req_app_link->link_app_pending);
- nxt_router_ra_inc_use(ra);
+ nxt_request_app_link_inc_use(req_app_link);
- ra->res_time = nxt_thread_monotonic_time(task->thread) + app->res_timeout;
+ req_app_link->res_time = nxt_thread_monotonic_time(task->thread)
+ + app->res_timeout;
- nxt_debug(task, "ra stream #%uD enqueue to pending_requests", ra->stream);
+ nxt_debug(task, "req_app_link stream #%uD enqueue to pending_requests",
+ req_app_link->stream);
}
@@ -774,60 +753,66 @@ nxt_queue_chk_remove(nxt_queue_link_t *lnk)
nxt_inline void
-nxt_router_rc_unlink(nxt_task_t *task, nxt_req_conn_link_t *rc)
+nxt_request_rpc_data_unlink(nxt_task_t *task,
+ nxt_request_rpc_data_t *req_rpc_data)
{
- int ra_use_delta;
- nxt_req_app_link_t *ra;
+ int ra_use_delta;
+ nxt_request_app_link_t *req_app_link;
- if (rc->app_port != NULL) {
- nxt_router_app_port_release(task, rc->app_port, 0, 1);
+ if (req_rpc_data->app_port != NULL) {
+ nxt_router_app_port_release(task, req_rpc_data->app_port,
+ req_rpc_data->apr_action);
- rc->app_port = NULL;
+ req_rpc_data->app_port = NULL;
}
- nxt_router_msg_cancel(task, &rc->msg_info, rc->stream);
-
- ra = rc->ra;
+ nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream);
- if (ra != NULL) {
- rc->ra = NULL;
- ra->rc = NULL;
+ req_app_link = req_rpc_data->req_app_link;
+ if (req_app_link != NULL) {
+ req_rpc_data->req_app_link = NULL;
+ req_app_link->req_rpc_data = NULL;
ra_use_delta = 0;
- nxt_thread_mutex_lock(&rc->app->mutex);
+ nxt_thread_mutex_lock(&req_rpc_data->app->mutex);
- if (ra->link_app_requests.next == NULL
- && ra->link_port_pending.next == NULL
- && ra->link_app_pending.next == NULL)
+ if (req_app_link->link_app_requests.next == NULL
+ && req_app_link->link_port_pending.next == NULL
+ && req_app_link->link_app_pending.next == NULL
+ && req_app_link->link_port_websockets.next == NULL)
{
- ra = NULL;
+ req_app_link = NULL;
} else {
- ra_use_delta -= nxt_queue_chk_remove(&ra->link_app_requests);
- ra_use_delta -= nxt_queue_chk_remove(&ra->link_port_pending);
- nxt_queue_chk_remove(&ra->link_app_pending);
+ ra_use_delta -=
+ nxt_queue_chk_remove(&req_app_link->link_app_requests)
+ + nxt_queue_chk_remove(&req_app_link->link_port_pending)
+ + nxt_queue_chk_remove(&req_app_link->link_port_websockets);
+
+ nxt_queue_chk_remove(&req_app_link->link_app_pending);
}
- nxt_thread_mutex_unlock(&rc->app->mutex);
+ nxt_thread_mutex_unlock(&req_rpc_data->app->mutex);
- if (ra != NULL) {
- nxt_router_ra_use(task, ra, ra_use_delta);
+ if (req_app_link != NULL) {
+ nxt_request_app_link_use(task, req_app_link, ra_use_delta);
}
}
- if (rc->app != NULL) {
- nxt_router_app_use(task, rc->app, -1);
+ if (req_rpc_data->app != NULL) {
+ nxt_router_app_use(task, req_rpc_data->app, -1);
- rc->app = NULL;
+ req_rpc_data->app = NULL;
}
- if (rc->request != NULL) {
- rc->request->timer_data = NULL;
+ if (req_rpc_data->request != NULL) {
+ req_rpc_data->request->timer_data = NULL;
- nxt_router_http_request_done(task, rc->request);
+ nxt_router_http_request_done(task, req_rpc_data->request);
- rc->request = NULL;
+ req_rpc_data->request->req_rpc_data = NULL;
+ req_rpc_data->request = NULL;
}
}
@@ -928,10 +913,6 @@ nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
nxt_port_remove_pid_handler(task, msg);
- if (msg->port_msg.stream == 0) {
- return;
- }
-
nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
{
nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid,
@@ -939,6 +920,10 @@ nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
}
nxt_queue_loop;
+ if (msg->port_msg.stream == 0) {
+ return;
+ }
+
msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
nxt_port_rpc_handler(task, msg);
@@ -1376,6 +1361,28 @@ static nxt_conf_map_t nxt_router_http_conf[] = {
};
+static nxt_conf_map_t nxt_router_websocket_conf[] = {
+ {
+ nxt_string("max_frame_size"),
+ NXT_CONF_MAP_SIZE,
+ offsetof(nxt_websocket_conf_t, max_frame_size),
+ },
+
+ {
+ nxt_string("read_timeout"),
+ NXT_CONF_MAP_MSEC,
+ offsetof(nxt_websocket_conf_t, read_timeout),
+ },
+
+ {
+ nxt_string("keepalive_interval"),
+ NXT_CONF_MAP_MSEC,
+ offsetof(nxt_websocket_conf_t, keepalive_interval),
+ },
+
+};
+
+
static nxt_int_t
nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
u_char *start, u_char *end)
@@ -1389,7 +1396,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_app_t *app, *prev;
nxt_router_t *router;
nxt_app_joint_t *app_joint;
- nxt_conf_value_t *conf, *http, *value;
+ nxt_conf_value_t *conf, *http, *value, *websocket;
nxt_conf_value_t *applications, *application;
nxt_conf_value_t *listeners, *listener;
nxt_conf_value_t *routes_conf;
@@ -1412,6 +1419,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
#if (NXT_TLS)
static nxt_str_t certificate_path = nxt_string("/tls/certificate");
#endif
+ static nxt_str_t websocket_path = nxt_string("/settings/http/websocket");
conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
if (conf == NULL) {
@@ -1432,177 +1440,177 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
tmcf->router_conf->threads = nxt_ncpu;
}
- applications = nxt_conf_get_path(conf, &applications_path);
- if (applications == NULL) {
- nxt_alert(task, "no \"applications\" block");
- return NXT_ERROR;
- }
-
router = tmcf->router_conf->router;
- next = 0;
+ applications = nxt_conf_get_path(conf, &applications_path);
- for ( ;; ) {
- application = nxt_conf_next_object_member(applications, &name, &next);
- if (application == NULL) {
- break;
- }
+ if (applications != NULL) {
+ next = 0;
- nxt_debug(task, "application \"%V\"", &name);
+ for ( ;; ) {
+ application = nxt_conf_next_object_member(applications, &name, &next);
+ if (application == NULL) {
+ break;
+ }
- size = nxt_conf_json_length(application, NULL);
+ nxt_debug(task, "application \"%V\"", &name);
- app = nxt_malloc(sizeof(nxt_app_t) + name.length + size);
- if (app == NULL) {
- goto fail;
- }
+ size = nxt_conf_json_length(application, NULL);
- nxt_memzero(app, sizeof(nxt_app_t));
+ app = nxt_malloc(sizeof(nxt_app_t) + name.length + size);
+ if (app == NULL) {
+ goto fail;
+ }
- app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t));
- app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t) + name.length);
+ nxt_memzero(app, sizeof(nxt_app_t));
- p = nxt_conf_json_print(app->conf.start, application, NULL);
- app->conf.length = p - app->conf.start;
+ app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t));
+ app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t)
+ + name.length);
- nxt_assert(app->conf.length <= size);
+ p = nxt_conf_json_print(app->conf.start, application, NULL);
+ app->conf.length = p - app->conf.start;
- nxt_debug(task, "application conf \"%V\"", &app->conf);
+ nxt_assert(app->conf.length <= size);
- prev = nxt_router_app_find(&router->apps, &name);
+ nxt_debug(task, "application conf \"%V\"", &app->conf);
- if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
- nxt_free(app);
+ prev = nxt_router_app_find(&router->apps, &name);
- nxt_queue_remove(&prev->link);
- nxt_queue_insert_tail(&tmcf->previous, &prev->link);
- continue;
- }
+ if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
+ nxt_free(app);
- apcf.processes = 1;
- apcf.max_processes = 1;
- apcf.spare_processes = 0;
- apcf.timeout = 0;
- apcf.res_timeout = 1000;
- apcf.idle_timeout = 15000;
- apcf.requests = 0;
- apcf.limits_value = NULL;
- apcf.processes_value = NULL;
-
- app_joint = nxt_malloc(sizeof(nxt_app_joint_t));
- if (nxt_slow_path(app_joint == NULL)) {
- goto app_fail;
- }
-
- nxt_memzero(app_joint, sizeof(nxt_app_joint_t));
-
- ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
- nxt_nitems(nxt_router_app_conf), &apcf);
- if (ret != NXT_OK) {
- nxt_alert(task, "application map error");
- goto app_fail;
- }
-
- if (apcf.limits_value != NULL) {
+ nxt_queue_remove(&prev->link);
+ nxt_queue_insert_tail(&tmcf->previous, &prev->link);
+ continue;
+ }
- if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) {
- nxt_alert(task, "application limits is not object");
+ apcf.processes = 1;
+ apcf.max_processes = 1;
+ apcf.spare_processes = 0;
+ apcf.timeout = 0;
+ apcf.res_timeout = 1000;
+ apcf.idle_timeout = 15000;
+ apcf.requests = 0;
+ apcf.limits_value = NULL;
+ apcf.processes_value = NULL;
+
+ app_joint = nxt_malloc(sizeof(nxt_app_joint_t));
+ if (nxt_slow_path(app_joint == NULL)) {
goto app_fail;
}
- ret = nxt_conf_map_object(mp, apcf.limits_value,
- nxt_router_app_limits_conf,
- nxt_nitems(nxt_router_app_limits_conf),
- &apcf);
+ nxt_memzero(app_joint, sizeof(nxt_app_joint_t));
+
+ ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
+ nxt_nitems(nxt_router_app_conf), &apcf);
if (ret != NXT_OK) {
- nxt_alert(task, "application limits map error");
+ nxt_alert(task, "application map error");
goto app_fail;
}
- }
- if (apcf.processes_value != NULL
- && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT)
- {
- ret = nxt_conf_map_object(mp, apcf.processes_value,
- nxt_router_app_processes_conf,
- nxt_nitems(nxt_router_app_processes_conf),
- &apcf);
- if (ret != NXT_OK) {
- nxt_alert(task, "application processes map error");
- goto app_fail;
+ if (apcf.limits_value != NULL) {
+
+ if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) {
+ nxt_alert(task, "application limits is not object");
+ goto app_fail;
+ }
+
+ ret = nxt_conf_map_object(mp, apcf.limits_value,
+ nxt_router_app_limits_conf,
+ nxt_nitems(nxt_router_app_limits_conf),
+ &apcf);
+ if (ret != NXT_OK) {
+ nxt_alert(task, "application limits map error");
+ goto app_fail;
+ }
}
- } else {
- apcf.max_processes = apcf.processes;
- apcf.spare_processes = apcf.processes;
- }
+ if (apcf.processes_value != NULL
+ && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT)
+ {
+ ret = nxt_conf_map_object(mp, apcf.processes_value,
+ nxt_router_app_processes_conf,
+ nxt_nitems(nxt_router_app_processes_conf),
+ &apcf);
+ if (ret != NXT_OK) {
+ nxt_alert(task, "application processes map error");
+ goto app_fail;
+ }
- nxt_debug(task, "application type: %V", &apcf.type);
- nxt_debug(task, "application processes: %D", apcf.processes);
- nxt_debug(task, "application request timeout: %M", apcf.timeout);
- nxt_debug(task, "application reschedule timeout: %M", apcf.res_timeout);
- nxt_debug(task, "application requests: %D", apcf.requests);
+ } else {
+ apcf.max_processes = apcf.processes;
+ apcf.spare_processes = apcf.processes;
+ }
- lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
+ nxt_debug(task, "application type: %V", &apcf.type);
+ nxt_debug(task, "application processes: %D", apcf.processes);
+ nxt_debug(task, "application request timeout: %M", apcf.timeout);
+ nxt_debug(task, "application reschedule timeout: %M",
+ apcf.res_timeout);
+ nxt_debug(task, "application requests: %D", apcf.requests);
- if (lang == NULL) {
- nxt_alert(task, "unknown application type: \"%V\"", &apcf.type);
- goto app_fail;
- }
+ lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
- nxt_debug(task, "application language module: \"%s\"", lang->file);
+ if (lang == NULL) {
+ nxt_alert(task, "unknown application type: \"%V\"", &apcf.type);
+ goto app_fail;
+ }
- ret = nxt_thread_mutex_create(&app->mutex);
- if (ret != NXT_OK) {
- goto app_fail;
- }
+ nxt_debug(task, "application language module: \"%s\"", lang->file);
- nxt_queue_init(&app->ports);
- nxt_queue_init(&app->spare_ports);
- nxt_queue_init(&app->idle_ports);
- nxt_queue_init(&app->requests);
- nxt_queue_init(&app->pending);
+ ret = nxt_thread_mutex_create(&app->mutex);
+ if (ret != NXT_OK) {
+ goto app_fail;
+ }
- app->name.length = name.length;
- nxt_memcpy(app->name.start, name.start, name.length);
+ nxt_queue_init(&app->ports);
+ nxt_queue_init(&app->spare_ports);
+ nxt_queue_init(&app->idle_ports);
+ nxt_queue_init(&app->requests);
+ nxt_queue_init(&app->pending);
- app->type = lang->type;
- app->max_processes = apcf.max_processes;
- app->spare_processes = apcf.spare_processes;
- app->max_pending_processes = apcf.spare_processes
- ? apcf.spare_processes : 1;
- app->timeout = apcf.timeout;
- app->res_timeout = apcf.res_timeout * 1000000;
- app->idle_timeout = apcf.idle_timeout;
- app->max_pending_responses = 2;
- app->max_requests = apcf.requests;
+ app->name.length = name.length;
+ nxt_memcpy(app->name.start, name.start, name.length);
- engine = task->thread->engine;
+ app->type = lang->type;
+ app->max_processes = apcf.max_processes;
+ app->spare_processes = apcf.spare_processes;
+ app->max_pending_processes = apcf.spare_processes
+ ? apcf.spare_processes : 1;
+ app->timeout = apcf.timeout;
+ app->res_timeout = apcf.res_timeout * 1000000;
+ app->idle_timeout = apcf.idle_timeout;
+ app->max_pending_responses = 2;
+ app->max_requests = apcf.requests;
- app->engine = engine;
+ engine = task->thread->engine;
- app->adjust_idle_work.handler = nxt_router_adjust_idle_timer;
- app->adjust_idle_work.task = &engine->task;
- app->adjust_idle_work.obj = app;
+ app->engine = engine;
- nxt_queue_insert_tail(&tmcf->apps, &app->link);
+ app->adjust_idle_work.handler = nxt_router_adjust_idle_timer;
+ app->adjust_idle_work.task = &engine->task;
+ app->adjust_idle_work.obj = app;
- nxt_router_app_use(task, app, 1);
+ nxt_queue_insert_tail(&tmcf->apps, &app->link);
- app->joint = app_joint;
+ nxt_router_app_use(task, app, 1);
- app_joint->use_count = 1;
- app_joint->app = app;
+ app->joint = app_joint;
- app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS;
- app_joint->idle_timer.work_queue = &engine->fast_work_queue;
- app_joint->idle_timer.handler = nxt_router_app_idle_timeout;
- app_joint->idle_timer.task = &engine->task;
- app_joint->idle_timer.log = app_joint->idle_timer.task->log;
+ app_joint->use_count = 1;
+ app_joint->app = app;
- app_joint->free_app_work.handler = nxt_router_free_app;
- app_joint->free_app_work.task = &engine->task;
- app_joint->free_app_work.obj = app_joint;
+ app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS;
+ app_joint->idle_timer.work_queue = &engine->fast_work_queue;
+ app_joint->idle_timer.handler = nxt_router_app_idle_timeout;
+ app_joint->idle_timer.task = &engine->task;
+ app_joint->idle_timer.log = app_joint->idle_timer.task->log;
+
+ app_joint->free_app_work.handler = nxt_router_free_app;
+ app_joint->free_app_work.task = &engine->task;
+ app_joint->free_app_work.obj = app_joint;
+ }
}
routes_conf = nxt_conf_get_path(conf, &routes_path);
@@ -1622,87 +1630,102 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
}
#endif
- listeners = nxt_conf_get_path(conf, &listeners_path);
- if (listeners == NULL) {
- nxt_alert(task, "no \"listeners\" block");
- return NXT_ERROR;
- }
+ websocket = nxt_conf_get_path(conf, &websocket_path);
- next = 0;
+ listeners = nxt_conf_get_path(conf, &listeners_path);
- for ( ;; ) {
- listener = nxt_conf_next_object_member(listeners, &name, &next);
- if (listener == NULL) {
- break;
- }
+ if (listeners != NULL) {
+ next = 0;
- skcf = nxt_router_socket_conf(task, tmcf, &name);
- if (skcf == NULL) {
- goto fail;
- }
+ for ( ;; ) {
+ listener = nxt_conf_next_object_member(listeners, &name, &next);
+ if (listener == NULL) {
+ break;
+ }
- nxt_memzero(&lscf, sizeof(lscf));
+ skcf = nxt_router_socket_conf(task, tmcf, &name);
+ if (skcf == NULL) {
+ goto fail;
+ }
- ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf,
- nxt_nitems(nxt_router_listener_conf), &lscf);
- if (ret != NXT_OK) {
- nxt_alert(task, "listener map error");
- goto fail;
- }
+ nxt_memzero(&lscf, sizeof(lscf));
- nxt_debug(task, "application: %V", &lscf.application);
-
- // STUB, default values if http block is not defined.
- skcf->header_buffer_size = 2048;
- skcf->large_header_buffer_size = 8192;
- skcf->large_header_buffers = 4;
- skcf->body_buffer_size = 16 * 1024;
- skcf->max_body_size = 8 * 1024 * 1024;
- skcf->idle_timeout = 180 * 1000;
- skcf->header_read_timeout = 30 * 1000;
- skcf->body_read_timeout = 30 * 1000;
- skcf->send_timeout = 30 * 1000;
-
- if (http != NULL) {
- ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
- nxt_nitems(nxt_router_http_conf), skcf);
+ ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf,
+ nxt_nitems(nxt_router_listener_conf),
+ &lscf);
if (ret != NXT_OK) {
- nxt_alert(task, "http map error");
+ nxt_alert(task, "listener map error");
goto fail;
}
- }
-#if (NXT_TLS)
+ nxt_debug(task, "application: %V", &lscf.application);
+
+ // STUB, default values if http block is not defined.
+ skcf->header_buffer_size = 2048;
+ skcf->large_header_buffer_size = 8192;
+ skcf->large_header_buffers = 4;
+ skcf->body_buffer_size = 16 * 1024;
+ skcf->max_body_size = 8 * 1024 * 1024;
+ skcf->idle_timeout = 180 * 1000;
+ skcf->header_read_timeout = 30 * 1000;
+ skcf->body_read_timeout = 30 * 1000;
+ skcf->send_timeout = 30 * 1000;
+
+ skcf->websocket_conf.max_frame_size = 1024 * 1024;
+ skcf->websocket_conf.read_timeout = 60 * 1000;
+ skcf->websocket_conf.keepalive_interval = 30 * 1000;
+
+ if (http != NULL) {
+ ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
+ nxt_nitems(nxt_router_http_conf),
+ skcf);
+ if (ret != NXT_OK) {
+ nxt_alert(task, "http map error");
+ goto fail;
+ }
+ }
- value = nxt_conf_get_path(listener, &certificate_path);
+ if (websocket != NULL) {
+ ret = nxt_conf_map_object(mp, websocket,
+ nxt_router_websocket_conf,
+ nxt_nitems(nxt_router_websocket_conf),
+ &skcf->websocket_conf);
+ if (ret != NXT_OK) {
+ nxt_alert(task, "websocket map error");
+ goto fail;
+ }
+ }
- if (value != NULL) {
- nxt_conf_get_string(value, &name);
+#if (NXT_TLS)
+ value = nxt_conf_get_path(listener, &certificate_path);
- tls = nxt_mp_get(mp, sizeof(nxt_router_tlssock_t));
- if (nxt_slow_path(tls == NULL)) {
- goto fail;
- }
+ if (value != NULL) {
+ nxt_conf_get_string(value, &name);
- tls->name = name;
- tls->conf = skcf;
+ tls = nxt_mp_get(mp, sizeof(nxt_router_tlssock_t));
+ if (nxt_slow_path(tls == NULL)) {
+ goto fail;
+ }
- nxt_queue_insert_tail(&tmcf->tls, &tls->link);
- }
+ tls->name = name;
+ tls->conf = skcf;
+ nxt_queue_insert_tail(&tmcf->tls, &tls->link);
+ }
#endif
- skcf->listen->handler = nxt_http_conn_init;
- skcf->router_conf = tmcf->router_conf;
- skcf->router_conf->count++;
+ skcf->listen->handler = nxt_http_conn_init;
+ skcf->router_conf = tmcf->router_conf;
+ skcf->router_conf->count++;
- if (lscf.pass.length != 0) {
- skcf->pass = nxt_http_pass_create(task, tmcf, &lscf.pass);
+ if (lscf.pass.length != 0) {
+ skcf->pass = nxt_http_pass_create(task, tmcf, &lscf.pass);
- /* COMPATIBILITY: listener application. */
- } else if (lscf.application.length > 0) {
- skcf->pass = nxt_http_pass_application(task, tmcf,
- &lscf.application);
+ /* COMPATIBILITY: listener application. */
+ } else if (lscf.application.length > 0) {
+ skcf->pass = nxt_http_pass_application(task, tmcf,
+ &lscf.application);
+ }
}
}
@@ -3058,7 +3081,7 @@ nxt_router_access_log_writer(nxt_task_t *task, nxt_http_request_t *r,
*p++ = ' ';
- bytes = nxt_http_proto_body_bytes_sent[r->protocol](task, r->proto);
+ bytes = nxt_http_proto[r->protocol].body_bytes_sent(task, r->proto);
p = nxt_sprintf(p, p + NXT_OFF_T_LEN, "%O", bytes);
@@ -3382,28 +3405,30 @@ static void
nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
void *data)
{
- nxt_int_t ret;
- nxt_buf_t *b;
- nxt_unit_field_t *f;
- nxt_http_field_t *field;
- nxt_http_request_t *r;
- nxt_req_conn_link_t *rc;
- nxt_unit_response_t *resp;
+ nxt_int_t ret;
+ nxt_buf_t *b;
+ nxt_port_t *app_port;
+ nxt_unit_field_t *f;
+ nxt_http_field_t *field;
+ nxt_http_request_t *r;
+ nxt_unit_response_t *resp;
+ nxt_request_app_link_t *req_app_link;
+ nxt_request_rpc_data_t *req_rpc_data;
b = msg->buf;
- rc = data;
+ req_rpc_data = data;
if (msg->size == 0) {
b = NULL;
}
- r = rc->request;
+ r = req_rpc_data->request;
if (nxt_slow_path(r == NULL)) {
return;
}
if (r->error) {
- nxt_router_rc_unlink(task, rc);
+ nxt_request_rpc_data_unlink(task, req_rpc_data);
return;
}
@@ -3412,13 +3437,14 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_buf_chain_add(&b, nxt_http_buf_last(r));
- nxt_router_rc_unlink(task, rc);
+ nxt_request_rpc_data_unlink(task, req_rpc_data);
} else {
- if (rc->app != NULL && rc->app->timeout != 0) {
+ if (req_rpc_data->app != NULL && req_rpc_data->app->timeout != 0) {
r->timer.handler = nxt_router_app_timeout;
- r->timer_data = rc;
- nxt_timer_add(task->thread->engine, &r->timer, rc->app->timeout);
+ r->timer_data = req_rpc_data;
+ nxt_timer_add(task->thread->engine, &r->timer,
+ req_rpc_data->app->timeout);
}
}
@@ -3448,7 +3474,13 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
goto fail;
}
+ field = NULL;
+
for (f = resp->fields; f < resp->fields + resp->fields_count; f++) {
+ if (f->skip) {
+ continue;
+ }
+
field = nxt_list_add(r->resp.fields);
if (nxt_slow_path(field == NULL)) {
@@ -3456,26 +3488,30 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
}
field->hash = f->hash;
- field->skip = f->skip;
+ field->skip = 0;
field->name_length = f->name_length;
field->value_length = f->value_length;
field->name = nxt_unit_sptr_get(&f->name);
field->value = nxt_unit_sptr_get(&f->value);
- nxt_debug(task, "header: %*s: %*s",
+ ret = nxt_http_field_process(field, &nxt_response_fields_hash, r);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ goto fail;
+ }
+
+ nxt_debug(task, "header%s: %*s: %*s",
+ (field->skip ? " skipped" : ""),
(size_t) field->name_length, field->name,
(size_t) field->value_length, field->value);
+
+ if (field->skip) {
+ r->resp.fields->last->nelts--;
+ }
}
r->status = resp->status;
- ret = nxt_http_fields_process(r->resp.fields,
- &nxt_response_fields_hash, r);
- if (nxt_slow_path(ret != NXT_OK)) {
- goto fail;
- }
-
if (resp->piggyback_content_length != 0) {
b->mem.pos = nxt_unit_sptr_get(&resp->piggyback_content);
b->mem.free = b->mem.pos + resp->piggyback_content_length;
@@ -3495,9 +3531,55 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_buf_chain_add(&r->out, b);
}
- r->state = &nxt_http_request_send_state;
-
nxt_http_request_header_send(task, r);
+
+ if (r->websocket_handshake
+ && r->status == NXT_HTTP_SWITCHING_PROTOCOLS)
+ {
+ req_app_link = nxt_request_app_link_alloc(task,
+ req_rpc_data->req_app_link,
+ req_rpc_data);
+ if (nxt_slow_path(req_app_link == NULL)) {
+ goto fail;
+ }
+
+ app_port = req_app_link->app_port;
+
+ if (app_port == NULL && req_rpc_data->app_port != NULL) {
+ req_app_link->app_port = req_rpc_data->app_port;
+ app_port = req_app_link->app_port;
+ req_app_link->apr_action = req_rpc_data->apr_action;
+
+ req_rpc_data->app_port = NULL;
+ }
+
+ if (nxt_slow_path(app_port == NULL)) {
+ goto fail;
+ }
+
+ nxt_thread_mutex_lock(&req_rpc_data->app->mutex);
+
+ nxt_queue_insert_tail(&app_port->active_websockets,
+ &req_app_link->link_port_websockets);
+
+ nxt_thread_mutex_unlock(&req_rpc_data->app->mutex);
+
+ nxt_router_app_port_release(task, app_port, NXT_APR_UPGRADE);
+ req_app_link->apr_action = NXT_APR_CLOSE;
+
+ nxt_debug(task, "req_app_link stream #%uD upgrade",
+ req_app_link->stream);
+
+ r->state = &nxt_http_websocket;
+
+ } else {
+ r->state = &nxt_http_request_send_state;
+ }
+
+ if (r->out) {
+ nxt_work_queue_add(&task->thread->engine->fast_work_queue,
+ nxt_http_request_send_body, task, r, NULL);
+ }
}
return;
@@ -3506,14 +3588,13 @@ fail:
nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
- nxt_router_rc_unlink(task, rc);
+ nxt_request_rpc_data_unlink(task, req_rpc_data);
}
static const nxt_http_request_state_t nxt_http_request_send_state
nxt_aligned(64) =
{
- .ready_handler = nxt_http_request_send_body,
.error_handler = nxt_http_request_error_handler,
};
@@ -3539,36 +3620,37 @@ static void
nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
void *data)
{
- nxt_int_t res;
- nxt_port_t *port;
- nxt_bool_t cancelled;
- nxt_req_app_link_t *ra;
- nxt_req_conn_link_t *rc;
-
- rc = data;
+ nxt_int_t res;
+ nxt_port_t *port;
+ nxt_bool_t cancelled;
+ nxt_request_app_link_t *req_app_link;
+ nxt_request_rpc_data_t *req_rpc_data;
- ra = rc->ra;
+ req_rpc_data = data;
- if (ra != NULL) {
- cancelled = nxt_router_msg_cancel(task, &ra->msg_info, ra->stream);
+ req_app_link = req_rpc_data->req_app_link;
+ if (req_app_link != NULL) {
+ cancelled = nxt_router_msg_cancel(task, &req_app_link->msg_info,
+ req_app_link->stream);
if (cancelled) {
- nxt_router_ra_inc_use(ra);
+ nxt_request_app_link_inc_use(req_app_link);
- res = nxt_router_app_port(task, rc->app, ra);
+ res = nxt_router_app_port(task, req_rpc_data->app, req_app_link);
if (res == NXT_OK) {
- port = ra->app_port;
+ port = req_app_link->app_port;
if (nxt_slow_path(port == NULL)) {
- nxt_log(task, NXT_LOG_ERR, "port is NULL in cancelled ra");
+ nxt_log(task, NXT_LOG_ERR,
+ "port is NULL in cancelled req_app_link");
return;
}
- nxt_port_rpc_ex_set_peer(task, task->thread->engine->port, rc,
- port->pid);
+ nxt_port_rpc_ex_set_peer(task, task->thread->engine->port,
+ req_rpc_data, port->pid);
- nxt_router_app_prepare_request(task, ra);
+ nxt_router_app_prepare_request(task, req_app_link);
}
msg->port_msg.last = 0;
@@ -3577,12 +3659,12 @@ nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
}
}
- if (rc->request != NULL) {
- nxt_http_request_error(task, rc->request,
+ if (req_rpc_data->request != NULL) {
+ nxt_http_request_error(task, req_rpc_data->request,
NXT_HTTP_SERVICE_UNAVAILABLE);
}
- nxt_router_rc_unlink(task, rc);
+ nxt_request_rpc_data_unlink(task, req_rpc_data);
}
@@ -3626,7 +3708,7 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d",
&app->name, port->pid, app->processes, app->pending_processes);
- nxt_router_app_port_release(task, port, 0, 0);
+ nxt_router_app_port_release(task, port, NXT_APR_NEW_PORT);
}
@@ -3634,10 +3716,10 @@ static void
nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
void *data)
{
- nxt_app_t *app;
- nxt_app_joint_t *app_joint;
- nxt_queue_link_t *lnk;
- nxt_req_app_link_t *ra;
+ nxt_app_t *app;
+ nxt_app_joint_t *app_joint;
+ nxt_queue_link_t *lnk;
+ nxt_request_app_link_t *req_app_link;
app_joint = data;
@@ -3666,20 +3748,22 @@ nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_queue_remove(lnk);
lnk->next = NULL;
- ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_requests);
+ req_app_link = nxt_queue_link_data(lnk, nxt_request_app_link_t,
+ link_app_requests);
} else {
- ra = NULL;
+ req_app_link = NULL;
}
nxt_thread_mutex_unlock(&app->mutex);
- if (ra != NULL) {
+ if (req_app_link != NULL) {
nxt_debug(task, "app '%V' %p abort next stream #%uD",
- &app->name, app, ra->stream);
+ &app->name, app, req_app_link->stream);
- nxt_router_ra_error(ra, 500, "Failed to start application process");
- nxt_router_ra_use(task, ra, -1);
+ nxt_request_app_link_error(req_app_link, 500,
+ "Failed to start application process");
+ nxt_request_app_link_use(task, req_app_link, -1);
}
}
@@ -3813,9 +3897,9 @@ nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app)
static void
nxt_router_app_process_request(nxt_task_t *task, void *obj, void *data)
{
- nxt_req_app_link_t *ra;
+ nxt_request_app_link_t *req_app_link;
- ra = data;
+ req_app_link = data;
#if (NXT_DEBUG)
{
@@ -3824,39 +3908,66 @@ nxt_router_app_process_request(nxt_task_t *task, void *obj, void *data)
app = obj;
nxt_assert(app != NULL);
- nxt_assert(ra != NULL);
- nxt_assert(ra->app_port != NULL);
+ nxt_assert(req_app_link != NULL);
+ nxt_assert(req_app_link->app_port != NULL);
nxt_debug(task, "app '%V' %p process next stream #%uD",
- &app->name, app, ra->stream);
+ &app->name, app, req_app_link->stream);
}
#endif
- nxt_router_app_prepare_request(task, ra);
+ nxt_router_app_prepare_request(task, req_app_link);
}
static void
nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
- uint32_t request_failed, uint32_t got_response)
+ nxt_apr_action_t action)
{
+ int inc_use;
+ uint32_t dec_pending, got_response;
nxt_app_t *app;
nxt_bool_t port_unchained;
nxt_bool_t send_quit, cancelled, adjust_idle_timer;
nxt_queue_link_t *lnk;
- nxt_req_app_link_t *ra, *pending_ra, *re_ra;
+ nxt_request_app_link_t *req_app_link, *pending_ra, *re_ra;
nxt_port_select_state_t state;
nxt_assert(port != NULL);
nxt_assert(port->app != NULL);
- ra = NULL;
+ req_app_link = NULL;
app = port->app;
+ inc_use = 0;
+ dec_pending = 0;
+ got_response = 0;
+
+ switch (action) {
+ case NXT_APR_NEW_PORT:
+ break;
+ case NXT_APR_REQUEST_FAILED:
+ dec_pending = 1;
+ inc_use = -1;
+ break;
+ case NXT_APR_GOT_RESPONSE:
+ dec_pending = 1;
+ got_response = 1;
+ inc_use = -1;
+ break;
+ case NXT_APR_UPGRADE:
+ dec_pending = 1;
+ got_response = 1;
+ break;
+ case NXT_APR_CLOSE:
+ inc_use = -1;
+ break;
+ }
+
nxt_thread_mutex_lock(&app->mutex);
- port->app_pending_responses -= request_failed + got_response;
+ port->app_pending_responses -= dec_pending;
port->app_responses += got_response;
if (port->pair[1] != -1
@@ -3893,24 +4004,25 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
nxt_queue_remove(lnk);
lnk->next = NULL;
- ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_requests);
+ req_app_link = nxt_queue_link_data(lnk, nxt_request_app_link_t,
+ link_app_requests);
- ra->app_port = nxt_router_pop_first_port(app);
+ req_app_link->app_port = nxt_router_pop_first_port(app);
- if (ra->app_port->app_pending_responses > 1) {
- nxt_router_ra_pending(task, app, ra);
+ if (req_app_link->app_port->app_pending_responses > 1) {
+ nxt_request_app_link_pending(task, app, req_app_link);
}
}
/* Pop first pending request for this port. */
- if ((request_failed > 0 || got_response > 0)
+ if (dec_pending > 0
&& !nxt_queue_is_empty(&port->pending_requests))
{
lnk = nxt_queue_first(&port->pending_requests);
nxt_queue_remove(lnk);
lnk->next = NULL;
- pending_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t,
+ pending_ra = nxt_queue_link_data(lnk, nxt_request_app_link_t,
link_port_pending);
nxt_assert(pending_ra->link_app_pending.next != NULL);
@@ -3926,7 +4038,8 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
if (got_response > 0 && !nxt_queue_is_empty(&app->pending)) {
lnk = nxt_queue_first(&app->pending);
- re_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_pending);
+ re_ra = nxt_queue_link_data(lnk, nxt_request_app_link_t,
+ link_app_pending);
if (re_ra->res_time <= nxt_thread_monotonic_time(task->thread)) {
@@ -3937,9 +4050,9 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
re_ra->stream);
if (cancelled) {
- nxt_router_ra_inc_use(re_ra);
+ nxt_request_app_link_inc_use(re_ra);
- state.ra = re_ra;
+ state.req_app_link = re_ra;
state.app = app;
nxt_router_port_select(task, &state);
@@ -3969,9 +4082,10 @@ re_ra_cancelled:
adjust_idle_timer = 0;
- if (port->pair[1] != -1 && !send_quit && port->app_pending_responses == 0) {
- nxt_assert(port->idle_link.next == NULL);
-
+ if (port->pair[1] != -1 && !send_quit && port->app_pending_responses == 0
+ && nxt_queue_is_empty(&port->active_websockets)
+ && port->idle_link.next == NULL)
+ {
if (app->idle_processes == app->spare_processes
&& app->adjust_idle_work.data == NULL)
{
@@ -4000,7 +4114,7 @@ re_ra_cancelled:
}
if (pending_ra != NULL) {
- nxt_router_ra_use(task, pending_ra, -1);
+ nxt_request_app_link_use(task, pending_ra, -1);
}
if (re_ra != NULL) {
@@ -4011,12 +4125,12 @@ re_ra_cancelled:
}
}
- if (ra != NULL) {
- nxt_router_ra_use(task, ra, -1);
+ if (req_app_link != NULL) {
+ nxt_request_app_link_use(task, req_app_link, -1);
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
nxt_router_app_process_request,
- &task->thread->engine->task, app, ra);
+ &task->thread->engine->task, app, req_app_link);
goto adjust_use;
}
@@ -4048,9 +4162,7 @@ re_ra_cancelled:
adjust_use:
- if (request_failed > 0 || got_response > 0) {
- nxt_port_use(task, port, -1);
- }
+ nxt_port_use(task, port, inc_use);
}
@@ -4267,33 +4379,33 @@ nxt_router_free_app(nxt_task_t *task, void *obj, void *data)
static void
nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state)
{
- nxt_app_t *app;
- nxt_bool_t can_start_process;
- nxt_req_app_link_t *ra;
+ nxt_app_t *app;
+ nxt_bool_t can_start_process;
+ nxt_request_app_link_t *req_app_link;
- ra = state->ra;
+ req_app_link = state->req_app_link;
app = state->app;
state->failed_port_use_delta = 0;
- if (nxt_queue_chk_remove(&ra->link_app_requests))
+ if (nxt_queue_chk_remove(&req_app_link->link_app_requests))
{
- nxt_router_ra_dec_use(ra);
+ nxt_request_app_link_dec_use(req_app_link);
}
- if (nxt_queue_chk_remove(&ra->link_port_pending))
+ if (nxt_queue_chk_remove(&req_app_link->link_port_pending))
{
- nxt_assert(ra->link_app_pending.next != NULL);
+ nxt_assert(req_app_link->link_app_pending.next != NULL);
- nxt_queue_remove(&ra->link_app_pending);
- ra->link_app_pending.next = NULL;
+ nxt_queue_remove(&req_app_link->link_app_pending);
+ req_app_link->link_app_pending.next = NULL;
- nxt_router_ra_dec_use(ra);
+ nxt_request_app_link_dec_use(req_app_link);
}
- state->failed_port = ra->app_port;
+ state->failed_port = req_app_link->app_port;
- if (ra->app_port != NULL) {
+ if (req_app_link->app_port != NULL) {
state->failed_port_use_delta--;
state->failed_port->app_pending_responses--;
@@ -4302,7 +4414,7 @@ nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state)
state->failed_port_use_delta--;
}
- ra->app_port = NULL;
+ req_app_link->app_port = NULL;
}
can_start_process = nxt_router_app_can_start(app);
@@ -4313,22 +4425,25 @@ nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state)
if (nxt_queue_is_empty(&app->ports)
|| (can_start_process && nxt_router_app_first_port_busy(app)) )
{
- ra = nxt_router_ra_create(task, ra);
-
- if (nxt_slow_path(ra == NULL)) {
+ req_app_link = nxt_request_app_link_alloc(task, req_app_link,
+ req_app_link->req_rpc_data);
+ if (nxt_slow_path(req_app_link == NULL)) {
goto fail;
}
if (nxt_slow_path(state->failed_port != NULL)) {
- nxt_queue_insert_head(&app->requests, &ra->link_app_requests);
+ nxt_queue_insert_head(&app->requests,
+ &req_app_link->link_app_requests);
} else {
- nxt_queue_insert_tail(&app->requests, &ra->link_app_requests);
+ nxt_queue_insert_tail(&app->requests,
+ &req_app_link->link_app_requests);
}
- nxt_router_ra_inc_use(ra);
+ nxt_request_app_link_inc_use(req_app_link);
- nxt_debug(task, "ra stream #%uD enqueue to app->requests", ra->stream);
+ nxt_debug(task, "req_app_link stream #%uD enqueue to app->requests",
+ req_app_link->stream);
if (can_start_process) {
app->pending_processes++;
@@ -4339,15 +4454,15 @@ nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state)
state->port = nxt_router_pop_first_port(app);
if (state->port->app_pending_responses > 1) {
- ra = nxt_router_ra_create(task, ra);
-
- if (nxt_slow_path(ra == NULL)) {
+ req_app_link = nxt_request_app_link_alloc(task, req_app_link,
+ req_app_link->req_rpc_data);
+ if (nxt_slow_path(req_app_link == NULL)) {
goto fail;
}
- ra->app_port = state->port;
+ req_app_link->app_port = state->port;
- nxt_router_ra_pending(task, app, ra);
+ nxt_request_app_link_pending(task, app, req_app_link);
}
if (can_start_process && nxt_router_app_need_start(app)) {
@@ -4358,32 +4473,32 @@ nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state)
fail:
- state->shared_ra = ra;
+ state->shared_ra = req_app_link;
}
static nxt_int_t
nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state)
{
- nxt_int_t res;
- nxt_app_t *app;
- nxt_req_app_link_t *ra;
+ nxt_int_t res;
+ nxt_app_t *app;
+ nxt_request_app_link_t *req_app_link;
- ra = state->shared_ra;
+ req_app_link = state->shared_ra;
app = state->app;
if (state->failed_port_use_delta != 0) {
nxt_port_use(task, state->failed_port, state->failed_port_use_delta);
}
- if (nxt_slow_path(ra == NULL)) {
+ if (nxt_slow_path(req_app_link == NULL)) {
if (state->port != NULL) {
nxt_port_use(task, state->port, -1);
}
- nxt_router_ra_error(state->ra, 500,
+ nxt_request_app_link_error(state->req_app_link, 500,
"Failed to allocate shared req<->app link");
- nxt_router_ra_use(task, state->ra, -1);
+ nxt_request_app_link_use(task, state->req_app_link, -1);
return NXT_ERROR;
}
@@ -4391,7 +4506,7 @@ nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state)
if (state->port != NULL) {
nxt_debug(task, "already have port for app '%V' %p ", &app->name, app);
- ra->app_port = state->port;
+ req_app_link->app_port = state->port;
if (state->start_process) {
nxt_router_start_app_process(task, app);
@@ -4410,8 +4525,9 @@ nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state)
res = nxt_router_start_app_process(task, app);
if (nxt_slow_path(res != NXT_OK)) {
- nxt_router_ra_error(ra, 500, "Failed to start app process");
- nxt_router_ra_use(task, ra, -1);
+ nxt_request_app_link_error(req_app_link, 500,
+ "Failed to start app process");
+ nxt_request_app_link_use(task, req_app_link, -1);
return NXT_ERROR;
}
@@ -4421,11 +4537,12 @@ nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state)
static nxt_int_t
-nxt_router_app_port(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra)
+nxt_router_app_port(nxt_task_t *task, nxt_app_t *app,
+ nxt_request_app_link_t *req_app_link)
{
nxt_port_select_state_t state;
- state.ra = ra;
+ state.req_app_link = req_app_link;
state.app = app;
nxt_thread_mutex_lock(&app->mutex);
@@ -4442,48 +4559,48 @@ void
nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
nxt_app_t *app)
{
- nxt_int_t res;
- nxt_port_t *port;
- nxt_event_engine_t *engine;
- nxt_req_app_link_t ra_local, *ra;
- nxt_req_conn_link_t *rc;
+ nxt_int_t res;
+ nxt_port_t *port;
+ nxt_event_engine_t *engine;
+ nxt_request_app_link_t ra_local, *req_app_link;
+ nxt_request_rpc_data_t *req_rpc_data;
engine = task->thread->engine;
- rc = nxt_port_rpc_register_handler_ex(task, engine->port,
+ req_rpc_data = nxt_port_rpc_register_handler_ex(task, engine->port,
nxt_router_response_ready_handler,
nxt_router_response_error_handler,
- sizeof(nxt_req_conn_link_t));
-
- if (nxt_slow_path(rc == NULL)) {
+ sizeof(nxt_request_rpc_data_t));
+ if (nxt_slow_path(req_rpc_data == NULL)) {
nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
return;
}
- rc->stream = nxt_port_rpc_ex_stream(rc);
- rc->app = app;
+ req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data);
+ req_rpc_data->app = app;
nxt_router_app_use(task, app, 1);
- rc->request = r;
+ req_rpc_data->request = r;
+ r->req_rpc_data = req_rpc_data;
- ra = &ra_local;
- nxt_router_ra_init(task, ra, rc);
+ req_app_link = &ra_local;
+ nxt_request_app_link_init(task, req_app_link, req_rpc_data);
- res = nxt_router_app_port(task, app, ra);
+ res = nxt_router_app_port(task, app, req_app_link);
if (res != NXT_OK) {
return;
}
- ra = rc->ra;
- port = ra->app_port;
+ req_app_link = req_rpc_data->req_app_link;
+ port = req_app_link->app_port;
nxt_assert(port != NULL);
- nxt_port_rpc_ex_set_peer(task, engine->port, rc, port->pid);
+ nxt_port_rpc_ex_set_peer(task, engine->port, req_rpc_data, port->pid);
- nxt_router_app_prepare_request(task, ra);
+ nxt_router_app_prepare_request(task, req_app_link);
}
@@ -4494,19 +4611,20 @@ nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj, void *data)
static void
-nxt_router_app_prepare_request(nxt_task_t *task, nxt_req_app_link_t *ra)
+nxt_router_app_prepare_request(nxt_task_t *task,
+ nxt_request_app_link_t *req_app_link)
{
- uint32_t request_failed;
- nxt_buf_t *buf;
- nxt_int_t res;
- nxt_port_t *port, *c_port, *reply_port;
+ nxt_buf_t *buf;
+ nxt_int_t res;
+ nxt_port_t *port, *c_port, *reply_port;
+ nxt_apr_action_t apr_action;
- nxt_assert(ra->app_port != NULL);
+ nxt_assert(req_app_link->app_port != NULL);
- port = ra->app_port;
- reply_port = ra->reply_port;
+ port = req_app_link->app_port;
+ reply_port = req_app_link->reply_port;
- request_failed = 1;
+ apr_action = NXT_APR_REQUEST_FAILED;
c_port = nxt_process_connected_port_find(port->process, reply_port->pid,
reply_port->id);
@@ -4514,7 +4632,7 @@ nxt_router_app_prepare_request(nxt_task_t *task, nxt_req_app_link_t *ra)
res = nxt_port_send_port(task, port, reply_port, 0);
if (nxt_slow_path(res != NXT_OK)) {
- nxt_router_ra_error(ra, 500,
+ nxt_request_app_link_error(req_app_link, 500,
"Failed to send reply port to application");
goto release_port;
}
@@ -4522,11 +4640,11 @@ nxt_router_app_prepare_request(nxt_task_t *task, nxt_req_app_link_t *ra)
nxt_process_connected_port_add(port->process, reply_port);
}
- buf = nxt_router_prepare_msg(task, ra->request, port,
+ buf = nxt_router_prepare_msg(task, req_app_link->request, port,
nxt_app_msg_prefix[port->app->type]);
if (nxt_slow_path(buf == NULL)) {
- nxt_router_ra_error(ra, 500,
+ nxt_request_app_link_error(req_app_link, 500,
"Failed to prepare message for application");
goto release_port;
}
@@ -4535,40 +4653,41 @@ nxt_router_app_prepare_request(nxt_task_t *task, nxt_req_app_link_t *ra)
nxt_buf_used_size(buf),
port->socket.fd);
- request_failed = 0;
+ apr_action = NXT_APR_NEW_PORT;
- ra->msg_info.buf = buf;
- ra->msg_info.completion_handler = buf->completion_handler;
+ req_app_link->msg_info.buf = buf;
+ req_app_link->msg_info.completion_handler = buf->completion_handler;
for (; buf; buf = buf->next) {
buf->completion_handler = nxt_router_dummy_buf_completion;
}
- buf = ra->msg_info.buf;
+ buf = req_app_link->msg_info.buf;
- res = nxt_port_mmap_get_tracking(task, port, &ra->msg_info.tracking,
- ra->stream);
+ res = nxt_port_mmap_get_tracking(task, port,
+ &req_app_link->msg_info.tracking,
+ req_app_link->stream);
if (nxt_slow_path(res != NXT_OK)) {
- nxt_router_ra_error(ra, 500,
- "Failed to get tracking area");
+ nxt_request_app_link_error(req_app_link, 500,
+ "Failed to get tracking area");
goto release_port;
}
- res = nxt_port_socket_twrite(task, port, NXT_PORT_MSG_DATA,
- -1, ra->stream, reply_port->id, buf,
- &ra->msg_info.tracking);
+ res = nxt_port_socket_twrite(task, port, NXT_PORT_MSG_REQ_HEADERS,
+ -1, req_app_link->stream, reply_port->id, buf,
+ &req_app_link->msg_info.tracking);
if (nxt_slow_path(res != NXT_OK)) {
- nxt_router_ra_error(ra, 500,
- "Failed to send message to application");
+ nxt_request_app_link_error(req_app_link, 500,
+ "Failed to send message to application");
goto release_port;
}
release_port:
- nxt_router_app_port_release(task, port, request_failed, 0);
+ nxt_router_app_port_release(task, port, apr_action);
- nxt_router_ra_update_peer(task, ra);
+ nxt_request_app_link_update_peer(task, req_app_link);
}
@@ -4704,6 +4823,7 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r,
*p++ = '\0';
req->tls = (r->tls != NULL);
+ req->websocket_handshake = r->websocket_handshake;
req->server_name_length = r->server_name.length;
nxt_unit_sptr_set(&req->server_name, p);
@@ -4899,8 +5019,8 @@ nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
nxt_timer_t *timer;
nxt_queue_link_t *lnk;
nxt_http_request_t *r;
- nxt_req_app_link_t *pending_ra;
- nxt_req_conn_link_t *rc;
+ nxt_request_app_link_t *pending_ra;
+ nxt_request_rpc_data_t *req_rpc_data;
nxt_port_select_state_t state;
timer = obj;
@@ -4908,8 +5028,8 @@ nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
nxt_debug(task, "router app timeout");
r = nxt_timer_data(timer, nxt_http_request_t, timer);
- rc = r->timer_data;
- app = rc->app;
+ req_rpc_data = r->timer_data;
+ app = req_rpc_data->app;
if (app == NULL) {
goto generate_error;
@@ -4918,14 +5038,16 @@ nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
port = NULL;
pending_ra = NULL;
- if (rc->app_port != NULL) {
- port = rc->app_port;
- rc->app_port = NULL;
+ if (req_rpc_data->app_port != NULL) {
+ port = req_rpc_data->app_port;
+ req_rpc_data->app_port = NULL;
}
- if (port == NULL && rc->ra != NULL && rc->ra->app_port != NULL) {
- port = rc->ra->app_port;
- rc->ra->app_port = NULL;
+ if (port == NULL && req_rpc_data->req_app_link != NULL
+ && req_rpc_data->req_app_link->app_port != NULL)
+ {
+ port = req_rpc_data->req_app_link->app_port;
+ req_rpc_data->req_app_link->app_port = NULL;
}
if (port == NULL) {
@@ -4939,7 +5061,7 @@ nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
if (!nxt_queue_is_empty(&port->pending_requests)) {
lnk = nxt_queue_first(&port->pending_requests);
- pending_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t,
+ pending_ra = nxt_queue_link_data(lnk, nxt_request_app_link_t,
link_port_pending);
nxt_assert(pending_ra->link_app_pending.next != NULL);
@@ -4951,9 +5073,9 @@ nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
pending_ra->stream);
if (cancelled) {
- nxt_router_ra_inc_use(pending_ra);
+ nxt_request_app_link_inc_use(pending_ra);
- state.ra = pending_ra;
+ state.req_app_link = pending_ra;
state.app = app;
nxt_router_port_select(task, &state);
@@ -4981,7 +5103,7 @@ generate_error:
nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
- nxt_router_rc_unlink(task, rc);
+ nxt_request_rpc_data_unlink(task, req_rpc_data);
}