diff options
author | Max Romanov <max.romanov@nginx.com> | 2019-08-14 23:59:46 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2019-08-14 23:59:46 +0300 |
commit | 1b095ff417272aa570dd39f4bd94133ab244e789 (patch) | |
tree | e683970edcd6f84a4cba2e5144b2376933d5a2f9 /src | |
parent | ac316ff7a5a29c4d3a3c965147fd3fd9401a0efc (diff) | |
download | unit-1b095ff417272aa570dd39f4bd94133ab244e789.tar.gz unit-1b095ff417272aa570dd39f4bd94133ab244e789.tar.bz2 |
Renaming supplemental request structures in router.
- nxt_req_app_link_t -> nxt_request_app_link_t
- nxt_req_conn_link_t -> nxt_request_rpc_data_t
Corresponding abbreviated field names also changed:
- ra -> req_app_link
- rc -> req_rpc_data
Diffstat (limited to '')
-rw-r--r-- | src/nxt_router.c | 765 | ||||
-rw-r--r-- | src/nxt_router.h | 4 |
2 files changed, 420 insertions, 349 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c index df2557fc..f09779bc 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -55,41 +55,54 @@ typedef struct nxt_msg_info_s { } nxt_msg_info_t; -typedef struct nxt_req_app_link_s nxt_req_app_link_t; +typedef struct nxt_request_app_link_s nxt_request_app_link_t; + + +typedef enum { + NXT_APR_NEW_PORT, + NXT_APR_REQUEST_FAILED, + NXT_APR_GOT_RESPONSE, + NXT_APR_CLOSE, +} nxt_apr_action_t; typedef struct { - uint32_t stream; - nxt_app_t *app; - nxt_port_t *app_port; - nxt_http_request_t *request; - nxt_msg_info_t msg_info; - nxt_req_app_link_t *ra; + uint32_t stream; + nxt_app_t *app; + + nxt_port_t *app_port; + nxt_apr_action_t apr_action; + + nxt_http_request_t *request; + nxt_msg_info_t msg_info; + nxt_request_app_link_t *req_app_link; +} nxt_request_rpc_data_t; - nxt_queue_link_t link; /* for nxt_conn_t.requests */ -} nxt_req_conn_link_t; +struct nxt_request_app_link_s { + uint32_t stream; + nxt_atomic_t use_count; -struct nxt_req_app_link_s { - uint32_t stream; - nxt_atomic_t use_count; - nxt_port_t *app_port; - nxt_port_t *reply_port; - nxt_http_request_t *request; - nxt_msg_info_t msg_info; - nxt_req_conn_link_t *rc; + nxt_port_t *app_port; + nxt_apr_action_t apr_action; - nxt_nsec_t res_time; + nxt_port_t *reply_port; + nxt_http_request_t *request; + nxt_msg_info_t msg_info; + nxt_request_rpc_data_t *req_rpc_data; - nxt_queue_link_t link_app_requests; /* for nxt_app_t.requests */ - nxt_queue_link_t link_port_pending; /* for nxt_port_t.pending_requests */ - nxt_queue_link_t link_app_pending; /* for nxt_app_t.pending */ + nxt_nsec_t res_time; - nxt_mp_t *mem_pool; - nxt_work_t work; + nxt_queue_link_t link_app_requests; /* for nxt_app_t.requests */ + /* for nxt_port_t.pending_requests */ + nxt_queue_link_t link_port_pending; + nxt_queue_link_t link_app_pending; /* for nxt_app_t.pending */ - int err_code; - const char *err_str; + nxt_mp_t *mem_pool; + nxt_work_t work; + + int err_code; + const char *err_str; }; @@ -106,15 +119,15 @@ typedef struct { struct nxt_port_select_state_s { - nxt_app_t *app; - nxt_req_app_link_t *ra; + nxt_app_t *app; + nxt_request_app_link_t *req_app_link; - nxt_port_t *failed_port; - int failed_port_use_delta; + nxt_port_t *failed_port; + int failed_port_use_delta; - uint8_t start_process; /* 1 bit */ - nxt_req_app_link_t *shared_ra; - nxt_port_t *port; + uint8_t start_process; /* 1 bit */ + nxt_request_app_link_t *shared_ra; + nxt_port_t *port; }; typedef struct nxt_port_select_state_s nxt_port_select_state_t; @@ -129,28 +142,32 @@ static nxt_int_t nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state); static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app); +static void nxt_request_app_link_update_peer(nxt_task_t *task, + nxt_request_app_link_t *req_app_link); + nxt_inline void -nxt_router_ra_inc_use(nxt_req_app_link_t *ra) +nxt_request_app_link_inc_use(nxt_request_app_link_t *req_app_link) { - nxt_atomic_fetch_add(&ra->use_count, 1); + nxt_atomic_fetch_add(&req_app_link->use_count, 1); } nxt_inline void -nxt_router_ra_dec_use(nxt_req_app_link_t *ra) +nxt_request_app_link_dec_use(nxt_request_app_link_t *req_app_link) { #if (NXT_DEBUG) int c; - c = nxt_atomic_fetch_add(&ra->use_count, -1); + c = nxt_atomic_fetch_add(&req_app_link->use_count, -1); nxt_assert(c > 1); #else - (void) nxt_atomic_fetch_add(&ra->use_count, -1); + (void) nxt_atomic_fetch_add(&req_app_link->use_count, -1); #endif } -static void nxt_router_ra_use(nxt_task_t *task, nxt_req_app_link_t *ra, int i); +static void nxt_request_app_link_use(nxt_task_t *task, + nxt_request_app_link_t *req_app_link, int i); static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task); static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data); @@ -257,13 +274,14 @@ static void nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data); static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app); + static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, - uint32_t request_failed, uint32_t got_response); + nxt_apr_action_t action); static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_app_t *app, - nxt_req_app_link_t *ra); + nxt_request_app_link_t *req_app_link); static void nxt_router_app_prepare_request(nxt_task_t *task, - nxt_req_app_link_t *ra); + nxt_request_app_link_t *req_app_link); static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r, nxt_port_t *port, const nxt_str_t *prefix); @@ -493,58 +511,63 @@ nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app) nxt_inline void -nxt_router_ra_init(nxt_task_t *task, nxt_req_app_link_t *ra, - nxt_req_conn_link_t *rc) +nxt_request_app_link_init(nxt_task_t *task, + nxt_request_app_link_t *req_app_link, nxt_request_rpc_data_t *req_rpc_data) { nxt_event_engine_t *engine; engine = task->thread->engine; - nxt_memzero(ra, sizeof(nxt_req_app_link_t)); + nxt_memzero(req_app_link, sizeof(nxt_request_app_link_t)); - ra->stream = rc->stream; - ra->use_count = 1; - ra->rc = rc; - rc->ra = ra; - ra->reply_port = engine->port; - ra->request = rc->request; + req_app_link->stream = req_rpc_data->stream; + req_app_link->use_count = 1; + req_app_link->req_rpc_data = req_rpc_data; + req_rpc_data->req_app_link = req_app_link; + req_app_link->reply_port = engine->port; + req_app_link->request = req_rpc_data->request; + req_app_link->apr_action = NXT_APR_GOT_RESPONSE; - ra->work.handler = NULL; - ra->work.task = &engine->task; - ra->work.obj = ra; - ra->work.data = engine; + req_app_link->work.handler = NULL; + req_app_link->work.task = &engine->task; + req_app_link->work.obj = req_app_link; + req_app_link->work.data = engine; } -nxt_inline nxt_req_app_link_t * -nxt_router_ra_create(nxt_task_t *task, nxt_req_app_link_t *ra_src) +nxt_inline nxt_request_app_link_t * +nxt_request_app_link_alloc(nxt_task_t *task, + nxt_request_app_link_t *ra_src, nxt_request_rpc_data_t *req_rpc_data) { - nxt_mp_t *mp; - nxt_req_app_link_t *ra; + nxt_mp_t *mp; + nxt_request_app_link_t *req_app_link; - if (ra_src->mem_pool != NULL) { + if (ra_src != NULL && ra_src->mem_pool != NULL) { return ra_src; } - mp = ra_src->request->mem_pool; + mp = req_rpc_data->request->mem_pool; + + req_app_link = nxt_mp_alloc(mp, sizeof(nxt_request_app_link_t)); - ra = nxt_mp_alloc(mp, sizeof(nxt_req_app_link_t)); + if (nxt_slow_path(req_app_link == NULL)) { - if (nxt_slow_path(ra == NULL)) { + req_rpc_data->req_app_link = NULL; - ra_src->rc->ra = NULL; - ra_src->rc = NULL; + if (ra_src != NULL) { + ra_src->req_rpc_data = NULL; + } return NULL; } nxt_mp_retain(mp); - nxt_router_ra_init(task, ra, ra_src->rc); + nxt_request_app_link_init(task, req_app_link, req_rpc_data); - ra->mem_pool = mp; + req_app_link->mem_pool = mp; - return ra; + return req_app_link; } @@ -584,177 +607,187 @@ nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info, static void -nxt_router_ra_update_peer(nxt_task_t *task, nxt_req_app_link_t *ra); - - -static void -nxt_router_ra_update_peer_handler(nxt_task_t *task, void *obj, void *data) +nxt_request_app_link_update_peer_handler(nxt_task_t *task, void *obj, + void *data) { - nxt_req_app_link_t *ra; + nxt_request_app_link_t *req_app_link; - ra = obj; + req_app_link = obj; - nxt_router_ra_update_peer(task, ra); + nxt_request_app_link_update_peer(task, req_app_link); - nxt_router_ra_use(task, ra, -1); + nxt_request_app_link_use(task, req_app_link, -1); } static void -nxt_router_ra_update_peer(nxt_task_t *task, nxt_req_app_link_t *ra) +nxt_request_app_link_update_peer(nxt_task_t *task, + nxt_request_app_link_t *req_app_link) { - nxt_event_engine_t *engine; - nxt_req_conn_link_t *rc; + nxt_event_engine_t *engine; + nxt_request_rpc_data_t *req_rpc_data; - engine = ra->work.data; + engine = req_app_link->work.data; if (task->thread->engine != engine) { - nxt_router_ra_inc_use(ra); + nxt_request_app_link_inc_use(req_app_link); - ra->work.handler = nxt_router_ra_update_peer_handler; - ra->work.task = &engine->task; - ra->work.next = NULL; + req_app_link->work.handler = nxt_request_app_link_update_peer_handler; + req_app_link->work.task = &engine->task; + req_app_link->work.next = NULL; - nxt_debug(task, "ra stream #%uD post update peer to %p", - ra->stream, engine); + nxt_debug(task, "req_app_link stream #%uD post update peer to %p", + req_app_link->stream, engine); - nxt_event_engine_post(engine, &ra->work); + nxt_event_engine_post(engine, &req_app_link->work); return; } - nxt_debug(task, "ra stream #%uD update peer", ra->stream); + nxt_debug(task, "req_app_link stream #%uD update peer", + req_app_link->stream); - rc = ra->rc; + req_rpc_data = req_app_link->req_rpc_data; - if (rc != NULL && ra->app_port != NULL) { - nxt_port_rpc_ex_set_peer(task, engine->port, rc, ra->app_port->pid); + if (req_rpc_data != NULL && req_app_link->app_port != NULL) { + nxt_port_rpc_ex_set_peer(task, engine->port, req_rpc_data, + req_app_link->app_port->pid); } - nxt_router_ra_use(task, ra, -1); + nxt_request_app_link_use(task, req_app_link, -1); } static void -nxt_router_ra_release(nxt_task_t *task, nxt_req_app_link_t *ra) +nxt_request_app_link_release(nxt_task_t *task, + nxt_request_app_link_t *req_app_link) { nxt_mp_t *mp; - nxt_req_conn_link_t *rc; + nxt_request_rpc_data_t *req_rpc_data; - nxt_assert(task->thread->engine == ra->work.data); - nxt_assert(ra->use_count == 0); + nxt_assert(task->thread->engine == req_app_link->work.data); + nxt_assert(req_app_link->use_count == 0); - nxt_debug(task, "ra stream #%uD release", ra->stream); + nxt_debug(task, "req_app_link stream #%uD release", req_app_link->stream); - rc = ra->rc; + req_rpc_data = req_app_link->req_rpc_data; - if (rc != NULL) { - if (nxt_slow_path(ra->err_code != 0)) { - nxt_http_request_error(task, rc->request, ra->err_code); + if (req_rpc_data != NULL) { + if (nxt_slow_path(req_app_link->err_code != 0)) { + nxt_http_request_error(task, req_rpc_data->request, + req_app_link->err_code); } else { - rc->app_port = ra->app_port; - rc->msg_info = ra->msg_info; - - if (rc->app->timeout != 0) { - rc->request->timer.handler = nxt_router_app_timeout; - rc->request->timer_data = rc; - nxt_timer_add(task->thread->engine, &rc->request->timer, - rc->app->timeout); + req_rpc_data->app_port = req_app_link->app_port; + req_rpc_data->apr_action = req_app_link->apr_action; + req_rpc_data->msg_info = req_app_link->msg_info; + + if (req_rpc_data->app->timeout != 0) { + req_rpc_data->request->timer.handler = nxt_router_app_timeout; + req_rpc_data->request->timer_data = req_rpc_data; + nxt_timer_add(task->thread->engine, + &req_rpc_data->request->timer, + req_rpc_data->app->timeout); } - ra->app_port = NULL; - ra->msg_info.buf = NULL; + req_app_link->app_port = NULL; + req_app_link->msg_info.buf = NULL; } - rc->ra = NULL; - ra->rc = NULL; + req_rpc_data->req_app_link = NULL; + req_app_link->req_rpc_data = NULL; } - if (ra->app_port != NULL) { - nxt_router_app_port_release(task, ra->app_port, 0, 1); + if (req_app_link->app_port != NULL) { + nxt_router_app_port_release(task, req_app_link->app_port, + req_app_link->apr_action); - ra->app_port = NULL; + req_app_link->app_port = NULL; } - nxt_router_msg_cancel(task, &ra->msg_info, ra->stream); + nxt_router_msg_cancel(task, &req_app_link->msg_info, req_app_link->stream); - mp = ra->mem_pool; + mp = req_app_link->mem_pool; if (mp != NULL) { - nxt_mp_free(mp, ra); + nxt_mp_free(mp, req_app_link); nxt_mp_release(mp); } } static void -nxt_router_ra_release_handler(nxt_task_t *task, void *obj, void *data) +nxt_request_app_link_release_handler(nxt_task_t *task, void *obj, void *data) { - nxt_req_app_link_t *ra; + nxt_request_app_link_t *req_app_link; - ra = obj; + req_app_link = obj; - nxt_assert(ra->work.data == data); + nxt_assert(req_app_link->work.data == data); - nxt_atomic_fetch_add(&ra->use_count, -1); + nxt_atomic_fetch_add(&req_app_link->use_count, -1); - nxt_router_ra_release(task, ra); + nxt_request_app_link_release(task, req_app_link); } static void -nxt_router_ra_use(nxt_task_t *task, nxt_req_app_link_t *ra, int i) +nxt_request_app_link_use(nxt_task_t *task, nxt_request_app_link_t *req_app_link, + int i) { int c; nxt_event_engine_t *engine; - c = nxt_atomic_fetch_add(&ra->use_count, i); + c = nxt_atomic_fetch_add(&req_app_link->use_count, i); if (i < 0 && c == -i) { - engine = ra->work.data; + engine = req_app_link->work.data; if (task->thread->engine == engine) { - nxt_router_ra_release(task, ra); + nxt_request_app_link_release(task, req_app_link); return; } - nxt_router_ra_inc_use(ra); + nxt_request_app_link_inc_use(req_app_link); - ra->work.handler = nxt_router_ra_release_handler; - ra->work.task = &engine->task; - ra->work.next = NULL; + req_app_link->work.handler = nxt_request_app_link_release_handler; + req_app_link->work.task = &engine->task; + req_app_link->work.next = NULL; - nxt_debug(task, "ra stream #%uD post release to %p", - ra->stream, engine); + nxt_debug(task, "req_app_link stream #%uD post release to %p", + req_app_link->stream, engine); - nxt_event_engine_post(engine, &ra->work); + nxt_event_engine_post(engine, &req_app_link->work); } } nxt_inline void -nxt_router_ra_error(nxt_req_app_link_t *ra, int code, const char *str) +nxt_request_app_link_error(nxt_request_app_link_t *req_app_link, int code, + const char *str) { - ra->app_port = NULL; - ra->err_code = code; - ra->err_str = str; + req_app_link->app_port = NULL; + req_app_link->err_code = code; + req_app_link->err_str = str; } nxt_inline void -nxt_router_ra_pending(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra) +nxt_request_app_link_pending(nxt_task_t *task, nxt_app_t *app, + nxt_request_app_link_t *req_app_link) { - nxt_queue_insert_tail(&ra->app_port->pending_requests, - &ra->link_port_pending); - nxt_queue_insert_tail(&app->pending, &ra->link_app_pending); + nxt_queue_insert_tail(&req_app_link->app_port->pending_requests, + &req_app_link->link_port_pending); + nxt_queue_insert_tail(&app->pending, &req_app_link->link_app_pending); - nxt_router_ra_inc_use(ra); + nxt_request_app_link_inc_use(req_app_link); - ra->res_time = nxt_thread_monotonic_time(task->thread) + app->res_timeout; + req_app_link->res_time = nxt_thread_monotonic_time(task->thread) + + app->res_timeout; - nxt_debug(task, "ra stream #%uD enqueue to pending_requests", ra->stream); + nxt_debug(task, "req_app_link stream #%uD enqueue to pending_requests", + req_app_link->stream); } @@ -774,60 +807,63 @@ nxt_queue_chk_remove(nxt_queue_link_t *lnk) nxt_inline void -nxt_router_rc_unlink(nxt_task_t *task, nxt_req_conn_link_t *rc) +nxt_request_rpc_data_unlink(nxt_task_t *task, + nxt_request_rpc_data_t *req_rpc_data) { - int ra_use_delta; - nxt_req_app_link_t *ra; + int ra_use_delta; + nxt_request_app_link_t *req_app_link; - if (rc->app_port != NULL) { - nxt_router_app_port_release(task, rc->app_port, 0, 1); + if (req_rpc_data->app_port != NULL) { + nxt_router_app_port_release(task, req_rpc_data->app_port, + req_rpc_data->apr_action); - rc->app_port = NULL; + req_rpc_data->app_port = NULL; } - nxt_router_msg_cancel(task, &rc->msg_info, rc->stream); - - ra = rc->ra; + nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream); - if (ra != NULL) { - rc->ra = NULL; - ra->rc = NULL; + req_app_link = req_rpc_data->req_app_link; + if (req_app_link != NULL) { + req_rpc_data->req_app_link = NULL; + req_app_link->req_rpc_data = NULL; ra_use_delta = 0; - nxt_thread_mutex_lock(&rc->app->mutex); + nxt_thread_mutex_lock(&req_rpc_data->app->mutex); - if (ra->link_app_requests.next == NULL - && ra->link_port_pending.next == NULL - && ra->link_app_pending.next == NULL) + if (req_app_link->link_app_requests.next == NULL + && req_app_link->link_port_pending.next == NULL + && req_app_link->link_app_pending.next == NULL) { - ra = NULL; + req_app_link = NULL; } else { - ra_use_delta -= nxt_queue_chk_remove(&ra->link_app_requests); - ra_use_delta -= nxt_queue_chk_remove(&ra->link_port_pending); - nxt_queue_chk_remove(&ra->link_app_pending); + ra_use_delta -= + nxt_queue_chk_remove(&req_app_link->link_app_requests) + + nxt_queue_chk_remove(&req_app_link->link_port_pending); + + nxt_queue_chk_remove(&req_app_link->link_app_pending); } - nxt_thread_mutex_unlock(&rc->app->mutex); + nxt_thread_mutex_unlock(&req_rpc_data->app->mutex); - if (ra != NULL) { - nxt_router_ra_use(task, ra, ra_use_delta); + if (req_app_link != NULL) { + nxt_request_app_link_use(task, req_app_link, ra_use_delta); } } - if (rc->app != NULL) { - nxt_router_app_use(task, rc->app, -1); + if (req_rpc_data->app != NULL) { + nxt_router_app_use(task, req_rpc_data->app, -1); - rc->app = NULL; + req_rpc_data->app = NULL; } - if (rc->request != NULL) { - rc->request->timer_data = NULL; + if (req_rpc_data->request != NULL) { + req_rpc_data->request->timer_data = NULL; - nxt_router_http_request_done(task, rc->request); + nxt_router_http_request_done(task, req_rpc_data->request); - rc->request = NULL; + req_rpc_data->request = NULL; } } @@ -3380,28 +3416,28 @@ static void nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) { - nxt_int_t ret; - nxt_buf_t *b; - nxt_unit_field_t *f; - nxt_http_field_t *field; - nxt_http_request_t *r; - nxt_req_conn_link_t *rc; - nxt_unit_response_t *resp; + nxt_int_t ret; + nxt_buf_t *b; + nxt_unit_field_t *f; + nxt_http_field_t *field; + nxt_http_request_t *r; + nxt_unit_response_t *resp; + nxt_request_rpc_data_t *req_rpc_data; b = msg->buf; - rc = data; + req_rpc_data = data; if (msg->size == 0) { b = NULL; } - r = rc->request; + r = req_rpc_data->request; if (nxt_slow_path(r == NULL)) { return; } if (r->error) { - nxt_router_rc_unlink(task, rc); + nxt_request_rpc_data_unlink(task, req_rpc_data); return; } @@ -3410,13 +3446,14 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_buf_chain_add(&b, nxt_http_buf_last(r)); - nxt_router_rc_unlink(task, rc); + nxt_request_rpc_data_unlink(task, req_rpc_data); } else { - if (rc->app != NULL && rc->app->timeout != 0) { + if (req_rpc_data->app != NULL && req_rpc_data->app->timeout != 0) { r->timer.handler = nxt_router_app_timeout; - r->timer_data = rc; - nxt_timer_add(task->thread->engine, &r->timer, rc->app->timeout); + r->timer_data = req_rpc_data; + nxt_timer_add(task->thread->engine, &r->timer, + req_rpc_data->app->timeout); } } @@ -3504,7 +3541,7 @@ fail: nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE); - nxt_router_rc_unlink(task, rc); + nxt_request_rpc_data_unlink(task, req_rpc_data); } @@ -3537,36 +3574,37 @@ static void nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) { - nxt_int_t res; - nxt_port_t *port; - nxt_bool_t cancelled; - nxt_req_app_link_t *ra; - nxt_req_conn_link_t *rc; + nxt_int_t res; + nxt_port_t *port; + nxt_bool_t cancelled; + nxt_request_app_link_t *req_app_link; + nxt_request_rpc_data_t *req_rpc_data; - rc = data; + req_rpc_data = data; - ra = rc->ra; - - if (ra != NULL) { - cancelled = nxt_router_msg_cancel(task, &ra->msg_info, ra->stream); + req_app_link = req_rpc_data->req_app_link; + if (req_app_link != NULL) { + cancelled = nxt_router_msg_cancel(task, &req_app_link->msg_info, + req_app_link->stream); if (cancelled) { - nxt_router_ra_inc_use(ra); + nxt_request_app_link_inc_use(req_app_link); - res = nxt_router_app_port(task, rc->app, ra); + res = nxt_router_app_port(task, req_rpc_data->app, req_app_link); if (res == NXT_OK) { - port = ra->app_port; + port = req_app_link->app_port; if (nxt_slow_path(port == NULL)) { - nxt_log(task, NXT_LOG_ERR, "port is NULL in cancelled ra"); + nxt_log(task, NXT_LOG_ERR, + "port is NULL in cancelled req_app_link"); return; } - nxt_port_rpc_ex_set_peer(task, task->thread->engine->port, rc, - port->pid); + nxt_port_rpc_ex_set_peer(task, task->thread->engine->port, + req_rpc_data, port->pid); - nxt_router_app_prepare_request(task, ra); + nxt_router_app_prepare_request(task, req_app_link); } msg->port_msg.last = 0; @@ -3575,12 +3613,12 @@ nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, } } - if (rc->request != NULL) { - nxt_http_request_error(task, rc->request, + if (req_rpc_data->request != NULL) { + nxt_http_request_error(task, req_rpc_data->request, NXT_HTTP_SERVICE_UNAVAILABLE); } - nxt_router_rc_unlink(task, rc); + nxt_request_rpc_data_unlink(task, req_rpc_data); } @@ -3624,7 +3662,7 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d", &app->name, port->pid, app->processes, app->pending_processes); - nxt_router_app_port_release(task, port, 0, 0); + nxt_router_app_port_release(task, port, NXT_APR_NEW_PORT); } @@ -3632,10 +3670,10 @@ static void nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) { - nxt_app_t *app; - nxt_app_joint_t *app_joint; - nxt_queue_link_t *lnk; - nxt_req_app_link_t *ra; + nxt_app_t *app; + nxt_app_joint_t *app_joint; + nxt_queue_link_t *lnk; + nxt_request_app_link_t *req_app_link; app_joint = data; @@ -3664,20 +3702,22 @@ nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_queue_remove(lnk); lnk->next = NULL; - ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_requests); + req_app_link = nxt_queue_link_data(lnk, nxt_request_app_link_t, + link_app_requests); } else { - ra = NULL; + req_app_link = NULL; } nxt_thread_mutex_unlock(&app->mutex); - if (ra != NULL) { + if (req_app_link != NULL) { nxt_debug(task, "app '%V' %p abort next stream #%uD", - &app->name, app, ra->stream); + &app->name, app, req_app_link->stream); - nxt_router_ra_error(ra, 500, "Failed to start application process"); - nxt_router_ra_use(task, ra, -1); + nxt_request_app_link_error(req_app_link, 500, + "Failed to start application process"); + nxt_request_app_link_use(task, req_app_link, -1); } } @@ -3811,9 +3851,9 @@ nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app) static void nxt_router_app_process_request(nxt_task_t *task, void *obj, void *data) { - nxt_req_app_link_t *ra; + nxt_request_app_link_t *req_app_link; - ra = data; + req_app_link = data; #if (NXT_DEBUG) { @@ -3822,39 +3862,62 @@ nxt_router_app_process_request(nxt_task_t *task, void *obj, void *data) app = obj; nxt_assert(app != NULL); - nxt_assert(ra != NULL); - nxt_assert(ra->app_port != NULL); + nxt_assert(req_app_link != NULL); + nxt_assert(req_app_link->app_port != NULL); nxt_debug(task, "app '%V' %p process next stream #%uD", - &app->name, app, ra->stream); + &app->name, app, req_app_link->stream); } #endif - nxt_router_app_prepare_request(task, ra); + nxt_router_app_prepare_request(task, req_app_link); } static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, - uint32_t request_failed, uint32_t got_response) + nxt_apr_action_t action) { + int inc_use; + uint32_t dec_pending, got_response; nxt_app_t *app; nxt_bool_t port_unchained; nxt_bool_t send_quit, cancelled, adjust_idle_timer; nxt_queue_link_t *lnk; - nxt_req_app_link_t *ra, *pending_ra, *re_ra; + nxt_request_app_link_t *req_app_link, *pending_ra, *re_ra; nxt_port_select_state_t state; nxt_assert(port != NULL); nxt_assert(port->app != NULL); - ra = NULL; + req_app_link = NULL; app = port->app; + inc_use = 0; + dec_pending = 0; + got_response = 0; + + switch (action) { + case NXT_APR_NEW_PORT: + break; + case NXT_APR_REQUEST_FAILED: + dec_pending = 1; + inc_use = -1; + break; + case NXT_APR_GOT_RESPONSE: + dec_pending = 1; + got_response = 1; + inc_use = -1; + break; + case NXT_APR_CLOSE: + inc_use = -1; + break; + } + nxt_thread_mutex_lock(&app->mutex); - port->app_pending_responses -= request_failed + got_response; + port->app_pending_responses -= dec_pending; port->app_responses += got_response; if (port->pair[1] != -1 @@ -3891,24 +3954,25 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, nxt_queue_remove(lnk); lnk->next = NULL; - ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_requests); + req_app_link = nxt_queue_link_data(lnk, nxt_request_app_link_t, + link_app_requests); - ra->app_port = nxt_router_pop_first_port(app); + req_app_link->app_port = nxt_router_pop_first_port(app); - if (ra->app_port->app_pending_responses > 1) { - nxt_router_ra_pending(task, app, ra); + if (req_app_link->app_port->app_pending_responses > 1) { + nxt_request_app_link_pending(task, app, req_app_link); } } /* Pop first pending request for this port. */ - if ((request_failed > 0 || got_response > 0) + if (dec_pending > 0 && !nxt_queue_is_empty(&port->pending_requests)) { lnk = nxt_queue_first(&port->pending_requests); nxt_queue_remove(lnk); lnk->next = NULL; - pending_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, + pending_ra = nxt_queue_link_data(lnk, nxt_request_app_link_t, link_port_pending); nxt_assert(pending_ra->link_app_pending.next != NULL); @@ -3924,7 +3988,8 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, if (got_response > 0 && !nxt_queue_is_empty(&app->pending)) { lnk = nxt_queue_first(&app->pending); - re_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_pending); + re_ra = nxt_queue_link_data(lnk, nxt_request_app_link_t, + link_app_pending); if (re_ra->res_time <= nxt_thread_monotonic_time(task->thread)) { @@ -3935,9 +4000,9 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, re_ra->stream); if (cancelled) { - nxt_router_ra_inc_use(re_ra); + nxt_request_app_link_inc_use(re_ra); - state.ra = re_ra; + state.req_app_link = re_ra; state.app = app; nxt_router_port_select(task, &state); @@ -3998,7 +4063,7 @@ re_ra_cancelled: } if (pending_ra != NULL) { - nxt_router_ra_use(task, pending_ra, -1); + nxt_request_app_link_use(task, pending_ra, -1); } if (re_ra != NULL) { @@ -4009,12 +4074,12 @@ re_ra_cancelled: } } - if (ra != NULL) { - nxt_router_ra_use(task, ra, -1); + if (req_app_link != NULL) { + nxt_request_app_link_use(task, req_app_link, -1); nxt_work_queue_add(&task->thread->engine->fast_work_queue, nxt_router_app_process_request, - &task->thread->engine->task, app, ra); + &task->thread->engine->task, app, req_app_link); goto adjust_use; } @@ -4046,9 +4111,7 @@ re_ra_cancelled: adjust_use: - if (request_failed > 0 || got_response > 0) { - nxt_port_use(task, port, -1); - } + nxt_port_use(task, port, inc_use); } @@ -4265,33 +4328,33 @@ nxt_router_free_app(nxt_task_t *task, void *obj, void *data) static void nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state) { - nxt_app_t *app; - nxt_bool_t can_start_process; - nxt_req_app_link_t *ra; + nxt_app_t *app; + nxt_bool_t can_start_process; + nxt_request_app_link_t *req_app_link; - ra = state->ra; + req_app_link = state->req_app_link; app = state->app; state->failed_port_use_delta = 0; - if (nxt_queue_chk_remove(&ra->link_app_requests)) + if (nxt_queue_chk_remove(&req_app_link->link_app_requests)) { - nxt_router_ra_dec_use(ra); + nxt_request_app_link_dec_use(req_app_link); } - if (nxt_queue_chk_remove(&ra->link_port_pending)) + if (nxt_queue_chk_remove(&req_app_link->link_port_pending)) { - nxt_assert(ra->link_app_pending.next != NULL); + nxt_assert(req_app_link->link_app_pending.next != NULL); - nxt_queue_remove(&ra->link_app_pending); - ra->link_app_pending.next = NULL; + nxt_queue_remove(&req_app_link->link_app_pending); + req_app_link->link_app_pending.next = NULL; - nxt_router_ra_dec_use(ra); + nxt_request_app_link_dec_use(req_app_link); } - state->failed_port = ra->app_port; + state->failed_port = req_app_link->app_port; - if (ra->app_port != NULL) { + if (req_app_link->app_port != NULL) { state->failed_port_use_delta--; state->failed_port->app_pending_responses--; @@ -4300,7 +4363,7 @@ nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state) state->failed_port_use_delta--; } - ra->app_port = NULL; + req_app_link->app_port = NULL; } can_start_process = nxt_router_app_can_start(app); @@ -4311,22 +4374,25 @@ nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state) if (nxt_queue_is_empty(&app->ports) || (can_start_process && nxt_router_app_first_port_busy(app)) ) { - ra = nxt_router_ra_create(task, ra); - - if (nxt_slow_path(ra == NULL)) { + req_app_link = nxt_request_app_link_alloc(task, req_app_link, + req_app_link->req_rpc_data); + if (nxt_slow_path(req_app_link == NULL)) { goto fail; } if (nxt_slow_path(state->failed_port != NULL)) { - nxt_queue_insert_head(&app->requests, &ra->link_app_requests); + nxt_queue_insert_head(&app->requests, + &req_app_link->link_app_requests); } else { - nxt_queue_insert_tail(&app->requests, &ra->link_app_requests); + nxt_queue_insert_tail(&app->requests, + &req_app_link->link_app_requests); } - nxt_router_ra_inc_use(ra); + nxt_request_app_link_inc_use(req_app_link); - nxt_debug(task, "ra stream #%uD enqueue to app->requests", ra->stream); + nxt_debug(task, "req_app_link stream #%uD enqueue to app->requests", + req_app_link->stream); if (can_start_process) { app->pending_processes++; @@ -4337,15 +4403,15 @@ nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state) state->port = nxt_router_pop_first_port(app); if (state->port->app_pending_responses > 1) { - ra = nxt_router_ra_create(task, ra); - - if (nxt_slow_path(ra == NULL)) { + req_app_link = nxt_request_app_link_alloc(task, req_app_link, + req_app_link->req_rpc_data); + if (nxt_slow_path(req_app_link == NULL)) { goto fail; } - ra->app_port = state->port; + req_app_link->app_port = state->port; - nxt_router_ra_pending(task, app, ra); + nxt_request_app_link_pending(task, app, req_app_link); } if (can_start_process && nxt_router_app_need_start(app)) { @@ -4356,32 +4422,32 @@ nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state) fail: - state->shared_ra = ra; + state->shared_ra = req_app_link; } static nxt_int_t nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state) { - nxt_int_t res; - nxt_app_t *app; - nxt_req_app_link_t *ra; + nxt_int_t res; + nxt_app_t *app; + nxt_request_app_link_t *req_app_link; - ra = state->shared_ra; + req_app_link = state->shared_ra; app = state->app; if (state->failed_port_use_delta != 0) { nxt_port_use(task, state->failed_port, state->failed_port_use_delta); } - if (nxt_slow_path(ra == NULL)) { + if (nxt_slow_path(req_app_link == NULL)) { if (state->port != NULL) { nxt_port_use(task, state->port, -1); } - nxt_router_ra_error(state->ra, 500, + nxt_request_app_link_error(state->req_app_link, 500, "Failed to allocate shared req<->app link"); - nxt_router_ra_use(task, state->ra, -1); + nxt_request_app_link_use(task, state->req_app_link, -1); return NXT_ERROR; } @@ -4389,7 +4455,7 @@ nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state) if (state->port != NULL) { nxt_debug(task, "already have port for app '%V' %p ", &app->name, app); - ra->app_port = state->port; + req_app_link->app_port = state->port; if (state->start_process) { nxt_router_start_app_process(task, app); @@ -4408,8 +4474,9 @@ nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state) res = nxt_router_start_app_process(task, app); if (nxt_slow_path(res != NXT_OK)) { - nxt_router_ra_error(ra, 500, "Failed to start app process"); - nxt_router_ra_use(task, ra, -1); + nxt_request_app_link_error(req_app_link, 500, + "Failed to start app process"); + nxt_request_app_link_use(task, req_app_link, -1); return NXT_ERROR; } @@ -4419,11 +4486,12 @@ nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state) static nxt_int_t -nxt_router_app_port(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra) +nxt_router_app_port(nxt_task_t *task, nxt_app_t *app, + nxt_request_app_link_t *req_app_link) { nxt_port_select_state_t state; - state.ra = ra; + state.req_app_link = req_app_link; state.app = app; nxt_thread_mutex_lock(&app->mutex); @@ -4440,48 +4508,47 @@ void nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r, nxt_app_t *app) { - nxt_int_t res; - nxt_port_t *port; - nxt_event_engine_t *engine; - nxt_req_app_link_t ra_local, *ra; - nxt_req_conn_link_t *rc; + nxt_int_t res; + nxt_port_t *port; + nxt_event_engine_t *engine; + nxt_request_app_link_t ra_local, *req_app_link; + nxt_request_rpc_data_t *req_rpc_data; engine = task->thread->engine; - rc = nxt_port_rpc_register_handler_ex(task, engine->port, + req_rpc_data = nxt_port_rpc_register_handler_ex(task, engine->port, nxt_router_response_ready_handler, nxt_router_response_error_handler, - sizeof(nxt_req_conn_link_t)); - - if (nxt_slow_path(rc == NULL)) { + sizeof(nxt_request_rpc_data_t)); + if (nxt_slow_path(req_rpc_data == NULL)) { nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); return; } - rc->stream = nxt_port_rpc_ex_stream(rc); - rc->app = app; + req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data); + req_rpc_data->app = app; nxt_router_app_use(task, app, 1); - rc->request = r; + req_rpc_data->request = r; - ra = &ra_local; - nxt_router_ra_init(task, ra, rc); + req_app_link = &ra_local; + nxt_request_app_link_init(task, req_app_link, req_rpc_data); - res = nxt_router_app_port(task, app, ra); + res = nxt_router_app_port(task, app, req_app_link); if (res != NXT_OK) { return; } - ra = rc->ra; - port = ra->app_port; + req_app_link = req_rpc_data->req_app_link; + port = req_app_link->app_port; nxt_assert(port != NULL); - nxt_port_rpc_ex_set_peer(task, engine->port, rc, port->pid); + nxt_port_rpc_ex_set_peer(task, engine->port, req_rpc_data, port->pid); - nxt_router_app_prepare_request(task, ra); + nxt_router_app_prepare_request(task, req_app_link); } @@ -4492,19 +4559,20 @@ nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj, void *data) static void -nxt_router_app_prepare_request(nxt_task_t *task, nxt_req_app_link_t *ra) +nxt_router_app_prepare_request(nxt_task_t *task, + nxt_request_app_link_t *req_app_link) { - uint32_t request_failed; - nxt_buf_t *buf; - nxt_int_t res; - nxt_port_t *port, *c_port, *reply_port; + nxt_buf_t *buf; + nxt_int_t res; + nxt_port_t *port, *c_port, *reply_port; + nxt_apr_action_t apr_action; - nxt_assert(ra->app_port != NULL); + nxt_assert(req_app_link->app_port != NULL); - port = ra->app_port; - reply_port = ra->reply_port; + port = req_app_link->app_port; + reply_port = req_app_link->reply_port; - request_failed = 1; + apr_action = NXT_APR_REQUEST_FAILED; c_port = nxt_process_connected_port_find(port->process, reply_port->pid, reply_port->id); @@ -4512,7 +4580,7 @@ nxt_router_app_prepare_request(nxt_task_t *task, nxt_req_app_link_t *ra) res = nxt_port_send_port(task, port, reply_port, 0); if (nxt_slow_path(res != NXT_OK)) { - nxt_router_ra_error(ra, 500, + nxt_request_app_link_error(req_app_link, 500, "Failed to send reply port to application"); goto release_port; } @@ -4520,11 +4588,11 @@ nxt_router_app_prepare_request(nxt_task_t *task, nxt_req_app_link_t *ra) nxt_process_connected_port_add(port->process, reply_port); } - buf = nxt_router_prepare_msg(task, ra->request, port, + buf = nxt_router_prepare_msg(task, req_app_link->request, port, nxt_app_msg_prefix[port->app->type]); if (nxt_slow_path(buf == NULL)) { - nxt_router_ra_error(ra, 500, + nxt_request_app_link_error(req_app_link, 500, "Failed to prepare message for application"); goto release_port; } @@ -4533,40 +4601,41 @@ nxt_router_app_prepare_request(nxt_task_t *task, nxt_req_app_link_t *ra) nxt_buf_used_size(buf), port->socket.fd); - request_failed = 0; + apr_action = NXT_APR_NEW_PORT; - ra->msg_info.buf = buf; - ra->msg_info.completion_handler = buf->completion_handler; + req_app_link->msg_info.buf = buf; + req_app_link->msg_info.completion_handler = buf->completion_handler; for (; buf; buf = buf->next) { buf->completion_handler = nxt_router_dummy_buf_completion; } - buf = ra->msg_info.buf; + buf = req_app_link->msg_info.buf; - res = nxt_port_mmap_get_tracking(task, port, &ra->msg_info.tracking, - ra->stream); + res = nxt_port_mmap_get_tracking(task, port, + &req_app_link->msg_info.tracking, + req_app_link->stream); if (nxt_slow_path(res != NXT_OK)) { - nxt_router_ra_error(ra, 500, - "Failed to get tracking area"); + nxt_request_app_link_error(req_app_link, 500, + "Failed to get tracking area"); goto release_port; } res = nxt_port_socket_twrite(task, port, NXT_PORT_MSG_DATA, - -1, ra->stream, reply_port->id, buf, - &ra->msg_info.tracking); + -1, req_app_link->stream, reply_port->id, buf, + &req_app_link->msg_info.tracking); if (nxt_slow_path(res != NXT_OK)) { - nxt_router_ra_error(ra, 500, - "Failed to send message to application"); + nxt_request_app_link_error(req_app_link, 500, + "Failed to send message to application"); goto release_port; } release_port: - nxt_router_app_port_release(task, port, request_failed, 0); + nxt_router_app_port_release(task, port, apr_action); - nxt_router_ra_update_peer(task, ra); + nxt_request_app_link_update_peer(task, req_app_link); } @@ -4897,8 +4966,8 @@ nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data) nxt_timer_t *timer; nxt_queue_link_t *lnk; nxt_http_request_t *r; - nxt_req_app_link_t *pending_ra; - nxt_req_conn_link_t *rc; + nxt_request_app_link_t *pending_ra; + nxt_request_rpc_data_t *req_rpc_data; nxt_port_select_state_t state; timer = obj; @@ -4906,8 +4975,8 @@ nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data) nxt_debug(task, "router app timeout"); r = nxt_timer_data(timer, nxt_http_request_t, timer); - rc = r->timer_data; - app = rc->app; + req_rpc_data = r->timer_data; + app = req_rpc_data->app; if (app == NULL) { goto generate_error; @@ -4916,14 +4985,16 @@ nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data) port = NULL; pending_ra = NULL; - if (rc->app_port != NULL) { - port = rc->app_port; - rc->app_port = NULL; + if (req_rpc_data->app_port != NULL) { + port = req_rpc_data->app_port; + req_rpc_data->app_port = NULL; } - if (port == NULL && rc->ra != NULL && rc->ra->app_port != NULL) { - port = rc->ra->app_port; - rc->ra->app_port = NULL; + if (port == NULL && req_rpc_data->req_app_link != NULL + && req_rpc_data->req_app_link->app_port != NULL) + { + port = req_rpc_data->req_app_link->app_port; + req_rpc_data->req_app_link->app_port = NULL; } if (port == NULL) { @@ -4937,7 +5008,7 @@ nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data) if (!nxt_queue_is_empty(&port->pending_requests)) { lnk = nxt_queue_first(&port->pending_requests); - pending_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, + pending_ra = nxt_queue_link_data(lnk, nxt_request_app_link_t, link_port_pending); nxt_assert(pending_ra->link_app_pending.next != NULL); @@ -4949,9 +5020,9 @@ nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data) pending_ra->stream); if (cancelled) { - nxt_router_ra_inc_use(pending_ra); + nxt_request_app_link_inc_use(pending_ra); - state.ra = pending_ra; + state.req_app_link = pending_ra; state.app = app; nxt_router_port_select(task, &state); @@ -4979,7 +5050,7 @@ generate_error: nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE); - nxt_router_rc_unlink(task, rc); + nxt_request_rpc_data_unlink(task, req_rpc_data); } diff --git a/src/nxt_router.h b/src/nxt_router.h index d9fbfe05..ff791e3d 100644 --- a/src/nxt_router.h +++ b/src/nxt_router.h @@ -108,8 +108,8 @@ struct nxt_app_s { nxt_work_t adjust_idle_work; nxt_event_engine_t *engine; - nxt_queue_t requests; /* of nxt_req_app_link_t */ - nxt_queue_t pending; /* of nxt_req_app_link_t */ + nxt_queue_t requests; /* of nxt_request_app_link_t */ + nxt_queue_t pending; /* of nxt_request_app_link_t */ nxt_str_t name; uint32_t pending_processes; |