diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/nxt_application.h | 6 | ||||
-rw-r--r-- | src/nxt_conn.h | 16 | ||||
-rw-r--r-- | src/nxt_event_engine.c | 4 | ||||
-rw-r--r-- | src/nxt_master_process.c | 11 | ||||
-rw-r--r-- | src/nxt_port.c | 78 | ||||
-rw-r--r-- | src/nxt_port.h | 5 | ||||
-rw-r--r-- | src/nxt_port_socket.c | 24 | ||||
-rw-r--r-- | src/nxt_process.c | 27 | ||||
-rw-r--r-- | src/nxt_process.h | 6 | ||||
-rw-r--r-- | src/nxt_router.c | 250 | ||||
-rw-r--r-- | src/nxt_router.h | 8 | ||||
-rw-r--r-- | src/nxt_runtime.c | 25 |
12 files changed, 298 insertions, 162 deletions
diff --git a/src/nxt_application.h b/src/nxt_application.h index 449f1158..460ef8ce 100644 --- a/src/nxt_application.h +++ b/src/nxt_application.h @@ -100,11 +100,13 @@ typedef struct { } nxt_app_request_t; -typedef struct { +typedef struct nxt_app_parse_ctx_s nxt_app_parse_ctx_t; + +struct nxt_app_parse_ctx_s { nxt_app_request_t r; nxt_http_request_parse_t parser; nxt_mp_t *mem_pool; -} nxt_app_parse_ctx_t; +}; nxt_int_t nxt_app_http_req_init(nxt_task_t *task, nxt_app_parse_ctx_t *ctx); diff --git a/src/nxt_conn.h b/src/nxt_conn.h index fa9f5b9d..c08ed376 100644 --- a/src/nxt_conn.h +++ b/src/nxt_conn.h @@ -183,15 +183,17 @@ struct nxt_conn_s { typedef uint32_t nxt_req_id_t; +typedef struct nxt_app_parse_ctx_s nxt_app_parse_ctx_t; typedef struct { - nxt_req_id_t req_id; - nxt_conn_t *conn; - nxt_port_t *app_port; - nxt_port_t *reply_port; - - nxt_queue_link_t link; /* for nxt_conn_t.requests */ - nxt_queue_link_t app_link; /* for nxt_app_t.requests */ + nxt_req_id_t req_id; + nxt_conn_t *conn; + nxt_port_t *app_port; + nxt_port_t *reply_port; + nxt_app_parse_ctx_t *ap; + + nxt_queue_link_t link; /* for nxt_conn_t.requests */ + nxt_queue_link_t app_link; /* for nxt_app_t.requests */ } nxt_req_conn_link_t; diff --git a/src/nxt_event_engine.c b/src/nxt_event_engine.c index 9171bcd6..b91f0e50 100644 --- a/src/nxt_event_engine.c +++ b/src/nxt_event_engine.c @@ -38,6 +38,8 @@ nxt_event_engine_create(nxt_task_t *task, return NULL; } + nxt_debug(task, "create engine %p", engine); + thread = task->thread; engine->task.thread = thread; @@ -440,6 +442,8 @@ nxt_event_engine_change(nxt_event_engine_t *engine, void nxt_event_engine_free(nxt_event_engine_t *engine) { + nxt_thread_log_debug("free engine %p", engine); + nxt_event_engine_signal_pipe_free(engine); nxt_free(engine->signals); diff --git a/src/nxt_master_process.c b/src/nxt_master_process.c index 44cf8d8b..1a8b5ed8 100644 --- a/src/nxt_master_process.c +++ b/src/nxt_master_process.c @@ -207,12 +207,13 @@ nxt_master_process_port_create(nxt_task_t *task, nxt_runtime_t *rt) return NXT_ERROR; } - port = nxt_process_port_new(rt, process, nxt_port_get_next_id(), - NXT_PROCESS_MASTER); + port = nxt_port_new(0, nxt_pid, NXT_PROCESS_MASTER); if (nxt_slow_path(port == NULL)) { return NXT_ERROR; } + nxt_process_port_add(process, port); + ret = nxt_port_socket_init(task, port, 0); if (nxt_slow_path(ret != NXT_OK)) { return ret; @@ -373,15 +374,17 @@ nxt_master_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt, process->init = init; - port = nxt_process_port_new(rt, process, 0, init->type); + port = nxt_port_new(0, 0, init->type); if (nxt_slow_path(port == NULL)) { nxt_runtime_process_destroy(rt, process); return NXT_ERROR; } + nxt_process_port_add(process, port); + ret = nxt_port_socket_init(task, port, 0); if (nxt_slow_path(ret != NXT_OK)) { - nxt_mp_free(rt->mem_pool, port); + nxt_mp_release(port->mem_pool, port); nxt_runtime_process_destroy(rt, process); return ret; } diff --git a/src/nxt_port.c b/src/nxt_port.c index 3abc4125..4c2561d3 100644 --- a/src/nxt_port.c +++ b/src/nxt_port.c @@ -7,11 +7,72 @@ #include <nxt_main.h> #include <nxt_runtime.h> #include <nxt_port.h> +#include <nxt_router.h> static void nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); -static nxt_atomic_uint_t nxt_port_last_id; +static nxt_atomic_uint_t nxt_port_last_id = 1; + + +nxt_port_t * +nxt_port_new(nxt_port_id_t id, nxt_pid_t pid, nxt_process_type_t type) +{ + nxt_mp_t *mp; + nxt_port_t *port; + + mp = nxt_mp_create(1024, 128, 256, 32); + + if (nxt_slow_path(mp == NULL)) { + return NULL; + } + + port = nxt_mp_zalloc(mp, sizeof(nxt_port_t)); + + if (nxt_fast_path(port != NULL)) { + port->id = id; + port->pid = pid; + port->type = type; + port->mem_pool = mp; + + nxt_queue_init(&port->messages); + + } else { + nxt_mp_destroy(mp); + } + + return port; +} + + +nxt_bool_t +nxt_port_release(nxt_port_t *port) +{ + if (port->pair[0] != -1) { + nxt_fd_close(port->pair[0]); + port->pair[0] = -1; + } + + if (port->pair[1] != -1) { + nxt_fd_close(port->pair[1]); + port->pair[1] = -1; + } + + if (port->type == NXT_PROCESS_WORKER) { + if (nxt_router_app_remove_port(port) == 0) { + return 0; + } + } + + if (port->link.next != NULL) { + nxt_process_port_remove(port); + } + + nxt_mp_release(port->mem_pool, port); + + return 1; +} + nxt_port_id_t nxt_port_get_next_id() @@ -32,7 +93,6 @@ nxt_port_enable(nxt_task_t *task, nxt_port_t *port, nxt_port_handler_t *handlers) { port->pid = nxt_pid; - port->engine = task->thread->engine; port->handler = nxt_port_handler; port->data = handlers; @@ -151,7 +211,6 @@ nxt_port_send_port(nxt_task_t *task, nxt_port_t *port, nxt_port_t *new_port, void nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { - nxt_mp_t *mp; nxt_port_t *port; nxt_process_t *process; nxt_runtime_t *rt; @@ -180,26 +239,19 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) return; } - port = nxt_process_port_new(rt, process, new_port_msg->id, - new_port_msg->type); + port = nxt_port_new(new_port_msg->id, new_port_msg->pid, + new_port_msg->type); if (nxt_slow_path(port == NULL)) { return; } - mp = nxt_mp_create(1024, 128, 256, 32); - if (nxt_slow_path(mp == NULL)) { - return; - } - - port->mem_pool = mp; + nxt_process_port_add(process, port); port->pair[0] = -1; port->pair[1] = msg->fd; port->max_size = new_port_msg->max_size; port->max_share = new_port_msg->max_share; - nxt_queue_init(&port->messages); - port->socket.task = task; nxt_runtime_port_add(rt, port); diff --git a/src/nxt_port.h b/src/nxt_port.h index 603540e2..d6156015 100644 --- a/src/nxt_port.h +++ b/src/nxt_port.h @@ -88,6 +88,7 @@ struct nxt_port_s { nxt_pid_t pid; nxt_process_type_t type; + nxt_work_t work; }; @@ -110,6 +111,10 @@ typedef union { } nxt_port_data_t; +nxt_port_t *nxt_port_new(nxt_port_id_t id, nxt_pid_t pid, + nxt_process_type_t type); +nxt_bool_t nxt_port_release(nxt_port_t *port); + nxt_port_id_t nxt_port_get_next_id(void); void nxt_port_reset_next_id(void); diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c index a0c6fea3..31572e54 100644 --- a/src/nxt_port_socket.c +++ b/src/nxt_port_socket.c @@ -19,7 +19,6 @@ static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data); nxt_int_t nxt_port_socket_init(nxt_task_t *task, nxt_port_t *port, size_t max_size) { - nxt_mp_t *mp; nxt_int_t sndbuf, rcvbuf, size; nxt_socket_t snd, rcv; @@ -28,15 +27,6 @@ nxt_port_socket_init(nxt_task_t *task, nxt_port_t *port, size_t max_size) port->pair[0] = -1; port->pair[1] = -1; - nxt_queue_init(&port->messages); - - mp = nxt_mp_create(1024, 128, 256, 32); - if (nxt_slow_path(mp == NULL)) { - return NXT_ERROR; - } - - port->mem_pool = mp; - if (nxt_slow_path(nxt_socketpair_create(task, port->pair) != NXT_OK)) { goto socketpair_fail; } @@ -100,8 +90,6 @@ getsockopt_fail: socketpair_fail: - nxt_mp_destroy(port->mem_pool); - return NXT_ERROR; } @@ -172,14 +160,9 @@ nxt_int_t nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b) { - nxt_queue_link_t *link; nxt_port_send_msg_t *msg; - for (link = nxt_queue_first(&port->messages); - link != nxt_queue_tail(&port->messages); - link = nxt_queue_next(link)) - { - msg = (nxt_port_send_msg_t *) link; + nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) { if (msg->port_msg.stream == stream && msg->port_msg.reply_port == reply_port) { @@ -191,7 +174,8 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, return NXT_OK; } - } + + } nxt_queue_loop; msg = nxt_mp_retain(port->mem_pool, sizeof(nxt_port_send_msg_t)); if (nxt_slow_path(msg == NULL)) { @@ -256,7 +240,7 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) return; } - msg = (nxt_port_send_msg_t *) link; + msg = nxt_queue_link_data(link, nxt_port_send_msg_t, link); iov[0].iov_base = &msg->port_msg; iov[0].iov_len = sizeof(nxt_port_msg_t); diff --git a/src/nxt_process.c b/src/nxt_process.c index 5899ce30..1832640d 100644 --- a/src/nxt_process.c +++ b/src/nxt_process.c @@ -563,30 +563,11 @@ nxt_user_cred_set(nxt_task_t *task, nxt_user_cred_t *uc) } -nxt_port_t * -nxt_process_port_new(nxt_runtime_t *rt, nxt_process_t *process, - nxt_port_id_t id, nxt_process_type_t type) +void +nxt_process_port_add(nxt_process_t *process, nxt_port_t *port) { - size_t size; - nxt_port_t *port; - - size = sizeof(nxt_port_t); - if (size == NXT_PROCESS_WORKER) { - size += sizeof(nxt_work_t); - } - - port = nxt_mp_zalloc(rt->mem_pool, size); - - if (nxt_fast_path(port != NULL)) { - port->id = id; - port->pid = process->pid; - port->process = process; - port->type = type; - - nxt_process_port_add(process, port); - } - - return port; + port->process = process; + nxt_queue_insert_tail(&process->ports, &port->link); } diff --git a/src/nxt_process.h b/src/nxt_process.h index c49d2be3..a647c00a 100644 --- a/src/nxt_process.h +++ b/src/nxt_process.h @@ -82,17 +82,13 @@ NXT_EXPORT void nxt_nanosleep(nxt_nsec_t ns); NXT_EXPORT void nxt_process_arguments(nxt_task_t *task, char **orig_argv, char ***orig_envp); -NXT_EXPORT nxt_port_t * nxt_process_port_new(nxt_runtime_t *rt, - nxt_process_t *process, nxt_port_id_t id, nxt_process_type_t type); - #define nxt_process_port_remove(port) \ nxt_queue_remove(&port->link) #define nxt_process_port_first(process) \ nxt_queue_link_data(nxt_queue_first(&process->ports), nxt_port_t, link) -#define nxt_process_port_add(process, port) \ - nxt_queue_insert_tail(&process->ports, &port->link) +NXT_EXPORT void nxt_process_port_add(nxt_process_t *process, nxt_port_t *port); #define nxt_process_port_each(process, port) \ nxt_queue_each(port, &process->ports, nxt_port_t, link) diff --git a/src/nxt_router.c b/src/nxt_router.c index 993d623c..5ee70377 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -99,6 +99,7 @@ static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, static void nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint); +static nxt_bool_t nxt_router_app_free(nxt_app_t *app); static nxt_port_t * nxt_router_app_get_port(nxt_app_t *app); static void nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data); @@ -189,6 +190,12 @@ nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) if (nxt_fast_path(sw != NULL)) { msg->new_port->app = sw->app; + sw->app->workers++; + + nxt_assert(sw->app->pending_workers != 0); + + sw->app->pending_workers--; + nxt_router_app_release_port(task, msg->new_port, sw->app); sw->work.handler = nxt_router_sw_release; @@ -754,25 +761,25 @@ nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, nxt_str_t *name) static nxt_socket_conf_t * nxt_router_socket_conf(nxt_task_t *task, nxt_mp_t *mp, nxt_sockaddr_t *sa) { - nxt_socket_conf_t *conf; + nxt_socket_conf_t *skcf; - conf = nxt_mp_zget(mp, sizeof(nxt_socket_conf_t)); - if (nxt_slow_path(conf == NULL)) { + skcf = nxt_mp_zget(mp, sizeof(nxt_socket_conf_t)); + if (nxt_slow_path(skcf == NULL)) { return NULL; } - conf->sockaddr = sa; + skcf->sockaddr = sa; - conf->listen.sockaddr = sa; - conf->listen.socklen = sa->socklen; - conf->listen.address_length = sa->length; + skcf->listen.sockaddr = sa; + skcf->listen.socklen = sa->socklen; + skcf->listen.address_length = sa->length; - conf->listen.socket = -1; - conf->listen.backlog = NXT_LISTEN_BACKLOG; - conf->listen.flags = NXT_NONBLOCK; - conf->listen.read_after_accept = 1; + skcf->listen.socket = -1; + skcf->listen.backlog = NXT_LISTEN_BACKLOG; + skcf->listen.flags = NXT_NONBLOCK; + skcf->listen.read_after_accept = 1; - return conf; + return skcf; } @@ -1179,17 +1186,33 @@ nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, static void nxt_router_apps_sort(nxt_router_t *router, nxt_router_temp_conf_t *tmcf) { - nxt_app_t *app; + nxt_app_t *app; + nxt_port_t *port; nxt_queue_each(app, &router->apps, nxt_app_t, link) { nxt_queue_remove(&app->link); - // TODO RELEASE APP -#if 0 - nxt_thread_mutex_destroy(&app->mutex); - nxt_free(app); -#endif + app->live = 0; + + if (nxt_router_app_free(app) != 0) { + continue; + } + + if (nxt_queue_is_empty(&app->requests)) { + + do { + port = nxt_router_app_get_port(app); + if (port == NULL) { + break; + } + + nxt_port_socket_write(&port->engine->task, port, + NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); + } while (1); + + } + } nxt_queue_loop; nxt_queue_add(&router->apps, &tmcf->previous); @@ -1266,21 +1289,17 @@ nxt_router_thread_start(void *data) engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64); - port = nxt_mp_zalloc(engine->mem_pool, sizeof(nxt_port_t)); + port = nxt_port_new(nxt_port_get_next_id(), nxt_pid, NXT_PROCESS_ROUTER); if (nxt_slow_path(port == NULL)) { return; } - port->id = nxt_port_get_next_id(); - port->pid = nxt_pid; - ret = nxt_port_socket_init(task, port, 0); if (nxt_slow_path(ret != NXT_OK)) { + nxt_mp_release(port->mem_pool, port); return; } - port->type = NXT_PROCESS_ROUTER; - engine->port = port; nxt_port_enable(task, port, nxt_router_app_port_handlers); @@ -1391,6 +1410,9 @@ nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data) nxt_fd_event_delete(engine, &listen->socket); + nxt_debug(task, "engine %p: listen socket delete: %d", engine, + listen->socket.fd); + listen->timer.handler = nxt_router_listen_socket_close; listen->timer.work_queue = &engine->fast_work_queue; @@ -1414,6 +1436,9 @@ nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data) listen = nxt_timer_data(timer, nxt_listen_event_t, timer); joint = listen->socket.data; + nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine, + listen->socket.fd); + nxt_queue_remove(&listen->link); /* 'task' refers to listen->task and we cannot use after nxt_free() */ @@ -1439,6 +1464,9 @@ nxt_router_listen_socket_release(nxt_task_t *task, nxt_thread_spin_lock(lock); + nxt_debug(task, "engine %p: listen socket release: rtsk->count %D", + task->thread->engine, rtsk->count); + if (--rtsk->count != 0) { rtsk = NULL; } @@ -1463,7 +1491,7 @@ nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint) nxt_router_conf_t *rtcf; nxt_thread_spinlock_t *lock; - nxt_debug(task, "conf joint count: %D", joint->count); + nxt_debug(task, "conf joint %p count: %D", joint, joint->count); if (--joint->count != 0) { return; @@ -1477,6 +1505,9 @@ nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint) nxt_thread_spin_lock(lock); + nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count, + rtcf, rtcf->count); + if (--skcf->count != 0) { rtcf = NULL; @@ -1531,18 +1562,10 @@ nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data) // TODO notify all apps - if (port->pair[0] != -1) { - nxt_fd_close(port->pair[0]); - } - - if (port->pair[1] != -1) { - nxt_fd_close(port->pair[1]); - } - - if (port->mem_pool) { - nxt_mp_destroy(port->mem_pool); - } + nxt_mp_thread_adopt(port->mem_pool); + nxt_port_release(port); + nxt_mp_thread_adopt(engine->mem_pool); nxt_mp_destroy(engine->mem_pool); nxt_event_engine_free(engine); @@ -1683,17 +1706,17 @@ nxt_router_text_by_code(int code) } } -static void -nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code, - const char* fmt, ...) + +static nxt_buf_t * +nxt_router_get_error_buf(nxt_task_t *task, nxt_mp_t *mp, int code, + const char* fmt, va_list args) { - va_list args; - nxt_buf_t *b, *last; - const char *msg; + nxt_buf_t *b, *last; + const char *msg; - b = nxt_buf_mem_alloc(c->mem_pool, 16384, 0); + b = nxt_buf_mem_ts_alloc(task, mp, 16384); if (nxt_slow_path(b == NULL)) { - /* TODO pogorevaTb */ + return NULL; } b->mem.free = nxt_sprintf(b->mem.free, b->mem.end, @@ -1704,19 +1727,38 @@ nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code, msg = (const char *) b->mem.free; - va_start(args, fmt); b->mem.free = nxt_vsprintf(b->mem.free, b->mem.end, fmt, args); - va_end(args); nxt_log_alert(task->log, "error %d: %s", code, msg); - last = nxt_buf_sync_alloc(c->mem_pool, NXT_BUF_SYNC_LAST); + last = nxt_buf_mem_ts_alloc(task, mp, 0); + if (nxt_slow_path(last == NULL)) { - /* TODO pogorevaTb */ + nxt_mp_release(mp, b); + return NULL; } + nxt_buf_set_sync(last); + nxt_buf_set_last(last); + nxt_buf_chain_add(&b, last); + return b; +} + + + +static void +nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code, + const char* fmt, ...) +{ + va_list args; + nxt_buf_t *b; + + va_start(args, fmt); + b = nxt_router_get_error_buf(task, c->mem_pool, code, fmt, args); + va_end(args); + if (c->write == NULL) { c->write = b; c->write_state = &nxt_router_conn_write_state; @@ -1742,6 +1784,19 @@ nxt_router_send_sw_request(nxt_task_t *task, void *obj, void *data) sw = obj; app = sw->app; + if (app->workers + app->pending_workers >= app->max_workers) { + sw->work.handler = nxt_router_sw_release; + + nxt_debug(task, "%uD/%uD running/penging workers, post sw #%uxD " + "release to %p", sw->stream, sw->work.data); + + nxt_event_engine_post(sw->work.data, &sw->work); + + return; + } + + app->pending_workers++; + nxt_debug(task, "send sw #%uD", sw->stream); nxt_router_sw_add(task, nxt_router, sw); @@ -1758,6 +1813,23 @@ nxt_router_send_sw_request(nxt_task_t *task, void *obj, void *data) } +static nxt_bool_t +nxt_router_app_free(nxt_app_t *app) +{ + if (app->live == 0 && app->workers == 0 && + app->pending_workers == 0 && + nxt_queue_is_empty(&app->requests)) { + + nxt_thread_mutex_destroy(&app->mutex); + nxt_free(app); + + return 1; + } + + return 0; +} + + static nxt_port_t * nxt_router_app_get_port(nxt_app_t *app) { @@ -1789,6 +1861,7 @@ nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data) nxt_app_t *app; nxt_port_t *port; nxt_work_t *work; + nxt_process_t *process; nxt_queue_link_t *lnk; nxt_req_conn_link_t *rc; @@ -1801,7 +1874,7 @@ nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data) if (task->thread->engine != port->engine) { - work = (nxt_work_t *) (port + 1); + work = &port->work; nxt_debug(task, "post release port to engine %p", port->engine); @@ -1822,7 +1895,8 @@ nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data) rc = nxt_queue_link_data(lnk, nxt_req_conn_link_t, app_link); - nxt_debug(task, "process request #%uxD", rc->req_id); + nxt_debug(task, "app '%V' process next request #%uxD", + &app->name, rc->req_id); rc->app_port = port; @@ -1831,7 +1905,37 @@ nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data) return; } - nxt_debug(task, "app requests queue is empty"); + if (port->pair[1] == -1) { + nxt_debug(task, "app '%V' port already closed (pid %PI dead?)", + &app->name, port->pid); + + app->workers--; + nxt_router_app_free(app); + + port->app = NULL; + process = port->process; + + nxt_port_release(port); + + if (nxt_queue_is_empty(&process->ports)) { + nxt_runtime_process_destroy(task->thread->runtime, process); + } + + return; + } + + if (!app->live) { + nxt_debug(task, "app '%V' is not alive, send QUIT to port", + &app->name); + + nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, + -1, 0, 0, NULL); + + return; + } + + nxt_debug(task, "app '%V' requests queue is empty, keep the port", + &app->name); nxt_thread_mutex_lock(&app->mutex); @@ -1841,29 +1945,42 @@ nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data) } -void +nxt_bool_t nxt_router_app_remove_port(nxt_port_t *port) { - nxt_app_t *app; - - if (port->app_link.next == NULL) { - return; - } + nxt_app_t *app; + nxt_bool_t busy; app = port->app; + busy = 1; + + if (app == NULL) { + nxt_assert(port->app_link.next == NULL); -#if (NXT_DEBUG) - if (nxt_slow_path(app == NULL)) { - nxt_abort(); + return 1; } -#endif nxt_thread_mutex_lock(&app->mutex); - nxt_queue_remove(&port->app_link); - port->app_link.next = NULL; + if (port->app_link.next != NULL) { + + nxt_queue_remove(&port->app_link); + port->app_link.next = NULL; + + busy = 0; + } nxt_thread_mutex_unlock(&app->mutex); + + if (busy == 0) { + + app->workers--; + nxt_router_app_free(app); + + return 1; + } + + return 0; } @@ -1894,11 +2011,12 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_conn_link_t *rc) port = nxt_router_app_get_port(app); if (port != NULL) { + nxt_debug(task, "already have port for app '%V'", &app->name); + rc->app_port = port; return NXT_OK; } - sw = nxt_mp_retain(c->mem_pool, sizeof(nxt_start_worker_t)); if (nxt_slow_path(sw == NULL)) { @@ -2069,6 +2187,8 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, return; } + rc->ap = ap; + nxt_event_engine_request_add(engine, rc); nxt_debug(task, "req_id %uxD linked to conn %p at engine %p", @@ -2104,7 +2224,7 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_conn_link_t *rc, } reply_port = rc->reply_port; - ap = rc->conn->socket.data; + ap = rc->ap; port_mp = port->mem_pool; port->mem_pool = mp; diff --git a/src/nxt_router.h b/src/nxt_router.h index ea7a54bc..5724c192 100644 --- a/src/nxt_router.h +++ b/src/nxt_router.h @@ -69,11 +69,13 @@ typedef struct { struct nxt_app_s { - nxt_thread_mutex_t mutex; - nxt_queue_t ports; + nxt_thread_mutex_t mutex; /* Protects ports queue. */ + nxt_queue_t ports; /* of nxt_port_t.app_link */ + nxt_queue_t requests; /* of nxt_req_conn_link_t */ nxt_str_t name; + uint32_t pending_workers; uint32_t workers; uint32_t max_workers; @@ -123,6 +125,6 @@ typedef struct { void nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); void nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); -void nxt_router_app_remove_port(nxt_port_t *port); +nxt_bool_t nxt_router_app_remove_port(nxt_port_t *port); #endif /* _NXT_ROUTER_H_INCLUDED_ */ diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c index 6ddf27c1..8b216337 100644 --- a/src/nxt_runtime.c +++ b/src/nxt_runtime.c @@ -1542,7 +1542,6 @@ nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid) lhq.proto = &lvlhsh_processes_proto; if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) { - nxt_thread_log_debug("process %PI found", pid); return lhq.value; } @@ -1662,9 +1661,13 @@ nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process) nxt_runtime_port_remove(rt, port); + nxt_port_release(port); + } nxt_process_port_loop; - nxt_runtime_process_destroy(rt, process); + if (nxt_queue_is_empty(&process->ports)) { + nxt_runtime_process_destroy(rt, process); + } break; @@ -1709,24 +1712,6 @@ nxt_runtime_port_remove(nxt_runtime_t *rt, nxt_port_t *port) if (rt->port_by_type[port->type] == port) { rt->port_by_type[port->type] = NULL; } - - if (port->pair[0] != -1) { - nxt_fd_close(port->pair[0]); - } - - if (port->pair[1] != -1) { - nxt_fd_close(port->pair[1]); - } - - if (port->type == NXT_PROCESS_WORKER) { - nxt_router_app_remove_port(port); - } - - if (port->mem_pool) { - nxt_mp_destroy(port->mem_pool); - } - - nxt_mp_free(rt->mem_pool, port); } |