diff options
author | Max Romanov <max.romanov@nginx.com> | 2017-10-04 15:03:45 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2017-10-04 15:03:45 +0300 |
commit | 00ecf713e36de2a5efffe761458b7ac0328bce87 (patch) | |
tree | d10e45f41d972326c4e5469a2cf4248a465514b6 | |
parent | 0faecee609b66a353d27499ca78ff6abcd3fef14 (diff) | |
download | unit-00ecf713e36de2a5efffe761458b7ac0328bce87.tar.gz unit-00ecf713e36de2a5efffe761458b7ac0328bce87.tar.bz2 |
Port message fragmentation supported.
- Each sendmsg() transmits no more than port->max_size payload data.
- Longer buffers are fragmented and send using multiple sendmsg() calls.
- On receive side, buffers are connected in chain.
- Number of handler calls is the same as number of nxt_port_socket_write()
calls.
- nxt_buf_make_plain() function introduced to make single plain buffer from
the chain.
-rw-r--r-- | src/nxt_buf.c | 30 | ||||
-rw-r--r-- | src/nxt_buf.h | 13 | ||||
-rw-r--r-- | src/nxt_main_process.c | 36 | ||||
-rw-r--r-- | src/nxt_port.c | 2 | ||||
-rw-r--r-- | src/nxt_port.h | 5 | ||||
-rw-r--r-- | src/nxt_port_socket.c | 183 | ||||
-rw-r--r-- | src/nxt_router.c | 24 | ||||
-rw-r--r-- | src/nxt_sendbuf.c | 6 | ||||
-rw-r--r-- | src/nxt_sendbuf.h | 2 |
9 files changed, 273 insertions, 28 deletions
diff --git a/src/nxt_buf.c b/src/nxt_buf.c index 826cd017..7bc983a3 100644 --- a/src/nxt_buf.c +++ b/src/nxt_buf.c @@ -298,3 +298,33 @@ nxt_buf_ts_completion(nxt_task_t *task, void *obj, void *data) } } } + + +nxt_buf_t * +nxt_buf_make_plain(nxt_mp_t *mp, nxt_buf_t *src, size_t size) +{ + nxt_buf_t *b, *i; + + if (nxt_slow_path(size == 0)) { + for (i = src; i != NULL; i = i->next) { + size += nxt_buf_used_size(i); + } + } + + b = nxt_buf_mem_alloc(mp, size, 0); + + if (nxt_slow_path(b == NULL)) { + return NULL; + } + + for (i = src; i != NULL; i = i->next) { + if (nxt_slow_path(nxt_buf_mem_free_size(&b->mem) < + nxt_buf_used_size(i))) { + break; + } + + b->mem.free = nxt_cpymem(b->mem.free, i->mem.pos, nxt_buf_used_size(i)); + } + + return b; +} diff --git a/src/nxt_buf.h b/src/nxt_buf.h index 2e896093..7a894d0f 100644 --- a/src/nxt_buf.h +++ b/src/nxt_buf.h @@ -250,6 +250,19 @@ NXT_EXPORT nxt_buf_t *nxt_buf_sync_alloc(nxt_mp_t *mp, nxt_uint_t flags); NXT_EXPORT nxt_int_t nxt_buf_ts_handle(nxt_task_t *task, void *obj, void *data); +NXT_EXPORT nxt_buf_t *nxt_buf_make_plain(nxt_mp_t *mp, nxt_buf_t *src, + size_t size); + +nxt_inline nxt_buf_t * +nxt_buf_chk_make_plain(nxt_mp_t *mp, nxt_buf_t *src, size_t size) +{ + if (nxt_slow_path(src != NULL && src->next != NULL)) { + return nxt_buf_make_plain(mp, src, size); + } + + return src; +} + #define \ nxt_buf_free(mp, b) \ nxt_mp_free((mp), (b)) diff --git a/src/nxt_main_process.c b/src/nxt_main_process.c index 128c0fde..ba916abf 100644 --- a/src/nxt_main_process.c +++ b/src/nxt_main_process.c @@ -179,13 +179,21 @@ nxt_port_main_start_worker_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) ret = NXT_ERROR; - b = msg->buf; + mp = nxt_mp_create(1024, 128, 256, 32); + + if (nxt_slow_path(mp == NULL)) { + return; + } + + b = nxt_buf_chk_make_plain(mp, msg->buf, msg->size); + + if (b == NULL) { + return; + } nxt_debug(task, "main start worker: %*s", b->mem.free - b->mem.pos, b->mem.pos); - mp = nxt_mp_create(1024, 128, 256, 32); - nxt_memzero(&app_conf, sizeof(nxt_common_app_conf_t)); start = b->mem.pos; @@ -821,6 +829,8 @@ nxt_main_port_socket_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) b = msg->buf; sa = (nxt_sockaddr_t *) b->mem.pos; + /* TODO check b size and make plain */ + out = NULL; ls.socket = -1; @@ -1037,14 +1047,20 @@ nxt_main_port_modules_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) return; } - nxt_debug(task, "application languages: \"%*s\"", - b->mem.free - b->mem.pos, b->mem.pos); - mp = nxt_mp_create(1024, 128, 256, 32); if (mp == NULL) { return; } + b = nxt_buf_chk_make_plain(mp, b, msg->size); + + if (b == NULL) { + return; + } + + nxt_debug(task, "application languages: \"%*s\"", + b->mem.free - b->mem.pos, b->mem.pos); + conf = nxt_conf_json_parse(mp, b->mem.pos, b->mem.free, NULL); if (conf == NULL) { goto fail; @@ -1131,7 +1147,7 @@ nxt_app_lang_compare(const void *v1, const void *v2) static void nxt_main_port_conf_store_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { - ssize_t n, size; + ssize_t n, size, offset; nxt_buf_t *b; nxt_int_t ret; nxt_file_t file; @@ -1150,16 +1166,20 @@ nxt_main_port_conf_store_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) goto error; } + offset = 0; + for (b = msg->buf; b != NULL; b = b->next) { size = nxt_buf_mem_used_size(&b->mem); - n = nxt_file_write(&file, b->mem.pos, size, 0); + n = nxt_file_write(&file, b->mem.pos, size, offset); if (nxt_slow_path(n != size)) { nxt_file_close(task, &file); (void) nxt_file_delete(file.name); goto error; } + + offset += n; } nxt_file_close(task, &file); diff --git a/src/nxt_port.c b/src/nxt_port.c index df652ac4..52144759 100644 --- a/src/nxt_port.c +++ b/src/nxt_port.c @@ -248,6 +248,8 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) new_port_msg = (nxt_port_msg_new_port_t *) msg->buf->mem.pos; + /* TODO check b size and make plain */ + nxt_debug(task, "new port %d received for process %PI:%d", msg->fd, new_port_msg->pid, new_port_msg->id); diff --git a/src/nxt_port.h b/src/nxt_port.h index 09ad6367..bade2adb 100644 --- a/src/nxt_port.h +++ b/src/nxt_port.h @@ -106,6 +106,9 @@ typedef struct { /* Message data send using mmap, next chunk is a nxt_port_mmap_msg_t. */ uint8_t mmap; /* 1 bit */ + + uint8_t nf; + uint8_t mf; } nxt_port_msg_t; @@ -171,6 +174,8 @@ struct nxt_port_s { nxt_lvlhsh_t rpc_streams; /* stream to nxt_port_rpc_reg_t */ nxt_lvlhsh_t rpc_peers; /* peer to queue of nxt_port_rpc_reg_t */ + nxt_lvlhsh_t frags; + nxt_atomic_t use_count; nxt_process_type_t type; diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c index dce97e83..a430eacf 100644 --- a/src/nxt_port_socket.c +++ b/src/nxt_port_socket.c @@ -242,6 +242,8 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, msg.port_msg.type = type & NXT_PORT_MSG_MASK; msg.port_msg.last = (type & NXT_PORT_MSG_LAST) != 0; msg.port_msg.mmap = 0; + msg.port_msg.nf = 0; + msg.port_msg.mf = 0; msg.work.data = NULL; @@ -324,11 +326,15 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) sb.size = 0; sb.limit = port->max_size; + sb.limit_reached = 0; + sb.nmax_reached = 0; + m = nxt_port_mmap_get_method(task, port, msg->buf); if (m == NXT_PORT_METHOD_MMAP) { sb.limit = (1ULL << 31) - 1; - sb.nmax = NXT_IOBUF_MAX * 10 - 1; + sb.nmax = nxt_min(NXT_IOBUF_MAX * 10 - 1, + port->max_size / PORT_MMAP_MIN_SIZE); } nxt_sendbuf_mem_coalesce(task, &sb); @@ -347,6 +353,7 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) } msg->port_msg.last |= sb.last; + msg->port_msg.mf = sb.limit_reached || sb.nmax_reached; n = nxt_socketpair_send(&port->socket, msg->fd, iov, sb.niov + 1); @@ -368,12 +375,16 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) m == NXT_PORT_METHOD_MMAP); if (msg->buf != NULL) { + nxt_debug(task, "port %d: frag stream #%uD", port->socket.fd, + msg->port_msg.stream); + /* * A file descriptor is sent only * in the first message of a stream. */ msg->fd = -1; msg->share += n; + msg->port_msg.nf = 1; if (msg->share >= port->max_share) { msg->share = 0; @@ -534,12 +545,134 @@ nxt_port_read_handler(nxt_task_t *task, void *obj, void *data) } +static nxt_int_t +nxt_port_lvlhsh_frag_test(nxt_lvlhsh_query_t *lhq, void *data) +{ + nxt_port_recv_msg_t *fmsg; + + fmsg = data; + + if (lhq->key.length == sizeof(uint32_t) + && *(uint32_t *) lhq->key.start == fmsg->port_msg.stream) + { + return NXT_OK; + } + + return NXT_DECLINED; +} + + +static void * +nxt_port_lvlhsh_frag_alloc(void *ctx, size_t size) +{ + return nxt_mp_alloc(ctx, size); +} + + +static void +nxt_port_lvlhsh_frag_free(void *ctx, void *p) +{ + return nxt_mp_free(ctx, p); +} + + +static const nxt_lvlhsh_proto_t lvlhsh_frag_proto nxt_aligned(64) = { + NXT_LVLHSH_DEFAULT, + nxt_port_lvlhsh_frag_test, + nxt_port_lvlhsh_frag_alloc, + nxt_port_lvlhsh_frag_free, +}; + + +static nxt_port_recv_msg_t * +nxt_port_frag_start(nxt_task_t *task, nxt_port_t *port, + nxt_port_recv_msg_t *msg) +{ + nxt_int_t res; + nxt_lvlhsh_query_t lhq; + nxt_port_recv_msg_t *fmsg; + + nxt_debug(task, "start frag stream #%uD", msg->port_msg.stream); + + fmsg = nxt_mp_alloc(port->mem_pool, sizeof(nxt_port_recv_msg_t)); + + if (nxt_slow_path(fmsg == NULL)) { + return NULL; + } + + *fmsg = *msg; + + lhq.key_hash = nxt_murmur_hash2(&fmsg->port_msg.stream, sizeof(uint32_t)); + lhq.key.length = sizeof(uint32_t); + lhq.key.start = (u_char *) &fmsg->port_msg.stream; + lhq.proto = &lvlhsh_frag_proto; + lhq.replace = 0; + lhq.value = fmsg; + lhq.pool = port->mem_pool; + + res = nxt_lvlhsh_insert(&port->frags, &lhq); + + switch (res) { + + case NXT_OK: + return fmsg; + + case NXT_DECLINED: + nxt_log(task, NXT_LOG_WARN, "duplicate frag stream #%uD", + fmsg->port_msg.stream); + nxt_mp_free(port->mem_pool, fmsg); + + return NULL; + + default: + nxt_log(task, NXT_LOG_WARN, "failed to add frag stream #%uD", + fmsg->port_msg.stream); + + nxt_mp_free(port->mem_pool, fmsg); + + return NULL; + + } +} + + +static nxt_port_recv_msg_t * +nxt_port_frag_find(nxt_task_t *task, nxt_port_t *port, uint32_t stream, + nxt_bool_t last) +{ + nxt_int_t res; + nxt_lvlhsh_query_t lhq; + + nxt_debug(task, "%s frag stream #%uD", last ? "last" : "next", stream); + + lhq.key_hash = nxt_murmur_hash2(&stream, sizeof(uint32_t)); + lhq.key.length = sizeof(uint32_t); + lhq.key.start = (u_char *) &stream; + lhq.proto = &lvlhsh_frag_proto; + lhq.pool = port->mem_pool; + + res = last != 0 ? nxt_lvlhsh_delete(&port->frags, &lhq) : + nxt_lvlhsh_find(&port->frags, &lhq); + + switch (res) { + + case NXT_OK: + return lhq.value; + + default: + nxt_log(task, NXT_LOG_WARN, "frag stream #%uD not found", stream); + + return NULL; + } +} + + static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, nxt_port_recv_msg_t *msg) { - nxt_buf_t *b; - nxt_buf_t *orig_b; + nxt_buf_t *b, *orig_b; + nxt_port_recv_msg_t *fmsg; if (nxt_slow_path(msg->size < sizeof(nxt_port_msg_t))) { nxt_log(task, NXT_LOG_CRIT, @@ -558,7 +691,49 @@ nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, b = msg->buf; } - port->handler(task, msg); + if (nxt_slow_path(msg->port_msg.nf != 0)) { + fmsg = nxt_port_frag_find(task, port, msg->port_msg.stream, + msg->port_msg.mf == 0); + + if (nxt_slow_path(fmsg == NULL)) { + nxt_assert(fmsg != NULL); + } + + nxt_buf_chain_add(&fmsg->buf, msg->buf); + + fmsg->size += msg->size; + + msg->buf = NULL; + b = NULL; + + if (nxt_fast_path(msg->port_msg.mf == 0)) { + b = fmsg->buf; + + port->handler(task, fmsg); + + msg->buf = fmsg->buf; + msg->fd = fmsg->fd; + + nxt_mp_free(port->mem_pool, fmsg); + } + } else { + if (nxt_slow_path(msg->port_msg.mf != 0)) { + fmsg = nxt_port_frag_start(task, port, msg); + + if (nxt_slow_path(fmsg == NULL)) { + nxt_assert(fmsg != NULL); + } + + fmsg->port_msg.nf = 0; + fmsg->port_msg.mf = 0; + + msg->buf = NULL; + msg->fd = -1; + b = NULL; + } else { + port->handler(task, msg); + } + } if (msg->port_msg.mmap && orig_b != b) { diff --git a/src/nxt_router.c b/src/nxt_router.c index fddc3582..d46dae72 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -592,27 +592,19 @@ nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) void nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { - size_t dump_size; nxt_int_t ret; nxt_buf_t *b; nxt_router_temp_conf_t *tmcf; - b = msg->buf; - - dump_size = nxt_buf_used_size(b); - - if (dump_size > 300) { - dump_size = 300; - } - - nxt_debug(task, "router conf data (%z): %*s", - msg->size, dump_size, b->mem.pos); - tmcf = nxt_router_temp_conf(task); if (nxt_slow_path(tmcf == NULL)) { return; } + b = nxt_buf_chk_make_plain(tmcf->conf->mem_pool, msg->buf, msg->size); + + nxt_assert(b != NULL); + tmcf->conf->router = nxt_router; tmcf->stream = msg->port_msg.stream; tmcf->port = nxt_runtime_port_find(task->thread->runtime, @@ -1442,8 +1434,12 @@ nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, rpc = data; sa = rpc->socket_conf->sockaddr; + tmcf = rpc->temp_conf; + + in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size); + + nxt_assert(in != NULL); - in = msg->buf; p = in->mem.pos; error = *p++; @@ -1452,8 +1448,6 @@ nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, + sizeof("{listener: \"\", code:\"\", message: \"\"}") - 1 + sa->length + socket_errors[error].length + (in->mem.free - p); - tmcf = rpc->temp_conf; - out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); if (nxt_slow_path(out == NULL)) { return; diff --git a/src/nxt_sendbuf.c b/src/nxt_sendbuf.c index 2a529c14..1684f67c 100644 --- a/src/nxt_sendbuf.c +++ b/src/nxt_sendbuf.c @@ -111,7 +111,9 @@ nxt_sendbuf_mem_coalesce(nxt_task_t *task, nxt_sendbuf_coalesce_t *sb) if (total + size > sb->limit) { size = sb->limit - total; - if (size == 0) { + sb->limit_reached = 1; + + if (nxt_slow_path(size == 0)) { break; } } @@ -119,6 +121,8 @@ nxt_sendbuf_mem_coalesce(nxt_task_t *task, nxt_sendbuf_coalesce_t *sb) if (b->mem.pos != last) { if (++n >= sb->nmax) { + sb->nmax_reached = 1; + goto done; } diff --git a/src/nxt_sendbuf.h b/src/nxt_sendbuf.h index 03da2c5d..3bcbaaf4 100644 --- a/src/nxt_sendbuf.h +++ b/src/nxt_sendbuf.h @@ -58,6 +58,8 @@ typedef struct { uint32_t nmax; uint8_t sync; /* 1 bit */ uint8_t last; /* 1 bit */ + uint8_t limit_reached; + uint8_t nmax_reached; size_t size; size_t limit; |