summaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/nxt_router.c765
-rw-r--r--src/nxt_router.h4
2 files changed, 420 insertions, 349 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c
index df2557fc..f09779bc 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -55,41 +55,54 @@ typedef struct nxt_msg_info_s {
} nxt_msg_info_t;
-typedef struct nxt_req_app_link_s nxt_req_app_link_t;
+typedef struct nxt_request_app_link_s nxt_request_app_link_t;
+
+
+typedef enum {
+ NXT_APR_NEW_PORT,
+ NXT_APR_REQUEST_FAILED,
+ NXT_APR_GOT_RESPONSE,
+ NXT_APR_CLOSE,
+} nxt_apr_action_t;
typedef struct {
- uint32_t stream;
- nxt_app_t *app;
- nxt_port_t *app_port;
- nxt_http_request_t *request;
- nxt_msg_info_t msg_info;
- nxt_req_app_link_t *ra;
+ uint32_t stream;
+ nxt_app_t *app;
+
+ nxt_port_t *app_port;
+ nxt_apr_action_t apr_action;
+
+ nxt_http_request_t *request;
+ nxt_msg_info_t msg_info;
+ nxt_request_app_link_t *req_app_link;
+} nxt_request_rpc_data_t;
- nxt_queue_link_t link; /* for nxt_conn_t.requests */
-} nxt_req_conn_link_t;
+struct nxt_request_app_link_s {
+ uint32_t stream;
+ nxt_atomic_t use_count;
-struct nxt_req_app_link_s {
- uint32_t stream;
- nxt_atomic_t use_count;
- nxt_port_t *app_port;
- nxt_port_t *reply_port;
- nxt_http_request_t *request;
- nxt_msg_info_t msg_info;
- nxt_req_conn_link_t *rc;
+ nxt_port_t *app_port;
+ nxt_apr_action_t apr_action;
- nxt_nsec_t res_time;
+ nxt_port_t *reply_port;
+ nxt_http_request_t *request;
+ nxt_msg_info_t msg_info;
+ nxt_request_rpc_data_t *req_rpc_data;
- nxt_queue_link_t link_app_requests; /* for nxt_app_t.requests */
- nxt_queue_link_t link_port_pending; /* for nxt_port_t.pending_requests */
- nxt_queue_link_t link_app_pending; /* for nxt_app_t.pending */
+ nxt_nsec_t res_time;
- nxt_mp_t *mem_pool;
- nxt_work_t work;
+ nxt_queue_link_t link_app_requests; /* for nxt_app_t.requests */
+ /* for nxt_port_t.pending_requests */
+ nxt_queue_link_t link_port_pending;
+ nxt_queue_link_t link_app_pending; /* for nxt_app_t.pending */
- int err_code;
- const char *err_str;
+ nxt_mp_t *mem_pool;
+ nxt_work_t work;
+
+ int err_code;
+ const char *err_str;
};
@@ -106,15 +119,15 @@ typedef struct {
struct nxt_port_select_state_s {
- nxt_app_t *app;
- nxt_req_app_link_t *ra;
+ nxt_app_t *app;
+ nxt_request_app_link_t *req_app_link;
- nxt_port_t *failed_port;
- int failed_port_use_delta;
+ nxt_port_t *failed_port;
+ int failed_port_use_delta;
- uint8_t start_process; /* 1 bit */
- nxt_req_app_link_t *shared_ra;
- nxt_port_t *port;
+ uint8_t start_process; /* 1 bit */
+ nxt_request_app_link_t *shared_ra;
+ nxt_port_t *port;
};
typedef struct nxt_port_select_state_s nxt_port_select_state_t;
@@ -129,28 +142,32 @@ static nxt_int_t nxt_router_port_post_select(nxt_task_t *task,
nxt_port_select_state_t *state);
static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);
+static void nxt_request_app_link_update_peer(nxt_task_t *task,
+ nxt_request_app_link_t *req_app_link);
+
nxt_inline void
-nxt_router_ra_inc_use(nxt_req_app_link_t *ra)
+nxt_request_app_link_inc_use(nxt_request_app_link_t *req_app_link)
{
- nxt_atomic_fetch_add(&ra->use_count, 1);
+ nxt_atomic_fetch_add(&req_app_link->use_count, 1);
}
nxt_inline void
-nxt_router_ra_dec_use(nxt_req_app_link_t *ra)
+nxt_request_app_link_dec_use(nxt_request_app_link_t *req_app_link)
{
#if (NXT_DEBUG)
int c;
- c = nxt_atomic_fetch_add(&ra->use_count, -1);
+ c = nxt_atomic_fetch_add(&req_app_link->use_count, -1);
nxt_assert(c > 1);
#else
- (void) nxt_atomic_fetch_add(&ra->use_count, -1);
+ (void) nxt_atomic_fetch_add(&req_app_link->use_count, -1);
#endif
}
-static void nxt_router_ra_use(nxt_task_t *task, nxt_req_app_link_t *ra, int i);
+static void nxt_request_app_link_use(nxt_task_t *task,
+ nxt_request_app_link_t *req_app_link, int i);
static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
@@ -257,13 +274,14 @@ static void nxt_router_app_port_error(nxt_task_t *task,
nxt_port_recv_msg_t *msg, void *data);
static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app);
+
static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
- uint32_t request_failed, uint32_t got_response);
+ nxt_apr_action_t action);
static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_app_t *app,
- nxt_req_app_link_t *ra);
+ nxt_request_app_link_t *req_app_link);
static void nxt_router_app_prepare_request(nxt_task_t *task,
- nxt_req_app_link_t *ra);
+ nxt_request_app_link_t *req_app_link);
static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task,
nxt_http_request_t *r, nxt_port_t *port, const nxt_str_t *prefix);
@@ -493,58 +511,63 @@ nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
nxt_inline void
-nxt_router_ra_init(nxt_task_t *task, nxt_req_app_link_t *ra,
- nxt_req_conn_link_t *rc)
+nxt_request_app_link_init(nxt_task_t *task,
+ nxt_request_app_link_t *req_app_link, nxt_request_rpc_data_t *req_rpc_data)
{
nxt_event_engine_t *engine;
engine = task->thread->engine;
- nxt_memzero(ra, sizeof(nxt_req_app_link_t));
+ nxt_memzero(req_app_link, sizeof(nxt_request_app_link_t));
- ra->stream = rc->stream;
- ra->use_count = 1;
- ra->rc = rc;
- rc->ra = ra;
- ra->reply_port = engine->port;
- ra->request = rc->request;
+ req_app_link->stream = req_rpc_data->stream;
+ req_app_link->use_count = 1;
+ req_app_link->req_rpc_data = req_rpc_data;
+ req_rpc_data->req_app_link = req_app_link;
+ req_app_link->reply_port = engine->port;
+ req_app_link->request = req_rpc_data->request;
+ req_app_link->apr_action = NXT_APR_GOT_RESPONSE;
- ra->work.handler = NULL;
- ra->work.task = &engine->task;
- ra->work.obj = ra;
- ra->work.data = engine;
+ req_app_link->work.handler = NULL;
+ req_app_link->work.task = &engine->task;
+ req_app_link->work.obj = req_app_link;
+ req_app_link->work.data = engine;
}
-nxt_inline nxt_req_app_link_t *
-nxt_router_ra_create(nxt_task_t *task, nxt_req_app_link_t *ra_src)
+nxt_inline nxt_request_app_link_t *
+nxt_request_app_link_alloc(nxt_task_t *task,
+ nxt_request_app_link_t *ra_src, nxt_request_rpc_data_t *req_rpc_data)
{
- nxt_mp_t *mp;
- nxt_req_app_link_t *ra;
+ nxt_mp_t *mp;
+ nxt_request_app_link_t *req_app_link;
- if (ra_src->mem_pool != NULL) {
+ if (ra_src != NULL && ra_src->mem_pool != NULL) {
return ra_src;
}
- mp = ra_src->request->mem_pool;
+ mp = req_rpc_data->request->mem_pool;
+
+ req_app_link = nxt_mp_alloc(mp, sizeof(nxt_request_app_link_t));
- ra = nxt_mp_alloc(mp, sizeof(nxt_req_app_link_t));
+ if (nxt_slow_path(req_app_link == NULL)) {
- if (nxt_slow_path(ra == NULL)) {
+ req_rpc_data->req_app_link = NULL;
- ra_src->rc->ra = NULL;
- ra_src->rc = NULL;
+ if (ra_src != NULL) {
+ ra_src->req_rpc_data = NULL;
+ }
return NULL;
}
nxt_mp_retain(mp);
- nxt_router_ra_init(task, ra, ra_src->rc);
+ nxt_request_app_link_init(task, req_app_link, req_rpc_data);
- ra->mem_pool = mp;
+ req_app_link->mem_pool = mp;
- return ra;
+ return req_app_link;
}
@@ -584,177 +607,187 @@ nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info,
static void
-nxt_router_ra_update_peer(nxt_task_t *task, nxt_req_app_link_t *ra);
-
-
-static void
-nxt_router_ra_update_peer_handler(nxt_task_t *task, void *obj, void *data)
+nxt_request_app_link_update_peer_handler(nxt_task_t *task, void *obj,
+ void *data)
{
- nxt_req_app_link_t *ra;
+ nxt_request_app_link_t *req_app_link;
- ra = obj;
+ req_app_link = obj;
- nxt_router_ra_update_peer(task, ra);
+ nxt_request_app_link_update_peer(task, req_app_link);
- nxt_router_ra_use(task, ra, -1);
+ nxt_request_app_link_use(task, req_app_link, -1);
}
static void
-nxt_router_ra_update_peer(nxt_task_t *task, nxt_req_app_link_t *ra)
+nxt_request_app_link_update_peer(nxt_task_t *task,
+ nxt_request_app_link_t *req_app_link)
{
- nxt_event_engine_t *engine;
- nxt_req_conn_link_t *rc;
+ nxt_event_engine_t *engine;
+ nxt_request_rpc_data_t *req_rpc_data;
- engine = ra->work.data;
+ engine = req_app_link->work.data;
if (task->thread->engine != engine) {
- nxt_router_ra_inc_use(ra);
+ nxt_request_app_link_inc_use(req_app_link);
- ra->work.handler = nxt_router_ra_update_peer_handler;
- ra->work.task = &engine->task;
- ra->work.next = NULL;
+ req_app_link->work.handler = nxt_request_app_link_update_peer_handler;
+ req_app_link->work.task = &engine->task;
+ req_app_link->work.next = NULL;
- nxt_debug(task, "ra stream #%uD post update peer to %p",
- ra->stream, engine);
+ nxt_debug(task, "req_app_link stream #%uD post update peer to %p",
+ req_app_link->stream, engine);
- nxt_event_engine_post(engine, &ra->work);
+ nxt_event_engine_post(engine, &req_app_link->work);
return;
}
- nxt_debug(task, "ra stream #%uD update peer", ra->stream);
+ nxt_debug(task, "req_app_link stream #%uD update peer",
+ req_app_link->stream);
- rc = ra->rc;
+ req_rpc_data = req_app_link->req_rpc_data;
- if (rc != NULL && ra->app_port != NULL) {
- nxt_port_rpc_ex_set_peer(task, engine->port, rc, ra->app_port->pid);
+ if (req_rpc_data != NULL && req_app_link->app_port != NULL) {
+ nxt_port_rpc_ex_set_peer(task, engine->port, req_rpc_data,
+ req_app_link->app_port->pid);
}
- nxt_router_ra_use(task, ra, -1);
+ nxt_request_app_link_use(task, req_app_link, -1);
}
static void
-nxt_router_ra_release(nxt_task_t *task, nxt_req_app_link_t *ra)
+nxt_request_app_link_release(nxt_task_t *task,
+ nxt_request_app_link_t *req_app_link)
{
nxt_mp_t *mp;
- nxt_req_conn_link_t *rc;
+ nxt_request_rpc_data_t *req_rpc_data;
- nxt_assert(task->thread->engine == ra->work.data);
- nxt_assert(ra->use_count == 0);
+ nxt_assert(task->thread->engine == req_app_link->work.data);
+ nxt_assert(req_app_link->use_count == 0);
- nxt_debug(task, "ra stream #%uD release", ra->stream);
+ nxt_debug(task, "req_app_link stream #%uD release", req_app_link->stream);
- rc = ra->rc;
+ req_rpc_data = req_app_link->req_rpc_data;
- if (rc != NULL) {
- if (nxt_slow_path(ra->err_code != 0)) {
- nxt_http_request_error(task, rc->request, ra->err_code);
+ if (req_rpc_data != NULL) {
+ if (nxt_slow_path(req_app_link->err_code != 0)) {
+ nxt_http_request_error(task, req_rpc_data->request,
+ req_app_link->err_code);
} else {
- rc->app_port = ra->app_port;
- rc->msg_info = ra->msg_info;
-
- if (rc->app->timeout != 0) {
- rc->request->timer.handler = nxt_router_app_timeout;
- rc->request->timer_data = rc;
- nxt_timer_add(task->thread->engine, &rc->request->timer,
- rc->app->timeout);
+ req_rpc_data->app_port = req_app_link->app_port;
+ req_rpc_data->apr_action = req_app_link->apr_action;
+ req_rpc_data->msg_info = req_app_link->msg_info;
+
+ if (req_rpc_data->app->timeout != 0) {
+ req_rpc_data->request->timer.handler = nxt_router_app_timeout;
+ req_rpc_data->request->timer_data = req_rpc_data;
+ nxt_timer_add(task->thread->engine,
+ &req_rpc_data->request->timer,
+ req_rpc_data->app->timeout);
}
- ra->app_port = NULL;
- ra->msg_info.buf = NULL;
+ req_app_link->app_port = NULL;
+ req_app_link->msg_info.buf = NULL;
}
- rc->ra = NULL;
- ra->rc = NULL;
+ req_rpc_data->req_app_link = NULL;
+ req_app_link->req_rpc_data = NULL;
}
- if (ra->app_port != NULL) {
- nxt_router_app_port_release(task, ra->app_port, 0, 1);
+ if (req_app_link->app_port != NULL) {
+ nxt_router_app_port_release(task, req_app_link->app_port,
+ req_app_link->apr_action);
- ra->app_port = NULL;
+ req_app_link->app_port = NULL;
}
- nxt_router_msg_cancel(task, &ra->msg_info, ra->stream);
+ nxt_router_msg_cancel(task, &req_app_link->msg_info, req_app_link->stream);
- mp = ra->mem_pool;
+ mp = req_app_link->mem_pool;
if (mp != NULL) {
- nxt_mp_free(mp, ra);
+ nxt_mp_free(mp, req_app_link);
nxt_mp_release(mp);
}
}
static void
-nxt_router_ra_release_handler(nxt_task_t *task, void *obj, void *data)
+nxt_request_app_link_release_handler(nxt_task_t *task, void *obj, void *data)
{
- nxt_req_app_link_t *ra;
+ nxt_request_app_link_t *req_app_link;
- ra = obj;
+ req_app_link = obj;
- nxt_assert(ra->work.data == data);
+ nxt_assert(req_app_link->work.data == data);
- nxt_atomic_fetch_add(&ra->use_count, -1);
+ nxt_atomic_fetch_add(&req_app_link->use_count, -1);
- nxt_router_ra_release(task, ra);
+ nxt_request_app_link_release(task, req_app_link);
}
static void
-nxt_router_ra_use(nxt_task_t *task, nxt_req_app_link_t *ra, int i)
+nxt_request_app_link_use(nxt_task_t *task, nxt_request_app_link_t *req_app_link,
+ int i)
{
int c;
nxt_event_engine_t *engine;
- c = nxt_atomic_fetch_add(&ra->use_count, i);
+ c = nxt_atomic_fetch_add(&req_app_link->use_count, i);
if (i < 0 && c == -i) {
- engine = ra->work.data;
+ engine = req_app_link->work.data;
if (task->thread->engine == engine) {
- nxt_router_ra_release(task, ra);
+ nxt_request_app_link_release(task, req_app_link);
return;
}
- nxt_router_ra_inc_use(ra);
+ nxt_request_app_link_inc_use(req_app_link);
- ra->work.handler = nxt_router_ra_release_handler;
- ra->work.task = &engine->task;
- ra->work.next = NULL;
+ req_app_link->work.handler = nxt_request_app_link_release_handler;
+ req_app_link->work.task = &engine->task;
+ req_app_link->work.next = NULL;
- nxt_debug(task, "ra stream #%uD post release to %p",
- ra->stream, engine);
+ nxt_debug(task, "req_app_link stream #%uD post release to %p",
+ req_app_link->stream, engine);
- nxt_event_engine_post(engine, &ra->work);
+ nxt_event_engine_post(engine, &req_app_link->work);
}
}
nxt_inline void
-nxt_router_ra_error(nxt_req_app_link_t *ra, int code, const char *str)
+nxt_request_app_link_error(nxt_request_app_link_t *req_app_link, int code,
+ const char *str)
{
- ra->app_port = NULL;
- ra->err_code = code;
- ra->err_str = str;
+ req_app_link->app_port = NULL;
+ req_app_link->err_code = code;
+ req_app_link->err_str = str;
}
nxt_inline void
-nxt_router_ra_pending(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra)
+nxt_request_app_link_pending(nxt_task_t *task, nxt_app_t *app,
+ nxt_request_app_link_t *req_app_link)
{
- nxt_queue_insert_tail(&ra->app_port->pending_requests,
- &ra->link_port_pending);
- nxt_queue_insert_tail(&app->pending, &ra->link_app_pending);
+ nxt_queue_insert_tail(&req_app_link->app_port->pending_requests,
+ &req_app_link->link_port_pending);
+ nxt_queue_insert_tail(&app->pending, &req_app_link->link_app_pending);
- nxt_router_ra_inc_use(ra);
+ nxt_request_app_link_inc_use(req_app_link);
- ra->res_time = nxt_thread_monotonic_time(task->thread) + app->res_timeout;
+ req_app_link->res_time = nxt_thread_monotonic_time(task->thread)
+ + app->res_timeout;
- nxt_debug(task, "ra stream #%uD enqueue to pending_requests", ra->stream);
+ nxt_debug(task, "req_app_link stream #%uD enqueue to pending_requests",
+ req_app_link->stream);
}
@@ -774,60 +807,63 @@ nxt_queue_chk_remove(nxt_queue_link_t *lnk)
nxt_inline void
-nxt_router_rc_unlink(nxt_task_t *task, nxt_req_conn_link_t *rc)
+nxt_request_rpc_data_unlink(nxt_task_t *task,
+ nxt_request_rpc_data_t *req_rpc_data)
{
- int ra_use_delta;
- nxt_req_app_link_t *ra;
+ int ra_use_delta;
+ nxt_request_app_link_t *req_app_link;
- if (rc->app_port != NULL) {
- nxt_router_app_port_release(task, rc->app_port, 0, 1);
+ if (req_rpc_data->app_port != NULL) {
+ nxt_router_app_port_release(task, req_rpc_data->app_port,
+ req_rpc_data->apr_action);
- rc->app_port = NULL;
+ req_rpc_data->app_port = NULL;
}
- nxt_router_msg_cancel(task, &rc->msg_info, rc->stream);
-
- ra = rc->ra;
+ nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream);
- if (ra != NULL) {
- rc->ra = NULL;
- ra->rc = NULL;
+ req_app_link = req_rpc_data->req_app_link;
+ if (req_app_link != NULL) {
+ req_rpc_data->req_app_link = NULL;
+ req_app_link->req_rpc_data = NULL;
ra_use_delta = 0;
- nxt_thread_mutex_lock(&rc->app->mutex);
+ nxt_thread_mutex_lock(&req_rpc_data->app->mutex);
- if (ra->link_app_requests.next == NULL
- && ra->link_port_pending.next == NULL
- && ra->link_app_pending.next == NULL)
+ if (req_app_link->link_app_requests.next == NULL
+ && req_app_link->link_port_pending.next == NULL
+ && req_app_link->link_app_pending.next == NULL)
{
- ra = NULL;
+ req_app_link = NULL;
} else {
- ra_use_delta -= nxt_queue_chk_remove(&ra->link_app_requests);
- ra_use_delta -= nxt_queue_chk_remove(&ra->link_port_pending);
- nxt_queue_chk_remove(&ra->link_app_pending);
+ ra_use_delta -=
+ nxt_queue_chk_remove(&req_app_link->link_app_requests)
+ + nxt_queue_chk_remove(&req_app_link->link_port_pending);
+
+ nxt_queue_chk_remove(&req_app_link->link_app_pending);
}
- nxt_thread_mutex_unlock(&rc->app->mutex);
+ nxt_thread_mutex_unlock(&req_rpc_data->app->mutex);
- if (ra != NULL) {
- nxt_router_ra_use(task, ra, ra_use_delta);
+ if (req_app_link != NULL) {
+ nxt_request_app_link_use(task, req_app_link, ra_use_delta);
}
}
- if (rc->app != NULL) {
- nxt_router_app_use(task, rc->app, -1);
+ if (req_rpc_data->app != NULL) {
+ nxt_router_app_use(task, req_rpc_data->app, -1);
- rc->app = NULL;
+ req_rpc_data->app = NULL;
}
- if (rc->request != NULL) {
- rc->request->timer_data = NULL;
+ if (req_rpc_data->request != NULL) {
+ req_rpc_data->request->timer_data = NULL;
- nxt_router_http_request_done(task, rc->request);
+ nxt_router_http_request_done(task, req_rpc_data->request);
- rc->request = NULL;
+ req_rpc_data->request = NULL;
}
}
@@ -3380,28 +3416,28 @@ static void
nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
void *data)
{
- nxt_int_t ret;
- nxt_buf_t *b;
- nxt_unit_field_t *f;
- nxt_http_field_t *field;
- nxt_http_request_t *r;
- nxt_req_conn_link_t *rc;
- nxt_unit_response_t *resp;
+ nxt_int_t ret;
+ nxt_buf_t *b;
+ nxt_unit_field_t *f;
+ nxt_http_field_t *field;
+ nxt_http_request_t *r;
+ nxt_unit_response_t *resp;
+ nxt_request_rpc_data_t *req_rpc_data;
b = msg->buf;
- rc = data;
+ req_rpc_data = data;
if (msg->size == 0) {
b = NULL;
}
- r = rc->request;
+ r = req_rpc_data->request;
if (nxt_slow_path(r == NULL)) {
return;
}
if (r->error) {
- nxt_router_rc_unlink(task, rc);
+ nxt_request_rpc_data_unlink(task, req_rpc_data);
return;
}
@@ -3410,13 +3446,14 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_buf_chain_add(&b, nxt_http_buf_last(r));
- nxt_router_rc_unlink(task, rc);
+ nxt_request_rpc_data_unlink(task, req_rpc_data);
} else {
- if (rc->app != NULL && rc->app->timeout != 0) {
+ if (req_rpc_data->app != NULL && req_rpc_data->app->timeout != 0) {
r->timer.handler = nxt_router_app_timeout;
- r->timer_data = rc;
- nxt_timer_add(task->thread->engine, &r->timer, rc->app->timeout);
+ r->timer_data = req_rpc_data;
+ nxt_timer_add(task->thread->engine, &r->timer,
+ req_rpc_data->app->timeout);
}
}
@@ -3504,7 +3541,7 @@ fail:
nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
- nxt_router_rc_unlink(task, rc);
+ nxt_request_rpc_data_unlink(task, req_rpc_data);
}
@@ -3537,36 +3574,37 @@ static void
nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
void *data)
{
- nxt_int_t res;
- nxt_port_t *port;
- nxt_bool_t cancelled;
- nxt_req_app_link_t *ra;
- nxt_req_conn_link_t *rc;
+ nxt_int_t res;
+ nxt_port_t *port;
+ nxt_bool_t cancelled;
+ nxt_request_app_link_t *req_app_link;
+ nxt_request_rpc_data_t *req_rpc_data;
- rc = data;
+ req_rpc_data = data;
- ra = rc->ra;
-
- if (ra != NULL) {
- cancelled = nxt_router_msg_cancel(task, &ra->msg_info, ra->stream);
+ req_app_link = req_rpc_data->req_app_link;
+ if (req_app_link != NULL) {
+ cancelled = nxt_router_msg_cancel(task, &req_app_link->msg_info,
+ req_app_link->stream);
if (cancelled) {
- nxt_router_ra_inc_use(ra);
+ nxt_request_app_link_inc_use(req_app_link);
- res = nxt_router_app_port(task, rc->app, ra);
+ res = nxt_router_app_port(task, req_rpc_data->app, req_app_link);
if (res == NXT_OK) {
- port = ra->app_port;
+ port = req_app_link->app_port;
if (nxt_slow_path(port == NULL)) {
- nxt_log(task, NXT_LOG_ERR, "port is NULL in cancelled ra");
+ nxt_log(task, NXT_LOG_ERR,
+ "port is NULL in cancelled req_app_link");
return;
}
- nxt_port_rpc_ex_set_peer(task, task->thread->engine->port, rc,
- port->pid);
+ nxt_port_rpc_ex_set_peer(task, task->thread->engine->port,
+ req_rpc_data, port->pid);
- nxt_router_app_prepare_request(task, ra);
+ nxt_router_app_prepare_request(task, req_app_link);
}
msg->port_msg.last = 0;
@@ -3575,12 +3613,12 @@ nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
}
}
- if (rc->request != NULL) {
- nxt_http_request_error(task, rc->request,
+ if (req_rpc_data->request != NULL) {
+ nxt_http_request_error(task, req_rpc_data->request,
NXT_HTTP_SERVICE_UNAVAILABLE);
}
- nxt_router_rc_unlink(task, rc);
+ nxt_request_rpc_data_unlink(task, req_rpc_data);
}
@@ -3624,7 +3662,7 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d",
&app->name, port->pid, app->processes, app->pending_processes);
- nxt_router_app_port_release(task, port, 0, 0);
+ nxt_router_app_port_release(task, port, NXT_APR_NEW_PORT);
}
@@ -3632,10 +3670,10 @@ static void
nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
void *data)
{
- nxt_app_t *app;
- nxt_app_joint_t *app_joint;
- nxt_queue_link_t *lnk;
- nxt_req_app_link_t *ra;
+ nxt_app_t *app;
+ nxt_app_joint_t *app_joint;
+ nxt_queue_link_t *lnk;
+ nxt_request_app_link_t *req_app_link;
app_joint = data;
@@ -3664,20 +3702,22 @@ nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_queue_remove(lnk);
lnk->next = NULL;
- ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_requests);
+ req_app_link = nxt_queue_link_data(lnk, nxt_request_app_link_t,
+ link_app_requests);
} else {
- ra = NULL;
+ req_app_link = NULL;
}
nxt_thread_mutex_unlock(&app->mutex);
- if (ra != NULL) {
+ if (req_app_link != NULL) {
nxt_debug(task, "app '%V' %p abort next stream #%uD",
- &app->name, app, ra->stream);
+ &app->name, app, req_app_link->stream);
- nxt_router_ra_error(ra, 500, "Failed to start application process");
- nxt_router_ra_use(task, ra, -1);
+ nxt_request_app_link_error(req_app_link, 500,
+ "Failed to start application process");
+ nxt_request_app_link_use(task, req_app_link, -1);
}
}
@@ -3811,9 +3851,9 @@ nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app)
static void
nxt_router_app_process_request(nxt_task_t *task, void *obj, void *data)
{
- nxt_req_app_link_t *ra;
+ nxt_request_app_link_t *req_app_link;
- ra = data;
+ req_app_link = data;
#if (NXT_DEBUG)
{
@@ -3822,39 +3862,62 @@ nxt_router_app_process_request(nxt_task_t *task, void *obj, void *data)
app = obj;
nxt_assert(app != NULL);
- nxt_assert(ra != NULL);
- nxt_assert(ra->app_port != NULL);
+ nxt_assert(req_app_link != NULL);
+ nxt_assert(req_app_link->app_port != NULL);
nxt_debug(task, "app '%V' %p process next stream #%uD",
- &app->name, app, ra->stream);
+ &app->name, app, req_app_link->stream);
}
#endif
- nxt_router_app_prepare_request(task, ra);
+ nxt_router_app_prepare_request(task, req_app_link);
}
static void
nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
- uint32_t request_failed, uint32_t got_response)
+ nxt_apr_action_t action)
{
+ int inc_use;
+ uint32_t dec_pending, got_response;
nxt_app_t *app;
nxt_bool_t port_unchained;
nxt_bool_t send_quit, cancelled, adjust_idle_timer;
nxt_queue_link_t *lnk;
- nxt_req_app_link_t *ra, *pending_ra, *re_ra;
+ nxt_request_app_link_t *req_app_link, *pending_ra, *re_ra;
nxt_port_select_state_t state;
nxt_assert(port != NULL);
nxt_assert(port->app != NULL);
- ra = NULL;
+ req_app_link = NULL;
app = port->app;
+ inc_use = 0;
+ dec_pending = 0;
+ got_response = 0;
+
+ switch (action) {
+ case NXT_APR_NEW_PORT:
+ break;
+ case NXT_APR_REQUEST_FAILED:
+ dec_pending = 1;
+ inc_use = -1;
+ break;
+ case NXT_APR_GOT_RESPONSE:
+ dec_pending = 1;
+ got_response = 1;
+ inc_use = -1;
+ break;
+ case NXT_APR_CLOSE:
+ inc_use = -1;
+ break;
+ }
+
nxt_thread_mutex_lock(&app->mutex);
- port->app_pending_responses -= request_failed + got_response;
+ port->app_pending_responses -= dec_pending;
port->app_responses += got_response;
if (port->pair[1] != -1
@@ -3891,24 +3954,25 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
nxt_queue_remove(lnk);
lnk->next = NULL;
- ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_requests);
+ req_app_link = nxt_queue_link_data(lnk, nxt_request_app_link_t,
+ link_app_requests);
- ra->app_port = nxt_router_pop_first_port(app);
+ req_app_link->app_port = nxt_router_pop_first_port(app);
- if (ra->app_port->app_pending_responses > 1) {
- nxt_router_ra_pending(task, app, ra);
+ if (req_app_link->app_port->app_pending_responses > 1) {
+ nxt_request_app_link_pending(task, app, req_app_link);
}
}
/* Pop first pending request for this port. */
- if ((request_failed > 0 || got_response > 0)
+ if (dec_pending > 0
&& !nxt_queue_is_empty(&port->pending_requests))
{
lnk = nxt_queue_first(&port->pending_requests);
nxt_queue_remove(lnk);
lnk->next = NULL;
- pending_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t,
+ pending_ra = nxt_queue_link_data(lnk, nxt_request_app_link_t,
link_port_pending);
nxt_assert(pending_ra->link_app_pending.next != NULL);
@@ -3924,7 +3988,8 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
if (got_response > 0 && !nxt_queue_is_empty(&app->pending)) {
lnk = nxt_queue_first(&app->pending);
- re_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_pending);
+ re_ra = nxt_queue_link_data(lnk, nxt_request_app_link_t,
+ link_app_pending);
if (re_ra->res_time <= nxt_thread_monotonic_time(task->thread)) {
@@ -3935,9 +4000,9 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
re_ra->stream);
if (cancelled) {
- nxt_router_ra_inc_use(re_ra);
+ nxt_request_app_link_inc_use(re_ra);
- state.ra = re_ra;
+ state.req_app_link = re_ra;
state.app = app;
nxt_router_port_select(task, &state);
@@ -3998,7 +4063,7 @@ re_ra_cancelled:
}
if (pending_ra != NULL) {
- nxt_router_ra_use(task, pending_ra, -1);
+ nxt_request_app_link_use(task, pending_ra, -1);
}
if (re_ra != NULL) {
@@ -4009,12 +4074,12 @@ re_ra_cancelled:
}
}
- if (ra != NULL) {
- nxt_router_ra_use(task, ra, -1);
+ if (req_app_link != NULL) {
+ nxt_request_app_link_use(task, req_app_link, -1);
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
nxt_router_app_process_request,
- &task->thread->engine->task, app, ra);
+ &task->thread->engine->task, app, req_app_link);
goto adjust_use;
}
@@ -4046,9 +4111,7 @@ re_ra_cancelled:
adjust_use:
- if (request_failed > 0 || got_response > 0) {
- nxt_port_use(task, port, -1);
- }
+ nxt_port_use(task, port, inc_use);
}
@@ -4265,33 +4328,33 @@ nxt_router_free_app(nxt_task_t *task, void *obj, void *data)
static void
nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state)
{
- nxt_app_t *app;
- nxt_bool_t can_start_process;
- nxt_req_app_link_t *ra;
+ nxt_app_t *app;
+ nxt_bool_t can_start_process;
+ nxt_request_app_link_t *req_app_link;
- ra = state->ra;
+ req_app_link = state->req_app_link;
app = state->app;
state->failed_port_use_delta = 0;
- if (nxt_queue_chk_remove(&ra->link_app_requests))
+ if (nxt_queue_chk_remove(&req_app_link->link_app_requests))
{
- nxt_router_ra_dec_use(ra);
+ nxt_request_app_link_dec_use(req_app_link);
}
- if (nxt_queue_chk_remove(&ra->link_port_pending))
+ if (nxt_queue_chk_remove(&req_app_link->link_port_pending))
{
- nxt_assert(ra->link_app_pending.next != NULL);
+ nxt_assert(req_app_link->link_app_pending.next != NULL);
- nxt_queue_remove(&ra->link_app_pending);
- ra->link_app_pending.next = NULL;
+ nxt_queue_remove(&req_app_link->link_app_pending);
+ req_app_link->link_app_pending.next = NULL;
- nxt_router_ra_dec_use(ra);
+ nxt_request_app_link_dec_use(req_app_link);
}
- state->failed_port = ra->app_port;
+ state->failed_port = req_app_link->app_port;
- if (ra->app_port != NULL) {
+ if (req_app_link->app_port != NULL) {
state->failed_port_use_delta--;
state->failed_port->app_pending_responses--;
@@ -4300,7 +4363,7 @@ nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state)
state->failed_port_use_delta--;
}
- ra->app_port = NULL;
+ req_app_link->app_port = NULL;
}
can_start_process = nxt_router_app_can_start(app);
@@ -4311,22 +4374,25 @@ nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state)
if (nxt_queue_is_empty(&app->ports)
|| (can_start_process && nxt_router_app_first_port_busy(app)) )
{
- ra = nxt_router_ra_create(task, ra);
-
- if (nxt_slow_path(ra == NULL)) {
+ req_app_link = nxt_request_app_link_alloc(task, req_app_link,
+ req_app_link->req_rpc_data);
+ if (nxt_slow_path(req_app_link == NULL)) {
goto fail;
}
if (nxt_slow_path(state->failed_port != NULL)) {
- nxt_queue_insert_head(&app->requests, &ra->link_app_requests);
+ nxt_queue_insert_head(&app->requests,
+ &req_app_link->link_app_requests);
} else {
- nxt_queue_insert_tail(&app->requests, &ra->link_app_requests);
+ nxt_queue_insert_tail(&app->requests,
+ &req_app_link->link_app_requests);
}
- nxt_router_ra_inc_use(ra);
+ nxt_request_app_link_inc_use(req_app_link);
- nxt_debug(task, "ra stream #%uD enqueue to app->requests", ra->stream);
+ nxt_debug(task, "req_app_link stream #%uD enqueue to app->requests",
+ req_app_link->stream);
if (can_start_process) {
app->pending_processes++;
@@ -4337,15 +4403,15 @@ nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state)
state->port = nxt_router_pop_first_port(app);
if (state->port->app_pending_responses > 1) {
- ra = nxt_router_ra_create(task, ra);
-
- if (nxt_slow_path(ra == NULL)) {
+ req_app_link = nxt_request_app_link_alloc(task, req_app_link,
+ req_app_link->req_rpc_data);
+ if (nxt_slow_path(req_app_link == NULL)) {
goto fail;
}
- ra->app_port = state->port;
+ req_app_link->app_port = state->port;
- nxt_router_ra_pending(task, app, ra);
+ nxt_request_app_link_pending(task, app, req_app_link);
}
if (can_start_process && nxt_router_app_need_start(app)) {
@@ -4356,32 +4422,32 @@ nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state)
fail:
- state->shared_ra = ra;
+ state->shared_ra = req_app_link;
}
static nxt_int_t
nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state)
{
- nxt_int_t res;
- nxt_app_t *app;
- nxt_req_app_link_t *ra;
+ nxt_int_t res;
+ nxt_app_t *app;
+ nxt_request_app_link_t *req_app_link;
- ra = state->shared_ra;
+ req_app_link = state->shared_ra;
app = state->app;
if (state->failed_port_use_delta != 0) {
nxt_port_use(task, state->failed_port, state->failed_port_use_delta);
}
- if (nxt_slow_path(ra == NULL)) {
+ if (nxt_slow_path(req_app_link == NULL)) {
if (state->port != NULL) {
nxt_port_use(task, state->port, -1);
}
- nxt_router_ra_error(state->ra, 500,
+ nxt_request_app_link_error(state->req_app_link, 500,
"Failed to allocate shared req<->app link");
- nxt_router_ra_use(task, state->ra, -1);
+ nxt_request_app_link_use(task, state->req_app_link, -1);
return NXT_ERROR;
}
@@ -4389,7 +4455,7 @@ nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state)
if (state->port != NULL) {
nxt_debug(task, "already have port for app '%V' %p ", &app->name, app);
- ra->app_port = state->port;
+ req_app_link->app_port = state->port;
if (state->start_process) {
nxt_router_start_app_process(task, app);
@@ -4408,8 +4474,9 @@ nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state)
res = nxt_router_start_app_process(task, app);
if (nxt_slow_path(res != NXT_OK)) {
- nxt_router_ra_error(ra, 500, "Failed to start app process");
- nxt_router_ra_use(task, ra, -1);
+ nxt_request_app_link_error(req_app_link, 500,
+ "Failed to start app process");
+ nxt_request_app_link_use(task, req_app_link, -1);
return NXT_ERROR;
}
@@ -4419,11 +4486,12 @@ nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state)
static nxt_int_t
-nxt_router_app_port(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra)
+nxt_router_app_port(nxt_task_t *task, nxt_app_t *app,
+ nxt_request_app_link_t *req_app_link)
{
nxt_port_select_state_t state;
- state.ra = ra;
+ state.req_app_link = req_app_link;
state.app = app;
nxt_thread_mutex_lock(&app->mutex);
@@ -4440,48 +4508,47 @@ void
nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
nxt_app_t *app)
{
- nxt_int_t res;
- nxt_port_t *port;
- nxt_event_engine_t *engine;
- nxt_req_app_link_t ra_local, *ra;
- nxt_req_conn_link_t *rc;
+ nxt_int_t res;
+ nxt_port_t *port;
+ nxt_event_engine_t *engine;
+ nxt_request_app_link_t ra_local, *req_app_link;
+ nxt_request_rpc_data_t *req_rpc_data;
engine = task->thread->engine;
- rc = nxt_port_rpc_register_handler_ex(task, engine->port,
+ req_rpc_data = nxt_port_rpc_register_handler_ex(task, engine->port,
nxt_router_response_ready_handler,
nxt_router_response_error_handler,
- sizeof(nxt_req_conn_link_t));
-
- if (nxt_slow_path(rc == NULL)) {
+ sizeof(nxt_request_rpc_data_t));
+ if (nxt_slow_path(req_rpc_data == NULL)) {
nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
return;
}
- rc->stream = nxt_port_rpc_ex_stream(rc);
- rc->app = app;
+ req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data);
+ req_rpc_data->app = app;
nxt_router_app_use(task, app, 1);
- rc->request = r;
+ req_rpc_data->request = r;
- ra = &ra_local;
- nxt_router_ra_init(task, ra, rc);
+ req_app_link = &ra_local;
+ nxt_request_app_link_init(task, req_app_link, req_rpc_data);
- res = nxt_router_app_port(task, app, ra);
+ res = nxt_router_app_port(task, app, req_app_link);
if (res != NXT_OK) {
return;
}
- ra = rc->ra;
- port = ra->app_port;
+ req_app_link = req_rpc_data->req_app_link;
+ port = req_app_link->app_port;
nxt_assert(port != NULL);
- nxt_port_rpc_ex_set_peer(task, engine->port, rc, port->pid);
+ nxt_port_rpc_ex_set_peer(task, engine->port, req_rpc_data, port->pid);
- nxt_router_app_prepare_request(task, ra);
+ nxt_router_app_prepare_request(task, req_app_link);
}
@@ -4492,19 +4559,20 @@ nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj, void *data)
static void
-nxt_router_app_prepare_request(nxt_task_t *task, nxt_req_app_link_t *ra)
+nxt_router_app_prepare_request(nxt_task_t *task,
+ nxt_request_app_link_t *req_app_link)
{
- uint32_t request_failed;
- nxt_buf_t *buf;
- nxt_int_t res;
- nxt_port_t *port, *c_port, *reply_port;
+ nxt_buf_t *buf;
+ nxt_int_t res;
+ nxt_port_t *port, *c_port, *reply_port;
+ nxt_apr_action_t apr_action;
- nxt_assert(ra->app_port != NULL);
+ nxt_assert(req_app_link->app_port != NULL);
- port = ra->app_port;
- reply_port = ra->reply_port;
+ port = req_app_link->app_port;
+ reply_port = req_app_link->reply_port;
- request_failed = 1;
+ apr_action = NXT_APR_REQUEST_FAILED;
c_port = nxt_process_connected_port_find(port->process, reply_port->pid,
reply_port->id);
@@ -4512,7 +4580,7 @@ nxt_router_app_prepare_request(nxt_task_t *task, nxt_req_app_link_t *ra)
res = nxt_port_send_port(task, port, reply_port, 0);
if (nxt_slow_path(res != NXT_OK)) {
- nxt_router_ra_error(ra, 500,
+ nxt_request_app_link_error(req_app_link, 500,
"Failed to send reply port to application");
goto release_port;
}
@@ -4520,11 +4588,11 @@ nxt_router_app_prepare_request(nxt_task_t *task, nxt_req_app_link_t *ra)
nxt_process_connected_port_add(port->process, reply_port);
}
- buf = nxt_router_prepare_msg(task, ra->request, port,
+ buf = nxt_router_prepare_msg(task, req_app_link->request, port,
nxt_app_msg_prefix[port->app->type]);
if (nxt_slow_path(buf == NULL)) {
- nxt_router_ra_error(ra, 500,
+ nxt_request_app_link_error(req_app_link, 500,
"Failed to prepare message for application");
goto release_port;
}
@@ -4533,40 +4601,41 @@ nxt_router_app_prepare_request(nxt_task_t *task, nxt_req_app_link_t *ra)
nxt_buf_used_size(buf),
port->socket.fd);
- request_failed = 0;
+ apr_action = NXT_APR_NEW_PORT;
- ra->msg_info.buf = buf;
- ra->msg_info.completion_handler = buf->completion_handler;
+ req_app_link->msg_info.buf = buf;
+ req_app_link->msg_info.completion_handler = buf->completion_handler;
for (; buf; buf = buf->next) {
buf->completion_handler = nxt_router_dummy_buf_completion;
}
- buf = ra->msg_info.buf;
+ buf = req_app_link->msg_info.buf;
- res = nxt_port_mmap_get_tracking(task, port, &ra->msg_info.tracking,
- ra->stream);
+ res = nxt_port_mmap_get_tracking(task, port,
+ &req_app_link->msg_info.tracking,
+ req_app_link->stream);
if (nxt_slow_path(res != NXT_OK)) {
- nxt_router_ra_error(ra, 500,
- "Failed to get tracking area");
+ nxt_request_app_link_error(req_app_link, 500,
+ "Failed to get tracking area");
goto release_port;
}
res = nxt_port_socket_twrite(task, port, NXT_PORT_MSG_DATA,
- -1, ra->stream, reply_port->id, buf,
- &ra->msg_info.tracking);
+ -1, req_app_link->stream, reply_port->id, buf,
+ &req_app_link->msg_info.tracking);
if (nxt_slow_path(res != NXT_OK)) {
- nxt_router_ra_error(ra, 500,
- "Failed to send message to application");
+ nxt_request_app_link_error(req_app_link, 500,
+ "Failed to send message to application");
goto release_port;
}
release_port:
- nxt_router_app_port_release(task, port, request_failed, 0);
+ nxt_router_app_port_release(task, port, apr_action);
- nxt_router_ra_update_peer(task, ra);
+ nxt_request_app_link_update_peer(task, req_app_link);
}
@@ -4897,8 +4966,8 @@ nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
nxt_timer_t *timer;
nxt_queue_link_t *lnk;
nxt_http_request_t *r;
- nxt_req_app_link_t *pending_ra;
- nxt_req_conn_link_t *rc;
+ nxt_request_app_link_t *pending_ra;
+ nxt_request_rpc_data_t *req_rpc_data;
nxt_port_select_state_t state;
timer = obj;
@@ -4906,8 +4975,8 @@ nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
nxt_debug(task, "router app timeout");
r = nxt_timer_data(timer, nxt_http_request_t, timer);
- rc = r->timer_data;
- app = rc->app;
+ req_rpc_data = r->timer_data;
+ app = req_rpc_data->app;
if (app == NULL) {
goto generate_error;
@@ -4916,14 +4985,16 @@ nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
port = NULL;
pending_ra = NULL;
- if (rc->app_port != NULL) {
- port = rc->app_port;
- rc->app_port = NULL;
+ if (req_rpc_data->app_port != NULL) {
+ port = req_rpc_data->app_port;
+ req_rpc_data->app_port = NULL;
}
- if (port == NULL && rc->ra != NULL && rc->ra->app_port != NULL) {
- port = rc->ra->app_port;
- rc->ra->app_port = NULL;
+ if (port == NULL && req_rpc_data->req_app_link != NULL
+ && req_rpc_data->req_app_link->app_port != NULL)
+ {
+ port = req_rpc_data->req_app_link->app_port;
+ req_rpc_data->req_app_link->app_port = NULL;
}
if (port == NULL) {
@@ -4937,7 +5008,7 @@ nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
if (!nxt_queue_is_empty(&port->pending_requests)) {
lnk = nxt_queue_first(&port->pending_requests);
- pending_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t,
+ pending_ra = nxt_queue_link_data(lnk, nxt_request_app_link_t,
link_port_pending);
nxt_assert(pending_ra->link_app_pending.next != NULL);
@@ -4949,9 +5020,9 @@ nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
pending_ra->stream);
if (cancelled) {
- nxt_router_ra_inc_use(pending_ra);
+ nxt_request_app_link_inc_use(pending_ra);
- state.ra = pending_ra;
+ state.req_app_link = pending_ra;
state.app = app;
nxt_router_port_select(task, &state);
@@ -4979,7 +5050,7 @@ generate_error:
nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
- nxt_router_rc_unlink(task, rc);
+ nxt_request_rpc_data_unlink(task, req_rpc_data);
}
diff --git a/src/nxt_router.h b/src/nxt_router.h
index d9fbfe05..ff791e3d 100644
--- a/src/nxt_router.h
+++ b/src/nxt_router.h
@@ -108,8 +108,8 @@ struct nxt_app_s {
nxt_work_t adjust_idle_work;
nxt_event_engine_t *engine;
- nxt_queue_t requests; /* of nxt_req_app_link_t */
- nxt_queue_t pending; /* of nxt_req_app_link_t */
+ nxt_queue_t requests; /* of nxt_request_app_link_t */
+ nxt_queue_t pending; /* of nxt_request_app_link_t */
nxt_str_t name;
uint32_t pending_processes;