summaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/nxt_application.h6
-rw-r--r--src/nxt_conn.h16
-rw-r--r--src/nxt_event_engine.c4
-rw-r--r--src/nxt_master_process.c11
-rw-r--r--src/nxt_port.c78
-rw-r--r--src/nxt_port.h5
-rw-r--r--src/nxt_port_socket.c24
-rw-r--r--src/nxt_process.c27
-rw-r--r--src/nxt_process.h6
-rw-r--r--src/nxt_router.c250
-rw-r--r--src/nxt_router.h8
-rw-r--r--src/nxt_runtime.c25
12 files changed, 298 insertions, 162 deletions
diff --git a/src/nxt_application.h b/src/nxt_application.h
index 449f1158..460ef8ce 100644
--- a/src/nxt_application.h
+++ b/src/nxt_application.h
@@ -100,11 +100,13 @@ typedef struct {
} nxt_app_request_t;
-typedef struct {
+typedef struct nxt_app_parse_ctx_s nxt_app_parse_ctx_t;
+
+struct nxt_app_parse_ctx_s {
nxt_app_request_t r;
nxt_http_request_parse_t parser;
nxt_mp_t *mem_pool;
-} nxt_app_parse_ctx_t;
+};
nxt_int_t nxt_app_http_req_init(nxt_task_t *task, nxt_app_parse_ctx_t *ctx);
diff --git a/src/nxt_conn.h b/src/nxt_conn.h
index fa9f5b9d..c08ed376 100644
--- a/src/nxt_conn.h
+++ b/src/nxt_conn.h
@@ -183,15 +183,17 @@ struct nxt_conn_s {
typedef uint32_t nxt_req_id_t;
+typedef struct nxt_app_parse_ctx_s nxt_app_parse_ctx_t;
typedef struct {
- nxt_req_id_t req_id;
- nxt_conn_t *conn;
- nxt_port_t *app_port;
- nxt_port_t *reply_port;
-
- nxt_queue_link_t link; /* for nxt_conn_t.requests */
- nxt_queue_link_t app_link; /* for nxt_app_t.requests */
+ nxt_req_id_t req_id;
+ nxt_conn_t *conn;
+ nxt_port_t *app_port;
+ nxt_port_t *reply_port;
+ nxt_app_parse_ctx_t *ap;
+
+ nxt_queue_link_t link; /* for nxt_conn_t.requests */
+ nxt_queue_link_t app_link; /* for nxt_app_t.requests */
} nxt_req_conn_link_t;
diff --git a/src/nxt_event_engine.c b/src/nxt_event_engine.c
index 9171bcd6..b91f0e50 100644
--- a/src/nxt_event_engine.c
+++ b/src/nxt_event_engine.c
@@ -38,6 +38,8 @@ nxt_event_engine_create(nxt_task_t *task,
return NULL;
}
+ nxt_debug(task, "create engine %p", engine);
+
thread = task->thread;
engine->task.thread = thread;
@@ -440,6 +442,8 @@ nxt_event_engine_change(nxt_event_engine_t *engine,
void
nxt_event_engine_free(nxt_event_engine_t *engine)
{
+ nxt_thread_log_debug("free engine %p", engine);
+
nxt_event_engine_signal_pipe_free(engine);
nxt_free(engine->signals);
diff --git a/src/nxt_master_process.c b/src/nxt_master_process.c
index 44cf8d8b..1a8b5ed8 100644
--- a/src/nxt_master_process.c
+++ b/src/nxt_master_process.c
@@ -207,12 +207,13 @@ nxt_master_process_port_create(nxt_task_t *task, nxt_runtime_t *rt)
return NXT_ERROR;
}
- port = nxt_process_port_new(rt, process, nxt_port_get_next_id(),
- NXT_PROCESS_MASTER);
+ port = nxt_port_new(0, nxt_pid, NXT_PROCESS_MASTER);
if (nxt_slow_path(port == NULL)) {
return NXT_ERROR;
}
+ nxt_process_port_add(process, port);
+
ret = nxt_port_socket_init(task, port, 0);
if (nxt_slow_path(ret != NXT_OK)) {
return ret;
@@ -373,15 +374,17 @@ nxt_master_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt,
process->init = init;
- port = nxt_process_port_new(rt, process, 0, init->type);
+ port = nxt_port_new(0, 0, init->type);
if (nxt_slow_path(port == NULL)) {
nxt_runtime_process_destroy(rt, process);
return NXT_ERROR;
}
+ nxt_process_port_add(process, port);
+
ret = nxt_port_socket_init(task, port, 0);
if (nxt_slow_path(ret != NXT_OK)) {
- nxt_mp_free(rt->mem_pool, port);
+ nxt_mp_release(port->mem_pool, port);
nxt_runtime_process_destroy(rt, process);
return ret;
}
diff --git a/src/nxt_port.c b/src/nxt_port.c
index 3abc4125..4c2561d3 100644
--- a/src/nxt_port.c
+++ b/src/nxt_port.c
@@ -7,11 +7,72 @@
#include <nxt_main.h>
#include <nxt_runtime.h>
#include <nxt_port.h>
+#include <nxt_router.h>
static void nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
-static nxt_atomic_uint_t nxt_port_last_id;
+static nxt_atomic_uint_t nxt_port_last_id = 1;
+
+
+nxt_port_t *
+nxt_port_new(nxt_port_id_t id, nxt_pid_t pid, nxt_process_type_t type)
+{
+ nxt_mp_t *mp;
+ nxt_port_t *port;
+
+ mp = nxt_mp_create(1024, 128, 256, 32);
+
+ if (nxt_slow_path(mp == NULL)) {
+ return NULL;
+ }
+
+ port = nxt_mp_zalloc(mp, sizeof(nxt_port_t));
+
+ if (nxt_fast_path(port != NULL)) {
+ port->id = id;
+ port->pid = pid;
+ port->type = type;
+ port->mem_pool = mp;
+
+ nxt_queue_init(&port->messages);
+
+ } else {
+ nxt_mp_destroy(mp);
+ }
+
+ return port;
+}
+
+
+nxt_bool_t
+nxt_port_release(nxt_port_t *port)
+{
+ if (port->pair[0] != -1) {
+ nxt_fd_close(port->pair[0]);
+ port->pair[0] = -1;
+ }
+
+ if (port->pair[1] != -1) {
+ nxt_fd_close(port->pair[1]);
+ port->pair[1] = -1;
+ }
+
+ if (port->type == NXT_PROCESS_WORKER) {
+ if (nxt_router_app_remove_port(port) == 0) {
+ return 0;
+ }
+ }
+
+ if (port->link.next != NULL) {
+ nxt_process_port_remove(port);
+ }
+
+ nxt_mp_release(port->mem_pool, port);
+
+ return 1;
+}
+
nxt_port_id_t
nxt_port_get_next_id()
@@ -32,7 +93,6 @@ nxt_port_enable(nxt_task_t *task, nxt_port_t *port,
nxt_port_handler_t *handlers)
{
port->pid = nxt_pid;
- port->engine = task->thread->engine;
port->handler = nxt_port_handler;
port->data = handlers;
@@ -151,7 +211,6 @@ nxt_port_send_port(nxt_task_t *task, nxt_port_t *port, nxt_port_t *new_port,
void
nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
- nxt_mp_t *mp;
nxt_port_t *port;
nxt_process_t *process;
nxt_runtime_t *rt;
@@ -180,26 +239,19 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
return;
}
- port = nxt_process_port_new(rt, process, new_port_msg->id,
- new_port_msg->type);
+ port = nxt_port_new(new_port_msg->id, new_port_msg->pid,
+ new_port_msg->type);
if (nxt_slow_path(port == NULL)) {
return;
}
- mp = nxt_mp_create(1024, 128, 256, 32);
- if (nxt_slow_path(mp == NULL)) {
- return;
- }
-
- port->mem_pool = mp;
+ nxt_process_port_add(process, port);
port->pair[0] = -1;
port->pair[1] = msg->fd;
port->max_size = new_port_msg->max_size;
port->max_share = new_port_msg->max_share;
- nxt_queue_init(&port->messages);
-
port->socket.task = task;
nxt_runtime_port_add(rt, port);
diff --git a/src/nxt_port.h b/src/nxt_port.h
index 603540e2..d6156015 100644
--- a/src/nxt_port.h
+++ b/src/nxt_port.h
@@ -88,6 +88,7 @@ struct nxt_port_s {
nxt_pid_t pid;
nxt_process_type_t type;
+ nxt_work_t work;
};
@@ -110,6 +111,10 @@ typedef union {
} nxt_port_data_t;
+nxt_port_t *nxt_port_new(nxt_port_id_t id, nxt_pid_t pid,
+ nxt_process_type_t type);
+nxt_bool_t nxt_port_release(nxt_port_t *port);
+
nxt_port_id_t nxt_port_get_next_id(void);
void nxt_port_reset_next_id(void);
diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c
index a0c6fea3..31572e54 100644
--- a/src/nxt_port_socket.c
+++ b/src/nxt_port_socket.c
@@ -19,7 +19,6 @@ static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data);
nxt_int_t
nxt_port_socket_init(nxt_task_t *task, nxt_port_t *port, size_t max_size)
{
- nxt_mp_t *mp;
nxt_int_t sndbuf, rcvbuf, size;
nxt_socket_t snd, rcv;
@@ -28,15 +27,6 @@ nxt_port_socket_init(nxt_task_t *task, nxt_port_t *port, size_t max_size)
port->pair[0] = -1;
port->pair[1] = -1;
- nxt_queue_init(&port->messages);
-
- mp = nxt_mp_create(1024, 128, 256, 32);
- if (nxt_slow_path(mp == NULL)) {
- return NXT_ERROR;
- }
-
- port->mem_pool = mp;
-
if (nxt_slow_path(nxt_socketpair_create(task, port->pair) != NXT_OK)) {
goto socketpair_fail;
}
@@ -100,8 +90,6 @@ getsockopt_fail:
socketpair_fail:
- nxt_mp_destroy(port->mem_pool);
-
return NXT_ERROR;
}
@@ -172,14 +160,9 @@ nxt_int_t
nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b)
{
- nxt_queue_link_t *link;
nxt_port_send_msg_t *msg;
- for (link = nxt_queue_first(&port->messages);
- link != nxt_queue_tail(&port->messages);
- link = nxt_queue_next(link))
- {
- msg = (nxt_port_send_msg_t *) link;
+ nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) {
if (msg->port_msg.stream == stream &&
msg->port_msg.reply_port == reply_port) {
@@ -191,7 +174,8 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
return NXT_OK;
}
- }
+
+ } nxt_queue_loop;
msg = nxt_mp_retain(port->mem_pool, sizeof(nxt_port_send_msg_t));
if (nxt_slow_path(msg == NULL)) {
@@ -256,7 +240,7 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
return;
}
- msg = (nxt_port_send_msg_t *) link;
+ msg = nxt_queue_link_data(link, nxt_port_send_msg_t, link);
iov[0].iov_base = &msg->port_msg;
iov[0].iov_len = sizeof(nxt_port_msg_t);
diff --git a/src/nxt_process.c b/src/nxt_process.c
index 5899ce30..1832640d 100644
--- a/src/nxt_process.c
+++ b/src/nxt_process.c
@@ -563,30 +563,11 @@ nxt_user_cred_set(nxt_task_t *task, nxt_user_cred_t *uc)
}
-nxt_port_t *
-nxt_process_port_new(nxt_runtime_t *rt, nxt_process_t *process,
- nxt_port_id_t id, nxt_process_type_t type)
+void
+nxt_process_port_add(nxt_process_t *process, nxt_port_t *port)
{
- size_t size;
- nxt_port_t *port;
-
- size = sizeof(nxt_port_t);
- if (size == NXT_PROCESS_WORKER) {
- size += sizeof(nxt_work_t);
- }
-
- port = nxt_mp_zalloc(rt->mem_pool, size);
-
- if (nxt_fast_path(port != NULL)) {
- port->id = id;
- port->pid = process->pid;
- port->process = process;
- port->type = type;
-
- nxt_process_port_add(process, port);
- }
-
- return port;
+ port->process = process;
+ nxt_queue_insert_tail(&process->ports, &port->link);
}
diff --git a/src/nxt_process.h b/src/nxt_process.h
index c49d2be3..a647c00a 100644
--- a/src/nxt_process.h
+++ b/src/nxt_process.h
@@ -82,17 +82,13 @@ NXT_EXPORT void nxt_nanosleep(nxt_nsec_t ns);
NXT_EXPORT void nxt_process_arguments(nxt_task_t *task, char **orig_argv,
char ***orig_envp);
-NXT_EXPORT nxt_port_t * nxt_process_port_new(nxt_runtime_t *rt,
- nxt_process_t *process, nxt_port_id_t id, nxt_process_type_t type);
-
#define nxt_process_port_remove(port) \
nxt_queue_remove(&port->link)
#define nxt_process_port_first(process) \
nxt_queue_link_data(nxt_queue_first(&process->ports), nxt_port_t, link)
-#define nxt_process_port_add(process, port) \
- nxt_queue_insert_tail(&process->ports, &port->link)
+NXT_EXPORT void nxt_process_port_add(nxt_process_t *process, nxt_port_t *port);
#define nxt_process_port_each(process, port) \
nxt_queue_each(port, &process->ports, nxt_port_t, link)
diff --git a/src/nxt_router.c b/src/nxt_router.c
index 993d623c..5ee70377 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -99,6 +99,7 @@ static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
static void nxt_router_conf_release(nxt_task_t *task,
nxt_socket_conf_joint_t *joint);
+static nxt_bool_t nxt_router_app_free(nxt_app_t *app);
static nxt_port_t * nxt_router_app_get_port(nxt_app_t *app);
static void nxt_router_app_release_port(nxt_task_t *task, void *obj,
void *data);
@@ -189,6 +190,12 @@ nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
if (nxt_fast_path(sw != NULL)) {
msg->new_port->app = sw->app;
+ sw->app->workers++;
+
+ nxt_assert(sw->app->pending_workers != 0);
+
+ sw->app->pending_workers--;
+
nxt_router_app_release_port(task, msg->new_port, sw->app);
sw->work.handler = nxt_router_sw_release;
@@ -754,25 +761,25 @@ nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, nxt_str_t *name)
static nxt_socket_conf_t *
nxt_router_socket_conf(nxt_task_t *task, nxt_mp_t *mp, nxt_sockaddr_t *sa)
{
- nxt_socket_conf_t *conf;
+ nxt_socket_conf_t *skcf;
- conf = nxt_mp_zget(mp, sizeof(nxt_socket_conf_t));
- if (nxt_slow_path(conf == NULL)) {
+ skcf = nxt_mp_zget(mp, sizeof(nxt_socket_conf_t));
+ if (nxt_slow_path(skcf == NULL)) {
return NULL;
}
- conf->sockaddr = sa;
+ skcf->sockaddr = sa;
- conf->listen.sockaddr = sa;
- conf->listen.socklen = sa->socklen;
- conf->listen.address_length = sa->length;
+ skcf->listen.sockaddr = sa;
+ skcf->listen.socklen = sa->socklen;
+ skcf->listen.address_length = sa->length;
- conf->listen.socket = -1;
- conf->listen.backlog = NXT_LISTEN_BACKLOG;
- conf->listen.flags = NXT_NONBLOCK;
- conf->listen.read_after_accept = 1;
+ skcf->listen.socket = -1;
+ skcf->listen.backlog = NXT_LISTEN_BACKLOG;
+ skcf->listen.flags = NXT_NONBLOCK;
+ skcf->listen.read_after_accept = 1;
- return conf;
+ return skcf;
}
@@ -1179,17 +1186,33 @@ nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
static void
nxt_router_apps_sort(nxt_router_t *router, nxt_router_temp_conf_t *tmcf)
{
- nxt_app_t *app;
+ nxt_app_t *app;
+ nxt_port_t *port;
nxt_queue_each(app, &router->apps, nxt_app_t, link) {
nxt_queue_remove(&app->link);
- // TODO RELEASE APP
-#if 0
- nxt_thread_mutex_destroy(&app->mutex);
- nxt_free(app);
-#endif
+ app->live = 0;
+
+ if (nxt_router_app_free(app) != 0) {
+ continue;
+ }
+
+ if (nxt_queue_is_empty(&app->requests)) {
+
+ do {
+ port = nxt_router_app_get_port(app);
+ if (port == NULL) {
+ break;
+ }
+
+ nxt_port_socket_write(&port->engine->task, port,
+ NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
+ } while (1);
+
+ }
+
} nxt_queue_loop;
nxt_queue_add(&router->apps, &tmcf->previous);
@@ -1266,21 +1289,17 @@ nxt_router_thread_start(void *data)
engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64);
- port = nxt_mp_zalloc(engine->mem_pool, sizeof(nxt_port_t));
+ port = nxt_port_new(nxt_port_get_next_id(), nxt_pid, NXT_PROCESS_ROUTER);
if (nxt_slow_path(port == NULL)) {
return;
}
- port->id = nxt_port_get_next_id();
- port->pid = nxt_pid;
-
ret = nxt_port_socket_init(task, port, 0);
if (nxt_slow_path(ret != NXT_OK)) {
+ nxt_mp_release(port->mem_pool, port);
return;
}
- port->type = NXT_PROCESS_ROUTER;
-
engine->port = port;
nxt_port_enable(task, port, nxt_router_app_port_handlers);
@@ -1391,6 +1410,9 @@ nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data)
nxt_fd_event_delete(engine, &listen->socket);
+ nxt_debug(task, "engine %p: listen socket delete: %d", engine,
+ listen->socket.fd);
+
listen->timer.handler = nxt_router_listen_socket_close;
listen->timer.work_queue = &engine->fast_work_queue;
@@ -1414,6 +1436,9 @@ nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data)
listen = nxt_timer_data(timer, nxt_listen_event_t, timer);
joint = listen->socket.data;
+ nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine,
+ listen->socket.fd);
+
nxt_queue_remove(&listen->link);
/* 'task' refers to listen->task and we cannot use after nxt_free() */
@@ -1439,6 +1464,9 @@ nxt_router_listen_socket_release(nxt_task_t *task,
nxt_thread_spin_lock(lock);
+ nxt_debug(task, "engine %p: listen socket release: rtsk->count %D",
+ task->thread->engine, rtsk->count);
+
if (--rtsk->count != 0) {
rtsk = NULL;
}
@@ -1463,7 +1491,7 @@ nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
nxt_router_conf_t *rtcf;
nxt_thread_spinlock_t *lock;
- nxt_debug(task, "conf joint count: %D", joint->count);
+ nxt_debug(task, "conf joint %p count: %D", joint, joint->count);
if (--joint->count != 0) {
return;
@@ -1477,6 +1505,9 @@ nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
nxt_thread_spin_lock(lock);
+ nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count,
+ rtcf, rtcf->count);
+
if (--skcf->count != 0) {
rtcf = NULL;
@@ -1531,18 +1562,10 @@ nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data)
// TODO notify all apps
- if (port->pair[0] != -1) {
- nxt_fd_close(port->pair[0]);
- }
-
- if (port->pair[1] != -1) {
- nxt_fd_close(port->pair[1]);
- }
-
- if (port->mem_pool) {
- nxt_mp_destroy(port->mem_pool);
- }
+ nxt_mp_thread_adopt(port->mem_pool);
+ nxt_port_release(port);
+ nxt_mp_thread_adopt(engine->mem_pool);
nxt_mp_destroy(engine->mem_pool);
nxt_event_engine_free(engine);
@@ -1683,17 +1706,17 @@ nxt_router_text_by_code(int code)
}
}
-static void
-nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code,
- const char* fmt, ...)
+
+static nxt_buf_t *
+nxt_router_get_error_buf(nxt_task_t *task, nxt_mp_t *mp, int code,
+ const char* fmt, va_list args)
{
- va_list args;
- nxt_buf_t *b, *last;
- const char *msg;
+ nxt_buf_t *b, *last;
+ const char *msg;
- b = nxt_buf_mem_alloc(c->mem_pool, 16384, 0);
+ b = nxt_buf_mem_ts_alloc(task, mp, 16384);
if (nxt_slow_path(b == NULL)) {
- /* TODO pogorevaTb */
+ return NULL;
}
b->mem.free = nxt_sprintf(b->mem.free, b->mem.end,
@@ -1704,19 +1727,38 @@ nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code,
msg = (const char *) b->mem.free;
- va_start(args, fmt);
b->mem.free = nxt_vsprintf(b->mem.free, b->mem.end, fmt, args);
- va_end(args);
nxt_log_alert(task->log, "error %d: %s", code, msg);
- last = nxt_buf_sync_alloc(c->mem_pool, NXT_BUF_SYNC_LAST);
+ last = nxt_buf_mem_ts_alloc(task, mp, 0);
+
if (nxt_slow_path(last == NULL)) {
- /* TODO pogorevaTb */
+ nxt_mp_release(mp, b);
+ return NULL;
}
+ nxt_buf_set_sync(last);
+ nxt_buf_set_last(last);
+
nxt_buf_chain_add(&b, last);
+ return b;
+}
+
+
+
+static void
+nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code,
+ const char* fmt, ...)
+{
+ va_list args;
+ nxt_buf_t *b;
+
+ va_start(args, fmt);
+ b = nxt_router_get_error_buf(task, c->mem_pool, code, fmt, args);
+ va_end(args);
+
if (c->write == NULL) {
c->write = b;
c->write_state = &nxt_router_conn_write_state;
@@ -1742,6 +1784,19 @@ nxt_router_send_sw_request(nxt_task_t *task, void *obj, void *data)
sw = obj;
app = sw->app;
+ if (app->workers + app->pending_workers >= app->max_workers) {
+ sw->work.handler = nxt_router_sw_release;
+
+ nxt_debug(task, "%uD/%uD running/penging workers, post sw #%uxD "
+ "release to %p", sw->stream, sw->work.data);
+
+ nxt_event_engine_post(sw->work.data, &sw->work);
+
+ return;
+ }
+
+ app->pending_workers++;
+
nxt_debug(task, "send sw #%uD", sw->stream);
nxt_router_sw_add(task, nxt_router, sw);
@@ -1758,6 +1813,23 @@ nxt_router_send_sw_request(nxt_task_t *task, void *obj, void *data)
}
+static nxt_bool_t
+nxt_router_app_free(nxt_app_t *app)
+{
+ if (app->live == 0 && app->workers == 0 &&
+ app->pending_workers == 0 &&
+ nxt_queue_is_empty(&app->requests)) {
+
+ nxt_thread_mutex_destroy(&app->mutex);
+ nxt_free(app);
+
+ return 1;
+ }
+
+ return 0;
+}
+
+
static nxt_port_t *
nxt_router_app_get_port(nxt_app_t *app)
{
@@ -1789,6 +1861,7 @@ nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data)
nxt_app_t *app;
nxt_port_t *port;
nxt_work_t *work;
+ nxt_process_t *process;
nxt_queue_link_t *lnk;
nxt_req_conn_link_t *rc;
@@ -1801,7 +1874,7 @@ nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data)
if (task->thread->engine != port->engine) {
- work = (nxt_work_t *) (port + 1);
+ work = &port->work;
nxt_debug(task, "post release port to engine %p", port->engine);
@@ -1822,7 +1895,8 @@ nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data)
rc = nxt_queue_link_data(lnk, nxt_req_conn_link_t, app_link);
- nxt_debug(task, "process request #%uxD", rc->req_id);
+ nxt_debug(task, "app '%V' process next request #%uxD",
+ &app->name, rc->req_id);
rc->app_port = port;
@@ -1831,7 +1905,37 @@ nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data)
return;
}
- nxt_debug(task, "app requests queue is empty");
+ if (port->pair[1] == -1) {
+ nxt_debug(task, "app '%V' port already closed (pid %PI dead?)",
+ &app->name, port->pid);
+
+ app->workers--;
+ nxt_router_app_free(app);
+
+ port->app = NULL;
+ process = port->process;
+
+ nxt_port_release(port);
+
+ if (nxt_queue_is_empty(&process->ports)) {
+ nxt_runtime_process_destroy(task->thread->runtime, process);
+ }
+
+ return;
+ }
+
+ if (!app->live) {
+ nxt_debug(task, "app '%V' is not alive, send QUIT to port",
+ &app->name);
+
+ nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT,
+ -1, 0, 0, NULL);
+
+ return;
+ }
+
+ nxt_debug(task, "app '%V' requests queue is empty, keep the port",
+ &app->name);
nxt_thread_mutex_lock(&app->mutex);
@@ -1841,29 +1945,42 @@ nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data)
}
-void
+nxt_bool_t
nxt_router_app_remove_port(nxt_port_t *port)
{
- nxt_app_t *app;
-
- if (port->app_link.next == NULL) {
- return;
- }
+ nxt_app_t *app;
+ nxt_bool_t busy;
app = port->app;
+ busy = 1;
+
+ if (app == NULL) {
+ nxt_assert(port->app_link.next == NULL);
-#if (NXT_DEBUG)
- if (nxt_slow_path(app == NULL)) {
- nxt_abort();
+ return 1;
}
-#endif
nxt_thread_mutex_lock(&app->mutex);
- nxt_queue_remove(&port->app_link);
- port->app_link.next = NULL;
+ if (port->app_link.next != NULL) {
+
+ nxt_queue_remove(&port->app_link);
+ port->app_link.next = NULL;
+
+ busy = 0;
+ }
nxt_thread_mutex_unlock(&app->mutex);
+
+ if (busy == 0) {
+
+ app->workers--;
+ nxt_router_app_free(app);
+
+ return 1;
+ }
+
+ return 0;
}
@@ -1894,11 +2011,12 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_conn_link_t *rc)
port = nxt_router_app_get_port(app);
if (port != NULL) {
+ nxt_debug(task, "already have port for app '%V'", &app->name);
+
rc->app_port = port;
return NXT_OK;
}
-
sw = nxt_mp_retain(c->mem_pool, sizeof(nxt_start_worker_t));
if (nxt_slow_path(sw == NULL)) {
@@ -2069,6 +2187,8 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
return;
}
+ rc->ap = ap;
+
nxt_event_engine_request_add(engine, rc);
nxt_debug(task, "req_id %uxD linked to conn %p at engine %p",
@@ -2104,7 +2224,7 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_conn_link_t *rc,
}
reply_port = rc->reply_port;
- ap = rc->conn->socket.data;
+ ap = rc->ap;
port_mp = port->mem_pool;
port->mem_pool = mp;
diff --git a/src/nxt_router.h b/src/nxt_router.h
index ea7a54bc..5724c192 100644
--- a/src/nxt_router.h
+++ b/src/nxt_router.h
@@ -69,11 +69,13 @@ typedef struct {
struct nxt_app_s {
- nxt_thread_mutex_t mutex;
- nxt_queue_t ports;
+ nxt_thread_mutex_t mutex; /* Protects ports queue. */
+ nxt_queue_t ports; /* of nxt_port_t.app_link */
+
nxt_queue_t requests; /* of nxt_req_conn_link_t */
nxt_str_t name;
+ uint32_t pending_workers;
uint32_t workers;
uint32_t max_workers;
@@ -123,6 +125,6 @@ typedef struct {
void nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
void nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
-void nxt_router_app_remove_port(nxt_port_t *port);
+nxt_bool_t nxt_router_app_remove_port(nxt_port_t *port);
#endif /* _NXT_ROUTER_H_INCLUDED_ */
diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c
index 6ddf27c1..8b216337 100644
--- a/src/nxt_runtime.c
+++ b/src/nxt_runtime.c
@@ -1542,7 +1542,6 @@ nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid)
lhq.proto = &lvlhsh_processes_proto;
if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) {
- nxt_thread_log_debug("process %PI found", pid);
return lhq.value;
}
@@ -1662,9 +1661,13 @@ nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process)
nxt_runtime_port_remove(rt, port);
+ nxt_port_release(port);
+
} nxt_process_port_loop;
- nxt_runtime_process_destroy(rt, process);
+ if (nxt_queue_is_empty(&process->ports)) {
+ nxt_runtime_process_destroy(rt, process);
+ }
break;
@@ -1709,24 +1712,6 @@ nxt_runtime_port_remove(nxt_runtime_t *rt, nxt_port_t *port)
if (rt->port_by_type[port->type] == port) {
rt->port_by_type[port->type] = NULL;
}
-
- if (port->pair[0] != -1) {
- nxt_fd_close(port->pair[0]);
- }
-
- if (port->pair[1] != -1) {
- nxt_fd_close(port->pair[1]);
- }
-
- if (port->type == NXT_PROCESS_WORKER) {
- nxt_router_app_remove_port(port);
- }
-
- if (port->mem_pool) {
- nxt_mp_destroy(port->mem_pool);
- }
-
- nxt_mp_free(rt->mem_pool, port);
}