diff options
author | Igor Sysoev <igor@sysoev.ru> | 2017-03-09 18:03:27 +0300 |
---|---|---|
committer | Igor Sysoev <igor@sysoev.ru> | 2017-03-09 18:03:27 +0300 |
commit | 6f2c9acd1841ca20a1388b34aef64e9f00459090 (patch) | |
tree | c0b9c1063ec464027d1ca29a793f6c0b7a6878d5 | |
parent | 5745e4826427155e29c1d520fe77811a0f570689 (diff) | |
download | unit-6f2c9acd1841ca20a1388b34aef64e9f00459090.tar.gz unit-6f2c9acd1841ca20a1388b34aef64e9f00459090.tar.bz2 |
Processes refactoring.
The cycle has been renamed to the runtime.
47 files changed, 2594 insertions, 2675 deletions
@@ -237,7 +237,7 @@ fi cat << END > Makefile -all: libnxt $NXT_BIN +all: $NXT_BIN libnxt: make -f $NXT_MAKEFILE libnxt diff --git a/auto/modules/conf b/auto/modules/conf index 141bd098..f060bdeb 100644 --- a/auto/modules/conf +++ b/auto/modules/conf @@ -18,7 +18,7 @@ NXT_MODULES_SRC=$NXT_BUILD_DIR/nxt_modules.c cat << END > $NXT_MODULES_SRC #include <nxt_main.h> -#include <nxt_cycle.h> +#include <nxt_runtime.h> END @@ -26,7 +26,7 @@ END for nxt_init in $NXT_MODULES_INIT do - $echo "extern nxt_int_t $nxt_init(nxt_thread_t *thr, nxt_cycle_t *cycle);" \ + $echo "extern nxt_int_t $nxt_init(nxt_thread_t *thr, nxt_runtime_t *rt);" \ >> $NXT_MODULES_SRC done diff --git a/auto/sources b/auto/sources index af9a21f4..5fb97779 100644 --- a/auto/sources +++ b/auto/sources @@ -82,6 +82,7 @@ NXT_LIB_SRCS=" \ src/nxt_process_title.c \ src/nxt_signal.c \ src/nxt_port_socket.c \ + src/nxt_port.c \ src/nxt_dyld.c \ src/nxt_random.c \ src/nxt_queue.c \ @@ -320,7 +321,7 @@ if [ $NXT_LIB_UNIT_TEST = YES ]; then fi NXT_DEPS=" \ - src/nxt_cycle.h \ + src/nxt_runtime.h \ src/nxt_application.h \ src/nxt_master_process.h \ " @@ -328,10 +329,11 @@ NXT_DEPS=" \ NXT_SRCS=" \ src/nxt_main.c \ src/nxt_app_log.c \ - src/nxt_cycle.c \ - src/nxt_port.c \ - src/nxt_application.c \ + src/nxt_runtime.c \ src/nxt_stream_module.c \ src/nxt_master_process.c \ src/nxt_worker_process.c \ + src/nxt_controller.c \ + src/nxt_router.c \ + src/nxt_application.c \ " diff --git a/src/nxt_app_log.c b/src/nxt_app_log.c index 8aedfacb..0519c018 100644 --- a/src/nxt_app_log.c +++ b/src/nxt_app_log.c @@ -5,7 +5,7 @@ */ #include <nxt_main.h> -#include <nxt_cycle.h> +#include <nxt_runtime.h> static nxt_time_string_t nxt_log_error_time_cache; diff --git a/src/nxt_application.c b/src/nxt_application.c index e0e5ffc7..ed661ce7 100644 --- a/src/nxt_application.c +++ b/src/nxt_application.c @@ -6,13 +6,14 @@ */ #include <nxt_main.h> -#include <nxt_cycle.h> +#include <nxt_runtime.h> #include <nxt_application.h> #define NXT_PARSE_AGAIN (u_char *) -1 +static nxt_int_t nxt_app_listen_socket(nxt_task_t *task, nxt_runtime_t *rt); static void nxt_app_thread(void *ctx); static nxt_app_request_t *nxt_app_request_create(nxt_socket_t s, nxt_log_t *log); @@ -77,11 +78,15 @@ static nxt_uint_t nxt_app_buf_max_number = 16; nxt_int_t -nxt_app_start(nxt_cycle_t *cycle) +nxt_app_start(nxt_task_t *task, nxt_runtime_t *rt) { nxt_thread_link_t *link; nxt_thread_handle_t handle; + if (nxt_app_listen_socket(task, rt) != NXT_OK) { + return NXT_ERROR; + } + if (nxt_slow_path(nxt_thread_mutex_create(&nxt_app_mutex) != NXT_OK)) { return NXT_ERROR; } @@ -94,7 +99,7 @@ nxt_app_start(nxt_cycle_t *cycle) if (nxt_fast_path(link != NULL)) { link->start = nxt_app_thread; - link->data = cycle; + link->data = rt; return nxt_thread_create(&handle, link); } @@ -103,6 +108,39 @@ nxt_app_start(nxt_cycle_t *cycle) } +static nxt_int_t +nxt_app_listen_socket(nxt_task_t *task, nxt_runtime_t *rt) +{ + nxt_sockaddr_t *sa; + nxt_listen_socket_t *ls; + + sa = nxt_sockaddr_alloc(rt->mem_pool, sizeof(struct sockaddr_in), + NXT_INET_ADDR_STR_LEN); + if (sa == NULL) { + return NXT_ERROR; + } + + sa->type = SOCK_STREAM; + sa->u.sockaddr_in.sin_family = AF_INET; + sa->u.sockaddr_in.sin_port = htons(8080); + + nxt_sockaddr_text(sa); + + ls = nxt_runtime_listen_socket_add(rt, sa); + if (ls == NULL) { + return NXT_ERROR; + } + + ls->read_after_accept = 1; + + if (nxt_listen_socket_create(task, ls, 0) != NXT_OK) { + return NXT_ERROR; + } + + return NXT_OK; +} + + #define SIZE 4096 static void @@ -110,9 +148,9 @@ nxt_app_thread(void *ctx) { ssize_t n; nxt_err_t err; - nxt_cycle_t *cycle; nxt_socket_t s; nxt_thread_t *thr; + nxt_runtime_t *rt; nxt_app_request_t *r; nxt_event_engine_t **engines; nxt_listen_socket_t *ls; @@ -124,8 +162,8 @@ nxt_app_thread(void *ctx) nxt_log_debug(thr->log, "app thread"); - cycle = ctx; - engines = cycle->engines->elts; + rt = ctx; + engines = rt->engines->elts; nxt_app_engine = engines[0]; @@ -138,7 +176,7 @@ nxt_app_thread(void *ctx) nxt_log_debug(thr->log, "application init failed"); } - ls = cycle->listen_sockets->elts; + ls = rt->listen_sockets->elts; for ( ;; ) { nxt_log_debug(thr->log, "wait on accept"); diff --git a/src/nxt_application.h b/src/nxt_application.h index efbf9c51..ebe45e87 100644 --- a/src/nxt_application.h +++ b/src/nxt_application.h @@ -53,7 +53,8 @@ typedef struct { extern nxt_application_module_t nxt_python_module; -nxt_int_t nxt_app_http_read_body(nxt_app_request_t *r, u_char *data, size_t len); +nxt_int_t nxt_app_http_read_body(nxt_app_request_t *r, u_char *data, + size_t len); nxt_int_t nxt_app_write(nxt_app_request_t *r, const u_char *data, size_t len); diff --git a/src/nxt_controller.c b/src/nxt_controller.c new file mode 100644 index 00000000..d5092d57 --- /dev/null +++ b/src/nxt_controller.c @@ -0,0 +1,254 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) Valentin V. Bartenev + * Copyright (C) NGINX, Inc. + */ + +#include <nxt_main.h> +#include <nxt_runtime.h> +#include <nxt_master_process.h> + + +static void nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data); +static void nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data); +static nxt_msec_t nxt_controller_conn_timeout_value(nxt_event_conn_t *c, + uintptr_t data); +static void nxt_controller_conn_read_error(nxt_task_t *task, void *obj, + void *data); +static void nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj, + void *data); +static void nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data); +static void nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data); + + +static const nxt_event_conn_state_t nxt_controller_conn_read_state; +static const nxt_event_conn_state_t nxt_controller_conn_close_state; + + +nxt_int_t +nxt_controller_start(nxt_task_t *task, nxt_runtime_t *rt) +{ + if (nxt_event_conn_listen(task, rt->controller_socket) != NXT_OK) { + return NXT_ERROR; + } + + return NXT_OK; +} + + +nxt_int_t +nxt_runtime_controller_socket(nxt_task_t *task, nxt_runtime_t *rt) +{ + nxt_sockaddr_t *sa; + nxt_listen_socket_t *ls; + + sa = rt->controller_listen; + + if (rt->controller_listen == NULL) { + sa = nxt_sockaddr_alloc(rt->mem_pool, sizeof(struct sockaddr_in), + NXT_INET_ADDR_STR_LEN); + if (sa == NULL) { + return NXT_ERROR; + } + + sa->type = SOCK_STREAM; + sa->u.sockaddr_in.sin_family = AF_INET; + sa->u.sockaddr_in.sin_port = htons(8443); + + nxt_sockaddr_text(sa); + + rt->controller_listen = sa; + } + + ls = nxt_mem_alloc(rt->mem_pool, sizeof(nxt_listen_socket_t)); + if (ls == NULL) { + return NXT_ERROR; + } + + ls->sockaddr = nxt_sockaddr_create(rt->mem_pool, &sa->u.sockaddr, + sa->socklen, sa->length); + if (ls->sockaddr == NULL) { + return NXT_ERROR; + } + + ls->sockaddr->type = sa->type; + + nxt_sockaddr_text(ls->sockaddr); + + ls->socket = -1; + ls->backlog = NXT_LISTEN_BACKLOG; + ls->read_after_accept = 1; + ls->flags = NXT_NONBLOCK; + +#if 0 + /* STUB */ + wq = nxt_mem_zalloc(cf->mem_pool, sizeof(nxt_work_queue_t)); + if (wq == NULL) { + return NXT_ERROR; + } + nxt_work_queue_name(wq, "listen"); + /**/ + + ls->work_queue = wq; +#endif + ls->handler = nxt_controller_conn_init; + + /* + * Connection memory pool chunk size is tunned to + * allocate the most data in one mem_pool chunk. + */ + ls->mem_pool_size = nxt_listen_socket_pool_min_size(ls) + + sizeof(nxt_event_conn_proxy_t) + + sizeof(nxt_event_conn_t) + + 4 * sizeof(nxt_buf_t); + + if (nxt_listen_socket_create(task, ls, 0) != NXT_OK) { + return NXT_ERROR; + } + + rt->controller_socket = ls; + + return NXT_OK; +} + + +static void +nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data) +{ + nxt_buf_t *b; + nxt_event_conn_t *c; + nxt_event_engine_t *engine; + + c = obj; + + nxt_debug(task, "controller conn init fd:%d", c->socket.fd); + + b = nxt_buf_mem_alloc(c->mem_pool, 1024, 0); + if (nxt_slow_path(b == NULL)) { + nxt_controller_conn_free(task, c, NULL); + return; + } + + c->read = b; + c->socket.read_ready = 1; + c->read_state = &nxt_controller_conn_read_state; + + engine = task->thread->engine; + c->read_work_queue = &engine->read_work_queue; + + nxt_event_conn_read(engine, c); +} + + +static const nxt_event_conn_state_t nxt_controller_conn_read_state + nxt_aligned(64) = +{ + NXT_EVENT_NO_BUF_PROCESS, + NXT_EVENT_TIMER_NO_AUTORESET, + + nxt_controller_conn_read, + nxt_controller_conn_close, + nxt_controller_conn_read_error, + + nxt_controller_conn_read_timeout, + nxt_controller_conn_timeout_value, + 60 * 1000, +}; + + +static void +nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data) +{ + nxt_event_conn_t *c; + + c = obj; + + nxt_debug(task, "controller conn read"); + + nxt_controller_conn_close(task, c, c->socket.data); +} + + +static nxt_msec_t +nxt_controller_conn_timeout_value(nxt_event_conn_t *c, uintptr_t data) +{ + return (nxt_msec_t) data; +} + + +static void +nxt_controller_conn_read_error(nxt_task_t *task, void *obj, void *data) +{ + nxt_event_conn_t *c; + + c = obj; + + nxt_debug(task, "controller conn read error"); + + nxt_controller_conn_close(task, c, c->socket.data); +} + + +static void +nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj, void *data) +{ + nxt_timer_t *ev; + nxt_event_conn_t *c; + + ev = obj; + + c = nxt_event_read_timer_conn(ev); + c->socket.timedout = 1; + c->socket.closed = 1; + + nxt_debug(task, "controller conn read timeout"); + + nxt_controller_conn_close(task, c, c->socket.data); +} + + +static const nxt_event_conn_state_t nxt_controller_conn_close_state + nxt_aligned(64) = +{ + NXT_EVENT_NO_BUF_PROCESS, + NXT_EVENT_TIMER_NO_AUTORESET, + + nxt_controller_conn_free, + NULL, + NULL, + + NULL, + NULL, + 0, +}; + + +static void +nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data) +{ + nxt_event_conn_t *c; + + c = obj; + + nxt_debug(task, "controller conn close"); + + c->write_state = &nxt_controller_conn_close_state; + + nxt_event_conn_close(task->thread->engine, c); +} + + +static void +nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data) +{ + nxt_event_conn_t *c; + + c = obj; + + nxt_debug(task, "controller conn free"); + + nxt_mem_pool_destroy(c->mem_pool); + + //nxt_free(c); +} diff --git a/src/nxt_cycle.c b/src/nxt_cycle.c deleted file mode 100644 index e95421d2..00000000 --- a/src/nxt_cycle.c +++ /dev/null @@ -1,1791 +0,0 @@ - -/* - * Copyright (C) Igor Sysoev - * Copyright (C) Valentin V. Bartenev - * Copyright (C) NGINX, Inc. - */ - -#include <nxt_main.h> -#include <nxt_cycle.h> -#include <nxt_port.h> -#include <nxt_master_process.h> - - -static nxt_int_t nxt_cycle_inherited_listen_sockets(nxt_task_t *task, - nxt_cycle_t *cycle); -static nxt_int_t nxt_cycle_systemd_listen_sockets(nxt_task_t *task, - nxt_cycle_t *cycle); -static nxt_int_t nxt_cycle_event_engines(nxt_thread_t *thr, nxt_cycle_t *cycle); -static nxt_int_t nxt_cycle_processes(nxt_cycle_t *cycle); -static nxt_int_t nxt_cycle_thread_pools(nxt_thread_t *thr, nxt_cycle_t *cycle); -static void nxt_cycle_start(nxt_task_t *task, void *obj, void *data); -static void nxt_cycle_initial_start(nxt_task_t *task, nxt_cycle_t *cycle); -static void nxt_single_process_start(nxt_thread_t *thr, nxt_task_t *task, - nxt_cycle_t *cycle); -static void nxt_cycle_close_idle_connections(nxt_thread_t *thr, nxt_task_t *task); -static void nxt_cycle_exit(nxt_task_t *task, void *obj, void *data); -static nxt_int_t nxt_cycle_event_engine_change(nxt_thread_t *thr, - nxt_task_t *task, nxt_cycle_t *cycle); -static nxt_int_t nxt_cycle_conf_init(nxt_thread_t *thr, nxt_cycle_t *cycle); -static nxt_int_t nxt_cycle_conf_read_cmd(nxt_thread_t *thr, nxt_cycle_t *cycle); -static nxt_sockaddr_t *nxt_cycle_sockaddr_parse(nxt_str_t *addr, - nxt_mem_pool_t *mp, nxt_log_t *log); -static nxt_sockaddr_t *nxt_cycle_sockaddr_unix_parse(nxt_str_t *addr, - nxt_mem_pool_t *mp, nxt_log_t *log); -static nxt_sockaddr_t *nxt_cycle_sockaddr_inet6_parse(nxt_str_t *addr, - nxt_mem_pool_t *mp, nxt_log_t *log); -static nxt_sockaddr_t *nxt_cycle_sockaddr_inet_parse(nxt_str_t *addr, - nxt_mem_pool_t *mp, nxt_log_t *log); -static nxt_int_t nxt_cycle_conf_apply(nxt_thread_t *thr, nxt_task_t *task, - nxt_cycle_t *cycle); -static nxt_int_t nxt_cycle_listen_socket(nxt_cycle_t *cycle); -static nxt_int_t nxt_cycle_app_listen_socket(nxt_cycle_t *cycle); -static nxt_int_t nxt_cycle_hostname(nxt_thread_t *thr, nxt_cycle_t *cycle); -static nxt_int_t nxt_cycle_log_files_init(nxt_cycle_t *cycle); -static nxt_int_t nxt_cycle_log_files_create(nxt_cycle_t *cycle); -static nxt_int_t nxt_cycle_listen_sockets_create(nxt_task_t *task, - nxt_cycle_t *cycle); -static void nxt_cycle_listen_sockets_close(nxt_task_t *task, - nxt_cycle_t *cycle); -static void nxt_cycle_pid_file_delete(nxt_cycle_t *cycle); -static nxt_int_t nxt_cycle_shm_zones_enable(nxt_cycle_t *cycle); -static nxt_int_t nxt_cycle_shm_zone_create(nxt_cycle_shm_zone_t *shm_zone); - -#if (NXT_THREADS) -static void nxt_cycle_thread_pool_destroy(nxt_thread_t *thr, - nxt_task_t *task, nxt_cycle_t *cycle, nxt_cycle_cont_t cont); -#endif - - -nxt_thread_declare_data(nxt_cycle_t *, nxt_thread_cycle_data); - - -nxt_int_t -nxt_cycle_create(nxt_thread_t *thr, nxt_task_t *task, nxt_cycle_t *previous, - nxt_cycle_cont_t start) -{ - nxt_int_t ret; - nxt_cycle_t *cycle; - nxt_array_t *listen_sockets; - nxt_mem_pool_t *mp; - static nxt_str_t upstream_zone = nxt_string("upstream_zone"); - - mp = nxt_mem_pool_create(1024); - - if (nxt_slow_path(mp == NULL)) { - return NXT_ERROR; - } - - /* This alloction cannot fail. */ - cycle = nxt_mem_zalloc(mp, sizeof(nxt_cycle_t)); - - cycle->mem_pool = mp; - cycle->previous = previous; - - if (previous == NULL) { - cycle->prefix = nxt_current_directory(mp); - - } else { - cycle->type = previous->type; - cycle->prefix = nxt_str_dup(mp, NULL, previous->prefix); - } - - if (nxt_slow_path(cycle->prefix == NULL)) { - goto fail; - } - - cycle->conf_prefix = cycle->prefix; - - cycle->services = nxt_services_init(mp); - if (nxt_slow_path(cycle->services == NULL)) { - goto fail; - } - - listen_sockets = nxt_array_create(mp, 1, sizeof(nxt_listen_socket_t)); - if (nxt_slow_path(listen_sockets == NULL)) { - goto fail; - } - - cycle->listen_sockets = listen_sockets; - - if (previous == NULL) { - ret = nxt_cycle_inherited_listen_sockets(task, cycle); - if (nxt_slow_path(ret != NXT_OK)) { - goto fail; - } - } - - if (nxt_slow_path(nxt_cycle_hostname(thr, cycle) != NXT_OK)) { - goto fail; - } - - if (nxt_slow_path(nxt_cycle_log_files_init(cycle) != NXT_OK)) { - goto fail; - } - - if (nxt_slow_path(nxt_cycle_event_engines(thr, cycle) != NXT_OK)) { - goto fail; - } - - if (nxt_slow_path(nxt_cycle_processes(cycle) != NXT_OK)) { - goto fail; - } - - if (nxt_slow_path(nxt_cycle_thread_pools(thr, cycle) != NXT_OK)) { - goto fail; - } - - ret = nxt_cycle_shm_zone_add(cycle, &upstream_zone, 1024 * 1024, 8192); - if (nxt_slow_path(ret != NXT_OK)) { - goto fail; - } - - /* Cycle shm zones array is created on demand. */ - - if (previous != NULL) { - previous->reconfiguring = 1; - cycle->start = start; - - } else { - nxt_thread_init_data(nxt_thread_cycle_data); - nxt_thread_cycle_set(cycle); - - cycle->start = nxt_cycle_initial_start; - } - - nxt_log_debug(thr->log, "new cycle: %p", cycle); - - nxt_work_queue_add(&thr->engine->fast_work_queue, nxt_cycle_start, - task, cycle, NULL); - - return NXT_OK; - -fail: - - nxt_mem_pool_destroy(mp); - - return NXT_ERROR; -} - - -static nxt_int_t -nxt_cycle_inherited_listen_sockets(nxt_task_t *task, nxt_cycle_t *cycle) -{ - u_char *v, *p; - nxt_int_t type; - nxt_array_t *inherited_sockets; - nxt_socket_t s; - nxt_listen_socket_t *ls; - - v = (u_char *) getenv("NGINX"); - - if (v == NULL) { - return nxt_cycle_systemd_listen_sockets(task, cycle); - } - - nxt_log(task, NXT_LOG_CRIT, "using inherited listen sockets: %s", v); - - inherited_sockets = nxt_array_create(cycle->mem_pool, - 1, sizeof(nxt_listen_socket_t)); - if (inherited_sockets == NULL) { - return NXT_ERROR; - } - - cycle->inherited_sockets = inherited_sockets; - - for (p = v; *p != '\0'; p++) { - - if (*p == ';') { - s = nxt_int_parse(v, p - v); - - if (nxt_slow_path(s < 0)) { - nxt_log(task, NXT_LOG_CRIT, "invalid socket number " - "\"%s\" in NGINX environment variable, " - "ignoring the rest of the variable", v); - return NXT_ERROR; - } - - v = p + 1; - - ls = nxt_array_zero_add(inherited_sockets); - if (nxt_slow_path(ls == NULL)) { - return NXT_ERROR; - } - - ls->socket = s; - - ls->sockaddr = nxt_getsockname(task, cycle->mem_pool, s); - if (nxt_slow_path(ls->sockaddr == NULL)) { - return NXT_ERROR; - } - - type = nxt_socket_getsockopt(task, s, SOL_SOCKET, SO_TYPE); - if (nxt_slow_path(type == -1)) { - return NXT_ERROR; - } - - ls->sockaddr->type = (uint16_t) type; - } - } - - return NXT_OK; -} - - -static nxt_int_t -nxt_cycle_systemd_listen_sockets(nxt_task_t *task, nxt_cycle_t *cycle) -{ - u_char *nfd, *pid; - nxt_int_t n; - nxt_array_t *inherited_sockets; - nxt_socket_t s; - nxt_listen_socket_t *ls; - - /* - * Number of listening sockets passed. The socket - * descriptors start from number 3 and are sequential. - */ - nfd = (u_char *) getenv("LISTEN_FDS"); - if (nfd == NULL) { - return NXT_OK; - } - - /* The pid of the service process. */ - pid = (u_char *) getenv("LISTEN_PID"); - if (pid == NULL) { - return NXT_OK; - } - - n = nxt_int_parse(nfd, nxt_strlen(nfd)); - if (n < 0) { - return NXT_OK; - } - - if (nxt_pid != nxt_int_parse(pid, nxt_strlen(pid))) { - return NXT_OK; - } - - nxt_log(task, NXT_LOG_INFO, "using %s systemd listen sockets", n); - - inherited_sockets = nxt_array_create(cycle->mem_pool, - n, sizeof(nxt_listen_socket_t)); - if (inherited_sockets == NULL) { - return NXT_ERROR; - } - - cycle->inherited_sockets = inherited_sockets; - - for (s = 3; s < n; s++) { - ls = nxt_array_zero_add(inherited_sockets); - if (nxt_slow_path(ls == NULL)) { - return NXT_ERROR; - } - - ls->socket = s; - - ls->sockaddr = nxt_getsockname(task, cycle->mem_pool, s); - if (nxt_slow_path(ls->sockaddr == NULL)) { - return NXT_ERROR; - } - - ls->sockaddr->type = SOCK_STREAM; - } - - return NXT_OK; -} - - -static nxt_int_t -nxt_cycle_event_engines(nxt_thread_t *thr, nxt_cycle_t *cycle) -{ - nxt_event_engine_t *engine, **e, **engines; - const nxt_event_interface_t *interface; - - cycle->engines = nxt_array_create(cycle->mem_pool, 1, - sizeof(nxt_event_engine_t *)); - - if (nxt_slow_path(cycle->engines == NULL)) { - return NXT_ERROR; - } - - e = nxt_array_add(cycle->engines); - if (nxt_slow_path(e == NULL)) { - return NXT_ERROR; - } - - if (cycle->previous != NULL) { - /* Event engines are not allocated in memory pool. */ - engines = cycle->previous->engines->elts; - *e = engines[0]; - - } else { - interface = nxt_service_get(cycle->services, "engine", NULL); - - if (nxt_slow_path(interface == NULL)) { - /* TODO: log */ - return NXT_ERROR; - } - - engine = nxt_event_engine_create(thr, interface, - nxt_master_process_signals, 0, 0); - - if (nxt_slow_path(engine == NULL)) { - return NXT_ERROR; - } - - engine->id = cycle->last_engine_id++; - *e = engine; - } - - return NXT_OK; -} - - -static nxt_int_t -nxt_cycle_processes(nxt_cycle_t *cycle) -{ - nxt_uint_t n; - nxt_port_t *port, *prev; - - /* - * Preallocate double number of previous cycle - * process slots or 2 process slots for initial cycle. - */ - n = (cycle->previous != NULL) ? cycle->previous->ports->nelts : 1; - - cycle->ports = nxt_array_create(cycle->mem_pool, 2 * n, sizeof(nxt_port_t)); - - if (nxt_slow_path(cycle->ports == NULL)) { - return NXT_ERROR; - } - - if (cycle->previous != NULL) { - cycle->process_generation = cycle->previous->process_generation; - - prev = cycle->previous->ports->elts; - - while (n != 0) { - port = nxt_array_add(cycle->ports); - if (nxt_slow_path(port == NULL)) { - return NXT_ERROR; - } - - *port = *prev++; - n--; - } - } - - return NXT_OK; -} - - -static nxt_int_t -nxt_cycle_thread_pools(nxt_thread_t *thr, nxt_cycle_t *cycle) -{ -#if (NXT_THREADS) - nxt_int_t ret; - nxt_array_t *thread_pools; - - thread_pools = nxt_array_create(cycle->mem_pool, 1, - sizeof(nxt_thread_pool_t *)); - - if (nxt_slow_path(thread_pools == NULL)) { - return NXT_ERROR; - } - - cycle->thread_pools = thread_pools; - - ret = nxt_cycle_thread_pool_create(thr, cycle, 2, 60000 * 1000000LL); - - if (nxt_slow_path(ret != NXT_OK)) { - return NXT_ERROR; - } - -#endif - - return NXT_OK; -} - - -static void -nxt_cycle_start(nxt_task_t *task, void *obj, void *data) -{ - nxt_uint_t i; - nxt_cycle_t *cycle; - - cycle = obj; - - nxt_debug(task, "cycle conf done"); - - nxt_mem_pool_debug_lock(cycle->mem_pool, nxt_thread_tid(task->thread)); - - task->thread->log->ctx_handler = NULL; - task->thread->log->ctx = NULL; - - if (nxt_cycle_conf_init(task->thread, cycle) != NXT_OK) { - goto fail; - } - - for (i = 0; i < nxt_init_modules_n; i++) { - if (nxt_init_modules[i](task->thread, cycle) != NXT_OK) { - goto fail; - } - } - - if (nxt_cycle_conf_apply(task->thread, task, cycle) != NXT_OK) { - goto fail; - } - - nxt_thread_cycle_set(cycle); - -#if (NXT_THREADS) - - /* - * Thread pools should be destroyed before starting worker - * processes, because thread pool semaphores will stick in - * locked state in new processes after fork(). - */ - nxt_cycle_thread_pool_destroy(task->thread, task, cycle, cycle->start); - -#else - - cycle->start(task->thread, cycle); - -#endif - - return; - -fail: - - nxt_cycle_quit(task, cycle); -} - - -static void -nxt_cycle_initial_start(nxt_task_t *task, nxt_cycle_t *cycle) -{ - nxt_int_t ret; - nxt_thread_t *thr; - const nxt_event_interface_t *interface; - - thr = task->thread; - - if (cycle->inherited_sockets == NULL && cycle->daemon) { - - if (nxt_process_daemon() != NXT_OK) { - goto fail; - } - - /* - * An event engine should be updated after fork() - * even if an event facility was not changed because: - * 1) inherited kqueue descriptor is invalid, - * 2) the signal thread is not inherited. - */ - interface = nxt_service_get(cycle->services, "engine", cycle->engine); - if (interface == NULL) { - goto fail; - } - - ret = nxt_event_engine_change(thr, task, interface, cycle->batch); - if (ret != NXT_OK) { - goto fail; - } - } - - ret = nxt_cycle_pid_file_create(cycle->pid_file, cycle->test_config); - if (ret != NXT_OK) { - goto fail; - } - - if (nxt_cycle_event_engine_change(thr, task, cycle) != NXT_OK) { - goto fail; - } - - thr->engine->max_connections = cycle->engine_connections; - - if (cycle->master_process) { - if (nxt_master_process_start(thr, task, cycle) != NXT_ERROR) { - return; - } - - } else { - nxt_single_process_start(thr, task, cycle); - return; - } - -fail: - - nxt_cycle_quit(task, cycle); -} - - -static void -nxt_single_process_start(nxt_thread_t *thr, nxt_task_t *task, - nxt_cycle_t *cycle) -{ -#if (NXT_THREADS) - nxt_int_t ret; - - ret = nxt_cycle_thread_pool_create(thr, cycle, cycle->auxiliary_threads, - 60000 * 1000000LL); - - if (nxt_slow_path(ret != NXT_OK)) { - nxt_cycle_quit(task, cycle); - return; - } - -#endif - - cycle->type = NXT_PROCESS_SINGLE; - - nxt_cycle_listen_sockets_enable(task, cycle); - - return; -} - - -void -nxt_cycle_quit(nxt_task_t *task, nxt_cycle_t *cycle) -{ - nxt_bool_t done; - nxt_thread_t *thr; - - thr = task->thread; - - nxt_debug(task, "exiting"); - - if (cycle == NULL) { - cycle = nxt_thread_cycle(); - } - - done = 1; - - if (!thr->engine->shutdown) { - thr->engine->shutdown = 1; - -#if (NXT_THREADS) - - if (!nxt_array_is_empty(cycle->thread_pools)) { - nxt_cycle_thread_pool_destroy(thr, task, cycle, nxt_cycle_quit); - done = 0; - } - -#endif - - if (!cycle->test_config && cycle->type == NXT_PROCESS_MASTER) { - nxt_master_stop_worker_processes(task, cycle); - done = 0; - } - } - - nxt_cycle_close_idle_connections(thr, task); - - if (done) { - nxt_work_queue_add(&thr->engine->fast_work_queue, nxt_cycle_exit, - task, cycle, NULL); - } -} - - -static void -nxt_cycle_close_idle_connections(nxt_thread_t *thr, nxt_task_t *task) -{ - nxt_queue_t *idle; - nxt_queue_link_t *link, *next; - nxt_event_conn_t *c; - - nxt_log_debug(thr->log, "close idle connections"); - - idle = &thr->engine->idle_connections; - - for (link = nxt_queue_head(idle); - link != nxt_queue_tail(idle); - link = next) - { - next = nxt_queue_next(link); - c = nxt_queue_link_data(link, nxt_event_conn_t, link); - - if (!c->socket.read_ready) { - nxt_queue_remove(link); - nxt_event_conn_close(thr->engine, c); - } - } -} - - -static void -nxt_cycle_exit(nxt_task_t *task, void *obj, void *data) -{ - nxt_cycle_t *cycle; - - cycle = obj; - -#if (NXT_THREADS) - - nxt_debug(task, "thread pools: %d", cycle->thread_pools->nelts); - - if (!nxt_array_is_empty(cycle->thread_pools)) { - return; - } - -#endif - - if (cycle->type <= NXT_PROCESS_MASTER) { - nxt_cycle_pid_file_delete(cycle); - } - - if (!task->thread->engine->event.signal_support) { - nxt_event_engine_signals_stop(task->thread->engine); - } - - nxt_debug(task, "exit"); - - exit(0); - nxt_unreachable(); -} - - -static nxt_int_t -nxt_cycle_event_engine_change(nxt_thread_t *thr, nxt_task_t *task, - nxt_cycle_t *cycle) -{ - const nxt_event_interface_t *interface; - - if (thr->engine->batch0 == cycle->batch - && nxt_strcmp(thr->engine->event.name, cycle->engine) == 0) - { - return NXT_OK; - } - - interface = nxt_service_get(cycle->services, "engine", cycle->engine); - if (interface != NULL) { - return nxt_event_engine_change(thr, task, interface, cycle->batch); - } - - return NXT_ERROR; -} - - -void -nxt_cycle_event_engine_free(nxt_cycle_t *cycle) -{ - nxt_event_engine_t *engine, **engines; - - engines = cycle->engines->elts; - engine = engines[0]; - nxt_array_remove(cycle->engines, &engines[0]); - - nxt_event_engine_free(engine); -} - - -#if (NXT_THREADS) - -static void nxt_cycle_thread_pool_init(void); -static void nxt_cycle_thread_pool_exit(nxt_task_t *task, void *obj, void *data); - - -nxt_int_t -nxt_cycle_thread_pool_create(nxt_thread_t *thr, nxt_cycle_t *cycle, - nxt_uint_t max_threads, nxt_nsec_t timeout) -{ - nxt_thread_pool_t *thread_pool, **tp; - - tp = nxt_array_add(cycle->thread_pools); - if (tp == NULL) { - return NXT_ERROR; - } - - thread_pool = nxt_thread_pool_create(max_threads, timeout, - nxt_cycle_thread_pool_init, - thr->engine, - nxt_cycle_thread_pool_exit); - - if (nxt_fast_path(thread_pool != NULL)) { - *tp = thread_pool; - } - - return NXT_OK; -} - - -static void -nxt_cycle_thread_pool_destroy(nxt_thread_t *thr, nxt_task_t *task, - nxt_cycle_t *cycle, nxt_cycle_cont_t cont) -{ - nxt_uint_t n; - nxt_thread_pool_t **tp; - - cycle->continuation = cont; - - n = cycle->thread_pools->nelts; - - if (n == 0) { - cont(task, cycle); - return; - } - - tp = cycle->thread_pools->elts; - - do { - nxt_thread_pool_destroy(*tp); - - tp++; - n--; - } while (n != 0); -} - - -static void -nxt_cycle_thread_pool_init(void) -{ -#if (NXT_REGEX) - nxt_regex_init(0); -#endif -} - - -static void -nxt_cycle_thread_pool_exit(nxt_task_t *task, void *obj, void *data) -{ - nxt_uint_t i, n; - nxt_cycle_t *cycle; - nxt_thread_pool_t *tp, **thread_pools; - nxt_thread_handle_t handle; - - tp = obj; - - if (data != NULL) { - handle = (nxt_thread_handle_t) (uintptr_t) data; - nxt_thread_wait(handle); - } - - cycle = nxt_thread_cycle(); - - thread_pools = cycle->thread_pools->elts; - n = cycle->thread_pools->nelts; - - nxt_debug(task, "thread pools: %ui, cycle %p", n, cycle); - - for (i = 0; i < n; i++) { - - if (tp == thread_pools[i]) { - nxt_array_remove(cycle->thread_pools, &thread_pools[i]); - - if (n == 1) { - /* The last thread pool. */ - cycle->continuation(task, cycle); - } - - return; - } - } -} - -#endif - - -static nxt_int_t -nxt_cycle_conf_init(nxt_thread_t *thr, nxt_cycle_t *cycle) -{ - nxt_int_t ret; - nxt_str_t *prefix; - nxt_file_t *file; - nxt_file_name_str_t file_name; - const nxt_event_interface_t *interface; - - cycle->daemon = 1; - cycle->master_process = 1; - cycle->engine_connections = 256; - cycle->worker_processes = 1; - cycle->auxiliary_threads = 2; - cycle->user_cred.user = "nobody"; - cycle->group = NULL; - cycle->pid = "nginext.pid"; - cycle->error_log = "error.log"; - - if (nxt_cycle_conf_read_cmd(thr, cycle) != NXT_OK) { - return NXT_ERROR; - } - - if (nxt_user_cred_get(&cycle->user_cred, cycle->group) != NXT_OK) { - return NXT_ERROR; - } - - /* An engine's parameters. */ - - interface = nxt_service_get(cycle->services, "engine", cycle->engine); - if (interface == NULL) { - return NXT_ERROR; - } - - cycle->engine = interface->name; - - prefix = nxt_file_name_is_absolute(cycle->pid) ? NULL : cycle->prefix; - - ret = nxt_file_name_create(cycle->mem_pool, &file_name, "%V%s%Z", - prefix, cycle->pid); - if (nxt_slow_path(ret != NXT_OK)) { - return NXT_ERROR; - } - - cycle->pid_file = file_name.start; - - prefix = nxt_file_name_is_absolute(cycle->error_log) ? NULL : cycle->prefix; - - ret = nxt_file_name_create(cycle->mem_pool, &file_name, "%V%s%Z", - prefix, cycle->error_log); - if (nxt_slow_path(ret != NXT_OK)) { - return NXT_ERROR; - } - - file = nxt_list_first(cycle->log_files); - file->name = file_name.start; - - return NXT_OK; -} - - -static nxt_int_t -nxt_cycle_conf_read_cmd(nxt_thread_t *thr, nxt_cycle_t *cycle) -{ - char *p, **argv; - nxt_int_t n; - nxt_str_t addr; - nxt_sockaddr_t *sa; - - argv = nxt_process_argv; - - while (*argv != NULL) { - p = *argv++; - - if (nxt_strcmp(p, "--listen") == 0) { - if (*argv == NULL) { - nxt_log_emerg(thr->log, "no argument for option \"--listen\""); - return NXT_ERROR; - } - - p = *argv++; - - addr.length = nxt_strlen(p); - addr.start = (u_char *) p; - - sa = nxt_cycle_sockaddr_parse(&addr, cycle->mem_pool, thr->log); - - if (sa == NULL) { - return NXT_ERROR; - } - - cycle->app_listen = sa; - - continue; - } - - if (nxt_strcmp(p, "--upstream") == 0) { - if (*argv == NULL) { - nxt_log_emerg(thr->log, - "no argument for option \"--upstream\""); - return NXT_ERROR; - } - - p = *argv++; - - cycle->upstream.length = nxt_strlen(p); - cycle->upstream.start = (u_char *) p; - - continue; - } - - if (nxt_strcmp(p, "--workers") == 0) { - if (*argv == NULL) { - nxt_log_emerg(thr->log, "no argument for option \"--workers\""); - return NXT_ERROR; - } - - p = *argv++; - n = nxt_int_parse((u_char *) p, nxt_strlen(p)); - - if (n < 1) { - nxt_log_emerg(thr->log, "invalid number of workers: \"%s\"", p); - return NXT_ERROR; - } - - cycle->worker_processes = n; - - continue; - } - - if (nxt_strcmp(p, "--user") == 0) { - if (*argv == NULL) { - nxt_log_emerg(thr->log, "no argument for option \"--user\""); - return NXT_ERROR; - } - - p = *argv++; - - cycle->user_cred.user = p; - - continue; - } - - if (nxt_strcmp(p, "--group") == 0) { - if (*argv == NULL) { - nxt_log_emerg(thr->log, "no argument for option \"--group\""); - return NXT_ERROR; - } - - p = *argv++; - - cycle->group = p; - - continue; - } - - if (nxt_strcmp(p, "--pid") == 0) { - if (*argv == NULL) { - nxt_log_emerg(thr->log, "no argument for option \"--pid\""); - return NXT_ERROR; - } - - p = *argv++; - - cycle->pid = p; - - continue; - } - - if (nxt_strcmp(p, "--log") == 0) { - if (*argv == NULL) { - nxt_log_emerg(thr->log, "no argument for option \"--log\""); - return NXT_ERROR; - } - - p = *argv++; - - cycle->error_log = p; - - continue; - } - - if (nxt_strcmp(p, "--no-daemonize") == 0) { - cycle->daemon = 0; - continue; - } - } - - return NXT_OK; -} - - -static nxt_sockaddr_t * -nxt_cycle_sockaddr_parse(nxt_str_t *addr, nxt_mem_pool_t *mp, nxt_log_t *log) -{ - u_char *p; - size_t length; - - length = addr->length; - p = addr->start; - - if (length >= 5 && nxt_memcmp(p, (u_char *) "unix:", 5) == 0) { - return nxt_cycle_sockaddr_unix_parse(addr, mp, log); - } - - if (length != 0 && *p == '[') { - return nxt_cycle_sockaddr_inet6_parse(addr, mp, log); - } - - return nxt_cycle_sockaddr_inet_parse(addr, mp, log); -} - - -static nxt_sockaddr_t * -nxt_cycle_sockaddr_unix_parse(nxt_str_t *addr, nxt_mem_pool_t *mp, - nxt_log_t *log) -{ -#if (NXT_HAVE_UNIX_DOMAIN) - u_char *p; - size_t length, socklen; - nxt_sockaddr_t *sa; - - /* - * Actual sockaddr_un length can be lesser or even larger than defined - * struct sockaddr_un length (see comment in unix/nxt_socket.h). So - * limit maximum Unix domain socket address length by defined sun_path[] - * length because some OSes accept addresses twice larger than defined - * struct sockaddr_un. Also reserve space for a trailing zero to avoid - * ambiguity, since many OSes accept Unix domain socket addresses - * without a trailing zero. - */ - const size_t max_len = sizeof(struct sockaddr_un) - - offsetof(struct sockaddr_un, sun_path) - 1; - - /* cutting "unix:" */ - length = addr->length - 5; - p = addr->start + 5; - - if (length == 0) { - nxt_log_emerg(log, "unix domain socket \"%V\" name is invalid", addr); - return NULL; - } - - if (length > max_len) { - nxt_log_emerg(log, "unix domain socket \"%V\" name is too long", addr); - return NULL; - } - - socklen = offsetof(struct sockaddr_un, sun_path) + length + 1; - -#if (NXT_LINUX) - - /* - * Linux unix(7): - * - * abstract: an abstract socket address is distinguished by the fact - * that sun_path[0] is a null byte ('\0'). The socket's address in - * this namespace is given by the additional bytes in sun_path that - * are covered by the specified length of the address structure. - * (Null bytes in the name have no special significance.) - */ - if (p[0] == '@') { - p[0] = '\0'; - socklen--; - } - -#endif - - sa = nxt_sockaddr_alloc(mp, socklen, addr->length); - - if (nxt_slow_path(sa == NULL)) { - return NULL; - } - - sa->type = SOCK_STREAM; - - sa->u.sockaddr_un.sun_family = AF_UNIX; - nxt_memcpy(sa->u.sockaddr_un.sun_path, p, length); - - return sa; - -#else /* !(NXT_HAVE_UNIX_DOMAIN) */ - - nxt_log_emerg(log, "unix domain socket \"%V\" is not supported", addr); - - return NULL; - -#endif -} - - -static nxt_sockaddr_t * -nxt_cycle_sockaddr_inet6_parse(nxt_str_t *addr, nxt_mem_pool_t *mp, - nxt_log_t *log) -{ -#if (NXT_INET6) - u_char *p, *addr, *addr_end; - size_t length; - nxt_int_t port; - nxt_mem_pool_t *mp; - nxt_sockaddr_t *sa; - struct in6_addr *in6_addr; - - length = addr->length - 1; - p = addr->start + 1; - - addr_end = nxt_memchr(p, ']', length); - - if (addr_end == NULL) { - goto invalid_address; - } - - sa = nxt_sockaddr_alloc(mp, sizeof(struct sockaddr_in6)); - - if (nxt_slow_path(sa == NULL)) { - return NULL; - } - - in6_addr = &sa->u.sockaddr_in6.sin6_addr; - - if (nxt_inet6_addr(in6_addr, p, addr_end - p) != NXT_OK) { - goto invalid_address; - } - - p = addr_end + 1; - length = (p + length) - p; - - if (length == 0) { - goto found; - } - - if (*p == ':') { - port = nxt_int_parse(p + 1, length - 1); - - if (port >= 1 && port <= 65535) { - goto found; - } - } - - nxt_log_emerg(log, "invalid port in \"%V\"", addr); - - return NULL; - -found: - - sa->type = SOCK_STREAM; - - sa->u.sockaddr_in6.sin6_family = AF_INET6; - sa->u.sockaddr_in6.sin6_port = htons((in_port_t) port); - - return sa; - -invalid_address: - - nxt_log_emerg(log, "invalid IPv6 address in \"%V\"", addr); - - return NULL; - -#else - - nxt_log_emerg(log, "IPv6 socket \"%V\" is not supported", addr); - - return NULL; - -#endif -} - - -static nxt_sockaddr_t * -nxt_cycle_sockaddr_inet_parse(nxt_str_t *addr, nxt_mem_pool_t *mp, - nxt_log_t *log) -{ - u_char *p, *ip; - size_t length; - in_addr_t s_addr; - nxt_int_t port; - nxt_sockaddr_t *sa; - - s_addr = INADDR_ANY; - - length = addr->length; - ip = addr->start; - - p = nxt_memchr(ip, ':', length); - - if (p == NULL) { - - /* single value port, or address */ - - port = nxt_int_parse(ip, length); - - if (port > 0) { - /* "*:XX" */ - - if (port < 1 || port > 65535) { - goto invalid_port; - } - - } else { - /* "x.x.x.x" */ - - s_addr = nxt_inet_addr(ip, length); - - if (s_addr == INADDR_NONE) { - goto invalid_port; - } - - port = 8080; - } - - } else { - - /* x.x.x.x:XX */ - - p++; - length = (ip + length) - p; - port = nxt_int_parse(p, length); - - if (port < 1 || port > 65535) { - goto invalid_port; - } - - length = (p - 1) - ip; - - if (length != 1 || ip[0] != '*') { - s_addr = nxt_inet_addr(ip, length); - - if (s_addr == INADDR_NONE) { - goto invalid_addr; - } - - /* "x.x.x.x:XX" */ - } - } - - sa = nxt_sockaddr_alloc(mp, sizeof(struct sockaddr_in), - NXT_INET_ADDR_STR_LEN); - if (nxt_slow_path(sa == NULL)) { - return NULL; - } - - sa->type = SOCK_STREAM; - - sa->u.sockaddr_in.sin_family = AF_INET; - sa->u.sockaddr_in.sin_port = htons((in_port_t) port); - sa->u.sockaddr_in.sin_addr.s_addr = s_addr; - - return sa; - -invalid_port: - - nxt_log_emerg(log, "invalid port in \"%V\"", addr); - - return NULL; - -invalid_addr: - - nxt_log_emerg(log, "invalid address in \"%V\"", addr); - - return NULL; -} - - -static nxt_int_t -nxt_cycle_conf_apply(nxt_thread_t *thr, nxt_task_t *task, nxt_cycle_t *cycle) -{ - if (nxt_cycle_log_files_create(cycle) != NXT_OK) { - return NXT_ERROR; - } - - if (nxt_cycle_app_listen_socket(cycle) != NXT_OK) { - return NXT_ERROR; - } - - if (nxt_cycle_listen_socket(cycle) != NXT_OK) { - return NXT_ERROR; - } - - if (nxt_cycle_event_engine_change(thr, task, cycle) != NXT_OK) { - return NXT_ERROR; - } - - if (nxt_cycle_listen_sockets_create(task, cycle) != NXT_OK) { - return NXT_ERROR; - } - - if (nxt_cycle_shm_zones_enable(cycle) != NXT_OK) { - return NXT_ERROR; - } - - nxt_cycle_listen_sockets_close(task, cycle); - - return NXT_OK; -} - - -static nxt_int_t -nxt_cycle_listen_socket(nxt_cycle_t *cycle) -{ - nxt_sockaddr_t *sa; - nxt_listen_socket_t *ls; - - if (cycle->stream_listen == NULL) { - sa = nxt_sockaddr_alloc(cycle->mem_pool, sizeof(struct sockaddr_in), - NXT_INET_ADDR_STR_LEN); - if (sa == NULL) { - return NXT_ERROR; - } - - sa->type = SOCK_STREAM; - sa->u.sockaddr_in.sin_family = AF_INET; - sa->u.sockaddr_in.sin_port = htons(8000); - - cycle->stream_listen = sa; - } - - nxt_sockaddr_text(cycle->stream_listen); - - ls = nxt_cycle_listen_socket_add(cycle, cycle->stream_listen); - if (ls == NULL) { - return NXT_ERROR; - } - - ls->read_after_accept = 1; - - ls->flags = NXT_NONBLOCK; - -#if 0 - /* STUB */ - wq = nxt_mem_zalloc(cf->mem_pool, sizeof(nxt_work_queue_t)); - if (wq == NULL) { - return NXT_ERROR; - } - nxt_work_queue_name(wq, "listen"); - /**/ - - ls->work_queue = wq; -#endif - ls->handler = nxt_stream_connection_init; - - /* - * Connection memory pool chunk size is tunned to - * allocate the most data in one mem_pool chunk. - */ - ls->mem_pool_size = nxt_listen_socket_pool_min_size(ls) - + sizeof(nxt_event_conn_proxy_t) - + sizeof(nxt_event_conn_t) - + 4 * sizeof(nxt_buf_t); - - return NXT_OK; -} - - -static nxt_int_t -nxt_cycle_app_listen_socket(nxt_cycle_t *cycle) -{ - nxt_sockaddr_t *sa; -// nxt_work_queue_t *wq; - nxt_listen_socket_t *ls; - - if (cycle->app_listen == NULL) { - sa = nxt_sockaddr_alloc(cycle->mem_pool, sizeof(struct sockaddr_in), - NXT_INET_ADDR_STR_LEN); - if (sa == NULL) { - return NXT_ERROR; - } - - sa->type = SOCK_STREAM; - sa->u.sockaddr_in.sin_family = AF_INET; - sa->u.sockaddr_in.sin_port = htons(8080); - - cycle->app_listen = sa; - } - - nxt_sockaddr_text(cycle->app_listen); - - ls = nxt_cycle_listen_socket_add(cycle, cycle->app_listen); - if (ls == NULL) { - return NXT_ERROR; - } - - ls->read_after_accept = 1; - - return NXT_OK; -} - - -nxt_listen_socket_t * -nxt_cycle_listen_socket_add(nxt_cycle_t *cycle, nxt_sockaddr_t *sa) -{ - nxt_mem_pool_t *mp; - nxt_listen_socket_t *ls; - - ls = nxt_array_zero_add(cycle->listen_sockets); - if (ls == NULL) { - return NULL; - } - - mp = cycle->mem_pool; - - ls->sockaddr = nxt_sockaddr_create(mp, &sa->u.sockaddr, sa->socklen, - sa->length); - if (ls->sockaddr == NULL) { - return NULL; - } - - ls->sockaddr->type = sa->type; - - nxt_sockaddr_text(ls->sockaddr); - - ls->socket = -1; - ls->backlog = NXT_LISTEN_BACKLOG; - - return ls; -} - - -static nxt_int_t -nxt_cycle_hostname(nxt_thread_t *thr, nxt_cycle_t *cycle) -{ - size_t length; - char hostname[NXT_MAXHOSTNAMELEN + 1]; - - if (gethostname(hostname, NXT_MAXHOSTNAMELEN) != 0) { - nxt_log_emerg(thr->log, "gethostname() failed %E", nxt_errno); - return NXT_ERROR; - } - - /* - * Linux gethostname(2): - * - * If the null-terminated hostname is too large to fit, - * then the name is truncated, and no error is returned. - * - * For this reason an additional byte is reserved in the buffer. - */ - hostname[NXT_MAXHOSTNAMELEN] = '\0'; - - length = nxt_strlen(hostname); - cycle->hostname.length = length; - - cycle->hostname.start = nxt_mem_nalloc(cycle->mem_pool, length); - - if (cycle->hostname.start != NULL) { - nxt_memcpy_lowcase(cycle->hostname.start, (u_char *) hostname, length); - return NXT_OK; - } - - return NXT_ERROR; -} - - -static nxt_int_t -nxt_cycle_log_files_init(nxt_cycle_t *cycle) -{ - nxt_uint_t n; - nxt_file_t *file; - nxt_list_t *log_files; - - n = (cycle->previous != NULL) ? nxt_list_nelts(cycle->previous->log_files): - 1; - - log_files = nxt_list_create(cycle->mem_pool, n, sizeof(nxt_file_t)); - - if (nxt_fast_path(log_files != NULL)) { - cycle->log_files = log_files; - - /* Preallocate the main error_log. This allocation cannot fail. */ - file = nxt_list_zero_add(log_files); - - file->fd = NXT_FILE_INVALID; - file->log_level = NXT_LOG_CRIT; - - return NXT_OK; - } - - return NXT_ERROR; -} - - -nxt_file_t * -nxt_cycle_log_file_add(nxt_cycle_t *cycle, nxt_str_t *name) -{ - nxt_int_t ret; - nxt_str_t *prefix; - nxt_file_t *file; - nxt_file_name_str_t file_name; - - prefix = nxt_file_name_is_absolute(name->start) ? NULL : cycle->prefix; - - ret = nxt_file_name_create(cycle->mem_pool, &file_name, "%V%V%Z", - prefix, name); - - if (nxt_slow_path(ret != NXT_OK)) { - return NULL; - } - - nxt_list_each(file, cycle->log_files) { - - /* STUB: hardecoded case sensitive/insensitive. */ - - if (file->name != NULL - && nxt_file_name_eq(file->name, file_name.start)) - { - return file; - } - - } nxt_list_loop; - - file = nxt_list_zero_add(cycle->log_files); - - if (nxt_slow_path(file == NULL)) { - return NULL; - } - - file->fd = NXT_FILE_INVALID; - file->log_level = NXT_LOG_CRIT; - file->name = file_name.start; - - return file; -} - - -static nxt_int_t -nxt_cycle_log_files_create(nxt_cycle_t *cycle) -{ - nxt_int_t ret; - nxt_file_t *file; - - nxt_list_each(file, cycle->log_files) { - - ret = nxt_file_open(file, NXT_FILE_APPEND, NXT_FILE_CREATE_OR_OPEN, - NXT_FILE_OWNER_ACCESS); - - if (ret != NXT_OK) { - return NXT_ERROR; - } - - } nxt_list_loop; - - file = nxt_list_first(cycle->log_files); - - return nxt_file_stderr(file); -} - - -static nxt_int_t -nxt_cycle_listen_sockets_create(nxt_task_t *task, nxt_cycle_t *cycle) -{ - nxt_uint_t c, p, ncurr, nprev; - nxt_listen_socket_t *curr, *prev; - - curr = cycle->listen_sockets->elts; - ncurr = cycle->listen_sockets->nelts; - - if (cycle->previous != NULL) { - prev = cycle->previous->listen_sockets->elts; - nprev = cycle->previous->listen_sockets->nelts; - - } else if (cycle->inherited_sockets != NULL) { - prev = cycle->inherited_sockets->elts; - nprev = cycle->inherited_sockets->nelts; - - } else { - prev = NULL; - nprev = 0; - } - - for (c = 0; c < ncurr; c++) { - - for (p = 0; p < nprev; p++) { - - if (nxt_sockaddr_cmp(curr[c].sockaddr, prev[p].sockaddr)) { - - if (nxt_listen_socket_update(task, &curr[c], &prev[p]) != NXT_OK) { - return NXT_ERROR; - } - - goto next; - } - } - - if (nxt_listen_socket_create(task, &curr[c], 0) != NXT_OK) { - return NXT_ERROR; - } - - next: - - continue; - } - - return NXT_OK; -} - - -static void -nxt_cycle_listen_sockets_close(nxt_task_t *task, nxt_cycle_t *cycle) -{ - nxt_uint_t p, c, nprev, ncurr; - nxt_listen_socket_t *curr, *prev; - - if (cycle->previous == NULL) { - return; - } - - prev = cycle->previous->listen_sockets->elts; - nprev = cycle->previous->listen_sockets->nelts; - - curr = cycle->listen_sockets->elts; - ncurr = cycle->listen_sockets->nelts; - - for (p = 0; p < nprev; p++) { - - for (c = 0; c < ncurr; c++) { - if (nxt_sockaddr_cmp(prev[p].sockaddr, curr[c].sockaddr)) { - goto next; - } - } - - nxt_socket_close(task, prev[p].socket); - - next: - - continue; - } - - return; -} - - -nxt_int_t -nxt_cycle_listen_sockets_enable(nxt_task_t *task, nxt_cycle_t *cycle) -{ - nxt_uint_t i, n; - nxt_listen_socket_t *ls; - - ls = cycle->listen_sockets->elts; - n = cycle->listen_sockets->nelts; - - for (i = 0; i < n; i++) { - if (ls[i].flags == NXT_NONBLOCK) { - if (nxt_event_conn_listen(task, &ls[i]) != NXT_OK) { - return NXT_ERROR; - } - } - } - - return NXT_OK; -} - - -nxt_str_t * -nxt_current_directory(nxt_mem_pool_t *mp) -{ - size_t length; - u_char *p; - nxt_str_t *name; - char buf[NXT_MAX_PATH_LEN]; - - length = nxt_dir_current(buf, NXT_MAX_PATH_LEN); - - if (nxt_fast_path(length != 0)) { - name = nxt_str_alloc(mp, length + 1); - - if (nxt_fast_path(name != NULL)) { - p = nxt_cpymem(name->start, buf, length); - *p = '/'; - - return name; - } - } - - return NULL; -} - - -nxt_int_t -nxt_cycle_pid_file_create(nxt_file_name_t *pid_file, nxt_bool_t test) -{ - ssize_t length; - nxt_int_t n; - nxt_uint_t create; - nxt_file_t file; - u_char pid[NXT_INT64_T_LEN + NXT_LINEFEED_SIZE]; - - nxt_memzero(&file, sizeof(nxt_file_t)); - - file.name = pid_file; - - create = test ? NXT_FILE_CREATE_OR_OPEN : NXT_FILE_TRUNCATE; - - n = nxt_file_open(&file, NXT_FILE_WRONLY, create, NXT_FILE_DEFAULT_ACCESS); - - if (n != NXT_OK) { - return NXT_ERROR; - } - - if (!test) { - length = nxt_sprintf(pid, pid + sizeof(pid), "%PI%n", nxt_pid) - pid; - - if (nxt_file_write(&file, pid, length, 0) != length) { - return NXT_ERROR; - } - } - - nxt_file_close(&file); - - return NXT_OK; -} - - -static void -nxt_cycle_pid_file_delete(nxt_cycle_t *cycle) -{ - nxt_file_name_t *pid_file; - - if (!cycle->test_config) { - pid_file = (cycle->new_binary != 0) ? cycle->oldbin_file: - cycle->pid_file; - if (pid_file != NULL) { - nxt_file_delete(pid_file); - } - } -} - - -nxt_int_t -nxt_cycle_shm_zone_add(nxt_cycle_t *cycle, nxt_str_t *name, size_t size, - nxt_uint_t page_size) -{ - nxt_cycle_shm_zone_t *shm_zone; - - if (cycle->shm_zones == NULL) { - cycle->shm_zones = nxt_array_create(cycle->mem_pool, 1, - sizeof(nxt_cycle_shm_zone_t)); - if (cycle->shm_zones == NULL) { - return NXT_ERROR; - } - } - - shm_zone = nxt_array_add(cycle->shm_zones); - - if (shm_zone != NULL) { - shm_zone->size = size; - shm_zone->page_size = page_size; - shm_zone->name = *name; - - return NXT_OK; - } - - return NXT_ERROR; -} - - -static nxt_int_t -nxt_cycle_shm_zones_enable(nxt_cycle_t *cycle) -{ - nxt_uint_t i, n; - nxt_cycle_shm_zone_t *shm_zone; - - if (cycle->shm_zones != NULL) { - shm_zone = cycle->shm_zones->elts; - n = cycle->shm_zones->nelts; - - for (i = 0; i < n; i++) { - if (nxt_cycle_shm_zone_create(&shm_zone[i]) != NXT_OK) { - return NXT_ERROR; - } - } - } - - return NXT_OK; -} - - -static nxt_int_t -nxt_cycle_shm_zone_create(nxt_cycle_shm_zone_t *shm_zone) -{ - nxt_mem_zone_t *zone; - - /* - * Unix-only code because Windows ASLR maps shared memory segments at - * different addresses in different processes. Unix ASLR does not affect - * this because all shared memory segments are inherited during fork(). - */ - - shm_zone->addr = nxt_mem_mmap(NULL, shm_zone->size, - NXT_MEM_MAP_READ | NXT_MEM_MAP_WRITE, - NXT_MEM_MAP_SHARED, NXT_FILE_INVALID, 0); - - if (shm_zone->addr != NXT_MEM_MAP_FAILED) { - - zone = nxt_mem_zone_init(shm_zone->addr, shm_zone->size, - shm_zone->page_size); - if (zone != NULL) { - return NXT_OK; - } - - nxt_mem_munmap(shm_zone->addr, shm_zone->size); - } - - return NXT_ERROR; -} diff --git a/src/nxt_cycle.h b/src/nxt_cycle.h deleted file mode 100644 index 088a91c3..00000000 --- a/src/nxt_cycle.h +++ /dev/null @@ -1,159 +0,0 @@ - -/* - * Copyright (C) Igor Sysoev - * Copyright (C) Valentin V. Bartenev - * Copyright (C) NGINX, Inc. - */ - -#ifndef _NXT_CYCLE_H_INCLUDED_ -#define _NXT_CYCLE_H_INCLUDED_ - - -typedef enum { - NXT_PROCESS_SINGLE = 0, - NXT_PROCESS_MASTER, - NXT_PROCESS_WORKER, -} nxt_process_type_e; - - -typedef void (*nxt_cycle_cont_t)(nxt_task_t *task, nxt_cycle_t *cycle); - - -struct nxt_cycle_s { - nxt_mem_pool_t *mem_pool; - - nxt_cycle_t *previous; - - nxt_array_t *inherited_sockets; /* of nxt_listen_socket_t */ - nxt_array_t *listen_sockets; /* of nxt_listen_socket_t */ - - nxt_array_t *services; /* of nxt_service_t */ - nxt_array_t *engines; /* of nxt_event_engine_t */ - - nxt_cycle_cont_t start; - - nxt_str_t *conf_prefix; - nxt_str_t *prefix; - - nxt_str_t hostname; - - nxt_file_name_t *pid_file; - nxt_file_name_t *oldbin_file; - nxt_pid_t new_binary; - -#if (NXT_THREADS) - nxt_array_t *thread_pools; /* of nxt_thread_pool_t */ - nxt_cycle_cont_t continuation; -#endif - - nxt_array_t *ports; /* of nxt_port_t */ - - nxt_list_t *log_files; /* of nxt_file_t */ - - nxt_array_t *shm_zones; /* of nxt_cycle_shm_zone_t */ - - uint32_t process_generation; - uint32_t current_process; - uint32_t last_engine_id; - - nxt_process_type_e type; - - uint8_t test_config; /* 1 bit */ - uint8_t reconfiguring; /* 1 bit */ - - void **core_ctx; - - nxt_timer_t timer; - - uint8_t daemon; - uint8_t batch; - uint8_t master_process; - const char *engine; - uint32_t engine_connections; - uint32_t worker_processes; - uint32_t auxiliary_threads; - nxt_user_cred_t user_cred; - const char *group; - const char *pid; - const char *error_log; - - nxt_sockaddr_t *app_listen; - nxt_sockaddr_t *stream_listen; - nxt_str_t upstream; -}; - - -typedef struct { - void *addr; - size_t size; - nxt_uint_t page_size; - nxt_str_t name; -} nxt_cycle_shm_zone_t; - - - -typedef nxt_int_t (*nxt_module_init_t)(nxt_thread_t *thr, nxt_cycle_t *cycle); - - -nxt_thread_extern_data(nxt_cycle_t *, nxt_thread_cycle_data); - - -nxt_inline void -nxt_thread_cycle_set(nxt_cycle_t *cycle) -{ - nxt_cycle_t **p; - - p = nxt_thread_get_data(nxt_thread_cycle_data); - - *p = cycle; -} - - -nxt_inline nxt_cycle_t * -nxt_thread_cycle(void) -{ - nxt_cycle_t **p; - - p = nxt_thread_get_data(nxt_thread_cycle_data); - - return *p; -} - - -nxt_int_t nxt_cycle_create(nxt_thread_t *thr, nxt_task_t *task, - nxt_cycle_t *previous, nxt_cycle_cont_t start); -void nxt_cycle_quit(nxt_task_t *task, nxt_cycle_t *cycle); - -void nxt_cycle_event_engine_free(nxt_cycle_t *cycle); - -#if (NXT_THREADS) -nxt_int_t nxt_cycle_thread_pool_create(nxt_thread_t *thr, nxt_cycle_t *cycle, - nxt_uint_t max_threads, nxt_nsec_t timeout); -#endif - -/* STUB */ -nxt_str_t *nxt_current_directory(nxt_mem_pool_t *mp); - -nxt_int_t nxt_cycle_pid_file_create(nxt_file_name_t *pid_file, nxt_bool_t test); - -nxt_listen_socket_t *nxt_cycle_listen_socket_add(nxt_cycle_t *cycle, - nxt_sockaddr_t *sa); -nxt_int_t nxt_cycle_listen_sockets_enable(nxt_task_t *task, nxt_cycle_t *cycle); -nxt_file_t *nxt_cycle_log_file_add(nxt_cycle_t *cycle, nxt_str_t *name); - -nxt_int_t nxt_cycle_shm_zone_add(nxt_cycle_t *cycle, nxt_str_t *name, - size_t size, nxt_uint_t page_size); - -/* STUB */ -void nxt_cdecl nxt_log_time_handler(nxt_uint_t level, nxt_log_t *log, - const char *fmt, ...); - -void nxt_stream_connection_init(nxt_task_t *task, void *obj, void *data); -nxt_int_t nxt_app_start(nxt_cycle_t *cycle); - - -extern nxt_module_init_t nxt_init_modules[]; -extern nxt_uint_t nxt_init_modules_n; - - -#endif /* _NXT_CYCLE_H_INCLIDED_ */ diff --git a/src/nxt_event_conn.h b/src/nxt_event_conn.h index 73415cfe..30d4fedc 100644 --- a/src/nxt_event_conn.h +++ b/src/nxt_event_conn.h @@ -65,7 +65,7 @@ typedef struct { /* * The write() is an interface to write a buffer chain with a given rate - * limit. It calls write_chunk() in a cycle and handles write event timer. + * limit. It calls write_chunk() in a loop and handles write event timer. */ nxt_work_handler_t write; @@ -175,7 +175,7 @@ typedef struct { nxt_task_t task; uint32_t ready; - uint32_t batch0; + uint32_t batch; /* An accept() interface is cached to minimize memory accesses. */ nxt_work_handler_t accept; @@ -299,13 +299,13 @@ NXT_EXPORT void nxt_event_conn_job_sendfile(nxt_task_t *task, c->socket.task, c, c->socket.data) -#define nxt_event_conn_read(e, c) \ +#define nxt_event_conn_read(engine, c) \ do { \ - nxt_event_engine_t *engine = e; \ + nxt_event_engine_t *e = engine; \ \ - c->socket.read_work_queue = &engine->read_work_queue; \ + c->socket.read_work_queue = &e->read_work_queue; \ \ - nxt_work_queue_add(&engine->read_work_queue, c->io->read, \ + nxt_work_queue_add(&e->read_work_queue, c->io->read, \ c->socket.task, c, c->socket.data); \ } while (0) diff --git a/src/nxt_event_conn_accept.c b/src/nxt_event_conn_accept.c index 606cdf03..5f0a72f0 100644 --- a/src/nxt_event_conn_accept.c +++ b/src/nxt_event_conn_accept.c @@ -44,7 +44,7 @@ nxt_event_conn_listen(nxt_task_t *task, nxt_listen_socket_t *ls) cls->socket.fd = ls->socket; engine = task->thread->engine; - cls->batch0 = engine->batch0; + cls->batch = engine->batch; cls->socket.read_work_queue = &engine->accept_work_queue; cls->socket.read_handler = nxt_event_conn_listen_handler; @@ -130,7 +130,7 @@ nxt_event_conn_listen_handler(nxt_task_t *task, void *obj, void *data) nxt_event_conn_listen_t *cls; cls = obj; - cls->ready = cls->batch0; + cls->ready = cls->batch; cls->accept(task, cls, data); } diff --git a/src/nxt_event_conn_job_sendfile.c b/src/nxt_event_conn_job_sendfile.c index bf2219f2..a283de86 100644 --- a/src/nxt_event_conn_job_sendfile.c +++ b/src/nxt_event_conn_job_sendfile.c @@ -182,7 +182,7 @@ nxt_event_conn_job_sendfile_return(nxt_task_t *task, void *obj, void *data) b = jbs->out; /* The job must be destroyed before connection error handler. */ - nxt_job_destroy(jbs); + nxt_job_destroy(task, jbs); if (c->write_state->process_buffers) { b = nxt_event_conn_job_sendfile_completion(task, c, b); diff --git a/src/nxt_event_conn_proxy.c b/src/nxt_event_conn_proxy.c index d91cff4a..20c96370 100644 --- a/src/nxt_event_conn_proxy.c +++ b/src/nxt_event_conn_proxy.c @@ -1039,7 +1039,7 @@ static void nxt_event_conn_proxy_completion(nxt_task_t *task, void *obj, void *data) { nxt_event_conn_proxy_t *p; - + p = data; nxt_debug(p->client->socket.task, "event conn proxy completion %d:%d:%d", diff --git a/src/nxt_event_engine.c b/src/nxt_event_engine.c index ca79e8ed..bd10a270 100644 --- a/src/nxt_event_engine.c +++ b/src/nxt_event_engine.c @@ -25,11 +25,12 @@ static nxt_work_handler_t nxt_event_engine_queue_pop(nxt_event_engine_t *engine, nxt_event_engine_t * -nxt_event_engine_create(nxt_thread_t *thr, +nxt_event_engine_create(nxt_task_t *task, const nxt_event_interface_t *interface, const nxt_sig_event_t *signals, nxt_uint_t flags, nxt_uint_t batch) { nxt_uint_t events; + nxt_thread_t *thread; nxt_event_engine_t *engine; engine = nxt_zalloc(sizeof(nxt_event_engine_t)); @@ -37,14 +38,16 @@ nxt_event_engine_create(nxt_thread_t *thr, return NULL; } - engine->task.thread = thr; - engine->task.log = thr->log; + thread = task->thread; + + engine->task.thread = thread; + engine->task.log = thread->log; engine->task.ident = nxt_task_next_ident(); - thr->engine = engine; - thr->fiber = &engine->fibers->fiber; + thread->engine = engine; + thread->fiber = &engine->fibers->fiber; - engine->batch0 = batch; + engine->batch = batch; if (flags & NXT_ENGINE_FIBERS) { engine->fibers = nxt_fiber_main_create(engine); @@ -111,8 +114,10 @@ nxt_event_engine_create(nxt_thread_t *thr, goto timers_fail; } - nxt_thread_time_update(thr); - engine->timers.now = nxt_thread_monotonic_time(thr) / 1000000; + thread = task->thread; + + nxt_thread_time_update(thread); + engine->timers.now = nxt_thread_monotonic_time(thread) / 1000000; engine->max_connections = 0xffffffff; @@ -122,7 +127,7 @@ nxt_event_engine_create(nxt_thread_t *thr, #if !(NXT_THREADS) if (interface->signal_support) { - thr->time.signal = -1; + thread->time.signal = -1; } #endif @@ -368,14 +373,12 @@ nxt_event_engine_signal_handler(nxt_task_t *task, void *obj, void *data) nxt_int_t -nxt_event_engine_change(nxt_thread_t *thr, nxt_task_t *task, +nxt_event_engine_change(nxt_event_engine_t *engine, const nxt_event_interface_t *interface, nxt_uint_t batch) { - nxt_uint_t events; - nxt_event_engine_t *engine; + nxt_uint_t events; - engine = thr->engine; - engine->batch0 = batch; + engine->batch = batch; if (!engine->event.signal_support && interface->signal_support) { /* @@ -388,7 +391,7 @@ nxt_event_engine_change(nxt_thread_t *thr, nxt_task_t *task, * Add to engine fast work queue the signal events possibly * received before the blocking signal processing. */ - nxt_event_engine_signal_pipe(task, &engine->pipe->event, NULL); + nxt_event_engine_signal_pipe(&engine->task, &engine->pipe->event, NULL); } if (engine->pipe != NULL && interface->enable_post != NULL) { diff --git a/src/nxt_event_engine.h b/src/nxt_event_engine.h index b78b1b71..30d1fb21 100644 --- a/src/nxt_event_engine.h +++ b/src/nxt_event_engine.h @@ -477,7 +477,7 @@ struct nxt_event_engine_s { uint8_t shutdown; /* 1 bit */ - uint32_t batch0; + uint32_t batch; uint32_t connections; uint32_t max_connections; @@ -486,11 +486,11 @@ struct nxt_event_engine_s { }; -NXT_EXPORT nxt_event_engine_t *nxt_event_engine_create(nxt_thread_t *thr, +NXT_EXPORT nxt_event_engine_t *nxt_event_engine_create(nxt_task_t *task, const nxt_event_interface_t *interface, const nxt_sig_event_t *signals, nxt_uint_t flags, nxt_uint_t batch); -NXT_EXPORT nxt_int_t nxt_event_engine_change(nxt_thread_t *thr, - nxt_task_t *task, const nxt_event_interface_t *interface, nxt_uint_t batch); +NXT_EXPORT nxt_int_t nxt_event_engine_change(nxt_event_engine_t *engine, + const nxt_event_interface_t *interface, nxt_uint_t batch); NXT_EXPORT void nxt_event_engine_free(nxt_event_engine_t *engine); NXT_EXPORT void nxt_event_engine_start(nxt_event_engine_t *engine); diff --git a/src/nxt_file.c b/src/nxt_file.c index ddde8a9e..8381dc74 100644 --- a/src/nxt_file.c +++ b/src/nxt_file.c @@ -8,11 +8,9 @@ nxt_int_t -nxt_file_open(nxt_file_t *file, nxt_uint_t mode, nxt_uint_t create, - nxt_file_access_t access) +nxt_file_open(nxt_task_t *task, nxt_file_t *file, nxt_uint_t mode, + nxt_uint_t create, nxt_file_access_t access) { - nxt_thread_debug(thr); - #ifdef __CYGWIN__ mode |= O_BINARY; #endif @@ -24,18 +22,20 @@ nxt_file_open(nxt_file_t *file, nxt_uint_t mode, nxt_uint_t create, file->error = (file->fd == -1) ? nxt_errno : 0; - nxt_thread_time_debug_update(thr); +#if (NXT_DEBUG) + nxt_thread_time_update(task->thread); +#endif - nxt_log_debug(thr->log, "open(\"%FN\", 0x%uXi, 0x%uXi): %FD err:%d", - file->name, mode, access, file->fd, file->error); + nxt_debug(task, "open(\"%FN\", 0x%uXi, 0x%uXi): %FD err:%d", + file->name, mode, access, file->fd, file->error); if (file->fd != -1) { return NXT_OK; } if (file->log_level != 0) { - nxt_thread_log_error(file->log_level, "open(\"%FN\") failed %E", - file->name, file->error); + nxt_log(task, file->log_level, "open(\"%FN\") failed %E", + file->name, file->error); } return NXT_ERROR; @@ -43,13 +43,13 @@ nxt_file_open(nxt_file_t *file, nxt_uint_t mode, nxt_uint_t create, void -nxt_file_close(nxt_file_t *file) +nxt_file_close(nxt_task_t *task, nxt_file_t *file) { - nxt_thread_log_debug("close(%FD)", file->fd); + nxt_debug(task, "close(%FD)", file->fd); if (close(file->fd) != 0) { - nxt_thread_log_error(NXT_LOG_CRIT, "close(%FD, \"%FN\") failed %E", - file->fd, file->name, nxt_errno); + nxt_log(task, NXT_LOG_CRIT, "close(%FD, \"%FN\") failed %E", + file->fd, file->name, nxt_errno); } } diff --git a/src/nxt_file.h b/src/nxt_file.h index 01486e95..66220127 100644 --- a/src/nxt_file.h +++ b/src/nxt_file.h @@ -106,10 +106,8 @@ typedef struct { } nxt_file_t; -NXT_EXPORT nxt_int_t nxt_file_open(nxt_file_t *file, nxt_uint_t mode, - nxt_uint_t create, nxt_file_access_t access); - -#define nxt_file_open_n "open" +NXT_EXPORT nxt_int_t nxt_file_open(nxt_task_t *task, nxt_file_t *file, + nxt_uint_t mode, nxt_uint_t create, nxt_file_access_t access); /* The file open access modes. */ @@ -128,7 +126,7 @@ NXT_EXPORT nxt_int_t nxt_file_open(nxt_file_t *file, nxt_uint_t mode, #define NXT_FILE_OWNER_ACCESS 0600 -NXT_EXPORT void nxt_file_close(nxt_file_t *file); +NXT_EXPORT void nxt_file_close(nxt_task_t *task, nxt_file_t *file); NXT_EXPORT ssize_t nxt_file_write(nxt_file_t *file, const u_char *buf, size_t size, nxt_off_t offset); NXT_EXPORT ssize_t nxt_file_read(nxt_file_t *file, u_char *buf, size_t size, diff --git a/src/nxt_job.c b/src/nxt_job.c index 86cfc462..68ced550 100644 --- a/src/nxt_job.c +++ b/src/nxt_job.c @@ -60,7 +60,7 @@ nxt_job_init(nxt_job_t *job, size_t size) void -nxt_job_destroy(void *data) +nxt_job_destroy(nxt_task_t *task, void *data) { nxt_job_t *job; diff --git a/src/nxt_job.h b/src/nxt_job.h index b1af2e80..9d004d0a 100644 --- a/src/nxt_job.h +++ b/src/nxt_job.h @@ -60,7 +60,7 @@ typedef struct { NXT_EXPORT void *nxt_job_create(nxt_mem_pool_t *mp, size_t size); NXT_EXPORT void nxt_job_init(nxt_job_t *job, size_t size); -NXT_EXPORT void nxt_job_destroy(void *data); +NXT_EXPORT void nxt_job_destroy(nxt_task_t *task, void *data); NXT_EXPORT nxt_int_t nxt_job_cleanup_add(nxt_mem_pool_t *mp, nxt_job_t *job); NXT_EXPORT void nxt_job_start(nxt_task_t *task, nxt_job_t *job, diff --git a/src/nxt_kqueue_engine.c b/src/nxt_kqueue_engine.c index 0aeb4faa..559f0c9e 100644 --- a/src/nxt_kqueue_engine.c +++ b/src/nxt_kqueue_engine.c @@ -738,7 +738,7 @@ nxt_kqueue_poll(nxt_event_engine_t *engine, nxt_msec_t timeout) ev->kq_errno = err; ev->kq_eof = eof; - if (ev->read == NXT_EVENT_BLOCKED) { + if (ev->read <= NXT_EVENT_BLOCKED) { nxt_debug(ev->task, "blocked read event fd:%d", ev->fd); continue; } @@ -769,7 +769,7 @@ nxt_kqueue_poll(nxt_event_engine_t *engine, nxt_msec_t timeout) ev->kq_errno = err; ev->kq_eof = eof; - if (ev->write == NXT_EVENT_BLOCKED) { + if (ev->write <= NXT_EVENT_BLOCKED) { nxt_debug(ev->task, "blocked write event fd:%d", ev->fd); continue; } @@ -908,7 +908,7 @@ nxt_kqueue_listen_handler(nxt_task_t *task, void *obj, void *data) nxt_debug(task, "kevent fd:%d avail:%D", cls->socket.fd, cls->socket.kq_available); - cls->ready = nxt_min(cls->batch0, (uint32_t) cls->socket.kq_available); + cls->ready = nxt_min(cls->batch, (uint32_t) cls->socket.kq_available); nxt_kqueue_event_conn_io_accept(task, cls, data); } diff --git a/src/nxt_lib.c b/src/nxt_lib.c index ee36c91a..d113bcbb 100644 --- a/src/nxt_lib.c +++ b/src/nxt_lib.c @@ -36,7 +36,7 @@ nxt_lib_start(const char *app, char **argv, char ***envp) int n; nxt_int_t flags; nxt_bool_t update; - nxt_thread_t *thr; + nxt_thread_t *thread; flags = nxt_stderr_start(); @@ -64,16 +64,15 @@ nxt_lib_start(const char *app, char **argv, char ***envp) /* Thread log is required for nxt_malloc() in nxt_strerror_start(). */ nxt_thread_init_data(nxt_thread_context); - thr = nxt_thread(); - thr->log = &nxt_main_log; + thread = nxt_thread(); + thread->log = &nxt_main_log; -#if (NXT_THREADS) - thr->handle = nxt_thread_handle(); - thr->time.signal = -1; -#endif + thread->handle = nxt_thread_handle(); + thread->time.signal = -1; + nxt_thread_time_update(thread); - nxt_main_task.thread = thr; - nxt_main_task.log = thr->log; + nxt_main_task.thread = thread; + nxt_main_task.log = thread->log; nxt_main_task.ident = nxt_task_next_ident(); if (nxt_strerror_start() != NXT_OK) { @@ -81,7 +80,7 @@ nxt_lib_start(const char *app, char **argv, char ***envp) } if (flags != -1) { - nxt_log_debug(thr->log, "stderr flags: 0x%04Xd", flags); + nxt_debug(&nxt_main_task, "stderr flags: 0x%04Xd", flags); } #ifdef _SC_NPROCESSORS_ONLN @@ -93,7 +92,7 @@ nxt_lib_start(const char *app, char **argv, char ***envp) #endif - nxt_log_debug(thr->log, "ncpu: %ui", n); + nxt_debug(&nxt_main_task, "ncpu: %ui", n); if (n > 1) { nxt_ncpu = n; @@ -105,12 +104,12 @@ nxt_lib_start(const char *app, char **argv, char ***envp) nxt_pagesize = getpagesize(); - nxt_log_debug(thr->log, "pagesize: %ui", nxt_pagesize); + nxt_debug(&nxt_main_task, "pagesize: %ui", nxt_pagesize); if (argv != NULL) { update = (argv[0] == app); - nxt_process_arguments(argv, envp); + nxt_process_arguments(&nxt_main_task, argv, envp); if (update) { nxt_log_start(nxt_process_argv[0]); @@ -131,12 +130,12 @@ nxt_lib_stop(void) for ( ;; ) { nxt_thread_pool_t *tp; - nxt_thread_spin_lock(&cycle->lock); + nxt_thread_spin_lock(&rt->lock); - tp = cycle->thread_pools; - cycle->thread_pools = (tp != NULL) ? tp->next : NULL; + tp = rt->thread_pools; + rt->thread_pools = (tp != NULL) ? tp->next : NULL; - nxt_thread_spin_unlock(&cycle->lock); + nxt_thread_spin_unlock(&rt->lock); if (tp == NULL) { break; diff --git a/src/nxt_list.h b/src/nxt_list.h index 4d14f4d9..4b314360 100644 --- a/src/nxt_list.h +++ b/src/nxt_list.h @@ -75,7 +75,7 @@ nxt_list_elt(nxt_list_t *list, nxt_uint_t n) #define nxt_list_each(elt, list) \ do { \ - if (nxt_fast_path((list) != NULL)) { \ + if (nxt_fast_path((list) != NULL)) { \ void *_end; \ nxt_list_part_t *_part = nxt_list_part(list); \ \ diff --git a/src/nxt_main.c b/src/nxt_main.c index 4219fdea..7cd032d5 100644 --- a/src/nxt_main.c +++ b/src/nxt_main.c @@ -5,7 +5,7 @@ */ #include <nxt_main.h> -#include <nxt_cycle.h> +#include <nxt_runtime.h> extern char **environ; @@ -14,8 +14,7 @@ extern char **environ; int nxt_cdecl main(int argc, char **argv) { - nxt_int_t ret; - nxt_thread_t *thr; + nxt_int_t ret; if (nxt_lib_start("nginext", argv, &environ) != NXT_OK) { return 1; @@ -23,20 +22,17 @@ main(int argc, char **argv) // nxt_main_log.level = NXT_LOG_INFO; - thr = nxt_thread(); - nxt_thread_time_update(thr); - nxt_main_log.handler = nxt_log_time_handler; - nxt_log_error(NXT_LOG_INFO, thr->log, "nginext started"); + nxt_log(&nxt_main_task, NXT_LOG_INFO, "nginext started"); - ret = nxt_cycle_create(thr, &nxt_main_task, NULL, NULL); + ret = nxt_runtime_create(&nxt_main_task); if (ret != NXT_OK) { return 1; } - nxt_event_engine_start(thr->engine); + nxt_event_engine_start(nxt_main_task.thread->engine); nxt_unreachable(); return 0; diff --git a/src/nxt_main.h b/src/nxt_main.h index 8db1f833..a1a01434 100644 --- a/src/nxt_main.h +++ b/src/nxt_main.h @@ -14,13 +14,22 @@ #include <nxt_clang.h> #include <nxt_types.h> #include <nxt_time.h> -#include <nxt_process.h> +typedef struct nxt_mem_pool_s nxt_mem_pool_t; +#include <nxt_array.h> + +typedef struct nxt_port_s nxt_port_t; typedef struct nxt_task_s nxt_task_t; +typedef struct nxt_port_recv_msg_s nxt_port_recv_msg_t; +typedef void (*nxt_port_handler_t)(nxt_task_t *task, nxt_port_recv_msg_t *msg); +typedef struct nxt_sig_event_s nxt_sig_event_t; +typedef struct nxt_runtime_s nxt_runtime_t; + +#include <nxt_process.h> + typedef struct nxt_thread_s nxt_thread_t; #include <nxt_thread_id.h> -typedef struct nxt_mem_pool_s nxt_mem_pool_t; #include <nxt_mem_pool.h> #include <nxt_errno.h> @@ -89,7 +98,6 @@ typedef struct nxt_thread_pool_s nxt_thread_pool_t; #include <nxt_hash.h> #include <nxt_sort.h> -#include <nxt_array.h> #include <nxt_vector.h> #include <nxt_list.h> @@ -110,8 +118,7 @@ typedef struct nxt_event_conn_s nxt_event_conn_t; #endif -#define \ -nxt_thread() \ +#define nxt_thread() \ (nxt_thread_t *) nxt_thread_get_data(nxt_thread_context) nxt_thread_extern_data(nxt_thread_t, nxt_thread_context); @@ -121,7 +128,6 @@ nxt_thread_extern_data(nxt_thread_t, nxt_thread_context); #include <nxt_fd_event.h> -typedef struct nxt_cycle_s nxt_cycle_t; #include <nxt_port.h> #if (NXT_THREADS) #include <nxt_thread_pool.h> @@ -154,7 +160,7 @@ typedef struct nxt_upstream_source_s nxt_upstream_source_t; #include <nxt_upstream_source.h> #include <nxt_http_source.h> #include <nxt_fastcgi_source.h> -#include <nxt_cycle.h> +#include <nxt_runtime.h> #if (NXT_LIB_UNIT_TEST) diff --git a/src/nxt_master_process.c b/src/nxt_master_process.c index 1e613b45..12c61d16 100644 --- a/src/nxt_master_process.c +++ b/src/nxt_master_process.c @@ -5,46 +5,39 @@ */ #include <nxt_main.h> -#include <nxt_cycle.h> +#include <nxt_runtime.h> #include <nxt_port.h> #include <nxt_master_process.h> static nxt_int_t nxt_master_process_port_create(nxt_task_t *task, - nxt_cycle_t *cycle); -static void nxt_master_process_title(void); + nxt_runtime_t *rt); +static void nxt_master_process_title(nxt_task_t *task); +static nxt_int_t nxt_master_start_controller_process(nxt_task_t *task, + nxt_runtime_t *rt); +static nxt_int_t nxt_master_start_router_process(nxt_task_t *task, + nxt_runtime_t *rt); static nxt_int_t nxt_master_start_worker_processes(nxt_task_t *task, - nxt_cycle_t *cycle); + nxt_runtime_t *rt); static nxt_int_t nxt_master_create_worker_process(nxt_task_t *task, - nxt_cycle_t *cycle); -static void nxt_master_stop_previous_worker_processes(nxt_task_t *task, - void *obj, void *data); -static void nxt_master_process_sighup_handler(nxt_task_t *task, void *obj, - void *data); -static void nxt_master_process_new_cycle(nxt_task_t *task, nxt_cycle_t *cycle); + nxt_runtime_t *rt, nxt_process_init_t *init); static void nxt_master_process_sigterm_handler(nxt_task_t *task, void *obj, void *data); static void nxt_master_process_sigquit_handler(nxt_task_t *task, void *obj, void *data); static void nxt_master_process_sigusr1_handler(nxt_task_t *task, void *obj, void *data); -static void nxt_master_process_sigusr2_handler(nxt_task_t *task, void *obj, - void *data); -static char **nxt_master_process_upgrade_environment(nxt_cycle_t *cycle); -static char **nxt_master_process_upgrade_environment_create(nxt_cycle_t *cycle); static void nxt_master_process_sigchld_handler(nxt_task_t *task, void *obj, void *data); static void nxt_master_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid); const nxt_sig_event_t nxt_master_process_signals[] = { - nxt_event_signal(SIGHUP, nxt_master_process_sighup_handler), nxt_event_signal(SIGINT, nxt_master_process_sigterm_handler), nxt_event_signal(SIGQUIT, nxt_master_process_sigquit_handler), nxt_event_signal(SIGTERM, nxt_master_process_sigterm_handler), nxt_event_signal(SIGCHLD, nxt_master_process_sigchld_handler), nxt_event_signal(SIGUSR1, nxt_master_process_sigusr1_handler), - nxt_event_signal(SIGUSR2, nxt_master_process_sigusr2_handler), nxt_event_signal_end, }; @@ -54,27 +47,47 @@ static nxt_bool_t nxt_exiting; nxt_int_t nxt_master_process_start(nxt_thread_t *thr, nxt_task_t *task, - nxt_cycle_t *cycle) + nxt_runtime_t *rt) { - cycle->type = NXT_PROCESS_MASTER; + nxt_int_t ret; + + rt->type = NXT_PROCESS_MASTER; - if (nxt_master_process_port_create(task, cycle) != NXT_OK) { + if (nxt_master_process_port_create(task, rt) != NXT_OK) { return NXT_ERROR; } - nxt_master_process_title(); + nxt_master_process_title(task); + + ret = nxt_master_start_controller_process(task, rt); + if (ret != NXT_OK) { + return ret; + } + + ret = nxt_master_start_router_process(task, rt); + if (ret != NXT_OK) { + return ret; + } - return nxt_master_start_worker_processes(task, cycle); + return nxt_master_start_worker_processes(task, rt); } static nxt_int_t -nxt_master_process_port_create(nxt_task_t *task, nxt_cycle_t *cycle) +nxt_master_process_port_create(nxt_task_t *task, nxt_runtime_t *rt) { - nxt_int_t ret; - nxt_port_t *port; + nxt_int_t ret; + nxt_port_t *port; + nxt_process_t *process; - port = nxt_array_zero_add(cycle->ports); + process = nxt_runtime_new_process(rt); + if (nxt_slow_path(process == NULL)) { + return NXT_ERROR; + } + + process->pid = nxt_pid; + + port = nxt_array_zero_add(process->ports); if (nxt_slow_path(port == NULL)) { return NXT_ERROR; } @@ -98,7 +111,7 @@ nxt_master_process_port_create(nxt_task_t *task, nxt_cycle_t *cycle) static void -nxt_master_process_title(void) +nxt_master_process_title(nxt_task_t *task) { u_char *p, *end; nxt_uint_t i; @@ -115,22 +128,75 @@ nxt_master_process_title(void) *p = '\0'; - nxt_process_title((char *) title); + nxt_process_title(task, (char *) title); } static nxt_int_t -nxt_master_start_worker_processes(nxt_task_t *task, nxt_cycle_t *cycle) +nxt_master_start_controller_process(nxt_task_t *task, nxt_runtime_t *rt) { - nxt_int_t ret; - nxt_uint_t n; + nxt_process_init_t *init; - cycle->process_generation++; + init = nxt_mem_alloc(rt->mem_pool, sizeof(nxt_process_init_t)); + if (nxt_slow_path(init == NULL)) { + return NXT_ERROR; + } - n = cycle->worker_processes; + init->start = nxt_controller_start; + init->name = "controller process"; + init->user_cred = &rt->user_cred; + init->port_handlers = nxt_worker_process_port_handlers; + init->signals = nxt_worker_process_signals; + init->type = NXT_PROCESS_CONTROLLER; + + return nxt_master_create_worker_process(task, rt, init); +} + + +static nxt_int_t +nxt_master_start_router_process(nxt_task_t *task, nxt_runtime_t *rt) +{ + nxt_process_init_t *init; + + init = nxt_mem_alloc(rt->mem_pool, sizeof(nxt_process_init_t)); + if (nxt_slow_path(init == NULL)) { + return NXT_ERROR; + } + + init->start = nxt_router_start; + init->name = "router process"; + init->user_cred = &rt->user_cred; + init->port_handlers = nxt_worker_process_port_handlers; + init->signals = nxt_worker_process_signals; + init->type = NXT_PROCESS_ROUTER; + + return nxt_master_create_worker_process(task, rt, init); +} + + +static nxt_int_t +nxt_master_start_worker_processes(nxt_task_t *task, nxt_runtime_t *rt) +{ + nxt_int_t ret; + nxt_uint_t n; + nxt_process_init_t *init; + + init = nxt_mem_alloc(rt->mem_pool, sizeof(nxt_process_init_t)); + if (nxt_slow_path(init == NULL)) { + return NXT_ERROR; + } + + init->start = nxt_app_start; + init->name = "worker process"; + init->user_cred = &rt->user_cred; + init->port_handlers = nxt_worker_process_port_handlers; + init->signals = nxt_worker_process_signals; + init->type = NXT_PROCESS_WORKER; + + n = rt->worker_processes; while (n-- != 0) { - ret = nxt_master_create_worker_process(task, cycle); + ret = nxt_master_create_worker_process(task, rt, init); if (ret != NXT_OK) { return ret; @@ -142,18 +208,34 @@ nxt_master_start_worker_processes(nxt_task_t *task, nxt_cycle_t *cycle) static nxt_int_t -nxt_master_create_worker_process(nxt_task_t *task, nxt_cycle_t *cycle) +nxt_master_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt, + nxt_process_init_t *init) { - nxt_int_t ret; - nxt_pid_t pid; - nxt_port_t *port; + nxt_int_t ret; + nxt_pid_t pid; + nxt_port_t *port; + nxt_process_t *process, *master_process; - port = nxt_array_zero_add(cycle->ports); + /* + * TODO: remove process, init, ports from array on memory and fork failures. + */ + + process = nxt_runtime_new_process(rt); + if (nxt_slow_path(process == NULL)) { + return NXT_ERROR; + } + + process->init = init; + + master_process = rt->processes->elts; + init->master_port = master_process->ports->elts; + + port = nxt_array_zero_add(process->ports); if (nxt_slow_path(port == NULL)) { return NXT_ERROR; } - cycle->current_process = cycle->ports->nelts - 1; + init->port = port; ret = nxt_port_socket_init(task, port, 0); if (nxt_slow_path(ret != NXT_OK)) { @@ -161,10 +243,8 @@ nxt_master_create_worker_process(nxt_task_t *task, nxt_cycle_t *cycle) } port->engine = 0; - port->generation = cycle->process_generation; - pid = nxt_process_create(nxt_worker_process_start, cycle, - "start worker process"); + pid = nxt_process_create(task, init); switch (pid) { @@ -177,110 +257,42 @@ nxt_master_create_worker_process(nxt_task_t *task, nxt_cycle_t *cycle) default: /* The master process created a new process. */ + process->pid = pid; port->pid = pid; nxt_port_read_close(port); nxt_port_write_enable(task, port); - nxt_port_send_new_port(task, cycle, port); + nxt_port_send_new_port(task, rt, port); return NXT_OK; } } -static void -nxt_master_process_sighup_handler(nxt_task_t *task, void *obj, void *data) -{ - nxt_cycle_t *cycle; - - cycle = nxt_thread_cycle(); - - nxt_log(task, NXT_LOG_NOTICE, "signal %d (%s) recevied, %s", - (int) (uintptr_t) obj, data, - cycle->reconfiguring ? "ignored" : "reconfiguring"); - - if (!cycle->reconfiguring) { - (void) nxt_cycle_create(task->thread, task, cycle, - nxt_master_process_new_cycle); - } -} - - -static void -nxt_master_process_new_cycle(nxt_task_t *task, nxt_cycle_t *cycle) -{ - nxt_thread_t *thr; - - thr = task->thread; - - nxt_debug(task, "new cycle"); - - /* A safe place to free the previous cycle. */ - nxt_mem_pool_destroy(cycle->previous->mem_pool); - - switch (nxt_master_start_worker_processes(task, cycle)) { - - case NXT_OK: - /* - * The master process, allow old worker processes to accept new - * connections yet 500ms in parallel with new worker processes. - */ - cycle->timer.handler = nxt_master_stop_previous_worker_processes; - cycle->timer.log = &nxt_main_log; - - cycle->timer.work_queue = &thr->engine->fast_work_queue; - - nxt_timer_add(thr->engine, &cycle->timer, 500); - - return; - - case NXT_ERROR: - /* - * The master process, one or more new worker processes - * could not be created, there is no fallback. - */ - return; - - default: /* NXT_AGAIN */ - /* A worker process, return to the event engine work queue loop. */ - return; - } -} - - -static void -nxt_master_stop_previous_worker_processes(nxt_task_t *task, void *obj, - void *data) +void +nxt_master_stop_worker_processes(nxt_task_t *task, nxt_runtime_t *rt) { - uint32_t generation; - nxt_uint_t i, n; - nxt_port_t *port; - nxt_cycle_t *cycle; + nxt_uint_t i, n, nprocesses, nports; + nxt_port_t *port; + nxt_process_t *process; - cycle = nxt_thread_cycle(); + process = rt->processes->elts; + nprocesses = rt->processes->nelts; - port = cycle->ports->elts; - n = cycle->ports->nelts; + for (i = 0; i < nprocesses; i++) { - generation = cycle->process_generation - 1; + if (nxt_pid != process[i].pid) { + process[i].init = NULL; - /* The port[0] is the master process port. */ + port = process[i].ports->elts; + nports = process[i].ports->nelts; - for (i = 1; i < n; i++) { - if (port[i].generation == generation) { - (void) nxt_port_socket_write(task, &port[i], - NXT_PORT_MSG_QUIT, -1, 0, NULL); + for (n = 0; n < nports; n++) { + (void) nxt_port_socket_write(task, &port[n], NXT_PORT_MSG_QUIT, + -1, 0, NULL); + } } } - - cycle->reconfiguring = 0; -} - - -void -nxt_master_stop_worker_processes(nxt_task_t *task, nxt_cycle_t *cycle) -{ - nxt_port_write(task, cycle, NXT_PORT_MSG_QUIT, -1, 0, NULL); } @@ -295,7 +307,7 @@ nxt_master_process_sigterm_handler(nxt_task_t *task, void *obj, void *data) nxt_exiting = 1; - nxt_cycle_quit(task, NULL); + nxt_runtime_quit(task); } @@ -309,7 +321,7 @@ nxt_master_process_sigquit_handler(nxt_task_t *task, void *obj, void *data) nxt_exiting = 1; - nxt_cycle_quit(task, NULL); + nxt_runtime_quit(task); } @@ -319,7 +331,7 @@ nxt_master_process_sigusr1_handler(nxt_task_t *task, void *obj, void *data) nxt_int_t ret; nxt_uint_t n; nxt_file_t *file, *new_file; - nxt_cycle_t *cycle; + nxt_runtime_t *rt; nxt_array_t *new_files; nxt_mem_pool_t *mp; @@ -331,9 +343,9 @@ nxt_master_process_sigusr1_handler(nxt_task_t *task, void *obj, void *data) return; } - cycle = nxt_thread_cycle(); + rt = task->thread->runtime; - n = nxt_list_nelts(cycle->log_files); + n = nxt_list_nelts(rt->log_files); new_files = nxt_array_create(mp, n, sizeof(nxt_file_t)); if (new_files == NULL) { @@ -341,7 +353,7 @@ nxt_master_process_sigusr1_handler(nxt_task_t *task, void *obj, void *data) return; } - nxt_list_each(file, cycle->log_files) { + nxt_list_each(file, rt->log_files) { /* This allocation cannot fail. */ new_file = nxt_array_add(new_files); @@ -350,7 +362,7 @@ nxt_master_process_sigusr1_handler(nxt_task_t *task, void *obj, void *data) new_file->fd = NXT_FILE_INVALID; new_file->log_level = NXT_LOG_CRIT; - ret = nxt_file_open(new_file, NXT_FILE_APPEND, NXT_FILE_CREATE_OR_OPEN, + ret = nxt_file_open(task, new_file, O_WRONLY | O_APPEND, O_CREAT, NXT_FILE_OWNER_ACCESS); if (ret != NXT_OK) { @@ -366,9 +378,9 @@ nxt_master_process_sigusr1_handler(nxt_task_t *task, void *obj, void *data) if (ret == NXT_OK) { n = 0; - nxt_list_each(file, cycle->log_files) { + nxt_list_each(file, rt->log_files) { - nxt_port_change_log_file(task, cycle, n, new_file[n].fd); + nxt_port_change_log_file(task, rt, n, new_file[n].fd); /* * The old log file descriptor must be closed at the moment * when no other threads use it. dup2() allows to use the @@ -392,7 +404,7 @@ fail: while (n != 0) { if (new_file->fd != NXT_FILE_INVALID) { - nxt_file_close(new_file); + nxt_file_close(task, new_file); } new_file++; @@ -404,164 +416,6 @@ fail: static void -nxt_master_process_sigusr2_handler(nxt_task_t *task, void *obj, void *data) -{ - char **env; - nxt_int_t ret; - nxt_pid_t pid, ppid; - nxt_bool_t ignore; - nxt_cycle_t *cycle; - - cycle = nxt_thread_cycle(); - - /* Is upgrade or reconfiguring in progress? */ - ignore = (cycle->new_binary != 0) || cycle->reconfiguring; - - ppid = getppid(); - - if (ppid == nxt_ppid && ppid != 1) { - /* - * Ignore the upgrade signal in a new master process if an old - * master process is still running. After the old process's exit - * getppid() will return 1 (init process pid) or pid of zsched (zone - * scheduler) if the processes run in Solaris zone. There is little - * race condition between the parent process exit and getting getppid() - * for the very start of the new master process execution, so init or - * zsched pid may be stored in nxt_ppid. For this reason pid 1 is - * tested explicitly. There is no workaround for this race condition - * in Solaris zons. To eliminate this race condition in Solaris - * zone the old master process should be quit only when both - * "nginext.pid.oldbin" (created by the old master process) and - * "nginext.pid" (created by the new master process) files exists. - */ - ignore = 1; - } - - nxt_log(task, NXT_LOG_NOTICE, - "signal %d (%s) recevied, %s, parent pid: %PI", - (int) (uintptr_t) obj, data, - ignore ? "ignored" : "online binary file upgrade", ppid); - - if (ignore) { - return; - } - - env = nxt_master_process_upgrade_environment(cycle); - if (nxt_slow_path(env == NULL)) { - return; - } - - cycle->new_binary = -1; - - ret = nxt_cycle_pid_file_create(cycle->oldbin_file, 0); - if (nxt_slow_path(ret != NXT_OK)) { - goto fail; - } - - pid = nxt_process_execute(nxt_process_argv[0], nxt_process_argv, env); - - if (pid == -1) { - cycle->new_binary = 0; - (void) nxt_file_delete(cycle->oldbin_file); - - } else { - cycle->new_binary = pid; - } - -fail: - - /* Zero slot is NGINX variable slot, all other slots must not be free()d. */ - nxt_free(env[0]); - nxt_free(env); -} - - -static char ** -nxt_master_process_upgrade_environment(nxt_cycle_t *cycle) -{ - size_t len; - char **env; - u_char *p, *end; - nxt_uint_t n; - nxt_listen_socket_t *ls; - - env = nxt_master_process_upgrade_environment_create(cycle); - if (nxt_slow_path(env == NULL)) { - return NULL; - } - - ls = cycle->listen_sockets->elts; - n = cycle->listen_sockets->nelts; - - len = sizeof("NGINX=") + n * (NXT_INT_T_LEN + 1); - - p = nxt_malloc(len); - - if (nxt_slow_path(p == NULL)) { - nxt_free(env); - return NULL; - } - - env[0] = (char *) p; - end = p + len; - - p = nxt_cpymem(p, "NGINX=", sizeof("NGINX=") - 1); - - do { - p = nxt_sprintf(p, end, "%ud;", ls->socket); - - ls++; - n--; - } while (n != 0); - - *p = '\0'; - - return env; -} - - -static char ** -nxt_master_process_upgrade_environment_create(nxt_cycle_t *cycle) -{ - char **env; - nxt_uint_t n; - - /* 2 is for "NGINX" variable and the last NULL slot. */ - n = 2; - -#if (NXT_SETPROCTITLE_ARGV) - n++; -#endif - - env = nxt_malloc(n * sizeof(char *)); - if (nxt_slow_path(env == NULL)) { - return NULL; - } - - /* Zero slot is reserved for "NGINX" variable. */ - n = 1; - - /* TODO: copy env values */ - -#if (NXT_SETPROCTITLE_ARGV) - - /* 300 spare bytes for new process title. */ - env[n++] = (char *) - "SPARE=XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" - "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" - "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" - "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" - "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"; - -#endif - - env[n] = NULL; - - return env; -} - - -static void nxt_master_process_sigchld_handler(nxt_task_t *task, void *obj, void *data) { int status; @@ -619,38 +473,36 @@ nxt_master_process_sigchld_handler(nxt_task_t *task, void *obj, void *data) static void nxt_master_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid) { - nxt_uint_t i, n, generation; - nxt_port_t *port; - nxt_cycle_t *cycle; + nxt_uint_t i, n; + nxt_runtime_t *rt; + nxt_process_t *process; + nxt_process_init_t *init; - cycle = nxt_thread_cycle(); + rt = task->thread->runtime; - if (cycle->new_binary == pid) { - cycle->new_binary = 0; + process = rt->processes->elts; + n = rt->processes->nelts; - (void) nxt_file_rename(cycle->oldbin_file, cycle->pid_file); - return; - } + /* A process[0] is the master process. */ - port = cycle->ports->elts; - n = cycle->ports->nelts; + for (i = 1; i < n; i++) { - for (i = 0; i < n; i++) { + if (pid == process[i].pid) { + init = process[i].init; - if (pid == port[i].pid) { - generation = port[i].generation; + /* TODO: free ports fds. */ - nxt_array_remove(cycle->ports, &port[i]); + nxt_array_remove(rt->processes, &process[i]); if (nxt_exiting) { nxt_debug(task, "processes %d", n); if (n == 2) { - nxt_cycle_quit(task, cycle); + nxt_runtime_quit(task); } - } else if (generation == cycle->process_generation) { - (void) nxt_master_create_worker_process(task, cycle); + } else if (init != NULL) { + (void) nxt_master_create_worker_process(task, rt, init); } return; diff --git a/src/nxt_master_process.h b/src/nxt_master_process.h index 8c40b5b3..46c570be 100644 --- a/src/nxt_master_process.h +++ b/src/nxt_master_process.h @@ -4,17 +4,21 @@ * Copyright (C) NGINX, Inc. */ -#ifndef _NXT_UNIX_MASTER_PROCESS_H_INCLUDED_ -#define _NXT_UNIX_MASTER_PROCESS_H_INCLUDED_ +#ifndef _NXT_MASTER_PROCESS_H_INCLUDED_ +#define _NXT_MASTER_PROCESS_H_INCLUDED_ nxt_int_t nxt_master_process_start(nxt_thread_t *thr, nxt_task_t *task, - nxt_cycle_t *cycle); -void nxt_master_stop_worker_processes(nxt_task_t *task, nxt_cycle_t *cycle); -void nxt_worker_process_start(void *data); + nxt_runtime_t *runtime); +void nxt_master_stop_worker_processes(nxt_task_t *task, nxt_runtime_t *runtime); +nxt_int_t nxt_controller_start(nxt_task_t *task, nxt_runtime_t *rt); +nxt_int_t nxt_router_start(nxt_task_t *task, nxt_runtime_t *rt); + +extern nxt_port_handler_t nxt_worker_process_port_handlers[]; extern const nxt_sig_event_t nxt_master_process_signals[]; +extern const nxt_sig_event_t nxt_worker_process_signals[]; -#endif /* _NXT_UNIX_MASTER_PROCESS_H_INCLUDED_ */ +#endif /* _NXT_MASTER_PROCESS_H_INCLUDED_ */ diff --git a/src/nxt_mem_pool.c b/src/nxt_mem_pool.c index 0d4b5688..7817bac7 100644 --- a/src/nxt_mem_pool.c +++ b/src/nxt_mem_pool.c @@ -126,16 +126,19 @@ nxt_mem_pool_create(size_t size) void nxt_mem_pool_destroy(nxt_mem_pool_t *mp) { + nxt_task_t *task; nxt_mem_pool_ext_t *ext; nxt_mem_pool_chunk_t *chunk, *next; nxt_mem_pool_cleanup_t *mpcl; + task = NULL; + nxt_mem_pool_thread_assert(mp); for (mpcl = mp->cleanup; mpcl != NULL; mpcl = mpcl->next) { if (mpcl->handler != NULL) { nxt_thread_log_debug("mem pool cleanup: %p", mpcl); - mpcl->handler(mpcl->data); + mpcl->handler(task, mpcl->data); } } diff --git a/src/nxt_mem_pool.h b/src/nxt_mem_pool.h index f59e29e1..a896956c 100644 --- a/src/nxt_mem_pool.h +++ b/src/nxt_mem_pool.h @@ -11,7 +11,7 @@ #define NXT_MEM_POOL_MIN_EXT_SIZE nxt_pagesize -typedef void (*nxt_mem_pool_cleanup_handler_t)(void *data); +typedef void (*nxt_mem_pool_cleanup_handler_t)(nxt_task_t *task, void *data); typedef struct nxt_mem_pool_cleanup_s nxt_mem_pool_cleanup_t; typedef struct nxt_mem_pool_cache_s nxt_mem_pool_cache_t; typedef struct nxt_mem_pool_chunk_s nxt_mem_pool_chunk_t; diff --git a/src/nxt_mem_pool_cleanup.c b/src/nxt_mem_pool_cleanup.c index febfc959..ceafc9c8 100644 --- a/src/nxt_mem_pool_cleanup.c +++ b/src/nxt_mem_pool_cleanup.c @@ -7,7 +7,7 @@ #include <nxt_main.h> -static void nxt_mem_pool_file_cleanup_handler(void *data); +static void nxt_mem_pool_file_cleanup_handler(nxt_task_t *task, void *data); nxt_mem_pool_cleanup_t * @@ -27,13 +27,13 @@ nxt_mem_pool_file_cleanup(nxt_mem_pool_t *mp, nxt_file_t *file) static void -nxt_mem_pool_file_cleanup_handler(void *data) +nxt_mem_pool_file_cleanup_handler(nxt_task_t *task, void *data) { nxt_file_t *file; file = data; if (file->fd != NXT_FILE_INVALID) { - nxt_file_close(file); + nxt_file_close(task, file); } } diff --git a/src/nxt_php_sapi.c b/src/nxt_php_sapi.c index 664ae88e..ece3ec92 100644 --- a/src/nxt_php_sapi.c +++ b/src/nxt_php_sapi.c @@ -507,62 +507,62 @@ nxt_php_register_variables(zval *track_vars_array TSRMLS_DC) nxt_log_debug(r->log, "php register variables"); php_register_variable_safe((char *) "PHP_SELF", - (char *) r->header.path.data, - ctx->script_name_len, track_vars_array TSRMLS_CC); + (char *) r->header.path.data, + ctx->script_name_len, track_vars_array TSRMLS_CC); php_register_variable_safe((char *) "SERVER_PROTOCOL", - (char *) r->header.version.data, - r->header.version.len, track_vars_array TSRMLS_CC); + (char *) r->header.version.data, + r->header.version.len, track_vars_array TSRMLS_CC); #if ABS_MODE php_register_variable_safe((char *) "SCRIPT_NAME", - (char *) nxt_php_script.data, - nxt_php_script.len, track_vars_array TSRMLS_CC); + (char *) nxt_php_script.data, + nxt_php_script.len, track_vars_array TSRMLS_CC); php_register_variable_safe((char *) "SCRIPT_FILENAME", - (char *) nxt_php_path.data, - nxt_php_path.len, track_vars_array TSRMLS_CC); + (char *) nxt_php_path.data, + nxt_php_path.len, track_vars_array TSRMLS_CC); php_register_variable_safe((char *) "DOCUMENT_ROOT", - (char *) nxt_php_root.data, - nxt_php_root.len, track_vars_array TSRMLS_CC); + (char *) nxt_php_root.data, + nxt_php_root.len, track_vars_array TSRMLS_CC); #else php_register_variable_safe((char *) "SCRIPT_NAME", - (char *) r->header.path.data, - ctx->script_name_len, track_vars_array TSRMLS_CC); + (char *) r->header.path.data, + ctx->script_name_len, track_vars_array TSRMLS_CC); php_register_variable_safe((char *) "SCRIPT_FILENAME", - (char *) ctx->script.data, ctx->script.len, - track_vars_array TSRMLS_CC); + (char *) ctx->script.data, ctx->script.len, + track_vars_array TSRMLS_CC); php_register_variable_safe((char *) "DOCUMENT_ROOT", (char *) root, - sizeof(root) - 1, track_vars_array TSRMLS_CC); + sizeof(root) - 1, track_vars_array TSRMLS_CC); #endif php_register_variable_safe((char *) "REQUEST_METHOD", - (char *) r->header.method.data, - r->header.method.len, track_vars_array TSRMLS_CC); + (char *) r->header.method.data, + r->header.method.len, track_vars_array TSRMLS_CC); php_register_variable_safe((char *) "REQUEST_URI", - (char *) r->header.path.data, - r->header.path.len, track_vars_array TSRMLS_CC); + (char *) r->header.path.data, + r->header.path.len, track_vars_array TSRMLS_CC); if (ctx->query.data != NULL) { php_register_variable_safe((char *) "QUERY_STRING", - (char *) ctx->query.data, - ctx->query.len, track_vars_array TSRMLS_CC); + (char *) ctx->query.data, + ctx->query.len, track_vars_array TSRMLS_CC); } if (ctx->content_type != NULL) { php_register_variable_safe((char *) "CONTENT_TYPE", - (char *) ctx->content_type->data, - ctx->content_type->len, track_vars_array TSRMLS_CC); + (char *) ctx->content_type->data, + ctx->content_type->len, track_vars_array TSRMLS_CC); } if (ctx->content_length != NULL) { php_register_variable_safe((char *) "CONTENT_LENGTH", - (char *) ctx->content_length->data, - ctx->content_length->len, track_vars_array TSRMLS_CC); + (char *) ctx->content_length->data, + ctx->content_length->len, track_vars_array TSRMLS_CC); } var = nxt_mem_nalloc(r->mem_pool, sizeof(prefix) + ctx->max_name + 1); diff --git a/src/nxt_port.c b/src/nxt_port.c index 8a56fc6b..1da16587 100644 --- a/src/nxt_port.c +++ b/src/nxt_port.c @@ -5,7 +5,7 @@ */ #include <nxt_main.h> -#include <nxt_cycle.h> +#include <nxt_runtime.h> #include <nxt_port.h> @@ -29,18 +29,26 @@ nxt_port_create(nxt_thread_t *thread, nxt_port_t *port, void -nxt_port_write(nxt_task_t *task, nxt_cycle_t *cycle, nxt_uint_t type, +nxt_port_write(nxt_task_t *task, nxt_runtime_t *rt, nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_buf_t *b) { - nxt_uint_t i, n; - nxt_port_t *port; + nxt_uint_t i, n, nprocesses, nports; + nxt_port_t *port; + nxt_process_t *process; - port = cycle->ports->elts; - n = cycle->ports->nelts; + process = rt->processes->elts; + nprocesses = rt->processes->nelts; - for (i = 0; i < n; i++) { - if (nxt_pid != port[i].pid) { - (void) nxt_port_socket_write(task, &port[i], type, fd, stream, b); + for (i = 0; i < nprocesses; i++) { + + if (nxt_pid != process[i].pid) { + port = process[i].ports->elts; + nports = process[i].ports->nelts; + + for (n = 0; n < nports; n++) { + (void) nxt_port_socket_write(task, &port[n], type, + fd, stream, b); + } } } } @@ -70,20 +78,21 @@ nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) void nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { - nxt_cycle_quit(task, NULL); + nxt_runtime_quit(task); } void -nxt_port_send_new_port(nxt_task_t *task, nxt_cycle_t *cycle, +nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt, nxt_port_t *new_port) { nxt_buf_t *b; nxt_uint_t i, n; nxt_port_t *port; + nxt_process_t *process; nxt_port_msg_new_port_t *msg; - n = cycle->ports->nelts; + n = rt->processes->nelts; if (n == 0) { return; } @@ -91,34 +100,33 @@ nxt_port_send_new_port(nxt_task_t *task, nxt_cycle_t *cycle, nxt_debug(task, "new port %d for process %PI engine %uD", new_port->socket.fd, new_port->pid, new_port->engine); - port = cycle->ports->elts; + process = rt->processes->elts; for (i = 0; i < n; i++) { - if (port[i].pid == new_port->pid - || port[i].pid == nxt_pid - || port[i].engine != 0) - { + if (process[i].pid == new_port->pid || process[i].pid == nxt_pid) { continue; } - b = nxt_buf_mem_alloc(port[i].mem_pool, sizeof(nxt_port_data_t), 0); + port = process[i].ports->elts; + + b = nxt_buf_mem_alloc(port->mem_pool, sizeof(nxt_port_data_t), 0); if (nxt_slow_path(b == NULL)) { continue; } - b->data = &port[i]; + b->data = port; b->completion_handler = nxt_port_new_port_buf_completion; b->mem.free += sizeof(nxt_port_msg_new_port_t); msg = (nxt_port_msg_new_port_t *) b->mem.pos; msg->pid = new_port->pid; msg->engine = new_port->engine; - msg->max_size = port[i].max_size; - msg->max_share = port[i].max_share; + msg->max_size = port->max_size; + msg->max_share = port->max_share; - (void) nxt_port_socket_write(task, &port[i], NXT_PORT_MSG_NEW_PORT, + (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_NEW_PORT, new_port->socket.fd, 0, b); } } @@ -143,13 +151,19 @@ void nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { nxt_port_t *port; - nxt_cycle_t *cycle; + nxt_process_t *process; + nxt_runtime_t *rt; nxt_mem_pool_t *mp; nxt_port_msg_new_port_t *new_port_msg; - cycle = nxt_thread_cycle(); + rt = task->thread->runtime; - port = nxt_array_add(cycle->ports); + process = nxt_runtime_new_process(rt); + if (nxt_slow_path(process == NULL)) { + return; + } + + port = nxt_array_zero_add(process->ports); if (nxt_slow_path(port == NULL)) { return; } @@ -167,6 +181,8 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_debug(task, "new port %d received for process %PI engine %uD", msg->fd, new_port_msg->pid, new_port_msg->engine); + process->pid = new_port_msg->pid; + port->pid = new_port_msg->pid; port->engine = new_port_msg->engine; port->pair[0] = -1; @@ -183,27 +199,29 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) void -nxt_port_change_log_file(nxt_task_t *task, nxt_cycle_t *cycle, - nxt_uint_t slot, nxt_fd_t fd) +nxt_port_change_log_file(nxt_task_t *task, nxt_runtime_t *rt, nxt_uint_t slot, + nxt_fd_t fd) { - nxt_buf_t *b; - nxt_uint_t i, n; - nxt_port_t *port; + nxt_buf_t *b; + nxt_uint_t i, n; + nxt_port_t *port; + nxt_process_t *process; - n = cycle->ports->nelts; + n = rt->processes->nelts; if (n == 0) { return; } nxt_debug(task, "change log file #%ui fd:%FD", slot, fd); - port = cycle->ports->elts; + process = rt->processes->elts; - /* port[0] is master process port. */ + /* process[0] is master process. */ for (i = 1; i < n; i++) { - b = nxt_buf_mem_alloc(port[i].mem_pool, sizeof(nxt_port_data_t), 0); + port = process[i].ports->elts; + b = nxt_buf_mem_alloc(port->mem_pool, sizeof(nxt_port_data_t), 0); if (nxt_slow_path(b == NULL)) { continue; } @@ -211,7 +229,7 @@ nxt_port_change_log_file(nxt_task_t *task, nxt_cycle_t *cycle, *(nxt_uint_t *) b->mem.pos = slot; b->mem.free += sizeof(nxt_uint_t); - (void) nxt_port_socket_write(task, &port[i], NXT_PORT_MSG_CHANGE_FILE, + (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_CHANGE_FILE, fd, 0, b); } } @@ -220,17 +238,17 @@ nxt_port_change_log_file(nxt_task_t *task, nxt_cycle_t *cycle, void nxt_port_change_log_file_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { - nxt_buf_t *b; - nxt_uint_t slot; - nxt_file_t *log_file; - nxt_cycle_t *cycle; + nxt_buf_t *b; + nxt_uint_t slot; + nxt_file_t *log_file; + nxt_runtime_t *rt; - cycle = nxt_thread_cycle(); + rt = task->thread->runtime; b = msg->buf; slot = *(nxt_uint_t *) b->mem.pos; - log_file = nxt_list_elt(cycle->log_files, slot); + log_file = nxt_list_elt(rt->log_files, slot); nxt_debug(task, "change log file %FD:%FD", msg->fd, log_file->fd); diff --git a/src/nxt_port.h b/src/nxt_port.h index c27b1bc4..8a9b8926 100644 --- a/src/nxt_port.h +++ b/src/nxt_port.h @@ -8,9 +8,6 @@ #define _NXT_PORT_H_INCLUDED_ -typedef struct nxt_port_s nxt_port_t; - - typedef struct { uint32_t stream; @@ -28,17 +25,14 @@ typedef struct { } nxt_port_send_msg_t; -typedef struct nxt_port_recv_msg_s { +struct nxt_port_recv_msg_s { uint32_t stream; uint16_t type; nxt_fd_t fd; nxt_buf_t *buf; nxt_port_t *port; -} nxt_port_recv_msg_t; - - -typedef void (*nxt_port_handler_t)(nxt_task_t *task, nxt_port_recv_msg_t *msg); +}; struct nxt_port_s { @@ -61,7 +55,6 @@ struct nxt_port_s { nxt_pid_t pid; uint32_t engine; - uint32_t generation; }; @@ -105,11 +98,11 @@ nxt_int_t nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, void nxt_port_create(nxt_thread_t *thread, nxt_port_t *port, nxt_port_handler_t *handlers); -void nxt_port_write(nxt_task_t *task, nxt_cycle_t *cycle, nxt_uint_t type, +void nxt_port_write(nxt_task_t *task, nxt_runtime_t *rt, nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_buf_t *b); -void nxt_port_send_new_port(nxt_task_t *task, nxt_cycle_t *cycle, +void nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt, nxt_port_t *port); -void nxt_port_change_log_file(nxt_task_t *task, nxt_cycle_t *cycle, +void nxt_port_change_log_file(nxt_task_t *task, nxt_runtime_t *rt, nxt_uint_t slot, nxt_fd_t fd); void nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); diff --git a/src/nxt_process.c b/src/nxt_process.c index b79e4388..d24be580 100644 --- a/src/nxt_process.c +++ b/src/nxt_process.c @@ -5,9 +5,11 @@ */ #include <nxt_main.h> +#include <nxt_master_process.h> -static nxt_int_t nxt_user_groups_get(nxt_user_cred_t *uc); +static void nxt_process_start(nxt_task_t *task, nxt_process_init_t *process); +static nxt_int_t nxt_user_groups_get(nxt_task_t *task, nxt_user_cred_t *uc); /* A cached process pid. */ @@ -18,20 +20,17 @@ nxt_pid_t nxt_ppid; nxt_pid_t -nxt_process_create(nxt_process_start_t start, void *data, const char *name) +nxt_process_create(nxt_task_t *task, nxt_process_init_t *process) { - nxt_pid_t pid; - nxt_thread_t *thr; - - thr = nxt_thread(); + nxt_pid_t pid; pid = fork(); switch (pid) { case -1: - nxt_log_alert(thr->log, "fork() failed while creating \"%s\" %E", - name, nxt_errno); + nxt_log(task, NXT_LOG_CRIT, "fork() failed while creating \"%s\" %E", + process->name, nxt_errno); break; case 0: @@ -39,14 +38,14 @@ nxt_process_create(nxt_process_start_t start, void *data, const char *name) nxt_pid = getpid(); /* Clean inherited cached thread tid. */ - thr->tid = 0; + task->thread->tid = 0; - start(data); + nxt_process_start(task, process); break; default: /* A parent. */ - nxt_log_debug(thr->log, "fork(): %PI", pid); + nxt_debug(task, "fork(\"%s\"): %PI", process->name, pid); break; } @@ -54,6 +53,73 @@ nxt_process_create(nxt_process_start_t start, void *data, const char *name) } +static void +nxt_process_start(nxt_task_t *task, nxt_process_init_t *process) +{ + nxt_int_t ret; + nxt_thread_t *thread; + nxt_runtime_t *rt; + nxt_event_engine_t *engine; + const nxt_event_interface_t *interface; + + nxt_log(task, NXT_LOG_INFO, "%s started", process->name); + + nxt_process_title(task, "nginext: %s", process->name); + + nxt_random_init(&nxt_random_data); + + if (process->user_cred != NULL && getuid() == 0) { + /* Super-user. */ + + ret = nxt_user_cred_set(task, process->user_cred); + if (ret != NXT_OK) { + goto fail; + } + } + + thread = task->thread; + rt = thread->runtime; + + rt->type = process->type; + + engine = thread->engine; + + /* Update inherited master process event engine and signals processing. */ + engine->signals->sigev = process->signals; + + interface = nxt_service_get(rt->services, "engine", rt->engine); + if (interface == NULL) { + goto fail; + } + + if (nxt_event_engine_change(engine, interface, rt->batch) != NXT_OK) { + goto fail; + } + + nxt_port_read_close(process->master_port); + nxt_port_write_enable(task, process->master_port); + + /* A worker process port. */ + nxt_port_create(thread, process->port, process->port_handlers); + + ret = nxt_runtime_thread_pool_create(thread, rt, rt->auxiliary_threads, + 60000 * 1000000LL); + if (ret != NXT_OK) { + goto fail; + } + + ret = process->start(task, rt); + + if (ret == NXT_OK) { + return; + } + +fail: + + exit(1); +} + + #if (NXT_HAVE_POSIX_SPAWN) /* @@ -76,14 +142,15 @@ nxt_process_create(nxt_process_start_t start, void *data, const char *name) */ nxt_pid_t -nxt_process_execute(char *name, char **argv, char **envp) +nxt_process_execute(nxt_task_t *task, char *name, char **argv, char **envp) { nxt_pid_t pid; - nxt_thread_log_debug("posix_spawn(\"%s\")", name); + nxt_debug(task, "posix_spawn(\"%s\")", name); if (posix_spawn(&pid, name, NULL, NULL, argv, envp) != 0) { - nxt_thread_log_alert("posix_spawn(\"%s\") failed %E", name, nxt_errno); + nxt_log(task, NXT_LOG_CRIT, "posix_spawn(\"%s\") failed %E", + name, nxt_errno); return -1; } @@ -93,7 +160,7 @@ nxt_process_execute(char *name, char **argv, char **envp) #else nxt_pid_t -nxt_process_execute(char *name, char **argv, char **envp) +nxt_process_execute(nxt_task_t *task, char *name, char **argv, char **envp) { nxt_pid_t pid; @@ -109,24 +176,26 @@ nxt_process_execute(char *name, char **argv, char **envp) switch (pid) { case -1: - nxt_thread_log_alert("vfork() failed while executing \"%s\" %E", - name, nxt_errno); + nxt_log(task, NXT_LOG_CRIT, "vfork() failed while executing \"%s\" %E", + name, nxt_errno); break; case 0: /* A child. */ - nxt_thread_log_debug("execve(\"%s\")", name); + nxt_debug(task, "execve(\"%s\")", name); (void) execve(name, argv, envp); - nxt_thread_log_alert("execve(\"%s\") failed %E", name, nxt_errno); + nxt_log(task, NXT_LOG_CRIT, "execve(\"%s\") failed %E", + name, nxt_errno); exit(1); + nxt_unreachable(); break; default: /* A parent. */ - nxt_thread_log_debug("vfork(): %PI", pid); + nxt_debug(task, "vfork(): %PI", pid); break; } @@ -137,14 +206,11 @@ nxt_process_execute(char *name, char **argv, char **envp) nxt_int_t -nxt_process_daemon(void) +nxt_process_daemon(nxt_task_t *task) { nxt_fd_t fd; nxt_pid_t pid; const char *msg; - nxt_thread_t *thr; - - thr = nxt_thread(); /* * fork() followed by a parent process's exit() detaches a child process @@ -166,7 +232,7 @@ nxt_process_daemon(void) default: /* A parent. */ - nxt_log_debug(thr->log, "fork(): %PI", pid); + nxt_debug(task, "fork(): %PI", pid); exit(0); nxt_unreachable(); } @@ -174,14 +240,14 @@ nxt_process_daemon(void) nxt_pid = getpid(); /* Clean inherited cached thread tid. */ - thr->tid = 0; + task->thread->tid = 0; - nxt_log_debug(thr->log, "daemon"); + nxt_debug(task, "daemon"); /* Detach from controlling terminal. */ if (setsid() == -1) { - nxt_log_emerg(thr->log, "setsid() failed %E", nxt_errno); + nxt_log(task, NXT_LOG_CRIT, "setsid() failed %E", nxt_errno); return NXT_ERROR; } @@ -217,7 +283,7 @@ nxt_process_daemon(void) fail: - nxt_log_emerg(thr->log, msg, nxt_errno); + nxt_log(task, NXT_LOG_CRIT, msg, nxt_errno); return NXT_ERROR; } @@ -236,7 +302,7 @@ nxt_nanosleep(nxt_nsec_t ns) nxt_int_t -nxt_user_cred_get(nxt_user_cred_t *uc, const char *group) +nxt_user_cred_get(nxt_task_t *task, nxt_user_cred_t *uc, const char *group) { struct group *grp; struct passwd *pwd; @@ -244,7 +310,8 @@ nxt_user_cred_get(nxt_user_cred_t *uc, const char *group) pwd = getpwnam(uc->user); if (nxt_slow_path(pwd == NULL)) { - nxt_thread_log_emerg("getpwnam(%s) failed %E", uc->user, nxt_errno); + nxt_log(task, NXT_LOG_CRIT, "getpwnam(%s) failed %E", + uc->user, nxt_errno); return NXT_ERROR; } @@ -255,7 +322,8 @@ nxt_user_cred_get(nxt_user_cred_t *uc, const char *group) grp = getgrnam(group); if (nxt_slow_path(grp == NULL)) { - nxt_thread_log_emerg("getgrnam(%s) failed %E", group, nxt_errno); + nxt_log(task, NXT_LOG_CRIT, "getgrnam(%s) failed %E", + group, nxt_errno); return NXT_ERROR; } @@ -263,7 +331,7 @@ nxt_user_cred_get(nxt_user_cred_t *uc, const char *group) } if (getuid() == 0) { - return nxt_user_groups_get(uc); + return nxt_user_groups_get(task, uc); } return NXT_OK; @@ -297,7 +365,7 @@ nxt_user_cred_get(nxt_user_cred_t *uc, const char *group) */ static nxt_int_t -nxt_user_groups_get(nxt_user_cred_t *uc) +nxt_user_groups_get(nxt_task_t *task, nxt_user_cred_t *uc) { int nsaved, ngroups; nxt_int_t ret; @@ -306,11 +374,11 @@ nxt_user_groups_get(nxt_user_cred_t *uc) nsaved = getgroups(0, NULL); if (nsaved == -1) { - nxt_thread_log_emerg("getgroups(0, NULL) failed %E", nxt_errno); + nxt_log(task, NXT_LOG_CRIT, "getgroups(0, NULL) failed %E", nxt_errno); return NXT_ERROR; } - nxt_thread_log_debug("getgroups(0, NULL): %d", nsaved); + nxt_debug(task, "getgroups(0, NULL): %d", nsaved); if (nsaved > NGROUPS_MAX) { /* MacOSX case. */ @@ -328,14 +396,15 @@ nxt_user_groups_get(nxt_user_cred_t *uc) nsaved = getgroups(nsaved, saved); if (nsaved == -1) { - nxt_thread_log_emerg("getgroups(%d) failed %E", nsaved, nxt_errno); + nxt_log(task, NXT_LOG_CRIT, "getgroups(%d) failed %E", + nsaved, nxt_errno); goto fail; } - nxt_thread_log_debug("getgroups(): %d", nsaved); + nxt_debug(task, "getgroups(): %d", nsaved); if (initgroups(uc->user, uc->base_gid) != 0) { - nxt_thread_log_emerg("initgroups(%s, %d) failed", + nxt_log(task, NXT_LOG_CRIT, "initgroups(%s, %d) failed", uc->user, uc->base_gid); goto restore; } @@ -343,11 +412,11 @@ nxt_user_groups_get(nxt_user_cred_t *uc) ngroups = getgroups(0, NULL); if (ngroups == -1) { - nxt_thread_log_emerg("getgroups(0, NULL) failed %E", nxt_errno); + nxt_log(task, NXT_LOG_CRIT, "getgroups(0, NULL) failed %E", nxt_errno); goto restore; } - nxt_thread_log_debug("getgroups(0, NULL): %d", ngroups); + nxt_debug(task, "getgroups(0, NULL): %d", ngroups); uc->gids = nxt_malloc(ngroups * sizeof(nxt_gid_t)); @@ -358,7 +427,8 @@ nxt_user_groups_get(nxt_user_cred_t *uc) ngroups = getgroups(ngroups, uc->gids); if (ngroups == -1) { - nxt_thread_log_emerg("getgroups(%d) failed %E", ngroups, nxt_errno); + nxt_log(task, NXT_LOG_CRIT, "getgroups(%d) failed %E", + ngroups, nxt_errno); goto restore; } @@ -377,9 +447,9 @@ nxt_user_groups_get(nxt_user_cred_t *uc) p = nxt_sprintf(p, end, "%uL:", (uint64_t) uc->gids[i]); } - nxt_thread_log_debug("user \"%s\" cred: uid:%uL base gid:%uL, gids:%*s", - uc->user, (uint64_t) uc->uid, - (uint64_t) uc->base_gid, p - msg, msg); + nxt_debug(task, "user \"%s\" cred: uid:%uL base gid:%uL, gids:%*s", + uc->user, (uint64_t) uc->uid, (uint64_t) uc->base_gid, + p - msg, msg); } #endif @@ -388,7 +458,8 @@ nxt_user_groups_get(nxt_user_cred_t *uc) restore: if (setgroups(nsaved, saved) != 0) { - nxt_thread_log_emerg("setgroups(%d) failed %E", nsaved, nxt_errno); + nxt_log(task, NXT_LOG_CRIT, "setgroups(%d) failed %E", + nsaved, nxt_errno); ret = NXT_ERROR; } @@ -401,34 +472,35 @@ fail: nxt_int_t -nxt_user_cred_set(nxt_user_cred_t *uc) +nxt_user_cred_set(nxt_task_t *task, nxt_user_cred_t *uc) { - nxt_thread_log_debug("user cred set: \"%s\" uid:%uL base gid:%uL", - uc->user, (uint64_t) uc->uid, uc->base_gid); + nxt_debug(task, "user cred set: \"%s\" uid:%uL base gid:%uL", + uc->user, (uint64_t) uc->uid, uc->base_gid); if (setgid(uc->base_gid) != 0) { - nxt_thread_log_emerg("setgid(%d) failed %E", uc->base_gid, nxt_errno); + nxt_log(task, NXT_LOG_CRIT, "setgid(%d) failed %E", + uc->base_gid, nxt_errno); return NXT_ERROR; } if (uc->gids != NULL) { if (setgroups(uc->ngroups, uc->gids) != 0) { - nxt_thread_log_emerg("setgroups(%i) failed %E", - uc->ngroups, nxt_errno); + nxt_log(task, NXT_LOG_CRIT, "setgroups(%i) failed %E", + uc->ngroups, nxt_errno); return NXT_ERROR; } } else { /* MacOSX fallback. */ if (initgroups(uc->user, uc->base_gid) != 0) { - nxt_thread_log_emerg("initgroups(%s, %d) failed", - uc->user, uc->base_gid); + nxt_log(task, NXT_LOG_CRIT, "initgroups(%s, %d) failed", + uc->user, uc->base_gid); return NXT_ERROR; } } if (setuid(uc->uid) != 0) { - nxt_thread_log_emerg("setuid(%d) failed %E", uc->uid, nxt_errno); + nxt_log(task, NXT_LOG_CRIT, "setuid(%d) failed %E", uc->uid, nxt_errno); return NXT_ERROR; } diff --git a/src/nxt_process.h b/src/nxt_process.h index 702d39f2..92f673a2 100644 --- a/src/nxt_process.h +++ b/src/nxt_process.h @@ -4,62 +4,95 @@ * Copyright (C) NGINX, Inc. */ -#ifndef _NXT_UNIX_PROCESS_H_INCLUDED_ -#define _NXT_UNIX_PROCESS_H_INCLUDED_ +#ifndef _NXT_PROCESS_H_INCLUDED_ +#define _NXT_PROCESS_H_INCLUDED_ -typedef pid_t nxt_pid_t; +typedef enum { + NXT_PROCESS_SINGLE = 0, + NXT_PROCESS_MASTER, + NXT_PROCESS_CONTROLLER, + NXT_PROCESS_ROUTER, + NXT_PROCESS_WORKER, +} nxt_process_type_t; -#define \ -nxt_sched_yield() \ - sched_yield() +typedef pid_t nxt_pid_t; +typedef uid_t nxt_uid_t; +typedef gid_t nxt_gid_t; -#define \ -nxt_process_id() \ - nxt_pid +typedef struct { + const char *user; + nxt_uid_t uid; + nxt_gid_t base_gid; + nxt_uint_t ngroups; + nxt_gid_t *gids; +} nxt_user_cred_t; +typedef struct nxt_process_init_s nxt_process_init_t; +typedef nxt_int_t (*nxt_process_star_t)(nxt_task_t *task, nxt_runtime_t *rt); -/* - * Solaris declares abort() as __NORETURN, - * raise(SIGABRT) is mostly the same. - */ -#define \ -nxt_abort() \ - (void) raise(SIGABRT) +struct nxt_process_init_s { + nxt_process_star_t start; + const char *name; + nxt_user_cred_t *user_cred; + nxt_port_t *port; + nxt_port_t *master_port; + nxt_port_handler_t *port_handlers; + const nxt_sig_event_t *signals; -typedef void (*nxt_process_start_t)(void *data); + nxt_process_type_t type:8; /* 3 bits */ +}; -NXT_EXPORT nxt_pid_t nxt_process_create(nxt_process_start_t start, void *data, - const char *name); -NXT_EXPORT nxt_pid_t nxt_process_execute(char *name, char **argv, char **envp); -NXT_EXPORT nxt_int_t nxt_process_daemon(void); + +typedef struct { + nxt_pid_t pid; + nxt_array_t *ports; /* of nxt_port_t */ + nxt_process_init_t *init; +} nxt_process_t; + + +NXT_EXPORT nxt_pid_t nxt_process_create(nxt_task_t *task, + nxt_process_init_t *process); +NXT_EXPORT nxt_pid_t nxt_process_execute(nxt_task_t *task, char *name, + char **argv, char **envp); +NXT_EXPORT nxt_int_t nxt_process_daemon(nxt_task_t *task); NXT_EXPORT void nxt_nanosleep(nxt_nsec_t ns); -NXT_EXPORT void nxt_process_arguments(char **orig_argv, char ***orig_envp); +NXT_EXPORT void nxt_process_arguments(nxt_task_t *task, char **orig_argv, + char ***orig_envp); #if (NXT_HAVE_SETPROCTITLE) -#define \ -nxt_process_title(title) \ - setproctitle("%s", title) +#define nxt_process_title(task, fmt, ...) \ + setproctitle(fmt, __VA_ARGS__) #elif (NXT_LINUX || NXT_SOLARIS || NXT_MACOSX) #define NXT_SETPROCTITLE_ARGV 1 -NXT_EXPORT void nxt_process_title(const char *title); +NXT_EXPORT void nxt_process_title(nxt_task_t *task, const char *fmt, ...); -#else +#endif -#define \ -nxt_process_title(title) -#endif +#define nxt_sched_yield() \ + sched_yield() +/* + * Solaris declares abort() as __NORETURN, + * raise(SIGABRT) is mostly the same. + */ + +#define nxt_abort() \ + (void) raise(SIGABRT) + +NXT_EXPORT nxt_int_t nxt_user_cred_get(nxt_task_t *task, nxt_user_cred_t *uc, + const char *group); +NXT_EXPORT nxt_int_t nxt_user_cred_set(nxt_task_t *task, nxt_user_cred_t *uc); NXT_EXPORT extern nxt_pid_t nxt_pid; NXT_EXPORT extern nxt_pid_t nxt_ppid; @@ -67,21 +100,4 @@ NXT_EXPORT extern char **nxt_process_argv; NXT_EXPORT extern char ***nxt_process_environ; -typedef uid_t nxt_uid_t; -typedef gid_t nxt_gid_t; - - -typedef struct { - const char *user; - nxt_uid_t uid; - nxt_gid_t base_gid; - nxt_uint_t ngroups; - nxt_gid_t *gids; -} nxt_user_cred_t; - - -NXT_EXPORT nxt_int_t nxt_user_cred_get(nxt_user_cred_t *uc, const char *group); -NXT_EXPORT nxt_int_t nxt_user_cred_set(nxt_user_cred_t *uc); - - -#endif /* _NXT_UNIX_PROCESS_H_INCLUDED_ */ +#endif /* _NXT_PROCESS_H_INCLUDED_ */ diff --git a/src/nxt_process_title.c b/src/nxt_process_title.c index 76533a6e..3e72f772 100644 --- a/src/nxt_process_title.c +++ b/src/nxt_process_title.c @@ -43,7 +43,7 @@ static u_char *nxt_process_title_end; void -nxt_process_arguments(char **orig_argv, char ***orig_envp) +nxt_process_arguments(nxt_task_t *task, char **orig_argv, char ***orig_envp) { u_char *p, *end, *argv_end, **argv, **env; size_t size, argv_size, environ_size, strings_size; @@ -126,7 +126,7 @@ nxt_process_arguments(char **orig_argv, char ***orig_envp) * There is no reason to modify environ if arguments * and environment are not contiguous. */ - nxt_thread_log_debug("arguments and environment are not contiguous"); + nxt_debug(task, "arguments and environment are not contiguous"); goto done; } @@ -187,9 +187,10 @@ done: void -nxt_process_title(const char *title) +nxt_process_title(nxt_task_t *task, const char *fmt, ...) { - u_char *p, *start, *end; + u_char *p, *start, *end; + va_list args; start = nxt_process_title_start; @@ -199,7 +200,9 @@ nxt_process_title(const char *title) end = nxt_process_title_end; - p = nxt_sprintf(start, end, "%s", title); + va_start(args, fmt); + p = nxt_vsprintf(start, end, fmt, args); + va_end(args); #if (NXT_SOLARIS) /* @@ -238,7 +241,7 @@ nxt_process_title(const char *title) */ nxt_memset(p, '\0', end - p); - nxt_thread_log_debug("setproctitle: \"%s\"", start); + nxt_debug(task, "setproctitle: \"%s\"", start); } #else /* !(NXT_SETPROCTITLE_ARGV) */ diff --git a/src/nxt_python_wsgi.c b/src/nxt_python_wsgi.c index e983431b..1b838041 100644 --- a/src/nxt_python_wsgi.c +++ b/src/nxt_python_wsgi.c @@ -15,7 +15,7 @@ #endif #include <nxt_main.h> -#include <nxt_cycle.h> +#include <nxt_runtime.h> #include <nxt_application.h> @@ -45,7 +45,7 @@ static PyObject *nxt_py_input_readline(nxt_py_input_t *self, PyObject *args); static PyObject *nxt_py_input_readlines(nxt_py_input_t *self, PyObject *args); -extern nxt_int_t nxt_python_wsgi_init(nxt_thread_t *thr, nxt_cycle_t *cycle); +extern nxt_int_t nxt_python_wsgi_init(nxt_thread_t *thr, nxt_runtime_t *rt); nxt_application_module_t nxt_python_module = { @@ -130,7 +130,7 @@ static nxt_app_request_t *nxt_app_request; nxt_int_t -nxt_python_wsgi_init(nxt_thread_t *thr, nxt_cycle_t *cycle) +nxt_python_wsgi_init(nxt_thread_t *thr, nxt_runtime_t *rt) { char **argv; u_char *p, *dir; @@ -295,7 +295,8 @@ nxt_python_init(nxt_thread_t *thr) return NXT_ERROR; } - pModule = PyImport_ExecCodeModuleEx((char *) "_wsgi_nginext", co, (char *) script); + pModule = PyImport_ExecCodeModuleEx((char *) "_wsgi_nginext", co, + (char *) script); Py_XDECREF(co); #endif diff --git a/src/nxt_queue.h b/src/nxt_queue.h index e5506630..e8f7c245 100644 --- a/src/nxt_queue.h +++ b/src/nxt_queue.h @@ -68,7 +68,7 @@ nxt_queue_is_empty(queue) \ * * for (lnk = nxt_queue_last(queue); * lnk != nxt_queue_head(queue); - * lnk = nxt_queue_next(lnk)) + * lnk = nxt_queue_prev(lnk)) * { * tp = nxt_queue_link_data(lnk, nxt_type_t, link); */ diff --git a/src/nxt_router.c b/src/nxt_router.c new file mode 100644 index 00000000..caef8503 --- /dev/null +++ b/src/nxt_router.c @@ -0,0 +1,85 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) Valentin V. Bartenev + * Copyright (C) NGINX, Inc. + */ + +#include <nxt_main.h> +#include <nxt_runtime.h> +#include <nxt_master_process.h> + + +static nxt_int_t nxt_router_listen_socket(nxt_task_t *task, nxt_runtime_t *rt); + + +nxt_int_t +nxt_router_start(nxt_task_t *task, nxt_runtime_t *rt) +{ + if (nxt_router_listen_socket(task, rt) != NXT_OK) { + return NXT_ERROR; + } + + return NXT_OK; +} + + +static nxt_int_t +nxt_router_listen_socket(nxt_task_t *task, nxt_runtime_t *rt) +{ + nxt_sockaddr_t *sa; + nxt_listen_socket_t *ls; + + sa = nxt_sockaddr_alloc(rt->mem_pool, sizeof(struct sockaddr_in), + NXT_INET_ADDR_STR_LEN); + if (sa == NULL) { + return NXT_ERROR; + } + + sa->type = SOCK_STREAM; + sa->u.sockaddr_in.sin_family = AF_INET; + sa->u.sockaddr_in.sin_port = htons(8000); + + nxt_sockaddr_text(sa); + + ls = nxt_runtime_listen_socket_add(rt, sa); + if (ls == NULL) { + return NXT_ERROR; + } + + ls->read_after_accept = 1; + + ls->flags = NXT_NONBLOCK; + +#if 0 + /* STUB */ + wq = nxt_mem_zalloc(cf->mem_pool, sizeof(nxt_work_queue_t)); + if (wq == NULL) { + return NXT_ERROR; + } + nxt_work_queue_name(wq, "listen"); + /**/ + + ls->work_queue = wq; +#endif + ls->handler = nxt_stream_connection_init; + + /* + * Connection memory pool chunk size is tunned to + * allocate the most data in one mem_pool chunk. + */ + ls->mem_pool_size = nxt_listen_socket_pool_min_size(ls) + + sizeof(nxt_event_conn_proxy_t) + + sizeof(nxt_event_conn_t) + + 4 * sizeof(nxt_buf_t); + + if (nxt_listen_socket_create(task, ls, 0) != NXT_OK) { + return NXT_ERROR; + } + + if (nxt_event_conn_listen(task, ls) != NXT_OK) { + return NXT_ERROR; + } + + return NXT_OK; +} diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c new file mode 100644 index 00000000..190d0459 --- /dev/null +++ b/src/nxt_runtime.c @@ -0,0 +1,1499 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) Valentin V. Bartenev + * Copyright (C) NGINX, Inc. + */ + +#include <nxt_main.h> +#include <nxt_runtime.h> +#include <nxt_port.h> +#include <nxt_master_process.h> + + +static nxt_int_t nxt_runtime_inherited_listen_sockets(nxt_task_t *task, + nxt_runtime_t *rt); +static nxt_int_t nxt_runtime_systemd_listen_sockets(nxt_task_t *task, + nxt_runtime_t *rt); +static nxt_int_t nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt); +static nxt_int_t nxt_runtime_processes(nxt_runtime_t *rt); +static nxt_int_t nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt); +static void nxt_runtime_start(nxt_task_t *task, void *obj, void *data); +static void nxt_runtime_initial_start(nxt_task_t *task); +static void nxt_single_process_start(nxt_thread_t *thr, nxt_task_t *task, + nxt_runtime_t *rt); +static void nxt_runtime_close_idle_connections(nxt_event_engine_t *engine); +static void nxt_runtime_exit(nxt_task_t *task, void *obj, void *data); +static nxt_int_t nxt_runtime_event_engine_change(nxt_task_t *task, + nxt_runtime_t *rt); +static nxt_int_t nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt); +static nxt_int_t nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt); +static nxt_sockaddr_t *nxt_runtime_sockaddr_parse(nxt_task_t *task, + nxt_mem_pool_t *mp, nxt_str_t *addr); +static nxt_sockaddr_t *nxt_runtime_sockaddr_unix_parse(nxt_task_t *task, + nxt_mem_pool_t *mp, nxt_str_t *addr); +static nxt_sockaddr_t *nxt_runtime_sockaddr_inet6_parse(nxt_task_t *task, + nxt_mem_pool_t *mp, nxt_str_t *addr); +static nxt_sockaddr_t *nxt_runtime_sockaddr_inet_parse(nxt_task_t *task, + nxt_mem_pool_t *mp, nxt_str_t *addr); +static nxt_int_t nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt); +static nxt_int_t nxt_runtime_log_files_init(nxt_runtime_t *rt); +static nxt_int_t nxt_runtime_log_files_create(nxt_task_t *task, + nxt_runtime_t *rt); +static nxt_int_t nxt_runtime_pid_file_create(nxt_task_t *task, + nxt_file_name_t *pid_file); + +#if (NXT_THREADS) +static void nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt, + nxt_runtime_cont_t cont); +#endif + + +nxt_int_t +nxt_runtime_create(nxt_task_t *task) +{ + nxt_int_t ret; + nxt_array_t *listen_sockets; + nxt_runtime_t *rt; + nxt_mem_pool_t *mp; + + mp = nxt_mem_pool_create(1024); + + if (nxt_slow_path(mp == NULL)) { + return NXT_ERROR; + } + + /* This alloction cannot fail. */ + rt = nxt_mem_zalloc(mp, sizeof(nxt_runtime_t)); + + task->thread->runtime = rt; + rt->mem_pool = mp; + + rt->prefix = nxt_current_directory(mp); + if (nxt_slow_path(rt->prefix == NULL)) { + goto fail; + } + + rt->conf_prefix = rt->prefix; + + rt->services = nxt_services_init(mp); + if (nxt_slow_path(rt->services == NULL)) { + goto fail; + } + + listen_sockets = nxt_array_create(mp, 1, sizeof(nxt_listen_socket_t)); + if (nxt_slow_path(listen_sockets == NULL)) { + goto fail; + } + + rt->listen_sockets = listen_sockets; + + ret = nxt_runtime_inherited_listen_sockets(task, rt); + if (nxt_slow_path(ret != NXT_OK)) { + goto fail; + } + + if (nxt_runtime_hostname(task, rt) != NXT_OK) { + goto fail; + } + + if (nxt_slow_path(nxt_runtime_log_files_init(rt) != NXT_OK)) { + goto fail; + } + + if (nxt_runtime_event_engines(task, rt) != NXT_OK) { + goto fail; + } + + if (nxt_slow_path(nxt_runtime_processes(rt) != NXT_OK)) { + goto fail; + } + + if (nxt_slow_path(nxt_runtime_thread_pools(task->thread, rt) != NXT_OK)) { + goto fail; + } + + rt->start = nxt_runtime_initial_start; + + nxt_work_queue_add(&task->thread->engine->fast_work_queue, + nxt_runtime_start, task, rt, NULL); + + return NXT_OK; + +fail: + + nxt_mem_pool_destroy(mp); + + return NXT_ERROR; +} + + +static nxt_int_t +nxt_runtime_inherited_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt) +{ + u_char *v, *p; + nxt_int_t type; + nxt_array_t *inherited_sockets; + nxt_socket_t s; + nxt_listen_socket_t *ls; + + v = (u_char *) getenv("NGINX"); + + if (v == NULL) { + return nxt_runtime_systemd_listen_sockets(task, rt); + } + + nxt_log(task, NXT_LOG_CRIT, "using inherited listen sockets: %s", v); + + inherited_sockets = nxt_array_create(rt->mem_pool, + 1, sizeof(nxt_listen_socket_t)); + if (inherited_sockets == NULL) { + return NXT_ERROR; + } + + rt->inherited_sockets = inherited_sockets; + + for (p = v; *p != '\0'; p++) { + + if (*p == ';') { + s = nxt_int_parse(v, p - v); + + if (nxt_slow_path(s < 0)) { + nxt_log(task, NXT_LOG_CRIT, "invalid socket number " + "\"%s\" in NGINX environment variable, " + "ignoring the rest of the variable", v); + return NXT_ERROR; + } + + v = p + 1; + + ls = nxt_array_zero_add(inherited_sockets); + if (nxt_slow_path(ls == NULL)) { + return NXT_ERROR; + } + + ls->socket = s; + + ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s); + if (nxt_slow_path(ls->sockaddr == NULL)) { + return NXT_ERROR; + } + + type = nxt_socket_getsockopt(task, s, SOL_SOCKET, SO_TYPE); + if (nxt_slow_path(type == -1)) { + return NXT_ERROR; + } + + ls->sockaddr->type = (uint16_t) type; + } + } + + return NXT_OK; +} + + +static nxt_int_t +nxt_runtime_systemd_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt) +{ + u_char *nfd, *pid; + nxt_int_t n; + nxt_array_t *inherited_sockets; + nxt_socket_t s; + nxt_listen_socket_t *ls; + + /* + * Number of listening sockets passed. The socket + * descriptors start from number 3 and are sequential. + */ + nfd = (u_char *) getenv("LISTEN_FDS"); + if (nfd == NULL) { + return NXT_OK; + } + + /* The pid of the service process. */ + pid = (u_char *) getenv("LISTEN_PID"); + if (pid == NULL) { + return NXT_OK; + } + + n = nxt_int_parse(nfd, nxt_strlen(nfd)); + if (n < 0) { + return NXT_OK; + } + + if (nxt_pid != nxt_int_parse(pid, nxt_strlen(pid))) { + return NXT_OK; + } + + nxt_log(task, NXT_LOG_INFO, "using %s systemd listen sockets", n); + + inherited_sockets = nxt_array_create(rt->mem_pool, + n, sizeof(nxt_listen_socket_t)); + if (inherited_sockets == NULL) { + return NXT_ERROR; + } + + rt->inherited_sockets = inherited_sockets; + + for (s = 3; s < n; s++) { + ls = nxt_array_zero_add(inherited_sockets); + if (nxt_slow_path(ls == NULL)) { + return NXT_ERROR; + } + + ls->socket = s; + + ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s); + if (nxt_slow_path(ls->sockaddr == NULL)) { + return NXT_ERROR; + } + + ls->sockaddr->type = SOCK_STREAM; + } + + return NXT_OK; +} + + +static nxt_int_t +nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt) +{ + nxt_event_engine_t *engine, **e; + const nxt_event_interface_t *interface; + + rt->engines = nxt_array_create(rt->mem_pool, 1, + sizeof(nxt_event_engine_t *)); + if (nxt_slow_path(rt->engines == NULL)) { + return NXT_ERROR; + } + + e = nxt_array_add(rt->engines); + if (nxt_slow_path(e == NULL)) { + return NXT_ERROR; + } + + interface = nxt_service_get(rt->services, "engine", NULL); + + if (nxt_slow_path(interface == NULL)) { + /* TODO: log */ + return NXT_ERROR; + } + + engine = nxt_event_engine_create(task, interface, + nxt_master_process_signals, 0, 0); + + if (nxt_slow_path(engine == NULL)) { + return NXT_ERROR; + } + + engine->id = rt->last_engine_id++; + *e = engine; + + return NXT_OK; +} + + +static nxt_int_t +nxt_runtime_processes(nxt_runtime_t *rt) +{ + rt->processes = nxt_array_create(rt->mem_pool, 4, sizeof(nxt_process_t)); + if (nxt_slow_path(rt->processes == NULL)) { + return NXT_ERROR; + } + + return NXT_OK; +} + + +static nxt_int_t +nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt) +{ +#if (NXT_THREADS) + nxt_int_t ret; + nxt_array_t *thread_pools; + + thread_pools = nxt_array_create(rt->mem_pool, 1, + sizeof(nxt_thread_pool_t *)); + + if (nxt_slow_path(thread_pools == NULL)) { + return NXT_ERROR; + } + + rt->thread_pools = thread_pools; + ret = nxt_runtime_thread_pool_create(thr, rt, 2, 60000 * 1000000LL); + + if (nxt_slow_path(ret != NXT_OK)) { + return NXT_ERROR; + } + +#endif + + return NXT_OK; +} + + +static void +nxt_runtime_start(nxt_task_t *task, void *obj, void *data) +{ + nxt_uint_t i; + nxt_runtime_t *rt; + + rt = obj; + + nxt_debug(task, "rt conf done"); + + nxt_mem_pool_debug_lock(rt->mem_pool, nxt_thread_tid(task->thread)); + + task->thread->log->ctx_handler = NULL; + task->thread->log->ctx = NULL; + + if (nxt_runtime_conf_init(task, rt) != NXT_OK) { + goto fail; + } + + for (i = 0; i < nxt_init_modules_n; i++) { + if (nxt_init_modules[i](task->thread, rt) != NXT_OK) { + goto fail; + } + } + + if (nxt_runtime_log_files_create(task, rt) != NXT_OK) { + goto fail; + } + + if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) { + goto fail; + } + +#if (NXT_THREADS) + + /* + * Thread pools should be destroyed before starting worker + * processes, because thread pool semaphores will stick in + * locked state in new processes after fork(). + */ + nxt_runtime_thread_pool_destroy(task, rt, rt->start); + +#else + + rt->start(task->thread, rt); + +#endif + + return; + +fail: + + nxt_runtime_quit(task); +} + + +static void +nxt_runtime_initial_start(nxt_task_t *task) +{ + nxt_int_t ret; + nxt_thread_t *thr; + nxt_runtime_t *rt; + const nxt_event_interface_t *interface; + + thr = task->thread; + rt = thr->runtime; + + if (rt->inherited_sockets == NULL && rt->daemon) { + + if (nxt_process_daemon(task) != NXT_OK) { + goto fail; + } + + /* + * An event engine should be updated after fork() + * even if an event facility was not changed because: + * 1) inherited kqueue descriptor is invalid, + * 2) the signal thread is not inherited. + */ + interface = nxt_service_get(rt->services, "engine", rt->engine); + if (interface == NULL) { + goto fail; + } + + ret = nxt_event_engine_change(task->thread->engine, interface, + rt->batch); + if (ret != NXT_OK) { + goto fail; + } + } + + ret = nxt_runtime_pid_file_create(task, rt->pid_file); + if (ret != NXT_OK) { + goto fail; + } + + if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) { + goto fail; + } + + thr->engine->max_connections = rt->engine_connections; + + if (rt->master_process) { + if (nxt_master_process_start(thr, task, rt) != NXT_ERROR) { + return; + } + + } else { + nxt_single_process_start(thr, task, rt); + return; + } + +fail: + + nxt_runtime_quit(task); +} + + +static void +nxt_single_process_start(nxt_thread_t *thr, nxt_task_t *task, nxt_runtime_t *rt) +{ +#if (NXT_THREADS) + nxt_int_t ret; + + ret = nxt_runtime_thread_pool_create(thr, rt, rt->auxiliary_threads, + 60000 * 1000000LL); + + if (nxt_slow_path(ret != NXT_OK)) { + nxt_runtime_quit(task); + return; + } + +#endif + + rt->type = NXT_PROCESS_SINGLE; + + nxt_runtime_listen_sockets_enable(task, rt); + + return; +} + + +void +nxt_runtime_quit(nxt_task_t *task) +{ + nxt_bool_t done; + nxt_runtime_t *rt; + nxt_event_engine_t *engine; + + rt = task->thread->runtime; + engine = task->thread->engine; + + nxt_debug(task, "exiting"); + + done = 1; + + if (!engine->shutdown) { + engine->shutdown = 1; + +#if (NXT_THREADS) + + if (!nxt_array_is_empty(rt->thread_pools)) { + nxt_runtime_thread_pool_destroy(task, rt, nxt_runtime_quit); + done = 0; + } + +#endif + + if (rt->type == NXT_PROCESS_MASTER) { + nxt_master_stop_worker_processes(task, rt); + done = 0; + } + } + + nxt_runtime_close_idle_connections(engine); + + if (done) { + nxt_work_queue_add(&engine->fast_work_queue, nxt_runtime_exit, + task, rt, engine); + } +} + + +static void +nxt_runtime_close_idle_connections(nxt_event_engine_t *engine) +{ + nxt_queue_t *idle; + nxt_queue_link_t *link, *next; + nxt_event_conn_t *c; + + nxt_debug(&engine->task, "close idle connections"); + + idle = &engine->idle_connections; + + for (link = nxt_queue_head(idle); + link != nxt_queue_tail(idle); + link = next) + { + next = nxt_queue_next(link); + c = nxt_queue_link_data(link, nxt_event_conn_t, link); + + if (!c->socket.read_ready) { + nxt_queue_remove(link); + nxt_event_conn_close(engine, c); + } + } +} + + +static void +nxt_runtime_exit(nxt_task_t *task, void *obj, void *data) +{ + nxt_runtime_t *rt; + nxt_event_engine_t *engine; + + rt = obj; + engine = data; + +#if (NXT_THREADS) + + nxt_debug(task, "thread pools: %d", rt->thread_pools->nelts); + + if (!nxt_array_is_empty(rt->thread_pools)) { + return; + } + +#endif + + if (rt->type <= NXT_PROCESS_MASTER) { + if (rt->pid_file != NULL) { + nxt_file_delete(rt->pid_file); + } + } + + if (!engine->event.signal_support) { + nxt_event_engine_signals_stop(engine); + } + + nxt_debug(task, "exit"); + + exit(0); + nxt_unreachable(); +} + + +static nxt_int_t +nxt_runtime_event_engine_change(nxt_task_t *task, nxt_runtime_t *rt) +{ + nxt_event_engine_t *engine; + const nxt_event_interface_t *interface; + + engine = task->thread->engine; + + if (engine->batch == rt->batch + && nxt_strcmp(engine->event.name, rt->engine) == 0) + { + return NXT_OK; + } + + interface = nxt_service_get(rt->services, "engine", rt->engine); + + if (interface != NULL) { + return nxt_event_engine_change(engine, interface, rt->batch); + } + + return NXT_ERROR; +} + + +void +nxt_runtime_event_engine_free(nxt_runtime_t *rt) +{ + nxt_event_engine_t *engine, **engines; + + engines = rt->engines->elts; + engine = engines[0]; + nxt_array_remove(rt->engines, &engines[0]); + + nxt_event_engine_free(engine); +} + + +#if (NXT_THREADS) + +static void nxt_runtime_thread_pool_init(void); +static void nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj, + void *data); + + +nxt_int_t +nxt_runtime_thread_pool_create(nxt_thread_t *thr, nxt_runtime_t *rt, + nxt_uint_t max_threads, nxt_nsec_t timeout) +{ + nxt_thread_pool_t *thread_pool, **tp; + + tp = nxt_array_add(rt->thread_pools); + if (tp == NULL) { + return NXT_ERROR; + } + + thread_pool = nxt_thread_pool_create(max_threads, timeout, + nxt_runtime_thread_pool_init, + thr->engine, + nxt_runtime_thread_pool_exit); + + if (nxt_fast_path(thread_pool != NULL)) { + *tp = thread_pool; + } + + return NXT_OK; +} + + +static void +nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt, + nxt_runtime_cont_t cont) +{ + nxt_uint_t n; + nxt_thread_pool_t **tp; + + rt->continuation = cont; + + n = rt->thread_pools->nelts; + + if (n == 0) { + cont(task); + return; + } + + tp = rt->thread_pools->elts; + + do { + nxt_thread_pool_destroy(*tp); + + tp++; + n--; + } while (n != 0); +} + + +static void +nxt_runtime_thread_pool_init(void) +{ +#if (NXT_REGEX) + nxt_regex_init(0); +#endif +} + + +static void +nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj, void *data) +{ + nxt_uint_t i, n; + nxt_runtime_t *rt; + nxt_thread_pool_t *tp, **thread_pools; + nxt_thread_handle_t handle; + + tp = obj; + + if (data != NULL) { + handle = (nxt_thread_handle_t) (uintptr_t) data; + nxt_thread_wait(handle); + } + + rt = task->thread->runtime; + + thread_pools = rt->thread_pools->elts; + n = rt->thread_pools->nelts; + + nxt_debug(task, "thread pools: %ui", n); + + for (i = 0; i < n; i++) { + + if (tp == thread_pools[i]) { + nxt_array_remove(rt->thread_pools, &thread_pools[i]); + + if (n == 1) { + /* The last thread pool. */ + rt->continuation(task); + } + + return; + } + } +} + +#endif + + +static nxt_int_t +nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt) +{ + nxt_int_t ret; + nxt_str_t *prefix; + nxt_file_t *file; + nxt_file_name_str_t file_name; + const nxt_event_interface_t *interface; + + rt->daemon = 1; + rt->master_process = 1; + rt->engine_connections = 256; + rt->worker_processes = 1; + rt->auxiliary_threads = 2; + rt->user_cred.user = "nobody"; + rt->group = NULL; + rt->pid = "nginext.pid"; + rt->error_log = "error.log"; + + if (nxt_runtime_conf_read_cmd(task, rt) != NXT_OK) { + return NXT_ERROR; + } + + if (nxt_runtime_controller_socket(task, rt) != NXT_OK) { + return NXT_ERROR; + } + + if (nxt_user_cred_get(task, &rt->user_cred, rt->group) != NXT_OK) { + return NXT_ERROR; + } + + /* An engine's parameters. */ + + interface = nxt_service_get(rt->services, "engine", rt->engine); + if (interface == NULL) { + return NXT_ERROR; + } + + rt->engine = interface->name; + + prefix = nxt_file_name_is_absolute(rt->pid) ? NULL : rt->prefix; + + ret = nxt_file_name_create(rt->mem_pool, &file_name, "%V%s%Z", + prefix, rt->pid); + if (nxt_slow_path(ret != NXT_OK)) { + return NXT_ERROR; + } + + rt->pid_file = file_name.start; + + prefix = nxt_file_name_is_absolute(rt->error_log) ? NULL : rt->prefix; + + ret = nxt_file_name_create(rt->mem_pool, &file_name, "%V%s%Z", + prefix, rt->error_log); + if (nxt_slow_path(ret != NXT_OK)) { + return NXT_ERROR; + } + + file = nxt_list_first(rt->log_files); + file->name = file_name.start; + + return NXT_OK; +} + + +static nxt_int_t +nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt) +{ + char *p, **argv; + nxt_int_t n; + nxt_str_t addr; + nxt_sockaddr_t *sa; + + argv = nxt_process_argv; + + while (*argv != NULL) { + p = *argv++; + + if (nxt_strcmp(p, "--listen") == 0) { + if (*argv == NULL) { + nxt_log(task, NXT_LOG_CRIT, + "no argument for option \"--listen\""); + return NXT_ERROR; + } + + p = *argv++; + + addr.length = nxt_strlen(p); + addr.start = (u_char *) p; + + sa = nxt_runtime_sockaddr_parse(task, rt->mem_pool, &addr); + + if (sa == NULL) { + return NXT_ERROR; + } + + rt->controller_listen = sa; + + continue; + } + + if (nxt_strcmp(p, "--upstream") == 0) { + if (*argv == NULL) { + nxt_log(task, NXT_LOG_CRIT, + "no argument for option \"--upstream\""); + return NXT_ERROR; + } + + p = *argv++; + + rt->upstream.length = nxt_strlen(p); + rt->upstream.start = (u_char *) p; + + continue; + } + + if (nxt_strcmp(p, "--workers") == 0) { + if (*argv == NULL) { + nxt_log(task, NXT_LOG_CRIT, + "no argument for option \"--workers\""); + return NXT_ERROR; + } + + p = *argv++; + n = nxt_int_parse((u_char *) p, nxt_strlen(p)); + + if (n < 1) { + nxt_log(task, NXT_LOG_CRIT, + "invalid number of workers: \"%s\"", p); + return NXT_ERROR; + } + + rt->worker_processes = n; + + continue; + } + + if (nxt_strcmp(p, "--user") == 0) { + if (*argv == NULL) { + nxt_log(task, NXT_LOG_CRIT, + "no argument for option \"--user\""); + return NXT_ERROR; + } + + p = *argv++; + + rt->user_cred.user = p; + + continue; + } + + if (nxt_strcmp(p, "--group") == 0) { + if (*argv == NULL) { + nxt_log(task, NXT_LOG_CRIT, + "no argument for option \"--group\""); + return NXT_ERROR; + } + + p = *argv++; + + rt->group = p; + + continue; + } + + if (nxt_strcmp(p, "--pid") == 0) { + if (*argv == NULL) { + nxt_log(task, NXT_LOG_CRIT, + "no argument for option \"--pid\""); + return NXT_ERROR; + } + + p = *argv++; + + rt->pid = p; + + continue; + } + + if (nxt_strcmp(p, "--log") == 0) { + if (*argv == NULL) { + nxt_log(task, NXT_LOG_CRIT, + "no argument for option \"--log\""); + return NXT_ERROR; + } + + p = *argv++; + + rt->error_log = p; + + continue; + } + + if (nxt_strcmp(p, "--no-daemonize") == 0) { + rt->daemon = 0; + continue; + } + } + + return NXT_OK; +} + + +static nxt_sockaddr_t * +nxt_runtime_sockaddr_parse(nxt_task_t *task, nxt_mem_pool_t *mp, + nxt_str_t *addr) +{ + u_char *p; + size_t length; + + length = addr->length; + p = addr->start; + + if (length >= 5 && nxt_memcmp(p, (u_char *) "unix:", 5) == 0) { + return nxt_runtime_sockaddr_unix_parse(task, mp, addr); + } + + if (length != 0 && *p == '[') { + return nxt_runtime_sockaddr_inet6_parse(task, mp, addr); + } + + return nxt_runtime_sockaddr_inet_parse(task, mp, addr); +} + + +static nxt_sockaddr_t * +nxt_runtime_sockaddr_unix_parse(nxt_task_t *task, nxt_mem_pool_t *mp, + nxt_str_t *addr) +{ +#if (NXT_HAVE_UNIX_DOMAIN) + u_char *p; + size_t length, socklen; + nxt_sockaddr_t *sa; + + /* + * Actual sockaddr_un length can be lesser or even larger than defined + * struct sockaddr_un length (see comment in unix/nxt_socket.h). So + * limit maximum Unix domain socket address length by defined sun_path[] + * length because some OSes accept addresses twice larger than defined + * struct sockaddr_un. Also reserve space for a trailing zero to avoid + * ambiguity, since many OSes accept Unix domain socket addresses + * without a trailing zero. + */ + const size_t max_len = sizeof(struct sockaddr_un) + - offsetof(struct sockaddr_un, sun_path) - 1; + + /* cutting "unix:" */ + length = addr->length - 5; + p = addr->start + 5; + + if (length == 0) { + nxt_log(task, NXT_LOG_CRIT, + "unix domain socket \"%V\" name is invalid", addr); + return NULL; + } + + if (length > max_len) { + nxt_log(task, NXT_LOG_CRIT, + "unix domain socket \"%V\" name is too long", addr); + return NULL; + } + + socklen = offsetof(struct sockaddr_un, sun_path) + length + 1; + +#if (NXT_LINUX) + + /* + * Linux unix(7): + * + * abstract: an abstract socket address is distinguished by the fact + * that sun_path[0] is a null byte ('\0'). The socket's address in + * this namespace is given by the additional bytes in sun_path that + * are covered by the specified length of the address structure. + * (Null bytes in the name have no special significance.) + */ + if (p[0] == '@') { + p[0] = '\0'; + socklen--; + } + +#endif + + sa = nxt_sockaddr_alloc(mp, socklen, addr->length); + + if (nxt_slow_path(sa == NULL)) { + return NULL; + } + + sa->type = SOCK_STREAM; + + sa->u.sockaddr_un.sun_family = AF_UNIX; + nxt_memcpy(sa->u.sockaddr_un.sun_path, p, length); + + return sa; + +#else /* !(NXT_HAVE_UNIX_DOMAIN) */ + + nxt_log(task, NXT_LOG_CRIT, "unix domain socket \"%V\" is not supported", + addr); + + return NULL; + +#endif +} + + +static nxt_sockaddr_t * +nxt_runtime_sockaddr_inet6_parse(nxt_task_t *task, nxt_mem_pool_t *mp, + nxt_str_t *addr) +{ +#if (NXT_INET6) + u_char *p, *addr, *addr_end; + size_t length; + nxt_int_t port; + nxt_mem_pool_t *mp; + nxt_sockaddr_t *sa; + struct in6_addr *in6_addr; + + length = addr->length - 1; + p = addr->start + 1; + + addr_end = nxt_memchr(p, ']', length); + + if (addr_end == NULL) { + goto invalid_address; + } + + sa = nxt_sockaddr_alloc(mp, sizeof(struct sockaddr_in6)); + + if (nxt_slow_path(sa == NULL)) { + return NULL; + } + + in6_addr = &sa->u.sockaddr_in6.sin6_addr; + + if (nxt_inet6_addr(in6_addr, p, addr_end - p) != NXT_OK) { + goto invalid_address; + } + + p = addr_end + 1; + length = (p + length) - p; + + if (length == 0) { + goto found; + } + + if (*p == ':') { + port = nxt_int_parse(p + 1, length - 1); + + if (port >= 1 && port <= 65535) { + goto found; + } + } + + nxt_log(task, NXT_LOG_CRIT, "invalid port in \"%V\"", addr); + + return NULL; + +found: + + sa->type = SOCK_STREAM; + + sa->u.sockaddr_in6.sin6_family = AF_INET6; + sa->u.sockaddr_in6.sin6_port = htons((in_port_t) port); + + return sa; + +invalid_address: + + nxt_log(task, NXT_LOG_CRIT, "invalid IPv6 address in \"%V\"", addr); + + return NULL; + +#else + + nxt_log(task, NXT_LOG_CRIT, "IPv6 socket \"%V\" is not supported", addr); + + return NULL; + +#endif +} + + +static nxt_sockaddr_t * +nxt_runtime_sockaddr_inet_parse(nxt_task_t *task, nxt_mem_pool_t *mp, + nxt_str_t *addr) +{ + u_char *p, *ip; + size_t length; + in_addr_t s_addr; + nxt_int_t port; + nxt_sockaddr_t *sa; + + s_addr = INADDR_ANY; + + length = addr->length; + ip = addr->start; + + p = nxt_memchr(ip, ':', length); + + if (p == NULL) { + + /* single value port, or address */ + + port = nxt_int_parse(ip, length); + + if (port > 0) { + /* "*:XX" */ + + if (port < 1 || port > 65535) { + goto invalid_port; + } + + } else { + /* "x.x.x.x" */ + + s_addr = nxt_inet_addr(ip, length); + + if (s_addr == INADDR_NONE) { + goto invalid_port; + } + + port = 8080; + } + + } else { + + /* x.x.x.x:XX */ + + p++; + length = (ip + length) - p; + port = nxt_int_parse(p, length); + + if (port < 1 || port > 65535) { + goto invalid_port; + } + + length = (p - 1) - ip; + + if (length != 1 || ip[0] != '*') { + s_addr = nxt_inet_addr(ip, length); + + if (s_addr == INADDR_NONE) { + goto invalid_addr; + } + + /* "x.x.x.x:XX" */ + } + } + + sa = nxt_sockaddr_alloc(mp, sizeof(struct sockaddr_in), + NXT_INET_ADDR_STR_LEN); + if (nxt_slow_path(sa == NULL)) { + return NULL; + } + + sa->type = SOCK_STREAM; + + sa->u.sockaddr_in.sin_family = AF_INET; + sa->u.sockaddr_in.sin_port = htons((in_port_t) port); + sa->u.sockaddr_in.sin_addr.s_addr = s_addr; + + return sa; + +invalid_port: + + nxt_log(task, NXT_LOG_CRIT, "invalid port in \"%V\"", addr); + + return NULL; + +invalid_addr: + + nxt_log(task, NXT_LOG_CRIT, "invalid address in \"%V\"", addr); + + return NULL; +} + + +nxt_listen_socket_t * +nxt_runtime_listen_socket_add(nxt_runtime_t *rt, nxt_sockaddr_t *sa) +{ + nxt_mem_pool_t *mp; + nxt_listen_socket_t *ls; + + ls = nxt_array_zero_add(rt->listen_sockets); + if (ls == NULL) { + return NULL; + } + + mp = rt->mem_pool; + + ls->sockaddr = nxt_sockaddr_create(mp, &sa->u.sockaddr, sa->socklen, + sa->length); + if (ls->sockaddr == NULL) { + return NULL; + } + + ls->sockaddr->type = sa->type; + + nxt_sockaddr_text(ls->sockaddr); + + ls->socket = -1; + ls->backlog = NXT_LISTEN_BACKLOG; + + return ls; +} + + +static nxt_int_t +nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt) +{ + size_t length; + char hostname[NXT_MAXHOSTNAMELEN + 1]; + + if (gethostname(hostname, NXT_MAXHOSTNAMELEN) != 0) { + nxt_log(task, NXT_LOG_CRIT, "gethostname() failed %E", nxt_errno); + return NXT_ERROR; + } + + /* + * Linux gethostname(2): + * + * If the null-terminated hostname is too large to fit, + * then the name is truncated, and no error is returned. + * + * For this reason an additional byte is reserved in the buffer. + */ + hostname[NXT_MAXHOSTNAMELEN] = '\0'; + + length = nxt_strlen(hostname); + rt->hostname.length = length; + + rt->hostname.start = nxt_mem_nalloc(rt->mem_pool, length); + + if (rt->hostname.start != NULL) { + nxt_memcpy_lowcase(rt->hostname.start, (u_char *) hostname, length); + return NXT_OK; + } + + return NXT_ERROR; +} + + +static nxt_int_t +nxt_runtime_log_files_init(nxt_runtime_t *rt) +{ + nxt_file_t *file; + nxt_list_t *log_files; + + log_files = nxt_list_create(rt->mem_pool, 1, sizeof(nxt_file_t)); + + if (nxt_fast_path(log_files != NULL)) { + rt->log_files = log_files; + + /* Preallocate the main error_log. This allocation cannot fail. */ + file = nxt_list_zero_add(log_files); + + file->fd = NXT_FILE_INVALID; + file->log_level = NXT_LOG_CRIT; + + return NXT_OK; + } + + return NXT_ERROR; +} + + +nxt_file_t * +nxt_runtime_log_file_add(nxt_runtime_t *rt, nxt_str_t *name) +{ + nxt_int_t ret; + nxt_str_t *prefix; + nxt_file_t *file; + nxt_file_name_str_t file_name; + + prefix = nxt_file_name_is_absolute(name->start) ? NULL : rt->prefix; + + ret = nxt_file_name_create(rt->mem_pool, &file_name, "%V%V%Z", + prefix, name); + + if (nxt_slow_path(ret != NXT_OK)) { + return NULL; + } + + nxt_list_each(file, rt->log_files) { + + /* STUB: hardecoded case sensitive/insensitive. */ + + if (file->name != NULL + && nxt_file_name_eq(file->name, file_name.start)) + { + return file; + } + + } nxt_list_loop; + + file = nxt_list_zero_add(rt->log_files); + + if (nxt_slow_path(file == NULL)) { + return NULL; + } + + file->fd = NXT_FILE_INVALID; + file->log_level = NXT_LOG_CRIT; + file->name = file_name.start; + + return file; +} + + +static nxt_int_t +nxt_runtime_log_files_create(nxt_task_t *task, nxt_runtime_t *rt) +{ + nxt_int_t ret; + nxt_file_t *file; + + nxt_list_each(file, rt->log_files) { + + ret = nxt_file_open(task, file, O_WRONLY | O_APPEND, O_CREAT, + NXT_FILE_OWNER_ACCESS); + + if (ret != NXT_OK) { + return NXT_ERROR; + } + + } nxt_list_loop; + + file = nxt_list_first(rt->log_files); + + return nxt_file_stderr(file); +} + + +nxt_int_t +nxt_runtime_listen_sockets_create(nxt_task_t *task, nxt_runtime_t *rt) +{ + nxt_int_t ret; + nxt_uint_t c, p, ncurr, nprev; + nxt_listen_socket_t *curr, *prev; + + curr = rt->listen_sockets->elts; + ncurr = rt->listen_sockets->nelts; + + if (rt->inherited_sockets != NULL) { + prev = rt->inherited_sockets->elts; + nprev = rt->inherited_sockets->nelts; + + } else { + prev = NULL; + nprev = 0; + } + + for (c = 0; c < ncurr; c++) { + + for (p = 0; p < nprev; p++) { + + if (nxt_sockaddr_cmp(curr[c].sockaddr, prev[p].sockaddr)) { + + ret = nxt_listen_socket_update(task, &curr[c], &prev[p]); + if (ret != NXT_OK) { + return NXT_ERROR; + } + + goto next; + } + } + + if (nxt_listen_socket_create(task, &curr[c], 0) != NXT_OK) { + return NXT_ERROR; + } + + next: + + continue; + } + + return NXT_OK; +} + + +nxt_int_t +nxt_runtime_listen_sockets_enable(nxt_task_t *task, nxt_runtime_t *rt) +{ + nxt_uint_t i, n; + nxt_listen_socket_t *ls; + + ls = rt->listen_sockets->elts; + n = rt->listen_sockets->nelts; + + for (i = 0; i < n; i++) { + if (ls[i].flags == NXT_NONBLOCK) { + if (nxt_event_conn_listen(task, &ls[i]) != NXT_OK) { + return NXT_ERROR; + } + } + } + + return NXT_OK; +} + + +nxt_str_t * +nxt_current_directory(nxt_mem_pool_t *mp) +{ + size_t length; + u_char *p; + nxt_str_t *name; + char buf[NXT_MAX_PATH_LEN]; + + length = nxt_dir_current(buf, NXT_MAX_PATH_LEN); + + if (nxt_fast_path(length != 0)) { + name = nxt_str_alloc(mp, length + 1); + + if (nxt_fast_path(name != NULL)) { + p = nxt_cpymem(name->start, buf, length); + *p = '/'; + + return name; + } + } + + return NULL; +} + + +static nxt_int_t +nxt_runtime_pid_file_create(nxt_task_t *task, nxt_file_name_t *pid_file) +{ + ssize_t length; + nxt_int_t n; + nxt_file_t file; + u_char pid[NXT_INT64_T_LEN + NXT_LINEFEED_SIZE]; + + nxt_memzero(&file, sizeof(nxt_file_t)); + + file.name = pid_file; + + n = nxt_file_open(task, &file, O_WRONLY, O_CREAT | O_TRUNC, + NXT_FILE_DEFAULT_ACCESS); + + if (n != NXT_OK) { + return NXT_ERROR; + } + + length = nxt_sprintf(pid, pid + sizeof(pid), "%PI%n", nxt_pid) - pid; + + if (nxt_file_write(&file, pid, length, 0) != length) { + return NXT_ERROR; + } + + nxt_file_close(task, &file); + + return NXT_OK; +} + + +nxt_process_t * +nxt_runtime_new_process(nxt_runtime_t *rt) +{ + nxt_process_t *process; + + /* TODO: memory failures. */ + + process = nxt_array_zero_add(rt->processes); + if (nxt_slow_path(process == NULL)) { + return NULL; + } + + process->ports = nxt_array_create(rt->mem_pool, 1, sizeof(nxt_port_t)); + if (nxt_slow_path(process->ports == NULL)) { + return NULL; + } + + return process; +} diff --git a/src/nxt_runtime.h b/src/nxt_runtime.h new file mode 100644 index 00000000..3b384235 --- /dev/null +++ b/src/nxt_runtime.h @@ -0,0 +1,108 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) Valentin V. Bartenev + * Copyright (C) NGINX, Inc. + */ + +#ifndef _NXT_RUNTIME_H_INCLUDED_ +#define _NXT_RUNTIME_H_INCLUDED_ + + +typedef void (*nxt_runtime_cont_t)(nxt_task_t *task); + + +struct nxt_runtime_s { + nxt_mem_pool_t *mem_pool; + + nxt_array_t *inherited_sockets; /* of nxt_listen_socket_t */ + nxt_array_t *listen_sockets; /* of nxt_listen_socket_t */ + + nxt_array_t *services; /* of nxt_service_t */ + nxt_array_t *engines; /* of nxt_event_engine_t */ + + nxt_runtime_cont_t start; + + nxt_str_t *conf_prefix; + nxt_str_t *prefix; + + nxt_str_t hostname; + + nxt_file_name_t *pid_file; + +#if (NXT_THREADS) + nxt_array_t *thread_pools; /* of nxt_thread_pool_t */ + nxt_runtime_cont_t continuation; +#endif + + nxt_array_t *processes; /* of nxt_process_t */ + + nxt_list_t *log_files; /* of nxt_file_t */ + + uint32_t last_engine_id; + + nxt_process_type_t type; + + nxt_timer_t timer; + + uint8_t daemon; + uint8_t batch; + uint8_t master_process; + const char *engine; + uint32_t engine_connections; + uint32_t worker_processes; + uint32_t auxiliary_threads; + nxt_user_cred_t user_cred; + const char *group; + const char *pid; + const char *error_log; + + nxt_sockaddr_t *controller_listen; + nxt_listen_socket_t *controller_socket; + nxt_str_t upstream; +}; + + + +typedef nxt_int_t (*nxt_module_init_t)(nxt_thread_t *thr, nxt_runtime_t *rt); + + +nxt_int_t nxt_runtime_create(nxt_task_t *task); +void nxt_runtime_quit(nxt_task_t *task); + +void nxt_runtime_event_engine_free(nxt_runtime_t *rt); + +#if (NXT_THREADS) +nxt_int_t nxt_runtime_thread_pool_create(nxt_thread_t *thr, nxt_runtime_t *rt, + nxt_uint_t max_threads, nxt_nsec_t timeout); +#endif + + +nxt_process_t *nxt_runtime_new_process(nxt_runtime_t *rt); + +/* STUB */ +nxt_int_t nxt_runtime_controller_socket(nxt_task_t *task, nxt_runtime_t *rt); + +nxt_str_t *nxt_current_directory(nxt_mem_pool_t *mp); + +nxt_listen_socket_t *nxt_runtime_listen_socket_add(nxt_runtime_t *rt, + nxt_sockaddr_t *sa); +nxt_int_t nxt_runtime_listen_sockets_create(nxt_task_t *task, + nxt_runtime_t *rt); +nxt_int_t nxt_runtime_listen_sockets_enable(nxt_task_t *task, + nxt_runtime_t *rt); +nxt_file_t *nxt_runtime_log_file_add(nxt_runtime_t *rt, nxt_str_t *name); + +/* STUB */ +void nxt_cdecl nxt_log_time_handler(nxt_uint_t level, nxt_log_t *log, + const char *fmt, ...); + +void nxt_stream_connection_init(nxt_task_t *task, void *obj, void *data); +nxt_int_t nxt_app_start(nxt_task_t *task, nxt_runtime_t *rt); + + +extern nxt_module_init_t nxt_init_modules[]; +extern nxt_uint_t nxt_init_modules_n; + + +#endif /* _NXT_RUNTIME_H_INCLIDED_ */ diff --git a/src/nxt_signal.h b/src/nxt_signal.h index 48d5c922..2194d1f4 100644 --- a/src/nxt_signal.h +++ b/src/nxt_signal.h @@ -8,11 +8,11 @@ #define _NXT_SIGNAL_H_INCLUDED_ -typedef struct { +struct nxt_sig_event_s { int signo; nxt_work_handler_t handler; const char *name; -} nxt_sig_event_t; +}; #define nxt_event_signal(sig, handler) \ { sig, handler, #sig } diff --git a/src/nxt_sockaddr.c b/src/nxt_sockaddr.c index 742cb0ba..66acfbd0 100644 --- a/src/nxt_sockaddr.c +++ b/src/nxt_sockaddr.c @@ -229,7 +229,7 @@ nxt_sockaddr_text(nxt_sockaddr_t *sa) sa->address_length = p - (start + 1); sa->port_start = sa->address_length + 2; - + *p++ = ']'; port = sa->u.sockaddr_in6.sin6_port; diff --git a/src/nxt_stream_module.c b/src/nxt_stream_module.c index 89201893..e89cd6fd 100644 --- a/src/nxt_stream_module.c +++ b/src/nxt_stream_module.c @@ -5,7 +5,7 @@ */ #include <nxt_main.h> -#include <nxt_cycle.h> +#include <nxt_runtime.h> static void nxt_stream_connection_peer(nxt_task_t *task, @@ -17,7 +17,7 @@ static void nxt_stream_connection_close(nxt_task_t *task, void *obj, void nxt_stream_connection_init(nxt_task_t *task, void *obj, void *data) { - nxt_cycle_t *cycle; + nxt_runtime_t *rt; nxt_event_conn_t *c; nxt_upstream_peer_t *up; @@ -32,10 +32,10 @@ nxt_stream_connection_init(nxt_task_t *task, void *obj, void *data) up->data = c; - cycle = nxt_thread_cycle(); + rt = task->thread->runtime; - if (cycle->upstream.length != 0) { - up->addr = cycle->upstream; + if (rt->upstream.length != 0) { + up->addr = rt->upstream; } else { nxt_str_set(&up->addr, "127.0.0.1:8080"); diff --git a/src/nxt_thread.h b/src/nxt_thread.h index b25bd24a..a8e04749 100644 --- a/src/nxt_thread.h +++ b/src/nxt_thread.h @@ -178,6 +178,7 @@ struct nxt_thread_s { nxt_thread_time_t time; + nxt_runtime_t *runtime; nxt_event_engine_t *engine; /* diff --git a/src/nxt_upstream_round_robin.c b/src/nxt_upstream_round_robin.c index 7717a2de..480f7eb8 100644 --- a/src/nxt_upstream_round_robin.c +++ b/src/nxt_upstream_round_robin.c @@ -110,7 +110,7 @@ nxt_upstream_round_robin_create(nxt_task_t *task, void *obj, void *data) /* STUB */ up->sockaddr = peer[0].sockaddr; - nxt_job_destroy(jbs); + nxt_job_destroy(task, jbs); up->ready_handler(task, up); //nxt_upstream_round_robin_get_peer(up); @@ -118,7 +118,7 @@ nxt_upstream_round_robin_create(nxt_task_t *task, void *obj, void *data) fail: - nxt_job_destroy(jbs); + nxt_job_destroy(task, jbs); up->ready_handler(task, up); } diff --git a/src/nxt_work_queue.h b/src/nxt_work_queue.h index 8dc9c1dd..90c344dc 100644 --- a/src/nxt_work_queue.h +++ b/src/nxt_work_queue.h @@ -28,7 +28,7 @@ struct nxt_task_s { * A work handler with just the obj and data arguments instead * of pointer to a possibly large a work struct allows to call * the handler not only via a work queue but also directly. - * The only obj argument is enough for the most cases expect the + * The only obj argument is enough for the most cases except the * source filters, so the data argument has been introduced and * is used where appropriate. */ diff --git a/src/nxt_worker_process.c b/src/nxt_worker_process.c index a6ef8e80..59285113 100644 --- a/src/nxt_worker_process.c +++ b/src/nxt_worker_process.c @@ -5,7 +5,7 @@ */ #include <nxt_main.h> -#include <nxt_cycle.h> +#include <nxt_runtime.h> #include <nxt_port.h> #include <nxt_master_process.h> @@ -21,7 +21,7 @@ static void nxt_worker_process_sigquit_handler(nxt_task_t *task, void *obj, void *data); -static nxt_port_handler_t nxt_worker_process_port_handlers[] = { +nxt_port_handler_t nxt_worker_process_port_handlers[] = { nxt_worker_process_quit_handler, nxt_port_new_port_handler, nxt_port_change_log_file_handler, @@ -29,112 +29,29 @@ static nxt_port_handler_t nxt_worker_process_port_handlers[] = { }; -static const nxt_sig_event_t nxt_worker_process_signals[] = { +const nxt_sig_event_t nxt_worker_process_signals[] = { nxt_event_signal(SIGHUP, nxt_worker_process_signal_handler), nxt_event_signal(SIGINT, nxt_worker_process_sigterm_handler), nxt_event_signal(SIGQUIT, nxt_worker_process_sigterm_handler), nxt_event_signal(SIGTERM, nxt_worker_process_sigquit_handler), nxt_event_signal(SIGCHLD, nxt_worker_process_signal_handler), nxt_event_signal(SIGUSR1, nxt_worker_process_signal_handler), - nxt_event_signal(SIGUSR1, nxt_worker_process_signal_handler), + nxt_event_signal(SIGUSR2, nxt_worker_process_signal_handler), nxt_event_signal_end, }; -void -nxt_worker_process_start(void *data) -{ - nxt_int_t n; - nxt_port_t *port; - nxt_cycle_t *cycle; - nxt_thread_t *thr; - const nxt_event_interface_t *interface; - - cycle = data; - - nxt_thread_init_data(nxt_thread_cycle_data); - nxt_thread_cycle_set(cycle); - - thr = nxt_thread(); - - nxt_log_error(NXT_LOG_INFO, thr->log, "worker process"); - - nxt_process_title("nginext: worker process"); - - cycle->type = NXT_PROCESS_WORKER; - - nxt_random_init(&nxt_random_data); - - if (getuid() == 0) { - /* Super-user. */ - - n = nxt_user_cred_set(&cycle->user_cred); - if (n != NXT_OK) { - goto fail; - } - } - - /* Update inherited master process event engine and signals processing. */ - thr->engine->signals->sigev = nxt_worker_process_signals; - - interface = nxt_service_get(cycle->services, "engine", cycle->engine); - if (interface == NULL) { - goto fail; - } - - if (nxt_event_engine_change(thr, &nxt_main_task, interface, cycle->batch) != NXT_OK) { - goto fail; - } - - if (nxt_cycle_listen_sockets_enable(&thr->engine->task, cycle) != NXT_OK) { - goto fail; - } - - port = cycle->ports->elts; - - /* A master process port. */ - nxt_port_read_close(&port[0]); - nxt_port_write_enable(&nxt_main_task, &port[0]); - - /* A worker process port. */ - nxt_port_create(thr, &port[cycle->current_process], - nxt_worker_process_port_handlers); - -#if (NXT_THREADS) - { - nxt_int_t ret; - - ret = nxt_cycle_thread_pool_create(thr, cycle, cycle->auxiliary_threads, - 60000 * 1000000LL); - - if (nxt_slow_path(ret != NXT_OK)) { - goto fail; - } - } - - nxt_app_start(cycle); -#endif - - return; - -fail: - - exit(1); - nxt_unreachable(); -} - - static void nxt_worker_process_quit(nxt_task_t *task) { nxt_uint_t n; - nxt_cycle_t *cycle; nxt_queue_t *listen; + nxt_runtime_t *rt; nxt_queue_link_t *link, *next; nxt_listen_socket_t *ls; nxt_event_conn_listen_t *cls; - cycle = nxt_thread_cycle(); + rt = task->thread->runtime; nxt_debug(task, "close listen connections"); @@ -151,10 +68,10 @@ nxt_worker_process_quit(nxt_task_t *task) nxt_fd_event_close(task->thread->engine, &cls->socket); } - if (cycle->listen_sockets != NULL) { + if (rt->listen_sockets != NULL) { - ls = cycle->listen_sockets->elts; - n = cycle->listen_sockets->nelts; + ls = rt->listen_sockets->elts; + n = rt->listen_sockets->nelts; while (n != 0) { nxt_socket_close(task, ls->socket); @@ -164,10 +81,10 @@ nxt_worker_process_quit(nxt_task_t *task) n--; } - cycle->listen_sockets->nelts = 0; + rt->listen_sockets->nelts = 0; } - nxt_cycle_quit(task, cycle); + nxt_runtime_quit(task); } @@ -194,7 +111,7 @@ nxt_worker_process_sigterm_handler(nxt_task_t *task, void *obj, void *data) /* A fast exit. */ - nxt_cycle_quit(task, NULL); + nxt_runtime_quit(task); } |