summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_port.h
diff options
context:
space:
mode:
authorAndrei Belov <defan@nginx.com>2020-08-13 19:28:27 +0300
committerAndrei Belov <defan@nginx.com>2020-08-13 19:28:27 +0300
commitaff76e4f90b4e948c327ce2b021dc3203c33cbcd (patch)
tree5bd6ac3aa92683777548472984c209bf26d8a971 /src/nxt_port.h
parent04ce9f997e0e49e57ce4b5fc4aa98134232a1974 (diff)
parent6473d4b65a99aa10d509220fb99d8c4f65631ed0 (diff)
downloadunit-1.19.0-1.tar.gz
unit-1.19.0-1.tar.bz2
Merged with the default branch.1.19.0-1
Diffstat (limited to '')
-rw-r--r--src/nxt_port.h56
1 files changed, 45 insertions, 11 deletions
diff --git a/src/nxt_port.h b/src/nxt_port.h
index 0e8707f3..3ac8c735 100644
--- a/src/nxt_port.h
+++ b/src/nxt_port.h
@@ -25,7 +25,9 @@ struct nxt_port_handlers_s {
/* File descriptor exchange. */
nxt_port_handler_t change_file;
nxt_port_handler_t new_port;
+ nxt_port_handler_t get_port;
nxt_port_handler_t mmap;
+ nxt_port_handler_t get_mmap;
/* New process */
nxt_port_handler_t process_created;
@@ -39,6 +41,8 @@ struct nxt_port_handlers_s {
/* Request headers. */
nxt_port_handler_t req_headers;
+ nxt_port_handler_t req_headers_ack;
+ nxt_port_handler_t req_body;
/* Websocket frame. */
nxt_port_handler_t websocket_frame;
@@ -48,6 +52,8 @@ struct nxt_port_handlers_s {
nxt_port_handler_t oosm;
nxt_port_handler_t shm_ack;
+ nxt_port_handler_t read_queue;
+ nxt_port_handler_t read_socket;
};
@@ -77,7 +83,9 @@ typedef enum {
_NXT_PORT_MSG_CHANGE_FILE = nxt_port_handler_idx(change_file),
_NXT_PORT_MSG_NEW_PORT = nxt_port_handler_idx(new_port),
+ _NXT_PORT_MSG_GET_PORT = nxt_port_handler_idx(get_port),
_NXT_PORT_MSG_MMAP = nxt_port_handler_idx(mmap),
+ _NXT_PORT_MSG_GET_MMAP = nxt_port_handler_idx(get_mmap),
_NXT_PORT_MSG_PROCESS_CREATED = nxt_port_handler_idx(process_created),
_NXT_PORT_MSG_PROCESS_READY = nxt_port_handler_idx(process_ready),
@@ -85,12 +93,16 @@ typedef enum {
_NXT_PORT_MSG_QUIT = nxt_port_handler_idx(quit),
_NXT_PORT_MSG_REQ_HEADERS = nxt_port_handler_idx(req_headers),
+ _NXT_PORT_MSG_REQ_HEADERS_ACK = nxt_port_handler_idx(req_headers_ack),
+ _NXT_PORT_MSG_REQ_BODY = nxt_port_handler_idx(req_body),
_NXT_PORT_MSG_WEBSOCKET = nxt_port_handler_idx(websocket_frame),
_NXT_PORT_MSG_DATA = nxt_port_handler_idx(data),
_NXT_PORT_MSG_OOSM = nxt_port_handler_idx(oosm),
_NXT_PORT_MSG_SHM_ACK = nxt_port_handler_idx(shm_ack),
+ _NXT_PORT_MSG_READ_QUEUE = nxt_port_handler_idx(read_queue),
+ _NXT_PORT_MSG_READ_SOCKET = nxt_port_handler_idx(read_socket),
NXT_PORT_MSG_MAX = sizeof(nxt_port_handlers_t)
/ sizeof(nxt_port_handler_t),
@@ -107,8 +119,10 @@ typedef enum {
NXT_PORT_MSG_ACCESS_LOG = nxt_msg_last(_NXT_PORT_MSG_ACCESS_LOG),
NXT_PORT_MSG_CHANGE_FILE = nxt_msg_last(_NXT_PORT_MSG_CHANGE_FILE),
NXT_PORT_MSG_NEW_PORT = nxt_msg_last(_NXT_PORT_MSG_NEW_PORT),
+ NXT_PORT_MSG_GET_PORT = nxt_msg_last(_NXT_PORT_MSG_GET_PORT),
NXT_PORT_MSG_MMAP = nxt_msg_last(_NXT_PORT_MSG_MMAP)
- | NXT_PORT_MSG_CLOSE_FD | NXT_PORT_MSG_SYNC,
+ | NXT_PORT_MSG_SYNC,
+ NXT_PORT_MSG_GET_MMAP = nxt_msg_last(_NXT_PORT_MSG_GET_MMAP),
NXT_PORT_MSG_PROCESS_CREATED = nxt_msg_last(_NXT_PORT_MSG_PROCESS_CREATED),
NXT_PORT_MSG_PROCESS_READY = nxt_msg_last(_NXT_PORT_MSG_PROCESS_READY),
@@ -116,6 +130,7 @@ typedef enum {
NXT_PORT_MSG_REMOVE_PID = nxt_msg_last(_NXT_PORT_MSG_REMOVE_PID),
NXT_PORT_MSG_REQ_HEADERS = _NXT_PORT_MSG_REQ_HEADERS,
+ NXT_PORT_MSG_REQ_BODY = _NXT_PORT_MSG_REQ_BODY,
NXT_PORT_MSG_WEBSOCKET = _NXT_PORT_MSG_WEBSOCKET,
NXT_PORT_MSG_WEBSOCKET_LAST = nxt_msg_last(_NXT_PORT_MSG_WEBSOCKET),
@@ -124,6 +139,8 @@ typedef enum {
NXT_PORT_MSG_OOSM = nxt_msg_last(_NXT_PORT_MSG_OOSM),
NXT_PORT_MSG_SHM_ACK = nxt_msg_last(_NXT_PORT_MSG_SHM_ACK),
+ NXT_PORT_MSG_READ_QUEUE = _NXT_PORT_MSG_READ_QUEUE,
+ NXT_PORT_MSG_READ_SOCKET = _NXT_PORT_MSG_READ_SOCKET,
} nxt_port_msg_type_t;
@@ -156,7 +173,7 @@ typedef struct {
nxt_queue_link_t link;
nxt_buf_t *buf;
size_t share;
- nxt_fd_t fd;
+ nxt_fd_t fd[2];
nxt_port_msg_t port_msg;
uint32_t tracking_msg[2];
uint8_t close_fd; /* 1 bit */
@@ -165,7 +182,7 @@ typedef struct {
struct nxt_port_recv_msg_s {
- nxt_fd_t fd;
+ nxt_fd_t fd[2];
nxt_buf_t *buf;
nxt_port_t *port;
nxt_port_msg_t port_msg;
@@ -188,6 +205,7 @@ struct nxt_port_s {
nxt_queue_link_t app_link; /* for nxt_app_t.ports */
nxt_app_t *app;
+ nxt_port_t *main_app_port;
nxt_queue_link_t idle_link; /* for nxt_app_t.idle_ports */
nxt_msec_t idle_start;
@@ -200,11 +218,10 @@ struct nxt_port_s {
/* Maximum interleave of message parts. */
uint32_t max_share;
- uint32_t app_pending_responses;
uint32_t app_responses;
- nxt_queue_t pending_requests;
- nxt_queue_t active_websockets;
+ uint32_t active_websockets;
+ uint32_t active_requests;
nxt_port_handler_t handler;
nxt_port_handler_t *data;
@@ -226,6 +243,12 @@ struct nxt_port_s {
nxt_atomic_t use_count;
nxt_process_type_t type;
+
+ nxt_fd_t queue_fd;
+ void *queue;
+
+ void *socket_msg;
+ int from_socket;
};
@@ -238,6 +261,17 @@ typedef struct {
} nxt_port_msg_new_port_t;
+typedef struct {
+ nxt_port_id_t id;
+ nxt_pid_t pid;
+} nxt_port_msg_get_port_t;
+
+
+typedef struct {
+ uint32_t id;
+} nxt_port_msg_get_mmap_t;
+
+
/*
* nxt_port_data_t size is allocation size
* which enables effective reuse of memory pool cache.
@@ -265,17 +299,17 @@ void nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port);
void nxt_port_write_close(nxt_port_t *port);
void nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port);
void nxt_port_read_close(nxt_port_t *port);
-nxt_int_t nxt_port_socket_twrite(nxt_task_t *task, nxt_port_t *port,
- nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port,
- nxt_buf_t *b, void *tracking);
+nxt_int_t nxt_port_socket_write2(nxt_task_t *task, nxt_port_t *port,
+ nxt_uint_t type, nxt_fd_t fd, nxt_fd_t fd2, uint32_t stream,
+ nxt_port_id_t reply_port, nxt_buf_t *b);
nxt_inline nxt_int_t
nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port,
nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port,
nxt_buf_t *b)
{
- return nxt_port_socket_twrite(task, port, type, fd, stream, reply_port, b,
- NULL);
+ return nxt_port_socket_write2(task, port, type, fd, -1, stream, reply_port,
+ b);
}
void nxt_port_enable(nxt_task_t *task, nxt_port_t *port,