From 2c636a03f35c1807fa0744b53d19f364b131dc1d Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Wed, 24 Nov 2021 13:11:47 +0300 Subject: Sending shared port to application prototype. Application process started with shared port (and queue) already configured. But still waits for PORT_ACK message from router to start request processing (so-called "ready state"). Waiting for router confirmation is necessary. Otherwise, the application may produce response and send it to router before the router have the information about the application process. This is a subject of further optimizations. --- src/nxt_router.c | 61 +++++++++++++++++--------------------------------------- 1 file changed, 18 insertions(+), 43 deletions(-) (limited to 'src/nxt_router.c') diff --git a/src/nxt_router.c b/src/nxt_router.c index 7623ccbb..f718bb6e 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -221,8 +221,6 @@ static void nxt_router_access_log_reopen_error(nxt_task_t *task, static void nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data); -static nxt_int_t nxt_router_app_shared_port_send(nxt_task_t *task, - nxt_port_t *app_port); static void nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data); @@ -394,6 +392,7 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, { size_t size; uint32_t stream; + nxt_fd_t port_fd, queue_fd; nxt_int_t ret; nxt_app_t *app; nxt_buf_t *b; @@ -413,6 +412,8 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, nxt_debug(task, "app '%V' %p start process", &app->name, app); b = NULL; + port_fd = -1; + queue_fd = -1; } else { if (app->proto_port_requests > 0) { @@ -439,6 +440,9 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, nxt_buf_cpystr(b, &app->name); *b->mem.free++ = '\0'; nxt_buf_cpystr(b, &app->conf); + + port_fd = app->shared_port->pair[0]; + queue_fd = app->shared_port->queue_fd; } app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port, @@ -451,8 +455,8 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, stream = nxt_port_rpc_ex_stream(app_joint_rpc); - ret = nxt_port_socket_write(task, dport, NXT_PORT_MSG_START_PROCESS, - -1, stream, port->id, b); + ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS, + port_fd, queue_fd, stream, port->id, b); if (nxt_slow_path(ret != NXT_OK)) { nxt_port_rpc_cancel(task, port, stream); @@ -2773,6 +2777,7 @@ nxt_router_app_rpc_create(nxt_task_t *task, { size_t size; uint32_t stream; + nxt_fd_t port_fd, queue_fd; nxt_int_t ret; nxt_buf_t *b; nxt_port_t *router_port, *dport; @@ -2801,10 +2806,15 @@ nxt_router_app_rpc_create(nxt_task_t *task, dport = rt->port_by_type[NXT_PROCESS_MAIN]; + port_fd = app->shared_port->pair[0]; + queue_fd = app->shared_port->queue_fd; + } else { nxt_debug(task, "app '%V' prefork", &app->name); b = NULL; + port_fd = -1; + queue_fd = -1; } router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; @@ -2823,9 +2833,8 @@ nxt_router_app_rpc_create(nxt_task_t *task, stream = nxt_port_rpc_ex_stream(rpc); - ret = nxt_port_socket_write(task, dport, - NXT_PORT_MSG_START_PROCESS, - -1, stream, router_port->id, b); + ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS, + port_fd, queue_fd, stream, router_port->id, b); if (nxt_slow_path(ret != NXT_OK)) { nxt_port_rpc_cancel(task, router_port, stream); goto fail; @@ -2900,7 +2909,7 @@ nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_port_inc_use(port); - nxt_router_app_shared_port_send(task, port); + nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL); nxt_work_queue_add(&engine->fast_work_queue, nxt_router_conf_apply, task, rpc->temp_conf, NULL); @@ -4596,46 +4605,12 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d", &app->name, port->pid, app->processes, app->pending_processes); - nxt_router_app_shared_port_send(task, port); + nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL); nxt_router_app_port_release(task, app, port, NXT_APR_NEW_PORT); } -static nxt_int_t -nxt_router_app_shared_port_send(nxt_task_t *task, nxt_port_t *app_port) -{ - nxt_buf_t *b; - nxt_port_t *port; - nxt_port_msg_new_port_t *msg; - - b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, - sizeof(nxt_port_data_t)); - if (nxt_slow_path(b == NULL)) { - return NXT_ERROR; - } - - port = app_port->app->shared_port; - - nxt_debug(task, "send port %FD to process %PI", - port->pair[0], app_port->pid); - - b->mem.free += sizeof(nxt_port_msg_new_port_t); - msg = (nxt_port_msg_new_port_t *) b->mem.pos; - - msg->id = port->id; - msg->pid = port->pid; - msg->max_size = port->max_size; - msg->max_share = port->max_share; - msg->type = port->type; - - return nxt_port_socket_write2(task, app_port, - NXT_PORT_MSG_NEW_PORT, - port->pair[0], port->queue_fd, - 0, 0, b); -} - - static void nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) -- cgit From 0af5f6ddb495902914880f540b44d8c828d945f1 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Thu, 25 Nov 2021 16:58:43 +0300 Subject: Fixing access_log structure reference counting. The reference to the access_log structure is stored in the current nxt_router_conf_t and the global nxt_router_t. When the reference is copied, the reference counter should be adjusted accordingly. This closes #593 issue on GitHub. --- src/nxt_router.c | 30 ++++++++++++++++++++++++++---- 1 file changed, 26 insertions(+), 4 deletions(-) (limited to 'src/nxt_router.c') diff --git a/src/nxt_router.c b/src/nxt_router.c index f718bb6e..52ea0f37 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -210,6 +210,8 @@ static void nxt_router_access_log_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data); static void nxt_router_access_log_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data); +static void nxt_router_access_log_use(nxt_thread_spinlock_t *lock, + nxt_router_access_log_t *access_log); static void nxt_router_access_log_release(nxt_task_t *task, nxt_thread_spinlock_t *lock, nxt_router_access_log_t *access_log); static void nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj, @@ -1153,7 +1155,13 @@ nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data) nxt_queue_add(&router->sockets, &updating_sockets); nxt_queue_add(&router->sockets, &creating_sockets); - router->access_log = rtcf->access_log; + if (router->access_log != rtcf->access_log) { + nxt_router_access_log_use(&router->lock, rtcf->access_log); + + nxt_router_access_log_release(task, &router->lock, router->access_log); + + router->access_log = rtcf->access_log; + } nxt_router_conf_ready(task, tmcf); @@ -1975,9 +1983,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, access_log = router->access_log; if (access_log != NULL && nxt_strstr_eq(&path, &access_log->path)) { - nxt_thread_spin_lock(&router->lock); - access_log->count++; - nxt_thread_spin_unlock(&router->lock); + nxt_router_access_log_use(&router->lock, access_log); } else { access_log = nxt_malloc(sizeof(nxt_router_access_log_t) @@ -3931,6 +3937,22 @@ nxt_router_access_log_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, } +static void +nxt_router_access_log_use(nxt_thread_spinlock_t *lock, + nxt_router_access_log_t *access_log) +{ + if (access_log == NULL) { + return; + } + + nxt_thread_spin_lock(lock); + + access_log->count++; + + nxt_thread_spin_unlock(lock); +} + + static void nxt_router_access_log_release(nxt_task_t *task, nxt_thread_spinlock_t *lock, nxt_router_access_log_t *access_log) -- cgit From bce0f432c402ad18718aecab227b663160682ea4 Mon Sep 17 00:00:00 2001 From: Alejandro Colomar Date: Sat, 18 Dec 2021 22:58:27 +0100 Subject: Removed special cases for non-NXT_CONF_VALUE_ARRAY. The previous commit added more generic APIs for handling NXT_CONF_VALUE_ARRAY and non-NXT_CONF_VALUE_ARRAY together. Modify calling code to remove special cases for arrays and non-arrays, taking special care that the path for non arrays is logically equivalent to the previous special cased code. Use the now-generic array code only. --- src/nxt_router.c | 24 +++++++----------------- 1 file changed, 7 insertions(+), 17 deletions(-) (limited to 'src/nxt_router.c') diff --git a/src/nxt_router.c b/src/nxt_router.c index 52ea0f37..3a32a363 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -1924,25 +1924,15 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, tls_init->tickets_conf = nxt_conf_get_path(listener, &conf_tickets); - if (nxt_conf_type(certificate) == NXT_CONF_ARRAY) { - n = nxt_conf_array_elements_count(certificate); + n = nxt_conf_array_elements_count_or_1(certificate); - for (i = 0; i < n; i++) { - value = nxt_conf_get_array_element(certificate, i); - - nxt_assert(value != NULL); - - ret = nxt_router_conf_tls_insert(tmcf, value, skcf, - tls_init, i == 0); - if (nxt_slow_path(ret != NXT_OK)) { - goto fail; - } - } + for (i = 0; i < n; i++) { + value = nxt_conf_get_array_element_or_itself(certificate, + i); + nxt_assert(value != NULL); - } else { - /* NXT_CONF_STRING */ - ret = nxt_router_conf_tls_insert(tmcf, certificate, skcf, - tls_init, 1); + ret = nxt_router_conf_tls_insert(tmcf, value, skcf, + tls_init, i == 0); if (nxt_slow_path(ret != NXT_OK)) { goto fail; } -- cgit