diff options
Diffstat (limited to '')
-rw-r--r-- | src/nxt_event_engine.c | 71 |
1 files changed, 33 insertions, 38 deletions
diff --git a/src/nxt_event_engine.c b/src/nxt_event_engine.c index ea7fbe62..fbcf2384 100644 --- a/src/nxt_event_engine.c +++ b/src/nxt_event_engine.c @@ -25,8 +25,9 @@ static nxt_work_handler_t nxt_event_engine_queue_pop(nxt_event_engine_t *engine, nxt_event_engine_t * -nxt_event_engine_create(nxt_thread_t *thr, const nxt_event_set_ops_t *event_set, - const nxt_event_sig_t *signals, nxt_uint_t flags, nxt_uint_t batch) +nxt_event_engine_create(nxt_thread_t *thr, + const nxt_event_interface_t *interface, const nxt_sig_event_t *signals, + nxt_uint_t flags, nxt_uint_t batch) { nxt_uint_t events; nxt_event_engine_t *engine; @@ -64,7 +65,6 @@ nxt_event_engine_create(nxt_thread_t *thr, const nxt_event_set_ops_t *event_set, engine->write_work_queue.cache = &engine->work_queue_cache; engine->shutdown_work_queue.cache = &engine->work_queue_cache; engine->close_work_queue.cache = &engine->work_queue_cache; - engine->final_work_queue.cache = &engine->work_queue_cache; nxt_work_queue_name(&engine->fast_work_queue, "fast"); nxt_work_queue_name(&engine->accept_work_queue, "accept"); @@ -74,7 +74,6 @@ nxt_event_engine_create(nxt_thread_t *thr, const nxt_event_set_ops_t *event_set, nxt_work_queue_name(&engine->write_work_queue, "write"); nxt_work_queue_name(&engine->shutdown_work_queue, "shutdown"); nxt_work_queue_name(&engine->close_work_queue, "close"); - nxt_work_queue_name(&engine->final_work_queue, "final"); if (signals != NULL) { engine->signals = nxt_event_engine_signals(signals); @@ -84,7 +83,7 @@ nxt_event_engine_create(nxt_thread_t *thr, const nxt_event_set_ops_t *event_set, engine->signals->handler = nxt_event_engine_signal_handler; - if (!event_set->signal_support) { + if (!interface->signal_support) { if (nxt_event_engine_signals_start(engine) != NXT_OK) { goto signals_fail; } @@ -98,12 +97,11 @@ nxt_event_engine_create(nxt_thread_t *thr, const nxt_event_set_ops_t *event_set, */ events = (batch != 0) ? batch : 32; - engine->event_set = event_set->create(engine->signals, 4 * events, events); - if (engine->event_set == NULL) { + if (interface->create(engine, 4 * events, events) != NXT_OK) { goto event_set_fail; } - engine->event = event_set; + engine->event = *interface; if (nxt_event_engine_post_init(engine) != NXT_OK) { goto post_fail; @@ -121,11 +119,9 @@ nxt_event_engine_create(nxt_thread_t *thr, const nxt_event_set_ops_t *event_set, nxt_queue_init(&engine->listen_connections); nxt_queue_init(&engine->idle_connections); - engine->thread = thr; - #if !(NXT_THREADS) - if (engine->event->signal_support) { + if (interface->signal_support) { thr->time.signal = -1; } @@ -136,7 +132,7 @@ nxt_event_engine_create(nxt_thread_t *thr, const nxt_event_set_ops_t *event_set, timers_fail: post_fail: - event_set->free(engine->event_set); + interface->free(engine); event_set_fail: signals_fail: @@ -155,9 +151,8 @@ fibers_fail: static nxt_int_t nxt_event_engine_post_init(nxt_event_engine_t *engine) { - if (engine->event->enable_post != NULL) { - return engine->event->enable_post(engine->event_set, - nxt_event_engine_post_handler); + if (engine->event.enable_post != NULL) { + return engine->event.enable_post(engine, nxt_event_engine_post_handler); } #if !(NXT_THREADS) @@ -201,13 +196,14 @@ nxt_event_engine_signal_pipe_create(nxt_event_engine_t *engine) } pipe->event.fd = pipe->fds[0]; + pipe->event.task = &engine->task; pipe->event.read_work_queue = &engine->fast_work_queue; pipe->event.read_handler = nxt_event_engine_signal_pipe; pipe->event.write_work_queue = &engine->fast_work_queue; pipe->event.error_handler = nxt_event_engine_signal_pipe_error; - pipe->event.log = &nxt_main_log; + pipe->event.log = engine->task.log; - nxt_event_fd_enable_read(engine, &pipe->event); + nxt_fd_event_enable_read(engine, &pipe->event); return NXT_OK; } @@ -223,7 +219,7 @@ nxt_event_engine_signal_pipe_free(nxt_event_engine_t *engine) if (pipe != NULL) { if (pipe->event.read_work_queue != NULL) { - nxt_event_fd_close(engine, &pipe->event); + nxt_fd_event_close(engine, &pipe->event); nxt_pipe_close(pipe->fds); } @@ -247,7 +243,7 @@ nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj, void *data) void nxt_event_engine_post(nxt_event_engine_t *engine, nxt_work_t *work) { - nxt_thread_log_debug("event engine post"); + nxt_debug(&engine->task, "event engine post"); nxt_locked_work_queue_add(&engine->locked_work_queue, work); @@ -260,15 +256,15 @@ nxt_event_engine_signal(nxt_event_engine_t *engine, nxt_uint_t signo) { u_char buf; - nxt_thread_log_debug("event engine signal:%ui", signo); + nxt_debug(&engine->task, "event engine signal:%ui", signo); /* * A signal number may be sent in a signal context, so the signal * information cannot be passed via a locked work queue. */ - if (engine->event->signal != NULL) { - engine->event->signal(engine->event_set, signo); + if (engine->event.signal != NULL) { + engine->event.signal(engine, signo); return; } @@ -283,7 +279,7 @@ nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj, void *data) int i, n; u_char signo; nxt_bool_t post; - nxt_event_fd_t *ev; + nxt_fd_event_t *ev; u_char buf[128]; ev = obj; @@ -342,7 +338,7 @@ nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj, void *data) nxt_log(task, NXT_LOG_CRIT, "engine pipe(%FD:%FD) event error", engine->pipe->fds[0], engine->pipe->fds[1]); - nxt_event_fd_close(engine, &engine->pipe->event); + nxt_fd_event_close(engine, &engine->pipe->event); nxt_pipe_close(engine->pipe->fds); } @@ -351,7 +347,7 @@ static void nxt_event_engine_signal_handler(nxt_task_t *task, void *obj, void *data) { uintptr_t signo; - const nxt_event_sig_t *sigev; + const nxt_sig_event_t *sigev; signo = (uintptr_t) obj; @@ -371,7 +367,7 @@ nxt_event_engine_signal_handler(nxt_task_t *task, void *obj, void *data) nxt_int_t nxt_event_engine_change(nxt_thread_t *thr, nxt_task_t *task, - const nxt_event_set_ops_t *event_set, nxt_uint_t batch) + const nxt_event_interface_t *interface, nxt_uint_t batch) { nxt_uint_t events; nxt_event_engine_t *engine; @@ -379,7 +375,7 @@ nxt_event_engine_change(nxt_thread_t *thr, nxt_task_t *task, engine = thr->engine; engine->batch = batch; - if (!engine->event->signal_support && event_set->signal_support) { + if (!engine->event.signal_support && interface->signal_support) { /* * Block signal processing if the current event * facility does not support signal processing. @@ -393,28 +389,27 @@ nxt_event_engine_change(nxt_thread_t *thr, nxt_task_t *task, nxt_event_engine_signal_pipe(task, &engine->pipe->event, NULL); } - if (engine->pipe != NULL && event_set->enable_post != NULL) { + if (engine->pipe != NULL && interface->enable_post != NULL) { /* * An engine pipe must be closed after all signal events * added above to engine fast work queue will be processed. */ - nxt_work_queue_add(&engine->final_work_queue, + nxt_work_queue_add(&engine->fast_work_queue, nxt_event_engine_signal_pipe_close, &engine->task, engine->pipe, NULL); engine->pipe = NULL; } - engine->event->free(engine->event_set); + engine->event.free(engine); events = (batch != 0) ? batch : 32; - engine->event_set = event_set->create(engine->signals, 4 * events, events); - if (engine->event_set == NULL) { + if (interface->create(engine, 4 * events, events) != NXT_OK) { return NXT_ERROR; } - engine->event = event_set; + engine->event = *interface; if (nxt_event_engine_post_init(engine) != NXT_OK) { return NXT_ERROR; @@ -422,7 +417,7 @@ nxt_event_engine_change(nxt_thread_t *thr, nxt_task_t *task, if (engine->signals != NULL) { - if (!engine->event->signal_support) { + if (!engine->event.signal_support) { return nxt_event_engine_signals_start(engine); } @@ -447,7 +442,7 @@ nxt_event_engine_free(nxt_event_engine_t *engine) nxt_work_queue_cache_destroy(&engine->work_queue_cache); - engine->event->free(engine->event_set); + engine->event.free(engine); /* TODO: free timers */ @@ -473,7 +468,7 @@ nxt_event_engine_queue_pop(nxt_event_engine_t *engine, nxt_task_t **task, engine->current_work_queue++; wq = engine->current_work_queue; - if (wq > &engine->final_work_queue) { + if (wq > &engine->close_work_queue) { wq = &engine->fast_work_queue; engine->current_work_queue = wq; } @@ -519,7 +514,7 @@ nxt_event_engine_start(nxt_event_engine_t *engine) /* A return point from fibers. */ } - thr->log = &nxt_main_log; + thr->log = engine->task.log; for ( ;; ) { @@ -537,7 +532,7 @@ nxt_event_engine_start(nxt_event_engine_t *engine) timeout = nxt_timer_find(engine); - engine->event->poll(&engine->task, engine->event_set, timeout); + engine->event.poll(engine, timeout); now = nxt_thread_monotonic_time(thr) / 1000000; |