diff options
author | Igor Sysoev <igor@sysoev.ru> | 2017-01-23 19:56:03 +0300 |
---|---|---|
committer | Igor Sysoev <igor@sysoev.ru> | 2017-01-23 19:56:03 +0300 |
commit | de532922d9ab42aa15b40d47c8db53ac2af38500 (patch) | |
tree | d6b7c6b21c7a6e0e3620a3e0c7198e63454d05e3 /src/nxt_cycle.c | |
parent | 16cbf3c076a0aca6d47adaf3f719493674cf2363 (diff) | |
download | unit-de532922d9ab42aa15b40d47c8db53ac2af38500.tar.gz unit-de532922d9ab42aa15b40d47c8db53ac2af38500.tar.bz2 |
Introducing tasks.
Diffstat (limited to '')
-rw-r--r-- | src/nxt_cycle.c | 141 |
1 files changed, 69 insertions, 72 deletions
diff --git a/src/nxt_cycle.c b/src/nxt_cycle.c index 8539bc05..4b6eeb40 100644 --- a/src/nxt_cycle.c +++ b/src/nxt_cycle.c @@ -18,14 +18,14 @@ static nxt_int_t nxt_cycle_systemd_listen_sockets(nxt_thread_t *thr, static nxt_int_t nxt_cycle_event_engines(nxt_thread_t *thr, nxt_cycle_t *cycle); static nxt_int_t nxt_cycle_processes(nxt_cycle_t *cycle); static nxt_int_t nxt_cycle_thread_pools(nxt_thread_t *thr, nxt_cycle_t *cycle); -static void nxt_cycle_start(nxt_thread_t *thr, void *obj, void *data); -static void nxt_cycle_initial_start(nxt_thread_t *thr, nxt_cycle_t *cycle); -static void nxt_cycle_conf_test(nxt_thread_t *thr, nxt_cycle_t *cycle); -static void nxt_single_process_start(nxt_thread_t *thr, nxt_cycle_t *cycle); -static void nxt_cycle_close_idle_connections(nxt_thread_t *thr); -static void nxt_cycle_exit(nxt_thread_t *thr, void *obj, void *data); -static nxt_int_t nxt_cycle_event_engine_change(nxt_thread_t *thr, +static void nxt_cycle_start(nxt_task_t *task, void *obj, void *data); +static void nxt_cycle_initial_start(nxt_task_t *task, nxt_cycle_t *cycle); +static void nxt_single_process_start(nxt_thread_t *thr, nxt_task_t *task, nxt_cycle_t *cycle); +static void nxt_cycle_close_idle_connections(nxt_thread_t *thr, nxt_task_t *task); +static void nxt_cycle_exit(nxt_task_t *task, void *obj, void *data); +static nxt_int_t nxt_cycle_event_engine_change(nxt_thread_t *thr, + nxt_task_t *task, nxt_cycle_t *cycle); static nxt_int_t nxt_cycle_conf_init(nxt_thread_t *thr, nxt_cycle_t *cycle); static nxt_int_t nxt_cycle_conf_read_cmd(nxt_thread_t *thr, nxt_cycle_t *cycle); static nxt_sockaddr_t *nxt_cycle_sockaddr_parse(nxt_str_t *addr, @@ -36,7 +36,8 @@ static nxt_sockaddr_t *nxt_cycle_sockaddr_inet6_parse(nxt_str_t *addr, nxt_mem_pool_t *mp, nxt_log_t *log); static nxt_sockaddr_t *nxt_cycle_sockaddr_inet_parse(nxt_str_t *addr, nxt_mem_pool_t *mp, nxt_log_t *log); -static nxt_int_t nxt_cycle_conf_apply(nxt_thread_t *thr, nxt_cycle_t *cycle); +static nxt_int_t nxt_cycle_conf_apply(nxt_thread_t *thr, nxt_task_t *task, + nxt_cycle_t *cycle); static nxt_int_t nxt_cycle_listen_socket(nxt_cycle_t *cycle); static nxt_int_t nxt_cycle_hostname(nxt_thread_t *thr, nxt_cycle_t *cycle); static nxt_int_t nxt_cycle_log_files_init(nxt_cycle_t *cycle); @@ -49,7 +50,7 @@ static nxt_int_t nxt_cycle_shm_zone_create(nxt_cycle_shm_zone_t *shm_zone); #if (NXT_THREADS) static void nxt_cycle_thread_pool_destroy(nxt_thread_t *thr, - nxt_cycle_t *cycle, nxt_cycle_cont_t cont); + nxt_task_t *task, nxt_cycle_t *cycle, nxt_cycle_cont_t cont); #endif @@ -57,8 +58,8 @@ nxt_thread_declare_data(nxt_cycle_t *, nxt_thread_cycle_data); nxt_int_t -nxt_cycle_create(nxt_thread_t *thr, nxt_cycle_t *previous, - nxt_cycle_cont_t start, nxt_str_t *config_name, nxt_bool_t test_config) +nxt_cycle_create(nxt_thread_t *thr, nxt_task_t *task, nxt_cycle_t *previous, + nxt_cycle_cont_t start, nxt_str_t *config_name) { nxt_int_t ret; nxt_cycle_t *cycle; @@ -78,7 +79,6 @@ nxt_cycle_create(nxt_thread_t *thr, nxt_cycle_t *previous, cycle->mem_pool = mp; cycle->previous = previous; cycle->config_name = config_name; - cycle->test_config = test_config; if (previous == NULL) { cycle->prefix = nxt_current_directory(mp); @@ -148,14 +148,13 @@ nxt_cycle_create(nxt_thread_t *thr, nxt_cycle_t *previous, nxt_thread_init_data(nxt_thread_cycle_data); nxt_thread_cycle_set(cycle); - cycle->start = test_config ? nxt_cycle_conf_test: - nxt_cycle_initial_start; + cycle->start = nxt_cycle_initial_start; } nxt_log_debug(thr->log, "new cycle: %p", cycle); nxt_thread_work_queue_add(thr, &thr->work_queue.main, nxt_cycle_start, - cycle, NULL, &nxt_main_log); + task, cycle, NULL); return NXT_OK; @@ -410,31 +409,31 @@ nxt_cycle_thread_pools(nxt_thread_t *thr, nxt_cycle_t *cycle) static void -nxt_cycle_start(nxt_thread_t *thr, void *obj, void *data) +nxt_cycle_start(nxt_task_t *task, void *obj, void *data) { nxt_uint_t i; nxt_cycle_t *cycle; cycle = obj; - nxt_log_debug(thr->log, "cycle conf done"); + nxt_debug(task, "cycle conf done"); - nxt_mem_pool_debug_lock(cycle->mem_pool, nxt_thread_tid(thr)); + nxt_mem_pool_debug_lock(cycle->mem_pool, nxt_thread_tid(task->thread)); - thr->log->ctx_handler = NULL; - thr->log->ctx = NULL; + task->thread->log->ctx_handler = NULL; + task->thread->log->ctx = NULL; - if (nxt_cycle_conf_init(thr, cycle) != NXT_OK) { + if (nxt_cycle_conf_init(task->thread, cycle) != NXT_OK) { goto fail; } for (i = 0; i < nxt_init_modules_n; i++) { - if (nxt_init_modules[i](thr, cycle) != NXT_OK) { + if (nxt_init_modules[i](task->thread, cycle) != NXT_OK) { goto fail; } } - if (nxt_cycle_conf_apply(thr, cycle) != NXT_OK) { + if (nxt_cycle_conf_apply(task->thread, task, cycle) != NXT_OK) { goto fail; } @@ -447,11 +446,11 @@ nxt_cycle_start(nxt_thread_t *thr, void *obj, void *data) * processes, because thread pool semaphores will stick in * locked state in new processes after fork(). */ - nxt_cycle_thread_pool_destroy(thr, cycle, cycle->start); + nxt_cycle_thread_pool_destroy(task->thread, task, cycle, cycle->start); #else - cycle->start(thr, cycle); + cycle->start(task->thread, cycle); #endif @@ -459,16 +458,19 @@ nxt_cycle_start(nxt_thread_t *thr, void *obj, void *data) fail: - nxt_cycle_quit(thr, cycle); + nxt_cycle_quit(task, cycle); } static void -nxt_cycle_initial_start(nxt_thread_t *thr, nxt_cycle_t *cycle) +nxt_cycle_initial_start(nxt_task_t *task, nxt_cycle_t *cycle) { nxt_int_t ret; + nxt_thread_t *thr; const nxt_event_set_ops_t *event_set; + thr = task->thread; + if (cycle->inherited_sockets == NULL && cycle->daemon) { if (nxt_process_daemon() != NXT_OK) { @@ -486,7 +488,7 @@ nxt_cycle_initial_start(nxt_thread_t *thr, nxt_cycle_t *cycle) goto fail; } - ret = nxt_event_engine_change(thr, event_set, cycle->batch); + ret = nxt_event_engine_change(thr, task, event_set, cycle->batch); if (ret != NXT_OK) { goto fail; } @@ -497,39 +499,31 @@ nxt_cycle_initial_start(nxt_thread_t *thr, nxt_cycle_t *cycle) goto fail; } - if (nxt_cycle_event_engine_change(thr, cycle) != NXT_OK) { + if (nxt_cycle_event_engine_change(thr, task, cycle) != NXT_OK) { goto fail; } thr->engine->max_connections = cycle->engine_connections; if (cycle->master_process) { - if (nxt_master_process_start(thr, cycle) != NXT_ERROR) { + if (nxt_master_process_start(thr, task, cycle) != NXT_ERROR) { return; } } else { - nxt_single_process_start(thr, cycle); + nxt_single_process_start(thr, task, cycle); return; } fail: - nxt_cycle_quit(thr, cycle); + nxt_cycle_quit(task, cycle); } static void -nxt_cycle_conf_test(nxt_thread_t *thr, nxt_cycle_t *cycle) -{ - (void) nxt_cycle_pid_file_create(cycle->pid_file, cycle->test_config); - - nxt_cycle_quit(thr, cycle); -} - - -static void -nxt_single_process_start(nxt_thread_t *thr, nxt_cycle_t *cycle) +nxt_single_process_start(nxt_thread_t *thr, nxt_task_t *task, + nxt_cycle_t *cycle) { #if (NXT_THREADS) nxt_int_t ret; @@ -538,7 +532,7 @@ nxt_single_process_start(nxt_thread_t *thr, nxt_cycle_t *cycle) 60000 * 1000000LL); if (nxt_slow_path(ret != NXT_OK)) { - nxt_cycle_quit(thr, cycle); + nxt_cycle_quit(task, cycle); return; } @@ -546,18 +540,21 @@ nxt_single_process_start(nxt_thread_t *thr, nxt_cycle_t *cycle) cycle->type = NXT_PROCESS_SINGLE; - nxt_cycle_listen_sockets_enable(thr, cycle); + nxt_cycle_listen_sockets_enable(task, cycle); return; } void -nxt_cycle_quit(nxt_thread_t *thr, nxt_cycle_t *cycle) +nxt_cycle_quit(nxt_task_t *task, nxt_cycle_t *cycle) { - nxt_bool_t done; + nxt_bool_t done; + nxt_thread_t *thr; + + thr = task->thread; - nxt_log_debug(thr->log, "exiting"); + nxt_debug(task, "exiting"); if (cycle == NULL) { cycle = nxt_thread_cycle(); @@ -571,29 +568,29 @@ nxt_cycle_quit(nxt_thread_t *thr, nxt_cycle_t *cycle) #if (NXT_THREADS) if (!nxt_array_is_empty(cycle->thread_pools)) { - nxt_cycle_thread_pool_destroy(thr, cycle, nxt_cycle_quit); + nxt_cycle_thread_pool_destroy(thr, task, cycle, nxt_cycle_quit); done = 0; } #endif if (!cycle->test_config && cycle->type == NXT_PROCESS_MASTER) { - nxt_master_stop_worker_processes(cycle); + nxt_master_stop_worker_processes(task, cycle); done = 0; } } - nxt_cycle_close_idle_connections(thr); + nxt_cycle_close_idle_connections(thr, task); if (done) { nxt_thread_work_queue_add(thr, &thr->work_queue.main, nxt_cycle_exit, - cycle, NULL, &nxt_main_log); + task, cycle, NULL); } } static void -nxt_cycle_close_idle_connections(nxt_thread_t *thr) +nxt_cycle_close_idle_connections(nxt_thread_t *thr, nxt_task_t *task) { nxt_queue_t *idle; nxt_queue_link_t *link, *next; @@ -612,14 +609,14 @@ nxt_cycle_close_idle_connections(nxt_thread_t *thr) if (!c->socket.read_ready) { nxt_queue_remove(link); - nxt_event_conn_close(thr, c); + nxt_event_conn_close(task, c); } } } static void -nxt_cycle_exit(nxt_thread_t *thr, void *obj, void *data) +nxt_cycle_exit(nxt_task_t *task, void *obj, void *data) { nxt_cycle_t *cycle; @@ -627,7 +624,7 @@ nxt_cycle_exit(nxt_thread_t *thr, void *obj, void *data) #if (NXT_THREADS) - nxt_log_debug(thr->log, "thread pools: %d", cycle->thread_pools->nelts); + nxt_debug(task, "thread pools: %d", cycle->thread_pools->nelts); if (!nxt_array_is_empty(cycle->thread_pools)) { return; @@ -639,11 +636,11 @@ nxt_cycle_exit(nxt_thread_t *thr, void *obj, void *data) nxt_cycle_pid_file_delete(cycle); } - if (!thr->engine->event->signal_support) { - nxt_event_engine_signals_stop(thr->engine); + if (!task->thread->engine->event->signal_support) { + nxt_event_engine_signals_stop(task->thread->engine); } - nxt_log_debug(thr->log, "exit"); + nxt_debug(task, "exit"); exit(0); nxt_unreachable(); @@ -651,7 +648,8 @@ nxt_cycle_exit(nxt_thread_t *thr, void *obj, void *data) static nxt_int_t -nxt_cycle_event_engine_change(nxt_thread_t *thr, nxt_cycle_t *cycle) +nxt_cycle_event_engine_change(nxt_thread_t *thr, nxt_task_t *task, + nxt_cycle_t *cycle) { const nxt_event_set_ops_t *event_set; @@ -663,7 +661,7 @@ nxt_cycle_event_engine_change(nxt_thread_t *thr, nxt_cycle_t *cycle) event_set = nxt_service_get(cycle->services, "engine", cycle->engine); if (event_set != NULL) { - return nxt_event_engine_change(thr, event_set, cycle->batch); + return nxt_event_engine_change(thr, task, event_set, cycle->batch); } return NXT_ERROR; @@ -686,8 +684,7 @@ nxt_cycle_event_engine_free(nxt_cycle_t *cycle) #if (NXT_THREADS) static void nxt_cycle_thread_pool_init(void); -static void nxt_cycle_thread_pool_exit(nxt_thread_t *thr, void *obj, - void *data); +static void nxt_cycle_thread_pool_exit(nxt_task_t *task, void *obj, void *data); nxt_int_t @@ -715,8 +712,8 @@ nxt_cycle_thread_pool_create(nxt_thread_t *thr, nxt_cycle_t *cycle, static void -nxt_cycle_thread_pool_destroy(nxt_thread_t *thr, nxt_cycle_t *cycle, - nxt_cycle_cont_t cont) +nxt_cycle_thread_pool_destroy(nxt_thread_t *thr, nxt_task_t *task, + nxt_cycle_t *cycle, nxt_cycle_cont_t cont) { nxt_uint_t n; nxt_thread_pool_t **tp; @@ -726,7 +723,7 @@ nxt_cycle_thread_pool_destroy(nxt_thread_t *thr, nxt_cycle_t *cycle, n = cycle->thread_pools->nelts; if (n == 0) { - cont(thr, cycle); + cont(task, cycle); return; } @@ -751,7 +748,7 @@ nxt_cycle_thread_pool_init(void) static void -nxt_cycle_thread_pool_exit(nxt_thread_t *thr, void *obj, void *data) +nxt_cycle_thread_pool_exit(nxt_task_t *task, void *obj, void *data) { nxt_uint_t i, n; nxt_cycle_t *cycle; @@ -770,7 +767,7 @@ nxt_cycle_thread_pool_exit(nxt_thread_t *thr, void *obj, void *data) thread_pools = cycle->thread_pools->elts; n = cycle->thread_pools->nelts; - nxt_log_debug(thr->log, "thread pools: %ui, cycle %p", n, cycle); + nxt_debug(task, "thread pools: %ui, cycle %p", n, cycle); for (i = 0; i < n; i++) { @@ -779,7 +776,7 @@ nxt_cycle_thread_pool_exit(nxt_thread_t *thr, void *obj, void *data) if (n == 1) { /* The last thread pool. */ - cycle->continuation(thr, cycle); + cycle->continuation(task, cycle); } return; @@ -1239,7 +1236,7 @@ invalid_addr: static nxt_int_t -nxt_cycle_conf_apply(nxt_thread_t *thr, nxt_cycle_t *cycle) +nxt_cycle_conf_apply(nxt_thread_t *thr, nxt_task_t *task, nxt_cycle_t *cycle) { if (nxt_cycle_log_files_create(cycle) != NXT_OK) { return NXT_ERROR; @@ -1249,7 +1246,7 @@ nxt_cycle_conf_apply(nxt_thread_t *thr, nxt_cycle_t *cycle) return NXT_ERROR; } - if (nxt_cycle_event_engine_change(thr, cycle) != NXT_OK) { + if (nxt_cycle_event_engine_change(thr, task, cycle) != NXT_OK) { return NXT_ERROR; } @@ -1571,7 +1568,7 @@ nxt_cycle_listen_sockets_close(nxt_cycle_t *cycle) nxt_int_t -nxt_cycle_listen_sockets_enable(nxt_thread_t *thr, nxt_cycle_t *cycle) +nxt_cycle_listen_sockets_enable(nxt_task_t *task, nxt_cycle_t *cycle) { nxt_uint_t i, n; nxt_listen_socket_t *ls; @@ -1580,7 +1577,7 @@ nxt_cycle_listen_sockets_enable(nxt_thread_t *thr, nxt_cycle_t *cycle) n = cycle->listen_sockets->nelts; for (i = 0; i < n; i++) { - if (nxt_event_conn_listen(thr, &ls[i]) != NXT_OK) { + if (nxt_event_conn_listen(task, &ls[i]) != NXT_OK) { return NXT_ERROR; } } |