diff options
-rw-r--r-- | src/nxt_application.c | 3 | ||||
-rw-r--r-- | src/nxt_application.h | 3 | ||||
-rw-r--r-- | src/nxt_external.c | 12 | ||||
-rw-r--r-- | src/nxt_main_process.c | 30 | ||||
-rw-r--r-- | src/nxt_port.c | 5 | ||||
-rw-r--r-- | src/nxt_router.c | 61 | ||||
-rw-r--r-- | src/nxt_unit.c | 79 | ||||
-rw-r--r-- | src/nxt_unit.h | 2 |
8 files changed, 110 insertions, 85 deletions
diff --git a/src/nxt_application.c b/src/nxt_application.c index 589821fb..d1ff9ee7 100644 --- a/src/nxt_application.c +++ b/src/nxt_application.c @@ -1052,6 +1052,9 @@ nxt_unit_default_init(nxt_task_t *task, nxt_unit_init_t *init, init->read_port.in_fd = my_port->pair[0]; init->read_port.out_fd = my_port->pair[1]; + init->shared_port_fd = conf->shared_port_fd; + init->shared_queue_fd = conf->shared_queue_fd; + init->log_fd = 2; init->shm_limit = conf->shm_limit; diff --git a/src/nxt_application.h b/src/nxt_application.h index 4612f072..30a1a12f 100644 --- a/src/nxt_application.h +++ b/src/nxt_application.h @@ -103,6 +103,9 @@ struct nxt_common_app_conf_s { size_t shm_limit; uint32_t request_limit; + nxt_fd_t shared_port_fd; + nxt_fd_t shared_queue_fd; + union { nxt_external_app_conf_t external; nxt_python_app_conf_t python; diff --git a/src/nxt_external.c b/src/nxt_external.c index b41ca51b..c724b9bd 100644 --- a/src/nxt_external.c +++ b/src/nxt_external.c @@ -106,6 +106,16 @@ nxt_external_start(nxt_task_t *task, nxt_process_data_t *data) return NXT_ERROR; } + rc = nxt_external_fd_no_cloexec(task, conf->shared_port_fd); + if (nxt_slow_path(rc != NXT_OK)) { + return NXT_ERROR; + } + + rc = nxt_external_fd_no_cloexec(task, conf->shared_queue_fd); + if (nxt_slow_path(rc != NXT_OK)) { + return NXT_ERROR; + } + end = buf + sizeof(buf); p = nxt_sprintf(buf, end, @@ -113,12 +123,14 @@ nxt_external_start(nxt_task_t *task, nxt_process_data_t *data) "%PI,%ud,%d;" "%PI,%ud,%d;" "%PI,%ud,%d,%d;" + "%d,%d;" "%d,%z,%uD,%Z", NXT_VERSION, my_port->process->stream, proto_port->pid, proto_port->id, proto_port->pair[1], router_port->pid, router_port->id, router_port->pair[1], my_port->pid, my_port->id, my_port->pair[0], my_port->pair[1], + conf->shared_port_fd, conf->shared_queue_fd, 2, conf->shm_limit, conf->request_limit); if (nxt_slow_path(p == end)) { diff --git a/src/nxt_main_process.c b/src/nxt_main_process.c index a5a20d3d..3914c041 100644 --- a/src/nxt_main_process.c +++ b/src/nxt_main_process.c @@ -382,25 +382,25 @@ nxt_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) port = rt->port_by_type[NXT_PROCESS_ROUTER]; if (nxt_slow_path(port == NULL)) { nxt_alert(task, "router port not found"); - return; + goto close_fds; } if (nxt_slow_path(port->pid != nxt_recv_msg_cmsg_pid(msg))) { nxt_alert(task, "process %PI cannot start processes", nxt_recv_msg_cmsg_pid(msg)); - return; + goto close_fds; } process = nxt_process_new(rt); if (nxt_slow_path(process == NULL)) { - return; + goto close_fds; } process->mem_pool = nxt_mp_create(1024, 128, 256, 32); if (process->mem_pool == NULL) { nxt_process_use(task, process, -1); - return; + goto close_fds; } process->parent_port = rt->port_by_type[NXT_PROCESS_MAIN]; @@ -422,6 +422,9 @@ nxt_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) goto failed; } + app_conf->shared_port_fd = msg->fd[0]; + app_conf->shared_queue_fd = msg->fd[1]; + start = b->mem.pos; app_conf->name.start = start; @@ -509,6 +512,17 @@ nxt_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) ret = nxt_process_start(task, process); if (nxt_fast_path(ret == NXT_OK || ret == NXT_AGAIN)) { + + /* Close shared port fds only in main process. */ + if (ret == NXT_OK) { + nxt_fd_close(app_conf->shared_port_fd); + nxt_fd_close(app_conf->shared_queue_fd); + } + + /* Avoid fds close in caller. */ + msg->fd[0] = -1; + msg->fd[1] = -1; + return; } @@ -523,6 +537,14 @@ failed: nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1, msg->port_msg.stream, 0, NULL); } + +close_fds: + + nxt_fd_close(msg->fd[0]); + msg->fd[0] = -1; + + nxt_fd_close(msg->fd[1]); + msg->fd[1] = -1; } diff --git a/src/nxt_port.c b/src/nxt_port.c index 1e8fa28a..88d645af 100644 --- a/src/nxt_port.c +++ b/src/nxt_port.c @@ -176,8 +176,9 @@ nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) if (nxt_fast_path(msg->port_msg.type < NXT_PORT_MSG_MAX)) { - nxt_debug(task, "port %d: message type:%uD", - msg->port->socket.fd, msg->port_msg.type); + nxt_debug(task, "port %d: message type:%uD fds:%d,%d", + msg->port->socket.fd, msg->port_msg.type, + msg->fd[0], msg->fd[1]); handlers = msg->port->data; handlers[msg->port_msg.type](task, msg); diff --git a/src/nxt_router.c b/src/nxt_router.c index 7623ccbb..f718bb6e 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -221,8 +221,6 @@ static void nxt_router_access_log_reopen_error(nxt_task_t *task, static void nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data); -static nxt_int_t nxt_router_app_shared_port_send(nxt_task_t *task, - nxt_port_t *app_port); static void nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data); @@ -394,6 +392,7 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, { size_t size; uint32_t stream; + nxt_fd_t port_fd, queue_fd; nxt_int_t ret; nxt_app_t *app; nxt_buf_t *b; @@ -413,6 +412,8 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, nxt_debug(task, "app '%V' %p start process", &app->name, app); b = NULL; + port_fd = -1; + queue_fd = -1; } else { if (app->proto_port_requests > 0) { @@ -439,6 +440,9 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, nxt_buf_cpystr(b, &app->name); *b->mem.free++ = '\0'; nxt_buf_cpystr(b, &app->conf); + + port_fd = app->shared_port->pair[0]; + queue_fd = app->shared_port->queue_fd; } app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port, @@ -451,8 +455,8 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, stream = nxt_port_rpc_ex_stream(app_joint_rpc); - ret = nxt_port_socket_write(task, dport, NXT_PORT_MSG_START_PROCESS, - -1, stream, port->id, b); + ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS, + port_fd, queue_fd, stream, port->id, b); if (nxt_slow_path(ret != NXT_OK)) { nxt_port_rpc_cancel(task, port, stream); @@ -2773,6 +2777,7 @@ nxt_router_app_rpc_create(nxt_task_t *task, { size_t size; uint32_t stream; + nxt_fd_t port_fd, queue_fd; nxt_int_t ret; nxt_buf_t *b; nxt_port_t *router_port, *dport; @@ -2801,10 +2806,15 @@ nxt_router_app_rpc_create(nxt_task_t *task, dport = rt->port_by_type[NXT_PROCESS_MAIN]; + port_fd = app->shared_port->pair[0]; + queue_fd = app->shared_port->queue_fd; + } else { nxt_debug(task, "app '%V' prefork", &app->name); b = NULL; + port_fd = -1; + queue_fd = -1; } router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; @@ -2823,9 +2833,8 @@ nxt_router_app_rpc_create(nxt_task_t *task, stream = nxt_port_rpc_ex_stream(rpc); - ret = nxt_port_socket_write(task, dport, - NXT_PORT_MSG_START_PROCESS, - -1, stream, router_port->id, b); + ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS, + port_fd, queue_fd, stream, router_port->id, b); if (nxt_slow_path(ret != NXT_OK)) { nxt_port_rpc_cancel(task, router_port, stream); goto fail; @@ -2900,7 +2909,7 @@ nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_port_inc_use(port); - nxt_router_app_shared_port_send(task, port); + nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL); nxt_work_queue_add(&engine->fast_work_queue, nxt_router_conf_apply, task, rpc->temp_conf, NULL); @@ -4596,46 +4605,12 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d", &app->name, port->pid, app->processes, app->pending_processes); - nxt_router_app_shared_port_send(task, port); + nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL); nxt_router_app_port_release(task, app, port, NXT_APR_NEW_PORT); } -static nxt_int_t -nxt_router_app_shared_port_send(nxt_task_t *task, nxt_port_t *app_port) -{ - nxt_buf_t *b; - nxt_port_t *port; - nxt_port_msg_new_port_t *msg; - - b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, - sizeof(nxt_port_data_t)); - if (nxt_slow_path(b == NULL)) { - return NXT_ERROR; - } - - port = app_port->app->shared_port; - - nxt_debug(task, "send port %FD to process %PI", - port->pair[0], app_port->pid); - - b->mem.free += sizeof(nxt_port_msg_new_port_t); - msg = (nxt_port_msg_new_port_t *) b->mem.pos; - - msg->id = port->id; - msg->pid = port->pid; - msg->max_size = port->max_size; - msg->max_share = port->max_share; - msg->type = port->type; - - return nxt_port_socket_write2(task, app_port, - NXT_PORT_MSG_NEW_PORT, - port->pair[0], port->queue_fd, - 0, 0, b); -} - - static void nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 135c06ed..57b89617 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -55,6 +55,7 @@ nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev, nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf); static int nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port, nxt_unit_port_t *read_port, + int *shared_port_fd, int *shared_queue_fd, int *log_fd, uint32_t *stream, uint32_t *shm_limit, uint32_t *request_limit); static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, @@ -424,12 +425,12 @@ static pid_t nxt_unit_pid; nxt_unit_ctx_t * nxt_unit_init(nxt_unit_init_t *init) { - int rc, queue_fd; + int rc, queue_fd, shared_queue_fd; void *mem; uint32_t ready_stream, shm_limit, request_limit; nxt_unit_ctx_t *ctx; nxt_unit_impl_t *lib; - nxt_unit_port_t ready_port, router_port, read_port; + nxt_unit_port_t ready_port, router_port, read_port, shared_port; nxt_unit_pid = getpid(); @@ -440,6 +441,7 @@ nxt_unit_init(nxt_unit_init_t *init) queue_fd = -1; mem = MAP_FAILED; + shared_port.out_fd = -1; if (init->ready_port.id.pid != 0 && init->ready_stream != 0 @@ -458,8 +460,12 @@ nxt_unit_init(nxt_unit_init_t *init) nxt_unit_port_id_init(&read_port.id, read_port.id.pid, read_port.id.id); + shared_port.in_fd = init->shared_port_fd; + shared_queue_fd = init->shared_queue_fd; + } else { rc = nxt_unit_read_env(&ready_port, &router_port, &read_port, + &shared_port.in_fd, &shared_queue_fd, &lib->log_fd, &ready_stream, &shm_limit, &request_limit); if (nxt_slow_path(rc != NXT_UNIT_OK)) { @@ -525,6 +531,27 @@ nxt_unit_init(nxt_unit_init_t *init) goto fail; } + nxt_unit_port_id_init(&shared_port.id, read_port.id.pid, + NXT_UNIT_SHARED_PORT_ID); + + mem = mmap(NULL, sizeof(nxt_app_queue_t), PROT_READ | PROT_WRITE, + MAP_SHARED, shared_queue_fd, 0); + if (nxt_slow_path(mem == MAP_FAILED)) { + nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", shared_queue_fd, + strerror(errno), errno); + + goto fail; + } + + nxt_unit_close(shared_queue_fd); + + lib->shared_port = nxt_unit_add_port(ctx, &shared_port, mem); + if (nxt_slow_path(lib->shared_port == NULL)) { + nxt_unit_alert(NULL, "failed to add shared_port"); + + goto fail; + } + rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream, queue_fd); if (nxt_slow_path(rc != NXT_UNIT_OK)) { nxt_unit_alert(NULL, "failed to send READY message"); @@ -799,7 +826,8 @@ nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf) static int nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port, - nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream, + nxt_unit_port_t *read_port, int *shared_port_fd, int *shared_queue_fd, + int *log_fd, uint32_t *stream, uint32_t *shm_limit, uint32_t *request_limit) { int rc; @@ -845,11 +873,13 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port, "%"PRId64",%"PRIu32",%d;" "%"PRId64",%"PRIu32",%d;" "%"PRId64",%"PRIu32",%d,%d;" + "%d,%d;" "%d,%"PRIu32",%"PRIu32, &ready_stream, &ready_pid, &ready_id, &ready_fd, &router_pid, &router_id, &router_fd, &read_pid, &read_id, &read_in_fd, &read_out_fd, + shared_port_fd, shared_queue_fd, log_fd, shm_limit, request_limit); if (nxt_slow_path(rc == EOF)) { @@ -859,9 +889,9 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port, return NXT_UNIT_ERROR; } - if (nxt_slow_path(rc != 14)) { + if (nxt_slow_path(rc != 16)) { nxt_unit_alert(NULL, "invalid number of variables in %s env: " - "found %d of %d in %s", NXT_UNIT_INIT_ENV, rc, 14, vars); + "found %d of %d in %s", NXT_UNIT_INIT_ENV, rc, 16, vars); return NXT_UNIT_ERROR; } @@ -1137,7 +1167,6 @@ static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) { void *mem; - nxt_unit_impl_t *lib; nxt_unit_port_t new_port, *port; nxt_port_msg_new_port_t *new_port_msg; @@ -1162,33 +1191,17 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) recv_msg->stream, (int) new_port_msg->pid, (int) new_port_msg->id, recv_msg->fd[0], recv_msg->fd[1]); - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - - if (new_port_msg->id == NXT_UNIT_SHARED_PORT_ID) { - nxt_unit_port_id_init(&new_port.id, lib->pid, new_port_msg->id); - - new_port.in_fd = recv_msg->fd[0]; - new_port.out_fd = -1; - - mem = mmap(NULL, sizeof(nxt_app_queue_t), PROT_READ | PROT_WRITE, - MAP_SHARED, recv_msg->fd[1], 0); - - } else { - if (nxt_slow_path(nxt_unit_fd_blocking(recv_msg->fd[0]) - != NXT_UNIT_OK)) - { - return NXT_UNIT_ERROR; - } + if (nxt_slow_path(nxt_unit_fd_blocking(recv_msg->fd[0]) != NXT_UNIT_OK)) { + return NXT_UNIT_ERROR; + } - nxt_unit_port_id_init(&new_port.id, new_port_msg->pid, - new_port_msg->id); + nxt_unit_port_id_init(&new_port.id, new_port_msg->pid, new_port_msg->id); - new_port.in_fd = -1; - new_port.out_fd = recv_msg->fd[0]; + new_port.in_fd = -1; + new_port.out_fd = recv_msg->fd[0]; - mem = mmap(NULL, sizeof(nxt_port_queue_t), PROT_READ | PROT_WRITE, - MAP_SHARED, recv_msg->fd[1], 0); - } + mem = mmap(NULL, sizeof(nxt_port_queue_t), PROT_READ | PROT_WRITE, + MAP_SHARED, recv_msg->fd[1], 0); if (nxt_slow_path(mem == MAP_FAILED)) { nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", recv_msg->fd[1], @@ -1206,12 +1219,6 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) return NXT_UNIT_ERROR; } - if (new_port_msg->id == NXT_UNIT_SHARED_PORT_ID) { - lib->shared_port = port; - - return nxt_unit_ctx_ready(ctx); - } - nxt_unit_port_release(port); return NXT_UNIT_OK; diff --git a/src/nxt_unit.h b/src/nxt_unit.h index 1b5280af..35f9fa55 100644 --- a/src/nxt_unit.h +++ b/src/nxt_unit.h @@ -176,6 +176,8 @@ struct nxt_unit_init_s { uint32_t ready_stream; nxt_unit_port_t router_port; nxt_unit_port_t read_port; + int shared_port_fd; + int shared_queue_fd; int log_fd; }; |