diff options
author | Igor Sysoev <igor@sysoev.ru> | 2017-06-14 15:18:52 +0300 |
---|---|---|
committer | Igor Sysoev <igor@sysoev.ru> | 2017-06-14 15:18:52 +0300 |
commit | 7574c64992b98d3dfbc3dd101bd0f7d78bad0823 (patch) | |
tree | 3a98c46e88d9023df34be3e6cce4f762d53aad36 /src/nxt_conn_accept.c | |
parent | 3e2632688f53c4cb08e7ac03c61e71facd038df4 (diff) | |
download | unit-7574c64992b98d3dfbc3dd101bd0f7d78bad0823.tar.gz unit-7574c64992b98d3dfbc3dd101bd0f7d78bad0823.tar.bz2 |
nxt_event_conn_... functions and structures have been renamed
to nxt_conn_...
Diffstat (limited to 'src/nxt_conn_accept.c')
-rw-r--r-- | src/nxt_conn_accept.c | 366 |
1 files changed, 366 insertions, 0 deletions
diff --git a/src/nxt_conn_accept.c b/src/nxt_conn_accept.c new file mode 100644 index 00000000..eb0172f4 --- /dev/null +++ b/src/nxt_conn_accept.c @@ -0,0 +1,366 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) NGINX, Inc. + */ + +#include <nxt_main.h> + + +/* + * A listen socket handler calls an event facility specific io_accept() + * method. The method accept()s a new connection and then calls + * nxt_event_conn_accept() to handle the new connection and to prepare + * for a next connection to avoid just dropping next accept()ed socket + * if no more connections allowed. If there are no available connections + * an idle connection would be closed. If there are no idle connections + * then new connections will not be accept()ed for 1 second. + */ + + +static nxt_conn_t *nxt_conn_accept_alloc(nxt_task_t *task, + nxt_listen_event_t *lev); +static void nxt_conn_listen_handler(nxt_task_t *task, void *obj, + void *data); +static nxt_conn_t *nxt_conn_accept_next(nxt_task_t *task, + nxt_listen_event_t *lev); +static nxt_int_t nxt_conn_accept_close_idle(nxt_task_t *task, + nxt_listen_event_t *lev); +static void nxt_conn_listen_event_error(nxt_task_t *task, void *obj, + void *data); +static void nxt_conn_listen_timer_handler(nxt_task_t *task, void *obj, + void *data); + + +nxt_listen_event_t * +nxt_listen_event(nxt_task_t *task, nxt_listen_socket_t *ls) +{ + nxt_listen_event_t *lev; + nxt_event_engine_t *engine; + + lev = nxt_zalloc(sizeof(nxt_listen_event_t)); + + if (nxt_fast_path(lev != NULL)) { + lev->socket.fd = ls->socket; + + engine = task->thread->engine; + lev->batch = engine->batch; + + lev->socket.read_work_queue = &engine->accept_work_queue; + lev->socket.read_handler = nxt_conn_listen_handler; + lev->socket.error_handler = nxt_conn_listen_event_error; + lev->socket.log = &nxt_main_log; + + lev->accept = engine->event.io->accept; + + lev->listen = ls; + lev->work_queue = &engine->read_work_queue; + + lev->timer.work_queue = &engine->fast_work_queue; + lev->timer.handler = nxt_conn_listen_timer_handler; + lev->timer.log = &nxt_main_log; + + lev->task.thread = task->thread; + lev->task.log = &nxt_main_log; + lev->task.ident = nxt_task_next_ident(); + lev->socket.task = &lev->task; + lev->timer.task = &lev->task; + + if (nxt_conn_accept_alloc(task, lev) != NULL) { + nxt_fd_event_enable_accept(engine, &lev->socket); + + nxt_queue_insert_head(&engine->listen_connections, &lev->link); + } + + return lev; + } + + return NULL; +} + + +static nxt_conn_t * +nxt_conn_accept_alloc(nxt_task_t *task, nxt_listen_event_t *lev) +{ + nxt_conn_t *c; + nxt_sockaddr_t *sa, *remote; + nxt_mem_pool_t *mp; + nxt_event_engine_t *engine; + nxt_listen_socket_t *ls; + + engine = task->thread->engine; + + if (engine->connections < engine->max_connections) { + + mp = nxt_mem_pool_create(lev->listen->mem_pool_size); + + if (nxt_fast_path(mp != NULL)) { + /* This allocation cannot fail. */ + c = nxt_conn_create(mp, lev->socket.task); + + lev->next = c; + c->socket.read_work_queue = lev->socket.read_work_queue; + c->socket.write_ready = 1; + c->listen = lev; + + ls = lev->listen; + /* This allocation cannot fail. */ + remote = nxt_sockaddr_alloc(mp, ls->socklen, ls->address_length); + c->remote = remote; + + sa = ls->sockaddr; + remote->type = sa->type; + /* + * Set address family for unspecified Unix domain, + * because these sockaddr's are not be passed to accept(). + */ + remote->u.sockaddr.sa_family = sa->u.sockaddr.sa_family; + + return c; + } + } + + return NULL; +} + + +static void +nxt_conn_listen_handler(nxt_task_t *task, void *obj, void *data) +{ + nxt_listen_event_t *lev; + + lev = obj; + lev->ready = lev->batch; + + lev->accept(task, lev, data); +} + + +void +nxt_conn_io_accept(nxt_task_t *task, void *obj, void *data) +{ + socklen_t len; + nxt_conn_t *c; + nxt_socket_t s; + struct sockaddr *sa; + nxt_listen_event_t *lev; + + lev = obj; + c = lev->next; + + lev->ready--; + lev->socket.read_ready = (lev->ready != 0); + + len = c->remote->socklen; + + if (len >= sizeof(struct sockaddr)) { + sa = &c->remote->u.sockaddr; + + } else { + sa = NULL; + len = 0; + } + + s = accept(lev->socket.fd, sa, &len); + + if (s == -1) { + nxt_conn_accept_error(task, lev, "accept", nxt_socket_errno); + return; + } + + c->socket.fd = s; + +#if (NXT_LINUX) + /* + * Linux does not inherit non-blocking mode + * from listen socket for accept()ed socket. + */ + if (nxt_slow_path(nxt_socket_nonblocking(task, s) != NXT_OK)) { + nxt_socket_close(task, s); + } + +#endif + + nxt_debug(task, "accept(%d): %d", lev->socket.fd, s); + + nxt_conn_accept(task, lev, c); +} + + +void +nxt_conn_accept(nxt_task_t *task, nxt_listen_event_t *lev, nxt_conn_t *c) +{ + nxt_conn_t *next; + + nxt_sockaddr_text(c->remote); + + nxt_debug(task, "client: %*s", + c->remote->address_length, nxt_sockaddr_address(c->remote)); + + nxt_queue_insert_head(&task->thread->engine->idle_connections, &c->link); + + c->read_work_queue = lev->work_queue; + c->write_work_queue = lev->work_queue; + + if (lev->listen->read_after_accept) { + + //c->socket.read_ready = 1; +// lev->listen->handler(task, c, lev->socket.data); + nxt_work_queue_add(c->read_work_queue, lev->listen->handler, + task, c, lev->socket.data); + + } else { + nxt_work_queue_add(c->write_work_queue, lev->listen->handler, + task, c, lev->socket.data); + } + + next = nxt_conn_accept_next(task, lev); + + if (next != NULL && lev->socket.read_ready) { + nxt_work_queue_add(lev->socket.read_work_queue, + lev->accept, task, lev, next); + } +} + + +static nxt_conn_t * +nxt_conn_accept_next(nxt_task_t *task, nxt_listen_event_t *lev) +{ + nxt_conn_t *c; + + lev->next = NULL; + + do { + c = nxt_conn_accept_alloc(task, lev); + + if (nxt_fast_path(c != NULL)) { + return c; + } + + } while (nxt_conn_accept_close_idle(task, lev) == NXT_OK); + + nxt_log(task, NXT_LOG_CRIT, "no available connections, " + "new connections are not accepted within 1s"); + + return NULL; +} + + +static nxt_int_t +nxt_conn_accept_close_idle(nxt_task_t *task, nxt_listen_event_t *lev) +{ + nxt_conn_t *c; + nxt_queue_t *idle; + nxt_queue_link_t *link; + nxt_event_engine_t *engine; + + static nxt_log_moderation_t nxt_idle_close_log_moderation = { + NXT_LOG_INFO, 2, "idle connections closed", NXT_LOG_MODERATION + }; + + engine = task->thread->engine; + + idle = &engine->idle_connections; + + for (link = nxt_queue_last(idle); + link != nxt_queue_head(idle); + link = nxt_queue_next(link)) + { + c = nxt_queue_link_data(link, nxt_conn_t, link); + + if (!c->socket.read_ready) { + nxt_log_moderate(&nxt_idle_close_log_moderation, NXT_LOG_INFO, + task->log, "no available connections, " + "close idle connection"); + nxt_queue_remove(link); + nxt_conn_close(engine, c); + + return NXT_OK; + } + } + + nxt_timer_add(engine, &lev->timer, 1000); + + nxt_fd_event_disable_read(engine, &lev->socket); + + return NXT_DECLINED; +} + + +void +nxt_conn_accept_error(nxt_task_t *task, nxt_listen_event_t *lev, + const char *accept_syscall, nxt_err_t err) +{ + static nxt_log_moderation_t nxt_accept_log_moderation = { + NXT_LOG_INFO, 2, "accept() failed", NXT_LOG_MODERATION + }; + + lev->socket.read_ready = 0; + + switch (err) { + + case NXT_EAGAIN: + nxt_debug(task, "%s(%d) %E", accept_syscall, lev->socket.fd, err); + return; + + case ECONNABORTED: + nxt_log_moderate(&nxt_accept_log_moderation, NXT_LOG_WARN, + task->log, "%s(%d) failed %E", + accept_syscall, lev->socket.fd, err); + return; + + case EMFILE: + case ENFILE: + case ENOBUFS: + case ENOMEM: + if (nxt_conn_accept_close_idle(task, lev) != NXT_OK) { + nxt_log(task, NXT_LOG_CRIT, "%s(%d) failed %E, " + "new connections are not accepted within 1s", + accept_syscall, lev->socket.fd, err); + } + + return; + + default: + nxt_log(task, NXT_LOG_CRIT, "%s(%d) failed %E", + accept_syscall, lev->socket.fd, err); + return; + } +} + + +static void +nxt_conn_listen_timer_handler(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *c; + nxt_timer_t *timer; + nxt_listen_event_t *lev; + + timer = obj; + + lev = nxt_timer_data(timer, nxt_listen_event_t, timer); + c = lev->next; + + if (c == NULL) { + c = nxt_conn_accept_next(task, lev); + + if (c == NULL) { + return; + } + } + + nxt_fd_event_enable_accept(task->thread->engine, &lev->socket); + + lev->accept(task, lev, c); +} + + +static void +nxt_conn_listen_event_error(nxt_task_t *task, void *obj, void *data) +{ + nxt_fd_event_t *ev; + + ev = obj; + + nxt_log(task, NXT_LOG_CRIT, "accept(%d) event error", ev->fd); +} |