diff options
Diffstat (limited to '')
-rw-r--r-- | src/nxt_kqueue.c | 134 |
1 files changed, 68 insertions, 66 deletions
diff --git a/src/nxt_kqueue.c b/src/nxt_kqueue.c index 25d9eefe..7370a401 100644 --- a/src/nxt_kqueue.c +++ b/src/nxt_kqueue.c @@ -87,9 +87,9 @@ static void nxt_kqueue_fd_set(nxt_event_set_t *event_set, nxt_event_fd_t *ev, static struct kevent *nxt_kqueue_get_kevent(nxt_kqueue_event_set_t *ks); static void nxt_kqueue_commit_changes(nxt_kqueue_event_set_t *ks); static void nxt_kqueue_error(nxt_kqueue_event_set_t *ks); -static void nxt_kqueue_fd_error_handler(nxt_thread_t *thr, void *obj, +static void nxt_kqueue_fd_error_handler(nxt_task_t *task, void *obj, void *data); -static void nxt_kqueue_file_error_handler(nxt_thread_t *thr, void *obj, +static void nxt_kqueue_file_error_handler(nxt_task_t *task, void *obj, void *data); static nxt_int_t nxt_kqueue_add_signal(nxt_kqueue_event_set_t *kq, const nxt_event_sig_t *sigev); @@ -98,17 +98,17 @@ static nxt_int_t nxt_kqueue_enable_post(nxt_event_set_t *event_set, nxt_work_handler_t handler); static void nxt_kqueue_signal(nxt_event_set_t *event_set, nxt_uint_t signo); #endif -static void nxt_kqueue_poll(nxt_thread_t *thr, nxt_event_set_t *event_set, +static void nxt_kqueue_poll(nxt_task_t *task, nxt_event_set_t *event_set, nxt_msec_t timeout); -static void nxt_kqueue_event_conn_io_connect(nxt_thread_t *thr, void *obj, +static void nxt_kqueue_event_conn_io_connect(nxt_task_t *task, void *obj, void *data); -static void nxt_kqueue_event_conn_connected(nxt_thread_t *thr, void *obj, +static void nxt_kqueue_event_conn_connected(nxt_task_t *task, void *obj, void *data); -static void nxt_kqueue_listen_handler(nxt_thread_t *thr, void *obj, void *data); -static void nxt_kqueue_event_conn_io_accept(nxt_thread_t *thr, void *obj, +static void nxt_kqueue_listen_handler(nxt_task_t *task, void *obj, void *data); +static void nxt_kqueue_event_conn_io_accept(nxt_task_t *task, void *obj, void *data); -static void nxt_kqueue_event_conn_io_read(nxt_thread_t *thr, void *obj, +static void nxt_kqueue_event_conn_io_read(nxt_task_t *task, void *obj, void *data); static ssize_t nxt_kqueue_event_conn_io_recvbuf(nxt_event_conn_t *c, nxt_buf_t *b); @@ -547,14 +547,14 @@ nxt_kqueue_error(nxt_kqueue_event_set_t *ks) ev = nxt_kevent_get_udata(kev->udata); nxt_thread_work_queue_add(thr, &thr->work_queue.main, nxt_kqueue_fd_error_handler, - ev, ev->data, ev->log); + ev->task, ev, ev->data); break; case EVFILT_VNODE: fev = nxt_kevent_get_udata(kev->udata); nxt_thread_work_queue_add(thr, &thr->work_queue.main, nxt_kqueue_file_error_handler, - fev, fev->data, thr->log); + fev->task, fev, fev->data); break; } } @@ -562,7 +562,7 @@ nxt_kqueue_error(nxt_kqueue_event_set_t *ks) static void -nxt_kqueue_fd_error_handler(nxt_thread_t *thr, void *obj, void *data) +nxt_kqueue_fd_error_handler(nxt_task_t *task, void *obj, void *data) { nxt_event_fd_t *ev; @@ -570,27 +570,27 @@ nxt_kqueue_fd_error_handler(nxt_thread_t *thr, void *obj, void *data) if (ev->kq_eof && ev->kq_errno != 0) { ev->error = ev->kq_errno; - nxt_log_error(nxt_socket_error_level(ev->kq_errno, ev->log_error), - thr->log, "kevent() reported error on descriptor %d %E", - ev->fd, ev->kq_errno); + nxt_log(task, nxt_socket_error_level(ev->kq_errno, ev->log_error), + "kevent() reported error on descriptor %d %E", + ev->fd, ev->kq_errno); } ev->read = NXT_EVENT_INACTIVE; ev->write = NXT_EVENT_INACTIVE; ev->error = ev->kq_errno; - ev->error_handler(thr, ev, data); + ev->error_handler(task, ev, data); } static void -nxt_kqueue_file_error_handler(nxt_thread_t *thr, void *obj, void *data) +nxt_kqueue_file_error_handler(nxt_task_t *task, void *obj, void *data) { nxt_event_file_t *ev; ev = obj; - ev->handler(thr, ev, data); + ev->handler(task, ev, data); } @@ -696,16 +696,16 @@ nxt_kqueue_signal(nxt_event_set_t *event_set, nxt_uint_t signo) static void -nxt_kqueue_poll(nxt_thread_t *thr, nxt_event_set_t *event_set, +nxt_kqueue_poll(nxt_task_t *task, nxt_event_set_t *event_set, nxt_msec_t timeout) { int nevents; void *obj, *data; nxt_int_t i; nxt_err_t err; - nxt_log_t *log; nxt_uint_t level; nxt_bool_t error, eof; + nxt_task_t *event_task; struct kevent *kev; nxt_event_fd_t *ev; nxt_event_sig_t *sigev; @@ -726,21 +726,21 @@ nxt_kqueue_poll(nxt_thread_t *thr, nxt_event_set_t *event_set, ks = &event_set->kqueue; - nxt_log_debug(thr->log, "kevent(%d) changes:%d timeout:%M", - ks->kqueue, ks->nchanges, timeout); + nxt_debug(task, "kevent(%d) changes:%d timeout:%M", + ks->kqueue, ks->nchanges, timeout); nevents = kevent(ks->kqueue, ks->changes, ks->nchanges, ks->events, ks->mevents, tp); err = (nevents == -1) ? nxt_errno : 0; - nxt_thread_time_update(thr); + nxt_thread_time_update(task->thread); - nxt_log_debug(thr->log, "kevent(%d): %d", ks->kqueue, nevents); + nxt_debug(task, "kevent(%d): %d", ks->kqueue, nevents); if (nevents == -1) { level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_ALERT; - nxt_log_error(level, thr->log, "kevent(%d) failed %E", ks->kqueue, err); + nxt_log(task, level, "kevent(%d) failed %E", ks->kqueue, err); nxt_kqueue_error(ks); return; @@ -752,24 +752,25 @@ nxt_kqueue_poll(nxt_thread_t *thr, nxt_event_set_t *event_set, kev = &ks->events[i]; - nxt_log_debug(thr->log, - (kev->ident > 0x8000000 && kev->ident != (uintptr_t) -1) ? - "kevent: id:%p ft:%d fl:%04Xd ff:%d d:%d ud:%p": - "kevent: id:%d ft:%d fl:%04Xd ff:%d d:%d ud:%p", - kev->ident, kev->filter, kev->flags, kev->fflags, - kev->data, kev->udata); + nxt_debug(task, + (kev->ident > 0x8000000 && kev->ident != (uintptr_t) -1) ? + "kevent: id:%p ft:%d fl:%04Xd ff:%d d:%d ud:%p": + "kevent: id:%d ft:%d fl:%04Xd ff:%d d:%d ud:%p", + kev->ident, kev->filter, kev->flags, kev->fflags, + kev->data, kev->udata); error = (kev->flags & EV_ERROR); if (nxt_slow_path(error)) { - nxt_log_alert(thr->log, "kevent(%d) error %E on ident:%d filter:%d", - ks->kqueue, kev->data, kev->ident, kev->filter); + nxt_log(task, NXT_LOG_CRIT, + "kevent(%d) error %E on ident:%d filter:%d", + ks->kqueue, kev->data, kev->ident, kev->filter); } - wq = &thr->work_queue.main; + event_task = task; + wq = &task->thread->work_queue.main; handler = nxt_kqueue_fd_error_handler; obj = nxt_kevent_get_udata(kev->udata); - log = thr->log; switch (kev->filter) { @@ -783,7 +784,7 @@ nxt_kqueue_poll(nxt_thread_t *thr, nxt_event_set_t *event_set, ev->kq_eof = eof; if (ev->read == NXT_EVENT_BLOCKED) { - nxt_log_debug(ev->log, "blocked read event fd:%d", ev->fd); + nxt_debug(ev->task, "blocked read event fd:%d", ev->fd); continue; } @@ -800,8 +801,8 @@ nxt_kqueue_poll(nxt_thread_t *thr, nxt_event_set_t *event_set, wq = ev->read_work_queue; } + event_task = ev->task; data = ev->data; - log = ev->log; break; @@ -814,7 +815,7 @@ nxt_kqueue_poll(nxt_thread_t *thr, nxt_event_set_t *event_set, ev->kq_eof = eof; if (ev->write == NXT_EVENT_BLOCKED) { - nxt_log_debug(ev->log, "blocked write event fd:%d", ev->fd); + nxt_debug(ev->task, "blocked write event fd:%d", ev->fd); continue; } @@ -831,14 +832,15 @@ nxt_kqueue_poll(nxt_thread_t *thr, nxt_event_set_t *event_set, wq = ev->write_work_queue; } + event_task = ev->task; data = ev->data; - log = ev->log; break; case EVFILT_VNODE: fev = obj; handler = fev->handler; + event_task = fev->task; data = fev->data; break; @@ -861,15 +863,16 @@ nxt_kqueue_poll(nxt_thread_t *thr, nxt_event_set_t *event_set, default: #if (NXT_DEBUG) - nxt_log_alert(thr->log, - "unexpected kevent(%d) filter %d on ident %d", - ks->kqueue, kev->filter, kev->ident); + nxt_log(task, NXT_LOG_CRIT, + "unexpected kevent(%d) filter %d on ident %d", + ks->kqueue, kev->filter, kev->ident); #endif continue; } - nxt_thread_work_queue_add(thr, wq, handler, obj, data, log); + nxt_thread_work_queue_add(task->thread, wq, handler, + event_task, obj, data); } } @@ -880,7 +883,7 @@ nxt_kqueue_poll(nxt_thread_t *thr, nxt_event_set_t *event_set, */ static void -nxt_kqueue_event_conn_io_connect(nxt_thread_t *thr, void *obj, void *data) +nxt_kqueue_event_conn_io_connect(nxt_task_t *task, void *obj, void *data) { nxt_event_conn_t *c; nxt_work_handler_t handler; @@ -901,9 +904,9 @@ nxt_kqueue_event_conn_io_connect(nxt_thread_t *thr, void *obj, void *data) c->socket.write_handler = nxt_kqueue_event_conn_connected; c->socket.error_handler = nxt_event_conn_connect_error; - nxt_event_conn_timer(thr->engine, c, state, &c->write_timer); + nxt_event_conn_timer(task->thread->engine, c, state, &c->write_timer); - nxt_kqueue_enable_write(thr->engine->event_set, &c->socket); + nxt_kqueue_enable_write(task->thread->engine->event_set, &c->socket); return; case NXT_DECLINED: @@ -915,18 +918,19 @@ nxt_kqueue_event_conn_io_connect(nxt_thread_t *thr, void *obj, void *data) break; } - nxt_event_conn_io_handle(thr, c->write_work_queue, handler, c, data); + nxt_event_conn_io_handle(task->thread, c->write_work_queue, handler, task, + c, data); } static void -nxt_kqueue_event_conn_connected(nxt_thread_t *thr, void *obj, void *data) +nxt_kqueue_event_conn_connected(nxt_task_t *task, void *obj, void *data) { nxt_event_conn_t *c; c = obj; - nxt_log_debug(thr->log, "kqueue event conn connected fd:%d", c->socket.fd); + nxt_debug(task, "kqueue event conn connected fd:%d", c->socket.fd); c->socket.write = NXT_EVENT_BLOCKED; @@ -934,30 +938,29 @@ nxt_kqueue_event_conn_connected(nxt_thread_t *thr, void *obj, void *data) nxt_event_timer_disable(&c->write_timer); } - nxt_thread_work_queue_add(thr, c->write_work_queue, - c->write_state->ready_handler, - c, data, c->socket.log); + nxt_thread_work_queue_add(task->thread, c->write_work_queue, + c->write_state->ready_handler, task, c, data); } static void -nxt_kqueue_listen_handler(nxt_thread_t *thr, void *obj, void *data) +nxt_kqueue_listen_handler(nxt_task_t *task, void *obj, void *data) { nxt_event_conn_listen_t *cls; cls = obj; - nxt_log_debug(thr->log, "kevent fd:%d avail:%D", - cls->socket.fd, cls->socket.kq_available); + nxt_debug(task, "kevent fd:%d avail:%D", + cls->socket.fd, cls->socket.kq_available); cls->ready = nxt_min(cls->batch, (uint32_t) cls->socket.kq_available); - nxt_kqueue_event_conn_io_accept(thr, cls, data); + nxt_kqueue_event_conn_io_accept(task, cls, data); } static void -nxt_kqueue_event_conn_io_accept(nxt_thread_t *thr, void *obj, void *data) +nxt_kqueue_event_conn_io_accept(nxt_task_t *task, void *obj, void *data) { socklen_t len; nxt_socket_t s; @@ -989,13 +992,13 @@ nxt_kqueue_event_conn_io_accept(nxt_thread_t *thr, void *obj, void *data) if (s != -1) { c->socket.fd = s; - nxt_log_debug(thr->log, "accept(%d): %d", cls->socket.fd, s); + nxt_debug(task, "accept(%d): %d", cls->socket.fd, s); - nxt_event_conn_accept(thr, cls, c); + nxt_event_conn_accept(task, cls, c); return; } - nxt_event_conn_accept_error(thr, cls, "accept", nxt_errno); + nxt_event_conn_accept_error(task, cls, "accept", nxt_errno); } @@ -1005,25 +1008,24 @@ nxt_kqueue_event_conn_io_accept(nxt_thread_t *thr, void *obj, void *data) */ static void -nxt_kqueue_event_conn_io_read(nxt_thread_t *thr, void *obj, void *data) +nxt_kqueue_event_conn_io_read(nxt_task_t *task, void *obj, void *data) { nxt_event_conn_t *c; c = obj; - nxt_log_debug(thr->log, "kqueue event conn read fd:%d", c->socket.fd); + nxt_debug(task, "kqueue event conn read fd:%d", c->socket.fd); if (c->socket.kq_available == 0 && c->socket.kq_eof) { - nxt_log_debug(thr->log, "kevent fd:%d eof", c->socket.fd); + nxt_debug(task, "kevent fd:%d eof", c->socket.fd); c->socket.closed = 1; - nxt_thread_work_queue_add(thr, c->read_work_queue, - c->read_state->close_handler, - c, data, c->socket.log); + nxt_thread_work_queue_add(task->thread, c->read_work_queue, + c->read_state->close_handler, task, c, data); return; } - nxt_event_conn_io_read(thr, c, data); + nxt_event_conn_io_read(task, c, data); } |