summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/nxt_main_process.c15
-rw-r--r--src/nxt_port.c129
-rw-r--r--src/nxt_port.h17
-rw-r--r--src/nxt_port_socket.c104
-rw-r--r--src/nxt_process.c12
-rw-r--r--src/nxt_router.c762
-rw-r--r--src/nxt_router.h6
-rw-r--r--src/nxt_runtime.c36
-rw-r--r--src/nxt_runtime.h8
9 files changed, 682 insertions, 407 deletions
diff --git a/src/nxt_main_process.c b/src/nxt_main_process.c
index 90c06a05..9c27a89b 100644
--- a/src/nxt_main_process.c
+++ b/src/nxt_main_process.c
@@ -260,14 +260,17 @@ nxt_main_process_port_create(nxt_task_t *task, nxt_runtime_t *rt)
return NXT_ERROR;
}
+ nxt_process_port_add(task, process, port);
+
ret = nxt_port_socket_init(task, port, 0);
if (nxt_slow_path(ret != NXT_OK)) {
+ nxt_port_use(task, port, -1);
return ret;
}
- nxt_process_port_add(task, process, port);
+ nxt_runtime_port_add(task, port);
- nxt_runtime_port_add(rt, port);
+ nxt_port_use(task, port, -1);
/*
* A main process port. A write port is not closed
@@ -508,7 +511,7 @@ nxt_main_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt,
port = nxt_port_new(task, 0, 0, init->type);
if (nxt_slow_path(port == NULL)) {
- nxt_runtime_process_remove(rt, process);
+ nxt_runtime_process_remove(task, process);
return NXT_ERROR;
}
@@ -516,12 +519,14 @@ nxt_main_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt,
ret = nxt_port_socket_init(task, port, 0);
if (nxt_slow_path(ret != NXT_OK)) {
- nxt_mp_release(port->mem_pool, port);
+ nxt_port_use(task, port, -1);
return ret;
}
pid = nxt_process_create(task, process);
+ nxt_port_use(task, port, -1);
+
switch (pid) {
case -1:
@@ -755,7 +760,7 @@ nxt_main_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid)
if (process) {
init = process->init;
- nxt_runtime_process_remove(rt, process);
+ nxt_runtime_process_remove(task, process);
if (!nxt_exiting) {
nxt_runtime_process_each(rt, process) {
diff --git a/src/nxt_port.c b/src/nxt_port.c
index d7f42012..3f7dc411 100644
--- a/src/nxt_port.c
+++ b/src/nxt_port.c
@@ -27,13 +27,15 @@ nxt_port_mp_cleanup(nxt_task_t *task, void *obj, void *data)
nxt_assert(port->pair[0] == -1);
nxt_assert(port->pair[1] == -1);
- nxt_assert(port->app_stream == 0);
+ nxt_assert(port->use_count == 0);
nxt_assert(port->app_link.next == NULL);
nxt_assert(nxt_queue_is_empty(&port->messages));
nxt_assert(nxt_lvlhsh_is_empty(&port->rpc_streams));
nxt_assert(nxt_lvlhsh_is_empty(&port->rpc_peers));
+ nxt_thread_mutex_destroy(&port->write_mutex);
+
nxt_mp_free(mp, port);
}
@@ -58,10 +60,12 @@ nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid,
port->pid = pid;
port->type = type;
port->mem_pool = mp;
+ port->use_count = 1;
nxt_mp_cleanup(mp, nxt_port_mp_cleanup, task, port, mp);
nxt_queue_init(&port->messages);
+ nxt_thread_mutex_create(&port->write_mutex);
} else {
nxt_mp_destroy(mp);
@@ -73,11 +77,11 @@ nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid,
}
-nxt_bool_t
-nxt_port_release(nxt_port_t *port)
+void
+nxt_port_close(nxt_task_t *task, nxt_port_t *port)
{
- nxt_thread_log_debug("port %p %d:%d release, type %d", port, port->pid,
- port->id, port->type);
+ nxt_debug(task, "port %p %d:%d close, type %d", port, port->pid,
+ port->id, port->type);
if (port->pair[0] != -1) {
nxt_fd_close(port->pair[0]);
@@ -87,21 +91,31 @@ nxt_port_release(nxt_port_t *port)
if (port->pair[1] != -1) {
nxt_fd_close(port->pair[1]);
port->pair[1] = -1;
- }
- if (port->type == NXT_PROCESS_WORKER) {
- if (nxt_router_app_remove_port(port) == 0) {
- return 0;
+ if (port->app != NULL) {
+ nxt_router_app_port_close(task, port);
}
}
+}
+
+
+static void
+nxt_port_release(nxt_task_t *task, nxt_port_t *port)
+{
+ nxt_debug(task, "port %p %d:%d release, type %d", port, port->pid,
+ port->id, port->type);
+
+ if (port->app != NULL) {
+ nxt_router_app_use(task, port->app, -1);
+
+ port->app = NULL;
+ }
if (port->link.next != NULL) {
nxt_process_port_remove(port);
}
nxt_mp_release(port->mem_pool, NULL);
-
- return 1;
}
@@ -263,7 +277,9 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
port->socket.task = task;
- nxt_runtime_port_add(rt, port);
+ nxt_runtime_port_add(task, port);
+
+ nxt_port_use(task, port, -1);
nxt_port_write_enable(task, port);
@@ -434,7 +450,7 @@ nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
process = nxt_runtime_process_find(rt, pid);
if (process) {
- nxt_runtime_process_remove(rt, process);
+ nxt_runtime_process_remove(task, process);
}
}
@@ -444,3 +460,90 @@ nxt_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
nxt_debug(task, "port empty handler");
}
+
+
+typedef struct {
+ nxt_work_t work;
+ nxt_port_t *port;
+ nxt_port_post_handler_t handler;
+} nxt_port_work_t;
+
+
+static void
+nxt_port_post_handler(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_port_t *port;
+ nxt_port_work_t *pw;
+ nxt_port_post_handler_t handler;
+
+ pw = obj;
+ port = pw->port;
+ handler = pw->handler;
+
+ nxt_free(pw);
+
+ handler(task, port, data);
+
+ nxt_port_use(task, port, -1);
+}
+
+
+nxt_int_t
+nxt_port_post(nxt_task_t *task, nxt_port_t *port,
+ nxt_port_post_handler_t handler, void *data)
+{
+ nxt_port_work_t *pw;
+
+ if (task->thread->engine == port->engine) {
+ handler(task, port, data);
+
+ return NXT_OK;
+ }
+
+ pw = nxt_zalloc(sizeof(nxt_port_work_t));
+
+ if (nxt_slow_path(pw == NULL)) {
+ return NXT_ERROR;
+ }
+
+ nxt_atomic_fetch_add(&port->use_count, 1);
+
+ pw->work.handler = nxt_port_post_handler;
+ pw->work.task = &port->engine->task;
+ pw->work.obj = pw;
+ pw->work.data = data;
+
+ pw->port = port;
+ pw->handler = handler;
+
+ nxt_event_engine_post(port->engine, &pw->work);
+
+ return NXT_OK;
+}
+
+
+static void
+nxt_port_release_handler(nxt_task_t *task, nxt_port_t *port, void *data)
+{
+ /* no op */
+}
+
+
+void
+nxt_port_use(nxt_task_t *task, nxt_port_t *port, int i)
+{
+ int c;
+
+ c = nxt_atomic_fetch_add(&port->use_count, i);
+
+ if (i < 0 && c == -i) {
+
+ if (task->thread->engine == port->engine) {
+ nxt_port_release(task, port);
+
+ return;
+ }
+
+ nxt_port_post(task, port, nxt_port_release_handler, NULL);
+ }
+}
diff --git a/src/nxt_port.h b/src/nxt_port.h
index c5b2b40e..f6679bb2 100644
--- a/src/nxt_port.h
+++ b/src/nxt_port.h
@@ -115,7 +115,6 @@ typedef struct {
size_t share;
nxt_fd_t fd;
nxt_bool_t close_fd;
- nxt_bool_t opened;
nxt_port_msg_t port_msg;
nxt_work_t work;
@@ -145,12 +144,15 @@ struct nxt_port_s {
nxt_app_t *app;
nxt_queue_t messages; /* of nxt_port_send_msg_t */
+ nxt_thread_mutex_t write_mutex;
/* Maximum size of message part. */
uint32_t max_size;
/* Maximum interleave of message parts. */
uint32_t max_share;
- uint32_t app_stream;
+
+ uint32_t app_requests;
+ uint32_t app_responses;
nxt_port_handler_t handler;
nxt_port_handler_t *data;
@@ -167,8 +169,9 @@ struct nxt_port_s {
nxt_lvlhsh_t rpc_streams; /* stream to nxt_port_rpc_reg_t */
nxt_lvlhsh_t rpc_peers; /* peer to queue of nxt_port_rpc_reg_t */
+ nxt_atomic_t use_count;
+
nxt_process_type_t type;
- nxt_work_t work;
struct iovec *iov;
void *mmsg_buf;
@@ -194,9 +197,11 @@ typedef union {
} nxt_port_data_t;
+typedef void (*nxt_port_post_handler_t)(nxt_task_t *task, nxt_port_t *port,
+ void *data);
+
nxt_port_t *nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid,
nxt_process_type_t type);
-nxt_bool_t nxt_port_release(nxt_port_t *port);
nxt_port_id_t nxt_port_get_next_id(void);
void nxt_port_reset_next_id(void);
@@ -204,6 +209,7 @@ void nxt_port_reset_next_id(void);
nxt_int_t nxt_port_socket_init(nxt_task_t *task, nxt_port_t *port,
size_t max_size);
void nxt_port_destroy(nxt_port_t *port);
+void nxt_port_close(nxt_task_t *task, nxt_port_t *port);
void nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port);
void nxt_port_write_close(nxt_port_t *port);
void nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port);
@@ -231,5 +237,8 @@ void nxt_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
void nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
void nxt_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
+nxt_int_t nxt_port_post(nxt_task_t *task, nxt_port_t *port,
+ nxt_port_post_handler_t handler, void *data);
+void nxt_port_use(nxt_task_t *task, nxt_port_t *port, int i);
#endif /* _NXT_PORT_H_INCLUDED_ */
diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c
index 75706459..d5ed493d 100644
--- a/src/nxt_port_socket.c
+++ b/src/nxt_port_socket.c
@@ -169,31 +169,6 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
{
nxt_port_send_msg_t *msg;
- nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) {
-
- if ((type & NXT_PORT_MSG_SYNC) != 0) {
- msg->opened = 0;
- continue;
- }
-
- if (msg->port_msg.stream == stream
- && msg->port_msg.reply_port == reply_port
- && msg->port_msg.last == 0
- && msg->opened) {
-
- /*
- * An fd is ignored since a file descriptor
- * must be sent only in the first message of a stream.
- */
- nxt_buf_chain_add(&msg->buf, b);
-
- msg->port_msg.last |= (type & NXT_PORT_MSG_LAST) != 0;
-
- return NXT_OK;
- }
-
- } nxt_queue_loop;
-
msg = nxt_mp_retain(task->thread->engine->mem_pool,
sizeof(nxt_port_send_msg_t));
if (nxt_slow_path(msg == NULL)) {
@@ -207,7 +182,6 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
msg->fd = fd;
msg->close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0;
msg->share = 0;
- msg->opened = 1;
msg->work.next = NULL;
msg->work.handler = nxt_port_release_send_msg;
@@ -225,8 +199,14 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
msg->port_msg.last = (type & NXT_PORT_MSG_LAST) != 0;
msg->port_msg.mmap = 0;
+ nxt_thread_mutex_lock(&port->write_mutex);
+
nxt_queue_insert_tail(&port->messages, &msg->link);
+ nxt_thread_mutex_unlock(&port->write_mutex);
+
+ nxt_port_use(task, port, 1);
+
if (port->socket.write_ready) {
nxt_port_write_handler(task, &port->socket, NULL);
}
@@ -236,10 +216,26 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
static void
+nxt_port_fd_block_write(nxt_task_t *task, nxt_port_t *port, void *data)
+{
+ nxt_fd_event_block_write(task->thread->engine, &port->socket);
+}
+
+
+static void
+nxt_port_fd_enable_write(nxt_task_t *task, nxt_port_t *port, void *data)
+{
+ nxt_fd_event_enable_write(task->thread->engine, &port->socket);
+}
+
+
+static void
nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
{
+ int use_delta;
size_t plain_size;
ssize_t n;
+ nxt_bool_t block_write, enable_write;
nxt_port_t *port;
struct iovec *iov;
nxt_work_queue_t *wq;
@@ -250,14 +246,20 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
port = nxt_container_of(obj, nxt_port_t, socket);
+ block_write = 0;
+ enable_write = 0;
+ use_delta = 0;
+
+ nxt_thread_mutex_lock(&port->write_mutex);
+
iov = port->iov;
do {
link = nxt_queue_first(&port->messages);
if (link == nxt_queue_tail(&port->messages)) {
- nxt_fd_event_block_write(task->thread->engine, &port->socket);
- return;
+ block_write = 1;
+ goto unlock_mutex;
}
msg = nxt_queue_link_data(link, nxt_port_send_msg_t, link);
@@ -334,6 +336,7 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
} else {
nxt_queue_remove(link);
+ use_delta--;
nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg,
msg->engine);
}
@@ -347,16 +350,33 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
} while (port->socket.write_ready);
if (nxt_fd_event_is_disabled(port->socket.write)) {
- /* TODO task->thread->engine or port->engine ? */
- nxt_fd_event_enable_write(task->thread->engine, &port->socket);
+ enable_write = 1;
}
- return;
+ goto unlock_mutex;
fail:
+ use_delta++;
+
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
- nxt_port_error_handler, task, &port->socket, NULL);
+ nxt_port_error_handler, task, &port->socket,
+ &port->socket);
+
+unlock_mutex:
+ nxt_thread_mutex_unlock(&port->write_mutex);
+
+ if (block_write && nxt_fd_event_is_active(port->socket.write)) {
+ nxt_port_post(task, port, nxt_port_fd_block_write, NULL);
+ }
+
+ if (enable_write) {
+ nxt_port_post(task, port, nxt_port_fd_enable_write, NULL);
+ }
+
+ if (use_delta != 0) {
+ nxt_port_use(task, port, use_delta);
+ }
}
@@ -541,6 +561,7 @@ nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b)
static void
nxt_port_error_handler(nxt_task_t *task, void *obj, void *data)
{
+ int use_delta;
nxt_buf_t *b;
nxt_port_t *port;
nxt_work_queue_t *wq;
@@ -551,9 +572,17 @@ nxt_port_error_handler(nxt_task_t *task, void *obj, void *data)
port = nxt_container_of(obj, nxt_port_t, socket);
- nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) {
+ use_delta = 0;
+
+ if (obj == data) {
+ use_delta--;
+ }
+
+ wq = &task->thread->engine->fast_work_queue;
- wq = &task->thread->engine->fast_work_queue;
+ nxt_thread_mutex_lock(&port->write_mutex);
+
+ nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) {
for(b = msg->buf; b != NULL; b = b->next) {
if (nxt_buf_is_sync(b)) {
@@ -564,8 +593,15 @@ nxt_port_error_handler(nxt_task_t *task, void *obj, void *data)
}
nxt_queue_remove(&msg->link);
+ use_delta--;
nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg,
msg->engine);
} nxt_queue_loop;
+
+ nxt_thread_mutex_unlock(&port->write_mutex);
+
+ if (use_delta != 0) {
+ nxt_port_use(task, port, use_delta);
+ }
}
diff --git a/src/nxt_process.c b/src/nxt_process.c
index 95e701f8..d3ec36ed 100644
--- a/src/nxt_process.c
+++ b/src/nxt_process.c
@@ -58,7 +58,7 @@ nxt_process_create(nxt_task_t *task, nxt_process_t *process)
if (!p->ready) {
nxt_debug(task, "remove not ready process %PI", p->pid);
- nxt_runtime_process_remove(rt, p);
+ nxt_runtime_process_remove(task, p);
} else {
nxt_port_mmaps_destroy(p->incoming, 0);
@@ -67,7 +67,7 @@ nxt_process_create(nxt_task_t *task, nxt_process_t *process)
} nxt_runtime_process_loop;
- nxt_runtime_process_add(rt, process);
+ nxt_runtime_process_add(task, process);
nxt_process_start(task, process);
@@ -81,7 +81,7 @@ nxt_process_create(nxt_task_t *task, nxt_process_t *process)
process->pid = pid;
- nxt_runtime_process_add(rt, process);
+ nxt_runtime_process_add(task, process);
break;
}
@@ -589,16 +589,14 @@ nxt_user_cred_set(nxt_task_t *task, nxt_user_cred_t *uc)
static void
nxt_process_port_mp_cleanup(nxt_task_t *task, void *obj, void *data)
{
- nxt_runtime_t *rt;
nxt_process_t *process;
process = obj;
- rt = data;
process->port_cleanups--;
if (process->port_cleanups == 0) {
- nxt_runtime_process_remove(rt, process);
+ nxt_runtime_process_remove(task, process);
}
}
@@ -609,7 +607,7 @@ nxt_process_port_add(nxt_task_t *task, nxt_process_t *process, nxt_port_t *port)
nxt_queue_insert_tail(&process->ports, &port->link);
nxt_mp_cleanup(port->mem_pool, nxt_process_port_mp_cleanup, task, process,
- task->thread->runtime);
+ NULL);
process->port_cleanups++;
}
diff --git a/src/nxt_router.c b/src/nxt_router.c
index df3caf82..5eb95b59 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -24,19 +24,12 @@ typedef struct {
typedef struct nxt_req_app_link_s nxt_req_app_link_t;
-typedef struct nxt_start_worker_s nxt_start_worker_t;
-
-struct nxt_start_worker_s {
- nxt_app_t *app;
- nxt_req_app_link_t *ra;
-
- nxt_work_t work;
-};
typedef struct {
uint32_t stream;
nxt_conn_t *conn;
+ nxt_app_t *app;
nxt_port_t *app_port;
nxt_req_app_link_t *ra;
@@ -72,6 +65,8 @@ typedef struct {
} nxt_remove_pid_msg_t;
+static nxt_int_t nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app);
+
static void nxt_router_worker_remove_pid_handler(nxt_task_t *task, void *obj,
void *data);
static void nxt_router_worker_remove_pid_done(nxt_task_t *task, void *obj,
@@ -124,7 +119,7 @@ static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
nxt_router_temp_conf_t *tmcf);
static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
nxt_event_engine_t *engine);
-static void nxt_router_apps_sort(nxt_router_t *router,
+static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
nxt_router_temp_conf_t *tmcf);
static void nxt_router_engines_post(nxt_router_t *router,
@@ -150,12 +145,14 @@ static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
static void nxt_router_conf_release(nxt_task_t *task,
nxt_socket_conf_joint_t *joint);
-static void nxt_router_send_sw_request(nxt_task_t *task, void *obj,
- void *data);
-static nxt_bool_t nxt_router_app_free(nxt_task_t *task, nxt_app_t *app);
-static nxt_port_t * nxt_router_app_get_port(nxt_app_t *app, uint32_t stream);
-static void nxt_router_app_release_port(nxt_task_t *task, void *obj,
- void *data);
+static void nxt_router_app_port_ready(nxt_task_t *task,
+ nxt_port_recv_msg_t *msg, void *data);
+static void nxt_router_app_port_error(nxt_task_t *task,
+ nxt_port_recv_msg_t *msg, void *data);
+
+static nxt_port_t * nxt_router_app_get_idle_port(nxt_app_t *app);
+static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
+ uint32_t request_failed, uint32_t got_response);
static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data);
static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj,
@@ -165,7 +162,7 @@ static void nxt_router_conn_http_body_read(nxt_task_t *task, void *obj,
static void nxt_router_process_http_request(nxt_task_t *task,
nxt_conn_t *c, nxt_app_parse_ctx_t *ap);
static void nxt_router_process_http_request_mp(nxt_task_t *task,
- nxt_req_app_link_t *ra, nxt_port_t *port);
+ nxt_req_app_link_t *ra);
static nxt_int_t nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
nxt_app_wmsg_t *wmsg);
static nxt_int_t nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
@@ -222,68 +219,91 @@ nxt_router_start(nxt_task_t *task, void *data)
}
-static nxt_start_worker_t *
-nxt_router_sw_create(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra)
+static void
+nxt_router_start_worker_handler(nxt_task_t *task, nxt_port_t *port, void *data)
{
- nxt_port_t *main_port;
- nxt_runtime_t *rt;
- nxt_start_worker_t *sw;
+ size_t size;
+ uint32_t stream;
+ nxt_app_t *app;
+ nxt_buf_t *b;
+ nxt_port_t *main_port;
+ nxt_runtime_t *rt;
- sw = nxt_zalloc(sizeof(nxt_start_worker_t));
+ app = data;
- if (nxt_slow_path(sw == NULL)) {
- return NULL;
- }
+ rt = task->thread->runtime;
+ main_port = rt->port_by_type[NXT_PROCESS_MAIN];
- sw->app = app;
- sw->ra = ra;
+ nxt_debug(task, "app '%V' %p start worker", &app->name, app);
- nxt_debug(task, "sw %p create, stream #%uD, app '%V' %p", sw,
- ra->stream, &app->name, app);
+ size = app->name.length + 1 + app->conf.length;
- rt = task->thread->runtime;
- main_port = rt->port_by_type[NXT_PROCESS_MAIN];
+ b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size);
- sw->work.handler = nxt_router_send_sw_request;
- sw->work.task = &main_port->engine->task;
- sw->work.obj = sw;
- sw->work.data = task->thread->engine;
- sw->work.next = NULL;
+ if (nxt_slow_path(b == NULL)) {
+ goto failed;
+ }
+
+ nxt_buf_cpystr(b, &app->name);
+ *b->mem.free++ = '\0';
+ nxt_buf_cpystr(b, &app->conf);
- if (task->thread->engine != main_port->engine) {
- nxt_debug(task, "sw %p post send to main engine %p", sw,
- main_port->engine);
+ stream = nxt_port_rpc_register_handler(task, port,
+ nxt_router_app_port_ready,
+ nxt_router_app_port_error,
+ -1, app);
- nxt_event_engine_post(main_port->engine, &sw->work);
+ if (nxt_slow_path(stream == 0)) {
+ nxt_mp_release(b->data, b);
- } else {
- nxt_router_send_sw_request(task, sw, sw->work.data);
+ goto failed;
}
- return sw;
-}
+ nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1,
+ stream, port->id, b);
+
+ return;
+failed:
-nxt_inline void
-nxt_router_sw_release(nxt_task_t *task, nxt_start_worker_t *sw)
-{
- nxt_debug(task, "sw %p release", sw);
+ nxt_thread_mutex_lock(&app->mutex);
+
+ app->pending_workers--;
- nxt_free(sw);
+ nxt_thread_mutex_unlock(&app->mutex);
+
+ nxt_router_app_use(task, app, -1);
}
-nxt_inline void
-nxt_router_rc_unlink(nxt_req_conn_link_t *rc)
+static nxt_int_t
+nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app)
{
- nxt_queue_remove(&rc->link);
+ nxt_int_t res;
+ nxt_port_t *router_port;
+ nxt_runtime_t *rt;
- if (rc->ra != NULL) {
- rc->ra->rc = NULL;
- rc->ra = NULL;
+ rt = task->thread->runtime;
+ router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
+
+ nxt_router_app_use(task, app, 1);
+
+ res = nxt_port_post(task, router_port, nxt_router_start_worker_handler,
+ app);
+
+ if (res == NXT_OK) {
+ return res;
}
- rc->conn = NULL;
+ nxt_thread_mutex_lock(&app->mutex);
+
+ app->pending_workers--;
+
+ nxt_thread_mutex_unlock(&app->mutex);
+
+ nxt_router_app_use(task, app, -1);
+
+ return NXT_ERROR;
}
@@ -327,36 +347,18 @@ nxt_router_ra_create(nxt_task_t *task, nxt_req_conn_link_t *rc)
static void
nxt_router_ra_release(nxt_task_t *task, void *obj, void *data)
{
- nxt_port_t *app_port;
- nxt_req_app_link_t *ra;
- nxt_event_engine_t *engine;
+ nxt_req_app_link_t *ra;
+ nxt_event_engine_t *engine;
+ nxt_req_conn_link_t *rc;
ra = obj;
engine = data;
- if (ra->app_port != NULL) {
-
- app_port = ra->app_port;
- ra->app_port = NULL;
-
- if (task->thread->engine != engine) {
- ra->app_pid = app_port->pid;
- }
-
- nxt_router_app_release_port(task, app_port, app_port->app);
-
-#if 0
- /* Uncomment to hold app port until complete response received. */
- if (ra->rc != NULL) {
- ra->rc->app_port = ra->app_port;
-
- } else {
- nxt_router_app_release_port(task, ra->app_port, ra->app_port->app);
+ if (task->thread->engine != engine) {
+ if (ra->app_port != NULL) {
+ ra->app_pid = ra->app_port->pid;
}
-#endif
- }
- if (task->thread->engine != engine) {
ra->work.handler = nxt_router_ra_release;
ra->work.task = &engine->task;
ra->work.next = NULL;
@@ -369,11 +371,27 @@ nxt_router_ra_release(nxt_task_t *task, void *obj, void *data)
return;
}
- if (ra->rc != NULL && ra->app_pid != -1) {
- nxt_port_rpc_ex_set_peer(task, engine->port, ra->rc, ra->app_pid);
+ nxt_debug(task, "ra stream #%uD release", ra->stream);
+
+ rc = ra->rc;
+
+ if (rc != NULL) {
+ if (ra->app_pid != -1) {
+ nxt_port_rpc_ex_set_peer(task, engine->port, rc, ra->app_pid);
+ }
+
+ rc->app_port = ra->app_port;
+
+ ra->app_port = NULL;
+ rc->ra = NULL;
+ ra->rc = NULL;
}
- nxt_debug(task, "ra stream #%uD release", ra->stream);
+ if (ra->app_port != NULL) {
+ nxt_router_app_port_release(task, ra->app_port, 0, 1);
+
+ ra->app_port = NULL;
+ }
nxt_mp_release(ra->mem_pool, ra);
}
@@ -382,9 +400,10 @@ nxt_router_ra_release(nxt_task_t *task, void *obj, void *data)
static void
nxt_router_ra_abort(nxt_task_t *task, void *obj, void *data)
{
- nxt_conn_t *c;
- nxt_req_app_link_t *ra;
- nxt_event_engine_t *engine;
+ nxt_conn_t *c;
+ nxt_req_app_link_t *ra;
+ nxt_req_conn_link_t *rc;
+ nxt_event_engine_t *engine;
ra = obj;
engine = data;
@@ -403,17 +422,75 @@ nxt_router_ra_abort(nxt_task_t *task, void *obj, void *data)
nxt_debug(task, "ra stream #%uD abort", ra->stream);
- if (ra->rc != NULL) {
- c = ra->rc->conn;
+ rc = ra->rc;
+
+ if (rc != NULL) {
+ c = rc->conn;
nxt_router_gen_error(task, c, 500,
"Failed to start application worker");
+
+ rc->ra = NULL;
+ ra->rc = NULL;
+ }
+
+ if (ra->app_port != NULL) {
+ nxt_router_app_port_release(task, ra->app_port, 0, 1);
+
+ ra->app_port = NULL;
}
nxt_mp_release(ra->mem_pool, ra);
}
+nxt_inline void
+nxt_router_rc_unlink(nxt_task_t *task, nxt_req_conn_link_t *rc)
+{
+ nxt_req_app_link_t *ra;
+
+ if (rc->app_port != NULL) {
+ nxt_router_app_port_release(task, rc->app_port, 0, 1);
+
+ rc->app_port = NULL;
+ }
+
+ ra = rc->ra;
+
+ if (ra != NULL) {
+ rc->ra = NULL;
+ ra->rc = NULL;
+
+ nxt_thread_mutex_lock(&rc->app->mutex);
+
+ if (ra->link.next != NULL) {
+ nxt_queue_remove(&ra->link);
+
+ ra->link.next = NULL;
+
+ } else {
+ ra = NULL;
+ }
+
+ nxt_thread_mutex_unlock(&rc->app->mutex);
+ }
+
+ if (ra != NULL) {
+ nxt_router_ra_release(task, ra, ra->work.data);
+ }
+
+ if (rc->app != NULL) {
+ nxt_router_app_use(task, rc->app, -1);
+
+ rc->app = NULL;
+ }
+
+ nxt_queue_remove(&rc->link);
+
+ rc->conn = NULL;
+}
+
+
void
nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
@@ -526,12 +603,20 @@ nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
static void
nxt_router_worker_remove_pid_handler(nxt_task_t *task, void *obj, void *data)
{
+ nxt_pid_t pid;
+ nxt_buf_t *buf;
nxt_event_engine_t *engine;
nxt_remove_pid_msg_t *rp;
rp = obj;
- nxt_port_remove_pid_handler(task, &rp->msg);
+ buf = rp->msg.buf;
+
+ nxt_assert(nxt_buf_used_size(buf) == sizeof(pid));
+
+ nxt_memcpy(&pid, buf->mem.pos, sizeof(pid));
+
+ nxt_port_rpc_remove_peer(task, rp->msg.port, pid);
engine = rp->work.data;
@@ -658,7 +743,7 @@ nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
goto fail;
}
- nxt_router_apps_sort(router, tmcf);
+ nxt_router_apps_sort(task, router, tmcf);
nxt_router_engines_post(router, tmcf);
@@ -1005,9 +1090,12 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
app->max_workers = apcf.workers;
app->timeout = apcf.timeout;
app->live = 1;
+ app->max_pending_responses = 2;
app->prepare_msg = nxt_app_prepare_msg[type];
nxt_queue_insert_tail(&tmcf->apps, &app->link);
+
+ nxt_router_app_use(task, app, 1);
}
http = nxt_conf_get_path(conf, &http_path);
@@ -1695,42 +1783,36 @@ nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
static void
-nxt_router_apps_sort(nxt_router_t *router, nxt_router_temp_conf_t *tmcf)
+nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
+ nxt_router_temp_conf_t *tmcf)
{
- nxt_app_t *app;
- nxt_port_t *port;
+ nxt_app_t *app;
+ nxt_port_t *port;
nxt_queue_each(app, &router->apps, nxt_app_t, link) {
nxt_queue_remove(&app->link);
- nxt_thread_log_debug("about to remove app '%V' %p", &app->name, app);
+ nxt_debug(task, "about to free app '%V' %p", &app->name, app);
app->live = 0;
- if (nxt_router_app_free(NULL, app) != 0) {
- continue;
- }
-
- if (!nxt_queue_is_empty(&app->requests)) {
-
- nxt_thread_log_debug("app '%V' %p pending requests found",
- &app->name, app);
- continue;
- }
-
do {
- port = nxt_router_app_get_port(app, 0);
+ port = nxt_router_app_get_idle_port(app);
if (port == NULL) {
break;
}
- nxt_thread_log_debug("port %p send quit", port);
+ nxt_debug(task, "port %p send quit", port);
+
+ nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0,
+ NULL);
- nxt_port_socket_write(&port->engine->task, port,
- NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
+ nxt_port_use(task, port, -1);
} while (1);
+ nxt_router_app_use(task, app, -1);
+
} nxt_queue_loop;
nxt_queue_add(&router->apps, &tmcf->previous);
@@ -1833,7 +1915,7 @@ nxt_router_thread_start(void *data)
ret = nxt_port_socket_init(task, port, 0);
if (nxt_slow_path(ret != NXT_OK)) {
- nxt_mp_release(port->mem_pool, port);
+ nxt_port_use(task, port, -1);
return;
}
@@ -2124,8 +2206,9 @@ nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data)
// TODO notify all apps
+ port->engine = task->thread->engine;
nxt_mp_thread_adopt(port->mem_pool);
- nxt_port_release(port);
+ nxt_port_use(task, port, -1);
nxt_mp_thread_adopt(engine->mem_pool);
nxt_mp_destroy(engine->mem_pool);
@@ -2240,13 +2323,7 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_buf_chain_add(&b, last);
- if (rc->app_port != NULL) {
- nxt_router_app_release_port(task, rc->app_port, rc->app_port->app);
-
- rc->app_port = NULL;
- }
-
- nxt_router_rc_unlink(rc);
+ nxt_router_rc_unlink(task, rc);
}
if (b == NULL) {
@@ -2283,7 +2360,7 @@ nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_router_gen_error(task, rc->conn, 500,
"Application terminated unexpectedly");
- nxt_router_rc_unlink(rc);
+ nxt_router_rc_unlink(task, rc);
}
@@ -2383,194 +2460,153 @@ nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code,
static void
-nxt_router_sw_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data)
+nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
+ void *data)
{
- nxt_start_worker_t *sw;
+ nxt_app_t *app;
+ nxt_port_t *port;
+
+ app = data;
+ port = msg->new_port;
- sw = data;
+ nxt_assert(app != NULL);
+ nxt_assert(port != NULL);
- nxt_assert(sw != NULL);
- nxt_assert(sw->app->pending_workers != 0);
+ port->app = app;
- msg->new_port->app = sw->app;
+ nxt_thread_mutex_lock(&app->mutex);
- sw->app->pending_workers--;
- sw->app->workers++;
+ nxt_assert(app->pending_workers != 0);
- nxt_debug(task, "sw %p got port %p", sw, msg->new_port);
+ app->pending_workers--;
+ app->workers++;
+
+ nxt_thread_mutex_unlock(&app->mutex);
- nxt_router_app_release_port(task, msg->new_port, sw->app);
+ nxt_debug(task, "app '%V' %p new port ready", &app->name, app);
- nxt_router_sw_release(task, sw);
+ nxt_router_app_port_release(task, port, 0, 0);
}
static void
-nxt_router_sw_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data)
+nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
+ void *data)
{
nxt_app_t *app;
nxt_queue_link_t *lnk;
nxt_req_app_link_t *ra;
- nxt_start_worker_t *sw;
- sw = data;
+ app = data;
- nxt_assert(sw != NULL);
- nxt_assert(sw->app != NULL);
- nxt_assert(sw->app->pending_workers != 0);
+ nxt_assert(app != NULL);
- app = sw->app;
+ nxt_debug(task, "app '%V' %p start error", &app->name, app);
+
+ nxt_thread_mutex_lock(&app->mutex);
- sw->app->pending_workers--;
+ nxt_assert(app->pending_workers != 0);
- nxt_debug(task, "sw %p error, failed to start app '%V'",
- sw, &app->name);
+ app->pending_workers--;
if (!nxt_queue_is_empty(&app->requests)) {
lnk = nxt_queue_last(&app->requests);
nxt_queue_remove(lnk);
+ lnk->next = NULL;
ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link);
+ } else {
+ ra = NULL;
+ }
+
+ nxt_thread_mutex_unlock(&app->mutex);
+
+ if (ra != NULL) {
nxt_debug(task, "app '%V' %p abort next stream #%uD",
&app->name, app, ra->stream);
nxt_router_ra_abort(task, ra, ra->work.data);
}
- nxt_router_sw_release(task, sw);
+ nxt_router_app_use(task, app, -1);
}
-static void
-nxt_router_send_sw_request(nxt_task_t *task, void *obj, void *data)
+void
+nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)
{
- size_t size;
- uint32_t stream;
- nxt_buf_t *b;
- nxt_app_t *app;
- nxt_port_t *main_port, *router_port, *app_port;
- nxt_runtime_t *rt;
- nxt_start_worker_t *sw;
- nxt_req_app_link_t *ra;
-
- sw = obj;
- app = sw->app;
+ int c;
- if (nxt_queue_is_empty(&app->requests)) {
- ra = sw->ra;
- app_port = nxt_router_app_get_port(app, ra->stream);
+ c = nxt_atomic_fetch_add(&app->use_count, i);
- if (app_port != NULL) {
- nxt_debug(task, "app '%V' %p process stream #%uD",
- &app->name, app, ra->stream);
+ if (i < 0 && c == -i) {
- ra->app_port = app_port;
+ nxt_assert(app->live == 0);
+ nxt_assert(app->workers == 0);
+ nxt_assert(app->pending_workers == 0);
+ nxt_assert(nxt_queue_is_empty(&app->requests) != 0);
+ nxt_assert(nxt_queue_is_empty(&app->ports) != 0);
- nxt_router_process_http_request_mp(task, ra, app_port);
-
- nxt_router_ra_release(task, ra, ra->work.data);
- nxt_router_sw_release(task, sw);
-
- return;
- }
- }
-
- nxt_queue_insert_tail(&app->requests, &sw->ra->link);
-
- if (app->workers + app->pending_workers >= app->max_workers) {
- nxt_debug(task, "app '%V' %p %uD/%uD running/pending workers, "
- "max_workers (%uD) reached", &app->name, app,
- app->workers, app->pending_workers, app->max_workers);
-
- nxt_router_sw_release(task, sw);
-
- return;
+ nxt_thread_mutex_destroy(&app->mutex);
+ nxt_free(app);
}
-
- app->pending_workers++;
-
- nxt_debug(task, "sw %p send", sw);
-
- rt = task->thread->runtime;
- main_port = rt->port_by_type[NXT_PROCESS_MAIN];
- router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
-
- size = app->name.length + 1 + app->conf.length;
-
- b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size);
-
- nxt_buf_cpystr(b, &app->name);
- *b->mem.free++ = '\0';
- nxt_buf_cpystr(b, &app->conf);
-
- stream = nxt_port_rpc_register_handler(task, router_port,
- nxt_router_sw_ready,
- nxt_router_sw_error,
- main_port->pid, sw);
-
- nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1,
- stream, router_port->id, b);
}
-static nxt_bool_t
-nxt_router_app_free(nxt_task_t *task, nxt_app_t *app)
+nxt_inline nxt_port_t *
+nxt_router_app_get_port_unsafe(nxt_app_t *app, int *use_delta)
{
- nxt_queue_link_t *lnk;
- nxt_req_app_link_t *ra;
+ nxt_port_t *port;
+ nxt_queue_link_t *lnk;
- nxt_thread_log_debug("app '%V' %p state: %d/%uD/%uD/%d", &app->name, app,
- app->live, app->workers, app->pending_workers,
- nxt_queue_is_empty(&app->requests));
+ lnk = nxt_queue_first(&app->ports);
+ nxt_queue_remove(lnk);
- if (app->live == 0
- && app->workers == 0
- && app->pending_workers == 0
- && nxt_queue_is_empty(&app->requests))
- {
- nxt_thread_mutex_destroy(&app->mutex);
- nxt_free(app);
+ port = nxt_queue_link_data(lnk, nxt_port_t, app_link);
- return 1;
- }
+ port->app_requests++;
- if (app->live == 1
- && nxt_queue_is_empty(&app->requests) == 0
- && app->workers + app->pending_workers < app->max_workers)
+ if (app->live &&
+ (app->max_pending_responses == 0 ||
+ (port->app_requests - port->app_responses) <
+ app->max_pending_responses) )
{
- lnk = nxt_queue_first(&app->requests);
- nxt_queue_remove(lnk);
+ nxt_queue_insert_tail(&app->ports, lnk);
- ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link);
+ } else {
+ lnk->next = NULL;
- nxt_router_sw_create(task, app, ra);
+ (*use_delta)--;
}
- return 0;
+ return port;
}
static nxt_port_t *
-nxt_router_app_get_port(nxt_app_t *app, uint32_t stream)
+nxt_router_app_get_idle_port(nxt_app_t *app)
{
- nxt_port_t *port;
- nxt_queue_link_t *lnk;
+ nxt_port_t *port;
port = NULL;
nxt_thread_mutex_lock(&app->mutex);
- if (!nxt_queue_is_empty(&app->ports)) {
- lnk = nxt_queue_first(&app->ports);
- nxt_queue_remove(lnk);
+ nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {
- lnk->next = NULL;
+ if (port->app_requests > port->app_responses) {
+ port = NULL;
- port = nxt_queue_link_data(lnk, nxt_port_t, app_link);
+ continue;
+ }
- port->app_stream = stream;
- }
+ nxt_queue_remove(&port->app_link);
+ port->app_link.next = NULL;
+
+ break;
+
+ } nxt_queue_loop;
nxt_thread_mutex_unlock(&app->mutex);
@@ -2579,151 +2615,175 @@ nxt_router_app_get_port(nxt_app_t *app, uint32_t stream)
static void
-nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data)
+nxt_router_app_process_request(nxt_task_t *task, void *obj, void *data)
{
- nxt_app_t *app;
- nxt_port_t *port;
- nxt_work_t *work;
- nxt_queue_link_t *lnk;
- nxt_req_app_link_t *ra;
+ nxt_app_t *app;
+ nxt_req_app_link_t *ra;
- port = obj;
- app = data;
+ app = obj;
+ ra = data;
nxt_assert(app != NULL);
- nxt_assert(app == port->app);
- nxt_assert(port->app_link.next == NULL);
+ nxt_assert(ra != NULL);
+ nxt_assert(ra->app_port != NULL);
+ nxt_debug(task, "app '%V' %p process next stream #%uD",
+ &app->name, app, ra->stream);
- if (task->thread->engine != port->engine) {
- work = &port->work;
+ nxt_router_process_http_request_mp(task, ra);
+}
- nxt_debug(task, "post release port to engine %p", port->engine);
- work->next = NULL;
- work->handler = nxt_router_app_release_port;
- work->task = &port->engine->task;
- work->obj = port;
- work->data = app;
+static void
+nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
+ uint32_t request_failed, uint32_t got_response)
+{
+ int use_delta, ra_use_delta;
+ nxt_app_t *app;
+ nxt_queue_link_t *lnk;
+ nxt_req_app_link_t *ra;
- nxt_event_engine_post(port->engine, work);
+ nxt_assert(port != NULL);
+ nxt_assert(port->app != NULL);
- return;
+ app = port->app;
+
+ use_delta = (request_failed == 0 && got_response == 0) ? 0 : -1;
+
+ nxt_thread_mutex_lock(&app->mutex);
+
+ port->app_requests -= request_failed;
+ port->app_responses += got_response;
+
+ if (app->live != 0 &&
+ port->pair[1] != -1 &&
+ port->app_link.next == NULL &&
+ (app->max_pending_responses == 0 ||
+ (port->app_requests - port->app_responses) <
+ app->max_pending_responses) )
+ {
+ nxt_queue_insert_tail(&app->ports, &port->app_link);
+ use_delta++;
}
- if (!nxt_queue_is_empty(&app->requests)) {
+ if (app->live != 0 &&
+ !nxt_queue_is_empty(&app->ports) &&
+ !nxt_queue_is_empty(&app->requests))
+ {
lnk = nxt_queue_first(&app->requests);
nxt_queue_remove(lnk);
+ lnk->next = NULL;
ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link);
- nxt_debug(task, "app '%V' %p process next stream #%uD",
- &app->name, app, ra->stream);
+ ra_use_delta = 1;
+ ra->app_port = nxt_router_app_get_port_unsafe(app, &ra_use_delta);
- ra->app_port = port;
- port->app_stream = ra->stream;
+ } else {
+ ra = NULL;
+ ra_use_delta = 0;
+ }
- nxt_router_process_http_request_mp(task, ra, port);
+ nxt_thread_mutex_unlock(&app->mutex);
- nxt_router_ra_release(task, ra, ra->work.data);
+ if (ra != NULL) {
+ nxt_work_queue_add(&task->thread->engine->fast_work_queue,
+ nxt_router_app_process_request,
+ &task->thread->engine->task, app, ra);
- return;
+ goto adjust_use;
}
- port->app_stream = 0;
-
+ /* ? */
if (port->pair[1] == -1) {
- nxt_debug(task, "app '%V' %p port already closed (pid %PI dead?)",
- &app->name, app, port->pid);
-
- app->workers--;
- nxt_router_app_free(task, app);
+ nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)",
+ &app->name, app, port, port->pid);
- port->app = NULL;
-
- nxt_port_release(port);
-
- return;
+ goto adjust_use;
}
- if (!app->live) {
+ if (app->live == 0) {
nxt_debug(task, "app '%V' %p is not alive, send QUIT to port",
&app->name, app);
nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT,
-1, 0, 0, NULL);
- return;
+ goto adjust_use;
}
nxt_debug(task, "app '%V' %p requests queue is empty, keep the port",
&app->name, app);
- nxt_thread_mutex_lock(&app->mutex);
+adjust_use:
- nxt_queue_insert_head(&app->ports, &port->app_link);
+ if (use_delta != 0) {
+ nxt_port_use(task, port, use_delta);
+ }
- nxt_thread_mutex_unlock(&app->mutex);
+ if (ra_use_delta != 0) {
+ nxt_port_use(task, ra->app_port, ra_use_delta);
+ }
}
-nxt_bool_t
-nxt_router_app_remove_port(nxt_port_t *port)
+void
+nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
{
nxt_app_t *app;
- nxt_bool_t busy;
+ nxt_bool_t unchain, start_worker;
app = port->app;
- busy = port->app_stream != 0;
-
- if (app == NULL) {
- nxt_thread_log_debug("port %p app remove, no app", port);
-
- nxt_assert(port->app_link.next == NULL);
- return 1;
- }
+ nxt_assert(app != NULL);
nxt_thread_mutex_lock(&app->mutex);
- if (port->app_link.next != NULL) {
+ unchain = port->app_link.next != NULL;
+ if (unchain) {
nxt_queue_remove(&port->app_link);
port->app_link.next = NULL;
+ }
+ app->workers--;
+
+ start_worker = app->live != 0 &&
+ nxt_queue_is_empty(&app->requests) == 0 &&
+ app->workers + app->pending_workers < app->max_workers;
+
+ if (start_worker) {
+ app->pending_workers++;
}
nxt_thread_mutex_unlock(&app->mutex);
- if (busy == 0) {
- nxt_thread_log_debug("port %p app remove, free, app '%V' %p", port,
- &app->name, app);
-
- app->workers--;
- nxt_router_app_free(&port->engine->task, app);
+ nxt_debug(task, "app '%V' %p port %p close", &app->name, app, port);
- return 1;
+ if (unchain) {
+ nxt_port_use(task, port, -1);
}
- nxt_thread_log_debug("port %p app remove, busy, app '%V' %p, "
- "app stream #%uD", port, &app->name, app,
- port->app_stream);
-
- return 0;
+ if (start_worker) {
+ nxt_router_start_worker(task, app);
+ }
}
static nxt_int_t
nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
{
+ int use_delta;
+ nxt_int_t res;
nxt_app_t *app;
+ nxt_bool_t can_start_worker;
nxt_conn_t *c;
nxt_port_t *port;
nxt_event_engine_t *engine;
- nxt_start_worker_t *sw;
nxt_socket_conf_joint_t *joint;
port = NULL;
+ use_delta = 1;
c = ra->rc->conn;
joint = c->listen->socket.data;
@@ -2735,6 +2795,10 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
return NXT_ERROR;
}
+ ra->rc->app = app;
+
+ nxt_router_app_use(task, app, 1);
+
engine = task->thread->engine;
nxt_timer_disable(engine, &c->read_timer);
@@ -2744,20 +2808,50 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
nxt_timer_add(engine, &c->read_timer, app->timeout);
}
- port = nxt_router_app_get_port(app, ra->stream);
+ nxt_thread_mutex_lock(&app->mutex);
+
+ if (!nxt_queue_is_empty(&app->ports)) {
+ port = nxt_router_app_get_port_unsafe(app, &use_delta);
+
+ can_start_worker = 0;
+
+ } else {
+ nxt_queue_insert_tail(&app->requests, &ra->link);
+
+ can_start_worker = (app->workers + app->pending_workers) <
+ app->max_workers;
+ if (can_start_worker) {
+ app->pending_workers++;
+ }
+
+ port = NULL;
+ }
+
+ nxt_thread_mutex_unlock(&app->mutex);
if (port != NULL) {
- nxt_debug(task, "already have port for app '%V'", &app->name);
+ nxt_debug(task, "already have port for app '%V' %p ", &app->name, app);
ra->app_port = port;
+
+ if (use_delta != 0) {
+ nxt_port_use(task, port, use_delta);
+ }
return NXT_OK;
}
- sw = nxt_router_sw_create(task, app, ra);
+ if (!can_start_worker) {
+ nxt_debug(task, "app '%V' %p too many running or pending workers",
+ &app->name, app);
+
+ return NXT_AGAIN;
+ }
+
+ res = nxt_router_start_worker(task, app);
+
+ if (nxt_slow_path(res != NXT_OK)) {
+ nxt_router_gen_error(task, c, 500, "Failed to start worker");
- if (nxt_slow_path(sw == NULL)) {
- nxt_router_gen_error(task, c, 500,
- "Failed to allocate start worker struct");
return NXT_ERROR;
}
@@ -3011,18 +3105,16 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
nxt_port_rpc_ex_set_peer(task, engine->port, rc, port->pid);
- nxt_router_process_http_request_mp(task, ra, port);
-
- nxt_router_ra_release(task, ra, ra->work.data);
+ nxt_router_process_http_request_mp(task, ra);
}
static void
-nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra,
- nxt_port_t *port)
+nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra)
{
+ uint32_t request_failed;
nxt_int_t res;
- nxt_port_t *c_port, *reply_port;
+ nxt_port_t *port, *c_port, *reply_port;
nxt_conn_t *c;
nxt_app_wmsg_t wmsg;
nxt_app_parse_ctx_t *ap;
@@ -3030,11 +3122,15 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra,
/* TODO: it is unsafe to use ra->rc and ra->rc->conn in main thread */
nxt_assert(ra->rc != NULL);
+ nxt_assert(ra->app_port != NULL);
+ port = ra->app_port;
reply_port = ra->reply_port;
ap = ra->ap;
c = ra->rc->conn;
+ request_failed = 1;
+
c_port = nxt_process_connected_port_find(port->process, reply_port->pid,
reply_port->id);
if (nxt_slow_path(c_port != reply_port)) {
@@ -3043,7 +3139,7 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra,
if (nxt_slow_path(res != NXT_OK)) {
nxt_router_gen_error(task, c, 500,
"Failed to send reply port to application");
- return;
+ goto release_port;
}
nxt_process_connected_port_add(port->process, reply_port);
@@ -3059,21 +3155,33 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra,
if (nxt_slow_path(res != NXT_OK)) {
nxt_router_gen_error(task, c, 500,
"Failed to prepare message for application");
- return;
+ goto release_port;
}
nxt_debug(task, "about to send %d bytes buffer to worker port %d",
nxt_buf_used_size(wmsg.write),
wmsg.port->socket.fd);
+ request_failed = 0;
+
res = nxt_port_socket_write(task, wmsg.port, NXT_PORT_MSG_DATA,
-1, ra->stream, reply_port->id, wmsg.write);
if (nxt_slow_path(res != NXT_OK)) {
nxt_router_gen_error(task, c, 500,
"Failed to send message to application");
- return;
+ goto release_port;
}
+
+release_port:
+
+ if (request_failed != 0) {
+ ra->app_port = 0;
+ }
+
+ nxt_router_app_port_release(task, port, request_failed, 0);
+
+ nxt_router_ra_release(task, ra, ra->work.data);
}
@@ -3452,13 +3560,7 @@ nxt_router_conn_free(nxt_task_t *task, void *obj, void *data)
nxt_debug(task, "conn %p close, stream #%uD", c, rc->stream);
- if (rc->app_port != NULL) {
- nxt_router_app_release_port(task, rc->app_port, rc->app_port->app);
-
- rc->app_port = NULL;
- }
-
- nxt_router_rc_unlink(rc);
+ nxt_router_rc_unlink(task, rc);
nxt_port_rpc_cancel(task, task->thread->engine->port, rc->stream);
diff --git a/src/nxt_router.h b/src/nxt_router.h
index f5c5f7aa..5056021e 100644
--- a/src/nxt_router.h
+++ b/src/nxt_router.h
@@ -87,6 +87,7 @@ struct nxt_app_s {
uint32_t pending_workers;
uint32_t workers;
uint32_t max_workers;
+ uint32_t max_pending_responses;
nxt_msec_t timeout;
@@ -97,6 +98,8 @@ struct nxt_app_s {
nxt_str_t conf;
nxt_app_prepare_msg_t prepare_msg;
+
+ nxt_atomic_t use_count;
};
@@ -141,6 +144,7 @@ void nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
void nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
void nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
-nxt_bool_t nxt_router_app_remove_port(nxt_port_t *port);
+void nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port);
+void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i);
#endif /* _NXT_ROUTER_H_INCLUDED_ */
diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c
index deab980e..a96f4cea 100644
--- a/src/nxt_runtime.c
+++ b/src/nxt_runtime.c
@@ -553,7 +553,7 @@ nxt_runtime_exit(nxt_task_t *task, void *obj, void *data)
nxt_runtime_process_each(rt, process) {
- nxt_runtime_process_remove(rt, process);
+ nxt_runtime_process_remove(task, process);
} nxt_runtime_process_loop;
@@ -1725,13 +1725,16 @@ nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid)
void
-nxt_runtime_process_add(nxt_runtime_t *rt, nxt_process_t *process)
+nxt_runtime_process_add(nxt_task_t *task, nxt_process_t *process)
{
nxt_port_t *port;
+ nxt_runtime_t *rt;
nxt_lvlhsh_query_t lhq;
nxt_assert(process->registered == 0);
+ rt = task->thread->runtime;
+
nxt_runtime_process_lhq_pid(&lhq, &process->pid);
lhq.replace = 0;
@@ -1753,7 +1756,7 @@ nxt_runtime_process_add(nxt_runtime_t *rt, nxt_process_t *process)
port->pid = process->pid;
- nxt_runtime_port_add(rt, port);
+ nxt_runtime_port_add(task, port);
} nxt_process_port_loop;
@@ -1809,9 +1812,12 @@ nxt_runtime_process_remove_pid(nxt_runtime_t *rt, nxt_pid_t pid)
void
-nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process)
+nxt_runtime_process_remove(nxt_task_t *task, nxt_process_t *process)
{
- nxt_port_t *port;
+ nxt_port_t *port;
+ nxt_runtime_t *rt;
+
+ rt = task->thread->runtime;
if (process->port_cleanups == 0) {
if (process->registered == 1) {
@@ -1823,9 +1829,9 @@ nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process)
} else {
nxt_process_port_each(process, port) {
- nxt_runtime_port_remove(rt, port);
+ nxt_port_close(task, port);
- nxt_port_release(port);
+ nxt_runtime_port_remove(task, port);
} nxt_process_port_loop;
}
@@ -1851,22 +1857,34 @@ nxt_runtime_port_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe)
void
-nxt_runtime_port_add(nxt_runtime_t *rt, nxt_port_t *port)
+nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port)
{
+ nxt_runtime_t *rt;
+
+ rt = task->thread->runtime;
+
nxt_port_hash_add(&rt->ports, port);
rt->port_by_type[port->type] = port;
+
+ nxt_port_use(task, port, 1);
}
void
-nxt_runtime_port_remove(nxt_runtime_t *rt, nxt_port_t *port)
+nxt_runtime_port_remove(nxt_task_t *task, nxt_port_t *port)
{
+ nxt_runtime_t *rt;
+
+ rt = task->thread->runtime;
+
nxt_port_hash_remove(&rt->ports, port);
if (rt->port_by_type[port->type] == port) {
rt->port_by_type[port->type] = NULL;
}
+
+ nxt_port_use(task, port, -1);
}
diff --git a/src/nxt_runtime.h b/src/nxt_runtime.h
index d0decfd2..5aa897dc 100644
--- a/src/nxt_runtime.h
+++ b/src/nxt_runtime.h
@@ -102,11 +102,11 @@ nxt_process_t *nxt_runtime_process_new(nxt_runtime_t *rt);
nxt_process_t *nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid);
-void nxt_runtime_process_add(nxt_runtime_t *rt, nxt_process_t *process);
+void nxt_runtime_process_add(nxt_task_t *task, nxt_process_t *process);
nxt_process_t *nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid);
-void nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process);
+void nxt_runtime_process_remove(nxt_task_t *task, nxt_process_t *process);
nxt_process_t *nxt_runtime_process_first(nxt_runtime_t *rt,
nxt_lvlhsh_each_t *lhe);
@@ -115,9 +115,9 @@ nxt_process_t *nxt_runtime_process_first(nxt_runtime_t *rt,
nxt_lvlhsh_each(&rt->processes, lhe)
-void nxt_runtime_port_add(nxt_runtime_t *rt, nxt_port_t *port);
+void nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port);
-void nxt_runtime_port_remove(nxt_runtime_t *rt, nxt_port_t *port);
+void nxt_runtime_port_remove(nxt_task_t *task, nxt_port_t *port);
nxt_port_t *nxt_runtime_port_find(nxt_runtime_t *rt, nxt_pid_t pid,
nxt_port_id_t port_id);