diff options
author | Max Romanov <max.romanov@nginx.com> | 2017-12-27 17:46:17 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2017-12-27 17:46:17 +0300 |
commit | 89c0f7c5db5003b8fd8df3e1babb0c802004bf4c (patch) | |
tree | 7ac164d6fe41fd76beb3d328e6fde4020f744b63 /src/nxt_application.c | |
parent | 45d08d5145c63fd788f85d9e789314dcf093c99e (diff) | |
download | unit-89c0f7c5db5003b8fd8df3e1babb0c802004bf4c.tar.gz unit-89c0f7c5db5003b8fd8df3e1babb0c802004bf4c.tar.bz2 |
Implementing the ability to cancel request before worker starts processing it.
Diffstat (limited to 'src/nxt_application.c')
-rw-r--r-- | src/nxt_application.c | 92 |
1 files changed, 31 insertions, 61 deletions
diff --git a/src/nxt_application.c b/src/nxt_application.c index 2cd3c921..a854564e 100644 --- a/src/nxt_application.c +++ b/src/nxt_application.c @@ -413,7 +413,9 @@ nxt_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid, msg->port_msg.reply_port); if (nxt_slow_path(port == NULL)) { - // + nxt_debug(task, "stream #%uD: reply port %d not found", + msg->port_msg.stream, msg->port_msg.reply_port); + return; } wmsg.port = port; @@ -425,20 +427,12 @@ nxt_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) } -nxt_inline nxt_port_t * -nxt_app_msg_get_port(nxt_task_t *task, nxt_app_wmsg_t *msg) -{ - return msg->port; -} - - u_char * nxt_app_msg_write_get_buf(nxt_task_t *task, nxt_app_wmsg_t *msg, size_t size) { size_t free_size; u_char *res; nxt_buf_t *b; - nxt_port_t *port; res = NULL; @@ -446,12 +440,7 @@ nxt_app_msg_write_get_buf(nxt_task_t *task, nxt_app_wmsg_t *msg, size_t size) b = *msg->buf; if (b == NULL) { - port = nxt_app_msg_get_port(task, msg); - if (nxt_slow_path(port == NULL)) { - return NULL; - } - - b = nxt_port_mmap_get_buf(task, port, size); + b = nxt_port_mmap_get_buf(task, msg->port, size); if (nxt_slow_path(b == NULL)) { return NULL; } @@ -573,10 +562,9 @@ nxt_app_msg_write_prefixed_upcase(nxt_task_t *task, nxt_app_wmsg_t *msg, } -nxt_int_t -nxt_app_msg_read_str(nxt_task_t *task, nxt_app_rmsg_t *msg, nxt_str_t *str) +nxt_inline nxt_int_t +nxt_app_msg_read_size_(nxt_task_t *task, nxt_app_rmsg_t *msg, size_t *size) { - size_t length; nxt_buf_t *buf; do { @@ -603,7 +591,25 @@ nxt_app_msg_read_str(nxt_task_t *task, nxt_app_rmsg_t *msg, nxt_str_t *str) break; } while (1); - buf->mem.pos = nxt_app_msg_read_length(buf->mem.pos, &length); + buf->mem.pos = nxt_app_msg_read_length(buf->mem.pos, size); + + return NXT_OK; +} + + +nxt_int_t +nxt_app_msg_read_str(nxt_task_t *task, nxt_app_rmsg_t *msg, nxt_str_t *str) +{ + size_t length; + nxt_int_t ret; + nxt_buf_t *buf; + + ret = nxt_app_msg_read_size_(task, msg, &length); + if (ret != NXT_OK) { + return ret; + } + + buf = msg->buf; if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < (intptr_t) length)) { return NXT_ERROR; @@ -688,37 +694,13 @@ nxt_app_msg_read_nvp(nxt_task_t *task, nxt_app_rmsg_t *rmsg, nxt_str_t *n, nxt_int_t nxt_app_msg_read_size(nxt_task_t *task, nxt_app_rmsg_t *msg, size_t *size) { - nxt_buf_t *buf; - - do { - buf = msg->buf; + nxt_int_t ret; - if (nxt_slow_path(buf == NULL)) { - return NXT_DONE; - } - - if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 1)) { - if (nxt_fast_path(nxt_buf_mem_used_size(&buf->mem) == 0)) { - msg->buf = buf->next; - continue; - } - return NXT_ERROR; - } - - if (buf->mem.pos[0] >= 128) { - if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 4)) { - return NXT_ERROR; - } - } - - break; - } while (1); - - buf->mem.pos = nxt_app_msg_read_length(buf->mem.pos, size); + ret = nxt_app_msg_read_size_(task, msg, size); nxt_debug(task, "nxt_read_size: %d", (int) *size); - return NXT_OK; + return ret; } @@ -921,21 +903,15 @@ nxt_app_msg_flush(nxt_task_t *task, nxt_app_wmsg_t *msg, nxt_bool_t last) { nxt_int_t rc; nxt_buf_t *b; - nxt_port_t *port; rc = NXT_OK; - port = nxt_app_msg_get_port(task, msg); - if (nxt_slow_path(port == NULL)) { - return NXT_ERROR; - } - if (nxt_slow_path(last == 1)) { do { b = *msg->buf; if (b == NULL) { - b = nxt_buf_sync_alloc(port->mem_pool, NXT_BUF_SYNC_LAST); + b = nxt_buf_sync_alloc(msg->port->mem_pool, NXT_BUF_SYNC_LAST); *msg->buf = b; break; } @@ -945,7 +921,7 @@ nxt_app_msg_flush(nxt_task_t *task, nxt_app_wmsg_t *msg, nxt_bool_t last) } if (nxt_slow_path(msg->write != NULL)) { - rc = nxt_port_socket_write(task, port, NXT_PORT_MSG_DATA, + rc = nxt_port_socket_write(task, msg->port, NXT_PORT_MSG_DATA, -1, msg->stream, 0, msg->write); msg->write = NULL; @@ -962,7 +938,6 @@ nxt_app_msg_write_raw(nxt_task_t *task, nxt_app_wmsg_t *msg, const u_char *c, { size_t free_size, copy_size; nxt_buf_t *b; - nxt_port_t *port; nxt_debug(task, "nxt_app_msg_write_raw: %uz", size); @@ -970,12 +945,7 @@ nxt_app_msg_write_raw(nxt_task_t *task, nxt_app_wmsg_t *msg, const u_char *c, b = *msg->buf; if (b == NULL) { - port = nxt_app_msg_get_port(task, msg); - if (nxt_slow_path(port == NULL)) { - return NXT_ERROR; - } - - b = nxt_port_mmap_get_buf(task, port, size); + b = nxt_port_mmap_get_buf(task, msg->port, size); if (nxt_slow_path(b == NULL)) { return NXT_ERROR; } |