diff options
author | Max Romanov <max.romanov@nginx.com> | 2020-08-11 19:20:15 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2020-08-11 19:20:15 +0300 |
commit | 83595606121a821f9e3cef0f0b7e7fe87eb1e50a (patch) | |
tree | 2374867dd2f69654a71e95b7abec3fdad13ffd1a | |
parent | 6e31d6cd39be9d3f4ee680fc13c3fe42f5cd39e7 (diff) | |
download | unit-83595606121a821f9e3cef0f0b7e7fe87eb1e50a.tar.gz unit-83595606121a821f9e3cef0f0b7e7fe87eb1e50a.tar.bz2 |
Introducing the shared application port.
This is the port shared between all application processes which use it to pass
requests for processing. Using it significantly simplifies the request
processing code in the router. The drawback is 2 more file descriptors per each
configured application and more complex libunit message wait/read code.
Diffstat (limited to '')
-rw-r--r-- | go/nxt_cgo_lib.c | 11 | ||||
-rw-r--r-- | go/nxt_cgo_lib.h | 2 | ||||
-rw-r--r-- | go/port.go | 8 | ||||
-rw-r--r-- | src/nodejs/unit-http/unit.cpp | 22 | ||||
-rw-r--r-- | src/nxt_http_websocket.c | 32 | ||||
-rw-r--r-- | src/nxt_port.c | 2 | ||||
-rw-r--r-- | src/nxt_port.h | 11 | ||||
-rw-r--r-- | src/nxt_router.c | 1504 | ||||
-rw-r--r-- | src/nxt_router.h | 14 | ||||
-rw-r--r-- | src/nxt_router_request.h | 37 | ||||
-rw-r--r-- | src/nxt_unit.c | 467 | ||||
-rw-r--r-- | src/nxt_unit.h | 13 |
12 files changed, 894 insertions, 1229 deletions
diff --git a/go/nxt_cgo_lib.c b/go/nxt_cgo_lib.c index 937996b0..f7171f55 100644 --- a/go/nxt_cgo_lib.c +++ b/go/nxt_cgo_lib.c @@ -44,7 +44,7 @@ nxt_cgo_run(uintptr_t handler) return NXT_UNIT_ERROR; } - rc = nxt_unit_run(ctx); + rc = nxt_unit_run_ctx(ctx); nxt_unit_done(ctx); @@ -105,7 +105,7 @@ nxt_cgo_str_init(nxt_cgo_str_t *dst, nxt_unit_sptr_t *sptr, uint32_t length) static int nxt_cgo_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) { - nxt_go_add_port(port->id.pid, port->id.id, + nxt_go_add_port((uintptr_t) ctx, port->id.pid, port->id.id, port->in_fd, port->out_fd); port->in_fd = -1; @@ -204,6 +204,13 @@ nxt_cgo_request_done(uintptr_t req, int res) void +nxt_cgo_unit_run_shared(uintptr_t ctx) +{ + nxt_unit_run_shared((nxt_unit_ctx_t *) ctx); +} + + +void nxt_cgo_warn(uintptr_t msg, uint32_t msg_len) { nxt_unit_warn(NULL, "%.*s", (int) msg_len, (char *) msg); diff --git a/go/nxt_cgo_lib.h b/go/nxt_cgo_lib.h index 5317380b..fa515be5 100644 --- a/go/nxt_cgo_lib.h +++ b/go/nxt_cgo_lib.h @@ -35,6 +35,8 @@ int nxt_cgo_request_close(uintptr_t req); void nxt_cgo_request_done(uintptr_t req, int res); +void nxt_cgo_unit_run_shared(uintptr_t ctx); + void nxt_cgo_warn(uintptr_t msg, uint32_t msg_len); #endif /* _NXT_CGO_LIB_H_INCLUDED_ */ @@ -93,7 +93,7 @@ func getUnixConn(fd int) *net.UnixConn { } //export nxt_go_add_port -func nxt_go_add_port(pid C.int, id C.int, rcv C.int, snd C.int) { +func nxt_go_add_port(ctx C.uintptr_t, pid C.int, id C.int, rcv C.int, snd C.int) { p := &port{ key: port_key{ pid: int(pid), @@ -104,6 +104,12 @@ func nxt_go_add_port(pid C.int, id C.int, rcv C.int, snd C.int) { } add_port(p) + + if id == 65535 { + go func(ctx C.uintptr_t) { + C.nxt_cgo_unit_run_shared(ctx); + }(ctx) + } } //export nxt_go_remove_port diff --git a/src/nodejs/unit-http/unit.cpp b/src/nodejs/unit-http/unit.cpp index 468acf96..1ee5b742 100644 --- a/src/nodejs/unit-http/unit.cpp +++ b/src/nodejs/unit-http/unit.cpp @@ -20,7 +20,7 @@ napi_ref Unit::constructor_; struct port_data_t { nxt_unit_ctx_t *ctx; - nxt_unit_port_id_t port_id; + nxt_unit_port_t *port; uv_poll_t poll; }; @@ -351,7 +351,11 @@ Unit::shm_ack_handler(nxt_unit_ctx_t *ctx) static void nxt_uv_read_callback(uv_poll_t *handle, int status, int events) { - nxt_unit_run_once((nxt_unit_ctx_t *) handle->data); + port_data_t *data; + + data = (port_data_t *) handle->data; + + nxt_unit_process_port_msg(data->ctx, data->port); } @@ -396,21 +400,14 @@ Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) port->data = data; data->ctx = ctx; - data->port_id = port->id; - data->poll.data = ctx; + data->port = port; + data->poll.data = data; } return NXT_UNIT_OK; } -inline bool -operator == (const nxt_unit_port_id_t &p1, const nxt_unit_port_id_t &p2) -{ - return p1.pid == p2.pid && p1.id == p2.id; -} - - void Unit::remove_port(nxt_unit_t *unit, nxt_unit_port_t *port) { @@ -419,10 +416,9 @@ Unit::remove_port(nxt_unit_t *unit, nxt_unit_port_t *port) if (port->data != NULL) { data = (port_data_t *) port->data; - if (data->port_id == port->id) { + if (data->port == port) { uv_poll_stop(&data->poll); - data->poll.data = data; uv_close((uv_handle_t *) &data->poll, delete_port_data); } } diff --git a/src/nxt_http_websocket.c b/src/nxt_http_websocket.c index 4d31b320..393c20ac 100644 --- a/src/nxt_http_websocket.c +++ b/src/nxt_http_websocket.c @@ -33,15 +33,13 @@ nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data) nxt_buf_t *out, *buf, **out_tail, *b, *next; nxt_int_t res; nxt_http_request_t *r; - nxt_request_app_link_t *req_app_link; nxt_request_rpc_data_t *req_rpc_data; nxt_websocket_header_t *wsh; r = obj; + req_rpc_data = r->req_rpc_data; - if (nxt_slow_path((req_rpc_data = r->req_rpc_data) == NULL - || (req_app_link = req_rpc_data->req_app_link) == NULL)) - { + if (nxt_slow_path(req_rpc_data == NULL)) { nxt_debug(task, "websocket client frame for destroyed request"); return; @@ -69,8 +67,7 @@ nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data) if (buf == NULL || buf_free_size == 0) { buf_free_size = nxt_min(frame_size, PORT_MMAP_DATA_SIZE); - buf = nxt_port_mmap_get_buf(task, - &req_app_link->app_port->process->outgoing, + buf = nxt_port_mmap_get_buf(task, &req_rpc_data->app->outgoing, buf_free_size); *out_tail = buf; @@ -101,10 +98,10 @@ nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data) b = next; } - res = nxt_port_socket_twrite(task, req_app_link->app_port, + res = nxt_port_socket_twrite(task, req_rpc_data->app_port, NXT_PORT_MSG_WEBSOCKET, -1, - req_app_link->stream, - req_app_link->reply_port->id, out, NULL); + req_rpc_data->stream, + task->thread->engine->port->id, out, NULL); if (nxt_slow_path(res != NXT_OK)) { // TODO: handle } @@ -130,32 +127,27 @@ static void nxt_http_websocket_error_handler(nxt_task_t *task, void *obj, void *data) { nxt_http_request_t *r; - nxt_request_app_link_t *req_app_link; nxt_request_rpc_data_t *req_rpc_data; nxt_debug(task, "http websocket error handler"); r = obj; + req_rpc_data = r->req_rpc_data; - if ((req_rpc_data = r->req_rpc_data) == NULL) { + if (req_rpc_data == NULL) { nxt_debug(task, " req_rpc_data is NULL"); goto close_handler; } - if ((req_app_link = req_rpc_data->req_app_link) == NULL) { - nxt_debug(task, " req_app_link is NULL"); - goto close_handler; - } - - if (req_app_link->app_port == NULL) { + if (req_rpc_data->app_port == NULL) { nxt_debug(task, " app_port is NULL"); goto close_handler; } - (void) nxt_port_socket_twrite(task, req_app_link->app_port, + (void) nxt_port_socket_twrite(task, req_rpc_data->app_port, NXT_PORT_MSG_WEBSOCKET_LAST, - -1, req_app_link->stream, - req_app_link->reply_port->id, NULL, NULL); + -1, req_rpc_data->stream, + task->thread->engine->port->id, NULL, NULL); close_handler: diff --git a/src/nxt_port.c b/src/nxt_port.c index 7232c465..54434d70 100644 --- a/src/nxt_port.c +++ b/src/nxt_port.c @@ -67,8 +67,6 @@ nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid, nxt_queue_init(&port->messages); nxt_thread_mutex_create(&port->write_mutex); - nxt_queue_init(&port->pending_requests); - nxt_queue_init(&port->active_websockets); } else { nxt_mp_destroy(mp); diff --git a/src/nxt_port.h b/src/nxt_port.h index 3a8a200a..9a933e75 100644 --- a/src/nxt_port.h +++ b/src/nxt_port.h @@ -41,6 +41,7 @@ struct nxt_port_handlers_s { /* Request headers. */ nxt_port_handler_t req_headers; + nxt_port_handler_t req_headers_ack; /* Websocket frame. */ nxt_port_handler_t websocket_frame; @@ -89,6 +90,7 @@ typedef enum { _NXT_PORT_MSG_QUIT = nxt_port_handler_idx(quit), _NXT_PORT_MSG_REQ_HEADERS = nxt_port_handler_idx(req_headers), + _NXT_PORT_MSG_REQ_HEADERS_ACK = nxt_port_handler_idx(req_headers_ack), _NXT_PORT_MSG_WEBSOCKET = nxt_port_handler_idx(websocket_frame), _NXT_PORT_MSG_DATA = nxt_port_handler_idx(data), @@ -113,7 +115,8 @@ typedef enum { NXT_PORT_MSG_NEW_PORT = nxt_msg_last(_NXT_PORT_MSG_NEW_PORT), NXT_PORT_MSG_GET_PORT = nxt_msg_last(_NXT_PORT_MSG_GET_PORT), NXT_PORT_MSG_MMAP = nxt_msg_last(_NXT_PORT_MSG_MMAP) - | NXT_PORT_MSG_CLOSE_FD | NXT_PORT_MSG_SYNC, + | NXT_PORT_MSG_SYNC, + NXT_PORT_MSG_GET_MMAP = nxt_msg_last(_NXT_PORT_MSG_GET_MMAP), NXT_PORT_MSG_PROCESS_CREATED = nxt_msg_last(_NXT_PORT_MSG_PROCESS_CREATED), NXT_PORT_MSG_PROCESS_READY = nxt_msg_last(_NXT_PORT_MSG_PROCESS_READY), @@ -193,6 +196,7 @@ struct nxt_port_s { nxt_queue_link_t app_link; /* for nxt_app_t.ports */ nxt_app_t *app; + nxt_port_t *main_app_port; nxt_queue_link_t idle_link; /* for nxt_app_t.idle_ports */ nxt_msec_t idle_start; @@ -205,11 +209,10 @@ struct nxt_port_s { /* Maximum interleave of message parts. */ uint32_t max_share; - uint32_t app_pending_responses; uint32_t app_responses; - nxt_queue_t pending_requests; - nxt_queue_t active_websockets; + uint32_t active_websockets; + uint32_t active_requests; nxt_port_handler_t handler; nxt_port_handler_t *data; diff --git a/src/nxt_router.c b/src/nxt_router.c index 4df1489d..44b303e4 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -61,59 +61,13 @@ typedef struct { } nxt_app_rpc_t; -struct nxt_port_select_state_s { - nxt_app_t *app; - nxt_request_app_link_t *req_app_link; - - nxt_port_t *failed_port; - int failed_port_use_delta; - - uint8_t start_process; /* 1 bit */ - nxt_request_app_link_t *shared_ra; - nxt_port_t *port; -}; - -typedef struct nxt_port_select_state_s nxt_port_select_state_t; - static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp); static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data); static void nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port); -static void nxt_router_port_select(nxt_task_t *task, - nxt_port_select_state_t *state); - -static nxt_int_t nxt_router_port_post_select(nxt_task_t *task, - nxt_port_select_state_t *state); - static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app); -static void nxt_request_app_link_update_peer(nxt_task_t *task, - nxt_request_app_link_t *req_app_link); - - -nxt_inline void -nxt_request_app_link_inc_use(nxt_request_app_link_t *req_app_link) -{ - nxt_atomic_fetch_add(&req_app_link->use_count, 1); -} - -nxt_inline void -nxt_request_app_link_chk_use(nxt_request_app_link_t *req_app_link, int i) -{ -#if (NXT_DEBUG) - int c; - - c = nxt_atomic_fetch_add(&req_app_link->use_count, i); - - nxt_assert((c + i) > 0); -#else - (void) nxt_atomic_fetch_add(&req_app_link->use_count, i); -#endif -} - -static void nxt_request_app_link_use(nxt_task_t *task, - nxt_request_app_link_t *req_app_link, int i); static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task); static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data); @@ -196,6 +150,8 @@ static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data); static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data); +static void nxt_router_req_headers_ack_handler(nxt_task_t *task, + nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data); static void nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf); @@ -220,6 +176,8 @@ static void nxt_router_access_log_reopen_error(nxt_task_t *task, static void nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data); +static nxt_int_t nxt_router_app_shared_port_send(nxt_task_t *task, + nxt_port_t *app_port); static void nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data); @@ -227,13 +185,15 @@ static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app); static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, nxt_apr_action_t action); -static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_app_t *app, - nxt_request_app_link_t *req_app_link); +static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app, + nxt_request_rpc_data_t *req_rpc_data); +static void nxt_router_http_request_done(nxt_task_t *task, void *obj, + void *data); static void nxt_router_app_prepare_request(nxt_task_t *task, - nxt_request_app_link_t *req_app_link); + nxt_request_rpc_data_t *req_rpc_data); static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task, - nxt_http_request_t *r, nxt_port_t *port, const nxt_str_t *prefix); + nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix); static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data); static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, @@ -250,7 +210,7 @@ static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data); static void nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i); -static nxt_int_t nxt_router_http_request_done(nxt_task_t *task, +static void nxt_router_http_request_release_post(nxt_task_t *task, nxt_http_request_t *r); static void nxt_router_http_request_release(nxt_task_t *task, void *obj, void *data); @@ -501,83 +461,6 @@ nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app) } -nxt_inline void -nxt_request_app_link_init(nxt_task_t *task, - nxt_request_app_link_t *req_app_link, nxt_request_rpc_data_t *req_rpc_data) -{ - nxt_buf_t *body; - nxt_event_engine_t *engine; - - engine = task->thread->engine; - - nxt_memzero(req_app_link, sizeof(nxt_request_app_link_t)); - - req_app_link->stream = req_rpc_data->stream; - req_app_link->use_count = 1; - req_app_link->req_rpc_data = req_rpc_data; - req_rpc_data->req_app_link = req_app_link; - req_app_link->reply_port = engine->port; - req_app_link->request = req_rpc_data->request; - req_app_link->apr_action = NXT_APR_GOT_RESPONSE; - - req_app_link->work.handler = NULL; - req_app_link->work.task = &engine->task; - req_app_link->work.obj = req_app_link; - req_app_link->work.data = engine; - - body = req_rpc_data->request->body; - - if (body != NULL && nxt_buf_is_file(body)) { - req_app_link->body_fd = body->file->fd; - - body->file->fd = -1; - - } else { - req_app_link->body_fd = -1; - } -} - - -nxt_inline nxt_request_app_link_t * -nxt_request_app_link_alloc(nxt_task_t *task, - nxt_request_app_link_t *ra_src, nxt_request_rpc_data_t *req_rpc_data) -{ - nxt_mp_t *mp; - nxt_request_app_link_t *req_app_link; - - if (ra_src != NULL && ra_src->mem_pool != NULL) { - return ra_src; - } - - mp = req_rpc_data->request->mem_pool; - - req_app_link = nxt_mp_alloc(mp, sizeof(nxt_request_app_link_t)); - - if (nxt_slow_path(req_app_link == NULL)) { - - req_rpc_data->req_app_link = NULL; - - if (ra_src != NULL) { - ra_src->req_rpc_data = NULL; - } - - return NULL; - } - - nxt_mp_retain(mp); - - nxt_request_app_link_init(task, req_app_link, req_rpc_data); - - if (ra_src != NULL) { - req_app_link->body_fd = ra_src->body_fd; - } - - req_app_link->mem_pool = mp; - - return req_app_link; -} - - nxt_inline nxt_bool_t nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info, uint32_t stream) @@ -614,198 +497,6 @@ nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info, } -static void -nxt_request_app_link_update_peer_handler(nxt_task_t *task, void *obj, - void *data) -{ - nxt_request_app_link_t *req_app_link; - - req_app_link = obj; - - nxt_request_app_link_update_peer(task, req_app_link); - - nxt_request_app_link_use(task, req_app_link, -1); -} - - -static void -nxt_request_app_link_update_peer(nxt_task_t *task, - nxt_request_app_link_t *req_app_link) -{ - nxt_event_engine_t *engine; - nxt_request_rpc_data_t *req_rpc_data; - - engine = req_app_link->work.data; - - if (task->thread->engine != engine) { - nxt_request_app_link_inc_use(req_app_link); - - req_app_link->work.handler = nxt_request_app_link_update_peer_handler; - req_app_link->work.task = &engine->task; - req_app_link->work.next = NULL; - - nxt_debug(task, "req_app_link stream #%uD post update peer to %p", - req_app_link->stream, engine); - - nxt_event_engine_post(engine, &req_app_link->work); - - return; - } - - nxt_debug(task, "req_app_link stream #%uD update peer", - req_app_link->stream); - - req_rpc_data = req_app_link->req_rpc_data; - - if (req_rpc_data != NULL && req_app_link->app_port != NULL) { - nxt_port_rpc_ex_set_peer(task, engine->port, req_rpc_data, - req_app_link->app_port->pid); - } -} - - -static void -nxt_request_app_link_release(nxt_task_t *task, - nxt_request_app_link_t *req_app_link) -{ - nxt_mp_t *mp; - nxt_http_request_t *r; - nxt_request_rpc_data_t *req_rpc_data; - - nxt_assert(task->thread->engine == req_app_link->work.data); - nxt_assert(req_app_link->use_count == 0); - - nxt_debug(task, "req_app_link stream #%uD release", req_app_link->stream); - - req_rpc_data = req_app_link->req_rpc_data; - - if (req_rpc_data != NULL) { - if (nxt_slow_path(req_app_link->err_code != 0)) { - nxt_http_request_error(task, req_rpc_data->request, - req_app_link->err_code); - - } else { - req_rpc_data->app_port = req_app_link->app_port; - req_rpc_data->apr_action = req_app_link->apr_action; - req_rpc_data->msg_info = req_app_link->msg_info; - - if (req_rpc_data->app->timeout != 0) { - r = req_rpc_data->request; - - r->timer.handler = nxt_router_app_timeout; - r->timer_data = req_rpc_data; - nxt_timer_add(task->thread->engine, &r->timer, - req_rpc_data->app->timeout); - } - - req_app_link->app_port = NULL; - req_app_link->msg_info.buf = NULL; - } - - req_rpc_data->req_app_link = NULL; - req_app_link->req_rpc_data = NULL; - } - - if (req_app_link->app_port != NULL) { - nxt_router_app_port_release(task, req_app_link->app_port, - req_app_link->apr_action); - - req_app_link->app_port = NULL; - } - - if (req_app_link->body_fd != -1) { - nxt_fd_close(req_app_link->body_fd); - - req_app_link->body_fd = -1; - } - - nxt_router_msg_cancel(task, &req_app_link->msg_info, req_app_link->stream); - - mp = req_app_link->mem_pool; - - if (mp != NULL) { - nxt_mp_free(mp, req_app_link); - nxt_mp_release(mp); - } -} - - -static void -nxt_request_app_link_release_handler(nxt_task_t *task, void *obj, void *data) -{ - nxt_request_app_link_t *req_app_link; - - req_app_link = obj; - - nxt_assert(req_app_link->work.data == data); - - nxt_request_app_link_use(task, req_app_link, -1); -} - - -static void -nxt_request_app_link_use(nxt_task_t *task, nxt_request_app_link_t *req_app_link, - int i) -{ - int c; - nxt_event_engine_t *engine; - - c = nxt_atomic_fetch_add(&req_app_link->use_count, i); - - if (i < 0 && c == -i) { - engine = req_app_link->work.data; - - if (task->thread->engine == engine) { - nxt_request_app_link_release(task, req_app_link); - - return; - } - - nxt_request_app_link_inc_use(req_app_link); - - req_app_link->work.handler = nxt_request_app_link_release_handler; - req_app_link->work.task = &engine->task; - req_app_link->work.next = NULL; - - nxt_debug(task, "req_app_link stream #%uD post release to %p", - req_app_link->stream, engine); - - nxt_event_engine_post(engine, &req_app_link->work); - } -} - - -nxt_inline void -nxt_request_app_link_error(nxt_task_t *task, nxt_app_t *app, - nxt_request_app_link_t *req_app_link, const char *str) -{ - req_app_link->app_port = NULL; - req_app_link->err_code = 500; - req_app_link->err_str = str; - - nxt_alert(task, "app \"%V\" internal error: %s on #%uD", - &app->name, str, req_app_link->stream); -} - - -nxt_inline void -nxt_request_app_link_pending(nxt_task_t *task, nxt_app_t *app, - nxt_request_app_link_t *req_app_link) -{ - nxt_queue_insert_tail(&req_app_link->app_port->pending_requests, - &req_app_link->link_port_pending); - nxt_queue_insert_tail(&app->pending, &req_app_link->link_app_pending); - - nxt_request_app_link_inc_use(req_app_link); - - req_app_link->res_time = nxt_thread_monotonic_time(task->thread) - + app->res_timeout; - - nxt_debug(task, "req_app_link stream #%uD enqueue to pending_requests", - req_app_link->stream); -} - - nxt_inline nxt_bool_t nxt_queue_chk_remove(nxt_queue_link_t *lnk) { @@ -825,8 +516,9 @@ nxt_inline void nxt_request_rpc_data_unlink(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data) { - int ra_use_delta; - nxt_request_app_link_t *req_app_link; + nxt_http_request_t *r; + + nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream); if (req_rpc_data->app_port != NULL) { nxt_router_app_port_release(task, req_rpc_data->app_port, @@ -835,53 +527,34 @@ nxt_request_rpc_data_unlink(nxt_task_t *task, req_rpc_data->app_port = NULL; } - nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream); - - req_app_link = req_rpc_data->req_app_link; - if (req_app_link != NULL) { - req_rpc_data->req_app_link = NULL; - req_app_link->req_rpc_data = NULL; - - ra_use_delta = 0; - - nxt_thread_mutex_lock(&req_rpc_data->app->mutex); + if (req_rpc_data->app != NULL) { + nxt_router_app_use(task, req_rpc_data->app, -1); - if (req_app_link->link_app_requests.next == NULL - && req_app_link->link_port_pending.next == NULL - && req_app_link->link_app_pending.next == NULL - && req_app_link->link_port_websockets.next == NULL) - { - req_app_link = NULL; + req_rpc_data->app = NULL; + } - } else { - ra_use_delta -= - nxt_queue_chk_remove(&req_app_link->link_app_requests) - + nxt_queue_chk_remove(&req_app_link->link_port_pending) - + nxt_queue_chk_remove(&req_app_link->link_port_websockets); + r = req_rpc_data->request; - nxt_queue_chk_remove(&req_app_link->link_app_pending); - } + if (r != NULL) { + r->timer_data = NULL; - nxt_thread_mutex_unlock(&req_rpc_data->app->mutex); + nxt_router_http_request_release_post(task, r); - if (req_app_link != NULL) { - nxt_request_app_link_use(task, req_app_link, ra_use_delta); - } + r->req_rpc_data = NULL; + req_rpc_data->request = NULL; } - if (req_rpc_data->app != NULL) { - nxt_router_app_use(task, req_rpc_data->app, -1); + if (req_rpc_data->msg_info.body_fd != -1) { + nxt_fd_close(req_rpc_data->msg_info.body_fd); - req_rpc_data->app = NULL; + req_rpc_data->msg_info.body_fd = -1; } - if (req_rpc_data->request != NULL) { - req_rpc_data->request->timer_data = NULL; - - nxt_router_http_request_done(task, req_rpc_data->request); + if (req_rpc_data->rpc_cancel) { + req_rpc_data->rpc_cancel = 0; - req_rpc_data->request->req_rpc_data = NULL; - req_rpc_data->request = NULL; + nxt_port_rpc_cancel(task, task->thread->engine->port, + req_rpc_data->stream); } } @@ -889,25 +562,62 @@ nxt_request_rpc_data_unlink(nxt_task_t *task, void nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { + nxt_app_t *app; + nxt_port_t *port, *main_app_port; + nxt_runtime_t *rt; + nxt_port_new_port_handler(task, msg); - if (msg->u.new_port != NULL - && msg->u.new_port->type == NXT_PROCESS_CONTROLLER) - { + port = msg->u.new_port; + + if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) { nxt_router_greet_controller(task, msg->u.new_port); } - if (msg->port_msg.stream == 0) { - return; - } + if (port == NULL || port->type != NXT_PROCESS_APP) { + + if (msg->port_msg.stream == 0) { + return; + } - if (msg->u.new_port == NULL - || msg->u.new_port->type != NXT_PROCESS_APP) - { msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; } - nxt_port_rpc_handler(task, msg); + if (msg->port_msg.stream != 0) { + nxt_port_rpc_handler(task, msg); + return; + } + + /* + * Port with "id == 0" is application 'main' port and it always + * should come with non-zero stream. + */ + nxt_assert(port->id != 0); + + /* Find 'main' app port and get app reference. */ + rt = task->thread->runtime; + + /* + * It is safe to access 'runtime->ports' hash because 'NEW_PORT' + * sent to main port (with id == 0) and processed in main thread. + */ + main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0); + nxt_assert(main_app_port != NULL); + + app = main_app_port->app; + nxt_assert(app != NULL); + + nxt_thread_mutex_lock(&app->mutex); + + /* TODO here should be find-and-add code because there can be + port waiters in port_hash */ + nxt_port_hash_add(&app->port_hash, port); + app->port_hash_count++; + + nxt_thread_mutex_unlock(&app->mutex); + + port->app = app; + port->main_app_port = main_app_port; } @@ -1100,8 +810,10 @@ nxt_router_app_can_start(nxt_app_t *app) nxt_inline nxt_bool_t nxt_router_app_need_start(nxt_app_t *app) { - return app->idle_processes + app->pending_processes - < app->spare_processes; + return (app->active_requests + > app->port_hash_count + app->pending_processes) + || (app->spare_processes + > app->idle_processes + app->pending_processes); } @@ -1530,6 +1242,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_app_t *app, *prev; nxt_str_t *t, *s, *targets; nxt_uint_t n, i; + nxt_port_t *port; nxt_router_t *router; nxt_app_joint_t *app_joint; nxt_conf_value_t *conf, *http, *value, *websocket; @@ -1744,8 +1457,6 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_queue_init(&app->ports); nxt_queue_init(&app->spare_ports); nxt_queue_init(&app->idle_ports); - nxt_queue_init(&app->requests); - nxt_queue_init(&app->pending); app->name.length = name.length; nxt_memcpy(app->name.start, name.start, name.length); @@ -1758,7 +1469,6 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, app->timeout = apcf.timeout; app->res_timeout = apcf.res_timeout * 1000000; app->idle_timeout = apcf.idle_timeout; - app->max_pending_responses = 2; app->max_requests = apcf.requests; app->targets = targets; @@ -1789,6 +1499,25 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, app_joint->free_app_work.handler = nxt_router_free_app; app_joint->free_app_work.task = &engine->task; app_joint->free_app_work.obj = app_joint; + + port = nxt_port_new(task, (nxt_port_id_t) -1, nxt_pid, + NXT_PROCESS_APP); + if (nxt_slow_path(port == NULL)) { + return NXT_ERROR; + } + + ret = nxt_port_socket_init(task, port, 0); + if (nxt_slow_path(ret != NXT_OK)) { + nxt_port_use(task, port, -1); + return NXT_ERROR; + } + + nxt_port_write_enable(task, port); + port->app = app; + + app->shared_port = port; + + nxt_thread_mutex_create(&app->outgoing.mutex); } } @@ -2522,7 +2251,13 @@ nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, app = rpc->app; port = msg->u.new_port; + + nxt_assert(port != NULL); + nxt_assert(port->type == NXT_PROCESS_APP); + nxt_assert(port->id == 0); + port->app = app; + port->main_app_port = port; app->pending_processes--; app->processes++; @@ -2532,11 +2267,15 @@ nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_queue_insert_tail(&app->ports, &port->app_link); nxt_queue_insert_tail(&app->spare_ports, &port->idle_link); + nxt_port_hash_add(&app->port_hash, port); + app->port_hash_count++; port->idle_start = 0; nxt_port_inc_use(port); + nxt_router_app_shared_port_send(task, port); + nxt_work_queue_add(&engine->fast_work_queue, nxt_router_conf_apply, task, rpc->temp_conf, NULL); } @@ -2939,10 +2678,11 @@ nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs) static nxt_port_handlers_t nxt_router_app_port_handlers = { - .rpc_error = nxt_port_rpc_handler, - .mmap = nxt_port_mmap_handler, - .data = nxt_port_rpc_handler, - .oosm = nxt_router_oosm_handler, + .rpc_error = nxt_port_rpc_handler, + .mmap = nxt_port_mmap_handler, + .data = nxt_port_rpc_handler, + .oosm = nxt_router_oosm_handler, + .req_headers_ack = nxt_port_rpc_handler, }; @@ -3736,22 +3476,17 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) { nxt_int_t ret; + nxt_app_t *app; nxt_buf_t *b, *next; nxt_port_t *app_port; nxt_unit_field_t *f; nxt_http_field_t *field; nxt_http_request_t *r; nxt_unit_response_t *resp; - nxt_request_app_link_t *req_app_link; nxt_request_rpc_data_t *req_rpc_data; - b = msg->buf; req_rpc_data = data; - if (msg->size == 0) { - b = NULL; - } - r = req_rpc_data->request; if (nxt_slow_path(r == NULL)) { return; @@ -3762,19 +3497,32 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, return; } + app = req_rpc_data->app; + nxt_assert(app != NULL); + + if (msg->port_msg.type == _NXT_PORT_MSG_REQ_HEADERS_ACK) { + nxt_router_req_headers_ack_handler(task, msg, req_rpc_data); + + return; + } + + b = (msg->size == 0) ? NULL : msg->buf; + if (msg->port_msg.last != 0) { nxt_debug(task, "router data create last buf"); nxt_buf_chain_add(&b, nxt_http_buf_last(r)); + req_rpc_data->rpc_cancel = 0; + req_rpc_data->apr_action = NXT_APR_GOT_RESPONSE; + nxt_request_rpc_data_unlink(task, req_rpc_data); } else { - if (req_rpc_data->app != NULL && req_rpc_data->app->timeout != 0) { + if (app->timeout != 0) { r->timer.handler = nxt_router_app_timeout; r->timer_data = req_rpc_data; - nxt_timer_add(task->thread->engine, &r->timer, - req_rpc_data->app->timeout); + nxt_timer_add(task->thread->engine, &r->timer, app->timeout); } } @@ -3870,39 +3618,21 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, if (r->websocket_handshake && r->status == NXT_HTTP_SWITCHING_PROTOCOLS) { - req_app_link = nxt_request_app_link_alloc(task, - req_rpc_data->req_app_link, - req_rpc_data); - if (nxt_slow_path(req_app_link == NULL)) { - goto fail; - } - - app_port = req_app_link->app_port; - - if (app_port == NULL && req_rpc_data->app_port != NULL) { - req_app_link->app_port = req_rpc_data->app_port; - app_port = req_app_link->app_port; - req_app_link->apr_action = req_rpc_data->apr_action; - - req_rpc_data->app_port = NULL; - } - + app_port = req_rpc_data->app_port; if (nxt_slow_path(app_port == NULL)) { goto fail; } - nxt_thread_mutex_lock(&req_rpc_data->app->mutex); + nxt_thread_mutex_lock(&app->mutex); - nxt_queue_insert_tail(&app_port->active_websockets, - &req_app_link->link_port_websockets); + app_port->main_app_port->active_websockets++; - nxt_thread_mutex_unlock(&req_rpc_data->app->mutex); + nxt_thread_mutex_unlock(&app->mutex); nxt_router_app_port_release(task, app_port, NXT_APR_UPGRADE); - req_app_link->apr_action = NXT_APR_CLOSE; + req_rpc_data->apr_action = NXT_APR_CLOSE; - nxt_debug(task, "req_app_link stream #%uD upgrade", - req_app_link->stream); + nxt_debug(task, "stream #%uD upgrade", req_rpc_data->stream); r->state = &nxt_http_websocket; @@ -3921,6 +3651,94 @@ fail: } +static void +nxt_router_req_headers_ack_handler(nxt_task_t *task, + nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data) +{ + nxt_app_t *app; + nxt_bool_t start_process; + nxt_port_t *app_port, *main_app_port, *idle_port; + nxt_queue_link_t *idle_lnk; + nxt_http_request_t *r; + + nxt_debug(task, "stream #%uD: got ack from %PI:%d", + req_rpc_data->stream, + msg->port_msg.pid, msg->port_msg.reply_port); + + nxt_port_rpc_ex_set_peer(task, msg->port, req_rpc_data, + msg->port_msg.pid); + + app = req_rpc_data->app; + + start_process = 0; + + nxt_thread_mutex_lock(&app->mutex); + + app_port = nxt_port_hash_find(&app->port_hash, msg->port_msg.pid, + msg->port_msg.reply_port); + if (nxt_slow_path(app_port == NULL)) { + nxt_thread_mutex_unlock(&app->mutex); + + r = req_rpc_data->request; + nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); + + return; + } + + main_app_port = app_port->main_app_port; + + if (nxt_queue_chk_remove(&main_app_port->idle_link)) { + app->idle_processes--; + + /* Check port was in 'spare_ports' using idle_start field. */ + if (main_app_port->idle_start == 0 + && app->idle_processes >= app->spare_processes) + { + /* + * If there is a vacant space in spare ports, + * move the last idle to spare_ports. + */ + nxt_assert(!nxt_queue_is_empty(&app->idle_ports)); + + idle_lnk = nxt_queue_last(&app->idle_ports); + idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link); + nxt_queue_remove(idle_lnk); + + nxt_queue_insert_tail(&app->spare_ports, idle_lnk); + + idle_port->idle_start = 0; + } + + if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) { + app->pending_processes++; + start_process = 1; + } + } + + main_app_port->active_requests++; + + nxt_port_inc_use(app_port); + + nxt_thread_mutex_unlock(&app->mutex); + + if (start_process) { + nxt_router_start_app_process(task, app); + } + + nxt_port_use(task, req_rpc_data->app_port, -1); + + req_rpc_data->app_port = app_port; + + if (app->timeout != 0) { + r = req_rpc_data->request; + + r->timer.handler = nxt_router_app_timeout; + r->timer_data = req_rpc_data; + nxt_timer_add(task->thread->engine, &r->timer, app->timeout); + } +} + + static const nxt_http_request_state_t nxt_http_request_send_state nxt_aligned(64) = { @@ -3949,42 +3767,14 @@ static void nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) { - nxt_int_t res; - nxt_port_t *port; - nxt_bool_t cancelled; - nxt_request_app_link_t *req_app_link; nxt_request_rpc_data_t *req_rpc_data; req_rpc_data = data; - req_app_link = req_rpc_data->req_app_link; - - if (req_app_link != NULL) { - cancelled = nxt_router_msg_cancel(task, &req_app_link->msg_info, - req_app_link->stream); - if (cancelled) { - res = nxt_router_app_port(task, req_rpc_data->app, req_app_link); - - if (res == NXT_OK) { - port = req_app_link->app_port; - - if (nxt_slow_path(port == NULL)) { - nxt_log(task, NXT_LOG_ERR, - "port is NULL in cancelled req_app_link"); - return; - } - - nxt_port_rpc_ex_set_peer(task, task->thread->engine->port, - req_rpc_data, port->pid); - - nxt_router_app_prepare_request(task, req_app_link); - } + req_rpc_data->rpc_cancel = 0; - msg->port_msg.last = 0; - - return; - } - } + /* TODO cancel message and return if cancelled. */ + // nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream); if (req_rpc_data->request != NULL) { nxt_http_request_error(task, req_rpc_data->request, @@ -4008,6 +3798,8 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_assert(app_joint != NULL); nxt_assert(port != NULL); + nxt_assert(port->type == NXT_PROCESS_APP); + nxt_assert(port->id == 0); app = app_joint->app; @@ -4022,6 +3814,7 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, } port->app = app; + port->main_app_port = port; nxt_thread_mutex_lock(&app->mutex); @@ -4029,24 +3822,60 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, app->pending_processes--; app->processes++; + nxt_port_hash_add(&app->port_hash, port); + app->port_hash_count++; nxt_thread_mutex_unlock(&app->mutex); nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d", &app->name, port->pid, app->processes, app->pending_processes); + nxt_router_app_shared_port_send(task, port); + nxt_router_app_port_release(task, port, NXT_APR_NEW_PORT); } +static nxt_int_t +nxt_router_app_shared_port_send(nxt_task_t *task, nxt_port_t *app_port) +{ + nxt_buf_t *b; + nxt_port_t *port; + nxt_port_msg_new_port_t *msg; + + b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, + sizeof(nxt_port_data_t)); + if (nxt_slow_path(b == NULL)) { + return NXT_ERROR; + } + + port = app_port->app->shared_port; + + nxt_debug(task, "send port %FD to process %PI", + port->pair[0], app_port->pid); + + b->mem.free += sizeof(nxt_port_msg_new_port_t); + msg = (nxt_port_msg_new_port_t *) b->mem.pos; + + msg->id = port->id; + msg->pid = port->pid; + msg->max_size = port->max_size; + msg->max_share = port->max_share; + msg->type = port->type; + + return nxt_port_socket_twrite(task, app_port, + NXT_PORT_MSG_NEW_PORT, + port->pair[0], + 0, 0, b, NULL); +} + + static void nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) { - nxt_app_t *app; - nxt_app_joint_t *app_joint; - nxt_queue_link_t *lnk; - nxt_request_app_link_t *req_app_link; + nxt_app_t *app; + nxt_app_joint_t *app_joint; app_joint = data; @@ -4070,32 +3899,11 @@ nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, app->pending_processes--; - if (!nxt_queue_is_empty(&app->requests)) { - lnk = nxt_queue_last(&app->requests); - nxt_queue_remove(lnk); - lnk->next = NULL; - - req_app_link = nxt_queue_link_data(lnk, nxt_request_app_link_t, - link_app_requests); - - } else { - req_app_link = NULL; - } - nxt_thread_mutex_unlock(&app->mutex); - if (req_app_link != NULL) { - nxt_debug(task, "app '%V' %p abort next stream #%uD", - &app->name, app, req_app_link->stream); - - nxt_request_app_link_error(task, app, req_app_link, - "Failed to start application process"); - nxt_request_app_link_use(task, req_app_link, -1); - } + /* TODO req_app_link to cancel first pending message */ } -nxt_inline nxt_port_t * -nxt_router_app_get_port_for_quit(nxt_app_t *app); void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i) @@ -4116,63 +3924,6 @@ nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i) } -nxt_inline nxt_bool_t -nxt_router_app_first_port_busy(nxt_app_t *app) -{ - nxt_port_t *port; - nxt_queue_link_t *lnk; - - lnk = nxt_queue_first(&app->ports); - port = nxt_queue_link_data(lnk, nxt_port_t, app_link); - - return port->app_pending_responses > 0; -} - - -nxt_inline nxt_port_t * -nxt_router_pop_first_port(nxt_app_t *app) -{ - nxt_port_t *port; - nxt_queue_link_t *lnk; - - lnk = nxt_queue_first(&app->ports); - nxt_queue_remove(lnk); - - port = nxt_queue_link_data(lnk, nxt_port_t, app_link); - - port->app_pending_responses++; - - if (nxt_queue_chk_remove(&port->idle_link)) { - app->idle_processes--; - - if (port->idle_start == 0) { - nxt_assert(app->idle_processes < app->spare_processes); - - } else { - nxt_assert(app->idle_processes >= app->spare_processes); - - port->idle_start = 0; - } - } - - if ((app->max_pending_responses == 0 - || port->app_pending_responses < app->max_pending_responses) - && (app->max_requests == 0 - || port->app_responses + port->app_pending_responses - < app->max_requests)) - { - nxt_queue_insert_tail(&app->ports, lnk); - - nxt_port_inc_use(port); - - } else { - lnk->next = NULL; - } - - return port; -} - - nxt_inline nxt_port_t * nxt_router_app_get_port_for_quit(nxt_app_t *app) { @@ -4184,12 +3935,6 @@ nxt_router_app_get_port_for_quit(nxt_app_t *app) nxt_queue_each(port, &app->ports, nxt_port_t, app_link) { - if (port->app_pending_responses > 0) { - port = NULL; - - continue; - } - /* Caller is responsible to decrease port use count. */ nxt_queue_chk_remove(&port->app_link); @@ -4197,6 +3942,9 @@ nxt_router_app_get_port_for_quit(nxt_app_t *app) app->idle_processes--; } + nxt_port_hash_remove(&app->port_hash, port); + app->port_hash_count--; + port->app = NULL; app->processes--; @@ -4222,71 +3970,36 @@ nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app) static void -nxt_router_app_process_request(nxt_task_t *task, void *obj, void *data) -{ - nxt_request_app_link_t *req_app_link; - - req_app_link = data; - -#if (NXT_DEBUG) - { - nxt_app_t *app; - - app = obj; - - nxt_assert(app != NULL); - nxt_assert(req_app_link != NULL); - nxt_assert(req_app_link->app_port != NULL); - - nxt_debug(task, "app '%V' %p process next stream #%uD", - &app->name, app, req_app_link->stream); - } -#endif - - nxt_router_app_prepare_request(task, req_app_link); - - nxt_request_app_link_use(task, req_app_link, -1); -} - - -static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, nxt_apr_action_t action) { - int inc_use; - uint32_t dec_pending, got_response; - nxt_app_t *app; - nxt_bool_t port_unchained; - nxt_bool_t send_quit, cancelled, adjust_idle_timer; - nxt_queue_link_t *lnk; - nxt_request_app_link_t *req_app_link, *pending_ra, *re_ra; - nxt_port_select_state_t state; + int inc_use; + uint32_t got_response, dec_requests; + nxt_app_t *app; + nxt_bool_t port_unchained, send_quit, adjust_idle_timer; + nxt_port_t *main_app_port; nxt_assert(port != NULL); nxt_assert(port->app != NULL); - req_app_link = NULL; - app = port->app; inc_use = 0; - dec_pending = 0; got_response = 0; + dec_requests = 0; switch (action) { case NXT_APR_NEW_PORT: break; case NXT_APR_REQUEST_FAILED: - dec_pending = 1; + dec_requests = 1; inc_use = -1; break; case NXT_APR_GOT_RESPONSE: - dec_pending = 1; got_response = 1; inc_use = -1; break; case NXT_APR_UPGRADE: - dec_pending = 1; got_response = 1; break; case NXT_APR_CLOSE: @@ -4294,120 +4007,49 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, break; } - nxt_thread_mutex_lock(&app->mutex); - - port->app_pending_responses -= dec_pending; - port->app_responses += got_response; + nxt_debug(task, "app '%V' release port %PI:%d: %d %d", &app->name, + port->pid, port->id, + (int) inc_use, (int) got_response); - if (port->pair[1] != -1 - && (app->max_pending_responses == 0 - || port->app_pending_responses < app->max_pending_responses) - && (app->max_requests == 0 - || port->app_responses + port->app_pending_responses - < app->max_requests)) - { - if (port->app_link.next == NULL) { - if (port->app_pending_responses > 0) { - nxt_queue_insert_tail(&app->ports, &port->app_link); + if (port == app->shared_port) { + nxt_thread_mutex_lock(&app->mutex); - } else { - nxt_queue_insert_head(&app->ports, &port->app_link); - } + app->active_requests -= got_response + dec_requests; - nxt_port_inc_use(port); + nxt_thread_mutex_unlock(&app->mutex); - } else { - if (port->app_pending_responses == 0 - && nxt_queue_first(&app->ports) != &port->app_link) - { - nxt_queue_remove(&port->app_link); - nxt_queue_insert_head(&app->ports, &port->app_link); - } - } + goto adjust_use; } - if (!nxt_queue_is_empty(&app->ports) - && !nxt_queue_is_empty(&app->requests)) - { - lnk = nxt_queue_first(&app->requests); - nxt_queue_remove(lnk); - lnk->next = NULL; - - req_app_link = nxt_queue_link_data(lnk, nxt_request_app_link_t, - link_app_requests); + main_app_port = port->main_app_port; - req_app_link->app_port = nxt_router_pop_first_port(app); + nxt_thread_mutex_lock(&app->mutex); - if (req_app_link->app_port->app_pending_responses > 1) { - nxt_request_app_link_pending(task, app, req_app_link); - } - } + main_app_port->app_responses += got_response; + main_app_port->active_requests -= got_response + dec_requests; + app->active_requests -= got_response + dec_requests; - /* Pop first pending request for this port. */ - if (dec_pending > 0 - && !nxt_queue_is_empty(&port->pending_requests)) + if (main_app_port->pair[1] != -1 + && (app->max_requests == 0 + || main_app_port->app_responses < app->max_requests)) { - lnk = nxt_queue_first(&port->pending_requests); - nxt_queue_remove(lnk); - lnk->next = NULL; - - pending_ra = nxt_queue_link_data(lnk, nxt_request_app_link_t, - link_port_pending); - - nxt_assert(pending_ra->link_app_pending.next != NULL); - - nxt_queue_remove(&pending_ra->link_app_pending); - pending_ra->link_app_pending.next = NULL; - - } else { - pending_ra = NULL; - } - - /* Try to cancel and re-schedule first stalled request for this app. */ - if (got_response > 0 && !nxt_queue_is_empty(&app->pending)) { - lnk = nxt_queue_first(&app->pending); - - re_ra = nxt_queue_link_data(lnk, nxt_request_app_link_t, - link_app_pending); - - if (re_ra->res_time <= nxt_thread_monotonic_time(task->thread)) { + if (main_app_port->app_link.next == NULL) { + nxt_queue_insert_tail(&app->ports, &main_app_port->app_link); - nxt_debug(task, "app '%V' stalled request #%uD detected", - &app->name, re_ra->stream); - - cancelled = nxt_router_msg_cancel(task, &re_ra->msg_info, - re_ra->stream); - - if (cancelled) { - state.req_app_link = re_ra; - state.app = app; - - /* - * Need to increment use count "in advance" because - * nxt_router_port_select() will remove re_ra from lists - * and decrement use count. - */ - nxt_request_app_link_inc_use(re_ra); - - nxt_router_port_select(task, &state); - - goto re_ra_cancelled; - } + nxt_port_inc_use(main_app_port); } } - re_ra = NULL; - -re_ra_cancelled: - send_quit = (app->max_requests > 0 - && port->app_pending_responses == 0 - && port->app_responses >= app->max_requests); + && main_app_port->app_responses >= app->max_requests); if (send_quit) { - port_unchained = nxt_queue_chk_remove(&port->app_link); + port_unchained = nxt_queue_chk_remove(&main_app_port->app_link); - port->app = NULL; + nxt_port_hash_remove(&app->port_hash, main_app_port); + app->port_hash_count--; + + main_app_port->app = NULL; app->processes--; } else { @@ -4416,9 +4058,10 @@ re_ra_cancelled: adjust_idle_timer = 0; - if (port->pair[1] != -1 && !send_quit && port->app_pending_responses == 0 - && nxt_queue_is_empty(&port->active_websockets) - && port->idle_link.next == NULL) + if (main_app_port->pair[1] != -1 && !send_quit + && main_app_port->active_requests == 0 + && main_app_port->active_websockets == 0 + && main_app_port->idle_link.next == NULL) { if (app->idle_processes == app->spare_processes && app->adjust_idle_work.data == NULL) @@ -4429,12 +4072,12 @@ re_ra_cancelled: } if (app->idle_processes < app->spare_processes) { - nxt_queue_insert_tail(&app->spare_ports, &port->idle_link); + nxt_queue_insert_tail(&app->spare_ports, &main_app_port->idle_link); } else { - nxt_queue_insert_tail(&app->idle_ports, &port->idle_link); + nxt_queue_insert_tail(&app->idle_ports, &main_app_port->idle_link); - port->idle_start = task->thread->engine->timers.now; + main_app_port->idle_start = task->thread->engine->timers.now; } app->idle_processes++; @@ -4447,60 +4090,22 @@ re_ra_cancelled: nxt_event_engine_post(app->engine, &app->adjust_idle_work); } - if (pending_ra != NULL) { - nxt_request_app_link_use(task, pending_ra, -1); - } - - if (re_ra != NULL) { - if (nxt_router_port_post_select(task, &state) == NXT_OK) { - /* - * Reference counter already incremented above, this will - * keep re_ra while nxt_router_app_process_request() - * task is in queue. Reference counter decreased in - * nxt_router_app_process_request() after processing. - */ - - nxt_work_queue_add(&task->thread->engine->fast_work_queue, - nxt_router_app_process_request, - &task->thread->engine->task, app, re_ra); - - } else { - nxt_request_app_link_use(task, re_ra, -1); - } - } - - if (req_app_link != NULL) { - /* - * There should be call nxt_request_app_link_inc_use(req_app_link), - * because of one more link in the queue. But one link was - * recently removed from app->requests linked list. - * Corresponding decrement is in nxt_router_app_process_request(). - */ - - nxt_work_queue_add(&task->thread->engine->fast_work_queue, - nxt_router_app_process_request, - &task->thread->engine->task, app, req_app_link); - - goto adjust_use; - } - /* ? */ - if (port->pair[1] == -1) { + if (main_app_port->pair[1] == -1) { nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)", - &app->name, app, port, port->pid); + &app->name, app, main_app_port, main_app_port->pid); goto adjust_use; } if (send_quit) { - nxt_debug(task, "app '%V' %p send QUIT to port", - &app->name, app); + nxt_debug(task, "app '%V' %p send QUIT to port", &app->name, app); - nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, - -1, 0, 0, NULL); + nxt_port_socket_write(task, main_app_port, NXT_PORT_MSG_QUIT, -1, 0, 0, + NULL); if (port_unchained) { - nxt_port_use(task, port, -1); + nxt_port_use(task, main_app_port, -1); } goto adjust_use; @@ -4529,6 +4134,18 @@ nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port) nxt_thread_mutex_lock(&app->mutex); + nxt_port_hash_remove(&app->port_hash, port); + app->port_hash_count--; + + if (port->id != 0) { + nxt_thread_mutex_unlock(&app->mutex); + + nxt_debug(task, "app '%V' port (%PI, %d) closed", &app->name, + port->pid, port->id); + + return; + } + unchain = nxt_queue_chk_remove(&port->app_link); if (nxt_queue_chk_remove(&port->idle_link)) { @@ -4553,8 +4170,7 @@ nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port) start_process = !task->thread->engine->shutdown && nxt_router_app_can_start(app) - && (!nxt_queue_is_empty(&app->requests) - || nxt_router_app_need_start(app)); + && nxt_router_app_need_start(app); if (start_process) { app->pending_processes++; @@ -4603,6 +4219,10 @@ nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data) app->adjust_idle_work.data = NULL; } + nxt_debug(task, "app '%V' idle_processes %d, spare_processes %d", + &app->name, + (int) app->idle_processes, (int) app->spare_processes); + while (app->idle_processes > app->spare_processes) { nxt_assert(!nxt_queue_is_empty(&app->idle_ports)); @@ -4612,6 +4232,10 @@ nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data) timeout = port->idle_start + app->idle_timeout; + nxt_debug(task, "app '%V' pid %PI, start %M, timeout %M, threshold %M", + &app->name, port->pid, + port->idle_start, timeout, threshold); + if (timeout > threshold) { break; } @@ -4621,6 +4245,9 @@ nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data) nxt_queue_chk_remove(&port->app_link); + nxt_port_hash_remove(&app->port_hash, port); + app->port_hash_count--; + app->idle_processes--; app->processes--; port->app = NULL; @@ -4704,12 +4331,23 @@ nxt_router_free_app(nxt_task_t *task, void *obj, void *data) } nxt_assert(app->processes == 0); + nxt_assert(app->active_requests == 0); + nxt_assert(app->port_hash_count == 0); nxt_assert(app->idle_processes == 0); - nxt_assert(nxt_queue_is_empty(&app->requests)); nxt_assert(nxt_queue_is_empty(&app->ports)); nxt_assert(nxt_queue_is_empty(&app->spare_ports)); nxt_assert(nxt_queue_is_empty(&app->idle_ports)); + nxt_port_mmaps_destroy(&app->outgoing, 1); + + nxt_thread_mutex_destroy(&app->outgoing.mutex); + + if (app->shared_port != NULL) { + app->shared_port->app = NULL; + nxt_port_close(task, app->shared_port); + nxt_port_use(task, app->shared_port, -1); + } + nxt_thread_mutex_destroy(&app->mutex); nxt_mp_destroy(app->mem_pool); @@ -4726,178 +4364,34 @@ nxt_router_free_app(nxt_task_t *task, void *obj, void *data) static void -nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state) -{ - int ra_use_delta; - nxt_app_t *app; - nxt_bool_t can_start_process; - nxt_request_app_link_t *req_app_link; - - req_app_link = state->req_app_link; - app = state->app; - - state->failed_port_use_delta = 0; - ra_use_delta = -nxt_queue_chk_remove(&req_app_link->link_app_requests); - - if (nxt_queue_chk_remove(&req_app_link->link_port_pending)) - { - nxt_assert(req_app_link->link_app_pending.next != NULL); - - nxt_queue_remove(&req_app_link->link_app_pending); - req_app_link->link_app_pending.next = NULL; - - ra_use_delta--; - } - - state->failed_port = req_app_link->app_port; - - if (req_app_link->app_port != NULL) { - state->failed_port_use_delta--; - - state->failed_port->app_pending_responses--; - - if (nxt_queue_chk_remove(&state->failed_port->app_link)) { - state->failed_port_use_delta--; - } - - req_app_link->app_port = NULL; - } - - can_start_process = nxt_router_app_can_start(app); - - state->port = NULL; - state->start_process = 0; - - if (nxt_queue_is_empty(&app->ports) - || (can_start_process && nxt_router_app_first_port_busy(app)) ) - { - req_app_link = nxt_request_app_link_alloc(task, req_app_link, - req_app_link->req_rpc_data); - if (nxt_slow_path(req_app_link == NULL)) { - goto fail; - } - - if (nxt_slow_path(state->failed_port != NULL)) { - nxt_queue_insert_head(&app->requests, - &req_app_link->link_app_requests); - - } else { - nxt_queue_insert_tail(&app->requests, - &req_app_link->link_app_requests); - } - - nxt_request_app_link_inc_use(req_app_link); - - nxt_debug(task, "req_app_link stream #%uD enqueue to app->requests", - req_app_link->stream); - - if (can_start_process) { - app->pending_processes++; - state->start_process = 1; - } - - } else { - state->port = nxt_router_pop_first_port(app); - - if (state->port->app_pending_responses > 1) { - req_app_link = nxt_request_app_link_alloc(task, req_app_link, - req_app_link->req_rpc_data); - if (nxt_slow_path(req_app_link == NULL)) { - goto fail; - } - - req_app_link->app_port = state->port; - - nxt_request_app_link_pending(task, app, req_app_link); - } - - if (can_start_process && nxt_router_app_need_start(app)) { - app->pending_processes++; - state->start_process = 1; - } - } - - nxt_request_app_link_chk_use(req_app_link, ra_use_delta); - -fail: - - state->shared_ra = req_app_link; -} - - -static nxt_int_t -nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state) +nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app, + nxt_request_rpc_data_t *req_rpc_data) { - nxt_int_t res; - nxt_app_t *app; - nxt_request_app_link_t *req_app_link; - - req_app_link = state->shared_ra; - app = state->app; - - if (state->failed_port_use_delta != 0) { - nxt_port_use(task, state->failed_port, state->failed_port_use_delta); - } - - if (nxt_slow_path(req_app_link == NULL)) { - if (state->port != NULL) { - nxt_port_use(task, state->port, -1); - } - - nxt_request_app_link_error(task, app, state->req_app_link, - "Failed to allocate shared req<->app link"); - - return NXT_ERROR; - } - - if (state->port != NULL) { - nxt_debug(task, "already have port for app '%V' %p ", &app->name, app); + nxt_bool_t start_process; + nxt_port_t *port; - req_app_link->app_port = state->port; + start_process = 0; - if (state->start_process) { - nxt_router_start_app_process(task, app); - } + nxt_thread_mutex_lock(&app->mutex); - return NXT_OK; - } + port = app->shared_port; + nxt_port_inc_use(port); - if (!state->start_process) { - nxt_debug(task, "app '%V' %p too many running or pending processes", - &app->name, app); + app->active_requests++; - return NXT_AGAIN; + if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) { + app->pending_processes++; + start_process = 1; } - res = nxt_router_start_app_process(task, app); + nxt_thread_mutex_unlock(&app->mutex); - if (nxt_slow_path(res != NXT_OK)) { - nxt_request_app_link_error(task, app, req_app_link, - "Failed to start app process"); + req_rpc_data->app_port = port; + req_rpc_data->apr_action = NXT_APR_REQUEST_FAILED; - return NXT_ERROR; + if (start_process) { + nxt_router_start_app_process(task, app); } - - return NXT_AGAIN; -} - - -static nxt_int_t -nxt_router_app_port(nxt_task_t *task, nxt_app_t *app, - nxt_request_app_link_t *req_app_link) -{ - nxt_port_select_state_t state; - - state.req_app_link = req_app_link; - state.app = app; - - nxt_thread_mutex_lock(&app->mutex); - - nxt_router_port_select(task, &state); - - nxt_thread_mutex_unlock(&app->mutex); - - return nxt_router_port_post_select(task, &state); } @@ -4905,10 +4399,7 @@ void nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r, nxt_app_t *app) { - nxt_int_t res; - nxt_port_t *port; nxt_event_engine_t *engine; - nxt_request_app_link_t ra_local, *req_app_link; nxt_request_rpc_data_t *req_rpc_data; engine = task->thread->engine; @@ -4927,7 +4418,7 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r, * in port handlers. Need to fixup request memory pool. Counterpart * release will be called via following call chain: * nxt_request_rpc_data_unlink() -> - * nxt_router_http_request_done() -> + * nxt_router_http_request_release_post() -> * nxt_router_http_request_release() */ nxt_mp_retain(r->mem_pool); @@ -4939,29 +4430,37 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r, req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data); req_rpc_data->app = app; + req_rpc_data->msg_info.body_fd = -1; + req_rpc_data->rpc_cancel = 1; nxt_router_app_use(task, app, 1); req_rpc_data->request = r; r->req_rpc_data = req_rpc_data; - req_app_link = &ra_local; - nxt_request_app_link_init(task, req_app_link, req_rpc_data); + if (r->last != NULL) { + r->last->completion_handler = nxt_router_http_request_done; + } - res = nxt_router_app_port(task, app, req_app_link); - req_app_link = req_rpc_data->req_app_link; + nxt_router_app_port_get(task, app, req_rpc_data); + nxt_router_app_prepare_request(task, req_rpc_data); +} - if (res == NXT_OK) { - port = req_app_link->app_port; - nxt_assert(port != NULL); +static void +nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data) +{ + nxt_http_request_t *r; + + r = data; - nxt_port_rpc_ex_set_peer(task, engine->port, req_rpc_data, port->pid); + nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data); - nxt_router_app_prepare_request(task, req_app_link); + if (r->req_rpc_data) { + nxt_request_rpc_data_unlink(task, r->req_rpc_data); } - nxt_request_app_link_use(task, req_app_link, -1); + nxt_http_request_close_handler(task, r, r->proto.any); } @@ -4973,76 +4472,80 @@ nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj, void *data) static void nxt_router_app_prepare_request(nxt_task_t *task, - nxt_request_app_link_t *req_app_link) + nxt_request_rpc_data_t *req_rpc_data) { - nxt_buf_t *buf; + nxt_app_t *app; + nxt_buf_t *buf, *body; nxt_int_t res; nxt_port_t *port, *reply_port; - nxt_apr_action_t apr_action; - nxt_assert(req_app_link->app_port != NULL); + app = req_rpc_data->app; - port = req_app_link->app_port; - reply_port = req_app_link->reply_port; + nxt_assert(app != NULL); - apr_action = NXT_APR_REQUEST_FAILED; + port = req_rpc_data->app_port; - buf = nxt_router_prepare_msg(task, req_app_link->request, port, - nxt_app_msg_prefix[port->app->type]); + nxt_assert(port != NULL); + + reply_port = task->thread->engine->port; + buf = nxt_router_prepare_msg(task, req_rpc_data->request, app, + nxt_app_msg_prefix[app->type]); if (nxt_slow_path(buf == NULL)) { - nxt_request_app_link_error(task, port->app, req_app_link, - "Failed to prepare message for application"); - goto release_port; + nxt_alert(task, "stream #%uD, app '%V': failed to prepare app message", + req_rpc_data->stream, &app->name); + + nxt_http_request_error(task, req_rpc_data->request, + NXT_HTTP_INTERNAL_SERVER_ERROR); + + return; } nxt_debug(task, "about to send %O bytes buffer to app process port %d", nxt_buf_used_size(buf), port->socket.fd); - apr_action = NXT_APR_NEW_PORT; - - req_app_link->msg_info.buf = buf; - req_app_link->msg_info.completion_handler = buf->completion_handler; + req_rpc_data->msg_info.buf = buf; + req_rpc_data->msg_info.completion_handler = buf->completion_handler; - for (; buf; buf = buf->next) { + do { buf->completion_handler = nxt_router_dummy_buf_completion; - } + buf = buf->next; + } while (buf != NULL); - buf = req_app_link->msg_info.buf; + buf = req_rpc_data->msg_info.buf; - res = nxt_port_mmap_get_tracking(task, &port->process->outgoing, - &req_app_link->msg_info.tracking, - req_app_link->stream); - if (nxt_slow_path(res != NXT_OK)) { - nxt_request_app_link_error(task, port->app, req_app_link, - "Failed to get tracking area"); - goto release_port; - } + body = req_rpc_data->request->body; - if (req_app_link->body_fd != -1) { - nxt_debug(task, "stream #%uD: send body fd %d", req_app_link->stream, - req_app_link->body_fd); + if (body != NULL && nxt_buf_is_file(body)) { + req_rpc_data->msg_info.body_fd = body->file->fd; - lseek(req_app_link->body_fd, 0, SEEK_SET); + body->file->fd = -1; + + } else { + req_rpc_data->msg_info.body_fd = -1; } - res = nxt_port_socket_twrite(task, port, NXT_PORT_MSG_REQ_HEADERS, - req_app_link->body_fd, - req_app_link->stream, reply_port->id, buf, - &req_app_link->msg_info.tracking); + if (req_rpc_data->msg_info.body_fd != -1) { + nxt_debug(task, "stream #%uD: send body fd %d", req_rpc_data->stream, + req_rpc_data->msg_info.body_fd); - if (nxt_slow_path(res != NXT_OK)) { - nxt_request_app_link_error(task, port->app, req_app_link, - "Failed to send message to application"); - goto release_port; + lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET); } -release_port: + res = nxt_port_socket_twrite(task, port, + NXT_PORT_MSG_REQ_HEADERS, + req_rpc_data->msg_info.body_fd, + req_rpc_data->stream, reply_port->id, buf, + NULL); - nxt_router_app_port_release(task, port, apr_action); + if (nxt_slow_path(res != NXT_OK)) { + nxt_alert(task, "stream #%uD, app '%V': failed to send app message", + req_rpc_data->stream, &app->name); - nxt_request_app_link_update_peer(task, req_app_link); + nxt_http_request_error(task, req_rpc_data->request, + NXT_HTTP_INTERNAL_SERVER_ERROR); + } } @@ -5100,7 +4603,7 @@ nxt_fields_next(nxt_fields_iter_t *i) static nxt_buf_t * nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r, - nxt_port_t *port, const nxt_str_t *prefix) + nxt_app_t *app, const nxt_str_t *prefix) { void *target_pos, *query_pos; u_char *pos, *end, *p, c; @@ -5141,7 +4644,7 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r, return NULL; } - out = nxt_port_mmap_get_buf(task, &port->process->outgoing, + out = nxt_port_mmap_get_buf(task, &app->outgoing, nxt_min(req_size + content_length, PORT_MMAP_DATA_SIZE)); if (nxt_slow_path(out == NULL)) { return NULL; @@ -5323,8 +4826,7 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r, if (buf == NULL) { free_size = nxt_min(size, PORT_MMAP_DATA_SIZE); - buf = nxt_port_mmap_get_buf(task, &port->process->outgoing, - free_size); + buf = nxt_port_mmap_get_buf(task, &app->outgoing, free_size); if (nxt_slow_path(buf == NULL)) { while (out != NULL) { buf = out->next; @@ -5372,15 +4874,9 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r, static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data) { - nxt_app_t *app; - nxt_bool_t cancelled, unlinked; - nxt_port_t *port; nxt_timer_t *timer; - nxt_queue_link_t *lnk; nxt_http_request_t *r; - nxt_request_app_link_t *pending_ra; nxt_request_rpc_data_t *req_rpc_data; - nxt_port_select_state_t state; timer = obj; @@ -5388,94 +4884,6 @@ nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data) r = nxt_timer_data(timer, nxt_http_request_t, timer); req_rpc_data = r->timer_data; - app = req_rpc_data->app; - - if (app == NULL) { - goto generate_error; - } - - port = NULL; - pending_ra = NULL; - - if (req_rpc_data->app_port != NULL) { - port = req_rpc_data->app_port; - req_rpc_data->app_port = NULL; - } - - if (port == NULL && req_rpc_data->req_app_link != NULL - && req_rpc_data->req_app_link->app_port != NULL) - { - port = req_rpc_data->req_app_link->app_port; - req_rpc_data->req_app_link->app_port = NULL; - } - - if (port == NULL) { - goto generate_error; - } - - nxt_thread_mutex_lock(&app->mutex); - - unlinked = nxt_queue_chk_remove(&port->app_link); - - if (!nxt_queue_is_empty(&port->pending_requests)) { - lnk = nxt_queue_first(&port->pending_requests); - - pending_ra = nxt_queue_link_data(lnk, nxt_request_app_link_t, - link_port_pending); - - nxt_assert(pending_ra->link_app_pending.next != NULL); - - nxt_debug(task, "app '%V' pending request #%uD found", - &app->name, pending_ra->stream); - - cancelled = nxt_router_msg_cancel(task, &pending_ra->msg_info, - pending_ra->stream); - - if (cancelled) { - state.req_app_link = pending_ra; - state.app = app; - - /* - * Need to increment use count "in advance" because - * nxt_router_port_select() will remove pending_ra from lists - * and decrement use count. - */ - nxt_request_app_link_inc_use(pending_ra); - - nxt_router_port_select(task, &state); - - } else { - pending_ra = NULL; - } - } - - nxt_thread_mutex_unlock(&app->mutex); - - if (pending_ra != NULL) { - if (nxt_router_port_post_select(task, &state) == NXT_OK) { - /* - * Reference counter already incremented above, this will - * keep pending_ra while nxt_router_app_process_request() - * task is in queue. Reference counter decreased in - * nxt_router_app_process_request() after processing. - */ - - nxt_work_queue_add(&task->thread->engine->fast_work_queue, - nxt_router_app_process_request, - &task->thread->engine->task, app, pending_ra); - - } else { - nxt_request_app_link_use(task, pending_ra, -1); - } - } - - nxt_debug(task, "send quit to app '%V' pid %PI", &app->name, port->pid); - - nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); - - nxt_port_use(task, port, unlinked ? -2 : -1); - -generate_error: nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE); @@ -5483,13 +4891,11 @@ generate_error: } -static nxt_int_t -nxt_router_http_request_done(nxt_task_t *task, nxt_http_request_t *r) +static void +nxt_router_http_request_release_post(nxt_task_t *task, nxt_http_request_t *r) { r->timer.handler = nxt_router_http_request_release; nxt_timer_add(task->thread->engine, &r->timer, 0); - - return NXT_OK; } @@ -5498,7 +4904,7 @@ nxt_router_http_request_release(nxt_task_t *task, void *obj, void *data) { nxt_http_request_t *r; - nxt_debug(task, "http app release"); + nxt_debug(task, "http request pool release"); r = nxt_timer_data(obj, nxt_http_request_t, timer); @@ -5593,7 +4999,18 @@ nxt_router_get_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_assert(port->type == NXT_PROCESS_APP); - mmaps = &port->process->outgoing; + if (nxt_slow_path(port->app == NULL)) { + nxt_alert(task, "get_mmap_handler: app == NULL for reply port %PI:%d", + port->pid, port->id); + + // FIXME + nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, + -1, msg->port_msg.stream, 0, NULL); + + return; + } + + mmaps = &port->app->outgoing; nxt_thread_mutex_lock(&mmaps->mutex); if (nxt_slow_path(get_mmap_msg->id >= mmaps->size)) { @@ -5602,6 +5019,9 @@ nxt_router_get_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_alert(task, "get_mmap_handler: mmap id is too big (%d)", (int) get_mmap_msg->id); + // FIXME + nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, + -1, msg->port_msg.stream, 0, NULL); return; } diff --git a/src/nxt_router.h b/src/nxt_router.h index d8e93be6..7e0dc7d7 100644 --- a/src/nxt_router.h +++ b/src/nxt_router.h @@ -101,18 +101,20 @@ typedef struct { struct nxt_app_s { - nxt_thread_mutex_t mutex; /* Protects ports queue. */ - nxt_queue_t ports; /* of nxt_port_t.app_link */ + nxt_thread_mutex_t mutex; /* Protects ports queue. */ + nxt_queue_t ports; /* of nxt_port_t.app_link */ + nxt_lvlhsh_t port_hash; /* of nxt_port_t */ nxt_queue_t spare_ports; /* of nxt_port_t.idle_link */ nxt_queue_t idle_ports; /* of nxt_port_t.idle_link */ nxt_work_t adjust_idle_work; nxt_event_engine_t *engine; - nxt_queue_t requests; /* of nxt_request_app_link_t */ - nxt_queue_t pending; /* of nxt_request_app_link_t */ nxt_str_t name; + uint32_t port_hash_count; + + uint32_t active_requests; uint32_t pending_processes; uint32_t processes; uint32_t idle_processes; @@ -120,7 +122,6 @@ struct nxt_app_s { uint32_t max_processes; uint32_t spare_processes; uint32_t max_pending_processes; - uint32_t max_pending_responses; uint32_t max_requests; nxt_msec_t timeout; @@ -139,6 +140,9 @@ struct nxt_app_s { nxt_atomic_t use_count; nxt_app_joint_t *joint; + nxt_port_t *shared_port; + + nxt_port_mmaps_t outgoing; }; diff --git a/src/nxt_router_request.h b/src/nxt_router_request.h index a38980ee..1271520d 100644 --- a/src/nxt_router_request.h +++ b/src/nxt_router_request.h @@ -9,14 +9,12 @@ typedef struct nxt_msg_info_s { nxt_buf_t *buf; + nxt_fd_t body_fd; nxt_port_mmap_tracking_t tracking; nxt_work_handler_t completion_handler; } nxt_msg_info_t; -typedef struct nxt_request_app_link_s nxt_request_app_link_t; - - typedef enum { NXT_APR_NEW_PORT, NXT_APR_REQUEST_FAILED, @@ -35,38 +33,9 @@ typedef struct { nxt_http_request_t *request; nxt_msg_info_t msg_info; - nxt_request_app_link_t *req_app_link; -} nxt_request_rpc_data_t; - - -struct nxt_request_app_link_s { - uint32_t stream; - nxt_atomic_t use_count; - - nxt_port_t *app_port; - nxt_apr_action_t apr_action; - - nxt_port_t *reply_port; - nxt_http_request_t *request; - nxt_msg_info_t msg_info; - nxt_request_rpc_data_t *req_rpc_data; - nxt_fd_t body_fd; - nxt_nsec_t res_time; - - nxt_queue_link_t link_app_requests; /* for nxt_app_t.requests */ - /* for nxt_port_t.pending_requests */ - nxt_queue_link_t link_port_pending; - nxt_queue_link_t link_app_pending; /* for nxt_app_t.pending */ - /* for nxt_port_t.active_websockets */ - nxt_queue_link_t link_port_websockets; - - nxt_mp_t *mem_pool; - nxt_work_t work; - - int err_code; - const char *err_str; -}; + nxt_bool_t rpc_cancel; +} nxt_request_rpc_data_t; #endif /* _NXT_ROUTER_REQUEST_H_INCLUDED_ */ diff --git a/src/nxt_unit.c b/src/nxt_unit.c index b321a0d4..7fb2826d 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -38,8 +38,8 @@ typedef struct nxt_unit_websocket_frame_impl_s nxt_unit_websocket_frame_impl_t; static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init); static int nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, void *data); -nxt_inline void nxt_unit_ctx_use(nxt_unit_ctx_impl_t *ctx_impl); -nxt_inline void nxt_unit_ctx_release(nxt_unit_ctx_impl_t *ctx_impl); +nxt_inline void nxt_unit_ctx_use(nxt_unit_ctx_t *ctx); +nxt_inline void nxt_unit_ctx_release(nxt_unit_ctx_t *ctx); nxt_inline void nxt_unit_lib_use(nxt_unit_impl_t *lib); nxt_inline void nxt_unit_lib_release(nxt_unit_impl_t *lib); nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head, @@ -58,6 +58,7 @@ static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req, nxt_unit_port_id_t *port_id); +static int nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req); static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx); @@ -122,9 +123,12 @@ static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_impl_t *lib, static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib, pid_t pid, int remove); static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib); -static void nxt_unit_read_buf(nxt_unit_ctx_t *ctx, - nxt_unit_read_buf_t *rbuf); -static void nxt_unit_process_ready_req(nxt_unit_ctx_impl_t *ctx_impl); +static int nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx); +static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf); +static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx); +static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx); +static int nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, + nxt_unit_port_t *port); static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl); static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx); @@ -150,9 +154,8 @@ static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx, const void *oob, size_t oob_size); static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd, const void *buf, size_t buf_size, const void *oob, size_t oob_size); -static ssize_t nxt_unit_port_recv(nxt_unit_ctx_t *ctx, - nxt_unit_port_t *port, void *buf, size_t buf_size, - void *oob, size_t oob_size); +static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, + nxt_unit_read_buf_t *rbuf); static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, nxt_unit_port_t *port); @@ -308,6 +311,7 @@ struct nxt_unit_impl_s { nxt_lvlhsh_t ports; /* of nxt_unit_port_impl_t */ nxt_unit_port_t *router_port; + nxt_unit_port_t *shared_port; nxt_queue_t contexts; /* of nxt_unit_ctx_impl_t */ @@ -452,7 +456,7 @@ nxt_unit_init(nxt_unit_init_t *init) fail: - nxt_unit_ctx_release(&lib->main_ctx); + nxt_unit_ctx_release(&lib->main_ctx.ctx); return NULL; } @@ -496,6 +500,7 @@ nxt_unit_create(nxt_unit_init_t *init) lib->use_count = 0; lib->router_port = NULL; + lib->shared_port = NULL; rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data); if (nxt_slow_path(rc != NXT_UNIT_OK)) { @@ -570,16 +575,23 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, nxt_inline void -nxt_unit_ctx_use(nxt_unit_ctx_impl_t *ctx_impl) +nxt_unit_ctx_use(nxt_unit_ctx_t *ctx) { + nxt_unit_ctx_impl_t *ctx_impl; + + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + nxt_atomic_fetch_add(&ctx_impl->use_count, 1); } nxt_inline void -nxt_unit_ctx_release(nxt_unit_ctx_impl_t *ctx_impl) +nxt_unit_ctx_release(nxt_unit_ctx_t *ctx) { - long c; + long c; + nxt_unit_ctx_impl_t *ctx_impl; + + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); c = nxt_atomic_fetch_add(&ctx_impl->use_count, -1); @@ -624,6 +636,10 @@ nxt_unit_lib_release(nxt_unit_impl_t *lib) nxt_unit_port_release(lib->router_port); } + if (nxt_fast_path(lib->shared_port != NULL)) { + nxt_unit_port_release(lib->shared_port); + } + free(lib); } } @@ -805,6 +821,15 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) recv_msg.incoming_buf = NULL; if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) { + if (nxt_slow_path(rbuf->size == 0)) { + nxt_unit_debug(ctx, "read port closed"); + + nxt_unit_quit(ctx); + rc = NXT_UNIT_OK; + + goto fail; + } + nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size); goto fail; } @@ -946,6 +971,13 @@ fail: nxt_unit_process_release(recv_msg.process); } + if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) { +#if (NXT_DEBUG) + memset(rbuf->buf, 0xAC, rbuf->size); +#endif + nxt_unit_read_buf_release(ctx, rbuf); + } + return rc; } @@ -954,6 +986,7 @@ static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) { int nb; + nxt_unit_impl_t *lib; nxt_unit_port_t new_port, *port; nxt_port_msg_new_port_t *new_port_msg; @@ -978,21 +1011,33 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) recv_msg->stream, (int) new_port_msg->pid, (int) new_port_msg->id, recv_msg->fd); - nb = 0; + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + + if (new_port_msg->id == (nxt_port_id_t) -1) { + nxt_unit_port_id_init(&new_port.id, lib->pid, new_port_msg->id); - if (nxt_slow_path(ioctl(recv_msg->fd, FIONBIO, &nb) == -1)) { - nxt_unit_alert(ctx, "#%"PRIu32": new_port: ioctl(%d, FIONBIO, 0) " - "failed: %s (%d)", - recv_msg->stream, recv_msg->fd, strerror(errno), errno); + new_port.in_fd = recv_msg->fd; + new_port.out_fd = -1; - return NXT_UNIT_ERROR; + } else { + nb = 0; + + if (nxt_slow_path(ioctl(recv_msg->fd, FIONBIO, &nb) == -1)) { + nxt_unit_alert(ctx, "#%"PRIu32": new_port: ioctl(%d, FIONBIO, 0) " + "failed: %s (%d)", + recv_msg->stream, recv_msg->fd, strerror(errno), errno); + + return NXT_UNIT_ERROR; + } + + nxt_unit_port_id_init(&new_port.id, new_port_msg->pid, + new_port_msg->id); + + new_port.in_fd = -1; + new_port.out_fd = recv_msg->fd; } - nxt_unit_port_id_init(&new_port.id, new_port_msg->pid, - new_port_msg->id); - new_port.in_fd = -1; - new_port.out_fd = recv_msg->fd; new_port.data = NULL; recv_msg->fd = -1; @@ -1002,7 +1047,12 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) return NXT_UNIT_ERROR; } - nxt_unit_port_release(port); + if (new_port_msg->id == (nxt_port_id_t) -1) { + lib->shared_port = port; + + } else { + nxt_unit_port_release(port); + } return NXT_UNIT_OK; } @@ -1102,6 +1152,11 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) } if (nxt_fast_path(res == NXT_UNIT_OK)) { + res = nxt_unit_send_req_headers_ack(req); + if (nxt_slow_path(res != NXT_UNIT_OK)) { + return res; + } + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); lib->callbacks.request_handler(req); @@ -1221,6 +1276,36 @@ nxt_unit_request_check_response_port(nxt_unit_request_info_t *req, static int +nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req) +{ + ssize_t res; + nxt_port_msg_t msg; + nxt_unit_impl_t *lib; + nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_request_info_impl_t *req_impl; + + lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit); + ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx); + req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); + + memset(&msg, 0, sizeof(nxt_port_msg_t)); + + msg.stream = req_impl->stream; + msg.pid = lib->pid; + msg.reply_port = ctx_impl->read_port->id.id; + msg.type = _NXT_PORT_MSG_REQ_HEADERS_ACK; + + res = nxt_unit_port_send(req->ctx, req->response_port, + &msg, sizeof(msg), NULL, 0); + if (nxt_slow_path(res != sizeof(msg))) { + return NXT_UNIT_ERROR; + } + + return NXT_UNIT_OK; +} + + +static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) { size_t hsize; @@ -3267,7 +3352,9 @@ nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx) return NXT_UNIT_ERROR; } - nxt_unit_read_buf(ctx, rbuf); + memset(rbuf->oob, 0, sizeof(struct cmsghdr)); + + nxt_unit_port_recv(ctx, ctx_impl->read_port, rbuf); if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) { nxt_unit_read_buf_release(ctx, rbuf); @@ -4218,26 +4305,23 @@ nxt_unit_process_pop_first(nxt_unit_impl_t *lib) int nxt_unit_run(nxt_unit_ctx_t *ctx) { - int rc; - nxt_unit_impl_t *lib; - nxt_unit_ctx_impl_t *ctx_impl; - - ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + int rc; + nxt_unit_impl_t *lib; - nxt_unit_ctx_use(ctx_impl); + nxt_unit_ctx_use(ctx); lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); rc = NXT_UNIT_OK; while (nxt_fast_path(lib->online)) { - rc = nxt_unit_run_once(ctx); + rc = nxt_unit_run_once_impl(ctx); - if (nxt_slow_path(rc != NXT_UNIT_OK)) { + if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { break; } } - nxt_unit_ctx_release(ctx_impl); + nxt_unit_ctx_release(ctx); return rc; } @@ -4246,109 +4330,163 @@ nxt_unit_run(nxt_unit_ctx_t *ctx) int nxt_unit_run_once(nxt_unit_ctx_t *ctx) { - int rc; - nxt_queue_link_t *link; - nxt_unit_ctx_impl_t *ctx_impl; - nxt_unit_read_buf_t *rbuf; + int rc; - ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + nxt_unit_ctx_use(ctx); - nxt_unit_ctx_use(ctx_impl); + rc = nxt_unit_run_once_impl(ctx); - pthread_mutex_lock(&ctx_impl->mutex); + nxt_unit_ctx_release(ctx); - if (!nxt_queue_is_empty(&ctx_impl->pending_rbuf)) { + return rc; +} -next_pending: - link = nxt_queue_first(&ctx_impl->pending_rbuf); - nxt_queue_remove(link); +static int +nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx) +{ + int rc; + nxt_unit_read_buf_t *rbuf; - rbuf = nxt_container_of(link, nxt_unit_read_buf_t, link); + rbuf = nxt_unit_read_buf_get(ctx); + if (nxt_slow_path(rbuf == NULL)) { + return NXT_UNIT_ERROR; + } - pthread_mutex_unlock(&ctx_impl->mutex); + rc = nxt_unit_read_buf(ctx, rbuf); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + nxt_unit_read_buf_release(ctx, rbuf); - } else { - rbuf = nxt_unit_read_buf_get_impl(ctx_impl); + return rc; + } - pthread_mutex_unlock(&ctx_impl->mutex); + rc = nxt_unit_process_msg(ctx, rbuf); + if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { + return NXT_UNIT_ERROR; + } - if (nxt_slow_path(rbuf == NULL)) { + rc = nxt_unit_process_pending_rbuf(ctx); + if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { + return NXT_UNIT_ERROR; + } - nxt_unit_ctx_release(ctx_impl); + nxt_unit_process_ready_req(ctx); - return NXT_UNIT_ERROR; - } + return rc; +} - nxt_unit_read_buf(ctx, rbuf); - } - if (nxt_fast_path(rbuf->size > 0)) { - rc = nxt_unit_process_msg(ctx, rbuf); +static int +nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) +{ + int res, err; + nxt_unit_impl_t *lib; + nxt_unit_ctx_impl_t *ctx_impl; + struct pollfd fds[2]; -#if (NXT_DEBUG) - if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) { - memset(rbuf->buf, 0xAC, rbuf->size); - } -#endif + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); - } else { - rc = NXT_UNIT_ERROR; + memset(rbuf->oob, 0, sizeof(struct cmsghdr)); + + if (ctx_impl->wait_items > 0 || lib->shared_port == NULL) { + return nxt_unit_port_recv(ctx, ctx_impl->read_port, rbuf); } - if (nxt_slow_path(rc == NXT_UNIT_AGAIN)) { - rc = NXT_UNIT_OK; +retry: - } else { - nxt_unit_read_buf_release(ctx, rbuf); - } + fds[0].fd = ctx_impl->read_port->in_fd; + fds[0].events = POLLIN; + fds[0].revents = 0; - if (nxt_slow_path(rc == NXT_UNIT_CANCELLED)) { - rc = NXT_UNIT_OK; - } + fds[1].fd = lib->shared_port->in_fd; + fds[1].events = POLLIN; + fds[1].revents = 0; - if (nxt_fast_path(rc == NXT_UNIT_OK)) { - pthread_mutex_lock(&ctx_impl->mutex); + res = poll(fds, 2, -1); + if (nxt_slow_path(res < 0)) { + err = errno; - if (!nxt_queue_is_empty(&ctx_impl->pending_rbuf)) { - goto next_pending; + if (err == EINTR) { + goto retry; } - pthread_mutex_unlock(&ctx_impl->mutex); + nxt_unit_alert(ctx, "poll() failed: %s (%d)", + strerror(err), err); - nxt_unit_process_ready_req(ctx_impl); + rbuf->size = -1; + + return (err == EAGAIN) ? NXT_UNIT_AGAIN : NXT_UNIT_ERROR; } - nxt_unit_ctx_release(ctx_impl); + if ((fds[0].revents & POLLIN) != 0) { + return nxt_unit_port_recv(ctx, ctx_impl->read_port, rbuf); + } - return rc; + if ((fds[1].revents & POLLIN) != 0) { + return nxt_unit_port_recv(ctx, lib->shared_port, rbuf); + } + + rbuf->size = -1; + + return NXT_UNIT_ERROR; } -static void -nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) +static int +nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx) { + int rc; + nxt_queue_t pending_rbuf; nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_read_buf_t *rbuf; + + nxt_queue_init(&pending_rbuf); ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); - memset(rbuf->oob, 0, sizeof(struct cmsghdr)); + pthread_mutex_lock(&ctx_impl->mutex); + + if (nxt_queue_is_empty(&ctx_impl->pending_rbuf)) { + pthread_mutex_unlock(&ctx_impl->mutex); + + return NXT_UNIT_OK; + } + + nxt_queue_add(&pending_rbuf, &ctx_impl->pending_rbuf); + nxt_queue_init(&ctx_impl->pending_rbuf); - rbuf->size = nxt_unit_port_recv(ctx, ctx_impl->read_port, - rbuf->buf, sizeof(rbuf->buf), - rbuf->oob, sizeof(rbuf->oob)); + pthread_mutex_unlock(&ctx_impl->mutex); + + rc = NXT_UNIT_OK; + + nxt_queue_each(rbuf, &pending_rbuf, nxt_unit_read_buf_t, link) { + + if (nxt_fast_path(rc != NXT_UNIT_ERROR)) { + rc = nxt_unit_process_msg(&ctx_impl->ctx, rbuf); + + } else { + nxt_unit_read_buf_release(ctx, rbuf); + } + + } nxt_queue_loop; + + return rc; } static void -nxt_unit_process_ready_req(nxt_unit_ctx_impl_t *ctx_impl) +nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx) { nxt_queue_t ready_req; nxt_unit_impl_t *lib; + nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_request_info_impl_t *req_impl; nxt_queue_init(&ready_req); + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + pthread_mutex_lock(&ctx_impl->mutex); if (nxt_queue_is_empty(&ctx_impl->ready_req)) { @@ -4367,20 +4505,121 @@ nxt_unit_process_ready_req(nxt_unit_ctx_impl_t *ctx_impl) { lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit); + (void) nxt_unit_send_req_headers_ack(&req_impl->req); + lib->callbacks.request_handler(&req_impl->req); } nxt_queue_loop; } -void -nxt_unit_done(nxt_unit_ctx_t *ctx) +int +nxt_unit_run_ctx(nxt_unit_ctx_t *ctx) { + int rc; + nxt_unit_impl_t *lib; nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_ctx_use(ctx); + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); - nxt_unit_ctx_release(ctx_impl); + rc = NXT_UNIT_OK; + + while (nxt_fast_path(lib->online)) { + rc = nxt_unit_process_port_msg_impl(ctx, ctx_impl->read_port); + + if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { + break; + } + } + + nxt_unit_ctx_release(ctx); + + return rc; +} + + +int +nxt_unit_run_shared(nxt_unit_ctx_t *ctx) +{ + int rc; + nxt_unit_impl_t *lib; + + nxt_unit_ctx_use(ctx); + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + rc = NXT_UNIT_OK; + + while (nxt_fast_path(lib->online)) { + rc = nxt_unit_process_port_msg_impl(ctx, lib->shared_port); + + if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { + break; + } + } + + nxt_unit_ctx_release(ctx); + + return rc; +} + + +int +nxt_unit_process_port_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) +{ + int rc; + + nxt_unit_ctx_use(ctx); + + rc = nxt_unit_process_port_msg_impl(ctx, port); + + nxt_unit_ctx_release(ctx); + + return rc; +} + + +static int +nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) +{ + int rc; + nxt_unit_read_buf_t *rbuf; + + rbuf = nxt_unit_read_buf_get(ctx); + if (nxt_slow_path(rbuf == NULL)) { + return NXT_UNIT_ERROR; + } + + memset(rbuf->oob, 0, sizeof(struct cmsghdr)); + + rc = nxt_unit_port_recv(ctx, port, rbuf); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + nxt_unit_read_buf_release(ctx, rbuf); + return rc; + } + + rc = nxt_unit_process_msg(ctx, rbuf); + if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { + return NXT_UNIT_ERROR; + } + + rc = nxt_unit_process_pending_rbuf(ctx); + if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { + return NXT_UNIT_ERROR; + } + + nxt_unit_process_ready_req(ctx); + + return rc; +} + + +void +nxt_unit_done(nxt_unit_ctx_t *ctx) +{ + nxt_unit_ctx_release(ctx); } @@ -5056,12 +5295,11 @@ retry: } -static ssize_t +static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, - void *buf, size_t buf_size, void *oob, size_t oob_size) + nxt_unit_read_buf_t *rbuf) { - int fd; - ssize_t res; + int fd, err; struct iovec iov[1]; struct msghdr msg; nxt_unit_impl_t *lib; @@ -5069,40 +5307,57 @@ nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); if (lib->callbacks.port_recv != NULL) { - return lib->callbacks.port_recv(ctx, port, - buf, buf_size, oob, oob_size); + rbuf->size = lib->callbacks.port_recv(ctx, port, + rbuf->buf, sizeof(rbuf->buf), + rbuf->oob, sizeof(rbuf->oob)); + + if (nxt_slow_path(rbuf->size < 0)) { + return NXT_UNIT_ERROR; + } + + return NXT_UNIT_OK; } - iov[0].iov_base = buf; - iov[0].iov_len = buf_size; + iov[0].iov_base = rbuf->buf; + iov[0].iov_len = sizeof(rbuf->buf); msg.msg_name = NULL; msg.msg_namelen = 0; msg.msg_iov = iov; msg.msg_iovlen = 1; msg.msg_flags = 0; - msg.msg_control = oob; - msg.msg_controllen = oob_size; + msg.msg_control = rbuf->oob; + msg.msg_controllen = sizeof(rbuf->oob); fd = port->in_fd; retry: - res = recvmsg(fd, &msg, 0); + rbuf->size = recvmsg(fd, &msg, 0); - if (nxt_slow_path(res == -1)) { - if (errno == EINTR) { + if (nxt_slow_path(rbuf->size == -1)) { + err = errno; + + if (err == EINTR) { goto retry; } + if (err == EAGAIN) { + nxt_unit_debug(ctx, "recvmsg(%d) failed: %s (%d)", + fd, strerror(errno), errno); + + return NXT_UNIT_AGAIN; + } + nxt_unit_alert(ctx, "recvmsg(%d) failed: %s (%d)", fd, strerror(errno), errno); - } else { - nxt_unit_debug(ctx, "recvmsg(%d): %d", fd, (int) res); + return NXT_UNIT_ERROR; } - return res; + nxt_unit_debug(ctx, "recvmsg(%d): %d", fd, (int) rbuf->size); + + return NXT_UNIT_OK; } diff --git a/src/nxt_unit.h b/src/nxt_unit.h index 79157f5f..0f16773f 100644 --- a/src/nxt_unit.h +++ b/src/nxt_unit.h @@ -202,8 +202,21 @@ nxt_unit_ctx_t *nxt_unit_init(nxt_unit_init_t *); */ int nxt_unit_run(nxt_unit_ctx_t *); +int nxt_unit_run_ctx(nxt_unit_ctx_t *ctx); + +int nxt_unit_run_shared(nxt_unit_ctx_t *ctx); + +/* + * Receive and process one message, invoke configured callbacks. + * + * If application implements it's own event loop, each datagram received + * from port socket should be initially processed by unit. This function + * may invoke other application-defined callback for message processing. + */ int nxt_unit_run_once(nxt_unit_ctx_t *ctx); +int nxt_unit_process_port_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); + /* Destroy application library object. */ void nxt_unit_done(nxt_unit_ctx_t *); |