From 42b66ec654a1169bdc5f8e3acf1b6af1787dbd27 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Tue, 5 Mar 2019 15:38:50 +0300 Subject: Fixing EAGAIN processing for port message send. Sending large plain (exceeding port's max_size, not in shared memory) messages causes message fragmentation. First message fragment is sent successfully, but the next fragment may fail with the EAGAIN error. In this case, the message has to be pushed back to queue head for additional processing. Related to #167 issue on GitHub. --- src/nxt_port_socket.c | 34 ++++++++++++++++++++++++++-------- 1 file changed, 26 insertions(+), 8 deletions(-) (limited to 'src') diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c index 8694ef65..85da89c8 100644 --- a/src/nxt_port_socket.c +++ b/src/nxt_port_socket.c @@ -195,7 +195,24 @@ nxt_port_msg_create(nxt_task_t *task, nxt_port_send_msg_t *m) static nxt_port_send_msg_t * -nxt_port_msg_push(nxt_task_t *task, nxt_port_t *port, nxt_port_send_msg_t *msg) +nxt_port_msg_insert_head(nxt_task_t *task, nxt_port_t *port, + nxt_port_send_msg_t *msg) +{ + if (msg->work.data == NULL) { + msg = nxt_port_msg_create(task, msg); + } + + if (msg != NULL) { + nxt_queue_insert_head(&port->messages, &msg->link); + } + + return msg; +} + + +static nxt_port_send_msg_t * +nxt_port_msg_insert_tail(nxt_task_t *task, nxt_port_t *port, + nxt_port_send_msg_t *msg) { if (msg->work.data == NULL) { msg = nxt_port_msg_create(task, msg); @@ -260,7 +277,7 @@ nxt_port_socket_twrite(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, } else { nxt_thread_mutex_lock(&port->write_mutex); - res = nxt_port_msg_push(task, port, &msg); + res = nxt_port_msg_insert_tail(task, port, &msg); nxt_thread_mutex_unlock(&port->write_mutex); @@ -409,7 +426,7 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) } data = NULL; - if (nxt_port_msg_push(task, port, msg) != NULL) { + if (nxt_port_msg_insert_tail(task, port, msg) != NULL) { use_delta++; } } @@ -424,16 +441,17 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) data = NULL; } - } else if (nxt_slow_path(n == NXT_ERROR)) { + } else { if (msg->link.next == NULL) { - if (nxt_port_msg_push(task, port, msg) != NULL) { + if (nxt_port_msg_insert_head(task, port, msg) != NULL) { use_delta++; } } - goto fail; - } - /* n == NXT_AGAIN */ + if (nxt_slow_path(n == NXT_ERROR)) { + goto fail; + } + } } while (port->socket.write_ready); -- cgit