summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_router.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/nxt_router.c61
1 files changed, 18 insertions, 43 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c
index 7623ccbb..f718bb6e 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -221,8 +221,6 @@ static void nxt_router_access_log_reopen_error(nxt_task_t *task,
static void nxt_router_app_port_ready(nxt_task_t *task,
nxt_port_recv_msg_t *msg, void *data);
-static nxt_int_t nxt_router_app_shared_port_send(nxt_task_t *task,
- nxt_port_t *app_port);
static void nxt_router_app_port_error(nxt_task_t *task,
nxt_port_recv_msg_t *msg, void *data);
@@ -394,6 +392,7 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
{
size_t size;
uint32_t stream;
+ nxt_fd_t port_fd, queue_fd;
nxt_int_t ret;
nxt_app_t *app;
nxt_buf_t *b;
@@ -413,6 +412,8 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
nxt_debug(task, "app '%V' %p start process", &app->name, app);
b = NULL;
+ port_fd = -1;
+ queue_fd = -1;
} else {
if (app->proto_port_requests > 0) {
@@ -439,6 +440,9 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
nxt_buf_cpystr(b, &app->name);
*b->mem.free++ = '\0';
nxt_buf_cpystr(b, &app->conf);
+
+ port_fd = app->shared_port->pair[0];
+ queue_fd = app->shared_port->queue_fd;
}
app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port,
@@ -451,8 +455,8 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
stream = nxt_port_rpc_ex_stream(app_joint_rpc);
- ret = nxt_port_socket_write(task, dport, NXT_PORT_MSG_START_PROCESS,
- -1, stream, port->id, b);
+ ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS,
+ port_fd, queue_fd, stream, port->id, b);
if (nxt_slow_path(ret != NXT_OK)) {
nxt_port_rpc_cancel(task, port, stream);
@@ -2773,6 +2777,7 @@ nxt_router_app_rpc_create(nxt_task_t *task,
{
size_t size;
uint32_t stream;
+ nxt_fd_t port_fd, queue_fd;
nxt_int_t ret;
nxt_buf_t *b;
nxt_port_t *router_port, *dport;
@@ -2801,10 +2806,15 @@ nxt_router_app_rpc_create(nxt_task_t *task,
dport = rt->port_by_type[NXT_PROCESS_MAIN];
+ port_fd = app->shared_port->pair[0];
+ queue_fd = app->shared_port->queue_fd;
+
} else {
nxt_debug(task, "app '%V' prefork", &app->name);
b = NULL;
+ port_fd = -1;
+ queue_fd = -1;
}
router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
@@ -2823,9 +2833,8 @@ nxt_router_app_rpc_create(nxt_task_t *task,
stream = nxt_port_rpc_ex_stream(rpc);
- ret = nxt_port_socket_write(task, dport,
- NXT_PORT_MSG_START_PROCESS,
- -1, stream, router_port->id, b);
+ ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS,
+ port_fd, queue_fd, stream, router_port->id, b);
if (nxt_slow_path(ret != NXT_OK)) {
nxt_port_rpc_cancel(task, router_port, stream);
goto fail;
@@ -2900,7 +2909,7 @@ nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_port_inc_use(port);
- nxt_router_app_shared_port_send(task, port);
+ nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
nxt_work_queue_add(&engine->fast_work_queue,
nxt_router_conf_apply, task, rpc->temp_conf, NULL);
@@ -4596,46 +4605,12 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d",
&app->name, port->pid, app->processes, app->pending_processes);
- nxt_router_app_shared_port_send(task, port);
+ nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
nxt_router_app_port_release(task, app, port, NXT_APR_NEW_PORT);
}
-static nxt_int_t
-nxt_router_app_shared_port_send(nxt_task_t *task, nxt_port_t *app_port)
-{
- nxt_buf_t *b;
- nxt_port_t *port;
- nxt_port_msg_new_port_t *msg;
-
- b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
- sizeof(nxt_port_data_t));
- if (nxt_slow_path(b == NULL)) {
- return NXT_ERROR;
- }
-
- port = app_port->app->shared_port;
-
- nxt_debug(task, "send port %FD to process %PI",
- port->pair[0], app_port->pid);
-
- b->mem.free += sizeof(nxt_port_msg_new_port_t);
- msg = (nxt_port_msg_new_port_t *) b->mem.pos;
-
- msg->id = port->id;
- msg->pid = port->pid;
- msg->max_size = port->max_size;
- msg->max_share = port->max_share;
- msg->type = port->type;
-
- return nxt_port_socket_write2(task, app_port,
- NXT_PORT_MSG_NEW_PORT,
- port->pair[0], port->queue_fd,
- 0, 0, b);
-}
-
-
static void
nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
void *data)