summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_port.c
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2017-07-12 20:32:16 +0300
committerMax Romanov <max.romanov@nginx.com>2017-07-12 20:32:16 +0300
commitb0c1e740cf404f8fed5eed75fddb205ca74314e0 (patch)
tree08dcefc827c5dfb1570b682ea8d1e9abf17a31dc /src/nxt_port.c
parentc38bcb7d70729434893ae4d5f2f58a78a36d2bd5 (diff)
downloadunit-b0c1e740cf404f8fed5eed75fddb205ca74314e0.tar.gz
unit-b0c1e740cf404f8fed5eed75fddb205ca74314e0.tar.bz2
New process port exchange changed. READY message type introduced.
Application process start request DATA message from router to master. Master notifies router via NEW_PORT message after worker process become ready.
Diffstat (limited to 'src/nxt_port.c')
-rw-r--r--src/nxt_port.c95
1 files changed, 78 insertions, 17 deletions
diff --git a/src/nxt_port.c b/src/nxt_port.c
index 404e6424..3abc4125 100644
--- a/src/nxt_port.c
+++ b/src/nxt_port.c
@@ -11,13 +11,28 @@
static void nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
+static nxt_atomic_uint_t nxt_port_last_id;
+
+nxt_port_id_t
+nxt_port_get_next_id()
+{
+ return nxt_atomic_fetch_add(&nxt_port_last_id, 1);
+}
+
void
-nxt_port_create(nxt_task_t *task, nxt_port_t *port,
+nxt_port_reset_next_id()
+{
+ nxt_port_last_id = 1;
+}
+
+
+void
+nxt_port_enable(nxt_task_t *task, nxt_port_t *port,
nxt_port_handler_t *handlers)
{
port->pid = nxt_pid;
- port->engine = task->thread->engine->id;
+ port->engine = task->thread->engine;
port->handler = nxt_port_handler;
port->data = handlers;
@@ -77,12 +92,13 @@ nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
void
nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt,
- nxt_port_t *new_port)
+ nxt_port_t *new_port, uint32_t stream)
{
+ nxt_port_t *port;
nxt_process_t *process;
- nxt_debug(task, "new port %d for process %PI engine %uD",
- new_port->pair[1], new_port->pid, new_port->engine);
+ nxt_debug(task, "new port %d for process %PI",
+ new_port->pair[1], new_port->pid);
nxt_runtime_process_each(rt, process)
{
@@ -90,15 +106,22 @@ nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt,
continue;
}
- (void) nxt_port_send_port(task, nxt_process_port_first(process),
- new_port);
+ port = nxt_process_port_first(process);
+
+ if (port->type == NXT_PROCESS_MASTER ||
+ port->type == NXT_PROCESS_CONTROLLER ||
+ port->type == NXT_PROCESS_ROUTER) {
+
+ (void) nxt_port_send_port(task, port, new_port, stream);
+ }
}
nxt_runtime_process_loop;
}
nxt_int_t
-nxt_port_send_port(nxt_task_t *task, nxt_port_t *port, nxt_port_t *new_port)
+nxt_port_send_port(nxt_task_t *task, nxt_port_t *port, nxt_port_t *new_port,
+ uint32_t stream)
{
nxt_buf_t *b;
nxt_port_msg_new_port_t *msg;
@@ -116,13 +139,12 @@ nxt_port_send_port(nxt_task_t *task, nxt_port_t *port, nxt_port_t *new_port)
msg->id = new_port->id;
msg->pid = new_port->pid;
- msg->engine = new_port->engine;
msg->max_size = port->max_size;
msg->max_share = port->max_share;
msg->type = new_port->type;
return nxt_port_socket_write(task, port, NXT_PORT_MSG_NEW_PORT,
- new_port->pair[1], 0, 0, b);
+ new_port->pair[1], stream, 0, b);
}
@@ -140,12 +162,26 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
new_port_msg = (nxt_port_msg_new_port_t *) msg->buf->mem.pos;
msg->buf->mem.pos = msg->buf->mem.free;
+ nxt_debug(task, "new port %d received for process %PI:%d",
+ msg->fd, new_port_msg->pid, new_port_msg->id);
+
+ port = nxt_runtime_port_find(rt, new_port_msg->pid, new_port_msg->id);
+ if (port != NULL) {
+ nxt_debug(task, "port %PI:%d already exists", new_port_msg->pid,
+ new_port_msg->id);
+
+ nxt_fd_close(msg->fd);
+ msg->fd = -1;
+ return;
+ }
+
process = nxt_runtime_process_get(rt, new_port_msg->pid);
if (nxt_slow_path(process == NULL)) {
return;
}
- port = nxt_process_port_new(process);
+ port = nxt_process_port_new(rt, process, new_port_msg->id,
+ new_port_msg->type);
if (nxt_slow_path(port == NULL)) {
return;
}
@@ -157,16 +193,10 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
port->mem_pool = mp;
- nxt_debug(task, "new port %d received for process %PI engine %uD",
- msg->fd, new_port_msg->pid, new_port_msg->engine);
-
- port->id = new_port_msg->id;
- port->engine = new_port_msg->engine;
port->pair[0] = -1;
port->pair[1] = msg->fd;
port->max_size = new_port_msg->max_size;
port->max_share = new_port_msg->max_share;
- port->type = new_port_msg->type;
nxt_queue_init(&port->messages);
@@ -175,6 +205,37 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
nxt_runtime_port_add(rt, port);
nxt_port_write_enable(task, port);
+
+ msg->new_port = port;
+}
+
+
+void
+nxt_port_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
+{
+ nxt_port_t *port;
+ nxt_process_t *process;
+ nxt_runtime_t *rt;
+
+ rt = task->thread->runtime;
+
+ process = nxt_runtime_process_get(rt, msg->port_msg.pid);
+ if (nxt_slow_path(process == NULL)) {
+ return;
+ }
+
+ process->ready = 1;
+
+ port = nxt_process_port_first(process);
+ if (nxt_slow_path(port == NULL)) {
+ return;
+ }
+
+ nxt_debug(task, "process %PI ready", msg->port_msg.pid);
+
+ if (nxt_runtime_is_master(rt)) {
+ nxt_port_send_new_port(task, rt, port, msg->port_msg.stream);
+ }
}