summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_router.c
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2017-07-18 00:21:14 +0300
committerMax Romanov <max.romanov@nginx.com>2017-07-18 00:21:14 +0300
commiteb675f2d78178b2cdd54d934022f9b739bfa8952 (patch)
tree0b072c752fff35cd5c4498cded7016a5738caa58 /src/nxt_router.c
parent47b359388cdf6348238e7fc05770426448049189 (diff)
downloadunit-eb675f2d78178b2cdd54d934022f9b739bfa8952.tar.gz
unit-eb675f2d78178b2cdd54d934022f9b739bfa8952.tar.bz2
Port allocation and destroy changed. Worker process stop introduced.
Diffstat (limited to '')
-rw-r--r--src/nxt_router.c250
1 files changed, 185 insertions, 65 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c
index 993d623c..5ee70377 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -99,6 +99,7 @@ static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
static void nxt_router_conf_release(nxt_task_t *task,
nxt_socket_conf_joint_t *joint);
+static nxt_bool_t nxt_router_app_free(nxt_app_t *app);
static nxt_port_t * nxt_router_app_get_port(nxt_app_t *app);
static void nxt_router_app_release_port(nxt_task_t *task, void *obj,
void *data);
@@ -189,6 +190,12 @@ nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
if (nxt_fast_path(sw != NULL)) {
msg->new_port->app = sw->app;
+ sw->app->workers++;
+
+ nxt_assert(sw->app->pending_workers != 0);
+
+ sw->app->pending_workers--;
+
nxt_router_app_release_port(task, msg->new_port, sw->app);
sw->work.handler = nxt_router_sw_release;
@@ -754,25 +761,25 @@ nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, nxt_str_t *name)
static nxt_socket_conf_t *
nxt_router_socket_conf(nxt_task_t *task, nxt_mp_t *mp, nxt_sockaddr_t *sa)
{
- nxt_socket_conf_t *conf;
+ nxt_socket_conf_t *skcf;
- conf = nxt_mp_zget(mp, sizeof(nxt_socket_conf_t));
- if (nxt_slow_path(conf == NULL)) {
+ skcf = nxt_mp_zget(mp, sizeof(nxt_socket_conf_t));
+ if (nxt_slow_path(skcf == NULL)) {
return NULL;
}
- conf->sockaddr = sa;
+ skcf->sockaddr = sa;
- conf->listen.sockaddr = sa;
- conf->listen.socklen = sa->socklen;
- conf->listen.address_length = sa->length;
+ skcf->listen.sockaddr = sa;
+ skcf->listen.socklen = sa->socklen;
+ skcf->listen.address_length = sa->length;
- conf->listen.socket = -1;
- conf->listen.backlog = NXT_LISTEN_BACKLOG;
- conf->listen.flags = NXT_NONBLOCK;
- conf->listen.read_after_accept = 1;
+ skcf->listen.socket = -1;
+ skcf->listen.backlog = NXT_LISTEN_BACKLOG;
+ skcf->listen.flags = NXT_NONBLOCK;
+ skcf->listen.read_after_accept = 1;
- return conf;
+ return skcf;
}
@@ -1179,17 +1186,33 @@ nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
static void
nxt_router_apps_sort(nxt_router_t *router, nxt_router_temp_conf_t *tmcf)
{
- nxt_app_t *app;
+ nxt_app_t *app;
+ nxt_port_t *port;
nxt_queue_each(app, &router->apps, nxt_app_t, link) {
nxt_queue_remove(&app->link);
- // TODO RELEASE APP
-#if 0
- nxt_thread_mutex_destroy(&app->mutex);
- nxt_free(app);
-#endif
+ app->live = 0;
+
+ if (nxt_router_app_free(app) != 0) {
+ continue;
+ }
+
+ if (nxt_queue_is_empty(&app->requests)) {
+
+ do {
+ port = nxt_router_app_get_port(app);
+ if (port == NULL) {
+ break;
+ }
+
+ nxt_port_socket_write(&port->engine->task, port,
+ NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
+ } while (1);
+
+ }
+
} nxt_queue_loop;
nxt_queue_add(&router->apps, &tmcf->previous);
@@ -1266,21 +1289,17 @@ nxt_router_thread_start(void *data)
engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64);
- port = nxt_mp_zalloc(engine->mem_pool, sizeof(nxt_port_t));
+ port = nxt_port_new(nxt_port_get_next_id(), nxt_pid, NXT_PROCESS_ROUTER);
if (nxt_slow_path(port == NULL)) {
return;
}
- port->id = nxt_port_get_next_id();
- port->pid = nxt_pid;
-
ret = nxt_port_socket_init(task, port, 0);
if (nxt_slow_path(ret != NXT_OK)) {
+ nxt_mp_release(port->mem_pool, port);
return;
}
- port->type = NXT_PROCESS_ROUTER;
-
engine->port = port;
nxt_port_enable(task, port, nxt_router_app_port_handlers);
@@ -1391,6 +1410,9 @@ nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data)
nxt_fd_event_delete(engine, &listen->socket);
+ nxt_debug(task, "engine %p: listen socket delete: %d", engine,
+ listen->socket.fd);
+
listen->timer.handler = nxt_router_listen_socket_close;
listen->timer.work_queue = &engine->fast_work_queue;
@@ -1414,6 +1436,9 @@ nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data)
listen = nxt_timer_data(timer, nxt_listen_event_t, timer);
joint = listen->socket.data;
+ nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine,
+ listen->socket.fd);
+
nxt_queue_remove(&listen->link);
/* 'task' refers to listen->task and we cannot use after nxt_free() */
@@ -1439,6 +1464,9 @@ nxt_router_listen_socket_release(nxt_task_t *task,
nxt_thread_spin_lock(lock);
+ nxt_debug(task, "engine %p: listen socket release: rtsk->count %D",
+ task->thread->engine, rtsk->count);
+
if (--rtsk->count != 0) {
rtsk = NULL;
}
@@ -1463,7 +1491,7 @@ nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
nxt_router_conf_t *rtcf;
nxt_thread_spinlock_t *lock;
- nxt_debug(task, "conf joint count: %D", joint->count);
+ nxt_debug(task, "conf joint %p count: %D", joint, joint->count);
if (--joint->count != 0) {
return;
@@ -1477,6 +1505,9 @@ nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
nxt_thread_spin_lock(lock);
+ nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count,
+ rtcf, rtcf->count);
+
if (--skcf->count != 0) {
rtcf = NULL;
@@ -1531,18 +1562,10 @@ nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data)
// TODO notify all apps
- if (port->pair[0] != -1) {
- nxt_fd_close(port->pair[0]);
- }
-
- if (port->pair[1] != -1) {
- nxt_fd_close(port->pair[1]);
- }
-
- if (port->mem_pool) {
- nxt_mp_destroy(port->mem_pool);
- }
+ nxt_mp_thread_adopt(port->mem_pool);
+ nxt_port_release(port);
+ nxt_mp_thread_adopt(engine->mem_pool);
nxt_mp_destroy(engine->mem_pool);
nxt_event_engine_free(engine);
@@ -1683,17 +1706,17 @@ nxt_router_text_by_code(int code)
}
}
-static void
-nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code,
- const char* fmt, ...)
+
+static nxt_buf_t *
+nxt_router_get_error_buf(nxt_task_t *task, nxt_mp_t *mp, int code,
+ const char* fmt, va_list args)
{
- va_list args;
- nxt_buf_t *b, *last;
- const char *msg;
+ nxt_buf_t *b, *last;
+ const char *msg;
- b = nxt_buf_mem_alloc(c->mem_pool, 16384, 0);
+ b = nxt_buf_mem_ts_alloc(task, mp, 16384);
if (nxt_slow_path(b == NULL)) {
- /* TODO pogorevaTb */
+ return NULL;
}
b->mem.free = nxt_sprintf(b->mem.free, b->mem.end,
@@ -1704,19 +1727,38 @@ nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code,
msg = (const char *) b->mem.free;
- va_start(args, fmt);
b->mem.free = nxt_vsprintf(b->mem.free, b->mem.end, fmt, args);
- va_end(args);
nxt_log_alert(task->log, "error %d: %s", code, msg);
- last = nxt_buf_sync_alloc(c->mem_pool, NXT_BUF_SYNC_LAST);
+ last = nxt_buf_mem_ts_alloc(task, mp, 0);
+
if (nxt_slow_path(last == NULL)) {
- /* TODO pogorevaTb */
+ nxt_mp_release(mp, b);
+ return NULL;
}
+ nxt_buf_set_sync(last);
+ nxt_buf_set_last(last);
+
nxt_buf_chain_add(&b, last);
+ return b;
+}
+
+
+
+static void
+nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code,
+ const char* fmt, ...)
+{
+ va_list args;
+ nxt_buf_t *b;
+
+ va_start(args, fmt);
+ b = nxt_router_get_error_buf(task, c->mem_pool, code, fmt, args);
+ va_end(args);
+
if (c->write == NULL) {
c->write = b;
c->write_state = &nxt_router_conn_write_state;
@@ -1742,6 +1784,19 @@ nxt_router_send_sw_request(nxt_task_t *task, void *obj, void *data)
sw = obj;
app = sw->app;
+ if (app->workers + app->pending_workers >= app->max_workers) {
+ sw->work.handler = nxt_router_sw_release;
+
+ nxt_debug(task, "%uD/%uD running/penging workers, post sw #%uxD "
+ "release to %p", sw->stream, sw->work.data);
+
+ nxt_event_engine_post(sw->work.data, &sw->work);
+
+ return;
+ }
+
+ app->pending_workers++;
+
nxt_debug(task, "send sw #%uD", sw->stream);
nxt_router_sw_add(task, nxt_router, sw);
@@ -1758,6 +1813,23 @@ nxt_router_send_sw_request(nxt_task_t *task, void *obj, void *data)
}
+static nxt_bool_t
+nxt_router_app_free(nxt_app_t *app)
+{
+ if (app->live == 0 && app->workers == 0 &&
+ app->pending_workers == 0 &&
+ nxt_queue_is_empty(&app->requests)) {
+
+ nxt_thread_mutex_destroy(&app->mutex);
+ nxt_free(app);
+
+ return 1;
+ }
+
+ return 0;
+}
+
+
static nxt_port_t *
nxt_router_app_get_port(nxt_app_t *app)
{
@@ -1789,6 +1861,7 @@ nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data)
nxt_app_t *app;
nxt_port_t *port;
nxt_work_t *work;
+ nxt_process_t *process;
nxt_queue_link_t *lnk;
nxt_req_conn_link_t *rc;
@@ -1801,7 +1874,7 @@ nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data)
if (task->thread->engine != port->engine) {
- work = (nxt_work_t *) (port + 1);
+ work = &port->work;
nxt_debug(task, "post release port to engine %p", port->engine);
@@ -1822,7 +1895,8 @@ nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data)
rc = nxt_queue_link_data(lnk, nxt_req_conn_link_t, app_link);
- nxt_debug(task, "process request #%uxD", rc->req_id);
+ nxt_debug(task, "app '%V' process next request #%uxD",
+ &app->name, rc->req_id);
rc->app_port = port;
@@ -1831,7 +1905,37 @@ nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data)
return;
}
- nxt_debug(task, "app requests queue is empty");
+ if (port->pair[1] == -1) {
+ nxt_debug(task, "app '%V' port already closed (pid %PI dead?)",
+ &app->name, port->pid);
+
+ app->workers--;
+ nxt_router_app_free(app);
+
+ port->app = NULL;
+ process = port->process;
+
+ nxt_port_release(port);
+
+ if (nxt_queue_is_empty(&process->ports)) {
+ nxt_runtime_process_destroy(task->thread->runtime, process);
+ }
+
+ return;
+ }
+
+ if (!app->live) {
+ nxt_debug(task, "app '%V' is not alive, send QUIT to port",
+ &app->name);
+
+ nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT,
+ -1, 0, 0, NULL);
+
+ return;
+ }
+
+ nxt_debug(task, "app '%V' requests queue is empty, keep the port",
+ &app->name);
nxt_thread_mutex_lock(&app->mutex);
@@ -1841,29 +1945,42 @@ nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data)
}
-void
+nxt_bool_t
nxt_router_app_remove_port(nxt_port_t *port)
{
- nxt_app_t *app;
-
- if (port->app_link.next == NULL) {
- return;
- }
+ nxt_app_t *app;
+ nxt_bool_t busy;
app = port->app;
+ busy = 1;
+
+ if (app == NULL) {
+ nxt_assert(port->app_link.next == NULL);
-#if (NXT_DEBUG)
- if (nxt_slow_path(app == NULL)) {
- nxt_abort();
+ return 1;
}
-#endif
nxt_thread_mutex_lock(&app->mutex);
- nxt_queue_remove(&port->app_link);
- port->app_link.next = NULL;
+ if (port->app_link.next != NULL) {
+
+ nxt_queue_remove(&port->app_link);
+ port->app_link.next = NULL;
+
+ busy = 0;
+ }
nxt_thread_mutex_unlock(&app->mutex);
+
+ if (busy == 0) {
+
+ app->workers--;
+ nxt_router_app_free(app);
+
+ return 1;
+ }
+
+ return 0;
}
@@ -1894,11 +2011,12 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_conn_link_t *rc)
port = nxt_router_app_get_port(app);
if (port != NULL) {
+ nxt_debug(task, "already have port for app '%V'", &app->name);
+
rc->app_port = port;
return NXT_OK;
}
-
sw = nxt_mp_retain(c->mem_pool, sizeof(nxt_start_worker_t));
if (nxt_slow_path(sw == NULL)) {
@@ -2069,6 +2187,8 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
return;
}
+ rc->ap = ap;
+
nxt_event_engine_request_add(engine, rc);
nxt_debug(task, "req_id %uxD linked to conn %p at engine %p",
@@ -2104,7 +2224,7 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_conn_link_t *rc,
}
reply_port = rc->reply_port;
- ap = rc->conn->socket.data;
+ ap = rc->ap;
port_mp = port->mem_pool;
port->mem_pool = mp;