diff options
-rw-r--r-- | src/nxt_h1proto.c | 4 | ||||
-rw-r--r-- | src/nxt_port_socket.c | 65 | ||||
-rw-r--r-- | src/nxt_sendbuf.c | 63 | ||||
-rw-r--r-- | src/nxt_sendbuf.h | 2 |
4 files changed, 67 insertions, 67 deletions
diff --git a/src/nxt_h1proto.c b/src/nxt_h1proto.c index e8d8e472..a216dafb 100644 --- a/src/nxt_h1proto.c +++ b/src/nxt_h1proto.c @@ -845,8 +845,8 @@ nxt_h1p_sent(nxt_task_t *task, void *obj, void *data) engine = task->thread->engine; - c->write = nxt_sendbuf_completion0(task, &engine->fast_work_queue, - c->write); + c->write = nxt_sendbuf_completion(task, &engine->fast_work_queue, c->write); + if (c->write != NULL) { nxt_conn_write(engine, c); } diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c index 05a4845f..e688530e 100644 --- a/src/nxt_port_socket.c +++ b/src/nxt_port_socket.c @@ -8,6 +8,8 @@ static void nxt_port_write_handler(nxt_task_t *task, void *obj, void *data); +static nxt_buf_t *nxt_port_buf_completion(nxt_task_t *task, + nxt_work_queue_t *wq, nxt_buf_t *b, size_t sent, nxt_bool_t mmap_mode); static void nxt_port_read_handler(nxt_task_t *task, void *obj, void *data); static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, nxt_port_recv_msg_t *msg); @@ -379,7 +381,7 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) msg->fd = -1; } - msg->buf = nxt_sendbuf_completion(task, wq, msg->buf, plain_size, + msg->buf = nxt_port_buf_completion(task, wq, msg->buf, plain_size, m == NXT_PORT_METHOD_MMAP); if (msg->buf != NULL) { @@ -462,6 +464,67 @@ unlock_mutex: } +static nxt_buf_t * +nxt_port_buf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b, + size_t sent, nxt_bool_t mmap_mode) +{ + size_t size; + + while (b != NULL) { + + nxt_prefetch(b->next); + + if (!nxt_buf_is_sync(b)) { + + size = nxt_buf_used_size(b); + + if (size != 0) { + + if (sent == 0) { + break; + } + + if (nxt_buf_is_port_mmap(b) && mmap_mode) { + /* + * buffer has been sent to other side which is now + * responsible for shared memory bucket release + */ + b->is_port_mmap_sent = 1; + } + + if (sent < size) { + + if (nxt_buf_is_mem(b)) { + b->mem.pos += sent; + } + + if (nxt_buf_is_file(b)) { + b->file_pos += sent; + } + + break; + } + + /* b->mem.free is NULL in file-only buffer. */ + b->mem.pos = b->mem.free; + + if (nxt_buf_is_file(b)) { + b->file_pos = b->file_end; + } + + sent -= size; + } + } + + nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent); + + b = b->next; + } + + return b; +} + + void nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port) { diff --git a/src/nxt_sendbuf.c b/src/nxt_sendbuf.c index 704236ca..ed6fca36 100644 --- a/src/nxt_sendbuf.c +++ b/src/nxt_sendbuf.c @@ -376,68 +376,7 @@ nxt_sendbuf_update(nxt_buf_t *b, size_t sent) nxt_buf_t * -nxt_sendbuf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b, - size_t sent, nxt_bool_t mmap_mode) -{ - size_t size; - - while (b != NULL) { - - nxt_prefetch(b->next); - - if (!nxt_buf_is_sync(b)) { - - size = nxt_buf_used_size(b); - - if (size != 0) { - - if (sent == 0) { - break; - } - - if (nxt_buf_is_port_mmap(b) && mmap_mode) { - /* - * buffer has been sent to other side which is now - * responsible for shared memory bucket release - */ - b->is_port_mmap_sent = 1; - } - - if (sent < size) { - - if (nxt_buf_is_mem(b)) { - b->mem.pos += sent; - } - - if (nxt_buf_is_file(b)) { - b->file_pos += sent; - } - - break; - } - - /* b->mem.free is NULL in file-only buffer. */ - b->mem.pos = b->mem.free; - - if (nxt_buf_is_file(b)) { - b->file_pos = b->file_end; - } - - sent -= size; - } - } - - nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent); - - b = b->next; - } - - return b; -} - - -nxt_buf_t * -nxt_sendbuf_completion0(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b) +nxt_sendbuf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b) { while (b != NULL) { diff --git a/src/nxt_sendbuf.h b/src/nxt_sendbuf.h index 0406faf6..46d2a0fe 100644 --- a/src/nxt_sendbuf.h +++ b/src/nxt_sendbuf.h @@ -123,8 +123,6 @@ ssize_t nxt_sendbuf_copy_coalesce(nxt_conn_t *c, nxt_buf_mem_t *bm, nxt_buf_t *nxt_sendbuf_update(nxt_buf_t *b, size_t sent); nxt_buf_t *nxt_sendbuf_completion(nxt_task_t *task, nxt_work_queue_t *wq, - nxt_buf_t *b, size_t sent, nxt_bool_t mmap_mode); -nxt_buf_t *nxt_sendbuf_completion0(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b); |