summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/nginext/nxt_go_lib.c4
-rw-r--r--src/nginext/nxt_go_port.c12
-rw-r--r--src/nginext/nxt_go_port_memory.c4
-rw-r--r--src/nginext/nxt_go_run_ctx.c2
-rw-r--r--src/nxt_controller.c3
-rw-r--r--src/nxt_port.h32
-rw-r--r--src/nxt_port_memory.c42
-rw-r--r--src/nxt_port_socket.c18
-rw-r--r--src/nxt_router.c5
9 files changed, 56 insertions, 66 deletions
diff --git a/src/nginext/nxt_go_lib.c b/src/nginext/nxt_go_lib.c
index 57c3da8e..87f583d7 100644
--- a/src/nginext/nxt_go_lib.c
+++ b/src/nginext/nxt_go_lib.c
@@ -189,8 +189,8 @@ nxt_go_ready()
port_msg.stream = atol(go_stream);
port_msg.pid = getpid();
port_msg.reply_port = 0;
- port_msg.type = NXT_PORT_MSG_READY;
- port_msg.last = 0;
+ port_msg.type = _NXT_PORT_MSG_READY;
+ port_msg.last = 1;
port_msg.mmap = 0;
nxt_go_master_send(&port_msg, sizeof(port_msg), NULL, 0);
diff --git a/src/nginext/nxt_go_port.c b/src/nginext/nxt_go_port.c
index 58ba90ea..af50b860 100644
--- a/src/nginext/nxt_go_port.c
+++ b/src/nginext/nxt_go_port.c
@@ -152,13 +152,13 @@ nxt_go_port_on_read(void *buf, size_t buf_size, void *oob, size_t oob_size)
}
switch (port_msg->type) {
- case NXT_PORT_MSG_QUIT:
+ case _NXT_PORT_MSG_QUIT:
nxt_go_debug("quit");
nxt_go_set_quit();
break;
- case NXT_PORT_MSG_NEW_PORT:
+ case _NXT_PORT_MSG_NEW_PORT:
nxt_go_debug("new port");
new_port_msg = payload;
@@ -166,22 +166,22 @@ nxt_go_port_on_read(void *buf, size_t buf_size, void *oob, size_t oob_size)
-1, fd);
break;
- case NXT_PORT_MSG_CHANGE_FILE:
+ case _NXT_PORT_MSG_CHANGE_FILE:
nxt_go_debug("change file");
break;
- case NXT_PORT_MSG_MMAP:
+ case _NXT_PORT_MSG_MMAP:
nxt_go_debug("mmap");
nxt_go_new_incoming_mmap(port_msg->pid, fd);
break;
- case NXT_PORT_MSG_DATA:
+ case _NXT_PORT_MSG_DATA:
nxt_go_debug("data");
return nxt_go_data_handler(port_msg, buf_size);
- case NXT_PORT_MSG_REMOVE_PID:
+ case _NXT_PORT_MSG_REMOVE_PID:
nxt_go_debug("remove pid");
/* TODO remove all ports for this pid in Go */
diff --git a/src/nginext/nxt_go_port_memory.c b/src/nginext/nxt_go_port_memory.c
index 7e3f7a1a..dbdf5f73 100644
--- a/src/nginext/nxt_go_port_memory.c
+++ b/src/nginext/nxt_go_port_memory.c
@@ -112,8 +112,8 @@ nxt_go_new_port_mmap(nxt_go_process_t *process, nxt_port_id_t id)
port_msg.stream = 0;
port_msg.pid = getpid();
port_msg.reply_port = 0;
- port_msg.type = NXT_PORT_MSG_MMAP;
- port_msg.last = 0;
+ port_msg.type = _NXT_PORT_MSG_MMAP;
+ port_msg.last = 1;
port_msg.mmap = 0;
cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
diff --git a/src/nginext/nxt_go_run_ctx.c b/src/nginext/nxt_go_run_ctx.c
index 0788b0d1..c6481b95 100644
--- a/src/nginext/nxt_go_run_ctx.c
+++ b/src/nginext/nxt_go_run_ctx.c
@@ -165,7 +165,7 @@ nxt_go_ctx_init(nxt_go_run_ctx_t *ctx, nxt_port_msg_t *port_msg,
ctx->wport_msg.stream = port_msg->stream;
ctx->wport_msg.pid = getpid();
- ctx->wport_msg.type = NXT_PORT_MSG_DATA;
+ ctx->wport_msg.type = _NXT_PORT_MSG_DATA;
ctx->wport_msg.mmap = 1;
ctx->wmmap_msg = (nxt_port_mmap_msg_t *) ( &ctx->wport_msg + 1 );
diff --git a/src/nxt_controller.c b/src/nxt_controller.c
index 660441f7..5ef1f944 100644
--- a/src/nxt_controller.c
+++ b/src/nxt_controller.c
@@ -862,7 +862,8 @@ nxt_controller_conf_pass(nxt_task_t *task, nxt_conf_value_t *conf)
b->mem.free = nxt_conf_json_print(b->mem.free, conf, NULL);
- return nxt_port_socket_write(task, port, NXT_PORT_MSG_DATA, -1, 0, 0, b);
+ return nxt_port_socket_write(task, port, NXT_PORT_MSG_DATA_LAST, -1, 0,
+ 0, b);
}
diff --git a/src/nxt_port.h b/src/nxt_port.h
index c057007f..27c00132 100644
--- a/src/nxt_port.h
+++ b/src/nxt_port.h
@@ -9,15 +9,30 @@
typedef enum {
- NXT_PORT_MSG_QUIT = 0,
- NXT_PORT_MSG_NEW_PORT,
- NXT_PORT_MSG_CHANGE_FILE,
- NXT_PORT_MSG_MMAP,
- NXT_PORT_MSG_DATA,
- NXT_PORT_MSG_REMOVE_PID,
- NXT_PORT_MSG_READY,
+ NXT_PORT_MSG_LAST = 0x100,
+ NXT_PORT_MSG_CLOSE_FD = 0x200,
+
+ NXT_PORT_MSG_MASK = 0xFF,
+
+ _NXT_PORT_MSG_QUIT = 0,
+ _NXT_PORT_MSG_NEW_PORT,
+ _NXT_PORT_MSG_CHANGE_FILE,
+ _NXT_PORT_MSG_MMAP,
+ _NXT_PORT_MSG_DATA,
+ _NXT_PORT_MSG_REMOVE_PID,
+ _NXT_PORT_MSG_READY,
NXT_PORT_MSG_MAX,
+
+ NXT_PORT_MSG_QUIT = _NXT_PORT_MSG_QUIT | NXT_PORT_MSG_LAST,
+ NXT_PORT_MSG_NEW_PORT = _NXT_PORT_MSG_NEW_PORT | NXT_PORT_MSG_LAST,
+ NXT_PORT_MSG_CHANGE_FILE = _NXT_PORT_MSG_CHANGE_FILE | NXT_PORT_MSG_LAST,
+ NXT_PORT_MSG_MMAP = _NXT_PORT_MSG_MMAP | NXT_PORT_MSG_LAST |
+ NXT_PORT_MSG_CLOSE_FD,
+ NXT_PORT_MSG_DATA = _NXT_PORT_MSG_DATA,
+ NXT_PORT_MSG_DATA_LAST = _NXT_PORT_MSG_DATA | NXT_PORT_MSG_LAST,
+ NXT_PORT_MSG_REMOVE_PID = _NXT_PORT_MSG_REMOVE_PID | NXT_PORT_MSG_LAST,
+ NXT_PORT_MSG_READY = _NXT_PORT_MSG_READY | NXT_PORT_MSG_LAST,
} nxt_port_msg_type_t;
@@ -27,7 +42,7 @@ typedef struct {
nxt_pid_t pid;
nxt_port_id_t reply_port;
- nxt_port_msg_type_t type:8;
+ uint8_t type;
uint8_t last; /* 1 bit */
/* Message data send using mmap, next chunk is a nxt_port_mmap_msg_t. */
@@ -40,6 +55,7 @@ typedef struct {
nxt_buf_t *buf;
size_t share;
nxt_fd_t fd;
+ nxt_bool_t close_fd;
nxt_port_msg_t port_msg;
nxt_work_t work;
diff --git a/src/nxt_port_memory.c b/src/nxt_port_memory.c
index 32d4aa5f..454827f0 100644
--- a/src/nxt_port_memory.c
+++ b/src/nxt_port_memory.c
@@ -198,37 +198,6 @@ fail:
}
-static void
-nxt_port_mmap_send_fd_buf_completion(nxt_task_t *task, void *obj, void *data)
-{
- nxt_fd_t fd;
- nxt_buf_t *b;
- nxt_mp_t *mp;
-
- if (nxt_buf_ts_handle(task, obj, data)) {
- return;
- }
-
- b = obj;
- mp = b->data;
- fd = (nxt_fd_t) (intptr_t) data;
-
-#if (NXT_DEBUG)
- if (nxt_slow_path(data != b->parent)) {
- nxt_log_alert(task->log, "completion data (%p) != b->parent (%p)",
- data, b->parent);
- nxt_abort();
- }
-#endif
-
- nxt_debug(task, "mmap fd %FD has been sent", fd);
-
- nxt_fd_close(fd);
-
- nxt_mp_release(mp, b);
-}
-
-
static nxt_port_mmap_header_t *
nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
nxt_port_t *port)
@@ -236,7 +205,6 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
void *mem;
u_char *p, name[64];
nxt_fd_t fd;
- nxt_buf_t *b;
nxt_port_mmap_t *port_mmap;
nxt_port_mmap_header_t *hdr;
@@ -310,14 +278,6 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
port_mmap->hdr = mem;
- b = nxt_buf_mem_ts_alloc(task, port->mem_pool, 0);
- if (nxt_slow_path(b == NULL)) {
- goto remove_fail;
- }
-
- b->completion_handler = nxt_port_mmap_send_fd_buf_completion;
- b->parent = (void *) (intptr_t) fd;
-
/* Init segment header. */
hdr = port_mmap->hdr;
@@ -336,7 +296,7 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
port->pid);
/* TODO handle error */
- (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, b);
+ (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL);
nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> %PI",
hdr->id, nxt_pid, process->pid);
diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c
index 31572e54..bdb37483 100644
--- a/src/nxt_port_socket.c
+++ b/src/nxt_port_socket.c
@@ -166,12 +166,17 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
if (msg->port_msg.stream == stream &&
msg->port_msg.reply_port == reply_port) {
+
+ nxt_assert(msg->port_msg.last == 0);
+
/*
* An fd is ignored since a file descriptor
* must be sent only in the first message of a stream.
*/
nxt_buf_chain_add(&msg->buf, b);
+ msg->port_msg.last |= (type & NXT_PORT_MSG_LAST) != 0;
+
return NXT_OK;
}
@@ -187,6 +192,7 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
msg->buf = b;
msg->fd = fd;
+ msg->close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0;
msg->share = 0;
msg->work.next = NULL;
@@ -201,8 +207,8 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
msg->port_msg.stream = stream;
msg->port_msg.pid = nxt_pid;
msg->port_msg.reply_port = reply_port;
- msg->port_msg.type = type;
- msg->port_msg.last = 0;
+ msg->port_msg.type = type & NXT_PORT_MSG_MASK;
+ msg->port_msg.last = (type & NXT_PORT_MSG_LAST) != 0;
msg->port_msg.mmap = 0;
nxt_queue_insert_tail(&port->messages, &msg->link);
@@ -276,7 +282,7 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
m = NXT_PORT_METHOD_PLAIN;
}
- msg->port_msg.last = sb.last;
+ msg->port_msg.last |= sb.last;
n = nxt_socketpair_send(&port->socket, msg->fd, iov, sb.niov + 1);
@@ -288,6 +294,12 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
goto fail;
}
+ if (msg->fd != -1 && msg->close_fd != 0) {
+ nxt_fd_close(msg->fd);
+
+ msg->fd = -1;
+ }
+
wq = &task->thread->engine->fast_work_queue;
if (msg->buf != plain_buf) {
diff --git a/src/nxt_router.c b/src/nxt_router.c
index 567c9df6..64cc53b7 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -575,7 +575,7 @@ nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
b->parent = tmcf->mem_pool;
b->completion_handler = nxt_router_conf_buf_completion;
- nxt_port_socket_write(task, tmcf->port, NXT_PORT_MSG_DATA, -1,
+ nxt_port_socket_write(task, tmcf->port, NXT_PORT_MSG_DATA_LAST, -1,
tmcf->stream, 0, b);
}
@@ -1977,7 +1977,8 @@ nxt_router_send_sw_request(nxt_task_t *task, void *obj, void *data)
*b->mem.free++ = '\0';
nxt_buf_cpystr(b, &app->conf);
- nxt_port_socket_write(task, port, NXT_PORT_MSG_DATA, -1, sw->stream, 0, b);
+ nxt_port_socket_write(task, port, NXT_PORT_MSG_DATA_LAST, -1,
+ sw->stream, 0, b);
}