diff options
Diffstat (limited to '')
-rw-r--r-- | src/nxt_router.c | 201 |
1 files changed, 179 insertions, 22 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c index b8e94bcc..3dd0878b 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -15,6 +15,8 @@ #include <nxt_unit_request.h> #include <nxt_unit_response.h> #include <nxt_router_request.h> +#include <nxt_app_queue.h> +#include <nxt_port_queue.h> typedef struct { nxt_str_t type; @@ -92,6 +94,12 @@ static nxt_int_t nxt_router_conf_create(nxt_task_t *task, static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf, nxt_conf_value_t *conf); static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name); +static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task, + nxt_port_t *port); +static nxt_int_t nxt_router_port_queue_init(nxt_task_t *task, + nxt_port_t *port); +static nxt_int_t nxt_router_port_queue_map(nxt_task_t *task, + nxt_port_t *port, nxt_fd_t fd); static void nxt_router_listen_socket_rpc_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf); static void nxt_router_listen_socket_ready(nxt_task_t *task, @@ -473,21 +481,25 @@ nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app) nxt_inline nxt_bool_t -nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info, - uint32_t stream) +nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data) { - nxt_buf_t *b, *next; - nxt_bool_t cancelled; + nxt_buf_t *b, *next; + nxt_bool_t cancelled; + nxt_msg_info_t *msg_info; + + msg_info = &req_rpc_data->msg_info; if (msg_info->buf == NULL) { return 0; } - cancelled = nxt_port_mmap_tracking_cancel(task, &msg_info->tracking, - stream); + cancelled = nxt_app_queue_cancel(req_rpc_data->app->shared_port->queue, + msg_info->tracking_cookie, + req_rpc_data->stream); if (cancelled) { - nxt_debug(task, "stream #%uD: cancelled by router", stream); + nxt_debug(task, "stream #%uD: cancelled by router", + req_rpc_data->stream); } for (b = msg_info->buf; b != NULL; b = next) { @@ -529,7 +541,7 @@ nxt_request_rpc_data_unlink(nxt_task_t *task, { nxt_http_request_t *r; - nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream); + nxt_router_msg_cancel(task, req_rpc_data); if (req_rpc_data->app_port != NULL) { nxt_router_app_port_release(task, req_rpc_data->app_port, @@ -573,6 +585,7 @@ nxt_request_rpc_data_unlink(nxt_task_t *task, static void nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { + nxt_int_t res; nxt_app_t *app; nxt_port_t *port, *main_app_port; nxt_runtime_t *rt; @@ -592,6 +605,17 @@ nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) } msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; + + } else { + if (msg->fd2 != -1) { + res = nxt_router_port_queue_map(task, port, msg->fd2); + if (nxt_slow_path(res != NXT_OK)) { + return; + } + + nxt_fd_close(msg->fd2); + msg->fd2 = -1; + } } if (msg->port_msg.stream != 0) { @@ -1523,6 +1547,12 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, return NXT_ERROR; } + ret = nxt_router_app_queue_init(task, port); + if (nxt_slow_path(ret != NXT_OK)) { + nxt_port_use(task, port, -1); + return NXT_ERROR; + } + nxt_port_write_enable(task, port); port->app = app; @@ -1828,6 +1858,82 @@ nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name) } +static nxt_int_t +nxt_router_app_queue_init(nxt_task_t *task, nxt_port_t *port) +{ + void *mem; + nxt_int_t fd; + + fd = nxt_shm_open(task, sizeof(nxt_app_queue_t)); + if (nxt_slow_path(fd == -1)) { + return NXT_ERROR; + } + + mem = nxt_mem_mmap(NULL, sizeof(nxt_app_queue_t), + PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); + if (nxt_slow_path(mem == MAP_FAILED)) { + nxt_fd_close(fd); + + return NXT_ERROR; + } + + nxt_app_queue_init(mem); + + port->queue_fd = fd; + port->queue = mem; + + return NXT_OK; +} + + +static nxt_int_t +nxt_router_port_queue_init(nxt_task_t *task, nxt_port_t *port) +{ + void *mem; + nxt_int_t fd; + + fd = nxt_shm_open(task, sizeof(nxt_port_queue_t)); + if (nxt_slow_path(fd == -1)) { + return NXT_ERROR; + } + + mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t), + PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); + if (nxt_slow_path(mem == MAP_FAILED)) { + nxt_fd_close(fd); + + return NXT_ERROR; + } + + nxt_port_queue_init(mem); + + port->queue_fd = fd; + port->queue = mem; + + return NXT_OK; +} + + +static nxt_int_t +nxt_router_port_queue_map(nxt_task_t *task, nxt_port_t *port, nxt_fd_t fd) +{ + void *mem; + + nxt_assert(fd != -1); + + mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t), + PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); + if (nxt_slow_path(mem == MAP_FAILED)) { + + return NXT_ERROR; + } + + port->queue = mem; + + return NXT_OK; +} + + void nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, nxt_str_t *name, nxt_http_action_t *action) @@ -2748,6 +2854,12 @@ nxt_router_thread_start(void *data) return; } + ret = nxt_router_port_queue_init(task, port); + if (nxt_slow_path(ret != NXT_OK)) { + nxt_port_use(task, port, -1); + return; + } + engine->port = port; nxt_port_enable(task, port, &nxt_router_app_port_handlers); @@ -3670,6 +3782,7 @@ static void nxt_router_req_headers_ack_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data) { + int res; nxt_app_t *app; nxt_bool_t start_process; nxt_port_t *app_port, *main_app_port, *idle_port; @@ -3752,6 +3865,24 @@ nxt_router_req_headers_ack_handler(nxt_task_t *task, req_rpc_data->app_port = app_port; + if (req_rpc_data->msg_info.body_fd != -1) { + nxt_debug(task, "stream #%uD: send body fd %d", req_rpc_data->stream, + req_rpc_data->msg_info.body_fd); + + lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET); + + res = nxt_port_socket_write(task, app_port, NXT_PORT_MSG_REQ_BODY, + req_rpc_data->msg_info.body_fd, + req_rpc_data->stream, + task->thread->engine->port->id, NULL); + + if (nxt_slow_path(res != NXT_OK)) { + r = req_rpc_data->request; + + nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); + } + } + if (app->timeout != 0) { r = req_rpc_data->request; @@ -3886,10 +4017,10 @@ nxt_router_app_shared_port_send(nxt_task_t *task, nxt_port_t *app_port) msg->max_share = port->max_share; msg->type = port->type; - return nxt_port_socket_twrite(task, app_port, + return nxt_port_socket_write2(task, app_port, NXT_PORT_MSG_NEW_PORT, - port->pair[0], - 0, 0, b, NULL); + port->pair[0], port->queue_fd, + 0, 0, b); } @@ -4522,6 +4653,13 @@ nxt_router_app_prepare_request(nxt_task_t *task, nxt_int_t res; nxt_port_t *port, *reply_port; + int notify; + struct { + nxt_port_msg_t pm; + nxt_port_mmap_msg_t mm; + } msg; + + app = req_rpc_data->app; nxt_assert(app != NULL); @@ -4529,6 +4667,7 @@ nxt_router_app_prepare_request(nxt_task_t *task, port = req_rpc_data->app_port; nxt_assert(port != NULL); + nxt_assert(port->queue != NULL); reply_port = task->thread->engine->port; @@ -4569,20 +4708,38 @@ nxt_router_app_prepare_request(nxt_task_t *task, req_rpc_data->msg_info.body_fd = -1; } - if (req_rpc_data->msg_info.body_fd != -1) { - nxt_debug(task, "stream #%uD: send body fd %d", req_rpc_data->stream, - req_rpc_data->msg_info.body_fd); + msg.pm.stream = req_rpc_data->stream; + msg.pm.pid = reply_port->pid; + msg.pm.reply_port = reply_port->id; + msg.pm.type = NXT_PORT_MSG_REQ_HEADERS; + msg.pm.last = 0; + msg.pm.mmap = 1; + msg.pm.nf = 0; + msg.pm.mf = 0; + msg.pm.tracking = 0; - lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET); - } + nxt_port_mmap_handler_t *mmap_handler = buf->parent; + nxt_port_mmap_header_t *hdr = mmap_handler->hdr; + + msg.mm.mmap_id = hdr->id; + msg.mm.chunk_id = nxt_port_mmap_chunk_id(hdr, buf->mem.pos); + msg.mm.size = nxt_buf_used_size(buf); - res = nxt_port_socket_twrite(task, port, - NXT_PORT_MSG_REQ_HEADERS, - req_rpc_data->msg_info.body_fd, - req_rpc_data->stream, reply_port->id, buf, - NULL); + res = nxt_app_queue_send(port->queue, &msg, sizeof(msg), + req_rpc_data->stream, ¬ify, + &req_rpc_data->msg_info.tracking_cookie); + if (nxt_fast_path(res == NXT_OK)) { + if (notify != 0) { + (void) nxt_port_socket_write(task, port, + NXT_PORT_MSG_READ_QUEUE, + -1, req_rpc_data->stream, + reply_port->id, NULL); - if (nxt_slow_path(res != NXT_OK)) { + } else { + nxt_debug(task, "queue is not empty"); + } + + } else { nxt_alert(task, "stream #%uD, app '%V': failed to send app message", req_rpc_data->stream, &app->name); |