summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_port_socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nxt_port_socket.c')
-rw-r--r--src/nxt_port_socket.c104
1 files changed, 71 insertions, 33 deletions
diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c
index 08f87e84..6d762bbd 100644
--- a/src/nxt_port_socket.c
+++ b/src/nxt_port_socket.c
@@ -223,8 +223,9 @@ nxt_port_msg_first(nxt_task_t *task, nxt_port_t *port, nxt_port_send_msg_t *msg)
nxt_int_t
-nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
- nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b)
+nxt_port_socket_twrite(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
+ nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b,
+ void *tracking)
{
nxt_port_send_msg_t msg, *res;
@@ -236,6 +237,10 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
msg.close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0;
msg.share = 0;
+ if (tracking != NULL) {
+ nxt_port_mmap_tracking_write(msg.tracking_msg, tracking);
+ }
+
msg.port_msg.stream = stream;
msg.port_msg.pid = nxt_pid;
msg.port_msg.reply_port = reply_port;
@@ -244,6 +249,7 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
msg.port_msg.mmap = 0;
msg.port_msg.nf = 0;
msg.port_msg.mf = 0;
+ msg.port_msg.tracking = tracking != NULL;
msg.work.data = NULL;
@@ -337,6 +343,10 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
port->max_size / PORT_MMAP_MIN_SIZE);
}
+ if (msg->port_msg.tracking) {
+ iov[0].iov_len += sizeof(msg->tracking_msg);
+ }
+
nxt_sendbuf_mem_coalesce(task, &sb);
plain_size = sb.size;
@@ -385,6 +395,7 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
msg->fd = -1;
msg->share += n;
msg->port_msg.nf = 1;
+ msg->port_msg.tracking = 0;
if (msg->share >= port->max_share) {
msg->share = 0;
@@ -677,7 +688,12 @@ nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
if (nxt_slow_path(msg->size < sizeof(nxt_port_msg_t))) {
nxt_log(task, NXT_LOG_CRIT,
"port %d: too small message:%uz", port->socket.fd, msg->size);
- goto fail;
+
+ if (msg->fd != -1) {
+ nxt_fd_close(msg->fd);
+ }
+
+ return;
}
/* adjust size to actual buffer used size */
@@ -686,52 +702,82 @@ nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
b = orig_b = msg->buf;
b->mem.free += msg->size;
- if (msg->port_msg.mmap) {
- nxt_port_mmap_read(task, port, msg);
- b = msg->buf;
+ if (msg->port_msg.tracking) {
+ msg->cancelled = nxt_port_mmap_tracking_read(task, msg) == 0;
+
+ } else {
+ msg->cancelled = 0;
}
if (nxt_slow_path(msg->port_msg.nf != 0)) {
+
fmsg = nxt_port_frag_find(task, port, msg->port_msg.stream,
msg->port_msg.mf == 0);
- if (nxt_slow_path(fmsg == NULL)) {
- nxt_assert(fmsg != NULL);
- }
+ nxt_assert(fmsg != NULL);
- nxt_buf_chain_add(&fmsg->buf, msg->buf);
+ if (nxt_fast_path(fmsg->cancelled == 0)) {
- fmsg->size += msg->size;
+ if (msg->port_msg.mmap) {
+ nxt_port_mmap_read(task, msg);
+ b = msg->buf;
+ }
- msg->buf = NULL;
- b = NULL;
+ nxt_buf_chain_add(&fmsg->buf, msg->buf);
- if (nxt_fast_path(msg->port_msg.mf == 0)) {
- b = fmsg->buf;
+ fmsg->size += msg->size;
+ msg->buf = NULL;
+ b = NULL;
- port->handler(task, fmsg);
+ if (nxt_fast_path(msg->port_msg.mf == 0)) {
- msg->buf = fmsg->buf;
- msg->fd = fmsg->fd;
+ b = fmsg->buf;
+ port->handler(task, fmsg);
+
+ msg->buf = fmsg->buf;
+ msg->fd = fmsg->fd;
+ }
+ }
+
+ if (nxt_fast_path(msg->port_msg.mf == 0)) {
nxt_mp_free(port->mem_pool, fmsg);
}
} else {
if (nxt_slow_path(msg->port_msg.mf != 0)) {
- fmsg = nxt_port_frag_start(task, port, msg);
- if (nxt_slow_path(fmsg == NULL)) {
- nxt_assert(fmsg != NULL);
+ if (msg->port_msg.mmap && msg->cancelled == 0) {
+ nxt_port_mmap_read(task, msg);
+ b = msg->buf;
}
+ fmsg = nxt_port_frag_start(task, port, msg);
+
+ nxt_assert(fmsg != NULL);
+
fmsg->port_msg.nf = 0;
fmsg->port_msg.mf = 0;
- msg->buf = NULL;
- msg->fd = -1;
- b = NULL;
+ if (nxt_fast_path(msg->cancelled == 0)) {
+ msg->buf = NULL;
+ msg->fd = -1;
+ b = NULL;
+
+ } else {
+ if (msg->fd != -1) {
+ nxt_fd_close(msg->fd);
+ }
+ }
} else {
- port->handler(task, msg);
+ if (nxt_fast_path(msg->cancelled == 0)) {
+
+ if (msg->port_msg.mmap) {
+ nxt_port_mmap_read(task, msg);
+ b = msg->buf;
+ }
+
+ port->handler(task, msg);
+ }
}
}
@@ -754,14 +800,6 @@ nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
/* restore original buf */
msg->buf = orig_b;
}
-
- return;
-
-fail:
-
- if (msg->fd != -1) {
- nxt_fd_close(msg->fd);
- }
}