diff options
Diffstat (limited to '')
-rw-r--r-- | src/nxt_router.c | 85 |
1 files changed, 25 insertions, 60 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c index 85556a72..3a32a363 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -223,8 +223,6 @@ static void nxt_router_access_log_reopen_error(nxt_task_t *task, static void nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data); -static nxt_int_t nxt_router_app_shared_port_send(nxt_task_t *task, - nxt_port_t *app_port); static void nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data); @@ -396,6 +394,7 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, { size_t size; uint32_t stream; + nxt_fd_t port_fd, queue_fd; nxt_int_t ret; nxt_app_t *app; nxt_buf_t *b; @@ -415,6 +414,8 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, nxt_debug(task, "app '%V' %p start process", &app->name, app); b = NULL; + port_fd = -1; + queue_fd = -1; } else { if (app->proto_port_requests > 0) { @@ -441,6 +442,9 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, nxt_buf_cpystr(b, &app->name); *b->mem.free++ = '\0'; nxt_buf_cpystr(b, &app->conf); + + port_fd = app->shared_port->pair[0]; + queue_fd = app->shared_port->queue_fd; } app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port, @@ -453,8 +457,8 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, stream = nxt_port_rpc_ex_stream(app_joint_rpc); - ret = nxt_port_socket_write(task, dport, NXT_PORT_MSG_START_PROCESS, - -1, stream, port->id, b); + ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS, + port_fd, queue_fd, stream, port->id, b); if (nxt_slow_path(ret != NXT_OK)) { nxt_port_rpc_cancel(task, port, stream); @@ -1920,25 +1924,15 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, tls_init->tickets_conf = nxt_conf_get_path(listener, &conf_tickets); - if (nxt_conf_type(certificate) == NXT_CONF_ARRAY) { - n = nxt_conf_array_elements_count(certificate); - - for (i = 0; i < n; i++) { - value = nxt_conf_get_array_element(certificate, i); - - nxt_assert(value != NULL); + n = nxt_conf_array_elements_count_or_1(certificate); - ret = nxt_router_conf_tls_insert(tmcf, value, skcf, - tls_init, i == 0); - if (nxt_slow_path(ret != NXT_OK)) { - goto fail; - } - } + for (i = 0; i < n; i++) { + value = nxt_conf_get_array_element_or_itself(certificate, + i); + nxt_assert(value != NULL); - } else { - /* NXT_CONF_STRING */ - ret = nxt_router_conf_tls_insert(tmcf, certificate, skcf, - tls_init, 1); + ret = nxt_router_conf_tls_insert(tmcf, value, skcf, + tls_init, i == 0); if (nxt_slow_path(ret != NXT_OK)) { goto fail; } @@ -2779,6 +2773,7 @@ nxt_router_app_rpc_create(nxt_task_t *task, { size_t size; uint32_t stream; + nxt_fd_t port_fd, queue_fd; nxt_int_t ret; nxt_buf_t *b; nxt_port_t *router_port, *dport; @@ -2807,10 +2802,15 @@ nxt_router_app_rpc_create(nxt_task_t *task, dport = rt->port_by_type[NXT_PROCESS_MAIN]; + port_fd = app->shared_port->pair[0]; + queue_fd = app->shared_port->queue_fd; + } else { nxt_debug(task, "app '%V' prefork", &app->name); b = NULL; + port_fd = -1; + queue_fd = -1; } router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; @@ -2829,9 +2829,8 @@ nxt_router_app_rpc_create(nxt_task_t *task, stream = nxt_port_rpc_ex_stream(rpc); - ret = nxt_port_socket_write(task, dport, - NXT_PORT_MSG_START_PROCESS, - -1, stream, router_port->id, b); + ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS, + port_fd, queue_fd, stream, router_port->id, b); if (nxt_slow_path(ret != NXT_OK)) { nxt_port_rpc_cancel(task, router_port, stream); goto fail; @@ -2906,7 +2905,7 @@ nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_port_inc_use(port); - nxt_router_app_shared_port_send(task, port); + nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL); nxt_work_queue_add(&engine->fast_work_queue, nxt_router_conf_apply, task, rpc->temp_conf, NULL); @@ -4618,46 +4617,12 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d", &app->name, port->pid, app->processes, app->pending_processes); - nxt_router_app_shared_port_send(task, port); + nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL); nxt_router_app_port_release(task, app, port, NXT_APR_NEW_PORT); } -static nxt_int_t -nxt_router_app_shared_port_send(nxt_task_t *task, nxt_port_t *app_port) -{ - nxt_buf_t *b; - nxt_port_t *port; - nxt_port_msg_new_port_t *msg; - - b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, - sizeof(nxt_port_data_t)); - if (nxt_slow_path(b == NULL)) { - return NXT_ERROR; - } - - port = app_port->app->shared_port; - - nxt_debug(task, "send port %FD to process %PI", - port->pair[0], app_port->pid); - - b->mem.free += sizeof(nxt_port_msg_new_port_t); - msg = (nxt_port_msg_new_port_t *) b->mem.pos; - - msg->id = port->id; - msg->pid = port->pid; - msg->max_size = port->max_size; - msg->max_share = port->max_share; - msg->type = port->type; - - return nxt_port_socket_write2(task, app_port, - NXT_PORT_MSG_NEW_PORT, - port->pair[0], port->queue_fd, - 0, 0, b); -} - - static void nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) |