summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_router.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/nxt_router.c201
1 files changed, 179 insertions, 22 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c
index b8e94bcc..3dd0878b 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -15,6 +15,8 @@
#include <nxt_unit_request.h>
#include <nxt_unit_response.h>
#include <nxt_router_request.h>
+#include <nxt_app_queue.h>
+#include <nxt_port_queue.h>
typedef struct {
nxt_str_t type;
@@ -92,6 +94,12 @@ static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task,
nxt_router_conf_t *rtcf, nxt_conf_value_t *conf);
static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
+static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task,
+ nxt_port_t *port);
+static nxt_int_t nxt_router_port_queue_init(nxt_task_t *task,
+ nxt_port_t *port);
+static nxt_int_t nxt_router_port_queue_map(nxt_task_t *task,
+ nxt_port_t *port, nxt_fd_t fd);
static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
static void nxt_router_listen_socket_ready(nxt_task_t *task,
@@ -473,21 +481,25 @@ nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
nxt_inline nxt_bool_t
-nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info,
- uint32_t stream)
+nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data)
{
- nxt_buf_t *b, *next;
- nxt_bool_t cancelled;
+ nxt_buf_t *b, *next;
+ nxt_bool_t cancelled;
+ nxt_msg_info_t *msg_info;
+
+ msg_info = &req_rpc_data->msg_info;
if (msg_info->buf == NULL) {
return 0;
}
- cancelled = nxt_port_mmap_tracking_cancel(task, &msg_info->tracking,
- stream);
+ cancelled = nxt_app_queue_cancel(req_rpc_data->app->shared_port->queue,
+ msg_info->tracking_cookie,
+ req_rpc_data->stream);
if (cancelled) {
- nxt_debug(task, "stream #%uD: cancelled by router", stream);
+ nxt_debug(task, "stream #%uD: cancelled by router",
+ req_rpc_data->stream);
}
for (b = msg_info->buf; b != NULL; b = next) {
@@ -529,7 +541,7 @@ nxt_request_rpc_data_unlink(nxt_task_t *task,
{
nxt_http_request_t *r;
- nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream);
+ nxt_router_msg_cancel(task, req_rpc_data);
if (req_rpc_data->app_port != NULL) {
nxt_router_app_port_release(task, req_rpc_data->app_port,
@@ -573,6 +585,7 @@ nxt_request_rpc_data_unlink(nxt_task_t *task,
static void
nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
+ nxt_int_t res;
nxt_app_t *app;
nxt_port_t *port, *main_app_port;
nxt_runtime_t *rt;
@@ -592,6 +605,17 @@ nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
}
msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
+
+ } else {
+ if (msg->fd2 != -1) {
+ res = nxt_router_port_queue_map(task, port, msg->fd2);
+ if (nxt_slow_path(res != NXT_OK)) {
+ return;
+ }
+
+ nxt_fd_close(msg->fd2);
+ msg->fd2 = -1;
+ }
}
if (msg->port_msg.stream != 0) {
@@ -1523,6 +1547,12 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
return NXT_ERROR;
}
+ ret = nxt_router_app_queue_init(task, port);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ nxt_port_use(task, port, -1);
+ return NXT_ERROR;
+ }
+
nxt_port_write_enable(task, port);
port->app = app;
@@ -1828,6 +1858,82 @@ nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
}
+static nxt_int_t
+nxt_router_app_queue_init(nxt_task_t *task, nxt_port_t *port)
+{
+ void *mem;
+ nxt_int_t fd;
+
+ fd = nxt_shm_open(task, sizeof(nxt_app_queue_t));
+ if (nxt_slow_path(fd == -1)) {
+ return NXT_ERROR;
+ }
+
+ mem = nxt_mem_mmap(NULL, sizeof(nxt_app_queue_t),
+ PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
+ if (nxt_slow_path(mem == MAP_FAILED)) {
+ nxt_fd_close(fd);
+
+ return NXT_ERROR;
+ }
+
+ nxt_app_queue_init(mem);
+
+ port->queue_fd = fd;
+ port->queue = mem;
+
+ return NXT_OK;
+}
+
+
+static nxt_int_t
+nxt_router_port_queue_init(nxt_task_t *task, nxt_port_t *port)
+{
+ void *mem;
+ nxt_int_t fd;
+
+ fd = nxt_shm_open(task, sizeof(nxt_port_queue_t));
+ if (nxt_slow_path(fd == -1)) {
+ return NXT_ERROR;
+ }
+
+ mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
+ PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
+ if (nxt_slow_path(mem == MAP_FAILED)) {
+ nxt_fd_close(fd);
+
+ return NXT_ERROR;
+ }
+
+ nxt_port_queue_init(mem);
+
+ port->queue_fd = fd;
+ port->queue = mem;
+
+ return NXT_OK;
+}
+
+
+static nxt_int_t
+nxt_router_port_queue_map(nxt_task_t *task, nxt_port_t *port, nxt_fd_t fd)
+{
+ void *mem;
+
+ nxt_assert(fd != -1);
+
+ mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
+ PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
+ if (nxt_slow_path(mem == MAP_FAILED)) {
+
+ return NXT_ERROR;
+ }
+
+ port->queue = mem;
+
+ return NXT_OK;
+}
+
+
void
nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, nxt_str_t *name,
nxt_http_action_t *action)
@@ -2748,6 +2854,12 @@ nxt_router_thread_start(void *data)
return;
}
+ ret = nxt_router_port_queue_init(task, port);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ nxt_port_use(task, port, -1);
+ return;
+ }
+
engine->port = port;
nxt_port_enable(task, port, &nxt_router_app_port_handlers);
@@ -3670,6 +3782,7 @@ static void
nxt_router_req_headers_ack_handler(nxt_task_t *task,
nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data)
{
+ int res;
nxt_app_t *app;
nxt_bool_t start_process;
nxt_port_t *app_port, *main_app_port, *idle_port;
@@ -3752,6 +3865,24 @@ nxt_router_req_headers_ack_handler(nxt_task_t *task,
req_rpc_data->app_port = app_port;
+ if (req_rpc_data->msg_info.body_fd != -1) {
+ nxt_debug(task, "stream #%uD: send body fd %d", req_rpc_data->stream,
+ req_rpc_data->msg_info.body_fd);
+
+ lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET);
+
+ res = nxt_port_socket_write(task, app_port, NXT_PORT_MSG_REQ_BODY,
+ req_rpc_data->msg_info.body_fd,
+ req_rpc_data->stream,
+ task->thread->engine->port->id, NULL);
+
+ if (nxt_slow_path(res != NXT_OK)) {
+ r = req_rpc_data->request;
+
+ nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
+ }
+ }
+
if (app->timeout != 0) {
r = req_rpc_data->request;
@@ -3886,10 +4017,10 @@ nxt_router_app_shared_port_send(nxt_task_t *task, nxt_port_t *app_port)
msg->max_share = port->max_share;
msg->type = port->type;
- return nxt_port_socket_twrite(task, app_port,
+ return nxt_port_socket_write2(task, app_port,
NXT_PORT_MSG_NEW_PORT,
- port->pair[0],
- 0, 0, b, NULL);
+ port->pair[0], port->queue_fd,
+ 0, 0, b);
}
@@ -4522,6 +4653,13 @@ nxt_router_app_prepare_request(nxt_task_t *task,
nxt_int_t res;
nxt_port_t *port, *reply_port;
+ int notify;
+ struct {
+ nxt_port_msg_t pm;
+ nxt_port_mmap_msg_t mm;
+ } msg;
+
+
app = req_rpc_data->app;
nxt_assert(app != NULL);
@@ -4529,6 +4667,7 @@ nxt_router_app_prepare_request(nxt_task_t *task,
port = req_rpc_data->app_port;
nxt_assert(port != NULL);
+ nxt_assert(port->queue != NULL);
reply_port = task->thread->engine->port;
@@ -4569,20 +4708,38 @@ nxt_router_app_prepare_request(nxt_task_t *task,
req_rpc_data->msg_info.body_fd = -1;
}
- if (req_rpc_data->msg_info.body_fd != -1) {
- nxt_debug(task, "stream #%uD: send body fd %d", req_rpc_data->stream,
- req_rpc_data->msg_info.body_fd);
+ msg.pm.stream = req_rpc_data->stream;
+ msg.pm.pid = reply_port->pid;
+ msg.pm.reply_port = reply_port->id;
+ msg.pm.type = NXT_PORT_MSG_REQ_HEADERS;
+ msg.pm.last = 0;
+ msg.pm.mmap = 1;
+ msg.pm.nf = 0;
+ msg.pm.mf = 0;
+ msg.pm.tracking = 0;
- lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET);
- }
+ nxt_port_mmap_handler_t *mmap_handler = buf->parent;
+ nxt_port_mmap_header_t *hdr = mmap_handler->hdr;
+
+ msg.mm.mmap_id = hdr->id;
+ msg.mm.chunk_id = nxt_port_mmap_chunk_id(hdr, buf->mem.pos);
+ msg.mm.size = nxt_buf_used_size(buf);
- res = nxt_port_socket_twrite(task, port,
- NXT_PORT_MSG_REQ_HEADERS,
- req_rpc_data->msg_info.body_fd,
- req_rpc_data->stream, reply_port->id, buf,
- NULL);
+ res = nxt_app_queue_send(port->queue, &msg, sizeof(msg),
+ req_rpc_data->stream, &notify,
+ &req_rpc_data->msg_info.tracking_cookie);
+ if (nxt_fast_path(res == NXT_OK)) {
+ if (notify != 0) {
+ (void) nxt_port_socket_write(task, port,
+ NXT_PORT_MSG_READ_QUEUE,
+ -1, req_rpc_data->stream,
+ reply_port->id, NULL);
- if (nxt_slow_path(res != NXT_OK)) {
+ } else {
+ nxt_debug(task, "queue is not empty");
+ }
+
+ } else {
nxt_alert(task, "stream #%uD, app '%V': failed to send app message",
req_rpc_data->stream, &app->name);