diff options
author | Igor Sysoev <igor@sysoev.ru> | 2017-01-17 20:00:00 +0300 |
---|---|---|
committer | Igor Sysoev <igor@sysoev.ru> | 2017-01-17 20:00:00 +0300 |
commit | 16cbf3c076a0aca6d47adaf3f719493674cf2363 (patch) | |
tree | e6530480020f62a2bdbf249988ec3e2a751d3927 /src/nxt_event_engine.c | |
download | unit-16cbf3c076a0aca6d47adaf3f719493674cf2363.tar.gz unit-16cbf3c076a0aca6d47adaf3f719493674cf2363.tar.bz2 |
Initial version.
Diffstat (limited to 'src/nxt_event_engine.c')
-rw-r--r-- | src/nxt_event_engine.c | 526 |
1 files changed, 526 insertions, 0 deletions
diff --git a/src/nxt_event_engine.c b/src/nxt_event_engine.c new file mode 100644 index 00000000..dd0f5fe3 --- /dev/null +++ b/src/nxt_event_engine.c @@ -0,0 +1,526 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) NGINX, Inc. + */ + +#include <nxt_main.h> + + +static nxt_int_t nxt_event_engine_post_init(nxt_thread_t *thr, + nxt_event_engine_t *engine); +static nxt_int_t nxt_event_engine_signal_pipe_create(nxt_thread_t *thr, + nxt_event_engine_t *engine); +static void nxt_event_engine_signal_pipe_close(nxt_thread_t *thr, void *obj, + void *data); +static void nxt_event_engine_signal_pipe(nxt_thread_t *thr, void *obj, + void *data); +static void nxt_event_engine_post_handler(nxt_thread_t *thr, void *obj, + void *data); +static void nxt_event_engine_signal_pipe_error(nxt_thread_t *thr, void *obj, + void *data); +static void nxt_event_engine_signal_handler(nxt_thread_t *thr, void *obj, + void *data); +static const nxt_event_sig_t *nxt_event_engine_signal_find(nxt_thread_t *thr, + nxt_uint_t signo); + + +nxt_event_engine_t * +nxt_event_engine_create(nxt_thread_t *thr, const nxt_event_set_ops_t *event_set, + const nxt_event_sig_t *signals, nxt_uint_t flags, nxt_uint_t batch) +{ + nxt_uint_t events; + nxt_event_engine_t *engine; + + engine = nxt_zalloc(sizeof(nxt_event_engine_t)); + if (engine == NULL) { + return NULL; + } + + engine->batch = batch; + + if (flags & NXT_ENGINE_FIBERS) { + engine->fibers = nxt_fiber_main_create(engine); + if (engine->fibers == NULL) { + goto fibers_fail; + } + } + + nxt_thread_work_queue_create(thr, 0); + + nxt_work_queue_name(&engine->accept_work_queue, "accept"); + nxt_work_queue_name(&engine->read_work_queue, "read"); + nxt_work_queue_name(&engine->socket_work_queue, "socket"); + nxt_work_queue_name(&engine->connect_work_queue, "connect"); + nxt_work_queue_name(&engine->write_work_queue, "write"); + nxt_work_queue_name(&engine->shutdown_work_queue, "shutdown"); + nxt_work_queue_name(&engine->close_work_queue, "close"); + +#if (NXT_THREADS) + + nxt_locked_work_queue_create(&engine->work_queue, 7); + +#endif + + if (signals != NULL) { + engine->signals = nxt_event_engine_signals(signals); + if (engine->signals == NULL) { + goto signals_fail; + } + + engine->signals->handler = nxt_event_engine_signal_handler; + + if (!event_set->signal_support) { + if (nxt_event_engine_signals_start(engine) != NXT_OK) { + goto signals_fail; + } + } + } + + /* + * Number of event set and timers changes should be at least twice + * more than number of events to avoid premature flushes of the changes. + * Fourfold is for sure. + */ + events = (batch != 0) ? batch : 32; + + engine->event_set = event_set->create(engine->signals, 4 * events, events); + if (engine->event_set == NULL) { + goto event_set_fail; + } + + engine->event = event_set; + + if (nxt_event_engine_post_init(thr, engine) != NXT_OK) { + goto post_fail; + } + + if (nxt_event_timers_init(&engine->timers, 4 * events) != NXT_OK) { + goto timers_fail; + } + + nxt_thread_time_update(thr); + engine->timers.now = nxt_thread_monotonic_time(thr) / 1000000; + + engine->max_connections = 0xffffffff; + + nxt_queue_init(&engine->listen_connections); + nxt_queue_init(&engine->idle_connections); + + thr->engine = engine; + thr->fiber = &engine->fibers->fiber; + +#if !(NXT_THREADS) + + if (engine->event->signal_support) { + thr->time.signal = -1; + } + +#endif + + return engine; + +timers_fail: +post_fail: + + event_set->free(engine->event_set); + +event_set_fail: +signals_fail: + + nxt_free(engine->signals); + nxt_thread_work_queue_destroy(thr); + nxt_free(engine->fibers); + +fibers_fail: + + nxt_free(engine); + return NULL; +} + + +static nxt_int_t +nxt_event_engine_post_init(nxt_thread_t *thr, nxt_event_engine_t *engine) +{ + if (engine->event->enable_post != NULL) { + return engine->event->enable_post(engine->event_set, + nxt_event_engine_post_handler); + } + +#if !(NXT_THREADS) + + /* Only signals may are posted in single-threaded mode. */ + + if (engine->event->signal_support) { + return NXT_OK; + } + +#endif + + if (nxt_event_engine_signal_pipe_create(thr, engine) != NXT_OK) { + return NXT_ERROR; + } + + return NXT_OK; +} + + +static nxt_int_t +nxt_event_engine_signal_pipe_create(nxt_thread_t *thr, + nxt_event_engine_t *engine) +{ + nxt_event_engine_pipe_t *pipe; + + pipe = nxt_zalloc(sizeof(nxt_event_engine_pipe_t)); + if (pipe == NULL) { + return NXT_ERROR; + } + + engine->pipe = pipe; + + /* + * An event engine pipe is in blocking mode for writer + * and in non-blocking node for reader. + */ + + if (nxt_pipe_create(pipe->fds, 1, 0) != NXT_OK) { + nxt_free(pipe); + return NXT_ERROR; + } + + pipe->event.fd = pipe->fds[0]; + pipe->event.read_work_queue = &thr->work_queue.main; + pipe->event.read_handler = nxt_event_engine_signal_pipe; + pipe->event.write_work_queue = &thr->work_queue.main; + pipe->event.error_handler = nxt_event_engine_signal_pipe_error; + pipe->event.log = &nxt_main_log; + + nxt_event_fd_enable_read(engine, &pipe->event); + + return NXT_OK; +} + + +static void +nxt_event_engine_signal_pipe_free(nxt_event_engine_t *engine) +{ + nxt_event_engine_pipe_t *pipe; + + pipe = engine->pipe; + + if (pipe != NULL) { + + if (pipe->event.read_work_queue != NULL) { + nxt_event_fd_close(engine, &pipe->event); + nxt_pipe_close(pipe->fds); + } + + nxt_free(pipe); + } +} + + +static void +nxt_event_engine_signal_pipe_close(nxt_thread_t *thr, void *obj, void *data) +{ + nxt_event_engine_pipe_t *pipe; + + pipe = obj; + + nxt_pipe_close(pipe->fds); + nxt_free(pipe); +} + + +void +nxt_event_engine_post(nxt_event_engine_t *engine, nxt_work_handler_t handler, + void *obj, void *data, nxt_log_t *log) +{ + nxt_thread_log_debug("event engine post"); + + nxt_locked_work_queue_add(&engine->work_queue, handler, obj, data, log); + + nxt_event_engine_signal(engine, 0); +} + + +void +nxt_event_engine_signal(nxt_event_engine_t *engine, nxt_uint_t signo) +{ + u_char buf; + + nxt_thread_log_debug("event engine signal:%ui", signo); + + /* + * A signal number may be sent in a signal context, so the signal + * information cannot be passed via a locked work queue. + */ + + if (engine->event->signal != NULL) { + engine->event->signal(engine->event_set, signo); + return; + } + + buf = (u_char) signo; + (void) nxt_fd_write(engine->pipe->fds[1], &buf, 1); +} + + +static void +nxt_event_engine_signal_pipe(nxt_thread_t *thr, void *obj, void *data) +{ + int i, n; + u_char signo; + nxt_bool_t post; + nxt_event_fd_t *ev; + const nxt_event_sig_t *sigev; + u_char buf[128]; + + ev = obj; + + nxt_log_debug(thr->log, "engine signal pipe"); + + post = 0; + + do { + n = nxt_fd_read(ev->fd, buf, sizeof(buf)); + + for (i = 0; i < n; i++) { + signo = buf[i]; + + nxt_log_debug(thr->log, "engine pipe signo:%d", signo); + + if (signo == 0) { + /* A post should be processed only once. */ + post = 1; + + } else { + sigev = nxt_event_engine_signal_find(thr, signo); + + if (nxt_fast_path(sigev != NULL)) { + sigev->handler(thr, (void *) (uintptr_t) signo, + (void *) sigev->name); + } + } + } + + } while (n == sizeof(buf)); + + if (post) { + nxt_event_engine_post_handler(thr, NULL, NULL); + } +} + + +static void +nxt_event_engine_post_handler(nxt_thread_t *thr, void *obj, void *data) +{ + nxt_locked_work_queue_move(thr, &thr->engine->work_queue, + &thr->work_queue.main); +} + + +static void +nxt_event_engine_signal_pipe_error(nxt_thread_t *thr, void *obj, void *data) +{ + nxt_event_fd_t *ev; + + ev = obj; + + nxt_log_alert(ev->log, "engine pipe(%FD:%FD) event error", + thr->engine->pipe->fds[0], thr->engine->pipe->fds[1]); + + nxt_event_fd_close(thr->engine, &thr->engine->pipe->event); + nxt_pipe_close(thr->engine->pipe->fds); +} + + +static void +nxt_event_engine_signal_handler(nxt_thread_t *thr, void *obj, void *data) +{ + uintptr_t signo; + const nxt_event_sig_t *sigev; + + signo = (uintptr_t) obj; + + sigev = nxt_event_engine_signal_find(thr, signo); + + if (nxt_fast_path(sigev != NULL)) { + sigev->handler(thr, (void *) (uintptr_t) signo, (void *) sigev->name); + } +} + + +static const nxt_event_sig_t * +nxt_event_engine_signal_find(nxt_thread_t *thr, nxt_uint_t signo) +{ + const nxt_event_sig_t *sigev; + + for (sigev = thr->engine->signals->sigev; sigev->signo != 0; sigev++) { + if (signo == (nxt_uint_t) sigev->signo) { + return sigev; + } + } + + nxt_log_alert(thr->log, "signal %ui handler not found", signo); + + return NULL; +} + + +nxt_int_t +nxt_event_engine_change(nxt_thread_t *thr, const nxt_event_set_ops_t *event_set, + nxt_uint_t batch) +{ + nxt_uint_t events; + nxt_event_engine_t *engine; + + engine = thr->engine; + engine->batch = batch; + + if (!engine->event->signal_support && event_set->signal_support) { + /* + * Block signal processing if the current event + * facility does not support signal processing. + */ + nxt_event_engine_signals_stop(engine); + + /* + * Add to thread main work queue the signal events possibly + * received before the blocking signal processing. + */ + nxt_event_engine_signal_pipe(thr, &engine->pipe->event, NULL); + } + + if (engine->pipe != NULL && event_set->enable_post != NULL) { + /* + * An engine pipe must be closed after all signal events + * added above to thread main work queue will be processed. + */ + nxt_thread_work_queue_add(thr, &thr->work_queue.main, + nxt_event_engine_signal_pipe_close, + engine->pipe, NULL, &nxt_main_log); + + engine->pipe = NULL; + } + + engine->event->free(engine->event_set); + + events = (batch != 0) ? batch : 32; + + engine->event_set = event_set->create(engine->signals, 4 * events, events); + if (engine->event_set == NULL) { + return NXT_ERROR; + } + + engine->event = event_set; + + if (nxt_event_engine_post_init(thr, engine) != NXT_OK) { + return NXT_ERROR; + } + + if (engine->signals != NULL) { + + if (!engine->event->signal_support) { + return nxt_event_engine_signals_start(engine); + } + +#if (NXT_THREADS) + /* + * Reset the PID flag to start the signal thread if + * some future event facility will not support signals. + */ + engine->signals->process = 0; +#endif + } + + return NXT_OK; +} + + +void +nxt_event_engine_free(nxt_event_engine_t *engine) +{ + nxt_event_engine_signal_pipe_free(engine); + nxt_free(engine->signals); + + nxt_locked_work_queue_destroy(&engine->work_queue); + nxt_thread_work_queue_destroy(nxt_thread()); + + engine->event->free(engine->event_set); + + /* TODO: free timers */ + + nxt_free(engine); +} + + +void +nxt_event_engine_start(nxt_event_engine_t *engine) +{ + void *obj, *data; + nxt_msec_t timeout, now; + nxt_thread_t *thr; + nxt_work_handler_t handler; + + thr = nxt_thread(); + + if (engine->fibers) { + /* + * _setjmp() cannot be wrapped in a function since return from + * the function clobbers stack used by future _setjmp() returns. + */ + _setjmp(engine->fibers->fiber.jmp); + + /* A return point from fibers. */ + } + + for ( ;; ) { + + for ( ;; ) { + handler = nxt_thread_work_queue_pop(thr, &obj, &data, &thr->log); + + if (handler == NULL) { + break; + } + + handler(thr, obj, data); + + thr->log = &nxt_main_log; + } + + for ( ;; ) { + handler = nxt_thread_last_work_queue_pop(thr, &obj, &data, + &thr->log); + if (handler == NULL) { + break; + } + + handler(thr, obj, data); + + thr->log = &nxt_main_log; + } + + /* Attach some event engine work queues in preferred order. */ + + nxt_work_queue_attach(thr, &engine->accept_work_queue); + nxt_work_queue_attach(thr, &engine->read_work_queue); + + timeout = nxt_event_timer_find(engine); + + engine->event->poll(thr, engine->event_set, timeout); + + /* + * Look up expired timers only if a new zero timer has been + * just added before the event poll or if the event poll slept + * at least 1 millisecond, because all old eligible timers were + * processed in the previous iterations. + */ + + now = nxt_thread_monotonic_time(thr) / 1000000; + + if (timeout == 0 || now != engine->timers.now) { + nxt_event_timer_expire(thr, now); + } + } +} |