/* * Copyright (C) Igor Sysoev * Copyright (C) NGINX, Inc. */ #include static nxt_int_t nxt_select_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, nxt_uint_t mevents); static void nxt_select_free(nxt_event_engine_t *engine); static void nxt_select_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev); static void nxt_select_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev); static nxt_bool_t nxt_select_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev); static void nxt_select_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev); static void nxt_select_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev); static void nxt_select_error_handler(nxt_task_t *task, void *obj, void *data); static void nxt_select_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev); static void nxt_select_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev); static void nxt_select_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev); static void nxt_select_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev); static void nxt_select_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev); static void nxt_select_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev); static void nxt_select_poll(nxt_event_engine_t *engine, nxt_msec_t timeout); const nxt_event_interface_t nxt_select_engine = { "select", nxt_select_create, nxt_select_free, nxt_select_enable, nxt_select_disable, nxt_select_disable, nxt_select_close, nxt_select_enable_read, nxt_select_enable_write, nxt_select_disable_read, nxt_select_disable_write, nxt_select_block_read, nxt_select_block_write, nxt_select_oneshot_read, nxt_select_oneshot_write, nxt_select_enable_read, NULL, NULL, NULL, NULL, nxt_select_poll, &nxt_unix_conn_io, NXT_NO_FILE_EVENTS, NXT_NO_SIGNAL_EVENTS, }; static nxt_int_t nxt_select_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, nxt_uint_t mevents) { engine->u.select.nfds = -1; engine->u.select.update_nfds = 0; engine->u.select.events = nxt_zalloc(FD_SETSIZE * sizeof(nxt_fd_event_t *)); if (engine->u.select.events != NULL) { return NXT_OK; } nxt_select_free(engine); return NXT_ERROR; } static void nxt_select_free(nxt_event_engine_t *engine) { nxt_debug(&engine->task, "select free"); nxt_free(engine->u.select.events); nxt_memzero(&engine->u.select, sizeof(nxt_select_engine_t)); } static void nxt_select_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev) { nxt_select_enable_read(engine, ev); nxt_select_enable_write(engine, ev); } static void nxt_select_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev) { if (ev->read != NXT_EVENT_INACTIVE) { nxt_select_disable_read(engine, ev); } if (ev->write != NXT_EVENT_INACTIVE) { nxt_select_disable_write(engine, ev); } } static nxt_bool_t nxt_select_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev) { nxt_select_disable(engine, ev); return 0; } static void nxt_select_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) { nxt_fd_t fd; fd = ev->fd; nxt_debug(ev->task, "select enable read: fd:%d", fd); if (fd < 0 || fd >= (nxt_fd_t) FD_SETSIZE) { nxt_work_queue_add(&engine->fast_work_queue, nxt_select_error_handler, ev->task, ev, ev->data); return; } ev->read = NXT_EVENT_ACTIVE; FD_SET(fd, &engine->u.select.main_read_fd_set); engine->u.select.events[fd] = ev; if (engine->u.select.nfds < fd) { engine->u.select.nfds = fd; engine->u.select.update_nfds = 0; } } static void nxt_select_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) { nxt_fd_t fd; fd = ev->fd; nxt_debug(ev->task, "select enable write: fd:%d", fd); if (fd < 0 || fd >= (nxt_fd_t) FD_SETSIZE) { nxt_work_queue_add(&engine->fast_work_queue, nxt_select_error_handler, ev->task, ev, ev->data); return; } ev->write = NXT_EVENT_ACTIVE; FD_SET(fd, &engine->u.select.main_write_fd_set); engine->u.select.events[fd] = ev; if (engine->u.select.nfds < fd) { engine->u.select.nfds = fd; engine->u.select.update_nfds = 0; } } static void nxt_select_error_handler(nxt_task_t *task, void *obj, void *data) { nxt_fd_event_t *ev; ev = obj; ev->read = NXT_EVENT_INACTIVE; ev->write = NXT_EVENT_INACTIVE; ev->error_handler(task, ev, data); } static void nxt_select_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) { nxt_fd_t fd; fd = ev->fd; nxt_debug(ev->task, "select disable read: fd:%d", fd); if (fd < 0 || fd >= (nxt_fd_t) FD_SETSIZE) { return; } FD_CLR(fd, &engine->u.select.main_read_fd_set); ev->read = NXT_EVENT_INACTIVE; if (ev->write == NXT_EVENT_INACTIVE) { engine->u.select.events[fd] = NULL; engine->u.select.update_nfds = 1; } } static void nxt_select_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) { nxt_fd_t fd; fd = ev->fd; nxt_debug(ev->task, "select disable write: fd:%d", fd); if (fd < 0 || fd >= (nxt_fd_t) FD_SETSIZE) { return; } FD_CLR(fd, &engine->u.select.main_write_fd_set); ev->write = NXT_EVENT_INACTIVE; if (ev->read == NXT_EVENT_INACTIVE) { engine->u.select.events[fd] = NULL; engine->u.select.update_nfds = 1; } } static void nxt_select_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) { if (ev->read != NXT_EVENT_INACTIVE) { nxt_select_disable_read(engine, ev); } } static void nxt_select_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) { if (ev->write != NXT_EVENT_INACTIVE) { nxt_select_disable_write(engine, ev); } } static void nxt_select_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) { nxt_select_enable_read(engine, ev); ev->read = NXT_EVENT_ONESHOT; } static void nxt_select_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) { nxt_select_enable_write(engine, ev); ev->write = NXT_EVENT_ONESHOT; } static void nxt_select_poll(nxt_event_engine_t *engine, nxt_msec_t timeout) { int nevents, nfds, found; nxt_err_t err; nxt_int_t i; nxt_uint_t fd, level; nxt_fd_event_t *ev; struct timeval tv, *tp; if (timeout == NXT_INFINITE_MSEC) { tp = NULL; } else { tv.tv_sec = (long) (timeout / 1000); tv.tv_usec = (long) ((timeout % 1000) * 1000); tp = &tv; } if (engine->u.select.update_nfds) { for (i = engine->u.select.nfds; i >= 0; i--) { if (engine->u.select.events[i] != NULL) { engine->u.select.nfds = i; engine->u.select.update_nfds = 0; break; } } } engine->u.select.work_read_fd_set = engine->u.select.main_read_fd_set; engine->u.select.work_write_fd_set = engine->u.select.main_write_fd_set; nfds = engine->u.select.nfds + 1; nxt_debug(&engine->task, "select() nfds:%d timeout:%M", nfds, timeout); nevents = select(nfds, &engine->u.select.work_read_fd_set, &engine->u.select.work_write_fd_set, NULL, tp); err = (nevents == -1) ? nxt_errno : 0; nxt_thread_time_update(engine->task.thread); nxt_debug(&engine->task, "select(): %d", nevents); if (nevents == -1) { level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_ALERT; nxt_log(&engine->task, level, "select() failed %E", err); return; } for (fd = 0; fd < (nxt_uint_t) nfds && nevents != 0; fd++) { found = 0; if (FD_ISSET(fd, &engine->u.select.work_read_fd_set)) { ev = engine->u.select.events[fd]; nxt_debug(ev->task, "select() fd:%ui read rd:%d wr:%d", fd, ev->read, ev->write); ev->read_ready = 1; if (ev->read == NXT_EVENT_ONESHOT) { nxt_select_disable_read(engine, ev); } nxt_work_queue_add(ev->read_work_queue, ev->read_handler, ev->task, ev, ev->data); found = 1; } if (FD_ISSET(fd, &engine->u.select.work_write_fd_set)) { ev = engine->u.select.events[fd]; nxt_debug(ev->task, "select() fd:%ui write rd:%d wr:%d", fd, ev->read, ev->write); ev->write_ready = 1; if (ev->write == NXT_EVENT_ONESHOT) { nxt_select_disable_write(engine, ev); } nxt_work_queue_add(ev->write_work_queue, ev->write_handler, ev->task, ev, ev->data); found = 1; } nevents -= found; } }