diff options
author | Igor Sysoev <igor@sysoev.ru> | 2017-03-09 18:03:27 +0300 |
---|---|---|
committer | Igor Sysoev <igor@sysoev.ru> | 2017-03-09 18:03:27 +0300 |
commit | 6f2c9acd1841ca20a1388b34aef64e9f00459090 (patch) | |
tree | c0b9c1063ec464027d1ca29a793f6c0b7a6878d5 /src/nxt_master_process.c | |
parent | 5745e4826427155e29c1d520fe77811a0f570689 (diff) | |
download | unit-6f2c9acd1841ca20a1388b34aef64e9f00459090.tar.gz unit-6f2c9acd1841ca20a1388b34aef64e9f00459090.tar.bz2 |
Processes refactoring.
The cycle has been renamed to the runtime.
Diffstat (limited to '')
-rw-r--r-- | src/nxt_master_process.c | 480 |
1 files changed, 166 insertions, 314 deletions
diff --git a/src/nxt_master_process.c b/src/nxt_master_process.c index 1e613b45..12c61d16 100644 --- a/src/nxt_master_process.c +++ b/src/nxt_master_process.c @@ -5,46 +5,39 @@ */ #include <nxt_main.h> -#include <nxt_cycle.h> +#include <nxt_runtime.h> #include <nxt_port.h> #include <nxt_master_process.h> static nxt_int_t nxt_master_process_port_create(nxt_task_t *task, - nxt_cycle_t *cycle); -static void nxt_master_process_title(void); + nxt_runtime_t *rt); +static void nxt_master_process_title(nxt_task_t *task); +static nxt_int_t nxt_master_start_controller_process(nxt_task_t *task, + nxt_runtime_t *rt); +static nxt_int_t nxt_master_start_router_process(nxt_task_t *task, + nxt_runtime_t *rt); static nxt_int_t nxt_master_start_worker_processes(nxt_task_t *task, - nxt_cycle_t *cycle); + nxt_runtime_t *rt); static nxt_int_t nxt_master_create_worker_process(nxt_task_t *task, - nxt_cycle_t *cycle); -static void nxt_master_stop_previous_worker_processes(nxt_task_t *task, - void *obj, void *data); -static void nxt_master_process_sighup_handler(nxt_task_t *task, void *obj, - void *data); -static void nxt_master_process_new_cycle(nxt_task_t *task, nxt_cycle_t *cycle); + nxt_runtime_t *rt, nxt_process_init_t *init); static void nxt_master_process_sigterm_handler(nxt_task_t *task, void *obj, void *data); static void nxt_master_process_sigquit_handler(nxt_task_t *task, void *obj, void *data); static void nxt_master_process_sigusr1_handler(nxt_task_t *task, void *obj, void *data); -static void nxt_master_process_sigusr2_handler(nxt_task_t *task, void *obj, - void *data); -static char **nxt_master_process_upgrade_environment(nxt_cycle_t *cycle); -static char **nxt_master_process_upgrade_environment_create(nxt_cycle_t *cycle); static void nxt_master_process_sigchld_handler(nxt_task_t *task, void *obj, void *data); static void nxt_master_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid); const nxt_sig_event_t nxt_master_process_signals[] = { - nxt_event_signal(SIGHUP, nxt_master_process_sighup_handler), nxt_event_signal(SIGINT, nxt_master_process_sigterm_handler), nxt_event_signal(SIGQUIT, nxt_master_process_sigquit_handler), nxt_event_signal(SIGTERM, nxt_master_process_sigterm_handler), nxt_event_signal(SIGCHLD, nxt_master_process_sigchld_handler), nxt_event_signal(SIGUSR1, nxt_master_process_sigusr1_handler), - nxt_event_signal(SIGUSR2, nxt_master_process_sigusr2_handler), nxt_event_signal_end, }; @@ -54,27 +47,47 @@ static nxt_bool_t nxt_exiting; nxt_int_t nxt_master_process_start(nxt_thread_t *thr, nxt_task_t *task, - nxt_cycle_t *cycle) + nxt_runtime_t *rt) { - cycle->type = NXT_PROCESS_MASTER; + nxt_int_t ret; + + rt->type = NXT_PROCESS_MASTER; - if (nxt_master_process_port_create(task, cycle) != NXT_OK) { + if (nxt_master_process_port_create(task, rt) != NXT_OK) { return NXT_ERROR; } - nxt_master_process_title(); + nxt_master_process_title(task); + + ret = nxt_master_start_controller_process(task, rt); + if (ret != NXT_OK) { + return ret; + } + + ret = nxt_master_start_router_process(task, rt); + if (ret != NXT_OK) { + return ret; + } - return nxt_master_start_worker_processes(task, cycle); + return nxt_master_start_worker_processes(task, rt); } static nxt_int_t -nxt_master_process_port_create(nxt_task_t *task, nxt_cycle_t *cycle) +nxt_master_process_port_create(nxt_task_t *task, nxt_runtime_t *rt) { - nxt_int_t ret; - nxt_port_t *port; + nxt_int_t ret; + nxt_port_t *port; + nxt_process_t *process; - port = nxt_array_zero_add(cycle->ports); + process = nxt_runtime_new_process(rt); + if (nxt_slow_path(process == NULL)) { + return NXT_ERROR; + } + + process->pid = nxt_pid; + + port = nxt_array_zero_add(process->ports); if (nxt_slow_path(port == NULL)) { return NXT_ERROR; } @@ -98,7 +111,7 @@ nxt_master_process_port_create(nxt_task_t *task, nxt_cycle_t *cycle) static void -nxt_master_process_title(void) +nxt_master_process_title(nxt_task_t *task) { u_char *p, *end; nxt_uint_t i; @@ -115,22 +128,75 @@ nxt_master_process_title(void) *p = '\0'; - nxt_process_title((char *) title); + nxt_process_title(task, (char *) title); } static nxt_int_t -nxt_master_start_worker_processes(nxt_task_t *task, nxt_cycle_t *cycle) +nxt_master_start_controller_process(nxt_task_t *task, nxt_runtime_t *rt) { - nxt_int_t ret; - nxt_uint_t n; + nxt_process_init_t *init; - cycle->process_generation++; + init = nxt_mem_alloc(rt->mem_pool, sizeof(nxt_process_init_t)); + if (nxt_slow_path(init == NULL)) { + return NXT_ERROR; + } - n = cycle->worker_processes; + init->start = nxt_controller_start; + init->name = "controller process"; + init->user_cred = &rt->user_cred; + init->port_handlers = nxt_worker_process_port_handlers; + init->signals = nxt_worker_process_signals; + init->type = NXT_PROCESS_CONTROLLER; + + return nxt_master_create_worker_process(task, rt, init); +} + + +static nxt_int_t +nxt_master_start_router_process(nxt_task_t *task, nxt_runtime_t *rt) +{ + nxt_process_init_t *init; + + init = nxt_mem_alloc(rt->mem_pool, sizeof(nxt_process_init_t)); + if (nxt_slow_path(init == NULL)) { + return NXT_ERROR; + } + + init->start = nxt_router_start; + init->name = "router process"; + init->user_cred = &rt->user_cred; + init->port_handlers = nxt_worker_process_port_handlers; + init->signals = nxt_worker_process_signals; + init->type = NXT_PROCESS_ROUTER; + + return nxt_master_create_worker_process(task, rt, init); +} + + +static nxt_int_t +nxt_master_start_worker_processes(nxt_task_t *task, nxt_runtime_t *rt) +{ + nxt_int_t ret; + nxt_uint_t n; + nxt_process_init_t *init; + + init = nxt_mem_alloc(rt->mem_pool, sizeof(nxt_process_init_t)); + if (nxt_slow_path(init == NULL)) { + return NXT_ERROR; + } + + init->start = nxt_app_start; + init->name = "worker process"; + init->user_cred = &rt->user_cred; + init->port_handlers = nxt_worker_process_port_handlers; + init->signals = nxt_worker_process_signals; + init->type = NXT_PROCESS_WORKER; + + n = rt->worker_processes; while (n-- != 0) { - ret = nxt_master_create_worker_process(task, cycle); + ret = nxt_master_create_worker_process(task, rt, init); if (ret != NXT_OK) { return ret; @@ -142,18 +208,34 @@ nxt_master_start_worker_processes(nxt_task_t *task, nxt_cycle_t *cycle) static nxt_int_t -nxt_master_create_worker_process(nxt_task_t *task, nxt_cycle_t *cycle) +nxt_master_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt, + nxt_process_init_t *init) { - nxt_int_t ret; - nxt_pid_t pid; - nxt_port_t *port; + nxt_int_t ret; + nxt_pid_t pid; + nxt_port_t *port; + nxt_process_t *process, *master_process; - port = nxt_array_zero_add(cycle->ports); + /* + * TODO: remove process, init, ports from array on memory and fork failures. + */ + + process = nxt_runtime_new_process(rt); + if (nxt_slow_path(process == NULL)) { + return NXT_ERROR; + } + + process->init = init; + + master_process = rt->processes->elts; + init->master_port = master_process->ports->elts; + + port = nxt_array_zero_add(process->ports); if (nxt_slow_path(port == NULL)) { return NXT_ERROR; } - cycle->current_process = cycle->ports->nelts - 1; + init->port = port; ret = nxt_port_socket_init(task, port, 0); if (nxt_slow_path(ret != NXT_OK)) { @@ -161,10 +243,8 @@ nxt_master_create_worker_process(nxt_task_t *task, nxt_cycle_t *cycle) } port->engine = 0; - port->generation = cycle->process_generation; - pid = nxt_process_create(nxt_worker_process_start, cycle, - "start worker process"); + pid = nxt_process_create(task, init); switch (pid) { @@ -177,110 +257,42 @@ nxt_master_create_worker_process(nxt_task_t *task, nxt_cycle_t *cycle) default: /* The master process created a new process. */ + process->pid = pid; port->pid = pid; nxt_port_read_close(port); nxt_port_write_enable(task, port); - nxt_port_send_new_port(task, cycle, port); + nxt_port_send_new_port(task, rt, port); return NXT_OK; } } -static void -nxt_master_process_sighup_handler(nxt_task_t *task, void *obj, void *data) -{ - nxt_cycle_t *cycle; - - cycle = nxt_thread_cycle(); - - nxt_log(task, NXT_LOG_NOTICE, "signal %d (%s) recevied, %s", - (int) (uintptr_t) obj, data, - cycle->reconfiguring ? "ignored" : "reconfiguring"); - - if (!cycle->reconfiguring) { - (void) nxt_cycle_create(task->thread, task, cycle, - nxt_master_process_new_cycle); - } -} - - -static void -nxt_master_process_new_cycle(nxt_task_t *task, nxt_cycle_t *cycle) -{ - nxt_thread_t *thr; - - thr = task->thread; - - nxt_debug(task, "new cycle"); - - /* A safe place to free the previous cycle. */ - nxt_mem_pool_destroy(cycle->previous->mem_pool); - - switch (nxt_master_start_worker_processes(task, cycle)) { - - case NXT_OK: - /* - * The master process, allow old worker processes to accept new - * connections yet 500ms in parallel with new worker processes. - */ - cycle->timer.handler = nxt_master_stop_previous_worker_processes; - cycle->timer.log = &nxt_main_log; - - cycle->timer.work_queue = &thr->engine->fast_work_queue; - - nxt_timer_add(thr->engine, &cycle->timer, 500); - - return; - - case NXT_ERROR: - /* - * The master process, one or more new worker processes - * could not be created, there is no fallback. - */ - return; - - default: /* NXT_AGAIN */ - /* A worker process, return to the event engine work queue loop. */ - return; - } -} - - -static void -nxt_master_stop_previous_worker_processes(nxt_task_t *task, void *obj, - void *data) +void +nxt_master_stop_worker_processes(nxt_task_t *task, nxt_runtime_t *rt) { - uint32_t generation; - nxt_uint_t i, n; - nxt_port_t *port; - nxt_cycle_t *cycle; + nxt_uint_t i, n, nprocesses, nports; + nxt_port_t *port; + nxt_process_t *process; - cycle = nxt_thread_cycle(); + process = rt->processes->elts; + nprocesses = rt->processes->nelts; - port = cycle->ports->elts; - n = cycle->ports->nelts; + for (i = 0; i < nprocesses; i++) { - generation = cycle->process_generation - 1; + if (nxt_pid != process[i].pid) { + process[i].init = NULL; - /* The port[0] is the master process port. */ + port = process[i].ports->elts; + nports = process[i].ports->nelts; - for (i = 1; i < n; i++) { - if (port[i].generation == generation) { - (void) nxt_port_socket_write(task, &port[i], - NXT_PORT_MSG_QUIT, -1, 0, NULL); + for (n = 0; n < nports; n++) { + (void) nxt_port_socket_write(task, &port[n], NXT_PORT_MSG_QUIT, + -1, 0, NULL); + } } } - - cycle->reconfiguring = 0; -} - - -void -nxt_master_stop_worker_processes(nxt_task_t *task, nxt_cycle_t *cycle) -{ - nxt_port_write(task, cycle, NXT_PORT_MSG_QUIT, -1, 0, NULL); } @@ -295,7 +307,7 @@ nxt_master_process_sigterm_handler(nxt_task_t *task, void *obj, void *data) nxt_exiting = 1; - nxt_cycle_quit(task, NULL); + nxt_runtime_quit(task); } @@ -309,7 +321,7 @@ nxt_master_process_sigquit_handler(nxt_task_t *task, void *obj, void *data) nxt_exiting = 1; - nxt_cycle_quit(task, NULL); + nxt_runtime_quit(task); } @@ -319,7 +331,7 @@ nxt_master_process_sigusr1_handler(nxt_task_t *task, void *obj, void *data) nxt_int_t ret; nxt_uint_t n; nxt_file_t *file, *new_file; - nxt_cycle_t *cycle; + nxt_runtime_t *rt; nxt_array_t *new_files; nxt_mem_pool_t *mp; @@ -331,9 +343,9 @@ nxt_master_process_sigusr1_handler(nxt_task_t *task, void *obj, void *data) return; } - cycle = nxt_thread_cycle(); + rt = task->thread->runtime; - n = nxt_list_nelts(cycle->log_files); + n = nxt_list_nelts(rt->log_files); new_files = nxt_array_create(mp, n, sizeof(nxt_file_t)); if (new_files == NULL) { @@ -341,7 +353,7 @@ nxt_master_process_sigusr1_handler(nxt_task_t *task, void *obj, void *data) return; } - nxt_list_each(file, cycle->log_files) { + nxt_list_each(file, rt->log_files) { /* This allocation cannot fail. */ new_file = nxt_array_add(new_files); @@ -350,7 +362,7 @@ nxt_master_process_sigusr1_handler(nxt_task_t *task, void *obj, void *data) new_file->fd = NXT_FILE_INVALID; new_file->log_level = NXT_LOG_CRIT; - ret = nxt_file_open(new_file, NXT_FILE_APPEND, NXT_FILE_CREATE_OR_OPEN, + ret = nxt_file_open(task, new_file, O_WRONLY | O_APPEND, O_CREAT, NXT_FILE_OWNER_ACCESS); if (ret != NXT_OK) { @@ -366,9 +378,9 @@ nxt_master_process_sigusr1_handler(nxt_task_t *task, void *obj, void *data) if (ret == NXT_OK) { n = 0; - nxt_list_each(file, cycle->log_files) { + nxt_list_each(file, rt->log_files) { - nxt_port_change_log_file(task, cycle, n, new_file[n].fd); + nxt_port_change_log_file(task, rt, n, new_file[n].fd); /* * The old log file descriptor must be closed at the moment * when no other threads use it. dup2() allows to use the @@ -392,7 +404,7 @@ fail: while (n != 0) { if (new_file->fd != NXT_FILE_INVALID) { - nxt_file_close(new_file); + nxt_file_close(task, new_file); } new_file++; @@ -404,164 +416,6 @@ fail: static void -nxt_master_process_sigusr2_handler(nxt_task_t *task, void *obj, void *data) -{ - char **env; - nxt_int_t ret; - nxt_pid_t pid, ppid; - nxt_bool_t ignore; - nxt_cycle_t *cycle; - - cycle = nxt_thread_cycle(); - - /* Is upgrade or reconfiguring in progress? */ - ignore = (cycle->new_binary != 0) || cycle->reconfiguring; - - ppid = getppid(); - - if (ppid == nxt_ppid && ppid != 1) { - /* - * Ignore the upgrade signal in a new master process if an old - * master process is still running. After the old process's exit - * getppid() will return 1 (init process pid) or pid of zsched (zone - * scheduler) if the processes run in Solaris zone. There is little - * race condition between the parent process exit and getting getppid() - * for the very start of the new master process execution, so init or - * zsched pid may be stored in nxt_ppid. For this reason pid 1 is - * tested explicitly. There is no workaround for this race condition - * in Solaris zons. To eliminate this race condition in Solaris - * zone the old master process should be quit only when both - * "nginext.pid.oldbin" (created by the old master process) and - * "nginext.pid" (created by the new master process) files exists. - */ - ignore = 1; - } - - nxt_log(task, NXT_LOG_NOTICE, - "signal %d (%s) recevied, %s, parent pid: %PI", - (int) (uintptr_t) obj, data, - ignore ? "ignored" : "online binary file upgrade", ppid); - - if (ignore) { - return; - } - - env = nxt_master_process_upgrade_environment(cycle); - if (nxt_slow_path(env == NULL)) { - return; - } - - cycle->new_binary = -1; - - ret = nxt_cycle_pid_file_create(cycle->oldbin_file, 0); - if (nxt_slow_path(ret != NXT_OK)) { - goto fail; - } - - pid = nxt_process_execute(nxt_process_argv[0], nxt_process_argv, env); - - if (pid == -1) { - cycle->new_binary = 0; - (void) nxt_file_delete(cycle->oldbin_file); - - } else { - cycle->new_binary = pid; - } - -fail: - - /* Zero slot is NGINX variable slot, all other slots must not be free()d. */ - nxt_free(env[0]); - nxt_free(env); -} - - -static char ** -nxt_master_process_upgrade_environment(nxt_cycle_t *cycle) -{ - size_t len; - char **env; - u_char *p, *end; - nxt_uint_t n; - nxt_listen_socket_t *ls; - - env = nxt_master_process_upgrade_environment_create(cycle); - if (nxt_slow_path(env == NULL)) { - return NULL; - } - - ls = cycle->listen_sockets->elts; - n = cycle->listen_sockets->nelts; - - len = sizeof("NGINX=") + n * (NXT_INT_T_LEN + 1); - - p = nxt_malloc(len); - - if (nxt_slow_path(p == NULL)) { - nxt_free(env); - return NULL; - } - - env[0] = (char *) p; - end = p + len; - - p = nxt_cpymem(p, "NGINX=", sizeof("NGINX=") - 1); - - do { - p = nxt_sprintf(p, end, "%ud;", ls->socket); - - ls++; - n--; - } while (n != 0); - - *p = '\0'; - - return env; -} - - -static char ** -nxt_master_process_upgrade_environment_create(nxt_cycle_t *cycle) -{ - char **env; - nxt_uint_t n; - - /* 2 is for "NGINX" variable and the last NULL slot. */ - n = 2; - -#if (NXT_SETPROCTITLE_ARGV) - n++; -#endif - - env = nxt_malloc(n * sizeof(char *)); - if (nxt_slow_path(env == NULL)) { - return NULL; - } - - /* Zero slot is reserved for "NGINX" variable. */ - n = 1; - - /* TODO: copy env values */ - -#if (NXT_SETPROCTITLE_ARGV) - - /* 300 spare bytes for new process title. */ - env[n++] = (char *) - "SPARE=XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" - "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" - "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" - "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" - "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"; - -#endif - - env[n] = NULL; - - return env; -} - - -static void nxt_master_process_sigchld_handler(nxt_task_t *task, void *obj, void *data) { int status; @@ -619,38 +473,36 @@ nxt_master_process_sigchld_handler(nxt_task_t *task, void *obj, void *data) static void nxt_master_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid) { - nxt_uint_t i, n, generation; - nxt_port_t *port; - nxt_cycle_t *cycle; + nxt_uint_t i, n; + nxt_runtime_t *rt; + nxt_process_t *process; + nxt_process_init_t *init; - cycle = nxt_thread_cycle(); + rt = task->thread->runtime; - if (cycle->new_binary == pid) { - cycle->new_binary = 0; + process = rt->processes->elts; + n = rt->processes->nelts; - (void) nxt_file_rename(cycle->oldbin_file, cycle->pid_file); - return; - } + /* A process[0] is the master process. */ - port = cycle->ports->elts; - n = cycle->ports->nelts; + for (i = 1; i < n; i++) { - for (i = 0; i < n; i++) { + if (pid == process[i].pid) { + init = process[i].init; - if (pid == port[i].pid) { - generation = port[i].generation; + /* TODO: free ports fds. */ - nxt_array_remove(cycle->ports, &port[i]); + nxt_array_remove(rt->processes, &process[i]); if (nxt_exiting) { nxt_debug(task, "processes %d", n); if (n == 2) { - nxt_cycle_quit(task, cycle); + nxt_runtime_quit(task); } - } else if (generation == cycle->process_generation) { - (void) nxt_master_create_worker_process(task, cycle); + } else if (init != NULL) { + (void) nxt_master_create_worker_process(task, rt, init); } return; |