diff options
author | Max Romanov <max.romanov@nginx.com> | 2017-07-18 00:21:14 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2017-07-18 00:21:14 +0300 |
commit | eb675f2d78178b2cdd54d934022f9b739bfa8952 (patch) | |
tree | 0b072c752fff35cd5c4498cded7016a5738caa58 /src/nxt_router.c | |
parent | 47b359388cdf6348238e7fc05770426448049189 (diff) | |
download | unit-eb675f2d78178b2cdd54d934022f9b739bfa8952.tar.gz unit-eb675f2d78178b2cdd54d934022f9b739bfa8952.tar.bz2 |
Port allocation and destroy changed. Worker process stop introduced.
Diffstat (limited to 'src/nxt_router.c')
-rw-r--r-- | src/nxt_router.c | 250 |
1 files changed, 185 insertions, 65 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c index 993d623c..5ee70377 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -99,6 +99,7 @@ static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, static void nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint); +static nxt_bool_t nxt_router_app_free(nxt_app_t *app); static nxt_port_t * nxt_router_app_get_port(nxt_app_t *app); static void nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data); @@ -189,6 +190,12 @@ nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) if (nxt_fast_path(sw != NULL)) { msg->new_port->app = sw->app; + sw->app->workers++; + + nxt_assert(sw->app->pending_workers != 0); + + sw->app->pending_workers--; + nxt_router_app_release_port(task, msg->new_port, sw->app); sw->work.handler = nxt_router_sw_release; @@ -754,25 +761,25 @@ nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, nxt_str_t *name) static nxt_socket_conf_t * nxt_router_socket_conf(nxt_task_t *task, nxt_mp_t *mp, nxt_sockaddr_t *sa) { - nxt_socket_conf_t *conf; + nxt_socket_conf_t *skcf; - conf = nxt_mp_zget(mp, sizeof(nxt_socket_conf_t)); - if (nxt_slow_path(conf == NULL)) { + skcf = nxt_mp_zget(mp, sizeof(nxt_socket_conf_t)); + if (nxt_slow_path(skcf == NULL)) { return NULL; } - conf->sockaddr = sa; + skcf->sockaddr = sa; - conf->listen.sockaddr = sa; - conf->listen.socklen = sa->socklen; - conf->listen.address_length = sa->length; + skcf->listen.sockaddr = sa; + skcf->listen.socklen = sa->socklen; + skcf->listen.address_length = sa->length; - conf->listen.socket = -1; - conf->listen.backlog = NXT_LISTEN_BACKLOG; - conf->listen.flags = NXT_NONBLOCK; - conf->listen.read_after_accept = 1; + skcf->listen.socket = -1; + skcf->listen.backlog = NXT_LISTEN_BACKLOG; + skcf->listen.flags = NXT_NONBLOCK; + skcf->listen.read_after_accept = 1; - return conf; + return skcf; } @@ -1179,17 +1186,33 @@ nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, static void nxt_router_apps_sort(nxt_router_t *router, nxt_router_temp_conf_t *tmcf) { - nxt_app_t *app; + nxt_app_t *app; + nxt_port_t *port; nxt_queue_each(app, &router->apps, nxt_app_t, link) { nxt_queue_remove(&app->link); - // TODO RELEASE APP -#if 0 - nxt_thread_mutex_destroy(&app->mutex); - nxt_free(app); -#endif + app->live = 0; + + if (nxt_router_app_free(app) != 0) { + continue; + } + + if (nxt_queue_is_empty(&app->requests)) { + + do { + port = nxt_router_app_get_port(app); + if (port == NULL) { + break; + } + + nxt_port_socket_write(&port->engine->task, port, + NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); + } while (1); + + } + } nxt_queue_loop; nxt_queue_add(&router->apps, &tmcf->previous); @@ -1266,21 +1289,17 @@ nxt_router_thread_start(void *data) engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64); - port = nxt_mp_zalloc(engine->mem_pool, sizeof(nxt_port_t)); + port = nxt_port_new(nxt_port_get_next_id(), nxt_pid, NXT_PROCESS_ROUTER); if (nxt_slow_path(port == NULL)) { return; } - port->id = nxt_port_get_next_id(); - port->pid = nxt_pid; - ret = nxt_port_socket_init(task, port, 0); if (nxt_slow_path(ret != NXT_OK)) { + nxt_mp_release(port->mem_pool, port); return; } - port->type = NXT_PROCESS_ROUTER; - engine->port = port; nxt_port_enable(task, port, nxt_router_app_port_handlers); @@ -1391,6 +1410,9 @@ nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data) nxt_fd_event_delete(engine, &listen->socket); + nxt_debug(task, "engine %p: listen socket delete: %d", engine, + listen->socket.fd); + listen->timer.handler = nxt_router_listen_socket_close; listen->timer.work_queue = &engine->fast_work_queue; @@ -1414,6 +1436,9 @@ nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data) listen = nxt_timer_data(timer, nxt_listen_event_t, timer); joint = listen->socket.data; + nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine, + listen->socket.fd); + nxt_queue_remove(&listen->link); /* 'task' refers to listen->task and we cannot use after nxt_free() */ @@ -1439,6 +1464,9 @@ nxt_router_listen_socket_release(nxt_task_t *task, nxt_thread_spin_lock(lock); + nxt_debug(task, "engine %p: listen socket release: rtsk->count %D", + task->thread->engine, rtsk->count); + if (--rtsk->count != 0) { rtsk = NULL; } @@ -1463,7 +1491,7 @@ nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint) nxt_router_conf_t *rtcf; nxt_thread_spinlock_t *lock; - nxt_debug(task, "conf joint count: %D", joint->count); + nxt_debug(task, "conf joint %p count: %D", joint, joint->count); if (--joint->count != 0) { return; @@ -1477,6 +1505,9 @@ nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint) nxt_thread_spin_lock(lock); + nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count, + rtcf, rtcf->count); + if (--skcf->count != 0) { rtcf = NULL; @@ -1531,18 +1562,10 @@ nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data) // TODO notify all apps - if (port->pair[0] != -1) { - nxt_fd_close(port->pair[0]); - } - - if (port->pair[1] != -1) { - nxt_fd_close(port->pair[1]); - } - - if (port->mem_pool) { - nxt_mp_destroy(port->mem_pool); - } + nxt_mp_thread_adopt(port->mem_pool); + nxt_port_release(port); + nxt_mp_thread_adopt(engine->mem_pool); nxt_mp_destroy(engine->mem_pool); nxt_event_engine_free(engine); @@ -1683,17 +1706,17 @@ nxt_router_text_by_code(int code) } } -static void -nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code, - const char* fmt, ...) + +static nxt_buf_t * +nxt_router_get_error_buf(nxt_task_t *task, nxt_mp_t *mp, int code, + const char* fmt, va_list args) { - va_list args; - nxt_buf_t *b, *last; - const char *msg; + nxt_buf_t *b, *last; + const char *msg; - b = nxt_buf_mem_alloc(c->mem_pool, 16384, 0); + b = nxt_buf_mem_ts_alloc(task, mp, 16384); if (nxt_slow_path(b == NULL)) { - /* TODO pogorevaTb */ + return NULL; } b->mem.free = nxt_sprintf(b->mem.free, b->mem.end, @@ -1704,19 +1727,38 @@ nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code, msg = (const char *) b->mem.free; - va_start(args, fmt); b->mem.free = nxt_vsprintf(b->mem.free, b->mem.end, fmt, args); - va_end(args); nxt_log_alert(task->log, "error %d: %s", code, msg); - last = nxt_buf_sync_alloc(c->mem_pool, NXT_BUF_SYNC_LAST); + last = nxt_buf_mem_ts_alloc(task, mp, 0); + if (nxt_slow_path(last == NULL)) { - /* TODO pogorevaTb */ + nxt_mp_release(mp, b); + return NULL; } + nxt_buf_set_sync(last); + nxt_buf_set_last(last); + nxt_buf_chain_add(&b, last); + return b; +} + + + +static void +nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code, + const char* fmt, ...) +{ + va_list args; + nxt_buf_t *b; + + va_start(args, fmt); + b = nxt_router_get_error_buf(task, c->mem_pool, code, fmt, args); + va_end(args); + if (c->write == NULL) { c->write = b; c->write_state = &nxt_router_conn_write_state; @@ -1742,6 +1784,19 @@ nxt_router_send_sw_request(nxt_task_t *task, void *obj, void *data) sw = obj; app = sw->app; + if (app->workers + app->pending_workers >= app->max_workers) { + sw->work.handler = nxt_router_sw_release; + + nxt_debug(task, "%uD/%uD running/penging workers, post sw #%uxD " + "release to %p", sw->stream, sw->work.data); + + nxt_event_engine_post(sw->work.data, &sw->work); + + return; + } + + app->pending_workers++; + nxt_debug(task, "send sw #%uD", sw->stream); nxt_router_sw_add(task, nxt_router, sw); @@ -1758,6 +1813,23 @@ nxt_router_send_sw_request(nxt_task_t *task, void *obj, void *data) } +static nxt_bool_t +nxt_router_app_free(nxt_app_t *app) +{ + if (app->live == 0 && app->workers == 0 && + app->pending_workers == 0 && + nxt_queue_is_empty(&app->requests)) { + + nxt_thread_mutex_destroy(&app->mutex); + nxt_free(app); + + return 1; + } + + return 0; +} + + static nxt_port_t * nxt_router_app_get_port(nxt_app_t *app) { @@ -1789,6 +1861,7 @@ nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data) nxt_app_t *app; nxt_port_t *port; nxt_work_t *work; + nxt_process_t *process; nxt_queue_link_t *lnk; nxt_req_conn_link_t *rc; @@ -1801,7 +1874,7 @@ nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data) if (task->thread->engine != port->engine) { - work = (nxt_work_t *) (port + 1); + work = &port->work; nxt_debug(task, "post release port to engine %p", port->engine); @@ -1822,7 +1895,8 @@ nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data) rc = nxt_queue_link_data(lnk, nxt_req_conn_link_t, app_link); - nxt_debug(task, "process request #%uxD", rc->req_id); + nxt_debug(task, "app '%V' process next request #%uxD", + &app->name, rc->req_id); rc->app_port = port; @@ -1831,7 +1905,37 @@ nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data) return; } - nxt_debug(task, "app requests queue is empty"); + if (port->pair[1] == -1) { + nxt_debug(task, "app '%V' port already closed (pid %PI dead?)", + &app->name, port->pid); + + app->workers--; + nxt_router_app_free(app); + + port->app = NULL; + process = port->process; + + nxt_port_release(port); + + if (nxt_queue_is_empty(&process->ports)) { + nxt_runtime_process_destroy(task->thread->runtime, process); + } + + return; + } + + if (!app->live) { + nxt_debug(task, "app '%V' is not alive, send QUIT to port", + &app->name); + + nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, + -1, 0, 0, NULL); + + return; + } + + nxt_debug(task, "app '%V' requests queue is empty, keep the port", + &app->name); nxt_thread_mutex_lock(&app->mutex); @@ -1841,29 +1945,42 @@ nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data) } -void +nxt_bool_t nxt_router_app_remove_port(nxt_port_t *port) { - nxt_app_t *app; - - if (port->app_link.next == NULL) { - return; - } + nxt_app_t *app; + nxt_bool_t busy; app = port->app; + busy = 1; + + if (app == NULL) { + nxt_assert(port->app_link.next == NULL); -#if (NXT_DEBUG) - if (nxt_slow_path(app == NULL)) { - nxt_abort(); + return 1; } -#endif nxt_thread_mutex_lock(&app->mutex); - nxt_queue_remove(&port->app_link); - port->app_link.next = NULL; + if (port->app_link.next != NULL) { + + nxt_queue_remove(&port->app_link); + port->app_link.next = NULL; + + busy = 0; + } nxt_thread_mutex_unlock(&app->mutex); + + if (busy == 0) { + + app->workers--; + nxt_router_app_free(app); + + return 1; + } + + return 0; } @@ -1894,11 +2011,12 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_conn_link_t *rc) port = nxt_router_app_get_port(app); if (port != NULL) { + nxt_debug(task, "already have port for app '%V'", &app->name); + rc->app_port = port; return NXT_OK; } - sw = nxt_mp_retain(c->mem_pool, sizeof(nxt_start_worker_t)); if (nxt_slow_path(sw == NULL)) { @@ -2069,6 +2187,8 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, return; } + rc->ap = ap; + nxt_event_engine_request_add(engine, rc); nxt_debug(task, "req_id %uxD linked to conn %p at engine %p", @@ -2104,7 +2224,7 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_conn_link_t *rc, } reply_port = rc->reply_port; - ap = rc->conn->socket.data; + ap = rc->ap; port_mp = port->mem_pool; port->mem_pool = mp; |