summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--auto/sources3
-rw-r--r--src/nxt_cycle.c19
-rw-r--r--src/nxt_cycle.h3
-rw-r--r--src/nxt_main.h4
-rw-r--r--src/nxt_master_process.c92
-rw-r--r--src/nxt_port.c151
-rw-r--r--src/nxt_port.h125
-rw-r--r--src/nxt_port_socket.c72
-rw-r--r--src/nxt_port_socket.h73
-rw-r--r--src/nxt_sendbuf.c10
-rw-r--r--src/nxt_worker_process.c20
11 files changed, 272 insertions, 300 deletions
diff --git a/auto/sources b/auto/sources
index 3a5cab9a..373813e0 100644
--- a/auto/sources
+++ b/auto/sources
@@ -17,7 +17,7 @@ NXT_LIB_DEPS=" \
src/nxt_socket.h \
src/nxt_process.h \
src/nxt_signal.h \
- src/nxt_port_socket.h \
+ src/nxt_port.h \
src/nxt_dyld.h \
src/nxt_thread.h \
src/nxt_thread_id.h \
@@ -319,7 +319,6 @@ fi
NXT_DEPS=" \
src/nxt_cycle.h \
- src/nxt_port.h \
src/nxt_application.h \
src/nxt_master_process.h \
"
diff --git a/src/nxt_cycle.c b/src/nxt_cycle.c
index 81ad8ce2..e95421d2 100644
--- a/src/nxt_cycle.c
+++ b/src/nxt_cycle.c
@@ -344,34 +344,33 @@ nxt_cycle_event_engines(nxt_thread_t *thr, nxt_cycle_t *cycle)
static nxt_int_t
nxt_cycle_processes(nxt_cycle_t *cycle)
{
- nxt_uint_t n;
- nxt_process_port_t *proc, *prev;
+ nxt_uint_t n;
+ nxt_port_t *port, *prev;
/*
* Preallocate double number of previous cycle
* process slots or 2 process slots for initial cycle.
*/
- n = (cycle->previous != NULL) ? cycle->previous->processes->nelts : 1;
+ n = (cycle->previous != NULL) ? cycle->previous->ports->nelts : 1;
- cycle->processes = nxt_array_create(cycle->mem_pool, 2 * n,
- sizeof(nxt_process_port_t));
+ cycle->ports = nxt_array_create(cycle->mem_pool, 2 * n, sizeof(nxt_port_t));
- if (nxt_slow_path(cycle->processes == NULL)) {
+ if (nxt_slow_path(cycle->ports == NULL)) {
return NXT_ERROR;
}
if (cycle->previous != NULL) {
cycle->process_generation = cycle->previous->process_generation;
- prev = cycle->previous->processes->elts;
+ prev = cycle->previous->ports->elts;
while (n != 0) {
- proc = nxt_array_add(cycle->processes);
- if (nxt_slow_path(proc == NULL)) {
+ port = nxt_array_add(cycle->ports);
+ if (nxt_slow_path(port == NULL)) {
return NXT_ERROR;
}
- *proc = *prev++;
+ *port = *prev++;
n--;
}
}
diff --git a/src/nxt_cycle.h b/src/nxt_cycle.h
index 6dbde173..088a91c3 100644
--- a/src/nxt_cycle.h
+++ b/src/nxt_cycle.h
@@ -16,7 +16,6 @@ typedef enum {
} nxt_process_type_e;
-typedef struct nxt_cycle_s nxt_cycle_t;
typedef void (*nxt_cycle_cont_t)(nxt_task_t *task, nxt_cycle_t *cycle);
@@ -47,7 +46,7 @@ struct nxt_cycle_s {
nxt_cycle_cont_t continuation;
#endif
- nxt_array_t *processes; /* of nxt_process_port_t */
+ nxt_array_t *ports; /* of nxt_port_t */
nxt_list_t *log_files; /* of nxt_file_t */
diff --git a/src/nxt_main.h b/src/nxt_main.h
index b692dc92..1388a2f3 100644
--- a/src/nxt_main.h
+++ b/src/nxt_main.h
@@ -121,7 +121,8 @@ nxt_thread_extern_data(nxt_thread_t, nxt_thread_context);
#include <nxt_fd_event.h>
-#include <nxt_port_socket.h>
+typedef struct nxt_cycle_s nxt_cycle_t;
+#include <nxt_port.h>
#if (NXT_THREADS)
#include <nxt_thread_pool.h>
#endif
@@ -153,6 +154,7 @@ typedef struct nxt_upstream_source_s nxt_upstream_source_t;
#include <nxt_http_parse.h>
#include <nxt_http_source.h>
#include <nxt_fastcgi_source.h>
+#include <nxt_cycle.h>
#if (NXT_LIB_UNIT_TEST)
diff --git a/src/nxt_master_process.c b/src/nxt_master_process.c
index ceb44e1f..1e613b45 100644
--- a/src/nxt_master_process.c
+++ b/src/nxt_master_process.c
@@ -71,26 +71,27 @@ nxt_master_process_start(nxt_thread_t *thr, nxt_task_t *task,
static nxt_int_t
nxt_master_process_port_create(nxt_task_t *task, nxt_cycle_t *cycle)
{
- nxt_process_port_t *proc;
+ nxt_int_t ret;
+ nxt_port_t *port;
- proc = nxt_array_add(cycle->processes);
- if (nxt_slow_path(proc == NULL)) {
+ port = nxt_array_zero_add(cycle->ports);
+ if (nxt_slow_path(port == NULL)) {
return NXT_ERROR;
}
- proc->pid = nxt_pid;
- proc->engine = 0;
-
- proc->port = nxt_port_create(task, 0);
- if (nxt_slow_path(proc->port == NULL)) {
- return NXT_ERROR;
+ ret = nxt_port_socket_init(task, port, 0);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ return ret;
}
+ port->pid = nxt_pid;
+ port->engine = 0;
+
/*
* A master process port. A write port is not closed
* since it should be inherited by worker processes.
*/
- nxt_port_read_enable(task, proc->port);
+ nxt_port_read_enable(task, port);
return NXT_OK;
}
@@ -143,24 +144,25 @@ nxt_master_start_worker_processes(nxt_task_t *task, nxt_cycle_t *cycle)
static nxt_int_t
nxt_master_create_worker_process(nxt_task_t *task, nxt_cycle_t *cycle)
{
- nxt_pid_t pid;
- nxt_process_port_t *proc;
+ nxt_int_t ret;
+ nxt_pid_t pid;
+ nxt_port_t *port;
- proc = nxt_array_add(cycle->processes);
- if (nxt_slow_path(proc == NULL)) {
+ port = nxt_array_zero_add(cycle->ports);
+ if (nxt_slow_path(port == NULL)) {
return NXT_ERROR;
}
- cycle->current_process = cycle->processes->nelts - 1;
+ cycle->current_process = cycle->ports->nelts - 1;
- proc->engine = 0;
- proc->generation = cycle->process_generation;
-
- proc->port = nxt_port_create(task, 0);
- if (nxt_slow_path(proc->port == NULL)) {
- return NXT_ERROR;
+ ret = nxt_port_socket_init(task, port, 0);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ return ret;
}
+ port->engine = 0;
+ port->generation = cycle->process_generation;
+
pid = nxt_process_create(nxt_worker_process_start, cycle,
"start worker process");
@@ -175,12 +177,12 @@ nxt_master_create_worker_process(nxt_task_t *task, nxt_cycle_t *cycle)
default:
/* The master process created a new process. */
- proc->pid = pid;
+ port->pid = pid;
- nxt_port_read_close(proc->port);
- nxt_port_write_enable(task, proc->port);
+ nxt_port_read_close(port);
+ nxt_port_write_enable(task, port);
- nxt_process_new_port(task, cycle, proc);
+ nxt_port_send_new_port(task, cycle, port);
return NXT_OK;
}
}
@@ -250,24 +252,24 @@ static void
nxt_master_stop_previous_worker_processes(nxt_task_t *task, void *obj,
void *data)
{
- uint32_t generation;
- nxt_uint_t i, n;
- nxt_cycle_t *cycle;
- nxt_process_port_t *proc;
+ uint32_t generation;
+ nxt_uint_t i, n;
+ nxt_port_t *port;
+ nxt_cycle_t *cycle;
cycle = nxt_thread_cycle();
- proc = cycle->processes->elts;
- n = cycle->processes->nelts;
+ port = cycle->ports->elts;
+ n = cycle->ports->nelts;
generation = cycle->process_generation - 1;
- /* The proc[0] is the master process. */
+ /* The port[0] is the master process port. */
for (i = 1; i < n; i++) {
- if (proc[i].generation == generation) {
- (void) nxt_port_write(task, proc[i].port, NXT_PORT_MSG_QUIT,
- -1, 0, NULL);
+ if (port[i].generation == generation) {
+ (void) nxt_port_socket_write(task, &port[i],
+ NXT_PORT_MSG_QUIT, -1, 0, NULL);
}
}
@@ -278,7 +280,7 @@ nxt_master_stop_previous_worker_processes(nxt_task_t *task, void *obj,
void
nxt_master_stop_worker_processes(nxt_task_t *task, nxt_cycle_t *cycle)
{
- nxt_process_port_write(task, cycle, NXT_PORT_MSG_QUIT, -1, 0, NULL);
+ nxt_port_write(task, cycle, NXT_PORT_MSG_QUIT, -1, 0, NULL);
}
@@ -366,7 +368,7 @@ nxt_master_process_sigusr1_handler(nxt_task_t *task, void *obj, void *data)
nxt_list_each(file, cycle->log_files) {
- nxt_process_port_change_log_file(task, cycle, n, new_file[n].fd);
+ nxt_port_change_log_file(task, cycle, n, new_file[n].fd);
/*
* The old log file descriptor must be closed at the moment
* when no other threads use it. dup2() allows to use the
@@ -617,9 +619,9 @@ nxt_master_process_sigchld_handler(nxt_task_t *task, void *obj, void *data)
static void
nxt_master_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid)
{
- nxt_uint_t i, n, generation;
- nxt_cycle_t *cycle;
- nxt_process_port_t *proc;
+ nxt_uint_t i, n, generation;
+ nxt_port_t *port;
+ nxt_cycle_t *cycle;
cycle = nxt_thread_cycle();
@@ -630,15 +632,15 @@ nxt_master_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid)
return;
}
- proc = cycle->processes->elts;
- n = cycle->processes->nelts;
+ port = cycle->ports->elts;
+ n = cycle->ports->nelts;
for (i = 0; i < n; i++) {
- if (pid == proc[i].pid) {
- generation = proc[i].generation;
+ if (pid == port[i].pid) {
+ generation = port[i].generation;
- nxt_array_remove(cycle->processes, &proc[i]);
+ nxt_array_remove(cycle->ports, &port[i]);
if (nxt_exiting) {
nxt_debug(task, "processes %d", n);
diff --git a/src/nxt_port.c b/src/nxt_port.c
index 7da2bcc4..8a56fc6b 100644
--- a/src/nxt_port.c
+++ b/src/nxt_port.c
@@ -9,48 +9,47 @@
#include <nxt_port.h>
-static void nxt_process_port_handler(nxt_task_t *task,
- nxt_port_recv_msg_t *msg);
-static void nxt_process_new_port_buf_completion(nxt_task_t *task, void *obj,
+static void nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
+static void nxt_port_new_port_buf_completion(nxt_task_t *task, void *obj,
void *data);
void
-nxt_process_port_create(nxt_thread_t *thr, nxt_process_port_t *proc,
- nxt_process_port_handler_t *handlers)
+nxt_port_create(nxt_thread_t *thread, nxt_port_t *port,
+ nxt_port_handler_t *handlers)
{
- proc->pid = nxt_pid;
- proc->engine = thr->engine->id;
- proc->port->handler = nxt_process_port_handler;
- proc->port->data = handlers;
+ port->pid = nxt_pid;
+ port->engine = thread->engine->id;
+ port->handler = nxt_port_handler;
+ port->data = handlers;
- nxt_port_write_close(proc->port);
- nxt_port_read_enable(&thr->engine->task, proc->port);
+ nxt_port_write_close(port);
+ nxt_port_read_enable(&thread->engine->task, port);
}
void
-nxt_process_port_write(nxt_task_t *task, nxt_cycle_t *cycle, nxt_uint_t type,
+nxt_port_write(nxt_task_t *task, nxt_cycle_t *cycle, nxt_uint_t type,
nxt_fd_t fd, uint32_t stream, nxt_buf_t *b)
{
- nxt_uint_t i, n;
- nxt_process_port_t *proc;
+ nxt_uint_t i, n;
+ nxt_port_t *port;
- proc = cycle->processes->elts;
- n = cycle->processes->nelts;
+ port = cycle->ports->elts;
+ n = cycle->ports->nelts;
for (i = 0; i < n; i++) {
- if (nxt_pid != proc[i].pid) {
- (void) nxt_port_write(task, proc[i].port, type, fd, stream, b);
+ if (nxt_pid != port[i].pid) {
+ (void) nxt_port_socket_write(task, &port[i], type, fd, stream, b);
}
}
}
static void
-nxt_process_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
+nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
- nxt_process_port_handler_t *handlers;
+ nxt_port_handler_t *handlers;
if (nxt_fast_path(msg->type <= NXT_PORT_MSG_MAX)) {
@@ -69,62 +68,64 @@ nxt_process_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
void
-nxt_process_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
+nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
nxt_cycle_quit(task, NULL);
}
void
-nxt_process_new_port(nxt_task_t *task, nxt_cycle_t *cycle,
- nxt_process_port_t *proc)
+nxt_port_send_new_port(nxt_task_t *task, nxt_cycle_t *cycle,
+ nxt_port_t *new_port)
{
nxt_buf_t *b;
nxt_uint_t i, n;
- nxt_process_port_t *p;
- nxt_proc_msg_new_port_t *new_port;
+ nxt_port_t *port;
+ nxt_port_msg_new_port_t *msg;
- n = cycle->processes->nelts;
+ n = cycle->ports->nelts;
if (n == 0) {
return;
}
- nxt_thread_log_debug("new port %d for process %PI engine %uD",
- proc->port->socket.fd, proc->pid, proc->engine);
+ nxt_debug(task, "new port %d for process %PI engine %uD",
+ new_port->socket.fd, new_port->pid, new_port->engine);
- p = cycle->processes->elts;
+ port = cycle->ports->elts;
for (i = 0; i < n; i++) {
- if (proc->pid == p[i].pid || nxt_pid == p[i].pid || p[i].engine != 0) {
+ if (port[i].pid == new_port->pid
+ || port[i].pid == nxt_pid
+ || port[i].engine != 0)
+ {
continue;
}
- b = nxt_buf_mem_alloc(p[i].port->mem_pool,
- sizeof(nxt_process_port_data_t), 0);
+ b = nxt_buf_mem_alloc(port[i].mem_pool, sizeof(nxt_port_data_t), 0);
if (nxt_slow_path(b == NULL)) {
continue;
}
- b->data = p[i].port;
- b->completion_handler = nxt_process_new_port_buf_completion;
- b->mem.free += sizeof(nxt_proc_msg_new_port_t);
- new_port = (nxt_proc_msg_new_port_t *) b->mem.pos;
+ b->data = &port[i];
+ b->completion_handler = nxt_port_new_port_buf_completion;
+ b->mem.free += sizeof(nxt_port_msg_new_port_t);
+ msg = (nxt_port_msg_new_port_t *) b->mem.pos;
- new_port->pid = proc->pid;
- new_port->engine = proc->engine;
- new_port->max_size = p[i].port->max_size;
- new_port->max_share = p[i].port->max_share;
+ msg->pid = new_port->pid;
+ msg->engine = new_port->engine;
+ msg->max_size = port[i].max_size;
+ msg->max_share = port[i].max_share;
- (void) nxt_port_write(task, p[i].port, NXT_PORT_MSG_NEW_PORT,
- proc->port->socket.fd, 0, b);
+ (void) nxt_port_socket_write(task, &port[i], NXT_PORT_MSG_NEW_PORT,
+ new_port->socket.fd, 0, b);
}
}
static void
-nxt_process_new_port_buf_completion(nxt_task_t *task, void *obj, void *data)
+nxt_port_new_port_buf_completion(nxt_task_t *task, void *obj, void *data)
{
nxt_buf_t *b;
nxt_port_t *port;
@@ -139,66 +140,69 @@ nxt_process_new_port_buf_completion(nxt_task_t *task, void *obj, void *data)
void
-nxt_process_port_new_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
+nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
nxt_port_t *port;
nxt_cycle_t *cycle;
- nxt_process_port_t *proc;
- nxt_proc_msg_new_port_t *new_port;
+ nxt_mem_pool_t *mp;
+ nxt_port_msg_new_port_t *new_port_msg;
cycle = nxt_thread_cycle();
- proc = nxt_array_add(cycle->processes);
- if (nxt_slow_path(proc == NULL)) {
+ port = nxt_array_add(cycle->ports);
+ if (nxt_slow_path(port == NULL)) {
return;
}
- port = nxt_port_alloc(task);
- if (nxt_slow_path(port == NULL)) {
+ mp = nxt_mem_pool_create(1024);
+ if (nxt_slow_path(mp == NULL)) {
return;
}
- proc->port = port;
+ port->mem_pool = mp;
- new_port = (nxt_proc_msg_new_port_t *) msg->buf->mem.pos;
+ new_port_msg = (nxt_port_msg_new_port_t *) msg->buf->mem.pos;
msg->buf->mem.pos = msg->buf->mem.free;
nxt_debug(task, "new port %d received for process %PI engine %uD",
- msg->fd, new_port->pid, new_port->engine);
+ msg->fd, new_port_msg->pid, new_port_msg->engine);
- proc->pid = new_port->pid;
- proc->engine = new_port->engine;
+ port->pid = new_port_msg->pid;
+ port->engine = new_port_msg->engine;
+ port->pair[0] = -1;
port->pair[1] = msg->fd;
- port->max_size = new_port->max_size;
- port->max_share = new_port->max_share;
+ port->max_size = new_port_msg->max_size;
+ port->max_share = new_port_msg->max_share;
+
+ nxt_queue_init(&port->messages);
+
+ port->socket.task = task;
- /* A read port is not passed at all. */
nxt_port_write_enable(task, port);
}
void
-nxt_process_port_change_log_file(nxt_task_t *task, nxt_cycle_t *cycle,
+nxt_port_change_log_file(nxt_task_t *task, nxt_cycle_t *cycle,
nxt_uint_t slot, nxt_fd_t fd)
{
- nxt_buf_t *b;
- nxt_uint_t i, n;
- nxt_process_port_t *p;
+ nxt_buf_t *b;
+ nxt_uint_t i, n;
+ nxt_port_t *port;
- n = cycle->processes->nelts;
+ n = cycle->ports->nelts;
if (n == 0) {
return;
}
- nxt_thread_log_debug("change log file #%ui fd:%FD", slot, fd);
+ nxt_debug(task, "change log file #%ui fd:%FD", slot, fd);
- p = cycle->processes->elts;
+ port = cycle->ports->elts;
- /* p[0] is master process. */
+ /* port[0] is master process port. */
for (i = 1; i < n; i++) {
- b = nxt_buf_mem_alloc(p[i].port->mem_pool,
- sizeof(nxt_process_port_data_t), 0);
+ b = nxt_buf_mem_alloc(port[i].mem_pool, sizeof(nxt_port_data_t), 0);
if (nxt_slow_path(b == NULL)) {
continue;
@@ -207,15 +211,14 @@ nxt_process_port_change_log_file(nxt_task_t *task, nxt_cycle_t *cycle,
*(nxt_uint_t *) b->mem.pos = slot;
b->mem.free += sizeof(nxt_uint_t);
- (void) nxt_port_write(task, p[i].port, NXT_PORT_MSG_PORTGE_FILE,
- fd, 0, b);
+ (void) nxt_port_socket_write(task, &port[i], NXT_PORT_MSG_CHANGE_FILE,
+ fd, 0, b);
}
}
void
-nxt_process_port_change_log_file_handler(nxt_task_t *task,
- nxt_port_recv_msg_t *msg)
+nxt_port_change_log_file_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
nxt_buf_t *b;
nxt_uint_t slot;
@@ -246,7 +249,7 @@ nxt_process_port_change_log_file_handler(nxt_task_t *task,
void
-nxt_process_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
+nxt_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
nxt_buf_t *b;
@@ -259,7 +262,7 @@ nxt_process_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
void
-nxt_process_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
+nxt_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
nxt_debug(task, "port empty handler");
}
diff --git a/src/nxt_port.h b/src/nxt_port.h
index b195bcd7..c27b1bc4 100644
--- a/src/nxt_port.h
+++ b/src/nxt_port.h
@@ -8,61 +8,116 @@
#define _NXT_PORT_H_INCLUDED_
+typedef struct nxt_port_s nxt_port_t;
+
+
+typedef struct {
+ uint32_t stream;
+
+ uint16_t type;
+ uint8_t last; /* 1 bit */
+} nxt_port_msg_t;
+
+
+typedef struct {
+ nxt_queue_link_t link;
+ nxt_buf_t *buf;
+ size_t share;
+ nxt_fd_t fd;
+ nxt_port_msg_t port_msg;
+} nxt_port_send_msg_t;
+
+
+typedef struct nxt_port_recv_msg_s {
+ uint32_t stream;
+ uint16_t type;
+
+ nxt_fd_t fd;
+ nxt_buf_t *buf;
+ nxt_port_t *port;
+} nxt_port_recv_msg_t;
+
+
+typedef void (*nxt_port_handler_t)(nxt_task_t *task, nxt_port_recv_msg_t *msg);
+
+
+struct nxt_port_s {
+ /* Must be the first field. */
+ nxt_fd_event_t socket;
+
+ nxt_queue_t messages; /* of nxt_port_send_msg_t */
+
+ /* Maximum size of message part. */
+ uint32_t max_size;
+ /* Maximum interleave of message parts. */
+ uint32_t max_share;
+
+ nxt_port_handler_t handler;
+ void *data;
+
+ nxt_mem_pool_t *mem_pool;
+ nxt_buf_t *free_bufs;
+ nxt_socket_t pair[2];
+
+ nxt_pid_t pid;
+ uint32_t engine;
+ uint32_t generation;
+};
+
+
#define NXT_PORT_MSG_MAX NXT_PORT_MSG_DATA
typedef enum {
NXT_PORT_MSG_QUIT = 0,
NXT_PORT_MSG_NEW_PORT,
- NXT_PORT_MSG_PORTGE_FILE,
+ NXT_PORT_MSG_CHANGE_FILE,
NXT_PORT_MSG_DATA,
} nxt_port_msg_type_e;
typedef struct {
- nxt_pid_t pid;
- uint32_t engine;
- uint32_t generation;
- nxt_port_t *port;
-} nxt_process_port_t;
-
-
-typedef struct {
- nxt_pid_t pid;
- uint32_t engine;
- size_t max_size;
- size_t max_share;
-} nxt_proc_msg_new_port_t;
+ nxt_pid_t pid;
+ uint32_t engine;
+ size_t max_size;
+ size_t max_share;
+} nxt_port_msg_new_port_t;
/*
- * nxt_process_port_data_t is allocaiton size
- * enabling effective reuse of memory pool cache.
+ * nxt_port_data_t size is allocation size
+ * which enables effective reuse of memory pool cache.
*/
typedef union {
nxt_buf_t buf;
- nxt_proc_msg_new_port_t new_port;
-} nxt_process_port_data_t;
-
-
-typedef void (*nxt_process_port_handler_t)(nxt_task_t *task,
- nxt_port_recv_msg_t *msg);
-
-
-void nxt_process_port_create(nxt_thread_t *thr, nxt_process_port_t *proc,
- nxt_process_port_handler_t *handlers);
-void nxt_process_port_write(nxt_task_t *task, nxt_cycle_t *cycle,
+ nxt_port_msg_new_port_t new_port;
+} nxt_port_data_t;
+
+
+nxt_int_t nxt_port_socket_init(nxt_task_t *task, nxt_port_t *port,
+ size_t max_size);
+void nxt_port_destroy(nxt_port_t *port);
+void nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port);
+void nxt_port_write_close(nxt_port_t *port);
+void nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port);
+void nxt_port_read_close(nxt_port_t *port);
+nxt_int_t nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port,
nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_buf_t *b);
-void nxt_process_new_port(nxt_task_t *task, nxt_cycle_t *cycle,
- nxt_process_port_t *proc);
-void nxt_process_port_change_log_file(nxt_task_t *task, nxt_cycle_t *cycle,
+
+void nxt_port_create(nxt_thread_t *thread, nxt_port_t *port,
+ nxt_port_handler_t *handlers);
+void nxt_port_write(nxt_task_t *task, nxt_cycle_t *cycle, nxt_uint_t type,
+ nxt_fd_t fd, uint32_t stream, nxt_buf_t *b);
+void nxt_port_send_new_port(nxt_task_t *task, nxt_cycle_t *cycle,
+ nxt_port_t *port);
+void nxt_port_change_log_file(nxt_task_t *task, nxt_cycle_t *cycle,
nxt_uint_t slot, nxt_fd_t fd);
-void nxt_process_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
-void nxt_process_port_new_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
-void nxt_process_port_change_log_file_handler(nxt_task_t *task,
+void nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
+void nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
+void nxt_port_change_log_file_handler(nxt_task_t *task,
nxt_port_recv_msg_t *msg);
-void nxt_process_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
-void nxt_process_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
+void nxt_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
+void nxt_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
#endif /* _NXT_PORT_H_INCLUDED_ */
diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c
index a3bc7f26..671c3334 100644
--- a/src/nxt_port_socket.c
+++ b/src/nxt_port_socket.c
@@ -16,44 +16,26 @@ static void nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b);
static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data);
-nxt_port_t *
-nxt_port_alloc(nxt_task_t *task)
+nxt_int_t
+nxt_port_socket_init(nxt_task_t *task, nxt_port_t *port, size_t max_size)
{
- nxt_port_t *port;
+ nxt_int_t sndbuf, rcvbuf, size;
+ nxt_socket_t snd, rcv;
nxt_mem_pool_t *mp;
- mp = nxt_mem_pool_create(1024);
-
- if (nxt_fast_path(mp != NULL)) {
- /* This allocation cannot fail. */
- port = nxt_mem_zalloc(mp, sizeof(nxt_port_t));
- port->mem_pool = mp;
-
- port->socket.task = task;
+ port->socket.task = task;
- port->pair[0] = -1;
- port->pair[1] = -1;
+ port->pair[0] = -1;
+ port->pair[1] = -1;
- nxt_queue_init(&port->messages);
+ nxt_queue_init(&port->messages);
- return port;
+ mp = nxt_mem_pool_create(1024);
+ if (nxt_slow_path(mp == NULL)) {
+ return NXT_ERROR;
}
- return NULL;
-}
-
-
-nxt_port_t *
-nxt_port_create(nxt_task_t *task, size_t max_size)
-{
- nxt_int_t sndbuf, rcvbuf, size;
- nxt_port_t *port;
- nxt_socket_t snd, rcv;
-
- port = nxt_port_alloc(task);
- if (nxt_slow_path(port == NULL)) {
- return NULL;
- }
+ port->mem_pool = mp;
if (nxt_slow_path(nxt_socketpair_create(task, port->pair) != NXT_OK)) {
goto socketpair_fail;
@@ -109,7 +91,7 @@ nxt_port_create(nxt_task_t *task, size_t max_size)
port->max_size = nxt_min(max_size, (size_t) sndbuf);
port->max_share = (64 * 1024);
- return port;
+ return NXT_OK;
getsockopt_fail:
@@ -120,7 +102,7 @@ socketpair_fail:
nxt_mem_pool_destroy(port->mem_pool);
- return NULL;
+ return NXT_ERROR;
}
@@ -154,7 +136,7 @@ nxt_port_write_close(nxt_port_t *port)
nxt_int_t
-nxt_port_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
+nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
nxt_fd_t fd, uint32_t stream, nxt_buf_t *b)
{
nxt_queue_link_t *link;
@@ -204,9 +186,9 @@ static void
nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
{
ssize_t n;
- nxt_uint_t niob;
+ nxt_uint_t niov;
nxt_port_t *port;
- struct iovec iob[NXT_IOBUF_MAX];
+ struct iovec iov[NXT_IOBUF_MAX];
nxt_queue_link_t *link;
nxt_port_send_msg_t *msg;
nxt_sendbuf_coalesce_t sb;
@@ -223,21 +205,22 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
msg = (nxt_port_send_msg_t *) link;
- nxt_iobuf_set(&iob[0], &msg->port_msg, sizeof(nxt_port_msg_t));
+ iov[0].iov_base = &msg->port_msg;
+ iov[0].iov_len = sizeof(nxt_port_msg_t);
sb.buf = msg->buf;
- sb.iobuf = &iob[1];
+ sb.iobuf = &iov[1];
sb.nmax = NXT_IOBUF_MAX - 1;
sb.sync = 0;
sb.last = 0;
sb.size = sizeof(nxt_port_msg_t);
sb.limit = port->max_size;
- niob = nxt_sendbuf_mem_coalesce(task, &sb);
+ niov = nxt_sendbuf_mem_coalesce(task, &sb);
msg->port_msg.last = sb.last;
- n = nxt_socketpair_send(&port->socket, msg->fd, iob, niob + 1);
+ n = nxt_socketpair_send(&port->socket, msg->fd, iov, niov + 1);
if (n > 0) {
if (nxt_slow_path((size_t) n != sb.size)) {
@@ -322,7 +305,7 @@ nxt_port_read_handler(nxt_task_t *task, void *obj, void *data)
nxt_fd_t fd;
nxt_buf_t *b;
nxt_port_t *port;
- nxt_iobuf_t iob[2];
+ struct iovec iov[2];
nxt_port_msg_t msg;
port = obj;
@@ -335,10 +318,13 @@ nxt_port_read_handler(nxt_task_t *task, void *obj, void *data)
/* TODO: disable event for some time */
}
- nxt_iobuf_set(&iob[0], &msg, sizeof(nxt_port_msg_t));
- nxt_iobuf_set(&iob[1], b->mem.pos, port->max_size);
+ iov[0].iov_base = &msg;
+ iov[0].iov_len = sizeof(nxt_port_msg_t);
+
+ iov[1].iov_base = b->mem.pos;
+ iov[1].iov_len = port->max_size;
- n = nxt_socketpair_recv(&port->socket, &fd, iob, 2);
+ n = nxt_socketpair_recv(&port->socket, &fd, iov, 2);
if (n > 0) {
nxt_port_read_msg_process(task, port, &msg, fd, b, n);
diff --git a/src/nxt_port_socket.h b/src/nxt_port_socket.h
deleted file mode 100644
index 4740368d..00000000
--- a/src/nxt_port_socket.h
+++ /dev/null
@@ -1,73 +0,0 @@
-
-/*
- * Copyright (C) Igor Sysoev
- * Copyright (C) NGINX, Inc.
- */
-
-#ifndef _NXT_PORT_SOCKET_H_INCLUDED_
-#define _NXT_PORT_SOCKET_H_INCLUDED_
-
-
-typedef struct {
- uint32_t stream;
-
- uint16_t type;
- uint8_t last; /* 1 bit */
-} nxt_port_msg_t;
-
-
-typedef struct {
- nxt_queue_link_t link;
- nxt_buf_t *buf;
- size_t share;
- nxt_fd_t fd;
- nxt_port_msg_t port_msg;
-} nxt_port_send_msg_t;
-
-
-typedef struct nxt_port_recv_msg_s nxt_port_recv_msg_t;
-typedef void (*nxt_port_handler_t)(nxt_task_t *task, nxt_port_recv_msg_t *msg);
-
-
-typedef struct {
- /* Must be the first field. */
- nxt_fd_event_t socket;
-
- nxt_queue_t messages; /* of nxt_port_send_msg_t */
-
- /* Maximum size of message part. */
- uint32_t max_size;
- /* Maximum interleave of message parts. */
- uint32_t max_share;
-
- nxt_port_handler_t handler;
- void *data;
-
- nxt_mem_pool_t *mem_pool;
- nxt_buf_t *free_bufs;
- nxt_socket_t pair[2];
-} nxt_port_t;
-
-
-struct nxt_port_recv_msg_s {
- uint32_t stream;
- uint16_t type;
-
- nxt_fd_t fd;
- nxt_buf_t *buf;
- nxt_port_t *port;
-};
-
-
-NXT_EXPORT nxt_port_t *nxt_port_alloc(nxt_task_t *task);
-NXT_EXPORT nxt_port_t *nxt_port_create(nxt_task_t *task, size_t bufsize);
-NXT_EXPORT void nxt_port_destroy(nxt_port_t *port);
-NXT_EXPORT void nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port);
-NXT_EXPORT void nxt_port_write_close(nxt_port_t *port);
-NXT_EXPORT void nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port);
-NXT_EXPORT void nxt_port_read_close(nxt_port_t *port);
-NXT_EXPORT nxt_int_t nxt_port_write(nxt_task_t *task, nxt_port_t *port,
- nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_buf_t *b);
-
-
-#endif /* _NXT_PORT_SOCKET_H_INCLUDED_ */
diff --git a/src/nxt_sendbuf.c b/src/nxt_sendbuf.c
index 21a25e68..62042504 100644
--- a/src/nxt_sendbuf.c
+++ b/src/nxt_sendbuf.c
@@ -122,15 +122,15 @@ nxt_sendbuf_mem_coalesce(nxt_task_t *task, nxt_sendbuf_coalesce_t *sb)
goto done;
}
- nxt_iobuf_set(&sb->iobuf[n], b->mem.pos, size);
+ sb->iobuf[n].iov_base = b->mem.pos;
+ sb->iobuf[n].iov_len = size;
} else {
- nxt_iobuf_add(&sb->iobuf[n], size);
+ sb->iobuf[n].iov_len += size;
}
- nxt_debug(task, "sendbuf: %ui, %p, %uz", n,
- nxt_iobuf_data(&sb->iobuf[n]),
- nxt_iobuf_size(&sb->iobuf[n]));
+ nxt_debug(task, "sendbuf: %ui, %p, %uz",
+ n, sb->iobuf[n].iov_base, sb->iobuf[n].iov_len);
total += size;
last = b->mem.pos + size;
diff --git a/src/nxt_worker_process.c b/src/nxt_worker_process.c
index 01304f65..a6ef8e80 100644
--- a/src/nxt_worker_process.c
+++ b/src/nxt_worker_process.c
@@ -21,11 +21,11 @@ static void nxt_worker_process_sigquit_handler(nxt_task_t *task, void *obj,
void *data);
-static nxt_process_port_handler_t nxt_worker_process_port_handlers[] = {
+static nxt_port_handler_t nxt_worker_process_port_handlers[] = {
nxt_worker_process_quit_handler,
- nxt_process_port_new_handler,
- nxt_process_port_change_log_file_handler,
- nxt_process_port_data_handler,
+ nxt_port_new_port_handler,
+ nxt_port_change_log_file_handler,
+ nxt_port_data_handler,
};
@@ -45,9 +45,9 @@ void
nxt_worker_process_start(void *data)
{
nxt_int_t n;
+ nxt_port_t *port;
nxt_cycle_t *cycle;
nxt_thread_t *thr;
- nxt_process_port_t *proc;
const nxt_event_interface_t *interface;
cycle = data;
@@ -90,15 +90,15 @@ nxt_worker_process_start(void *data)
goto fail;
}
- proc = cycle->processes->elts;
+ port = cycle->ports->elts;
/* A master process port. */
- nxt_port_read_close(proc[0].port);
- nxt_port_write_enable(&nxt_main_task, proc[0].port);
+ nxt_port_read_close(&port[0]);
+ nxt_port_write_enable(&nxt_main_task, &port[0]);
/* A worker process port. */
- nxt_process_port_create(thr, &proc[cycle->current_process],
- nxt_worker_process_port_handlers);
+ nxt_port_create(thr, &port[cycle->current_process],
+ nxt_worker_process_port_handlers);
#if (NXT_THREADS)
{