diff options
author | Igor Sysoev <igor@sysoev.ru> | 2017-01-23 19:56:03 +0300 |
---|---|---|
committer | Igor Sysoev <igor@sysoev.ru> | 2017-01-23 19:56:03 +0300 |
commit | de532922d9ab42aa15b40d47c8db53ac2af38500 (patch) | |
tree | d6b7c6b21c7a6e0e3620a3e0c7198e63454d05e3 /src/nxt_event_conn_accept.c | |
parent | 16cbf3c076a0aca6d47adaf3f719493674cf2363 (diff) | |
download | unit-de532922d9ab42aa15b40d47c8db53ac2af38500.tar.gz unit-de532922d9ab42aa15b40d47c8db53ac2af38500.tar.bz2 |
Introducing tasks.
Diffstat (limited to '')
-rw-r--r-- | src/nxt_event_conn_accept.c | 133 |
1 files changed, 70 insertions, 63 deletions
diff --git a/src/nxt_event_conn_accept.c b/src/nxt_event_conn_accept.c index ef0eea2c..a1339235 100644 --- a/src/nxt_event_conn_accept.c +++ b/src/nxt_event_conn_accept.c @@ -18,23 +18,24 @@ */ -static nxt_event_conn_t *nxt_event_conn_accept_alloc(nxt_thread_t *thr, +static nxt_event_conn_t *nxt_event_conn_accept_alloc(nxt_task_t *task, nxt_event_conn_listen_t *cls); -static void nxt_event_conn_listen_handler(nxt_thread_t *thr, void *obj, +static void nxt_event_conn_listen_handler(nxt_task_t *task, void *obj, void *data); -static nxt_event_conn_t *nxt_event_conn_accept_next(nxt_thread_t *thr, +static nxt_event_conn_t *nxt_event_conn_accept_next(nxt_task_t *task, nxt_event_conn_listen_t *cls); -static nxt_int_t nxt_event_conn_accept_close_idle(nxt_thread_t *thr, +static nxt_int_t nxt_event_conn_accept_close_idle(nxt_task_t *task, nxt_event_conn_listen_t *cls); -static void nxt_event_conn_listen_event_error(nxt_thread_t *thr, void *obj, +static void nxt_event_conn_listen_event_error(nxt_task_t *task, void *obj, void *data); -static void nxt_event_conn_listen_timer_handler(nxt_thread_t *thr, void *obj, +static void nxt_event_conn_listen_timer_handler(nxt_task_t *task, void *obj, void *data); nxt_int_t -nxt_event_conn_listen(nxt_thread_t *thr, nxt_listen_socket_t *ls) +nxt_event_conn_listen(nxt_task_t *task, nxt_listen_socket_t *ls) { + nxt_event_engine_t *engine; nxt_event_conn_listen_t *cls; cls = nxt_zalloc(sizeof(nxt_event_conn_listen_t)); @@ -42,13 +43,14 @@ nxt_event_conn_listen(nxt_thread_t *thr, nxt_listen_socket_t *ls) if (nxt_fast_path(cls != NULL)) { cls->socket.fd = ls->socket; - cls->batch = thr->engine->batch; + engine = task->thread->engine; + cls->batch = engine->batch; if (cls->batch != 0) { - cls->socket.read_work_queue = &thr->engine->accept_work_queue; + cls->socket.read_work_queue = &engine->accept_work_queue; } else { - cls->socket.read_work_queue = &thr->work_queue.main; + cls->socket.read_work_queue = &task->thread->work_queue.main; cls->batch = 1; } @@ -56,20 +58,26 @@ nxt_event_conn_listen(nxt_thread_t *thr, nxt_listen_socket_t *ls) cls->socket.error_handler = nxt_event_conn_listen_event_error; cls->socket.log = &nxt_main_log; - cls->accept = thr->engine->event->io->accept; + cls->accept = engine->event->io->accept; cls->listen = ls; - cls->timer.work_queue = &thr->work_queue.main; + cls->timer.work_queue = &task->thread->work_queue.main; cls->timer.handler = nxt_event_conn_listen_timer_handler; cls->timer.log = &nxt_main_log; nxt_event_timer_ident(&cls->timer, cls->socket.fd); - if (nxt_event_conn_accept_alloc(thr, cls) != NULL) { - nxt_event_fd_enable_accept(thr->engine, &cls->socket); + cls->task.thread = task->thread; + cls->task.log = &nxt_main_log; + cls->task.ident = nxt_task_next_ident(); + cls->socket.task = &cls->task; + cls->timer.task = &cls->task; - nxt_queue_insert_head(&thr->engine->listen_connections, &cls->link); + if (nxt_event_conn_accept_alloc(task, cls) != NULL) { + nxt_event_fd_enable_accept(engine, &cls->socket); + + nxt_queue_insert_head(&engine->listen_connections, &cls->link); } return NXT_OK; @@ -80,14 +88,17 @@ nxt_event_conn_listen(nxt_thread_t *thr, nxt_listen_socket_t *ls) static nxt_event_conn_t * -nxt_event_conn_accept_alloc(nxt_thread_t *thr, nxt_event_conn_listen_t *cls) +nxt_event_conn_accept_alloc(nxt_task_t *task, nxt_event_conn_listen_t *cls) { nxt_sockaddr_t *sa, *remote; nxt_mem_pool_t *mp; nxt_event_conn_t *c; + nxt_event_engine_t *engine; nxt_listen_socket_t *ls; - if (thr->engine->connections < thr->engine->max_connections) { + engine = task->thread->engine; + + if (engine->connections < engine->max_connections) { mp = nxt_mem_pool_create(cls->listen->mem_pool_size); @@ -123,19 +134,19 @@ nxt_event_conn_accept_alloc(nxt_thread_t *thr, nxt_event_conn_listen_t *cls) static void -nxt_event_conn_listen_handler(nxt_thread_t *thr, void *obj, void *data) +nxt_event_conn_listen_handler(nxt_task_t *task, void *obj, void *data) { nxt_event_conn_listen_t *cls; cls = obj; cls->ready = cls->batch; - cls->accept(thr, cls, data); + cls->accept(task, cls, data); } void -nxt_event_conn_io_accept(nxt_thread_t *thr, void *obj, void *data) +nxt_event_conn_io_accept(nxt_task_t *task, void *obj, void *data) { socklen_t len; nxt_socket_t s; @@ -162,7 +173,7 @@ nxt_event_conn_io_accept(nxt_thread_t *thr, void *obj, void *data) s = accept(cls->socket.fd, sa, &len); if (s == -1) { - nxt_event_conn_accept_error(thr, cls, "accept", nxt_socket_errno); + nxt_event_conn_accept_error(task, cls, "accept", nxt_socket_errno); return; } @@ -179,14 +190,14 @@ nxt_event_conn_io_accept(nxt_thread_t *thr, void *obj, void *data) #endif - nxt_log_debug(thr->log, "accept(%d): %d", cls->socket.fd, s); + nxt_debug(task, "accept(%d): %d", cls->socket.fd, s); - nxt_event_conn_accept(thr, cls, c); + nxt_event_conn_accept(task, cls, c); } void -nxt_event_conn_accept(nxt_thread_t *thr, nxt_event_conn_listen_t *cls, +nxt_event_conn_accept(nxt_task_t *task, nxt_event_conn_listen_t *cls, nxt_event_conn_t *c) { nxt_event_conn_t *next; @@ -197,10 +208,9 @@ nxt_event_conn_accept(nxt_thread_t *thr, nxt_event_conn_listen_t *cls, /* This allocation cannot fail. */ (void) nxt_sockaddr_text(c->mem_pool, c->remote, 0); - nxt_log_debug(c->socket.log, "client: %*s", - c->remote->text_len, c->remote->text); + nxt_debug(task, "client: %*s", c->remote->text_len, c->remote->text); - nxt_queue_insert_head(&thr->engine->idle_connections, &c->link); + nxt_queue_insert_head(&task->thread->engine->idle_connections, &c->link); c->read_work_queue = c->listen->work_queue; c->write_work_queue = c->listen->work_queue; @@ -208,41 +218,39 @@ nxt_event_conn_accept(nxt_thread_t *thr, nxt_event_conn_listen_t *cls, if (c->listen->read_after_accept) { //c->socket.read_ready = 1; - thr->log = c->socket.log; - c->listen->handler(thr, c, NULL); - thr->log = cls->socket.log; + c->listen->handler(task, c, NULL); } else { - nxt_thread_work_queue_add(thr, c->write_work_queue, - c->listen->handler, c, NULL, c->socket.log); + nxt_thread_work_queue_add(task->thread, c->write_work_queue, + c->listen->handler, task, c, NULL); } - next = nxt_event_conn_accept_next(thr, cls); + next = nxt_event_conn_accept_next(task, cls); if (next != NULL && cls->socket.read_ready) { - nxt_thread_work_queue_add(thr, cls->socket.read_work_queue, - cls->accept, cls, next, cls->socket.log); + nxt_thread_work_queue_add(task->thread, cls->socket.read_work_queue, + cls->accept, task, cls, next); } } static nxt_event_conn_t * -nxt_event_conn_accept_next(nxt_thread_t *thr, nxt_event_conn_listen_t *cls) +nxt_event_conn_accept_next(nxt_task_t *task, nxt_event_conn_listen_t *cls) { nxt_event_conn_t *c; cls->socket.data = NULL; do { - c = nxt_event_conn_accept_alloc(thr, cls); + c = nxt_event_conn_accept_alloc(task, cls); if (nxt_fast_path(c != NULL)) { return c; } - } while (nxt_event_conn_accept_close_idle(thr, cls) == NXT_OK); + } while (nxt_event_conn_accept_close_idle(task, cls) == NXT_OK); - nxt_log_alert(cls->socket.log, "no available connections, " + nxt_log(task, NXT_LOG_CRIT, "no available connections, " "new connections are not accepted within 1s"); return NULL; @@ -250,8 +258,7 @@ nxt_event_conn_accept_next(nxt_thread_t *thr, nxt_event_conn_listen_t *cls) static nxt_int_t -nxt_event_conn_accept_close_idle(nxt_thread_t *thr, - nxt_event_conn_listen_t *cls) +nxt_event_conn_accept_close_idle(nxt_task_t *task, nxt_event_conn_listen_t *cls) { nxt_queue_t *idle; nxt_queue_link_t *link; @@ -261,7 +268,7 @@ nxt_event_conn_accept_close_idle(nxt_thread_t *thr, NXT_LOG_INFO, 2, "idle connections closed", NXT_LOG_MODERATION }; - idle = &thr->engine->idle_connections; + idle = &task->thread->engine->idle_connections; for (link = nxt_queue_last(idle); link != nxt_queue_head(idle); @@ -271,24 +278,25 @@ nxt_event_conn_accept_close_idle(nxt_thread_t *thr, if (!c->socket.read_ready) { nxt_log_moderate(&nxt_idle_close_log_moderation, NXT_LOG_INFO, - thr->log, "no available connections, " + task->log, "no available connections, " "close idle connection"); nxt_queue_remove(link); - nxt_event_conn_close(thr, c); + nxt_event_conn_close(task, c); return NXT_OK; } } - nxt_event_timer_add(thr->engine, &cls->timer, 1000); - nxt_event_fd_disable_read(thr->engine, &cls->socket); + nxt_event_timer_add(task->thread->engine, &cls->timer, 1000); + + nxt_event_fd_disable_read(task->thread->engine, &cls->socket); return NXT_DECLINED; } void -nxt_event_conn_accept_error(nxt_thread_t *thr, nxt_event_conn_listen_t *cls, +nxt_event_conn_accept_error(nxt_task_t *task, nxt_event_conn_listen_t *cls, const char *accept_syscall, nxt_err_t err) { static nxt_log_moderation_t nxt_accept_log_moderation = { @@ -300,13 +308,12 @@ nxt_event_conn_accept_error(nxt_thread_t *thr, nxt_event_conn_listen_t *cls, switch (err) { case NXT_EAGAIN: - nxt_log_debug(thr->log, "%s(%d) %E", - accept_syscall, cls->socket.fd, err); + nxt_debug(task, "%s(%d) %E", accept_syscall, cls->socket.fd, err); return; case ECONNABORTED: - nxt_log_moderate(&nxt_accept_log_moderation, NXT_LOG_INFO, - thr->log, "%s(%d) failed %E", + nxt_log_moderate(&nxt_accept_log_moderation, NXT_LOG_WARN, + task->log, "%s(%d) failed %E", accept_syscall, cls->socket.fd, err); return; @@ -314,24 +321,24 @@ nxt_event_conn_accept_error(nxt_thread_t *thr, nxt_event_conn_listen_t *cls, case ENFILE: case ENOBUFS: case ENOMEM: - if (nxt_event_conn_accept_close_idle(thr, cls) != NXT_OK) { - nxt_log_alert(thr->log, "%s(%d) failed %E, " - "new connections are not accepted within 1s", - accept_syscall, cls->socket.fd, err); + if (nxt_event_conn_accept_close_idle(task, cls) != NXT_OK) { + nxt_log(task, NXT_LOG_CRIT, "%s(%d) failed %E, " + "new connections are not accepted within 1s", + accept_syscall, cls->socket.fd, err); } return; default: - nxt_log_alert(thr->log, "%s(%d) failed %E", - accept_syscall, cls->socket.fd, err); + nxt_log(task, NXT_LOG_CRIT, "%s(%d) failed %E", + accept_syscall, cls->socket.fd, err); return; } } static void -nxt_event_conn_listen_timer_handler(nxt_thread_t *thr, void *obj, void *data) +nxt_event_conn_listen_timer_handler(nxt_task_t *task, void *obj, void *data) { nxt_event_conn_t *c; nxt_event_timer_t *ev; @@ -343,25 +350,25 @@ nxt_event_conn_listen_timer_handler(nxt_thread_t *thr, void *obj, void *data) c = cls->socket.data; if (c == NULL) { - c = nxt_event_conn_accept_next(thr, cls); + c = nxt_event_conn_accept_next(task, cls); if (c == NULL) { return; } } - nxt_event_fd_enable_accept(thr->engine, &cls->socket); + nxt_event_fd_enable_accept(task->thread->engine, &cls->socket); - cls->accept(thr, cls, c); + cls->accept(task, cls, c); } static void -nxt_event_conn_listen_event_error(nxt_thread_t *thr, void *obj, void *data) +nxt_event_conn_listen_event_error(nxt_task_t *task, void *obj, void *data) { nxt_event_fd_t *ev; ev = obj; - nxt_log_alert(thr->log, "accept(%d) event error", ev->fd); + nxt_log(task, NXT_LOG_CRIT, "accept(%d) event error", ev->fd); } |