summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/nxt_app_nncq.h165
-rw-r--r--src/nxt_app_queue.h119
-rw-r--r--src/nxt_http_websocket.c16
-rw-r--r--src/nxt_port.c28
-rw-r--r--src/nxt_port.h25
-rw-r--r--src/nxt_port_queue.h102
-rw-r--r--src/nxt_port_socket.c271
-rw-r--r--src/nxt_router.c201
-rw-r--r--src/nxt_router_request.h4
-rw-r--r--src/nxt_unit.c1171
-rw-r--r--src/nxt_unit.h2
11 files changed, 1793 insertions, 311 deletions
diff --git a/src/nxt_app_nncq.h b/src/nxt_app_nncq.h
new file mode 100644
index 00000000..f9b8ce0c
--- /dev/null
+++ b/src/nxt_app_nncq.h
@@ -0,0 +1,165 @@
+
+/*
+ * Copyright (C) NGINX, Inc.
+ */
+
+#ifndef _NXT_APP_NNCQ_H_INCLUDED_
+#define _NXT_APP_NNCQ_H_INCLUDED_
+
+
+/* Appilcation Numeric Naive Circular Queue */
+
+#define NXT_APP_NNCQ_SIZE 131072
+
+typedef uint32_t nxt_app_nncq_atomic_t;
+typedef uint16_t nxt_app_nncq_cycle_t;
+
+typedef struct {
+ nxt_app_nncq_atomic_t head;
+ nxt_app_nncq_atomic_t entries[NXT_APP_NNCQ_SIZE];
+ nxt_app_nncq_atomic_t tail;
+} nxt_app_nncq_t;
+
+
+static inline nxt_app_nncq_atomic_t
+nxt_app_nncq_head(nxt_app_nncq_t const volatile *q)
+{
+ return q->head;
+}
+
+
+static inline nxt_app_nncq_atomic_t
+nxt_app_nncq_tail(nxt_app_nncq_t const volatile *q)
+{
+ return q->tail;
+}
+
+
+static inline void
+nxt_app_nncq_tail_cmp_inc(nxt_app_nncq_t volatile *q, nxt_app_nncq_atomic_t t)
+{
+ nxt_atomic_cmp_set(&q->tail, t, t + 1);
+}
+
+
+static inline nxt_app_nncq_atomic_t
+nxt_app_nncq_index(nxt_app_nncq_t const volatile *q, nxt_app_nncq_atomic_t i)
+{
+ return i % NXT_APP_NNCQ_SIZE;
+}
+
+
+static inline nxt_app_nncq_atomic_t
+nxt_app_nncq_map(nxt_app_nncq_t const volatile *q, nxt_app_nncq_atomic_t i)
+{
+ return i % NXT_APP_NNCQ_SIZE;
+}
+
+
+static inline nxt_app_nncq_cycle_t
+nxt_app_nncq_cycle(nxt_app_nncq_t const volatile *q, nxt_app_nncq_atomic_t i)
+{
+ return i / NXT_APP_NNCQ_SIZE;
+}
+
+
+static inline nxt_app_nncq_cycle_t
+nxt_app_nncq_next_cycle(nxt_app_nncq_t const volatile *q,
+ nxt_app_nncq_cycle_t i)
+{
+ return i + 1;
+}
+
+
+static inline nxt_app_nncq_atomic_t
+nxt_app_nncq_new_entry(nxt_app_nncq_t const volatile *q,
+ nxt_app_nncq_cycle_t cycle,
+ nxt_app_nncq_atomic_t i)
+{
+ return cycle * NXT_APP_NNCQ_SIZE + (i % NXT_APP_NNCQ_SIZE);
+}
+
+
+static inline nxt_app_nncq_atomic_t
+nxt_app_nncq_empty(nxt_app_nncq_t const volatile *q)
+{
+ return NXT_APP_NNCQ_SIZE;
+}
+
+
+static void
+nxt_app_nncq_init(nxt_app_nncq_t volatile *q)
+{
+ q->head = NXT_APP_NNCQ_SIZE;
+ nxt_memzero((void *) q->entries,
+ NXT_APP_NNCQ_SIZE * sizeof(nxt_app_nncq_atomic_t));
+ q->tail = NXT_APP_NNCQ_SIZE;
+}
+
+
+static void
+nxt_app_nncq_enqueue(nxt_app_nncq_t volatile *q, nxt_app_nncq_atomic_t val)
+{
+ nxt_app_nncq_cycle_t e_cycle, t_cycle;
+ nxt_app_nncq_atomic_t n, t, e, j;
+
+ for ( ;; ) {
+ t = nxt_app_nncq_tail(q);
+ j = nxt_app_nncq_map(q, t);
+ e = q->entries[j];
+
+ e_cycle = nxt_app_nncq_cycle(q, e);
+ t_cycle = nxt_app_nncq_cycle(q, t);
+
+ if (e_cycle == t_cycle) {
+ nxt_app_nncq_tail_cmp_inc(q, t);
+ continue;
+ }
+
+ if (nxt_app_nncq_next_cycle(q, e_cycle) != t_cycle) {
+ continue;
+ }
+
+ n = nxt_app_nncq_new_entry(q, t_cycle, val);
+
+ if (nxt_atomic_cmp_set(&q->entries[j], e, n)) {
+ break;
+ }
+ }
+
+ nxt_app_nncq_tail_cmp_inc(q, t);
+}
+
+
+static nxt_app_nncq_atomic_t
+nxt_app_nncq_dequeue(nxt_app_nncq_t volatile *q)
+{
+ nxt_app_nncq_cycle_t e_cycle, h_cycle;
+ nxt_app_nncq_atomic_t h, j, e;
+
+ for ( ;; ) {
+ h = nxt_app_nncq_head(q);
+ j = nxt_app_nncq_map(q, h);
+ e = q->entries[j];
+
+ e_cycle = nxt_app_nncq_cycle(q, e);
+ h_cycle = nxt_app_nncq_cycle(q, h);
+
+ if (e_cycle != h_cycle) {
+ if (nxt_app_nncq_next_cycle(q, e_cycle) == h_cycle) {
+ return nxt_app_nncq_empty(q);
+ }
+
+ continue;
+ }
+
+ if (nxt_atomic_cmp_set(&q->head, h, h + 1)) {
+ break;
+ }
+ }
+
+ return nxt_app_nncq_index(q, e);
+}
+
+
+#endif /* _NXT_APP_NNCQ_H_INCLUDED_ */
diff --git a/src/nxt_app_queue.h b/src/nxt_app_queue.h
new file mode 100644
index 00000000..127cb8f3
--- /dev/null
+++ b/src/nxt_app_queue.h
@@ -0,0 +1,119 @@
+
+/*
+ * Copyright (C) NGINX, Inc.
+ */
+
+#ifndef _NXT_APP_QUEUE_H_INCLUDED_
+#define _NXT_APP_QUEUE_H_INCLUDED_
+
+
+#include <nxt_app_nncq.h>
+
+
+/* Using Numeric Naive Circular Queue as a backend. */
+
+#define NXT_APP_QUEUE_SIZE NXT_APP_NNCQ_SIZE
+#define NXT_APP_QUEUE_MSG_SIZE 31
+
+typedef struct {
+ uint8_t size;
+ uint8_t data[NXT_APP_QUEUE_MSG_SIZE];
+ uint32_t tracking;
+} nxt_app_queue_item_t;
+
+
+typedef struct {
+ nxt_app_nncq_atomic_t nitems;
+ nxt_app_nncq_t free_items;
+ nxt_app_nncq_t queue;
+ nxt_app_queue_item_t items[NXT_APP_QUEUE_SIZE];
+} nxt_app_queue_t;
+
+
+nxt_inline void
+nxt_app_queue_init(nxt_app_queue_t volatile *q)
+{
+ nxt_app_nncq_atomic_t i;
+
+ nxt_app_nncq_init(&q->free_items);
+ nxt_app_nncq_init(&q->queue);
+
+ for (i = 0; i < NXT_APP_QUEUE_SIZE; i++) {
+ nxt_app_nncq_enqueue(&q->free_items, i);
+ }
+
+ q->nitems = 0;
+}
+
+
+nxt_inline nxt_int_t
+nxt_app_queue_send(nxt_app_queue_t volatile *q, const void *p,
+ uint8_t size, uint32_t tracking, int *notify, uint32_t *cookie)
+{
+ nxt_app_queue_item_t *qi;
+ nxt_app_nncq_atomic_t i;
+
+ i = nxt_app_nncq_dequeue(&q->free_items);
+ if (i == nxt_app_nncq_empty(&q->free_items)) {
+ return NXT_AGAIN;
+ }
+
+ qi = (nxt_app_queue_item_t *) &q->items[i];
+
+ qi->size = size;
+ nxt_memcpy(qi->data, p, size);
+ qi->tracking = tracking;
+ *cookie = i;
+
+ nxt_app_nncq_enqueue(&q->queue, i);
+
+ i = nxt_atomic_fetch_add(&q->nitems, 1);
+
+ if (notify != NULL) {
+ *notify = (i == 0);
+ }
+
+ return NXT_OK;
+}
+
+
+nxt_inline nxt_bool_t
+nxt_app_queue_cancel(nxt_app_queue_t volatile *q, uint32_t cookie,
+ uint32_t tracking)
+{
+ nxt_app_queue_item_t *qi;
+
+ qi = (nxt_app_queue_item_t *) &q->items[cookie];
+
+ return nxt_atomic_cmp_set(&qi->tracking, tracking, 0);
+}
+
+
+nxt_inline ssize_t
+nxt_app_queue_recv(nxt_app_queue_t volatile *q, void *p, uint32_t *cookie)
+{
+ ssize_t res;
+ nxt_app_queue_item_t *qi;
+ nxt_app_nncq_atomic_t i;
+
+ i = nxt_app_nncq_dequeue(&q->queue);
+ if (i == nxt_app_nncq_empty(&q->queue)) {
+ *cookie = 0;
+ return -1;
+ }
+
+ qi = (nxt_app_queue_item_t *) &q->items[i];
+
+ res = qi->size;
+ nxt_memcpy(p, qi->data, qi->size);
+ *cookie = i;
+
+ nxt_app_nncq_enqueue(&q->free_items, i);
+
+ nxt_atomic_fetch_add(&q->nitems, -1);
+
+ return res;
+}
+
+
+#endif /* _NXT_APP_QUEUE_H_INCLUDED_ */
diff --git a/src/nxt_http_websocket.c b/src/nxt_http_websocket.c
index 393c20ac..1968633e 100644
--- a/src/nxt_http_websocket.c
+++ b/src/nxt_http_websocket.c
@@ -98,10 +98,10 @@ nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data)
b = next;
}
- res = nxt_port_socket_twrite(task, req_rpc_data->app_port,
- NXT_PORT_MSG_WEBSOCKET, -1,
- req_rpc_data->stream,
- task->thread->engine->port->id, out, NULL);
+ res = nxt_port_socket_write(task, req_rpc_data->app_port,
+ NXT_PORT_MSG_WEBSOCKET, -1,
+ req_rpc_data->stream,
+ task->thread->engine->port->id, out);
if (nxt_slow_path(res != NXT_OK)) {
// TODO: handle
}
@@ -144,10 +144,10 @@ nxt_http_websocket_error_handler(nxt_task_t *task, void *obj, void *data)
goto close_handler;
}
- (void) nxt_port_socket_twrite(task, req_rpc_data->app_port,
- NXT_PORT_MSG_WEBSOCKET_LAST,
- -1, req_rpc_data->stream,
- task->thread->engine->port->id, NULL, NULL);
+ (void) nxt_port_socket_write(task, req_rpc_data->app_port,
+ NXT_PORT_MSG_WEBSOCKET_LAST,
+ -1, req_rpc_data->stream,
+ task->thread->engine->port->id, NULL);
close_handler:
diff --git a/src/nxt_port.c b/src/nxt_port.c
index 54434d70..c9189d7c 100644
--- a/src/nxt_port.c
+++ b/src/nxt_port.c
@@ -8,6 +8,7 @@
#include <nxt_runtime.h>
#include <nxt_port.h>
#include <nxt_router.h>
+#include <nxt_port_queue.h>
static void nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
@@ -68,6 +69,8 @@ nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid,
nxt_queue_init(&port->messages);
nxt_thread_mutex_create(&port->write_mutex);
+ port->queue_fd = -1;
+
} else {
nxt_mp_destroy(mp);
}
@@ -99,6 +102,16 @@ nxt_port_close(nxt_task_t *task, nxt_port_t *port)
nxt_router_app_port_close(task, port);
}
}
+
+ if (port->queue_fd != -1) {
+ nxt_fd_close(port->queue_fd);
+ port->queue_fd = -1;
+ }
+
+ if (port->queue != NULL) {
+ nxt_mem_munmap(port->queue, sizeof(nxt_port_queue_t));
+ port->queue = NULL;
+ }
}
@@ -176,6 +189,7 @@ nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
}
+/* TODO join with process_ready and move to nxt_main_process.c */
nxt_inline void
nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt,
nxt_port_t *new_port, uint32_t stream)
@@ -227,8 +241,9 @@ nxt_port_send_port(nxt_task_t *task, nxt_port_t *port, nxt_port_t *new_port,
msg->max_share = port->max_share;
msg->type = new_port->type;
- return nxt_port_socket_write(task, port, NXT_PORT_MSG_NEW_PORT,
- new_port->pair[1], stream, 0, b);
+ return nxt_port_socket_write2(task, port, NXT_PORT_MSG_NEW_PORT,
+ new_port->pair[1], new_port->queue_fd,
+ stream, 0, b);
}
@@ -279,7 +294,7 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
msg->u.new_port = port;
}
-
+/* TODO move to nxt_main_process.c */
void
nxt_port_process_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
@@ -304,6 +319,13 @@ nxt_port_process_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
nxt_debug(task, "process %PI ready", msg->port_msg.pid);
+ if (msg->fd != -1) {
+ port->queue_fd = msg->fd;
+ port->queue = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
+ PROT_READ | PROT_WRITE, MAP_SHARED, msg->fd,
+ 0);
+ }
+
nxt_port_send_new_port(task, rt, port, msg->port_msg.stream);
}
diff --git a/src/nxt_port.h b/src/nxt_port.h
index ab455f92..9fbf00b1 100644
--- a/src/nxt_port.h
+++ b/src/nxt_port.h
@@ -42,6 +42,7 @@ struct nxt_port_handlers_s {
/* Request headers. */
nxt_port_handler_t req_headers;
nxt_port_handler_t req_headers_ack;
+ nxt_port_handler_t req_body;
/* Websocket frame. */
nxt_port_handler_t websocket_frame;
@@ -51,6 +52,8 @@ struct nxt_port_handlers_s {
nxt_port_handler_t oosm;
nxt_port_handler_t shm_ack;
+ nxt_port_handler_t read_queue;
+ nxt_port_handler_t read_socket;
};
@@ -91,12 +94,15 @@ typedef enum {
_NXT_PORT_MSG_REQ_HEADERS = nxt_port_handler_idx(req_headers),
_NXT_PORT_MSG_REQ_HEADERS_ACK = nxt_port_handler_idx(req_headers_ack),
+ _NXT_PORT_MSG_REQ_BODY = nxt_port_handler_idx(req_body),
_NXT_PORT_MSG_WEBSOCKET = nxt_port_handler_idx(websocket_frame),
_NXT_PORT_MSG_DATA = nxt_port_handler_idx(data),
_NXT_PORT_MSG_OOSM = nxt_port_handler_idx(oosm),
_NXT_PORT_MSG_SHM_ACK = nxt_port_handler_idx(shm_ack),
+ _NXT_PORT_MSG_READ_QUEUE = nxt_port_handler_idx(read_queue),
+ _NXT_PORT_MSG_READ_SOCKET = nxt_port_handler_idx(read_socket),
NXT_PORT_MSG_MAX = sizeof(nxt_port_handlers_t)
/ sizeof(nxt_port_handler_t),
@@ -124,6 +130,7 @@ typedef enum {
NXT_PORT_MSG_REMOVE_PID = nxt_msg_last(_NXT_PORT_MSG_REMOVE_PID),
NXT_PORT_MSG_REQ_HEADERS = _NXT_PORT_MSG_REQ_HEADERS,
+ NXT_PORT_MSG_REQ_BODY = _NXT_PORT_MSG_REQ_BODY,
NXT_PORT_MSG_WEBSOCKET = _NXT_PORT_MSG_WEBSOCKET,
NXT_PORT_MSG_WEBSOCKET_LAST = nxt_msg_last(_NXT_PORT_MSG_WEBSOCKET),
@@ -132,6 +139,8 @@ typedef enum {
NXT_PORT_MSG_OOSM = nxt_msg_last(_NXT_PORT_MSG_OOSM),
NXT_PORT_MSG_SHM_ACK = nxt_msg_last(_NXT_PORT_MSG_SHM_ACK),
+ NXT_PORT_MSG_READ_QUEUE = _NXT_PORT_MSG_READ_QUEUE,
+ NXT_PORT_MSG_READ_SOCKET = _NXT_PORT_MSG_READ_SOCKET,
} nxt_port_msg_type_t;
@@ -236,6 +245,12 @@ struct nxt_port_s {
nxt_atomic_t use_count;
nxt_process_type_t type;
+
+ nxt_fd_t queue_fd;
+ void *queue;
+
+ void *socket_msg;
+ int from_socket;
};
@@ -286,17 +301,17 @@ void nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port);
void nxt_port_write_close(nxt_port_t *port);
void nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port);
void nxt_port_read_close(nxt_port_t *port);
-nxt_int_t nxt_port_socket_twrite(nxt_task_t *task, nxt_port_t *port,
- nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port,
- nxt_buf_t *b, void *tracking);
+nxt_int_t nxt_port_socket_write2(nxt_task_t *task, nxt_port_t *port,
+ nxt_uint_t type, nxt_fd_t fd, nxt_fd_t fd2, uint32_t stream,
+ nxt_port_id_t reply_port, nxt_buf_t *b);
nxt_inline nxt_int_t
nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port,
nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port,
nxt_buf_t *b)
{
- return nxt_port_socket_twrite(task, port, type, fd, stream, reply_port, b,
- NULL);
+ return nxt_port_socket_write2(task, port, type, fd, -1, stream, reply_port,
+ b);
}
void nxt_port_enable(nxt_task_t *task, nxt_port_t *port,
diff --git a/src/nxt_port_queue.h b/src/nxt_port_queue.h
new file mode 100644
index 00000000..d2b2326b
--- /dev/null
+++ b/src/nxt_port_queue.h
@@ -0,0 +1,102 @@
+
+/*
+ * Copyright (C) NGINX, Inc.
+ */
+
+#ifndef _NXT_PORT_QUEUE_H_INCLUDED_
+#define _NXT_PORT_QUEUE_H_INCLUDED_
+
+
+#include <nxt_nncq.h>
+
+
+/* Using Numeric Naive Circular Queue as a backend. */
+
+#define NXT_PORT_QUEUE_SIZE NXT_NNCQ_SIZE
+#define NXT_PORT_QUEUE_MSG_SIZE 31
+
+
+typedef struct {
+ uint8_t size;
+ uint8_t data[NXT_PORT_QUEUE_MSG_SIZE];
+} nxt_port_queue_item_t;
+
+
+typedef struct {
+ nxt_nncq_atomic_t nitems;
+ nxt_nncq_t free_items;
+ nxt_nncq_t queue;
+ nxt_port_queue_item_t items[NXT_PORT_QUEUE_SIZE];
+} nxt_port_queue_t;
+
+
+nxt_inline void
+nxt_port_queue_init(nxt_port_queue_t volatile *q)
+{
+ nxt_nncq_atomic_t i;
+
+ nxt_nncq_init(&q->free_items);
+ nxt_nncq_init(&q->queue);
+
+ for (i = 0; i < NXT_PORT_QUEUE_SIZE; i++) {
+ nxt_nncq_enqueue(&q->free_items, i);
+ }
+
+ q->nitems = 0;
+}
+
+
+nxt_inline nxt_int_t
+nxt_port_queue_send(nxt_port_queue_t volatile *q, const void *p, uint8_t size,
+ int *notify)
+{
+ nxt_nncq_atomic_t i;
+ nxt_port_queue_item_t *qi;
+
+ i = nxt_nncq_dequeue(&q->free_items);
+ if (i == nxt_nncq_empty(&q->free_items)) {
+ *notify = 0;
+ return NXT_AGAIN;
+ }
+
+ qi = (nxt_port_queue_item_t *) &q->items[i];
+
+ qi->size = size;
+ nxt_memcpy(qi->data, p, size);
+
+ nxt_nncq_enqueue(&q->queue, i);
+
+ i = nxt_atomic_fetch_add(&q->nitems, 1);
+
+ *notify = (i == 0);
+
+ return NXT_OK;
+}
+
+
+nxt_inline ssize_t
+nxt_port_queue_recv(nxt_port_queue_t volatile *q, void *p)
+{
+ ssize_t res;
+ nxt_nncq_atomic_t i;
+ nxt_port_queue_item_t *qi;
+
+ i = nxt_nncq_dequeue(&q->queue);
+ if (i == nxt_nncq_empty(&q->queue)) {
+ return -1;
+ }
+
+ qi = (nxt_port_queue_item_t *) &q->items[i];
+
+ res = qi->size;
+ nxt_memcpy(p, qi->data, qi->size);
+
+ nxt_nncq_enqueue(&q->free_items, i);
+
+ nxt_atomic_fetch_add(&q->nitems, -1);
+
+ return res;
+}
+
+
+#endif /* _NXT_PORT_QUEUE_H_INCLUDED_ */
diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c
index 844b65ca..14e2e605 100644
--- a/src/nxt_port_socket.c
+++ b/src/nxt_port_socket.c
@@ -5,6 +5,7 @@
*/
#include <nxt_main.h>
+#include <nxt_port_queue.h>
static nxt_int_t nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port,
@@ -17,6 +18,8 @@ static nxt_buf_t *nxt_port_buf_completion(nxt_task_t *task,
static nxt_port_send_msg_t *nxt_port_msg_insert_tail(nxt_port_t *port,
nxt_port_send_msg_t *msg);
static void nxt_port_read_handler(nxt_task_t *task, void *obj, void *data);
+static void nxt_port_queue_read_handler(nxt_task_t *task, void *obj,
+ void *data);
static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
nxt_port_recv_msg_t *msg);
static nxt_buf_t *nxt_port_buf_alloc(nxt_port_t *port);
@@ -143,12 +146,15 @@ nxt_port_release_send_msg(nxt_port_send_msg_t *msg)
nxt_int_t
-nxt_port_socket_twrite(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
- nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b,
- void *tracking)
+nxt_port_socket_write2(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
+ nxt_fd_t fd, nxt_fd_t fd2, uint32_t stream, nxt_port_id_t reply_port,
+ nxt_buf_t *b)
{
+ int notify;
+ uint8_t *p;
nxt_int_t res;
nxt_port_send_msg_t msg;
+ uint8_t qmsg[NXT_PORT_QUEUE_MSG_SIZE];
msg.link.next = NULL;
msg.link.prev = NULL;
@@ -156,14 +162,10 @@ nxt_port_socket_twrite(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
msg.buf = b;
msg.share = 0;
msg.fd = fd;
- msg.fd2 = -1;
+ msg.fd2 = fd2;
msg.close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0;
msg.allocated = 0;
- if (tracking != NULL) {
- nxt_port_mmap_tracking_write(msg.tracking_msg, tracking);
- }
-
msg.port_msg.stream = stream;
msg.port_msg.pid = nxt_pid;
msg.port_msg.reply_port = reply_port;
@@ -172,7 +174,42 @@ nxt_port_socket_twrite(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
msg.port_msg.mmap = 0;
msg.port_msg.nf = 0;
msg.port_msg.mf = 0;
- msg.port_msg.tracking = tracking != NULL;
+
+ if (port->queue != NULL && type != _NXT_PORT_MSG_READ_QUEUE) {
+
+ if (fd == -1
+ && (b == NULL
+ || nxt_buf_mem_used_size(&b->mem)
+ <= (int) (NXT_PORT_QUEUE_MSG_SIZE - sizeof(nxt_port_msg_t))))
+ {
+ p = nxt_cpymem(qmsg, &msg.port_msg, sizeof(nxt_port_msg_t));
+ if (b != NULL) {
+ p = nxt_cpymem(p, b->mem.pos, nxt_buf_mem_used_size(&b->mem));
+ }
+
+ res = nxt_port_queue_send(port->queue, qmsg, p - qmsg, &notify);
+
+ nxt_debug(task, "port{%d,%d} %d: enqueue %d notify %d, %d",
+ (int) port->pid, (int) port->id, port->socket.fd,
+ (int) (p - qmsg), notify, res);
+
+ if (notify == 0) {
+ return res;
+ }
+
+ msg.port_msg.type = _NXT_PORT_MSG_READ_QUEUE;
+ msg.buf = NULL;
+
+ } else {
+ qmsg[0] = _NXT_PORT_MSG_READ_SOCKET;
+
+ res = nxt_port_queue_send(port->queue, qmsg, 1, &notify);
+
+ nxt_debug(task, "port{%d,%d} %d: enqueue 1 notify %d, %d",
+ (int) port->pid, (int) port->id, port->socket.fd,
+ notify, res);
+ }
+ }
res = nxt_port_msg_chk_insert(task, port, &msg);
if (nxt_fast_path(res == NXT_DECLINED)) {
@@ -308,10 +345,6 @@ next_fragment:
port->max_size / PORT_MMAP_MIN_SIZE);
}
- if (msg->port_msg.tracking) {
- iov[0].iov_len += sizeof(msg->tracking_msg);
- }
-
sb.limit -= iov[0].iov_len;
nxt_sendbuf_mem_coalesce(task, &sb);
@@ -368,7 +401,6 @@ next_fragment:
msg->fd2 = -1;
msg->share += n;
msg->port_msg.nf = 1;
- msg->port_msg.tracking = 0;
if (msg->share >= port->max_share) {
msg->share = 0;
@@ -576,7 +608,9 @@ nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port)
port->engine = task->thread->engine;
port->socket.read_work_queue = &port->engine->fast_work_queue;
- port->socket.read_handler = nxt_port_read_handler;
+ port->socket.read_handler = port->queue != NULL
+ ? nxt_port_queue_read_handler
+ : nxt_port_read_handler;
port->socket.error_handler = nxt_port_error_handler;
nxt_fd_event_enable_read(port->engine, &port->socket);
@@ -660,6 +694,206 @@ nxt_port_read_handler(nxt_task_t *task, void *obj, void *data)
}
+static void
+nxt_port_queue_read_handler(nxt_task_t *task, void *obj, void *data)
+{
+ ssize_t n;
+ nxt_buf_t *b;
+ nxt_port_t *port;
+ struct iovec iov[2];
+ nxt_port_queue_t *queue;
+ nxt_port_recv_msg_t msg, *smsg;
+ uint8_t qmsg[NXT_PORT_QUEUE_MSG_SIZE];
+
+ port = nxt_container_of(obj, nxt_port_t, socket);
+ msg.port = port;
+
+ nxt_assert(port->engine == task->thread->engine);
+
+ queue = port->queue;
+ nxt_atomic_fetch_add(&queue->nitems, 1);
+
+ for ( ;; ) {
+
+ if (port->from_socket == 0) {
+ n = nxt_port_queue_recv(queue, qmsg);
+
+ if (n < 0 && !port->socket.read_ready) {
+ nxt_atomic_fetch_add(&queue->nitems, -1);
+
+ n = nxt_port_queue_recv(queue, qmsg);
+ if (n < 0) {
+ return;
+ }
+
+ nxt_atomic_fetch_add(&queue->nitems, 1);
+ }
+
+ if (n == 1 && qmsg[0] == _NXT_PORT_MSG_READ_SOCKET) {
+ port->from_socket++;
+
+ nxt_debug(task, "port{%d,%d} %d: dequeue 1 read_socket %d",
+ (int) port->pid, (int) port->id, port->socket.fd,
+ port->from_socket);
+
+ n = -1;
+
+ continue;
+ }
+
+ nxt_debug(task, "port{%d,%d} %d: dequeue %d",
+ (int) port->pid, (int) port->id, port->socket.fd,
+ (int) n);
+
+ } else {
+ if ((smsg = port->socket_msg) != NULL && smsg->size != 0) {
+ msg.port_msg = smsg->port_msg;
+ b = smsg->buf;
+ n = smsg->size;
+ msg.fd = smsg->fd;
+ msg.fd2 = smsg->fd2;
+
+ smsg->size = 0;
+
+ port->from_socket--;
+
+ nxt_debug(task, "port{%d,%d} %d: use suspended message %d",
+ (int) port->pid, (int) port->id, port->socket.fd,
+ (int) n);
+
+ goto process;
+ }
+
+ n = -1;
+ }
+
+ if (n < 0 && !port->socket.read_ready) {
+ nxt_atomic_fetch_add(&queue->nitems, -1);
+ return;
+ }
+
+ b = nxt_port_buf_alloc(port);
+
+ if (nxt_slow_path(b == NULL)) {
+ /* TODO: disable event for some time */
+ }
+
+ if (n >= (ssize_t) sizeof(nxt_port_msg_t)) {
+ nxt_memcpy(&msg.port_msg, qmsg, sizeof(nxt_port_msg_t));
+
+ if (n > (ssize_t) sizeof(nxt_port_msg_t)) {
+ nxt_memcpy(b->mem.pos, qmsg + sizeof(nxt_port_msg_t),
+ n - sizeof(nxt_port_msg_t));
+ }
+
+ } else {
+ iov[0].iov_base = &msg.port_msg;
+ iov[0].iov_len = sizeof(nxt_port_msg_t);
+
+ iov[1].iov_base = b->mem.pos;
+ iov[1].iov_len = port->max_size;
+
+ n = nxt_socketpair_recv(&port->socket, &msg.fd, iov, 2);
+
+ if (n == (ssize_t) sizeof(nxt_port_msg_t)
+ && msg.port_msg.type == _NXT_PORT_MSG_READ_QUEUE)
+ {
+ nxt_port_buf_free(port, b);
+
+ nxt_debug(task, "port{%d,%d} %d: recv %d read_queue",
+ (int) port->pid, (int) port->id, port->socket.fd,
+ (int) n);
+
+ continue;
+ }
+
+ nxt_debug(task, "port{%d,%d} %d: recvmsg %d",
+ (int) port->pid, (int) port->id, port->socket.fd,
+ (int) n);
+
+ if (n > 0) {
+ if (port->from_socket == 0) {
+ nxt_debug(task, "port{%d,%d} %d: suspend message %d",
+ (int) port->pid, (int) port->id, port->socket.fd,
+ (int) n);
+
+ smsg = port->socket_msg;
+
+ if (nxt_slow_path(smsg == NULL)) {
+ smsg = nxt_mp_alloc(port->mem_pool,
+ sizeof(nxt_port_recv_msg_t));
+
+ if (nxt_slow_path(smsg == NULL)) {
+ nxt_alert(task, "port{%d,%d} %d: suspend message "
+ "failed",
+ (int) port->pid, (int) port->id,
+ port->socket.fd);
+
+ return;
+ }
+
+ port->socket_msg = smsg;
+
+ } else {
+ if (nxt_slow_path(smsg->size != 0)) {
+ nxt_alert(task, "port{%d,%d} %d: too many suspend "
+ "messages",
+ (int) port->pid, (int) port->id,
+ port->socket.fd);
+
+ return;
+ }
+ }
+
+ smsg->port_msg = msg.port_msg;
+ smsg->buf = b;
+ smsg->size = n;
+ smsg->fd = msg.fd;
+ smsg->fd2 = msg.fd2;
+
+ continue;
+ }
+
+ port->from_socket--;
+ }
+ }
+
+ process:
+
+ if (n > 0) {
+ msg.buf = b;
+ msg.size = n;
+
+ nxt_port_read_msg_process(task, port, &msg);
+
+ /*
+ * To disable instant completion or buffer re-usage,
+ * handler should reset 'msg.buf'.
+ */
+ if (msg.buf == b) {
+ nxt_port_buf_free(port, b);
+ }
+
+ continue;
+ }
+
+ if (n == NXT_AGAIN) {
+ nxt_port_buf_free(port, b);
+
+ nxt_fd_event_enable_read(task->thread->engine, &port->socket);
+
+ continue;
+ }
+
+ /* n == 0 || n == NXT_ERROR */
+
+ nxt_work_queue_add(&task->thread->engine->fast_work_queue,
+ nxt_port_error_handler, task, &port->socket, NULL);
+ return;
+ }
+}
+
+
typedef struct {
uint32_t stream;
uint32_t pid;
@@ -831,12 +1065,7 @@ nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
b = orig_b = msg->buf;
b->mem.free += msg->size;
- if (msg->port_msg.tracking) {
- msg->cancelled = nxt_port_mmap_tracking_read(task, msg) == 0;
-
- } else {
- msg->cancelled = 0;
- }
+ msg->cancelled = 0;
if (nxt_slow_path(msg->port_msg.nf != 0)) {
diff --git a/src/nxt_router.c b/src/nxt_router.c
index b8e94bcc..3dd0878b 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -15,6 +15,8 @@
#include <nxt_unit_request.h>
#include <nxt_unit_response.h>
#include <nxt_router_request.h>
+#include <nxt_app_queue.h>
+#include <nxt_port_queue.h>
typedef struct {
nxt_str_t type;
@@ -92,6 +94,12 @@ static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task,
nxt_router_conf_t *rtcf, nxt_conf_value_t *conf);
static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
+static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task,
+ nxt_port_t *port);
+static nxt_int_t nxt_router_port_queue_init(nxt_task_t *task,
+ nxt_port_t *port);
+static nxt_int_t nxt_router_port_queue_map(nxt_task_t *task,
+ nxt_port_t *port, nxt_fd_t fd);
static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
static void nxt_router_listen_socket_ready(nxt_task_t *task,
@@ -473,21 +481,25 @@ nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
nxt_inline nxt_bool_t
-nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info,
- uint32_t stream)
+nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data)
{
- nxt_buf_t *b, *next;
- nxt_bool_t cancelled;
+ nxt_buf_t *b, *next;
+ nxt_bool_t cancelled;
+ nxt_msg_info_t *msg_info;
+
+ msg_info = &req_rpc_data->msg_info;
if (msg_info->buf == NULL) {
return 0;
}
- cancelled = nxt_port_mmap_tracking_cancel(task, &msg_info->tracking,
- stream);
+ cancelled = nxt_app_queue_cancel(req_rpc_data->app->shared_port->queue,
+ msg_info->tracking_cookie,
+ req_rpc_data->stream);
if (cancelled) {
- nxt_debug(task, "stream #%uD: cancelled by router", stream);
+ nxt_debug(task, "stream #%uD: cancelled by router",
+ req_rpc_data->stream);
}
for (b = msg_info->buf; b != NULL; b = next) {
@@ -529,7 +541,7 @@ nxt_request_rpc_data_unlink(nxt_task_t *task,
{
nxt_http_request_t *r;
- nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream);
+ nxt_router_msg_cancel(task, req_rpc_data);
if (req_rpc_data->app_port != NULL) {
nxt_router_app_port_release(task, req_rpc_data->app_port,
@@ -573,6 +585,7 @@ nxt_request_rpc_data_unlink(nxt_task_t *task,
static void
nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
+ nxt_int_t res;
nxt_app_t *app;
nxt_port_t *port, *main_app_port;
nxt_runtime_t *rt;
@@ -592,6 +605,17 @@ nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
}
msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
+
+ } else {
+ if (msg->fd2 != -1) {
+ res = nxt_router_port_queue_map(task, port, msg->fd2);
+ if (nxt_slow_path(res != NXT_OK)) {
+ return;
+ }
+
+ nxt_fd_close(msg->fd2);
+ msg->fd2 = -1;
+ }
}
if (msg->port_msg.stream != 0) {
@@ -1523,6 +1547,12 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
return NXT_ERROR;
}
+ ret = nxt_router_app_queue_init(task, port);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ nxt_port_use(task, port, -1);
+ return NXT_ERROR;
+ }
+
nxt_port_write_enable(task, port);
port->app = app;
@@ -1828,6 +1858,82 @@ nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
}
+static nxt_int_t
+nxt_router_app_queue_init(nxt_task_t *task, nxt_port_t *port)
+{
+ void *mem;
+ nxt_int_t fd;
+
+ fd = nxt_shm_open(task, sizeof(nxt_app_queue_t));
+ if (nxt_slow_path(fd == -1)) {
+ return NXT_ERROR;
+ }
+
+ mem = nxt_mem_mmap(NULL, sizeof(nxt_app_queue_t),
+ PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
+ if (nxt_slow_path(mem == MAP_FAILED)) {
+ nxt_fd_close(fd);
+
+ return NXT_ERROR;
+ }
+
+ nxt_app_queue_init(mem);
+
+ port->queue_fd = fd;
+ port->queue = mem;
+
+ return NXT_OK;
+}
+
+
+static nxt_int_t
+nxt_router_port_queue_init(nxt_task_t *task, nxt_port_t *port)
+{
+ void *mem;
+ nxt_int_t fd;
+
+ fd = nxt_shm_open(task, sizeof(nxt_port_queue_t));
+ if (nxt_slow_path(fd == -1)) {
+ return NXT_ERROR;
+ }
+
+ mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
+ PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
+ if (nxt_slow_path(mem == MAP_FAILED)) {
+ nxt_fd_close(fd);
+
+ return NXT_ERROR;
+ }
+
+ nxt_port_queue_init(mem);
+
+ port->queue_fd = fd;
+ port->queue = mem;
+
+ return NXT_OK;
+}
+
+
+static nxt_int_t
+nxt_router_port_queue_map(nxt_task_t *task, nxt_port_t *port, nxt_fd_t fd)
+{
+ void *mem;
+
+ nxt_assert(fd != -1);
+
+ mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
+ PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
+ if (nxt_slow_path(mem == MAP_FAILED)) {
+
+ return NXT_ERROR;
+ }
+
+ port->queue = mem;
+
+ return NXT_OK;
+}
+
+
void
nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, nxt_str_t *name,
nxt_http_action_t *action)
@@ -2748,6 +2854,12 @@ nxt_router_thread_start(void *data)
return;
}
+ ret = nxt_router_port_queue_init(task, port);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ nxt_port_use(task, port, -1);
+ return;
+ }
+
engine->port = port;
nxt_port_enable(task, port, &nxt_router_app_port_handlers);
@@ -3670,6 +3782,7 @@ static void
nxt_router_req_headers_ack_handler(nxt_task_t *task,
nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data)
{
+ int res;
nxt_app_t *app;
nxt_bool_t start_process;
nxt_port_t *app_port, *main_app_port, *idle_port;
@@ -3752,6 +3865,24 @@ nxt_router_req_headers_ack_handler(nxt_task_t *task,
req_rpc_data->app_port = app_port;
+ if (req_rpc_data->msg_info.body_fd != -1) {
+ nxt_debug(task, "stream #%uD: send body fd %d", req_rpc_data->stream,
+ req_rpc_data->msg_info.body_fd);
+
+ lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET);
+
+ res = nxt_port_socket_write(task, app_port, NXT_PORT_MSG_REQ_BODY,
+ req_rpc_data->msg_info.body_fd,
+ req_rpc_data->stream,
+ task->thread->engine->port->id, NULL);
+
+ if (nxt_slow_path(res != NXT_OK)) {
+ r = req_rpc_data->request;
+
+ nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
+ }
+ }
+
if (app->timeout != 0) {
r = req_rpc_data->request;
@@ -3886,10 +4017,10 @@ nxt_router_app_shared_port_send(nxt_task_t *task, nxt_port_t *app_port)
msg->max_share = port->max_share;
msg->type = port->type;
- return nxt_port_socket_twrite(task, app_port,
+ return nxt_port_socket_write2(task, app_port,
NXT_PORT_MSG_NEW_PORT,
- port->pair[0],
- 0, 0, b, NULL);
+ port->pair[0], port->queue_fd,
+ 0, 0, b);
}
@@ -4522,6 +4653,13 @@ nxt_router_app_prepare_request(nxt_task_t *task,
nxt_int_t res;
nxt_port_t *port, *reply_port;
+ int notify;
+ struct {
+ nxt_port_msg_t pm;
+ nxt_port_mmap_msg_t mm;
+ } msg;
+
+
app = req_rpc_data->app;
nxt_assert(app != NULL);
@@ -4529,6 +4667,7 @@ nxt_router_app_prepare_request(nxt_task_t *task,
port = req_rpc_data->app_port;
nxt_assert(port != NULL);
+ nxt_assert(port->queue != NULL);
reply_port = task->thread->engine->port;
@@ -4569,20 +4708,38 @@ nxt_router_app_prepare_request(nxt_task_t *task,
req_rpc_data->msg_info.body_fd = -1;
}
- if (req_rpc_data->msg_info.body_fd != -1) {
- nxt_debug(task, "stream #%uD: send body fd %d", req_rpc_data->stream,
- req_rpc_data->msg_info.body_fd);
+ msg.pm.stream = req_rpc_data->stream;
+ msg.pm.pid = reply_port->pid;
+ msg.pm.reply_port = reply_port->id;
+ msg.pm.type = NXT_PORT_MSG_REQ_HEADERS;
+ msg.pm.last = 0;
+ msg.pm.mmap = 1;
+ msg.pm.nf = 0;
+ msg.pm.mf = 0;
+ msg.pm.tracking = 0;
- lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET);
- }
+ nxt_port_mmap_handler_t *mmap_handler = buf->parent;
+ nxt_port_mmap_header_t *hdr = mmap_handler->hdr;
+
+ msg.mm.mmap_id = hdr->id;
+ msg.mm.chunk_id = nxt_port_mmap_chunk_id(hdr, buf->mem.pos);
+ msg.mm.size = nxt_buf_used_size(buf);
- res = nxt_port_socket_twrite(task, port,
- NXT_PORT_MSG_REQ_HEADERS,
- req_rpc_data->msg_info.body_fd,
- req_rpc_data->stream, reply_port->id, buf,
- NULL);
+ res = nxt_app_queue_send(port->queue, &msg, sizeof(msg),
+ req_rpc_data->stream, &notify,
+ &req_rpc_data->msg_info.tracking_cookie);
+ if (nxt_fast_path(res == NXT_OK)) {
+ if (notify != 0) {
+ (void) nxt_port_socket_write(task, port,
+ NXT_PORT_MSG_READ_QUEUE,
+ -1, req_rpc_data->stream,
+ reply_port->id, NULL);
- if (nxt_slow_path(res != NXT_OK)) {
+ } else {
+ nxt_debug(task, "queue is not empty");
+ }
+
+ } else {
nxt_alert(task, "stream #%uD, app '%V': failed to send app message",
req_rpc_data->stream, &app->name);
diff --git a/src/nxt_router_request.h b/src/nxt_router_request.h
index 1271520d..95044dbb 100644
--- a/src/nxt_router_request.h
+++ b/src/nxt_router_request.h
@@ -7,10 +7,10 @@
#define _NXT_ROUTER_REQUEST_H_INCLUDED_
-typedef struct nxt_msg_info_s {
+typedef struct {
nxt_buf_t *buf;
nxt_fd_t body_fd;
- nxt_port_mmap_tracking_t tracking;
+ uint32_t tracking_cookie;
nxt_work_handler_t completion_handler;
} nxt_msg_info_t;
diff --git a/src/nxt_unit.c b/src/nxt_unit.c
index 66aadd98..1008a9d6 100644
--- a/src/nxt_unit.c
+++ b/src/nxt_unit.c
@@ -7,6 +7,8 @@
#include "nxt_main.h"
#include "nxt_port_memory_int.h"
+#include "nxt_port_queue.h"
+#include "nxt_app_queue.h"
#include "nxt_unit.h"
#include "nxt_unit_request.h"
@@ -50,12 +52,15 @@ nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf);
static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
nxt_unit_port_t *router_port, nxt_unit_port_t *read_port,
int *log_fd, uint32_t *stream, uint32_t *shm_limit);
-static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream);
+static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream,
+ int queue_fd);
static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf);
static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
+static int nxt_unit_process_req_body(nxt_unit_ctx_t *ctx,
+ nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
nxt_unit_port_id_t *port_id);
static int nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req);
@@ -92,6 +97,7 @@ static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx);
static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i);
static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx,
nxt_unit_port_t *port, int n);
+static int nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size);
static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
int fd);
static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx,
@@ -103,8 +109,6 @@ static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps);
nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process);
nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process);
static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps);
-static int nxt_unit_tracking_read(nxt_unit_ctx_t *ctx,
- nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf);
static int nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx,
nxt_unit_mmaps_t *mmaps, pid_t pid, uint32_t id,
nxt_port_mmap_header_t **hdr, nxt_unit_read_buf_t *rbuf);
@@ -124,18 +128,22 @@ static int nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx);
static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf);
static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx);
static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx);
+nxt_inline int nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf);
+nxt_inline int nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf);
+nxt_inline int nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf);
+nxt_inline int nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf);
static int nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx,
nxt_unit_port_t *port);
static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl);
static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx);
static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
- nxt_unit_port_t *port);
+ nxt_unit_port_t *port, int queue_fd);
nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port);
nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port);
static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx,
- nxt_unit_port_t *port);
+ nxt_unit_port_t *port, void *queue);
static void nxt_unit_remove_port(nxt_unit_impl_t *lib,
nxt_unit_port_id_t *port_id);
static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib,
@@ -150,18 +158,28 @@ static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx,
const void *oob, size_t oob_size);
static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
const void *buf, size_t buf_size, const void *oob, size_t oob_size);
+static int nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
+ nxt_unit_read_buf_t *rbuf);
+nxt_inline void nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst,
+ nxt_unit_read_buf_t *src);
+static int nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
+ nxt_unit_read_buf_t *rbuf);
static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
nxt_unit_read_buf_t *rbuf);
+static int nxt_unit_port_queue_recv(nxt_unit_port_t *port,
+ nxt_unit_read_buf_t *rbuf);
+static int nxt_unit_app_queue_recv(nxt_unit_port_t *port,
+ nxt_unit_read_buf_t *rbuf);
static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
nxt_unit_port_t *port);
static nxt_unit_port_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash,
nxt_unit_port_id_t *port_id, int remove);
-static int nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash,
- nxt_unit_request_info_impl_t *req_impl);
-static nxt_unit_request_info_impl_t *nxt_unit_request_hash_find(
- nxt_lvlhsh_t *request_hash, uint32_t stream, int remove);
+static int nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx,
+ nxt_unit_request_info_t *req);
+static nxt_unit_request_info_t *nxt_unit_request_hash_find(
+ nxt_unit_ctx_t *ctx, uint32_t stream, int remove);
static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level);
@@ -217,6 +235,7 @@ struct nxt_unit_request_info_impl_s {
nxt_unit_req_state_t state;
uint8_t websocket;
+ uint8_t in_hash;
/* for nxt_unit_ctx_impl_t.free_req or active_req */
nxt_queue_link_t link;
@@ -349,6 +368,11 @@ struct nxt_unit_port_impl_s {
nxt_queue_t awaiting_req;
int ready;
+
+ void *queue;
+
+ int from_socket;
+ nxt_unit_read_buf_t *socket_rbuf;
};
@@ -375,7 +399,8 @@ typedef struct {
nxt_unit_ctx_t *
nxt_unit_init(nxt_unit_init_t *init)
{
- int rc;
+ int rc, queue_fd;
+ void *mem;
uint32_t ready_stream, shm_limit;
nxt_unit_ctx_t *ctx;
nxt_unit_impl_t *lib;
@@ -386,6 +411,8 @@ nxt_unit_init(nxt_unit_init_t *init)
return NULL;
}
+ queue_fd = -1;
+
if (init->ready_port.id.pid != 0
&& init->ready_stream != 0
&& init->read_port.id.pid != 0)
@@ -422,33 +449,58 @@ nxt_unit_init(nxt_unit_init_t *init)
ctx = &lib->main_ctx.ctx;
- lib->router_port = nxt_unit_add_port(ctx, &router_port);
+ lib->router_port = nxt_unit_add_port(ctx, &router_port, NULL);
if (nxt_slow_path(lib->router_port == NULL)) {
nxt_unit_alert(NULL, "failed to add router_port");
goto fail;
}
- lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port);
+ queue_fd = nxt_unit_shm_open(ctx, sizeof(nxt_port_queue_t));
+ if (nxt_slow_path(queue_fd == -1)) {
+ goto fail;
+ }
+
+ mem = mmap(NULL, sizeof(nxt_port_queue_t),
+ PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0);
+ if (nxt_slow_path(mem == MAP_FAILED)) {
+ nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd,
+ strerror(errno), errno);
+
+ goto fail;
+ }
+
+ nxt_port_queue_init(mem);
+
+ lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port, mem);
if (nxt_slow_path(lib->main_ctx.read_port == NULL)) {
nxt_unit_alert(NULL, "failed to add read_port");
+ munmap(mem, sizeof(nxt_port_queue_t));
+
goto fail;
}
- rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream);
+ rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream, queue_fd);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
nxt_unit_alert(NULL, "failed to send READY message");
+ munmap(mem, sizeof(nxt_port_queue_t));
+
goto fail;
}
close(ready_port.out_fd);
+ close(queue_fd);
return ctx;
fail:
+ if (queue_fd != -1) {
+ close(queue_fd);
+ }
+
nxt_unit_ctx_release(&lib->main_ctx.ctx);
return NULL;
@@ -497,6 +549,7 @@ nxt_unit_create(nxt_unit_init_t *init)
rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ pthread_mutex_destroy(&lib->mutex);
goto fail;
}
@@ -505,6 +558,7 @@ nxt_unit_create(nxt_unit_init_t *init)
if (cb->request_handler == NULL) {
nxt_unit_alert(NULL, "request_handler is NULL");
+ pthread_mutex_destroy(&lib->mutex);
goto fail;
}
@@ -765,12 +819,17 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
static int
-nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream)
+nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd)
{
ssize_t res;
nxt_port_msg_t msg;
nxt_unit_impl_t *lib;
+ union {
+ struct cmsghdr cm;
+ char space[CMSG_SPACE(sizeof(int))];
+ } cmsg;
+
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
msg.stream = stream;
@@ -783,7 +842,25 @@ nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream)
msg.mf = 0;
msg.tracking = 0;
- res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg), NULL, 0);
+ memset(&cmsg, 0, sizeof(cmsg));
+
+ cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
+ cmsg.cm.cmsg_level = SOL_SOCKET;
+ cmsg.cm.cmsg_type = SCM_RIGHTS;
+
+ /*
+ * memcpy() is used instead of simple
+ * *(int *) CMSG_DATA(&cmsg.cm) = fd;
+ * because GCC 4.4 with -O2/3/s optimization may issue a warning:
+ * dereferencing type-punned pointer will break strict-aliasing rules
+ *
+ * Fortunately, GCC with -O1 compiles this nxt_memcpy()
+ * in the same simple assignment as in the code above.
+ */
+ memcpy(CMSG_DATA(&cmsg.cm), &queue_fd, sizeof(int));
+
+ res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg),
+ &cmsg, sizeof(cmsg));
if (res != sizeof(msg)) {
return NXT_UNIT_ERROR;
}
@@ -838,6 +915,10 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
goto fail;
}
+ nxt_unit_debug(ctx, "#%"PRIu32": process message %d fd %d fd2 %d",
+ port_msg->stream, (int) port_msg->type,
+ recv_msg.fd, recv_msg.fd2);
+
recv_msg.stream = port_msg->stream;
recv_msg.pid = port_msg->pid;
recv_msg.reply_port = port_msg->reply_port;
@@ -853,19 +934,6 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
goto fail;
}
- if (port_msg->tracking) {
- rc = nxt_unit_tracking_read(ctx, &recv_msg, rbuf);
-
- if (nxt_slow_path(rc != NXT_UNIT_OK)) {
- if (rc == NXT_UNIT_AGAIN) {
- recv_msg.fd = -1;
- recv_msg.fd2 = -1;
- }
-
- goto fail;
- }
- }
-
/* Fragmentation is unsupported. */
if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) {
nxt_unit_warn(ctx, "#%"PRIu32": fragmented message type (%d)",
@@ -929,6 +997,10 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
rc = nxt_unit_process_req_headers(ctx, &recv_msg);
break;
+ case _NXT_PORT_MSG_REQ_BODY:
+ rc = nxt_unit_process_req_body(ctx, &recv_msg);
+ break;
+
case _NXT_PORT_MSG_WEBSOCKET:
rc = nxt_unit_process_websocket(ctx, &recv_msg);
break;
@@ -992,6 +1064,7 @@ static int
nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
int nb;
+ void *mem;
nxt_unit_impl_t *lib;
nxt_unit_port_t new_port, *port;
nxt_port_msg_new_port_t *new_port_msg;
@@ -1013,9 +1086,9 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
new_port_msg = recv_msg->start;
- nxt_unit_debug(ctx, "#%"PRIu32": new_port: %d,%d fd %d",
+ nxt_unit_debug(ctx, "#%"PRIu32": new_port: port{%d,%d} fd %d fd2 %d",
recv_msg->stream, (int) new_port_msg->pid,
- (int) new_port_msg->id, recv_msg->fd);
+ (int) new_port_msg->id, recv_msg->fd, recv_msg->fd2);
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
@@ -1025,6 +1098,9 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
new_port.in_fd = recv_msg->fd;
new_port.out_fd = -1;
+ mem = mmap(NULL, sizeof(nxt_app_queue_t), PROT_READ | PROT_WRITE,
+ MAP_SHARED, recv_msg->fd2, 0);
+
} else {
nb = 0;
@@ -1041,14 +1117,23 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
new_port.in_fd = -1;
new_port.out_fd = recv_msg->fd;
+
+ mem = mmap(NULL, sizeof(nxt_port_queue_t), PROT_READ | PROT_WRITE,
+ MAP_SHARED, recv_msg->fd2, 0);
}
+ if (nxt_slow_path(mem == MAP_FAILED)) {
+ nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", recv_msg->fd2,
+ strerror(errno), errno);
+
+ return NXT_UNIT_ERROR;
+ }
new_port.data = NULL;
recv_msg->fd = -1;
- port = nxt_unit_add_port(ctx, &new_port);
+ port = nxt_unit_add_port(ctx, &new_port, mem);
if (nxt_slow_path(port == NULL)) {
return NXT_UNIT_ERROR;
}
@@ -1134,6 +1219,7 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
req->response_max_fields = 0;
req_impl->state = NXT_UNIT_RS_START;
req_impl->websocket = 0;
+ req_impl->in_hash = 0;
nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream,
(int) r->method_length,
@@ -1151,12 +1237,82 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
if (nxt_fast_path(res == NXT_UNIT_OK)) {
res = nxt_unit_send_req_headers_ack(req);
- if (nxt_slow_path(res != NXT_UNIT_OK)) {
- return res;
+ if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
+ nxt_unit_request_done(req, NXT_UNIT_ERROR);
+
+ return NXT_UNIT_ERROR;
}
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ if (req->content_length
+ > (uint64_t) (req->content_buf->end - req->content_buf->free))
+ {
+ res = nxt_unit_request_hash_add(ctx, req);
+ if (nxt_slow_path(res != NXT_UNIT_OK)) {
+ nxt_unit_req_warn(req, "failed to add request to hash");
+
+ nxt_unit_request_done(req, NXT_UNIT_ERROR);
+
+ return NXT_UNIT_ERROR;
+ }
+
+ /*
+ * If application have separate data handler, we may start
+ * request processing and process data when it is arrived.
+ */
+ if (lib->callbacks.data_handler == NULL) {
+ return NXT_UNIT_OK;
+ }
+ }
+
+ lib->callbacks.request_handler(req);
+ }
+
+ return NXT_UNIT_OK;
+}
+
+
+static int
+nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
+{
+ uint64_t l;
+ nxt_unit_impl_t *lib;
+ nxt_unit_mmap_buf_t *b;
+ nxt_unit_request_info_t *req;
+
+ req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
+ if (req == NULL) {
+ return NXT_UNIT_OK;
+ }
+
+ l = req->content_buf->end - req->content_buf->free;
+
+ for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
+ b->req = req;
+ l += b->buf.end - b->buf.free;
+ }
+
+ if (recv_msg->incoming_buf != NULL) {
+ b = nxt_container_of(req->content_buf, nxt_unit_mmap_buf_t, buf);
+
+ /* "Move" incoming buffer list to req_impl. */
+ nxt_unit_mmap_buf_insert_tail(&b->next, recv_msg->incoming_buf);
+ recv_msg->incoming_buf = NULL;
+ }
+
+ req->content_fd = recv_msg->fd;
+ recv_msg->fd = -1;
+
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+
+ if (lib->callbacks.data_handler != NULL) {
+ lib->callbacks.data_handler(req);
+
+ return NXT_UNIT_OK;
+ }
+
+ if (req->content_fd != -1 || l == req->content_length) {
lib->callbacks.request_handler(req);
}
@@ -1260,6 +1416,9 @@ nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
nxt_queue_insert_tail(&process->ports, &port_impl->link);
port_impl->process = process;
+ port_impl->queue = NULL;
+ port_impl->from_socket = 0;
+ port_impl->socket_rbuf = NULL;
nxt_queue_init(&port_impl->awaiting_req);
@@ -1321,21 +1480,17 @@ nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
size_t hsize;
nxt_unit_impl_t *lib;
nxt_unit_mmap_buf_t *b;
- nxt_unit_ctx_impl_t *ctx_impl;
nxt_unit_callbacks_t *cb;
nxt_unit_request_info_t *req;
nxt_unit_request_info_impl_t *req_impl;
nxt_unit_websocket_frame_impl_t *ws_impl;
- ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
-
- req_impl = nxt_unit_request_hash_find(&ctx_impl->requests, recv_msg->stream,
- recv_msg->last);
- if (req_impl == NULL) {
+ req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
+ if (nxt_slow_path(req == NULL)) {
return NXT_UNIT_OK;
}
- req = &req_impl->req;
+ req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
cb = &lib->callbacks;
@@ -1501,12 +1656,12 @@ nxt_unit_request_info_release(nxt_unit_request_info_t *req)
req->response = NULL;
req->response_buf = NULL;
- if (req_impl->websocket) {
- nxt_unit_request_hash_find(&ctx_impl->requests, req_impl->stream, 1);
-
- req_impl->websocket = 0;
+ if (req_impl->in_hash) {
+ nxt_unit_request_hash_find(req->ctx, req_impl->stream, 1);
}
+ req_impl->websocket = 0;
+
while (req_impl->outgoing_buf != NULL) {
nxt_unit_mmap_buf_free(req_impl->outgoing_buf);
}
@@ -2170,7 +2325,6 @@ int
nxt_unit_response_upgrade(nxt_unit_request_info_t *req)
{
int rc;
- nxt_unit_ctx_impl_t *ctx_impl;
nxt_unit_request_info_impl_t *req_impl;
req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
@@ -2193,9 +2347,7 @@ nxt_unit_response_upgrade(nxt_unit_request_info_t *req)
return NXT_UNIT_ERROR;
}
- ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
-
- rc = nxt_unit_request_hash_add(&ctx_impl->requests, req_impl);
+ rc = nxt_unit_request_hash_add(req->ctx, req);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
nxt_unit_req_warn(req, "upgrade: failed to add request to hash");
@@ -2466,6 +2618,8 @@ nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx)
pthread_mutex_unlock(&ctx_impl->mutex);
+ memset(rbuf->oob, 0, sizeof(struct cmsghdr));
+
return rbuf;
}
@@ -2564,6 +2718,8 @@ nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start,
nxt_unit_request_info_impl_t *req_impl;
char local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
+ nxt_unit_req_debug(req, "write: %d", (int) size);
+
req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
part_start = start;
@@ -2743,9 +2899,11 @@ nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
buf_res = nxt_unit_buf_read(&req->content_buf, &req->content_length,
dst, size);
+ nxt_unit_req_debug(req, "read: %d", (int) buf_res);
+
if (buf_res < (ssize_t) size && req->content_fd != -1) {
res = read(req->content_fd, dst, size);
- if (res < 0) {
+ if (nxt_slow_path(res < 0)) {
nxt_unit_req_alert(req, "failed to read content: %s (%d)",
strerror(errno), errno);
@@ -3301,7 +3459,7 @@ nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
static int
nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx)
{
- nxt_port_msg_t *port_msg;
+ int res;
nxt_unit_ctx_impl_t *ctx_impl;
nxt_unit_read_buf_t *rbuf;
@@ -3313,21 +3471,15 @@ nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx)
return NXT_UNIT_ERROR;
}
- memset(rbuf->oob, 0, sizeof(struct cmsghdr));
-
- nxt_unit_port_recv(ctx, ctx_impl->read_port, rbuf);
-
- if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
+ res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
+ if (res == NXT_UNIT_ERROR) {
nxt_unit_read_buf_release(ctx, rbuf);
return NXT_UNIT_ERROR;
}
- port_msg = (nxt_port_msg_t *) rbuf->buf;
-
- if (port_msg->type == _NXT_PORT_MSG_SHM_ACK) {
+ if (nxt_unit_is_shm_ack(rbuf)) {
nxt_unit_read_buf_release(ctx, rbuf);
-
break;
}
@@ -3337,7 +3489,7 @@ nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx)
pthread_mutex_unlock(&ctx_impl->mutex);
- if (port_msg->type == _NXT_PORT_MSG_QUIT) {
+ if (nxt_unit_is_quit(rbuf)) {
nxt_unit_debug(ctx, "oosm: quit received");
return NXT_UNIT_ERROR;
@@ -3406,7 +3558,6 @@ nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n)
{
int i, fd, rc;
void *mem;
- char name[64];
nxt_unit_mmap_t *mm;
nxt_unit_impl_t *lib;
nxt_port_mmap_header_t *hdr;
@@ -3420,59 +3571,8 @@ nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n)
return NULL;
}
- snprintf(name, sizeof(name), NXT_SHM_PREFIX "unit.%d.%p",
- lib->pid, (void *) pthread_self());
-
-#if (NXT_HAVE_MEMFD_CREATE)
-
- fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
+ fd = nxt_unit_shm_open(ctx, PORT_MMAP_SIZE);
if (nxt_slow_path(fd == -1)) {
- nxt_unit_alert(ctx, "memfd_create(%s) failed: %s (%d)", name,
- strerror(errno), errno);
-
- goto remove_fail;
- }
-
- nxt_unit_debug(ctx, "memfd_create(%s): %d", name, fd);
-
-#elif (NXT_HAVE_SHM_OPEN_ANON)
-
- fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR);
- if (nxt_slow_path(fd == -1)) {
- nxt_unit_alert(ctx, "shm_open(SHM_ANON) failed: %s (%d)",
- strerror(errno), errno);
-
- goto remove_fail;
- }
-
-#elif (NXT_HAVE_SHM_OPEN)
-
- /* Just in case. */
- shm_unlink(name);
-
- fd = shm_open(name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
- if (nxt_slow_path(fd == -1)) {
- nxt_unit_alert(ctx, "shm_open(%s) failed: %s (%d)", name,
- strerror(errno), errno);
-
- goto remove_fail;
- }
-
- if (nxt_slow_path(shm_unlink(name) == -1)) {
- nxt_unit_warn(ctx, "shm_unlink(%s) failed: %s (%d)", name,
- strerror(errno), errno);
- }
-
-#else
-
-#error No working shared memory implementation.
-
-#endif
-
- if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) {
- nxt_unit_alert(ctx, "ftruncate(%d) failed: %s (%d)", fd,
- strerror(errno), errno);
-
goto remove_fail;
}
@@ -3481,6 +3581,8 @@ nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n)
nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", fd,
strerror(errno), errno);
+ close(fd);
+
goto remove_fail;
}
@@ -3533,6 +3635,80 @@ remove_fail:
static int
+nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size)
+{
+ int fd;
+ nxt_unit_impl_t *lib;
+
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+
+#if (NXT_HAVE_MEMFD_CREATE || NXT_HAVE_SHM_OPEN)
+ char name[64];
+
+ snprintf(name, sizeof(name), NXT_SHM_PREFIX "unit.%d.%p",
+ lib->pid, (void *) pthread_self());
+#endif
+
+#if (NXT_HAVE_MEMFD_CREATE)
+
+ fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
+ if (nxt_slow_path(fd == -1)) {
+ nxt_unit_alert(ctx, "memfd_create(%s) failed: %s (%d)", name,
+ strerror(errno), errno);
+
+ return -1;
+ }
+
+ nxt_unit_debug(ctx, "memfd_create(%s): %d", name, fd);
+
+#elif (NXT_HAVE_SHM_OPEN_ANON)
+
+ fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR);
+ if (nxt_slow_path(fd == -1)) {
+ nxt_unit_alert(ctx, "shm_open(SHM_ANON) failed: %s (%d)",
+ strerror(errno), errno);
+
+ return -1;
+ }
+
+#elif (NXT_HAVE_SHM_OPEN)
+
+ /* Just in case. */
+ shm_unlink(name);
+
+ fd = shm_open(name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
+ if (nxt_slow_path(fd == -1)) {
+ nxt_unit_alert(ctx, "shm_open(%s) failed: %s (%d)", name,
+ strerror(errno), errno);
+
+ return -1;
+ }
+
+ if (nxt_slow_path(shm_unlink(name) == -1)) {
+ nxt_unit_alert(ctx, "shm_unlink(%s) failed: %s (%d)", name,
+ strerror(errno), errno);
+ }
+
+#else
+
+#error No working shared memory implementation.
+
+#endif
+
+ if (nxt_slow_path(ftruncate(fd, size) == -1)) {
+ nxt_unit_alert(ctx, "ftruncate(%d) failed: %s (%d)", fd,
+ strerror(errno), errno);
+
+ close(fd);
+
+ return -1;
+ }
+
+ return fd;
+}
+
+
+static int
nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int fd)
{
ssize_t res;
@@ -3797,61 +3973,6 @@ nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps)
static int
-nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
- nxt_unit_read_buf_t *rbuf)
-{
- int res;
- nxt_chunk_id_t c;
- nxt_unit_impl_t *lib;
- nxt_port_mmap_header_t *hdr;
- nxt_port_mmap_tracking_msg_t *tracking_msg;
-
- if (recv_msg->size < (int) sizeof(nxt_port_mmap_tracking_msg_t)) {
- nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: too small message (%d)",
- recv_msg->stream, (int) recv_msg->size);
-
- return NXT_UNIT_ERROR;
- }
-
- tracking_msg = recv_msg->start;
-
- recv_msg->start = tracking_msg + 1;
- recv_msg->size -= sizeof(nxt_port_mmap_tracking_msg_t);
-
- lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
-
- pthread_mutex_lock(&lib->incoming.mutex);
-
- res = nxt_unit_check_rbuf_mmap(ctx, &lib->incoming,
- recv_msg->pid, tracking_msg->mmap_id,
- &hdr, rbuf);
-
- if (nxt_slow_path(res != NXT_UNIT_OK)) {
- return res;
- }
-
- c = tracking_msg->tracking_id;
- res = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->stream, 0);
-
- if (res == 0) {
- nxt_unit_debug(ctx, "#%"PRIu32": tracking cancelled",
- recv_msg->stream);
-
- nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c);
-
- res = NXT_UNIT_CANCELLED;
-
- } else {
- res = NXT_UNIT_OK;
- }
-
- pthread_mutex_unlock(&lib->incoming.mutex);
-
- return res;
-}
-
-
-static int
nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx, nxt_unit_mmaps_t *mmaps,
pid_t pid, uint32_t id, nxt_port_mmap_header_t **hdr,
nxt_unit_read_buf_t *rbuf)
@@ -4154,7 +4275,7 @@ nxt_unit_process_get(nxt_unit_impl_t *lib, pid_t pid)
}
process->pid = pid;
- process->use_count = 1;
+ process->use_count = 2;
process->next_port_id = 0;
process->lib = lib;
@@ -4176,8 +4297,6 @@ nxt_unit_process_get(nxt_unit_impl_t *lib, pid_t pid)
break;
}
- nxt_unit_process_use(process);
-
return process;
}
@@ -4293,22 +4412,52 @@ nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx)
static int
nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
{
- int res, err;
- nxt_unit_impl_t *lib;
- nxt_unit_ctx_impl_t *ctx_impl;
- struct pollfd fds[2];
+ int nevents, res, err;
+ nxt_unit_impl_t *lib;
+ nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_port_impl_t *port_impl;
+ struct pollfd fds[2];
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
- memset(rbuf->oob, 0, sizeof(struct cmsghdr));
-
if (ctx_impl->wait_items > 0 || lib->shared_port == NULL) {
- return nxt_unit_port_recv(ctx, ctx_impl->read_port, rbuf);
+
+ return nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
}
+ port_impl = nxt_container_of(ctx_impl->read_port, nxt_unit_port_impl_t,
+ port);
+
retry:
+ if (port_impl->from_socket == 0) {
+ res = nxt_unit_port_queue_recv(ctx_impl->read_port, rbuf);
+ if (res == NXT_UNIT_OK) {
+ if (nxt_unit_is_read_socket(rbuf)) {
+ port_impl->from_socket++;
+
+ nxt_unit_debug(ctx, "port{%d,%d} dequeue 1 read_socket %d",
+ (int) ctx_impl->read_port->id.pid,
+ (int) ctx_impl->read_port->id.id,
+ port_impl->from_socket);
+
+ } else {
+ nxt_unit_debug(ctx, "port{%d,%d} dequeue %d",
+ (int) ctx_impl->read_port->id.pid,
+ (int) ctx_impl->read_port->id.id,
+ (int) rbuf->size);
+
+ return NXT_UNIT_OK;
+ }
+ }
+ }
+
+ res = nxt_unit_app_queue_recv(lib->shared_port, rbuf);
+ if (res == NXT_UNIT_OK) {
+ return NXT_UNIT_OK;
+ }
+
fds[0].fd = ctx_impl->read_port->in_fd;
fds[0].events = POLLIN;
fds[0].revents = 0;
@@ -4317,31 +4466,47 @@ retry:
fds[1].events = POLLIN;
fds[1].revents = 0;
- res = poll(fds, 2, -1);
- if (nxt_slow_path(res < 0)) {
+ nevents = poll(fds, 2, -1);
+ if (nxt_slow_path(nevents == -1)) {
err = errno;
if (err == EINTR) {
goto retry;
}
- nxt_unit_alert(ctx, "poll() failed: %s (%d)",
- strerror(err), err);
+ nxt_unit_alert(ctx, "poll(%d,%d) failed: %s (%d)",
+ fds[0].fd, fds[1].fd, strerror(err), err);
rbuf->size = -1;
return (err == EAGAIN) ? NXT_UNIT_AGAIN : NXT_UNIT_ERROR;
}
+ nxt_unit_debug(ctx, "poll(%d,%d): %d, revents [%04uXi, %04uXi]",
+ fds[0].fd, fds[1].fd, nevents, fds[0].revents,
+ fds[1].revents);
+
if ((fds[0].revents & POLLIN) != 0) {
- return nxt_unit_port_recv(ctx, ctx_impl->read_port, rbuf);
+ res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
+ if (res == NXT_UNIT_AGAIN) {
+ goto retry;
+ }
+
+ return res;
}
if ((fds[1].revents & POLLIN) != 0) {
- return nxt_unit_port_recv(ctx, lib->shared_port, rbuf);
+ res = nxt_unit_shared_port_recv(ctx, lib->shared_port, rbuf);
+ if (res == NXT_UNIT_AGAIN) {
+ goto retry;
+ }
+
+ return res;
}
- rbuf->size = -1;
+ nxt_unit_alert(ctx, "poll(%d,%d): %d unexpected revents [%04uXi, %04uXi]",
+ fds[0].fd, fds[1].fd, nevents, fds[0].revents,
+ fds[1].revents);
return NXT_UNIT_ERROR;
}
@@ -4392,9 +4557,11 @@ nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx)
static void
nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx)
{
+ int res;
nxt_queue_t ready_req;
nxt_unit_impl_t *lib;
nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_request_info_t *req;
nxt_unit_request_info_impl_t *req_impl;
nxt_queue_init(&ready_req);
@@ -4419,7 +4586,35 @@ nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx)
{
lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit);
- (void) nxt_unit_send_req_headers_ack(&req_impl->req);
+ req = &req_impl->req;
+
+ res = nxt_unit_send_req_headers_ack(req);
+ if (nxt_slow_path(res != NXT_UNIT_OK)) {
+ nxt_unit_request_done(req, NXT_UNIT_ERROR);
+
+ continue;
+ }
+
+ if (req->content_length
+ > (uint64_t) (req->content_buf->end - req->content_buf->free))
+ {
+ res = nxt_unit_request_hash_add(ctx, req);
+ if (nxt_slow_path(res != NXT_UNIT_OK)) {
+ nxt_unit_req_warn(req, "failed to add request to hash");
+
+ nxt_unit_request_done(req, NXT_UNIT_ERROR);
+
+ continue;
+ }
+
+ /*
+ * If application have separate data handler, we may start
+ * request processing and process data when it is arrived.
+ */
+ if (lib->callbacks.data_handler == NULL) {
+ continue;
+ }
+ }
lib->callbacks.request_handler(&req_impl->req);
@@ -4432,6 +4627,7 @@ nxt_unit_run_ctx(nxt_unit_ctx_t *ctx)
{
int rc;
nxt_unit_impl_t *lib;
+ nxt_unit_read_buf_t *rbuf;
nxt_unit_ctx_impl_t *ctx_impl;
nxt_unit_ctx_use(ctx);
@@ -4442,11 +4638,30 @@ nxt_unit_run_ctx(nxt_unit_ctx_t *ctx)
rc = NXT_UNIT_OK;
while (nxt_fast_path(lib->online)) {
- rc = nxt_unit_process_port_msg_impl(ctx, ctx_impl->read_port);
+ rbuf = nxt_unit_read_buf_get(ctx);
+ if (nxt_slow_path(rbuf == NULL)) {
+ rc = NXT_UNIT_ERROR;
+ break;
+ }
+ retry:
+
+ rc = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
+ if (rc == NXT_UNIT_AGAIN) {
+ goto retry;
+ }
+
+ rc = nxt_unit_process_msg(ctx, rbuf);
+ if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
+ break;
+ }
+
+ rc = nxt_unit_process_pending_rbuf(ctx);
if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
break;
}
+
+ nxt_unit_process_ready_req(ctx);
}
nxt_unit_ctx_release(ctx);
@@ -4455,11 +4670,68 @@ nxt_unit_run_ctx(nxt_unit_ctx_t *ctx)
}
+nxt_inline int
+nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf)
+{
+ nxt_port_msg_t *port_msg;
+
+ if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) {
+ port_msg = (nxt_port_msg_t *) rbuf->buf;
+
+ return port_msg->type == _NXT_PORT_MSG_READ_QUEUE;
+ }
+
+ return 0;
+}
+
+
+nxt_inline int
+nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf)
+{
+ if (nxt_fast_path(rbuf->size == 1)) {
+ return rbuf->buf[0] == _NXT_PORT_MSG_READ_SOCKET;
+ }
+
+ return 0;
+}
+
+
+nxt_inline int
+nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf)
+{
+ nxt_port_msg_t *port_msg;
+
+ if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) {
+ port_msg = (nxt_port_msg_t *) rbuf->buf;
+
+ return port_msg->type == _NXT_PORT_MSG_SHM_ACK;
+ }
+
+ return 0;
+}
+
+
+nxt_inline int
+nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf)
+{
+ nxt_port_msg_t *port_msg;
+
+ if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) {
+ port_msg = (nxt_port_msg_t *) rbuf->buf;
+
+ return port_msg->type == _NXT_PORT_MSG_QUIT;
+ }
+
+ return 0;
+}
+
+
int
nxt_unit_run_shared(nxt_unit_ctx_t *ctx)
{
- int rc;
- nxt_unit_impl_t *lib;
+ int rc;
+ nxt_unit_impl_t *lib;
+ nxt_unit_read_buf_t *rbuf;
nxt_unit_ctx_use(ctx);
@@ -4467,11 +4739,35 @@ nxt_unit_run_shared(nxt_unit_ctx_t *ctx)
rc = NXT_UNIT_OK;
while (nxt_fast_path(lib->online)) {
- rc = nxt_unit_process_port_msg_impl(ctx, lib->shared_port);
+ rbuf = nxt_unit_read_buf_get(ctx);
+ if (nxt_slow_path(rbuf == NULL)) {
+ rc = NXT_UNIT_ERROR;
+ break;
+ }
+
+ retry:
+ rc = nxt_unit_shared_port_recv(ctx, lib->shared_port, rbuf);
+ if (rc == NXT_UNIT_AGAIN) {
+ goto retry;
+ }
+
+ if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
+ nxt_unit_read_buf_release(ctx, rbuf);
+ break;
+ }
+
+ rc = nxt_unit_process_msg(ctx, rbuf);
+ if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
+ break;
+ }
+
+ rc = nxt_unit_process_pending_rbuf(ctx);
if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
break;
}
+
+ nxt_unit_process_ready_req(ctx);
}
nxt_unit_ctx_release(ctx);
@@ -4499,6 +4795,7 @@ static int
nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
{
int rc;
+ nxt_unit_impl_t *lib;
nxt_unit_read_buf_t *rbuf;
rbuf = nxt_unit_read_buf_get(ctx);
@@ -4506,10 +4803,18 @@ nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
return NXT_UNIT_ERROR;
}
- memset(rbuf->oob, 0, sizeof(struct cmsghdr));
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
- rc = nxt_unit_port_recv(ctx, port, rbuf);
- if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+retry:
+
+ if (port == lib->shared_port) {
+ rc = nxt_unit_shared_port_recv(ctx, port, rbuf);
+
+ } else {
+ rc = nxt_unit_ctx_port_recv(ctx, port, rbuf);
+ }
+
+ if (rc != NXT_UNIT_OK) {
nxt_unit_read_buf_release(ctx, rbuf);
return rc;
}
@@ -4526,6 +4831,15 @@ nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
nxt_unit_process_ready_req(ctx);
+ rbuf = nxt_unit_read_buf_get(ctx);
+ if (nxt_slow_path(rbuf == NULL)) {
+ return NXT_UNIT_ERROR;
+ }
+
+ if (lib->online) {
+ goto retry;
+ }
+
return rc;
}
@@ -4540,10 +4854,12 @@ nxt_unit_done(nxt_unit_ctx_t *ctx)
nxt_unit_ctx_t *
nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data)
{
- int rc;
- nxt_unit_impl_t *lib;
- nxt_unit_port_t *port;
- nxt_unit_ctx_impl_t *new_ctx;
+ int rc, queue_fd;
+ void *mem;
+ nxt_unit_impl_t *lib;
+ nxt_unit_port_t *port;
+ nxt_unit_ctx_impl_t *new_ctx;
+ nxt_unit_port_impl_t *port_impl;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
@@ -4554,33 +4870,57 @@ nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data)
return NULL;
}
+ rc = nxt_unit_ctx_init(lib, new_ctx, data);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ free(new_ctx);
+
+ return NULL;
+ }
+
+ queue_fd = -1;
+
port = nxt_unit_create_port(ctx);
if (nxt_slow_path(port == NULL)) {
- free(new_ctx);
+ goto fail;
+ }
- return NULL;
+ new_ctx->read_port = port;
+
+ queue_fd = nxt_unit_shm_open(ctx, sizeof(nxt_port_queue_t));
+ if (nxt_slow_path(queue_fd == -1)) {
+ goto fail;
}
- rc = nxt_unit_send_port(ctx, lib->router_port, port);
- if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ mem = mmap(NULL, sizeof(nxt_port_queue_t),
+ PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0);
+ if (nxt_slow_path(mem == MAP_FAILED)) {
+ nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd,
+ strerror(errno), errno);
+
goto fail;
}
- rc = nxt_unit_ctx_init(lib, new_ctx, data);
+ nxt_port_queue_init(mem);
+
+ port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
+ port_impl->queue = mem;
+
+ rc = nxt_unit_send_port(ctx, lib->router_port, port, queue_fd);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
goto fail;
}
- new_ctx->read_port = port;
+ close(queue_fd);
return &new_ctx->ctx;
fail:
- nxt_unit_remove_port(lib, &port->id);
- nxt_unit_port_release(port);
+ if (queue_fd != -1) {
+ close(queue_fd);
+ }
- free(new_ctx);
+ nxt_unit_ctx_release(&new_ctx->ctx);
return NULL;
}
@@ -4633,6 +4973,7 @@ nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl)
nxt_queue_remove(&ctx_impl->link);
if (nxt_fast_path(ctx_impl->read_port != NULL)) {
+ nxt_unit_remove_port(lib, &ctx_impl->read_port->id);
nxt_unit_port_release(ctx_impl->read_port);
}
@@ -4709,10 +5050,8 @@ nxt_unit_create_port(nxt_unit_ctx_t *ctx)
nxt_unit_process_release(process);
- port = nxt_unit_add_port(ctx, &new_port);
+ port = nxt_unit_add_port(ctx, &new_port, NULL);
if (nxt_slow_path(port == NULL)) {
- nxt_unit_alert(ctx, "create_port: add_port() failed");
-
close(port_sockets[0]);
close(port_sockets[1]);
}
@@ -4723,10 +5062,11 @@ nxt_unit_create_port(nxt_unit_ctx_t *ctx)
static int
nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
- nxt_unit_port_t *port)
+ nxt_unit_port_t *port, int queue_fd)
{
ssize_t res;
nxt_unit_impl_t *lib;
+ int fds[2] = { port->out_fd, queue_fd };
struct {
nxt_port_msg_t msg;
@@ -4735,7 +5075,7 @@ nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
union {
struct cmsghdr cm;
- char space[CMSG_SPACE(sizeof(int))];
+ char space[CMSG_SPACE(sizeof(int) * 2)];
} cmsg;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
@@ -4758,7 +5098,7 @@ nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
memset(&cmsg, 0, sizeof(cmsg));
- cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
+ cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int) * 2);
cmsg.cm.cmsg_level = SOL_SOCKET;
cmsg.cm.cmsg_type = SCM_RIGHTS;
@@ -4771,7 +5111,7 @@ nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
* Fortunately, GCC with -O1 compiles this nxt_memcpy()
* in the same simple assignment as in the code above.
*/
- memcpy(CMSG_DATA(&cmsg.cm), &port->out_fd, sizeof(int));
+ memcpy(CMSG_DATA(&cmsg.cm), fds, sizeof(int) * 2);
res = nxt_unit_port_send(ctx, dst, &m, sizeof(m), &cmsg, sizeof(cmsg));
@@ -4799,7 +5139,7 @@ nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port)
c = nxt_atomic_fetch_add(&port_impl->use_count, -1);
if (c == 1) {
- nxt_unit_debug(NULL, "destroy port %d,%d",
+ nxt_unit_debug(NULL, "destroy port{%d,%d}",
(int) port->id.pid, (int) port->id.id);
nxt_unit_process_release(port_impl->process);
@@ -4816,13 +5156,31 @@ nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port)
port->out_fd = -1;
}
+ if (port->in_fd != -1) {
+ close(port->in_fd);
+
+ port->in_fd = -1;
+ }
+
+ if (port->out_fd != -1) {
+ close(port->out_fd);
+
+ port->out_fd = -1;
+ }
+
+ if (port_impl->queue != NULL) {
+ munmap(port_impl->queue, (port->id.id == (nxt_port_id_t) -1)
+ ? sizeof(nxt_app_queue_t)
+ : sizeof(nxt_port_queue_t));
+ }
+
free(port_impl);
}
}
static nxt_unit_port_t *
-nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
+nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue)
{
int rc;
nxt_queue_t awaiting_req;
@@ -4840,9 +5198,10 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
old_port = nxt_unit_port_hash_find(&lib->ports, &port->id, 0);
if (nxt_slow_path(old_port != NULL)) {
- nxt_unit_debug(ctx, "add_port: duplicate %d,%d in_fd %d out_fd %d",
- port->id.pid, port->id.id,
- port->in_fd, port->out_fd);
+ nxt_unit_debug(ctx, "add_port: duplicate port{%d,%d} "
+ "in_fd %d out_fd %d queue %p",
+ port->id.pid, port->id.id,
+ port->in_fd, port->out_fd, queue);
if (old_port->data == NULL) {
old_port->data = port->data;
@@ -4875,6 +5234,10 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
old_port_impl = nxt_container_of(old_port, nxt_unit_port_impl_t, port);
+ if (old_port_impl->queue == NULL) {
+ old_port_impl->queue = queue;
+ }
+
if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) {
nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req);
nxt_queue_init(&old_port_impl->awaiting_req);
@@ -4914,9 +5277,9 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
new_port = NULL;
- nxt_unit_debug(ctx, "add_port: %d,%d in_fd %d out_fd %d",
+ nxt_unit_debug(ctx, "add_port: port{%d,%d} in_fd %d out_fd %d queue %p",
port->id.pid, port->id.id,
- port->in_fd, port->out_fd);
+ port->in_fd, port->out_fd, queue);
process = nxt_unit_process_get(lib, port->id.pid);
if (nxt_slow_path(process == NULL)) {
@@ -4929,6 +5292,9 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
new_port = malloc(sizeof(nxt_unit_port_impl_t));
if (nxt_slow_path(new_port == NULL)) {
+ nxt_unit_alert(ctx, "add_port: %d,%d malloc() failed",
+ port->id.pid, port->id.id);
+
goto unlock;
}
@@ -4951,6 +5317,9 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
new_port->use_count = 2;
new_port->process = process;
new_port->ready = (port->in_fd != -1 || port->out_fd != -1);
+ new_port->queue = queue;
+ new_port->from_socket = 0;
+ new_port->socket_rbuf = NULL;
nxt_queue_init(&new_port->awaiting_req);
@@ -5010,13 +5379,13 @@ nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id)
port = nxt_unit_port_hash_find(&lib->ports, port_id, 1);
if (nxt_slow_path(port == NULL)) {
- nxt_unit_debug(NULL, "remove_port: port %d,%d not found",
+ nxt_unit_debug(NULL, "remove_port: port{%d,%d} not found",
(int) port_id->pid, (int) port_id->id);
return NULL;
}
- nxt_unit_debug(NULL, "remove_port: port %d,%d, fds %d,%d, data %p",
+ nxt_unit_debug(NULL, "remove_port: port{%d,%d}, fds %d,%d, data %p",
(int) port_id->pid, (int) port_id->id,
port->in_fd, port->out_fd, port->data);
@@ -5089,10 +5458,12 @@ nxt_unit_quit(nxt_unit_ctx_t *ctx)
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
- lib->online = 0;
+ if (lib->online) {
+ lib->online = 0;
- if (lib->callbacks.quit != NULL) {
- lib->callbacks.quit(ctx);
+ if (lib->callbacks.quit != NULL) {
+ lib->callbacks.quit(ctx);
+ }
}
}
@@ -5137,20 +5508,91 @@ static ssize_t
nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
const void *buf, size_t buf_size, const void *oob, size_t oob_size)
{
- nxt_unit_impl_t *lib;
-
- nxt_unit_debug(ctx, "port_send: port %d,%d fd %d",
- (int) port->id.pid, (int) port->id.id, port->out_fd);
+ int notify;
+ ssize_t ret;
+ nxt_int_t rc;
+ nxt_port_msg_t msg;
+ nxt_unit_impl_t *lib;
+ nxt_unit_port_impl_t *port_impl;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
+ if (port_impl->queue != NULL && oob_size == 0
+ && buf_size <= NXT_PORT_QUEUE_MSG_SIZE)
+ {
+ rc = nxt_port_queue_send(port_impl->queue, buf, buf_size, &notify);
+ if (nxt_slow_path(rc != NXT_OK)) {
+ nxt_unit_alert(ctx, "port_send: port %d,%d queue overflow",
+ (int) port->id.pid, (int) port->id.id);
+
+ return -1;
+ }
+
+ nxt_unit_debug(ctx, "port{%d,%d} enqueue %d notify %d",
+ (int) port->id.pid, (int) port->id.id,
+ (int) buf_size, notify);
+
+ if (notify) {
+ memcpy(&msg, buf, sizeof(nxt_port_msg_t));
+
+ msg.type = _NXT_PORT_MSG_READ_QUEUE;
+
+ if (lib->callbacks.port_send == NULL) {
+ ret = nxt_unit_sendmsg(ctx, port->out_fd, &msg,
+ sizeof(nxt_port_msg_t), NULL, 0);
+
+ nxt_unit_debug(ctx, "port{%d,%d} send %d read_queue",
+ (int) port->id.pid, (int) port->id.id,
+ (int) ret);
+
+ } else {
+ ret = lib->callbacks.port_send(ctx, port, &msg,
+ sizeof(nxt_port_msg_t), NULL, 0);
+
+ nxt_unit_debug(ctx, "port{%d,%d} sendcb %d read_queue",
+ (int) port->id.pid, (int) port->id.id,
+ (int) ret);
+ }
+
+ }
+
+ return buf_size;
+ }
+
+ if (port_impl->queue != NULL) {
+ msg.type = _NXT_PORT_MSG_READ_SOCKET;
+
+ rc = nxt_port_queue_send(port_impl->queue, &msg.type, 1, &notify);
+ if (nxt_slow_path(rc != NXT_OK)) {
+ nxt_unit_alert(ctx, "port_send: port %d,%d queue overflow",
+ (int) port->id.pid, (int) port->id.id);
+
+ return -1;
+ }
+
+ nxt_unit_debug(ctx, "port{%d,%d} enqueue 1 read_socket notify %d",
+ (int) port->id.pid, (int) port->id.id, notify);
+ }
+
if (lib->callbacks.port_send != NULL) {
- return lib->callbacks.port_send(ctx, port, buf, buf_size,
- oob, oob_size);
+ ret = lib->callbacks.port_send(ctx, port, buf, buf_size,
+ oob, oob_size);
+
+ nxt_unit_debug(ctx, "port{%d,%d} sendcb %d",
+ (int) port->id.pid, (int) port->id.id,
+ (int) ret);
+
+ } else {
+ ret = nxt_unit_sendmsg(ctx, port->out_fd, buf, buf_size,
+ oob, oob_size);
+
+ nxt_unit_debug(ctx, "port{%d,%d} sendmsg %d",
+ (int) port->id.pid, (int) port->id.id,
+ (int) ret);
}
- return nxt_unit_sendmsg(ctx, port->out_fd, buf, buf_size,
- oob, oob_size);
+ return ret;
}
@@ -5158,6 +5600,7 @@ static ssize_t
nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
const void *buf, size_t buf_size, const void *oob, size_t oob_size)
{
+ int err;
ssize_t res;
struct iovec iov[1];
struct msghdr msg;
@@ -5178,7 +5621,9 @@ retry:
res = sendmsg(fd, &msg, 0);
if (nxt_slow_path(res == -1)) {
- if (errno == EINTR) {
+ err = errno;
+
+ if (err == EINTR) {
goto retry;
}
@@ -5187,7 +5632,7 @@ retry:
* implementation.
*/
nxt_unit_warn(ctx, "sendmsg(%d, %d) failed: %s (%d)",
- fd, (int) buf_size, strerror(errno), errno);
+ fd, (int) buf_size, strerror(err), err);
} else {
nxt_unit_debug(ctx, "sendmsg(%d, %d): %d", fd, (int) buf_size,
@@ -5199,6 +5644,158 @@ retry:
static int
+nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
+ nxt_unit_read_buf_t *rbuf)
+{
+ int res, read;
+ nxt_unit_port_impl_t *port_impl;
+
+ port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
+
+ read = 0;
+
+retry:
+
+ if (port_impl->from_socket > 0) {
+ if (port_impl->socket_rbuf != NULL
+ && port_impl->socket_rbuf->size > 0)
+ {
+ port_impl->from_socket--;
+
+ nxt_unit_rbuf_cpy(rbuf, port_impl->socket_rbuf);
+ port_impl->socket_rbuf->size = 0;
+
+ nxt_unit_debug(ctx, "port{%d,%d} use suspended message %d",
+ (int) port->id.pid, (int) port->id.id,
+ (int) rbuf->size);
+
+ return NXT_UNIT_OK;
+ }
+
+ } else {
+ res = nxt_unit_port_queue_recv(port, rbuf);
+
+ if (res == NXT_UNIT_OK) {
+ if (nxt_unit_is_read_socket(rbuf)) {
+ port_impl->from_socket++;
+
+ nxt_unit_debug(ctx, "port{%d,%d} dequeue 1 read_socket %d",
+ (int) port->id.pid, (int) port->id.id,
+ port_impl->from_socket);
+
+ goto retry;
+ }
+
+ nxt_unit_debug(ctx, "port{%d,%d} dequeue %d",
+ (int) port->id.pid, (int) port->id.id,
+ (int) rbuf->size);
+
+ return NXT_UNIT_OK;
+ }
+ }
+
+ if (read) {
+ return NXT_UNIT_AGAIN;
+ }
+
+ res = nxt_unit_port_recv(ctx, port, rbuf);
+ if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
+ return NXT_UNIT_ERROR;
+ }
+
+ read = 1;
+
+ if (nxt_unit_is_read_queue(rbuf)) {
+ nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue",
+ (int) port->id.pid, (int) port->id.id, (int) rbuf->size);
+
+ if (port_impl->from_socket) {
+ nxt_unit_warn(ctx, "port protocol warning: READ_QUEUE after READ_SOCKET");
+ }
+
+ goto retry;
+ }
+
+ nxt_unit_debug(ctx, "port{%d,%d} recvmsg %d",
+ (int) port->id.pid, (int) port->id.id,
+ (int) rbuf->size);
+
+ if (res == NXT_UNIT_AGAIN) {
+ return NXT_UNIT_AGAIN;
+ }
+
+ if (port_impl->from_socket > 0) {
+ port_impl->from_socket--;
+
+ return NXT_UNIT_OK;
+ }
+
+ nxt_unit_debug(ctx, "port{%d,%d} suspend message %d",
+ (int) port->id.pid, (int) port->id.id,
+ (int) rbuf->size);
+
+ if (port_impl->socket_rbuf == NULL) {
+ port_impl->socket_rbuf = nxt_unit_read_buf_get(ctx);
+
+ if (nxt_slow_path(port_impl->socket_rbuf == NULL)) {
+ return NXT_UNIT_ERROR;
+ }
+
+ port_impl->socket_rbuf->size = 0;
+ }
+
+ if (port_impl->socket_rbuf->size > 0) {
+ nxt_unit_alert(ctx, "too many port socket messages");
+
+ return NXT_UNIT_ERROR;
+ }
+
+ nxt_unit_rbuf_cpy(port_impl->socket_rbuf, rbuf);
+
+ memset(rbuf->oob, 0, sizeof(struct cmsghdr));
+
+ goto retry;
+}
+
+
+nxt_inline void
+nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst, nxt_unit_read_buf_t *src)
+{
+ memcpy(dst->buf, src->buf, src->size);
+ dst->size = src->size;
+ memcpy(dst->oob, src->oob, sizeof(src->oob));
+}
+
+
+static int
+nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
+ nxt_unit_read_buf_t *rbuf)
+{
+ int res;
+
+retry:
+
+ res = nxt_unit_app_queue_recv(port, rbuf);
+
+ if (res == NXT_UNIT_AGAIN) {
+ res = nxt_unit_port_recv(ctx, port, rbuf);
+ if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
+ return NXT_UNIT_ERROR;
+ }
+
+ if (nxt_unit_is_read_queue(rbuf)) {
+ nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue",
+ (int) port->id.pid, (int) port->id.id, (int) rbuf->size);
+
+ goto retry;
+ }
+ }
+
+ return res;
+}
+
+
+static int
nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
nxt_unit_read_buf_t *rbuf)
{
@@ -5214,6 +5811,9 @@ nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
rbuf->buf, sizeof(rbuf->buf),
rbuf->oob, sizeof(rbuf->oob));
+ nxt_unit_debug(ctx, "port{%d,%d} recvcb %d",
+ (int) port->id.pid, (int) port->id.id, (int) rbuf->size);
+
if (nxt_slow_path(rbuf->size < 0)) {
return NXT_UNIT_ERROR;
}
@@ -5247,13 +5847,13 @@ retry:
if (err == EAGAIN) {
nxt_unit_debug(ctx, "recvmsg(%d) failed: %s (%d)",
- fd, strerror(errno), errno);
+ fd, strerror(err), err);
return NXT_UNIT_AGAIN;
}
nxt_unit_alert(ctx, "recvmsg(%d) failed: %s (%d)",
- fd, strerror(errno), errno);
+ fd, strerror(err), err);
return NXT_UNIT_ERROR;
}
@@ -5264,6 +5864,52 @@ retry:
}
+static int
+nxt_unit_port_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf)
+{
+ nxt_unit_port_impl_t *port_impl;
+
+ port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
+
+ rbuf->size = nxt_port_queue_recv(port_impl->queue, rbuf->buf);
+
+ return (rbuf->size == -1) ? NXT_UNIT_AGAIN : NXT_UNIT_OK;
+}
+
+
+static int
+nxt_unit_app_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf)
+{
+ uint32_t cookie;
+ nxt_port_msg_t *port_msg;
+ nxt_app_queue_t *queue;
+ nxt_unit_port_impl_t *port_impl;
+
+ port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
+ queue = port_impl->queue;
+
+retry:
+
+ rbuf->size = nxt_app_queue_recv(queue, rbuf->buf, &cookie);
+
+ nxt_unit_debug(NULL, "app_queue_recv: %d", (int) rbuf->size);
+
+ if (rbuf->size >= (ssize_t) sizeof(nxt_port_msg_t)) {
+ port_msg = (nxt_port_msg_t *) rbuf->buf;
+
+ if (nxt_app_queue_cancel(queue, cookie, port_msg->stream)) {
+ return NXT_UNIT_OK;
+ }
+
+ nxt_unit_debug(NULL, "app_queue_recv: message cancelled");
+
+ goto retry;
+ }
+
+ return (rbuf->size == -1) ? NXT_UNIT_AGAIN : NXT_UNIT_OK;
+}
+
+
static nxt_int_t
nxt_unit_port_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
{
@@ -5392,12 +6038,19 @@ static const nxt_lvlhsh_proto_t lvlhsh_requests_proto nxt_aligned(64) = {
static int
-nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash,
- nxt_unit_request_info_impl_t *req_impl)
+nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx,
+ nxt_unit_request_info_t *req)
{
- uint32_t *stream;
- nxt_int_t res;
- nxt_lvlhsh_query_t lhq;
+ uint32_t *stream;
+ nxt_int_t res;
+ nxt_lvlhsh_query_t lhq;
+ nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_request_info_impl_t *req_impl;
+
+ req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
+ if (req_impl->in_hash) {
+ return NXT_UNIT_OK;
+ }
stream = &req_impl->stream;
@@ -5409,11 +6062,18 @@ nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash,
lhq.replace = 0;
lhq.value = req_impl;
- res = nxt_lvlhsh_insert(request_hash, &lhq);
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+
+ pthread_mutex_lock(&ctx_impl->mutex);
+
+ res = nxt_lvlhsh_insert(&ctx_impl->requests, &lhq);
+
+ pthread_mutex_unlock(&ctx_impl->mutex);
switch (res) {
case NXT_OK:
+ req_impl->in_hash = 1;
return NXT_UNIT_OK;
default:
@@ -5422,12 +6082,13 @@ nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash,
}
-static nxt_unit_request_info_impl_t *
-nxt_unit_request_hash_find(nxt_lvlhsh_t *request_hash, uint32_t stream,
- int remove)
+static nxt_unit_request_info_t *
+nxt_unit_request_hash_find(nxt_unit_ctx_t *ctx, uint32_t stream, int remove)
{
- nxt_int_t res;
- nxt_lvlhsh_query_t lhq;
+ nxt_int_t res;
+ nxt_lvlhsh_query_t lhq;
+ nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_request_info_impl_t *req_impl;
lhq.key_hash = nxt_murmur_hash2(&stream, sizeof(stream));
lhq.key.length = sizeof(stream);
@@ -5435,16 +6096,26 @@ nxt_unit_request_hash_find(nxt_lvlhsh_t *request_hash, uint32_t stream,
lhq.proto = &lvlhsh_requests_proto;
lhq.pool = NULL;
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+
+ pthread_mutex_lock(&ctx_impl->mutex);
+
if (remove) {
- res = nxt_lvlhsh_delete(request_hash, &lhq);
+ res = nxt_lvlhsh_delete(&ctx_impl->requests, &lhq);
} else {
- res = nxt_lvlhsh_find(request_hash, &lhq);
+ res = nxt_lvlhsh_find(&ctx_impl->requests, &lhq);
}
+ pthread_mutex_unlock(&ctx_impl->mutex);
+
switch (res) {
case NXT_OK:
+ req_impl = nxt_container_of(lhq.value, nxt_unit_request_info_impl_t,
+ req);
+ req_impl->in_hash = 0;
+
return lhq.value;
default:
diff --git a/src/nxt_unit.h b/src/nxt_unit.h
index 0f16773f..67244cf4 100644
--- a/src/nxt_unit.h
+++ b/src/nxt_unit.h
@@ -121,6 +121,8 @@ struct nxt_unit_callbacks_s {
*/
void (*request_handler)(nxt_unit_request_info_t *req);
+ void (*data_handler)(nxt_unit_request_info_t *req);
+
/* Process websocket frame. */
void (*websocket_handler)(nxt_unit_websocket_frame_t *ws);