diff options
Diffstat (limited to '')
-rw-r--r-- | src/nxt_port_socket.c | 104 |
1 files changed, 71 insertions, 33 deletions
diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c index 08f87e84..6d762bbd 100644 --- a/src/nxt_port_socket.c +++ b/src/nxt_port_socket.c @@ -223,8 +223,9 @@ nxt_port_msg_first(nxt_task_t *task, nxt_port_t *port, nxt_port_send_msg_t *msg) nxt_int_t -nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, - nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b) +nxt_port_socket_twrite(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, + nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b, + void *tracking) { nxt_port_send_msg_t msg, *res; @@ -236,6 +237,10 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, msg.close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0; msg.share = 0; + if (tracking != NULL) { + nxt_port_mmap_tracking_write(msg.tracking_msg, tracking); + } + msg.port_msg.stream = stream; msg.port_msg.pid = nxt_pid; msg.port_msg.reply_port = reply_port; @@ -244,6 +249,7 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, msg.port_msg.mmap = 0; msg.port_msg.nf = 0; msg.port_msg.mf = 0; + msg.port_msg.tracking = tracking != NULL; msg.work.data = NULL; @@ -337,6 +343,10 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) port->max_size / PORT_MMAP_MIN_SIZE); } + if (msg->port_msg.tracking) { + iov[0].iov_len += sizeof(msg->tracking_msg); + } + nxt_sendbuf_mem_coalesce(task, &sb); plain_size = sb.size; @@ -385,6 +395,7 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) msg->fd = -1; msg->share += n; msg->port_msg.nf = 1; + msg->port_msg.tracking = 0; if (msg->share >= port->max_share) { msg->share = 0; @@ -677,7 +688,12 @@ nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, if (nxt_slow_path(msg->size < sizeof(nxt_port_msg_t))) { nxt_log(task, NXT_LOG_CRIT, "port %d: too small message:%uz", port->socket.fd, msg->size); - goto fail; + + if (msg->fd != -1) { + nxt_fd_close(msg->fd); + } + + return; } /* adjust size to actual buffer used size */ @@ -686,52 +702,82 @@ nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, b = orig_b = msg->buf; b->mem.free += msg->size; - if (msg->port_msg.mmap) { - nxt_port_mmap_read(task, port, msg); - b = msg->buf; + if (msg->port_msg.tracking) { + msg->cancelled = nxt_port_mmap_tracking_read(task, msg) == 0; + + } else { + msg->cancelled = 0; } if (nxt_slow_path(msg->port_msg.nf != 0)) { + fmsg = nxt_port_frag_find(task, port, msg->port_msg.stream, msg->port_msg.mf == 0); - if (nxt_slow_path(fmsg == NULL)) { - nxt_assert(fmsg != NULL); - } + nxt_assert(fmsg != NULL); - nxt_buf_chain_add(&fmsg->buf, msg->buf); + if (nxt_fast_path(fmsg->cancelled == 0)) { - fmsg->size += msg->size; + if (msg->port_msg.mmap) { + nxt_port_mmap_read(task, msg); + b = msg->buf; + } - msg->buf = NULL; - b = NULL; + nxt_buf_chain_add(&fmsg->buf, msg->buf); - if (nxt_fast_path(msg->port_msg.mf == 0)) { - b = fmsg->buf; + fmsg->size += msg->size; + msg->buf = NULL; + b = NULL; - port->handler(task, fmsg); + if (nxt_fast_path(msg->port_msg.mf == 0)) { - msg->buf = fmsg->buf; - msg->fd = fmsg->fd; + b = fmsg->buf; + port->handler(task, fmsg); + + msg->buf = fmsg->buf; + msg->fd = fmsg->fd; + } + } + + if (nxt_fast_path(msg->port_msg.mf == 0)) { nxt_mp_free(port->mem_pool, fmsg); } } else { if (nxt_slow_path(msg->port_msg.mf != 0)) { - fmsg = nxt_port_frag_start(task, port, msg); - if (nxt_slow_path(fmsg == NULL)) { - nxt_assert(fmsg != NULL); + if (msg->port_msg.mmap && msg->cancelled == 0) { + nxt_port_mmap_read(task, msg); + b = msg->buf; } + fmsg = nxt_port_frag_start(task, port, msg); + + nxt_assert(fmsg != NULL); + fmsg->port_msg.nf = 0; fmsg->port_msg.mf = 0; - msg->buf = NULL; - msg->fd = -1; - b = NULL; + if (nxt_fast_path(msg->cancelled == 0)) { + msg->buf = NULL; + msg->fd = -1; + b = NULL; + + } else { + if (msg->fd != -1) { + nxt_fd_close(msg->fd); + } + } } else { - port->handler(task, msg); + if (nxt_fast_path(msg->cancelled == 0)) { + + if (msg->port_msg.mmap) { + nxt_port_mmap_read(task, msg); + b = msg->buf; + } + + port->handler(task, msg); + } } } @@ -754,14 +800,6 @@ nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, /* restore original buf */ msg->buf = orig_b; } - - return; - -fail: - - if (msg->fd != -1) { - nxt_fd_close(msg->fd); - } } |