diff options
-rw-r--r-- | src/nginext/nxt_go_lib.c | 4 | ||||
-rw-r--r-- | src/nginext/nxt_go_port.c | 12 | ||||
-rw-r--r-- | src/nginext/nxt_go_port_memory.c | 4 | ||||
-rw-r--r-- | src/nginext/nxt_go_run_ctx.c | 2 | ||||
-rw-r--r-- | src/nxt_controller.c | 3 | ||||
-rw-r--r-- | src/nxt_port.h | 32 | ||||
-rw-r--r-- | src/nxt_port_memory.c | 42 | ||||
-rw-r--r-- | src/nxt_port_socket.c | 18 | ||||
-rw-r--r-- | src/nxt_router.c | 5 |
9 files changed, 56 insertions, 66 deletions
diff --git a/src/nginext/nxt_go_lib.c b/src/nginext/nxt_go_lib.c index 57c3da8e..87f583d7 100644 --- a/src/nginext/nxt_go_lib.c +++ b/src/nginext/nxt_go_lib.c @@ -189,8 +189,8 @@ nxt_go_ready() port_msg.stream = atol(go_stream); port_msg.pid = getpid(); port_msg.reply_port = 0; - port_msg.type = NXT_PORT_MSG_READY; - port_msg.last = 0; + port_msg.type = _NXT_PORT_MSG_READY; + port_msg.last = 1; port_msg.mmap = 0; nxt_go_master_send(&port_msg, sizeof(port_msg), NULL, 0); diff --git a/src/nginext/nxt_go_port.c b/src/nginext/nxt_go_port.c index 58ba90ea..af50b860 100644 --- a/src/nginext/nxt_go_port.c +++ b/src/nginext/nxt_go_port.c @@ -152,13 +152,13 @@ nxt_go_port_on_read(void *buf, size_t buf_size, void *oob, size_t oob_size) } switch (port_msg->type) { - case NXT_PORT_MSG_QUIT: + case _NXT_PORT_MSG_QUIT: nxt_go_debug("quit"); nxt_go_set_quit(); break; - case NXT_PORT_MSG_NEW_PORT: + case _NXT_PORT_MSG_NEW_PORT: nxt_go_debug("new port"); new_port_msg = payload; @@ -166,22 +166,22 @@ nxt_go_port_on_read(void *buf, size_t buf_size, void *oob, size_t oob_size) -1, fd); break; - case NXT_PORT_MSG_CHANGE_FILE: + case _NXT_PORT_MSG_CHANGE_FILE: nxt_go_debug("change file"); break; - case NXT_PORT_MSG_MMAP: + case _NXT_PORT_MSG_MMAP: nxt_go_debug("mmap"); nxt_go_new_incoming_mmap(port_msg->pid, fd); break; - case NXT_PORT_MSG_DATA: + case _NXT_PORT_MSG_DATA: nxt_go_debug("data"); return nxt_go_data_handler(port_msg, buf_size); - case NXT_PORT_MSG_REMOVE_PID: + case _NXT_PORT_MSG_REMOVE_PID: nxt_go_debug("remove pid"); /* TODO remove all ports for this pid in Go */ diff --git a/src/nginext/nxt_go_port_memory.c b/src/nginext/nxt_go_port_memory.c index 7e3f7a1a..dbdf5f73 100644 --- a/src/nginext/nxt_go_port_memory.c +++ b/src/nginext/nxt_go_port_memory.c @@ -112,8 +112,8 @@ nxt_go_new_port_mmap(nxt_go_process_t *process, nxt_port_id_t id) port_msg.stream = 0; port_msg.pid = getpid(); port_msg.reply_port = 0; - port_msg.type = NXT_PORT_MSG_MMAP; - port_msg.last = 0; + port_msg.type = _NXT_PORT_MSG_MMAP; + port_msg.last = 1; port_msg.mmap = 0; cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int)); diff --git a/src/nginext/nxt_go_run_ctx.c b/src/nginext/nxt_go_run_ctx.c index 0788b0d1..c6481b95 100644 --- a/src/nginext/nxt_go_run_ctx.c +++ b/src/nginext/nxt_go_run_ctx.c @@ -165,7 +165,7 @@ nxt_go_ctx_init(nxt_go_run_ctx_t *ctx, nxt_port_msg_t *port_msg, ctx->wport_msg.stream = port_msg->stream; ctx->wport_msg.pid = getpid(); - ctx->wport_msg.type = NXT_PORT_MSG_DATA; + ctx->wport_msg.type = _NXT_PORT_MSG_DATA; ctx->wport_msg.mmap = 1; ctx->wmmap_msg = (nxt_port_mmap_msg_t *) ( &ctx->wport_msg + 1 ); diff --git a/src/nxt_controller.c b/src/nxt_controller.c index 660441f7..5ef1f944 100644 --- a/src/nxt_controller.c +++ b/src/nxt_controller.c @@ -862,7 +862,8 @@ nxt_controller_conf_pass(nxt_task_t *task, nxt_conf_value_t *conf) b->mem.free = nxt_conf_json_print(b->mem.free, conf, NULL); - return nxt_port_socket_write(task, port, NXT_PORT_MSG_DATA, -1, 0, 0, b); + return nxt_port_socket_write(task, port, NXT_PORT_MSG_DATA_LAST, -1, 0, + 0, b); } diff --git a/src/nxt_port.h b/src/nxt_port.h index c057007f..27c00132 100644 --- a/src/nxt_port.h +++ b/src/nxt_port.h @@ -9,15 +9,30 @@ typedef enum { - NXT_PORT_MSG_QUIT = 0, - NXT_PORT_MSG_NEW_PORT, - NXT_PORT_MSG_CHANGE_FILE, - NXT_PORT_MSG_MMAP, - NXT_PORT_MSG_DATA, - NXT_PORT_MSG_REMOVE_PID, - NXT_PORT_MSG_READY, + NXT_PORT_MSG_LAST = 0x100, + NXT_PORT_MSG_CLOSE_FD = 0x200, + + NXT_PORT_MSG_MASK = 0xFF, + + _NXT_PORT_MSG_QUIT = 0, + _NXT_PORT_MSG_NEW_PORT, + _NXT_PORT_MSG_CHANGE_FILE, + _NXT_PORT_MSG_MMAP, + _NXT_PORT_MSG_DATA, + _NXT_PORT_MSG_REMOVE_PID, + _NXT_PORT_MSG_READY, NXT_PORT_MSG_MAX, + + NXT_PORT_MSG_QUIT = _NXT_PORT_MSG_QUIT | NXT_PORT_MSG_LAST, + NXT_PORT_MSG_NEW_PORT = _NXT_PORT_MSG_NEW_PORT | NXT_PORT_MSG_LAST, + NXT_PORT_MSG_CHANGE_FILE = _NXT_PORT_MSG_CHANGE_FILE | NXT_PORT_MSG_LAST, + NXT_PORT_MSG_MMAP = _NXT_PORT_MSG_MMAP | NXT_PORT_MSG_LAST | + NXT_PORT_MSG_CLOSE_FD, + NXT_PORT_MSG_DATA = _NXT_PORT_MSG_DATA, + NXT_PORT_MSG_DATA_LAST = _NXT_PORT_MSG_DATA | NXT_PORT_MSG_LAST, + NXT_PORT_MSG_REMOVE_PID = _NXT_PORT_MSG_REMOVE_PID | NXT_PORT_MSG_LAST, + NXT_PORT_MSG_READY = _NXT_PORT_MSG_READY | NXT_PORT_MSG_LAST, } nxt_port_msg_type_t; @@ -27,7 +42,7 @@ typedef struct { nxt_pid_t pid; nxt_port_id_t reply_port; - nxt_port_msg_type_t type:8; + uint8_t type; uint8_t last; /* 1 bit */ /* Message data send using mmap, next chunk is a nxt_port_mmap_msg_t. */ @@ -40,6 +55,7 @@ typedef struct { nxt_buf_t *buf; size_t share; nxt_fd_t fd; + nxt_bool_t close_fd; nxt_port_msg_t port_msg; nxt_work_t work; diff --git a/src/nxt_port_memory.c b/src/nxt_port_memory.c index 32d4aa5f..454827f0 100644 --- a/src/nxt_port_memory.c +++ b/src/nxt_port_memory.c @@ -198,37 +198,6 @@ fail: } -static void -nxt_port_mmap_send_fd_buf_completion(nxt_task_t *task, void *obj, void *data) -{ - nxt_fd_t fd; - nxt_buf_t *b; - nxt_mp_t *mp; - - if (nxt_buf_ts_handle(task, obj, data)) { - return; - } - - b = obj; - mp = b->data; - fd = (nxt_fd_t) (intptr_t) data; - -#if (NXT_DEBUG) - if (nxt_slow_path(data != b->parent)) { - nxt_log_alert(task->log, "completion data (%p) != b->parent (%p)", - data, b->parent); - nxt_abort(); - } -#endif - - nxt_debug(task, "mmap fd %FD has been sent", fd); - - nxt_fd_close(fd); - - nxt_mp_release(mp, b); -} - - static nxt_port_mmap_header_t * nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, nxt_port_t *port) @@ -236,7 +205,6 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, void *mem; u_char *p, name[64]; nxt_fd_t fd; - nxt_buf_t *b; nxt_port_mmap_t *port_mmap; nxt_port_mmap_header_t *hdr; @@ -310,14 +278,6 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, port_mmap->hdr = mem; - b = nxt_buf_mem_ts_alloc(task, port->mem_pool, 0); - if (nxt_slow_path(b == NULL)) { - goto remove_fail; - } - - b->completion_handler = nxt_port_mmap_send_fd_buf_completion; - b->parent = (void *) (intptr_t) fd; - /* Init segment header. */ hdr = port_mmap->hdr; @@ -336,7 +296,7 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, port->pid); /* TODO handle error */ - (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, b); + (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL); nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> %PI", hdr->id, nxt_pid, process->pid); diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c index 31572e54..bdb37483 100644 --- a/src/nxt_port_socket.c +++ b/src/nxt_port_socket.c @@ -166,12 +166,17 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, if (msg->port_msg.stream == stream && msg->port_msg.reply_port == reply_port) { + + nxt_assert(msg->port_msg.last == 0); + /* * An fd is ignored since a file descriptor * must be sent only in the first message of a stream. */ nxt_buf_chain_add(&msg->buf, b); + msg->port_msg.last |= (type & NXT_PORT_MSG_LAST) != 0; + return NXT_OK; } @@ -187,6 +192,7 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, msg->buf = b; msg->fd = fd; + msg->close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0; msg->share = 0; msg->work.next = NULL; @@ -201,8 +207,8 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, msg->port_msg.stream = stream; msg->port_msg.pid = nxt_pid; msg->port_msg.reply_port = reply_port; - msg->port_msg.type = type; - msg->port_msg.last = 0; + msg->port_msg.type = type & NXT_PORT_MSG_MASK; + msg->port_msg.last = (type & NXT_PORT_MSG_LAST) != 0; msg->port_msg.mmap = 0; nxt_queue_insert_tail(&port->messages, &msg->link); @@ -276,7 +282,7 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) m = NXT_PORT_METHOD_PLAIN; } - msg->port_msg.last = sb.last; + msg->port_msg.last |= sb.last; n = nxt_socketpair_send(&port->socket, msg->fd, iov, sb.niov + 1); @@ -288,6 +294,12 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) goto fail; } + if (msg->fd != -1 && msg->close_fd != 0) { + nxt_fd_close(msg->fd); + + msg->fd = -1; + } + wq = &task->thread->engine->fast_work_queue; if (msg->buf != plain_buf) { diff --git a/src/nxt_router.c b/src/nxt_router.c index 567c9df6..64cc53b7 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -575,7 +575,7 @@ nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, b->parent = tmcf->mem_pool; b->completion_handler = nxt_router_conf_buf_completion; - nxt_port_socket_write(task, tmcf->port, NXT_PORT_MSG_DATA, -1, + nxt_port_socket_write(task, tmcf->port, NXT_PORT_MSG_DATA_LAST, -1, tmcf->stream, 0, b); } @@ -1977,7 +1977,8 @@ nxt_router_send_sw_request(nxt_task_t *task, void *obj, void *data) *b->mem.free++ = '\0'; nxt_buf_cpystr(b, &app->conf); - nxt_port_socket_write(task, port, NXT_PORT_MSG_DATA, -1, sw->stream, 0, b); + nxt_port_socket_write(task, port, NXT_PORT_MSG_DATA_LAST, -1, + sw->stream, 0, b); } |