summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_port_socket.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/nxt_port_socket.c183
1 files changed, 179 insertions, 4 deletions
diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c
index dce97e83..a430eacf 100644
--- a/src/nxt_port_socket.c
+++ b/src/nxt_port_socket.c
@@ -242,6 +242,8 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
msg.port_msg.type = type & NXT_PORT_MSG_MASK;
msg.port_msg.last = (type & NXT_PORT_MSG_LAST) != 0;
msg.port_msg.mmap = 0;
+ msg.port_msg.nf = 0;
+ msg.port_msg.mf = 0;
msg.work.data = NULL;
@@ -324,11 +326,15 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
sb.size = 0;
sb.limit = port->max_size;
+ sb.limit_reached = 0;
+ sb.nmax_reached = 0;
+
m = nxt_port_mmap_get_method(task, port, msg->buf);
if (m == NXT_PORT_METHOD_MMAP) {
sb.limit = (1ULL << 31) - 1;
- sb.nmax = NXT_IOBUF_MAX * 10 - 1;
+ sb.nmax = nxt_min(NXT_IOBUF_MAX * 10 - 1,
+ port->max_size / PORT_MMAP_MIN_SIZE);
}
nxt_sendbuf_mem_coalesce(task, &sb);
@@ -347,6 +353,7 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
}
msg->port_msg.last |= sb.last;
+ msg->port_msg.mf = sb.limit_reached || sb.nmax_reached;
n = nxt_socketpair_send(&port->socket, msg->fd, iov, sb.niov + 1);
@@ -368,12 +375,16 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
m == NXT_PORT_METHOD_MMAP);
if (msg->buf != NULL) {
+ nxt_debug(task, "port %d: frag stream #%uD", port->socket.fd,
+ msg->port_msg.stream);
+
/*
* A file descriptor is sent only
* in the first message of a stream.
*/
msg->fd = -1;
msg->share += n;
+ msg->port_msg.nf = 1;
if (msg->share >= port->max_share) {
msg->share = 0;
@@ -534,12 +545,134 @@ nxt_port_read_handler(nxt_task_t *task, void *obj, void *data)
}
+static nxt_int_t
+nxt_port_lvlhsh_frag_test(nxt_lvlhsh_query_t *lhq, void *data)
+{
+ nxt_port_recv_msg_t *fmsg;
+
+ fmsg = data;
+
+ if (lhq->key.length == sizeof(uint32_t)
+ && *(uint32_t *) lhq->key.start == fmsg->port_msg.stream)
+ {
+ return NXT_OK;
+ }
+
+ return NXT_DECLINED;
+}
+
+
+static void *
+nxt_port_lvlhsh_frag_alloc(void *ctx, size_t size)
+{
+ return nxt_mp_alloc(ctx, size);
+}
+
+
+static void
+nxt_port_lvlhsh_frag_free(void *ctx, void *p)
+{
+ return nxt_mp_free(ctx, p);
+}
+
+
+static const nxt_lvlhsh_proto_t lvlhsh_frag_proto nxt_aligned(64) = {
+ NXT_LVLHSH_DEFAULT,
+ nxt_port_lvlhsh_frag_test,
+ nxt_port_lvlhsh_frag_alloc,
+ nxt_port_lvlhsh_frag_free,
+};
+
+
+static nxt_port_recv_msg_t *
+nxt_port_frag_start(nxt_task_t *task, nxt_port_t *port,
+ nxt_port_recv_msg_t *msg)
+{
+ nxt_int_t res;
+ nxt_lvlhsh_query_t lhq;
+ nxt_port_recv_msg_t *fmsg;
+
+ nxt_debug(task, "start frag stream #%uD", msg->port_msg.stream);
+
+ fmsg = nxt_mp_alloc(port->mem_pool, sizeof(nxt_port_recv_msg_t));
+
+ if (nxt_slow_path(fmsg == NULL)) {
+ return NULL;
+ }
+
+ *fmsg = *msg;
+
+ lhq.key_hash = nxt_murmur_hash2(&fmsg->port_msg.stream, sizeof(uint32_t));
+ lhq.key.length = sizeof(uint32_t);
+ lhq.key.start = (u_char *) &fmsg->port_msg.stream;
+ lhq.proto = &lvlhsh_frag_proto;
+ lhq.replace = 0;
+ lhq.value = fmsg;
+ lhq.pool = port->mem_pool;
+
+ res = nxt_lvlhsh_insert(&port->frags, &lhq);
+
+ switch (res) {
+
+ case NXT_OK:
+ return fmsg;
+
+ case NXT_DECLINED:
+ nxt_log(task, NXT_LOG_WARN, "duplicate frag stream #%uD",
+ fmsg->port_msg.stream);
+ nxt_mp_free(port->mem_pool, fmsg);
+
+ return NULL;
+
+ default:
+ nxt_log(task, NXT_LOG_WARN, "failed to add frag stream #%uD",
+ fmsg->port_msg.stream);
+
+ nxt_mp_free(port->mem_pool, fmsg);
+
+ return NULL;
+
+ }
+}
+
+
+static nxt_port_recv_msg_t *
+nxt_port_frag_find(nxt_task_t *task, nxt_port_t *port, uint32_t stream,
+ nxt_bool_t last)
+{
+ nxt_int_t res;
+ nxt_lvlhsh_query_t lhq;
+
+ nxt_debug(task, "%s frag stream #%uD", last ? "last" : "next", stream);
+
+ lhq.key_hash = nxt_murmur_hash2(&stream, sizeof(uint32_t));
+ lhq.key.length = sizeof(uint32_t);
+ lhq.key.start = (u_char *) &stream;
+ lhq.proto = &lvlhsh_frag_proto;
+ lhq.pool = port->mem_pool;
+
+ res = last != 0 ? nxt_lvlhsh_delete(&port->frags, &lhq) :
+ nxt_lvlhsh_find(&port->frags, &lhq);
+
+ switch (res) {
+
+ case NXT_OK:
+ return lhq.value;
+
+ default:
+ nxt_log(task, NXT_LOG_WARN, "frag stream #%uD not found", stream);
+
+ return NULL;
+ }
+}
+
+
static void
nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
nxt_port_recv_msg_t *msg)
{
- nxt_buf_t *b;
- nxt_buf_t *orig_b;
+ nxt_buf_t *b, *orig_b;
+ nxt_port_recv_msg_t *fmsg;
if (nxt_slow_path(msg->size < sizeof(nxt_port_msg_t))) {
nxt_log(task, NXT_LOG_CRIT,
@@ -558,7 +691,49 @@ nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
b = msg->buf;
}
- port->handler(task, msg);
+ if (nxt_slow_path(msg->port_msg.nf != 0)) {
+ fmsg = nxt_port_frag_find(task, port, msg->port_msg.stream,
+ msg->port_msg.mf == 0);
+
+ if (nxt_slow_path(fmsg == NULL)) {
+ nxt_assert(fmsg != NULL);
+ }
+
+ nxt_buf_chain_add(&fmsg->buf, msg->buf);
+
+ fmsg->size += msg->size;
+
+ msg->buf = NULL;
+ b = NULL;
+
+ if (nxt_fast_path(msg->port_msg.mf == 0)) {
+ b = fmsg->buf;
+
+ port->handler(task, fmsg);
+
+ msg->buf = fmsg->buf;
+ msg->fd = fmsg->fd;
+
+ nxt_mp_free(port->mem_pool, fmsg);
+ }
+ } else {
+ if (nxt_slow_path(msg->port_msg.mf != 0)) {
+ fmsg = nxt_port_frag_start(task, port, msg);
+
+ if (nxt_slow_path(fmsg == NULL)) {
+ nxt_assert(fmsg != NULL);
+ }
+
+ fmsg->port_msg.nf = 0;
+ fmsg->port_msg.mf = 0;
+
+ msg->buf = NULL;
+ msg->fd = -1;
+ b = NULL;
+ } else {
+ port->handler(task, msg);
+ }
+ }
if (msg->port_msg.mmap && orig_b != b) {