diff options
Diffstat (limited to '')
-rw-r--r-- | src/nxt_cycle.c | 19 | ||||
-rw-r--r-- | src/nxt_cycle.h | 3 | ||||
-rw-r--r-- | src/nxt_main.h | 4 | ||||
-rw-r--r-- | src/nxt_master_process.c | 92 | ||||
-rw-r--r-- | src/nxt_port.c | 151 | ||||
-rw-r--r-- | src/nxt_port.h | 125 | ||||
-rw-r--r-- | src/nxt_port_socket.c | 72 | ||||
-rw-r--r-- | src/nxt_port_socket.h | 73 | ||||
-rw-r--r-- | src/nxt_sendbuf.c | 10 | ||||
-rw-r--r-- | src/nxt_worker_process.c | 20 |
10 files changed, 271 insertions, 298 deletions
diff --git a/src/nxt_cycle.c b/src/nxt_cycle.c index 81ad8ce2..e95421d2 100644 --- a/src/nxt_cycle.c +++ b/src/nxt_cycle.c @@ -344,34 +344,33 @@ nxt_cycle_event_engines(nxt_thread_t *thr, nxt_cycle_t *cycle) static nxt_int_t nxt_cycle_processes(nxt_cycle_t *cycle) { - nxt_uint_t n; - nxt_process_port_t *proc, *prev; + nxt_uint_t n; + nxt_port_t *port, *prev; /* * Preallocate double number of previous cycle * process slots or 2 process slots for initial cycle. */ - n = (cycle->previous != NULL) ? cycle->previous->processes->nelts : 1; + n = (cycle->previous != NULL) ? cycle->previous->ports->nelts : 1; - cycle->processes = nxt_array_create(cycle->mem_pool, 2 * n, - sizeof(nxt_process_port_t)); + cycle->ports = nxt_array_create(cycle->mem_pool, 2 * n, sizeof(nxt_port_t)); - if (nxt_slow_path(cycle->processes == NULL)) { + if (nxt_slow_path(cycle->ports == NULL)) { return NXT_ERROR; } if (cycle->previous != NULL) { cycle->process_generation = cycle->previous->process_generation; - prev = cycle->previous->processes->elts; + prev = cycle->previous->ports->elts; while (n != 0) { - proc = nxt_array_add(cycle->processes); - if (nxt_slow_path(proc == NULL)) { + port = nxt_array_add(cycle->ports); + if (nxt_slow_path(port == NULL)) { return NXT_ERROR; } - *proc = *prev++; + *port = *prev++; n--; } } diff --git a/src/nxt_cycle.h b/src/nxt_cycle.h index 6dbde173..088a91c3 100644 --- a/src/nxt_cycle.h +++ b/src/nxt_cycle.h @@ -16,7 +16,6 @@ typedef enum { } nxt_process_type_e; -typedef struct nxt_cycle_s nxt_cycle_t; typedef void (*nxt_cycle_cont_t)(nxt_task_t *task, nxt_cycle_t *cycle); @@ -47,7 +46,7 @@ struct nxt_cycle_s { nxt_cycle_cont_t continuation; #endif - nxt_array_t *processes; /* of nxt_process_port_t */ + nxt_array_t *ports; /* of nxt_port_t */ nxt_list_t *log_files; /* of nxt_file_t */ diff --git a/src/nxt_main.h b/src/nxt_main.h index b692dc92..1388a2f3 100644 --- a/src/nxt_main.h +++ b/src/nxt_main.h @@ -121,7 +121,8 @@ nxt_thread_extern_data(nxt_thread_t, nxt_thread_context); #include <nxt_fd_event.h> -#include <nxt_port_socket.h> +typedef struct nxt_cycle_s nxt_cycle_t; +#include <nxt_port.h> #if (NXT_THREADS) #include <nxt_thread_pool.h> #endif @@ -153,6 +154,7 @@ typedef struct nxt_upstream_source_s nxt_upstream_source_t; #include <nxt_http_parse.h> #include <nxt_http_source.h> #include <nxt_fastcgi_source.h> +#include <nxt_cycle.h> #if (NXT_LIB_UNIT_TEST) diff --git a/src/nxt_master_process.c b/src/nxt_master_process.c index ceb44e1f..1e613b45 100644 --- a/src/nxt_master_process.c +++ b/src/nxt_master_process.c @@ -71,26 +71,27 @@ nxt_master_process_start(nxt_thread_t *thr, nxt_task_t *task, static nxt_int_t nxt_master_process_port_create(nxt_task_t *task, nxt_cycle_t *cycle) { - nxt_process_port_t *proc; + nxt_int_t ret; + nxt_port_t *port; - proc = nxt_array_add(cycle->processes); - if (nxt_slow_path(proc == NULL)) { + port = nxt_array_zero_add(cycle->ports); + if (nxt_slow_path(port == NULL)) { return NXT_ERROR; } - proc->pid = nxt_pid; - proc->engine = 0; - - proc->port = nxt_port_create(task, 0); - if (nxt_slow_path(proc->port == NULL)) { - return NXT_ERROR; + ret = nxt_port_socket_init(task, port, 0); + if (nxt_slow_path(ret != NXT_OK)) { + return ret; } + port->pid = nxt_pid; + port->engine = 0; + /* * A master process port. A write port is not closed * since it should be inherited by worker processes. */ - nxt_port_read_enable(task, proc->port); + nxt_port_read_enable(task, port); return NXT_OK; } @@ -143,24 +144,25 @@ nxt_master_start_worker_processes(nxt_task_t *task, nxt_cycle_t *cycle) static nxt_int_t nxt_master_create_worker_process(nxt_task_t *task, nxt_cycle_t *cycle) { - nxt_pid_t pid; - nxt_process_port_t *proc; + nxt_int_t ret; + nxt_pid_t pid; + nxt_port_t *port; - proc = nxt_array_add(cycle->processes); - if (nxt_slow_path(proc == NULL)) { + port = nxt_array_zero_add(cycle->ports); + if (nxt_slow_path(port == NULL)) { return NXT_ERROR; } - cycle->current_process = cycle->processes->nelts - 1; + cycle->current_process = cycle->ports->nelts - 1; - proc->engine = 0; - proc->generation = cycle->process_generation; - - proc->port = nxt_port_create(task, 0); - if (nxt_slow_path(proc->port == NULL)) { - return NXT_ERROR; + ret = nxt_port_socket_init(task, port, 0); + if (nxt_slow_path(ret != NXT_OK)) { + return ret; } + port->engine = 0; + port->generation = cycle->process_generation; + pid = nxt_process_create(nxt_worker_process_start, cycle, "start worker process"); @@ -175,12 +177,12 @@ nxt_master_create_worker_process(nxt_task_t *task, nxt_cycle_t *cycle) default: /* The master process created a new process. */ - proc->pid = pid; + port->pid = pid; - nxt_port_read_close(proc->port); - nxt_port_write_enable(task, proc->port); + nxt_port_read_close(port); + nxt_port_write_enable(task, port); - nxt_process_new_port(task, cycle, proc); + nxt_port_send_new_port(task, cycle, port); return NXT_OK; } } @@ -250,24 +252,24 @@ static void nxt_master_stop_previous_worker_processes(nxt_task_t *task, void *obj, void *data) { - uint32_t generation; - nxt_uint_t i, n; - nxt_cycle_t *cycle; - nxt_process_port_t *proc; + uint32_t generation; + nxt_uint_t i, n; + nxt_port_t *port; + nxt_cycle_t *cycle; cycle = nxt_thread_cycle(); - proc = cycle->processes->elts; - n = cycle->processes->nelts; + port = cycle->ports->elts; + n = cycle->ports->nelts; generation = cycle->process_generation - 1; - /* The proc[0] is the master process. */ + /* The port[0] is the master process port. */ for (i = 1; i < n; i++) { - if (proc[i].generation == generation) { - (void) nxt_port_write(task, proc[i].port, NXT_PORT_MSG_QUIT, - -1, 0, NULL); + if (port[i].generation == generation) { + (void) nxt_port_socket_write(task, &port[i], + NXT_PORT_MSG_QUIT, -1, 0, NULL); } } @@ -278,7 +280,7 @@ nxt_master_stop_previous_worker_processes(nxt_task_t *task, void *obj, void nxt_master_stop_worker_processes(nxt_task_t *task, nxt_cycle_t *cycle) { - nxt_process_port_write(task, cycle, NXT_PORT_MSG_QUIT, -1, 0, NULL); + nxt_port_write(task, cycle, NXT_PORT_MSG_QUIT, -1, 0, NULL); } @@ -366,7 +368,7 @@ nxt_master_process_sigusr1_handler(nxt_task_t *task, void *obj, void *data) nxt_list_each(file, cycle->log_files) { - nxt_process_port_change_log_file(task, cycle, n, new_file[n].fd); + nxt_port_change_log_file(task, cycle, n, new_file[n].fd); /* * The old log file descriptor must be closed at the moment * when no other threads use it. dup2() allows to use the @@ -617,9 +619,9 @@ nxt_master_process_sigchld_handler(nxt_task_t *task, void *obj, void *data) static void nxt_master_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid) { - nxt_uint_t i, n, generation; - nxt_cycle_t *cycle; - nxt_process_port_t *proc; + nxt_uint_t i, n, generation; + nxt_port_t *port; + nxt_cycle_t *cycle; cycle = nxt_thread_cycle(); @@ -630,15 +632,15 @@ nxt_master_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid) return; } - proc = cycle->processes->elts; - n = cycle->processes->nelts; + port = cycle->ports->elts; + n = cycle->ports->nelts; for (i = 0; i < n; i++) { - if (pid == proc[i].pid) { - generation = proc[i].generation; + if (pid == port[i].pid) { + generation = port[i].generation; - nxt_array_remove(cycle->processes, &proc[i]); + nxt_array_remove(cycle->ports, &port[i]); if (nxt_exiting) { nxt_debug(task, "processes %d", n); diff --git a/src/nxt_port.c b/src/nxt_port.c index 7da2bcc4..8a56fc6b 100644 --- a/src/nxt_port.c +++ b/src/nxt_port.c @@ -9,48 +9,47 @@ #include <nxt_port.h> -static void nxt_process_port_handler(nxt_task_t *task, - nxt_port_recv_msg_t *msg); -static void nxt_process_new_port_buf_completion(nxt_task_t *task, void *obj, +static void nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); +static void nxt_port_new_port_buf_completion(nxt_task_t *task, void *obj, void *data); void -nxt_process_port_create(nxt_thread_t *thr, nxt_process_port_t *proc, - nxt_process_port_handler_t *handlers) +nxt_port_create(nxt_thread_t *thread, nxt_port_t *port, + nxt_port_handler_t *handlers) { - proc->pid = nxt_pid; - proc->engine = thr->engine->id; - proc->port->handler = nxt_process_port_handler; - proc->port->data = handlers; + port->pid = nxt_pid; + port->engine = thread->engine->id; + port->handler = nxt_port_handler; + port->data = handlers; - nxt_port_write_close(proc->port); - nxt_port_read_enable(&thr->engine->task, proc->port); + nxt_port_write_close(port); + nxt_port_read_enable(&thread->engine->task, port); } void -nxt_process_port_write(nxt_task_t *task, nxt_cycle_t *cycle, nxt_uint_t type, +nxt_port_write(nxt_task_t *task, nxt_cycle_t *cycle, nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_buf_t *b) { - nxt_uint_t i, n; - nxt_process_port_t *proc; + nxt_uint_t i, n; + nxt_port_t *port; - proc = cycle->processes->elts; - n = cycle->processes->nelts; + port = cycle->ports->elts; + n = cycle->ports->nelts; for (i = 0; i < n; i++) { - if (nxt_pid != proc[i].pid) { - (void) nxt_port_write(task, proc[i].port, type, fd, stream, b); + if (nxt_pid != port[i].pid) { + (void) nxt_port_socket_write(task, &port[i], type, fd, stream, b); } } } static void -nxt_process_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { - nxt_process_port_handler_t *handlers; + nxt_port_handler_t *handlers; if (nxt_fast_path(msg->type <= NXT_PORT_MSG_MAX)) { @@ -69,62 +68,64 @@ nxt_process_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) void -nxt_process_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { nxt_cycle_quit(task, NULL); } void -nxt_process_new_port(nxt_task_t *task, nxt_cycle_t *cycle, - nxt_process_port_t *proc) +nxt_port_send_new_port(nxt_task_t *task, nxt_cycle_t *cycle, + nxt_port_t *new_port) { nxt_buf_t *b; nxt_uint_t i, n; - nxt_process_port_t *p; - nxt_proc_msg_new_port_t *new_port; + nxt_port_t *port; + nxt_port_msg_new_port_t *msg; - n = cycle->processes->nelts; + n = cycle->ports->nelts; if (n == 0) { return; } - nxt_thread_log_debug("new port %d for process %PI engine %uD", - proc->port->socket.fd, proc->pid, proc->engine); + nxt_debug(task, "new port %d for process %PI engine %uD", + new_port->socket.fd, new_port->pid, new_port->engine); - p = cycle->processes->elts; + port = cycle->ports->elts; for (i = 0; i < n; i++) { - if (proc->pid == p[i].pid || nxt_pid == p[i].pid || p[i].engine != 0) { + if (port[i].pid == new_port->pid + || port[i].pid == nxt_pid + || port[i].engine != 0) + { continue; } - b = nxt_buf_mem_alloc(p[i].port->mem_pool, - sizeof(nxt_process_port_data_t), 0); + b = nxt_buf_mem_alloc(port[i].mem_pool, sizeof(nxt_port_data_t), 0); if (nxt_slow_path(b == NULL)) { continue; } - b->data = p[i].port; - b->completion_handler = nxt_process_new_port_buf_completion; - b->mem.free += sizeof(nxt_proc_msg_new_port_t); - new_port = (nxt_proc_msg_new_port_t *) b->mem.pos; + b->data = &port[i]; + b->completion_handler = nxt_port_new_port_buf_completion; + b->mem.free += sizeof(nxt_port_msg_new_port_t); + msg = (nxt_port_msg_new_port_t *) b->mem.pos; - new_port->pid = proc->pid; - new_port->engine = proc->engine; - new_port->max_size = p[i].port->max_size; - new_port->max_share = p[i].port->max_share; + msg->pid = new_port->pid; + msg->engine = new_port->engine; + msg->max_size = port[i].max_size; + msg->max_share = port[i].max_share; - (void) nxt_port_write(task, p[i].port, NXT_PORT_MSG_NEW_PORT, - proc->port->socket.fd, 0, b); + (void) nxt_port_socket_write(task, &port[i], NXT_PORT_MSG_NEW_PORT, + new_port->socket.fd, 0, b); } } static void -nxt_process_new_port_buf_completion(nxt_task_t *task, void *obj, void *data) +nxt_port_new_port_buf_completion(nxt_task_t *task, void *obj, void *data) { nxt_buf_t *b; nxt_port_t *port; @@ -139,66 +140,69 @@ nxt_process_new_port_buf_completion(nxt_task_t *task, void *obj, void *data) void -nxt_process_port_new_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { nxt_port_t *port; nxt_cycle_t *cycle; - nxt_process_port_t *proc; - nxt_proc_msg_new_port_t *new_port; + nxt_mem_pool_t *mp; + nxt_port_msg_new_port_t *new_port_msg; cycle = nxt_thread_cycle(); - proc = nxt_array_add(cycle->processes); - if (nxt_slow_path(proc == NULL)) { + port = nxt_array_add(cycle->ports); + if (nxt_slow_path(port == NULL)) { return; } - port = nxt_port_alloc(task); - if (nxt_slow_path(port == NULL)) { + mp = nxt_mem_pool_create(1024); + if (nxt_slow_path(mp == NULL)) { return; } - proc->port = port; + port->mem_pool = mp; - new_port = (nxt_proc_msg_new_port_t *) msg->buf->mem.pos; + new_port_msg = (nxt_port_msg_new_port_t *) msg->buf->mem.pos; msg->buf->mem.pos = msg->buf->mem.free; nxt_debug(task, "new port %d received for process %PI engine %uD", - msg->fd, new_port->pid, new_port->engine); + msg->fd, new_port_msg->pid, new_port_msg->engine); - proc->pid = new_port->pid; - proc->engine = new_port->engine; + port->pid = new_port_msg->pid; + port->engine = new_port_msg->engine; + port->pair[0] = -1; port->pair[1] = msg->fd; - port->max_size = new_port->max_size; - port->max_share = new_port->max_share; + port->max_size = new_port_msg->max_size; + port->max_share = new_port_msg->max_share; + + nxt_queue_init(&port->messages); + + port->socket.task = task; - /* A read port is not passed at all. */ nxt_port_write_enable(task, port); } void -nxt_process_port_change_log_file(nxt_task_t *task, nxt_cycle_t *cycle, +nxt_port_change_log_file(nxt_task_t *task, nxt_cycle_t *cycle, nxt_uint_t slot, nxt_fd_t fd) { - nxt_buf_t *b; - nxt_uint_t i, n; - nxt_process_port_t *p; + nxt_buf_t *b; + nxt_uint_t i, n; + nxt_port_t *port; - n = cycle->processes->nelts; + n = cycle->ports->nelts; if (n == 0) { return; } - nxt_thread_log_debug("change log file #%ui fd:%FD", slot, fd); + nxt_debug(task, "change log file #%ui fd:%FD", slot, fd); - p = cycle->processes->elts; + port = cycle->ports->elts; - /* p[0] is master process. */ + /* port[0] is master process port. */ for (i = 1; i < n; i++) { - b = nxt_buf_mem_alloc(p[i].port->mem_pool, - sizeof(nxt_process_port_data_t), 0); + b = nxt_buf_mem_alloc(port[i].mem_pool, sizeof(nxt_port_data_t), 0); if (nxt_slow_path(b == NULL)) { continue; @@ -207,15 +211,14 @@ nxt_process_port_change_log_file(nxt_task_t *task, nxt_cycle_t *cycle, *(nxt_uint_t *) b->mem.pos = slot; b->mem.free += sizeof(nxt_uint_t); - (void) nxt_port_write(task, p[i].port, NXT_PORT_MSG_PORTGE_FILE, - fd, 0, b); + (void) nxt_port_socket_write(task, &port[i], NXT_PORT_MSG_CHANGE_FILE, + fd, 0, b); } } void -nxt_process_port_change_log_file_handler(nxt_task_t *task, - nxt_port_recv_msg_t *msg) +nxt_port_change_log_file_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { nxt_buf_t *b; nxt_uint_t slot; @@ -246,7 +249,7 @@ nxt_process_port_change_log_file_handler(nxt_task_t *task, void -nxt_process_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +nxt_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { nxt_buf_t *b; @@ -259,7 +262,7 @@ nxt_process_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) void -nxt_process_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +nxt_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { nxt_debug(task, "port empty handler"); } diff --git a/src/nxt_port.h b/src/nxt_port.h index b195bcd7..c27b1bc4 100644 --- a/src/nxt_port.h +++ b/src/nxt_port.h @@ -8,61 +8,116 @@ #define _NXT_PORT_H_INCLUDED_ +typedef struct nxt_port_s nxt_port_t; + + +typedef struct { + uint32_t stream; + + uint16_t type; + uint8_t last; /* 1 bit */ +} nxt_port_msg_t; + + +typedef struct { + nxt_queue_link_t link; + nxt_buf_t *buf; + size_t share; + nxt_fd_t fd; + nxt_port_msg_t port_msg; +} nxt_port_send_msg_t; + + +typedef struct nxt_port_recv_msg_s { + uint32_t stream; + uint16_t type; + + nxt_fd_t fd; + nxt_buf_t *buf; + nxt_port_t *port; +} nxt_port_recv_msg_t; + + +typedef void (*nxt_port_handler_t)(nxt_task_t *task, nxt_port_recv_msg_t *msg); + + +struct nxt_port_s { + /* Must be the first field. */ + nxt_fd_event_t socket; + + nxt_queue_t messages; /* of nxt_port_send_msg_t */ + + /* Maximum size of message part. */ + uint32_t max_size; + /* Maximum interleave of message parts. */ + uint32_t max_share; + + nxt_port_handler_t handler; + void *data; + + nxt_mem_pool_t *mem_pool; + nxt_buf_t *free_bufs; + nxt_socket_t pair[2]; + + nxt_pid_t pid; + uint32_t engine; + uint32_t generation; +}; + + #define NXT_PORT_MSG_MAX NXT_PORT_MSG_DATA typedef enum { NXT_PORT_MSG_QUIT = 0, NXT_PORT_MSG_NEW_PORT, - NXT_PORT_MSG_PORTGE_FILE, + NXT_PORT_MSG_CHANGE_FILE, NXT_PORT_MSG_DATA, } nxt_port_msg_type_e; typedef struct { - nxt_pid_t pid; - uint32_t engine; - uint32_t generation; - nxt_port_t *port; -} nxt_process_port_t; - - -typedef struct { - nxt_pid_t pid; - uint32_t engine; - size_t max_size; - size_t max_share; -} nxt_proc_msg_new_port_t; + nxt_pid_t pid; + uint32_t engine; + size_t max_size; + size_t max_share; +} nxt_port_msg_new_port_t; /* - * nxt_process_port_data_t is allocaiton size - * enabling effective reuse of memory pool cache. + * nxt_port_data_t size is allocation size + * which enables effective reuse of memory pool cache. */ typedef union { nxt_buf_t buf; - nxt_proc_msg_new_port_t new_port; -} nxt_process_port_data_t; - - -typedef void (*nxt_process_port_handler_t)(nxt_task_t *task, - nxt_port_recv_msg_t *msg); - - -void nxt_process_port_create(nxt_thread_t *thr, nxt_process_port_t *proc, - nxt_process_port_handler_t *handlers); -void nxt_process_port_write(nxt_task_t *task, nxt_cycle_t *cycle, + nxt_port_msg_new_port_t new_port; +} nxt_port_data_t; + + +nxt_int_t nxt_port_socket_init(nxt_task_t *task, nxt_port_t *port, + size_t max_size); +void nxt_port_destroy(nxt_port_t *port); +void nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port); +void nxt_port_write_close(nxt_port_t *port); +void nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port); +void nxt_port_read_close(nxt_port_t *port); +nxt_int_t nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_buf_t *b); -void nxt_process_new_port(nxt_task_t *task, nxt_cycle_t *cycle, - nxt_process_port_t *proc); -void nxt_process_port_change_log_file(nxt_task_t *task, nxt_cycle_t *cycle, + +void nxt_port_create(nxt_thread_t *thread, nxt_port_t *port, + nxt_port_handler_t *handlers); +void nxt_port_write(nxt_task_t *task, nxt_cycle_t *cycle, nxt_uint_t type, + nxt_fd_t fd, uint32_t stream, nxt_buf_t *b); +void nxt_port_send_new_port(nxt_task_t *task, nxt_cycle_t *cycle, + nxt_port_t *port); +void nxt_port_change_log_file(nxt_task_t *task, nxt_cycle_t *cycle, nxt_uint_t slot, nxt_fd_t fd); -void nxt_process_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); -void nxt_process_port_new_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); -void nxt_process_port_change_log_file_handler(nxt_task_t *task, +void nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); +void nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); +void nxt_port_change_log_file_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); -void nxt_process_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); -void nxt_process_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); +void nxt_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); +void nxt_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); #endif /* _NXT_PORT_H_INCLUDED_ */ diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c index a3bc7f26..671c3334 100644 --- a/src/nxt_port_socket.c +++ b/src/nxt_port_socket.c @@ -16,44 +16,26 @@ static void nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b); static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data); -nxt_port_t * -nxt_port_alloc(nxt_task_t *task) +nxt_int_t +nxt_port_socket_init(nxt_task_t *task, nxt_port_t *port, size_t max_size) { - nxt_port_t *port; + nxt_int_t sndbuf, rcvbuf, size; + nxt_socket_t snd, rcv; nxt_mem_pool_t *mp; - mp = nxt_mem_pool_create(1024); - - if (nxt_fast_path(mp != NULL)) { - /* This allocation cannot fail. */ - port = nxt_mem_zalloc(mp, sizeof(nxt_port_t)); - port->mem_pool = mp; - - port->socket.task = task; + port->socket.task = task; - port->pair[0] = -1; - port->pair[1] = -1; + port->pair[0] = -1; + port->pair[1] = -1; - nxt_queue_init(&port->messages); + nxt_queue_init(&port->messages); - return port; + mp = nxt_mem_pool_create(1024); + if (nxt_slow_path(mp == NULL)) { + return NXT_ERROR; } - return NULL; -} - - -nxt_port_t * -nxt_port_create(nxt_task_t *task, size_t max_size) -{ - nxt_int_t sndbuf, rcvbuf, size; - nxt_port_t *port; - nxt_socket_t snd, rcv; - - port = nxt_port_alloc(task); - if (nxt_slow_path(port == NULL)) { - return NULL; - } + port->mem_pool = mp; if (nxt_slow_path(nxt_socketpair_create(task, port->pair) != NXT_OK)) { goto socketpair_fail; @@ -109,7 +91,7 @@ nxt_port_create(nxt_task_t *task, size_t max_size) port->max_size = nxt_min(max_size, (size_t) sndbuf); port->max_share = (64 * 1024); - return port; + return NXT_OK; getsockopt_fail: @@ -120,7 +102,7 @@ socketpair_fail: nxt_mem_pool_destroy(port->mem_pool); - return NULL; + return NXT_ERROR; } @@ -154,7 +136,7 @@ nxt_port_write_close(nxt_port_t *port) nxt_int_t -nxt_port_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, +nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_buf_t *b) { nxt_queue_link_t *link; @@ -204,9 +186,9 @@ static void nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) { ssize_t n; - nxt_uint_t niob; + nxt_uint_t niov; nxt_port_t *port; - struct iovec iob[NXT_IOBUF_MAX]; + struct iovec iov[NXT_IOBUF_MAX]; nxt_queue_link_t *link; nxt_port_send_msg_t *msg; nxt_sendbuf_coalesce_t sb; @@ -223,21 +205,22 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) msg = (nxt_port_send_msg_t *) link; - nxt_iobuf_set(&iob[0], &msg->port_msg, sizeof(nxt_port_msg_t)); + iov[0].iov_base = &msg->port_msg; + iov[0].iov_len = sizeof(nxt_port_msg_t); sb.buf = msg->buf; - sb.iobuf = &iob[1]; + sb.iobuf = &iov[1]; sb.nmax = NXT_IOBUF_MAX - 1; sb.sync = 0; sb.last = 0; sb.size = sizeof(nxt_port_msg_t); sb.limit = port->max_size; - niob = nxt_sendbuf_mem_coalesce(task, &sb); + niov = nxt_sendbuf_mem_coalesce(task, &sb); msg->port_msg.last = sb.last; - n = nxt_socketpair_send(&port->socket, msg->fd, iob, niob + 1); + n = nxt_socketpair_send(&port->socket, msg->fd, iov, niov + 1); if (n > 0) { if (nxt_slow_path((size_t) n != sb.size)) { @@ -322,7 +305,7 @@ nxt_port_read_handler(nxt_task_t *task, void *obj, void *data) nxt_fd_t fd; nxt_buf_t *b; nxt_port_t *port; - nxt_iobuf_t iob[2]; + struct iovec iov[2]; nxt_port_msg_t msg; port = obj; @@ -335,10 +318,13 @@ nxt_port_read_handler(nxt_task_t *task, void *obj, void *data) /* TODO: disable event for some time */ } - nxt_iobuf_set(&iob[0], &msg, sizeof(nxt_port_msg_t)); - nxt_iobuf_set(&iob[1], b->mem.pos, port->max_size); + iov[0].iov_base = &msg; + iov[0].iov_len = sizeof(nxt_port_msg_t); + + iov[1].iov_base = b->mem.pos; + iov[1].iov_len = port->max_size; - n = nxt_socketpair_recv(&port->socket, &fd, iob, 2); + n = nxt_socketpair_recv(&port->socket, &fd, iov, 2); if (n > 0) { nxt_port_read_msg_process(task, port, &msg, fd, b, n); diff --git a/src/nxt_port_socket.h b/src/nxt_port_socket.h deleted file mode 100644 index 4740368d..00000000 --- a/src/nxt_port_socket.h +++ /dev/null @@ -1,73 +0,0 @@ - -/* - * Copyright (C) Igor Sysoev - * Copyright (C) NGINX, Inc. - */ - -#ifndef _NXT_PORT_SOCKET_H_INCLUDED_ -#define _NXT_PORT_SOCKET_H_INCLUDED_ - - -typedef struct { - uint32_t stream; - - uint16_t type; - uint8_t last; /* 1 bit */ -} nxt_port_msg_t; - - -typedef struct { - nxt_queue_link_t link; - nxt_buf_t *buf; - size_t share; - nxt_fd_t fd; - nxt_port_msg_t port_msg; -} nxt_port_send_msg_t; - - -typedef struct nxt_port_recv_msg_s nxt_port_recv_msg_t; -typedef void (*nxt_port_handler_t)(nxt_task_t *task, nxt_port_recv_msg_t *msg); - - -typedef struct { - /* Must be the first field. */ - nxt_fd_event_t socket; - - nxt_queue_t messages; /* of nxt_port_send_msg_t */ - - /* Maximum size of message part. */ - uint32_t max_size; - /* Maximum interleave of message parts. */ - uint32_t max_share; - - nxt_port_handler_t handler; - void *data; - - nxt_mem_pool_t *mem_pool; - nxt_buf_t *free_bufs; - nxt_socket_t pair[2]; -} nxt_port_t; - - -struct nxt_port_recv_msg_s { - uint32_t stream; - uint16_t type; - - nxt_fd_t fd; - nxt_buf_t *buf; - nxt_port_t *port; -}; - - -NXT_EXPORT nxt_port_t *nxt_port_alloc(nxt_task_t *task); -NXT_EXPORT nxt_port_t *nxt_port_create(nxt_task_t *task, size_t bufsize); -NXT_EXPORT void nxt_port_destroy(nxt_port_t *port); -NXT_EXPORT void nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port); -NXT_EXPORT void nxt_port_write_close(nxt_port_t *port); -NXT_EXPORT void nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port); -NXT_EXPORT void nxt_port_read_close(nxt_port_t *port); -NXT_EXPORT nxt_int_t nxt_port_write(nxt_task_t *task, nxt_port_t *port, - nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_buf_t *b); - - -#endif /* _NXT_PORT_SOCKET_H_INCLUDED_ */ diff --git a/src/nxt_sendbuf.c b/src/nxt_sendbuf.c index 21a25e68..62042504 100644 --- a/src/nxt_sendbuf.c +++ b/src/nxt_sendbuf.c @@ -122,15 +122,15 @@ nxt_sendbuf_mem_coalesce(nxt_task_t *task, nxt_sendbuf_coalesce_t *sb) goto done; } - nxt_iobuf_set(&sb->iobuf[n], b->mem.pos, size); + sb->iobuf[n].iov_base = b->mem.pos; + sb->iobuf[n].iov_len = size; } else { - nxt_iobuf_add(&sb->iobuf[n], size); + sb->iobuf[n].iov_len += size; } - nxt_debug(task, "sendbuf: %ui, %p, %uz", n, - nxt_iobuf_data(&sb->iobuf[n]), - nxt_iobuf_size(&sb->iobuf[n])); + nxt_debug(task, "sendbuf: %ui, %p, %uz", + n, sb->iobuf[n].iov_base, sb->iobuf[n].iov_len); total += size; last = b->mem.pos + size; diff --git a/src/nxt_worker_process.c b/src/nxt_worker_process.c index 01304f65..a6ef8e80 100644 --- a/src/nxt_worker_process.c +++ b/src/nxt_worker_process.c @@ -21,11 +21,11 @@ static void nxt_worker_process_sigquit_handler(nxt_task_t *task, void *obj, void *data); -static nxt_process_port_handler_t nxt_worker_process_port_handlers[] = { +static nxt_port_handler_t nxt_worker_process_port_handlers[] = { nxt_worker_process_quit_handler, - nxt_process_port_new_handler, - nxt_process_port_change_log_file_handler, - nxt_process_port_data_handler, + nxt_port_new_port_handler, + nxt_port_change_log_file_handler, + nxt_port_data_handler, }; @@ -45,9 +45,9 @@ void nxt_worker_process_start(void *data) { nxt_int_t n; + nxt_port_t *port; nxt_cycle_t *cycle; nxt_thread_t *thr; - nxt_process_port_t *proc; const nxt_event_interface_t *interface; cycle = data; @@ -90,15 +90,15 @@ nxt_worker_process_start(void *data) goto fail; } - proc = cycle->processes->elts; + port = cycle->ports->elts; /* A master process port. */ - nxt_port_read_close(proc[0].port); - nxt_port_write_enable(&nxt_main_task, proc[0].port); + nxt_port_read_close(&port[0]); + nxt_port_write_enable(&nxt_main_task, &port[0]); /* A worker process port. */ - nxt_process_port_create(thr, &proc[cycle->current_process], - nxt_worker_process_port_handlers); + nxt_port_create(thr, &port[cycle->current_process], + nxt_worker_process_port_handlers); #if (NXT_THREADS) { |