diff options
author | Igor Sysoev <igor@sysoev.ru> | 2017-02-01 20:03:45 +0300 |
---|---|---|
committer | Igor Sysoev <igor@sysoev.ru> | 2017-02-01 20:03:45 +0300 |
commit | e57b95a92333fa7ff558737b0ba2b76894cc0412 (patch) | |
tree | b2de846a4fd958517be379b552f38652dd6d9078 /src | |
parent | 6e67bee0f4f96ad0a7d1a231dfdae8431714c66a (diff) | |
download | unit-e57b95a92333fa7ff558737b0ba2b76894cc0412.tar.gz unit-e57b95a92333fa7ff558737b0ba2b76894cc0412.tar.bz2 |
Process channels have been renamed to ports.
Diffstat (limited to 'src')
-rw-r--r-- | src/nxt_chan.c | 464 | ||||
-rw-r--r-- | src/nxt_chan.h | 75 | ||||
-rw-r--r-- | src/nxt_cycle.c | 6 | ||||
-rw-r--r-- | src/nxt_cycle.h | 2 | ||||
-rw-r--r-- | src/nxt_main.h | 2 | ||||
-rw-r--r-- | src/nxt_master_process.c | 40 | ||||
-rw-r--r-- | src/nxt_port.c | 265 | ||||
-rw-r--r-- | src/nxt_port.h | 68 | ||||
-rw-r--r-- | src/nxt_port_socket.c | 464 | ||||
-rw-r--r-- | src/nxt_port_socket.h | 75 | ||||
-rw-r--r-- | src/nxt_process_chan.c | 265 | ||||
-rw-r--r-- | src/nxt_process_chan.h | 68 | ||||
-rw-r--r-- | src/nxt_worker_process.c | 28 |
13 files changed, 911 insertions, 911 deletions
diff --git a/src/nxt_chan.c b/src/nxt_chan.c deleted file mode 100644 index 6832ecdc..00000000 --- a/src/nxt_chan.c +++ /dev/null @@ -1,464 +0,0 @@ - -/* - * Copyright (C) Igor Sysoev - * Copyright (C) NGINX, Inc. - */ - -#include <nxt_main.h> - - -static void nxt_chan_write_handler(nxt_task_t *task, void *obj, void *data); -static void nxt_chan_read_handler(nxt_task_t *task, void *obj, void *data); -static void nxt_chan_read_msg_process(nxt_task_t *task, nxt_chan_t *chan, - nxt_chan_msg_t *msg, nxt_fd_t fd, nxt_buf_t *b, size_t size); -static nxt_buf_t *nxt_chan_buf_alloc(nxt_chan_t *chan); -static void nxt_chan_buf_free(nxt_chan_t *chan, nxt_buf_t *b); -static void nxt_chan_error_handler(nxt_task_t *task, void *obj, void *data); - - -nxt_chan_t * -nxt_chan_alloc(void) -{ - nxt_chan_t *chan; - nxt_mem_pool_t *mp; - - mp = nxt_mem_pool_create(1024); - - if (nxt_fast_path(mp != NULL)) { - /* This allocation cannot fail. */ - chan = nxt_mem_zalloc(mp, sizeof(nxt_chan_t)); - chan->mem_pool = mp; - - chan->pair[0] = -1; - chan->pair[1] = -1; - - nxt_queue_init(&chan->messages); - - return chan; - } - - return NULL; -} - - -nxt_chan_t * -nxt_chan_create(size_t max_size) -{ - nxt_int_t sndbuf, rcvbuf, size; - nxt_chan_t *chan; - nxt_socket_t snd, rcv; - - chan = nxt_chan_alloc(); - if (nxt_slow_path(chan == NULL)) { - return NULL; - } - - if (nxt_slow_path(nxt_socketpair_create(chan->pair) != NXT_OK)) { - goto socketpair_fail; - } - - snd = chan->pair[1]; - - sndbuf = nxt_socket_getsockopt(snd, SOL_SOCKET, SO_SNDBUF); - if (nxt_slow_path(sndbuf < 0)) { - goto getsockopt_fail; - } - - rcv = chan->pair[0]; - - rcvbuf = nxt_socket_getsockopt(rcv, SOL_SOCKET, SO_RCVBUF); - if (nxt_slow_path(rcvbuf < 0)) { - goto getsockopt_fail; - } - - if (max_size == 0) { - max_size = 16 * 1024; - } - - if ((size_t) sndbuf < max_size) { - /* - * On Unix domain sockets - * Linux uses 224K on both send and receive directions; - * FreeBSD, MacOSX, NetBSD, and OpenBSD use 2K buffer size - * on send direction and 4K buffer size on receive direction; - * Solaris uses 16K on send direction and 5K on receive direction. - */ - (void) nxt_socket_setsockopt(snd, SOL_SOCKET, SO_SNDBUF, max_size); - - sndbuf = nxt_socket_getsockopt(snd, SOL_SOCKET, SO_SNDBUF); - if (nxt_slow_path(sndbuf < 0)) { - goto getsockopt_fail; - } - - size = sndbuf * 4; - - if (rcvbuf < size) { - (void) nxt_socket_setsockopt(rcv, SOL_SOCKET, SO_RCVBUF, size); - - rcvbuf = nxt_socket_getsockopt(rcv, SOL_SOCKET, SO_RCVBUF); - if (nxt_slow_path(rcvbuf < 0)) { - goto getsockopt_fail; - } - } - } - - chan->max_size = nxt_min(max_size, (size_t) sndbuf); - chan->max_share = (64 * 1024); - - return chan; - -getsockopt_fail: - - nxt_socket_close(chan->pair[0]); - nxt_socket_close(chan->pair[1]); - -socketpair_fail: - - nxt_mem_pool_destroy(chan->mem_pool); - - return NULL; -} - - -void -nxt_chan_destroy(nxt_chan_t *chan) -{ - nxt_socket_close(chan->socket.fd); - nxt_mem_pool_destroy(chan->mem_pool); -} - - -void -nxt_chan_write_enable(nxt_task_t *task, nxt_chan_t *chan) -{ - chan->socket.fd = chan->pair[1]; - chan->socket.log = &nxt_main_log; - chan->socket.write_ready = 1; - - chan->task.thread = task->thread; - chan->task.log = chan->socket.log; - chan->task.ident = nxt_task_next_ident(); - - chan->socket.task = &chan->task; - - chan->socket.write_work_queue = &task->thread->engine->fast_work_queue; - chan->socket.write_handler = nxt_chan_write_handler; - chan->socket.error_handler = nxt_chan_error_handler; -} - - -void -nxt_chan_write_close(nxt_chan_t *chan) -{ - nxt_socket_close(chan->pair[1]); - chan->pair[1] = -1; -} - - -nxt_int_t -nxt_chan_write(nxt_task_t *task, nxt_chan_t *chan, nxt_uint_t type, - nxt_fd_t fd, uint32_t stream, nxt_buf_t *b) -{ - nxt_queue_link_t *link; - nxt_chan_send_msg_t *msg; - - for (link = nxt_queue_first(&chan->messages); - link != nxt_queue_tail(&chan->messages); - link = nxt_queue_next(link)) - { - msg = (nxt_chan_send_msg_t *) link; - - if (msg->chan_msg.stream == stream) { - /* - * An fd is ignored since a file descriptor - * must be sent only in the first message of a stream. - */ - nxt_buf_chain_add(&msg->buf, b); - - return NXT_OK; - } - } - - msg = nxt_mem_cache_zalloc0(chan->mem_pool, sizeof(nxt_chan_send_msg_t)); - if (nxt_slow_path(msg == NULL)) { - return NXT_ERROR; - } - - msg->buf = b; - msg->fd = fd; - msg->share = 0; - - msg->chan_msg.stream = stream; - msg->chan_msg.type = type; - msg->chan_msg.last = 0; - - nxt_queue_insert_tail(&chan->messages, &msg->link); - - if (chan->socket.write_ready) { - nxt_chan_write_handler(task, chan, NULL); - } - - return NXT_OK; -} - - -static void -nxt_chan_write_handler(nxt_task_t *task, void *obj, void *data) -{ - ssize_t n; - nxt_uint_t niob; - nxt_chan_t *chan; - struct iovec iob[NXT_IOBUF_MAX]; - nxt_queue_link_t *link; - nxt_chan_send_msg_t *msg; - nxt_sendbuf_coalesce_t sb; - - chan = obj; - - do { - link = nxt_queue_first(&chan->messages); - - if (link == nxt_queue_tail(&chan->messages)) { - nxt_event_fd_block_write(task->thread->engine, &chan->socket); - return; - } - - msg = (nxt_chan_send_msg_t *) link; - - nxt_iobuf_set(&iob[0], &msg->chan_msg, sizeof(nxt_chan_msg_t)); - - sb.buf = msg->buf; - sb.iobuf = &iob[1]; - sb.nmax = NXT_IOBUF_MAX - 1; - sb.sync = 0; - sb.last = 0; - sb.size = sizeof(nxt_chan_msg_t); - sb.limit = chan->max_size; - - niob = nxt_sendbuf_mem_coalesce(task, &sb); - - msg->chan_msg.last = sb.last; - - n = nxt_socketpair_send(&chan->socket, msg->fd, iob, niob + 1); - - if (n > 0) { - if (nxt_slow_path((size_t) n != sb.size)) { - nxt_log(task, NXT_LOG_CRIT, - "chan %d: short write: %z instead of %uz", - chan->socket.fd, n, sb.size); - goto fail; - } - - msg->buf = nxt_sendbuf_completion(task, - chan->socket.write_work_queue, - msg->buf, - n - sizeof(nxt_chan_msg_t)); - - if (msg->buf != NULL) { - /* - * A file descriptor is sent only - * in the first message of a stream. - */ - msg->fd = -1; - msg->share += n; - - if (msg->share >= chan->max_share) { - msg->share = 0; - nxt_queue_remove(link); - nxt_queue_insert_tail(&chan->messages, link); - } - - } else { - nxt_queue_remove(link); - nxt_mem_cache_free0(chan->mem_pool, msg, - sizeof(nxt_chan_send_msg_t)); - } - - } else if (nxt_slow_path(n == NXT_ERROR)) { - goto fail; - } - - /* n == NXT_AGAIN */ - - } while (chan->socket.write_ready); - - if (nxt_event_fd_is_disabled(chan->socket.write)) { - nxt_event_fd_enable_write(task->thread->engine, &chan->socket); - } - - return; - -fail: - - nxt_work_queue_add(&task->thread->engine->fast_work_queue, - nxt_chan_error_handler, task, &chan->socket, NULL); -} - - -void -nxt_chan_read_enable(nxt_task_t *task, nxt_chan_t *chan) -{ - chan->socket.fd = chan->pair[0]; - chan->socket.log = &nxt_main_log; - - chan->task.thread = task->thread; - chan->task.log = chan->socket.log; - chan->task.ident = nxt_task_next_ident(); - - chan->socket.task = &chan->task; - - chan->socket.read_work_queue = &task->thread->engine->fast_work_queue; - chan->socket.read_handler = nxt_chan_read_handler; - chan->socket.error_handler = nxt_chan_error_handler; - - nxt_event_fd_enable_read(task->thread->engine, &chan->socket); -} - - -void -nxt_chan_read_close(nxt_chan_t *chan) -{ - nxt_socket_close(chan->pair[0]); - chan->pair[0] = -1; -} - - -static void -nxt_chan_read_handler(nxt_task_t *task, void *obj, void *data) -{ - ssize_t n; - nxt_fd_t fd; - nxt_buf_t *b; - nxt_chan_t *chan; - nxt_iobuf_t iob[2]; - nxt_chan_msg_t msg; - - chan = obj; - - for ( ;; ) { - - b = nxt_chan_buf_alloc(chan); - - if (nxt_slow_path(b == NULL)) { - /* TODO: disable event for some time */ - } - - nxt_iobuf_set(&iob[0], &msg, sizeof(nxt_chan_msg_t)); - nxt_iobuf_set(&iob[1], b->mem.pos, chan->max_size); - - n = nxt_socketpair_recv(&chan->socket, &fd, iob, 2); - - if (n > 0) { - nxt_chan_read_msg_process(task, chan, &msg, fd, b, n); - - if (b->mem.pos == b->mem.free) { - - if (b->next != NULL) { - /* A sync buffer */ - nxt_buf_free(chan->mem_pool, b->next); - } - - nxt_chan_buf_free(chan, b); - } - - if (chan->socket.read_ready) { - continue; - } - - return; - } - - if (n == NXT_AGAIN) { - nxt_chan_buf_free(chan, b); - - nxt_event_fd_enable_read(task->thread->engine, &chan->socket); - return; - } - - /* n == 0 || n == NXT_ERROR */ - - nxt_work_queue_add(&task->thread->engine->fast_work_queue, - nxt_chan_error_handler, task, &chan->socket, NULL); - return; - } -} - - -static void -nxt_chan_read_msg_process(nxt_task_t *task, nxt_chan_t *chan, - nxt_chan_msg_t *msg, nxt_fd_t fd, nxt_buf_t *b, size_t size) -{ - nxt_buf_t *sync; - nxt_chan_recv_msg_t recv_msg; - - if (nxt_slow_path(size < sizeof(nxt_chan_msg_t))) { - nxt_log(chan->socket.task, NXT_LOG_CRIT, - "chan %d: too small message:%uz", chan->socket.fd, size); - goto fail; - } - - recv_msg.stream = msg->stream; - recv_msg.type = msg->type; - recv_msg.fd = fd; - recv_msg.buf = b; - recv_msg.chan = chan; - - b->mem.free += size - sizeof(nxt_chan_msg_t); - - if (msg->last) { - sync = nxt_buf_sync_alloc(chan->mem_pool, NXT_BUF_SYNC_LAST); - if (nxt_slow_path(sync == NULL)) { - goto fail; - } - - b->next = sync; - } - - chan->handler(task, &recv_msg); - - return; - -fail: - - if (fd != -1) { - nxt_fd_close(fd); - } -} - - -static nxt_buf_t * -nxt_chan_buf_alloc(nxt_chan_t *chan) -{ - nxt_buf_t *b; - - if (chan->free_bufs != NULL) { - b = chan->free_bufs; - chan->free_bufs = b->next; - - b->mem.pos = b->mem.start; - b->mem.free = b->mem.start; - - } else { - b = nxt_buf_mem_alloc(chan->mem_pool, chan->max_size, 0); - if (nxt_slow_path(b == NULL)) { - return NULL; - } - } - - return b; -} - - -static void -nxt_chan_buf_free(nxt_chan_t *chan, nxt_buf_t *b) -{ - b->next = chan->free_bufs; - chan->free_bufs = b; -} - - -static void -nxt_chan_error_handler(nxt_task_t *task, void *obj, void *data) -{ - /* TODO */ -} diff --git a/src/nxt_chan.h b/src/nxt_chan.h deleted file mode 100644 index 48cd1a9b..00000000 --- a/src/nxt_chan.h +++ /dev/null @@ -1,75 +0,0 @@ - -/* - * Copyright (C) Igor Sysoev - * Copyright (C) NGINX, Inc. - */ - -#ifndef _NXT_UNIX_CHAN_H_INCLUDED_ -#define _NXT_UNIX_CHAN_H_INCLUDED_ - - -typedef struct { - uint32_t stream; - - uint16_t type; - uint8_t last; /* 1 bit */ -} nxt_chan_msg_t; - - -typedef struct { - nxt_queue_link_t link; - nxt_buf_t *buf; - size_t share; - nxt_fd_t fd; - nxt_chan_msg_t chan_msg; -} nxt_chan_send_msg_t; - - -typedef struct nxt_chan_recv_msg_s nxt_chan_recv_msg_t; -typedef void (*nxt_chan_handler_t)(nxt_task_t *task, nxt_chan_recv_msg_t *msg); - - -typedef struct { - /* Must be the first field. */ - nxt_event_fd_t socket; - - nxt_task_t task; - - nxt_queue_t messages; /* of nxt_chan_send_msg_t */ - - /* Maximum size of message part. */ - uint32_t max_size; - /* Maximum interleave of message parts. */ - uint32_t max_share; - - nxt_chan_handler_t handler; - void *data; - - nxt_mem_pool_t *mem_pool; - nxt_buf_t *free_bufs; - nxt_socket_t pair[2]; -} nxt_chan_t; - - -struct nxt_chan_recv_msg_s { - uint32_t stream; - uint16_t type; - - nxt_fd_t fd; - nxt_buf_t *buf; - nxt_chan_t *chan; -}; - - -NXT_EXPORT nxt_chan_t *nxt_chan_alloc(void); -NXT_EXPORT nxt_chan_t *nxt_chan_create(size_t bufsize); -NXT_EXPORT void nxt_chan_destroy(nxt_chan_t *chan); -NXT_EXPORT void nxt_chan_write_enable(nxt_task_t *task, nxt_chan_t *chan); -NXT_EXPORT void nxt_chan_write_close(nxt_chan_t *chan); -NXT_EXPORT void nxt_chan_read_enable(nxt_task_t *task, nxt_chan_t *chan); -NXT_EXPORT void nxt_chan_read_close(nxt_chan_t *chan); -NXT_EXPORT nxt_int_t nxt_chan_write(nxt_task_t *task, nxt_chan_t *chan, - nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_buf_t *b); - - -#endif /* _NXT_UNIX_CHAN_H_INCLUDED_ */ diff --git a/src/nxt_cycle.c b/src/nxt_cycle.c index c18089a4..447f0b37 100644 --- a/src/nxt_cycle.c +++ b/src/nxt_cycle.c @@ -7,7 +7,7 @@ #include <nxt_main.h> #include <nxt_cycle.h> -#include <nxt_process_chan.h> +#include <nxt_port.h> #include <nxt_master_process.h> @@ -344,7 +344,7 @@ static nxt_int_t nxt_cycle_processes(nxt_cycle_t *cycle) { nxt_uint_t n; - nxt_process_chan_t *proc, *prev; + nxt_process_port_t *proc, *prev; /* * Preallocate double number of previous cycle @@ -353,7 +353,7 @@ nxt_cycle_processes(nxt_cycle_t *cycle) n = (cycle->previous != NULL) ? cycle->previous->processes->nelts : 1; cycle->processes = nxt_array_create(cycle->mem_pool, 2 * n, - sizeof(nxt_process_chan_t)); + sizeof(nxt_process_port_t)); if (nxt_slow_path(cycle->processes == NULL)) { return NXT_ERROR; diff --git a/src/nxt_cycle.h b/src/nxt_cycle.h index 2e7b0504..4a5b656d 100644 --- a/src/nxt_cycle.h +++ b/src/nxt_cycle.h @@ -47,7 +47,7 @@ struct nxt_cycle_s { nxt_cycle_cont_t continuation; #endif - nxt_array_t *processes; /* of nxt_process_chan_t */ + nxt_array_t *processes; /* of nxt_process_port_t */ nxt_list_t *log_files; /* of nxt_file_t */ diff --git a/src/nxt_main.h b/src/nxt_main.h index 25d52665..54e414d4 100644 --- a/src/nxt_main.h +++ b/src/nxt_main.h @@ -120,7 +120,7 @@ nxt_thread_extern_data(nxt_thread_t, nxt_thread_context); #include <nxt_event_fd.h> -#include <nxt_chan.h> +#include <nxt_port_socket.h> #if (NXT_THREADS) #include <nxt_thread_pool.h> #endif diff --git a/src/nxt_master_process.c b/src/nxt_master_process.c index fbf0f6bf..781020e6 100644 --- a/src/nxt_master_process.c +++ b/src/nxt_master_process.c @@ -6,11 +6,11 @@ #include <nxt_main.h> #include <nxt_cycle.h> -#include <nxt_process_chan.h> +#include <nxt_port.h> #include <nxt_master_process.h> -static nxt_int_t nxt_master_process_chan_create(nxt_task_t *task, +static nxt_int_t nxt_master_process_port_create(nxt_task_t *task, nxt_cycle_t *cycle); static void nxt_master_process_title(void); static nxt_int_t nxt_master_start_worker_processes(nxt_task_t *task, @@ -58,7 +58,7 @@ nxt_master_process_start(nxt_thread_t *thr, nxt_task_t *task, { cycle->type = NXT_PROCESS_MASTER; - if (nxt_master_process_chan_create(task, cycle) != NXT_OK) { + if (nxt_master_process_port_create(task, cycle) != NXT_OK) { return NXT_ERROR; } @@ -69,9 +69,9 @@ nxt_master_process_start(nxt_thread_t *thr, nxt_task_t *task, static nxt_int_t -nxt_master_process_chan_create(nxt_task_t *task, nxt_cycle_t *cycle) +nxt_master_process_port_create(nxt_task_t *task, nxt_cycle_t *cycle) { - nxt_process_chan_t *proc; + nxt_process_port_t *proc; proc = nxt_array_add(cycle->processes); if (nxt_slow_path(proc == NULL)) { @@ -81,16 +81,16 @@ nxt_master_process_chan_create(nxt_task_t *task, nxt_cycle_t *cycle) proc->pid = nxt_pid; proc->engine = 0; - proc->chan = nxt_chan_create(0); - if (nxt_slow_path(proc->chan == NULL)) { + proc->port = nxt_port_create(0); + if (nxt_slow_path(proc->port == NULL)) { return NXT_ERROR; } /* - * A master process chan. A write chan is not closed + * A master process port. A write port is not closed * since it should be inherited by worker processes. */ - nxt_chan_read_enable(task, proc->chan); + nxt_port_read_enable(task, proc->port); return NXT_OK; } @@ -144,7 +144,7 @@ static nxt_int_t nxt_master_create_worker_process(nxt_task_t *task, nxt_cycle_t *cycle) { nxt_pid_t pid; - nxt_process_chan_t *proc; + nxt_process_port_t *proc; proc = nxt_array_add(cycle->processes); if (nxt_slow_path(proc == NULL)) { @@ -156,8 +156,8 @@ nxt_master_create_worker_process(nxt_task_t *task, nxt_cycle_t *cycle) proc->engine = 0; proc->generation = cycle->process_generation; - proc->chan = nxt_chan_create(0); - if (nxt_slow_path(proc->chan == NULL)) { + proc->port = nxt_port_create(0); + if (nxt_slow_path(proc->port == NULL)) { return NXT_ERROR; } @@ -177,10 +177,10 @@ nxt_master_create_worker_process(nxt_task_t *task, nxt_cycle_t *cycle) /* The master process created a new process. */ proc->pid = pid; - nxt_chan_read_close(proc->chan); - nxt_chan_write_enable(task, proc->chan); + nxt_port_read_close(proc->port); + nxt_port_write_enable(task, proc->port); - nxt_process_new_chan(task, cycle, proc); + nxt_process_new_port(task, cycle, proc); return NXT_OK; } } @@ -253,7 +253,7 @@ nxt_master_stop_previous_worker_processes(nxt_task_t *task, void *obj, uint32_t generation; nxt_uint_t i, n; nxt_cycle_t *cycle; - nxt_process_chan_t *proc; + nxt_process_port_t *proc; cycle = nxt_thread_cycle(); @@ -266,7 +266,7 @@ nxt_master_stop_previous_worker_processes(nxt_task_t *task, void *obj, for (i = 1; i < n; i++) { if (proc[i].generation == generation) { - (void) nxt_chan_write(task, proc[i].chan, NXT_CHAN_MSG_QUIT, + (void) nxt_port_write(task, proc[i].port, NXT_PORT_MSG_QUIT, -1, 0, NULL); } } @@ -278,7 +278,7 @@ nxt_master_stop_previous_worker_processes(nxt_task_t *task, void *obj, void nxt_master_stop_worker_processes(nxt_task_t *task, nxt_cycle_t *cycle) { - nxt_process_chan_write(task, cycle, NXT_CHAN_MSG_QUIT, -1, 0, NULL); + nxt_process_port_write(task, cycle, NXT_PORT_MSG_QUIT, -1, 0, NULL); } @@ -366,7 +366,7 @@ nxt_master_process_sigusr1_handler(nxt_task_t *task, void *obj, void *data) nxt_list_each(file, cycle->log_files) { - nxt_process_chan_change_log_file(task, cycle, n, new_file[n].fd); + nxt_process_port_change_log_file(task, cycle, n, new_file[n].fd); /* * The old log file descriptor must be closed at the moment * when no other threads use it. dup2() allows to use the @@ -619,7 +619,7 @@ nxt_master_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid) { nxt_uint_t i, n, generation; nxt_cycle_t *cycle; - nxt_process_chan_t *proc; + nxt_process_port_t *proc; cycle = nxt_thread_cycle(); diff --git a/src/nxt_port.c b/src/nxt_port.c new file mode 100644 index 00000000..c00fcb61 --- /dev/null +++ b/src/nxt_port.c @@ -0,0 +1,265 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) NGINX, Inc. + */ + +#include <nxt_main.h> +#include <nxt_cycle.h> +#include <nxt_port.h> + + +static void nxt_process_port_handler(nxt_task_t *task, + nxt_port_recv_msg_t *msg); +static void nxt_process_new_port_buf_completion(nxt_task_t *task, void *obj, + void *data); + + +void +nxt_process_port_create(nxt_thread_t *thr, nxt_process_port_t *proc, + nxt_process_port_handler_t *handlers) +{ + proc->pid = nxt_pid; + proc->engine = thr->engine->id; + proc->port->handler = nxt_process_port_handler; + proc->port->data = handlers; + + nxt_port_write_close(proc->port); + nxt_port_read_enable(&thr->engine->task, proc->port); +} + + +void +nxt_process_port_write(nxt_task_t *task, nxt_cycle_t *cycle, nxt_uint_t type, + nxt_fd_t fd, uint32_t stream, nxt_buf_t *b) +{ + nxt_uint_t i, n; + nxt_process_port_t *proc; + + proc = cycle->processes->elts; + n = cycle->processes->nelts; + + for (i = 0; i < n; i++) { + if (nxt_pid != proc[i].pid) { + (void) nxt_port_write(task, proc[i].port, type, fd, stream, b); + } + } +} + + +static void +nxt_process_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + nxt_process_port_handler_t *handlers; + + if (nxt_fast_path(msg->type <= NXT_PORT_MSG_MAX)) { + + nxt_debug(task, "port %d: message type:%uD", + msg->port->socket.fd, msg->type); + + handlers = msg->port->data; + handlers[msg->type](task, msg); + + return; + } + + nxt_log(task, NXT_LOG_CRIT, "port %d: unknown message type:%uD", + msg->port->socket.fd, msg->type); +} + + +void +nxt_process_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + nxt_cycle_quit(task, NULL); +} + + +void +nxt_process_new_port(nxt_task_t *task, nxt_cycle_t *cycle, + nxt_process_port_t *proc) +{ + nxt_buf_t *b; + nxt_uint_t i, n; + nxt_process_port_t *p; + nxt_proc_msg_new_port_t *new_port; + + n = cycle->processes->nelts; + if (n == 0) { + return; + } + + nxt_thread_log_debug("new port %d for process %PI engine %uD", + proc->port->socket.fd, proc->pid, proc->engine); + + p = cycle->processes->elts; + + for (i = 0; i < n; i++) { + + if (proc->pid == p[i].pid || nxt_pid == p[i].pid || p[i].engine != 0) { + continue; + } + + b = nxt_buf_mem_alloc(p[i].port->mem_pool, + sizeof(nxt_process_port_data_t), 0); + + if (nxt_slow_path(b == NULL)) { + continue; + } + + b->data = p[i].port; + b->completion_handler = nxt_process_new_port_buf_completion; + b->mem.free += sizeof(nxt_proc_msg_new_port_t); + new_port = (nxt_proc_msg_new_port_t *) b->mem.pos; + + new_port->pid = proc->pid; + new_port->engine = proc->engine; + new_port->max_size = p[i].port->max_size; + new_port->max_share = p[i].port->max_share; + + (void) nxt_port_write(task, p[i].port, NXT_PORT_MSG_NEW_PORT, + proc->port->socket.fd, 0, b); + } +} + + +static void +nxt_process_new_port_buf_completion(nxt_task_t *task, void *obj, void *data) +{ + nxt_buf_t *b; + nxt_port_t *port; + + b = obj; + port = b->data; + + /* TODO: b->mem.pos */ + + nxt_buf_free(port->mem_pool, b); +} + + +void +nxt_process_port_new_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + nxt_port_t *port; + nxt_cycle_t *cycle; + nxt_process_port_t *proc; + nxt_proc_msg_new_port_t *new_port; + + cycle = nxt_thread_cycle(); + + proc = nxt_array_add(cycle->processes); + if (nxt_slow_path(proc == NULL)) { + return; + } + + port = nxt_port_alloc(); + if (nxt_slow_path(port == NULL)) { + return; + } + + proc->port = port; + + new_port = (nxt_proc_msg_new_port_t *) msg->buf->mem.pos; + msg->buf->mem.pos = msg->buf->mem.free; + + nxt_debug(task, "new port %d received for process %PI engine %uD", + msg->fd, new_port->pid, new_port->engine); + + proc->pid = new_port->pid; + proc->engine = new_port->engine; + port->pair[1] = msg->fd; + port->max_size = new_port->max_size; + port->max_share = new_port->max_share; + + /* A read port is not passed at all. */ + nxt_port_write_enable(task, port); +} + + +void +nxt_process_port_change_log_file(nxt_task_t *task, nxt_cycle_t *cycle, + nxt_uint_t slot, nxt_fd_t fd) +{ + nxt_buf_t *b; + nxt_uint_t i, n; + nxt_process_port_t *p; + + n = cycle->processes->nelts; + if (n == 0) { + return; + } + + nxt_thread_log_debug("change log file #%ui fd:%FD", slot, fd); + + p = cycle->processes->elts; + + /* p[0] is master process. */ + + for (i = 1; i < n; i++) { + b = nxt_buf_mem_alloc(p[i].port->mem_pool, + sizeof(nxt_process_port_data_t), 0); + + if (nxt_slow_path(b == NULL)) { + continue; + } + + *(nxt_uint_t *) b->mem.pos = slot; + b->mem.free += sizeof(nxt_uint_t); + + (void) nxt_port_write(task, p[i].port, NXT_PORT_MSG_PORTGE_FILE, + fd, 0, b); + } +} + + +void +nxt_process_port_change_log_file_handler(nxt_task_t *task, + nxt_port_recv_msg_t *msg) +{ + nxt_buf_t *b; + nxt_uint_t slot; + nxt_file_t *log_file; + nxt_cycle_t *cycle; + + cycle = nxt_thread_cycle(); + + b = msg->buf; + slot = *(nxt_uint_t *) b->mem.pos; + + log_file = nxt_list_elt(cycle->log_files, slot); + + nxt_debug(task, "change log file %FD:%FD", msg->fd, log_file->fd); + + /* + * The old log file descriptor must be closed at the moment when no + * other threads use it. dup2() allows to use the old file descriptor + * for new log file. This change is performed atomically in the kernel. + */ + if (nxt_file_redirect(log_file, msg->fd) == NXT_OK) { + + if (slot == 0) { + (void) nxt_file_stderr(log_file); + } + } +} + + +void +nxt_process_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + nxt_buf_t *b; + + b = msg->buf; + + nxt_debug(task, "data: %*s", b->mem.free - b->mem.pos, b->mem.pos); + + b->mem.pos = b->mem.free; +} + + +void +nxt_process_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + nxt_debug(task, "port empty handler"); +} diff --git a/src/nxt_port.h b/src/nxt_port.h new file mode 100644 index 00000000..b195bcd7 --- /dev/null +++ b/src/nxt_port.h @@ -0,0 +1,68 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) NGINX, Inc. + */ + +#ifndef _NXT_PORT_H_INCLUDED_ +#define _NXT_PORT_H_INCLUDED_ + + +#define NXT_PORT_MSG_MAX NXT_PORT_MSG_DATA + +typedef enum { + NXT_PORT_MSG_QUIT = 0, + NXT_PORT_MSG_NEW_PORT, + NXT_PORT_MSG_PORTGE_FILE, + NXT_PORT_MSG_DATA, +} nxt_port_msg_type_e; + + +typedef struct { + nxt_pid_t pid; + uint32_t engine; + uint32_t generation; + nxt_port_t *port; +} nxt_process_port_t; + + +typedef struct { + nxt_pid_t pid; + uint32_t engine; + size_t max_size; + size_t max_share; +} nxt_proc_msg_new_port_t; + + +/* + * nxt_process_port_data_t is allocaiton size + * enabling effective reuse of memory pool cache. + */ +typedef union { + nxt_buf_t buf; + nxt_proc_msg_new_port_t new_port; +} nxt_process_port_data_t; + + +typedef void (*nxt_process_port_handler_t)(nxt_task_t *task, + nxt_port_recv_msg_t *msg); + + +void nxt_process_port_create(nxt_thread_t *thr, nxt_process_port_t *proc, + nxt_process_port_handler_t *handlers); +void nxt_process_port_write(nxt_task_t *task, nxt_cycle_t *cycle, + nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_buf_t *b); +void nxt_process_new_port(nxt_task_t *task, nxt_cycle_t *cycle, + nxt_process_port_t *proc); +void nxt_process_port_change_log_file(nxt_task_t *task, nxt_cycle_t *cycle, + nxt_uint_t slot, nxt_fd_t fd); + +void nxt_process_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); +void nxt_process_port_new_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); +void nxt_process_port_change_log_file_handler(nxt_task_t *task, + nxt_port_recv_msg_t *msg); +void nxt_process_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); +void nxt_process_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); + + +#endif /* _NXT_PORT_H_INCLUDED_ */ diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c new file mode 100644 index 00000000..4dc72d64 --- /dev/null +++ b/src/nxt_port_socket.c @@ -0,0 +1,464 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) NGINX, Inc. + */ + +#include <nxt_main.h> + + +static void nxt_port_write_handler(nxt_task_t *task, void *obj, void *data); +static void nxt_port_read_handler(nxt_task_t *task, void *obj, void *data); +static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, + nxt_port_msg_t *msg, nxt_fd_t fd, nxt_buf_t *b, size_t size); +static nxt_buf_t *nxt_port_buf_alloc(nxt_port_t *port); +static void nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b); +static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data); + + +nxt_port_t * +nxt_port_alloc(void) +{ + nxt_port_t *port; + nxt_mem_pool_t *mp; + + mp = nxt_mem_pool_create(1024); + + if (nxt_fast_path(mp != NULL)) { + /* This allocation cannot fail. */ + port = nxt_mem_zalloc(mp, sizeof(nxt_port_t)); + port->mem_pool = mp; + + port->pair[0] = -1; + port->pair[1] = -1; + + nxt_queue_init(&port->messages); + + return port; + } + + return NULL; +} + + +nxt_port_t * +nxt_port_create(size_t max_size) +{ + nxt_int_t sndbuf, rcvbuf, size; + nxt_port_t *port; + nxt_socket_t snd, rcv; + + port = nxt_port_alloc(); + if (nxt_slow_path(port == NULL)) { + return NULL; + } + + if (nxt_slow_path(nxt_socketpair_create(port->pair) != NXT_OK)) { + goto socketpair_fail; + } + + snd = port->pair[1]; + + sndbuf = nxt_socket_getsockopt(snd, SOL_SOCKET, SO_SNDBUF); + if (nxt_slow_path(sndbuf < 0)) { + goto getsockopt_fail; + } + + rcv = port->pair[0]; + + rcvbuf = nxt_socket_getsockopt(rcv, SOL_SOCKET, SO_RCVBUF); + if (nxt_slow_path(rcvbuf < 0)) { + goto getsockopt_fail; + } + + if (max_size == 0) { + max_size = 16 * 1024; + } + + if ((size_t) sndbuf < max_size) { + /* + * On Unix domain sockets + * Linux uses 224K on both send and receive directions; + * FreeBSD, MacOSX, NetBSD, and OpenBSD use 2K buffer size + * on send direction and 4K buffer size on receive direction; + * Solaris uses 16K on send direction and 5K on receive direction. + */ + (void) nxt_socket_setsockopt(snd, SOL_SOCKET, SO_SNDBUF, max_size); + + sndbuf = nxt_socket_getsockopt(snd, SOL_SOCKET, SO_SNDBUF); + if (nxt_slow_path(sndbuf < 0)) { + goto getsockopt_fail; + } + + size = sndbuf * 4; + + if (rcvbuf < size) { + (void) nxt_socket_setsockopt(rcv, SOL_SOCKET, SO_RCVBUF, size); + + rcvbuf = nxt_socket_getsockopt(rcv, SOL_SOCKET, SO_RCVBUF); + if (nxt_slow_path(rcvbuf < 0)) { + goto getsockopt_fail; + } + } + } + + port->max_size = nxt_min(max_size, (size_t) sndbuf); + port->max_share = (64 * 1024); + + return port; + +getsockopt_fail: + + nxt_socket_close(port->pair[0]); + nxt_socket_close(port->pair[1]); + +socketpair_fail: + + nxt_mem_pool_destroy(port->mem_pool); + + return NULL; +} + + +void +nxt_port_destroy(nxt_port_t *port) +{ + nxt_socket_close(port->socket.fd); + nxt_mem_pool_destroy(port->mem_pool); +} + + +void +nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port) +{ + port->socket.fd = port->pair[1]; + port->socket.log = &nxt_main_log; + port->socket.write_ready = 1; + + port->task.thread = task->thread; + port->task.log = port->socket.log; + port->task.ident = nxt_task_next_ident(); + + port->socket.task = &port->task; + + port->socket.write_work_queue = &task->thread->engine->fast_work_queue; + port->socket.write_handler = nxt_port_write_handler; + port->socket.error_handler = nxt_port_error_handler; +} + + +void +nxt_port_write_close(nxt_port_t *port) +{ + nxt_socket_close(port->pair[1]); + port->pair[1] = -1; +} + + +nxt_int_t +nxt_port_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, + nxt_fd_t fd, uint32_t stream, nxt_buf_t *b) +{ + nxt_queue_link_t *link; + nxt_port_send_msg_t *msg; + + for (link = nxt_queue_first(&port->messages); + link != nxt_queue_tail(&port->messages); + link = nxt_queue_next(link)) + { + msg = (nxt_port_send_msg_t *) link; + + if (msg->port_msg.stream == stream) { + /* + * An fd is ignored since a file descriptor + * must be sent only in the first message of a stream. + */ + nxt_buf_chain_add(&msg->buf, b); + + return NXT_OK; + } + } + + msg = nxt_mem_cache_zalloc0(port->mem_pool, sizeof(nxt_port_send_msg_t)); + if (nxt_slow_path(msg == NULL)) { + return NXT_ERROR; + } + + msg->buf = b; + msg->fd = fd; + msg->share = 0; + + msg->port_msg.stream = stream; + msg->port_msg.type = type; + msg->port_msg.last = 0; + + nxt_queue_insert_tail(&port->messages, &msg->link); + + if (port->socket.write_ready) { + nxt_port_write_handler(task, port, NULL); + } + + return NXT_OK; +} + + +static void +nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) +{ + ssize_t n; + nxt_uint_t niob; + nxt_port_t *port; + struct iovec iob[NXT_IOBUF_MAX]; + nxt_queue_link_t *link; + nxt_port_send_msg_t *msg; + nxt_sendbuf_coalesce_t sb; + + port = obj; + + do { + link = nxt_queue_first(&port->messages); + + if (link == nxt_queue_tail(&port->messages)) { + nxt_event_fd_block_write(task->thread->engine, &port->socket); + return; + } + + msg = (nxt_port_send_msg_t *) link; + + nxt_iobuf_set(&iob[0], &msg->port_msg, sizeof(nxt_port_msg_t)); + + sb.buf = msg->buf; + sb.iobuf = &iob[1]; + sb.nmax = NXT_IOBUF_MAX - 1; + sb.sync = 0; + sb.last = 0; + sb.size = sizeof(nxt_port_msg_t); + sb.limit = port->max_size; + + niob = nxt_sendbuf_mem_coalesce(task, &sb); + + msg->port_msg.last = sb.last; + + n = nxt_socketpair_send(&port->socket, msg->fd, iob, niob + 1); + + if (n > 0) { + if (nxt_slow_path((size_t) n != sb.size)) { + nxt_log(task, NXT_LOG_CRIT, + "port %d: short write: %z instead of %uz", + port->socket.fd, n, sb.size); + goto fail; + } + + msg->buf = nxt_sendbuf_completion(task, + port->socket.write_work_queue, + msg->buf, + n - sizeof(nxt_port_msg_t)); + + if (msg->buf != NULL) { + /* + * A file descriptor is sent only + * in the first message of a stream. + */ + msg->fd = -1; + msg->share += n; + + if (msg->share >= port->max_share) { + msg->share = 0; + nxt_queue_remove(link); + nxt_queue_insert_tail(&port->messages, link); + } + + } else { + nxt_queue_remove(link); + nxt_mem_cache_free0(port->mem_pool, msg, + sizeof(nxt_port_send_msg_t)); + } + + } else if (nxt_slow_path(n == NXT_ERROR)) { + goto fail; + } + + /* n == NXT_AGAIN */ + + } while (port->socket.write_ready); + + if (nxt_event_fd_is_disabled(port->socket.write)) { + nxt_event_fd_enable_write(task->thread->engine, &port->socket); + } + + return; + +fail: + + nxt_work_queue_add(&task->thread->engine->fast_work_queue, + nxt_port_error_handler, task, &port->socket, NULL); +} + + +void +nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port) +{ + port->socket.fd = port->pair[0]; + port->socket.log = &nxt_main_log; + + port->task.thread = task->thread; + port->task.log = port->socket.log; + port->task.ident = nxt_task_next_ident(); + + port->socket.task = &port->task; + + port->socket.read_work_queue = &task->thread->engine->fast_work_queue; + port->socket.read_handler = nxt_port_read_handler; + port->socket.error_handler = nxt_port_error_handler; + + nxt_event_fd_enable_read(task->thread->engine, &port->socket); +} + + +void +nxt_port_read_close(nxt_port_t *port) +{ + nxt_socket_close(port->pair[0]); + port->pair[0] = -1; +} + + +static void +nxt_port_read_handler(nxt_task_t *task, void *obj, void *data) +{ + ssize_t n; + nxt_fd_t fd; + nxt_buf_t *b; + nxt_port_t *port; + nxt_iobuf_t iob[2]; + nxt_port_msg_t msg; + + port = obj; + + for ( ;; ) { + + b = nxt_port_buf_alloc(port); + + if (nxt_slow_path(b == NULL)) { + /* TODO: disable event for some time */ + } + + nxt_iobuf_set(&iob[0], &msg, sizeof(nxt_port_msg_t)); + nxt_iobuf_set(&iob[1], b->mem.pos, port->max_size); + + n = nxt_socketpair_recv(&port->socket, &fd, iob, 2); + + if (n > 0) { + nxt_port_read_msg_process(task, port, &msg, fd, b, n); + + if (b->mem.pos == b->mem.free) { + + if (b->next != NULL) { + /* A sync buffer */ + nxt_buf_free(port->mem_pool, b->next); + } + + nxt_port_buf_free(port, b); + } + + if (port->socket.read_ready) { + continue; + } + + return; + } + + if (n == NXT_AGAIN) { + nxt_port_buf_free(port, b); + + nxt_event_fd_enable_read(task->thread->engine, &port->socket); + return; + } + + /* n == 0 || n == NXT_ERROR */ + + nxt_work_queue_add(&task->thread->engine->fast_work_queue, + nxt_port_error_handler, task, &port->socket, NULL); + return; + } +} + + +static void +nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, + nxt_port_msg_t *msg, nxt_fd_t fd, nxt_buf_t *b, size_t size) +{ + nxt_buf_t *sync; + nxt_port_recv_msg_t recv_msg; + + if (nxt_slow_path(size < sizeof(nxt_port_msg_t))) { + nxt_log(port->socket.task, NXT_LOG_CRIT, + "port %d: too small message:%uz", port->socket.fd, size); + goto fail; + } + + recv_msg.stream = msg->stream; + recv_msg.type = msg->type; + recv_msg.fd = fd; + recv_msg.buf = b; + recv_msg.port = port; + + b->mem.free += size - sizeof(nxt_port_msg_t); + + if (msg->last) { + sync = nxt_buf_sync_alloc(port->mem_pool, NXT_BUF_SYNC_LAST); + if (nxt_slow_path(sync == NULL)) { + goto fail; + } + + b->next = sync; + } + + port->handler(task, &recv_msg); + + return; + +fail: + + if (fd != -1) { + nxt_fd_close(fd); + } +} + + +static nxt_buf_t * +nxt_port_buf_alloc(nxt_port_t *port) +{ + nxt_buf_t *b; + + if (port->free_bufs != NULL) { + b = port->free_bufs; + port->free_bufs = b->next; + + b->mem.pos = b->mem.start; + b->mem.free = b->mem.start; + + } else { + b = nxt_buf_mem_alloc(port->mem_pool, port->max_size, 0); + if (nxt_slow_path(b == NULL)) { + return NULL; + } + } + + return b; +} + + +static void +nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b) +{ + b->next = port->free_bufs; + port->free_bufs = b; +} + + +static void +nxt_port_error_handler(nxt_task_t *task, void *obj, void *data) +{ + /* TODO */ +} diff --git a/src/nxt_port_socket.h b/src/nxt_port_socket.h new file mode 100644 index 00000000..b72efeba --- /dev/null +++ b/src/nxt_port_socket.h @@ -0,0 +1,75 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) NGINX, Inc. + */ + +#ifndef _NXT_PORT_SOCKET_H_INCLUDED_ +#define _NXT_PORT_SOCKET_H_INCLUDED_ + + +typedef struct { + uint32_t stream; + + uint16_t type; + uint8_t last; /* 1 bit */ +} nxt_port_msg_t; + + +typedef struct { + nxt_queue_link_t link; + nxt_buf_t *buf; + size_t share; + nxt_fd_t fd; + nxt_port_msg_t port_msg; +} nxt_port_send_msg_t; + + +typedef struct nxt_port_recv_msg_s nxt_port_recv_msg_t; +typedef void (*nxt_port_handler_t)(nxt_task_t *task, nxt_port_recv_msg_t *msg); + + +typedef struct { + /* Must be the first field. */ + nxt_event_fd_t socket; + + nxt_task_t task; + + nxt_queue_t messages; /* of nxt_port_send_msg_t */ + + /* Maximum size of message part. */ + uint32_t max_size; + /* Maximum interleave of message parts. */ + uint32_t max_share; + + nxt_port_handler_t handler; + void *data; + + nxt_mem_pool_t *mem_pool; + nxt_buf_t *free_bufs; + nxt_socket_t pair[2]; +} nxt_port_t; + + +struct nxt_port_recv_msg_s { + uint32_t stream; + uint16_t type; + + nxt_fd_t fd; + nxt_buf_t *buf; + nxt_port_t *port; +}; + + +NXT_EXPORT nxt_port_t *nxt_port_alloc(void); +NXT_EXPORT nxt_port_t *nxt_port_create(size_t bufsize); +NXT_EXPORT void nxt_port_destroy(nxt_port_t *port); +NXT_EXPORT void nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port); +NXT_EXPORT void nxt_port_write_close(nxt_port_t *port); +NXT_EXPORT void nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port); +NXT_EXPORT void nxt_port_read_close(nxt_port_t *port); +NXT_EXPORT nxt_int_t nxt_port_write(nxt_task_t *task, nxt_port_t *port, + nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_buf_t *b); + + +#endif /* _NXT_PORT_SOCKET_H_INCLUDED_ */ diff --git a/src/nxt_process_chan.c b/src/nxt_process_chan.c deleted file mode 100644 index 2986f62b..00000000 --- a/src/nxt_process_chan.c +++ /dev/null @@ -1,265 +0,0 @@ - -/* - * Copyright (C) Igor Sysoev - * Copyright (C) NGINX, Inc. - */ - -#include <nxt_main.h> -#include <nxt_cycle.h> -#include <nxt_process_chan.h> - - -static void nxt_process_chan_handler(nxt_task_t *task, - nxt_chan_recv_msg_t *msg); -static void nxt_process_new_chan_buf_completion(nxt_task_t *task, void *obj, - void *data); - - -void -nxt_process_chan_create(nxt_thread_t *thr, nxt_process_chan_t *proc, - nxt_process_chan_handler_t *handlers) -{ - proc->pid = nxt_pid; - proc->engine = thr->engine->id; - proc->chan->handler = nxt_process_chan_handler; - proc->chan->data = handlers; - - nxt_chan_write_close(proc->chan); - nxt_chan_read_enable(&thr->engine->task, proc->chan); -} - - -void -nxt_process_chan_write(nxt_task_t *task, nxt_cycle_t *cycle, nxt_uint_t type, - nxt_fd_t fd, uint32_t stream, nxt_buf_t *b) -{ - nxt_uint_t i, n; - nxt_process_chan_t *proc; - - proc = cycle->processes->elts; - n = cycle->processes->nelts; - - for (i = 0; i < n; i++) { - if (nxt_pid != proc[i].pid) { - (void) nxt_chan_write(task, proc[i].chan, type, fd, stream, b); - } - } -} - - -static void -nxt_process_chan_handler(nxt_task_t *task, nxt_chan_recv_msg_t *msg) -{ - nxt_process_chan_handler_t *handlers; - - if (nxt_fast_path(msg->type <= NXT_CHAN_MSG_MAX)) { - - nxt_debug(task, "chan %d: message type:%uD", - msg->chan->socket.fd, msg->type); - - handlers = msg->chan->data; - handlers[msg->type](task, msg); - - return; - } - - nxt_log(task, NXT_LOG_CRIT, "chan %d: unknown message type:%uD", - msg->chan->socket.fd, msg->type); -} - - -void -nxt_process_chan_quit_handler(nxt_task_t *task, nxt_chan_recv_msg_t *msg) -{ - nxt_cycle_quit(task, NULL); -} - - -void -nxt_process_new_chan(nxt_task_t *task, nxt_cycle_t *cycle, - nxt_process_chan_t *proc) -{ - nxt_buf_t *b; - nxt_uint_t i, n; - nxt_process_chan_t *p; - nxt_proc_msg_new_chan_t *new_chan; - - n = cycle->processes->nelts; - if (n == 0) { - return; - } - - nxt_thread_log_debug("new chan %d for process %PI engine %uD", - proc->chan->socket.fd, proc->pid, proc->engine); - - p = cycle->processes->elts; - - for (i = 0; i < n; i++) { - - if (proc->pid == p[i].pid || nxt_pid == p[i].pid || p[i].engine != 0) { - continue; - } - - b = nxt_buf_mem_alloc(p[i].chan->mem_pool, - sizeof(nxt_process_chan_data_t), 0); - - if (nxt_slow_path(b == NULL)) { - continue; - } - - b->data = p[i].chan; - b->completion_handler = nxt_process_new_chan_buf_completion; - b->mem.free += sizeof(nxt_proc_msg_new_chan_t); - new_chan = (nxt_proc_msg_new_chan_t *) b->mem.pos; - - new_chan->pid = proc->pid; - new_chan->engine = proc->engine; - new_chan->max_size = p[i].chan->max_size; - new_chan->max_share = p[i].chan->max_share; - - (void) nxt_chan_write(task, p[i].chan, NXT_CHAN_MSG_NEW_CHAN, - proc->chan->socket.fd, 0, b); - } -} - - -static void -nxt_process_new_chan_buf_completion(nxt_task_t *task, void *obj, void *data) -{ - nxt_buf_t *b; - nxt_chan_t *chan; - - b = obj; - chan = b->data; - - /* TODO: b->mem.pos */ - - nxt_buf_free(chan->mem_pool, b); -} - - -void -nxt_process_chan_new_handler(nxt_task_t *task, nxt_chan_recv_msg_t *msg) -{ - nxt_chan_t *chan; - nxt_cycle_t *cycle; - nxt_process_chan_t *proc; - nxt_proc_msg_new_chan_t *new_chan; - - cycle = nxt_thread_cycle(); - - proc = nxt_array_add(cycle->processes); - if (nxt_slow_path(proc == NULL)) { - return; - } - - chan = nxt_chan_alloc(); - if (nxt_slow_path(chan == NULL)) { - return; - } - - proc->chan = chan; - - new_chan = (nxt_proc_msg_new_chan_t *) msg->buf->mem.pos; - msg->buf->mem.pos = msg->buf->mem.free; - - nxt_debug(task, "new chan %d received for process %PI engine %uD", - msg->fd, new_chan->pid, new_chan->engine); - - proc->pid = new_chan->pid; - proc->engine = new_chan->engine; - chan->pair[1] = msg->fd; - chan->max_size = new_chan->max_size; - chan->max_share = new_chan->max_share; - - /* A read chan is not passed at all. */ - nxt_chan_write_enable(task, chan); -} - - -void -nxt_process_chan_change_log_file(nxt_task_t *task, nxt_cycle_t *cycle, - nxt_uint_t slot, nxt_fd_t fd) -{ - nxt_buf_t *b; - nxt_uint_t i, n; - nxt_process_chan_t *p; - - n = cycle->processes->nelts; - if (n == 0) { - return; - } - - nxt_thread_log_debug("change log file #%ui fd:%FD", slot, fd); - - p = cycle->processes->elts; - - /* p[0] is master process. */ - - for (i = 1; i < n; i++) { - b = nxt_buf_mem_alloc(p[i].chan->mem_pool, - sizeof(nxt_process_chan_data_t), 0); - - if (nxt_slow_path(b == NULL)) { - continue; - } - - *(nxt_uint_t *) b->mem.pos = slot; - b->mem.free += sizeof(nxt_uint_t); - - (void) nxt_chan_write(task, p[i].chan, NXT_CHAN_MSG_CHANGE_FILE, - fd, 0, b); - } -} - - -void -nxt_process_chan_change_log_file_handler(nxt_task_t *task, - nxt_chan_recv_msg_t *msg) -{ - nxt_buf_t *b; - nxt_uint_t slot; - nxt_file_t *log_file; - nxt_cycle_t *cycle; - - cycle = nxt_thread_cycle(); - - b = msg->buf; - slot = *(nxt_uint_t *) b->mem.pos; - - log_file = nxt_list_elt(cycle->log_files, slot); - - nxt_debug(task, "change log file %FD:%FD", msg->fd, log_file->fd); - - /* - * The old log file descriptor must be closed at the moment when no - * other threads use it. dup2() allows to use the old file descriptor - * for new log file. This change is performed atomically in the kernel. - */ - if (nxt_file_redirect(log_file, msg->fd) == NXT_OK) { - - if (slot == 0) { - (void) nxt_file_stderr(log_file); - } - } -} - - -void -nxt_process_chan_data_handler(nxt_task_t *task, nxt_chan_recv_msg_t *msg) -{ - nxt_buf_t *b; - - b = msg->buf; - - nxt_debug(task, "data: %*s", b->mem.free - b->mem.pos, b->mem.pos); - - b->mem.pos = b->mem.free; -} - - -void -nxt_process_chan_empty_handler(nxt_task_t *task, nxt_chan_recv_msg_t *msg) -{ - nxt_debug(task, "chan empty handler"); -} diff --git a/src/nxt_process_chan.h b/src/nxt_process_chan.h deleted file mode 100644 index 18ff7be3..00000000 --- a/src/nxt_process_chan.h +++ /dev/null @@ -1,68 +0,0 @@ - -/* - * Copyright (C) Igor Sysoev - * Copyright (C) NGINX, Inc. - */ - -#ifndef _NXT_PROCESS_CHAN_H_INCLUDED_ -#define _NXT_PROCESS_CHAN_H_INCLUDED_ - - -#define NXT_CHAN_MSG_MAX NXT_CHAN_MSG_DATA - -typedef enum { - NXT_CHAN_MSG_QUIT = 0, - NXT_CHAN_MSG_NEW_CHAN, - NXT_CHAN_MSG_CHANGE_FILE, - NXT_CHAN_MSG_DATA, -} nxt_chan_msg_type_e; - - -typedef struct { - nxt_pid_t pid; - uint32_t engine; - uint32_t generation; - nxt_chan_t *chan; -} nxt_process_chan_t; - - -typedef struct { - nxt_pid_t pid; - uint32_t engine; - size_t max_size; - size_t max_share; -} nxt_proc_msg_new_chan_t; - - -/* - * nxt_process_chan_data_t is allocaiton size - * enabling effective reuse of memory pool cache. - */ -typedef union { - nxt_buf_t buf; - nxt_proc_msg_new_chan_t new_chan; -} nxt_process_chan_data_t; - - -typedef void (*nxt_process_chan_handler_t)(nxt_task_t *task, - nxt_chan_recv_msg_t *msg); - - -void nxt_process_chan_create(nxt_thread_t *thr, nxt_process_chan_t *proc, - nxt_process_chan_handler_t *handlers); -void nxt_process_chan_write(nxt_task_t *task, nxt_cycle_t *cycle, - nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_buf_t *b); -void nxt_process_new_chan(nxt_task_t *task, nxt_cycle_t *cycle, - nxt_process_chan_t *proc); -void nxt_process_chan_change_log_file(nxt_task_t *task, nxt_cycle_t *cycle, - nxt_uint_t slot, nxt_fd_t fd); - -void nxt_process_chan_quit_handler(nxt_task_t *task, nxt_chan_recv_msg_t *msg); -void nxt_process_chan_new_handler(nxt_task_t *task, nxt_chan_recv_msg_t *msg); -void nxt_process_chan_change_log_file_handler(nxt_task_t *task, - nxt_chan_recv_msg_t *msg); -void nxt_process_chan_data_handler(nxt_task_t *task, nxt_chan_recv_msg_t *msg); -void nxt_process_chan_empty_handler(nxt_task_t *task, nxt_chan_recv_msg_t *msg); - - -#endif /* _NXT_PROCESS_CHAN_H_INCLUDED_ */ diff --git a/src/nxt_worker_process.c b/src/nxt_worker_process.c index f6d1b764..a30ac9e6 100644 --- a/src/nxt_worker_process.c +++ b/src/nxt_worker_process.c @@ -6,13 +6,13 @@ #include <nxt_main.h> #include <nxt_cycle.h> -#include <nxt_process_chan.h> +#include <nxt_port.h> #include <nxt_master_process.h> static void nxt_worker_process_quit(nxt_task_t *task); static void nxt_worker_process_quit_handler(nxt_task_t *task, - nxt_chan_recv_msg_t *msg); + nxt_port_recv_msg_t *msg); static void nxt_worker_process_signal_handler(nxt_task_t *task, void *obj, void *data); static void nxt_worker_process_sigterm_handler(nxt_task_t *task, void *obj, @@ -21,11 +21,11 @@ static void nxt_worker_process_sigquit_handler(nxt_task_t *task, void *obj, void *data); -static nxt_process_chan_handler_t nxt_worker_process_chan_handlers[] = { +static nxt_process_port_handler_t nxt_worker_process_port_handlers[] = { nxt_worker_process_quit_handler, - nxt_process_chan_new_handler, - nxt_process_chan_change_log_file_handler, - nxt_process_chan_data_handler, + nxt_process_port_new_handler, + nxt_process_port_change_log_file_handler, + nxt_process_port_data_handler, }; @@ -47,7 +47,7 @@ nxt_worker_process_start(void *data) nxt_int_t n; nxt_cycle_t *cycle; nxt_thread_t *thr; - nxt_process_chan_t *proc; + nxt_process_port_t *proc; const nxt_event_set_ops_t *event_set; cycle = data; @@ -94,13 +94,13 @@ nxt_worker_process_start(void *data) proc = cycle->processes->elts; - /* A master process chan. */ - nxt_chan_read_close(proc[0].chan); - nxt_chan_write_enable(&nxt_main_task, proc[0].chan); + /* A master process port. */ + nxt_port_read_close(proc[0].port); + nxt_port_write_enable(&nxt_main_task, proc[0].port); - /* A worker process chan. */ - nxt_process_chan_create(thr, &proc[cycle->current_process], - nxt_worker_process_chan_handlers); + /* A worker process port. */ + nxt_process_port_create(thr, &proc[cycle->current_process], + nxt_worker_process_port_handlers); #if (NXT_THREADS) { @@ -182,7 +182,7 @@ nxt_worker_process_signal_handler(nxt_task_t *task, void *obj, void *data) static void -nxt_worker_process_quit_handler(nxt_task_t *task, nxt_chan_recv_msg_t *msg) +nxt_worker_process_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { nxt_worker_process_quit(task); } |