diff options
author | Max Romanov <max.romanov@nginx.com> | 2021-11-24 13:11:47 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2021-11-24 13:11:47 +0300 |
commit | 2c636a03f35c1807fa0744b53d19f364b131dc1d (patch) | |
tree | c643468c53c4b8af5c64d4afecd4a8789d1c7ef2 /src/nxt_router.c | |
parent | ef1ebf96f300d9a2f487656ac827a9c166f57197 (diff) | |
download | unit-2c636a03f35c1807fa0744b53d19f364b131dc1d.tar.gz unit-2c636a03f35c1807fa0744b53d19f364b131dc1d.tar.bz2 |
Sending shared port to application prototype.
Application process started with shared port (and queue) already configured.
But still waits for PORT_ACK message from router to start request processing
(so-called "ready state").
Waiting for router confirmation is necessary. Otherwise, the application may
produce response and send it to router before the router have the information
about the application process. This is a subject of further optimizations.
Diffstat (limited to 'src/nxt_router.c')
-rw-r--r-- | src/nxt_router.c | 61 |
1 files changed, 18 insertions, 43 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c index 7623ccbb..f718bb6e 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -221,8 +221,6 @@ static void nxt_router_access_log_reopen_error(nxt_task_t *task, static void nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data); -static nxt_int_t nxt_router_app_shared_port_send(nxt_task_t *task, - nxt_port_t *app_port); static void nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data); @@ -394,6 +392,7 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, { size_t size; uint32_t stream; + nxt_fd_t port_fd, queue_fd; nxt_int_t ret; nxt_app_t *app; nxt_buf_t *b; @@ -413,6 +412,8 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, nxt_debug(task, "app '%V' %p start process", &app->name, app); b = NULL; + port_fd = -1; + queue_fd = -1; } else { if (app->proto_port_requests > 0) { @@ -439,6 +440,9 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, nxt_buf_cpystr(b, &app->name); *b->mem.free++ = '\0'; nxt_buf_cpystr(b, &app->conf); + + port_fd = app->shared_port->pair[0]; + queue_fd = app->shared_port->queue_fd; } app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port, @@ -451,8 +455,8 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, stream = nxt_port_rpc_ex_stream(app_joint_rpc); - ret = nxt_port_socket_write(task, dport, NXT_PORT_MSG_START_PROCESS, - -1, stream, port->id, b); + ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS, + port_fd, queue_fd, stream, port->id, b); if (nxt_slow_path(ret != NXT_OK)) { nxt_port_rpc_cancel(task, port, stream); @@ -2773,6 +2777,7 @@ nxt_router_app_rpc_create(nxt_task_t *task, { size_t size; uint32_t stream; + nxt_fd_t port_fd, queue_fd; nxt_int_t ret; nxt_buf_t *b; nxt_port_t *router_port, *dport; @@ -2801,10 +2806,15 @@ nxt_router_app_rpc_create(nxt_task_t *task, dport = rt->port_by_type[NXT_PROCESS_MAIN]; + port_fd = app->shared_port->pair[0]; + queue_fd = app->shared_port->queue_fd; + } else { nxt_debug(task, "app '%V' prefork", &app->name); b = NULL; + port_fd = -1; + queue_fd = -1; } router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; @@ -2823,9 +2833,8 @@ nxt_router_app_rpc_create(nxt_task_t *task, stream = nxt_port_rpc_ex_stream(rpc); - ret = nxt_port_socket_write(task, dport, - NXT_PORT_MSG_START_PROCESS, - -1, stream, router_port->id, b); + ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS, + port_fd, queue_fd, stream, router_port->id, b); if (nxt_slow_path(ret != NXT_OK)) { nxt_port_rpc_cancel(task, router_port, stream); goto fail; @@ -2900,7 +2909,7 @@ nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_port_inc_use(port); - nxt_router_app_shared_port_send(task, port); + nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL); nxt_work_queue_add(&engine->fast_work_queue, nxt_router_conf_apply, task, rpc->temp_conf, NULL); @@ -4596,46 +4605,12 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d", &app->name, port->pid, app->processes, app->pending_processes); - nxt_router_app_shared_port_send(task, port); + nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL); nxt_router_app_port_release(task, app, port, NXT_APR_NEW_PORT); } -static nxt_int_t -nxt_router_app_shared_port_send(nxt_task_t *task, nxt_port_t *app_port) -{ - nxt_buf_t *b; - nxt_port_t *port; - nxt_port_msg_new_port_t *msg; - - b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, - sizeof(nxt_port_data_t)); - if (nxt_slow_path(b == NULL)) { - return NXT_ERROR; - } - - port = app_port->app->shared_port; - - nxt_debug(task, "send port %FD to process %PI", - port->pair[0], app_port->pid); - - b->mem.free += sizeof(nxt_port_msg_new_port_t); - msg = (nxt_port_msg_new_port_t *) b->mem.pos; - - msg->id = port->id; - msg->pid = port->pid; - msg->max_size = port->max_size; - msg->max_share = port->max_share; - msg->type = port->type; - - return nxt_port_socket_write2(task, app_port, - NXT_PORT_MSG_NEW_PORT, - port->pair[0], port->queue_fd, - 0, 0, b); -} - - static void nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) |