summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_event_engine.c
diff options
context:
space:
mode:
authorIgor Sysoev <igor@sysoev.ru>2017-01-23 19:56:03 +0300
committerIgor Sysoev <igor@sysoev.ru>2017-01-23 19:56:03 +0300
commitde532922d9ab42aa15b40d47c8db53ac2af38500 (patch)
treed6b7c6b21c7a6e0e3620a3e0c7198e63454d05e3 /src/nxt_event_engine.c
parent16cbf3c076a0aca6d47adaf3f719493674cf2363 (diff)
downloadunit-de532922d9ab42aa15b40d47c8db53ac2af38500.tar.gz
unit-de532922d9ab42aa15b40d47c8db53ac2af38500.tar.bz2
Introducing tasks.
Diffstat (limited to '')
-rw-r--r--src/nxt_event_engine.c120
1 files changed, 66 insertions, 54 deletions
diff --git a/src/nxt_event_engine.c b/src/nxt_event_engine.c
index dd0f5fe3..ead63b72 100644
--- a/src/nxt_event_engine.c
+++ b/src/nxt_event_engine.c
@@ -7,21 +7,20 @@
#include <nxt_main.h>
-static nxt_int_t nxt_event_engine_post_init(nxt_thread_t *thr,
+static nxt_int_t nxt_event_engine_post_init(nxt_event_engine_t *engine);
+static nxt_int_t nxt_event_engine_signal_pipe_create(
nxt_event_engine_t *engine);
-static nxt_int_t nxt_event_engine_signal_pipe_create(nxt_thread_t *thr,
- nxt_event_engine_t *engine);
-static void nxt_event_engine_signal_pipe_close(nxt_thread_t *thr, void *obj,
+static void nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj,
void *data);
-static void nxt_event_engine_signal_pipe(nxt_thread_t *thr, void *obj,
+static void nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj,
void *data);
-static void nxt_event_engine_post_handler(nxt_thread_t *thr, void *obj,
+static void nxt_event_engine_post_handler(nxt_task_t *task, void *obj,
void *data);
-static void nxt_event_engine_signal_pipe_error(nxt_thread_t *thr, void *obj,
+static void nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj,
void *data);
-static void nxt_event_engine_signal_handler(nxt_thread_t *thr, void *obj,
+static void nxt_event_engine_signal_handler(nxt_task_t *task, void *obj,
void *data);
-static const nxt_event_sig_t *nxt_event_engine_signal_find(nxt_thread_t *thr,
+static const nxt_event_sig_t *nxt_event_engine_signal_find(nxt_task_t *task,
nxt_uint_t signo);
@@ -37,6 +36,13 @@ nxt_event_engine_create(nxt_thread_t *thr, const nxt_event_set_ops_t *event_set,
return NULL;
}
+ engine->task.thread = thr;
+ engine->task.log = thr->log;
+ engine->task.ident = nxt_task_next_ident();
+
+ thr->engine = engine;
+ thr->fiber = &engine->fibers->fiber;
+
engine->batch = batch;
if (flags & NXT_ENGINE_FIBERS) {
@@ -91,7 +97,7 @@ nxt_event_engine_create(nxt_thread_t *thr, const nxt_event_set_ops_t *event_set,
engine->event = event_set;
- if (nxt_event_engine_post_init(thr, engine) != NXT_OK) {
+ if (nxt_event_engine_post_init(engine) != NXT_OK) {
goto post_fail;
}
@@ -107,8 +113,7 @@ nxt_event_engine_create(nxt_thread_t *thr, const nxt_event_set_ops_t *event_set,
nxt_queue_init(&engine->listen_connections);
nxt_queue_init(&engine->idle_connections);
- thr->engine = engine;
- thr->fiber = &engine->fibers->fiber;
+ engine->thread = thr;
#if !(NXT_THREADS)
@@ -140,7 +145,7 @@ fibers_fail:
static nxt_int_t
-nxt_event_engine_post_init(nxt_thread_t *thr, nxt_event_engine_t *engine)
+nxt_event_engine_post_init(nxt_event_engine_t *engine)
{
if (engine->event->enable_post != NULL) {
return engine->event->enable_post(engine->event_set,
@@ -157,7 +162,7 @@ nxt_event_engine_post_init(nxt_thread_t *thr, nxt_event_engine_t *engine)
#endif
- if (nxt_event_engine_signal_pipe_create(thr, engine) != NXT_OK) {
+ if (nxt_event_engine_signal_pipe_create(engine) != NXT_OK) {
return NXT_ERROR;
}
@@ -166,8 +171,7 @@ nxt_event_engine_post_init(nxt_thread_t *thr, nxt_event_engine_t *engine)
static nxt_int_t
-nxt_event_engine_signal_pipe_create(nxt_thread_t *thr,
- nxt_event_engine_t *engine)
+nxt_event_engine_signal_pipe_create(nxt_event_engine_t *engine)
{
nxt_event_engine_pipe_t *pipe;
@@ -189,9 +193,9 @@ nxt_event_engine_signal_pipe_create(nxt_thread_t *thr,
}
pipe->event.fd = pipe->fds[0];
- pipe->event.read_work_queue = &thr->work_queue.main;
+ pipe->event.read_work_queue = &engine->task.thread->work_queue.main;
pipe->event.read_handler = nxt_event_engine_signal_pipe;
- pipe->event.write_work_queue = &thr->work_queue.main;
+ pipe->event.write_work_queue = &engine->task.thread->work_queue.main;
pipe->event.error_handler = nxt_event_engine_signal_pipe_error;
pipe->event.log = &nxt_main_log;
@@ -221,7 +225,7 @@ nxt_event_engine_signal_pipe_free(nxt_event_engine_t *engine)
static void
-nxt_event_engine_signal_pipe_close(nxt_thread_t *thr, void *obj, void *data)
+nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj, void *data)
{
nxt_event_engine_pipe_t *pipe;
@@ -234,11 +238,11 @@ nxt_event_engine_signal_pipe_close(nxt_thread_t *thr, void *obj, void *data)
void
nxt_event_engine_post(nxt_event_engine_t *engine, nxt_work_handler_t handler,
- void *obj, void *data, nxt_log_t *log)
+ nxt_task_t *task, void *obj, void *data, nxt_log_t *log)
{
nxt_thread_log_debug("event engine post");
- nxt_locked_work_queue_add(&engine->work_queue, handler, obj, data, log);
+ nxt_locked_work_queue_add(&engine->work_queue, handler, task, obj, data);
nxt_event_engine_signal(engine, 0);
}
@@ -267,7 +271,7 @@ nxt_event_engine_signal(nxt_event_engine_t *engine, nxt_uint_t signo)
static void
-nxt_event_engine_signal_pipe(nxt_thread_t *thr, void *obj, void *data)
+nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj, void *data)
{
int i, n;
u_char signo;
@@ -278,7 +282,7 @@ nxt_event_engine_signal_pipe(nxt_thread_t *thr, void *obj, void *data)
ev = obj;
- nxt_log_debug(thr->log, "engine signal pipe");
+ nxt_debug(task, "engine signal pipe");
post = 0;
@@ -288,17 +292,17 @@ nxt_event_engine_signal_pipe(nxt_thread_t *thr, void *obj, void *data)
for (i = 0; i < n; i++) {
signo = buf[i];
- nxt_log_debug(thr->log, "engine pipe signo:%d", signo);
+ nxt_debug(task, "engine pipe signo:%d", signo);
if (signo == 0) {
/* A post should be processed only once. */
post = 1;
} else {
- sigev = nxt_event_engine_signal_find(thr, signo);
+ sigev = nxt_event_engine_signal_find(task, signo);
if (nxt_fast_path(sigev != NULL)) {
- sigev->handler(thr, (void *) (uintptr_t) signo,
+ sigev->handler(task, (void *) (uintptr_t) signo,
(void *) sigev->name);
}
}
@@ -307,70 +311,77 @@ nxt_event_engine_signal_pipe(nxt_thread_t *thr, void *obj, void *data)
} while (n == sizeof(buf));
if (post) {
- nxt_event_engine_post_handler(thr, NULL, NULL);
+ nxt_event_engine_post_handler(task, NULL, NULL);
}
}
static void
-nxt_event_engine_post_handler(nxt_thread_t *thr, void *obj, void *data)
+nxt_event_engine_post_handler(nxt_task_t *task, void *obj, void *data)
{
- nxt_locked_work_queue_move(thr, &thr->engine->work_queue,
- &thr->work_queue.main);
+ nxt_thread_t *thread;
+
+ thread = task->thread;
+
+ nxt_locked_work_queue_move(thread, &thread->engine->work_queue,
+ &thread->work_queue.main);
}
static void
-nxt_event_engine_signal_pipe_error(nxt_thread_t *thr, void *obj, void *data)
+nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj, void *data)
{
- nxt_event_fd_t *ev;
+ nxt_event_engine_t *engine;
- ev = obj;
+ engine = task->thread->engine;
- nxt_log_alert(ev->log, "engine pipe(%FD:%FD) event error",
- thr->engine->pipe->fds[0], thr->engine->pipe->fds[1]);
+ nxt_log(task, NXT_LOG_CRIT, "engine pipe(%FD:%FD) event error",
+ engine->pipe->fds[0], engine->pipe->fds[1]);
- nxt_event_fd_close(thr->engine, &thr->engine->pipe->event);
- nxt_pipe_close(thr->engine->pipe->fds);
+ nxt_event_fd_close(engine, &engine->pipe->event);
+ nxt_pipe_close(engine->pipe->fds);
}
static void
-nxt_event_engine_signal_handler(nxt_thread_t *thr, void *obj, void *data)
+nxt_event_engine_signal_handler(nxt_task_t *task, void *obj, void *data)
{
uintptr_t signo;
const nxt_event_sig_t *sigev;
signo = (uintptr_t) obj;
- sigev = nxt_event_engine_signal_find(thr, signo);
+ sigev = nxt_event_engine_signal_find(task, signo);
if (nxt_fast_path(sigev != NULL)) {
- sigev->handler(thr, (void *) (uintptr_t) signo, (void *) sigev->name);
+ sigev->handler(task, (void *) (uintptr_t) signo, (void *) sigev->name);
}
}
static const nxt_event_sig_t *
-nxt_event_engine_signal_find(nxt_thread_t *thr, nxt_uint_t signo)
+nxt_event_engine_signal_find(nxt_task_t *task, nxt_uint_t signo)
{
const nxt_event_sig_t *sigev;
- for (sigev = thr->engine->signals->sigev; sigev->signo != 0; sigev++) {
+ for (sigev = task->thread->engine->signals->sigev;
+ sigev->signo != 0;
+ sigev++)
+ {
if (signo == (nxt_uint_t) sigev->signo) {
return sigev;
}
}
- nxt_log_alert(thr->log, "signal %ui handler not found", signo);
+ nxt_log(task, NXT_LOG_CRIT, "signal %ui handler not found", signo);
return NULL;
}
nxt_int_t
-nxt_event_engine_change(nxt_thread_t *thr, const nxt_event_set_ops_t *event_set,
- nxt_uint_t batch)
+nxt_event_engine_change(nxt_thread_t *thr, nxt_task_t *task,
+ const nxt_event_set_ops_t *event_set, nxt_uint_t batch)
{
nxt_uint_t events;
nxt_event_engine_t *engine;
@@ -389,7 +400,7 @@ nxt_event_engine_change(nxt_thread_t *thr, const nxt_event_set_ops_t *event_set,
* Add to thread main work queue the signal events possibly
* received before the blocking signal processing.
*/
- nxt_event_engine_signal_pipe(thr, &engine->pipe->event, NULL);
+ nxt_event_engine_signal_pipe(task, &engine->pipe->event, NULL);
}
if (engine->pipe != NULL && event_set->enable_post != NULL) {
@@ -399,7 +410,7 @@ nxt_event_engine_change(nxt_thread_t *thr, const nxt_event_set_ops_t *event_set,
*/
nxt_thread_work_queue_add(thr, &thr->work_queue.main,
nxt_event_engine_signal_pipe_close,
- engine->pipe, NULL, &nxt_main_log);
+ &engine->task, engine->pipe, NULL);
engine->pipe = NULL;
}
@@ -415,7 +426,7 @@ nxt_event_engine_change(nxt_thread_t *thr, const nxt_event_set_ops_t *event_set,
engine->event = event_set;
- if (nxt_event_engine_post_init(thr, engine) != NXT_OK) {
+ if (nxt_event_engine_post_init(engine) != NXT_OK) {
return NXT_ERROR;
}
@@ -459,6 +470,7 @@ void
nxt_event_engine_start(nxt_event_engine_t *engine)
{
void *obj, *data;
+ nxt_task_t *task;
nxt_msec_t timeout, now;
nxt_thread_t *thr;
nxt_work_handler_t handler;
@@ -478,25 +490,25 @@ nxt_event_engine_start(nxt_event_engine_t *engine)
for ( ;; ) {
for ( ;; ) {
- handler = nxt_thread_work_queue_pop(thr, &obj, &data, &thr->log);
+ handler = nxt_thread_work_queue_pop(thr, &task, &obj, &data);
if (handler == NULL) {
break;
}
- handler(thr, obj, data);
+ handler(task, obj, data);
thr->log = &nxt_main_log;
}
for ( ;; ) {
- handler = nxt_thread_last_work_queue_pop(thr, &obj, &data,
- &thr->log);
+ handler = nxt_thread_last_work_queue_pop(thr, &task, &obj, &data);
+
if (handler == NULL) {
break;
}
- handler(thr, obj, data);
+ handler(task, obj, data);
thr->log = &nxt_main_log;
}
@@ -508,7 +520,7 @@ nxt_event_engine_start(nxt_event_engine_t *engine)
timeout = nxt_event_timer_find(engine);
- engine->event->poll(thr, engine->event_set, timeout);
+ engine->event->poll(task, engine->event_set, timeout);
/*
* Look up expired timers only if a new zero timer has been