diff options
author | Igor Sysoev <igor@sysoev.ru> | 2017-01-23 19:56:03 +0300 |
---|---|---|
committer | Igor Sysoev <igor@sysoev.ru> | 2017-01-23 19:56:03 +0300 |
commit | de532922d9ab42aa15b40d47c8db53ac2af38500 (patch) | |
tree | d6b7c6b21c7a6e0e3620a3e0c7198e63454d05e3 /src/nxt_process_chan.c | |
parent | 16cbf3c076a0aca6d47adaf3f719493674cf2363 (diff) | |
download | unit-de532922d9ab42aa15b40d47c8db53ac2af38500.tar.gz unit-de532922d9ab42aa15b40d47c8db53ac2af38500.tar.bz2 |
Introducing tasks.
Diffstat (limited to 'src/nxt_process_chan.c')
-rw-r--r-- | src/nxt_process_chan.c | 62 |
1 files changed, 32 insertions, 30 deletions
diff --git a/src/nxt_process_chan.c b/src/nxt_process_chan.c index 4fcc1a15..2986f62b 100644 --- a/src/nxt_process_chan.c +++ b/src/nxt_process_chan.c @@ -9,9 +9,9 @@ #include <nxt_process_chan.h> -static void nxt_process_chan_handler(nxt_thread_t *thr, +static void nxt_process_chan_handler(nxt_task_t *task, nxt_chan_recv_msg_t *msg); -static void nxt_process_new_chan_buf_completion(nxt_thread_t *thr, void *obj, +static void nxt_process_new_chan_buf_completion(nxt_task_t *task, void *obj, void *data); @@ -25,13 +25,13 @@ nxt_process_chan_create(nxt_thread_t *thr, nxt_process_chan_t *proc, proc->chan->data = handlers; nxt_chan_write_close(proc->chan); - nxt_chan_read_enable(thr, proc->chan); + nxt_chan_read_enable(&thr->engine->task, proc->chan); } void -nxt_process_chan_write(nxt_cycle_t *cycle, nxt_uint_t type, nxt_fd_t fd, - uint32_t stream, nxt_buf_t *b) +nxt_process_chan_write(nxt_task_t *task, nxt_cycle_t *cycle, nxt_uint_t type, + nxt_fd_t fd, uint32_t stream, nxt_buf_t *b) { nxt_uint_t i, n; nxt_process_chan_t *proc; @@ -41,42 +41,43 @@ nxt_process_chan_write(nxt_cycle_t *cycle, nxt_uint_t type, nxt_fd_t fd, for (i = 0; i < n; i++) { if (nxt_pid != proc[i].pid) { - (void) nxt_chan_write(proc[i].chan, type, fd, stream, b); + (void) nxt_chan_write(task, proc[i].chan, type, fd, stream, b); } } } static void -nxt_process_chan_handler(nxt_thread_t *thr, nxt_chan_recv_msg_t *msg) +nxt_process_chan_handler(nxt_task_t *task, nxt_chan_recv_msg_t *msg) { nxt_process_chan_handler_t *handlers; if (nxt_fast_path(msg->type <= NXT_CHAN_MSG_MAX)) { - nxt_log_debug(thr->log, "chan %d: message type:%uD", - msg->chan->socket.fd, msg->type); + nxt_debug(task, "chan %d: message type:%uD", + msg->chan->socket.fd, msg->type); handlers = msg->chan->data; - handlers[msg->type](thr, msg); + handlers[msg->type](task, msg); return; } - nxt_log_alert(thr->log, "chan %d: unknown message type:%uD", - msg->chan->socket.fd, msg->type); + nxt_log(task, NXT_LOG_CRIT, "chan %d: unknown message type:%uD", + msg->chan->socket.fd, msg->type); } void -nxt_process_chan_quit_handler(nxt_thread_t *thr, nxt_chan_recv_msg_t *msg) +nxt_process_chan_quit_handler(nxt_task_t *task, nxt_chan_recv_msg_t *msg) { - nxt_cycle_quit(thr, NULL); + nxt_cycle_quit(task, NULL); } void -nxt_process_new_chan(nxt_cycle_t *cycle, nxt_process_chan_t *proc) +nxt_process_new_chan(nxt_task_t *task, nxt_cycle_t *cycle, + nxt_process_chan_t *proc) { nxt_buf_t *b; nxt_uint_t i, n; @@ -116,14 +117,14 @@ nxt_process_new_chan(nxt_cycle_t *cycle, nxt_process_chan_t *proc) new_chan->max_size = p[i].chan->max_size; new_chan->max_share = p[i].chan->max_share; - (void) nxt_chan_write(p[i].chan, NXT_CHAN_MSG_NEW_CHAN, + (void) nxt_chan_write(task, p[i].chan, NXT_CHAN_MSG_NEW_CHAN, proc->chan->socket.fd, 0, b); } } static void -nxt_process_new_chan_buf_completion(nxt_thread_t *thr, void *obj, void *data) +nxt_process_new_chan_buf_completion(nxt_task_t *task, void *obj, void *data) { nxt_buf_t *b; nxt_chan_t *chan; @@ -138,7 +139,7 @@ nxt_process_new_chan_buf_completion(nxt_thread_t *thr, void *obj, void *data) void -nxt_process_chan_new_handler(nxt_thread_t *thr, nxt_chan_recv_msg_t *msg) +nxt_process_chan_new_handler(nxt_task_t *task, nxt_chan_recv_msg_t *msg) { nxt_chan_t *chan; nxt_cycle_t *cycle; @@ -162,8 +163,8 @@ nxt_process_chan_new_handler(nxt_thread_t *thr, nxt_chan_recv_msg_t *msg) new_chan = (nxt_proc_msg_new_chan_t *) msg->buf->mem.pos; msg->buf->mem.pos = msg->buf->mem.free; - nxt_log_debug(thr->log, "new chan %d received for process %PI engine %uD", - msg->fd, new_chan->pid, new_chan->engine); + nxt_debug(task, "new chan %d received for process %PI engine %uD", + msg->fd, new_chan->pid, new_chan->engine); proc->pid = new_chan->pid; proc->engine = new_chan->engine; @@ -172,13 +173,13 @@ nxt_process_chan_new_handler(nxt_thread_t *thr, nxt_chan_recv_msg_t *msg) chan->max_share = new_chan->max_share; /* A read chan is not passed at all. */ - nxt_chan_write_enable(thr, chan); + nxt_chan_write_enable(task, chan); } void -nxt_process_chan_change_log_file(nxt_cycle_t *cycle, nxt_uint_t slot, - nxt_fd_t fd) +nxt_process_chan_change_log_file(nxt_task_t *task, nxt_cycle_t *cycle, + nxt_uint_t slot, nxt_fd_t fd) { nxt_buf_t *b; nxt_uint_t i, n; @@ -206,13 +207,14 @@ nxt_process_chan_change_log_file(nxt_cycle_t *cycle, nxt_uint_t slot, *(nxt_uint_t *) b->mem.pos = slot; b->mem.free += sizeof(nxt_uint_t); - (void) nxt_chan_write(p[i].chan, NXT_CHAN_MSG_CHANGE_FILE, fd, 0, b); + (void) nxt_chan_write(task, p[i].chan, NXT_CHAN_MSG_CHANGE_FILE, + fd, 0, b); } } void -nxt_process_chan_change_log_file_handler(nxt_thread_t *thr, +nxt_process_chan_change_log_file_handler(nxt_task_t *task, nxt_chan_recv_msg_t *msg) { nxt_buf_t *b; @@ -227,7 +229,7 @@ nxt_process_chan_change_log_file_handler(nxt_thread_t *thr, log_file = nxt_list_elt(cycle->log_files, slot); - nxt_log_debug(thr->log, "change log file %FD:%FD", msg->fd, log_file->fd); + nxt_debug(task, "change log file %FD:%FD", msg->fd, log_file->fd); /* * The old log file descriptor must be closed at the moment when no @@ -244,20 +246,20 @@ nxt_process_chan_change_log_file_handler(nxt_thread_t *thr, void -nxt_process_chan_data_handler(nxt_thread_t *thr, nxt_chan_recv_msg_t *msg) +nxt_process_chan_data_handler(nxt_task_t *task, nxt_chan_recv_msg_t *msg) { nxt_buf_t *b; b = msg->buf; - nxt_log_debug(thr->log, "data: %*s", b->mem.free - b->mem.pos, b->mem.pos); + nxt_debug(task, "data: %*s", b->mem.free - b->mem.pos, b->mem.pos); b->mem.pos = b->mem.free; } void -nxt_process_chan_empty_handler(nxt_thread_t *thr, nxt_chan_recv_msg_t *msg) +nxt_process_chan_empty_handler(nxt_task_t *task, nxt_chan_recv_msg_t *msg) { - nxt_log_debug(thr->log, "chan empty handler"); + nxt_debug(task, "chan empty handler"); } |