summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_cycle.c
diff options
context:
space:
mode:
authorIgor Sysoev <igor@sysoev.ru>2017-01-23 19:56:03 +0300
committerIgor Sysoev <igor@sysoev.ru>2017-01-23 19:56:03 +0300
commitde532922d9ab42aa15b40d47c8db53ac2af38500 (patch)
treed6b7c6b21c7a6e0e3620a3e0c7198e63454d05e3 /src/nxt_cycle.c
parent16cbf3c076a0aca6d47adaf3f719493674cf2363 (diff)
downloadunit-de532922d9ab42aa15b40d47c8db53ac2af38500.tar.gz
unit-de532922d9ab42aa15b40d47c8db53ac2af38500.tar.bz2
Introducing tasks.
Diffstat (limited to 'src/nxt_cycle.c')
-rw-r--r--src/nxt_cycle.c141
1 files changed, 69 insertions, 72 deletions
diff --git a/src/nxt_cycle.c b/src/nxt_cycle.c
index 8539bc05..4b6eeb40 100644
--- a/src/nxt_cycle.c
+++ b/src/nxt_cycle.c
@@ -18,14 +18,14 @@ static nxt_int_t nxt_cycle_systemd_listen_sockets(nxt_thread_t *thr,
static nxt_int_t nxt_cycle_event_engines(nxt_thread_t *thr, nxt_cycle_t *cycle);
static nxt_int_t nxt_cycle_processes(nxt_cycle_t *cycle);
static nxt_int_t nxt_cycle_thread_pools(nxt_thread_t *thr, nxt_cycle_t *cycle);
-static void nxt_cycle_start(nxt_thread_t *thr, void *obj, void *data);
-static void nxt_cycle_initial_start(nxt_thread_t *thr, nxt_cycle_t *cycle);
-static void nxt_cycle_conf_test(nxt_thread_t *thr, nxt_cycle_t *cycle);
-static void nxt_single_process_start(nxt_thread_t *thr, nxt_cycle_t *cycle);
-static void nxt_cycle_close_idle_connections(nxt_thread_t *thr);
-static void nxt_cycle_exit(nxt_thread_t *thr, void *obj, void *data);
-static nxt_int_t nxt_cycle_event_engine_change(nxt_thread_t *thr,
+static void nxt_cycle_start(nxt_task_t *task, void *obj, void *data);
+static void nxt_cycle_initial_start(nxt_task_t *task, nxt_cycle_t *cycle);
+static void nxt_single_process_start(nxt_thread_t *thr, nxt_task_t *task,
nxt_cycle_t *cycle);
+static void nxt_cycle_close_idle_connections(nxt_thread_t *thr, nxt_task_t *task);
+static void nxt_cycle_exit(nxt_task_t *task, void *obj, void *data);
+static nxt_int_t nxt_cycle_event_engine_change(nxt_thread_t *thr,
+ nxt_task_t *task, nxt_cycle_t *cycle);
static nxt_int_t nxt_cycle_conf_init(nxt_thread_t *thr, nxt_cycle_t *cycle);
static nxt_int_t nxt_cycle_conf_read_cmd(nxt_thread_t *thr, nxt_cycle_t *cycle);
static nxt_sockaddr_t *nxt_cycle_sockaddr_parse(nxt_str_t *addr,
@@ -36,7 +36,8 @@ static nxt_sockaddr_t *nxt_cycle_sockaddr_inet6_parse(nxt_str_t *addr,
nxt_mem_pool_t *mp, nxt_log_t *log);
static nxt_sockaddr_t *nxt_cycle_sockaddr_inet_parse(nxt_str_t *addr,
nxt_mem_pool_t *mp, nxt_log_t *log);
-static nxt_int_t nxt_cycle_conf_apply(nxt_thread_t *thr, nxt_cycle_t *cycle);
+static nxt_int_t nxt_cycle_conf_apply(nxt_thread_t *thr, nxt_task_t *task,
+ nxt_cycle_t *cycle);
static nxt_int_t nxt_cycle_listen_socket(nxt_cycle_t *cycle);
static nxt_int_t nxt_cycle_hostname(nxt_thread_t *thr, nxt_cycle_t *cycle);
static nxt_int_t nxt_cycle_log_files_init(nxt_cycle_t *cycle);
@@ -49,7 +50,7 @@ static nxt_int_t nxt_cycle_shm_zone_create(nxt_cycle_shm_zone_t *shm_zone);
#if (NXT_THREADS)
static void nxt_cycle_thread_pool_destroy(nxt_thread_t *thr,
- nxt_cycle_t *cycle, nxt_cycle_cont_t cont);
+ nxt_task_t *task, nxt_cycle_t *cycle, nxt_cycle_cont_t cont);
#endif
@@ -57,8 +58,8 @@ nxt_thread_declare_data(nxt_cycle_t *, nxt_thread_cycle_data);
nxt_int_t
-nxt_cycle_create(nxt_thread_t *thr, nxt_cycle_t *previous,
- nxt_cycle_cont_t start, nxt_str_t *config_name, nxt_bool_t test_config)
+nxt_cycle_create(nxt_thread_t *thr, nxt_task_t *task, nxt_cycle_t *previous,
+ nxt_cycle_cont_t start, nxt_str_t *config_name)
{
nxt_int_t ret;
nxt_cycle_t *cycle;
@@ -78,7 +79,6 @@ nxt_cycle_create(nxt_thread_t *thr, nxt_cycle_t *previous,
cycle->mem_pool = mp;
cycle->previous = previous;
cycle->config_name = config_name;
- cycle->test_config = test_config;
if (previous == NULL) {
cycle->prefix = nxt_current_directory(mp);
@@ -148,14 +148,13 @@ nxt_cycle_create(nxt_thread_t *thr, nxt_cycle_t *previous,
nxt_thread_init_data(nxt_thread_cycle_data);
nxt_thread_cycle_set(cycle);
- cycle->start = test_config ? nxt_cycle_conf_test:
- nxt_cycle_initial_start;
+ cycle->start = nxt_cycle_initial_start;
}
nxt_log_debug(thr->log, "new cycle: %p", cycle);
nxt_thread_work_queue_add(thr, &thr->work_queue.main, nxt_cycle_start,
- cycle, NULL, &nxt_main_log);
+ task, cycle, NULL);
return NXT_OK;
@@ -410,31 +409,31 @@ nxt_cycle_thread_pools(nxt_thread_t *thr, nxt_cycle_t *cycle)
static void
-nxt_cycle_start(nxt_thread_t *thr, void *obj, void *data)
+nxt_cycle_start(nxt_task_t *task, void *obj, void *data)
{
nxt_uint_t i;
nxt_cycle_t *cycle;
cycle = obj;
- nxt_log_debug(thr->log, "cycle conf done");
+ nxt_debug(task, "cycle conf done");
- nxt_mem_pool_debug_lock(cycle->mem_pool, nxt_thread_tid(thr));
+ nxt_mem_pool_debug_lock(cycle->mem_pool, nxt_thread_tid(task->thread));
- thr->log->ctx_handler = NULL;
- thr->log->ctx = NULL;
+ task->thread->log->ctx_handler = NULL;
+ task->thread->log->ctx = NULL;
- if (nxt_cycle_conf_init(thr, cycle) != NXT_OK) {
+ if (nxt_cycle_conf_init(task->thread, cycle) != NXT_OK) {
goto fail;
}
for (i = 0; i < nxt_init_modules_n; i++) {
- if (nxt_init_modules[i](thr, cycle) != NXT_OK) {
+ if (nxt_init_modules[i](task->thread, cycle) != NXT_OK) {
goto fail;
}
}
- if (nxt_cycle_conf_apply(thr, cycle) != NXT_OK) {
+ if (nxt_cycle_conf_apply(task->thread, task, cycle) != NXT_OK) {
goto fail;
}
@@ -447,11 +446,11 @@ nxt_cycle_start(nxt_thread_t *thr, void *obj, void *data)
* processes, because thread pool semaphores will stick in
* locked state in new processes after fork().
*/
- nxt_cycle_thread_pool_destroy(thr, cycle, cycle->start);
+ nxt_cycle_thread_pool_destroy(task->thread, task, cycle, cycle->start);
#else
- cycle->start(thr, cycle);
+ cycle->start(task->thread, cycle);
#endif
@@ -459,16 +458,19 @@ nxt_cycle_start(nxt_thread_t *thr, void *obj, void *data)
fail:
- nxt_cycle_quit(thr, cycle);
+ nxt_cycle_quit(task, cycle);
}
static void
-nxt_cycle_initial_start(nxt_thread_t *thr, nxt_cycle_t *cycle)
+nxt_cycle_initial_start(nxt_task_t *task, nxt_cycle_t *cycle)
{
nxt_int_t ret;
+ nxt_thread_t *thr;
const nxt_event_set_ops_t *event_set;
+ thr = task->thread;
+
if (cycle->inherited_sockets == NULL && cycle->daemon) {
if (nxt_process_daemon() != NXT_OK) {
@@ -486,7 +488,7 @@ nxt_cycle_initial_start(nxt_thread_t *thr, nxt_cycle_t *cycle)
goto fail;
}
- ret = nxt_event_engine_change(thr, event_set, cycle->batch);
+ ret = nxt_event_engine_change(thr, task, event_set, cycle->batch);
if (ret != NXT_OK) {
goto fail;
}
@@ -497,39 +499,31 @@ nxt_cycle_initial_start(nxt_thread_t *thr, nxt_cycle_t *cycle)
goto fail;
}
- if (nxt_cycle_event_engine_change(thr, cycle) != NXT_OK) {
+ if (nxt_cycle_event_engine_change(thr, task, cycle) != NXT_OK) {
goto fail;
}
thr->engine->max_connections = cycle->engine_connections;
if (cycle->master_process) {
- if (nxt_master_process_start(thr, cycle) != NXT_ERROR) {
+ if (nxt_master_process_start(thr, task, cycle) != NXT_ERROR) {
return;
}
} else {
- nxt_single_process_start(thr, cycle);
+ nxt_single_process_start(thr, task, cycle);
return;
}
fail:
- nxt_cycle_quit(thr, cycle);
+ nxt_cycle_quit(task, cycle);
}
static void
-nxt_cycle_conf_test(nxt_thread_t *thr, nxt_cycle_t *cycle)
-{
- (void) nxt_cycle_pid_file_create(cycle->pid_file, cycle->test_config);
-
- nxt_cycle_quit(thr, cycle);
-}
-
-
-static void
-nxt_single_process_start(nxt_thread_t *thr, nxt_cycle_t *cycle)
+nxt_single_process_start(nxt_thread_t *thr, nxt_task_t *task,
+ nxt_cycle_t *cycle)
{
#if (NXT_THREADS)
nxt_int_t ret;
@@ -538,7 +532,7 @@ nxt_single_process_start(nxt_thread_t *thr, nxt_cycle_t *cycle)
60000 * 1000000LL);
if (nxt_slow_path(ret != NXT_OK)) {
- nxt_cycle_quit(thr, cycle);
+ nxt_cycle_quit(task, cycle);
return;
}
@@ -546,18 +540,21 @@ nxt_single_process_start(nxt_thread_t *thr, nxt_cycle_t *cycle)
cycle->type = NXT_PROCESS_SINGLE;
- nxt_cycle_listen_sockets_enable(thr, cycle);
+ nxt_cycle_listen_sockets_enable(task, cycle);
return;
}
void
-nxt_cycle_quit(nxt_thread_t *thr, nxt_cycle_t *cycle)
+nxt_cycle_quit(nxt_task_t *task, nxt_cycle_t *cycle)
{
- nxt_bool_t done;
+ nxt_bool_t done;
+ nxt_thread_t *thr;
+
+ thr = task->thread;
- nxt_log_debug(thr->log, "exiting");
+ nxt_debug(task, "exiting");
if (cycle == NULL) {
cycle = nxt_thread_cycle();
@@ -571,29 +568,29 @@ nxt_cycle_quit(nxt_thread_t *thr, nxt_cycle_t *cycle)
#if (NXT_THREADS)
if (!nxt_array_is_empty(cycle->thread_pools)) {
- nxt_cycle_thread_pool_destroy(thr, cycle, nxt_cycle_quit);
+ nxt_cycle_thread_pool_destroy(thr, task, cycle, nxt_cycle_quit);
done = 0;
}
#endif
if (!cycle->test_config && cycle->type == NXT_PROCESS_MASTER) {
- nxt_master_stop_worker_processes(cycle);
+ nxt_master_stop_worker_processes(task, cycle);
done = 0;
}
}
- nxt_cycle_close_idle_connections(thr);
+ nxt_cycle_close_idle_connections(thr, task);
if (done) {
nxt_thread_work_queue_add(thr, &thr->work_queue.main, nxt_cycle_exit,
- cycle, NULL, &nxt_main_log);
+ task, cycle, NULL);
}
}
static void
-nxt_cycle_close_idle_connections(nxt_thread_t *thr)
+nxt_cycle_close_idle_connections(nxt_thread_t *thr, nxt_task_t *task)
{
nxt_queue_t *idle;
nxt_queue_link_t *link, *next;
@@ -612,14 +609,14 @@ nxt_cycle_close_idle_connections(nxt_thread_t *thr)
if (!c->socket.read_ready) {
nxt_queue_remove(link);
- nxt_event_conn_close(thr, c);
+ nxt_event_conn_close(task, c);
}
}
}
static void
-nxt_cycle_exit(nxt_thread_t *thr, void *obj, void *data)
+nxt_cycle_exit(nxt_task_t *task, void *obj, void *data)
{
nxt_cycle_t *cycle;
@@ -627,7 +624,7 @@ nxt_cycle_exit(nxt_thread_t *thr, void *obj, void *data)
#if (NXT_THREADS)
- nxt_log_debug(thr->log, "thread pools: %d", cycle->thread_pools->nelts);
+ nxt_debug(task, "thread pools: %d", cycle->thread_pools->nelts);
if (!nxt_array_is_empty(cycle->thread_pools)) {
return;
@@ -639,11 +636,11 @@ nxt_cycle_exit(nxt_thread_t *thr, void *obj, void *data)
nxt_cycle_pid_file_delete(cycle);
}
- if (!thr->engine->event->signal_support) {
- nxt_event_engine_signals_stop(thr->engine);
+ if (!task->thread->engine->event->signal_support) {
+ nxt_event_engine_signals_stop(task->thread->engine);
}
- nxt_log_debug(thr->log, "exit");
+ nxt_debug(task, "exit");
exit(0);
nxt_unreachable();
@@ -651,7 +648,8 @@ nxt_cycle_exit(nxt_thread_t *thr, void *obj, void *data)
static nxt_int_t
-nxt_cycle_event_engine_change(nxt_thread_t *thr, nxt_cycle_t *cycle)
+nxt_cycle_event_engine_change(nxt_thread_t *thr, nxt_task_t *task,
+ nxt_cycle_t *cycle)
{
const nxt_event_set_ops_t *event_set;
@@ -663,7 +661,7 @@ nxt_cycle_event_engine_change(nxt_thread_t *thr, nxt_cycle_t *cycle)
event_set = nxt_service_get(cycle->services, "engine", cycle->engine);
if (event_set != NULL) {
- return nxt_event_engine_change(thr, event_set, cycle->batch);
+ return nxt_event_engine_change(thr, task, event_set, cycle->batch);
}
return NXT_ERROR;
@@ -686,8 +684,7 @@ nxt_cycle_event_engine_free(nxt_cycle_t *cycle)
#if (NXT_THREADS)
static void nxt_cycle_thread_pool_init(void);
-static void nxt_cycle_thread_pool_exit(nxt_thread_t *thr, void *obj,
- void *data);
+static void nxt_cycle_thread_pool_exit(nxt_task_t *task, void *obj, void *data);
nxt_int_t
@@ -715,8 +712,8 @@ nxt_cycle_thread_pool_create(nxt_thread_t *thr, nxt_cycle_t *cycle,
static void
-nxt_cycle_thread_pool_destroy(nxt_thread_t *thr, nxt_cycle_t *cycle,
- nxt_cycle_cont_t cont)
+nxt_cycle_thread_pool_destroy(nxt_thread_t *thr, nxt_task_t *task,
+ nxt_cycle_t *cycle, nxt_cycle_cont_t cont)
{
nxt_uint_t n;
nxt_thread_pool_t **tp;
@@ -726,7 +723,7 @@ nxt_cycle_thread_pool_destroy(nxt_thread_t *thr, nxt_cycle_t *cycle,
n = cycle->thread_pools->nelts;
if (n == 0) {
- cont(thr, cycle);
+ cont(task, cycle);
return;
}
@@ -751,7 +748,7 @@ nxt_cycle_thread_pool_init(void)
static void
-nxt_cycle_thread_pool_exit(nxt_thread_t *thr, void *obj, void *data)
+nxt_cycle_thread_pool_exit(nxt_task_t *task, void *obj, void *data)
{
nxt_uint_t i, n;
nxt_cycle_t *cycle;
@@ -770,7 +767,7 @@ nxt_cycle_thread_pool_exit(nxt_thread_t *thr, void *obj, void *data)
thread_pools = cycle->thread_pools->elts;
n = cycle->thread_pools->nelts;
- nxt_log_debug(thr->log, "thread pools: %ui, cycle %p", n, cycle);
+ nxt_debug(task, "thread pools: %ui, cycle %p", n, cycle);
for (i = 0; i < n; i++) {
@@ -779,7 +776,7 @@ nxt_cycle_thread_pool_exit(nxt_thread_t *thr, void *obj, void *data)
if (n == 1) {
/* The last thread pool. */
- cycle->continuation(thr, cycle);
+ cycle->continuation(task, cycle);
}
return;
@@ -1239,7 +1236,7 @@ invalid_addr:
static nxt_int_t
-nxt_cycle_conf_apply(nxt_thread_t *thr, nxt_cycle_t *cycle)
+nxt_cycle_conf_apply(nxt_thread_t *thr, nxt_task_t *task, nxt_cycle_t *cycle)
{
if (nxt_cycle_log_files_create(cycle) != NXT_OK) {
return NXT_ERROR;
@@ -1249,7 +1246,7 @@ nxt_cycle_conf_apply(nxt_thread_t *thr, nxt_cycle_t *cycle)
return NXT_ERROR;
}
- if (nxt_cycle_event_engine_change(thr, cycle) != NXT_OK) {
+ if (nxt_cycle_event_engine_change(thr, task, cycle) != NXT_OK) {
return NXT_ERROR;
}
@@ -1571,7 +1568,7 @@ nxt_cycle_listen_sockets_close(nxt_cycle_t *cycle)
nxt_int_t
-nxt_cycle_listen_sockets_enable(nxt_thread_t *thr, nxt_cycle_t *cycle)
+nxt_cycle_listen_sockets_enable(nxt_task_t *task, nxt_cycle_t *cycle)
{
nxt_uint_t i, n;
nxt_listen_socket_t *ls;
@@ -1580,7 +1577,7 @@ nxt_cycle_listen_sockets_enable(nxt_thread_t *thr, nxt_cycle_t *cycle)
n = cycle->listen_sockets->nelts;
for (i = 0; i < n; i++) {
- if (nxt_event_conn_listen(thr, &ls[i]) != NXT_OK) {
+ if (nxt_event_conn_listen(task, &ls[i]) != NXT_OK) {
return NXT_ERROR;
}
}