/* * Copyright (C) Igor Sysoev * Copyright (C) NGINX, Inc. */ #include static void nxt_conn_shutdown_handler(nxt_task_t *task, void *obj, void *data); static void nxt_conn_close_handler(nxt_task_t *task, void *obj, void *data); static void nxt_conn_close_timer_handler(nxt_task_t *task, void *obj, void *data); nxt_event_conn_io_t nxt_unix_event_conn_io = { nxt_event_conn_io_connect, nxt_event_conn_io_accept, nxt_event_conn_io_read, nxt_event_conn_io_recvbuf, nxt_event_conn_io_recv, nxt_event_conn_io_write, nxt_event_conn_io_write_chunk, #if (NXT_HAVE_LINUX_SENDFILE) nxt_linux_event_conn_io_sendfile, #elif (NXT_HAVE_FREEBSD_SENDFILE) nxt_freebsd_event_conn_io_sendfile, #elif (NXT_HAVE_MACOSX_SENDFILE) nxt_macosx_event_conn_io_sendfile, #elif (NXT_HAVE_SOLARIS_SENDFILEV) nxt_solaris_event_conn_io_sendfilev, #elif (NXT_HAVE_AIX_SEND_FILE) nxt_aix_event_conn_io_send_file, #elif (NXT_HAVE_HPUX_SENDFILE) nxt_hpux_event_conn_io_sendfile, #else nxt_event_conn_io_sendbuf, #endif nxt_event_conn_io_writev, nxt_event_conn_io_send, nxt_event_conn_io_shutdown, }; nxt_event_conn_t * nxt_event_conn_create(nxt_mem_pool_t *mp, nxt_log_t *log) { nxt_thread_t *thr; nxt_event_conn_t *c; c = nxt_mem_zalloc(mp, sizeof(nxt_event_conn_t)); if (nxt_slow_path(c == NULL)) { return NULL; } c->mem_pool = mp; c->socket.fd = -1; c->socket.log = &c->log; c->log = *log; /* The while loop skips possible uint32_t overflow. */ while (c->log.ident == 0) { c->log.ident = nxt_task_next_ident(); } thr = nxt_thread(); thr->engine->connections++; c->task.thread = thr; c->task.log = &c->log; c->task.ident = c->log.ident; c->socket.task = &c->task; c->read_timer.task = &c->task; c->write_timer.task = &c->task; c->io = thr->engine->event.io; c->max_chunk = NXT_INT32_T_MAX; c->sendfile = NXT_CONN_SENDFILE_UNSET; c->socket.read_work_queue = &thr->engine->fast_work_queue; c->socket.write_work_queue = &thr->engine->fast_work_queue; nxt_event_conn_timer_init(&c->read_timer, c, c->socket.read_work_queue); nxt_event_conn_timer_init(&c->write_timer, c, c->socket.write_work_queue); nxt_log_debug(&c->log, "event connections: %uD", thr->engine->connections); return c; } void nxt_event_conn_io_shutdown(nxt_task_t *task, void *obj, void *data) { int ret; socklen_t len; struct linger linger; nxt_event_conn_t *c; c = obj; nxt_debug(task, "event conn shutdown"); if (c->socket.timedout) { /* * Resetting of timed out connection on close * releases kernel memory associated with socket. * This also causes sending TCP/IP RST to a peer. */ linger.l_onoff = 1; linger.l_linger = 0; len = sizeof(struct linger); ret = setsockopt(c->socket.fd, SOL_SOCKET, SO_LINGER, &linger, len); if (nxt_slow_path(ret != 0)) { nxt_log(task, NXT_LOG_CRIT, "setsockopt(%d, SO_LINGER) failed %E", c->socket.fd, nxt_socket_errno); } } c->write_state->close_handler(task, c, data); } void nxt_event_conn_close(nxt_event_engine_t *engine, nxt_event_conn_t *c) { int ret; socklen_t len; struct linger linger; nxt_work_queue_t *wq; nxt_work_handler_t handler; if (c->socket.timedout) { /* * Resetting of timed out connection on close * releases kernel memory associated with socket. * This also causes sending TCP/IP RST to a peer. */ linger.l_onoff = 1; linger.l_linger = 0; len = sizeof(struct linger); ret = setsockopt(c->socket.fd, SOL_SOCKET, SO_LINGER, &linger, len); if (nxt_slow_path(ret != 0)) { nxt_log(c->socket.task, NXT_LOG_CRIT, "setsockopt(%d, SO_LINGER) failed %E", c->socket.fd, nxt_socket_errno); } } if (c->socket.error == 0 && !c->socket.closed && !c->socket.shutdown) { wq = &engine->shutdown_work_queue; handler = nxt_conn_shutdown_handler; } else{ wq = &engine->close_work_queue; handler = nxt_conn_close_handler; } nxt_work_queue_add(wq, handler, c->socket.task, c, engine); } static void nxt_conn_shutdown_handler(nxt_task_t *task, void *obj, void *data) { nxt_event_conn_t *c; nxt_event_engine_t *engine; c = obj; engine = data; nxt_debug(task, "event conn shutdown fd:%d", c->socket.fd); c->socket.shutdown = 1; nxt_socket_shutdown(c->socket.fd, SHUT_RDWR); nxt_work_queue_add(&engine->close_work_queue, nxt_conn_close_handler, task, c, engine); } static void nxt_conn_close_handler(nxt_task_t *task, void *obj, void *data) { nxt_uint_t events_pending, timers_pending; nxt_event_conn_t *c; nxt_event_engine_t *engine; c = obj; engine = data; nxt_debug(task, "event conn close fd:%d", c->socket.fd); timers_pending = nxt_timer_delete(engine, &c->read_timer); timers_pending += nxt_timer_delete(engine, &c->write_timer); events_pending = nxt_fd_event_close(engine, &c->socket); if (events_pending == 0) { nxt_socket_close(c->socket.fd); c->socket.fd = -1; if (timers_pending == 0) { nxt_work_queue_add(&engine->fast_work_queue, c->write_state->ready_handler, task, c, c->socket.data); return; } } c->write_timer.handler = nxt_conn_close_timer_handler; c->write_timer.work_queue = &engine->fast_work_queue; nxt_timer_add(engine, &c->write_timer, 0); } static void nxt_conn_close_timer_handler(nxt_task_t *task, void *obj, void *data) { nxt_timer_t *ev; nxt_event_conn_t *c; nxt_event_engine_t *engine; ev = obj; c = nxt_event_write_timer_conn(ev); nxt_debug(task, "event conn close handler fd:%d", c->socket.fd); if (c->socket.fd != -1) { nxt_socket_close(c->socket.fd); c->socket.fd = -1; } engine = task->thread->engine; nxt_work_queue_add(&engine->fast_work_queue, c->write_state->ready_handler, task, c, c->socket.data); } void nxt_event_conn_timer(nxt_event_engine_t *engine, nxt_event_conn_t *c, const nxt_event_conn_state_t *state, nxt_timer_t *tev) { nxt_msec_t timer; if (state->timer_value != NULL) { timer = state->timer_value(c, state->timer_data); if (timer != 0) { tev->handler = state->timer_handler; nxt_timer_add(engine, tev, timer); } } } void nxt_event_conn_work_queue_set(nxt_event_conn_t *c, nxt_work_queue_t *wq) { c->read_work_queue = wq; c->write_work_queue = wq; c->read_timer.work_queue = wq; c->write_timer.work_queue = wq; }