summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_port.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nxt_port.c')
-rw-r--r--src/nxt_port.c50
1 files changed, 35 insertions, 15 deletions
diff --git a/src/nxt_port.c b/src/nxt_port.c
index 7232c465..dbcdec11 100644
--- a/src/nxt_port.c
+++ b/src/nxt_port.c
@@ -8,6 +8,7 @@
#include <nxt_runtime.h>
#include <nxt_port.h>
#include <nxt_router.h>
+#include <nxt_port_queue.h>
static void nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
@@ -67,8 +68,8 @@ nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid,
nxt_queue_init(&port->messages);
nxt_thread_mutex_create(&port->write_mutex);
- nxt_queue_init(&port->pending_requests);
- nxt_queue_init(&port->active_websockets);
+
+ port->queue_fd = -1;
} else {
nxt_mp_destroy(mp);
@@ -101,6 +102,16 @@ nxt_port_close(nxt_task_t *task, nxt_port_t *port)
nxt_router_app_port_close(task, port);
}
}
+
+ if (port->queue_fd != -1) {
+ nxt_fd_close(port->queue_fd);
+ port->queue_fd = -1;
+ }
+
+ if (port->queue != NULL) {
+ nxt_mem_munmap(port->queue, sizeof(nxt_port_queue_t));
+ port->queue = NULL;
+ }
}
@@ -178,6 +189,7 @@ nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
}
+/* TODO join with process_ready and move to nxt_main_process.c */
nxt_inline void
nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt,
nxt_port_t *new_port, uint32_t stream)
@@ -229,8 +241,9 @@ nxt_port_send_port(nxt_task_t *task, nxt_port_t *port, nxt_port_t *new_port,
msg->max_share = port->max_share;
msg->type = new_port->type;
- return nxt_port_socket_write(task, port, NXT_PORT_MSG_NEW_PORT,
- new_port->pair[1], stream, 0, b);
+ return nxt_port_socket_write2(task, port, NXT_PORT_MSG_NEW_PORT,
+ new_port->pair[1], new_port->queue_fd,
+ stream, 0, b);
}
@@ -248,15 +261,15 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
/* TODO check b size and make plain */
nxt_debug(task, "new port %d received for process %PI:%d",
- msg->fd, new_port_msg->pid, new_port_msg->id);
+ msg->fd[0], new_port_msg->pid, new_port_msg->id);
port = nxt_runtime_port_find(rt, new_port_msg->pid, new_port_msg->id);
if (port != NULL) {
nxt_debug(task, "port %PI:%d already exists", new_port_msg->pid,
new_port_msg->id);
- nxt_fd_close(msg->fd);
- msg->fd = -1;
+ nxt_fd_close(msg->fd[0]);
+ msg->fd[0] = -1;
return;
}
@@ -267,10 +280,10 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
return;
}
- nxt_fd_nonblocking(task, msg->fd);
+ nxt_fd_nonblocking(task, msg->fd[0]);
port->pair[0] = -1;
- port->pair[1] = msg->fd;
+ port->pair[1] = msg->fd[0];
port->max_size = new_port_msg->max_size;
port->max_share = new_port_msg->max_share;
@@ -281,7 +294,7 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
msg->u.new_port = port;
}
-
+/* TODO move to nxt_main_process.c */
void
nxt_port_process_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
@@ -306,6 +319,13 @@ nxt_port_process_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
nxt_debug(task, "process %PI ready", msg->port_msg.pid);
+ if (msg->fd[0] != -1) {
+ port->queue_fd = msg->fd[0];
+ port->queue = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
+ PROT_READ | PROT_WRITE, MAP_SHARED,
+ msg->fd[0], 0);
+ }
+
nxt_port_send_new_port(task, rt, port, msg->port_msg.stream);
}
@@ -318,7 +338,7 @@ nxt_port_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
rt = task->thread->runtime;
- if (nxt_slow_path(msg->fd == -1)) {
+ if (nxt_slow_path(msg->fd[0] == -1)) {
nxt_log(task, NXT_LOG_WARN, "invalid fd passed with mmap message");
return;
@@ -332,11 +352,11 @@ nxt_port_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
goto fail_close;
}
- nxt_port_incoming_port_mmap(task, process, msg->fd);
+ nxt_port_incoming_port_mmap(task, process, msg->fd[0]);
fail_close:
- nxt_fd_close(msg->fd);
+ nxt_fd_close(msg->fd[0]);
}
@@ -389,14 +409,14 @@ nxt_port_change_log_file_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
log_file = nxt_list_elt(rt->log_files, slot);
- nxt_debug(task, "change log file %FD:%FD", msg->fd, log_file->fd);
+ nxt_debug(task, "change log file %FD:%FD", msg->fd[0], log_file->fd);
/*
* The old log file descriptor must be closed at the moment when no
* other threads use it. dup2() allows to use the old file descriptor
* for new log file. This change is performed atomically in the kernel.
*/
- if (nxt_file_redirect(log_file, msg->fd) == NXT_OK) {
+ if (nxt_file_redirect(log_file, msg->fd[0]) == NXT_OK) {
if (slot == 0) {
(void) nxt_file_stderr(log_file);
}