summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/nxt_port.c1
-rw-r--r--src/nxt_port.h6
-rw-r--r--src/nxt_port_rpc.c55
-rw-r--r--src/nxt_router.c407
4 files changed, 362 insertions, 107 deletions
diff --git a/src/nxt_port.c b/src/nxt_port.c
index 4a07657d..5a1d4e6c 100644
--- a/src/nxt_port.c
+++ b/src/nxt_port.c
@@ -66,6 +66,7 @@ nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid,
nxt_queue_init(&port->messages);
nxt_thread_mutex_create(&port->write_mutex);
+ nxt_queue_init(&port->pending_requests);
} else {
nxt_mp_destroy(mp);
diff --git a/src/nxt_port.h b/src/nxt_port.h
index cf30bc92..74d37dcb 100644
--- a/src/nxt_port.h
+++ b/src/nxt_port.h
@@ -168,6 +168,7 @@ struct nxt_port_s {
uint32_t app_pending_responses;
uint32_t app_responses;
+ nxt_queue_t pending_requests;
nxt_port_handler_t handler;
nxt_port_handler_t *data;
@@ -265,4 +266,9 @@ nxt_int_t nxt_port_post(nxt_task_t *task, nxt_port_t *port,
nxt_port_post_handler_t handler, void *data);
void nxt_port_use(nxt_task_t *task, nxt_port_t *port, int i);
+nxt_inline void nxt_port_inc_use(nxt_port_t *port)
+{
+ nxt_atomic_fetch_add(&port->use_count, 1);
+}
+
#endif /* _NXT_PORT_H_INCLUDED_ */
diff --git a/src/nxt_port_rpc.c b/src/nxt_port_rpc.c
index 04de7775..15c550f4 100644
--- a/src/nxt_port_rpc.c
+++ b/src/nxt_port_rpc.c
@@ -25,6 +25,11 @@ struct nxt_port_rpc_reg_s {
};
+static void
+nxt_port_rpc_remove_from_peers(nxt_task_t *task, nxt_port_t *port,
+ nxt_port_rpc_reg_t *reg);
+
+
static nxt_int_t
nxt_rpc_reg_test(nxt_lvlhsh_query_t *lhq, void *data)
{
@@ -168,10 +173,17 @@ nxt_port_rpc_ex_set_peer(nxt_task_t *task, nxt_port_t *port,
nxt_assert(reg->data == ex);
- if (peer == -1 || reg->peer != -1) {
- nxt_log_error(NXT_LOG_ERR, task->log, "rpc: stream #%uD failed to "
- "change peer %PI->%PI", reg->stream, reg->peer, peer);
+ if (nxt_slow_path(peer == reg->peer)) {
+ return;
+ }
+
+ if (reg->peer != -1) {
+ nxt_port_rpc_remove_from_peers(task, port, reg);
+
+ reg->peer = -1;
+ }
+ if (peer == -1) {
return;
}
@@ -359,7 +371,6 @@ nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, nxt_pid_t peer)
msg.port_msg.pid = peer;
msg.port_msg.type = _NXT_PORT_MSG_REMOVE_PID;
- msg.port_msg.last = 1;
peer_link = lhq.value;
last = 0;
@@ -375,20 +386,7 @@ nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, nxt_pid_t peer)
nxt_debug(task, "rpc: stream #%uD trigger error", stream);
msg.port_msg.stream = stream;
-
- reg->error_handler(task, &msg, reg->data);
-
- nxt_port_rpc_lhq_stream(&lhq, &stream);
- lhq.pool = port->mem_pool;
-
- ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq);
-
- if (nxt_slow_path(ret != NXT_OK)) {
- nxt_log_error(NXT_LOG_ERR, task->log,
- "rpc: stream #%uD failed to delete handler", stream);
-
- return;
- }
+ msg.port_msg.last = 1;
if (peer_link == peer_link->next) {
nxt_assert(peer_link->prev == peer_link);
@@ -405,6 +403,27 @@ nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, nxt_pid_t peer)
peer_link = next_link;
}
+ reg->peer = -1;
+
+ reg->error_handler(task, &msg, reg->data);
+
+ /* Reset 'last' flag to preserve rpc handler. */
+ if (msg.port_msg.last == 0) {
+ continue;
+ }
+
+ nxt_port_rpc_lhq_stream(&lhq, &stream);
+ lhq.pool = port->mem_pool;
+
+ ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq);
+
+ if (nxt_slow_path(ret != NXT_OK)) {
+ nxt_log_error(NXT_LOG_ERR, task->log,
+ "rpc: stream #%uD failed to delete handler", stream);
+
+ return;
+ }
+
nxt_mp_free(port->mem_pool, reg);
}
}
diff --git a/src/nxt_router.c b/src/nxt_router.c
index c71338fc..61daf746 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -48,14 +48,15 @@ typedef struct {
struct nxt_req_app_link_s {
uint32_t stream;
+ nxt_atomic_t use_count;
nxt_port_t *app_port;
- nxt_pid_t app_pid;
nxt_port_t *reply_port;
nxt_app_parse_ctx_t *ap;
nxt_msg_info_t msg_info;
nxt_req_conn_link_t *rc;
- nxt_queue_link_t link; /* for nxt_app_t.requests */
+ nxt_queue_link_t link_app_requests; /* for nxt_app_t.requests */
+ nxt_queue_link_t link_port_pending; /* for nxt_port_t.pending_requests */
nxt_mp_t *mem_pool;
nxt_work_t work;
@@ -73,6 +74,24 @@ typedef struct {
static nxt_int_t nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app);
+nxt_inline void
+nxt_router_ra_inc_use(nxt_req_app_link_t *ra)
+{
+ nxt_atomic_fetch_add(&ra->use_count, 1);
+}
+
+nxt_inline void
+nxt_router_ra_dec_use(nxt_req_app_link_t *ra)
+{
+ int c;
+
+ c = nxt_atomic_fetch_add(&ra->use_count, -1);
+
+ nxt_assert(c > 1);
+}
+
+static void nxt_router_ra_use(nxt_task_t *task, nxt_req_app_link_t *ra, int i);
+
static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
static void nxt_router_conf_ready(nxt_task_t *task,
@@ -153,6 +172,7 @@ static void nxt_router_app_port_error(nxt_task_t *task,
static nxt_port_t * nxt_router_app_get_idle_port(nxt_app_t *app);
static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
uint32_t request_failed, uint32_t got_response);
+static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra);
static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data);
static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj,
@@ -162,7 +182,7 @@ static void nxt_router_conn_http_body_read(nxt_task_t *task, void *obj,
void *data);
static void nxt_router_process_http_request(nxt_task_t *task,
nxt_conn_t *c, nxt_app_parse_ctx_t *ap);
-static void nxt_router_process_http_request_mp(nxt_task_t *task,
+static void nxt_router_app_prepare_request(nxt_task_t *task,
nxt_req_app_link_t *ra);
static nxt_int_t nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
nxt_app_wmsg_t *wmsg);
@@ -319,7 +339,7 @@ nxt_router_ra_init(nxt_task_t *task, nxt_req_app_link_t *ra,
nxt_memzero(ra, sizeof(nxt_req_app_link_t));
ra->stream = rc->stream;
- ra->app_pid = -1;
+ ra->use_count = 1;
ra->rc = rc;
rc->ra = ra;
ra->reply_port = engine->port;
@@ -338,6 +358,10 @@ nxt_router_ra_create(nxt_task_t *task, nxt_req_app_link_t *ra_src)
nxt_mp_t *mp;
nxt_req_app_link_t *ra;
+ if (ra_src->mem_pool != NULL) {
+ return ra_src;
+ }
+
mp = ra_src->ap->mem_pool;
ra = nxt_mp_retain(mp, sizeof(nxt_req_app_link_t));
@@ -394,25 +418,38 @@ nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info,
static void
-nxt_router_ra_release(nxt_task_t *task, void *obj, void *data)
+nxt_router_ra_update_peer(nxt_task_t *task, nxt_req_app_link_t *ra);
+
+
+static void
+nxt_router_ra_update_peer_handler(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_req_app_link_t *ra;
+
+ ra = obj;
+
+ nxt_router_ra_update_peer(task, ra);
+
+ nxt_router_ra_use(task, ra, -1);
+}
+
+
+static void
+nxt_router_ra_update_peer(nxt_task_t *task, nxt_req_app_link_t *ra)
{
- nxt_req_app_link_t *ra;
nxt_event_engine_t *engine;
nxt_req_conn_link_t *rc;
- ra = obj;
- engine = data;
+ engine = ra->work.data;
if (task->thread->engine != engine) {
- if (ra->app_port != NULL) {
- ra->app_pid = ra->app_port->pid;
- }
+ nxt_router_ra_inc_use(ra);
- ra->work.handler = nxt_router_ra_release;
+ ra->work.handler = nxt_router_ra_update_peer_handler;
ra->work.task = &engine->task;
ra->work.next = NULL;
- nxt_debug(task, "ra stream #%uD post release to %p",
+ nxt_debug(task, "ra stream #%uD post update peer to %p",
ra->stream, engine);
nxt_event_engine_post(engine, &ra->work);
@@ -420,22 +457,47 @@ nxt_router_ra_release(nxt_task_t *task, void *obj, void *data)
return;
}
+ nxt_debug(task, "ra stream #%uD update peer", ra->stream);
+
+ rc = ra->rc;
+
+ if (rc != NULL && ra->app_port != NULL) {
+ nxt_port_rpc_ex_set_peer(task, engine->port, rc, ra->app_port->pid);
+ }
+
+ nxt_router_ra_use(task, ra, -1);
+}
+
+
+static void
+nxt_router_ra_release(nxt_task_t *task, nxt_req_app_link_t *ra)
+{
+ nxt_conn_t *c;
+ nxt_req_conn_link_t *rc;
+
+ nxt_assert(task->thread->engine == ra->work.data);
+ nxt_assert(ra->use_count == 0);
+
nxt_debug(task, "ra stream #%uD release", ra->stream);
rc = ra->rc;
if (rc != NULL) {
- if (ra->app_pid != -1) {
- nxt_port_rpc_ex_set_peer(task, engine->port, rc, ra->app_pid);
- }
+ c = rc->conn;
if (nxt_slow_path(ra->err_code != 0)) {
- nxt_router_gen_error(task, rc->conn, ra->err_code, ra->err_str);
+ nxt_router_gen_error(task, c, ra->err_code, ra->err_str);
} else {
rc->app_port = ra->app_port;
rc->msg_info = ra->msg_info;
+ if (rc->app->timeout != 0) {
+ c->read_timer.handler = nxt_router_app_timeout;
+ nxt_timer_add(task->thread->engine, &c->read_timer,
+ rc->app->timeout);
+ }
+
ra->app_port = NULL;
ra->msg_info.buf = NULL;
}
@@ -458,6 +520,52 @@ nxt_router_ra_release(nxt_task_t *task, void *obj, void *data)
}
+static void
+nxt_router_ra_release_handler(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_req_app_link_t *ra;
+
+ ra = obj;
+
+ nxt_assert(ra->work.data == data);
+
+ nxt_atomic_fetch_add(&ra->use_count, -1);
+
+ nxt_router_ra_release(task, ra);
+}
+
+
+static void
+nxt_router_ra_use(nxt_task_t *task, nxt_req_app_link_t *ra, int i)
+{
+ int c;
+ nxt_event_engine_t *engine;
+
+ c = nxt_atomic_fetch_add(&ra->use_count, i);
+
+ if (i < 0 && c == -i) {
+ engine = ra->work.data;
+
+ if (task->thread->engine == engine) {
+ nxt_router_ra_release(task, ra);
+
+ return;
+ }
+
+ nxt_router_ra_inc_use(ra);
+
+ ra->work.handler = nxt_router_ra_release_handler;
+ ra->work.task = &engine->task;
+ ra->work.next = NULL;
+
+ nxt_debug(task, "ra stream #%uD post release to %p",
+ ra->stream, engine);
+
+ nxt_event_engine_post(engine, &ra->work);
+ }
+}
+
+
nxt_inline void
nxt_router_ra_error(nxt_req_app_link_t *ra, int code, const char* str)
{
@@ -467,9 +575,25 @@ nxt_router_ra_error(nxt_req_app_link_t *ra, int code, const char* str)
}
+nxt_inline nxt_bool_t
+nxt_queue_chk_remove(nxt_queue_link_t *lnk)
+{
+ if (lnk->next != NULL) {
+ nxt_queue_remove(lnk);
+
+ lnk->next = NULL;
+
+ return 1;
+ }
+
+ return 0;
+}
+
+
nxt_inline void
nxt_router_rc_unlink(nxt_task_t *task, nxt_req_conn_link_t *rc)
{
+ int ra_use_delta;
nxt_req_app_link_t *ra;
if (rc->app_port != NULL) {
@@ -486,22 +610,25 @@ nxt_router_rc_unlink(nxt_task_t *task, nxt_req_conn_link_t *rc)
rc->ra = NULL;
ra->rc = NULL;
- nxt_thread_mutex_lock(&rc->app->mutex);
+ ra_use_delta = 0;
- if (ra->link.next != NULL) {
- nxt_queue_remove(&ra->link);
+ nxt_thread_mutex_lock(&rc->app->mutex);
- ra->link.next = NULL;
+ if (ra->link_app_requests.next == NULL
+ && ra->link_port_pending.next == NULL)
+ {
+ ra = NULL;
} else {
- ra = NULL;
+ ra_use_delta -= nxt_queue_chk_remove(&ra->link_app_requests);
+ ra_use_delta -= nxt_queue_chk_remove(&ra->link_port_pending);
}
nxt_thread_mutex_unlock(&rc->app->mutex);
- }
- if (ra != NULL) {
- nxt_router_ra_release(task, ra, ra->work.data);
+ if (ra != NULL) {
+ nxt_router_ra_use(task, ra, ra_use_delta);
+ }
}
if (rc->app != NULL) {
@@ -2266,6 +2393,7 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
size_t dump_size;
nxt_buf_t *b, *last;
nxt_conn_t *c;
+ nxt_event_engine_t *engine;
nxt_req_conn_link_t *rc;
b = msg->buf;
@@ -2287,6 +2415,10 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
b = NULL;
}
+ engine = task->thread->engine;
+
+ nxt_timer_disable(engine, &c->read_timer);
+
if (msg->port_msg.last != 0) {
nxt_debug(task, "router data create last buf");
@@ -2298,6 +2430,12 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_buf_chain_add(&b, last);
nxt_router_rc_unlink(task, rc);
+
+ } else {
+ if (rc->app->timeout != 0) {
+ c->read_timer.handler = nxt_router_app_timeout;
+ nxt_timer_add(engine, &c->read_timer, rc->app->timeout);
+ }
}
if (b == NULL) {
@@ -2327,10 +2465,41 @@ static void
nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
void *data)
{
+ nxt_int_t res;
+ nxt_port_t *port;
+ nxt_bool_t cancelled;
+ nxt_req_app_link_t *ra;
nxt_req_conn_link_t *rc;
rc = data;
+ ra = rc->ra;
+
+ if (ra != NULL) {
+ cancelled = nxt_router_msg_cancel(task, &ra->msg_info, ra->stream);
+
+ if (cancelled) {
+ nxt_router_ra_inc_use(ra);
+
+ res = nxt_router_app_port(task, ra);
+
+ if (res == NXT_OK) {
+ port = ra->app_port;
+
+ nxt_assert(port != NULL);
+
+ nxt_port_rpc_ex_set_peer(task, task->thread->engine->port, rc,
+ port->pid);
+
+ nxt_router_app_prepare_request(task, ra);
+ }
+
+ msg->port_msg.last = 0;
+
+ return;
+ }
+ }
+
nxt_router_gen_error(task, rc->conn, 500,
"Application terminated unexpectedly");
@@ -2478,7 +2647,7 @@ nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_queue_remove(lnk);
lnk->next = NULL;
- ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link);
+ ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_requests);
} else {
ra = NULL;
@@ -2491,7 +2660,7 @@ nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
&app->name, app, ra->stream);
nxt_router_ra_error(ra, 500, "Failed to start application worker");
- nxt_router_ra_release(task, ra, ra->work.data);
+ nxt_router_ra_use(task, ra, -1);
}
nxt_router_app_use(task, app, -1);
@@ -2533,7 +2702,7 @@ nxt_router_app_first_port_busy(nxt_app_t *app)
nxt_inline nxt_port_t *
-nxt_router_app_get_port_unsafe(nxt_app_t *app, int *use_delta)
+nxt_router_app_get_port_unsafe(nxt_app_t *app)
{
nxt_port_t *port;
nxt_queue_link_t *lnk;
@@ -2550,10 +2719,10 @@ nxt_router_app_get_port_unsafe(nxt_app_t *app, int *use_delta)
{
nxt_queue_insert_tail(&app->ports, lnk);
+ nxt_port_inc_use(port);
+
} else {
lnk->next = NULL;
-
- (*use_delta)--;
}
return port;
@@ -2606,7 +2775,7 @@ nxt_router_app_process_request(nxt_task_t *task, void *obj, void *data)
nxt_debug(task, "app '%V' %p process next stream #%uD",
&app->name, app, ra->stream);
- nxt_router_process_http_request_mp(task, ra);
+ nxt_router_app_prepare_request(task, ra);
}
@@ -2614,19 +2783,16 @@ static void
nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
uint32_t request_failed, uint32_t got_response)
{
- int use_delta, ra_use_delta;
nxt_app_t *app;
nxt_bool_t send_quit;
nxt_queue_link_t *lnk;
- nxt_req_app_link_t *ra;
+ nxt_req_app_link_t *ra, *next_ra;
nxt_assert(port != NULL);
nxt_assert(port->app != NULL);
app = port->app;
- use_delta = (request_failed == 0 && got_response == 0) ? 0 : -1;
-
nxt_thread_mutex_lock(&app->mutex);
port->app_pending_responses -= request_failed + got_response;
@@ -2645,7 +2811,7 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
nxt_queue_insert_head(&app->ports, &port->app_link);
}
- use_delta++;
+ nxt_port_inc_use(port);
} else {
if (port->app_pending_responses == 0
@@ -2665,21 +2831,46 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
nxt_queue_remove(lnk);
lnk->next = NULL;
- ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link);
+ ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_requests);
- ra_use_delta = 1;
- ra->app_port = nxt_router_app_get_port_unsafe(app, &ra_use_delta);
+ ra->app_port = nxt_router_app_get_port_unsafe(app);
+
+ if (ra->app_port->app_pending_responses > 1) {
+ nxt_queue_insert_tail(&ra->app_port->pending_requests,
+ &ra->link_port_pending);
+
+ nxt_router_ra_inc_use(ra);
+ }
} else {
ra = NULL;
- ra_use_delta = 0;
+ }
+
+ if ((request_failed > 0 || got_response > 0)
+ && !nxt_queue_is_empty(&port->pending_requests))
+ {
+ lnk = nxt_queue_first(&port->pending_requests);
+ nxt_queue_remove(lnk);
+ lnk->next = NULL;
+
+ next_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t,
+ link_port_pending);
+
+ } else {
+ next_ra = NULL;
}
send_quit = app->live == 0 && port->app_pending_responses > 0;
nxt_thread_mutex_unlock(&app->mutex);
+ if (next_ra != NULL) {
+ nxt_router_ra_use(task, next_ra, -1);
+ }
+
if (ra != NULL) {
+ nxt_router_ra_use(task, ra, -1);
+
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
nxt_router_app_process_request,
&task->thread->engine->task, app, ra);
@@ -2710,12 +2901,8 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
adjust_use:
- if (use_delta != 0) {
- nxt_port_use(task, port, use_delta);
- }
-
- if (ra_use_delta != 0) {
- nxt_port_use(task, ra->app_port, ra_use_delta);
+ if (request_failed > 0 || got_response > 0) {
+ nxt_port_use(task, port, -1);
}
}
@@ -2766,41 +2953,46 @@ nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
static nxt_int_t
nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
{
- int use_delta;
- nxt_int_t res;
- nxt_app_t *app;
- nxt_bool_t can_start_worker;
- nxt_conn_t *c;
- nxt_port_t *port;
- nxt_event_engine_t *engine;
- nxt_socket_conf_joint_t *joint;
+ int failed_port_use_delta;
+ nxt_int_t res;
+ nxt_app_t *app;
+ nxt_bool_t can_start_worker;
+ nxt_conn_t *c;
+ nxt_port_t *port, *failed_port;
- use_delta = 1;
c = ra->rc->conn;
+ app = ra->rc->app;
- joint = c->joint;
- app = joint->socket_conf->application;
+ failed_port_use_delta = 0;
- if (app == NULL) {
- nxt_router_gen_error(task, c, 500,
- "Application is NULL in socket_conf");
- return NXT_ERROR;
+ nxt_thread_mutex_lock(&app->mutex);
+
+ if (nxt_queue_chk_remove(&ra->link_app_requests))
+ {
+ nxt_router_ra_dec_use(ra);
}
- ra->rc->app = app;
+ if (nxt_queue_chk_remove(&ra->link_port_pending))
+ {
+ nxt_router_ra_dec_use(ra);
+ }
- nxt_router_app_use(task, app, 1);
+ if (ra->app_port != NULL) {
+ failed_port = ra->app_port;
+ failed_port_use_delta--;
- engine = task->thread->engine;
+ failed_port->app_pending_responses--;
- nxt_timer_disable(engine, &c->read_timer);
+ if (failed_port->app_link.next != NULL) {
+ nxt_queue_remove(&failed_port->app_link);
+ failed_port->app_link.next = NULL;
- if (app->timeout != 0) {
- c->read_timer.handler = nxt_router_app_timeout;
- nxt_timer_add(engine, &c->read_timer, app->timeout);
- }
+ failed_port_use_delta--;
+ }
- nxt_thread_mutex_lock(&app->mutex);
+ } else {
+ failed_port = NULL;
+ }
can_start_worker = (app->workers + app->pending_workers)
< app->max_workers;
@@ -2811,7 +3003,12 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
ra = nxt_router_ra_create(task, ra);
if (nxt_fast_path(ra != NULL)) {
- nxt_queue_insert_tail(&app->requests, &ra->link);
+ nxt_queue_insert_tail(&app->requests, &ra->link_app_requests);
+
+ nxt_router_ra_inc_use(ra);
+
+ nxt_debug(task, "ra stream #%uD enqueue to app->requests",
+ ra->stream);
if (can_start_worker) {
app->pending_workers++;
@@ -2821,14 +3018,37 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
port = NULL;
} else {
- port = nxt_router_app_get_port_unsafe(app, &use_delta);
+ port = nxt_router_app_get_port_unsafe(app);
+
+ if (port->app_pending_responses > 1) {
+ ra = nxt_router_ra_create(task, ra);
+
+ if (nxt_fast_path(ra != NULL)) {
+ nxt_queue_insert_tail(&port->pending_requests,
+ &ra->link_port_pending);
+
+ nxt_router_ra_inc_use(ra);
+
+ nxt_debug(task, "ra stream #%uD enqueue to "
+ "port->pending_requests", ra->stream);
+ }
+ }
}
nxt_thread_mutex_unlock(&app->mutex);
+ if (failed_port_use_delta != 0) {
+ nxt_port_use(task, failed_port, failed_port_use_delta);
+ }
+
if (nxt_slow_path(ra == NULL)) {
nxt_router_gen_error(task, c, 500, "Failed to allocate "
"req<->app link");
+
+ if (port != NULL) {
+ nxt_port_use(task, port, -1);
+ }
+
return NXT_ERROR;
}
@@ -2837,14 +3057,9 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
ra->app_port = port;
- if (use_delta != 0) {
- nxt_port_use(task, port, use_delta);
- }
return NXT_OK;
}
- nxt_debug(task, "ra stream #%uD allocated", ra->stream);
-
if (!can_start_worker) {
nxt_debug(task, "app '%V' %p too many running or pending workers",
&app->name, app);
@@ -3127,11 +3342,22 @@ static void
nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
nxt_app_parse_ctx_t *ap)
{
- nxt_int_t res;
- nxt_port_t *port;
- nxt_event_engine_t *engine;
- nxt_req_app_link_t ra_local, *ra;
- nxt_req_conn_link_t *rc;
+ nxt_int_t res;
+ nxt_app_t *app;
+ nxt_port_t *port;
+ nxt_event_engine_t *engine;
+ nxt_req_app_link_t ra_local, *ra;
+ nxt_req_conn_link_t *rc;
+ nxt_socket_conf_joint_t *joint;
+
+ joint = c->joint;
+ app = joint->socket_conf->application;
+
+ if (app == NULL) {
+ nxt_router_gen_error(task, c, 500,
+ "Application is NULL in socket_conf");
+ return;
+ }
engine = task->thread->engine;
@@ -3149,6 +3375,11 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
rc->stream = nxt_port_rpc_ex_stream(rc);
rc->conn = c;
+ rc->app = app;
+
+ nxt_router_app_use(task, app, 1);
+
+ nxt_timer_disable(engine, &c->read_timer);
nxt_queue_insert_tail(&c->requests, &rc->link);
@@ -3167,16 +3398,14 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
return;
}
+ ra = rc->ra;
port = ra->app_port;
- if (nxt_slow_path(port == NULL)) {
- nxt_router_gen_error(task, c, 500, "Application port not found");
- return;
- }
+ nxt_assert(port != NULL);
nxt_port_rpc_ex_set_peer(task, engine->port, rc, port->pid);
- nxt_router_process_http_request_mp(task, ra);
+ nxt_router_app_prepare_request(task, ra);
}
@@ -3187,7 +3416,7 @@ nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj, void *data)
static void
-nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra)
+nxt_router_app_prepare_request(nxt_task_t *task, nxt_req_app_link_t *ra)
{
uint32_t request_failed;
nxt_buf_t *b;
@@ -3266,7 +3495,7 @@ release_port:
nxt_router_app_port_release(task, port, request_failed, 0);
- nxt_router_ra_release(task, ra, ra->work.data);
+ nxt_router_ra_update_peer(task, ra);
}