diff options
Diffstat (limited to 'src/nxt_port.c')
-rw-r--r-- | src/nxt_port.c | 50 |
1 files changed, 35 insertions, 15 deletions
diff --git a/src/nxt_port.c b/src/nxt_port.c index 7232c465..dbcdec11 100644 --- a/src/nxt_port.c +++ b/src/nxt_port.c @@ -8,6 +8,7 @@ #include <nxt_runtime.h> #include <nxt_port.h> #include <nxt_router.h> +#include <nxt_port_queue.h> static void nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); @@ -67,8 +68,8 @@ nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid, nxt_queue_init(&port->messages); nxt_thread_mutex_create(&port->write_mutex); - nxt_queue_init(&port->pending_requests); - nxt_queue_init(&port->active_websockets); + + port->queue_fd = -1; } else { nxt_mp_destroy(mp); @@ -101,6 +102,16 @@ nxt_port_close(nxt_task_t *task, nxt_port_t *port) nxt_router_app_port_close(task, port); } } + + if (port->queue_fd != -1) { + nxt_fd_close(port->queue_fd); + port->queue_fd = -1; + } + + if (port->queue != NULL) { + nxt_mem_munmap(port->queue, sizeof(nxt_port_queue_t)); + port->queue = NULL; + } } @@ -178,6 +189,7 @@ nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) } +/* TODO join with process_ready and move to nxt_main_process.c */ nxt_inline void nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt, nxt_port_t *new_port, uint32_t stream) @@ -229,8 +241,9 @@ nxt_port_send_port(nxt_task_t *task, nxt_port_t *port, nxt_port_t *new_port, msg->max_share = port->max_share; msg->type = new_port->type; - return nxt_port_socket_write(task, port, NXT_PORT_MSG_NEW_PORT, - new_port->pair[1], stream, 0, b); + return nxt_port_socket_write2(task, port, NXT_PORT_MSG_NEW_PORT, + new_port->pair[1], new_port->queue_fd, + stream, 0, b); } @@ -248,15 +261,15 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) /* TODO check b size and make plain */ nxt_debug(task, "new port %d received for process %PI:%d", - msg->fd, new_port_msg->pid, new_port_msg->id); + msg->fd[0], new_port_msg->pid, new_port_msg->id); port = nxt_runtime_port_find(rt, new_port_msg->pid, new_port_msg->id); if (port != NULL) { nxt_debug(task, "port %PI:%d already exists", new_port_msg->pid, new_port_msg->id); - nxt_fd_close(msg->fd); - msg->fd = -1; + nxt_fd_close(msg->fd[0]); + msg->fd[0] = -1; return; } @@ -267,10 +280,10 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) return; } - nxt_fd_nonblocking(task, msg->fd); + nxt_fd_nonblocking(task, msg->fd[0]); port->pair[0] = -1; - port->pair[1] = msg->fd; + port->pair[1] = msg->fd[0]; port->max_size = new_port_msg->max_size; port->max_share = new_port_msg->max_share; @@ -281,7 +294,7 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) msg->u.new_port = port; } - +/* TODO move to nxt_main_process.c */ void nxt_port_process_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { @@ -306,6 +319,13 @@ nxt_port_process_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_debug(task, "process %PI ready", msg->port_msg.pid); + if (msg->fd[0] != -1) { + port->queue_fd = msg->fd[0]; + port->queue = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t), + PROT_READ | PROT_WRITE, MAP_SHARED, + msg->fd[0], 0); + } + nxt_port_send_new_port(task, rt, port, msg->port_msg.stream); } @@ -318,7 +338,7 @@ nxt_port_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) rt = task->thread->runtime; - if (nxt_slow_path(msg->fd == -1)) { + if (nxt_slow_path(msg->fd[0] == -1)) { nxt_log(task, NXT_LOG_WARN, "invalid fd passed with mmap message"); return; @@ -332,11 +352,11 @@ nxt_port_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) goto fail_close; } - nxt_port_incoming_port_mmap(task, process, msg->fd); + nxt_port_incoming_port_mmap(task, process, msg->fd[0]); fail_close: - nxt_fd_close(msg->fd); + nxt_fd_close(msg->fd[0]); } @@ -389,14 +409,14 @@ nxt_port_change_log_file_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) log_file = nxt_list_elt(rt->log_files, slot); - nxt_debug(task, "change log file %FD:%FD", msg->fd, log_file->fd); + nxt_debug(task, "change log file %FD:%FD", msg->fd[0], log_file->fd); /* * The old log file descriptor must be closed at the moment when no * other threads use it. dup2() allows to use the old file descriptor * for new log file. This change is performed atomically in the kernel. */ - if (nxt_file_redirect(log_file, msg->fd) == NXT_OK) { + if (nxt_file_redirect(log_file, msg->fd[0]) == NXT_OK) { if (slot == 0) { (void) nxt_file_stderr(log_file); } |