diff options
Diffstat (limited to '')
-rw-r--r-- | src/nxt_port_socket.c | 183 |
1 files changed, 179 insertions, 4 deletions
diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c index dce97e83..a430eacf 100644 --- a/src/nxt_port_socket.c +++ b/src/nxt_port_socket.c @@ -242,6 +242,8 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, msg.port_msg.type = type & NXT_PORT_MSG_MASK; msg.port_msg.last = (type & NXT_PORT_MSG_LAST) != 0; msg.port_msg.mmap = 0; + msg.port_msg.nf = 0; + msg.port_msg.mf = 0; msg.work.data = NULL; @@ -324,11 +326,15 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) sb.size = 0; sb.limit = port->max_size; + sb.limit_reached = 0; + sb.nmax_reached = 0; + m = nxt_port_mmap_get_method(task, port, msg->buf); if (m == NXT_PORT_METHOD_MMAP) { sb.limit = (1ULL << 31) - 1; - sb.nmax = NXT_IOBUF_MAX * 10 - 1; + sb.nmax = nxt_min(NXT_IOBUF_MAX * 10 - 1, + port->max_size / PORT_MMAP_MIN_SIZE); } nxt_sendbuf_mem_coalesce(task, &sb); @@ -347,6 +353,7 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) } msg->port_msg.last |= sb.last; + msg->port_msg.mf = sb.limit_reached || sb.nmax_reached; n = nxt_socketpair_send(&port->socket, msg->fd, iov, sb.niov + 1); @@ -368,12 +375,16 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) m == NXT_PORT_METHOD_MMAP); if (msg->buf != NULL) { + nxt_debug(task, "port %d: frag stream #%uD", port->socket.fd, + msg->port_msg.stream); + /* * A file descriptor is sent only * in the first message of a stream. */ msg->fd = -1; msg->share += n; + msg->port_msg.nf = 1; if (msg->share >= port->max_share) { msg->share = 0; @@ -534,12 +545,134 @@ nxt_port_read_handler(nxt_task_t *task, void *obj, void *data) } +static nxt_int_t +nxt_port_lvlhsh_frag_test(nxt_lvlhsh_query_t *lhq, void *data) +{ + nxt_port_recv_msg_t *fmsg; + + fmsg = data; + + if (lhq->key.length == sizeof(uint32_t) + && *(uint32_t *) lhq->key.start == fmsg->port_msg.stream) + { + return NXT_OK; + } + + return NXT_DECLINED; +} + + +static void * +nxt_port_lvlhsh_frag_alloc(void *ctx, size_t size) +{ + return nxt_mp_alloc(ctx, size); +} + + +static void +nxt_port_lvlhsh_frag_free(void *ctx, void *p) +{ + return nxt_mp_free(ctx, p); +} + + +static const nxt_lvlhsh_proto_t lvlhsh_frag_proto nxt_aligned(64) = { + NXT_LVLHSH_DEFAULT, + nxt_port_lvlhsh_frag_test, + nxt_port_lvlhsh_frag_alloc, + nxt_port_lvlhsh_frag_free, +}; + + +static nxt_port_recv_msg_t * +nxt_port_frag_start(nxt_task_t *task, nxt_port_t *port, + nxt_port_recv_msg_t *msg) +{ + nxt_int_t res; + nxt_lvlhsh_query_t lhq; + nxt_port_recv_msg_t *fmsg; + + nxt_debug(task, "start frag stream #%uD", msg->port_msg.stream); + + fmsg = nxt_mp_alloc(port->mem_pool, sizeof(nxt_port_recv_msg_t)); + + if (nxt_slow_path(fmsg == NULL)) { + return NULL; + } + + *fmsg = *msg; + + lhq.key_hash = nxt_murmur_hash2(&fmsg->port_msg.stream, sizeof(uint32_t)); + lhq.key.length = sizeof(uint32_t); + lhq.key.start = (u_char *) &fmsg->port_msg.stream; + lhq.proto = &lvlhsh_frag_proto; + lhq.replace = 0; + lhq.value = fmsg; + lhq.pool = port->mem_pool; + + res = nxt_lvlhsh_insert(&port->frags, &lhq); + + switch (res) { + + case NXT_OK: + return fmsg; + + case NXT_DECLINED: + nxt_log(task, NXT_LOG_WARN, "duplicate frag stream #%uD", + fmsg->port_msg.stream); + nxt_mp_free(port->mem_pool, fmsg); + + return NULL; + + default: + nxt_log(task, NXT_LOG_WARN, "failed to add frag stream #%uD", + fmsg->port_msg.stream); + + nxt_mp_free(port->mem_pool, fmsg); + + return NULL; + + } +} + + +static nxt_port_recv_msg_t * +nxt_port_frag_find(nxt_task_t *task, nxt_port_t *port, uint32_t stream, + nxt_bool_t last) +{ + nxt_int_t res; + nxt_lvlhsh_query_t lhq; + + nxt_debug(task, "%s frag stream #%uD", last ? "last" : "next", stream); + + lhq.key_hash = nxt_murmur_hash2(&stream, sizeof(uint32_t)); + lhq.key.length = sizeof(uint32_t); + lhq.key.start = (u_char *) &stream; + lhq.proto = &lvlhsh_frag_proto; + lhq.pool = port->mem_pool; + + res = last != 0 ? nxt_lvlhsh_delete(&port->frags, &lhq) : + nxt_lvlhsh_find(&port->frags, &lhq); + + switch (res) { + + case NXT_OK: + return lhq.value; + + default: + nxt_log(task, NXT_LOG_WARN, "frag stream #%uD not found", stream); + + return NULL; + } +} + + static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, nxt_port_recv_msg_t *msg) { - nxt_buf_t *b; - nxt_buf_t *orig_b; + nxt_buf_t *b, *orig_b; + nxt_port_recv_msg_t *fmsg; if (nxt_slow_path(msg->size < sizeof(nxt_port_msg_t))) { nxt_log(task, NXT_LOG_CRIT, @@ -558,7 +691,49 @@ nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, b = msg->buf; } - port->handler(task, msg); + if (nxt_slow_path(msg->port_msg.nf != 0)) { + fmsg = nxt_port_frag_find(task, port, msg->port_msg.stream, + msg->port_msg.mf == 0); + + if (nxt_slow_path(fmsg == NULL)) { + nxt_assert(fmsg != NULL); + } + + nxt_buf_chain_add(&fmsg->buf, msg->buf); + + fmsg->size += msg->size; + + msg->buf = NULL; + b = NULL; + + if (nxt_fast_path(msg->port_msg.mf == 0)) { + b = fmsg->buf; + + port->handler(task, fmsg); + + msg->buf = fmsg->buf; + msg->fd = fmsg->fd; + + nxt_mp_free(port->mem_pool, fmsg); + } + } else { + if (nxt_slow_path(msg->port_msg.mf != 0)) { + fmsg = nxt_port_frag_start(task, port, msg); + + if (nxt_slow_path(fmsg == NULL)) { + nxt_assert(fmsg != NULL); + } + + fmsg->port_msg.nf = 0; + fmsg->port_msg.mf = 0; + + msg->buf = NULL; + msg->fd = -1; + b = NULL; + } else { + port->handler(task, msg); + } + } if (msg->port_msg.mmap && orig_b != b) { |