summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_conn_accept.c
diff options
context:
space:
mode:
authorIgor Sysoev <igor@sysoev.ru>2017-06-14 15:18:52 +0300
committerIgor Sysoev <igor@sysoev.ru>2017-06-14 15:18:52 +0300
commit7574c64992b98d3dfbc3dd101bd0f7d78bad0823 (patch)
tree3a98c46e88d9023df34be3e6cce4f762d53aad36 /src/nxt_conn_accept.c
parent3e2632688f53c4cb08e7ac03c61e71facd038df4 (diff)
downloadunit-7574c64992b98d3dfbc3dd101bd0f7d78bad0823.tar.gz
unit-7574c64992b98d3dfbc3dd101bd0f7d78bad0823.tar.bz2
nxt_event_conn_... functions and structures have been renamed
to nxt_conn_...
Diffstat (limited to 'src/nxt_conn_accept.c')
-rw-r--r--src/nxt_conn_accept.c366
1 files changed, 366 insertions, 0 deletions
diff --git a/src/nxt_conn_accept.c b/src/nxt_conn_accept.c
new file mode 100644
index 00000000..eb0172f4
--- /dev/null
+++ b/src/nxt_conn_accept.c
@@ -0,0 +1,366 @@
+
+/*
+ * Copyright (C) Igor Sysoev
+ * Copyright (C) NGINX, Inc.
+ */
+
+#include <nxt_main.h>
+
+
+/*
+ * A listen socket handler calls an event facility specific io_accept()
+ * method. The method accept()s a new connection and then calls
+ * nxt_event_conn_accept() to handle the new connection and to prepare
+ * for a next connection to avoid just dropping next accept()ed socket
+ * if no more connections allowed. If there are no available connections
+ * an idle connection would be closed. If there are no idle connections
+ * then new connections will not be accept()ed for 1 second.
+ */
+
+
+static nxt_conn_t *nxt_conn_accept_alloc(nxt_task_t *task,
+ nxt_listen_event_t *lev);
+static void nxt_conn_listen_handler(nxt_task_t *task, void *obj,
+ void *data);
+static nxt_conn_t *nxt_conn_accept_next(nxt_task_t *task,
+ nxt_listen_event_t *lev);
+static nxt_int_t nxt_conn_accept_close_idle(nxt_task_t *task,
+ nxt_listen_event_t *lev);
+static void nxt_conn_listen_event_error(nxt_task_t *task, void *obj,
+ void *data);
+static void nxt_conn_listen_timer_handler(nxt_task_t *task, void *obj,
+ void *data);
+
+
+nxt_listen_event_t *
+nxt_listen_event(nxt_task_t *task, nxt_listen_socket_t *ls)
+{
+ nxt_listen_event_t *lev;
+ nxt_event_engine_t *engine;
+
+ lev = nxt_zalloc(sizeof(nxt_listen_event_t));
+
+ if (nxt_fast_path(lev != NULL)) {
+ lev->socket.fd = ls->socket;
+
+ engine = task->thread->engine;
+ lev->batch = engine->batch;
+
+ lev->socket.read_work_queue = &engine->accept_work_queue;
+ lev->socket.read_handler = nxt_conn_listen_handler;
+ lev->socket.error_handler = nxt_conn_listen_event_error;
+ lev->socket.log = &nxt_main_log;
+
+ lev->accept = engine->event.io->accept;
+
+ lev->listen = ls;
+ lev->work_queue = &engine->read_work_queue;
+
+ lev->timer.work_queue = &engine->fast_work_queue;
+ lev->timer.handler = nxt_conn_listen_timer_handler;
+ lev->timer.log = &nxt_main_log;
+
+ lev->task.thread = task->thread;
+ lev->task.log = &nxt_main_log;
+ lev->task.ident = nxt_task_next_ident();
+ lev->socket.task = &lev->task;
+ lev->timer.task = &lev->task;
+
+ if (nxt_conn_accept_alloc(task, lev) != NULL) {
+ nxt_fd_event_enable_accept(engine, &lev->socket);
+
+ nxt_queue_insert_head(&engine->listen_connections, &lev->link);
+ }
+
+ return lev;
+ }
+
+ return NULL;
+}
+
+
+static nxt_conn_t *
+nxt_conn_accept_alloc(nxt_task_t *task, nxt_listen_event_t *lev)
+{
+ nxt_conn_t *c;
+ nxt_sockaddr_t *sa, *remote;
+ nxt_mem_pool_t *mp;
+ nxt_event_engine_t *engine;
+ nxt_listen_socket_t *ls;
+
+ engine = task->thread->engine;
+
+ if (engine->connections < engine->max_connections) {
+
+ mp = nxt_mem_pool_create(lev->listen->mem_pool_size);
+
+ if (nxt_fast_path(mp != NULL)) {
+ /* This allocation cannot fail. */
+ c = nxt_conn_create(mp, lev->socket.task);
+
+ lev->next = c;
+ c->socket.read_work_queue = lev->socket.read_work_queue;
+ c->socket.write_ready = 1;
+ c->listen = lev;
+
+ ls = lev->listen;
+ /* This allocation cannot fail. */
+ remote = nxt_sockaddr_alloc(mp, ls->socklen, ls->address_length);
+ c->remote = remote;
+
+ sa = ls->sockaddr;
+ remote->type = sa->type;
+ /*
+ * Set address family for unspecified Unix domain,
+ * because these sockaddr's are not be passed to accept().
+ */
+ remote->u.sockaddr.sa_family = sa->u.sockaddr.sa_family;
+
+ return c;
+ }
+ }
+
+ return NULL;
+}
+
+
+static void
+nxt_conn_listen_handler(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_listen_event_t *lev;
+
+ lev = obj;
+ lev->ready = lev->batch;
+
+ lev->accept(task, lev, data);
+}
+
+
+void
+nxt_conn_io_accept(nxt_task_t *task, void *obj, void *data)
+{
+ socklen_t len;
+ nxt_conn_t *c;
+ nxt_socket_t s;
+ struct sockaddr *sa;
+ nxt_listen_event_t *lev;
+
+ lev = obj;
+ c = lev->next;
+
+ lev->ready--;
+ lev->socket.read_ready = (lev->ready != 0);
+
+ len = c->remote->socklen;
+
+ if (len >= sizeof(struct sockaddr)) {
+ sa = &c->remote->u.sockaddr;
+
+ } else {
+ sa = NULL;
+ len = 0;
+ }
+
+ s = accept(lev->socket.fd, sa, &len);
+
+ if (s == -1) {
+ nxt_conn_accept_error(task, lev, "accept", nxt_socket_errno);
+ return;
+ }
+
+ c->socket.fd = s;
+
+#if (NXT_LINUX)
+ /*
+ * Linux does not inherit non-blocking mode
+ * from listen socket for accept()ed socket.
+ */
+ if (nxt_slow_path(nxt_socket_nonblocking(task, s) != NXT_OK)) {
+ nxt_socket_close(task, s);
+ }
+
+#endif
+
+ nxt_debug(task, "accept(%d): %d", lev->socket.fd, s);
+
+ nxt_conn_accept(task, lev, c);
+}
+
+
+void
+nxt_conn_accept(nxt_task_t *task, nxt_listen_event_t *lev, nxt_conn_t *c)
+{
+ nxt_conn_t *next;
+
+ nxt_sockaddr_text(c->remote);
+
+ nxt_debug(task, "client: %*s",
+ c->remote->address_length, nxt_sockaddr_address(c->remote));
+
+ nxt_queue_insert_head(&task->thread->engine->idle_connections, &c->link);
+
+ c->read_work_queue = lev->work_queue;
+ c->write_work_queue = lev->work_queue;
+
+ if (lev->listen->read_after_accept) {
+
+ //c->socket.read_ready = 1;
+// lev->listen->handler(task, c, lev->socket.data);
+ nxt_work_queue_add(c->read_work_queue, lev->listen->handler,
+ task, c, lev->socket.data);
+
+ } else {
+ nxt_work_queue_add(c->write_work_queue, lev->listen->handler,
+ task, c, lev->socket.data);
+ }
+
+ next = nxt_conn_accept_next(task, lev);
+
+ if (next != NULL && lev->socket.read_ready) {
+ nxt_work_queue_add(lev->socket.read_work_queue,
+ lev->accept, task, lev, next);
+ }
+}
+
+
+static nxt_conn_t *
+nxt_conn_accept_next(nxt_task_t *task, nxt_listen_event_t *lev)
+{
+ nxt_conn_t *c;
+
+ lev->next = NULL;
+
+ do {
+ c = nxt_conn_accept_alloc(task, lev);
+
+ if (nxt_fast_path(c != NULL)) {
+ return c;
+ }
+
+ } while (nxt_conn_accept_close_idle(task, lev) == NXT_OK);
+
+ nxt_log(task, NXT_LOG_CRIT, "no available connections, "
+ "new connections are not accepted within 1s");
+
+ return NULL;
+}
+
+
+static nxt_int_t
+nxt_conn_accept_close_idle(nxt_task_t *task, nxt_listen_event_t *lev)
+{
+ nxt_conn_t *c;
+ nxt_queue_t *idle;
+ nxt_queue_link_t *link;
+ nxt_event_engine_t *engine;
+
+ static nxt_log_moderation_t nxt_idle_close_log_moderation = {
+ NXT_LOG_INFO, 2, "idle connections closed", NXT_LOG_MODERATION
+ };
+
+ engine = task->thread->engine;
+
+ idle = &engine->idle_connections;
+
+ for (link = nxt_queue_last(idle);
+ link != nxt_queue_head(idle);
+ link = nxt_queue_next(link))
+ {
+ c = nxt_queue_link_data(link, nxt_conn_t, link);
+
+ if (!c->socket.read_ready) {
+ nxt_log_moderate(&nxt_idle_close_log_moderation, NXT_LOG_INFO,
+ task->log, "no available connections, "
+ "close idle connection");
+ nxt_queue_remove(link);
+ nxt_conn_close(engine, c);
+
+ return NXT_OK;
+ }
+ }
+
+ nxt_timer_add(engine, &lev->timer, 1000);
+
+ nxt_fd_event_disable_read(engine, &lev->socket);
+
+ return NXT_DECLINED;
+}
+
+
+void
+nxt_conn_accept_error(nxt_task_t *task, nxt_listen_event_t *lev,
+ const char *accept_syscall, nxt_err_t err)
+{
+ static nxt_log_moderation_t nxt_accept_log_moderation = {
+ NXT_LOG_INFO, 2, "accept() failed", NXT_LOG_MODERATION
+ };
+
+ lev->socket.read_ready = 0;
+
+ switch (err) {
+
+ case NXT_EAGAIN:
+ nxt_debug(task, "%s(%d) %E", accept_syscall, lev->socket.fd, err);
+ return;
+
+ case ECONNABORTED:
+ nxt_log_moderate(&nxt_accept_log_moderation, NXT_LOG_WARN,
+ task->log, "%s(%d) failed %E",
+ accept_syscall, lev->socket.fd, err);
+ return;
+
+ case EMFILE:
+ case ENFILE:
+ case ENOBUFS:
+ case ENOMEM:
+ if (nxt_conn_accept_close_idle(task, lev) != NXT_OK) {
+ nxt_log(task, NXT_LOG_CRIT, "%s(%d) failed %E, "
+ "new connections are not accepted within 1s",
+ accept_syscall, lev->socket.fd, err);
+ }
+
+ return;
+
+ default:
+ nxt_log(task, NXT_LOG_CRIT, "%s(%d) failed %E",
+ accept_syscall, lev->socket.fd, err);
+ return;
+ }
+}
+
+
+static void
+nxt_conn_listen_timer_handler(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_conn_t *c;
+ nxt_timer_t *timer;
+ nxt_listen_event_t *lev;
+
+ timer = obj;
+
+ lev = nxt_timer_data(timer, nxt_listen_event_t, timer);
+ c = lev->next;
+
+ if (c == NULL) {
+ c = nxt_conn_accept_next(task, lev);
+
+ if (c == NULL) {
+ return;
+ }
+ }
+
+ nxt_fd_event_enable_accept(task->thread->engine, &lev->socket);
+
+ lev->accept(task, lev, c);
+}
+
+
+static void
+nxt_conn_listen_event_error(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_fd_event_t *ev;
+
+ ev = obj;
+
+ nxt_log(task, NXT_LOG_CRIT, "accept(%d) event error", ev->fd);
+}