diff options
author | Max Romanov <max.romanov@nginx.com> | 2021-10-28 17:46:54 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2021-10-28 17:46:54 +0300 |
commit | bba97134e983541e94cf73e93900729e3a3e61fc (patch) | |
tree | ce0322c432f1d08cd302209f1403fab112788b2c | |
parent | 803e0373029a80994a85781d0b73b6cfa95bcf5a (diff) | |
download | unit-bba97134e983541e94cf73e93900729e3a3e61fc.tar.gz unit-bba97134e983541e94cf73e93900729e3a3e61fc.tar.bz2 |
Moving request limit control to libunit.
Introducting application graceful stop. For now only used when application
process reach request limit value.
This closes #585 issue on GitHub.
Diffstat (limited to '')
-rw-r--r-- | docs/changes.xml | 7 | ||||
-rw-r--r-- | go/port.go | 3 | ||||
-rw-r--r-- | src/nodejs/unit-http/unit.cpp | 4 | ||||
-rw-r--r-- | src/nodejs/unit-http/unit.h | 3 | ||||
-rw-r--r-- | src/nxt_application.c | 6 | ||||
-rw-r--r-- | src/nxt_application.h | 3 | ||||
-rw-r--r-- | src/nxt_external.c | 4 | ||||
-rw-r--r-- | src/nxt_java.c | 8 | ||||
-rw-r--r-- | src/nxt_main_process.c | 7 | ||||
-rw-r--r-- | src/nxt_php_sapi.c | 3 | ||||
-rw-r--r-- | src/nxt_port.h | 2 | ||||
-rw-r--r-- | src/nxt_router.c | 55 | ||||
-rw-r--r-- | src/nxt_router.h | 1 | ||||
-rw-r--r-- | src/nxt_unit.c | 290 | ||||
-rw-r--r-- | src/nxt_unit.h | 6 | ||||
-rw-r--r-- | src/perl/nxt_perl_psgi.c | 8 | ||||
-rw-r--r-- | src/python/nxt_python.c | 15 | ||||
-rw-r--r-- | src/python/nxt_python.h | 1 | ||||
-rw-r--r-- | src/python/nxt_python_asgi.c | 74 | ||||
-rw-r--r-- | src/python/nxt_python_asgi.h | 1 | ||||
-rw-r--r-- | src/python/nxt_python_asgi_http.c | 6 | ||||
-rw-r--r-- | src/python/nxt_python_asgi_websocket.c | 4 | ||||
-rw-r--r-- | src/ruby/nxt_ruby.c | 8 | ||||
-rw-r--r-- | src/test/nxt_unit_app_test.c | 2 |
24 files changed, 273 insertions, 248 deletions
diff --git a/docs/changes.xml b/docs/changes.xml index 3a5fa2b2..b9ddd3c7 100644 --- a/docs/changes.xml +++ b/docs/changes.xml @@ -50,6 +50,13 @@ NGINX Unit updated to 1.26.0. </para> </change> +<change type="bugfix"> +<para> +the router and app processes could crash when reaching requests limit +in asynchronous or multithreaded apps. +</para> +</change> + </changes> @@ -120,7 +120,8 @@ func nxt_go_add_port(ctx *C.nxt_unit_ctx_t, p *C.nxt_unit_port_t) C.int { } //export nxt_go_remove_port -func nxt_go_remove_port(unit *C.nxt_unit_t, p *C.nxt_unit_port_t) { +func nxt_go_remove_port(unit *C.nxt_unit_t, ctx *C.nxt_unit_ctx_t, + p *C.nxt_unit_port_t) { key := port_key{ pid: int(p.id.pid), diff --git a/src/nodejs/unit-http/unit.cpp b/src/nodejs/unit-http/unit.cpp index 589eca3f..ee5dc46f 100644 --- a/src/nodejs/unit-http/unit.cpp +++ b/src/nodejs/unit-http/unit.cpp @@ -519,11 +519,11 @@ Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) void -Unit::remove_port(nxt_unit_t *unit, nxt_unit_port_t *port) +Unit::remove_port(nxt_unit_t *unit, nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) { port_data_t *data; - if (port->data != NULL) { + if (port->data != NULL && ctx != NULL) { data = (port_data_t *) port->data; data->stop(); diff --git a/src/nodejs/unit-http/unit.h b/src/nodejs/unit-http/unit.h index 4ef40d45..1aa93073 100644 --- a/src/nodejs/unit-http/unit.h +++ b/src/nodejs/unit-http/unit.h @@ -41,7 +41,8 @@ private: void shm_ack_handler(nxt_unit_ctx_t *ctx); static int add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); - static void remove_port(nxt_unit_t *unit, nxt_unit_port_t *port); + static void remove_port(nxt_unit_t *unit, nxt_unit_ctx_t *ctx, + nxt_unit_port_t *port); static void quit_cb(nxt_unit_ctx_t *ctx); void quit(nxt_unit_ctx_t *ctx); diff --git a/src/nxt_application.c b/src/nxt_application.c index 5d58e60c..096ba4b4 100644 --- a/src/nxt_application.c +++ b/src/nxt_application.c @@ -687,7 +687,8 @@ nxt_app_parse_type(u_char *p, size_t length) nxt_int_t -nxt_unit_default_init(nxt_task_t *task, nxt_unit_init_t *init) +nxt_unit_default_init(nxt_task_t *task, nxt_unit_init_t *init, + nxt_common_app_conf_t *conf) { nxt_port_t *my_port, *main_port, *router_port; nxt_runtime_t *rt; @@ -730,5 +731,8 @@ nxt_unit_default_init(nxt_task_t *task, nxt_unit_init_t *init) init->log_fd = 2; + init->shm_limit = conf->shm_limit; + init->request_limit = conf->request_limit; + return NXT_OK; } diff --git a/src/nxt_application.h b/src/nxt_application.h index 6fbdc4be..4612f072 100644 --- a/src/nxt_application.h +++ b/src/nxt_application.h @@ -101,6 +101,7 @@ struct nxt_common_app_conf_s { nxt_conf_value_t *limits; size_t shm_limit; + uint32_t request_limit; union { nxt_external_app_conf_t external; @@ -137,7 +138,7 @@ NXT_EXPORT extern nxt_str_t nxt_server; extern nxt_app_module_t nxt_external_module; NXT_EXPORT nxt_int_t nxt_unit_default_init(nxt_task_t *task, - nxt_unit_init_t *init); + nxt_unit_init_t *init, nxt_common_app_conf_t *conf); #endif /* _NXT_APPLICATION_H_INCLIDED_ */ diff --git a/src/nxt_external.c b/src/nxt_external.c index 5703e294..1275aa87 100644 --- a/src/nxt_external.c +++ b/src/nxt_external.c @@ -113,13 +113,13 @@ nxt_external_start(nxt_task_t *task, nxt_process_data_t *data) "%PI,%ud,%d;" "%PI,%ud,%d;" "%PI,%ud,%d,%d;" - "%d,%z,%Z", + "%d,%z,%uD,%Z", NXT_VERSION, my_port->process->stream, main_port->pid, main_port->id, main_port->pair[1], router_port->pid, router_port->id, router_port->pair[1], my_port->pid, my_port->id, my_port->pair[0], my_port->pair[1], - 2, conf->shm_limit); + 2, conf->shm_limit, conf->request_limit); if (nxt_slow_path(p == end)) { nxt_alert(task, "internal error: buffer too small for NXT_UNIT_INIT"); diff --git a/src/nxt_java.c b/src/nxt_java.c index ac715c0b..75c8ee19 100644 --- a/src/nxt_java.c +++ b/src/nxt_java.c @@ -428,7 +428,7 @@ nxt_java_start(nxt_task_t *task, nxt_process_data_t *data) return NXT_ERROR; } - nxt_unit_default_init(task, &java_init); + nxt_unit_default_init(task, &java_init, app_conf); java_init.callbacks.request_handler = nxt_java_request_handler; java_init.callbacks.websocket_handler = nxt_java_websocket_handler; @@ -437,7 +437,6 @@ nxt_java_start(nxt_task_t *task, nxt_process_data_t *data) java_init.request_data_size = sizeof(nxt_java_request_data_t); java_init.data = &java_data; java_init.ctx_data = env; - java_init.shm_limit = app_conf->shm_limit; ctx = nxt_unit_init(&java_init); if (nxt_slow_path(ctx == NULL)) { @@ -616,11 +615,6 @@ nxt_java_ready_handler(nxt_unit_ctx_t *ctx) nxt_java_data_t *java_data; nxt_java_app_conf_t *c; - /* Worker thread context. */ - if (!nxt_unit_is_main_ctx(ctx)) { - return NXT_UNIT_OK; - } - java_data = ctx->unit->data; c = java_data->conf; diff --git a/src/nxt_main_process.c b/src/nxt_main_process.c index 86425dab..fe21aeb5 100644 --- a/src/nxt_main_process.c +++ b/src/nxt_main_process.c @@ -154,6 +154,12 @@ static nxt_conf_map_t nxt_common_app_limits_conf[] = { offsetof(nxt_common_app_conf_t, shm_limit), }, + { + nxt_string("requests"), + NXT_CONF_MAP_INT32, + offsetof(nxt_common_app_conf_t, request_limit), + }, + }; @@ -392,6 +398,7 @@ nxt_port_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) *p = '\0'; app_conf->shm_limit = 100 * 1024 * 1024; + app_conf->request_limit = 0; start += app_conf->name.length + 1; diff --git a/src/nxt_php_sapi.c b/src/nxt_php_sapi.c index 3fb3b0db..ea5f5581 100644 --- a/src/nxt_php_sapi.c +++ b/src/nxt_php_sapi.c @@ -485,14 +485,13 @@ nxt_php_start(nxt_task_t *task, nxt_process_data_t *data) } } - ret = nxt_unit_default_init(task, &php_init); + ret = nxt_unit_default_init(task, &php_init, conf); if (nxt_slow_path(ret != NXT_OK)) { nxt_alert(task, "nxt_unit_default_init() failed"); return ret; } php_init.callbacks.request_handler = nxt_php_request_handler; - php_init.shm_limit = conf->shm_limit; unit_ctx = nxt_unit_init(&php_init); if (nxt_slow_path(unit_ctx == NULL)) { diff --git a/src/nxt_port.h b/src/nxt_port.h index a0bc2512..c698a49c 100644 --- a/src/nxt_port.h +++ b/src/nxt_port.h @@ -224,8 +224,6 @@ struct nxt_port_s { /* Maximum interleave of message parts. */ uint32_t max_share; - uint32_t app_responses; - uint32_t active_websockets; uint32_t active_requests; diff --git a/src/nxt_router.c b/src/nxt_router.c index bbfe3dcf..9317d752 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -27,7 +27,6 @@ typedef struct { uint32_t spare_processes; nxt_msec_t timeout; nxt_msec_t idle_timeout; - uint32_t requests; nxt_conf_value_t *limits_value; nxt_conf_value_t *processes_value; nxt_conf_value_t *targets_value; @@ -1293,12 +1292,6 @@ static nxt_conf_map_t nxt_router_app_limits_conf[] = { NXT_CONF_MAP_MSEC, offsetof(nxt_router_app_conf_t, timeout), }, - - { - nxt_string("requests"), - NXT_CONF_MAP_INT32, - offsetof(nxt_router_app_conf_t, requests), - }, }; @@ -1567,7 +1560,6 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, apcf.spare_processes = 0; apcf.timeout = 0; apcf.idle_timeout = 15000; - apcf.requests = 0; apcf.limits_value = NULL; apcf.processes_value = NULL; apcf.targets_value = NULL; @@ -1647,7 +1639,6 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_debug(task, "application type: %V", &apcf.type); nxt_debug(task, "application processes: %D", apcf.processes); nxt_debug(task, "application request timeout: %M", apcf.timeout); - nxt_debug(task, "application requests: %D", apcf.requests); lang = nxt_app_lang_module(task->thread->runtime, &apcf.type); @@ -1678,7 +1669,6 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, ? apcf.spare_processes : 1; app->timeout = apcf.timeout; app->idle_timeout = apcf.idle_timeout; - app->max_requests = apcf.requests; app->targets = targets; @@ -4676,7 +4666,7 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app, nxt_port_t *port, { int inc_use; uint32_t got_response, dec_requests; - nxt_bool_t port_unchained, send_quit, adjust_idle_timer; + nxt_bool_t adjust_idle_timer; nxt_port_t *main_app_port; nxt_assert(port != NULL); @@ -4722,40 +4712,18 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app, nxt_port_t *port, nxt_thread_mutex_lock(&app->mutex); - main_app_port->app_responses += got_response; main_app_port->active_requests -= got_response + dec_requests; app->active_requests -= got_response + dec_requests; - if (main_app_port->pair[1] != -1 - && (app->max_requests == 0 - || main_app_port->app_responses < app->max_requests)) - { - if (main_app_port->app_link.next == NULL) { - nxt_queue_insert_tail(&app->ports, &main_app_port->app_link); - - nxt_port_inc_use(main_app_port); - } - } - - send_quit = (app->max_requests > 0 - && main_app_port->app_responses >= app->max_requests); - - if (send_quit) { - port_unchained = nxt_queue_chk_remove(&main_app_port->app_link); - - nxt_port_hash_remove(&app->port_hash, main_app_port); - app->port_hash_count--; - - main_app_port->app = NULL; - app->processes--; + if (main_app_port->pair[1] != -1 && main_app_port->app_link.next == NULL) { + nxt_queue_insert_tail(&app->ports, &main_app_port->app_link); - } else { - port_unchained = 0; + nxt_port_inc_use(main_app_port); } adjust_idle_timer = 0; - if (main_app_port->pair[1] != -1 && !send_quit + if (main_app_port->pair[1] != -1 && main_app_port->active_requests == 0 && main_app_port->active_websockets == 0 && main_app_port->idle_link.next == NULL) @@ -4800,19 +4768,6 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app, nxt_port_t *port, goto adjust_use; } - if (send_quit) { - nxt_debug(task, "app '%V' %p send QUIT to port", &app->name, app); - - nxt_port_socket_write(task, main_app_port, NXT_PORT_MSG_QUIT, -1, 0, 0, - NULL); - - if (port_unchained) { - nxt_port_use(task, main_app_port, -1); - } - - goto adjust_use; - } - nxt_debug(task, "app '%V' %p requests queue is empty, keep the port", &app->name, app); diff --git a/src/nxt_router.h b/src/nxt_router.h index fc068b53..011663c3 100644 --- a/src/nxt_router.h +++ b/src/nxt_router.h @@ -124,7 +124,6 @@ struct nxt_app_s { uint32_t max_processes; uint32_t spare_processes; uint32_t max_pending_processes; - uint32_t max_requests; uint32_t generation; diff --git a/src/nxt_unit.c b/src/nxt_unit.c index ae4499d8..3a31becd 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -25,6 +25,11 @@ #define NXT_UNIT_LOCAL_BUF_SIZE \ (NXT_UNIT_MAX_PLAIN_SIZE + sizeof(nxt_port_msg_t)) +enum { + NXT_QUIT_NORMAL = 0, + NXT_QUIT_GRACEFUL = 1, +}; + typedef struct nxt_unit_impl_s nxt_unit_impl_t; typedef struct nxt_unit_mmap_s nxt_unit_mmap_t; typedef struct nxt_unit_mmaps_s nxt_unit_mmaps_t; @@ -51,7 +56,8 @@ nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev, nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf); static int nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port, nxt_unit_port_t *read_port, - int *log_fd, uint32_t *stream, uint32_t *shm_limit); + int *log_fd, uint32_t *stream, uint32_t *shm_limit, + uint32_t *request_limit); static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd); static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf, @@ -130,6 +136,7 @@ static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib, static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib); static int nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx); static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf); +static int nxt_unit_chk_ready(nxt_unit_ctx_t *ctx); static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx); static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx); nxt_inline int nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf); @@ -150,14 +157,14 @@ static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue); static void nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx, nxt_queue_t *awaiting_req); -static void nxt_unit_remove_port(nxt_unit_impl_t *lib, +static void nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id); static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id); static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid); static void nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process); -static void nxt_unit_quit(nxt_unit_ctx_t *ctx); +static void nxt_unit_quit(nxt_unit_ctx_t *ctx, uint8_t quit_param); static int nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id); static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, const void *buf, size_t buf_size, @@ -174,7 +181,7 @@ static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf); static int nxt_unit_port_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf); -static int nxt_unit_app_queue_recv(nxt_unit_port_t *port, +static int nxt_unit_app_queue_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf); nxt_inline int nxt_unit_close(int fd); static int nxt_unit_fd_blocking(int fd); @@ -311,8 +318,9 @@ struct nxt_unit_ctx_impl_s { /* of nxt_unit_read_buf_t */ nxt_queue_t free_rbuf; - int online; - int ready; + uint8_t online; /* 1 bit */ + uint8_t ready; /* 1 bit */ + uint8_t quit_param; nxt_unit_mmap_buf_t ctx_buf[2]; nxt_unit_read_buf_t ctx_read_buf; @@ -344,9 +352,11 @@ struct nxt_unit_impl_s { nxt_unit_callbacks_t callbacks; nxt_atomic_t use_count; + nxt_atomic_t request_count; uint32_t request_data_size; uint32_t shm_mmap_limit; + uint32_t request_limit; pthread_mutex_t mutex; @@ -414,7 +424,7 @@ nxt_unit_init(nxt_unit_init_t *init) { int rc, queue_fd; void *mem; - uint32_t ready_stream, shm_limit; + uint32_t ready_stream, shm_limit, request_limit; nxt_unit_ctx_t *ctx; nxt_unit_impl_t *lib; nxt_unit_port_t ready_port, router_port, read_port; @@ -446,13 +456,15 @@ nxt_unit_init(nxt_unit_init_t *init) } else { rc = nxt_unit_read_env(&ready_port, &router_port, &read_port, - &lib->log_fd, &ready_stream, &shm_limit); + &lib->log_fd, &ready_stream, &shm_limit, + &request_limit); if (nxt_slow_path(rc != NXT_UNIT_OK)) { goto fail; } lib->shm_mmap_limit = (shm_limit + PORT_MMAP_DATA_SIZE - 1) / PORT_MMAP_DATA_SIZE; + lib->request_limit = request_limit; } if (nxt_slow_path(lib->shm_mmap_limit < 1)) { @@ -564,6 +576,7 @@ nxt_unit_create(nxt_unit_init_t *init) lib->request_data_size = init->request_data_size; lib->shm_mmap_limit = (init->shm_limit + PORT_MMAP_DATA_SIZE - 1) / PORT_MMAP_DATA_SIZE; + lib->request_limit = init->request_limit; lib->processes.slot = NULL; lib->ports.slot = NULL; @@ -573,6 +586,7 @@ nxt_unit_create(nxt_unit_init_t *init) nxt_queue_init(&lib->contexts); lib->use_count = 0; + lib->request_count = 0; lib->router_port = NULL; lib->shared_port = NULL; @@ -632,6 +646,7 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, ctx_impl->wait_items = 0; ctx_impl->online = 1; ctx_impl->ready = 0; + ctx_impl->quit_param = NXT_QUIT_GRACEFUL; nxt_queue_init(&ctx_impl->free_req); nxt_queue_init(&ctx_impl->free_ws); @@ -780,7 +795,7 @@ nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf) static int nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port, nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream, - uint32_t *shm_limit) + uint32_t *shm_limit, uint32_t *request_limit) { int rc; int ready_fd, router_fd, read_in_fd, read_out_fd; @@ -825,12 +840,12 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port, "%"PRId64",%"PRIu32",%d;" "%"PRId64",%"PRIu32",%d;" "%"PRId64",%"PRIu32",%d,%d;" - "%d,%"PRIu32, + "%d,%"PRIu32",%"PRIu32, &ready_stream, &ready_pid, &ready_id, &ready_fd, &router_pid, &router_id, &router_fd, &read_pid, &read_id, &read_in_fd, &read_out_fd, - log_fd, shm_limit); + log_fd, shm_limit, request_limit); if (nxt_slow_path(rc == EOF)) { nxt_unit_alert(NULL, "sscanf(%s) failed: %s (%d) for %s env", @@ -839,9 +854,9 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port, return NXT_UNIT_ERROR; } - if (nxt_slow_path(rc != 13)) { + if (nxt_slow_path(rc != 14)) { nxt_unit_alert(NULL, "invalid number of variables in %s env: " - "found %d of %d in %s", NXT_UNIT_INIT_ENV, rc, 13, vars); + "found %d of %d in %s", NXT_UNIT_INIT_ENV, rc, 14, vars); return NXT_UNIT_ERROR; } @@ -929,6 +944,7 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf, { int rc; pid_t pid; + uint8_t quit_param; struct cmsghdr *cm; nxt_port_msg_t *port_msg; nxt_unit_impl_t *lib; @@ -959,7 +975,7 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf, if (nxt_slow_path(rbuf->size == 0)) { nxt_unit_debug(ctx, "read port closed"); - nxt_unit_quit(ctx); + nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL); rc = NXT_UNIT_OK; goto done; } @@ -1018,9 +1034,18 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf, break; case _NXT_PORT_MSG_QUIT: - nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream); + if (recv_msg.size == sizeof(quit_param)) { + memcpy(&quit_param, recv_msg.start, sizeof(quit_param)); + + } else { + quit_param = NXT_QUIT_NORMAL; + } + + nxt_unit_debug(ctx, "#%"PRIu32": %squit", port_msg->stream, + (quit_param == NXT_QUIT_GRACEFUL ? "graceful " : "")); + + nxt_unit_quit(ctx, quit_param); - nxt_unit_quit(ctx); rc = NXT_UNIT_OK; break; @@ -1220,15 +1245,36 @@ nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx) nxt_unit_impl_t *lib; nxt_unit_ctx_impl_t *ctx_impl; - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + if (nxt_slow_path(ctx_impl->ready)) { + return NXT_UNIT_OK; + } + ctx_impl->ready = 1; - if (lib->callbacks.ready_handler) { + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + + /* Call ready_handler() only for main context. */ + if (&lib->main_ctx == ctx_impl && lib->callbacks.ready_handler != NULL) { return lib->callbacks.ready_handler(ctx); } + if (&lib->main_ctx != ctx_impl) { + /* Check if the main context is already stopped or quit. */ + if (nxt_slow_path(!lib->main_ctx.ready)) { + ctx_impl->ready = 0; + + nxt_unit_quit(ctx, lib->main_ctx.quit_param); + + return NXT_UNIT_OK; + } + + if (lib->callbacks.add_port != NULL) { + lib->callbacks.add_port(ctx, lib->shared_port); + } + } + return NXT_UNIT_OK; } @@ -1741,10 +1787,12 @@ nxt_unit_request_info_get(nxt_unit_ctx_t *ctx) static void nxt_unit_request_info_release(nxt_unit_request_info_t *req) { + nxt_unit_ctx_t *ctx; nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_request_info_impl_t *req_impl; - ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx); + ctx = req->ctx; + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); req->response = NULL; @@ -1783,6 +1831,10 @@ nxt_unit_request_info_release(nxt_unit_request_info_t *req) nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link); pthread_mutex_unlock(&ctx_impl->mutex); + + if (nxt_slow_path(!nxt_unit_chk_ready(ctx))) { + nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL); + } } @@ -4522,7 +4574,7 @@ nxt_unit_run(nxt_unit_ctx_t *ctx) rc = nxt_unit_run_once_impl(ctx); if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { - nxt_unit_quit(ctx); + nxt_unit_quit(ctx, NXT_QUIT_NORMAL); break; } } @@ -4586,6 +4638,7 @@ static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) { int nevents, res, err; + nxt_uint_t nfds; nxt_unit_impl_t *lib; nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_port_impl_t *port_impl; @@ -4593,7 +4646,7 @@ nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); - if (ctx_impl->wait_items > 0 || ctx_impl->ready == 0) { + if (ctx_impl->wait_items > 0 || !nxt_unit_chk_ready(ctx)) { return nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf); } @@ -4626,20 +4679,28 @@ retry: } } - res = nxt_unit_app_queue_recv(lib->shared_port, rbuf); - if (res == NXT_UNIT_OK) { - return NXT_UNIT_OK; + if (nxt_fast_path(nxt_unit_chk_ready(ctx))) { + res = nxt_unit_app_queue_recv(ctx, lib->shared_port, rbuf); + if (res == NXT_UNIT_OK) { + return NXT_UNIT_OK; + } + + fds[1].fd = lib->shared_port->in_fd; + fds[1].events = POLLIN; + + nfds = 2; + + } else { + nfds = 1; } fds[0].fd = ctx_impl->read_port->in_fd; fds[0].events = POLLIN; fds[0].revents = 0; - fds[1].fd = lib->shared_port->in_fd; - fds[1].events = POLLIN; fds[1].revents = 0; - nevents = poll(fds, 2, -1); + nevents = poll(fds, nfds, -1); if (nxt_slow_path(nevents == -1)) { err = errno; @@ -4686,6 +4747,21 @@ retry: static int +nxt_unit_chk_ready(nxt_unit_ctx_t *ctx) +{ + nxt_unit_impl_t *lib; + nxt_unit_ctx_impl_t *ctx_impl; + + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + + return (ctx_impl->ready + && (lib->request_limit == 0 + || lib->request_count < lib->request_limit)); +} + + +static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx) { int rc; @@ -4723,6 +4799,10 @@ nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx) } nxt_queue_loop; + if (!ctx_impl->ready) { + nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL); + } + return rc; } @@ -4903,16 +4983,14 @@ nxt_unit_run_shared(nxt_unit_ctx_t *ctx) int rc; nxt_unit_impl_t *lib; nxt_unit_read_buf_t *rbuf; - nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_ctx_use(ctx); lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); rc = NXT_UNIT_OK; - while (nxt_fast_path(ctx_impl->online)) { + while (nxt_fast_path(nxt_unit_chk_ready(ctx))) { rbuf = nxt_unit_read_buf_get(ctx); if (nxt_slow_path(rbuf == NULL)) { rc = NXT_UNIT_ERROR; @@ -4949,17 +5027,15 @@ nxt_unit_dequeue_request(nxt_unit_ctx_t *ctx) int rc; nxt_unit_impl_t *lib; nxt_unit_read_buf_t *rbuf; - nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_request_info_t *req; nxt_unit_ctx_use(ctx); lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); req = NULL; - if (nxt_slow_path(!ctx_impl->online)) { + if (nxt_slow_path(!nxt_unit_chk_ready(ctx))) { goto done; } @@ -4968,7 +5044,7 @@ nxt_unit_dequeue_request(nxt_unit_ctx_t *ctx) goto done; } - rc = nxt_unit_app_queue_recv(lib->shared_port, rbuf); + rc = nxt_unit_app_queue_recv(ctx, lib->shared_port, rbuf); if (rc != NXT_UNIT_OK) { nxt_unit_read_buf_release(ctx, rbuf); goto done; @@ -4985,17 +5061,6 @@ done: int -nxt_unit_is_main_ctx(nxt_unit_ctx_t *ctx) -{ - nxt_unit_impl_t *lib; - - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - - return (ctx == &lib->main_ctx.ctx); -} - - -int nxt_unit_process_port_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) { int rc; @@ -5017,13 +5082,17 @@ nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) nxt_unit_impl_t *lib; nxt_unit_read_buf_t *rbuf; + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + + if (port == lib->shared_port && !nxt_unit_chk_ready(ctx)) { + return NXT_UNIT_AGAIN; + } + rbuf = nxt_unit_read_buf_get(ctx); if (nxt_slow_path(rbuf == NULL)) { return NXT_UNIT_ERROR; } - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - if (port == lib->shared_port) { rc = nxt_unit_shared_port_recv(ctx, port, rbuf); @@ -5194,7 +5263,7 @@ nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl) pthread_mutex_unlock(&lib->mutex); if (nxt_fast_path(ctx_impl->read_port != NULL)) { - nxt_unit_remove_port(lib, &ctx_impl->read_port->id); + nxt_unit_remove_port(lib, NULL, &ctx_impl->read_port->id); nxt_unit_port_release(ctx_impl->read_port); } @@ -5605,7 +5674,8 @@ nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx, nxt_queue_t *awaiting_req) static void -nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id) +nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_ctx_t *ctx, + nxt_unit_port_id_t *port_id) { nxt_unit_port_t *port; nxt_unit_port_impl_t *port_impl; @@ -5623,7 +5693,7 @@ nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id) pthread_mutex_unlock(&lib->mutex); if (lib->callbacks.remove_port != NULL && port != NULL) { - lib->callbacks.remove_port(&lib->unit, port); + lib->callbacks.remove_port(&lib->unit, ctx, port); } if (nxt_fast_path(port != NULL)) { @@ -5700,7 +5770,7 @@ nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process) nxt_queue_remove(&port->link); if (lib->callbacks.remove_port != NULL) { - lib->callbacks.remove_port(&lib->unit, &port->port); + lib->callbacks.remove_port(&lib->unit, NULL, &port->port); } nxt_unit_port_release(&port->port); @@ -5712,56 +5782,96 @@ nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process) static void -nxt_unit_quit(nxt_unit_ctx_t *ctx) +nxt_unit_quit(nxt_unit_ctx_t *ctx, uint8_t quit_param) { - nxt_port_msg_t msg; + nxt_bool_t skip_graceful_broadcast, quit; nxt_unit_impl_t *lib; nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_callbacks_t *cb; nxt_unit_request_info_t *req; nxt_unit_request_info_impl_t *req_impl; + struct { + nxt_port_msg_t msg; + uint8_t quit_param; + } nxt_packed m; + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); - if (!ctx_impl->online) { + nxt_unit_debug(ctx, "quit: %d/%d/%d", (int) quit_param, ctx_impl->ready, + ctx_impl->online); + + if (nxt_slow_path(!ctx_impl->online)) { return; } - ctx_impl->online = 0; + skip_graceful_broadcast = quit_param == NXT_QUIT_GRACEFUL + && !ctx_impl->ready; cb = &lib->callbacks; - if (cb->quit != NULL) { - cb->quit(ctx); + if (nxt_fast_path(ctx_impl->ready)) { + ctx_impl->ready = 0; + + if (cb->remove_port != NULL) { + cb->remove_port(&lib->unit, ctx, lib->shared_port); + } } - nxt_queue_each(req_impl, &ctx_impl->active_req, - nxt_unit_request_info_impl_t, link) - { - req = &req_impl->req; + if (quit_param == NXT_QUIT_GRACEFUL) { + pthread_mutex_lock(&ctx_impl->mutex); - nxt_unit_req_warn(req, "active request on ctx quit"); + quit = nxt_queue_is_empty(&ctx_impl->active_req) + && nxt_queue_is_empty(&ctx_impl->pending_rbuf) + && ctx_impl->wait_items == 0; - if (cb->close_handler) { - nxt_unit_req_debug(req, "close_handler"); + pthread_mutex_unlock(&ctx_impl->mutex); - cb->close_handler(req); + } else { + quit = 1; + ctx_impl->quit_param = NXT_QUIT_GRACEFUL; + } - } else { - nxt_unit_request_done(req, NXT_UNIT_ERROR); + if (quit) { + ctx_impl->online = 0; + + if (cb->quit != NULL) { + cb->quit(ctx); } - } nxt_queue_loop; + nxt_queue_each(req_impl, &ctx_impl->active_req, + nxt_unit_request_info_impl_t, link) + { + req = &req_impl->req; - if (ctx != &lib->main_ctx.ctx) { + nxt_unit_req_warn(req, "active request on ctx quit"); + + if (cb->close_handler) { + nxt_unit_req_debug(req, "close_handler"); + + cb->close_handler(req); + + } else { + nxt_unit_request_done(req, NXT_UNIT_ERROR); + } + + } nxt_queue_loop; + + if (nxt_fast_path(ctx_impl->read_port != NULL)) { + nxt_unit_remove_port(lib, ctx, &ctx_impl->read_port->id); + } + } + + if (ctx != &lib->main_ctx.ctx || skip_graceful_broadcast) { return; } - memset(&msg, 0, sizeof(nxt_port_msg_t)); + memset(&m.msg, 0, sizeof(nxt_port_msg_t)); - msg.pid = lib->pid; - msg.type = _NXT_PORT_MSG_QUIT; + m.msg.pid = lib->pid; + m.msg.type = _NXT_PORT_MSG_QUIT; + m.quit_param = quit_param; pthread_mutex_lock(&lib->mutex); @@ -5775,7 +5885,7 @@ nxt_unit_quit(nxt_unit_ctx_t *ctx) } (void) nxt_unit_port_send(ctx, ctx_impl->read_port, - &msg, sizeof(msg), NULL, 0); + &m, sizeof(m), NULL, 0); } nxt_queue_loop; @@ -6089,7 +6199,11 @@ nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, retry: - res = nxt_unit_app_queue_recv(port, rbuf); + res = nxt_unit_app_queue_recv(ctx, port, rbuf); + + if (res == NXT_UNIT_OK) { + return NXT_UNIT_OK; + } if (res == NXT_UNIT_AGAIN) { res = nxt_unit_port_recv(ctx, port, rbuf); @@ -6194,13 +6308,20 @@ nxt_unit_port_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf) static int -nxt_unit_app_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf) +nxt_unit_app_queue_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, + nxt_unit_read_buf_t *rbuf) { uint32_t cookie; nxt_port_msg_t *port_msg; nxt_app_queue_t *queue; + nxt_unit_impl_t *lib; nxt_unit_port_impl_t *port_impl; + struct { + nxt_port_msg_t msg; + uint8_t quit_param; + } nxt_packed m; + port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); queue = port_impl->queue; @@ -6214,6 +6335,25 @@ retry: port_msg = (nxt_port_msg_t *) rbuf->buf; if (nxt_app_queue_cancel(queue, cookie, port_msg->stream)) { + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + + if (lib->request_limit != 0) { + nxt_atomic_fetch_add(&lib->request_count, 1); + + if (nxt_slow_path(lib->request_count >= lib->request_limit)) { + nxt_unit_debug(ctx, "request limit reached"); + + memset(&m.msg, 0, sizeof(nxt_port_msg_t)); + + m.msg.pid = lib->pid; + m.msg.type = _NXT_PORT_MSG_QUIT; + m.quit_param = NXT_QUIT_GRACEFUL; + + (void) nxt_unit_port_send(ctx, lib->main_ctx.read_port, + &m, sizeof(m), NULL, 0); + } + } + return NXT_UNIT_OK; } diff --git a/src/nxt_unit.h b/src/nxt_unit.h index 1e1a8dbe..484b7d56 100644 --- a/src/nxt_unit.h +++ b/src/nxt_unit.h @@ -136,7 +136,8 @@ struct nxt_unit_callbacks_s { int (*add_port)(nxt_unit_ctx_t *, nxt_unit_port_t *port); /* Remove previously added port. Optional. */ - void (*remove_port)(nxt_unit_t *, nxt_unit_port_t *port); + void (*remove_port)(nxt_unit_t *, nxt_unit_ctx_t *, + nxt_unit_port_t *port); /* Remove all data associated with process pid including ports. Optional. */ void (*remove_pid)(nxt_unit_t *, pid_t pid); @@ -167,6 +168,7 @@ struct nxt_unit_init_s { uint32_t request_data_size; uint32_t shm_limit; + uint32_t request_limit; nxt_unit_callbacks_t callbacks; @@ -215,8 +217,6 @@ int nxt_unit_run_shared(nxt_unit_ctx_t *ctx); nxt_unit_request_info_t *nxt_unit_dequeue_request(nxt_unit_ctx_t *ctx); -int nxt_unit_is_main_ctx(nxt_unit_ctx_t *ctx); - /* * Receive and process one message, invoke configured callbacks. * diff --git a/src/perl/nxt_perl_psgi.c b/src/perl/nxt_perl_psgi.c index 5df1465d..02555c96 100644 --- a/src/perl/nxt_perl_psgi.c +++ b/src/perl/nxt_perl_psgi.c @@ -1184,13 +1184,12 @@ nxt_perl_psgi_start(nxt_task_t *task, nxt_process_data_t *data) goto fail; } - nxt_unit_default_init(task, &perl_init); + nxt_unit_default_init(task, &perl_init, common_conf); perl_init.callbacks.request_handler = nxt_perl_psgi_request_handler; perl_init.callbacks.ready_handler = nxt_perl_psgi_ready_handler; perl_init.data = c; perl_init.ctx_data = &pctx; - perl_init.shm_limit = common_conf->shm_limit; unit_ctx = nxt_unit_init(&perl_init); if (nxt_slow_path(unit_ctx == NULL)) { @@ -1292,11 +1291,6 @@ nxt_perl_psgi_ready_handler(nxt_unit_ctx_t *ctx) nxt_perl_app_conf_t *c; nxt_perl_psgi_ctx_t *pctx; - /* Worker thread context. */ - if (!nxt_unit_is_main_ctx(ctx)) { - return NXT_UNIT_OK; - } - c = ctx->unit->data; if (c->threads <= 1) { diff --git a/src/python/nxt_python.c b/src/python/nxt_python.c index abb04194..8687c869 100644 --- a/src/python/nxt_python.c +++ b/src/python/nxt_python.c @@ -230,10 +230,9 @@ nxt_python_start(nxt_task_t *task, nxt_process_data_t *data) } } - nxt_unit_default_init(task, &python_init); + nxt_unit_default_init(task, &python_init, data->app); python_init.data = c; - python_init.shm_limit = data->app->shm_limit; python_init.callbacks.ready_handler = nxt_python_ready_handler; proto = c->protocol; @@ -522,18 +521,6 @@ nxt_python_ready_handler(nxt_unit_ctx_t *ctx) nxt_py_thread_info_t *ti; nxt_python_app_conf_t *c; - if (nxt_py_proto.ready != NULL) { - res = nxt_py_proto.ready(ctx); - if (nxt_slow_path(res != NXT_UNIT_OK)) { - return NXT_UNIT_ERROR; - } - } - - /* Worker thread context. */ - if (!nxt_unit_is_main_ctx(ctx)) { - return NXT_UNIT_OK; - } - c = ctx->unit->data; if (c->threads <= 1) { diff --git a/src/python/nxt_python.h b/src/python/nxt_python.h index e4eac9dc..eddb1cfc 100644 --- a/src/python/nxt_python.h +++ b/src/python/nxt_python.h @@ -64,7 +64,6 @@ typedef struct { void (*ctx_data_free)(void *data); int (*startup)(void *data); int (*run)(nxt_unit_ctx_t *ctx); - int (*ready)(nxt_unit_ctx_t *ctx); void (*done)(void); } nxt_python_proto_t; diff --git a/src/python/nxt_python_asgi.c b/src/python/nxt_python_asgi.c index 8bf43d94..354e3a81 100644 --- a/src/python/nxt_python_asgi.c +++ b/src/python/nxt_python_asgi.c @@ -33,11 +33,10 @@ static PyObject *nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len, static PyObject *nxt_py_asgi_create_header(nxt_unit_field_t *f); static PyObject *nxt_py_asgi_create_subprotocols(nxt_unit_field_t *f); -static int nxt_python_asgi_ready(nxt_unit_ctx_t *ctx); - static int nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); static int nxt_py_asgi_add_reader(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); -static void nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_port_t *port); +static void nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_ctx_t *ctx, + nxt_unit_port_t *port); static void nxt_py_asgi_quit(nxt_unit_ctx_t *ctx); static void nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx); @@ -45,7 +44,6 @@ static PyObject *nxt_py_asgi_port_read(PyObject *self, PyObject *args); static void nxt_python_asgi_done(void); static PyObject *nxt_py_port_read; -static nxt_unit_port_t *nxt_py_shared_port; static PyMethodDef nxt_py_port_read_method = {"unit_port_read", nxt_py_asgi_port_read, METH_VARARGS, ""}; @@ -55,7 +53,6 @@ static nxt_python_proto_t nxt_py_asgi_proto = { .ctx_data_free = nxt_python_asgi_ctx_data_free, .startup = nxt_python_asgi_startup, .run = nxt_python_asgi_run, - .ready = nxt_python_asgi_ready, .done = nxt_python_asgi_done, }; @@ -362,13 +359,6 @@ nxt_python_asgi_run(nxt_unit_ctx_t *ctx) Py_DECREF(res); - nxt_py_asgi_remove_reader(ctx, nxt_py_shared_port); - nxt_py_asgi_remove_reader(ctx, ctx_data->port); - - if (ctx_data->port != NULL) { - ctx_data->port = NULL; - } - nxt_py_asgi_lifespan_shutdown(ctx); return NXT_UNIT_OK; @@ -892,27 +882,9 @@ fail: static int -nxt_python_asgi_ready(nxt_unit_ctx_t *ctx) -{ - nxt_unit_port_t *port; - - if (nxt_slow_path(nxt_py_shared_port == NULL)) { - return NXT_UNIT_ERROR; - } - - port = nxt_py_shared_port; - - nxt_unit_debug(ctx, "asgi_ready %d %p %p", port->in_fd, ctx, port); - - return nxt_py_asgi_add_reader(ctx, port); -} - - -static int nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) { - int nb; - nxt_py_asgi_ctx_data_t *ctx_data; + int nb; if (port->in_fd == -1) { return NXT_UNIT_OK; @@ -929,16 +901,6 @@ nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) nxt_unit_debug(ctx, "asgi_add_port %d %p %p", port->in_fd, ctx, port); - if (port->id.id == NXT_UNIT_SHARED_PORT_ID) { - nxt_py_shared_port = port; - - return NXT_UNIT_OK; - } - - ctx_data = ctx->data; - - ctx_data->port = port; - return nxt_py_asgi_add_reader(ctx, port); } @@ -1008,17 +970,16 @@ clean_fd: static void -nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_port_t *port) +nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_ctx_t *ctx, + nxt_unit_port_t *port) { - if (port->in_fd == -1) { + if (port->in_fd == -1 || ctx == NULL) { return; } nxt_unit_debug(NULL, "asgi_remove_port %d %p", port->in_fd, port); - if (nxt_py_shared_port == port) { - nxt_py_shared_port = NULL; - } + nxt_py_asgi_remove_reader(ctx, port); } @@ -1032,27 +993,6 @@ nxt_py_asgi_quit(nxt_unit_ctx_t *ctx) ctx_data = ctx->data; - if (nxt_py_shared_port != NULL) { - p = PyLong_FromLong(nxt_py_shared_port->in_fd); - if (nxt_slow_path(p == NULL)) { - nxt_unit_alert(NULL, "Python failed to create Long"); - nxt_python_print_exception(); - - } else { - res = PyObject_CallFunctionObjArgs(ctx_data->loop_remove_reader, - p, NULL); - if (nxt_slow_path(res == NULL)) { - nxt_unit_alert(NULL, "Python failed to remove_reader"); - nxt_python_print_exception(); - - } else { - Py_DECREF(res); - } - - Py_DECREF(p); - } - } - p = PyLong_FromLong(0); if (nxt_slow_path(p == NULL)) { nxt_unit_alert(NULL, "Python failed to create Long"); diff --git a/src/python/nxt_python_asgi.h b/src/python/nxt_python_asgi.h index 20702065..94478a36 100644 --- a/src/python/nxt_python_asgi.h +++ b/src/python/nxt_python_asgi.h @@ -34,7 +34,6 @@ typedef struct { PyObject *quit_future; PyObject *quit_future_set_result; PyObject **target_lifespans; - nxt_unit_port_t *port; } nxt_py_asgi_ctx_data_t; PyObject *nxt_py_asgi_enum_headers(PyObject *headers, diff --git a/src/python/nxt_python_asgi_http.c b/src/python/nxt_python_asgi_http.c index c4a77d53..05c0da4f 100644 --- a/src/python/nxt_python_asgi_http.c +++ b/src/python/nxt_python_asgi_http.c @@ -636,9 +636,11 @@ nxt_py_asgi_http_close_handler(nxt_unit_request_info_t *req) nxt_unit_req_debug(req, "asgi_http_close_handler"); - http->closed = 1; + if (nxt_fast_path(http != NULL)) { + http->closed = 1; - nxt_py_asgi_http_emit_disconnect(http); + nxt_py_asgi_http_emit_disconnect(http); + } } diff --git a/src/python/nxt_python_asgi_websocket.c b/src/python/nxt_python_asgi_websocket.c index fc7d9fa4..ab1d0324 100644 --- a/src/python/nxt_python_asgi_websocket.c +++ b/src/python/nxt_python_asgi_websocket.c @@ -980,6 +980,10 @@ nxt_py_asgi_websocket_close_handler(nxt_unit_request_info_t *req) nxt_unit_req_debug(req, "asgi_websocket_close_handler"); + if (nxt_slow_path(ws == NULL)) { + return; + } + if (ws->receive_future == NULL) { ws->state = NXT_WS_DISCONNECTED; diff --git a/src/ruby/nxt_ruby.c b/src/ruby/nxt_ruby.c index 522869b5..62498127 100644 --- a/src/ruby/nxt_ruby.c +++ b/src/ruby/nxt_ruby.c @@ -357,11 +357,10 @@ nxt_ruby_start(nxt_task_t *task, nxt_process_data_t *data) goto fail; } - nxt_unit_default_init(task, &ruby_unit_init); + nxt_unit_default_init(task, &ruby_unit_init, conf); ruby_unit_init.callbacks.request_handler = nxt_ruby_request_handler; ruby_unit_init.callbacks.ready_handler = nxt_ruby_ready_handler; - ruby_unit_init.shm_limit = conf->shm_limit; ruby_unit_init.data = c; ruby_unit_init.ctx_data = &ruby_ctx; @@ -1258,11 +1257,6 @@ nxt_ruby_ready_handler(nxt_unit_ctx_t *ctx) nxt_ruby_ctx_t *rctx; nxt_ruby_app_conf_t *c; - /* Worker thread context. */ - if (!nxt_unit_is_main_ctx(ctx)) { - return NXT_UNIT_OK; - } - c = ctx->unit->data; if (c->threads <= 1) { diff --git a/src/test/nxt_unit_app_test.c b/src/test/nxt_unit_app_test.c index a5f3728c..8fda9740 100644 --- a/src/test/nxt_unit_app_test.c +++ b/src/test/nxt_unit_app_test.c @@ -97,7 +97,7 @@ ready_handler(nxt_unit_ctx_t *ctx) nxt_unit_debug(ctx, "ready"); - if (!nxt_unit_is_main_ctx(ctx) || thread_count <= 1) { + if (thread_count <= 1) { return NXT_UNIT_OK; } |