summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_application.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/nxt_application.c92
1 files changed, 31 insertions, 61 deletions
diff --git a/src/nxt_application.c b/src/nxt_application.c
index 2cd3c921..a854564e 100644
--- a/src/nxt_application.c
+++ b/src/nxt_application.c
@@ -413,7 +413,9 @@ nxt_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid,
msg->port_msg.reply_port);
if (nxt_slow_path(port == NULL)) {
- //
+ nxt_debug(task, "stream #%uD: reply port %d not found",
+ msg->port_msg.stream, msg->port_msg.reply_port);
+ return;
}
wmsg.port = port;
@@ -425,20 +427,12 @@ nxt_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
}
-nxt_inline nxt_port_t *
-nxt_app_msg_get_port(nxt_task_t *task, nxt_app_wmsg_t *msg)
-{
- return msg->port;
-}
-
-
u_char *
nxt_app_msg_write_get_buf(nxt_task_t *task, nxt_app_wmsg_t *msg, size_t size)
{
size_t free_size;
u_char *res;
nxt_buf_t *b;
- nxt_port_t *port;
res = NULL;
@@ -446,12 +440,7 @@ nxt_app_msg_write_get_buf(nxt_task_t *task, nxt_app_wmsg_t *msg, size_t size)
b = *msg->buf;
if (b == NULL) {
- port = nxt_app_msg_get_port(task, msg);
- if (nxt_slow_path(port == NULL)) {
- return NULL;
- }
-
- b = nxt_port_mmap_get_buf(task, port, size);
+ b = nxt_port_mmap_get_buf(task, msg->port, size);
if (nxt_slow_path(b == NULL)) {
return NULL;
}
@@ -573,10 +562,9 @@ nxt_app_msg_write_prefixed_upcase(nxt_task_t *task, nxt_app_wmsg_t *msg,
}
-nxt_int_t
-nxt_app_msg_read_str(nxt_task_t *task, nxt_app_rmsg_t *msg, nxt_str_t *str)
+nxt_inline nxt_int_t
+nxt_app_msg_read_size_(nxt_task_t *task, nxt_app_rmsg_t *msg, size_t *size)
{
- size_t length;
nxt_buf_t *buf;
do {
@@ -603,7 +591,25 @@ nxt_app_msg_read_str(nxt_task_t *task, nxt_app_rmsg_t *msg, nxt_str_t *str)
break;
} while (1);
- buf->mem.pos = nxt_app_msg_read_length(buf->mem.pos, &length);
+ buf->mem.pos = nxt_app_msg_read_length(buf->mem.pos, size);
+
+ return NXT_OK;
+}
+
+
+nxt_int_t
+nxt_app_msg_read_str(nxt_task_t *task, nxt_app_rmsg_t *msg, nxt_str_t *str)
+{
+ size_t length;
+ nxt_int_t ret;
+ nxt_buf_t *buf;
+
+ ret = nxt_app_msg_read_size_(task, msg, &length);
+ if (ret != NXT_OK) {
+ return ret;
+ }
+
+ buf = msg->buf;
if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < (intptr_t) length)) {
return NXT_ERROR;
@@ -688,37 +694,13 @@ nxt_app_msg_read_nvp(nxt_task_t *task, nxt_app_rmsg_t *rmsg, nxt_str_t *n,
nxt_int_t
nxt_app_msg_read_size(nxt_task_t *task, nxt_app_rmsg_t *msg, size_t *size)
{
- nxt_buf_t *buf;
-
- do {
- buf = msg->buf;
+ nxt_int_t ret;
- if (nxt_slow_path(buf == NULL)) {
- return NXT_DONE;
- }
-
- if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 1)) {
- if (nxt_fast_path(nxt_buf_mem_used_size(&buf->mem) == 0)) {
- msg->buf = buf->next;
- continue;
- }
- return NXT_ERROR;
- }
-
- if (buf->mem.pos[0] >= 128) {
- if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 4)) {
- return NXT_ERROR;
- }
- }
-
- break;
- } while (1);
-
- buf->mem.pos = nxt_app_msg_read_length(buf->mem.pos, size);
+ ret = nxt_app_msg_read_size_(task, msg, size);
nxt_debug(task, "nxt_read_size: %d", (int) *size);
- return NXT_OK;
+ return ret;
}
@@ -921,21 +903,15 @@ nxt_app_msg_flush(nxt_task_t *task, nxt_app_wmsg_t *msg, nxt_bool_t last)
{
nxt_int_t rc;
nxt_buf_t *b;
- nxt_port_t *port;
rc = NXT_OK;
- port = nxt_app_msg_get_port(task, msg);
- if (nxt_slow_path(port == NULL)) {
- return NXT_ERROR;
- }
-
if (nxt_slow_path(last == 1)) {
do {
b = *msg->buf;
if (b == NULL) {
- b = nxt_buf_sync_alloc(port->mem_pool, NXT_BUF_SYNC_LAST);
+ b = nxt_buf_sync_alloc(msg->port->mem_pool, NXT_BUF_SYNC_LAST);
*msg->buf = b;
break;
}
@@ -945,7 +921,7 @@ nxt_app_msg_flush(nxt_task_t *task, nxt_app_wmsg_t *msg, nxt_bool_t last)
}
if (nxt_slow_path(msg->write != NULL)) {
- rc = nxt_port_socket_write(task, port, NXT_PORT_MSG_DATA,
+ rc = nxt_port_socket_write(task, msg->port, NXT_PORT_MSG_DATA,
-1, msg->stream, 0, msg->write);
msg->write = NULL;
@@ -962,7 +938,6 @@ nxt_app_msg_write_raw(nxt_task_t *task, nxt_app_wmsg_t *msg, const u_char *c,
{
size_t free_size, copy_size;
nxt_buf_t *b;
- nxt_port_t *port;
nxt_debug(task, "nxt_app_msg_write_raw: %uz", size);
@@ -970,12 +945,7 @@ nxt_app_msg_write_raw(nxt_task_t *task, nxt_app_wmsg_t *msg, const u_char *c,
b = *msg->buf;
if (b == NULL) {
- port = nxt_app_msg_get_port(task, msg);
- if (nxt_slow_path(port == NULL)) {
- return NXT_ERROR;
- }
-
- b = nxt_port_mmap_get_buf(task, port, size);
+ b = nxt_port_mmap_get_buf(task, msg->port, size);
if (nxt_slow_path(b == NULL)) {
return NXT_ERROR;
}