diff options
author | Igor Sysoev <igor@sysoev.ru> | 2017-01-23 19:56:03 +0300 |
---|---|---|
committer | Igor Sysoev <igor@sysoev.ru> | 2017-01-23 19:56:03 +0300 |
commit | de532922d9ab42aa15b40d47c8db53ac2af38500 (patch) | |
tree | d6b7c6b21c7a6e0e3620a3e0c7198e63454d05e3 /src/nxt_epoll.c | |
parent | 16cbf3c076a0aca6d47adaf3f719493674cf2363 (diff) | |
download | unit-de532922d9ab42aa15b40d47c8db53ac2af38500.tar.gz unit-de532922d9ab42aa15b40d47c8db53ac2af38500.tar.bz2 |
Introducing tasks.
Diffstat (limited to '')
-rw-r--r-- | src/nxt_epoll.c | 129 |
1 files changed, 68 insertions, 61 deletions
diff --git a/src/nxt_epoll.c b/src/nxt_epoll.c index 65e9eb8d..0fe81092 100644 --- a/src/nxt_epoll.c +++ b/src/nxt_epoll.c @@ -65,37 +65,37 @@ static void nxt_epoll_enable_accept(nxt_event_set_t *event_set, nxt_event_fd_t *ev); static void nxt_epoll_change(nxt_event_set_t *event_set, nxt_event_fd_t *ev, int op, uint32_t events); -static nxt_int_t nxt_epoll_commit_changes(nxt_thread_t *thr, +static nxt_int_t nxt_epoll_commit_changes(nxt_task_t *task, nxt_epoll_event_set_t *es); -static void nxt_epoll_error_handler(nxt_thread_t *thr, void *obj, +static void nxt_epoll_error_handler(nxt_task_t *task, void *obj, void *data); #if (NXT_HAVE_SIGNALFD) static nxt_int_t nxt_epoll_add_signal(nxt_epoll_event_set_t *es, nxt_event_signals_t *signals); -static void nxt_epoll_signalfd_handler(nxt_thread_t *thr, void *obj, +static void nxt_epoll_signalfd_handler(nxt_task_t *task, void *obj, void *data); #endif #if (NXT_HAVE_EVENTFD) static nxt_int_t nxt_epoll_enable_post(nxt_event_set_t *event_set, nxt_work_handler_t handler); -static void nxt_epoll_eventfd_handler(nxt_thread_t *thr, void *obj, +static void nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, void *data); static void nxt_epoll_signal(nxt_event_set_t *event_set, nxt_uint_t signo); #endif -static void nxt_epoll_poll(nxt_thread_t *thr, nxt_event_set_t *event_set, +static void nxt_epoll_poll(nxt_task_t *task, nxt_event_set_t *event_set, nxt_msec_t timeout); #if (NXT_HAVE_ACCEPT4) -static void nxt_epoll_event_conn_io_accept4(nxt_thread_t *thr, void *obj, +static void nxt_epoll_event_conn_io_accept4(nxt_task_t *task, void *obj, void *data); #endif #if (NXT_HAVE_EPOLL_EDGE) -static void nxt_epoll_edge_event_conn_io_connect(nxt_thread_t *thr, void *obj, +static void nxt_epoll_edge_event_conn_io_connect(nxt_task_t *task, void *obj, void *data); -static void nxt_epoll_edge_event_conn_connected(nxt_thread_t *thr, void *obj, +static void nxt_epoll_edge_event_conn_connected(nxt_task_t *task, void *obj, void *data); static ssize_t nxt_epoll_edge_event_conn_io_recvbuf(nxt_event_conn_t *c, nxt_buf_t *b); @@ -427,7 +427,7 @@ nxt_epoll_close(nxt_event_set_t *event_set, nxt_event_fd_t *ev) es = &event_set->epoll; if (es->nchanges != 0) { - (void) nxt_epoll_commit_changes(nxt_thread(), &event_set->epoll); + (void) nxt_epoll_commit_changes(ev->task, &event_set->epoll); } } @@ -614,7 +614,7 @@ nxt_epoll_change(nxt_event_set_t *event_set, nxt_event_fd_t *ev, int op, es->epoll, ev->fd, op, events); if (es->nchanges >= es->mchanges) { - (void) nxt_epoll_commit_changes(nxt_thread(), es); + (void) nxt_epoll_commit_changes(ev->task, es); } ch = &es->changes[es->nchanges++]; @@ -626,13 +626,13 @@ nxt_epoll_change(nxt_event_set_t *event_set, nxt_event_fd_t *ev, int op, static nxt_int_t -nxt_epoll_commit_changes(nxt_thread_t *thr, nxt_epoll_event_set_t *es) +nxt_epoll_commit_changes(nxt_task_t *task, nxt_epoll_event_set_t *es) { nxt_int_t ret; nxt_event_fd_t *ev; nxt_epoll_change_t *ch, *end; - nxt_log_debug(thr->log, "epoll %d changes:%ui", es->epoll, es->nchanges); + nxt_debug(task, "epoll %d changes:%ui", es->epoll, es->nchanges); ret = NXT_OK; ch = es->changes; @@ -641,16 +641,17 @@ nxt_epoll_commit_changes(nxt_thread_t *thr, nxt_epoll_event_set_t *es) do { ev = ch->event.data.ptr; - nxt_log_debug(ev->log, "epoll_ctl(%d): fd:%d op:%d ev:%XD", - es->epoll, ch->fd, ch->op, ch->event.events); + nxt_debug(ev->task, "epoll_ctl(%d): fd:%d op:%d ev:%XD", + es->epoll, ch->fd, ch->op, ch->event.events); if (epoll_ctl(es->epoll, ch->op, ch->fd, &ch->event) != 0) { - nxt_log_alert(ev->log, "epoll_ctl(%d, %d, %d) failed %E", - es->epoll, ch->op, ch->fd, nxt_errno); + nxt_log(ev->task, NXT_LOG_CRIT, "epoll_ctl(%d, %d, %d) failed %E", + es->epoll, ch->op, ch->fd, nxt_errno); - nxt_thread_work_queue_add(thr, &thr->work_queue.main, + nxt_thread_work_queue_add(task->thread, + &task->thread->work_queue.main, nxt_epoll_error_handler, - ev, ev->data, ev->log); + ev->task, ev, ev->data); ret = NXT_ERROR; } @@ -666,7 +667,7 @@ nxt_epoll_commit_changes(nxt_thread_t *thr, nxt_epoll_event_set_t *es) static void -nxt_epoll_error_handler(nxt_thread_t *thr, void *obj, void *data) +nxt_epoll_error_handler(nxt_task_t *task, void *obj, void *data) { nxt_event_fd_t *ev; @@ -675,7 +676,7 @@ nxt_epoll_error_handler(nxt_thread_t *thr, void *obj, void *data) ev->read = NXT_EVENT_INACTIVE; ev->write = NXT_EVENT_INACTIVE; - ev->error_handler(thr, ev, data); + ev->error_handler(ev->task, ev, data); } @@ -685,6 +686,7 @@ static nxt_int_t nxt_epoll_add_signal(nxt_epoll_event_set_t *es, nxt_event_signals_t *signals) { int fd; + nxt_thread_t *thr; struct epoll_event ee; if (sigprocmask(SIG_BLOCK, &signals->sigmask, NULL) != 0) { @@ -722,6 +724,9 @@ nxt_epoll_add_signal(nxt_epoll_event_set_t *es, nxt_event_signals_t *signals) es->signalfd.read_handler = nxt_epoll_signalfd_handler; es->signalfd.log = &nxt_main_log; + thr = nxt_thread(); + es->signalfd.task = &thr->engine->task; + ee.events = EPOLLIN; ee.data.ptr = &es->signalfd; @@ -737,7 +742,7 @@ nxt_epoll_add_signal(nxt_epoll_event_set_t *es, nxt_event_signals_t *signals) static void -nxt_epoll_signalfd_handler(nxt_thread_t *thr, void *obj, void *data) +nxt_epoll_signalfd_handler(nxt_task_t *task, void *obj, void *data) { int n; nxt_event_fd_t *ev; @@ -747,20 +752,20 @@ nxt_epoll_signalfd_handler(nxt_thread_t *thr, void *obj, void *data) ev = obj; handler = data; - nxt_log_debug(thr->log, "signalfd handler"); + nxt_debug(task, "signalfd handler"); n = read(ev->fd, &sfd, sizeof(struct signalfd_siginfo)); - nxt_log_debug(thr->log, "read signalfd(%d): %d", ev->fd, n); + nxt_debug(task, "read signalfd(%d): %d", ev->fd, n); if (n != sizeof(struct signalfd_siginfo)) { - nxt_log_alert(thr->log, "read signalfd(%d) failed %E", - ev->fd, nxt_errno); + nxt_log(task, NXT_LOG_CRIT, "read signalfd(%d) failed %E", + ev->fd, nxt_errno); } - nxt_log_debug(thr->log, "signalfd(%d) signo:%d", ev->fd, sfd.ssi_signo); + nxt_debug(task, "signalfd(%d) signo:%d", ev->fd, sfd.ssi_signo); - handler(thr, (void *) (uintptr_t) sfd.ssi_signo, NULL); + handler(task, (void *) (uintptr_t) sfd.ssi_signo, NULL); } #endif @@ -771,6 +776,7 @@ nxt_epoll_signalfd_handler(nxt_thread_t *thr, void *obj, void *data) static nxt_int_t nxt_epoll_enable_post(nxt_event_set_t *event_set, nxt_work_handler_t handler) { + nxt_thread_t *thr; struct epoll_event ee; nxt_epoll_event_set_t *es; @@ -804,6 +810,9 @@ nxt_epoll_enable_post(nxt_event_set_t *event_set, nxt_work_handler_t handler) es->eventfd.data = es; es->eventfd.log = &nxt_main_log; + thr = nxt_thread(); + es->eventfd.task = &thr->engine->task; + ee.events = EPOLLIN | EPOLLET; ee.data.ptr = &es->eventfd; @@ -819,7 +828,7 @@ nxt_epoll_enable_post(nxt_event_set_t *event_set, nxt_work_handler_t handler) static void -nxt_epoll_eventfd_handler(nxt_thread_t *thr, void *obj, void *data) +nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, void *data) { int n; uint64_t events; @@ -827,7 +836,7 @@ nxt_epoll_eventfd_handler(nxt_thread_t *thr, void *obj, void *data) es = data; - nxt_log_debug(thr->log, "eventfd handler, times:%ui", es->neventfd); + nxt_debug(task, "eventfd handler, times:%ui", es->neventfd); /* * The maximum value after write() to a eventfd() descriptor will @@ -843,16 +852,15 @@ nxt_epoll_eventfd_handler(nxt_thread_t *thr, void *obj, void *data) n = read(es->eventfd.fd, &events, sizeof(uint64_t)); - nxt_log_debug(thr->log, "read(%d): %d events:%uL", - es->eventfd.fd, n, events); + nxt_debug(task, "read(%d): %d events:%uL", es->eventfd.fd, n, events); if (n != sizeof(uint64_t)) { - nxt_log_alert(thr->log, "read eventfd(%d) failed %E", - es->eventfd.fd, nxt_errno); + nxt_log(task, NXT_LOG_CRIT, "read eventfd(%d) failed %E", + es->eventfd.fd, nxt_errno); } } - es->post_handler(thr, NULL, NULL); + es->post_handler(task, NULL, NULL); } @@ -881,7 +889,7 @@ nxt_epoll_signal(nxt_event_set_t *event_set, nxt_uint_t signo) static void -nxt_epoll_poll(nxt_thread_t *thr, nxt_event_set_t *event_set, +nxt_epoll_poll(nxt_task_t *task, nxt_event_set_t *event_set, nxt_msec_t timeout) { int nevents; @@ -897,26 +905,25 @@ nxt_epoll_poll(nxt_thread_t *thr, nxt_event_set_t *event_set, es = &event_set->epoll; if (es->nchanges != 0) { - if (nxt_epoll_commit_changes(thr, es) != NXT_OK) { + if (nxt_epoll_commit_changes(task, es) != NXT_OK) { /* Error handlers have been enqueued on failure. */ timeout = 0; } } - nxt_log_debug(thr->log, "epoll_wait(%d) timeout:%M", es->epoll, timeout); + nxt_debug(task, "epoll_wait(%d) timeout:%M", es->epoll, timeout); nevents = epoll_wait(es->epoll, es->events, es->mevents, timeout); err = (nevents == -1) ? nxt_errno : 0; - nxt_thread_time_update(thr); + nxt_thread_time_update(task->thread); - nxt_log_debug(thr->log, "epoll_wait(%d): %d", es->epoll, nevents); + nxt_debug(task, "epoll_wait(%d): %d", es->epoll, nevents); if (nevents == -1) { level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_ALERT; - nxt_log_error(level, thr->log, "epoll_wait(%d) failed %E", - es->epoll, err); + nxt_log(task, level, "epoll_wait(%d) failed %E", es->epoll, err); return; } @@ -926,8 +933,8 @@ nxt_epoll_poll(nxt_thread_t *thr, nxt_event_set_t *event_set, events = event->events; ev = event->data.ptr; - nxt_log_debug(ev->log, "epoll: fd:%d ev:%04XD d:%p rd:%d wr:%d", - ev->fd, events, ev, ev->read, ev->write); + nxt_debug(ev->task, "epoll: fd:%d ev:%04XD d:%p rd:%d wr:%d", + ev->fd, events, ev, ev->read, ev->write); /* * On error epoll may set EPOLLERR and EPOLLHUP only without EPOLLIN or @@ -953,9 +960,9 @@ nxt_epoll_poll(nxt_thread_t *thr, nxt_event_set_t *event_set, error = 0; - nxt_thread_work_queue_add(thr, ev->read_work_queue, + nxt_thread_work_queue_add(task->thread, ev->read_work_queue, ev->read_handler, - ev, ev->data, ev->log); + ev->task, ev, ev->data); } else if (event_set->epoll.mode == 0) { /* Level-triggered mode. */ @@ -974,9 +981,9 @@ nxt_epoll_poll(nxt_thread_t *thr, nxt_event_set_t *event_set, error = 0; - nxt_thread_work_queue_add(thr, ev->write_work_queue, + nxt_thread_work_queue_add(task->thread, ev->write_work_queue, ev->write_handler, - ev, ev->data, ev->log); + ev->task, ev, ev->data); } else if (event_set->epoll.mode == 0) { /* Level-triggered mode. */ @@ -995,7 +1002,7 @@ nxt_epoll_poll(nxt_thread_t *thr, nxt_event_set_t *event_set, #if (NXT_HAVE_ACCEPT4) static void -nxt_epoll_event_conn_io_accept4(nxt_thread_t *thr, void *obj, void *data) +nxt_epoll_event_conn_io_accept4(nxt_task_t *task, void *obj, void *data) { socklen_t len; nxt_socket_t s; @@ -1024,13 +1031,13 @@ nxt_epoll_event_conn_io_accept4(nxt_thread_t *thr, void *obj, void *data) if (s != -1) { c->socket.fd = s; - nxt_log_debug(thr->log, "accept4(%d): %d", cls->socket.fd, s); + nxt_debug(task, "accept4(%d): %d", cls->socket.fd, s); - nxt_event_conn_accept(thr, cls, c); + nxt_event_conn_accept(task, cls, c); return; } - nxt_event_conn_accept_error(thr, cls, "accept4", nxt_errno); + nxt_event_conn_accept_error(task, cls, "accept4", nxt_errno); } #endif @@ -1049,8 +1056,7 @@ nxt_epoll_event_conn_io_accept4(nxt_thread_t *thr, void *obj, void *data) */ static void -nxt_epoll_edge_event_conn_io_connect(nxt_thread_t *thr, void *obj, - void *data) +nxt_epoll_edge_event_conn_io_connect(nxt_task_t *task, void *obj, void *data) { nxt_event_conn_t *c; nxt_work_handler_t handler; @@ -1071,9 +1077,9 @@ nxt_epoll_edge_event_conn_io_connect(nxt_thread_t *thr, void *obj, c->socket.write_handler = nxt_epoll_edge_event_conn_connected; c->socket.error_handler = nxt_event_conn_connect_error; - nxt_event_conn_timer(thr->engine, c, state, &c->write_timer); + nxt_event_conn_timer(task->thread->engine, c, state, &c->write_timer); - nxt_epoll_enable(thr->engine->event_set, &c->socket); + nxt_epoll_enable(task->thread->engine->event_set, &c->socket); c->socket.read = NXT_EVENT_BLOCKED; return; @@ -1115,18 +1121,19 @@ nxt_epoll_edge_event_conn_io_connect(nxt_thread_t *thr, void *obj, break; } - nxt_event_conn_io_handle(thr, c->write_work_queue, handler, c, data); + nxt_event_conn_io_handle(task->thread, c->write_work_queue, handler, + task, c, data); } static void -nxt_epoll_edge_event_conn_connected(nxt_thread_t *thr, void *obj, void *data) +nxt_epoll_edge_event_conn_connected(nxt_task_t *task, void *obj, void *data) { nxt_event_conn_t *c; c = obj; - nxt_log_debug(thr->log, "epoll event conn connected fd:%d", c->socket.fd); + nxt_debug(task, "epoll event conn connected fd:%d", c->socket.fd); if (!c->socket.epoll_error) { c->socket.write = NXT_EVENT_BLOCKED; @@ -1135,12 +1142,12 @@ nxt_epoll_edge_event_conn_connected(nxt_thread_t *thr, void *obj, void *data) nxt_event_timer_disable(&c->write_timer); } - nxt_event_conn_io_handle(thr, c->write_work_queue, - c->write_state->ready_handler, c, data); + nxt_event_conn_io_handle(task->thread, c->write_work_queue, + c->write_state->ready_handler, task, c, data); return; } - nxt_event_conn_connect_test(thr, c, data); + nxt_event_conn_connect_test(task, c, data); } |