diff options
Diffstat (limited to 'src/nxt_port.c')
-rw-r--r-- | src/nxt_port.c | 127 |
1 files changed, 77 insertions, 50 deletions
diff --git a/src/nxt_port.c b/src/nxt_port.c index 1da16587..fc807d1f 100644 --- a/src/nxt_port.c +++ b/src/nxt_port.c @@ -32,25 +32,21 @@ void nxt_port_write(nxt_task_t *task, nxt_runtime_t *rt, nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_buf_t *b) { - nxt_uint_t i, n, nprocesses, nports; nxt_port_t *port; nxt_process_t *process; - process = rt->processes->elts; - nprocesses = rt->processes->nelts; + nxt_runtime_process_each(rt, process) + { + if (nxt_pid != process->pid) { + nxt_process_port_each(process, port) { - for (i = 0; i < nprocesses; i++) { + (void) nxt_port_socket_write(task, port, type, + fd, stream, 0, b); - if (nxt_pid != process[i].pid) { - port = process[i].ports->elts; - nports = process[i].ports->nelts; - - for (n = 0; n < nports; n++) { - (void) nxt_port_socket_write(task, &port[n], type, - fd, stream, b); - } + } nxt_process_port_loop; } } + nxt_runtime_process_loop; } @@ -59,19 +55,19 @@ nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { nxt_port_handler_t *handlers; - if (nxt_fast_path(msg->type <= NXT_PORT_MSG_MAX)) { + if (nxt_fast_path(msg->port_msg.type <= NXT_PORT_MSG_MAX)) { nxt_debug(task, "port %d: message type:%uD", - msg->port->socket.fd, msg->type); + msg->port->socket.fd, msg->port_msg.type); handlers = msg->port->data; - handlers[msg->type](task, msg); + handlers[msg->port_msg.type](task, msg); return; } nxt_log(task, NXT_LOG_CRIT, "port %d: unknown message type:%uD", - msg->port->socket.fd, msg->type); + msg->port->socket.fd, msg->port_msg.type); } @@ -87,28 +83,20 @@ nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt, nxt_port_t *new_port) { nxt_buf_t *b; - nxt_uint_t i, n; nxt_port_t *port; nxt_process_t *process; nxt_port_msg_new_port_t *msg; - n = rt->processes->nelts; - if (n == 0) { - return; - } - nxt_debug(task, "new port %d for process %PI engine %uD", new_port->socket.fd, new_port->pid, new_port->engine); - process = rt->processes->elts; - - for (i = 0; i < n; i++) { - - if (process[i].pid == new_port->pid || process[i].pid == nxt_pid) { + nxt_runtime_process_each(rt, process) + { + if (process->pid == new_port->pid || process->pid == nxt_pid) { continue; } - port = process[i].ports->elts; + port = nxt_process_port_first(process); b = nxt_buf_mem_alloc(port->mem_pool, sizeof(nxt_port_data_t), 0); @@ -116,19 +104,25 @@ nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt, continue; } + nxt_debug(task, "send new port %FD to process %PI", + new_port->socket.fd, process->pid); + b->data = port; b->completion_handler = nxt_port_new_port_buf_completion; b->mem.free += sizeof(nxt_port_msg_new_port_t); msg = (nxt_port_msg_new_port_t *) b->mem.pos; + msg->id = new_port->id; msg->pid = new_port->pid; msg->engine = new_port->engine; msg->max_size = port->max_size; msg->max_share = port->max_share; + msg->type = new_port->type; (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_NEW_PORT, - new_port->socket.fd, 0, b); + new_port->socket.fd, 0, 0, b); } + nxt_runtime_process_loop; } @@ -158,12 +152,15 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) rt = task->thread->runtime; - process = nxt_runtime_new_process(rt); + new_port_msg = (nxt_port_msg_new_port_t *) msg->buf->mem.pos; + msg->buf->mem.pos = msg->buf->mem.free; + + process = nxt_runtime_process_get(rt, new_port_msg->pid); if (nxt_slow_path(process == NULL)) { return; } - port = nxt_array_zero_add(process->ports); + port = nxt_process_port_new(process); if (nxt_slow_path(port == NULL)) { return; } @@ -175,51 +172,74 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) port->mem_pool = mp; - new_port_msg = (nxt_port_msg_new_port_t *) msg->buf->mem.pos; - msg->buf->mem.pos = msg->buf->mem.free; - nxt_debug(task, "new port %d received for process %PI engine %uD", msg->fd, new_port_msg->pid, new_port_msg->engine); - process->pid = new_port_msg->pid; - - port->pid = new_port_msg->pid; + port->id = new_port_msg->id; port->engine = new_port_msg->engine; port->pair[0] = -1; port->pair[1] = msg->fd; port->max_size = new_port_msg->max_size; port->max_share = new_port_msg->max_share; + port->type = new_port_msg->type; nxt_queue_init(&port->messages); port->socket.task = task; + nxt_runtime_port_add(rt, port); + nxt_port_write_enable(task, port); } void +nxt_port_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + nxt_runtime_t *rt; + nxt_process_t *process; + + rt = task->thread->runtime; + + if (nxt_slow_path(msg->fd == -1)) { + nxt_log(task, NXT_LOG_WARN, "invalid fd passed with mmap message"); + + return; + } + + process = nxt_runtime_process_get(rt, msg->port_msg.pid); + if (nxt_slow_path(process == NULL)) { + nxt_log(task, NXT_LOG_WARN, "failed to get process #%PI", + msg->port_msg.pid); + + goto fail_close; + } + + nxt_port_incoming_port_mmap(task, process, msg->fd); + +fail_close: + + close(msg->fd); +} + + +void nxt_port_change_log_file(nxt_task_t *task, nxt_runtime_t *rt, nxt_uint_t slot, nxt_fd_t fd) { nxt_buf_t *b; - nxt_uint_t i, n; nxt_port_t *port; nxt_process_t *process; - n = rt->processes->nelts; - if (n == 0) { - return; - } - nxt_debug(task, "change log file #%ui fd:%FD", slot, fd); - process = rt->processes->elts; - - /* process[0] is master process. */ + nxt_runtime_process_each(rt, process) + { + if (nxt_pid == process->pid) { + continue; + } - for (i = 1; i < n; i++) { - port = process[i].ports->elts; + port = nxt_process_port_first(process); b = nxt_buf_mem_alloc(port->mem_pool, sizeof(nxt_port_data_t), 0); if (nxt_slow_path(b == NULL)) { @@ -230,8 +250,9 @@ nxt_port_change_log_file(nxt_task_t *task, nxt_runtime_t *rt, nxt_uint_t slot, b->mem.free += sizeof(nxt_uint_t); (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_CHANGE_FILE, - fd, 0, b); + fd, 0, 0, b); } + nxt_runtime_process_loop; } @@ -269,11 +290,17 @@ nxt_port_change_log_file_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) void nxt_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { + size_t dump_size; nxt_buf_t *b; b = msg->buf; + dump_size = b->mem.free - b->mem.pos; + + if (dump_size > 300) { + dump_size = 300; + } - nxt_debug(task, "data: %*s", b->mem.free - b->mem.pos, b->mem.pos); + nxt_debug(task, "data: %*s", dump_size, b->mem.pos); b->mem.pos = b->mem.free; } |