diff options
author | Max Romanov <max.romanov@nginx.com> | 2019-03-05 15:38:51 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2019-03-05 15:38:51 +0300 |
commit | ddd2e8cc3676e81abd098baa2b8d262a74be8cee (patch) | |
tree | 192f498cff4c1115566180e413e733ff2269a5d1 /src/nxt_port_socket.c | |
parent | 42b66ec654a1169bdc5f8e3acf1b6af1787dbd27 (diff) | |
download | unit-ddd2e8cc3676e81abd098baa2b8d262a74be8cee.tar.gz unit-ddd2e8cc3676e81abd098baa2b8d262a74be8cee.tar.bz2 |
Improving port message fragment recognition.
This is required to assemble fragmented messages correctly. Stream
identifier is unique only for messages generated within a process, but
the (stream, pid) pair should be enough to avoid collisions. Adding
reply_port seems redundant because it's enough to add stream to a pid.
This closes #199 issue on GitHub.
Thanks to 洪志道 (Hong Zhi Dao).
Diffstat (limited to 'src/nxt_port_socket.c')
-rw-r--r-- | src/nxt_port_socket.c | 52 |
1 files changed, 36 insertions, 16 deletions
diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c index 85da89c8..a426df31 100644 --- a/src/nxt_port_socket.c +++ b/src/nxt_port_socket.c @@ -638,15 +638,24 @@ nxt_port_read_handler(nxt_task_t *task, void *obj, void *data) } +typedef struct { + uint32_t stream; + uint32_t pid; +} nxt_port_frag_key_t; + + static nxt_int_t nxt_port_lvlhsh_frag_test(nxt_lvlhsh_query_t *lhq, void *data) { nxt_port_recv_msg_t *fmsg; + nxt_port_frag_key_t *frag_key; fmsg = data; + frag_key = (nxt_port_frag_key_t *) lhq->key.start; - if (lhq->key.length == sizeof(uint32_t) - && *(uint32_t *) lhq->key.start == fmsg->port_msg.stream) + if (lhq->key.length == sizeof(nxt_port_frag_key_t) + && frag_key->stream == fmsg->port_msg.stream + && frag_key->pid == (uint32_t) fmsg->port_msg.pid) { return NXT_OK; } @@ -684,6 +693,7 @@ nxt_port_frag_start(nxt_task_t *task, nxt_port_t *port, nxt_int_t res; nxt_lvlhsh_query_t lhq; nxt_port_recv_msg_t *fmsg; + nxt_port_frag_key_t frag_key; nxt_debug(task, "start frag stream #%uD", msg->port_msg.stream); @@ -695,9 +705,12 @@ nxt_port_frag_start(nxt_task_t *task, nxt_port_t *port, *fmsg = *msg; - lhq.key_hash = nxt_murmur_hash2(&fmsg->port_msg.stream, sizeof(uint32_t)); - lhq.key.length = sizeof(uint32_t); - lhq.key.start = (u_char *) &fmsg->port_msg.stream; + frag_key.stream = fmsg->port_msg.stream; + frag_key.pid = fmsg->port_msg.pid; + + lhq.key_hash = nxt_murmur_hash2(&frag_key, sizeof(nxt_port_frag_key_t)); + lhq.key.length = sizeof(nxt_port_frag_key_t); + lhq.key.start = (u_char *) &frag_key; lhq.proto = &lvlhsh_frag_proto; lhq.replace = 0; lhq.value = fmsg; @@ -730,17 +743,24 @@ nxt_port_frag_start(nxt_task_t *task, nxt_port_t *port, static nxt_port_recv_msg_t * -nxt_port_frag_find(nxt_task_t *task, nxt_port_t *port, uint32_t stream, - nxt_bool_t last) +nxt_port_frag_find(nxt_task_t *task, nxt_port_t *port, nxt_port_recv_msg_t *msg) { - nxt_int_t res; - nxt_lvlhsh_query_t lhq; + nxt_int_t res; + nxt_bool_t last; + nxt_lvlhsh_query_t lhq; + nxt_port_frag_key_t frag_key; + + last = msg->port_msg.mf == 0; + + nxt_debug(task, "%s frag stream #%uD", last ? "last" : "next", + msg->port_msg.stream); - nxt_debug(task, "%s frag stream #%uD", last ? "last" : "next", stream); + frag_key.stream = msg->port_msg.stream; + frag_key.pid = msg->port_msg.pid; - lhq.key_hash = nxt_murmur_hash2(&stream, sizeof(uint32_t)); - lhq.key.length = sizeof(uint32_t); - lhq.key.start = (u_char *) &stream; + lhq.key_hash = nxt_murmur_hash2(&frag_key, sizeof(nxt_port_frag_key_t)); + lhq.key.length = sizeof(nxt_port_frag_key_t); + lhq.key.start = (u_char *) &frag_key; lhq.proto = &lvlhsh_frag_proto; lhq.pool = port->mem_pool; @@ -753,7 +773,8 @@ nxt_port_frag_find(nxt_task_t *task, nxt_port_t *port, uint32_t stream, return lhq.value; default: - nxt_log(task, NXT_LOG_INFO, "frag stream #%uD not found", stream); + nxt_log(task, NXT_LOG_INFO, "frag stream #%uD not found", + frag_key.stream); return NULL; } @@ -793,8 +814,7 @@ nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, if (nxt_slow_path(msg->port_msg.nf != 0)) { - fmsg = nxt_port_frag_find(task, port, msg->port_msg.stream, - msg->port_msg.mf == 0); + fmsg = nxt_port_frag_find(task, port, msg); if (nxt_slow_path(fmsg == NULL)) { goto fmsg_failed; |