summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/nxt_application.c3
-rw-r--r--src/nxt_application.h3
-rw-r--r--src/nxt_external.c12
-rw-r--r--src/nxt_main_process.c30
-rw-r--r--src/nxt_port.c5
-rw-r--r--src/nxt_router.c61
-rw-r--r--src/nxt_unit.c79
-rw-r--r--src/nxt_unit.h2
8 files changed, 110 insertions, 85 deletions
diff --git a/src/nxt_application.c b/src/nxt_application.c
index 589821fb..d1ff9ee7 100644
--- a/src/nxt_application.c
+++ b/src/nxt_application.c
@@ -1052,6 +1052,9 @@ nxt_unit_default_init(nxt_task_t *task, nxt_unit_init_t *init,
init->read_port.in_fd = my_port->pair[0];
init->read_port.out_fd = my_port->pair[1];
+ init->shared_port_fd = conf->shared_port_fd;
+ init->shared_queue_fd = conf->shared_queue_fd;
+
init->log_fd = 2;
init->shm_limit = conf->shm_limit;
diff --git a/src/nxt_application.h b/src/nxt_application.h
index 4612f072..30a1a12f 100644
--- a/src/nxt_application.h
+++ b/src/nxt_application.h
@@ -103,6 +103,9 @@ struct nxt_common_app_conf_s {
size_t shm_limit;
uint32_t request_limit;
+ nxt_fd_t shared_port_fd;
+ nxt_fd_t shared_queue_fd;
+
union {
nxt_external_app_conf_t external;
nxt_python_app_conf_t python;
diff --git a/src/nxt_external.c b/src/nxt_external.c
index b41ca51b..c724b9bd 100644
--- a/src/nxt_external.c
+++ b/src/nxt_external.c
@@ -106,6 +106,16 @@ nxt_external_start(nxt_task_t *task, nxt_process_data_t *data)
return NXT_ERROR;
}
+ rc = nxt_external_fd_no_cloexec(task, conf->shared_port_fd);
+ if (nxt_slow_path(rc != NXT_OK)) {
+ return NXT_ERROR;
+ }
+
+ rc = nxt_external_fd_no_cloexec(task, conf->shared_queue_fd);
+ if (nxt_slow_path(rc != NXT_OK)) {
+ return NXT_ERROR;
+ }
+
end = buf + sizeof(buf);
p = nxt_sprintf(buf, end,
@@ -113,12 +123,14 @@ nxt_external_start(nxt_task_t *task, nxt_process_data_t *data)
"%PI,%ud,%d;"
"%PI,%ud,%d;"
"%PI,%ud,%d,%d;"
+ "%d,%d;"
"%d,%z,%uD,%Z",
NXT_VERSION, my_port->process->stream,
proto_port->pid, proto_port->id, proto_port->pair[1],
router_port->pid, router_port->id, router_port->pair[1],
my_port->pid, my_port->id, my_port->pair[0],
my_port->pair[1],
+ conf->shared_port_fd, conf->shared_queue_fd,
2, conf->shm_limit, conf->request_limit);
if (nxt_slow_path(p == end)) {
diff --git a/src/nxt_main_process.c b/src/nxt_main_process.c
index a5a20d3d..3914c041 100644
--- a/src/nxt_main_process.c
+++ b/src/nxt_main_process.c
@@ -382,25 +382,25 @@ nxt_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
port = rt->port_by_type[NXT_PROCESS_ROUTER];
if (nxt_slow_path(port == NULL)) {
nxt_alert(task, "router port not found");
- return;
+ goto close_fds;
}
if (nxt_slow_path(port->pid != nxt_recv_msg_cmsg_pid(msg))) {
nxt_alert(task, "process %PI cannot start processes",
nxt_recv_msg_cmsg_pid(msg));
- return;
+ goto close_fds;
}
process = nxt_process_new(rt);
if (nxt_slow_path(process == NULL)) {
- return;
+ goto close_fds;
}
process->mem_pool = nxt_mp_create(1024, 128, 256, 32);
if (process->mem_pool == NULL) {
nxt_process_use(task, process, -1);
- return;
+ goto close_fds;
}
process->parent_port = rt->port_by_type[NXT_PROCESS_MAIN];
@@ -422,6 +422,9 @@ nxt_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
goto failed;
}
+ app_conf->shared_port_fd = msg->fd[0];
+ app_conf->shared_queue_fd = msg->fd[1];
+
start = b->mem.pos;
app_conf->name.start = start;
@@ -509,6 +512,17 @@ nxt_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
ret = nxt_process_start(task, process);
if (nxt_fast_path(ret == NXT_OK || ret == NXT_AGAIN)) {
+
+ /* Close shared port fds only in main process. */
+ if (ret == NXT_OK) {
+ nxt_fd_close(app_conf->shared_port_fd);
+ nxt_fd_close(app_conf->shared_queue_fd);
+ }
+
+ /* Avoid fds close in caller. */
+ msg->fd[0] = -1;
+ msg->fd[1] = -1;
+
return;
}
@@ -523,6 +537,14 @@ failed:
nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
-1, msg->port_msg.stream, 0, NULL);
}
+
+close_fds:
+
+ nxt_fd_close(msg->fd[0]);
+ msg->fd[0] = -1;
+
+ nxt_fd_close(msg->fd[1]);
+ msg->fd[1] = -1;
}
diff --git a/src/nxt_port.c b/src/nxt_port.c
index 1e8fa28a..88d645af 100644
--- a/src/nxt_port.c
+++ b/src/nxt_port.c
@@ -176,8 +176,9 @@ nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
if (nxt_fast_path(msg->port_msg.type < NXT_PORT_MSG_MAX)) {
- nxt_debug(task, "port %d: message type:%uD",
- msg->port->socket.fd, msg->port_msg.type);
+ nxt_debug(task, "port %d: message type:%uD fds:%d,%d",
+ msg->port->socket.fd, msg->port_msg.type,
+ msg->fd[0], msg->fd[1]);
handlers = msg->port->data;
handlers[msg->port_msg.type](task, msg);
diff --git a/src/nxt_router.c b/src/nxt_router.c
index 7623ccbb..f718bb6e 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -221,8 +221,6 @@ static void nxt_router_access_log_reopen_error(nxt_task_t *task,
static void nxt_router_app_port_ready(nxt_task_t *task,
nxt_port_recv_msg_t *msg, void *data);
-static nxt_int_t nxt_router_app_shared_port_send(nxt_task_t *task,
- nxt_port_t *app_port);
static void nxt_router_app_port_error(nxt_task_t *task,
nxt_port_recv_msg_t *msg, void *data);
@@ -394,6 +392,7 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
{
size_t size;
uint32_t stream;
+ nxt_fd_t port_fd, queue_fd;
nxt_int_t ret;
nxt_app_t *app;
nxt_buf_t *b;
@@ -413,6 +412,8 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
nxt_debug(task, "app '%V' %p start process", &app->name, app);
b = NULL;
+ port_fd = -1;
+ queue_fd = -1;
} else {
if (app->proto_port_requests > 0) {
@@ -439,6 +440,9 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
nxt_buf_cpystr(b, &app->name);
*b->mem.free++ = '\0';
nxt_buf_cpystr(b, &app->conf);
+
+ port_fd = app->shared_port->pair[0];
+ queue_fd = app->shared_port->queue_fd;
}
app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port,
@@ -451,8 +455,8 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
stream = nxt_port_rpc_ex_stream(app_joint_rpc);
- ret = nxt_port_socket_write(task, dport, NXT_PORT_MSG_START_PROCESS,
- -1, stream, port->id, b);
+ ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS,
+ port_fd, queue_fd, stream, port->id, b);
if (nxt_slow_path(ret != NXT_OK)) {
nxt_port_rpc_cancel(task, port, stream);
@@ -2773,6 +2777,7 @@ nxt_router_app_rpc_create(nxt_task_t *task,
{
size_t size;
uint32_t stream;
+ nxt_fd_t port_fd, queue_fd;
nxt_int_t ret;
nxt_buf_t *b;
nxt_port_t *router_port, *dport;
@@ -2801,10 +2806,15 @@ nxt_router_app_rpc_create(nxt_task_t *task,
dport = rt->port_by_type[NXT_PROCESS_MAIN];
+ port_fd = app->shared_port->pair[0];
+ queue_fd = app->shared_port->queue_fd;
+
} else {
nxt_debug(task, "app '%V' prefork", &app->name);
b = NULL;
+ port_fd = -1;
+ queue_fd = -1;
}
router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
@@ -2823,9 +2833,8 @@ nxt_router_app_rpc_create(nxt_task_t *task,
stream = nxt_port_rpc_ex_stream(rpc);
- ret = nxt_port_socket_write(task, dport,
- NXT_PORT_MSG_START_PROCESS,
- -1, stream, router_port->id, b);
+ ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS,
+ port_fd, queue_fd, stream, router_port->id, b);
if (nxt_slow_path(ret != NXT_OK)) {
nxt_port_rpc_cancel(task, router_port, stream);
goto fail;
@@ -2900,7 +2909,7 @@ nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_port_inc_use(port);
- nxt_router_app_shared_port_send(task, port);
+ nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
nxt_work_queue_add(&engine->fast_work_queue,
nxt_router_conf_apply, task, rpc->temp_conf, NULL);
@@ -4596,46 +4605,12 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d",
&app->name, port->pid, app->processes, app->pending_processes);
- nxt_router_app_shared_port_send(task, port);
+ nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
nxt_router_app_port_release(task, app, port, NXT_APR_NEW_PORT);
}
-static nxt_int_t
-nxt_router_app_shared_port_send(nxt_task_t *task, nxt_port_t *app_port)
-{
- nxt_buf_t *b;
- nxt_port_t *port;
- nxt_port_msg_new_port_t *msg;
-
- b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
- sizeof(nxt_port_data_t));
- if (nxt_slow_path(b == NULL)) {
- return NXT_ERROR;
- }
-
- port = app_port->app->shared_port;
-
- nxt_debug(task, "send port %FD to process %PI",
- port->pair[0], app_port->pid);
-
- b->mem.free += sizeof(nxt_port_msg_new_port_t);
- msg = (nxt_port_msg_new_port_t *) b->mem.pos;
-
- msg->id = port->id;
- msg->pid = port->pid;
- msg->max_size = port->max_size;
- msg->max_share = port->max_share;
- msg->type = port->type;
-
- return nxt_port_socket_write2(task, app_port,
- NXT_PORT_MSG_NEW_PORT,
- port->pair[0], port->queue_fd,
- 0, 0, b);
-}
-
-
static void
nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
void *data)
diff --git a/src/nxt_unit.c b/src/nxt_unit.c
index 135c06ed..57b89617 100644
--- a/src/nxt_unit.c
+++ b/src/nxt_unit.c
@@ -55,6 +55,7 @@ nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf);
static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
nxt_unit_port_t *router_port, nxt_unit_port_t *read_port,
+ int *shared_port_fd, int *shared_queue_fd,
int *log_fd, uint32_t *stream, uint32_t *shm_limit,
uint32_t *request_limit);
static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream,
@@ -424,12 +425,12 @@ static pid_t nxt_unit_pid;
nxt_unit_ctx_t *
nxt_unit_init(nxt_unit_init_t *init)
{
- int rc, queue_fd;
+ int rc, queue_fd, shared_queue_fd;
void *mem;
uint32_t ready_stream, shm_limit, request_limit;
nxt_unit_ctx_t *ctx;
nxt_unit_impl_t *lib;
- nxt_unit_port_t ready_port, router_port, read_port;
+ nxt_unit_port_t ready_port, router_port, read_port, shared_port;
nxt_unit_pid = getpid();
@@ -440,6 +441,7 @@ nxt_unit_init(nxt_unit_init_t *init)
queue_fd = -1;
mem = MAP_FAILED;
+ shared_port.out_fd = -1;
if (init->ready_port.id.pid != 0
&& init->ready_stream != 0
@@ -458,8 +460,12 @@ nxt_unit_init(nxt_unit_init_t *init)
nxt_unit_port_id_init(&read_port.id, read_port.id.pid,
read_port.id.id);
+ shared_port.in_fd = init->shared_port_fd;
+ shared_queue_fd = init->shared_queue_fd;
+
} else {
rc = nxt_unit_read_env(&ready_port, &router_port, &read_port,
+ &shared_port.in_fd, &shared_queue_fd,
&lib->log_fd, &ready_stream, &shm_limit,
&request_limit);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
@@ -525,6 +531,27 @@ nxt_unit_init(nxt_unit_init_t *init)
goto fail;
}
+ nxt_unit_port_id_init(&shared_port.id, read_port.id.pid,
+ NXT_UNIT_SHARED_PORT_ID);
+
+ mem = mmap(NULL, sizeof(nxt_app_queue_t), PROT_READ | PROT_WRITE,
+ MAP_SHARED, shared_queue_fd, 0);
+ if (nxt_slow_path(mem == MAP_FAILED)) {
+ nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", shared_queue_fd,
+ strerror(errno), errno);
+
+ goto fail;
+ }
+
+ nxt_unit_close(shared_queue_fd);
+
+ lib->shared_port = nxt_unit_add_port(ctx, &shared_port, mem);
+ if (nxt_slow_path(lib->shared_port == NULL)) {
+ nxt_unit_alert(NULL, "failed to add shared_port");
+
+ goto fail;
+ }
+
rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream, queue_fd);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
nxt_unit_alert(NULL, "failed to send READY message");
@@ -799,7 +826,8 @@ nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf)
static int
nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
- nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream,
+ nxt_unit_port_t *read_port, int *shared_port_fd, int *shared_queue_fd,
+ int *log_fd, uint32_t *stream,
uint32_t *shm_limit, uint32_t *request_limit)
{
int rc;
@@ -845,11 +873,13 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
"%"PRId64",%"PRIu32",%d;"
"%"PRId64",%"PRIu32",%d;"
"%"PRId64",%"PRIu32",%d,%d;"
+ "%d,%d;"
"%d,%"PRIu32",%"PRIu32,
&ready_stream,
&ready_pid, &ready_id, &ready_fd,
&router_pid, &router_id, &router_fd,
&read_pid, &read_id, &read_in_fd, &read_out_fd,
+ shared_port_fd, shared_queue_fd,
log_fd, shm_limit, request_limit);
if (nxt_slow_path(rc == EOF)) {
@@ -859,9 +889,9 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
return NXT_UNIT_ERROR;
}
- if (nxt_slow_path(rc != 14)) {
+ if (nxt_slow_path(rc != 16)) {
nxt_unit_alert(NULL, "invalid number of variables in %s env: "
- "found %d of %d in %s", NXT_UNIT_INIT_ENV, rc, 14, vars);
+ "found %d of %d in %s", NXT_UNIT_INIT_ENV, rc, 16, vars);
return NXT_UNIT_ERROR;
}
@@ -1137,7 +1167,6 @@ static int
nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
void *mem;
- nxt_unit_impl_t *lib;
nxt_unit_port_t new_port, *port;
nxt_port_msg_new_port_t *new_port_msg;
@@ -1162,33 +1191,17 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
recv_msg->stream, (int) new_port_msg->pid,
(int) new_port_msg->id, recv_msg->fd[0], recv_msg->fd[1]);
- lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
-
- if (new_port_msg->id == NXT_UNIT_SHARED_PORT_ID) {
- nxt_unit_port_id_init(&new_port.id, lib->pid, new_port_msg->id);
-
- new_port.in_fd = recv_msg->fd[0];
- new_port.out_fd = -1;
-
- mem = mmap(NULL, sizeof(nxt_app_queue_t), PROT_READ | PROT_WRITE,
- MAP_SHARED, recv_msg->fd[1], 0);
-
- } else {
- if (nxt_slow_path(nxt_unit_fd_blocking(recv_msg->fd[0])
- != NXT_UNIT_OK))
- {
- return NXT_UNIT_ERROR;
- }
+ if (nxt_slow_path(nxt_unit_fd_blocking(recv_msg->fd[0]) != NXT_UNIT_OK)) {
+ return NXT_UNIT_ERROR;
+ }
- nxt_unit_port_id_init(&new_port.id, new_port_msg->pid,
- new_port_msg->id);
+ nxt_unit_port_id_init(&new_port.id, new_port_msg->pid, new_port_msg->id);
- new_port.in_fd = -1;
- new_port.out_fd = recv_msg->fd[0];
+ new_port.in_fd = -1;
+ new_port.out_fd = recv_msg->fd[0];
- mem = mmap(NULL, sizeof(nxt_port_queue_t), PROT_READ | PROT_WRITE,
- MAP_SHARED, recv_msg->fd[1], 0);
- }
+ mem = mmap(NULL, sizeof(nxt_port_queue_t), PROT_READ | PROT_WRITE,
+ MAP_SHARED, recv_msg->fd[1], 0);
if (nxt_slow_path(mem == MAP_FAILED)) {
nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", recv_msg->fd[1],
@@ -1206,12 +1219,6 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
return NXT_UNIT_ERROR;
}
- if (new_port_msg->id == NXT_UNIT_SHARED_PORT_ID) {
- lib->shared_port = port;
-
- return nxt_unit_ctx_ready(ctx);
- }
-
nxt_unit_port_release(port);
return NXT_UNIT_OK;
diff --git a/src/nxt_unit.h b/src/nxt_unit.h
index 1b5280af..35f9fa55 100644
--- a/src/nxt_unit.h
+++ b/src/nxt_unit.h
@@ -176,6 +176,8 @@ struct nxt_unit_init_s {
uint32_t ready_stream;
nxt_unit_port_t router_port;
nxt_unit_port_t read_port;
+ int shared_port_fd;
+ int shared_queue_fd;
int log_fd;
};