From de532922d9ab42aa15b40d47c8db53ac2af38500 Mon Sep 17 00:00:00 2001 From: Igor Sysoev Date: Mon, 23 Jan 2017 19:56:03 +0300 Subject: Introducing tasks. --- src/nxt_master_process.c | 149 +++++++++++++++++++++++++---------------------- 1 file changed, 79 insertions(+), 70 deletions(-) (limited to 'src/nxt_master_process.c') diff --git a/src/nxt_master_process.c b/src/nxt_master_process.c index df2934c0..8ce6e670 100644 --- a/src/nxt_master_process.c +++ b/src/nxt_master_process.c @@ -10,28 +10,31 @@ #include -static nxt_int_t nxt_master_process_chan_create(nxt_cycle_t *cycle); +static nxt_int_t nxt_master_process_chan_create(nxt_task_t *task, + nxt_cycle_t *cycle); static void nxt_master_process_title(void); -static nxt_int_t nxt_master_start_worker_processes(nxt_cycle_t *cycle); -static nxt_int_t nxt_master_create_worker_process(nxt_cycle_t *cycle); -static void nxt_master_stop_previous_worker_processes(nxt_thread_t *thr, +static nxt_int_t nxt_master_start_worker_processes(nxt_task_t *task, + nxt_cycle_t *cycle); +static nxt_int_t nxt_master_create_worker_process(nxt_task_t *task, + nxt_cycle_t *cycle); +static void nxt_master_stop_previous_worker_processes(nxt_task_t *task, void *obj, void *data); -static void nxt_master_process_sighup_handler(nxt_thread_t *thr, void *obj, +static void nxt_master_process_sighup_handler(nxt_task_t *task, void *obj, void *data); -static void nxt_master_process_new_cycle(nxt_thread_t *thr, nxt_cycle_t *cycle); -static void nxt_master_process_sigterm_handler(nxt_thread_t *thr, void *obj, +static void nxt_master_process_new_cycle(nxt_task_t *task, nxt_cycle_t *cycle); +static void nxt_master_process_sigterm_handler(nxt_task_t *task, void *obj, void *data); -static void nxt_master_process_sigquit_handler(nxt_thread_t *thr, void *obj, +static void nxt_master_process_sigquit_handler(nxt_task_t *task, void *obj, void *data); -static void nxt_master_process_sigusr1_handler(nxt_thread_t *thr, void *obj, +static void nxt_master_process_sigusr1_handler(nxt_task_t *task, void *obj, void *data); -static void nxt_master_process_sigusr2_handler(nxt_thread_t *thr, void *obj, +static void nxt_master_process_sigusr2_handler(nxt_task_t *task, void *obj, void *data); static char **nxt_master_process_upgrade_environment(nxt_cycle_t *cycle); static char **nxt_master_process_upgrade_environment_create(nxt_cycle_t *cycle); -static void nxt_master_process_sigchld_handler(nxt_thread_t *thr, void *obj, +static void nxt_master_process_sigchld_handler(nxt_task_t *task, void *obj, void *data); -static void nxt_master_cleanup_worker_process(nxt_thread_t *thr, nxt_pid_t pid); +static void nxt_master_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid); const nxt_event_sig_t nxt_master_process_signals[] = { @@ -50,22 +53,23 @@ static nxt_bool_t nxt_exiting; nxt_int_t -nxt_master_process_start(nxt_thread_t *thr, nxt_cycle_t *cycle) +nxt_master_process_start(nxt_thread_t *thr, nxt_task_t *task, + nxt_cycle_t *cycle) { cycle->type = NXT_PROCESS_MASTER; - if (nxt_master_process_chan_create(cycle) != NXT_OK) { + if (nxt_master_process_chan_create(task, cycle) != NXT_OK) { return NXT_ERROR; } nxt_master_process_title(); - return nxt_master_start_worker_processes(cycle); + return nxt_master_start_worker_processes(task, cycle); } static nxt_int_t -nxt_master_process_chan_create(nxt_cycle_t *cycle) +nxt_master_process_chan_create(nxt_task_t *task, nxt_cycle_t *cycle) { nxt_process_chan_t *proc; @@ -86,7 +90,7 @@ nxt_master_process_chan_create(nxt_cycle_t *cycle) * A master process chan. A write chan is not closed * since it should be inherited by worker processes. */ - nxt_chan_read_enable(nxt_thread(), proc->chan); + nxt_chan_read_enable(task, proc->chan); return NXT_OK; } @@ -115,7 +119,7 @@ nxt_master_process_title(void) static nxt_int_t -nxt_master_start_worker_processes(nxt_cycle_t *cycle) +nxt_master_start_worker_processes(nxt_task_t *task, nxt_cycle_t *cycle) { nxt_int_t ret; nxt_uint_t n; @@ -125,7 +129,7 @@ nxt_master_start_worker_processes(nxt_cycle_t *cycle) n = cycle->worker_processes; while (n-- != 0) { - ret = nxt_master_create_worker_process(cycle); + ret = nxt_master_create_worker_process(task, cycle); if (ret != NXT_OK) { return ret; @@ -137,7 +141,7 @@ nxt_master_start_worker_processes(nxt_cycle_t *cycle) static nxt_int_t -nxt_master_create_worker_process(nxt_cycle_t *cycle) +nxt_master_create_worker_process(nxt_task_t *task, nxt_cycle_t *cycle) { nxt_pid_t pid; nxt_process_chan_t *proc; @@ -174,41 +178,46 @@ nxt_master_create_worker_process(nxt_cycle_t *cycle) proc->pid = pid; nxt_chan_read_close(proc->chan); - nxt_chan_write_enable(nxt_thread(), proc->chan); + nxt_chan_write_enable(task, proc->chan); - nxt_process_new_chan(cycle, proc); + nxt_process_new_chan(task, cycle, proc); return NXT_OK; } } static void -nxt_master_process_sighup_handler(nxt_thread_t *thr, void *obj, void *data) +nxt_master_process_sighup_handler(nxt_task_t *task, void *obj, void *data) { nxt_cycle_t *cycle; cycle = nxt_thread_cycle(); - nxt_log_error(NXT_LOG_NOTICE, thr->log, "signal %d (%s) recevied, %s", - (int) (uintptr_t) obj, data, - cycle->reconfiguring ? "ignored" : "reconfiguring"); + nxt_log(task, NXT_LOG_NOTICE, "signal %d (%s) recevied, %s", + (int) (uintptr_t) obj, data, + cycle->reconfiguring ? "ignored" : "reconfiguring"); if (!cycle->reconfiguring) { - (void) nxt_cycle_create(thr, cycle, nxt_master_process_new_cycle, - cycle->config_name, 0); + (void) nxt_cycle_create(task->thread, task, cycle, + nxt_master_process_new_cycle, + cycle->config_name); } } static void -nxt_master_process_new_cycle(nxt_thread_t *thr, nxt_cycle_t *cycle) +nxt_master_process_new_cycle(nxt_task_t *task, nxt_cycle_t *cycle) { - nxt_log_debug(thr->log, "new cycle"); + nxt_thread_t *thr; + + thr = task->thread; + + nxt_debug(task, "new cycle"); /* A safe place to free the previous cycle. */ nxt_mem_pool_destroy(cycle->previous->mem_pool); - switch (nxt_master_start_worker_processes(cycle)) { + switch (nxt_master_start_worker_processes(task, cycle)) { case NXT_OK: /* @@ -240,7 +249,7 @@ nxt_master_process_new_cycle(nxt_thread_t *thr, nxt_cycle_t *cycle) static void -nxt_master_stop_previous_worker_processes(nxt_thread_t *thr, void *obj, +nxt_master_stop_previous_worker_processes(nxt_task_t *task, void *obj, void *data) { uint32_t generation; @@ -259,7 +268,8 @@ nxt_master_stop_previous_worker_processes(nxt_thread_t *thr, void *obj, for (i = 1; i < n; i++) { if (proc[i].generation == generation) { - (void) nxt_chan_write(proc[i].chan, NXT_CHAN_MSG_QUIT, -1, 0, NULL); + (void) nxt_chan_write(task, proc[i].chan, NXT_CHAN_MSG_QUIT, + -1, 0, NULL); } } @@ -268,43 +278,43 @@ nxt_master_stop_previous_worker_processes(nxt_thread_t *thr, void *obj, void -nxt_master_stop_worker_processes(nxt_cycle_t *cycle) +nxt_master_stop_worker_processes(nxt_task_t *task, nxt_cycle_t *cycle) { - nxt_process_chan_write(cycle, NXT_CHAN_MSG_QUIT, -1, 0, NULL); + nxt_process_chan_write(task, cycle, NXT_CHAN_MSG_QUIT, -1, 0, NULL); } static void -nxt_master_process_sigterm_handler(nxt_thread_t *thr, void *obj, void *data) +nxt_master_process_sigterm_handler(nxt_task_t *task, void *obj, void *data) { - nxt_log_debug(thr->log, "sigterm handler signo:%d (%s)", - (int) (uintptr_t) obj, data); + nxt_debug(task, "sigterm handler signo:%d (%s)", + (int) (uintptr_t) obj, data); /* TODO: fast exit. */ nxt_exiting = 1; - nxt_cycle_quit(thr, NULL); + nxt_cycle_quit(task, NULL); } static void -nxt_master_process_sigquit_handler(nxt_thread_t *thr, void *obj, void *data) +nxt_master_process_sigquit_handler(nxt_task_t *task, void *obj, void *data) { - nxt_log_debug(thr->log, "sigquit handler signo:%d (%s)", - (int) (uintptr_t) obj, data); + nxt_debug(task, "sigquit handler signo:%d (%s)", + (int) (uintptr_t) obj, data); /* TODO: graceful exit. */ nxt_exiting = 1; - nxt_cycle_quit(thr, NULL); + nxt_cycle_quit(task, NULL); } static void -nxt_master_process_sigusr1_handler(nxt_thread_t *thr, void *obj, void *data) +nxt_master_process_sigusr1_handler(nxt_task_t *task, void *obj, void *data) { nxt_int_t ret; nxt_uint_t n; @@ -313,8 +323,8 @@ nxt_master_process_sigusr1_handler(nxt_thread_t *thr, void *obj, void *data) nxt_array_t *new_files; nxt_mem_pool_t *mp; - nxt_log_error(NXT_LOG_NOTICE, thr->log, "signal %d (%s) recevied, %s", - (int) (uintptr_t) obj, data, "log files rotation"); + nxt_log(task, NXT_LOG_NOTICE, "signal %d (%s) recevied, %s", + (int) (uintptr_t) obj, data, "log files rotation"); mp = nxt_mem_pool_create(1024); if (mp == NULL) { @@ -358,7 +368,7 @@ nxt_master_process_sigusr1_handler(nxt_thread_t *thr, void *obj, void *data) nxt_list_each(file, cycle->log_files) { - nxt_process_chan_change_log_file(cycle, n, new_file[n].fd); + nxt_process_chan_change_log_file(task, cycle, n, new_file[n].fd); /* * The old log file descriptor must be closed at the moment * when no other threads use it. dup2() allows to use the @@ -394,7 +404,7 @@ fail: static void -nxt_master_process_sigusr2_handler(nxt_thread_t *thr, void *obj, void *data) +nxt_master_process_sigusr2_handler(nxt_task_t *task, void *obj, void *data) { char **env; nxt_int_t ret; @@ -427,10 +437,10 @@ nxt_master_process_sigusr2_handler(nxt_thread_t *thr, void *obj, void *data) ignore = 1; } - nxt_log_error(NXT_LOG_NOTICE, thr->log, - "signal %d (%s) recevied, %s, parent pid: %PI", - (int) (uintptr_t) obj, data, - ignore ? "ignored" : "online binary file upgrade", ppid); + nxt_log(task, NXT_LOG_NOTICE, + "signal %d (%s) recevied, %s, parent pid: %PI", + (int) (uintptr_t) obj, data, + ignore ? "ignored" : "online binary file upgrade", ppid); if (ignore) { return; @@ -552,14 +562,14 @@ nxt_master_process_upgrade_environment_create(nxt_cycle_t *cycle) static void -nxt_master_process_sigchld_handler(nxt_thread_t *thr, void *obj, void *data) +nxt_master_process_sigchld_handler(nxt_task_t *task, void *obj, void *data) { int status; nxt_err_t err; nxt_pid_t pid; - nxt_log_debug(thr->log, "sigchld handler signo:%d (%s)", - (int) (uintptr_t) obj, data); + nxt_debug(task, "sigchld handler signo:%d (%s)", + (int) (uintptr_t) obj, data); for ( ;; ) { pid = waitpid(-1, &status, WNOHANG); @@ -575,12 +585,12 @@ nxt_master_process_sigchld_handler(nxt_thread_t *thr, void *obj, void *data) continue; default: - nxt_log_alert(thr->log, "waitpid() failed: %E", err); + nxt_log(task, NXT_LOG_CRIT, "waitpid() failed: %E", err); return; } } - nxt_log_debug(thr->log, "waitpid(): %PI", pid); + nxt_debug(task, "waitpid(): %PI", pid); if (pid == 0) { return; @@ -588,27 +598,26 @@ nxt_master_process_sigchld_handler(nxt_thread_t *thr, void *obj, void *data) if (WTERMSIG(status)) { #ifdef WCOREDUMP - nxt_log_alert(thr->log, "process %PI exited on signal %d%s", - pid, WTERMSIG(status), - WCOREDUMP(status) ? " (core dumped)" : ""); + nxt_log(task, NXT_LOG_CRIT, "process %PI exited on signal %d%s", + pid, WTERMSIG(status), + WCOREDUMP(status) ? " (core dumped)" : ""); #else - nxt_log_alert(thr->log, "process %PI exited on signal %d", - pid, WTERMSIG(status)); + nxt_log(task, NXT_LOG_CRIT, "process %PI exited on signal %d", + pid, WTERMSIG(status)); #endif } else { - nxt_log_error(NXT_LOG_NOTICE, thr->log, - "process %PI exited with code %d", - pid, WEXITSTATUS(status)); + nxt_trace(task, "process %PI exited with code %d", + pid, WEXITSTATUS(status)); } - nxt_master_cleanup_worker_process(thr, pid); + nxt_master_cleanup_worker_process(task, pid); } } static void -nxt_master_cleanup_worker_process(nxt_thread_t *thr, nxt_pid_t pid) +nxt_master_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid) { nxt_uint_t i, n, generation; nxt_cycle_t *cycle; @@ -634,14 +643,14 @@ nxt_master_cleanup_worker_process(nxt_thread_t *thr, nxt_pid_t pid) nxt_array_remove(cycle->processes, &proc[i]); if (nxt_exiting) { - nxt_log_debug(thr->log, "processes %d", n); + nxt_debug(task, "processes %d", n); if (n == 2) { - nxt_cycle_quit(thr, cycle); + nxt_cycle_quit(task, cycle); } } else if (generation == cycle->process_generation) { - (void) nxt_master_create_worker_process(cycle); + (void) nxt_master_create_worker_process(task, cycle); } return; -- cgit