diff options
Diffstat (limited to '')
-rw-r--r-- | src/nxt_router.c | 61 |
1 files changed, 18 insertions, 43 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c index 7623ccbb..f718bb6e 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -221,8 +221,6 @@ static void nxt_router_access_log_reopen_error(nxt_task_t *task, static void nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data); -static nxt_int_t nxt_router_app_shared_port_send(nxt_task_t *task, - nxt_port_t *app_port); static void nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data); @@ -394,6 +392,7 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, { size_t size; uint32_t stream; + nxt_fd_t port_fd, queue_fd; nxt_int_t ret; nxt_app_t *app; nxt_buf_t *b; @@ -413,6 +412,8 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, nxt_debug(task, "app '%V' %p start process", &app->name, app); b = NULL; + port_fd = -1; + queue_fd = -1; } else { if (app->proto_port_requests > 0) { @@ -439,6 +440,9 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, nxt_buf_cpystr(b, &app->name); *b->mem.free++ = '\0'; nxt_buf_cpystr(b, &app->conf); + + port_fd = app->shared_port->pair[0]; + queue_fd = app->shared_port->queue_fd; } app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port, @@ -451,8 +455,8 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, stream = nxt_port_rpc_ex_stream(app_joint_rpc); - ret = nxt_port_socket_write(task, dport, NXT_PORT_MSG_START_PROCESS, - -1, stream, port->id, b); + ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS, + port_fd, queue_fd, stream, port->id, b); if (nxt_slow_path(ret != NXT_OK)) { nxt_port_rpc_cancel(task, port, stream); @@ -2773,6 +2777,7 @@ nxt_router_app_rpc_create(nxt_task_t *task, { size_t size; uint32_t stream; + nxt_fd_t port_fd, queue_fd; nxt_int_t ret; nxt_buf_t *b; nxt_port_t *router_port, *dport; @@ -2801,10 +2806,15 @@ nxt_router_app_rpc_create(nxt_task_t *task, dport = rt->port_by_type[NXT_PROCESS_MAIN]; + port_fd = app->shared_port->pair[0]; + queue_fd = app->shared_port->queue_fd; + } else { nxt_debug(task, "app '%V' prefork", &app->name); b = NULL; + port_fd = -1; + queue_fd = -1; } router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; @@ -2823,9 +2833,8 @@ nxt_router_app_rpc_create(nxt_task_t *task, stream = nxt_port_rpc_ex_stream(rpc); - ret = nxt_port_socket_write(task, dport, - NXT_PORT_MSG_START_PROCESS, - -1, stream, router_port->id, b); + ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS, + port_fd, queue_fd, stream, router_port->id, b); if (nxt_slow_path(ret != NXT_OK)) { nxt_port_rpc_cancel(task, router_port, stream); goto fail; @@ -2900,7 +2909,7 @@ nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_port_inc_use(port); - nxt_router_app_shared_port_send(task, port); + nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL); nxt_work_queue_add(&engine->fast_work_queue, nxt_router_conf_apply, task, rpc->temp_conf, NULL); @@ -4596,46 +4605,12 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d", &app->name, port->pid, app->processes, app->pending_processes); - nxt_router_app_shared_port_send(task, port); + nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL); nxt_router_app_port_release(task, app, port, NXT_APR_NEW_PORT); } -static nxt_int_t -nxt_router_app_shared_port_send(nxt_task_t *task, nxt_port_t *app_port) -{ - nxt_buf_t *b; - nxt_port_t *port; - nxt_port_msg_new_port_t *msg; - - b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, - sizeof(nxt_port_data_t)); - if (nxt_slow_path(b == NULL)) { - return NXT_ERROR; - } - - port = app_port->app->shared_port; - - nxt_debug(task, "send port %FD to process %PI", - port->pair[0], app_port->pid); - - b->mem.free += sizeof(nxt_port_msg_new_port_t); - msg = (nxt_port_msg_new_port_t *) b->mem.pos; - - msg->id = port->id; - msg->pid = port->pid; - msg->max_size = port->max_size; - msg->max_share = port->max_share; - msg->type = port->type; - - return nxt_port_socket_write2(task, app_port, - NXT_PORT_MSG_NEW_PORT, - port->pair[0], port->queue_fd, - 0, 0, b); -} - - static void nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) |