summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_port_socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nxt_port_socket.c')
-rw-r--r--src/nxt_port_socket.c89
1 files changed, 65 insertions, 24 deletions
diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c
index 01fe2dab..c9b5105b 100644
--- a/src/nxt_port_socket.c
+++ b/src/nxt_port_socket.c
@@ -195,7 +195,24 @@ nxt_port_msg_create(nxt_task_t *task, nxt_port_send_msg_t *m)
static nxt_port_send_msg_t *
-nxt_port_msg_push(nxt_task_t *task, nxt_port_t *port, nxt_port_send_msg_t *msg)
+nxt_port_msg_insert_head(nxt_task_t *task, nxt_port_t *port,
+ nxt_port_send_msg_t *msg)
+{
+ if (msg->work.data == NULL) {
+ msg = nxt_port_msg_create(task, msg);
+ }
+
+ if (msg != NULL) {
+ nxt_queue_insert_head(&port->messages, &msg->link);
+ }
+
+ return msg;
+}
+
+
+static nxt_port_send_msg_t *
+nxt_port_msg_insert_tail(nxt_task_t *task, nxt_port_t *port,
+ nxt_port_send_msg_t *msg)
{
if (msg->work.data == NULL) {
msg = nxt_port_msg_create(task, msg);
@@ -260,7 +277,7 @@ nxt_port_socket_twrite(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
} else {
nxt_thread_mutex_lock(&port->write_mutex);
- res = nxt_port_msg_push(task, port, &msg);
+ res = nxt_port_msg_insert_tail(task, port, &msg);
nxt_thread_mutex_unlock(&port->write_mutex);
@@ -349,6 +366,8 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
iov[0].iov_len += sizeof(msg->tracking_msg);
}
+ sb.limit -= iov[0].iov_len;
+
nxt_sendbuf_mem_coalesce(task, &sb);
plain_size = sb.size;
@@ -407,7 +426,7 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
}
data = NULL;
- if (nxt_port_msg_push(task, port, msg) != NULL) {
+ if (nxt_port_msg_insert_tail(task, port, msg) != NULL) {
use_delta++;
}
}
@@ -422,16 +441,17 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
data = NULL;
}
- } else if (nxt_slow_path(n == NXT_ERROR)) {
+ } else {
if (msg->link.next == NULL) {
- if (nxt_port_msg_push(task, port, msg) != NULL) {
+ if (nxt_port_msg_insert_head(task, port, msg) != NULL) {
use_delta++;
}
}
- goto fail;
- }
- /* n == NXT_AGAIN */
+ if (nxt_slow_path(n == NXT_ERROR)) {
+ goto fail;
+ }
+ }
} while (port->socket.write_ready);
@@ -546,6 +566,7 @@ void
nxt_port_read_close(nxt_port_t *port)
{
port->socket.read_ready = 0;
+ port->socket.read = NXT_EVENT_INACTIVE;
nxt_socket_close(port->socket.task, port->pair[0]);
port->pair[0] = -1;
}
@@ -618,15 +639,24 @@ nxt_port_read_handler(nxt_task_t *task, void *obj, void *data)
}
+typedef struct {
+ uint32_t stream;
+ uint32_t pid;
+} nxt_port_frag_key_t;
+
+
static nxt_int_t
nxt_port_lvlhsh_frag_test(nxt_lvlhsh_query_t *lhq, void *data)
{
nxt_port_recv_msg_t *fmsg;
+ nxt_port_frag_key_t *frag_key;
fmsg = data;
+ frag_key = (nxt_port_frag_key_t *) lhq->key.start;
- if (lhq->key.length == sizeof(uint32_t)
- && *(uint32_t *) lhq->key.start == fmsg->port_msg.stream)
+ if (lhq->key.length == sizeof(nxt_port_frag_key_t)
+ && frag_key->stream == fmsg->port_msg.stream
+ && frag_key->pid == (uint32_t) fmsg->port_msg.pid)
{
return NXT_OK;
}
@@ -664,6 +694,7 @@ nxt_port_frag_start(nxt_task_t *task, nxt_port_t *port,
nxt_int_t res;
nxt_lvlhsh_query_t lhq;
nxt_port_recv_msg_t *fmsg;
+ nxt_port_frag_key_t frag_key;
nxt_debug(task, "start frag stream #%uD", msg->port_msg.stream);
@@ -675,9 +706,12 @@ nxt_port_frag_start(nxt_task_t *task, nxt_port_t *port,
*fmsg = *msg;
- lhq.key_hash = nxt_murmur_hash2(&fmsg->port_msg.stream, sizeof(uint32_t));
- lhq.key.length = sizeof(uint32_t);
- lhq.key.start = (u_char *) &fmsg->port_msg.stream;
+ frag_key.stream = fmsg->port_msg.stream;
+ frag_key.pid = fmsg->port_msg.pid;
+
+ lhq.key_hash = nxt_murmur_hash2(&frag_key, sizeof(nxt_port_frag_key_t));
+ lhq.key.length = sizeof(nxt_port_frag_key_t);
+ lhq.key.start = (u_char *) &frag_key;
lhq.proto = &lvlhsh_frag_proto;
lhq.replace = 0;
lhq.value = fmsg;
@@ -710,17 +744,24 @@ nxt_port_frag_start(nxt_task_t *task, nxt_port_t *port,
static nxt_port_recv_msg_t *
-nxt_port_frag_find(nxt_task_t *task, nxt_port_t *port, uint32_t stream,
- nxt_bool_t last)
+nxt_port_frag_find(nxt_task_t *task, nxt_port_t *port, nxt_port_recv_msg_t *msg)
{
- nxt_int_t res;
- nxt_lvlhsh_query_t lhq;
+ nxt_int_t res;
+ nxt_bool_t last;
+ nxt_lvlhsh_query_t lhq;
+ nxt_port_frag_key_t frag_key;
+
+ last = msg->port_msg.mf == 0;
+
+ nxt_debug(task, "%s frag stream #%uD", last ? "last" : "next",
+ msg->port_msg.stream);
- nxt_debug(task, "%s frag stream #%uD", last ? "last" : "next", stream);
+ frag_key.stream = msg->port_msg.stream;
+ frag_key.pid = msg->port_msg.pid;
- lhq.key_hash = nxt_murmur_hash2(&stream, sizeof(uint32_t));
- lhq.key.length = sizeof(uint32_t);
- lhq.key.start = (u_char *) &stream;
+ lhq.key_hash = nxt_murmur_hash2(&frag_key, sizeof(nxt_port_frag_key_t));
+ lhq.key.length = sizeof(nxt_port_frag_key_t);
+ lhq.key.start = (u_char *) &frag_key;
lhq.proto = &lvlhsh_frag_proto;
lhq.pool = port->mem_pool;
@@ -733,7 +774,8 @@ nxt_port_frag_find(nxt_task_t *task, nxt_port_t *port, uint32_t stream,
return lhq.value;
default:
- nxt_log(task, NXT_LOG_INFO, "frag stream #%uD not found", stream);
+ nxt_log(task, NXT_LOG_INFO, "frag stream #%uD not found",
+ frag_key.stream);
return NULL;
}
@@ -773,8 +815,7 @@ nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
if (nxt_slow_path(msg->port_msg.nf != 0)) {
- fmsg = nxt_port_frag_find(task, port, msg->port_msg.stream,
- msg->port_msg.mf == 0);
+ fmsg = nxt_port_frag_find(task, port, msg);
if (nxt_slow_path(fmsg == NULL)) {
goto fmsg_failed;