summaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2017-10-04 14:58:47 +0300
committerMax Romanov <max.romanov@nginx.com>2017-10-04 14:58:47 +0300
commit6a64533fa3b96bb64bfde4615e40376d65a292cb (patch)
treea18ed8059158d833290519e1135209747e28af21 /src
parent414d508e04d26ebef0e3e1ba4ed518b11d3af1a0 (diff)
downloadunit-6a64533fa3b96bb64bfde4615e40376d65a292cb.tar.gz
unit-6a64533fa3b96bb64bfde4615e40376d65a292cb.tar.bz2
Introducing use counters for port and app. Thread safe port write.
Use counter helps to simplify logic around port and application free. Port 'post' function introduced to simplify post execution of particular function to original port engine's thread. Write message queue is protected by mutex which makes port write operation thread safe.
Diffstat (limited to '')
-rw-r--r--src/nxt_main_process.c15
-rw-r--r--src/nxt_port.c129
-rw-r--r--src/nxt_port.h17
-rw-r--r--src/nxt_port_socket.c104
-rw-r--r--src/nxt_process.c12
-rw-r--r--src/nxt_router.c762
-rw-r--r--src/nxt_router.h6
-rw-r--r--src/nxt_runtime.c36
-rw-r--r--src/nxt_runtime.h8
9 files changed, 682 insertions, 407 deletions
diff --git a/src/nxt_main_process.c b/src/nxt_main_process.c
index 90c06a05..9c27a89b 100644
--- a/src/nxt_main_process.c
+++ b/src/nxt_main_process.c
@@ -260,14 +260,17 @@ nxt_main_process_port_create(nxt_task_t *task, nxt_runtime_t *rt)
return NXT_ERROR;
}
+ nxt_process_port_add(task, process, port);
+
ret = nxt_port_socket_init(task, port, 0);
if (nxt_slow_path(ret != NXT_OK)) {
+ nxt_port_use(task, port, -1);
return ret;
}
- nxt_process_port_add(task, process, port);
+ nxt_runtime_port_add(task, port);
- nxt_runtime_port_add(rt, port);
+ nxt_port_use(task, port, -1);
/*
* A main process port. A write port is not closed
@@ -508,7 +511,7 @@ nxt_main_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt,
port = nxt_port_new(task, 0, 0, init->type);
if (nxt_slow_path(port == NULL)) {
- nxt_runtime_process_remove(rt, process);
+ nxt_runtime_process_remove(task, process);
return NXT_ERROR;
}
@@ -516,12 +519,14 @@ nxt_main_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt,
ret = nxt_port_socket_init(task, port, 0);
if (nxt_slow_path(ret != NXT_OK)) {
- nxt_mp_release(port->mem_pool, port);
+ nxt_port_use(task, port, -1);
return ret;
}
pid = nxt_process_create(task, process);
+ nxt_port_use(task, port, -1);
+
switch (pid) {
case -1:
@@ -755,7 +760,7 @@ nxt_main_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid)
if (process) {
init = process->init;
- nxt_runtime_process_remove(rt, process);
+ nxt_runtime_process_remove(task, process);
if (!nxt_exiting) {
nxt_runtime_process_each(rt, process) {
diff --git a/src/nxt_port.c b/src/nxt_port.c
index d7f42012..3f7dc411 100644
--- a/src/nxt_port.c
+++ b/src/nxt_port.c
@@ -27,13 +27,15 @@ nxt_port_mp_cleanup(nxt_task_t *task, void *obj, void *data)
nxt_assert(port->pair[0] == -1);
nxt_assert(port->pair[1] == -1);
- nxt_assert(port->app_stream == 0);
+ nxt_assert(port->use_count == 0);
nxt_assert(port->app_link.next == NULL);
nxt_assert(nxt_queue_is_empty(&port->messages));
nxt_assert(nxt_lvlhsh_is_empty(&port->rpc_streams));
nxt_assert(nxt_lvlhsh_is_empty(&port->rpc_peers));
+ nxt_thread_mutex_destroy(&port->write_mutex);
+
nxt_mp_free(mp, port);
}
@@ -58,10 +60,12 @@ nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid,
port->pid = pid;
port->type = type;
port->mem_pool = mp;
+ port->use_count = 1;
nxt_mp_cleanup(mp, nxt_port_mp_cleanup, task, port, mp);
nxt_queue_init(&port->messages);
+ nxt_thread_mutex_create(&port->write_mutex);
} else {
nxt_mp_destroy(mp);
@@ -73,11 +77,11 @@ nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid,
}
-nxt_bool_t
-nxt_port_release(nxt_port_t *port)
+void
+nxt_port_close(nxt_task_t *task, nxt_port_t *port)
{
- nxt_thread_log_debug("port %p %d:%d release, type %d", port, port->pid,
- port->id, port->type);
+ nxt_debug(task, "port %p %d:%d close, type %d", port, port->pid,
+ port->id, port->type);
if (port->pair[0] != -1) {
nxt_fd_close(port->pair[0]);
@@ -87,21 +91,31 @@ nxt_port_release(nxt_port_t *port)
if (port->pair[1] != -1) {
nxt_fd_close(port->pair[1]);
port->pair[1] = -1;
- }
- if (port->type == NXT_PROCESS_WORKER) {
- if (nxt_router_app_remove_port(port) == 0) {
- return 0;
+ if (port->app != NULL) {
+ nxt_router_app_port_close(task, port);
}
}
+}
+
+
+static void
+nxt_port_release(nxt_task_t *task, nxt_port_t *port)
+{
+ nxt_debug(task, "port %p %d:%d release, type %d", port, port->pid,
+ port->id, port->type);
+
+ if (port->app != NULL) {
+ nxt_router_app_use(task, port->app, -1);
+
+ port->app = NULL;
+ }
if (port->link.next != NULL) {
nxt_process_port_remove(port);
}
nxt_mp_release(port->mem_pool, NULL);
-
- return 1;
}
@@ -263,7 +277,9 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
port->socket.task = task;
- nxt_runtime_port_add(rt, port);
+ nxt_runtime_port_add(task, port);
+
+ nxt_port_use(task, port, -1);
nxt_port_write_enable(task, port);
@@ -434,7 +450,7 @@ nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
process = nxt_runtime_process_find(rt, pid);
if (process) {
- nxt_runtime_process_remove(rt, process);
+ nxt_runtime_process_remove(task, process);
}
}
@@ -444,3 +460,90 @@ nxt_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
nxt_debug(task, "port empty handler");
}
+
+
+typedef struct {
+ nxt_work_t work;
+ nxt_port_t *port;
+ nxt_port_post_handler_t handler;
+} nxt_port_work_t;
+
+
+static void
+nxt_port_post_handler(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_port_t *port;
+ nxt_port_work_t *pw;
+ nxt_port_post_handler_t handler;
+
+ pw = obj;
+ port = pw->port;
+ handler = pw->handler;
+
+ nxt_free(pw);
+
+ handler(task, port, data);
+
+ nxt_port_use(task, port, -1);
+}
+
+
+nxt_int_t
+nxt_port_post(nxt_task_t *task, nxt_port_t *port,
+ nxt_port_post_handler_t handler, void *data)
+{
+ nxt_port_work_t *pw;
+
+ if (task->thread->engine == port->engine) {
+ handler(task, port, data);
+
+ return NXT_OK;
+ }
+
+ pw = nxt_zalloc(sizeof(nxt_port_work_t));
+
+ if (nxt_slow_path(pw == NULL)) {
+ return NXT_ERROR;
+ }
+
+ nxt_atomic_fetch_add(&port->use_count, 1);
+
+ pw->work.handler = nxt_port_post_handler;
+ pw->work.task = &port->engine->task;
+ pw->work.obj = pw;
+ pw->work.data = data;
+
+ pw->port = port;
+ pw->handler = handler;
+
+ nxt_event_engine_post(port->engine, &pw->work);
+
+ return NXT_OK;
+}
+
+
+static void
+nxt_port_release_handler(nxt_task_t *task, nxt_port_t *port, void *data)
+{
+ /* no op */
+}
+
+
+void
+nxt_port_use(nxt_task_t *task, nxt_port_t *port, int i)
+{
+ int c;
+
+ c = nxt_atomic_fetch_add(&port->use_count, i);
+
+ if (i < 0 && c == -i) {
+
+ if (task->thread->engine == port->engine) {
+ nxt_port_release(task, port);
+
+ return;
+ }
+
+ nxt_port_post(task, port, nxt_port_release_handler, NULL);
+ }
+}
diff --git a/src/nxt_port.h b/src/nxt_port.h
index c5b2b40e..f6679bb2 100644
--- a/src/nxt_port.h
+++ b/src/nxt_port.h
@@ -115,7 +115,6 @@ typedef struct {
size_t share;
nxt_fd_t fd;
nxt_bool_t close_fd;
- nxt_bool_t opened;
nxt_port_msg_t port_msg;
nxt_work_t work;
@@ -145,12 +144,15 @@ struct nxt_port_s {
nxt_app_t *app;
nxt_queue_t messages; /* of nxt_port_send_msg_t */
+ nxt_thread_mutex_t write_mutex;
/* Maximum size of message part. */
uint32_t max_size;
/* Maximum interleave of message parts. */
uint32_t max_share;
- uint32_t app_stream;
+
+ uint32_t app_requests;
+ uint32_t app_responses;
nxt_port_handler_t handler;
nxt_port_handler_t *data;
@@ -167,8 +169,9 @@ struct nxt_port_s {
nxt_lvlhsh_t rpc_streams; /* stream to nxt_port_rpc_reg_t */
nxt_lvlhsh_t rpc_peers; /* peer to queue of nxt_port_rpc_reg_t */
+ nxt_atomic_t use_count;
+
nxt_process_type_t type;
- nxt_work_t work;
struct iovec *iov;
void *mmsg_buf;
@@ -194,9 +197,11 @@ typedef union {
} nxt_port_data_t;
+typedef void (*nxt_port_post_handler_t)(nxt_task_t *task, nxt_port_t *port,
+ void *data);
+
nxt_port_t *nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid,
nxt_process_type_t type);
-nxt_bool_t nxt_port_release(nxt_port_t *port);
nxt_port_id_t nxt_port_get_next_id(void);
void nxt_port_reset_next_id(void);
@@ -204,6 +209,7 @@ void nxt_port_reset_next_id(void);
nxt_int_t nxt_port_socket_init(nxt_task_t *task, nxt_port_t *port,
size_t max_size);
void nxt_port_destroy(nxt_port_t *port);
+void nxt_port_close(nxt_task_t *task, nxt_port_t *port);
void nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port);
void nxt_port_write_close(nxt_port_t *port);
void nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port);
@@ -231,5 +237,8 @@ void nxt_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
void nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
void nxt_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
+nxt_int_t nxt_port_post(nxt_task_t *task, nxt_port_t *port,
+ nxt_port_post_handler_t handler, void *data);
+void nxt_port_use(nxt_task_t *task, nxt_port_t *port, int i);
#endif /* _NXT_PORT_H_INCLUDED_ */
diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c
index 75706459..d5ed493d 100644
--- a/src/nxt_port_socket.c
+++ b/src/nxt_port_socket.c
@@ -169,31 +169,6 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
{
nxt_port_send_msg_t *msg;
- nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) {
-
- if ((type & NXT_PORT_MSG_SYNC) != 0) {
- msg->opened = 0;
- continue;
- }
-
- if (msg->port_msg.stream == stream
- && msg->port_msg.reply_port == reply_port
- && msg->port_msg.last == 0
- && msg->opened) {
-
- /*
- * An fd is ignored since a file descriptor
- * must be sent only in the first message of a stream.
- */
- nxt_buf_chain_add(&msg->buf, b);
-
- msg->port_msg.last |= (type & NXT_PORT_MSG_LAST) != 0;
-
- return NXT_OK;
- }
-
- } nxt_queue_loop;
-
msg = nxt_mp_retain(task->thread->engine->mem_pool,
sizeof(nxt_port_send_msg_t));
if (nxt_slow_path(msg == NULL)) {
@@ -207,7 +182,6 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
msg->fd = fd;
msg->close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0;
msg->share = 0;
- msg->opened = 1;
msg->work.next = NULL;
msg->work.handler = nxt_port_release_send_msg;
@@ -225,8 +199,14 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
msg->port_msg.last = (type & NXT_PORT_MSG_LAST) != 0;
msg->port_msg.mmap = 0;
+ nxt_thread_mutex_lock(&port->write_mutex);
+
nxt_queue_insert_tail(&port->messages, &msg->link);
+ nxt_thread_mutex_unlock(&port->write_mutex);
+
+ nxt_port_use(task, port, 1);
+
if (port->socket.write_ready) {
nxt_port_write_handler(task, &port->socket, NULL);
}
@@ -236,10 +216,26 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
static void
+nxt_port_fd_block_write(nxt_task_t *task, nxt_port_t *port, void *data)
+{
+ nxt_fd_event_block_write(task->thread->engine, &port->socket);
+}
+
+
+static void
+nxt_port_fd_enable_write(nxt_task_t *task, nxt_port_t *port, void *data)
+{
+ nxt_fd_event_enable_write(task->thread->engine, &port->socket);
+}
+
+
+static void
nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
{
+ int use_delta;
size_t plain_size;
ssize_t n;
+ nxt_bool_t block_write, enable_write;
nxt_port_t *port;
struct iovec *iov;
nxt_work_queue_t *wq;
@@ -250,14 +246,20 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
port = nxt_container_of(obj, nxt_port_t, socket);
+ block_write = 0;
+ enable_write = 0;
+ use_delta = 0;
+
+ nxt_thread_mutex_lock(&port->write_mutex);
+
iov = port->iov;
do {
link = nxt_queue_first(&port->messages);
if (link == nxt_queue_tail(&port->messages)) {
- nxt_fd_event_block_write(task->thread->engine, &port->socket);
- return;
+ block_write = 1;
+ goto unlock_mutex;
}
msg = nxt_queue_link_data(link, nxt_port_send_msg_t, link);
@@ -334,6 +336,7 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
} else {
nxt_queue_remove(link);
+ use_delta--;
nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg,
msg->engine);
}
@@ -347,16 +350,33 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
} while (port->socket.write_ready);
if (nxt_fd_event_is_disabled(port->socket.write)) {
- /* TODO task->thread->engine or port->engine ? */
- nxt_fd_event_enable_write(task->thread->engine, &port->socket);
+ enable_write = 1;
}
- return;
+ goto unlock_mutex;
fail:
+ use_delta++;
+
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
- nxt_port_error_handler, task, &port->socket, NULL);
+ nxt_port_error_handler, task, &port->socket,
+ &port->socket);
+
+unlock_mutex:
+ nxt_thread_mutex_unlock(&port->write_mutex);
+
+ if (block_write && nxt_fd_event_is_active(port->socket.write)) {
+ nxt_port_post(task, port, nxt_port_fd_block_write, NULL);
+ }
+
+ if (enable_write) {
+ nxt_port_post(task, port, nxt_port_fd_enable_write, NULL);
+ }
+
+ if (use_delta != 0) {
+ nxt_port_use(task, port, use_delta);
+ }
}
@@ -541,6 +561,7 @@ nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b)
static void
nxt_port_error_handler(nxt_task_t *task, void *obj, void *data)
{
+ int use_delta;
nxt_buf_t *b;
nxt_port_t *port;
nxt_work_queue_t *wq;
@@ -551,9 +572,17 @@ nxt_port_error_handler(nxt_task_t *task, void *obj, void *data)
port = nxt_container_of(obj, nxt_port_t, socket);
- nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) {
+ use_delta = 0;
+
+ if (obj == data) {
+ use_delta--;
+ }
+
+ wq = &task->thread->engine->fast_work_queue;
- wq = &task->thread->engine->fast_work_queue;
+ nxt_thread_mutex_lock(&port->write_mutex);
+
+ nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) {
for(b = msg->buf; b != NULL; b = b->next) {
if (nxt_buf_is_sync(b)) {
@@ -564,8 +593,15 @@ nxt_port_error_handler(nxt_task_t *task, void *obj, void *data)
}
nxt_queue_remove(&msg->link);
+ use_delta--;
nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg,
msg->engine);
} nxt_queue_loop;
+
+ nxt_thread_mutex_unlock(&port->write_mutex);
+
+ if (use_delta != 0) {
+ nxt_port_use(task, port, use_delta);
+ }
}
diff --git a/src/nxt_process.c b/src/nxt_process.c
index 95e701f8..d3ec36ed 100644
--- a/src/nxt_process.c
+++ b/src/nxt_process.c
@@ -58,7 +58,7 @@ nxt_process_create(nxt_task_t *task, nxt_process_t *process)
if (!p->ready) {
nxt_debug(task, "remove not ready process %PI", p->pid);
- nxt_runtime_process_remove(rt, p);
+ nxt_runtime_process_remove(task, p);
} else {
nxt_port_mmaps_destroy(p->incoming, 0);
@@ -67,7 +67,7 @@ nxt_process_create(nxt_task_t *task, nxt_process_t *process)
} nxt_runtime_process_loop;
- nxt_runtime_process_add(rt, process);
+ nxt_runtime_process_add(task, process);
nxt_process_start(task, process);
@@ -81,7 +81,7 @@ nxt_process_create(nxt_task_t *task, nxt_process_t *process)
process->pid = pid;
- nxt_runtime_process_add(rt, process);
+ nxt_runtime_process_add(task, process);
break;
}
@@ -589,16 +589,14 @@ nxt_user_cred_set(nxt_task_t *task, nxt_user_cred_t *uc)
static void
nxt_process_port_mp_cleanup(nxt_task_t *task, void *obj, void *data)
{
- nxt_runtime_t *rt;
nxt_process_t *process;
process = obj;
- rt = data;
process->port_cleanups--;
if (process->port_cleanups == 0) {
- nxt_runtime_process_remove(rt, process);
+ nxt_runtime_process_remove(task, process);
}
}
@@ -609,7 +607,7 @@ nxt_process_port_add(nxt_task_t *task, nxt_process_t *process, nxt_port_t *port)
nxt_queue_insert_tail(&process->ports, &port->link);
nxt_mp_cleanup(port->mem_pool, nxt_process_port_mp_cleanup, task, process,
- task->thread->runtime);
+ NULL);
process->port_cleanups++;
}
diff --git a/src/nxt_router.c b/src/nxt_router.c
index df3caf82..5eb95b59 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -24,19 +24,12 @@ typedef struct {
typedef struct nxt_req_app_link_s nxt_req_app_link_t;
-typedef struct nxt_start_worker_s nxt_start_worker_t;
-
-struct nxt_start_worker_s {
- nxt_app_t *app;
- nxt_req_app_link_t *ra;
-
- nxt_work_t work;
-};
typedef struct {
uint32_t stream;
nxt_conn_t *conn;
+ nxt_app_t *app;
nxt_port_t *app_port;
nxt_req_app_link_t *ra;
@@ -72,6 +65,8 @@ typedef struct {
} nxt_remove_pid_msg_t;
+static nxt_int_t nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app);
+
static void nxt_router_worker_remove_pid_handler(nxt_task_t *task, void *obj,
void *data);
static void nxt_router_worker_remove_pid_done(nxt_task_t *task, void *obj,
@@ -124,7 +119,7 @@ static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
nxt_router_temp_conf_t *tmcf);
static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
nxt_event_engine_t *engine);
-static void nxt_router_apps_sort(nxt_router_t *router,
+static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
nxt_router_temp_conf_t *tmcf);
static void nxt_router_engines_post(nxt_router_t *router,
@@ -150,12 +145,14 @@ static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
static void nxt_router_conf_release(nxt_task_t *task,
nxt_socket_conf_joint_t *joint);
-static void nxt_router_send_sw_request(nxt_task_t *task, void *obj,
- void *data);
-static nxt_bool_t nxt_router_app_free(nxt_task_t *task, nxt_app_t *app);
-static nxt_port_t * nxt_router_app_get_port(nxt_app_t *app, uint32_t stream);
-static void nxt_router_app_release_port(nxt_task_t *task, void *obj,
- void *data);
+static void nxt_router_app_port_ready(nxt_task_t *task,
+ nxt_port_recv_msg_t *msg, void *data);
+static void nxt_router_app_port_error(nxt_task_t *task,
+ nxt_port_recv_msg_t *msg, void *data);
+
+static nxt_port_t * nxt_router_app_get_idle_port(nxt_app_t *app);
+static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
+ uint32_t request_failed, uint32_t got_response);
static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data);
static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj,
@@ -165,7 +162,7 @@ static void nxt_router_conn_http_body_read(nxt_task_t *task, void *obj,
static void nxt_router_process_http_request(nxt_task_t *task,
nxt_conn_t *c, nxt_app_parse_ctx_t *ap);
static void nxt_router_process_http_request_mp(nxt_task_t *task,
- nxt_req_app_link_t *ra, nxt_port_t *port);
+ nxt_req_app_link_t *ra);
static nxt_int_t nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
nxt_app_wmsg_t *wmsg);
static nxt_int_t nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
@@ -222,68 +219,91 @@ nxt_router_start(nxt_task_t *task, void *data)
}
-static nxt_start_worker_t *
-nxt_router_sw_create(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra)
+static void
+nxt_router_start_worker_handler(nxt_task_t *task, nxt_port_t *port, void *data)
{
- nxt_port_t *main_port;
- nxt_runtime_t *rt;
- nxt_start_worker_t *sw;
+ size_t size;
+ uint32_t stream;
+ nxt_app_t *app;
+ nxt_buf_t *b;
+ nxt_port_t *main_port;
+ nxt_runtime_t *rt;
- sw = nxt_zalloc(sizeof(nxt_start_worker_t));
+ app = data;
- if (nxt_slow_path(sw == NULL)) {
- return NULL;
- }
+ rt = task->thread->runtime;
+ main_port = rt->port_by_type[NXT_PROCESS_MAIN];
- sw->app = app;
- sw->ra = ra;
+ nxt_debug(task, "app '%V' %p start worker", &app->name, app);
- nxt_debug(task, "sw %p create, stream #%uD, app '%V' %p", sw,
- ra->stream, &app->name, app);
+ size = app->name.length + 1 + app->conf.length;
- rt = task->thread->runtime;
- main_port = rt->port_by_type[NXT_PROCESS_MAIN];
+ b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size);
- sw->work.handler = nxt_router_send_sw_request;
- sw->work.task = &main_port->engine->task;
- sw->work.obj = sw;
- sw->work.data = task->thread->engine;
- sw->work.next = NULL;
+ if (nxt_slow_path(b == NULL)) {
+ goto failed;
+ }
+
+ nxt_buf_cpystr(b, &app->name);
+ *b->mem.free++ = '\0';
+ nxt_buf_cpystr(b, &app->conf);
- if (task->thread->engine != main_port->engine) {
- nxt_debug(task, "sw %p post send to main engine %p", sw,
- main_port->engine);
+ stream = nxt_port_rpc_register_handler(task, port,
+ nxt_router_app_port_ready,
+ nxt_router_app_port_error,
+ -1, app);
- nxt_event_engine_post(main_port->engine, &sw->work);
+ if (nxt_slow_path(stream == 0)) {
+ nxt_mp_release(b->data, b);
- } else {
- nxt_router_send_sw_request(task, sw, sw->work.data);
+ goto failed;
}
- return sw;
-}
+ nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1,
+ stream, port->id, b);
+
+ return;
+failed:
-nxt_inline void
-nxt_router_sw_release(nxt_task_t *task, nxt_start_worker_t *sw)
-{
- nxt_debug(task, "sw %p release", sw);
+ nxt_thread_mutex_lock(&app->mutex);
+
+ app->pending_workers--;
- nxt_free(sw);
+ nxt_thread_mutex_unlock(&app->mutex);
+
+ nxt_router_app_use(task, app, -1);
}
-nxt_inline void
-nxt_router_rc_unlink(nxt_req_conn_link_t *rc)
+static nxt_int_t
+nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app)
{
- nxt_queue_remove(&rc->link);
+ nxt_int_t res;
+ nxt_port_t *router_port;
+ nxt_runtime_t *rt;
- if (rc->ra != NULL) {
- rc->ra->rc = NULL;
- rc->ra = NULL;
+ rt = task->thread->runtime;
+ router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
+
+ nxt_router_app_use(task, app, 1);
+
+ res = nxt_port_post(task, router_port, nxt_router_start_worker_handler,
+ app);
+
+ if (res == NXT_OK) {
+ return res;
}
- rc->conn = NULL;
+ nxt_thread_mutex_lock(&app->mutex);
+
+ app->pending_workers--;
+
+ nxt_thread_mutex_unlock(&app->mutex);
+
+ nxt_router_app_use(task, app, -1);
+
+ return NXT_ERROR;
}
@@ -327,36 +347,18 @@ nxt_router_ra_create(nxt_task_t *task, nxt_req_conn_link_t *rc)
static void
nxt_router_ra_release(nxt_task_t *task, void *obj, void *data)
{
- nxt_port_t *app_port;
- nxt_req_app_link_t *ra;
- nxt_event_engine_t *engine;
+ nxt_req_app_link_t *ra;
+ nxt_event_engine_t *engine;
+ nxt_req_conn_link_t *rc;
ra = obj;
engine = data;
- if (ra->app_port != NULL) {
-
- app_port = ra->app_port;
- ra->app_port = NULL;
-
- if (task->thread->engine != engine) {
- ra->app_pid = app_port->pid;
- }
-
- nxt_router_app_release_port(task, app_port, app_port->app);
-
-#if 0
- /* Uncomment to hold app port until complete response received. */
- if (ra->rc != NULL) {
- ra->rc->app_port = ra->app_port;
-
- } else {
- nxt_router_app_release_port(task, ra->app_port, ra->app_port->app);
+ if (task->thread->engine != engine) {
+ if (ra->app_port != NULL) {
+ ra->app_pid = ra->app_port->pid;
}
-#endif
- }
- if (task->thread->engine != engine) {
ra->work.handler = nxt_router_ra_release;
ra->work.task = &engine->task;
ra->work.next = NULL;
@@ -369,11 +371,27 @@ nxt_router_ra_release(nxt_task_t *task, void *obj, void *data)
return;
}
- if (ra->rc != NULL && ra->app_pid != -1) {
- nxt_port_rpc_ex_set_peer(task, engine->port, ra->rc, ra->app_pid);
+ nxt_debug(task, "ra stream #%uD release", ra->stream);
+
+ rc = ra->rc;
+
+ if (rc != NULL) {
+ if (ra->app_pid != -1) {
+ nxt_port_rpc_ex_set_peer(task, engine->port, rc, ra->app_pid);
+ }
+
+ rc->app_port = ra->app_port;
+
+ ra->app_port = NULL;
+ rc->ra = NULL;
+ ra->rc = NULL;
}
- nxt_debug(task, "ra stream #%uD release", ra->stream);
+ if (ra->app_port != NULL) {
+ nxt_router_app_port_release(task, ra->app_port, 0, 1);
+
+ ra->app_port = NULL;
+ }
nxt_mp_release(ra->mem_pool, ra);
}
@@ -382,9 +400,10 @@ nxt_router_ra_release(nxt_task_t *task, void *obj, void *data)
static void
nxt_router_ra_abort(nxt_task_t *task, void *obj, void *data)
{
- nxt_conn_t *c;
- nxt_req_app_link_t *ra;
- nxt_event_engine_t *engine;
+ nxt_conn_t *c;
+ nxt_req_app_link_t *ra;
+ nxt_req_conn_link_t *rc;
+ nxt_event_engine_t *engine;
ra = obj;
engine = data;
@@ -403,17 +422,75 @@ nxt_router_ra_abort(nxt_task_t *task, void *obj, void *data)
nxt_debug(task, "ra stream #%uD abort", ra->stream);
- if (ra->rc != NULL) {
- c = ra->rc->conn;
+ rc = ra->rc;
+
+ if (rc != NULL) {
+ c = rc->conn;
nxt_router_gen_error(task, c, 500,
"Failed to start application worker");
+
+ rc->ra = NULL;
+ ra->rc = NULL;
+ }
+
+ if (ra->app_port != NULL) {
+ nxt_router_app_port_release(task, ra->app_port, 0, 1);
+
+ ra->app_port = NULL;
}
nxt_mp_release(ra->mem_pool, ra);
}
+nxt_inline void
+nxt_router_rc_unlink(nxt_task_t *task, nxt_req_conn_link_t *rc)
+{
+ nxt_req_app_link_t *ra;
+
+ if (rc->app_port != NULL) {
+ nxt_router_app_port_release(task, rc->app_port, 0, 1);
+
+ rc->app_port = NULL;
+ }
+
+ ra = rc->ra;
+
+ if (ra != NULL) {
+ rc->ra = NULL;
+ ra->rc = NULL;
+
+ nxt_thread_mutex_lock(&rc->app->mutex);
+
+ if (ra->link.next != NULL) {
+ nxt_queue_remove(&ra->link);
+
+ ra->link.next = NULL;
+
+ } else {
+ ra = NULL;
+ }
+
+ nxt_thread_mutex_unlock(&rc->app->mutex);
+ }
+
+ if (ra != NULL) {
+ nxt_router_ra_release(task, ra, ra->work.data);
+ }
+
+ if (rc->app != NULL) {
+ nxt_router_app_use(task, rc->app, -1);
+
+ rc->app = NULL;
+ }
+
+ nxt_queue_remove(&rc->link);
+
+ rc->conn = NULL;
+}
+
+
void
nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
@@ -526,12 +603,20 @@ nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
static void
nxt_router_worker_remove_pid_handler(nxt_task_t *task, void *obj, void *data)
{
+ nxt_pid_t pid;
+ nxt_buf_t *buf;
nxt_event_engine_t *engine;
nxt_remove_pid_msg_t *rp;
rp = obj;
- nxt_port_remove_pid_handler(task, &rp->msg);
+ buf = rp->msg.buf;
+
+ nxt_assert(nxt_buf_used_size(buf) == sizeof(pid));
+
+ nxt_memcpy(&pid, buf->mem.pos, sizeof(pid));
+
+ nxt_port_rpc_remove_peer(task, rp->msg.port, pid);
engine = rp->work.data;
@@ -658,7 +743,7 @@ nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
goto fail;
}
- nxt_router_apps_sort(router, tmcf);
+ nxt_router_apps_sort(task, router, tmcf);
nxt_router_engines_post(router, tmcf);
@@ -1005,9 +1090,12 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
app->max_workers = apcf.workers;
app->timeout = apcf.timeout;
app->live = 1;
+ app->max_pending_responses = 2;
app->prepare_msg = nxt_app_prepare_msg[type];
nxt_queue_insert_tail(&tmcf->apps, &app->link);
+
+ nxt_router_app_use(task, app, 1);
}
http = nxt_conf_get_path(conf, &http_path);
@@ -1695,42 +1783,36 @@ nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
static void
-nxt_router_apps_sort(nxt_router_t *router, nxt_router_temp_conf_t *tmcf)
+nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
+ nxt_router_temp_conf_t *tmcf)
{
- nxt_app_t *app;
- nxt_port_t *port;
+ nxt_app_t *app;
+ nxt_port_t *port;
nxt_queue_each(app, &router->apps, nxt_app_t, link) {
nxt_queue_remove(&app->link);
- nxt_thread_log_debug("about to remove app '%V' %p", &app->name, app);
+ nxt_debug(task, "about to free app '%V' %p", &app->name, app);
app->live = 0;
- if (nxt_router_app_free(NULL, app) != 0) {
- continue;
- }
-
- if (!nxt_queue_is_empty(&app->requests)) {
-
- nxt_thread_log_debug("app '%V' %p pending requests found",
- &app->name, app);
- continue;
- }
-
do {
- port = nxt_router_app_get_port(app, 0);
+ port = nxt_router_app_get_idle_port(app);
if (port == NULL) {
break;
}
- nxt_thread_log_debug("port %p send quit", port);
+ nxt_debug(task, "port %p send quit", port);
+
+ nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0,
+ NULL);
- nxt_port_socket_write(&port->engine->task, port,
- NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
+ nxt_port_use(task, port, -1);
} while (1);
+ nxt_router_app_use(task, app, -1);
+
} nxt_queue_loop;
nxt_queue_add(&router->apps, &tmcf->previous);
@@ -1833,7 +1915,7 @@ nxt_router_thread_start(void *data)
ret = nxt_port_socket_init(task, port, 0);
if (nxt_slow_path(ret != NXT_OK)) {
- nxt_mp_release(port->mem_pool, port);
+ nxt_port_use(task, port, -1);
return;
}
@@ -2124,8 +2206,9 @@ nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data)
// TODO notify all apps
+ port->engine = task->thread->engine;
nxt_mp_thread_adopt(port->mem_pool);
- nxt_port_release(port);
+ nxt_port_use(task, port, -1);
nxt_mp_thread_adopt(engine->mem_pool);
nxt_mp_destroy(engine->mem_pool);
@@ -2240,13 +2323,7 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_buf_chain_add(&b, last);
- if (rc->app_port != NULL) {
- nxt_router_app_release_port(task, rc->app_port, rc->app_port->app);
-
- rc->app_port = NULL;
- }
-
- nxt_router_rc_unlink(rc);
+ nxt_router_rc_unlink(task, rc);
}
if (b == NULL) {
@@ -2283,7 +2360,7 @@ nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_router_gen_error(task, rc->conn, 500,
"Application terminated unexpectedly");
- nxt_router_rc_unlink(rc);
+ nxt_router_rc_unlink(task, rc);
}
@@ -2383,194 +2460,153 @@ nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code,
static void
-nxt_router_sw_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data)
+nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
+ void *data)
{
- nxt_start_worker_t *sw;
+ nxt_app_t *app;
+ nxt_port_t *port;
+
+ app = data;
+ port = msg->new_port;
- sw = data;
+ nxt_assert(app != NULL);
+ nxt_assert(port != NULL);
- nxt_assert(sw != NULL);
- nxt_assert(sw->app->pending_workers != 0);
+ port->app = app;
- msg->new_port->app = sw->app;
+ nxt_thread_mutex_lock(&app->mutex);
- sw->app->pending_workers--;
- sw->app->workers++;
+ nxt_assert(app->pending_workers != 0);
- nxt_debug(task, "sw %p got port %p", sw, msg->new_port);
+ app->pending_workers--;
+ app->workers++;
+
+ nxt_thread_mutex_unlock(&app->mutex);
- nxt_router_app_release_port(task, msg->new_port, sw->app);
+ nxt_debug(task, "app '%V' %p new port ready", &app->name, app);
- nxt_router_sw_release(task, sw);
+ nxt_router_app_port_release(task, port, 0, 0);
}
static void
-nxt_router_sw_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data)
+nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
+ void *data)
{
nxt_app_t *app;
nxt_queue_link_t *lnk;
nxt_req_app_link_t *ra;
- nxt_start_worker_t *sw;
- sw = data;
+ app = data;
- nxt_assert(sw != NULL);
- nxt_assert(sw->app != NULL);
- nxt_assert(sw->app->pending_workers != 0);
+ nxt_assert(app != NULL);
- app = sw->app;
+ nxt_debug(task, "app '%V' %p start error", &app->name, app);
+
+ nxt_thread_mutex_lock(&app->mutex);
- sw->app->pending_workers--;
+ nxt_assert(app->pending_workers != 0);
- nxt_debug(task, "sw %p error, failed to start app '%V'",
- sw, &app->name);
+ app->pending_workers--;
if (!nxt_queue_is_empty(&app->requests)) {
lnk = nxt_queue_last(&app->requests);
nxt_queue_remove(lnk);
+ lnk->next = NULL;
ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link);
+ } else {
+ ra = NULL;
+ }
+
+ nxt_thread_mutex_unlock(&app->mutex);
+
+ if (ra != NULL) {
nxt_debug(task, "app '%V' %p abort next stream #%uD",
&app->name, app, ra->stream);
nxt_router_ra_abort(task, ra, ra->work.data);
}
- nxt_router_sw_release(task, sw);
+ nxt_router_app_use(task, app, -1);
}
-static void
-nxt_router_send_sw_request(nxt_task_t *task, void *obj, void *data)
+void
+nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)
{
- size_t size;
- uint32_t stream;
- nxt_buf_t *b;
- nxt_app_t *app;
- nxt_port_t *main_port, *router_port, *app_port;
- nxt_runtime_t *rt;
- nxt_start_worker_t *sw;
- nxt_req_app_link_t *ra;
-
- sw = obj;
- app = sw->app;
+ int c;
- if (nxt_queue_is_empty(&app->requests)) {
- ra = sw->ra;
- app_port = nxt_router_app_get_port(app, ra->stream);
+ c = nxt_atomic_fetch_add(&app->use_count, i);
- if (app_port != NULL) {
- nxt_debug(task, "app '%V' %p process stream #%uD",
- &app->name, app, ra->stream);
+ if (i < 0 && c == -i) {
- ra->app_port = app_port;
+ nxt_assert(app->live == 0);
+ nxt_assert(app->workers == 0);
+ nxt_assert(app->pending_workers == 0);
+ nxt_assert(nxt_queue_is_empty(&app->requests) != 0);
+ nxt_assert(nxt_queue_is_empty(&app->ports) != 0);
- nxt_router_process_http_request_mp(task, ra, app_port);
-
- nxt_router_ra_release(task, ra, ra->work.data);
- nxt_router_sw_release(task, sw);
-
- return;
- }
- }
-
- nxt_queue_insert_tail(&app->requests, &sw->ra->link);
-
- if (app->workers + app->pending_workers >= app->max_workers) {
- nxt_debug(task, "app '%V' %p %uD/%uD running/pending workers, "
- "max_workers (%uD) reached", &app->name, app,
- app->workers, app->pending_workers, app->max_workers);
-
- nxt_router_sw_release(task, sw);
-
- return;
+ nxt_thread_mutex_destroy(&app->mutex);
+ nxt_free(app);
}
-
- app->pending_workers++;
-
- nxt_debug(task, "sw %p send", sw);
-
- rt = task->thread->runtime;
- main_port = rt->port_by_type[NXT_PROCESS_MAIN];
- router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
-
- size = app->name.length + 1 + app->conf.length;
-
- b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size);
-
- nxt_buf_cpystr(b, &app->name);
- *b->mem.free++ = '\0';
- nxt_buf_cpystr(b, &app->conf);
-
- stream = nxt_port_rpc_register_handler(task, router_port,
- nxt_router_sw_ready,
- nxt_router_sw_error,
- main_port->pid, sw);
-
- nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1,
- stream, router_port->id, b);
}
-static nxt_bool_t
-nxt_router_app_free(nxt_task_t *task, nxt_app_t *app)
+nxt_inline nxt_port_t *
+nxt_router_app_get_port_unsafe(nxt_app_t *app, int *use_delta)
{
- nxt_queue_link_t *lnk;
- nxt_req_app_link_t *ra;
+ nxt_port_t *port;
+ nxt_queue_link_t *lnk;
- nxt_thread_log_debug("app '%V' %p state: %d/%uD/%uD/%d", &app->name, app,
- app->live, app->workers, app->pending_workers,
- nxt_queue_is_empty(&app->requests));
+ lnk = nxt_queue_first(&app->ports);
+ nxt_queue_remove(lnk);
- if (app->live == 0
- && app->workers == 0
- && app->pending_workers == 0
- && nxt_queue_is_empty(&app->requests))
- {
- nxt_thread_mutex_destroy(&app->mutex);
- nxt_free(app);
+ port = nxt_queue_link_data(lnk, nxt_port_t, app_link);
- return 1;
- }
+ port->app_requests++;
- if (app->live == 1
- && nxt_queue_is_empty(&app->requests) == 0
- && app->workers + app->pending_workers < app->max_workers)
+ if (app->live &&
+ (app->max_pending_responses == 0 ||
+ (port->app_requests - port->app_responses) <
+ app->max_pending_responses) )
{
- lnk = nxt_queue_first(&app->requests);
- nxt_queue_remove(lnk);
+ nxt_queue_insert_tail(&app->ports, lnk);
- ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link);
+ } else {
+ lnk->next = NULL;
- nxt_router_sw_create(task, app, ra);
+ (*use_delta)--;
}
- return 0;
+ return port;
}
static nxt_port_t *
-nxt_router_app_get_port(nxt_app_t *app, uint32_t stream)
+nxt_router_app_get_idle_port(nxt_app_t *app)
{
- nxt_port_t *port;
- nxt_queue_link_t *lnk;
+ nxt_port_t *port;
port = NULL;
nxt_thread_mutex_lock(&app->mutex);
- if (!nxt_queue_is_empty(&app->ports)) {
- lnk = nxt_queue_first(&app->ports);
- nxt_queue_remove(lnk);
+ nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {
- lnk->next = NULL;
+ if (port->app_requests > port->app_responses) {
+ port = NULL;
- port = nxt_queue_link_data(lnk, nxt_port_t, app_link);
+ continue;
+ }
- port->app_stream = stream;
- }
+ nxt_queue_remove(&port->app_link);
+ port->app_link.next = NULL;
+
+ break;
+
+ } nxt_queue_loop;
nxt_thread_mutex_unlock(&app->mutex);
@@ -2579,151 +2615,175 @@ nxt_router_app_get_port(nxt_app_t *app, uint32_t stream)
static void
-nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data)
+nxt_router_app_process_request(nxt_task_t *task, void *obj, void *data)
{
- nxt_app_t *app;
- nxt_port_t *port;
- nxt_work_t *work;
- nxt_queue_link_t *lnk;
- nxt_req_app_link_t *ra;
+ nxt_app_t *app;
+ nxt_req_app_link_t *ra;
- port = obj;
- app = data;
+ app = obj;
+ ra = data;
nxt_assert(app != NULL);
- nxt_assert(app == port->app);
- nxt_assert(port->app_link.next == NULL);
+ nxt_assert(ra != NULL);
+ nxt_assert(ra->app_port != NULL);
+ nxt_debug(task, "app '%V' %p process next stream #%uD",
+ &app->name, app, ra->stream);
- if (task->thread->engine != port->engine) {
- work = &port->work;
+ nxt_router_process_http_request_mp(task, ra);
+}
- nxt_debug(task, "post release port to engine %p", port->engine);
- work->next = NULL;
- work->handler = nxt_router_app_release_port;
- work->task = &port->engine->task;
- work->obj = port;
- work->data = app;
+static void
+nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
+ uint32_t request_failed, uint32_t got_response)
+{
+ int use_delta, ra_use_delta;
+ nxt_app_t *app;
+ nxt_queue_link_t *lnk;
+ nxt_req_app_link_t *ra;
- nxt_event_engine_post(port->engine, work);
+ nxt_assert(port != NULL);
+ nxt_assert(port->app != NULL);
- return;
+ app = port->app;
+
+ use_delta = (request_failed == 0 && got_response == 0) ? 0 : -1;
+
+ nxt_thread_mutex_lock(&app->mutex);
+
+ port->app_requests -= request_failed;
+ port->app_responses += got_response;
+
+ if (app->live != 0 &&
+ port->pair[1] != -1 &&
+ port->app_link.next == NULL &&
+ (app->max_pending_responses == 0 ||
+ (port->app_requests - port->app_responses) <
+ app->max_pending_responses) )
+ {
+ nxt_queue_insert_tail(&app->ports, &port->app_link);
+ use_delta++;
}
- if (!nxt_queue_is_empty(&app->requests)) {
+ if (app->live != 0 &&
+ !nxt_queue_is_empty(&app->ports) &&
+ !nxt_queue_is_empty(&app->requests))
+ {
lnk = nxt_queue_first(&app->requests);
nxt_queue_remove(lnk);
+ lnk->next = NULL;
ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link);
- nxt_debug(task, "app '%V' %p process next stream #%uD",
- &app->name, app, ra->stream);
+ ra_use_delta = 1;
+ ra->app_port = nxt_router_app_get_port_unsafe(app, &ra_use_delta);
- ra->app_port = port;
- port->app_stream = ra->stream;
+ } else {
+ ra = NULL;
+ ra_use_delta = 0;
+ }
- nxt_router_process_http_request_mp(task, ra, port);
+ nxt_thread_mutex_unlock(&app->mutex);
- nxt_router_ra_release(task, ra, ra->work.data);
+ if (ra != NULL) {
+ nxt_work_queue_add(&task->thread->engine->fast_work_queue,
+ nxt_router_app_process_request,
+ &task->thread->engine->task, app, ra);
- return;
+ goto adjust_use;
}
- port->app_stream = 0;
-
+ /* ? */
if (port->pair[1] == -1) {
- nxt_debug(task, "app '%V' %p port already closed (pid %PI dead?)",
- &app->name, app, port->pid);
-
- app->workers--;
- nxt_router_app_free(task, app);
+ nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)",
+ &app->name, app, port, port->pid);
- port->app = NULL;
-
- nxt_port_release(port);
-
- return;
+ goto adjust_use;
}
- if (!app->live) {
+ if (app->live == 0) {
nxt_debug(task, "app '%V' %p is not alive, send QUIT to port",
&app->name, app);
nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT,
-1, 0, 0, NULL);
- return;
+ goto adjust_use;
}
nxt_debug(task, "app '%V' %p requests queue is empty, keep the port",
&app->name, app);
- nxt_thread_mutex_lock(&app->mutex);
+adjust_use:
- nxt_queue_insert_head(&app->ports, &port->app_link);
+ if (use_delta != 0) {
+ nxt_port_use(task, port, use_delta);
+ }
- nxt_thread_mutex_unlock(&app->mutex);
+ if (ra_use_delta != 0) {
+ nxt_port_use(task, ra->app_port, ra_use_delta);
+ }
}
-nxt_bool_t
-nxt_router_app_remove_port(nxt_port_t *port)
+void
+nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
{
nxt_app_t *app;
- nxt_bool_t busy;
+ nxt_bool_t unchain, start_worker;
app = port->app;
- busy = port->app_stream != 0;
-
- if (app == NULL) {
- nxt_thread_log_debug("port %p app remove, no app", port);
-
- nxt_assert(port->app_link.next == NULL);
- return 1;
- }
+ nxt_assert(app != NULL);
nxt_thread_mutex_lock(&app->mutex);
- if (port->app_link.next != NULL) {
+ unchain = port->app_link.next != NULL;
+ if (unchain) {
nxt_queue_remove(&port->app_link);
port->app_link.next = NULL;
+ }
+ app->workers--;
+
+ start_worker = app->live != 0 &&
+ nxt_queue_is_empty(&app->requests) == 0 &&
+ app->workers + app->pending_workers < app->max_workers;
+
+ if (start_worker) {
+ app->pending_workers++;
}
nxt_thread_mutex_unlock(&app->mutex);
- if (busy == 0) {
- nxt_thread_log_debug("port %p app remove, free, app '%V' %p", port,
- &app->name, app);
-
- app->workers--;
- nxt_router_app_free(&port->engine->task, app);
+ nxt_debug(task, "app '%V' %p port %p close", &app->name, app, port);
- return 1;
+ if (unchain) {
+ nxt_port_use(task, port, -1);
}
- nxt_thread_log_debug("port %p app remove, busy, app '%V' %p, "
- "app stream #%uD", port, &app->name, app,
- port->app_stream);
-
- return 0;
+ if (start_worker) {
+ nxt_router_start_worker(task, app);
+ }
}
static nxt_int_t
nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
{
+ int use_delta;
+ nxt_int_t res;
nxt_app_t *app;
+ nxt_bool_t can_start_worker;
nxt_conn_t *c;
nxt_port_t *port;
nxt_event_engine_t *engine;
- nxt_start_worker_t *sw;
nxt_socket_conf_joint_t *joint;
port = NULL;
+ use_delta = 1;
c = ra->rc->conn;
joint = c->listen->socket.data;
@@ -2735,6 +2795,10 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
return NXT_ERROR;
}
+ ra->rc->app = app;
+
+ nxt_router_app_use(task, app, 1);
+
engine = task->thread->engine;
nxt_timer_disable(engine, &c->read_timer);
@@ -2744,20 +2808,50 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
nxt_timer_add(engine, &c->read_timer, app->timeout);
}
- port = nxt_router_app_get_port(app, ra->stream);
+ nxt_thread_mutex_lock(&app->mutex);
+
+ if (!nxt_queue_is_empty(&app->ports)) {
+ port = nxt_router_app_get_port_unsafe(app, &use_delta);
+
+ can_start_worker = 0;
+
+ } else {
+ nxt_queue_insert_tail(&app->requests, &ra->link);
+
+ can_start_worker = (app->workers + app->pending_workers) <
+ app->max_workers;
+ if (can_start_worker) {
+ app->pending_workers++;
+ }
+
+ port = NULL;
+ }
+
+ nxt_thread_mutex_unlock(&app->mutex);
if (port != NULL) {
- nxt_debug(task, "already have port for app '%V'", &app->name);
+ nxt_debug(task, "already have port for app '%V' %p ", &app->name, app);
ra->app_port = port;
+
+ if (use_delta != 0) {
+ nxt_port_use(task, port, use_delta);
+ }
return NXT_OK;
}
- sw = nxt_router_sw_create(task, app, ra);
+ if (!can_start_worker) {
+ nxt_debug(task, "app '%V' %p too many running or pending workers",
+ &app->name, app);
+
+ return NXT_AGAIN;
+ }
+
+ res = nxt_router_start_worker(task, app);
+
+ if (nxt_slow_path(res != NXT_OK)) {
+ nxt_router_gen_error(task, c, 500, "Failed to start worker");
- if (nxt_slow_path(sw == NULL)) {
- nxt_router_gen_error(task, c, 500,
- "Failed to allocate start worker struct");
return NXT_ERROR;
}
@@ -3011,18 +3105,16 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
nxt_port_rpc_ex_set_peer(task, engine->port, rc, port->pid);
- nxt_router_process_http_request_mp(task, ra, port);
-
- nxt_router_ra_release(task, ra, ra->work.data);
+ nxt_router_process_http_request_mp(task, ra);
}
static void
-nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra,
- nxt_port_t *port)
+nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra)
{
+ uint32_t request_failed;
nxt_int_t res;
- nxt_port_t *c_port, *reply_port;
+ nxt_port_t *port, *c_port, *reply_port;
nxt_conn_t *c;
nxt_app_wmsg_t wmsg;
nxt_app_parse_ctx_t *ap;
@@ -3030,11 +3122,15 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra,
/* TODO: it is unsafe to use ra->rc and ra->rc->conn in main thread */
nxt_assert(ra->rc != NULL);
+ nxt_assert(ra->app_port != NULL);
+ port = ra->app_port;
reply_port = ra->reply_port;
ap = ra->ap;
c = ra->rc->conn;
+ request_failed = 1;
+
c_port = nxt_process_connected_port_find(port->process, reply_port->pid,
reply_port->id);
if (nxt_slow_path(c_port != reply_port)) {
@@ -3043,7 +3139,7 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra,
if (nxt_slow_path(res != NXT_OK)) {
nxt_router_gen_error(task, c, 500,
"Failed to send reply port to application");
- return;
+ goto release_port;
}
nxt_process_connected_port_add(port->process, reply_port);
@@ -3059,21 +3155,33 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra,
if (nxt_slow_path(res != NXT_OK)) {
nxt_router_gen_error(task, c, 500,
"Failed to prepare message for application");
- return;
+ goto release_port;
}
nxt_debug(task, "about to send %d bytes buffer to worker port %d",
nxt_buf_used_size(wmsg.write),
wmsg.port->socket.fd);
+ request_failed = 0;
+
res = nxt_port_socket_write(task, wmsg.port, NXT_PORT_MSG_DATA,
-1, ra->stream, reply_port->id, wmsg.write);
if (nxt_slow_path(res != NXT_OK)) {
nxt_router_gen_error(task, c, 500,
"Failed to send message to application");
- return;
+ goto release_port;
}
+
+release_port:
+
+ if (request_failed != 0) {
+ ra->app_port = 0;
+ }
+
+ nxt_router_app_port_release(task, port, request_failed, 0);
+
+ nxt_router_ra_release(task, ra, ra->work.data);
}
@@ -3452,13 +3560,7 @@ nxt_router_conn_free(nxt_task_t *task, void *obj, void *data)
nxt_debug(task, "conn %p close, stream #%uD", c, rc->stream);
- if (rc->app_port != NULL) {
- nxt_router_app_release_port(task, rc->app_port, rc->app_port->app);
-
- rc->app_port = NULL;
- }
-
- nxt_router_rc_unlink(rc);
+ nxt_router_rc_unlink(task, rc);
nxt_port_rpc_cancel(task, task->thread->engine->port, rc->stream);
diff --git a/src/nxt_router.h b/src/nxt_router.h
index f5c5f7aa..5056021e 100644
--- a/src/nxt_router.h
+++ b/src/nxt_router.h
@@ -87,6 +87,7 @@ struct nxt_app_s {
uint32_t pending_workers;
uint32_t workers;
uint32_t max_workers;
+ uint32_t max_pending_responses;
nxt_msec_t timeout;
@@ -97,6 +98,8 @@ struct nxt_app_s {
nxt_str_t conf;
nxt_app_prepare_msg_t prepare_msg;
+
+ nxt_atomic_t use_count;
};
@@ -141,6 +144,7 @@ void nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
void nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
void nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
-nxt_bool_t nxt_router_app_remove_port(nxt_port_t *port);
+void nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port);
+void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i);
#endif /* _NXT_ROUTER_H_INCLUDED_ */
diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c
index deab980e..a96f4cea 100644
--- a/src/nxt_runtime.c
+++ b/src/nxt_runtime.c
@@ -553,7 +553,7 @@ nxt_runtime_exit(nxt_task_t *task, void *obj, void *data)
nxt_runtime_process_each(rt, process) {
- nxt_runtime_process_remove(rt, process);
+ nxt_runtime_process_remove(task, process);
} nxt_runtime_process_loop;
@@ -1725,13 +1725,16 @@ nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid)
void
-nxt_runtime_process_add(nxt_runtime_t *rt, nxt_process_t *process)
+nxt_runtime_process_add(nxt_task_t *task, nxt_process_t *process)
{
nxt_port_t *port;
+ nxt_runtime_t *rt;
nxt_lvlhsh_query_t lhq;
nxt_assert(process->registered == 0);
+ rt = task->thread->runtime;
+
nxt_runtime_process_lhq_pid(&lhq, &process->pid);
lhq.replace = 0;
@@ -1753,7 +1756,7 @@ nxt_runtime_process_add(nxt_runtime_t *rt, nxt_process_t *process)
port->pid = process->pid;
- nxt_runtime_port_add(rt, port);
+ nxt_runtime_port_add(task, port);
} nxt_process_port_loop;
@@ -1809,9 +1812,12 @@ nxt_runtime_process_remove_pid(nxt_runtime_t *rt, nxt_pid_t pid)
void
-nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process)
+nxt_runtime_process_remove(nxt_task_t *task, nxt_process_t *process)
{
- nxt_port_t *port;
+ nxt_port_t *port;
+ nxt_runtime_t *rt;
+
+ rt = task->thread->runtime;
if (process->port_cleanups == 0) {
if (process->registered == 1) {
@@ -1823,9 +1829,9 @@ nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process)
} else {
nxt_process_port_each(process, port) {
- nxt_runtime_port_remove(rt, port);
+ nxt_port_close(task, port);
- nxt_port_release(port);
+ nxt_runtime_port_remove(task, port);
} nxt_process_port_loop;
}
@@ -1851,22 +1857,34 @@ nxt_runtime_port_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe)
void
-nxt_runtime_port_add(nxt_runtime_t *rt, nxt_port_t *port)
+nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port)
{
+ nxt_runtime_t *rt;
+
+ rt = task->thread->runtime;
+
nxt_port_hash_add(&rt->ports, port);
rt->port_by_type[port->type] = port;
+
+ nxt_port_use(task, port, 1);
}
void
-nxt_runtime_port_remove(nxt_runtime_t *rt, nxt_port_t *port)
+nxt_runtime_port_remove(nxt_task_t *task, nxt_port_t *port)
{
+ nxt_runtime_t *rt;
+
+ rt = task->thread->runtime;
+
nxt_port_hash_remove(&rt->ports, port);
if (rt->port_by_type[port->type] == port) {
rt->port_by_type[port->type] = NULL;
}
+
+ nxt_port_use(task, port, -1);
}
diff --git a/src/nxt_runtime.h b/src/nxt_runtime.h
index d0decfd2..5aa897dc 100644
--- a/src/nxt_runtime.h
+++ b/src/nxt_runtime.h
@@ -102,11 +102,11 @@ nxt_process_t *nxt_runtime_process_new(nxt_runtime_t *rt);
nxt_process_t *nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid);
-void nxt_runtime_process_add(nxt_runtime_t *rt, nxt_process_t *process);
+void nxt_runtime_process_add(nxt_task_t *task, nxt_process_t *process);
nxt_process_t *nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid);
-void nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process);
+void nxt_runtime_process_remove(nxt_task_t *task, nxt_process_t *process);
nxt_process_t *nxt_runtime_process_first(nxt_runtime_t *rt,
nxt_lvlhsh_each_t *lhe);
@@ -115,9 +115,9 @@ nxt_process_t *nxt_runtime_process_first(nxt_runtime_t *rt,
nxt_lvlhsh_each(&rt->processes, lhe)
-void nxt_runtime_port_add(nxt_runtime_t *rt, nxt_port_t *port);
+void nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port);
-void nxt_runtime_port_remove(nxt_runtime_t *rt, nxt_port_t *port);
+void nxt_runtime_port_remove(nxt_task_t *task, nxt_port_t *port);
nxt_port_t *nxt_runtime_port_find(nxt_runtime_t *rt, nxt_pid_t pid,
nxt_port_id_t port_id);