diff options
-rw-r--r-- | src/nxt_main_process.c | 15 | ||||
-rw-r--r-- | src/nxt_port.c | 129 | ||||
-rw-r--r-- | src/nxt_port.h | 17 | ||||
-rw-r--r-- | src/nxt_port_socket.c | 104 | ||||
-rw-r--r-- | src/nxt_process.c | 12 | ||||
-rw-r--r-- | src/nxt_router.c | 762 | ||||
-rw-r--r-- | src/nxt_router.h | 6 | ||||
-rw-r--r-- | src/nxt_runtime.c | 36 | ||||
-rw-r--r-- | src/nxt_runtime.h | 8 |
9 files changed, 682 insertions, 407 deletions
diff --git a/src/nxt_main_process.c b/src/nxt_main_process.c index 90c06a05..9c27a89b 100644 --- a/src/nxt_main_process.c +++ b/src/nxt_main_process.c @@ -260,14 +260,17 @@ nxt_main_process_port_create(nxt_task_t *task, nxt_runtime_t *rt) return NXT_ERROR; } + nxt_process_port_add(task, process, port); + ret = nxt_port_socket_init(task, port, 0); if (nxt_slow_path(ret != NXT_OK)) { + nxt_port_use(task, port, -1); return ret; } - nxt_process_port_add(task, process, port); + nxt_runtime_port_add(task, port); - nxt_runtime_port_add(rt, port); + nxt_port_use(task, port, -1); /* * A main process port. A write port is not closed @@ -508,7 +511,7 @@ nxt_main_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt, port = nxt_port_new(task, 0, 0, init->type); if (nxt_slow_path(port == NULL)) { - nxt_runtime_process_remove(rt, process); + nxt_runtime_process_remove(task, process); return NXT_ERROR; } @@ -516,12 +519,14 @@ nxt_main_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt, ret = nxt_port_socket_init(task, port, 0); if (nxt_slow_path(ret != NXT_OK)) { - nxt_mp_release(port->mem_pool, port); + nxt_port_use(task, port, -1); return ret; } pid = nxt_process_create(task, process); + nxt_port_use(task, port, -1); + switch (pid) { case -1: @@ -755,7 +760,7 @@ nxt_main_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid) if (process) { init = process->init; - nxt_runtime_process_remove(rt, process); + nxt_runtime_process_remove(task, process); if (!nxt_exiting) { nxt_runtime_process_each(rt, process) { diff --git a/src/nxt_port.c b/src/nxt_port.c index d7f42012..3f7dc411 100644 --- a/src/nxt_port.c +++ b/src/nxt_port.c @@ -27,13 +27,15 @@ nxt_port_mp_cleanup(nxt_task_t *task, void *obj, void *data) nxt_assert(port->pair[0] == -1); nxt_assert(port->pair[1] == -1); - nxt_assert(port->app_stream == 0); + nxt_assert(port->use_count == 0); nxt_assert(port->app_link.next == NULL); nxt_assert(nxt_queue_is_empty(&port->messages)); nxt_assert(nxt_lvlhsh_is_empty(&port->rpc_streams)); nxt_assert(nxt_lvlhsh_is_empty(&port->rpc_peers)); + nxt_thread_mutex_destroy(&port->write_mutex); + nxt_mp_free(mp, port); } @@ -58,10 +60,12 @@ nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid, port->pid = pid; port->type = type; port->mem_pool = mp; + port->use_count = 1; nxt_mp_cleanup(mp, nxt_port_mp_cleanup, task, port, mp); nxt_queue_init(&port->messages); + nxt_thread_mutex_create(&port->write_mutex); } else { nxt_mp_destroy(mp); @@ -73,11 +77,11 @@ nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid, } -nxt_bool_t -nxt_port_release(nxt_port_t *port) +void +nxt_port_close(nxt_task_t *task, nxt_port_t *port) { - nxt_thread_log_debug("port %p %d:%d release, type %d", port, port->pid, - port->id, port->type); + nxt_debug(task, "port %p %d:%d close, type %d", port, port->pid, + port->id, port->type); if (port->pair[0] != -1) { nxt_fd_close(port->pair[0]); @@ -87,21 +91,31 @@ nxt_port_release(nxt_port_t *port) if (port->pair[1] != -1) { nxt_fd_close(port->pair[1]); port->pair[1] = -1; - } - if (port->type == NXT_PROCESS_WORKER) { - if (nxt_router_app_remove_port(port) == 0) { - return 0; + if (port->app != NULL) { + nxt_router_app_port_close(task, port); } } +} + + +static void +nxt_port_release(nxt_task_t *task, nxt_port_t *port) +{ + nxt_debug(task, "port %p %d:%d release, type %d", port, port->pid, + port->id, port->type); + + if (port->app != NULL) { + nxt_router_app_use(task, port->app, -1); + + port->app = NULL; + } if (port->link.next != NULL) { nxt_process_port_remove(port); } nxt_mp_release(port->mem_pool, NULL); - - return 1; } @@ -263,7 +277,9 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) port->socket.task = task; - nxt_runtime_port_add(rt, port); + nxt_runtime_port_add(task, port); + + nxt_port_use(task, port, -1); nxt_port_write_enable(task, port); @@ -434,7 +450,7 @@ nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) process = nxt_runtime_process_find(rt, pid); if (process) { - nxt_runtime_process_remove(rt, process); + nxt_runtime_process_remove(task, process); } } @@ -444,3 +460,90 @@ nxt_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { nxt_debug(task, "port empty handler"); } + + +typedef struct { + nxt_work_t work; + nxt_port_t *port; + nxt_port_post_handler_t handler; +} nxt_port_work_t; + + +static void +nxt_port_post_handler(nxt_task_t *task, void *obj, void *data) +{ + nxt_port_t *port; + nxt_port_work_t *pw; + nxt_port_post_handler_t handler; + + pw = obj; + port = pw->port; + handler = pw->handler; + + nxt_free(pw); + + handler(task, port, data); + + nxt_port_use(task, port, -1); +} + + +nxt_int_t +nxt_port_post(nxt_task_t *task, nxt_port_t *port, + nxt_port_post_handler_t handler, void *data) +{ + nxt_port_work_t *pw; + + if (task->thread->engine == port->engine) { + handler(task, port, data); + + return NXT_OK; + } + + pw = nxt_zalloc(sizeof(nxt_port_work_t)); + + if (nxt_slow_path(pw == NULL)) { + return NXT_ERROR; + } + + nxt_atomic_fetch_add(&port->use_count, 1); + + pw->work.handler = nxt_port_post_handler; + pw->work.task = &port->engine->task; + pw->work.obj = pw; + pw->work.data = data; + + pw->port = port; + pw->handler = handler; + + nxt_event_engine_post(port->engine, &pw->work); + + return NXT_OK; +} + + +static void +nxt_port_release_handler(nxt_task_t *task, nxt_port_t *port, void *data) +{ + /* no op */ +} + + +void +nxt_port_use(nxt_task_t *task, nxt_port_t *port, int i) +{ + int c; + + c = nxt_atomic_fetch_add(&port->use_count, i); + + if (i < 0 && c == -i) { + + if (task->thread->engine == port->engine) { + nxt_port_release(task, port); + + return; + } + + nxt_port_post(task, port, nxt_port_release_handler, NULL); + } +} diff --git a/src/nxt_port.h b/src/nxt_port.h index c5b2b40e..f6679bb2 100644 --- a/src/nxt_port.h +++ b/src/nxt_port.h @@ -115,7 +115,6 @@ typedef struct { size_t share; nxt_fd_t fd; nxt_bool_t close_fd; - nxt_bool_t opened; nxt_port_msg_t port_msg; nxt_work_t work; @@ -145,12 +144,15 @@ struct nxt_port_s { nxt_app_t *app; nxt_queue_t messages; /* of nxt_port_send_msg_t */ + nxt_thread_mutex_t write_mutex; /* Maximum size of message part. */ uint32_t max_size; /* Maximum interleave of message parts. */ uint32_t max_share; - uint32_t app_stream; + + uint32_t app_requests; + uint32_t app_responses; nxt_port_handler_t handler; nxt_port_handler_t *data; @@ -167,8 +169,9 @@ struct nxt_port_s { nxt_lvlhsh_t rpc_streams; /* stream to nxt_port_rpc_reg_t */ nxt_lvlhsh_t rpc_peers; /* peer to queue of nxt_port_rpc_reg_t */ + nxt_atomic_t use_count; + nxt_process_type_t type; - nxt_work_t work; struct iovec *iov; void *mmsg_buf; @@ -194,9 +197,11 @@ typedef union { } nxt_port_data_t; +typedef void (*nxt_port_post_handler_t)(nxt_task_t *task, nxt_port_t *port, + void *data); + nxt_port_t *nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid, nxt_process_type_t type); -nxt_bool_t nxt_port_release(nxt_port_t *port); nxt_port_id_t nxt_port_get_next_id(void); void nxt_port_reset_next_id(void); @@ -204,6 +209,7 @@ void nxt_port_reset_next_id(void); nxt_int_t nxt_port_socket_init(nxt_task_t *task, nxt_port_t *port, size_t max_size); void nxt_port_destroy(nxt_port_t *port); +void nxt_port_close(nxt_task_t *task, nxt_port_t *port); void nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port); void nxt_port_write_close(nxt_port_t *port); void nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port); @@ -231,5 +237,8 @@ void nxt_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); void nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); void nxt_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); +nxt_int_t nxt_port_post(nxt_task_t *task, nxt_port_t *port, + nxt_port_post_handler_t handler, void *data); +void nxt_port_use(nxt_task_t *task, nxt_port_t *port, int i); #endif /* _NXT_PORT_H_INCLUDED_ */ diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c index 75706459..d5ed493d 100644 --- a/src/nxt_port_socket.c +++ b/src/nxt_port_socket.c @@ -169,31 +169,6 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, { nxt_port_send_msg_t *msg; - nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) { - - if ((type & NXT_PORT_MSG_SYNC) != 0) { - msg->opened = 0; - continue; - } - - if (msg->port_msg.stream == stream - && msg->port_msg.reply_port == reply_port - && msg->port_msg.last == 0 - && msg->opened) { - - /* - * An fd is ignored since a file descriptor - * must be sent only in the first message of a stream. - */ - nxt_buf_chain_add(&msg->buf, b); - - msg->port_msg.last |= (type & NXT_PORT_MSG_LAST) != 0; - - return NXT_OK; - } - - } nxt_queue_loop; - msg = nxt_mp_retain(task->thread->engine->mem_pool, sizeof(nxt_port_send_msg_t)); if (nxt_slow_path(msg == NULL)) { @@ -207,7 +182,6 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, msg->fd = fd; msg->close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0; msg->share = 0; - msg->opened = 1; msg->work.next = NULL; msg->work.handler = nxt_port_release_send_msg; @@ -225,8 +199,14 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, msg->port_msg.last = (type & NXT_PORT_MSG_LAST) != 0; msg->port_msg.mmap = 0; + nxt_thread_mutex_lock(&port->write_mutex); + nxt_queue_insert_tail(&port->messages, &msg->link); + nxt_thread_mutex_unlock(&port->write_mutex); + + nxt_port_use(task, port, 1); + if (port->socket.write_ready) { nxt_port_write_handler(task, &port->socket, NULL); } @@ -236,10 +216,26 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, static void +nxt_port_fd_block_write(nxt_task_t *task, nxt_port_t *port, void *data) +{ + nxt_fd_event_block_write(task->thread->engine, &port->socket); +} + + +static void +nxt_port_fd_enable_write(nxt_task_t *task, nxt_port_t *port, void *data) +{ + nxt_fd_event_enable_write(task->thread->engine, &port->socket); +} + + +static void nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) { + int use_delta; size_t plain_size; ssize_t n; + nxt_bool_t block_write, enable_write; nxt_port_t *port; struct iovec *iov; nxt_work_queue_t *wq; @@ -250,14 +246,20 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) port = nxt_container_of(obj, nxt_port_t, socket); + block_write = 0; + enable_write = 0; + use_delta = 0; + + nxt_thread_mutex_lock(&port->write_mutex); + iov = port->iov; do { link = nxt_queue_first(&port->messages); if (link == nxt_queue_tail(&port->messages)) { - nxt_fd_event_block_write(task->thread->engine, &port->socket); - return; + block_write = 1; + goto unlock_mutex; } msg = nxt_queue_link_data(link, nxt_port_send_msg_t, link); @@ -334,6 +336,7 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) } else { nxt_queue_remove(link); + use_delta--; nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg, msg->engine); } @@ -347,16 +350,33 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) } while (port->socket.write_ready); if (nxt_fd_event_is_disabled(port->socket.write)) { - /* TODO task->thread->engine or port->engine ? */ - nxt_fd_event_enable_write(task->thread->engine, &port->socket); + enable_write = 1; } - return; + goto unlock_mutex; fail: + use_delta++; + nxt_work_queue_add(&task->thread->engine->fast_work_queue, - nxt_port_error_handler, task, &port->socket, NULL); + nxt_port_error_handler, task, &port->socket, + &port->socket); + +unlock_mutex: + nxt_thread_mutex_unlock(&port->write_mutex); + + if (block_write && nxt_fd_event_is_active(port->socket.write)) { + nxt_port_post(task, port, nxt_port_fd_block_write, NULL); + } + + if (enable_write) { + nxt_port_post(task, port, nxt_port_fd_enable_write, NULL); + } + + if (use_delta != 0) { + nxt_port_use(task, port, use_delta); + } } @@ -541,6 +561,7 @@ nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b) static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data) { + int use_delta; nxt_buf_t *b; nxt_port_t *port; nxt_work_queue_t *wq; @@ -551,9 +572,17 @@ nxt_port_error_handler(nxt_task_t *task, void *obj, void *data) port = nxt_container_of(obj, nxt_port_t, socket); - nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) { + use_delta = 0; + + if (obj == data) { + use_delta--; + } + + wq = &task->thread->engine->fast_work_queue; - wq = &task->thread->engine->fast_work_queue; + nxt_thread_mutex_lock(&port->write_mutex); + + nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) { for(b = msg->buf; b != NULL; b = b->next) { if (nxt_buf_is_sync(b)) { @@ -564,8 +593,15 @@ nxt_port_error_handler(nxt_task_t *task, void *obj, void *data) } nxt_queue_remove(&msg->link); + use_delta--; nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg, msg->engine); } nxt_queue_loop; + + nxt_thread_mutex_unlock(&port->write_mutex); + + if (use_delta != 0) { + nxt_port_use(task, port, use_delta); + } } diff --git a/src/nxt_process.c b/src/nxt_process.c index 95e701f8..d3ec36ed 100644 --- a/src/nxt_process.c +++ b/src/nxt_process.c @@ -58,7 +58,7 @@ nxt_process_create(nxt_task_t *task, nxt_process_t *process) if (!p->ready) { nxt_debug(task, "remove not ready process %PI", p->pid); - nxt_runtime_process_remove(rt, p); + nxt_runtime_process_remove(task, p); } else { nxt_port_mmaps_destroy(p->incoming, 0); @@ -67,7 +67,7 @@ nxt_process_create(nxt_task_t *task, nxt_process_t *process) } nxt_runtime_process_loop; - nxt_runtime_process_add(rt, process); + nxt_runtime_process_add(task, process); nxt_process_start(task, process); @@ -81,7 +81,7 @@ nxt_process_create(nxt_task_t *task, nxt_process_t *process) process->pid = pid; - nxt_runtime_process_add(rt, process); + nxt_runtime_process_add(task, process); break; } @@ -589,16 +589,14 @@ nxt_user_cred_set(nxt_task_t *task, nxt_user_cred_t *uc) static void nxt_process_port_mp_cleanup(nxt_task_t *task, void *obj, void *data) { - nxt_runtime_t *rt; nxt_process_t *process; process = obj; - rt = data; process->port_cleanups--; if (process->port_cleanups == 0) { - nxt_runtime_process_remove(rt, process); + nxt_runtime_process_remove(task, process); } } @@ -609,7 +607,7 @@ nxt_process_port_add(nxt_task_t *task, nxt_process_t *process, nxt_port_t *port) nxt_queue_insert_tail(&process->ports, &port->link); nxt_mp_cleanup(port->mem_pool, nxt_process_port_mp_cleanup, task, process, - task->thread->runtime); + NULL); process->port_cleanups++; } diff --git a/src/nxt_router.c b/src/nxt_router.c index df3caf82..5eb95b59 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -24,19 +24,12 @@ typedef struct { typedef struct nxt_req_app_link_s nxt_req_app_link_t; -typedef struct nxt_start_worker_s nxt_start_worker_t; - -struct nxt_start_worker_s { - nxt_app_t *app; - nxt_req_app_link_t *ra; - - nxt_work_t work; -}; typedef struct { uint32_t stream; nxt_conn_t *conn; + nxt_app_t *app; nxt_port_t *app_port; nxt_req_app_link_t *ra; @@ -72,6 +65,8 @@ typedef struct { } nxt_remove_pid_msg_t; +static nxt_int_t nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app); + static void nxt_router_worker_remove_pid_handler(nxt_task_t *task, void *obj, void *data); static void nxt_router_worker_remove_pid_done(nxt_task_t *task, void *obj, @@ -124,7 +119,7 @@ static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, nxt_router_temp_conf_t *tmcf); static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, nxt_event_engine_t *engine); -static void nxt_router_apps_sort(nxt_router_t *router, +static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router, nxt_router_temp_conf_t *tmcf); static void nxt_router_engines_post(nxt_router_t *router, @@ -150,12 +145,14 @@ static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, static void nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint); -static void nxt_router_send_sw_request(nxt_task_t *task, void *obj, - void *data); -static nxt_bool_t nxt_router_app_free(nxt_task_t *task, nxt_app_t *app); -static nxt_port_t * nxt_router_app_get_port(nxt_app_t *app, uint32_t stream); -static void nxt_router_app_release_port(nxt_task_t *task, void *obj, - void *data); +static void nxt_router_app_port_ready(nxt_task_t *task, + nxt_port_recv_msg_t *msg, void *data); +static void nxt_router_app_port_error(nxt_task_t *task, + nxt_port_recv_msg_t *msg, void *data); + +static nxt_port_t * nxt_router_app_get_idle_port(nxt_app_t *app); +static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, + uint32_t request_failed, uint32_t got_response); static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data); static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, @@ -165,7 +162,7 @@ static void nxt_router_conn_http_body_read(nxt_task_t *task, void *obj, static void nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, nxt_app_parse_ctx_t *ap); static void nxt_router_process_http_request_mp(nxt_task_t *task, - nxt_req_app_link_t *ra, nxt_port_t *port); + nxt_req_app_link_t *ra); static nxt_int_t nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, nxt_app_wmsg_t *wmsg); static nxt_int_t nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, @@ -222,68 +219,91 @@ nxt_router_start(nxt_task_t *task, void *data) } -static nxt_start_worker_t * -nxt_router_sw_create(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra) +static void +nxt_router_start_worker_handler(nxt_task_t *task, nxt_port_t *port, void *data) { - nxt_port_t *main_port; - nxt_runtime_t *rt; - nxt_start_worker_t *sw; + size_t size; + uint32_t stream; + nxt_app_t *app; + nxt_buf_t *b; + nxt_port_t *main_port; + nxt_runtime_t *rt; - sw = nxt_zalloc(sizeof(nxt_start_worker_t)); + app = data; - if (nxt_slow_path(sw == NULL)) { - return NULL; - } + rt = task->thread->runtime; + main_port = rt->port_by_type[NXT_PROCESS_MAIN]; - sw->app = app; - sw->ra = ra; + nxt_debug(task, "app '%V' %p start worker", &app->name, app); - nxt_debug(task, "sw %p create, stream #%uD, app '%V' %p", sw, - ra->stream, &app->name, app); + size = app->name.length + 1 + app->conf.length; - rt = task->thread->runtime; - main_port = rt->port_by_type[NXT_PROCESS_MAIN]; + b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size); - sw->work.handler = nxt_router_send_sw_request; - sw->work.task = &main_port->engine->task; - sw->work.obj = sw; - sw->work.data = task->thread->engine; - sw->work.next = NULL; + if (nxt_slow_path(b == NULL)) { + goto failed; + } + + nxt_buf_cpystr(b, &app->name); + *b->mem.free++ = '\0'; + nxt_buf_cpystr(b, &app->conf); - if (task->thread->engine != main_port->engine) { - nxt_debug(task, "sw %p post send to main engine %p", sw, - main_port->engine); + stream = nxt_port_rpc_register_handler(task, port, + nxt_router_app_port_ready, + nxt_router_app_port_error, + -1, app); - nxt_event_engine_post(main_port->engine, &sw->work); + if (nxt_slow_path(stream == 0)) { + nxt_mp_release(b->data, b); - } else { - nxt_router_send_sw_request(task, sw, sw->work.data); + goto failed; } - return sw; -} + nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1, + stream, port->id, b); + + return; +failed: -nxt_inline void -nxt_router_sw_release(nxt_task_t *task, nxt_start_worker_t *sw) -{ - nxt_debug(task, "sw %p release", sw); + nxt_thread_mutex_lock(&app->mutex); + + app->pending_workers--; - nxt_free(sw); + nxt_thread_mutex_unlock(&app->mutex); + + nxt_router_app_use(task, app, -1); } -nxt_inline void -nxt_router_rc_unlink(nxt_req_conn_link_t *rc) +static nxt_int_t +nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app) { - nxt_queue_remove(&rc->link); + nxt_int_t res; + nxt_port_t *router_port; + nxt_runtime_t *rt; - if (rc->ra != NULL) { - rc->ra->rc = NULL; - rc->ra = NULL; + rt = task->thread->runtime; + router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; + + nxt_router_app_use(task, app, 1); + + res = nxt_port_post(task, router_port, nxt_router_start_worker_handler, + app); + + if (res == NXT_OK) { + return res; } - rc->conn = NULL; + nxt_thread_mutex_lock(&app->mutex); + + app->pending_workers--; + + nxt_thread_mutex_unlock(&app->mutex); + + nxt_router_app_use(task, app, -1); + + return NXT_ERROR; } @@ -327,36 +347,18 @@ nxt_router_ra_create(nxt_task_t *task, nxt_req_conn_link_t *rc) static void nxt_router_ra_release(nxt_task_t *task, void *obj, void *data) { - nxt_port_t *app_port; - nxt_req_app_link_t *ra; - nxt_event_engine_t *engine; + nxt_req_app_link_t *ra; + nxt_event_engine_t *engine; + nxt_req_conn_link_t *rc; ra = obj; engine = data; - if (ra->app_port != NULL) { - - app_port = ra->app_port; - ra->app_port = NULL; - - if (task->thread->engine != engine) { - ra->app_pid = app_port->pid; - } - - nxt_router_app_release_port(task, app_port, app_port->app); - -#if 0 - /* Uncomment to hold app port until complete response received. */ - if (ra->rc != NULL) { - ra->rc->app_port = ra->app_port; - - } else { - nxt_router_app_release_port(task, ra->app_port, ra->app_port->app); + if (task->thread->engine != engine) { + if (ra->app_port != NULL) { + ra->app_pid = ra->app_port->pid; } -#endif - } - if (task->thread->engine != engine) { ra->work.handler = nxt_router_ra_release; ra->work.task = &engine->task; ra->work.next = NULL; @@ -369,11 +371,27 @@ nxt_router_ra_release(nxt_task_t *task, void *obj, void *data) return; } - if (ra->rc != NULL && ra->app_pid != -1) { - nxt_port_rpc_ex_set_peer(task, engine->port, ra->rc, ra->app_pid); + nxt_debug(task, "ra stream #%uD release", ra->stream); + + rc = ra->rc; + + if (rc != NULL) { + if (ra->app_pid != -1) { + nxt_port_rpc_ex_set_peer(task, engine->port, rc, ra->app_pid); + } + + rc->app_port = ra->app_port; + + ra->app_port = NULL; + rc->ra = NULL; + ra->rc = NULL; } - nxt_debug(task, "ra stream #%uD release", ra->stream); + if (ra->app_port != NULL) { + nxt_router_app_port_release(task, ra->app_port, 0, 1); + + ra->app_port = NULL; + } nxt_mp_release(ra->mem_pool, ra); } @@ -382,9 +400,10 @@ nxt_router_ra_release(nxt_task_t *task, void *obj, void *data) static void nxt_router_ra_abort(nxt_task_t *task, void *obj, void *data) { - nxt_conn_t *c; - nxt_req_app_link_t *ra; - nxt_event_engine_t *engine; + nxt_conn_t *c; + nxt_req_app_link_t *ra; + nxt_req_conn_link_t *rc; + nxt_event_engine_t *engine; ra = obj; engine = data; @@ -403,17 +422,75 @@ nxt_router_ra_abort(nxt_task_t *task, void *obj, void *data) nxt_debug(task, "ra stream #%uD abort", ra->stream); - if (ra->rc != NULL) { - c = ra->rc->conn; + rc = ra->rc; + + if (rc != NULL) { + c = rc->conn; nxt_router_gen_error(task, c, 500, "Failed to start application worker"); + + rc->ra = NULL; + ra->rc = NULL; + } + + if (ra->app_port != NULL) { + nxt_router_app_port_release(task, ra->app_port, 0, 1); + + ra->app_port = NULL; } nxt_mp_release(ra->mem_pool, ra); } +nxt_inline void +nxt_router_rc_unlink(nxt_task_t *task, nxt_req_conn_link_t *rc) +{ + nxt_req_app_link_t *ra; + + if (rc->app_port != NULL) { + nxt_router_app_port_release(task, rc->app_port, 0, 1); + + rc->app_port = NULL; + } + + ra = rc->ra; + + if (ra != NULL) { + rc->ra = NULL; + ra->rc = NULL; + + nxt_thread_mutex_lock(&rc->app->mutex); + + if (ra->link.next != NULL) { + nxt_queue_remove(&ra->link); + + ra->link.next = NULL; + + } else { + ra = NULL; + } + + nxt_thread_mutex_unlock(&rc->app->mutex); + } + + if (ra != NULL) { + nxt_router_ra_release(task, ra, ra->work.data); + } + + if (rc->app != NULL) { + nxt_router_app_use(task, rc->app, -1); + + rc->app = NULL; + } + + nxt_queue_remove(&rc->link); + + rc->conn = NULL; +} + + void nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { @@ -526,12 +603,20 @@ nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) static void nxt_router_worker_remove_pid_handler(nxt_task_t *task, void *obj, void *data) { + nxt_pid_t pid; + nxt_buf_t *buf; nxt_event_engine_t *engine; nxt_remove_pid_msg_t *rp; rp = obj; - nxt_port_remove_pid_handler(task, &rp->msg); + buf = rp->msg.buf; + + nxt_assert(nxt_buf_used_size(buf) == sizeof(pid)); + + nxt_memcpy(&pid, buf->mem.pos, sizeof(pid)); + + nxt_port_rpc_remove_peer(task, rp->msg.port, pid); engine = rp->work.data; @@ -658,7 +743,7 @@ nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data) goto fail; } - nxt_router_apps_sort(router, tmcf); + nxt_router_apps_sort(task, router, tmcf); nxt_router_engines_post(router, tmcf); @@ -1005,9 +1090,12 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, app->max_workers = apcf.workers; app->timeout = apcf.timeout; app->live = 1; + app->max_pending_responses = 2; app->prepare_msg = nxt_app_prepare_msg[type]; nxt_queue_insert_tail(&tmcf->apps, &app->link); + + nxt_router_app_use(task, app, 1); } http = nxt_conf_get_path(conf, &http_path); @@ -1695,42 +1783,36 @@ nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, static void -nxt_router_apps_sort(nxt_router_t *router, nxt_router_temp_conf_t *tmcf) +nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router, + nxt_router_temp_conf_t *tmcf) { - nxt_app_t *app; - nxt_port_t *port; + nxt_app_t *app; + nxt_port_t *port; nxt_queue_each(app, &router->apps, nxt_app_t, link) { nxt_queue_remove(&app->link); - nxt_thread_log_debug("about to remove app '%V' %p", &app->name, app); + nxt_debug(task, "about to free app '%V' %p", &app->name, app); app->live = 0; - if (nxt_router_app_free(NULL, app) != 0) { - continue; - } - - if (!nxt_queue_is_empty(&app->requests)) { - - nxt_thread_log_debug("app '%V' %p pending requests found", - &app->name, app); - continue; - } - do { - port = nxt_router_app_get_port(app, 0); + port = nxt_router_app_get_idle_port(app); if (port == NULL) { break; } - nxt_thread_log_debug("port %p send quit", port); + nxt_debug(task, "port %p send quit", port); + + nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, + NULL); - nxt_port_socket_write(&port->engine->task, port, - NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); + nxt_port_use(task, port, -1); } while (1); + nxt_router_app_use(task, app, -1); + } nxt_queue_loop; nxt_queue_add(&router->apps, &tmcf->previous); @@ -1833,7 +1915,7 @@ nxt_router_thread_start(void *data) ret = nxt_port_socket_init(task, port, 0); if (nxt_slow_path(ret != NXT_OK)) { - nxt_mp_release(port->mem_pool, port); + nxt_port_use(task, port, -1); return; } @@ -2124,8 +2206,9 @@ nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data) // TODO notify all apps + port->engine = task->thread->engine; nxt_mp_thread_adopt(port->mem_pool); - nxt_port_release(port); + nxt_port_use(task, port, -1); nxt_mp_thread_adopt(engine->mem_pool); nxt_mp_destroy(engine->mem_pool); @@ -2240,13 +2323,7 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_buf_chain_add(&b, last); - if (rc->app_port != NULL) { - nxt_router_app_release_port(task, rc->app_port, rc->app_port->app); - - rc->app_port = NULL; - } - - nxt_router_rc_unlink(rc); + nxt_router_rc_unlink(task, rc); } if (b == NULL) { @@ -2283,7 +2360,7 @@ nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_router_gen_error(task, rc->conn, 500, "Application terminated unexpectedly"); - nxt_router_rc_unlink(rc); + nxt_router_rc_unlink(task, rc); } @@ -2383,194 +2460,153 @@ nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code, static void -nxt_router_sw_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) +nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, + void *data) { - nxt_start_worker_t *sw; + nxt_app_t *app; + nxt_port_t *port; + + app = data; + port = msg->new_port; - sw = data; + nxt_assert(app != NULL); + nxt_assert(port != NULL); - nxt_assert(sw != NULL); - nxt_assert(sw->app->pending_workers != 0); + port->app = app; - msg->new_port->app = sw->app; + nxt_thread_mutex_lock(&app->mutex); - sw->app->pending_workers--; - sw->app->workers++; + nxt_assert(app->pending_workers != 0); - nxt_debug(task, "sw %p got port %p", sw, msg->new_port); + app->pending_workers--; + app->workers++; + + nxt_thread_mutex_unlock(&app->mutex); - nxt_router_app_release_port(task, msg->new_port, sw->app); + nxt_debug(task, "app '%V' %p new port ready", &app->name, app); - nxt_router_sw_release(task, sw); + nxt_router_app_port_release(task, port, 0, 0); } static void -nxt_router_sw_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) +nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, + void *data) { nxt_app_t *app; nxt_queue_link_t *lnk; nxt_req_app_link_t *ra; - nxt_start_worker_t *sw; - sw = data; + app = data; - nxt_assert(sw != NULL); - nxt_assert(sw->app != NULL); - nxt_assert(sw->app->pending_workers != 0); + nxt_assert(app != NULL); - app = sw->app; + nxt_debug(task, "app '%V' %p start error", &app->name, app); + + nxt_thread_mutex_lock(&app->mutex); - sw->app->pending_workers--; + nxt_assert(app->pending_workers != 0); - nxt_debug(task, "sw %p error, failed to start app '%V'", - sw, &app->name); + app->pending_workers--; if (!nxt_queue_is_empty(&app->requests)) { lnk = nxt_queue_last(&app->requests); nxt_queue_remove(lnk); + lnk->next = NULL; ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link); + } else { + ra = NULL; + } + + nxt_thread_mutex_unlock(&app->mutex); + + if (ra != NULL) { nxt_debug(task, "app '%V' %p abort next stream #%uD", &app->name, app, ra->stream); nxt_router_ra_abort(task, ra, ra->work.data); } - nxt_router_sw_release(task, sw); + nxt_router_app_use(task, app, -1); } -static void -nxt_router_send_sw_request(nxt_task_t *task, void *obj, void *data) +void +nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i) { - size_t size; - uint32_t stream; - nxt_buf_t *b; - nxt_app_t *app; - nxt_port_t *main_port, *router_port, *app_port; - nxt_runtime_t *rt; - nxt_start_worker_t *sw; - nxt_req_app_link_t *ra; - - sw = obj; - app = sw->app; + int c; - if (nxt_queue_is_empty(&app->requests)) { - ra = sw->ra; - app_port = nxt_router_app_get_port(app, ra->stream); + c = nxt_atomic_fetch_add(&app->use_count, i); - if (app_port != NULL) { - nxt_debug(task, "app '%V' %p process stream #%uD", - &app->name, app, ra->stream); + if (i < 0 && c == -i) { - ra->app_port = app_port; + nxt_assert(app->live == 0); + nxt_assert(app->workers == 0); + nxt_assert(app->pending_workers == 0); + nxt_assert(nxt_queue_is_empty(&app->requests) != 0); + nxt_assert(nxt_queue_is_empty(&app->ports) != 0); - nxt_router_process_http_request_mp(task, ra, app_port); - - nxt_router_ra_release(task, ra, ra->work.data); - nxt_router_sw_release(task, sw); - - return; - } - } - - nxt_queue_insert_tail(&app->requests, &sw->ra->link); - - if (app->workers + app->pending_workers >= app->max_workers) { - nxt_debug(task, "app '%V' %p %uD/%uD running/pending workers, " - "max_workers (%uD) reached", &app->name, app, - app->workers, app->pending_workers, app->max_workers); - - nxt_router_sw_release(task, sw); - - return; + nxt_thread_mutex_destroy(&app->mutex); + nxt_free(app); } - - app->pending_workers++; - - nxt_debug(task, "sw %p send", sw); - - rt = task->thread->runtime; - main_port = rt->port_by_type[NXT_PROCESS_MAIN]; - router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; - - size = app->name.length + 1 + app->conf.length; - - b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size); - - nxt_buf_cpystr(b, &app->name); - *b->mem.free++ = '\0'; - nxt_buf_cpystr(b, &app->conf); - - stream = nxt_port_rpc_register_handler(task, router_port, - nxt_router_sw_ready, - nxt_router_sw_error, - main_port->pid, sw); - - nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1, - stream, router_port->id, b); } -static nxt_bool_t -nxt_router_app_free(nxt_task_t *task, nxt_app_t *app) +nxt_inline nxt_port_t * +nxt_router_app_get_port_unsafe(nxt_app_t *app, int *use_delta) { - nxt_queue_link_t *lnk; - nxt_req_app_link_t *ra; + nxt_port_t *port; + nxt_queue_link_t *lnk; - nxt_thread_log_debug("app '%V' %p state: %d/%uD/%uD/%d", &app->name, app, - app->live, app->workers, app->pending_workers, - nxt_queue_is_empty(&app->requests)); + lnk = nxt_queue_first(&app->ports); + nxt_queue_remove(lnk); - if (app->live == 0 - && app->workers == 0 - && app->pending_workers == 0 - && nxt_queue_is_empty(&app->requests)) - { - nxt_thread_mutex_destroy(&app->mutex); - nxt_free(app); + port = nxt_queue_link_data(lnk, nxt_port_t, app_link); - return 1; - } + port->app_requests++; - if (app->live == 1 - && nxt_queue_is_empty(&app->requests) == 0 - && app->workers + app->pending_workers < app->max_workers) + if (app->live && + (app->max_pending_responses == 0 || + (port->app_requests - port->app_responses) < + app->max_pending_responses) ) { - lnk = nxt_queue_first(&app->requests); - nxt_queue_remove(lnk); + nxt_queue_insert_tail(&app->ports, lnk); - ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link); + } else { + lnk->next = NULL; - nxt_router_sw_create(task, app, ra); + (*use_delta)--; } - return 0; + return port; } static nxt_port_t * -nxt_router_app_get_port(nxt_app_t *app, uint32_t stream) +nxt_router_app_get_idle_port(nxt_app_t *app) { - nxt_port_t *port; - nxt_queue_link_t *lnk; + nxt_port_t *port; port = NULL; nxt_thread_mutex_lock(&app->mutex); - if (!nxt_queue_is_empty(&app->ports)) { - lnk = nxt_queue_first(&app->ports); - nxt_queue_remove(lnk); + nxt_queue_each(port, &app->ports, nxt_port_t, app_link) { - lnk->next = NULL; + if (port->app_requests > port->app_responses) { + port = NULL; - port = nxt_queue_link_data(lnk, nxt_port_t, app_link); + continue; + } - port->app_stream = stream; - } + nxt_queue_remove(&port->app_link); + port->app_link.next = NULL; + + break; + + } nxt_queue_loop; nxt_thread_mutex_unlock(&app->mutex); @@ -2579,151 +2615,175 @@ nxt_router_app_get_port(nxt_app_t *app, uint32_t stream) static void -nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data) +nxt_router_app_process_request(nxt_task_t *task, void *obj, void *data) { - nxt_app_t *app; - nxt_port_t *port; - nxt_work_t *work; - nxt_queue_link_t *lnk; - nxt_req_app_link_t *ra; + nxt_app_t *app; + nxt_req_app_link_t *ra; - port = obj; - app = data; + app = obj; + ra = data; nxt_assert(app != NULL); - nxt_assert(app == port->app); - nxt_assert(port->app_link.next == NULL); + nxt_assert(ra != NULL); + nxt_assert(ra->app_port != NULL); + nxt_debug(task, "app '%V' %p process next stream #%uD", + &app->name, app, ra->stream); - if (task->thread->engine != port->engine) { - work = &port->work; + nxt_router_process_http_request_mp(task, ra); +} - nxt_debug(task, "post release port to engine %p", port->engine); - work->next = NULL; - work->handler = nxt_router_app_release_port; - work->task = &port->engine->task; - work->obj = port; - work->data = app; +static void +nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, + uint32_t request_failed, uint32_t got_response) +{ + int use_delta, ra_use_delta; + nxt_app_t *app; + nxt_queue_link_t *lnk; + nxt_req_app_link_t *ra; - nxt_event_engine_post(port->engine, work); + nxt_assert(port != NULL); + nxt_assert(port->app != NULL); - return; + app = port->app; + + use_delta = (request_failed == 0 && got_response == 0) ? 0 : -1; + + nxt_thread_mutex_lock(&app->mutex); + + port->app_requests -= request_failed; + port->app_responses += got_response; + + if (app->live != 0 && + port->pair[1] != -1 && + port->app_link.next == NULL && + (app->max_pending_responses == 0 || + (port->app_requests - port->app_responses) < + app->max_pending_responses) ) + { + nxt_queue_insert_tail(&app->ports, &port->app_link); + use_delta++; } - if (!nxt_queue_is_empty(&app->requests)) { + if (app->live != 0 && + !nxt_queue_is_empty(&app->ports) && + !nxt_queue_is_empty(&app->requests)) + { lnk = nxt_queue_first(&app->requests); nxt_queue_remove(lnk); + lnk->next = NULL; ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link); - nxt_debug(task, "app '%V' %p process next stream #%uD", - &app->name, app, ra->stream); + ra_use_delta = 1; + ra->app_port = nxt_router_app_get_port_unsafe(app, &ra_use_delta); - ra->app_port = port; - port->app_stream = ra->stream; + } else { + ra = NULL; + ra_use_delta = 0; + } - nxt_router_process_http_request_mp(task, ra, port); + nxt_thread_mutex_unlock(&app->mutex); - nxt_router_ra_release(task, ra, ra->work.data); + if (ra != NULL) { + nxt_work_queue_add(&task->thread->engine->fast_work_queue, + nxt_router_app_process_request, + &task->thread->engine->task, app, ra); - return; + goto adjust_use; } - port->app_stream = 0; - + /* ? */ if (port->pair[1] == -1) { - nxt_debug(task, "app '%V' %p port already closed (pid %PI dead?)", - &app->name, app, port->pid); - - app->workers--; - nxt_router_app_free(task, app); + nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)", + &app->name, app, port, port->pid); - port->app = NULL; - - nxt_port_release(port); - - return; + goto adjust_use; } - if (!app->live) { + if (app->live == 0) { nxt_debug(task, "app '%V' %p is not alive, send QUIT to port", &app->name, app); nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); - return; + goto adjust_use; } nxt_debug(task, "app '%V' %p requests queue is empty, keep the port", &app->name, app); - nxt_thread_mutex_lock(&app->mutex); +adjust_use: - nxt_queue_insert_head(&app->ports, &port->app_link); + if (use_delta != 0) { + nxt_port_use(task, port, use_delta); + } - nxt_thread_mutex_unlock(&app->mutex); + if (ra_use_delta != 0) { + nxt_port_use(task, ra->app_port, ra_use_delta); + } } -nxt_bool_t -nxt_router_app_remove_port(nxt_port_t *port) +void +nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port) { nxt_app_t *app; - nxt_bool_t busy; + nxt_bool_t unchain, start_worker; app = port->app; - busy = port->app_stream != 0; - - if (app == NULL) { - nxt_thread_log_debug("port %p app remove, no app", port); - - nxt_assert(port->app_link.next == NULL); - return 1; - } + nxt_assert(app != NULL); nxt_thread_mutex_lock(&app->mutex); - if (port->app_link.next != NULL) { + unchain = port->app_link.next != NULL; + if (unchain) { nxt_queue_remove(&port->app_link); port->app_link.next = NULL; + } + app->workers--; + + start_worker = app->live != 0 && + nxt_queue_is_empty(&app->requests) == 0 && + app->workers + app->pending_workers < app->max_workers; + + if (start_worker) { + app->pending_workers++; } nxt_thread_mutex_unlock(&app->mutex); - if (busy == 0) { - nxt_thread_log_debug("port %p app remove, free, app '%V' %p", port, - &app->name, app); - - app->workers--; - nxt_router_app_free(&port->engine->task, app); + nxt_debug(task, "app '%V' %p port %p close", &app->name, app, port); - return 1; + if (unchain) { + nxt_port_use(task, port, -1); } - nxt_thread_log_debug("port %p app remove, busy, app '%V' %p, " - "app stream #%uD", port, &app->name, app, - port->app_stream); - - return 0; + if (start_worker) { + nxt_router_start_worker(task, app); + } } static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra) { + int use_delta; + nxt_int_t res; nxt_app_t *app; + nxt_bool_t can_start_worker; nxt_conn_t *c; nxt_port_t *port; nxt_event_engine_t *engine; - nxt_start_worker_t *sw; nxt_socket_conf_joint_t *joint; port = NULL; + use_delta = 1; c = ra->rc->conn; joint = c->listen->socket.data; @@ -2735,6 +2795,10 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra) return NXT_ERROR; } + ra->rc->app = app; + + nxt_router_app_use(task, app, 1); + engine = task->thread->engine; nxt_timer_disable(engine, &c->read_timer); @@ -2744,20 +2808,50 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra) nxt_timer_add(engine, &c->read_timer, app->timeout); } - port = nxt_router_app_get_port(app, ra->stream); + nxt_thread_mutex_lock(&app->mutex); + + if (!nxt_queue_is_empty(&app->ports)) { + port = nxt_router_app_get_port_unsafe(app, &use_delta); + + can_start_worker = 0; + + } else { + nxt_queue_insert_tail(&app->requests, &ra->link); + + can_start_worker = (app->workers + app->pending_workers) < + app->max_workers; + if (can_start_worker) { + app->pending_workers++; + } + + port = NULL; + } + + nxt_thread_mutex_unlock(&app->mutex); if (port != NULL) { - nxt_debug(task, "already have port for app '%V'", &app->name); + nxt_debug(task, "already have port for app '%V' %p ", &app->name, app); ra->app_port = port; + + if (use_delta != 0) { + nxt_port_use(task, port, use_delta); + } return NXT_OK; } - sw = nxt_router_sw_create(task, app, ra); + if (!can_start_worker) { + nxt_debug(task, "app '%V' %p too many running or pending workers", + &app->name, app); + + return NXT_AGAIN; + } + + res = nxt_router_start_worker(task, app); + + if (nxt_slow_path(res != NXT_OK)) { + nxt_router_gen_error(task, c, 500, "Failed to start worker"); - if (nxt_slow_path(sw == NULL)) { - nxt_router_gen_error(task, c, 500, - "Failed to allocate start worker struct"); return NXT_ERROR; } @@ -3011,18 +3105,16 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, nxt_port_rpc_ex_set_peer(task, engine->port, rc, port->pid); - nxt_router_process_http_request_mp(task, ra, port); - - nxt_router_ra_release(task, ra, ra->work.data); + nxt_router_process_http_request_mp(task, ra); } static void -nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra, - nxt_port_t *port) +nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra) { + uint32_t request_failed; nxt_int_t res; - nxt_port_t *c_port, *reply_port; + nxt_port_t *port, *c_port, *reply_port; nxt_conn_t *c; nxt_app_wmsg_t wmsg; nxt_app_parse_ctx_t *ap; @@ -3030,11 +3122,15 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra, /* TODO: it is unsafe to use ra->rc and ra->rc->conn in main thread */ nxt_assert(ra->rc != NULL); + nxt_assert(ra->app_port != NULL); + port = ra->app_port; reply_port = ra->reply_port; ap = ra->ap; c = ra->rc->conn; + request_failed = 1; + c_port = nxt_process_connected_port_find(port->process, reply_port->pid, reply_port->id); if (nxt_slow_path(c_port != reply_port)) { @@ -3043,7 +3139,7 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra, if (nxt_slow_path(res != NXT_OK)) { nxt_router_gen_error(task, c, 500, "Failed to send reply port to application"); - return; + goto release_port; } nxt_process_connected_port_add(port->process, reply_port); @@ -3059,21 +3155,33 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra, if (nxt_slow_path(res != NXT_OK)) { nxt_router_gen_error(task, c, 500, "Failed to prepare message for application"); - return; + goto release_port; } nxt_debug(task, "about to send %d bytes buffer to worker port %d", nxt_buf_used_size(wmsg.write), wmsg.port->socket.fd); + request_failed = 0; + res = nxt_port_socket_write(task, wmsg.port, NXT_PORT_MSG_DATA, -1, ra->stream, reply_port->id, wmsg.write); if (nxt_slow_path(res != NXT_OK)) { nxt_router_gen_error(task, c, 500, "Failed to send message to application"); - return; + goto release_port; } + +release_port: + + if (request_failed != 0) { + ra->app_port = 0; + } + + nxt_router_app_port_release(task, port, request_failed, 0); + + nxt_router_ra_release(task, ra, ra->work.data); } @@ -3452,13 +3560,7 @@ nxt_router_conn_free(nxt_task_t *task, void *obj, void *data) nxt_debug(task, "conn %p close, stream #%uD", c, rc->stream); - if (rc->app_port != NULL) { - nxt_router_app_release_port(task, rc->app_port, rc->app_port->app); - - rc->app_port = NULL; - } - - nxt_router_rc_unlink(rc); + nxt_router_rc_unlink(task, rc); nxt_port_rpc_cancel(task, task->thread->engine->port, rc->stream); diff --git a/src/nxt_router.h b/src/nxt_router.h index f5c5f7aa..5056021e 100644 --- a/src/nxt_router.h +++ b/src/nxt_router.h @@ -87,6 +87,7 @@ struct nxt_app_s { uint32_t pending_workers; uint32_t workers; uint32_t max_workers; + uint32_t max_pending_responses; nxt_msec_t timeout; @@ -97,6 +98,8 @@ struct nxt_app_s { nxt_str_t conf; nxt_app_prepare_msg_t prepare_msg; + + nxt_atomic_t use_count; }; @@ -141,6 +144,7 @@ void nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); void nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); void nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); -nxt_bool_t nxt_router_app_remove_port(nxt_port_t *port); +void nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port); +void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i); #endif /* _NXT_ROUTER_H_INCLUDED_ */ diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c index deab980e..a96f4cea 100644 --- a/src/nxt_runtime.c +++ b/src/nxt_runtime.c @@ -553,7 +553,7 @@ nxt_runtime_exit(nxt_task_t *task, void *obj, void *data) nxt_runtime_process_each(rt, process) { - nxt_runtime_process_remove(rt, process); + nxt_runtime_process_remove(task, process); } nxt_runtime_process_loop; @@ -1725,13 +1725,16 @@ nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid) void -nxt_runtime_process_add(nxt_runtime_t *rt, nxt_process_t *process) +nxt_runtime_process_add(nxt_task_t *task, nxt_process_t *process) { nxt_port_t *port; + nxt_runtime_t *rt; nxt_lvlhsh_query_t lhq; nxt_assert(process->registered == 0); + rt = task->thread->runtime; + nxt_runtime_process_lhq_pid(&lhq, &process->pid); lhq.replace = 0; @@ -1753,7 +1756,7 @@ nxt_runtime_process_add(nxt_runtime_t *rt, nxt_process_t *process) port->pid = process->pid; - nxt_runtime_port_add(rt, port); + nxt_runtime_port_add(task, port); } nxt_process_port_loop; @@ -1809,9 +1812,12 @@ nxt_runtime_process_remove_pid(nxt_runtime_t *rt, nxt_pid_t pid) void -nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process) +nxt_runtime_process_remove(nxt_task_t *task, nxt_process_t *process) { - nxt_port_t *port; + nxt_port_t *port; + nxt_runtime_t *rt; + + rt = task->thread->runtime; if (process->port_cleanups == 0) { if (process->registered == 1) { @@ -1823,9 +1829,9 @@ nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process) } else { nxt_process_port_each(process, port) { - nxt_runtime_port_remove(rt, port); + nxt_port_close(task, port); - nxt_port_release(port); + nxt_runtime_port_remove(task, port); } nxt_process_port_loop; } @@ -1851,22 +1857,34 @@ nxt_runtime_port_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe) void -nxt_runtime_port_add(nxt_runtime_t *rt, nxt_port_t *port) +nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port) { + nxt_runtime_t *rt; + + rt = task->thread->runtime; + nxt_port_hash_add(&rt->ports, port); rt->port_by_type[port->type] = port; + + nxt_port_use(task, port, 1); } void -nxt_runtime_port_remove(nxt_runtime_t *rt, nxt_port_t *port) +nxt_runtime_port_remove(nxt_task_t *task, nxt_port_t *port) { + nxt_runtime_t *rt; + + rt = task->thread->runtime; + nxt_port_hash_remove(&rt->ports, port); if (rt->port_by_type[port->type] == port) { rt->port_by_type[port->type] = NULL; } + + nxt_port_use(task, port, -1); } diff --git a/src/nxt_runtime.h b/src/nxt_runtime.h index d0decfd2..5aa897dc 100644 --- a/src/nxt_runtime.h +++ b/src/nxt_runtime.h @@ -102,11 +102,11 @@ nxt_process_t *nxt_runtime_process_new(nxt_runtime_t *rt); nxt_process_t *nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid); -void nxt_runtime_process_add(nxt_runtime_t *rt, nxt_process_t *process); +void nxt_runtime_process_add(nxt_task_t *task, nxt_process_t *process); nxt_process_t *nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid); -void nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process); +void nxt_runtime_process_remove(nxt_task_t *task, nxt_process_t *process); nxt_process_t *nxt_runtime_process_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe); @@ -115,9 +115,9 @@ nxt_process_t *nxt_runtime_process_first(nxt_runtime_t *rt, nxt_lvlhsh_each(&rt->processes, lhe) -void nxt_runtime_port_add(nxt_runtime_t *rt, nxt_port_t *port); +void nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port); -void nxt_runtime_port_remove(nxt_runtime_t *rt, nxt_port_t *port); +void nxt_runtime_port_remove(nxt_task_t *task, nxt_port_t *port); nxt_port_t *nxt_runtime_port_find(nxt_runtime_t *rt, nxt_pid_t pid, nxt_port_id_t port_id); |