diff options
-rw-r--r-- | src/nxt_app_nncq.h | 165 | ||||
-rw-r--r-- | src/nxt_app_queue.h | 119 | ||||
-rw-r--r-- | src/nxt_http_websocket.c | 16 | ||||
-rw-r--r-- | src/nxt_port.c | 28 | ||||
-rw-r--r-- | src/nxt_port.h | 25 | ||||
-rw-r--r-- | src/nxt_port_queue.h | 102 | ||||
-rw-r--r-- | src/nxt_port_socket.c | 271 | ||||
-rw-r--r-- | src/nxt_router.c | 201 | ||||
-rw-r--r-- | src/nxt_router_request.h | 4 | ||||
-rw-r--r-- | src/nxt_unit.c | 1171 | ||||
-rw-r--r-- | src/nxt_unit.h | 2 |
11 files changed, 1793 insertions, 311 deletions
diff --git a/src/nxt_app_nncq.h b/src/nxt_app_nncq.h new file mode 100644 index 00000000..f9b8ce0c --- /dev/null +++ b/src/nxt_app_nncq.h @@ -0,0 +1,165 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + +#ifndef _NXT_APP_NNCQ_H_INCLUDED_ +#define _NXT_APP_NNCQ_H_INCLUDED_ + + +/* Appilcation Numeric Naive Circular Queue */ + +#define NXT_APP_NNCQ_SIZE 131072 + +typedef uint32_t nxt_app_nncq_atomic_t; +typedef uint16_t nxt_app_nncq_cycle_t; + +typedef struct { + nxt_app_nncq_atomic_t head; + nxt_app_nncq_atomic_t entries[NXT_APP_NNCQ_SIZE]; + nxt_app_nncq_atomic_t tail; +} nxt_app_nncq_t; + + +static inline nxt_app_nncq_atomic_t +nxt_app_nncq_head(nxt_app_nncq_t const volatile *q) +{ + return q->head; +} + + +static inline nxt_app_nncq_atomic_t +nxt_app_nncq_tail(nxt_app_nncq_t const volatile *q) +{ + return q->tail; +} + + +static inline void +nxt_app_nncq_tail_cmp_inc(nxt_app_nncq_t volatile *q, nxt_app_nncq_atomic_t t) +{ + nxt_atomic_cmp_set(&q->tail, t, t + 1); +} + + +static inline nxt_app_nncq_atomic_t +nxt_app_nncq_index(nxt_app_nncq_t const volatile *q, nxt_app_nncq_atomic_t i) +{ + return i % NXT_APP_NNCQ_SIZE; +} + + +static inline nxt_app_nncq_atomic_t +nxt_app_nncq_map(nxt_app_nncq_t const volatile *q, nxt_app_nncq_atomic_t i) +{ + return i % NXT_APP_NNCQ_SIZE; +} + + +static inline nxt_app_nncq_cycle_t +nxt_app_nncq_cycle(nxt_app_nncq_t const volatile *q, nxt_app_nncq_atomic_t i) +{ + return i / NXT_APP_NNCQ_SIZE; +} + + +static inline nxt_app_nncq_cycle_t +nxt_app_nncq_next_cycle(nxt_app_nncq_t const volatile *q, + nxt_app_nncq_cycle_t i) +{ + return i + 1; +} + + +static inline nxt_app_nncq_atomic_t +nxt_app_nncq_new_entry(nxt_app_nncq_t const volatile *q, + nxt_app_nncq_cycle_t cycle, + nxt_app_nncq_atomic_t i) +{ + return cycle * NXT_APP_NNCQ_SIZE + (i % NXT_APP_NNCQ_SIZE); +} + + +static inline nxt_app_nncq_atomic_t +nxt_app_nncq_empty(nxt_app_nncq_t const volatile *q) +{ + return NXT_APP_NNCQ_SIZE; +} + + +static void +nxt_app_nncq_init(nxt_app_nncq_t volatile *q) +{ + q->head = NXT_APP_NNCQ_SIZE; + nxt_memzero((void *) q->entries, + NXT_APP_NNCQ_SIZE * sizeof(nxt_app_nncq_atomic_t)); + q->tail = NXT_APP_NNCQ_SIZE; +} + + +static void +nxt_app_nncq_enqueue(nxt_app_nncq_t volatile *q, nxt_app_nncq_atomic_t val) +{ + nxt_app_nncq_cycle_t e_cycle, t_cycle; + nxt_app_nncq_atomic_t n, t, e, j; + + for ( ;; ) { + t = nxt_app_nncq_tail(q); + j = nxt_app_nncq_map(q, t); + e = q->entries[j]; + + e_cycle = nxt_app_nncq_cycle(q, e); + t_cycle = nxt_app_nncq_cycle(q, t); + + if (e_cycle == t_cycle) { + nxt_app_nncq_tail_cmp_inc(q, t); + continue; + } + + if (nxt_app_nncq_next_cycle(q, e_cycle) != t_cycle) { + continue; + } + + n = nxt_app_nncq_new_entry(q, t_cycle, val); + + if (nxt_atomic_cmp_set(&q->entries[j], e, n)) { + break; + } + } + + nxt_app_nncq_tail_cmp_inc(q, t); +} + + +static nxt_app_nncq_atomic_t +nxt_app_nncq_dequeue(nxt_app_nncq_t volatile *q) +{ + nxt_app_nncq_cycle_t e_cycle, h_cycle; + nxt_app_nncq_atomic_t h, j, e; + + for ( ;; ) { + h = nxt_app_nncq_head(q); + j = nxt_app_nncq_map(q, h); + e = q->entries[j]; + + e_cycle = nxt_app_nncq_cycle(q, e); + h_cycle = nxt_app_nncq_cycle(q, h); + + if (e_cycle != h_cycle) { + if (nxt_app_nncq_next_cycle(q, e_cycle) == h_cycle) { + return nxt_app_nncq_empty(q); + } + + continue; + } + + if (nxt_atomic_cmp_set(&q->head, h, h + 1)) { + break; + } + } + + return nxt_app_nncq_index(q, e); +} + + +#endif /* _NXT_APP_NNCQ_H_INCLUDED_ */ diff --git a/src/nxt_app_queue.h b/src/nxt_app_queue.h new file mode 100644 index 00000000..127cb8f3 --- /dev/null +++ b/src/nxt_app_queue.h @@ -0,0 +1,119 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + +#ifndef _NXT_APP_QUEUE_H_INCLUDED_ +#define _NXT_APP_QUEUE_H_INCLUDED_ + + +#include <nxt_app_nncq.h> + + +/* Using Numeric Naive Circular Queue as a backend. */ + +#define NXT_APP_QUEUE_SIZE NXT_APP_NNCQ_SIZE +#define NXT_APP_QUEUE_MSG_SIZE 31 + +typedef struct { + uint8_t size; + uint8_t data[NXT_APP_QUEUE_MSG_SIZE]; + uint32_t tracking; +} nxt_app_queue_item_t; + + +typedef struct { + nxt_app_nncq_atomic_t nitems; + nxt_app_nncq_t free_items; + nxt_app_nncq_t queue; + nxt_app_queue_item_t items[NXT_APP_QUEUE_SIZE]; +} nxt_app_queue_t; + + +nxt_inline void +nxt_app_queue_init(nxt_app_queue_t volatile *q) +{ + nxt_app_nncq_atomic_t i; + + nxt_app_nncq_init(&q->free_items); + nxt_app_nncq_init(&q->queue); + + for (i = 0; i < NXT_APP_QUEUE_SIZE; i++) { + nxt_app_nncq_enqueue(&q->free_items, i); + } + + q->nitems = 0; +} + + +nxt_inline nxt_int_t +nxt_app_queue_send(nxt_app_queue_t volatile *q, const void *p, + uint8_t size, uint32_t tracking, int *notify, uint32_t *cookie) +{ + nxt_app_queue_item_t *qi; + nxt_app_nncq_atomic_t i; + + i = nxt_app_nncq_dequeue(&q->free_items); + if (i == nxt_app_nncq_empty(&q->free_items)) { + return NXT_AGAIN; + } + + qi = (nxt_app_queue_item_t *) &q->items[i]; + + qi->size = size; + nxt_memcpy(qi->data, p, size); + qi->tracking = tracking; + *cookie = i; + + nxt_app_nncq_enqueue(&q->queue, i); + + i = nxt_atomic_fetch_add(&q->nitems, 1); + + if (notify != NULL) { + *notify = (i == 0); + } + + return NXT_OK; +} + + +nxt_inline nxt_bool_t +nxt_app_queue_cancel(nxt_app_queue_t volatile *q, uint32_t cookie, + uint32_t tracking) +{ + nxt_app_queue_item_t *qi; + + qi = (nxt_app_queue_item_t *) &q->items[cookie]; + + return nxt_atomic_cmp_set(&qi->tracking, tracking, 0); +} + + +nxt_inline ssize_t +nxt_app_queue_recv(nxt_app_queue_t volatile *q, void *p, uint32_t *cookie) +{ + ssize_t res; + nxt_app_queue_item_t *qi; + nxt_app_nncq_atomic_t i; + + i = nxt_app_nncq_dequeue(&q->queue); + if (i == nxt_app_nncq_empty(&q->queue)) { + *cookie = 0; + return -1; + } + + qi = (nxt_app_queue_item_t *) &q->items[i]; + + res = qi->size; + nxt_memcpy(p, qi->data, qi->size); + *cookie = i; + + nxt_app_nncq_enqueue(&q->free_items, i); + + nxt_atomic_fetch_add(&q->nitems, -1); + + return res; +} + + +#endif /* _NXT_APP_QUEUE_H_INCLUDED_ */ diff --git a/src/nxt_http_websocket.c b/src/nxt_http_websocket.c index 393c20ac..1968633e 100644 --- a/src/nxt_http_websocket.c +++ b/src/nxt_http_websocket.c @@ -98,10 +98,10 @@ nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data) b = next; } - res = nxt_port_socket_twrite(task, req_rpc_data->app_port, - NXT_PORT_MSG_WEBSOCKET, -1, - req_rpc_data->stream, - task->thread->engine->port->id, out, NULL); + res = nxt_port_socket_write(task, req_rpc_data->app_port, + NXT_PORT_MSG_WEBSOCKET, -1, + req_rpc_data->stream, + task->thread->engine->port->id, out); if (nxt_slow_path(res != NXT_OK)) { // TODO: handle } @@ -144,10 +144,10 @@ nxt_http_websocket_error_handler(nxt_task_t *task, void *obj, void *data) goto close_handler; } - (void) nxt_port_socket_twrite(task, req_rpc_data->app_port, - NXT_PORT_MSG_WEBSOCKET_LAST, - -1, req_rpc_data->stream, - task->thread->engine->port->id, NULL, NULL); + (void) nxt_port_socket_write(task, req_rpc_data->app_port, + NXT_PORT_MSG_WEBSOCKET_LAST, + -1, req_rpc_data->stream, + task->thread->engine->port->id, NULL); close_handler: diff --git a/src/nxt_port.c b/src/nxt_port.c index 54434d70..c9189d7c 100644 --- a/src/nxt_port.c +++ b/src/nxt_port.c @@ -8,6 +8,7 @@ #include <nxt_runtime.h> #include <nxt_port.h> #include <nxt_router.h> +#include <nxt_port_queue.h> static void nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); @@ -68,6 +69,8 @@ nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid, nxt_queue_init(&port->messages); nxt_thread_mutex_create(&port->write_mutex); + port->queue_fd = -1; + } else { nxt_mp_destroy(mp); } @@ -99,6 +102,16 @@ nxt_port_close(nxt_task_t *task, nxt_port_t *port) nxt_router_app_port_close(task, port); } } + + if (port->queue_fd != -1) { + nxt_fd_close(port->queue_fd); + port->queue_fd = -1; + } + + if (port->queue != NULL) { + nxt_mem_munmap(port->queue, sizeof(nxt_port_queue_t)); + port->queue = NULL; + } } @@ -176,6 +189,7 @@ nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) } +/* TODO join with process_ready and move to nxt_main_process.c */ nxt_inline void nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt, nxt_port_t *new_port, uint32_t stream) @@ -227,8 +241,9 @@ nxt_port_send_port(nxt_task_t *task, nxt_port_t *port, nxt_port_t *new_port, msg->max_share = port->max_share; msg->type = new_port->type; - return nxt_port_socket_write(task, port, NXT_PORT_MSG_NEW_PORT, - new_port->pair[1], stream, 0, b); + return nxt_port_socket_write2(task, port, NXT_PORT_MSG_NEW_PORT, + new_port->pair[1], new_port->queue_fd, + stream, 0, b); } @@ -279,7 +294,7 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) msg->u.new_port = port; } - +/* TODO move to nxt_main_process.c */ void nxt_port_process_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { @@ -304,6 +319,13 @@ nxt_port_process_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_debug(task, "process %PI ready", msg->port_msg.pid); + if (msg->fd != -1) { + port->queue_fd = msg->fd; + port->queue = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t), + PROT_READ | PROT_WRITE, MAP_SHARED, msg->fd, + 0); + } + nxt_port_send_new_port(task, rt, port, msg->port_msg.stream); } diff --git a/src/nxt_port.h b/src/nxt_port.h index ab455f92..9fbf00b1 100644 --- a/src/nxt_port.h +++ b/src/nxt_port.h @@ -42,6 +42,7 @@ struct nxt_port_handlers_s { /* Request headers. */ nxt_port_handler_t req_headers; nxt_port_handler_t req_headers_ack; + nxt_port_handler_t req_body; /* Websocket frame. */ nxt_port_handler_t websocket_frame; @@ -51,6 +52,8 @@ struct nxt_port_handlers_s { nxt_port_handler_t oosm; nxt_port_handler_t shm_ack; + nxt_port_handler_t read_queue; + nxt_port_handler_t read_socket; }; @@ -91,12 +94,15 @@ typedef enum { _NXT_PORT_MSG_REQ_HEADERS = nxt_port_handler_idx(req_headers), _NXT_PORT_MSG_REQ_HEADERS_ACK = nxt_port_handler_idx(req_headers_ack), + _NXT_PORT_MSG_REQ_BODY = nxt_port_handler_idx(req_body), _NXT_PORT_MSG_WEBSOCKET = nxt_port_handler_idx(websocket_frame), _NXT_PORT_MSG_DATA = nxt_port_handler_idx(data), _NXT_PORT_MSG_OOSM = nxt_port_handler_idx(oosm), _NXT_PORT_MSG_SHM_ACK = nxt_port_handler_idx(shm_ack), + _NXT_PORT_MSG_READ_QUEUE = nxt_port_handler_idx(read_queue), + _NXT_PORT_MSG_READ_SOCKET = nxt_port_handler_idx(read_socket), NXT_PORT_MSG_MAX = sizeof(nxt_port_handlers_t) / sizeof(nxt_port_handler_t), @@ -124,6 +130,7 @@ typedef enum { NXT_PORT_MSG_REMOVE_PID = nxt_msg_last(_NXT_PORT_MSG_REMOVE_PID), NXT_PORT_MSG_REQ_HEADERS = _NXT_PORT_MSG_REQ_HEADERS, + NXT_PORT_MSG_REQ_BODY = _NXT_PORT_MSG_REQ_BODY, NXT_PORT_MSG_WEBSOCKET = _NXT_PORT_MSG_WEBSOCKET, NXT_PORT_MSG_WEBSOCKET_LAST = nxt_msg_last(_NXT_PORT_MSG_WEBSOCKET), @@ -132,6 +139,8 @@ typedef enum { NXT_PORT_MSG_OOSM = nxt_msg_last(_NXT_PORT_MSG_OOSM), NXT_PORT_MSG_SHM_ACK = nxt_msg_last(_NXT_PORT_MSG_SHM_ACK), + NXT_PORT_MSG_READ_QUEUE = _NXT_PORT_MSG_READ_QUEUE, + NXT_PORT_MSG_READ_SOCKET = _NXT_PORT_MSG_READ_SOCKET, } nxt_port_msg_type_t; @@ -236,6 +245,12 @@ struct nxt_port_s { nxt_atomic_t use_count; nxt_process_type_t type; + + nxt_fd_t queue_fd; + void *queue; + + void *socket_msg; + int from_socket; }; @@ -286,17 +301,17 @@ void nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port); void nxt_port_write_close(nxt_port_t *port); void nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port); void nxt_port_read_close(nxt_port_t *port); -nxt_int_t nxt_port_socket_twrite(nxt_task_t *task, nxt_port_t *port, - nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, - nxt_buf_t *b, void *tracking); +nxt_int_t nxt_port_socket_write2(nxt_task_t *task, nxt_port_t *port, + nxt_uint_t type, nxt_fd_t fd, nxt_fd_t fd2, uint32_t stream, + nxt_port_id_t reply_port, nxt_buf_t *b); nxt_inline nxt_int_t nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b) { - return nxt_port_socket_twrite(task, port, type, fd, stream, reply_port, b, - NULL); + return nxt_port_socket_write2(task, port, type, fd, -1, stream, reply_port, + b); } void nxt_port_enable(nxt_task_t *task, nxt_port_t *port, diff --git a/src/nxt_port_queue.h b/src/nxt_port_queue.h new file mode 100644 index 00000000..d2b2326b --- /dev/null +++ b/src/nxt_port_queue.h @@ -0,0 +1,102 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + +#ifndef _NXT_PORT_QUEUE_H_INCLUDED_ +#define _NXT_PORT_QUEUE_H_INCLUDED_ + + +#include <nxt_nncq.h> + + +/* Using Numeric Naive Circular Queue as a backend. */ + +#define NXT_PORT_QUEUE_SIZE NXT_NNCQ_SIZE +#define NXT_PORT_QUEUE_MSG_SIZE 31 + + +typedef struct { + uint8_t size; + uint8_t data[NXT_PORT_QUEUE_MSG_SIZE]; +} nxt_port_queue_item_t; + + +typedef struct { + nxt_nncq_atomic_t nitems; + nxt_nncq_t free_items; + nxt_nncq_t queue; + nxt_port_queue_item_t items[NXT_PORT_QUEUE_SIZE]; +} nxt_port_queue_t; + + +nxt_inline void +nxt_port_queue_init(nxt_port_queue_t volatile *q) +{ + nxt_nncq_atomic_t i; + + nxt_nncq_init(&q->free_items); + nxt_nncq_init(&q->queue); + + for (i = 0; i < NXT_PORT_QUEUE_SIZE; i++) { + nxt_nncq_enqueue(&q->free_items, i); + } + + q->nitems = 0; +} + + +nxt_inline nxt_int_t +nxt_port_queue_send(nxt_port_queue_t volatile *q, const void *p, uint8_t size, + int *notify) +{ + nxt_nncq_atomic_t i; + nxt_port_queue_item_t *qi; + + i = nxt_nncq_dequeue(&q->free_items); + if (i == nxt_nncq_empty(&q->free_items)) { + *notify = 0; + return NXT_AGAIN; + } + + qi = (nxt_port_queue_item_t *) &q->items[i]; + + qi->size = size; + nxt_memcpy(qi->data, p, size); + + nxt_nncq_enqueue(&q->queue, i); + + i = nxt_atomic_fetch_add(&q->nitems, 1); + + *notify = (i == 0); + + return NXT_OK; +} + + +nxt_inline ssize_t +nxt_port_queue_recv(nxt_port_queue_t volatile *q, void *p) +{ + ssize_t res; + nxt_nncq_atomic_t i; + nxt_port_queue_item_t *qi; + + i = nxt_nncq_dequeue(&q->queue); + if (i == nxt_nncq_empty(&q->queue)) { + return -1; + } + + qi = (nxt_port_queue_item_t *) &q->items[i]; + + res = qi->size; + nxt_memcpy(p, qi->data, qi->size); + + nxt_nncq_enqueue(&q->free_items, i); + + nxt_atomic_fetch_add(&q->nitems, -1); + + return res; +} + + +#endif /* _NXT_PORT_QUEUE_H_INCLUDED_ */ diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c index 844b65ca..14e2e605 100644 --- a/src/nxt_port_socket.c +++ b/src/nxt_port_socket.c @@ -5,6 +5,7 @@ */ #include <nxt_main.h> +#include <nxt_port_queue.h> static nxt_int_t nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port, @@ -17,6 +18,8 @@ static nxt_buf_t *nxt_port_buf_completion(nxt_task_t *task, static nxt_port_send_msg_t *nxt_port_msg_insert_tail(nxt_port_t *port, nxt_port_send_msg_t *msg); static void nxt_port_read_handler(nxt_task_t *task, void *obj, void *data); +static void nxt_port_queue_read_handler(nxt_task_t *task, void *obj, + void *data); static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, nxt_port_recv_msg_t *msg); static nxt_buf_t *nxt_port_buf_alloc(nxt_port_t *port); @@ -143,12 +146,15 @@ nxt_port_release_send_msg(nxt_port_send_msg_t *msg) nxt_int_t -nxt_port_socket_twrite(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, - nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b, - void *tracking) +nxt_port_socket_write2(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, + nxt_fd_t fd, nxt_fd_t fd2, uint32_t stream, nxt_port_id_t reply_port, + nxt_buf_t *b) { + int notify; + uint8_t *p; nxt_int_t res; nxt_port_send_msg_t msg; + uint8_t qmsg[NXT_PORT_QUEUE_MSG_SIZE]; msg.link.next = NULL; msg.link.prev = NULL; @@ -156,14 +162,10 @@ nxt_port_socket_twrite(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, msg.buf = b; msg.share = 0; msg.fd = fd; - msg.fd2 = -1; + msg.fd2 = fd2; msg.close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0; msg.allocated = 0; - if (tracking != NULL) { - nxt_port_mmap_tracking_write(msg.tracking_msg, tracking); - } - msg.port_msg.stream = stream; msg.port_msg.pid = nxt_pid; msg.port_msg.reply_port = reply_port; @@ -172,7 +174,42 @@ nxt_port_socket_twrite(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, msg.port_msg.mmap = 0; msg.port_msg.nf = 0; msg.port_msg.mf = 0; - msg.port_msg.tracking = tracking != NULL; + + if (port->queue != NULL && type != _NXT_PORT_MSG_READ_QUEUE) { + + if (fd == -1 + && (b == NULL + || nxt_buf_mem_used_size(&b->mem) + <= (int) (NXT_PORT_QUEUE_MSG_SIZE - sizeof(nxt_port_msg_t)))) + { + p = nxt_cpymem(qmsg, &msg.port_msg, sizeof(nxt_port_msg_t)); + if (b != NULL) { + p = nxt_cpymem(p, b->mem.pos, nxt_buf_mem_used_size(&b->mem)); + } + + res = nxt_port_queue_send(port->queue, qmsg, p - qmsg, ¬ify); + + nxt_debug(task, "port{%d,%d} %d: enqueue %d notify %d, %d", + (int) port->pid, (int) port->id, port->socket.fd, + (int) (p - qmsg), notify, res); + + if (notify == 0) { + return res; + } + + msg.port_msg.type = _NXT_PORT_MSG_READ_QUEUE; + msg.buf = NULL; + + } else { + qmsg[0] = _NXT_PORT_MSG_READ_SOCKET; + + res = nxt_port_queue_send(port->queue, qmsg, 1, ¬ify); + + nxt_debug(task, "port{%d,%d} %d: enqueue 1 notify %d, %d", + (int) port->pid, (int) port->id, port->socket.fd, + notify, res); + } + } res = nxt_port_msg_chk_insert(task, port, &msg); if (nxt_fast_path(res == NXT_DECLINED)) { @@ -308,10 +345,6 @@ next_fragment: port->max_size / PORT_MMAP_MIN_SIZE); } - if (msg->port_msg.tracking) { - iov[0].iov_len += sizeof(msg->tracking_msg); - } - sb.limit -= iov[0].iov_len; nxt_sendbuf_mem_coalesce(task, &sb); @@ -368,7 +401,6 @@ next_fragment: msg->fd2 = -1; msg->share += n; msg->port_msg.nf = 1; - msg->port_msg.tracking = 0; if (msg->share >= port->max_share) { msg->share = 0; @@ -576,7 +608,9 @@ nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port) port->engine = task->thread->engine; port->socket.read_work_queue = &port->engine->fast_work_queue; - port->socket.read_handler = nxt_port_read_handler; + port->socket.read_handler = port->queue != NULL + ? nxt_port_queue_read_handler + : nxt_port_read_handler; port->socket.error_handler = nxt_port_error_handler; nxt_fd_event_enable_read(port->engine, &port->socket); @@ -660,6 +694,206 @@ nxt_port_read_handler(nxt_task_t *task, void *obj, void *data) } +static void +nxt_port_queue_read_handler(nxt_task_t *task, void *obj, void *data) +{ + ssize_t n; + nxt_buf_t *b; + nxt_port_t *port; + struct iovec iov[2]; + nxt_port_queue_t *queue; + nxt_port_recv_msg_t msg, *smsg; + uint8_t qmsg[NXT_PORT_QUEUE_MSG_SIZE]; + + port = nxt_container_of(obj, nxt_port_t, socket); + msg.port = port; + + nxt_assert(port->engine == task->thread->engine); + + queue = port->queue; + nxt_atomic_fetch_add(&queue->nitems, 1); + + for ( ;; ) { + + if (port->from_socket == 0) { + n = nxt_port_queue_recv(queue, qmsg); + + if (n < 0 && !port->socket.read_ready) { + nxt_atomic_fetch_add(&queue->nitems, -1); + + n = nxt_port_queue_recv(queue, qmsg); + if (n < 0) { + return; + } + + nxt_atomic_fetch_add(&queue->nitems, 1); + } + + if (n == 1 && qmsg[0] == _NXT_PORT_MSG_READ_SOCKET) { + port->from_socket++; + + nxt_debug(task, "port{%d,%d} %d: dequeue 1 read_socket %d", + (int) port->pid, (int) port->id, port->socket.fd, + port->from_socket); + + n = -1; + + continue; + } + + nxt_debug(task, "port{%d,%d} %d: dequeue %d", + (int) port->pid, (int) port->id, port->socket.fd, + (int) n); + + } else { + if ((smsg = port->socket_msg) != NULL && smsg->size != 0) { + msg.port_msg = smsg->port_msg; + b = smsg->buf; + n = smsg->size; + msg.fd = smsg->fd; + msg.fd2 = smsg->fd2; + + smsg->size = 0; + + port->from_socket--; + + nxt_debug(task, "port{%d,%d} %d: use suspended message %d", + (int) port->pid, (int) port->id, port->socket.fd, + (int) n); + + goto process; + } + + n = -1; + } + + if (n < 0 && !port->socket.read_ready) { + nxt_atomic_fetch_add(&queue->nitems, -1); + return; + } + + b = nxt_port_buf_alloc(port); + + if (nxt_slow_path(b == NULL)) { + /* TODO: disable event for some time */ + } + + if (n >= (ssize_t) sizeof(nxt_port_msg_t)) { + nxt_memcpy(&msg.port_msg, qmsg, sizeof(nxt_port_msg_t)); + + if (n > (ssize_t) sizeof(nxt_port_msg_t)) { + nxt_memcpy(b->mem.pos, qmsg + sizeof(nxt_port_msg_t), + n - sizeof(nxt_port_msg_t)); + } + + } else { + iov[0].iov_base = &msg.port_msg; + iov[0].iov_len = sizeof(nxt_port_msg_t); + + iov[1].iov_base = b->mem.pos; + iov[1].iov_len = port->max_size; + + n = nxt_socketpair_recv(&port->socket, &msg.fd, iov, 2); + + if (n == (ssize_t) sizeof(nxt_port_msg_t) + && msg.port_msg.type == _NXT_PORT_MSG_READ_QUEUE) + { + nxt_port_buf_free(port, b); + + nxt_debug(task, "port{%d,%d} %d: recv %d read_queue", + (int) port->pid, (int) port->id, port->socket.fd, + (int) n); + + continue; + } + + nxt_debug(task, "port{%d,%d} %d: recvmsg %d", + (int) port->pid, (int) port->id, port->socket.fd, + (int) n); + + if (n > 0) { + if (port->from_socket == 0) { + nxt_debug(task, "port{%d,%d} %d: suspend message %d", + (int) port->pid, (int) port->id, port->socket.fd, + (int) n); + + smsg = port->socket_msg; + + if (nxt_slow_path(smsg == NULL)) { + smsg = nxt_mp_alloc(port->mem_pool, + sizeof(nxt_port_recv_msg_t)); + + if (nxt_slow_path(smsg == NULL)) { + nxt_alert(task, "port{%d,%d} %d: suspend message " + "failed", + (int) port->pid, (int) port->id, + port->socket.fd); + + return; + } + + port->socket_msg = smsg; + + } else { + if (nxt_slow_path(smsg->size != 0)) { + nxt_alert(task, "port{%d,%d} %d: too many suspend " + "messages", + (int) port->pid, (int) port->id, + port->socket.fd); + + return; + } + } + + smsg->port_msg = msg.port_msg; + smsg->buf = b; + smsg->size = n; + smsg->fd = msg.fd; + smsg->fd2 = msg.fd2; + + continue; + } + + port->from_socket--; + } + } + + process: + + if (n > 0) { + msg.buf = b; + msg.size = n; + + nxt_port_read_msg_process(task, port, &msg); + + /* + * To disable instant completion or buffer re-usage, + * handler should reset 'msg.buf'. + */ + if (msg.buf == b) { + nxt_port_buf_free(port, b); + } + + continue; + } + + if (n == NXT_AGAIN) { + nxt_port_buf_free(port, b); + + nxt_fd_event_enable_read(task->thread->engine, &port->socket); + + continue; + } + + /* n == 0 || n == NXT_ERROR */ + + nxt_work_queue_add(&task->thread->engine->fast_work_queue, + nxt_port_error_handler, task, &port->socket, NULL); + return; + } +} + + typedef struct { uint32_t stream; uint32_t pid; @@ -831,12 +1065,7 @@ nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, b = orig_b = msg->buf; b->mem.free += msg->size; - if (msg->port_msg.tracking) { - msg->cancelled = nxt_port_mmap_tracking_read(task, msg) == 0; - - } else { - msg->cancelled = 0; - } + msg->cancelled = 0; if (nxt_slow_path(msg->port_msg.nf != 0)) { diff --git a/src/nxt_router.c b/src/nxt_router.c index b8e94bcc..3dd0878b 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -15,6 +15,8 @@ #include <nxt_unit_request.h> #include <nxt_unit_response.h> #include <nxt_router_request.h> +#include <nxt_app_queue.h> +#include <nxt_port_queue.h> typedef struct { nxt_str_t type; @@ -92,6 +94,12 @@ static nxt_int_t nxt_router_conf_create(nxt_task_t *task, static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf, nxt_conf_value_t *conf); static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name); +static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task, + nxt_port_t *port); +static nxt_int_t nxt_router_port_queue_init(nxt_task_t *task, + nxt_port_t *port); +static nxt_int_t nxt_router_port_queue_map(nxt_task_t *task, + nxt_port_t *port, nxt_fd_t fd); static void nxt_router_listen_socket_rpc_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf); static void nxt_router_listen_socket_ready(nxt_task_t *task, @@ -473,21 +481,25 @@ nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app) nxt_inline nxt_bool_t -nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info, - uint32_t stream) +nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data) { - nxt_buf_t *b, *next; - nxt_bool_t cancelled; + nxt_buf_t *b, *next; + nxt_bool_t cancelled; + nxt_msg_info_t *msg_info; + + msg_info = &req_rpc_data->msg_info; if (msg_info->buf == NULL) { return 0; } - cancelled = nxt_port_mmap_tracking_cancel(task, &msg_info->tracking, - stream); + cancelled = nxt_app_queue_cancel(req_rpc_data->app->shared_port->queue, + msg_info->tracking_cookie, + req_rpc_data->stream); if (cancelled) { - nxt_debug(task, "stream #%uD: cancelled by router", stream); + nxt_debug(task, "stream #%uD: cancelled by router", + req_rpc_data->stream); } for (b = msg_info->buf; b != NULL; b = next) { @@ -529,7 +541,7 @@ nxt_request_rpc_data_unlink(nxt_task_t *task, { nxt_http_request_t *r; - nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream); + nxt_router_msg_cancel(task, req_rpc_data); if (req_rpc_data->app_port != NULL) { nxt_router_app_port_release(task, req_rpc_data->app_port, @@ -573,6 +585,7 @@ nxt_request_rpc_data_unlink(nxt_task_t *task, static void nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { + nxt_int_t res; nxt_app_t *app; nxt_port_t *port, *main_app_port; nxt_runtime_t *rt; @@ -592,6 +605,17 @@ nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) } msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; + + } else { + if (msg->fd2 != -1) { + res = nxt_router_port_queue_map(task, port, msg->fd2); + if (nxt_slow_path(res != NXT_OK)) { + return; + } + + nxt_fd_close(msg->fd2); + msg->fd2 = -1; + } } if (msg->port_msg.stream != 0) { @@ -1523,6 +1547,12 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, return NXT_ERROR; } + ret = nxt_router_app_queue_init(task, port); + if (nxt_slow_path(ret != NXT_OK)) { + nxt_port_use(task, port, -1); + return NXT_ERROR; + } + nxt_port_write_enable(task, port); port->app = app; @@ -1828,6 +1858,82 @@ nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name) } +static nxt_int_t +nxt_router_app_queue_init(nxt_task_t *task, nxt_port_t *port) +{ + void *mem; + nxt_int_t fd; + + fd = nxt_shm_open(task, sizeof(nxt_app_queue_t)); + if (nxt_slow_path(fd == -1)) { + return NXT_ERROR; + } + + mem = nxt_mem_mmap(NULL, sizeof(nxt_app_queue_t), + PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); + if (nxt_slow_path(mem == MAP_FAILED)) { + nxt_fd_close(fd); + + return NXT_ERROR; + } + + nxt_app_queue_init(mem); + + port->queue_fd = fd; + port->queue = mem; + + return NXT_OK; +} + + +static nxt_int_t +nxt_router_port_queue_init(nxt_task_t *task, nxt_port_t *port) +{ + void *mem; + nxt_int_t fd; + + fd = nxt_shm_open(task, sizeof(nxt_port_queue_t)); + if (nxt_slow_path(fd == -1)) { + return NXT_ERROR; + } + + mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t), + PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); + if (nxt_slow_path(mem == MAP_FAILED)) { + nxt_fd_close(fd); + + return NXT_ERROR; + } + + nxt_port_queue_init(mem); + + port->queue_fd = fd; + port->queue = mem; + + return NXT_OK; +} + + +static nxt_int_t +nxt_router_port_queue_map(nxt_task_t *task, nxt_port_t *port, nxt_fd_t fd) +{ + void *mem; + + nxt_assert(fd != -1); + + mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t), + PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); + if (nxt_slow_path(mem == MAP_FAILED)) { + + return NXT_ERROR; + } + + port->queue = mem; + + return NXT_OK; +} + + void nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, nxt_str_t *name, nxt_http_action_t *action) @@ -2748,6 +2854,12 @@ nxt_router_thread_start(void *data) return; } + ret = nxt_router_port_queue_init(task, port); + if (nxt_slow_path(ret != NXT_OK)) { + nxt_port_use(task, port, -1); + return; + } + engine->port = port; nxt_port_enable(task, port, &nxt_router_app_port_handlers); @@ -3670,6 +3782,7 @@ static void nxt_router_req_headers_ack_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data) { + int res; nxt_app_t *app; nxt_bool_t start_process; nxt_port_t *app_port, *main_app_port, *idle_port; @@ -3752,6 +3865,24 @@ nxt_router_req_headers_ack_handler(nxt_task_t *task, req_rpc_data->app_port = app_port; + if (req_rpc_data->msg_info.body_fd != -1) { + nxt_debug(task, "stream #%uD: send body fd %d", req_rpc_data->stream, + req_rpc_data->msg_info.body_fd); + + lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET); + + res = nxt_port_socket_write(task, app_port, NXT_PORT_MSG_REQ_BODY, + req_rpc_data->msg_info.body_fd, + req_rpc_data->stream, + task->thread->engine->port->id, NULL); + + if (nxt_slow_path(res != NXT_OK)) { + r = req_rpc_data->request; + + nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); + } + } + if (app->timeout != 0) { r = req_rpc_data->request; @@ -3886,10 +4017,10 @@ nxt_router_app_shared_port_send(nxt_task_t *task, nxt_port_t *app_port) msg->max_share = port->max_share; msg->type = port->type; - return nxt_port_socket_twrite(task, app_port, + return nxt_port_socket_write2(task, app_port, NXT_PORT_MSG_NEW_PORT, - port->pair[0], - 0, 0, b, NULL); + port->pair[0], port->queue_fd, + 0, 0, b); } @@ -4522,6 +4653,13 @@ nxt_router_app_prepare_request(nxt_task_t *task, nxt_int_t res; nxt_port_t *port, *reply_port; + int notify; + struct { + nxt_port_msg_t pm; + nxt_port_mmap_msg_t mm; + } msg; + + app = req_rpc_data->app; nxt_assert(app != NULL); @@ -4529,6 +4667,7 @@ nxt_router_app_prepare_request(nxt_task_t *task, port = req_rpc_data->app_port; nxt_assert(port != NULL); + nxt_assert(port->queue != NULL); reply_port = task->thread->engine->port; @@ -4569,20 +4708,38 @@ nxt_router_app_prepare_request(nxt_task_t *task, req_rpc_data->msg_info.body_fd = -1; } - if (req_rpc_data->msg_info.body_fd != -1) { - nxt_debug(task, "stream #%uD: send body fd %d", req_rpc_data->stream, - req_rpc_data->msg_info.body_fd); + msg.pm.stream = req_rpc_data->stream; + msg.pm.pid = reply_port->pid; + msg.pm.reply_port = reply_port->id; + msg.pm.type = NXT_PORT_MSG_REQ_HEADERS; + msg.pm.last = 0; + msg.pm.mmap = 1; + msg.pm.nf = 0; + msg.pm.mf = 0; + msg.pm.tracking = 0; - lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET); - } + nxt_port_mmap_handler_t *mmap_handler = buf->parent; + nxt_port_mmap_header_t *hdr = mmap_handler->hdr; + + msg.mm.mmap_id = hdr->id; + msg.mm.chunk_id = nxt_port_mmap_chunk_id(hdr, buf->mem.pos); + msg.mm.size = nxt_buf_used_size(buf); - res = nxt_port_socket_twrite(task, port, - NXT_PORT_MSG_REQ_HEADERS, - req_rpc_data->msg_info.body_fd, - req_rpc_data->stream, reply_port->id, buf, - NULL); + res = nxt_app_queue_send(port->queue, &msg, sizeof(msg), + req_rpc_data->stream, ¬ify, + &req_rpc_data->msg_info.tracking_cookie); + if (nxt_fast_path(res == NXT_OK)) { + if (notify != 0) { + (void) nxt_port_socket_write(task, port, + NXT_PORT_MSG_READ_QUEUE, + -1, req_rpc_data->stream, + reply_port->id, NULL); - if (nxt_slow_path(res != NXT_OK)) { + } else { + nxt_debug(task, "queue is not empty"); + } + + } else { nxt_alert(task, "stream #%uD, app '%V': failed to send app message", req_rpc_data->stream, &app->name); diff --git a/src/nxt_router_request.h b/src/nxt_router_request.h index 1271520d..95044dbb 100644 --- a/src/nxt_router_request.h +++ b/src/nxt_router_request.h @@ -7,10 +7,10 @@ #define _NXT_ROUTER_REQUEST_H_INCLUDED_ -typedef struct nxt_msg_info_s { +typedef struct { nxt_buf_t *buf; nxt_fd_t body_fd; - nxt_port_mmap_tracking_t tracking; + uint32_t tracking_cookie; nxt_work_handler_t completion_handler; } nxt_msg_info_t; diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 66aadd98..1008a9d6 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -7,6 +7,8 @@ #include "nxt_main.h" #include "nxt_port_memory_int.h" +#include "nxt_port_queue.h" +#include "nxt_app_queue.h" #include "nxt_unit.h" #include "nxt_unit_request.h" @@ -50,12 +52,15 @@ nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf); static int nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port, nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream, uint32_t *shm_limit); -static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream); +static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, + int queue_fd); static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf); static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); +static int nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, + nxt_unit_recv_msg_t *recv_msg); static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req, nxt_unit_port_id_t *port_id); static int nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req); @@ -92,6 +97,7 @@ static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx); static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i); static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n); +static int nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size); static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int fd); static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, @@ -103,8 +109,6 @@ static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps); nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process); nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process); static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps); -static int nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, - nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf); static int nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx, nxt_unit_mmaps_t *mmaps, pid_t pid, uint32_t id, nxt_port_mmap_header_t **hdr, nxt_unit_read_buf_t *rbuf); @@ -124,18 +128,22 @@ static int nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx); static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf); static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx); static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx); +nxt_inline int nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf); +nxt_inline int nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf); +nxt_inline int nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf); +nxt_inline int nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf); static int nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl); static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx); static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst, - nxt_unit_port_t *port); + nxt_unit_port_t *port, int queue_fd); nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port); nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port); static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx, - nxt_unit_port_t *port); + nxt_unit_port_t *port, void *queue); static void nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id); static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, @@ -150,18 +158,28 @@ static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx, const void *oob, size_t oob_size); static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd, const void *buf, size_t buf_size, const void *oob, size_t oob_size); +static int nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, + nxt_unit_read_buf_t *rbuf); +nxt_inline void nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst, + nxt_unit_read_buf_t *src); +static int nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, + nxt_unit_read_buf_t *rbuf); static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf); +static int nxt_unit_port_queue_recv(nxt_unit_port_t *port, + nxt_unit_read_buf_t *rbuf); +static int nxt_unit_app_queue_recv(nxt_unit_port_t *port, + nxt_unit_read_buf_t *rbuf); static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, nxt_unit_port_t *port); static nxt_unit_port_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, nxt_unit_port_id_t *port_id, int remove); -static int nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash, - nxt_unit_request_info_impl_t *req_impl); -static nxt_unit_request_info_impl_t *nxt_unit_request_hash_find( - nxt_lvlhsh_t *request_hash, uint32_t stream, int remove); +static int nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx, + nxt_unit_request_info_t *req); +static nxt_unit_request_info_t *nxt_unit_request_hash_find( + nxt_unit_ctx_t *ctx, uint32_t stream, int remove); static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level); @@ -217,6 +235,7 @@ struct nxt_unit_request_info_impl_s { nxt_unit_req_state_t state; uint8_t websocket; + uint8_t in_hash; /* for nxt_unit_ctx_impl_t.free_req or active_req */ nxt_queue_link_t link; @@ -349,6 +368,11 @@ struct nxt_unit_port_impl_s { nxt_queue_t awaiting_req; int ready; + + void *queue; + + int from_socket; + nxt_unit_read_buf_t *socket_rbuf; }; @@ -375,7 +399,8 @@ typedef struct { nxt_unit_ctx_t * nxt_unit_init(nxt_unit_init_t *init) { - int rc; + int rc, queue_fd; + void *mem; uint32_t ready_stream, shm_limit; nxt_unit_ctx_t *ctx; nxt_unit_impl_t *lib; @@ -386,6 +411,8 @@ nxt_unit_init(nxt_unit_init_t *init) return NULL; } + queue_fd = -1; + if (init->ready_port.id.pid != 0 && init->ready_stream != 0 && init->read_port.id.pid != 0) @@ -422,33 +449,58 @@ nxt_unit_init(nxt_unit_init_t *init) ctx = &lib->main_ctx.ctx; - lib->router_port = nxt_unit_add_port(ctx, &router_port); + lib->router_port = nxt_unit_add_port(ctx, &router_port, NULL); if (nxt_slow_path(lib->router_port == NULL)) { nxt_unit_alert(NULL, "failed to add router_port"); goto fail; } - lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port); + queue_fd = nxt_unit_shm_open(ctx, sizeof(nxt_port_queue_t)); + if (nxt_slow_path(queue_fd == -1)) { + goto fail; + } + + mem = mmap(NULL, sizeof(nxt_port_queue_t), + PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0); + if (nxt_slow_path(mem == MAP_FAILED)) { + nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd, + strerror(errno), errno); + + goto fail; + } + + nxt_port_queue_init(mem); + + lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port, mem); if (nxt_slow_path(lib->main_ctx.read_port == NULL)) { nxt_unit_alert(NULL, "failed to add read_port"); + munmap(mem, sizeof(nxt_port_queue_t)); + goto fail; } - rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream); + rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream, queue_fd); if (nxt_slow_path(rc != NXT_UNIT_OK)) { nxt_unit_alert(NULL, "failed to send READY message"); + munmap(mem, sizeof(nxt_port_queue_t)); + goto fail; } close(ready_port.out_fd); + close(queue_fd); return ctx; fail: + if (queue_fd != -1) { + close(queue_fd); + } + nxt_unit_ctx_release(&lib->main_ctx.ctx); return NULL; @@ -497,6 +549,7 @@ nxt_unit_create(nxt_unit_init_t *init) rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data); if (nxt_slow_path(rc != NXT_UNIT_OK)) { + pthread_mutex_destroy(&lib->mutex); goto fail; } @@ -505,6 +558,7 @@ nxt_unit_create(nxt_unit_init_t *init) if (cb->request_handler == NULL) { nxt_unit_alert(NULL, "request_handler is NULL"); + pthread_mutex_destroy(&lib->mutex); goto fail; } @@ -765,12 +819,17 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port, static int -nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream) +nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd) { ssize_t res; nxt_port_msg_t msg; nxt_unit_impl_t *lib; + union { + struct cmsghdr cm; + char space[CMSG_SPACE(sizeof(int))]; + } cmsg; + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); msg.stream = stream; @@ -783,7 +842,25 @@ nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream) msg.mf = 0; msg.tracking = 0; - res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg), NULL, 0); + memset(&cmsg, 0, sizeof(cmsg)); + + cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int)); + cmsg.cm.cmsg_level = SOL_SOCKET; + cmsg.cm.cmsg_type = SCM_RIGHTS; + + /* + * memcpy() is used instead of simple + * *(int *) CMSG_DATA(&cmsg.cm) = fd; + * because GCC 4.4 with -O2/3/s optimization may issue a warning: + * dereferencing type-punned pointer will break strict-aliasing rules + * + * Fortunately, GCC with -O1 compiles this nxt_memcpy() + * in the same simple assignment as in the code above. + */ + memcpy(CMSG_DATA(&cmsg.cm), &queue_fd, sizeof(int)); + + res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg), + &cmsg, sizeof(cmsg)); if (res != sizeof(msg)) { return NXT_UNIT_ERROR; } @@ -838,6 +915,10 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) goto fail; } + nxt_unit_debug(ctx, "#%"PRIu32": process message %d fd %d fd2 %d", + port_msg->stream, (int) port_msg->type, + recv_msg.fd, recv_msg.fd2); + recv_msg.stream = port_msg->stream; recv_msg.pid = port_msg->pid; recv_msg.reply_port = port_msg->reply_port; @@ -853,19 +934,6 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) goto fail; } - if (port_msg->tracking) { - rc = nxt_unit_tracking_read(ctx, &recv_msg, rbuf); - - if (nxt_slow_path(rc != NXT_UNIT_OK)) { - if (rc == NXT_UNIT_AGAIN) { - recv_msg.fd = -1; - recv_msg.fd2 = -1; - } - - goto fail; - } - } - /* Fragmentation is unsupported. */ if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) { nxt_unit_warn(ctx, "#%"PRIu32": fragmented message type (%d)", @@ -929,6 +997,10 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) rc = nxt_unit_process_req_headers(ctx, &recv_msg); break; + case _NXT_PORT_MSG_REQ_BODY: + rc = nxt_unit_process_req_body(ctx, &recv_msg); + break; + case _NXT_PORT_MSG_WEBSOCKET: rc = nxt_unit_process_websocket(ctx, &recv_msg); break; @@ -992,6 +1064,7 @@ static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) { int nb; + void *mem; nxt_unit_impl_t *lib; nxt_unit_port_t new_port, *port; nxt_port_msg_new_port_t *new_port_msg; @@ -1013,9 +1086,9 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) new_port_msg = recv_msg->start; - nxt_unit_debug(ctx, "#%"PRIu32": new_port: %d,%d fd %d", + nxt_unit_debug(ctx, "#%"PRIu32": new_port: port{%d,%d} fd %d fd2 %d", recv_msg->stream, (int) new_port_msg->pid, - (int) new_port_msg->id, recv_msg->fd); + (int) new_port_msg->id, recv_msg->fd, recv_msg->fd2); lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); @@ -1025,6 +1098,9 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) new_port.in_fd = recv_msg->fd; new_port.out_fd = -1; + mem = mmap(NULL, sizeof(nxt_app_queue_t), PROT_READ | PROT_WRITE, + MAP_SHARED, recv_msg->fd2, 0); + } else { nb = 0; @@ -1041,14 +1117,23 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) new_port.in_fd = -1; new_port.out_fd = recv_msg->fd; + + mem = mmap(NULL, sizeof(nxt_port_queue_t), PROT_READ | PROT_WRITE, + MAP_SHARED, recv_msg->fd2, 0); } + if (nxt_slow_path(mem == MAP_FAILED)) { + nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", recv_msg->fd2, + strerror(errno), errno); + + return NXT_UNIT_ERROR; + } new_port.data = NULL; recv_msg->fd = -1; - port = nxt_unit_add_port(ctx, &new_port); + port = nxt_unit_add_port(ctx, &new_port, mem); if (nxt_slow_path(port == NULL)) { return NXT_UNIT_ERROR; } @@ -1134,6 +1219,7 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) req->response_max_fields = 0; req_impl->state = NXT_UNIT_RS_START; req_impl->websocket = 0; + req_impl->in_hash = 0; nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream, (int) r->method_length, @@ -1151,12 +1237,82 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) if (nxt_fast_path(res == NXT_UNIT_OK)) { res = nxt_unit_send_req_headers_ack(req); - if (nxt_slow_path(res != NXT_UNIT_OK)) { - return res; + if (nxt_slow_path(res == NXT_UNIT_ERROR)) { + nxt_unit_request_done(req, NXT_UNIT_ERROR); + + return NXT_UNIT_ERROR; } lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + if (req->content_length + > (uint64_t) (req->content_buf->end - req->content_buf->free)) + { + res = nxt_unit_request_hash_add(ctx, req); + if (nxt_slow_path(res != NXT_UNIT_OK)) { + nxt_unit_req_warn(req, "failed to add request to hash"); + + nxt_unit_request_done(req, NXT_UNIT_ERROR); + + return NXT_UNIT_ERROR; + } + + /* + * If application have separate data handler, we may start + * request processing and process data when it is arrived. + */ + if (lib->callbacks.data_handler == NULL) { + return NXT_UNIT_OK; + } + } + + lib->callbacks.request_handler(req); + } + + return NXT_UNIT_OK; +} + + +static int +nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) +{ + uint64_t l; + nxt_unit_impl_t *lib; + nxt_unit_mmap_buf_t *b; + nxt_unit_request_info_t *req; + + req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last); + if (req == NULL) { + return NXT_UNIT_OK; + } + + l = req->content_buf->end - req->content_buf->free; + + for (b = recv_msg->incoming_buf; b != NULL; b = b->next) { + b->req = req; + l += b->buf.end - b->buf.free; + } + + if (recv_msg->incoming_buf != NULL) { + b = nxt_container_of(req->content_buf, nxt_unit_mmap_buf_t, buf); + + /* "Move" incoming buffer list to req_impl. */ + nxt_unit_mmap_buf_insert_tail(&b->next, recv_msg->incoming_buf); + recv_msg->incoming_buf = NULL; + } + + req->content_fd = recv_msg->fd; + recv_msg->fd = -1; + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + + if (lib->callbacks.data_handler != NULL) { + lib->callbacks.data_handler(req); + + return NXT_UNIT_OK; + } + + if (req->content_fd != -1 || l == req->content_length) { lib->callbacks.request_handler(req); } @@ -1260,6 +1416,9 @@ nxt_unit_request_check_response_port(nxt_unit_request_info_t *req, nxt_queue_insert_tail(&process->ports, &port_impl->link); port_impl->process = process; + port_impl->queue = NULL; + port_impl->from_socket = 0; + port_impl->socket_rbuf = NULL; nxt_queue_init(&port_impl->awaiting_req); @@ -1321,21 +1480,17 @@ nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) size_t hsize; nxt_unit_impl_t *lib; nxt_unit_mmap_buf_t *b; - nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_callbacks_t *cb; nxt_unit_request_info_t *req; nxt_unit_request_info_impl_t *req_impl; nxt_unit_websocket_frame_impl_t *ws_impl; - ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); - - req_impl = nxt_unit_request_hash_find(&ctx_impl->requests, recv_msg->stream, - recv_msg->last); - if (req_impl == NULL) { + req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last); + if (nxt_slow_path(req == NULL)) { return NXT_UNIT_OK; } - req = &req_impl->req; + req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); cb = &lib->callbacks; @@ -1501,12 +1656,12 @@ nxt_unit_request_info_release(nxt_unit_request_info_t *req) req->response = NULL; req->response_buf = NULL; - if (req_impl->websocket) { - nxt_unit_request_hash_find(&ctx_impl->requests, req_impl->stream, 1); - - req_impl->websocket = 0; + if (req_impl->in_hash) { + nxt_unit_request_hash_find(req->ctx, req_impl->stream, 1); } + req_impl->websocket = 0; + while (req_impl->outgoing_buf != NULL) { nxt_unit_mmap_buf_free(req_impl->outgoing_buf); } @@ -2170,7 +2325,6 @@ int nxt_unit_response_upgrade(nxt_unit_request_info_t *req) { int rc; - nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_request_info_impl_t *req_impl; req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); @@ -2193,9 +2347,7 @@ nxt_unit_response_upgrade(nxt_unit_request_info_t *req) return NXT_UNIT_ERROR; } - ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx); - - rc = nxt_unit_request_hash_add(&ctx_impl->requests, req_impl); + rc = nxt_unit_request_hash_add(req->ctx, req); if (nxt_slow_path(rc != NXT_UNIT_OK)) { nxt_unit_req_warn(req, "upgrade: failed to add request to hash"); @@ -2466,6 +2618,8 @@ nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx) pthread_mutex_unlock(&ctx_impl->mutex); + memset(rbuf->oob, 0, sizeof(struct cmsghdr)); + return rbuf; } @@ -2564,6 +2718,8 @@ nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start, nxt_unit_request_info_impl_t *req_impl; char local_buf[NXT_UNIT_LOCAL_BUF_SIZE]; + nxt_unit_req_debug(req, "write: %d", (int) size); + req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); part_start = start; @@ -2743,9 +2899,11 @@ nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size) buf_res = nxt_unit_buf_read(&req->content_buf, &req->content_length, dst, size); + nxt_unit_req_debug(req, "read: %d", (int) buf_res); + if (buf_res < (ssize_t) size && req->content_fd != -1) { res = read(req->content_fd, dst, size); - if (res < 0) { + if (nxt_slow_path(res < 0)) { nxt_unit_req_alert(req, "failed to read content: %s (%d)", strerror(errno), errno); @@ -3301,7 +3459,7 @@ nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx) { - nxt_port_msg_t *port_msg; + int res; nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_read_buf_t *rbuf; @@ -3313,21 +3471,15 @@ nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx) return NXT_UNIT_ERROR; } - memset(rbuf->oob, 0, sizeof(struct cmsghdr)); - - nxt_unit_port_recv(ctx, ctx_impl->read_port, rbuf); - - if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) { + res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf); + if (res == NXT_UNIT_ERROR) { nxt_unit_read_buf_release(ctx, rbuf); return NXT_UNIT_ERROR; } - port_msg = (nxt_port_msg_t *) rbuf->buf; - - if (port_msg->type == _NXT_PORT_MSG_SHM_ACK) { + if (nxt_unit_is_shm_ack(rbuf)) { nxt_unit_read_buf_release(ctx, rbuf); - break; } @@ -3337,7 +3489,7 @@ nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx) pthread_mutex_unlock(&ctx_impl->mutex); - if (port_msg->type == _NXT_PORT_MSG_QUIT) { + if (nxt_unit_is_quit(rbuf)) { nxt_unit_debug(ctx, "oosm: quit received"); return NXT_UNIT_ERROR; @@ -3406,7 +3558,6 @@ nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n) { int i, fd, rc; void *mem; - char name[64]; nxt_unit_mmap_t *mm; nxt_unit_impl_t *lib; nxt_port_mmap_header_t *hdr; @@ -3420,59 +3571,8 @@ nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n) return NULL; } - snprintf(name, sizeof(name), NXT_SHM_PREFIX "unit.%d.%p", - lib->pid, (void *) pthread_self()); - -#if (NXT_HAVE_MEMFD_CREATE) - - fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC); + fd = nxt_unit_shm_open(ctx, PORT_MMAP_SIZE); if (nxt_slow_path(fd == -1)) { - nxt_unit_alert(ctx, "memfd_create(%s) failed: %s (%d)", name, - strerror(errno), errno); - - goto remove_fail; - } - - nxt_unit_debug(ctx, "memfd_create(%s): %d", name, fd); - -#elif (NXT_HAVE_SHM_OPEN_ANON) - - fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR); - if (nxt_slow_path(fd == -1)) { - nxt_unit_alert(ctx, "shm_open(SHM_ANON) failed: %s (%d)", - strerror(errno), errno); - - goto remove_fail; - } - -#elif (NXT_HAVE_SHM_OPEN) - - /* Just in case. */ - shm_unlink(name); - - fd = shm_open(name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR); - if (nxt_slow_path(fd == -1)) { - nxt_unit_alert(ctx, "shm_open(%s) failed: %s (%d)", name, - strerror(errno), errno); - - goto remove_fail; - } - - if (nxt_slow_path(shm_unlink(name) == -1)) { - nxt_unit_warn(ctx, "shm_unlink(%s) failed: %s (%d)", name, - strerror(errno), errno); - } - -#else - -#error No working shared memory implementation. - -#endif - - if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) { - nxt_unit_alert(ctx, "ftruncate(%d) failed: %s (%d)", fd, - strerror(errno), errno); - goto remove_fail; } @@ -3481,6 +3581,8 @@ nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n) nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", fd, strerror(errno), errno); + close(fd); + goto remove_fail; } @@ -3533,6 +3635,80 @@ remove_fail: static int +nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size) +{ + int fd; + nxt_unit_impl_t *lib; + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + +#if (NXT_HAVE_MEMFD_CREATE || NXT_HAVE_SHM_OPEN) + char name[64]; + + snprintf(name, sizeof(name), NXT_SHM_PREFIX "unit.%d.%p", + lib->pid, (void *) pthread_self()); +#endif + +#if (NXT_HAVE_MEMFD_CREATE) + + fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC); + if (nxt_slow_path(fd == -1)) { + nxt_unit_alert(ctx, "memfd_create(%s) failed: %s (%d)", name, + strerror(errno), errno); + + return -1; + } + + nxt_unit_debug(ctx, "memfd_create(%s): %d", name, fd); + +#elif (NXT_HAVE_SHM_OPEN_ANON) + + fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR); + if (nxt_slow_path(fd == -1)) { + nxt_unit_alert(ctx, "shm_open(SHM_ANON) failed: %s (%d)", + strerror(errno), errno); + + return -1; + } + +#elif (NXT_HAVE_SHM_OPEN) + + /* Just in case. */ + shm_unlink(name); + + fd = shm_open(name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR); + if (nxt_slow_path(fd == -1)) { + nxt_unit_alert(ctx, "shm_open(%s) failed: %s (%d)", name, + strerror(errno), errno); + + return -1; + } + + if (nxt_slow_path(shm_unlink(name) == -1)) { + nxt_unit_alert(ctx, "shm_unlink(%s) failed: %s (%d)", name, + strerror(errno), errno); + } + +#else + +#error No working shared memory implementation. + +#endif + + if (nxt_slow_path(ftruncate(fd, size) == -1)) { + nxt_unit_alert(ctx, "ftruncate(%d) failed: %s (%d)", fd, + strerror(errno), errno); + + close(fd); + + return -1; + } + + return fd; +} + + +static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int fd) { ssize_t res; @@ -3797,61 +3973,6 @@ nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps) static int -nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg, - nxt_unit_read_buf_t *rbuf) -{ - int res; - nxt_chunk_id_t c; - nxt_unit_impl_t *lib; - nxt_port_mmap_header_t *hdr; - nxt_port_mmap_tracking_msg_t *tracking_msg; - - if (recv_msg->size < (int) sizeof(nxt_port_mmap_tracking_msg_t)) { - nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: too small message (%d)", - recv_msg->stream, (int) recv_msg->size); - - return NXT_UNIT_ERROR; - } - - tracking_msg = recv_msg->start; - - recv_msg->start = tracking_msg + 1; - recv_msg->size -= sizeof(nxt_port_mmap_tracking_msg_t); - - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - - pthread_mutex_lock(&lib->incoming.mutex); - - res = nxt_unit_check_rbuf_mmap(ctx, &lib->incoming, - recv_msg->pid, tracking_msg->mmap_id, - &hdr, rbuf); - - if (nxt_slow_path(res != NXT_UNIT_OK)) { - return res; - } - - c = tracking_msg->tracking_id; - res = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->stream, 0); - - if (res == 0) { - nxt_unit_debug(ctx, "#%"PRIu32": tracking cancelled", - recv_msg->stream); - - nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c); - - res = NXT_UNIT_CANCELLED; - - } else { - res = NXT_UNIT_OK; - } - - pthread_mutex_unlock(&lib->incoming.mutex); - - return res; -} - - -static int nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx, nxt_unit_mmaps_t *mmaps, pid_t pid, uint32_t id, nxt_port_mmap_header_t **hdr, nxt_unit_read_buf_t *rbuf) @@ -4154,7 +4275,7 @@ nxt_unit_process_get(nxt_unit_impl_t *lib, pid_t pid) } process->pid = pid; - process->use_count = 1; + process->use_count = 2; process->next_port_id = 0; process->lib = lib; @@ -4176,8 +4297,6 @@ nxt_unit_process_get(nxt_unit_impl_t *lib, pid_t pid) break; } - nxt_unit_process_use(process); - return process; } @@ -4293,22 +4412,52 @@ nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx) static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) { - int res, err; - nxt_unit_impl_t *lib; - nxt_unit_ctx_impl_t *ctx_impl; - struct pollfd fds[2]; + int nevents, res, err; + nxt_unit_impl_t *lib; + nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_port_impl_t *port_impl; + struct pollfd fds[2]; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); - memset(rbuf->oob, 0, sizeof(struct cmsghdr)); - if (ctx_impl->wait_items > 0 || lib->shared_port == NULL) { - return nxt_unit_port_recv(ctx, ctx_impl->read_port, rbuf); + + return nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf); } + port_impl = nxt_container_of(ctx_impl->read_port, nxt_unit_port_impl_t, + port); + retry: + if (port_impl->from_socket == 0) { + res = nxt_unit_port_queue_recv(ctx_impl->read_port, rbuf); + if (res == NXT_UNIT_OK) { + if (nxt_unit_is_read_socket(rbuf)) { + port_impl->from_socket++; + + nxt_unit_debug(ctx, "port{%d,%d} dequeue 1 read_socket %d", + (int) ctx_impl->read_port->id.pid, + (int) ctx_impl->read_port->id.id, + port_impl->from_socket); + + } else { + nxt_unit_debug(ctx, "port{%d,%d} dequeue %d", + (int) ctx_impl->read_port->id.pid, + (int) ctx_impl->read_port->id.id, + (int) rbuf->size); + + return NXT_UNIT_OK; + } + } + } + + res = nxt_unit_app_queue_recv(lib->shared_port, rbuf); + if (res == NXT_UNIT_OK) { + return NXT_UNIT_OK; + } + fds[0].fd = ctx_impl->read_port->in_fd; fds[0].events = POLLIN; fds[0].revents = 0; @@ -4317,31 +4466,47 @@ retry: fds[1].events = POLLIN; fds[1].revents = 0; - res = poll(fds, 2, -1); - if (nxt_slow_path(res < 0)) { + nevents = poll(fds, 2, -1); + if (nxt_slow_path(nevents == -1)) { err = errno; if (err == EINTR) { goto retry; } - nxt_unit_alert(ctx, "poll() failed: %s (%d)", - strerror(err), err); + nxt_unit_alert(ctx, "poll(%d,%d) failed: %s (%d)", + fds[0].fd, fds[1].fd, strerror(err), err); rbuf->size = -1; return (err == EAGAIN) ? NXT_UNIT_AGAIN : NXT_UNIT_ERROR; } + nxt_unit_debug(ctx, "poll(%d,%d): %d, revents [%04uXi, %04uXi]", + fds[0].fd, fds[1].fd, nevents, fds[0].revents, + fds[1].revents); + if ((fds[0].revents & POLLIN) != 0) { - return nxt_unit_port_recv(ctx, ctx_impl->read_port, rbuf); + res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf); + if (res == NXT_UNIT_AGAIN) { + goto retry; + } + + return res; } if ((fds[1].revents & POLLIN) != 0) { - return nxt_unit_port_recv(ctx, lib->shared_port, rbuf); + res = nxt_unit_shared_port_recv(ctx, lib->shared_port, rbuf); + if (res == NXT_UNIT_AGAIN) { + goto retry; + } + + return res; } - rbuf->size = -1; + nxt_unit_alert(ctx, "poll(%d,%d): %d unexpected revents [%04uXi, %04uXi]", + fds[0].fd, fds[1].fd, nevents, fds[0].revents, + fds[1].revents); return NXT_UNIT_ERROR; } @@ -4392,9 +4557,11 @@ nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx) static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx) { + int res; nxt_queue_t ready_req; nxt_unit_impl_t *lib; nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_request_info_t *req; nxt_unit_request_info_impl_t *req_impl; nxt_queue_init(&ready_req); @@ -4419,7 +4586,35 @@ nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx) { lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit); - (void) nxt_unit_send_req_headers_ack(&req_impl->req); + req = &req_impl->req; + + res = nxt_unit_send_req_headers_ack(req); + if (nxt_slow_path(res != NXT_UNIT_OK)) { + nxt_unit_request_done(req, NXT_UNIT_ERROR); + + continue; + } + + if (req->content_length + > (uint64_t) (req->content_buf->end - req->content_buf->free)) + { + res = nxt_unit_request_hash_add(ctx, req); + if (nxt_slow_path(res != NXT_UNIT_OK)) { + nxt_unit_req_warn(req, "failed to add request to hash"); + + nxt_unit_request_done(req, NXT_UNIT_ERROR); + + continue; + } + + /* + * If application have separate data handler, we may start + * request processing and process data when it is arrived. + */ + if (lib->callbacks.data_handler == NULL) { + continue; + } + } lib->callbacks.request_handler(&req_impl->req); @@ -4432,6 +4627,7 @@ nxt_unit_run_ctx(nxt_unit_ctx_t *ctx) { int rc; nxt_unit_impl_t *lib; + nxt_unit_read_buf_t *rbuf; nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_ctx_use(ctx); @@ -4442,11 +4638,30 @@ nxt_unit_run_ctx(nxt_unit_ctx_t *ctx) rc = NXT_UNIT_OK; while (nxt_fast_path(lib->online)) { - rc = nxt_unit_process_port_msg_impl(ctx, ctx_impl->read_port); + rbuf = nxt_unit_read_buf_get(ctx); + if (nxt_slow_path(rbuf == NULL)) { + rc = NXT_UNIT_ERROR; + break; + } + retry: + + rc = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf); + if (rc == NXT_UNIT_AGAIN) { + goto retry; + } + + rc = nxt_unit_process_msg(ctx, rbuf); + if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { + break; + } + + rc = nxt_unit_process_pending_rbuf(ctx); if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { break; } + + nxt_unit_process_ready_req(ctx); } nxt_unit_ctx_release(ctx); @@ -4455,11 +4670,68 @@ nxt_unit_run_ctx(nxt_unit_ctx_t *ctx) } +nxt_inline int +nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf) +{ + nxt_port_msg_t *port_msg; + + if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) { + port_msg = (nxt_port_msg_t *) rbuf->buf; + + return port_msg->type == _NXT_PORT_MSG_READ_QUEUE; + } + + return 0; +} + + +nxt_inline int +nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf) +{ + if (nxt_fast_path(rbuf->size == 1)) { + return rbuf->buf[0] == _NXT_PORT_MSG_READ_SOCKET; + } + + return 0; +} + + +nxt_inline int +nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf) +{ + nxt_port_msg_t *port_msg; + + if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) { + port_msg = (nxt_port_msg_t *) rbuf->buf; + + return port_msg->type == _NXT_PORT_MSG_SHM_ACK; + } + + return 0; +} + + +nxt_inline int +nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf) +{ + nxt_port_msg_t *port_msg; + + if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) { + port_msg = (nxt_port_msg_t *) rbuf->buf; + + return port_msg->type == _NXT_PORT_MSG_QUIT; + } + + return 0; +} + + int nxt_unit_run_shared(nxt_unit_ctx_t *ctx) { - int rc; - nxt_unit_impl_t *lib; + int rc; + nxt_unit_impl_t *lib; + nxt_unit_read_buf_t *rbuf; nxt_unit_ctx_use(ctx); @@ -4467,11 +4739,35 @@ nxt_unit_run_shared(nxt_unit_ctx_t *ctx) rc = NXT_UNIT_OK; while (nxt_fast_path(lib->online)) { - rc = nxt_unit_process_port_msg_impl(ctx, lib->shared_port); + rbuf = nxt_unit_read_buf_get(ctx); + if (nxt_slow_path(rbuf == NULL)) { + rc = NXT_UNIT_ERROR; + break; + } + + retry: + rc = nxt_unit_shared_port_recv(ctx, lib->shared_port, rbuf); + if (rc == NXT_UNIT_AGAIN) { + goto retry; + } + + if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { + nxt_unit_read_buf_release(ctx, rbuf); + break; + } + + rc = nxt_unit_process_msg(ctx, rbuf); + if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { + break; + } + + rc = nxt_unit_process_pending_rbuf(ctx); if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { break; } + + nxt_unit_process_ready_req(ctx); } nxt_unit_ctx_release(ctx); @@ -4499,6 +4795,7 @@ static int nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) { int rc; + nxt_unit_impl_t *lib; nxt_unit_read_buf_t *rbuf; rbuf = nxt_unit_read_buf_get(ctx); @@ -4506,10 +4803,18 @@ nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) return NXT_UNIT_ERROR; } - memset(rbuf->oob, 0, sizeof(struct cmsghdr)); + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - rc = nxt_unit_port_recv(ctx, port, rbuf); - if (nxt_slow_path(rc != NXT_UNIT_OK)) { +retry: + + if (port == lib->shared_port) { + rc = nxt_unit_shared_port_recv(ctx, port, rbuf); + + } else { + rc = nxt_unit_ctx_port_recv(ctx, port, rbuf); + } + + if (rc != NXT_UNIT_OK) { nxt_unit_read_buf_release(ctx, rbuf); return rc; } @@ -4526,6 +4831,15 @@ nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) nxt_unit_process_ready_req(ctx); + rbuf = nxt_unit_read_buf_get(ctx); + if (nxt_slow_path(rbuf == NULL)) { + return NXT_UNIT_ERROR; + } + + if (lib->online) { + goto retry; + } + return rc; } @@ -4540,10 +4854,12 @@ nxt_unit_done(nxt_unit_ctx_t *ctx) nxt_unit_ctx_t * nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data) { - int rc; - nxt_unit_impl_t *lib; - nxt_unit_port_t *port; - nxt_unit_ctx_impl_t *new_ctx; + int rc, queue_fd; + void *mem; + nxt_unit_impl_t *lib; + nxt_unit_port_t *port; + nxt_unit_ctx_impl_t *new_ctx; + nxt_unit_port_impl_t *port_impl; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); @@ -4554,33 +4870,57 @@ nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data) return NULL; } + rc = nxt_unit_ctx_init(lib, new_ctx, data); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + free(new_ctx); + + return NULL; + } + + queue_fd = -1; + port = nxt_unit_create_port(ctx); if (nxt_slow_path(port == NULL)) { - free(new_ctx); + goto fail; + } - return NULL; + new_ctx->read_port = port; + + queue_fd = nxt_unit_shm_open(ctx, sizeof(nxt_port_queue_t)); + if (nxt_slow_path(queue_fd == -1)) { + goto fail; } - rc = nxt_unit_send_port(ctx, lib->router_port, port); - if (nxt_slow_path(rc != NXT_UNIT_OK)) { + mem = mmap(NULL, sizeof(nxt_port_queue_t), + PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0); + if (nxt_slow_path(mem == MAP_FAILED)) { + nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd, + strerror(errno), errno); + goto fail; } - rc = nxt_unit_ctx_init(lib, new_ctx, data); + nxt_port_queue_init(mem); + + port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); + port_impl->queue = mem; + + rc = nxt_unit_send_port(ctx, lib->router_port, port, queue_fd); if (nxt_slow_path(rc != NXT_UNIT_OK)) { goto fail; } - new_ctx->read_port = port; + close(queue_fd); return &new_ctx->ctx; fail: - nxt_unit_remove_port(lib, &port->id); - nxt_unit_port_release(port); + if (queue_fd != -1) { + close(queue_fd); + } - free(new_ctx); + nxt_unit_ctx_release(&new_ctx->ctx); return NULL; } @@ -4633,6 +4973,7 @@ nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl) nxt_queue_remove(&ctx_impl->link); if (nxt_fast_path(ctx_impl->read_port != NULL)) { + nxt_unit_remove_port(lib, &ctx_impl->read_port->id); nxt_unit_port_release(ctx_impl->read_port); } @@ -4709,10 +5050,8 @@ nxt_unit_create_port(nxt_unit_ctx_t *ctx) nxt_unit_process_release(process); - port = nxt_unit_add_port(ctx, &new_port); + port = nxt_unit_add_port(ctx, &new_port, NULL); if (nxt_slow_path(port == NULL)) { - nxt_unit_alert(ctx, "create_port: add_port() failed"); - close(port_sockets[0]); close(port_sockets[1]); } @@ -4723,10 +5062,11 @@ nxt_unit_create_port(nxt_unit_ctx_t *ctx) static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst, - nxt_unit_port_t *port) + nxt_unit_port_t *port, int queue_fd) { ssize_t res; nxt_unit_impl_t *lib; + int fds[2] = { port->out_fd, queue_fd }; struct { nxt_port_msg_t msg; @@ -4735,7 +5075,7 @@ nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst, union { struct cmsghdr cm; - char space[CMSG_SPACE(sizeof(int))]; + char space[CMSG_SPACE(sizeof(int) * 2)]; } cmsg; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); @@ -4758,7 +5098,7 @@ nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst, memset(&cmsg, 0, sizeof(cmsg)); - cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int)); + cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int) * 2); cmsg.cm.cmsg_level = SOL_SOCKET; cmsg.cm.cmsg_type = SCM_RIGHTS; @@ -4771,7 +5111,7 @@ nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst, * Fortunately, GCC with -O1 compiles this nxt_memcpy() * in the same simple assignment as in the code above. */ - memcpy(CMSG_DATA(&cmsg.cm), &port->out_fd, sizeof(int)); + memcpy(CMSG_DATA(&cmsg.cm), fds, sizeof(int) * 2); res = nxt_unit_port_send(ctx, dst, &m, sizeof(m), &cmsg, sizeof(cmsg)); @@ -4799,7 +5139,7 @@ nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port) c = nxt_atomic_fetch_add(&port_impl->use_count, -1); if (c == 1) { - nxt_unit_debug(NULL, "destroy port %d,%d", + nxt_unit_debug(NULL, "destroy port{%d,%d}", (int) port->id.pid, (int) port->id.id); nxt_unit_process_release(port_impl->process); @@ -4816,13 +5156,31 @@ nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port) port->out_fd = -1; } + if (port->in_fd != -1) { + close(port->in_fd); + + port->in_fd = -1; + } + + if (port->out_fd != -1) { + close(port->out_fd); + + port->out_fd = -1; + } + + if (port_impl->queue != NULL) { + munmap(port_impl->queue, (port->id.id == (nxt_port_id_t) -1) + ? sizeof(nxt_app_queue_t) + : sizeof(nxt_port_queue_t)); + } + free(port_impl); } } static nxt_unit_port_t * -nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) +nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue) { int rc; nxt_queue_t awaiting_req; @@ -4840,9 +5198,10 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) old_port = nxt_unit_port_hash_find(&lib->ports, &port->id, 0); if (nxt_slow_path(old_port != NULL)) { - nxt_unit_debug(ctx, "add_port: duplicate %d,%d in_fd %d out_fd %d", - port->id.pid, port->id.id, - port->in_fd, port->out_fd); + nxt_unit_debug(ctx, "add_port: duplicate port{%d,%d} " + "in_fd %d out_fd %d queue %p", + port->id.pid, port->id.id, + port->in_fd, port->out_fd, queue); if (old_port->data == NULL) { old_port->data = port->data; @@ -4875,6 +5234,10 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) old_port_impl = nxt_container_of(old_port, nxt_unit_port_impl_t, port); + if (old_port_impl->queue == NULL) { + old_port_impl->queue = queue; + } + if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) { nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req); nxt_queue_init(&old_port_impl->awaiting_req); @@ -4914,9 +5277,9 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) new_port = NULL; - nxt_unit_debug(ctx, "add_port: %d,%d in_fd %d out_fd %d", + nxt_unit_debug(ctx, "add_port: port{%d,%d} in_fd %d out_fd %d queue %p", port->id.pid, port->id.id, - port->in_fd, port->out_fd); + port->in_fd, port->out_fd, queue); process = nxt_unit_process_get(lib, port->id.pid); if (nxt_slow_path(process == NULL)) { @@ -4929,6 +5292,9 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) new_port = malloc(sizeof(nxt_unit_port_impl_t)); if (nxt_slow_path(new_port == NULL)) { + nxt_unit_alert(ctx, "add_port: %d,%d malloc() failed", + port->id.pid, port->id.id); + goto unlock; } @@ -4951,6 +5317,9 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) new_port->use_count = 2; new_port->process = process; new_port->ready = (port->in_fd != -1 || port->out_fd != -1); + new_port->queue = queue; + new_port->from_socket = 0; + new_port->socket_rbuf = NULL; nxt_queue_init(&new_port->awaiting_req); @@ -5010,13 +5379,13 @@ nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id) port = nxt_unit_port_hash_find(&lib->ports, port_id, 1); if (nxt_slow_path(port == NULL)) { - nxt_unit_debug(NULL, "remove_port: port %d,%d not found", + nxt_unit_debug(NULL, "remove_port: port{%d,%d} not found", (int) port_id->pid, (int) port_id->id); return NULL; } - nxt_unit_debug(NULL, "remove_port: port %d,%d, fds %d,%d, data %p", + nxt_unit_debug(NULL, "remove_port: port{%d,%d}, fds %d,%d, data %p", (int) port_id->pid, (int) port_id->id, port->in_fd, port->out_fd, port->data); @@ -5089,10 +5458,12 @@ nxt_unit_quit(nxt_unit_ctx_t *ctx) lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - lib->online = 0; + if (lib->online) { + lib->online = 0; - if (lib->callbacks.quit != NULL) { - lib->callbacks.quit(ctx); + if (lib->callbacks.quit != NULL) { + lib->callbacks.quit(ctx); + } } } @@ -5137,20 +5508,91 @@ static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, const void *buf, size_t buf_size, const void *oob, size_t oob_size) { - nxt_unit_impl_t *lib; - - nxt_unit_debug(ctx, "port_send: port %d,%d fd %d", - (int) port->id.pid, (int) port->id.id, port->out_fd); + int notify; + ssize_t ret; + nxt_int_t rc; + nxt_port_msg_t msg; + nxt_unit_impl_t *lib; + nxt_unit_port_impl_t *port_impl; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); + if (port_impl->queue != NULL && oob_size == 0 + && buf_size <= NXT_PORT_QUEUE_MSG_SIZE) + { + rc = nxt_port_queue_send(port_impl->queue, buf, buf_size, ¬ify); + if (nxt_slow_path(rc != NXT_OK)) { + nxt_unit_alert(ctx, "port_send: port %d,%d queue overflow", + (int) port->id.pid, (int) port->id.id); + + return -1; + } + + nxt_unit_debug(ctx, "port{%d,%d} enqueue %d notify %d", + (int) port->id.pid, (int) port->id.id, + (int) buf_size, notify); + + if (notify) { + memcpy(&msg, buf, sizeof(nxt_port_msg_t)); + + msg.type = _NXT_PORT_MSG_READ_QUEUE; + + if (lib->callbacks.port_send == NULL) { + ret = nxt_unit_sendmsg(ctx, port->out_fd, &msg, + sizeof(nxt_port_msg_t), NULL, 0); + + nxt_unit_debug(ctx, "port{%d,%d} send %d read_queue", + (int) port->id.pid, (int) port->id.id, + (int) ret); + + } else { + ret = lib->callbacks.port_send(ctx, port, &msg, + sizeof(nxt_port_msg_t), NULL, 0); + + nxt_unit_debug(ctx, "port{%d,%d} sendcb %d read_queue", + (int) port->id.pid, (int) port->id.id, + (int) ret); + } + + } + + return buf_size; + } + + if (port_impl->queue != NULL) { + msg.type = _NXT_PORT_MSG_READ_SOCKET; + + rc = nxt_port_queue_send(port_impl->queue, &msg.type, 1, ¬ify); + if (nxt_slow_path(rc != NXT_OK)) { + nxt_unit_alert(ctx, "port_send: port %d,%d queue overflow", + (int) port->id.pid, (int) port->id.id); + + return -1; + } + + nxt_unit_debug(ctx, "port{%d,%d} enqueue 1 read_socket notify %d", + (int) port->id.pid, (int) port->id.id, notify); + } + if (lib->callbacks.port_send != NULL) { - return lib->callbacks.port_send(ctx, port, buf, buf_size, - oob, oob_size); + ret = lib->callbacks.port_send(ctx, port, buf, buf_size, + oob, oob_size); + + nxt_unit_debug(ctx, "port{%d,%d} sendcb %d", + (int) port->id.pid, (int) port->id.id, + (int) ret); + + } else { + ret = nxt_unit_sendmsg(ctx, port->out_fd, buf, buf_size, + oob, oob_size); + + nxt_unit_debug(ctx, "port{%d,%d} sendmsg %d", + (int) port->id.pid, (int) port->id.id, + (int) ret); } - return nxt_unit_sendmsg(ctx, port->out_fd, buf, buf_size, - oob, oob_size); + return ret; } @@ -5158,6 +5600,7 @@ static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd, const void *buf, size_t buf_size, const void *oob, size_t oob_size) { + int err; ssize_t res; struct iovec iov[1]; struct msghdr msg; @@ -5178,7 +5621,9 @@ retry: res = sendmsg(fd, &msg, 0); if (nxt_slow_path(res == -1)) { - if (errno == EINTR) { + err = errno; + + if (err == EINTR) { goto retry; } @@ -5187,7 +5632,7 @@ retry: * implementation. */ nxt_unit_warn(ctx, "sendmsg(%d, %d) failed: %s (%d)", - fd, (int) buf_size, strerror(errno), errno); + fd, (int) buf_size, strerror(err), err); } else { nxt_unit_debug(ctx, "sendmsg(%d, %d): %d", fd, (int) buf_size, @@ -5199,6 +5644,158 @@ retry: static int +nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, + nxt_unit_read_buf_t *rbuf) +{ + int res, read; + nxt_unit_port_impl_t *port_impl; + + port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); + + read = 0; + +retry: + + if (port_impl->from_socket > 0) { + if (port_impl->socket_rbuf != NULL + && port_impl->socket_rbuf->size > 0) + { + port_impl->from_socket--; + + nxt_unit_rbuf_cpy(rbuf, port_impl->socket_rbuf); + port_impl->socket_rbuf->size = 0; + + nxt_unit_debug(ctx, "port{%d,%d} use suspended message %d", + (int) port->id.pid, (int) port->id.id, + (int) rbuf->size); + + return NXT_UNIT_OK; + } + + } else { + res = nxt_unit_port_queue_recv(port, rbuf); + + if (res == NXT_UNIT_OK) { + if (nxt_unit_is_read_socket(rbuf)) { + port_impl->from_socket++; + + nxt_unit_debug(ctx, "port{%d,%d} dequeue 1 read_socket %d", + (int) port->id.pid, (int) port->id.id, + port_impl->from_socket); + + goto retry; + } + + nxt_unit_debug(ctx, "port{%d,%d} dequeue %d", + (int) port->id.pid, (int) port->id.id, + (int) rbuf->size); + + return NXT_UNIT_OK; + } + } + + if (read) { + return NXT_UNIT_AGAIN; + } + + res = nxt_unit_port_recv(ctx, port, rbuf); + if (nxt_slow_path(res == NXT_UNIT_ERROR)) { + return NXT_UNIT_ERROR; + } + + read = 1; + + if (nxt_unit_is_read_queue(rbuf)) { + nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue", + (int) port->id.pid, (int) port->id.id, (int) rbuf->size); + + if (port_impl->from_socket) { + nxt_unit_warn(ctx, "port protocol warning: READ_QUEUE after READ_SOCKET"); + } + + goto retry; + } + + nxt_unit_debug(ctx, "port{%d,%d} recvmsg %d", + (int) port->id.pid, (int) port->id.id, + (int) rbuf->size); + + if (res == NXT_UNIT_AGAIN) { + return NXT_UNIT_AGAIN; + } + + if (port_impl->from_socket > 0) { + port_impl->from_socket--; + + return NXT_UNIT_OK; + } + + nxt_unit_debug(ctx, "port{%d,%d} suspend message %d", + (int) port->id.pid, (int) port->id.id, + (int) rbuf->size); + + if (port_impl->socket_rbuf == NULL) { + port_impl->socket_rbuf = nxt_unit_read_buf_get(ctx); + + if (nxt_slow_path(port_impl->socket_rbuf == NULL)) { + return NXT_UNIT_ERROR; + } + + port_impl->socket_rbuf->size = 0; + } + + if (port_impl->socket_rbuf->size > 0) { + nxt_unit_alert(ctx, "too many port socket messages"); + + return NXT_UNIT_ERROR; + } + + nxt_unit_rbuf_cpy(port_impl->socket_rbuf, rbuf); + + memset(rbuf->oob, 0, sizeof(struct cmsghdr)); + + goto retry; +} + + +nxt_inline void +nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst, nxt_unit_read_buf_t *src) +{ + memcpy(dst->buf, src->buf, src->size); + dst->size = src->size; + memcpy(dst->oob, src->oob, sizeof(src->oob)); +} + + +static int +nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, + nxt_unit_read_buf_t *rbuf) +{ + int res; + +retry: + + res = nxt_unit_app_queue_recv(port, rbuf); + + if (res == NXT_UNIT_AGAIN) { + res = nxt_unit_port_recv(ctx, port, rbuf); + if (nxt_slow_path(res == NXT_UNIT_ERROR)) { + return NXT_UNIT_ERROR; + } + + if (nxt_unit_is_read_queue(rbuf)) { + nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue", + (int) port->id.pid, (int) port->id.id, (int) rbuf->size); + + goto retry; + } + } + + return res; +} + + +static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf) { @@ -5214,6 +5811,9 @@ nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, rbuf->buf, sizeof(rbuf->buf), rbuf->oob, sizeof(rbuf->oob)); + nxt_unit_debug(ctx, "port{%d,%d} recvcb %d", + (int) port->id.pid, (int) port->id.id, (int) rbuf->size); + if (nxt_slow_path(rbuf->size < 0)) { return NXT_UNIT_ERROR; } @@ -5247,13 +5847,13 @@ retry: if (err == EAGAIN) { nxt_unit_debug(ctx, "recvmsg(%d) failed: %s (%d)", - fd, strerror(errno), errno); + fd, strerror(err), err); return NXT_UNIT_AGAIN; } nxt_unit_alert(ctx, "recvmsg(%d) failed: %s (%d)", - fd, strerror(errno), errno); + fd, strerror(err), err); return NXT_UNIT_ERROR; } @@ -5264,6 +5864,52 @@ retry: } +static int +nxt_unit_port_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf) +{ + nxt_unit_port_impl_t *port_impl; + + port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); + + rbuf->size = nxt_port_queue_recv(port_impl->queue, rbuf->buf); + + return (rbuf->size == -1) ? NXT_UNIT_AGAIN : NXT_UNIT_OK; +} + + +static int +nxt_unit_app_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf) +{ + uint32_t cookie; + nxt_port_msg_t *port_msg; + nxt_app_queue_t *queue; + nxt_unit_port_impl_t *port_impl; + + port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); + queue = port_impl->queue; + +retry: + + rbuf->size = nxt_app_queue_recv(queue, rbuf->buf, &cookie); + + nxt_unit_debug(NULL, "app_queue_recv: %d", (int) rbuf->size); + + if (rbuf->size >= (ssize_t) sizeof(nxt_port_msg_t)) { + port_msg = (nxt_port_msg_t *) rbuf->buf; + + if (nxt_app_queue_cancel(queue, cookie, port_msg->stream)) { + return NXT_UNIT_OK; + } + + nxt_unit_debug(NULL, "app_queue_recv: message cancelled"); + + goto retry; + } + + return (rbuf->size == -1) ? NXT_UNIT_AGAIN : NXT_UNIT_OK; +} + + static nxt_int_t nxt_unit_port_hash_test(nxt_lvlhsh_query_t *lhq, void *data) { @@ -5392,12 +6038,19 @@ static const nxt_lvlhsh_proto_t lvlhsh_requests_proto nxt_aligned(64) = { static int -nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash, - nxt_unit_request_info_impl_t *req_impl) +nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx, + nxt_unit_request_info_t *req) { - uint32_t *stream; - nxt_int_t res; - nxt_lvlhsh_query_t lhq; + uint32_t *stream; + nxt_int_t res; + nxt_lvlhsh_query_t lhq; + nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_request_info_impl_t *req_impl; + + req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); + if (req_impl->in_hash) { + return NXT_UNIT_OK; + } stream = &req_impl->stream; @@ -5409,11 +6062,18 @@ nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash, lhq.replace = 0; lhq.value = req_impl; - res = nxt_lvlhsh_insert(request_hash, &lhq); + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + pthread_mutex_lock(&ctx_impl->mutex); + + res = nxt_lvlhsh_insert(&ctx_impl->requests, &lhq); + + pthread_mutex_unlock(&ctx_impl->mutex); switch (res) { case NXT_OK: + req_impl->in_hash = 1; return NXT_UNIT_OK; default: @@ -5422,12 +6082,13 @@ nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash, } -static nxt_unit_request_info_impl_t * -nxt_unit_request_hash_find(nxt_lvlhsh_t *request_hash, uint32_t stream, - int remove) +static nxt_unit_request_info_t * +nxt_unit_request_hash_find(nxt_unit_ctx_t *ctx, uint32_t stream, int remove) { - nxt_int_t res; - nxt_lvlhsh_query_t lhq; + nxt_int_t res; + nxt_lvlhsh_query_t lhq; + nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_request_info_impl_t *req_impl; lhq.key_hash = nxt_murmur_hash2(&stream, sizeof(stream)); lhq.key.length = sizeof(stream); @@ -5435,16 +6096,26 @@ nxt_unit_request_hash_find(nxt_lvlhsh_t *request_hash, uint32_t stream, lhq.proto = &lvlhsh_requests_proto; lhq.pool = NULL; + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + pthread_mutex_lock(&ctx_impl->mutex); + if (remove) { - res = nxt_lvlhsh_delete(request_hash, &lhq); + res = nxt_lvlhsh_delete(&ctx_impl->requests, &lhq); } else { - res = nxt_lvlhsh_find(request_hash, &lhq); + res = nxt_lvlhsh_find(&ctx_impl->requests, &lhq); } + pthread_mutex_unlock(&ctx_impl->mutex); + switch (res) { case NXT_OK: + req_impl = nxt_container_of(lhq.value, nxt_unit_request_info_impl_t, + req); + req_impl->in_hash = 0; + return lhq.value; default: diff --git a/src/nxt_unit.h b/src/nxt_unit.h index 0f16773f..67244cf4 100644 --- a/src/nxt_unit.h +++ b/src/nxt_unit.h @@ -121,6 +121,8 @@ struct nxt_unit_callbacks_s { */ void (*request_handler)(nxt_unit_request_info_t *req); + void (*data_handler)(nxt_unit_request_info_t *req); + /* Process websocket frame. */ void (*websocket_handler)(nxt_unit_websocket_frame_t *ws); |