diff options
Diffstat (limited to '')
-rw-r--r-- | src/nxt_router.c | 1280 |
1 files changed, 701 insertions, 579 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c index 149a0ff3..b87f588f 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -14,7 +14,7 @@ #include <nxt_port_memory_int.h> #include <nxt_unit_request.h> #include <nxt_unit_response.h> - +#include <nxt_router_request.h> typedef struct { nxt_str_t type; @@ -48,51 +48,6 @@ typedef struct { #endif -typedef struct nxt_msg_info_s { - nxt_buf_t *buf; - nxt_port_mmap_tracking_t tracking; - nxt_work_handler_t completion_handler; -} nxt_msg_info_t; - - -typedef struct nxt_req_app_link_s nxt_req_app_link_t; - - -typedef struct { - uint32_t stream; - nxt_app_t *app; - nxt_port_t *app_port; - nxt_http_request_t *request; - nxt_msg_info_t msg_info; - nxt_req_app_link_t *ra; - - nxt_queue_link_t link; /* for nxt_conn_t.requests */ -} nxt_req_conn_link_t; - - -struct nxt_req_app_link_s { - uint32_t stream; - nxt_atomic_t use_count; - nxt_port_t *app_port; - nxt_port_t *reply_port; - nxt_http_request_t *request; - nxt_msg_info_t msg_info; - nxt_req_conn_link_t *rc; - - nxt_nsec_t res_time; - - nxt_queue_link_t link_app_requests; /* for nxt_app_t.requests */ - nxt_queue_link_t link_port_pending; /* for nxt_port_t.pending_requests */ - nxt_queue_link_t link_app_pending; /* for nxt_app_t.pending */ - - nxt_mp_t *mem_pool; - nxt_work_t work; - - int err_code; - const char *err_str; -}; - - typedef struct { nxt_socket_conf_t *socket_conf; nxt_router_temp_conf_t *temp_conf; @@ -106,15 +61,15 @@ typedef struct { struct nxt_port_select_state_s { - nxt_app_t *app; - nxt_req_app_link_t *ra; + nxt_app_t *app; + nxt_request_app_link_t *req_app_link; - nxt_port_t *failed_port; - int failed_port_use_delta; + nxt_port_t *failed_port; + int failed_port_use_delta; - uint8_t start_process; /* 1 bit */ - nxt_req_app_link_t *shared_ra; - nxt_port_t *port; + uint8_t start_process; /* 1 bit */ + nxt_request_app_link_t *shared_ra; + nxt_port_t *port; }; typedef struct nxt_port_select_state_s nxt_port_select_state_t; @@ -129,28 +84,32 @@ static nxt_int_t nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state); static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app); +static void nxt_request_app_link_update_peer(nxt_task_t *task, + nxt_request_app_link_t *req_app_link); + nxt_inline void -nxt_router_ra_inc_use(nxt_req_app_link_t *ra) +nxt_request_app_link_inc_use(nxt_request_app_link_t *req_app_link) { - nxt_atomic_fetch_add(&ra->use_count, 1); + nxt_atomic_fetch_add(&req_app_link->use_count, 1); } nxt_inline void -nxt_router_ra_dec_use(nxt_req_app_link_t *ra) +nxt_request_app_link_dec_use(nxt_request_app_link_t *req_app_link) { #if (NXT_DEBUG) int c; - c = nxt_atomic_fetch_add(&ra->use_count, -1); + c = nxt_atomic_fetch_add(&req_app_link->use_count, -1); nxt_assert(c > 1); #else - (void) nxt_atomic_fetch_add(&ra->use_count, -1); + (void) nxt_atomic_fetch_add(&req_app_link->use_count, -1); #endif } -static void nxt_router_ra_use(nxt_task_t *task, nxt_req_app_link_t *ra, int i); +static void nxt_request_app_link_use(nxt_task_t *task, + nxt_request_app_link_t *req_app_link, int i); static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task); static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data); @@ -257,13 +216,14 @@ static void nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data); static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app); + static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, - uint32_t request_failed, uint32_t got_response); + nxt_apr_action_t action); static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_app_t *app, - nxt_req_app_link_t *ra); + nxt_request_app_link_t *req_app_link); static void nxt_router_app_prepare_request(nxt_task_t *task, - nxt_req_app_link_t *ra); + nxt_request_app_link_t *req_app_link); static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r, nxt_port_t *port, const nxt_str_t *prefix); @@ -287,6 +247,8 @@ static nxt_int_t nxt_router_http_request_done(nxt_task_t *task, static void nxt_router_http_request_release(nxt_task_t *task, void *obj, void *data); +const nxt_http_request_state_t nxt_http_websocket; + static nxt_router_t *nxt_router; static const nxt_str_t http_prefix = nxt_string("HTTP_"); @@ -493,58 +455,63 @@ nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app) nxt_inline void -nxt_router_ra_init(nxt_task_t *task, nxt_req_app_link_t *ra, - nxt_req_conn_link_t *rc) +nxt_request_app_link_init(nxt_task_t *task, + nxt_request_app_link_t *req_app_link, nxt_request_rpc_data_t *req_rpc_data) { nxt_event_engine_t *engine; engine = task->thread->engine; - nxt_memzero(ra, sizeof(nxt_req_app_link_t)); + nxt_memzero(req_app_link, sizeof(nxt_request_app_link_t)); - ra->stream = rc->stream; - ra->use_count = 1; - ra->rc = rc; - rc->ra = ra; - ra->reply_port = engine->port; - ra->request = rc->request; + req_app_link->stream = req_rpc_data->stream; + req_app_link->use_count = 1; + req_app_link->req_rpc_data = req_rpc_data; + req_rpc_data->req_app_link = req_app_link; + req_app_link->reply_port = engine->port; + req_app_link->request = req_rpc_data->request; + req_app_link->apr_action = NXT_APR_GOT_RESPONSE; - ra->work.handler = NULL; - ra->work.task = &engine->task; - ra->work.obj = ra; - ra->work.data = engine; + req_app_link->work.handler = NULL; + req_app_link->work.task = &engine->task; + req_app_link->work.obj = req_app_link; + req_app_link->work.data = engine; } -nxt_inline nxt_req_app_link_t * -nxt_router_ra_create(nxt_task_t *task, nxt_req_app_link_t *ra_src) +nxt_inline nxt_request_app_link_t * +nxt_request_app_link_alloc(nxt_task_t *task, + nxt_request_app_link_t *ra_src, nxt_request_rpc_data_t *req_rpc_data) { - nxt_mp_t *mp; - nxt_req_app_link_t *ra; + nxt_mp_t *mp; + nxt_request_app_link_t *req_app_link; - if (ra_src->mem_pool != NULL) { + if (ra_src != NULL && ra_src->mem_pool != NULL) { return ra_src; } - mp = ra_src->request->mem_pool; + mp = req_rpc_data->request->mem_pool; - ra = nxt_mp_alloc(mp, sizeof(nxt_req_app_link_t)); + req_app_link = nxt_mp_alloc(mp, sizeof(nxt_request_app_link_t)); - if (nxt_slow_path(ra == NULL)) { + if (nxt_slow_path(req_app_link == NULL)) { - ra_src->rc->ra = NULL; - ra_src->rc = NULL; + req_rpc_data->req_app_link = NULL; + + if (ra_src != NULL) { + ra_src->req_rpc_data = NULL; + } return NULL; } nxt_mp_retain(mp); - nxt_router_ra_init(task, ra, ra_src->rc); + nxt_request_app_link_init(task, req_app_link, req_rpc_data); - ra->mem_pool = mp; + req_app_link->mem_pool = mp; - return ra; + return req_app_link; } @@ -584,177 +551,189 @@ nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info, static void -nxt_router_ra_update_peer(nxt_task_t *task, nxt_req_app_link_t *ra); - - -static void -nxt_router_ra_update_peer_handler(nxt_task_t *task, void *obj, void *data) +nxt_request_app_link_update_peer_handler(nxt_task_t *task, void *obj, + void *data) { - nxt_req_app_link_t *ra; + nxt_request_app_link_t *req_app_link; - ra = obj; + req_app_link = obj; - nxt_router_ra_update_peer(task, ra); + nxt_request_app_link_update_peer(task, req_app_link); - nxt_router_ra_use(task, ra, -1); + nxt_request_app_link_use(task, req_app_link, -1); } static void -nxt_router_ra_update_peer(nxt_task_t *task, nxt_req_app_link_t *ra) +nxt_request_app_link_update_peer(nxt_task_t *task, + nxt_request_app_link_t *req_app_link) { - nxt_event_engine_t *engine; - nxt_req_conn_link_t *rc; + nxt_event_engine_t *engine; + nxt_request_rpc_data_t *req_rpc_data; - engine = ra->work.data; + engine = req_app_link->work.data; if (task->thread->engine != engine) { - nxt_router_ra_inc_use(ra); + nxt_request_app_link_inc_use(req_app_link); - ra->work.handler = nxt_router_ra_update_peer_handler; - ra->work.task = &engine->task; - ra->work.next = NULL; + req_app_link->work.handler = nxt_request_app_link_update_peer_handler; + req_app_link->work.task = &engine->task; + req_app_link->work.next = NULL; - nxt_debug(task, "ra stream #%uD post update peer to %p", - ra->stream, engine); + nxt_debug(task, "req_app_link stream #%uD post update peer to %p", + req_app_link->stream, engine); - nxt_event_engine_post(engine, &ra->work); + nxt_event_engine_post(engine, &req_app_link->work); return; } - nxt_debug(task, "ra stream #%uD update peer", ra->stream); + nxt_debug(task, "req_app_link stream #%uD update peer", + req_app_link->stream); - rc = ra->rc; + req_rpc_data = req_app_link->req_rpc_data; - if (rc != NULL && ra->app_port != NULL) { - nxt_port_rpc_ex_set_peer(task, engine->port, rc, ra->app_port->pid); + if (req_rpc_data != NULL && req_app_link->app_port != NULL) { + nxt_port_rpc_ex_set_peer(task, engine->port, req_rpc_data, + req_app_link->app_port->pid); } - nxt_router_ra_use(task, ra, -1); + nxt_request_app_link_use(task, req_app_link, -1); } static void -nxt_router_ra_release(nxt_task_t *task, nxt_req_app_link_t *ra) +nxt_request_app_link_release(nxt_task_t *task, + nxt_request_app_link_t *req_app_link) { nxt_mp_t *mp; - nxt_req_conn_link_t *rc; + nxt_http_request_t *r; + nxt_request_rpc_data_t *req_rpc_data; - nxt_assert(task->thread->engine == ra->work.data); - nxt_assert(ra->use_count == 0); + nxt_assert(task->thread->engine == req_app_link->work.data); + nxt_assert(req_app_link->use_count == 0); - nxt_debug(task, "ra stream #%uD release", ra->stream); + nxt_debug(task, "req_app_link stream #%uD release", req_app_link->stream); - rc = ra->rc; + req_rpc_data = req_app_link->req_rpc_data; - if (rc != NULL) { - if (nxt_slow_path(ra->err_code != 0)) { - nxt_http_request_error(task, rc->request, ra->err_code); + if (req_rpc_data != NULL) { + if (nxt_slow_path(req_app_link->err_code != 0)) { + nxt_http_request_error(task, req_rpc_data->request, + req_app_link->err_code); } else { - rc->app_port = ra->app_port; - rc->msg_info = ra->msg_info; - - if (rc->app->timeout != 0) { - rc->request->timer.handler = nxt_router_app_timeout; - rc->request->timer_data = rc; - nxt_timer_add(task->thread->engine, &rc->request->timer, - rc->app->timeout); + req_rpc_data->app_port = req_app_link->app_port; + req_rpc_data->apr_action = req_app_link->apr_action; + req_rpc_data->msg_info = req_app_link->msg_info; + + if (req_rpc_data->app->timeout != 0) { + r = req_rpc_data->request; + + r->timer.handler = nxt_router_app_timeout; + r->timer_data = req_rpc_data; + nxt_timer_add(task->thread->engine, &r->timer, + req_rpc_data->app->timeout); } - ra->app_port = NULL; - ra->msg_info.buf = NULL; + req_app_link->app_port = NULL; + req_app_link->msg_info.buf = NULL; } - rc->ra = NULL; - ra->rc = NULL; + req_rpc_data->req_app_link = NULL; + req_app_link->req_rpc_data = NULL; } - if (ra->app_port != NULL) { - nxt_router_app_port_release(task, ra->app_port, 0, 1); + if (req_app_link->app_port != NULL) { + nxt_router_app_port_release(task, req_app_link->app_port, + req_app_link->apr_action); - ra->app_port = NULL; + req_app_link->app_port = NULL; } - nxt_router_msg_cancel(task, &ra->msg_info, ra->stream); + nxt_router_msg_cancel(task, &req_app_link->msg_info, req_app_link->stream); - mp = ra->mem_pool; + mp = req_app_link->mem_pool; if (mp != NULL) { - nxt_mp_free(mp, ra); + nxt_mp_free(mp, req_app_link); nxt_mp_release(mp); } } static void -nxt_router_ra_release_handler(nxt_task_t *task, void *obj, void *data) +nxt_request_app_link_release_handler(nxt_task_t *task, void *obj, void *data) { - nxt_req_app_link_t *ra; + nxt_request_app_link_t *req_app_link; - ra = obj; + req_app_link = obj; - nxt_assert(ra->work.data == data); + nxt_assert(req_app_link->work.data == data); - nxt_atomic_fetch_add(&ra->use_count, -1); + nxt_atomic_fetch_add(&req_app_link->use_count, -1); - nxt_router_ra_release(task, ra); + nxt_request_app_link_release(task, req_app_link); } static void -nxt_router_ra_use(nxt_task_t *task, nxt_req_app_link_t *ra, int i) +nxt_request_app_link_use(nxt_task_t *task, nxt_request_app_link_t *req_app_link, + int i) { int c; nxt_event_engine_t *engine; - c = nxt_atomic_fetch_add(&ra->use_count, i); + c = nxt_atomic_fetch_add(&req_app_link->use_count, i); if (i < 0 && c == -i) { - engine = ra->work.data; + engine = req_app_link->work.data; if (task->thread->engine == engine) { - nxt_router_ra_release(task, ra); + nxt_request_app_link_release(task, req_app_link); return; } - nxt_router_ra_inc_use(ra); + nxt_request_app_link_inc_use(req_app_link); - ra->work.handler = nxt_router_ra_release_handler; - ra->work.task = &engine->task; - ra->work.next = NULL; + req_app_link->work.handler = nxt_request_app_link_release_handler; + req_app_link->work.task = &engine->task; + req_app_link->work.next = NULL; - nxt_debug(task, "ra stream #%uD post release to %p", - ra->stream, engine); + nxt_debug(task, "req_app_link stream #%uD post release to %p", + req_app_link->stream, engine); - nxt_event_engine_post(engine, &ra->work); + nxt_event_engine_post(engine, &req_app_link->work); } } nxt_inline void -nxt_router_ra_error(nxt_req_app_link_t *ra, int code, const char *str) +nxt_request_app_link_error(nxt_request_app_link_t *req_app_link, int code, + const char *str) { - ra->app_port = NULL; - ra->err_code = code; - ra->err_str = str; + req_app_link->app_port = NULL; + req_app_link->err_code = code; + req_app_link->err_str = str; } nxt_inline void -nxt_router_ra_pending(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra) +nxt_request_app_link_pending(nxt_task_t *task, nxt_app_t *app, + nxt_request_app_link_t *req_app_link) { - nxt_queue_insert_tail(&ra->app_port->pending_requests, - &ra->link_port_pending); - nxt_queue_insert_tail(&app->pending, &ra->link_app_pending); + nxt_queue_insert_tail(&req_app_link->app_port->pending_requests, + &req_app_link->link_port_pending); + nxt_queue_insert_tail(&app->pending, &req_app_link->link_app_pending); - nxt_router_ra_inc_use(ra); + nxt_request_app_link_inc_use(req_app_link); - ra->res_time = nxt_thread_monotonic_time(task->thread) + app->res_timeout; + req_app_link->res_time = nxt_thread_monotonic_time(task->thread) + + app->res_timeout; - nxt_debug(task, "ra stream #%uD enqueue to pending_requests", ra->stream); + nxt_debug(task, "req_app_link stream #%uD enqueue to pending_requests", + req_app_link->stream); } @@ -774,60 +753,66 @@ nxt_queue_chk_remove(nxt_queue_link_t *lnk) nxt_inline void -nxt_router_rc_unlink(nxt_task_t *task, nxt_req_conn_link_t *rc) +nxt_request_rpc_data_unlink(nxt_task_t *task, + nxt_request_rpc_data_t *req_rpc_data) { - int ra_use_delta; - nxt_req_app_link_t *ra; + int ra_use_delta; + nxt_request_app_link_t *req_app_link; - if (rc->app_port != NULL) { - nxt_router_app_port_release(task, rc->app_port, 0, 1); + if (req_rpc_data->app_port != NULL) { + nxt_router_app_port_release(task, req_rpc_data->app_port, + req_rpc_data->apr_action); - rc->app_port = NULL; + req_rpc_data->app_port = NULL; } - nxt_router_msg_cancel(task, &rc->msg_info, rc->stream); - - ra = rc->ra; + nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream); - if (ra != NULL) { - rc->ra = NULL; - ra->rc = NULL; + req_app_link = req_rpc_data->req_app_link; + if (req_app_link != NULL) { + req_rpc_data->req_app_link = NULL; + req_app_link->req_rpc_data = NULL; ra_use_delta = 0; - nxt_thread_mutex_lock(&rc->app->mutex); + nxt_thread_mutex_lock(&req_rpc_data->app->mutex); - if (ra->link_app_requests.next == NULL - && ra->link_port_pending.next == NULL - && ra->link_app_pending.next == NULL) + if (req_app_link->link_app_requests.next == NULL + && req_app_link->link_port_pending.next == NULL + && req_app_link->link_app_pending.next == NULL + && req_app_link->link_port_websockets.next == NULL) { - ra = NULL; + req_app_link = NULL; } else { - ra_use_delta -= nxt_queue_chk_remove(&ra->link_app_requests); - ra_use_delta -= nxt_queue_chk_remove(&ra->link_port_pending); - nxt_queue_chk_remove(&ra->link_app_pending); + ra_use_delta -= + nxt_queue_chk_remove(&req_app_link->link_app_requests) + + nxt_queue_chk_remove(&req_app_link->link_port_pending) + + nxt_queue_chk_remove(&req_app_link->link_port_websockets); + + nxt_queue_chk_remove(&req_app_link->link_app_pending); } - nxt_thread_mutex_unlock(&rc->app->mutex); + nxt_thread_mutex_unlock(&req_rpc_data->app->mutex); - if (ra != NULL) { - nxt_router_ra_use(task, ra, ra_use_delta); + if (req_app_link != NULL) { + nxt_request_app_link_use(task, req_app_link, ra_use_delta); } } - if (rc->app != NULL) { - nxt_router_app_use(task, rc->app, -1); + if (req_rpc_data->app != NULL) { + nxt_router_app_use(task, req_rpc_data->app, -1); - rc->app = NULL; + req_rpc_data->app = NULL; } - if (rc->request != NULL) { - rc->request->timer_data = NULL; + if (req_rpc_data->request != NULL) { + req_rpc_data->request->timer_data = NULL; - nxt_router_http_request_done(task, rc->request); + nxt_router_http_request_done(task, req_rpc_data->request); - rc->request = NULL; + req_rpc_data->request->req_rpc_data = NULL; + req_rpc_data->request = NULL; } } @@ -928,10 +913,6 @@ nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_port_remove_pid_handler(task, msg); - if (msg->port_msg.stream == 0) { - return; - } - nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0) { nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid, @@ -939,6 +920,10 @@ nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) } nxt_queue_loop; + if (msg->port_msg.stream == 0) { + return; + } + msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; nxt_port_rpc_handler(task, msg); @@ -1376,6 +1361,28 @@ static nxt_conf_map_t nxt_router_http_conf[] = { }; +static nxt_conf_map_t nxt_router_websocket_conf[] = { + { + nxt_string("max_frame_size"), + NXT_CONF_MAP_SIZE, + offsetof(nxt_websocket_conf_t, max_frame_size), + }, + + { + nxt_string("read_timeout"), + NXT_CONF_MAP_MSEC, + offsetof(nxt_websocket_conf_t, read_timeout), + }, + + { + nxt_string("keepalive_interval"), + NXT_CONF_MAP_MSEC, + offsetof(nxt_websocket_conf_t, keepalive_interval), + }, + +}; + + static nxt_int_t nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end) @@ -1389,7 +1396,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_app_t *app, *prev; nxt_router_t *router; nxt_app_joint_t *app_joint; - nxt_conf_value_t *conf, *http, *value; + nxt_conf_value_t *conf, *http, *value, *websocket; nxt_conf_value_t *applications, *application; nxt_conf_value_t *listeners, *listener; nxt_conf_value_t *routes_conf; @@ -1412,6 +1419,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, #if (NXT_TLS) static nxt_str_t certificate_path = nxt_string("/tls/certificate"); #endif + static nxt_str_t websocket_path = nxt_string("/settings/http/websocket"); conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL); if (conf == NULL) { @@ -1432,177 +1440,177 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, tmcf->router_conf->threads = nxt_ncpu; } - applications = nxt_conf_get_path(conf, &applications_path); - if (applications == NULL) { - nxt_alert(task, "no \"applications\" block"); - return NXT_ERROR; - } - router = tmcf->router_conf->router; - next = 0; + applications = nxt_conf_get_path(conf, &applications_path); - for ( ;; ) { - application = nxt_conf_next_object_member(applications, &name, &next); - if (application == NULL) { - break; - } + if (applications != NULL) { + next = 0; - nxt_debug(task, "application \"%V\"", &name); + for ( ;; ) { + application = nxt_conf_next_object_member(applications, &name, &next); + if (application == NULL) { + break; + } - size = nxt_conf_json_length(application, NULL); + nxt_debug(task, "application \"%V\"", &name); - app = nxt_malloc(sizeof(nxt_app_t) + name.length + size); - if (app == NULL) { - goto fail; - } + size = nxt_conf_json_length(application, NULL); - nxt_memzero(app, sizeof(nxt_app_t)); + app = nxt_malloc(sizeof(nxt_app_t) + name.length + size); + if (app == NULL) { + goto fail; + } - app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t)); - app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t) + name.length); + nxt_memzero(app, sizeof(nxt_app_t)); - p = nxt_conf_json_print(app->conf.start, application, NULL); - app->conf.length = p - app->conf.start; + app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t)); + app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t) + + name.length); - nxt_assert(app->conf.length <= size); + p = nxt_conf_json_print(app->conf.start, application, NULL); + app->conf.length = p - app->conf.start; - nxt_debug(task, "application conf \"%V\"", &app->conf); + nxt_assert(app->conf.length <= size); - prev = nxt_router_app_find(&router->apps, &name); + nxt_debug(task, "application conf \"%V\"", &app->conf); - if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) { - nxt_free(app); + prev = nxt_router_app_find(&router->apps, &name); - nxt_queue_remove(&prev->link); - nxt_queue_insert_tail(&tmcf->previous, &prev->link); - continue; - } + if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) { + nxt_free(app); - apcf.processes = 1; - apcf.max_processes = 1; - apcf.spare_processes = 0; - apcf.timeout = 0; - apcf.res_timeout = 1000; - apcf.idle_timeout = 15000; - apcf.requests = 0; - apcf.limits_value = NULL; - apcf.processes_value = NULL; - - app_joint = nxt_malloc(sizeof(nxt_app_joint_t)); - if (nxt_slow_path(app_joint == NULL)) { - goto app_fail; - } - - nxt_memzero(app_joint, sizeof(nxt_app_joint_t)); - - ret = nxt_conf_map_object(mp, application, nxt_router_app_conf, - nxt_nitems(nxt_router_app_conf), &apcf); - if (ret != NXT_OK) { - nxt_alert(task, "application map error"); - goto app_fail; - } - - if (apcf.limits_value != NULL) { + nxt_queue_remove(&prev->link); + nxt_queue_insert_tail(&tmcf->previous, &prev->link); + continue; + } - if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) { - nxt_alert(task, "application limits is not object"); + apcf.processes = 1; + apcf.max_processes = 1; + apcf.spare_processes = 0; + apcf.timeout = 0; + apcf.res_timeout = 1000; + apcf.idle_timeout = 15000; + apcf.requests = 0; + apcf.limits_value = NULL; + apcf.processes_value = NULL; + + app_joint = nxt_malloc(sizeof(nxt_app_joint_t)); + if (nxt_slow_path(app_joint == NULL)) { goto app_fail; } - ret = nxt_conf_map_object(mp, apcf.limits_value, - nxt_router_app_limits_conf, - nxt_nitems(nxt_router_app_limits_conf), - &apcf); + nxt_memzero(app_joint, sizeof(nxt_app_joint_t)); + + ret = nxt_conf_map_object(mp, application, nxt_router_app_conf, + nxt_nitems(nxt_router_app_conf), &apcf); if (ret != NXT_OK) { - nxt_alert(task, "application limits map error"); + nxt_alert(task, "application map error"); goto app_fail; } - } - if (apcf.processes_value != NULL - && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT) - { - ret = nxt_conf_map_object(mp, apcf.processes_value, - nxt_router_app_processes_conf, - nxt_nitems(nxt_router_app_processes_conf), - &apcf); - if (ret != NXT_OK) { - nxt_alert(task, "application processes map error"); - goto app_fail; + if (apcf.limits_value != NULL) { + + if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) { + nxt_alert(task, "application limits is not object"); + goto app_fail; + } + + ret = nxt_conf_map_object(mp, apcf.limits_value, + nxt_router_app_limits_conf, + nxt_nitems(nxt_router_app_limits_conf), + &apcf); + if (ret != NXT_OK) { + nxt_alert(task, "application limits map error"); + goto app_fail; + } } - } else { - apcf.max_processes = apcf.processes; - apcf.spare_processes = apcf.processes; - } + if (apcf.processes_value != NULL + && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT) + { + ret = nxt_conf_map_object(mp, apcf.processes_value, + nxt_router_app_processes_conf, + nxt_nitems(nxt_router_app_processes_conf), + &apcf); + if (ret != NXT_OK) { + nxt_alert(task, "application processes map error"); + goto app_fail; + } - nxt_debug(task, "application type: %V", &apcf.type); - nxt_debug(task, "application processes: %D", apcf.processes); - nxt_debug(task, "application request timeout: %M", apcf.timeout); - nxt_debug(task, "application reschedule timeout: %M", apcf.res_timeout); - nxt_debug(task, "application requests: %D", apcf.requests); + } else { + apcf.max_processes = apcf.processes; + apcf.spare_processes = apcf.processes; + } - lang = nxt_app_lang_module(task->thread->runtime, &apcf.type); + nxt_debug(task, "application type: %V", &apcf.type); + nxt_debug(task, "application processes: %D", apcf.processes); + nxt_debug(task, "application request timeout: %M", apcf.timeout); + nxt_debug(task, "application reschedule timeout: %M", + apcf.res_timeout); + nxt_debug(task, "application requests: %D", apcf.requests); - if (lang == NULL) { - nxt_alert(task, "unknown application type: \"%V\"", &apcf.type); - goto app_fail; - } + lang = nxt_app_lang_module(task->thread->runtime, &apcf.type); - nxt_debug(task, "application language module: \"%s\"", lang->file); + if (lang == NULL) { + nxt_alert(task, "unknown application type: \"%V\"", &apcf.type); + goto app_fail; + } - ret = nxt_thread_mutex_create(&app->mutex); - if (ret != NXT_OK) { - goto app_fail; - } + nxt_debug(task, "application language module: \"%s\"", lang->file); - nxt_queue_init(&app->ports); - nxt_queue_init(&app->spare_ports); - nxt_queue_init(&app->idle_ports); - nxt_queue_init(&app->requests); - nxt_queue_init(&app->pending); + ret = nxt_thread_mutex_create(&app->mutex); + if (ret != NXT_OK) { + goto app_fail; + } - app->name.length = name.length; - nxt_memcpy(app->name.start, name.start, name.length); + nxt_queue_init(&app->ports); + nxt_queue_init(&app->spare_ports); + nxt_queue_init(&app->idle_ports); + nxt_queue_init(&app->requests); + nxt_queue_init(&app->pending); - app->type = lang->type; - app->max_processes = apcf.max_processes; - app->spare_processes = apcf.spare_processes; - app->max_pending_processes = apcf.spare_processes - ? apcf.spare_processes : 1; - app->timeout = apcf.timeout; - app->res_timeout = apcf.res_timeout * 1000000; - app->idle_timeout = apcf.idle_timeout; - app->max_pending_responses = 2; - app->max_requests = apcf.requests; + app->name.length = name.length; + nxt_memcpy(app->name.start, name.start, name.length); - engine = task->thread->engine; + app->type = lang->type; + app->max_processes = apcf.max_processes; + app->spare_processes = apcf.spare_processes; + app->max_pending_processes = apcf.spare_processes + ? apcf.spare_processes : 1; + app->timeout = apcf.timeout; + app->res_timeout = apcf.res_timeout * 1000000; + app->idle_timeout = apcf.idle_timeout; + app->max_pending_responses = 2; + app->max_requests = apcf.requests; - app->engine = engine; + engine = task->thread->engine; - app->adjust_idle_work.handler = nxt_router_adjust_idle_timer; - app->adjust_idle_work.task = &engine->task; - app->adjust_idle_work.obj = app; + app->engine = engine; - nxt_queue_insert_tail(&tmcf->apps, &app->link); + app->adjust_idle_work.handler = nxt_router_adjust_idle_timer; + app->adjust_idle_work.task = &engine->task; + app->adjust_idle_work.obj = app; - nxt_router_app_use(task, app, 1); + nxt_queue_insert_tail(&tmcf->apps, &app->link); - app->joint = app_joint; + nxt_router_app_use(task, app, 1); - app_joint->use_count = 1; - app_joint->app = app; + app->joint = app_joint; - app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS; - app_joint->idle_timer.work_queue = &engine->fast_work_queue; - app_joint->idle_timer.handler = nxt_router_app_idle_timeout; - app_joint->idle_timer.task = &engine->task; - app_joint->idle_timer.log = app_joint->idle_timer.task->log; + app_joint->use_count = 1; + app_joint->app = app; - app_joint->free_app_work.handler = nxt_router_free_app; - app_joint->free_app_work.task = &engine->task; - app_joint->free_app_work.obj = app_joint; + app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS; + app_joint->idle_timer.work_queue = &engine->fast_work_queue; + app_joint->idle_timer.handler = nxt_router_app_idle_timeout; + app_joint->idle_timer.task = &engine->task; + app_joint->idle_timer.log = app_joint->idle_timer.task->log; + + app_joint->free_app_work.handler = nxt_router_free_app; + app_joint->free_app_work.task = &engine->task; + app_joint->free_app_work.obj = app_joint; + } } routes_conf = nxt_conf_get_path(conf, &routes_path); @@ -1622,87 +1630,102 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, } #endif - listeners = nxt_conf_get_path(conf, &listeners_path); - if (listeners == NULL) { - nxt_alert(task, "no \"listeners\" block"); - return NXT_ERROR; - } + websocket = nxt_conf_get_path(conf, &websocket_path); - next = 0; + listeners = nxt_conf_get_path(conf, &listeners_path); - for ( ;; ) { - listener = nxt_conf_next_object_member(listeners, &name, &next); - if (listener == NULL) { - break; - } + if (listeners != NULL) { + next = 0; - skcf = nxt_router_socket_conf(task, tmcf, &name); - if (skcf == NULL) { - goto fail; - } + for ( ;; ) { + listener = nxt_conf_next_object_member(listeners, &name, &next); + if (listener == NULL) { + break; + } - nxt_memzero(&lscf, sizeof(lscf)); + skcf = nxt_router_socket_conf(task, tmcf, &name); + if (skcf == NULL) { + goto fail; + } - ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf, - nxt_nitems(nxt_router_listener_conf), &lscf); - if (ret != NXT_OK) { - nxt_alert(task, "listener map error"); - goto fail; - } + nxt_memzero(&lscf, sizeof(lscf)); - nxt_debug(task, "application: %V", &lscf.application); - - // STUB, default values if http block is not defined. - skcf->header_buffer_size = 2048; - skcf->large_header_buffer_size = 8192; - skcf->large_header_buffers = 4; - skcf->body_buffer_size = 16 * 1024; - skcf->max_body_size = 8 * 1024 * 1024; - skcf->idle_timeout = 180 * 1000; - skcf->header_read_timeout = 30 * 1000; - skcf->body_read_timeout = 30 * 1000; - skcf->send_timeout = 30 * 1000; - - if (http != NULL) { - ret = nxt_conf_map_object(mp, http, nxt_router_http_conf, - nxt_nitems(nxt_router_http_conf), skcf); + ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf, + nxt_nitems(nxt_router_listener_conf), + &lscf); if (ret != NXT_OK) { - nxt_alert(task, "http map error"); + nxt_alert(task, "listener map error"); goto fail; } - } -#if (NXT_TLS) + nxt_debug(task, "application: %V", &lscf.application); + + // STUB, default values if http block is not defined. + skcf->header_buffer_size = 2048; + skcf->large_header_buffer_size = 8192; + skcf->large_header_buffers = 4; + skcf->body_buffer_size = 16 * 1024; + skcf->max_body_size = 8 * 1024 * 1024; + skcf->idle_timeout = 180 * 1000; + skcf->header_read_timeout = 30 * 1000; + skcf->body_read_timeout = 30 * 1000; + skcf->send_timeout = 30 * 1000; + + skcf->websocket_conf.max_frame_size = 1024 * 1024; + skcf->websocket_conf.read_timeout = 60 * 1000; + skcf->websocket_conf.keepalive_interval = 30 * 1000; + + if (http != NULL) { + ret = nxt_conf_map_object(mp, http, nxt_router_http_conf, + nxt_nitems(nxt_router_http_conf), + skcf); + if (ret != NXT_OK) { + nxt_alert(task, "http map error"); + goto fail; + } + } - value = nxt_conf_get_path(listener, &certificate_path); + if (websocket != NULL) { + ret = nxt_conf_map_object(mp, websocket, + nxt_router_websocket_conf, + nxt_nitems(nxt_router_websocket_conf), + &skcf->websocket_conf); + if (ret != NXT_OK) { + nxt_alert(task, "websocket map error"); + goto fail; + } + } - if (value != NULL) { - nxt_conf_get_string(value, &name); +#if (NXT_TLS) + value = nxt_conf_get_path(listener, &certificate_path); - tls = nxt_mp_get(mp, sizeof(nxt_router_tlssock_t)); - if (nxt_slow_path(tls == NULL)) { - goto fail; - } + if (value != NULL) { + nxt_conf_get_string(value, &name); - tls->name = name; - tls->conf = skcf; + tls = nxt_mp_get(mp, sizeof(nxt_router_tlssock_t)); + if (nxt_slow_path(tls == NULL)) { + goto fail; + } - nxt_queue_insert_tail(&tmcf->tls, &tls->link); - } + tls->name = name; + tls->conf = skcf; + nxt_queue_insert_tail(&tmcf->tls, &tls->link); + } #endif - skcf->listen->handler = nxt_http_conn_init; - skcf->router_conf = tmcf->router_conf; - skcf->router_conf->count++; + skcf->listen->handler = nxt_http_conn_init; + skcf->router_conf = tmcf->router_conf; + skcf->router_conf->count++; - if (lscf.pass.length != 0) { - skcf->pass = nxt_http_pass_create(task, tmcf, &lscf.pass); + if (lscf.pass.length != 0) { + skcf->pass = nxt_http_pass_create(task, tmcf, &lscf.pass); - /* COMPATIBILITY: listener application. */ - } else if (lscf.application.length > 0) { - skcf->pass = nxt_http_pass_application(task, tmcf, - &lscf.application); + /* COMPATIBILITY: listener application. */ + } else if (lscf.application.length > 0) { + skcf->pass = nxt_http_pass_application(task, tmcf, + &lscf.application); + } } } @@ -3058,7 +3081,7 @@ nxt_router_access_log_writer(nxt_task_t *task, nxt_http_request_t *r, *p++ = ' '; - bytes = nxt_http_proto_body_bytes_sent[r->protocol](task, r->proto); + bytes = nxt_http_proto[r->protocol].body_bytes_sent(task, r->proto); p = nxt_sprintf(p, p + NXT_OFF_T_LEN, "%O", bytes); @@ -3382,28 +3405,30 @@ static void nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) { - nxt_int_t ret; - nxt_buf_t *b; - nxt_unit_field_t *f; - nxt_http_field_t *field; - nxt_http_request_t *r; - nxt_req_conn_link_t *rc; - nxt_unit_response_t *resp; + nxt_int_t ret; + nxt_buf_t *b; + nxt_port_t *app_port; + nxt_unit_field_t *f; + nxt_http_field_t *field; + nxt_http_request_t *r; + nxt_unit_response_t *resp; + nxt_request_app_link_t *req_app_link; + nxt_request_rpc_data_t *req_rpc_data; b = msg->buf; - rc = data; + req_rpc_data = data; if (msg->size == 0) { b = NULL; } - r = rc->request; + r = req_rpc_data->request; if (nxt_slow_path(r == NULL)) { return; } if (r->error) { - nxt_router_rc_unlink(task, rc); + nxt_request_rpc_data_unlink(task, req_rpc_data); return; } @@ -3412,13 +3437,14 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_buf_chain_add(&b, nxt_http_buf_last(r)); - nxt_router_rc_unlink(task, rc); + nxt_request_rpc_data_unlink(task, req_rpc_data); } else { - if (rc->app != NULL && rc->app->timeout != 0) { + if (req_rpc_data->app != NULL && req_rpc_data->app->timeout != 0) { r->timer.handler = nxt_router_app_timeout; - r->timer_data = rc; - nxt_timer_add(task->thread->engine, &r->timer, rc->app->timeout); + r->timer_data = req_rpc_data; + nxt_timer_add(task->thread->engine, &r->timer, + req_rpc_data->app->timeout); } } @@ -3448,7 +3474,13 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, goto fail; } + field = NULL; + for (f = resp->fields; f < resp->fields + resp->fields_count; f++) { + if (f->skip) { + continue; + } + field = nxt_list_add(r->resp.fields); if (nxt_slow_path(field == NULL)) { @@ -3456,26 +3488,30 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, } field->hash = f->hash; - field->skip = f->skip; + field->skip = 0; field->name_length = f->name_length; field->value_length = f->value_length; field->name = nxt_unit_sptr_get(&f->name); field->value = nxt_unit_sptr_get(&f->value); - nxt_debug(task, "header: %*s: %*s", + ret = nxt_http_field_process(field, &nxt_response_fields_hash, r); + if (nxt_slow_path(ret != NXT_OK)) { + goto fail; + } + + nxt_debug(task, "header%s: %*s: %*s", + (field->skip ? " skipped" : ""), (size_t) field->name_length, field->name, (size_t) field->value_length, field->value); + + if (field->skip) { + r->resp.fields->last->nelts--; + } } r->status = resp->status; - ret = nxt_http_fields_process(r->resp.fields, - &nxt_response_fields_hash, r); - if (nxt_slow_path(ret != NXT_OK)) { - goto fail; - } - if (resp->piggyback_content_length != 0) { b->mem.pos = nxt_unit_sptr_get(&resp->piggyback_content); b->mem.free = b->mem.pos + resp->piggyback_content_length; @@ -3495,9 +3531,55 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_buf_chain_add(&r->out, b); } - r->state = &nxt_http_request_send_state; - nxt_http_request_header_send(task, r); + + if (r->websocket_handshake + && r->status == NXT_HTTP_SWITCHING_PROTOCOLS) + { + req_app_link = nxt_request_app_link_alloc(task, + req_rpc_data->req_app_link, + req_rpc_data); + if (nxt_slow_path(req_app_link == NULL)) { + goto fail; + } + + app_port = req_app_link->app_port; + + if (app_port == NULL && req_rpc_data->app_port != NULL) { + req_app_link->app_port = req_rpc_data->app_port; + app_port = req_app_link->app_port; + req_app_link->apr_action = req_rpc_data->apr_action; + + req_rpc_data->app_port = NULL; + } + + if (nxt_slow_path(app_port == NULL)) { + goto fail; + } + + nxt_thread_mutex_lock(&req_rpc_data->app->mutex); + + nxt_queue_insert_tail(&app_port->active_websockets, + &req_app_link->link_port_websockets); + + nxt_thread_mutex_unlock(&req_rpc_data->app->mutex); + + nxt_router_app_port_release(task, app_port, NXT_APR_UPGRADE); + req_app_link->apr_action = NXT_APR_CLOSE; + + nxt_debug(task, "req_app_link stream #%uD upgrade", + req_app_link->stream); + + r->state = &nxt_http_websocket; + + } else { + r->state = &nxt_http_request_send_state; + } + + if (r->out) { + nxt_work_queue_add(&task->thread->engine->fast_work_queue, + nxt_http_request_send_body, task, r, NULL); + } } return; @@ -3506,14 +3588,13 @@ fail: nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE); - nxt_router_rc_unlink(task, rc); + nxt_request_rpc_data_unlink(task, req_rpc_data); } static const nxt_http_request_state_t nxt_http_request_send_state nxt_aligned(64) = { - .ready_handler = nxt_http_request_send_body, .error_handler = nxt_http_request_error_handler, }; @@ -3539,36 +3620,37 @@ static void nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) { - nxt_int_t res; - nxt_port_t *port; - nxt_bool_t cancelled; - nxt_req_app_link_t *ra; - nxt_req_conn_link_t *rc; - - rc = data; + nxt_int_t res; + nxt_port_t *port; + nxt_bool_t cancelled; + nxt_request_app_link_t *req_app_link; + nxt_request_rpc_data_t *req_rpc_data; - ra = rc->ra; + req_rpc_data = data; - if (ra != NULL) { - cancelled = nxt_router_msg_cancel(task, &ra->msg_info, ra->stream); + req_app_link = req_rpc_data->req_app_link; + if (req_app_link != NULL) { + cancelled = nxt_router_msg_cancel(task, &req_app_link->msg_info, + req_app_link->stream); if (cancelled) { - nxt_router_ra_inc_use(ra); + nxt_request_app_link_inc_use(req_app_link); - res = nxt_router_app_port(task, rc->app, ra); + res = nxt_router_app_port(task, req_rpc_data->app, req_app_link); if (res == NXT_OK) { - port = ra->app_port; + port = req_app_link->app_port; if (nxt_slow_path(port == NULL)) { - nxt_log(task, NXT_LOG_ERR, "port is NULL in cancelled ra"); + nxt_log(task, NXT_LOG_ERR, + "port is NULL in cancelled req_app_link"); return; } - nxt_port_rpc_ex_set_peer(task, task->thread->engine->port, rc, - port->pid); + nxt_port_rpc_ex_set_peer(task, task->thread->engine->port, + req_rpc_data, port->pid); - nxt_router_app_prepare_request(task, ra); + nxt_router_app_prepare_request(task, req_app_link); } msg->port_msg.last = 0; @@ -3577,12 +3659,12 @@ nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, } } - if (rc->request != NULL) { - nxt_http_request_error(task, rc->request, + if (req_rpc_data->request != NULL) { + nxt_http_request_error(task, req_rpc_data->request, NXT_HTTP_SERVICE_UNAVAILABLE); } - nxt_router_rc_unlink(task, rc); + nxt_request_rpc_data_unlink(task, req_rpc_data); } @@ -3626,7 +3708,7 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d", &app->name, port->pid, app->processes, app->pending_processes); - nxt_router_app_port_release(task, port, 0, 0); + nxt_router_app_port_release(task, port, NXT_APR_NEW_PORT); } @@ -3634,10 +3716,10 @@ static void nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) { - nxt_app_t *app; - nxt_app_joint_t *app_joint; - nxt_queue_link_t *lnk; - nxt_req_app_link_t *ra; + nxt_app_t *app; + nxt_app_joint_t *app_joint; + nxt_queue_link_t *lnk; + nxt_request_app_link_t *req_app_link; app_joint = data; @@ -3666,20 +3748,22 @@ nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_queue_remove(lnk); lnk->next = NULL; - ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_requests); + req_app_link = nxt_queue_link_data(lnk, nxt_request_app_link_t, + link_app_requests); } else { - ra = NULL; + req_app_link = NULL; } nxt_thread_mutex_unlock(&app->mutex); - if (ra != NULL) { + if (req_app_link != NULL) { nxt_debug(task, "app '%V' %p abort next stream #%uD", - &app->name, app, ra->stream); + &app->name, app, req_app_link->stream); - nxt_router_ra_error(ra, 500, "Failed to start application process"); - nxt_router_ra_use(task, ra, -1); + nxt_request_app_link_error(req_app_link, 500, + "Failed to start application process"); + nxt_request_app_link_use(task, req_app_link, -1); } } @@ -3813,9 +3897,9 @@ nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app) static void nxt_router_app_process_request(nxt_task_t *task, void *obj, void *data) { - nxt_req_app_link_t *ra; + nxt_request_app_link_t *req_app_link; - ra = data; + req_app_link = data; #if (NXT_DEBUG) { @@ -3824,39 +3908,66 @@ nxt_router_app_process_request(nxt_task_t *task, void *obj, void *data) app = obj; nxt_assert(app != NULL); - nxt_assert(ra != NULL); - nxt_assert(ra->app_port != NULL); + nxt_assert(req_app_link != NULL); + nxt_assert(req_app_link->app_port != NULL); nxt_debug(task, "app '%V' %p process next stream #%uD", - &app->name, app, ra->stream); + &app->name, app, req_app_link->stream); } #endif - nxt_router_app_prepare_request(task, ra); + nxt_router_app_prepare_request(task, req_app_link); } static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, - uint32_t request_failed, uint32_t got_response) + nxt_apr_action_t action) { + int inc_use; + uint32_t dec_pending, got_response; nxt_app_t *app; nxt_bool_t port_unchained; nxt_bool_t send_quit, cancelled, adjust_idle_timer; nxt_queue_link_t *lnk; - nxt_req_app_link_t *ra, *pending_ra, *re_ra; + nxt_request_app_link_t *req_app_link, *pending_ra, *re_ra; nxt_port_select_state_t state; nxt_assert(port != NULL); nxt_assert(port->app != NULL); - ra = NULL; + req_app_link = NULL; app = port->app; + inc_use = 0; + dec_pending = 0; + got_response = 0; + + switch (action) { + case NXT_APR_NEW_PORT: + break; + case NXT_APR_REQUEST_FAILED: + dec_pending = 1; + inc_use = -1; + break; + case NXT_APR_GOT_RESPONSE: + dec_pending = 1; + got_response = 1; + inc_use = -1; + break; + case NXT_APR_UPGRADE: + dec_pending = 1; + got_response = 1; + break; + case NXT_APR_CLOSE: + inc_use = -1; + break; + } + nxt_thread_mutex_lock(&app->mutex); - port->app_pending_responses -= request_failed + got_response; + port->app_pending_responses -= dec_pending; port->app_responses += got_response; if (port->pair[1] != -1 @@ -3893,24 +4004,25 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, nxt_queue_remove(lnk); lnk->next = NULL; - ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_requests); + req_app_link = nxt_queue_link_data(lnk, nxt_request_app_link_t, + link_app_requests); - ra->app_port = nxt_router_pop_first_port(app); + req_app_link->app_port = nxt_router_pop_first_port(app); - if (ra->app_port->app_pending_responses > 1) { - nxt_router_ra_pending(task, app, ra); + if (req_app_link->app_port->app_pending_responses > 1) { + nxt_request_app_link_pending(task, app, req_app_link); } } /* Pop first pending request for this port. */ - if ((request_failed > 0 || got_response > 0) + if (dec_pending > 0 && !nxt_queue_is_empty(&port->pending_requests)) { lnk = nxt_queue_first(&port->pending_requests); nxt_queue_remove(lnk); lnk->next = NULL; - pending_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, + pending_ra = nxt_queue_link_data(lnk, nxt_request_app_link_t, link_port_pending); nxt_assert(pending_ra->link_app_pending.next != NULL); @@ -3926,7 +4038,8 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, if (got_response > 0 && !nxt_queue_is_empty(&app->pending)) { lnk = nxt_queue_first(&app->pending); - re_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_pending); + re_ra = nxt_queue_link_data(lnk, nxt_request_app_link_t, + link_app_pending); if (re_ra->res_time <= nxt_thread_monotonic_time(task->thread)) { @@ -3937,9 +4050,9 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, re_ra->stream); if (cancelled) { - nxt_router_ra_inc_use(re_ra); + nxt_request_app_link_inc_use(re_ra); - state.ra = re_ra; + state.req_app_link = re_ra; state.app = app; nxt_router_port_select(task, &state); @@ -3969,9 +4082,10 @@ re_ra_cancelled: adjust_idle_timer = 0; - if (port->pair[1] != -1 && !send_quit && port->app_pending_responses == 0) { - nxt_assert(port->idle_link.next == NULL); - + if (port->pair[1] != -1 && !send_quit && port->app_pending_responses == 0 + && nxt_queue_is_empty(&port->active_websockets) + && port->idle_link.next == NULL) + { if (app->idle_processes == app->spare_processes && app->adjust_idle_work.data == NULL) { @@ -4000,7 +4114,7 @@ re_ra_cancelled: } if (pending_ra != NULL) { - nxt_router_ra_use(task, pending_ra, -1); + nxt_request_app_link_use(task, pending_ra, -1); } if (re_ra != NULL) { @@ -4011,12 +4125,12 @@ re_ra_cancelled: } } - if (ra != NULL) { - nxt_router_ra_use(task, ra, -1); + if (req_app_link != NULL) { + nxt_request_app_link_use(task, req_app_link, -1); nxt_work_queue_add(&task->thread->engine->fast_work_queue, nxt_router_app_process_request, - &task->thread->engine->task, app, ra); + &task->thread->engine->task, app, req_app_link); goto adjust_use; } @@ -4048,9 +4162,7 @@ re_ra_cancelled: adjust_use: - if (request_failed > 0 || got_response > 0) { - nxt_port_use(task, port, -1); - } + nxt_port_use(task, port, inc_use); } @@ -4267,33 +4379,33 @@ nxt_router_free_app(nxt_task_t *task, void *obj, void *data) static void nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state) { - nxt_app_t *app; - nxt_bool_t can_start_process; - nxt_req_app_link_t *ra; + nxt_app_t *app; + nxt_bool_t can_start_process; + nxt_request_app_link_t *req_app_link; - ra = state->ra; + req_app_link = state->req_app_link; app = state->app; state->failed_port_use_delta = 0; - if (nxt_queue_chk_remove(&ra->link_app_requests)) + if (nxt_queue_chk_remove(&req_app_link->link_app_requests)) { - nxt_router_ra_dec_use(ra); + nxt_request_app_link_dec_use(req_app_link); } - if (nxt_queue_chk_remove(&ra->link_port_pending)) + if (nxt_queue_chk_remove(&req_app_link->link_port_pending)) { - nxt_assert(ra->link_app_pending.next != NULL); + nxt_assert(req_app_link->link_app_pending.next != NULL); - nxt_queue_remove(&ra->link_app_pending); - ra->link_app_pending.next = NULL; + nxt_queue_remove(&req_app_link->link_app_pending); + req_app_link->link_app_pending.next = NULL; - nxt_router_ra_dec_use(ra); + nxt_request_app_link_dec_use(req_app_link); } - state->failed_port = ra->app_port; + state->failed_port = req_app_link->app_port; - if (ra->app_port != NULL) { + if (req_app_link->app_port != NULL) { state->failed_port_use_delta--; state->failed_port->app_pending_responses--; @@ -4302,7 +4414,7 @@ nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state) state->failed_port_use_delta--; } - ra->app_port = NULL; + req_app_link->app_port = NULL; } can_start_process = nxt_router_app_can_start(app); @@ -4313,22 +4425,25 @@ nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state) if (nxt_queue_is_empty(&app->ports) || (can_start_process && nxt_router_app_first_port_busy(app)) ) { - ra = nxt_router_ra_create(task, ra); - - if (nxt_slow_path(ra == NULL)) { + req_app_link = nxt_request_app_link_alloc(task, req_app_link, + req_app_link->req_rpc_data); + if (nxt_slow_path(req_app_link == NULL)) { goto fail; } if (nxt_slow_path(state->failed_port != NULL)) { - nxt_queue_insert_head(&app->requests, &ra->link_app_requests); + nxt_queue_insert_head(&app->requests, + &req_app_link->link_app_requests); } else { - nxt_queue_insert_tail(&app->requests, &ra->link_app_requests); + nxt_queue_insert_tail(&app->requests, + &req_app_link->link_app_requests); } - nxt_router_ra_inc_use(ra); + nxt_request_app_link_inc_use(req_app_link); - nxt_debug(task, "ra stream #%uD enqueue to app->requests", ra->stream); + nxt_debug(task, "req_app_link stream #%uD enqueue to app->requests", + req_app_link->stream); if (can_start_process) { app->pending_processes++; @@ -4339,15 +4454,15 @@ nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state) state->port = nxt_router_pop_first_port(app); if (state->port->app_pending_responses > 1) { - ra = nxt_router_ra_create(task, ra); - - if (nxt_slow_path(ra == NULL)) { + req_app_link = nxt_request_app_link_alloc(task, req_app_link, + req_app_link->req_rpc_data); + if (nxt_slow_path(req_app_link == NULL)) { goto fail; } - ra->app_port = state->port; + req_app_link->app_port = state->port; - nxt_router_ra_pending(task, app, ra); + nxt_request_app_link_pending(task, app, req_app_link); } if (can_start_process && nxt_router_app_need_start(app)) { @@ -4358,32 +4473,32 @@ nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state) fail: - state->shared_ra = ra; + state->shared_ra = req_app_link; } static nxt_int_t nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state) { - nxt_int_t res; - nxt_app_t *app; - nxt_req_app_link_t *ra; + nxt_int_t res; + nxt_app_t *app; + nxt_request_app_link_t *req_app_link; - ra = state->shared_ra; + req_app_link = state->shared_ra; app = state->app; if (state->failed_port_use_delta != 0) { nxt_port_use(task, state->failed_port, state->failed_port_use_delta); } - if (nxt_slow_path(ra == NULL)) { + if (nxt_slow_path(req_app_link == NULL)) { if (state->port != NULL) { nxt_port_use(task, state->port, -1); } - nxt_router_ra_error(state->ra, 500, + nxt_request_app_link_error(state->req_app_link, 500, "Failed to allocate shared req<->app link"); - nxt_router_ra_use(task, state->ra, -1); + nxt_request_app_link_use(task, state->req_app_link, -1); return NXT_ERROR; } @@ -4391,7 +4506,7 @@ nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state) if (state->port != NULL) { nxt_debug(task, "already have port for app '%V' %p ", &app->name, app); - ra->app_port = state->port; + req_app_link->app_port = state->port; if (state->start_process) { nxt_router_start_app_process(task, app); @@ -4410,8 +4525,9 @@ nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state) res = nxt_router_start_app_process(task, app); if (nxt_slow_path(res != NXT_OK)) { - nxt_router_ra_error(ra, 500, "Failed to start app process"); - nxt_router_ra_use(task, ra, -1); + nxt_request_app_link_error(req_app_link, 500, + "Failed to start app process"); + nxt_request_app_link_use(task, req_app_link, -1); return NXT_ERROR; } @@ -4421,11 +4537,12 @@ nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state) static nxt_int_t -nxt_router_app_port(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra) +nxt_router_app_port(nxt_task_t *task, nxt_app_t *app, + nxt_request_app_link_t *req_app_link) { nxt_port_select_state_t state; - state.ra = ra; + state.req_app_link = req_app_link; state.app = app; nxt_thread_mutex_lock(&app->mutex); @@ -4442,48 +4559,48 @@ void nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r, nxt_app_t *app) { - nxt_int_t res; - nxt_port_t *port; - nxt_event_engine_t *engine; - nxt_req_app_link_t ra_local, *ra; - nxt_req_conn_link_t *rc; + nxt_int_t res; + nxt_port_t *port; + nxt_event_engine_t *engine; + nxt_request_app_link_t ra_local, *req_app_link; + nxt_request_rpc_data_t *req_rpc_data; engine = task->thread->engine; - rc = nxt_port_rpc_register_handler_ex(task, engine->port, + req_rpc_data = nxt_port_rpc_register_handler_ex(task, engine->port, nxt_router_response_ready_handler, nxt_router_response_error_handler, - sizeof(nxt_req_conn_link_t)); - - if (nxt_slow_path(rc == NULL)) { + sizeof(nxt_request_rpc_data_t)); + if (nxt_slow_path(req_rpc_data == NULL)) { nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); return; } - rc->stream = nxt_port_rpc_ex_stream(rc); - rc->app = app; + req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data); + req_rpc_data->app = app; nxt_router_app_use(task, app, 1); - rc->request = r; + req_rpc_data->request = r; + r->req_rpc_data = req_rpc_data; - ra = &ra_local; - nxt_router_ra_init(task, ra, rc); + req_app_link = &ra_local; + nxt_request_app_link_init(task, req_app_link, req_rpc_data); - res = nxt_router_app_port(task, app, ra); + res = nxt_router_app_port(task, app, req_app_link); if (res != NXT_OK) { return; } - ra = rc->ra; - port = ra->app_port; + req_app_link = req_rpc_data->req_app_link; + port = req_app_link->app_port; nxt_assert(port != NULL); - nxt_port_rpc_ex_set_peer(task, engine->port, rc, port->pid); + nxt_port_rpc_ex_set_peer(task, engine->port, req_rpc_data, port->pid); - nxt_router_app_prepare_request(task, ra); + nxt_router_app_prepare_request(task, req_app_link); } @@ -4494,19 +4611,20 @@ nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj, void *data) static void -nxt_router_app_prepare_request(nxt_task_t *task, nxt_req_app_link_t *ra) +nxt_router_app_prepare_request(nxt_task_t *task, + nxt_request_app_link_t *req_app_link) { - uint32_t request_failed; - nxt_buf_t *buf; - nxt_int_t res; - nxt_port_t *port, *c_port, *reply_port; + nxt_buf_t *buf; + nxt_int_t res; + nxt_port_t *port, *c_port, *reply_port; + nxt_apr_action_t apr_action; - nxt_assert(ra->app_port != NULL); + nxt_assert(req_app_link->app_port != NULL); - port = ra->app_port; - reply_port = ra->reply_port; + port = req_app_link->app_port; + reply_port = req_app_link->reply_port; - request_failed = 1; + apr_action = NXT_APR_REQUEST_FAILED; c_port = nxt_process_connected_port_find(port->process, reply_port->pid, reply_port->id); @@ -4514,7 +4632,7 @@ nxt_router_app_prepare_request(nxt_task_t *task, nxt_req_app_link_t *ra) res = nxt_port_send_port(task, port, reply_port, 0); if (nxt_slow_path(res != NXT_OK)) { - nxt_router_ra_error(ra, 500, + nxt_request_app_link_error(req_app_link, 500, "Failed to send reply port to application"); goto release_port; } @@ -4522,11 +4640,11 @@ nxt_router_app_prepare_request(nxt_task_t *task, nxt_req_app_link_t *ra) nxt_process_connected_port_add(port->process, reply_port); } - buf = nxt_router_prepare_msg(task, ra->request, port, + buf = nxt_router_prepare_msg(task, req_app_link->request, port, nxt_app_msg_prefix[port->app->type]); if (nxt_slow_path(buf == NULL)) { - nxt_router_ra_error(ra, 500, + nxt_request_app_link_error(req_app_link, 500, "Failed to prepare message for application"); goto release_port; } @@ -4535,40 +4653,41 @@ nxt_router_app_prepare_request(nxt_task_t *task, nxt_req_app_link_t *ra) nxt_buf_used_size(buf), port->socket.fd); - request_failed = 0; + apr_action = NXT_APR_NEW_PORT; - ra->msg_info.buf = buf; - ra->msg_info.completion_handler = buf->completion_handler; + req_app_link->msg_info.buf = buf; + req_app_link->msg_info.completion_handler = buf->completion_handler; for (; buf; buf = buf->next) { buf->completion_handler = nxt_router_dummy_buf_completion; } - buf = ra->msg_info.buf; + buf = req_app_link->msg_info.buf; - res = nxt_port_mmap_get_tracking(task, port, &ra->msg_info.tracking, - ra->stream); + res = nxt_port_mmap_get_tracking(task, port, + &req_app_link->msg_info.tracking, + req_app_link->stream); if (nxt_slow_path(res != NXT_OK)) { - nxt_router_ra_error(ra, 500, - "Failed to get tracking area"); + nxt_request_app_link_error(req_app_link, 500, + "Failed to get tracking area"); goto release_port; } - res = nxt_port_socket_twrite(task, port, NXT_PORT_MSG_DATA, - -1, ra->stream, reply_port->id, buf, - &ra->msg_info.tracking); + res = nxt_port_socket_twrite(task, port, NXT_PORT_MSG_REQ_HEADERS, + -1, req_app_link->stream, reply_port->id, buf, + &req_app_link->msg_info.tracking); if (nxt_slow_path(res != NXT_OK)) { - nxt_router_ra_error(ra, 500, - "Failed to send message to application"); + nxt_request_app_link_error(req_app_link, 500, + "Failed to send message to application"); goto release_port; } release_port: - nxt_router_app_port_release(task, port, request_failed, 0); + nxt_router_app_port_release(task, port, apr_action); - nxt_router_ra_update_peer(task, ra); + nxt_request_app_link_update_peer(task, req_app_link); } @@ -4704,6 +4823,7 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r, *p++ = '\0'; req->tls = (r->tls != NULL); + req->websocket_handshake = r->websocket_handshake; req->server_name_length = r->server_name.length; nxt_unit_sptr_set(&req->server_name, p); @@ -4899,8 +5019,8 @@ nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data) nxt_timer_t *timer; nxt_queue_link_t *lnk; nxt_http_request_t *r; - nxt_req_app_link_t *pending_ra; - nxt_req_conn_link_t *rc; + nxt_request_app_link_t *pending_ra; + nxt_request_rpc_data_t *req_rpc_data; nxt_port_select_state_t state; timer = obj; @@ -4908,8 +5028,8 @@ nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data) nxt_debug(task, "router app timeout"); r = nxt_timer_data(timer, nxt_http_request_t, timer); - rc = r->timer_data; - app = rc->app; + req_rpc_data = r->timer_data; + app = req_rpc_data->app; if (app == NULL) { goto generate_error; @@ -4918,14 +5038,16 @@ nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data) port = NULL; pending_ra = NULL; - if (rc->app_port != NULL) { - port = rc->app_port; - rc->app_port = NULL; + if (req_rpc_data->app_port != NULL) { + port = req_rpc_data->app_port; + req_rpc_data->app_port = NULL; } - if (port == NULL && rc->ra != NULL && rc->ra->app_port != NULL) { - port = rc->ra->app_port; - rc->ra->app_port = NULL; + if (port == NULL && req_rpc_data->req_app_link != NULL + && req_rpc_data->req_app_link->app_port != NULL) + { + port = req_rpc_data->req_app_link->app_port; + req_rpc_data->req_app_link->app_port = NULL; } if (port == NULL) { @@ -4939,7 +5061,7 @@ nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data) if (!nxt_queue_is_empty(&port->pending_requests)) { lnk = nxt_queue_first(&port->pending_requests); - pending_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, + pending_ra = nxt_queue_link_data(lnk, nxt_request_app_link_t, link_port_pending); nxt_assert(pending_ra->link_app_pending.next != NULL); @@ -4951,9 +5073,9 @@ nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data) pending_ra->stream); if (cancelled) { - nxt_router_ra_inc_use(pending_ra); + nxt_request_app_link_inc_use(pending_ra); - state.ra = pending_ra; + state.req_app_link = pending_ra; state.app = app; nxt_router_port_select(task, &state); @@ -4981,7 +5103,7 @@ generate_error: nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE); - nxt_router_rc_unlink(task, rc); + nxt_request_rpc_data_unlink(task, req_rpc_data); } |