diff options
author | Igor Sysoev <igor@sysoev.ru> | 2017-02-22 15:10:28 +0300 |
---|---|---|
committer | Igor Sysoev <igor@sysoev.ru> | 2017-02-22 15:10:28 +0300 |
commit | b379dae85e6edcb4af1ac665ab66d99e63bf34f4 (patch) | |
tree | 84b1d69d355e4722ebee02748e512306913a92d7 /src/nxt_port.c | |
parent | 029942f4eb7196c2cff0d0e26bc6ff274138f7d8 (diff) | |
download | unit-b379dae85e6edcb4af1ac665ab66d99e63bf34f4.tar.gz unit-b379dae85e6edcb4af1ac665ab66d99e63bf34f4.tar.bz2 |
Port changes.
Diffstat (limited to '')
-rw-r--r-- | src/nxt_port.c | 151 |
1 files changed, 77 insertions, 74 deletions
diff --git a/src/nxt_port.c b/src/nxt_port.c index 7da2bcc4..8a56fc6b 100644 --- a/src/nxt_port.c +++ b/src/nxt_port.c @@ -9,48 +9,47 @@ #include <nxt_port.h> -static void nxt_process_port_handler(nxt_task_t *task, - nxt_port_recv_msg_t *msg); -static void nxt_process_new_port_buf_completion(nxt_task_t *task, void *obj, +static void nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); +static void nxt_port_new_port_buf_completion(nxt_task_t *task, void *obj, void *data); void -nxt_process_port_create(nxt_thread_t *thr, nxt_process_port_t *proc, - nxt_process_port_handler_t *handlers) +nxt_port_create(nxt_thread_t *thread, nxt_port_t *port, + nxt_port_handler_t *handlers) { - proc->pid = nxt_pid; - proc->engine = thr->engine->id; - proc->port->handler = nxt_process_port_handler; - proc->port->data = handlers; + port->pid = nxt_pid; + port->engine = thread->engine->id; + port->handler = nxt_port_handler; + port->data = handlers; - nxt_port_write_close(proc->port); - nxt_port_read_enable(&thr->engine->task, proc->port); + nxt_port_write_close(port); + nxt_port_read_enable(&thread->engine->task, port); } void -nxt_process_port_write(nxt_task_t *task, nxt_cycle_t *cycle, nxt_uint_t type, +nxt_port_write(nxt_task_t *task, nxt_cycle_t *cycle, nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_buf_t *b) { - nxt_uint_t i, n; - nxt_process_port_t *proc; + nxt_uint_t i, n; + nxt_port_t *port; - proc = cycle->processes->elts; - n = cycle->processes->nelts; + port = cycle->ports->elts; + n = cycle->ports->nelts; for (i = 0; i < n; i++) { - if (nxt_pid != proc[i].pid) { - (void) nxt_port_write(task, proc[i].port, type, fd, stream, b); + if (nxt_pid != port[i].pid) { + (void) nxt_port_socket_write(task, &port[i], type, fd, stream, b); } } } static void -nxt_process_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { - nxt_process_port_handler_t *handlers; + nxt_port_handler_t *handlers; if (nxt_fast_path(msg->type <= NXT_PORT_MSG_MAX)) { @@ -69,62 +68,64 @@ nxt_process_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) void -nxt_process_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { nxt_cycle_quit(task, NULL); } void -nxt_process_new_port(nxt_task_t *task, nxt_cycle_t *cycle, - nxt_process_port_t *proc) +nxt_port_send_new_port(nxt_task_t *task, nxt_cycle_t *cycle, + nxt_port_t *new_port) { nxt_buf_t *b; nxt_uint_t i, n; - nxt_process_port_t *p; - nxt_proc_msg_new_port_t *new_port; + nxt_port_t *port; + nxt_port_msg_new_port_t *msg; - n = cycle->processes->nelts; + n = cycle->ports->nelts; if (n == 0) { return; } - nxt_thread_log_debug("new port %d for process %PI engine %uD", - proc->port->socket.fd, proc->pid, proc->engine); + nxt_debug(task, "new port %d for process %PI engine %uD", + new_port->socket.fd, new_port->pid, new_port->engine); - p = cycle->processes->elts; + port = cycle->ports->elts; for (i = 0; i < n; i++) { - if (proc->pid == p[i].pid || nxt_pid == p[i].pid || p[i].engine != 0) { + if (port[i].pid == new_port->pid + || port[i].pid == nxt_pid + || port[i].engine != 0) + { continue; } - b = nxt_buf_mem_alloc(p[i].port->mem_pool, - sizeof(nxt_process_port_data_t), 0); + b = nxt_buf_mem_alloc(port[i].mem_pool, sizeof(nxt_port_data_t), 0); if (nxt_slow_path(b == NULL)) { continue; } - b->data = p[i].port; - b->completion_handler = nxt_process_new_port_buf_completion; - b->mem.free += sizeof(nxt_proc_msg_new_port_t); - new_port = (nxt_proc_msg_new_port_t *) b->mem.pos; + b->data = &port[i]; + b->completion_handler = nxt_port_new_port_buf_completion; + b->mem.free += sizeof(nxt_port_msg_new_port_t); + msg = (nxt_port_msg_new_port_t *) b->mem.pos; - new_port->pid = proc->pid; - new_port->engine = proc->engine; - new_port->max_size = p[i].port->max_size; - new_port->max_share = p[i].port->max_share; + msg->pid = new_port->pid; + msg->engine = new_port->engine; + msg->max_size = port[i].max_size; + msg->max_share = port[i].max_share; - (void) nxt_port_write(task, p[i].port, NXT_PORT_MSG_NEW_PORT, - proc->port->socket.fd, 0, b); + (void) nxt_port_socket_write(task, &port[i], NXT_PORT_MSG_NEW_PORT, + new_port->socket.fd, 0, b); } } static void -nxt_process_new_port_buf_completion(nxt_task_t *task, void *obj, void *data) +nxt_port_new_port_buf_completion(nxt_task_t *task, void *obj, void *data) { nxt_buf_t *b; nxt_port_t *port; @@ -139,66 +140,69 @@ nxt_process_new_port_buf_completion(nxt_task_t *task, void *obj, void *data) void -nxt_process_port_new_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { nxt_port_t *port; nxt_cycle_t *cycle; - nxt_process_port_t *proc; - nxt_proc_msg_new_port_t *new_port; + nxt_mem_pool_t *mp; + nxt_port_msg_new_port_t *new_port_msg; cycle = nxt_thread_cycle(); - proc = nxt_array_add(cycle->processes); - if (nxt_slow_path(proc == NULL)) { + port = nxt_array_add(cycle->ports); + if (nxt_slow_path(port == NULL)) { return; } - port = nxt_port_alloc(task); - if (nxt_slow_path(port == NULL)) { + mp = nxt_mem_pool_create(1024); + if (nxt_slow_path(mp == NULL)) { return; } - proc->port = port; + port->mem_pool = mp; - new_port = (nxt_proc_msg_new_port_t *) msg->buf->mem.pos; + new_port_msg = (nxt_port_msg_new_port_t *) msg->buf->mem.pos; msg->buf->mem.pos = msg->buf->mem.free; nxt_debug(task, "new port %d received for process %PI engine %uD", - msg->fd, new_port->pid, new_port->engine); + msg->fd, new_port_msg->pid, new_port_msg->engine); - proc->pid = new_port->pid; - proc->engine = new_port->engine; + port->pid = new_port_msg->pid; + port->engine = new_port_msg->engine; + port->pair[0] = -1; port->pair[1] = msg->fd; - port->max_size = new_port->max_size; - port->max_share = new_port->max_share; + port->max_size = new_port_msg->max_size; + port->max_share = new_port_msg->max_share; + + nxt_queue_init(&port->messages); + + port->socket.task = task; - /* A read port is not passed at all. */ nxt_port_write_enable(task, port); } void -nxt_process_port_change_log_file(nxt_task_t *task, nxt_cycle_t *cycle, +nxt_port_change_log_file(nxt_task_t *task, nxt_cycle_t *cycle, nxt_uint_t slot, nxt_fd_t fd) { - nxt_buf_t *b; - nxt_uint_t i, n; - nxt_process_port_t *p; + nxt_buf_t *b; + nxt_uint_t i, n; + nxt_port_t *port; - n = cycle->processes->nelts; + n = cycle->ports->nelts; if (n == 0) { return; } - nxt_thread_log_debug("change log file #%ui fd:%FD", slot, fd); + nxt_debug(task, "change log file #%ui fd:%FD", slot, fd); - p = cycle->processes->elts; + port = cycle->ports->elts; - /* p[0] is master process. */ + /* port[0] is master process port. */ for (i = 1; i < n; i++) { - b = nxt_buf_mem_alloc(p[i].port->mem_pool, - sizeof(nxt_process_port_data_t), 0); + b = nxt_buf_mem_alloc(port[i].mem_pool, sizeof(nxt_port_data_t), 0); if (nxt_slow_path(b == NULL)) { continue; @@ -207,15 +211,14 @@ nxt_process_port_change_log_file(nxt_task_t *task, nxt_cycle_t *cycle, *(nxt_uint_t *) b->mem.pos = slot; b->mem.free += sizeof(nxt_uint_t); - (void) nxt_port_write(task, p[i].port, NXT_PORT_MSG_PORTGE_FILE, - fd, 0, b); + (void) nxt_port_socket_write(task, &port[i], NXT_PORT_MSG_CHANGE_FILE, + fd, 0, b); } } void -nxt_process_port_change_log_file_handler(nxt_task_t *task, - nxt_port_recv_msg_t *msg) +nxt_port_change_log_file_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { nxt_buf_t *b; nxt_uint_t slot; @@ -246,7 +249,7 @@ nxt_process_port_change_log_file_handler(nxt_task_t *task, void -nxt_process_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +nxt_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { nxt_buf_t *b; @@ -259,7 +262,7 @@ nxt_process_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) void -nxt_process_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +nxt_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { nxt_debug(task, "port empty handler"); } |