summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_kqueue.c
diff options
context:
space:
mode:
authorIgor Sysoev <igor@sysoev.ru>2017-01-23 19:56:03 +0300
committerIgor Sysoev <igor@sysoev.ru>2017-01-23 19:56:03 +0300
commitde532922d9ab42aa15b40d47c8db53ac2af38500 (patch)
treed6b7c6b21c7a6e0e3620a3e0c7198e63454d05e3 /src/nxt_kqueue.c
parent16cbf3c076a0aca6d47adaf3f719493674cf2363 (diff)
downloadunit-de532922d9ab42aa15b40d47c8db53ac2af38500.tar.gz
unit-de532922d9ab42aa15b40d47c8db53ac2af38500.tar.bz2
Introducing tasks.
Diffstat (limited to 'src/nxt_kqueue.c')
-rw-r--r--src/nxt_kqueue.c134
1 files changed, 68 insertions, 66 deletions
diff --git a/src/nxt_kqueue.c b/src/nxt_kqueue.c
index 25d9eefe..7370a401 100644
--- a/src/nxt_kqueue.c
+++ b/src/nxt_kqueue.c
@@ -87,9 +87,9 @@ static void nxt_kqueue_fd_set(nxt_event_set_t *event_set, nxt_event_fd_t *ev,
static struct kevent *nxt_kqueue_get_kevent(nxt_kqueue_event_set_t *ks);
static void nxt_kqueue_commit_changes(nxt_kqueue_event_set_t *ks);
static void nxt_kqueue_error(nxt_kqueue_event_set_t *ks);
-static void nxt_kqueue_fd_error_handler(nxt_thread_t *thr, void *obj,
+static void nxt_kqueue_fd_error_handler(nxt_task_t *task, void *obj,
void *data);
-static void nxt_kqueue_file_error_handler(nxt_thread_t *thr, void *obj,
+static void nxt_kqueue_file_error_handler(nxt_task_t *task, void *obj,
void *data);
static nxt_int_t nxt_kqueue_add_signal(nxt_kqueue_event_set_t *kq,
const nxt_event_sig_t *sigev);
@@ -98,17 +98,17 @@ static nxt_int_t nxt_kqueue_enable_post(nxt_event_set_t *event_set,
nxt_work_handler_t handler);
static void nxt_kqueue_signal(nxt_event_set_t *event_set, nxt_uint_t signo);
#endif
-static void nxt_kqueue_poll(nxt_thread_t *thr, nxt_event_set_t *event_set,
+static void nxt_kqueue_poll(nxt_task_t *task, nxt_event_set_t *event_set,
nxt_msec_t timeout);
-static void nxt_kqueue_event_conn_io_connect(nxt_thread_t *thr, void *obj,
+static void nxt_kqueue_event_conn_io_connect(nxt_task_t *task, void *obj,
void *data);
-static void nxt_kqueue_event_conn_connected(nxt_thread_t *thr, void *obj,
+static void nxt_kqueue_event_conn_connected(nxt_task_t *task, void *obj,
void *data);
-static void nxt_kqueue_listen_handler(nxt_thread_t *thr, void *obj, void *data);
-static void nxt_kqueue_event_conn_io_accept(nxt_thread_t *thr, void *obj,
+static void nxt_kqueue_listen_handler(nxt_task_t *task, void *obj, void *data);
+static void nxt_kqueue_event_conn_io_accept(nxt_task_t *task, void *obj,
void *data);
-static void nxt_kqueue_event_conn_io_read(nxt_thread_t *thr, void *obj,
+static void nxt_kqueue_event_conn_io_read(nxt_task_t *task, void *obj,
void *data);
static ssize_t nxt_kqueue_event_conn_io_recvbuf(nxt_event_conn_t *c,
nxt_buf_t *b);
@@ -547,14 +547,14 @@ nxt_kqueue_error(nxt_kqueue_event_set_t *ks)
ev = nxt_kevent_get_udata(kev->udata);
nxt_thread_work_queue_add(thr, &thr->work_queue.main,
nxt_kqueue_fd_error_handler,
- ev, ev->data, ev->log);
+ ev->task, ev, ev->data);
break;
case EVFILT_VNODE:
fev = nxt_kevent_get_udata(kev->udata);
nxt_thread_work_queue_add(thr, &thr->work_queue.main,
nxt_kqueue_file_error_handler,
- fev, fev->data, thr->log);
+ fev->task, fev, fev->data);
break;
}
}
@@ -562,7 +562,7 @@ nxt_kqueue_error(nxt_kqueue_event_set_t *ks)
static void
-nxt_kqueue_fd_error_handler(nxt_thread_t *thr, void *obj, void *data)
+nxt_kqueue_fd_error_handler(nxt_task_t *task, void *obj, void *data)
{
nxt_event_fd_t *ev;
@@ -570,27 +570,27 @@ nxt_kqueue_fd_error_handler(nxt_thread_t *thr, void *obj, void *data)
if (ev->kq_eof && ev->kq_errno != 0) {
ev->error = ev->kq_errno;
- nxt_log_error(nxt_socket_error_level(ev->kq_errno, ev->log_error),
- thr->log, "kevent() reported error on descriptor %d %E",
- ev->fd, ev->kq_errno);
+ nxt_log(task, nxt_socket_error_level(ev->kq_errno, ev->log_error),
+ "kevent() reported error on descriptor %d %E",
+ ev->fd, ev->kq_errno);
}
ev->read = NXT_EVENT_INACTIVE;
ev->write = NXT_EVENT_INACTIVE;
ev->error = ev->kq_errno;
- ev->error_handler(thr, ev, data);
+ ev->error_handler(task, ev, data);
}
static void
-nxt_kqueue_file_error_handler(nxt_thread_t *thr, void *obj, void *data)
+nxt_kqueue_file_error_handler(nxt_task_t *task, void *obj, void *data)
{
nxt_event_file_t *ev;
ev = obj;
- ev->handler(thr, ev, data);
+ ev->handler(task, ev, data);
}
@@ -696,16 +696,16 @@ nxt_kqueue_signal(nxt_event_set_t *event_set, nxt_uint_t signo)
static void
-nxt_kqueue_poll(nxt_thread_t *thr, nxt_event_set_t *event_set,
+nxt_kqueue_poll(nxt_task_t *task, nxt_event_set_t *event_set,
nxt_msec_t timeout)
{
int nevents;
void *obj, *data;
nxt_int_t i;
nxt_err_t err;
- nxt_log_t *log;
nxt_uint_t level;
nxt_bool_t error, eof;
+ nxt_task_t *event_task;
struct kevent *kev;
nxt_event_fd_t *ev;
nxt_event_sig_t *sigev;
@@ -726,21 +726,21 @@ nxt_kqueue_poll(nxt_thread_t *thr, nxt_event_set_t *event_set,
ks = &event_set->kqueue;
- nxt_log_debug(thr->log, "kevent(%d) changes:%d timeout:%M",
- ks->kqueue, ks->nchanges, timeout);
+ nxt_debug(task, "kevent(%d) changes:%d timeout:%M",
+ ks->kqueue, ks->nchanges, timeout);
nevents = kevent(ks->kqueue, ks->changes, ks->nchanges,
ks->events, ks->mevents, tp);
err = (nevents == -1) ? nxt_errno : 0;
- nxt_thread_time_update(thr);
+ nxt_thread_time_update(task->thread);
- nxt_log_debug(thr->log, "kevent(%d): %d", ks->kqueue, nevents);
+ nxt_debug(task, "kevent(%d): %d", ks->kqueue, nevents);
if (nevents == -1) {
level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_ALERT;
- nxt_log_error(level, thr->log, "kevent(%d) failed %E", ks->kqueue, err);
+ nxt_log(task, level, "kevent(%d) failed %E", ks->kqueue, err);
nxt_kqueue_error(ks);
return;
@@ -752,24 +752,25 @@ nxt_kqueue_poll(nxt_thread_t *thr, nxt_event_set_t *event_set,
kev = &ks->events[i];
- nxt_log_debug(thr->log,
- (kev->ident > 0x8000000 && kev->ident != (uintptr_t) -1) ?
- "kevent: id:%p ft:%d fl:%04Xd ff:%d d:%d ud:%p":
- "kevent: id:%d ft:%d fl:%04Xd ff:%d d:%d ud:%p",
- kev->ident, kev->filter, kev->flags, kev->fflags,
- kev->data, kev->udata);
+ nxt_debug(task,
+ (kev->ident > 0x8000000 && kev->ident != (uintptr_t) -1) ?
+ "kevent: id:%p ft:%d fl:%04Xd ff:%d d:%d ud:%p":
+ "kevent: id:%d ft:%d fl:%04Xd ff:%d d:%d ud:%p",
+ kev->ident, kev->filter, kev->flags, kev->fflags,
+ kev->data, kev->udata);
error = (kev->flags & EV_ERROR);
if (nxt_slow_path(error)) {
- nxt_log_alert(thr->log, "kevent(%d) error %E on ident:%d filter:%d",
- ks->kqueue, kev->data, kev->ident, kev->filter);
+ nxt_log(task, NXT_LOG_CRIT,
+ "kevent(%d) error %E on ident:%d filter:%d",
+ ks->kqueue, kev->data, kev->ident, kev->filter);
}
- wq = &thr->work_queue.main;
+ event_task = task;
+ wq = &task->thread->work_queue.main;
handler = nxt_kqueue_fd_error_handler;
obj = nxt_kevent_get_udata(kev->udata);
- log = thr->log;
switch (kev->filter) {
@@ -783,7 +784,7 @@ nxt_kqueue_poll(nxt_thread_t *thr, nxt_event_set_t *event_set,
ev->kq_eof = eof;
if (ev->read == NXT_EVENT_BLOCKED) {
- nxt_log_debug(ev->log, "blocked read event fd:%d", ev->fd);
+ nxt_debug(ev->task, "blocked read event fd:%d", ev->fd);
continue;
}
@@ -800,8 +801,8 @@ nxt_kqueue_poll(nxt_thread_t *thr, nxt_event_set_t *event_set,
wq = ev->read_work_queue;
}
+ event_task = ev->task;
data = ev->data;
- log = ev->log;
break;
@@ -814,7 +815,7 @@ nxt_kqueue_poll(nxt_thread_t *thr, nxt_event_set_t *event_set,
ev->kq_eof = eof;
if (ev->write == NXT_EVENT_BLOCKED) {
- nxt_log_debug(ev->log, "blocked write event fd:%d", ev->fd);
+ nxt_debug(ev->task, "blocked write event fd:%d", ev->fd);
continue;
}
@@ -831,14 +832,15 @@ nxt_kqueue_poll(nxt_thread_t *thr, nxt_event_set_t *event_set,
wq = ev->write_work_queue;
}
+ event_task = ev->task;
data = ev->data;
- log = ev->log;
break;
case EVFILT_VNODE:
fev = obj;
handler = fev->handler;
+ event_task = fev->task;
data = fev->data;
break;
@@ -861,15 +863,16 @@ nxt_kqueue_poll(nxt_thread_t *thr, nxt_event_set_t *event_set,
default:
#if (NXT_DEBUG)
- nxt_log_alert(thr->log,
- "unexpected kevent(%d) filter %d on ident %d",
- ks->kqueue, kev->filter, kev->ident);
+ nxt_log(task, NXT_LOG_CRIT,
+ "unexpected kevent(%d) filter %d on ident %d",
+ ks->kqueue, kev->filter, kev->ident);
#endif
continue;
}
- nxt_thread_work_queue_add(thr, wq, handler, obj, data, log);
+ nxt_thread_work_queue_add(task->thread, wq, handler,
+ event_task, obj, data);
}
}
@@ -880,7 +883,7 @@ nxt_kqueue_poll(nxt_thread_t *thr, nxt_event_set_t *event_set,
*/
static void
-nxt_kqueue_event_conn_io_connect(nxt_thread_t *thr, void *obj, void *data)
+nxt_kqueue_event_conn_io_connect(nxt_task_t *task, void *obj, void *data)
{
nxt_event_conn_t *c;
nxt_work_handler_t handler;
@@ -901,9 +904,9 @@ nxt_kqueue_event_conn_io_connect(nxt_thread_t *thr, void *obj, void *data)
c->socket.write_handler = nxt_kqueue_event_conn_connected;
c->socket.error_handler = nxt_event_conn_connect_error;
- nxt_event_conn_timer(thr->engine, c, state, &c->write_timer);
+ nxt_event_conn_timer(task->thread->engine, c, state, &c->write_timer);
- nxt_kqueue_enable_write(thr->engine->event_set, &c->socket);
+ nxt_kqueue_enable_write(task->thread->engine->event_set, &c->socket);
return;
case NXT_DECLINED:
@@ -915,18 +918,19 @@ nxt_kqueue_event_conn_io_connect(nxt_thread_t *thr, void *obj, void *data)
break;
}
- nxt_event_conn_io_handle(thr, c->write_work_queue, handler, c, data);
+ nxt_event_conn_io_handle(task->thread, c->write_work_queue, handler, task,
+ c, data);
}
static void
-nxt_kqueue_event_conn_connected(nxt_thread_t *thr, void *obj, void *data)
+nxt_kqueue_event_conn_connected(nxt_task_t *task, void *obj, void *data)
{
nxt_event_conn_t *c;
c = obj;
- nxt_log_debug(thr->log, "kqueue event conn connected fd:%d", c->socket.fd);
+ nxt_debug(task, "kqueue event conn connected fd:%d", c->socket.fd);
c->socket.write = NXT_EVENT_BLOCKED;
@@ -934,30 +938,29 @@ nxt_kqueue_event_conn_connected(nxt_thread_t *thr, void *obj, void *data)
nxt_event_timer_disable(&c->write_timer);
}
- nxt_thread_work_queue_add(thr, c->write_work_queue,
- c->write_state->ready_handler,
- c, data, c->socket.log);
+ nxt_thread_work_queue_add(task->thread, c->write_work_queue,
+ c->write_state->ready_handler, task, c, data);
}
static void
-nxt_kqueue_listen_handler(nxt_thread_t *thr, void *obj, void *data)
+nxt_kqueue_listen_handler(nxt_task_t *task, void *obj, void *data)
{
nxt_event_conn_listen_t *cls;
cls = obj;
- nxt_log_debug(thr->log, "kevent fd:%d avail:%D",
- cls->socket.fd, cls->socket.kq_available);
+ nxt_debug(task, "kevent fd:%d avail:%D",
+ cls->socket.fd, cls->socket.kq_available);
cls->ready = nxt_min(cls->batch, (uint32_t) cls->socket.kq_available);
- nxt_kqueue_event_conn_io_accept(thr, cls, data);
+ nxt_kqueue_event_conn_io_accept(task, cls, data);
}
static void
-nxt_kqueue_event_conn_io_accept(nxt_thread_t *thr, void *obj, void *data)
+nxt_kqueue_event_conn_io_accept(nxt_task_t *task, void *obj, void *data)
{
socklen_t len;
nxt_socket_t s;
@@ -989,13 +992,13 @@ nxt_kqueue_event_conn_io_accept(nxt_thread_t *thr, void *obj, void *data)
if (s != -1) {
c->socket.fd = s;
- nxt_log_debug(thr->log, "accept(%d): %d", cls->socket.fd, s);
+ nxt_debug(task, "accept(%d): %d", cls->socket.fd, s);
- nxt_event_conn_accept(thr, cls, c);
+ nxt_event_conn_accept(task, cls, c);
return;
}
- nxt_event_conn_accept_error(thr, cls, "accept", nxt_errno);
+ nxt_event_conn_accept_error(task, cls, "accept", nxt_errno);
}
@@ -1005,25 +1008,24 @@ nxt_kqueue_event_conn_io_accept(nxt_thread_t *thr, void *obj, void *data)
*/
static void
-nxt_kqueue_event_conn_io_read(nxt_thread_t *thr, void *obj, void *data)
+nxt_kqueue_event_conn_io_read(nxt_task_t *task, void *obj, void *data)
{
nxt_event_conn_t *c;
c = obj;
- nxt_log_debug(thr->log, "kqueue event conn read fd:%d", c->socket.fd);
+ nxt_debug(task, "kqueue event conn read fd:%d", c->socket.fd);
if (c->socket.kq_available == 0 && c->socket.kq_eof) {
- nxt_log_debug(thr->log, "kevent fd:%d eof", c->socket.fd);
+ nxt_debug(task, "kevent fd:%d eof", c->socket.fd);
c->socket.closed = 1;
- nxt_thread_work_queue_add(thr, c->read_work_queue,
- c->read_state->close_handler,
- c, data, c->socket.log);
+ nxt_thread_work_queue_add(task->thread, c->read_work_queue,
+ c->read_state->close_handler, task, c, data);
return;
}
- nxt_event_conn_io_read(thr, c, data);
+ nxt_event_conn_io_read(task, c, data);
}