summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_chan.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/nxt_chan.c82
1 files changed, 46 insertions, 36 deletions
diff --git a/src/nxt_chan.c b/src/nxt_chan.c
index cc6ba786..baa4b9d4 100644
--- a/src/nxt_chan.c
+++ b/src/nxt_chan.c
@@ -7,13 +7,13 @@
#include <nxt_main.h>
-static void nxt_chan_write_handler(nxt_thread_t *thr, void *obj, void *data);
-static void nxt_chan_read_handler(nxt_thread_t *thr, void *obj, void *data);
-static void nxt_chan_read_msg_process(nxt_thread_t *thr, nxt_chan_t *chan,
+static void nxt_chan_write_handler(nxt_task_t *task, void *obj, void *data);
+static void nxt_chan_read_handler(nxt_task_t *task, void *obj, void *data);
+static void nxt_chan_read_msg_process(nxt_task_t *task, nxt_chan_t *chan,
nxt_chan_msg_t *msg, nxt_fd_t fd, nxt_buf_t *b, size_t size);
static nxt_buf_t *nxt_chan_buf_alloc(nxt_chan_t *chan);
static void nxt_chan_buf_free(nxt_chan_t *chan, nxt_buf_t *b);
-static void nxt_chan_error_handler(nxt_thread_t *thr, void *obj, void *data);
+static void nxt_chan_error_handler(nxt_task_t *task, void *obj, void *data);
nxt_chan_t *
@@ -129,13 +129,19 @@ nxt_chan_destroy(nxt_chan_t *chan)
void
-nxt_chan_write_enable(nxt_thread_t *thr, nxt_chan_t *chan)
+nxt_chan_write_enable(nxt_task_t *task, nxt_chan_t *chan)
{
chan->socket.fd = chan->pair[1];
chan->socket.log = &nxt_main_log;
chan->socket.write_ready = 1;
- chan->socket.write_work_queue = &thr->work_queue.main;
+ chan->task.thread = task->thread;
+ chan->task.log = chan->socket.log;
+ chan->task.ident = nxt_task_next_ident();
+
+ chan->socket.task = &chan->task;
+
+ chan->socket.write_work_queue = &task->thread->work_queue.main;
chan->socket.write_handler = nxt_chan_write_handler;
chan->socket.error_handler = nxt_chan_error_handler;
}
@@ -150,10 +156,9 @@ nxt_chan_write_close(nxt_chan_t *chan)
nxt_int_t
-nxt_chan_write(nxt_chan_t *chan, nxt_uint_t type, nxt_fd_t fd, uint32_t stream,
- nxt_buf_t *b)
+nxt_chan_write(nxt_task_t *task, nxt_chan_t *chan, nxt_uint_t type,
+ nxt_fd_t fd, uint32_t stream, nxt_buf_t *b)
{
- nxt_thread_t *thr;
nxt_queue_link_t *link;
nxt_chan_send_msg_t *msg;
@@ -190,8 +195,7 @@ nxt_chan_write(nxt_chan_t *chan, nxt_uint_t type, nxt_fd_t fd, uint32_t stream,
nxt_queue_insert_tail(&chan->messages, &msg->link);
if (chan->socket.write_ready) {
- thr = nxt_thread();
- nxt_chan_write_handler(thr, chan, NULL);
+ nxt_chan_write_handler(task, chan, NULL);
}
return NXT_OK;
@@ -199,7 +203,7 @@ nxt_chan_write(nxt_chan_t *chan, nxt_uint_t type, nxt_fd_t fd, uint32_t stream,
static void
-nxt_chan_write_handler(nxt_thread_t *thr, void *obj, void *data)
+nxt_chan_write_handler(nxt_task_t *task, void *obj, void *data)
{
ssize_t n;
nxt_uint_t niob;
@@ -215,7 +219,7 @@ nxt_chan_write_handler(nxt_thread_t *thr, void *obj, void *data)
link = nxt_queue_first(&chan->messages);
if (link == nxt_queue_tail(&chan->messages)) {
- nxt_event_fd_block_write(thr->engine, &chan->socket);
+ nxt_event_fd_block_write(task->thread->engine, &chan->socket);
return;
}
@@ -231,7 +235,7 @@ nxt_chan_write_handler(nxt_thread_t *thr, void *obj, void *data)
sb.size = sizeof(nxt_chan_msg_t);
sb.limit = chan->max_size;
- niob = nxt_sendbuf_mem_coalesce(&sb);
+ niob = nxt_sendbuf_mem_coalesce(task, &sb);
msg->chan_msg.last = sb.last;
@@ -239,13 +243,13 @@ nxt_chan_write_handler(nxt_thread_t *thr, void *obj, void *data)
if (n > 0) {
if (nxt_slow_path((size_t) n != sb.size)) {
- nxt_log_alert(thr->log,
- "chan %d: short write: %z instead of %uz",
- chan->socket.fd, n, sb.size);
+ nxt_log(task, NXT_LOG_CRIT,
+ "chan %d: short write: %z instead of %uz",
+ chan->socket.fd, n, sb.size);
goto fail;
}
- msg->buf = nxt_sendbuf_completion(thr,
+ msg->buf = nxt_sendbuf_completion(task,
chan->socket.write_work_queue,
msg->buf,
n - sizeof(nxt_chan_msg_t));
@@ -279,30 +283,36 @@ nxt_chan_write_handler(nxt_thread_t *thr, void *obj, void *data)
} while (chan->socket.write_ready);
if (nxt_event_fd_is_disabled(chan->socket.write)) {
- nxt_event_fd_enable_write(thr->engine, &chan->socket);
+ nxt_event_fd_enable_write(task->thread->engine, &chan->socket);
}
return;
fail:
- nxt_thread_work_queue_add(thr, &thr->work_queue.main,
- nxt_chan_error_handler,
- &chan->socket, NULL, chan->socket.log);
+ nxt_thread_work_queue_add(task->thread, &task->thread->work_queue.main,
+ nxt_chan_error_handler, task, &chan->socket,
+ NULL);
}
void
-nxt_chan_read_enable(nxt_thread_t *thr, nxt_chan_t *chan)
+nxt_chan_read_enable(nxt_task_t *task, nxt_chan_t *chan)
{
chan->socket.fd = chan->pair[0];
chan->socket.log = &nxt_main_log;
- chan->socket.read_work_queue = &thr->work_queue.main;
+ chan->task.thread = task->thread;
+ chan->task.log = chan->socket.log;
+ chan->task.ident = nxt_task_next_ident();
+
+ chan->socket.task = &chan->task;
+
+ chan->socket.read_work_queue = &task->thread->work_queue.main;
chan->socket.read_handler = nxt_chan_read_handler;
chan->socket.error_handler = nxt_chan_error_handler;
- nxt_event_fd_enable_read(thr->engine, &chan->socket);
+ nxt_event_fd_enable_read(task->thread->engine, &chan->socket);
}
@@ -315,7 +325,7 @@ nxt_chan_read_close(nxt_chan_t *chan)
static void
-nxt_chan_read_handler(nxt_thread_t *thr, void *obj, void *data)
+nxt_chan_read_handler(nxt_task_t *task, void *obj, void *data)
{
ssize_t n;
nxt_fd_t fd;
@@ -340,7 +350,7 @@ nxt_chan_read_handler(nxt_thread_t *thr, void *obj, void *data)
n = nxt_socketpair_recv(&chan->socket, &fd, iob, 2);
if (n > 0) {
- nxt_chan_read_msg_process(thr, chan, &msg, fd, b, n);
+ nxt_chan_read_msg_process(task, chan, &msg, fd, b, n);
if (b->mem.pos == b->mem.free) {
@@ -362,30 +372,30 @@ nxt_chan_read_handler(nxt_thread_t *thr, void *obj, void *data)
if (n == NXT_AGAIN) {
nxt_chan_buf_free(chan, b);
- nxt_event_fd_enable_read(thr->engine, &chan->socket);
+ nxt_event_fd_enable_read(task->thread->engine, &chan->socket);
return;
}
/* n == 0 || n == NXT_ERROR */
- nxt_thread_work_queue_add(thr, &thr->work_queue.main,
- nxt_chan_error_handler,
- &chan->socket, NULL, chan->socket.log);
+ nxt_thread_work_queue_add(task->thread, &task->thread->work_queue.main,
+ nxt_chan_error_handler, task,
+ &chan->socket, NULL);
return;
}
}
static void
-nxt_chan_read_msg_process(nxt_thread_t *thr, nxt_chan_t *chan,
+nxt_chan_read_msg_process(nxt_task_t *task, nxt_chan_t *chan,
nxt_chan_msg_t *msg, nxt_fd_t fd, nxt_buf_t *b, size_t size)
{
nxt_buf_t *sync;
nxt_chan_recv_msg_t recv_msg;
if (nxt_slow_path(size < sizeof(nxt_chan_msg_t))) {
- nxt_log_alert(chan->socket.log, "chan %d: too small message:%uz",
- chan->socket.fd, size);
+ nxt_log(chan->socket.task, NXT_LOG_CRIT,
+ "chan %d: too small message:%uz", chan->socket.fd, size);
goto fail;
}
@@ -406,7 +416,7 @@ nxt_chan_read_msg_process(nxt_thread_t *thr, nxt_chan_t *chan,
b->next = sync;
}
- chan->handler(thr, &recv_msg);
+ chan->handler(task, &recv_msg);
return;
@@ -450,7 +460,7 @@ nxt_chan_buf_free(nxt_chan_t *chan, nxt_buf_t *b)
static void
-nxt_chan_error_handler(nxt_thread_t *thr, void *obj, void *data)
+nxt_chan_error_handler(nxt_task_t *task, void *obj, void *data)
{
/* TODO */
}