summaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorIgor Sysoev <igor@sysoev.ru>2017-02-01 20:03:45 +0300
committerIgor Sysoev <igor@sysoev.ru>2017-02-01 20:03:45 +0300
commite57b95a92333fa7ff558737b0ba2b76894cc0412 (patch)
treeb2de846a4fd958517be379b552f38652dd6d9078 /src
parent6e67bee0f4f96ad0a7d1a231dfdae8431714c66a (diff)
downloadunit-e57b95a92333fa7ff558737b0ba2b76894cc0412.tar.gz
unit-e57b95a92333fa7ff558737b0ba2b76894cc0412.tar.bz2
Process channels have been renamed to ports.
Diffstat (limited to 'src')
-rw-r--r--src/nxt_chan.c464
-rw-r--r--src/nxt_chan.h75
-rw-r--r--src/nxt_cycle.c6
-rw-r--r--src/nxt_cycle.h2
-rw-r--r--src/nxt_main.h2
-rw-r--r--src/nxt_master_process.c40
-rw-r--r--src/nxt_port.c265
-rw-r--r--src/nxt_port.h68
-rw-r--r--src/nxt_port_socket.c464
-rw-r--r--src/nxt_port_socket.h75
-rw-r--r--src/nxt_process_chan.c265
-rw-r--r--src/nxt_process_chan.h68
-rw-r--r--src/nxt_worker_process.c28
13 files changed, 911 insertions, 911 deletions
diff --git a/src/nxt_chan.c b/src/nxt_chan.c
deleted file mode 100644
index 6832ecdc..00000000
--- a/src/nxt_chan.c
+++ /dev/null
@@ -1,464 +0,0 @@
-
-/*
- * Copyright (C) Igor Sysoev
- * Copyright (C) NGINX, Inc.
- */
-
-#include <nxt_main.h>
-
-
-static void nxt_chan_write_handler(nxt_task_t *task, void *obj, void *data);
-static void nxt_chan_read_handler(nxt_task_t *task, void *obj, void *data);
-static void nxt_chan_read_msg_process(nxt_task_t *task, nxt_chan_t *chan,
- nxt_chan_msg_t *msg, nxt_fd_t fd, nxt_buf_t *b, size_t size);
-static nxt_buf_t *nxt_chan_buf_alloc(nxt_chan_t *chan);
-static void nxt_chan_buf_free(nxt_chan_t *chan, nxt_buf_t *b);
-static void nxt_chan_error_handler(nxt_task_t *task, void *obj, void *data);
-
-
-nxt_chan_t *
-nxt_chan_alloc(void)
-{
- nxt_chan_t *chan;
- nxt_mem_pool_t *mp;
-
- mp = nxt_mem_pool_create(1024);
-
- if (nxt_fast_path(mp != NULL)) {
- /* This allocation cannot fail. */
- chan = nxt_mem_zalloc(mp, sizeof(nxt_chan_t));
- chan->mem_pool = mp;
-
- chan->pair[0] = -1;
- chan->pair[1] = -1;
-
- nxt_queue_init(&chan->messages);
-
- return chan;
- }
-
- return NULL;
-}
-
-
-nxt_chan_t *
-nxt_chan_create(size_t max_size)
-{
- nxt_int_t sndbuf, rcvbuf, size;
- nxt_chan_t *chan;
- nxt_socket_t snd, rcv;
-
- chan = nxt_chan_alloc();
- if (nxt_slow_path(chan == NULL)) {
- return NULL;
- }
-
- if (nxt_slow_path(nxt_socketpair_create(chan->pair) != NXT_OK)) {
- goto socketpair_fail;
- }
-
- snd = chan->pair[1];
-
- sndbuf = nxt_socket_getsockopt(snd, SOL_SOCKET, SO_SNDBUF);
- if (nxt_slow_path(sndbuf < 0)) {
- goto getsockopt_fail;
- }
-
- rcv = chan->pair[0];
-
- rcvbuf = nxt_socket_getsockopt(rcv, SOL_SOCKET, SO_RCVBUF);
- if (nxt_slow_path(rcvbuf < 0)) {
- goto getsockopt_fail;
- }
-
- if (max_size == 0) {
- max_size = 16 * 1024;
- }
-
- if ((size_t) sndbuf < max_size) {
- /*
- * On Unix domain sockets
- * Linux uses 224K on both send and receive directions;
- * FreeBSD, MacOSX, NetBSD, and OpenBSD use 2K buffer size
- * on send direction and 4K buffer size on receive direction;
- * Solaris uses 16K on send direction and 5K on receive direction.
- */
- (void) nxt_socket_setsockopt(snd, SOL_SOCKET, SO_SNDBUF, max_size);
-
- sndbuf = nxt_socket_getsockopt(snd, SOL_SOCKET, SO_SNDBUF);
- if (nxt_slow_path(sndbuf < 0)) {
- goto getsockopt_fail;
- }
-
- size = sndbuf * 4;
-
- if (rcvbuf < size) {
- (void) nxt_socket_setsockopt(rcv, SOL_SOCKET, SO_RCVBUF, size);
-
- rcvbuf = nxt_socket_getsockopt(rcv, SOL_SOCKET, SO_RCVBUF);
- if (nxt_slow_path(rcvbuf < 0)) {
- goto getsockopt_fail;
- }
- }
- }
-
- chan->max_size = nxt_min(max_size, (size_t) sndbuf);
- chan->max_share = (64 * 1024);
-
- return chan;
-
-getsockopt_fail:
-
- nxt_socket_close(chan->pair[0]);
- nxt_socket_close(chan->pair[1]);
-
-socketpair_fail:
-
- nxt_mem_pool_destroy(chan->mem_pool);
-
- return NULL;
-}
-
-
-void
-nxt_chan_destroy(nxt_chan_t *chan)
-{
- nxt_socket_close(chan->socket.fd);
- nxt_mem_pool_destroy(chan->mem_pool);
-}
-
-
-void
-nxt_chan_write_enable(nxt_task_t *task, nxt_chan_t *chan)
-{
- chan->socket.fd = chan->pair[1];
- chan->socket.log = &nxt_main_log;
- chan->socket.write_ready = 1;
-
- chan->task.thread = task->thread;
- chan->task.log = chan->socket.log;
- chan->task.ident = nxt_task_next_ident();
-
- chan->socket.task = &chan->task;
-
- chan->socket.write_work_queue = &task->thread->engine->fast_work_queue;
- chan->socket.write_handler = nxt_chan_write_handler;
- chan->socket.error_handler = nxt_chan_error_handler;
-}
-
-
-void
-nxt_chan_write_close(nxt_chan_t *chan)
-{
- nxt_socket_close(chan->pair[1]);
- chan->pair[1] = -1;
-}
-
-
-nxt_int_t
-nxt_chan_write(nxt_task_t *task, nxt_chan_t *chan, nxt_uint_t type,
- nxt_fd_t fd, uint32_t stream, nxt_buf_t *b)
-{
- nxt_queue_link_t *link;
- nxt_chan_send_msg_t *msg;
-
- for (link = nxt_queue_first(&chan->messages);
- link != nxt_queue_tail(&chan->messages);
- link = nxt_queue_next(link))
- {
- msg = (nxt_chan_send_msg_t *) link;
-
- if (msg->chan_msg.stream == stream) {
- /*
- * An fd is ignored since a file descriptor
- * must be sent only in the first message of a stream.
- */
- nxt_buf_chain_add(&msg->buf, b);
-
- return NXT_OK;
- }
- }
-
- msg = nxt_mem_cache_zalloc0(chan->mem_pool, sizeof(nxt_chan_send_msg_t));
- if (nxt_slow_path(msg == NULL)) {
- return NXT_ERROR;
- }
-
- msg->buf = b;
- msg->fd = fd;
- msg->share = 0;
-
- msg->chan_msg.stream = stream;
- msg->chan_msg.type = type;
- msg->chan_msg.last = 0;
-
- nxt_queue_insert_tail(&chan->messages, &msg->link);
-
- if (chan->socket.write_ready) {
- nxt_chan_write_handler(task, chan, NULL);
- }
-
- return NXT_OK;
-}
-
-
-static void
-nxt_chan_write_handler(nxt_task_t *task, void *obj, void *data)
-{
- ssize_t n;
- nxt_uint_t niob;
- nxt_chan_t *chan;
- struct iovec iob[NXT_IOBUF_MAX];
- nxt_queue_link_t *link;
- nxt_chan_send_msg_t *msg;
- nxt_sendbuf_coalesce_t sb;
-
- chan = obj;
-
- do {
- link = nxt_queue_first(&chan->messages);
-
- if (link == nxt_queue_tail(&chan->messages)) {
- nxt_event_fd_block_write(task->thread->engine, &chan->socket);
- return;
- }
-
- msg = (nxt_chan_send_msg_t *) link;
-
- nxt_iobuf_set(&iob[0], &msg->chan_msg, sizeof(nxt_chan_msg_t));
-
- sb.buf = msg->buf;
- sb.iobuf = &iob[1];
- sb.nmax = NXT_IOBUF_MAX - 1;
- sb.sync = 0;
- sb.last = 0;
- sb.size = sizeof(nxt_chan_msg_t);
- sb.limit = chan->max_size;
-
- niob = nxt_sendbuf_mem_coalesce(task, &sb);
-
- msg->chan_msg.last = sb.last;
-
- n = nxt_socketpair_send(&chan->socket, msg->fd, iob, niob + 1);
-
- if (n > 0) {
- if (nxt_slow_path((size_t) n != sb.size)) {
- nxt_log(task, NXT_LOG_CRIT,
- "chan %d: short write: %z instead of %uz",
- chan->socket.fd, n, sb.size);
- goto fail;
- }
-
- msg->buf = nxt_sendbuf_completion(task,
- chan->socket.write_work_queue,
- msg->buf,
- n - sizeof(nxt_chan_msg_t));
-
- if (msg->buf != NULL) {
- /*
- * A file descriptor is sent only
- * in the first message of a stream.
- */
- msg->fd = -1;
- msg->share += n;
-
- if (msg->share >= chan->max_share) {
- msg->share = 0;
- nxt_queue_remove(link);
- nxt_queue_insert_tail(&chan->messages, link);
- }
-
- } else {
- nxt_queue_remove(link);
- nxt_mem_cache_free0(chan->mem_pool, msg,
- sizeof(nxt_chan_send_msg_t));
- }
-
- } else if (nxt_slow_path(n == NXT_ERROR)) {
- goto fail;
- }
-
- /* n == NXT_AGAIN */
-
- } while (chan->socket.write_ready);
-
- if (nxt_event_fd_is_disabled(chan->socket.write)) {
- nxt_event_fd_enable_write(task->thread->engine, &chan->socket);
- }
-
- return;
-
-fail:
-
- nxt_work_queue_add(&task->thread->engine->fast_work_queue,
- nxt_chan_error_handler, task, &chan->socket, NULL);
-}
-
-
-void
-nxt_chan_read_enable(nxt_task_t *task, nxt_chan_t *chan)
-{
- chan->socket.fd = chan->pair[0];
- chan->socket.log = &nxt_main_log;
-
- chan->task.thread = task->thread;
- chan->task.log = chan->socket.log;
- chan->task.ident = nxt_task_next_ident();
-
- chan->socket.task = &chan->task;
-
- chan->socket.read_work_queue = &task->thread->engine->fast_work_queue;
- chan->socket.read_handler = nxt_chan_read_handler;
- chan->socket.error_handler = nxt_chan_error_handler;
-
- nxt_event_fd_enable_read(task->thread->engine, &chan->socket);
-}
-
-
-void
-nxt_chan_read_close(nxt_chan_t *chan)
-{
- nxt_socket_close(chan->pair[0]);
- chan->pair[0] = -1;
-}
-
-
-static void
-nxt_chan_read_handler(nxt_task_t *task, void *obj, void *data)
-{
- ssize_t n;
- nxt_fd_t fd;
- nxt_buf_t *b;
- nxt_chan_t *chan;
- nxt_iobuf_t iob[2];
- nxt_chan_msg_t msg;
-
- chan = obj;
-
- for ( ;; ) {
-
- b = nxt_chan_buf_alloc(chan);
-
- if (nxt_slow_path(b == NULL)) {
- /* TODO: disable event for some time */
- }
-
- nxt_iobuf_set(&iob[0], &msg, sizeof(nxt_chan_msg_t));
- nxt_iobuf_set(&iob[1], b->mem.pos, chan->max_size);
-
- n = nxt_socketpair_recv(&chan->socket, &fd, iob, 2);
-
- if (n > 0) {
- nxt_chan_read_msg_process(task, chan, &msg, fd, b, n);
-
- if (b->mem.pos == b->mem.free) {
-
- if (b->next != NULL) {
- /* A sync buffer */
- nxt_buf_free(chan->mem_pool, b->next);
- }
-
- nxt_chan_buf_free(chan, b);
- }
-
- if (chan->socket.read_ready) {
- continue;
- }
-
- return;
- }
-
- if (n == NXT_AGAIN) {
- nxt_chan_buf_free(chan, b);
-
- nxt_event_fd_enable_read(task->thread->engine, &chan->socket);
- return;
- }
-
- /* n == 0 || n == NXT_ERROR */
-
- nxt_work_queue_add(&task->thread->engine->fast_work_queue,
- nxt_chan_error_handler, task, &chan->socket, NULL);
- return;
- }
-}
-
-
-static void
-nxt_chan_read_msg_process(nxt_task_t *task, nxt_chan_t *chan,
- nxt_chan_msg_t *msg, nxt_fd_t fd, nxt_buf_t *b, size_t size)
-{
- nxt_buf_t *sync;
- nxt_chan_recv_msg_t recv_msg;
-
- if (nxt_slow_path(size < sizeof(nxt_chan_msg_t))) {
- nxt_log(chan->socket.task, NXT_LOG_CRIT,
- "chan %d: too small message:%uz", chan->socket.fd, size);
- goto fail;
- }
-
- recv_msg.stream = msg->stream;
- recv_msg.type = msg->type;
- recv_msg.fd = fd;
- recv_msg.buf = b;
- recv_msg.chan = chan;
-
- b->mem.free += size - sizeof(nxt_chan_msg_t);
-
- if (msg->last) {
- sync = nxt_buf_sync_alloc(chan->mem_pool, NXT_BUF_SYNC_LAST);
- if (nxt_slow_path(sync == NULL)) {
- goto fail;
- }
-
- b->next = sync;
- }
-
- chan->handler(task, &recv_msg);
-
- return;
-
-fail:
-
- if (fd != -1) {
- nxt_fd_close(fd);
- }
-}
-
-
-static nxt_buf_t *
-nxt_chan_buf_alloc(nxt_chan_t *chan)
-{
- nxt_buf_t *b;
-
- if (chan->free_bufs != NULL) {
- b = chan->free_bufs;
- chan->free_bufs = b->next;
-
- b->mem.pos = b->mem.start;
- b->mem.free = b->mem.start;
-
- } else {
- b = nxt_buf_mem_alloc(chan->mem_pool, chan->max_size, 0);
- if (nxt_slow_path(b == NULL)) {
- return NULL;
- }
- }
-
- return b;
-}
-
-
-static void
-nxt_chan_buf_free(nxt_chan_t *chan, nxt_buf_t *b)
-{
- b->next = chan->free_bufs;
- chan->free_bufs = b;
-}
-
-
-static void
-nxt_chan_error_handler(nxt_task_t *task, void *obj, void *data)
-{
- /* TODO */
-}
diff --git a/src/nxt_chan.h b/src/nxt_chan.h
deleted file mode 100644
index 48cd1a9b..00000000
--- a/src/nxt_chan.h
+++ /dev/null
@@ -1,75 +0,0 @@
-
-/*
- * Copyright (C) Igor Sysoev
- * Copyright (C) NGINX, Inc.
- */
-
-#ifndef _NXT_UNIX_CHAN_H_INCLUDED_
-#define _NXT_UNIX_CHAN_H_INCLUDED_
-
-
-typedef struct {
- uint32_t stream;
-
- uint16_t type;
- uint8_t last; /* 1 bit */
-} nxt_chan_msg_t;
-
-
-typedef struct {
- nxt_queue_link_t link;
- nxt_buf_t *buf;
- size_t share;
- nxt_fd_t fd;
- nxt_chan_msg_t chan_msg;
-} nxt_chan_send_msg_t;
-
-
-typedef struct nxt_chan_recv_msg_s nxt_chan_recv_msg_t;
-typedef void (*nxt_chan_handler_t)(nxt_task_t *task, nxt_chan_recv_msg_t *msg);
-
-
-typedef struct {
- /* Must be the first field. */
- nxt_event_fd_t socket;
-
- nxt_task_t task;
-
- nxt_queue_t messages; /* of nxt_chan_send_msg_t */
-
- /* Maximum size of message part. */
- uint32_t max_size;
- /* Maximum interleave of message parts. */
- uint32_t max_share;
-
- nxt_chan_handler_t handler;
- void *data;
-
- nxt_mem_pool_t *mem_pool;
- nxt_buf_t *free_bufs;
- nxt_socket_t pair[2];
-} nxt_chan_t;
-
-
-struct nxt_chan_recv_msg_s {
- uint32_t stream;
- uint16_t type;
-
- nxt_fd_t fd;
- nxt_buf_t *buf;
- nxt_chan_t *chan;
-};
-
-
-NXT_EXPORT nxt_chan_t *nxt_chan_alloc(void);
-NXT_EXPORT nxt_chan_t *nxt_chan_create(size_t bufsize);
-NXT_EXPORT void nxt_chan_destroy(nxt_chan_t *chan);
-NXT_EXPORT void nxt_chan_write_enable(nxt_task_t *task, nxt_chan_t *chan);
-NXT_EXPORT void nxt_chan_write_close(nxt_chan_t *chan);
-NXT_EXPORT void nxt_chan_read_enable(nxt_task_t *task, nxt_chan_t *chan);
-NXT_EXPORT void nxt_chan_read_close(nxt_chan_t *chan);
-NXT_EXPORT nxt_int_t nxt_chan_write(nxt_task_t *task, nxt_chan_t *chan,
- nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_buf_t *b);
-
-
-#endif /* _NXT_UNIX_CHAN_H_INCLUDED_ */
diff --git a/src/nxt_cycle.c b/src/nxt_cycle.c
index c18089a4..447f0b37 100644
--- a/src/nxt_cycle.c
+++ b/src/nxt_cycle.c
@@ -7,7 +7,7 @@
#include <nxt_main.h>
#include <nxt_cycle.h>
-#include <nxt_process_chan.h>
+#include <nxt_port.h>
#include <nxt_master_process.h>
@@ -344,7 +344,7 @@ static nxt_int_t
nxt_cycle_processes(nxt_cycle_t *cycle)
{
nxt_uint_t n;
- nxt_process_chan_t *proc, *prev;
+ nxt_process_port_t *proc, *prev;
/*
* Preallocate double number of previous cycle
@@ -353,7 +353,7 @@ nxt_cycle_processes(nxt_cycle_t *cycle)
n = (cycle->previous != NULL) ? cycle->previous->processes->nelts : 1;
cycle->processes = nxt_array_create(cycle->mem_pool, 2 * n,
- sizeof(nxt_process_chan_t));
+ sizeof(nxt_process_port_t));
if (nxt_slow_path(cycle->processes == NULL)) {
return NXT_ERROR;
diff --git a/src/nxt_cycle.h b/src/nxt_cycle.h
index 2e7b0504..4a5b656d 100644
--- a/src/nxt_cycle.h
+++ b/src/nxt_cycle.h
@@ -47,7 +47,7 @@ struct nxt_cycle_s {
nxt_cycle_cont_t continuation;
#endif
- nxt_array_t *processes; /* of nxt_process_chan_t */
+ nxt_array_t *processes; /* of nxt_process_port_t */
nxt_list_t *log_files; /* of nxt_file_t */
diff --git a/src/nxt_main.h b/src/nxt_main.h
index 25d52665..54e414d4 100644
--- a/src/nxt_main.h
+++ b/src/nxt_main.h
@@ -120,7 +120,7 @@ nxt_thread_extern_data(nxt_thread_t, nxt_thread_context);
#include <nxt_event_fd.h>
-#include <nxt_chan.h>
+#include <nxt_port_socket.h>
#if (NXT_THREADS)
#include <nxt_thread_pool.h>
#endif
diff --git a/src/nxt_master_process.c b/src/nxt_master_process.c
index fbf0f6bf..781020e6 100644
--- a/src/nxt_master_process.c
+++ b/src/nxt_master_process.c
@@ -6,11 +6,11 @@
#include <nxt_main.h>
#include <nxt_cycle.h>
-#include <nxt_process_chan.h>
+#include <nxt_port.h>
#include <nxt_master_process.h>
-static nxt_int_t nxt_master_process_chan_create(nxt_task_t *task,
+static nxt_int_t nxt_master_process_port_create(nxt_task_t *task,
nxt_cycle_t *cycle);
static void nxt_master_process_title(void);
static nxt_int_t nxt_master_start_worker_processes(nxt_task_t *task,
@@ -58,7 +58,7 @@ nxt_master_process_start(nxt_thread_t *thr, nxt_task_t *task,
{
cycle->type = NXT_PROCESS_MASTER;
- if (nxt_master_process_chan_create(task, cycle) != NXT_OK) {
+ if (nxt_master_process_port_create(task, cycle) != NXT_OK) {
return NXT_ERROR;
}
@@ -69,9 +69,9 @@ nxt_master_process_start(nxt_thread_t *thr, nxt_task_t *task,
static nxt_int_t
-nxt_master_process_chan_create(nxt_task_t *task, nxt_cycle_t *cycle)
+nxt_master_process_port_create(nxt_task_t *task, nxt_cycle_t *cycle)
{
- nxt_process_chan_t *proc;
+ nxt_process_port_t *proc;
proc = nxt_array_add(cycle->processes);
if (nxt_slow_path(proc == NULL)) {
@@ -81,16 +81,16 @@ nxt_master_process_chan_create(nxt_task_t *task, nxt_cycle_t *cycle)
proc->pid = nxt_pid;
proc->engine = 0;
- proc->chan = nxt_chan_create(0);
- if (nxt_slow_path(proc->chan == NULL)) {
+ proc->port = nxt_port_create(0);
+ if (nxt_slow_path(proc->port == NULL)) {
return NXT_ERROR;
}
/*
- * A master process chan. A write chan is not closed
+ * A master process port. A write port is not closed
* since it should be inherited by worker processes.
*/
- nxt_chan_read_enable(task, proc->chan);
+ nxt_port_read_enable(task, proc->port);
return NXT_OK;
}
@@ -144,7 +144,7 @@ static nxt_int_t
nxt_master_create_worker_process(nxt_task_t *task, nxt_cycle_t *cycle)
{
nxt_pid_t pid;
- nxt_process_chan_t *proc;
+ nxt_process_port_t *proc;
proc = nxt_array_add(cycle->processes);
if (nxt_slow_path(proc == NULL)) {
@@ -156,8 +156,8 @@ nxt_master_create_worker_process(nxt_task_t *task, nxt_cycle_t *cycle)
proc->engine = 0;
proc->generation = cycle->process_generation;
- proc->chan = nxt_chan_create(0);
- if (nxt_slow_path(proc->chan == NULL)) {
+ proc->port = nxt_port_create(0);
+ if (nxt_slow_path(proc->port == NULL)) {
return NXT_ERROR;
}
@@ -177,10 +177,10 @@ nxt_master_create_worker_process(nxt_task_t *task, nxt_cycle_t *cycle)
/* The master process created a new process. */
proc->pid = pid;
- nxt_chan_read_close(proc->chan);
- nxt_chan_write_enable(task, proc->chan);
+ nxt_port_read_close(proc->port);
+ nxt_port_write_enable(task, proc->port);
- nxt_process_new_chan(task, cycle, proc);
+ nxt_process_new_port(task, cycle, proc);
return NXT_OK;
}
}
@@ -253,7 +253,7 @@ nxt_master_stop_previous_worker_processes(nxt_task_t *task, void *obj,
uint32_t generation;
nxt_uint_t i, n;
nxt_cycle_t *cycle;
- nxt_process_chan_t *proc;
+ nxt_process_port_t *proc;
cycle = nxt_thread_cycle();
@@ -266,7 +266,7 @@ nxt_master_stop_previous_worker_processes(nxt_task_t *task, void *obj,
for (i = 1; i < n; i++) {
if (proc[i].generation == generation) {
- (void) nxt_chan_write(task, proc[i].chan, NXT_CHAN_MSG_QUIT,
+ (void) nxt_port_write(task, proc[i].port, NXT_PORT_MSG_QUIT,
-1, 0, NULL);
}
}
@@ -278,7 +278,7 @@ nxt_master_stop_previous_worker_processes(nxt_task_t *task, void *obj,
void
nxt_master_stop_worker_processes(nxt_task_t *task, nxt_cycle_t *cycle)
{
- nxt_process_chan_write(task, cycle, NXT_CHAN_MSG_QUIT, -1, 0, NULL);
+ nxt_process_port_write(task, cycle, NXT_PORT_MSG_QUIT, -1, 0, NULL);
}
@@ -366,7 +366,7 @@ nxt_master_process_sigusr1_handler(nxt_task_t *task, void *obj, void *data)
nxt_list_each(file, cycle->log_files) {
- nxt_process_chan_change_log_file(task, cycle, n, new_file[n].fd);
+ nxt_process_port_change_log_file(task, cycle, n, new_file[n].fd);
/*
* The old log file descriptor must be closed at the moment
* when no other threads use it. dup2() allows to use the
@@ -619,7 +619,7 @@ nxt_master_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid)
{
nxt_uint_t i, n, generation;
nxt_cycle_t *cycle;
- nxt_process_chan_t *proc;
+ nxt_process_port_t *proc;
cycle = nxt_thread_cycle();
diff --git a/src/nxt_port.c b/src/nxt_port.c
new file mode 100644
index 00000000..c00fcb61
--- /dev/null
+++ b/src/nxt_port.c
@@ -0,0 +1,265 @@
+
+/*
+ * Copyright (C) Igor Sysoev
+ * Copyright (C) NGINX, Inc.
+ */
+
+#include <nxt_main.h>
+#include <nxt_cycle.h>
+#include <nxt_port.h>
+
+
+static void nxt_process_port_handler(nxt_task_t *task,
+ nxt_port_recv_msg_t *msg);
+static void nxt_process_new_port_buf_completion(nxt_task_t *task, void *obj,
+ void *data);
+
+
+void
+nxt_process_port_create(nxt_thread_t *thr, nxt_process_port_t *proc,
+ nxt_process_port_handler_t *handlers)
+{
+ proc->pid = nxt_pid;
+ proc->engine = thr->engine->id;
+ proc->port->handler = nxt_process_port_handler;
+ proc->port->data = handlers;
+
+ nxt_port_write_close(proc->port);
+ nxt_port_read_enable(&thr->engine->task, proc->port);
+}
+
+
+void
+nxt_process_port_write(nxt_task_t *task, nxt_cycle_t *cycle, nxt_uint_t type,
+ nxt_fd_t fd, uint32_t stream, nxt_buf_t *b)
+{
+ nxt_uint_t i, n;
+ nxt_process_port_t *proc;
+
+ proc = cycle->processes->elts;
+ n = cycle->processes->nelts;
+
+ for (i = 0; i < n; i++) {
+ if (nxt_pid != proc[i].pid) {
+ (void) nxt_port_write(task, proc[i].port, type, fd, stream, b);
+ }
+ }
+}
+
+
+static void
+nxt_process_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
+{
+ nxt_process_port_handler_t *handlers;
+
+ if (nxt_fast_path(msg->type <= NXT_PORT_MSG_MAX)) {
+
+ nxt_debug(task, "port %d: message type:%uD",
+ msg->port->socket.fd, msg->type);
+
+ handlers = msg->port->data;
+ handlers[msg->type](task, msg);
+
+ return;
+ }
+
+ nxt_log(task, NXT_LOG_CRIT, "port %d: unknown message type:%uD",
+ msg->port->socket.fd, msg->type);
+}
+
+
+void
+nxt_process_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
+{
+ nxt_cycle_quit(task, NULL);
+}
+
+
+void
+nxt_process_new_port(nxt_task_t *task, nxt_cycle_t *cycle,
+ nxt_process_port_t *proc)
+{
+ nxt_buf_t *b;
+ nxt_uint_t i, n;
+ nxt_process_port_t *p;
+ nxt_proc_msg_new_port_t *new_port;
+
+ n = cycle->processes->nelts;
+ if (n == 0) {
+ return;
+ }
+
+ nxt_thread_log_debug("new port %d for process %PI engine %uD",
+ proc->port->socket.fd, proc->pid, proc->engine);
+
+ p = cycle->processes->elts;
+
+ for (i = 0; i < n; i++) {
+
+ if (proc->pid == p[i].pid || nxt_pid == p[i].pid || p[i].engine != 0) {
+ continue;
+ }
+
+ b = nxt_buf_mem_alloc(p[i].port->mem_pool,
+ sizeof(nxt_process_port_data_t), 0);
+
+ if (nxt_slow_path(b == NULL)) {
+ continue;
+ }
+
+ b->data = p[i].port;
+ b->completion_handler = nxt_process_new_port_buf_completion;
+ b->mem.free += sizeof(nxt_proc_msg_new_port_t);
+ new_port = (nxt_proc_msg_new_port_t *) b->mem.pos;
+
+ new_port->pid = proc->pid;
+ new_port->engine = proc->engine;
+ new_port->max_size = p[i].port->max_size;
+ new_port->max_share = p[i].port->max_share;
+
+ (void) nxt_port_write(task, p[i].port, NXT_PORT_MSG_NEW_PORT,
+ proc->port->socket.fd, 0, b);
+ }
+}
+
+
+static void
+nxt_process_new_port_buf_completion(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_buf_t *b;
+ nxt_port_t *port;
+
+ b = obj;
+ port = b->data;
+
+ /* TODO: b->mem.pos */
+
+ nxt_buf_free(port->mem_pool, b);
+}
+
+
+void
+nxt_process_port_new_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
+{
+ nxt_port_t *port;
+ nxt_cycle_t *cycle;
+ nxt_process_port_t *proc;
+ nxt_proc_msg_new_port_t *new_port;
+
+ cycle = nxt_thread_cycle();
+
+ proc = nxt_array_add(cycle->processes);
+ if (nxt_slow_path(proc == NULL)) {
+ return;
+ }
+
+ port = nxt_port_alloc();
+ if (nxt_slow_path(port == NULL)) {
+ return;
+ }
+
+ proc->port = port;
+
+ new_port = (nxt_proc_msg_new_port_t *) msg->buf->mem.pos;
+ msg->buf->mem.pos = msg->buf->mem.free;
+
+ nxt_debug(task, "new port %d received for process %PI engine %uD",
+ msg->fd, new_port->pid, new_port->engine);
+
+ proc->pid = new_port->pid;
+ proc->engine = new_port->engine;
+ port->pair[1] = msg->fd;
+ port->max_size = new_port->max_size;
+ port->max_share = new_port->max_share;
+
+ /* A read port is not passed at all. */
+ nxt_port_write_enable(task, port);
+}
+
+
+void
+nxt_process_port_change_log_file(nxt_task_t *task, nxt_cycle_t *cycle,
+ nxt_uint_t slot, nxt_fd_t fd)
+{
+ nxt_buf_t *b;
+ nxt_uint_t i, n;
+ nxt_process_port_t *p;
+
+ n = cycle->processes->nelts;
+ if (n == 0) {
+ return;
+ }
+
+ nxt_thread_log_debug("change log file #%ui fd:%FD", slot, fd);
+
+ p = cycle->processes->elts;
+
+ /* p[0] is master process. */
+
+ for (i = 1; i < n; i++) {
+ b = nxt_buf_mem_alloc(p[i].port->mem_pool,
+ sizeof(nxt_process_port_data_t), 0);
+
+ if (nxt_slow_path(b == NULL)) {
+ continue;
+ }
+
+ *(nxt_uint_t *) b->mem.pos = slot;
+ b->mem.free += sizeof(nxt_uint_t);
+
+ (void) nxt_port_write(task, p[i].port, NXT_PORT_MSG_PORTGE_FILE,
+ fd, 0, b);
+ }
+}
+
+
+void
+nxt_process_port_change_log_file_handler(nxt_task_t *task,
+ nxt_port_recv_msg_t *msg)
+{
+ nxt_buf_t *b;
+ nxt_uint_t slot;
+ nxt_file_t *log_file;
+ nxt_cycle_t *cycle;
+
+ cycle = nxt_thread_cycle();
+
+ b = msg->buf;
+ slot = *(nxt_uint_t *) b->mem.pos;
+
+ log_file = nxt_list_elt(cycle->log_files, slot);
+
+ nxt_debug(task, "change log file %FD:%FD", msg->fd, log_file->fd);
+
+ /*
+ * The old log file descriptor must be closed at the moment when no
+ * other threads use it. dup2() allows to use the old file descriptor
+ * for new log file. This change is performed atomically in the kernel.
+ */
+ if (nxt_file_redirect(log_file, msg->fd) == NXT_OK) {
+
+ if (slot == 0) {
+ (void) nxt_file_stderr(log_file);
+ }
+ }
+}
+
+
+void
+nxt_process_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
+{
+ nxt_buf_t *b;
+
+ b = msg->buf;
+
+ nxt_debug(task, "data: %*s", b->mem.free - b->mem.pos, b->mem.pos);
+
+ b->mem.pos = b->mem.free;
+}
+
+
+void
+nxt_process_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
+{
+ nxt_debug(task, "port empty handler");
+}
diff --git a/src/nxt_port.h b/src/nxt_port.h
new file mode 100644
index 00000000..b195bcd7
--- /dev/null
+++ b/src/nxt_port.h
@@ -0,0 +1,68 @@
+
+/*
+ * Copyright (C) Igor Sysoev
+ * Copyright (C) NGINX, Inc.
+ */
+
+#ifndef _NXT_PORT_H_INCLUDED_
+#define _NXT_PORT_H_INCLUDED_
+
+
+#define NXT_PORT_MSG_MAX NXT_PORT_MSG_DATA
+
+typedef enum {
+ NXT_PORT_MSG_QUIT = 0,
+ NXT_PORT_MSG_NEW_PORT,
+ NXT_PORT_MSG_PORTGE_FILE,
+ NXT_PORT_MSG_DATA,
+} nxt_port_msg_type_e;
+
+
+typedef struct {
+ nxt_pid_t pid;
+ uint32_t engine;
+ uint32_t generation;
+ nxt_port_t *port;
+} nxt_process_port_t;
+
+
+typedef struct {
+ nxt_pid_t pid;
+ uint32_t engine;
+ size_t max_size;
+ size_t max_share;
+} nxt_proc_msg_new_port_t;
+
+
+/*
+ * nxt_process_port_data_t is allocaiton size
+ * enabling effective reuse of memory pool cache.
+ */
+typedef union {
+ nxt_buf_t buf;
+ nxt_proc_msg_new_port_t new_port;
+} nxt_process_port_data_t;
+
+
+typedef void (*nxt_process_port_handler_t)(nxt_task_t *task,
+ nxt_port_recv_msg_t *msg);
+
+
+void nxt_process_port_create(nxt_thread_t *thr, nxt_process_port_t *proc,
+ nxt_process_port_handler_t *handlers);
+void nxt_process_port_write(nxt_task_t *task, nxt_cycle_t *cycle,
+ nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_buf_t *b);
+void nxt_process_new_port(nxt_task_t *task, nxt_cycle_t *cycle,
+ nxt_process_port_t *proc);
+void nxt_process_port_change_log_file(nxt_task_t *task, nxt_cycle_t *cycle,
+ nxt_uint_t slot, nxt_fd_t fd);
+
+void nxt_process_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
+void nxt_process_port_new_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
+void nxt_process_port_change_log_file_handler(nxt_task_t *task,
+ nxt_port_recv_msg_t *msg);
+void nxt_process_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
+void nxt_process_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
+
+
+#endif /* _NXT_PORT_H_INCLUDED_ */
diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c
new file mode 100644
index 00000000..4dc72d64
--- /dev/null
+++ b/src/nxt_port_socket.c
@@ -0,0 +1,464 @@
+
+/*
+ * Copyright (C) Igor Sysoev
+ * Copyright (C) NGINX, Inc.
+ */
+
+#include <nxt_main.h>
+
+
+static void nxt_port_write_handler(nxt_task_t *task, void *obj, void *data);
+static void nxt_port_read_handler(nxt_task_t *task, void *obj, void *data);
+static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
+ nxt_port_msg_t *msg, nxt_fd_t fd, nxt_buf_t *b, size_t size);
+static nxt_buf_t *nxt_port_buf_alloc(nxt_port_t *port);
+static void nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b);
+static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data);
+
+
+nxt_port_t *
+nxt_port_alloc(void)
+{
+ nxt_port_t *port;
+ nxt_mem_pool_t *mp;
+
+ mp = nxt_mem_pool_create(1024);
+
+ if (nxt_fast_path(mp != NULL)) {
+ /* This allocation cannot fail. */
+ port = nxt_mem_zalloc(mp, sizeof(nxt_port_t));
+ port->mem_pool = mp;
+
+ port->pair[0] = -1;
+ port->pair[1] = -1;
+
+ nxt_queue_init(&port->messages);
+
+ return port;
+ }
+
+ return NULL;
+}
+
+
+nxt_port_t *
+nxt_port_create(size_t max_size)
+{
+ nxt_int_t sndbuf, rcvbuf, size;
+ nxt_port_t *port;
+ nxt_socket_t snd, rcv;
+
+ port = nxt_port_alloc();
+ if (nxt_slow_path(port == NULL)) {
+ return NULL;
+ }
+
+ if (nxt_slow_path(nxt_socketpair_create(port->pair) != NXT_OK)) {
+ goto socketpair_fail;
+ }
+
+ snd = port->pair[1];
+
+ sndbuf = nxt_socket_getsockopt(snd, SOL_SOCKET, SO_SNDBUF);
+ if (nxt_slow_path(sndbuf < 0)) {
+ goto getsockopt_fail;
+ }
+
+ rcv = port->pair[0];
+
+ rcvbuf = nxt_socket_getsockopt(rcv, SOL_SOCKET, SO_RCVBUF);
+ if (nxt_slow_path(rcvbuf < 0)) {
+ goto getsockopt_fail;
+ }
+
+ if (max_size == 0) {
+ max_size = 16 * 1024;
+ }
+
+ if ((size_t) sndbuf < max_size) {
+ /*
+ * On Unix domain sockets
+ * Linux uses 224K on both send and receive directions;
+ * FreeBSD, MacOSX, NetBSD, and OpenBSD use 2K buffer size
+ * on send direction and 4K buffer size on receive direction;
+ * Solaris uses 16K on send direction and 5K on receive direction.
+ */
+ (void) nxt_socket_setsockopt(snd, SOL_SOCKET, SO_SNDBUF, max_size);
+
+ sndbuf = nxt_socket_getsockopt(snd, SOL_SOCKET, SO_SNDBUF);
+ if (nxt_slow_path(sndbuf < 0)) {
+ goto getsockopt_fail;
+ }
+
+ size = sndbuf * 4;
+
+ if (rcvbuf < size) {
+ (void) nxt_socket_setsockopt(rcv, SOL_SOCKET, SO_RCVBUF, size);
+
+ rcvbuf = nxt_socket_getsockopt(rcv, SOL_SOCKET, SO_RCVBUF);
+ if (nxt_slow_path(rcvbuf < 0)) {
+ goto getsockopt_fail;
+ }
+ }
+ }
+
+ port->max_size = nxt_min(max_size, (size_t) sndbuf);
+ port->max_share = (64 * 1024);
+
+ return port;
+
+getsockopt_fail:
+
+ nxt_socket_close(port->pair[0]);
+ nxt_socket_close(port->pair[1]);
+
+socketpair_fail:
+
+ nxt_mem_pool_destroy(port->mem_pool);
+
+ return NULL;
+}
+
+
+void
+nxt_port_destroy(nxt_port_t *port)
+{
+ nxt_socket_close(port->socket.fd);
+ nxt_mem_pool_destroy(port->mem_pool);
+}
+
+
+void
+nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port)
+{
+ port->socket.fd = port->pair[1];
+ port->socket.log = &nxt_main_log;
+ port->socket.write_ready = 1;
+
+ port->task.thread = task->thread;
+ port->task.log = port->socket.log;
+ port->task.ident = nxt_task_next_ident();
+
+ port->socket.task = &port->task;
+
+ port->socket.write_work_queue = &task->thread->engine->fast_work_queue;
+ port->socket.write_handler = nxt_port_write_handler;
+ port->socket.error_handler = nxt_port_error_handler;
+}
+
+
+void
+nxt_port_write_close(nxt_port_t *port)
+{
+ nxt_socket_close(port->pair[1]);
+ port->pair[1] = -1;
+}
+
+
+nxt_int_t
+nxt_port_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
+ nxt_fd_t fd, uint32_t stream, nxt_buf_t *b)
+{
+ nxt_queue_link_t *link;
+ nxt_port_send_msg_t *msg;
+
+ for (link = nxt_queue_first(&port->messages);
+ link != nxt_queue_tail(&port->messages);
+ link = nxt_queue_next(link))
+ {
+ msg = (nxt_port_send_msg_t *) link;
+
+ if (msg->port_msg.stream == stream) {
+ /*
+ * An fd is ignored since a file descriptor
+ * must be sent only in the first message of a stream.
+ */
+ nxt_buf_chain_add(&msg->buf, b);
+
+ return NXT_OK;
+ }
+ }
+
+ msg = nxt_mem_cache_zalloc0(port->mem_pool, sizeof(nxt_port_send_msg_t));
+ if (nxt_slow_path(msg == NULL)) {
+ return NXT_ERROR;
+ }
+
+ msg->buf = b;
+ msg->fd = fd;
+ msg->share = 0;
+
+ msg->port_msg.stream = stream;
+ msg->port_msg.type = type;
+ msg->port_msg.last = 0;
+
+ nxt_queue_insert_tail(&port->messages, &msg->link);
+
+ if (port->socket.write_ready) {
+ nxt_port_write_handler(task, port, NULL);
+ }
+
+ return NXT_OK;
+}
+
+
+static void
+nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
+{
+ ssize_t n;
+ nxt_uint_t niob;
+ nxt_port_t *port;
+ struct iovec iob[NXT_IOBUF_MAX];
+ nxt_queue_link_t *link;
+ nxt_port_send_msg_t *msg;
+ nxt_sendbuf_coalesce_t sb;
+
+ port = obj;
+
+ do {
+ link = nxt_queue_first(&port->messages);
+
+ if (link == nxt_queue_tail(&port->messages)) {
+ nxt_event_fd_block_write(task->thread->engine, &port->socket);
+ return;
+ }
+
+ msg = (nxt_port_send_msg_t *) link;
+
+ nxt_iobuf_set(&iob[0], &msg->port_msg, sizeof(nxt_port_msg_t));
+
+ sb.buf = msg->buf;
+ sb.iobuf = &iob[1];
+ sb.nmax = NXT_IOBUF_MAX - 1;
+ sb.sync = 0;
+ sb.last = 0;
+ sb.size = sizeof(nxt_port_msg_t);
+ sb.limit = port->max_size;
+
+ niob = nxt_sendbuf_mem_coalesce(task, &sb);
+
+ msg->port_msg.last = sb.last;
+
+ n = nxt_socketpair_send(&port->socket, msg->fd, iob, niob + 1);
+
+ if (n > 0) {
+ if (nxt_slow_path((size_t) n != sb.size)) {
+ nxt_log(task, NXT_LOG_CRIT,
+ "port %d: short write: %z instead of %uz",
+ port->socket.fd, n, sb.size);
+ goto fail;
+ }
+
+ msg->buf = nxt_sendbuf_completion(task,
+ port->socket.write_work_queue,
+ msg->buf,
+ n - sizeof(nxt_port_msg_t));
+
+ if (msg->buf != NULL) {
+ /*
+ * A file descriptor is sent only
+ * in the first message of a stream.
+ */
+ msg->fd = -1;
+ msg->share += n;
+
+ if (msg->share >= port->max_share) {
+ msg->share = 0;
+ nxt_queue_remove(link);
+ nxt_queue_insert_tail(&port->messages, link);
+ }
+
+ } else {
+ nxt_queue_remove(link);
+ nxt_mem_cache_free0(port->mem_pool, msg,
+ sizeof(nxt_port_send_msg_t));
+ }
+
+ } else if (nxt_slow_path(n == NXT_ERROR)) {
+ goto fail;
+ }
+
+ /* n == NXT_AGAIN */
+
+ } while (port->socket.write_ready);
+
+ if (nxt_event_fd_is_disabled(port->socket.write)) {
+ nxt_event_fd_enable_write(task->thread->engine, &port->socket);
+ }
+
+ return;
+
+fail:
+
+ nxt_work_queue_add(&task->thread->engine->fast_work_queue,
+ nxt_port_error_handler, task, &port->socket, NULL);
+}
+
+
+void
+nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port)
+{
+ port->socket.fd = port->pair[0];
+ port->socket.log = &nxt_main_log;
+
+ port->task.thread = task->thread;
+ port->task.log = port->socket.log;
+ port->task.ident = nxt_task_next_ident();
+
+ port->socket.task = &port->task;
+
+ port->socket.read_work_queue = &task->thread->engine->fast_work_queue;
+ port->socket.read_handler = nxt_port_read_handler;
+ port->socket.error_handler = nxt_port_error_handler;
+
+ nxt_event_fd_enable_read(task->thread->engine, &port->socket);
+}
+
+
+void
+nxt_port_read_close(nxt_port_t *port)
+{
+ nxt_socket_close(port->pair[0]);
+ port->pair[0] = -1;
+}
+
+
+static void
+nxt_port_read_handler(nxt_task_t *task, void *obj, void *data)
+{
+ ssize_t n;
+ nxt_fd_t fd;
+ nxt_buf_t *b;
+ nxt_port_t *port;
+ nxt_iobuf_t iob[2];
+ nxt_port_msg_t msg;
+
+ port = obj;
+
+ for ( ;; ) {
+
+ b = nxt_port_buf_alloc(port);
+
+ if (nxt_slow_path(b == NULL)) {
+ /* TODO: disable event for some time */
+ }
+
+ nxt_iobuf_set(&iob[0], &msg, sizeof(nxt_port_msg_t));
+ nxt_iobuf_set(&iob[1], b->mem.pos, port->max_size);
+
+ n = nxt_socketpair_recv(&port->socket, &fd, iob, 2);
+
+ if (n > 0) {
+ nxt_port_read_msg_process(task, port, &msg, fd, b, n);
+
+ if (b->mem.pos == b->mem.free) {
+
+ if (b->next != NULL) {
+ /* A sync buffer */
+ nxt_buf_free(port->mem_pool, b->next);
+ }
+
+ nxt_port_buf_free(port, b);
+ }
+
+ if (port->socket.read_ready) {
+ continue;
+ }
+
+ return;
+ }
+
+ if (n == NXT_AGAIN) {
+ nxt_port_buf_free(port, b);
+
+ nxt_event_fd_enable_read(task->thread->engine, &port->socket);
+ return;
+ }
+
+ /* n == 0 || n == NXT_ERROR */
+
+ nxt_work_queue_add(&task->thread->engine->fast_work_queue,
+ nxt_port_error_handler, task, &port->socket, NULL);
+ return;
+ }
+}
+
+
+static void
+nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
+ nxt_port_msg_t *msg, nxt_fd_t fd, nxt_buf_t *b, size_t size)
+{
+ nxt_buf_t *sync;
+ nxt_port_recv_msg_t recv_msg;
+
+ if (nxt_slow_path(size < sizeof(nxt_port_msg_t))) {
+ nxt_log(port->socket.task, NXT_LOG_CRIT,
+ "port %d: too small message:%uz", port->socket.fd, size);
+ goto fail;
+ }
+
+ recv_msg.stream = msg->stream;
+ recv_msg.type = msg->type;
+ recv_msg.fd = fd;
+ recv_msg.buf = b;
+ recv_msg.port = port;
+
+ b->mem.free += size - sizeof(nxt_port_msg_t);
+
+ if (msg->last) {
+ sync = nxt_buf_sync_alloc(port->mem_pool, NXT_BUF_SYNC_LAST);
+ if (nxt_slow_path(sync == NULL)) {
+ goto fail;
+ }
+
+ b->next = sync;
+ }
+
+ port->handler(task, &recv_msg);
+
+ return;
+
+fail:
+
+ if (fd != -1) {
+ nxt_fd_close(fd);
+ }
+}
+
+
+static nxt_buf_t *
+nxt_port_buf_alloc(nxt_port_t *port)
+{
+ nxt_buf_t *b;
+
+ if (port->free_bufs != NULL) {
+ b = port->free_bufs;
+ port->free_bufs = b->next;
+
+ b->mem.pos = b->mem.start;
+ b->mem.free = b->mem.start;
+
+ } else {
+ b = nxt_buf_mem_alloc(port->mem_pool, port->max_size, 0);
+ if (nxt_slow_path(b == NULL)) {
+ return NULL;
+ }
+ }
+
+ return b;
+}
+
+
+static void
+nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b)
+{
+ b->next = port->free_bufs;
+ port->free_bufs = b;
+}
+
+
+static void
+nxt_port_error_handler(nxt_task_t *task, void *obj, void *data)
+{
+ /* TODO */
+}
diff --git a/src/nxt_port_socket.h b/src/nxt_port_socket.h
new file mode 100644
index 00000000..b72efeba
--- /dev/null
+++ b/src/nxt_port_socket.h
@@ -0,0 +1,75 @@
+
+/*
+ * Copyright (C) Igor Sysoev
+ * Copyright (C) NGINX, Inc.
+ */
+
+#ifndef _NXT_PORT_SOCKET_H_INCLUDED_
+#define _NXT_PORT_SOCKET_H_INCLUDED_
+
+
+typedef struct {
+ uint32_t stream;
+
+ uint16_t type;
+ uint8_t last; /* 1 bit */
+} nxt_port_msg_t;
+
+
+typedef struct {
+ nxt_queue_link_t link;
+ nxt_buf_t *buf;
+ size_t share;
+ nxt_fd_t fd;
+ nxt_port_msg_t port_msg;
+} nxt_port_send_msg_t;
+
+
+typedef struct nxt_port_recv_msg_s nxt_port_recv_msg_t;
+typedef void (*nxt_port_handler_t)(nxt_task_t *task, nxt_port_recv_msg_t *msg);
+
+
+typedef struct {
+ /* Must be the first field. */
+ nxt_event_fd_t socket;
+
+ nxt_task_t task;
+
+ nxt_queue_t messages; /* of nxt_port_send_msg_t */
+
+ /* Maximum size of message part. */
+ uint32_t max_size;
+ /* Maximum interleave of message parts. */
+ uint32_t max_share;
+
+ nxt_port_handler_t handler;
+ void *data;
+
+ nxt_mem_pool_t *mem_pool;
+ nxt_buf_t *free_bufs;
+ nxt_socket_t pair[2];
+} nxt_port_t;
+
+
+struct nxt_port_recv_msg_s {
+ uint32_t stream;
+ uint16_t type;
+
+ nxt_fd_t fd;
+ nxt_buf_t *buf;
+ nxt_port_t *port;
+};
+
+
+NXT_EXPORT nxt_port_t *nxt_port_alloc(void);
+NXT_EXPORT nxt_port_t *nxt_port_create(size_t bufsize);
+NXT_EXPORT void nxt_port_destroy(nxt_port_t *port);
+NXT_EXPORT void nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port);
+NXT_EXPORT void nxt_port_write_close(nxt_port_t *port);
+NXT_EXPORT void nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port);
+NXT_EXPORT void nxt_port_read_close(nxt_port_t *port);
+NXT_EXPORT nxt_int_t nxt_port_write(nxt_task_t *task, nxt_port_t *port,
+ nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_buf_t *b);
+
+
+#endif /* _NXT_PORT_SOCKET_H_INCLUDED_ */
diff --git a/src/nxt_process_chan.c b/src/nxt_process_chan.c
deleted file mode 100644
index 2986f62b..00000000
--- a/src/nxt_process_chan.c
+++ /dev/null
@@ -1,265 +0,0 @@
-
-/*
- * Copyright (C) Igor Sysoev
- * Copyright (C) NGINX, Inc.
- */
-
-#include <nxt_main.h>
-#include <nxt_cycle.h>
-#include <nxt_process_chan.h>
-
-
-static void nxt_process_chan_handler(nxt_task_t *task,
- nxt_chan_recv_msg_t *msg);
-static void nxt_process_new_chan_buf_completion(nxt_task_t *task, void *obj,
- void *data);
-
-
-void
-nxt_process_chan_create(nxt_thread_t *thr, nxt_process_chan_t *proc,
- nxt_process_chan_handler_t *handlers)
-{
- proc->pid = nxt_pid;
- proc->engine = thr->engine->id;
- proc->chan->handler = nxt_process_chan_handler;
- proc->chan->data = handlers;
-
- nxt_chan_write_close(proc->chan);
- nxt_chan_read_enable(&thr->engine->task, proc->chan);
-}
-
-
-void
-nxt_process_chan_write(nxt_task_t *task, nxt_cycle_t *cycle, nxt_uint_t type,
- nxt_fd_t fd, uint32_t stream, nxt_buf_t *b)
-{
- nxt_uint_t i, n;
- nxt_process_chan_t *proc;
-
- proc = cycle->processes->elts;
- n = cycle->processes->nelts;
-
- for (i = 0; i < n; i++) {
- if (nxt_pid != proc[i].pid) {
- (void) nxt_chan_write(task, proc[i].chan, type, fd, stream, b);
- }
- }
-}
-
-
-static void
-nxt_process_chan_handler(nxt_task_t *task, nxt_chan_recv_msg_t *msg)
-{
- nxt_process_chan_handler_t *handlers;
-
- if (nxt_fast_path(msg->type <= NXT_CHAN_MSG_MAX)) {
-
- nxt_debug(task, "chan %d: message type:%uD",
- msg->chan->socket.fd, msg->type);
-
- handlers = msg->chan->data;
- handlers[msg->type](task, msg);
-
- return;
- }
-
- nxt_log(task, NXT_LOG_CRIT, "chan %d: unknown message type:%uD",
- msg->chan->socket.fd, msg->type);
-}
-
-
-void
-nxt_process_chan_quit_handler(nxt_task_t *task, nxt_chan_recv_msg_t *msg)
-{
- nxt_cycle_quit(task, NULL);
-}
-
-
-void
-nxt_process_new_chan(nxt_task_t *task, nxt_cycle_t *cycle,
- nxt_process_chan_t *proc)
-{
- nxt_buf_t *b;
- nxt_uint_t i, n;
- nxt_process_chan_t *p;
- nxt_proc_msg_new_chan_t *new_chan;
-
- n = cycle->processes->nelts;
- if (n == 0) {
- return;
- }
-
- nxt_thread_log_debug("new chan %d for process %PI engine %uD",
- proc->chan->socket.fd, proc->pid, proc->engine);
-
- p = cycle->processes->elts;
-
- for (i = 0; i < n; i++) {
-
- if (proc->pid == p[i].pid || nxt_pid == p[i].pid || p[i].engine != 0) {
- continue;
- }
-
- b = nxt_buf_mem_alloc(p[i].chan->mem_pool,
- sizeof(nxt_process_chan_data_t), 0);
-
- if (nxt_slow_path(b == NULL)) {
- continue;
- }
-
- b->data = p[i].chan;
- b->completion_handler = nxt_process_new_chan_buf_completion;
- b->mem.free += sizeof(nxt_proc_msg_new_chan_t);
- new_chan = (nxt_proc_msg_new_chan_t *) b->mem.pos;
-
- new_chan->pid = proc->pid;
- new_chan->engine = proc->engine;
- new_chan->max_size = p[i].chan->max_size;
- new_chan->max_share = p[i].chan->max_share;
-
- (void) nxt_chan_write(task, p[i].chan, NXT_CHAN_MSG_NEW_CHAN,
- proc->chan->socket.fd, 0, b);
- }
-}
-
-
-static void
-nxt_process_new_chan_buf_completion(nxt_task_t *task, void *obj, void *data)
-{
- nxt_buf_t *b;
- nxt_chan_t *chan;
-
- b = obj;
- chan = b->data;
-
- /* TODO: b->mem.pos */
-
- nxt_buf_free(chan->mem_pool, b);
-}
-
-
-void
-nxt_process_chan_new_handler(nxt_task_t *task, nxt_chan_recv_msg_t *msg)
-{
- nxt_chan_t *chan;
- nxt_cycle_t *cycle;
- nxt_process_chan_t *proc;
- nxt_proc_msg_new_chan_t *new_chan;
-
- cycle = nxt_thread_cycle();
-
- proc = nxt_array_add(cycle->processes);
- if (nxt_slow_path(proc == NULL)) {
- return;
- }
-
- chan = nxt_chan_alloc();
- if (nxt_slow_path(chan == NULL)) {
- return;
- }
-
- proc->chan = chan;
-
- new_chan = (nxt_proc_msg_new_chan_t *) msg->buf->mem.pos;
- msg->buf->mem.pos = msg->buf->mem.free;
-
- nxt_debug(task, "new chan %d received for process %PI engine %uD",
- msg->fd, new_chan->pid, new_chan->engine);
-
- proc->pid = new_chan->pid;
- proc->engine = new_chan->engine;
- chan->pair[1] = msg->fd;
- chan->max_size = new_chan->max_size;
- chan->max_share = new_chan->max_share;
-
- /* A read chan is not passed at all. */
- nxt_chan_write_enable(task, chan);
-}
-
-
-void
-nxt_process_chan_change_log_file(nxt_task_t *task, nxt_cycle_t *cycle,
- nxt_uint_t slot, nxt_fd_t fd)
-{
- nxt_buf_t *b;
- nxt_uint_t i, n;
- nxt_process_chan_t *p;
-
- n = cycle->processes->nelts;
- if (n == 0) {
- return;
- }
-
- nxt_thread_log_debug("change log file #%ui fd:%FD", slot, fd);
-
- p = cycle->processes->elts;
-
- /* p[0] is master process. */
-
- for (i = 1; i < n; i++) {
- b = nxt_buf_mem_alloc(p[i].chan->mem_pool,
- sizeof(nxt_process_chan_data_t), 0);
-
- if (nxt_slow_path(b == NULL)) {
- continue;
- }
-
- *(nxt_uint_t *) b->mem.pos = slot;
- b->mem.free += sizeof(nxt_uint_t);
-
- (void) nxt_chan_write(task, p[i].chan, NXT_CHAN_MSG_CHANGE_FILE,
- fd, 0, b);
- }
-}
-
-
-void
-nxt_process_chan_change_log_file_handler(nxt_task_t *task,
- nxt_chan_recv_msg_t *msg)
-{
- nxt_buf_t *b;
- nxt_uint_t slot;
- nxt_file_t *log_file;
- nxt_cycle_t *cycle;
-
- cycle = nxt_thread_cycle();
-
- b = msg->buf;
- slot = *(nxt_uint_t *) b->mem.pos;
-
- log_file = nxt_list_elt(cycle->log_files, slot);
-
- nxt_debug(task, "change log file %FD:%FD", msg->fd, log_file->fd);
-
- /*
- * The old log file descriptor must be closed at the moment when no
- * other threads use it. dup2() allows to use the old file descriptor
- * for new log file. This change is performed atomically in the kernel.
- */
- if (nxt_file_redirect(log_file, msg->fd) == NXT_OK) {
-
- if (slot == 0) {
- (void) nxt_file_stderr(log_file);
- }
- }
-}
-
-
-void
-nxt_process_chan_data_handler(nxt_task_t *task, nxt_chan_recv_msg_t *msg)
-{
- nxt_buf_t *b;
-
- b = msg->buf;
-
- nxt_debug(task, "data: %*s", b->mem.free - b->mem.pos, b->mem.pos);
-
- b->mem.pos = b->mem.free;
-}
-
-
-void
-nxt_process_chan_empty_handler(nxt_task_t *task, nxt_chan_recv_msg_t *msg)
-{
- nxt_debug(task, "chan empty handler");
-}
diff --git a/src/nxt_process_chan.h b/src/nxt_process_chan.h
deleted file mode 100644
index 18ff7be3..00000000
--- a/src/nxt_process_chan.h
+++ /dev/null
@@ -1,68 +0,0 @@
-
-/*
- * Copyright (C) Igor Sysoev
- * Copyright (C) NGINX, Inc.
- */
-
-#ifndef _NXT_PROCESS_CHAN_H_INCLUDED_
-#define _NXT_PROCESS_CHAN_H_INCLUDED_
-
-
-#define NXT_CHAN_MSG_MAX NXT_CHAN_MSG_DATA
-
-typedef enum {
- NXT_CHAN_MSG_QUIT = 0,
- NXT_CHAN_MSG_NEW_CHAN,
- NXT_CHAN_MSG_CHANGE_FILE,
- NXT_CHAN_MSG_DATA,
-} nxt_chan_msg_type_e;
-
-
-typedef struct {
- nxt_pid_t pid;
- uint32_t engine;
- uint32_t generation;
- nxt_chan_t *chan;
-} nxt_process_chan_t;
-
-
-typedef struct {
- nxt_pid_t pid;
- uint32_t engine;
- size_t max_size;
- size_t max_share;
-} nxt_proc_msg_new_chan_t;
-
-
-/*
- * nxt_process_chan_data_t is allocaiton size
- * enabling effective reuse of memory pool cache.
- */
-typedef union {
- nxt_buf_t buf;
- nxt_proc_msg_new_chan_t new_chan;
-} nxt_process_chan_data_t;
-
-
-typedef void (*nxt_process_chan_handler_t)(nxt_task_t *task,
- nxt_chan_recv_msg_t *msg);
-
-
-void nxt_process_chan_create(nxt_thread_t *thr, nxt_process_chan_t *proc,
- nxt_process_chan_handler_t *handlers);
-void nxt_process_chan_write(nxt_task_t *task, nxt_cycle_t *cycle,
- nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_buf_t *b);
-void nxt_process_new_chan(nxt_task_t *task, nxt_cycle_t *cycle,
- nxt_process_chan_t *proc);
-void nxt_process_chan_change_log_file(nxt_task_t *task, nxt_cycle_t *cycle,
- nxt_uint_t slot, nxt_fd_t fd);
-
-void nxt_process_chan_quit_handler(nxt_task_t *task, nxt_chan_recv_msg_t *msg);
-void nxt_process_chan_new_handler(nxt_task_t *task, nxt_chan_recv_msg_t *msg);
-void nxt_process_chan_change_log_file_handler(nxt_task_t *task,
- nxt_chan_recv_msg_t *msg);
-void nxt_process_chan_data_handler(nxt_task_t *task, nxt_chan_recv_msg_t *msg);
-void nxt_process_chan_empty_handler(nxt_task_t *task, nxt_chan_recv_msg_t *msg);
-
-
-#endif /* _NXT_PROCESS_CHAN_H_INCLUDED_ */
diff --git a/src/nxt_worker_process.c b/src/nxt_worker_process.c
index f6d1b764..a30ac9e6 100644
--- a/src/nxt_worker_process.c
+++ b/src/nxt_worker_process.c
@@ -6,13 +6,13 @@
#include <nxt_main.h>
#include <nxt_cycle.h>
-#include <nxt_process_chan.h>
+#include <nxt_port.h>
#include <nxt_master_process.h>
static void nxt_worker_process_quit(nxt_task_t *task);
static void nxt_worker_process_quit_handler(nxt_task_t *task,
- nxt_chan_recv_msg_t *msg);
+ nxt_port_recv_msg_t *msg);
static void nxt_worker_process_signal_handler(nxt_task_t *task, void *obj,
void *data);
static void nxt_worker_process_sigterm_handler(nxt_task_t *task, void *obj,
@@ -21,11 +21,11 @@ static void nxt_worker_process_sigquit_handler(nxt_task_t *task, void *obj,
void *data);
-static nxt_process_chan_handler_t nxt_worker_process_chan_handlers[] = {
+static nxt_process_port_handler_t nxt_worker_process_port_handlers[] = {
nxt_worker_process_quit_handler,
- nxt_process_chan_new_handler,
- nxt_process_chan_change_log_file_handler,
- nxt_process_chan_data_handler,
+ nxt_process_port_new_handler,
+ nxt_process_port_change_log_file_handler,
+ nxt_process_port_data_handler,
};
@@ -47,7 +47,7 @@ nxt_worker_process_start(void *data)
nxt_int_t n;
nxt_cycle_t *cycle;
nxt_thread_t *thr;
- nxt_process_chan_t *proc;
+ nxt_process_port_t *proc;
const nxt_event_set_ops_t *event_set;
cycle = data;
@@ -94,13 +94,13 @@ nxt_worker_process_start(void *data)
proc = cycle->processes->elts;
- /* A master process chan. */
- nxt_chan_read_close(proc[0].chan);
- nxt_chan_write_enable(&nxt_main_task, proc[0].chan);
+ /* A master process port. */
+ nxt_port_read_close(proc[0].port);
+ nxt_port_write_enable(&nxt_main_task, proc[0].port);
- /* A worker process chan. */
- nxt_process_chan_create(thr, &proc[cycle->current_process],
- nxt_worker_process_chan_handlers);
+ /* A worker process port. */
+ nxt_process_port_create(thr, &proc[cycle->current_process],
+ nxt_worker_process_port_handlers);
#if (NXT_THREADS)
{
@@ -182,7 +182,7 @@ nxt_worker_process_signal_handler(nxt_task_t *task, void *obj, void *data)
static void
-nxt_worker_process_quit_handler(nxt_task_t *task, nxt_chan_recv_msg_t *msg)
+nxt_worker_process_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
nxt_worker_process_quit(task);
}