diff options
author | Igor Sysoev <igor@sysoev.ru> | 2017-01-23 19:56:03 +0300 |
---|---|---|
committer | Igor Sysoev <igor@sysoev.ru> | 2017-01-23 19:56:03 +0300 |
commit | de532922d9ab42aa15b40d47c8db53ac2af38500 (patch) | |
tree | d6b7c6b21c7a6e0e3620a3e0c7198e63454d05e3 /src/nxt_chan.c | |
parent | 16cbf3c076a0aca6d47adaf3f719493674cf2363 (diff) | |
download | unit-de532922d9ab42aa15b40d47c8db53ac2af38500.tar.gz unit-de532922d9ab42aa15b40d47c8db53ac2af38500.tar.bz2 |
Introducing tasks.
Diffstat (limited to 'src/nxt_chan.c')
-rw-r--r-- | src/nxt_chan.c | 82 |
1 files changed, 46 insertions, 36 deletions
diff --git a/src/nxt_chan.c b/src/nxt_chan.c index cc6ba786..baa4b9d4 100644 --- a/src/nxt_chan.c +++ b/src/nxt_chan.c @@ -7,13 +7,13 @@ #include <nxt_main.h> -static void nxt_chan_write_handler(nxt_thread_t *thr, void *obj, void *data); -static void nxt_chan_read_handler(nxt_thread_t *thr, void *obj, void *data); -static void nxt_chan_read_msg_process(nxt_thread_t *thr, nxt_chan_t *chan, +static void nxt_chan_write_handler(nxt_task_t *task, void *obj, void *data); +static void nxt_chan_read_handler(nxt_task_t *task, void *obj, void *data); +static void nxt_chan_read_msg_process(nxt_task_t *task, nxt_chan_t *chan, nxt_chan_msg_t *msg, nxt_fd_t fd, nxt_buf_t *b, size_t size); static nxt_buf_t *nxt_chan_buf_alloc(nxt_chan_t *chan); static void nxt_chan_buf_free(nxt_chan_t *chan, nxt_buf_t *b); -static void nxt_chan_error_handler(nxt_thread_t *thr, void *obj, void *data); +static void nxt_chan_error_handler(nxt_task_t *task, void *obj, void *data); nxt_chan_t * @@ -129,13 +129,19 @@ nxt_chan_destroy(nxt_chan_t *chan) void -nxt_chan_write_enable(nxt_thread_t *thr, nxt_chan_t *chan) +nxt_chan_write_enable(nxt_task_t *task, nxt_chan_t *chan) { chan->socket.fd = chan->pair[1]; chan->socket.log = &nxt_main_log; chan->socket.write_ready = 1; - chan->socket.write_work_queue = &thr->work_queue.main; + chan->task.thread = task->thread; + chan->task.log = chan->socket.log; + chan->task.ident = nxt_task_next_ident(); + + chan->socket.task = &chan->task; + + chan->socket.write_work_queue = &task->thread->work_queue.main; chan->socket.write_handler = nxt_chan_write_handler; chan->socket.error_handler = nxt_chan_error_handler; } @@ -150,10 +156,9 @@ nxt_chan_write_close(nxt_chan_t *chan) nxt_int_t -nxt_chan_write(nxt_chan_t *chan, nxt_uint_t type, nxt_fd_t fd, uint32_t stream, - nxt_buf_t *b) +nxt_chan_write(nxt_task_t *task, nxt_chan_t *chan, nxt_uint_t type, + nxt_fd_t fd, uint32_t stream, nxt_buf_t *b) { - nxt_thread_t *thr; nxt_queue_link_t *link; nxt_chan_send_msg_t *msg; @@ -190,8 +195,7 @@ nxt_chan_write(nxt_chan_t *chan, nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_queue_insert_tail(&chan->messages, &msg->link); if (chan->socket.write_ready) { - thr = nxt_thread(); - nxt_chan_write_handler(thr, chan, NULL); + nxt_chan_write_handler(task, chan, NULL); } return NXT_OK; @@ -199,7 +203,7 @@ nxt_chan_write(nxt_chan_t *chan, nxt_uint_t type, nxt_fd_t fd, uint32_t stream, static void -nxt_chan_write_handler(nxt_thread_t *thr, void *obj, void *data) +nxt_chan_write_handler(nxt_task_t *task, void *obj, void *data) { ssize_t n; nxt_uint_t niob; @@ -215,7 +219,7 @@ nxt_chan_write_handler(nxt_thread_t *thr, void *obj, void *data) link = nxt_queue_first(&chan->messages); if (link == nxt_queue_tail(&chan->messages)) { - nxt_event_fd_block_write(thr->engine, &chan->socket); + nxt_event_fd_block_write(task->thread->engine, &chan->socket); return; } @@ -231,7 +235,7 @@ nxt_chan_write_handler(nxt_thread_t *thr, void *obj, void *data) sb.size = sizeof(nxt_chan_msg_t); sb.limit = chan->max_size; - niob = nxt_sendbuf_mem_coalesce(&sb); + niob = nxt_sendbuf_mem_coalesce(task, &sb); msg->chan_msg.last = sb.last; @@ -239,13 +243,13 @@ nxt_chan_write_handler(nxt_thread_t *thr, void *obj, void *data) if (n > 0) { if (nxt_slow_path((size_t) n != sb.size)) { - nxt_log_alert(thr->log, - "chan %d: short write: %z instead of %uz", - chan->socket.fd, n, sb.size); + nxt_log(task, NXT_LOG_CRIT, + "chan %d: short write: %z instead of %uz", + chan->socket.fd, n, sb.size); goto fail; } - msg->buf = nxt_sendbuf_completion(thr, + msg->buf = nxt_sendbuf_completion(task, chan->socket.write_work_queue, msg->buf, n - sizeof(nxt_chan_msg_t)); @@ -279,30 +283,36 @@ nxt_chan_write_handler(nxt_thread_t *thr, void *obj, void *data) } while (chan->socket.write_ready); if (nxt_event_fd_is_disabled(chan->socket.write)) { - nxt_event_fd_enable_write(thr->engine, &chan->socket); + nxt_event_fd_enable_write(task->thread->engine, &chan->socket); } return; fail: - nxt_thread_work_queue_add(thr, &thr->work_queue.main, - nxt_chan_error_handler, - &chan->socket, NULL, chan->socket.log); + nxt_thread_work_queue_add(task->thread, &task->thread->work_queue.main, + nxt_chan_error_handler, task, &chan->socket, + NULL); } void -nxt_chan_read_enable(nxt_thread_t *thr, nxt_chan_t *chan) +nxt_chan_read_enable(nxt_task_t *task, nxt_chan_t *chan) { chan->socket.fd = chan->pair[0]; chan->socket.log = &nxt_main_log; - chan->socket.read_work_queue = &thr->work_queue.main; + chan->task.thread = task->thread; + chan->task.log = chan->socket.log; + chan->task.ident = nxt_task_next_ident(); + + chan->socket.task = &chan->task; + + chan->socket.read_work_queue = &task->thread->work_queue.main; chan->socket.read_handler = nxt_chan_read_handler; chan->socket.error_handler = nxt_chan_error_handler; - nxt_event_fd_enable_read(thr->engine, &chan->socket); + nxt_event_fd_enable_read(task->thread->engine, &chan->socket); } @@ -315,7 +325,7 @@ nxt_chan_read_close(nxt_chan_t *chan) static void -nxt_chan_read_handler(nxt_thread_t *thr, void *obj, void *data) +nxt_chan_read_handler(nxt_task_t *task, void *obj, void *data) { ssize_t n; nxt_fd_t fd; @@ -340,7 +350,7 @@ nxt_chan_read_handler(nxt_thread_t *thr, void *obj, void *data) n = nxt_socketpair_recv(&chan->socket, &fd, iob, 2); if (n > 0) { - nxt_chan_read_msg_process(thr, chan, &msg, fd, b, n); + nxt_chan_read_msg_process(task, chan, &msg, fd, b, n); if (b->mem.pos == b->mem.free) { @@ -362,30 +372,30 @@ nxt_chan_read_handler(nxt_thread_t *thr, void *obj, void *data) if (n == NXT_AGAIN) { nxt_chan_buf_free(chan, b); - nxt_event_fd_enable_read(thr->engine, &chan->socket); + nxt_event_fd_enable_read(task->thread->engine, &chan->socket); return; } /* n == 0 || n == NXT_ERROR */ - nxt_thread_work_queue_add(thr, &thr->work_queue.main, - nxt_chan_error_handler, - &chan->socket, NULL, chan->socket.log); + nxt_thread_work_queue_add(task->thread, &task->thread->work_queue.main, + nxt_chan_error_handler, task, + &chan->socket, NULL); return; } } static void -nxt_chan_read_msg_process(nxt_thread_t *thr, nxt_chan_t *chan, +nxt_chan_read_msg_process(nxt_task_t *task, nxt_chan_t *chan, nxt_chan_msg_t *msg, nxt_fd_t fd, nxt_buf_t *b, size_t size) { nxt_buf_t *sync; nxt_chan_recv_msg_t recv_msg; if (nxt_slow_path(size < sizeof(nxt_chan_msg_t))) { - nxt_log_alert(chan->socket.log, "chan %d: too small message:%uz", - chan->socket.fd, size); + nxt_log(chan->socket.task, NXT_LOG_CRIT, + "chan %d: too small message:%uz", chan->socket.fd, size); goto fail; } @@ -406,7 +416,7 @@ nxt_chan_read_msg_process(nxt_thread_t *thr, nxt_chan_t *chan, b->next = sync; } - chan->handler(thr, &recv_msg); + chan->handler(task, &recv_msg); return; @@ -450,7 +460,7 @@ nxt_chan_buf_free(nxt_chan_t *chan, nxt_buf_t *b) static void -nxt_chan_error_handler(nxt_thread_t *thr, void *obj, void *data) +nxt_chan_error_handler(nxt_task_t *task, void *obj, void *data) { /* TODO */ } |