summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_unit.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nxt_unit.c')
-rw-r--r--src/nxt_unit.c276
1 files changed, 249 insertions, 27 deletions
diff --git a/src/nxt_unit.c b/src/nxt_unit.c
index ddfd9c80..c1ef977f 100644
--- a/src/nxt_unit.c
+++ b/src/nxt_unit.c
@@ -55,6 +55,8 @@ static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
+static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
+ nxt_unit_port_id_t *port_id);
static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx);
@@ -119,6 +121,7 @@ static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib,
static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
static void nxt_unit_read_buf(nxt_unit_ctx_t *ctx,
nxt_unit_read_buf_t *rbuf);
+static void nxt_unit_process_ready_req(nxt_unit_ctx_impl_t *ctx_impl);
static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl);
static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx);
@@ -138,6 +141,7 @@ static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid);
static void nxt_unit_remove_process(nxt_unit_impl_t *lib,
nxt_unit_process_t *process);
static void nxt_unit_quit(nxt_unit_ctx_t *ctx);
+static int nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id);
static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx,
nxt_unit_port_t *port, const void *buf, size_t buf_size,
const void *oob, size_t oob_size);
@@ -215,7 +219,10 @@ struct nxt_unit_request_info_impl_s {
nxt_unit_req_state_t state;
uint8_t websocket;
+ /* for nxt_unit_ctx_impl_t.free_req or active_req */
nxt_queue_link_t link;
+ /* for nxt_unit_port_impl_t.awaiting_req */
+ nxt_queue_link_t port_wait_link;
char extra_data[];
};
@@ -244,6 +251,7 @@ struct nxt_unit_ctx_impl_s {
nxt_unit_ctx_t ctx;
nxt_atomic_t use_count;
+ nxt_atomic_t wait_items;
pthread_mutex_t mutex;
@@ -265,6 +273,9 @@ struct nxt_unit_ctx_impl_s {
/* of nxt_unit_request_info_impl_t */
nxt_lvlhsh_t requests;
+ /* of nxt_unit_request_info_impl_t */
+ nxt_queue_t ready_req;
+
nxt_unit_read_buf_t *pending_read_head;
nxt_unit_read_buf_t **pending_read_tail;
nxt_unit_read_buf_t *free_read_buf;
@@ -309,6 +320,11 @@ struct nxt_unit_port_impl_s {
nxt_queue_link_t link;
nxt_unit_process_t *process;
+
+ /* of nxt_unit_request_info_impl_t */
+ nxt_queue_t awaiting_req;
+
+ int ready;
};
@@ -515,10 +531,12 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);
ctx_impl->use_count = 1;
+ ctx_impl->wait_items = 0;
nxt_queue_init(&ctx_impl->free_req);
nxt_queue_init(&ctx_impl->free_ws);
nxt_queue_init(&ctx_impl->active_req);
+ nxt_queue_init(&ctx_impl->ready_req);
ctx_impl->free_buf = NULL;
nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]);
@@ -973,8 +991,8 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
static int
nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
+ int res;
nxt_unit_impl_t *lib;
- nxt_unit_port_t *port;
nxt_unit_port_id_t port_id;
nxt_unit_request_t *r;
nxt_unit_mmap_buf_t *b;
@@ -1004,28 +1022,8 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
return NXT_UNIT_ERROR;
}
- nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port);
-
- lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
-
- pthread_mutex_lock(&lib->mutex);
-
- port = nxt_unit_port_hash_find(&lib->ports, &port_id, 0);
-
- pthread_mutex_unlock(&lib->mutex);
-
- if (nxt_slow_path(port == NULL)) {
- nxt_unit_alert(ctx, "#%"PRIu32": response port %d,%d not found",
- recv_msg->stream,
- (int) recv_msg->pid, (int) recv_msg->reply_port);
-
- return NXT_UNIT_ERROR;
- }
-
req = &req_impl->req;
- req->response_port = port;
-
req->request = recv_msg->start;
b = recv_msg->incoming_buf;
@@ -1076,13 +1074,130 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
(char *) nxt_unit_sptr_get(&r->target),
(int) r->content_length);
- lib->callbacks.request_handler(req);
+ nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port);
+
+ res = nxt_unit_request_check_response_port(req, &port_id);
+
+ if (nxt_fast_path(res == NXT_UNIT_OK)) {
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+
+ lib->callbacks.request_handler(req);
+ }
return NXT_UNIT_OK;
}
static int
+nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
+ nxt_unit_port_id_t *port_id)
+{
+ int res;
+ nxt_unit_ctx_t *ctx;
+ nxt_unit_impl_t *lib;
+ nxt_unit_port_t *port;
+ nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_port_impl_t *port_impl;
+ nxt_unit_request_info_impl_t *req_impl;
+
+ ctx = req->ctx;
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+
+ pthread_mutex_lock(&lib->mutex);
+
+ port = nxt_unit_port_hash_find(&lib->ports, port_id, 0);
+ port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
+
+ if (nxt_fast_path(port != NULL)) {
+ req->response_port = port;
+
+ if (nxt_fast_path(port_impl->ready)) {
+ pthread_mutex_unlock(&lib->mutex);
+
+ nxt_unit_debug(ctx, "check_response_port: found port{%d,%d}",
+ (int) port->id.pid, (int) port->id.id);
+
+ return NXT_UNIT_OK;
+ }
+
+ nxt_unit_debug(ctx, "check_response_port: "
+ "port{%d,%d} already requested",
+ (int) port->id.pid, (int) port->id.id);
+
+ req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
+
+ nxt_queue_insert_tail(&port_impl->awaiting_req,
+ &req_impl->port_wait_link);
+
+ pthread_mutex_unlock(&lib->mutex);
+
+ nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
+
+ return NXT_UNIT_AGAIN;
+ }
+
+ port_impl = malloc(sizeof(nxt_unit_port_impl_t));
+ if (nxt_slow_path(port_impl == NULL)) {
+ nxt_unit_alert(ctx, "check_response_port: malloc(%d) failed",
+ (int) sizeof(nxt_unit_port_impl_t));
+
+ pthread_mutex_unlock(&lib->mutex);
+
+ return NXT_UNIT_ERROR;
+ }
+
+ port = &port_impl->port;
+
+ port->id = *port_id;
+ port->in_fd = -1;
+ port->out_fd = -1;
+ port->data = NULL;
+
+ res = nxt_unit_port_hash_add(&lib->ports, port);
+ if (nxt_slow_path(res != NXT_UNIT_OK)) {
+ nxt_unit_alert(ctx, "check_response_port: %d,%d hash_add failed",
+ port->id.pid, port->id.id);
+
+ pthread_mutex_unlock(&lib->mutex);
+
+ free(port);
+
+ return NXT_UNIT_ERROR;
+ }
+
+ req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
+
+ nxt_queue_insert_tail(&req_impl->process->ports, &port_impl->link);
+
+ port_impl->process = req_impl->process;
+
+
+ nxt_queue_init(&port_impl->awaiting_req);
+
+ nxt_queue_insert_tail(&port_impl->awaiting_req, &req_impl->port_wait_link);
+
+ port_impl->use_count = 2;
+ port_impl->ready = 0;
+
+ req->response_port = port;
+
+ pthread_mutex_unlock(&lib->mutex);
+
+ nxt_unit_process_use(port_impl->process);
+
+ res = nxt_unit_get_port(ctx, port_id);
+ if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
+ return NXT_UNIT_ERROR;
+ }
+
+ nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
+
+ return NXT_UNIT_AGAIN;
+}
+
+
+static int
nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
size_t hsize;
@@ -4041,6 +4156,8 @@ nxt_unit_run_once(nxt_unit_ctx_t *ctx)
nxt_unit_read_buf_release(ctx, rbuf);
+ nxt_unit_process_ready_req(ctx_impl);
+
nxt_unit_ctx_release(ctx_impl);
return rc;
@@ -4062,6 +4179,39 @@ nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
}
+static void
+nxt_unit_process_ready_req(nxt_unit_ctx_impl_t *ctx_impl)
+{
+ nxt_queue_t ready_req;
+ nxt_unit_impl_t *lib;
+ nxt_unit_request_info_impl_t *req_impl;
+
+ nxt_queue_init(&ready_req);
+
+ pthread_mutex_lock(&ctx_impl->mutex);
+
+ if (nxt_queue_is_empty(&ctx_impl->ready_req)) {
+ pthread_mutex_unlock(&ctx_impl->mutex);
+
+ return;
+ }
+
+ nxt_queue_add(&ready_req, &ctx_impl->ready_req);
+ nxt_queue_init(&ctx_impl->ready_req);
+
+ pthread_mutex_unlock(&ctx_impl->mutex);
+
+ nxt_queue_each(req_impl, &ready_req,
+ nxt_unit_request_info_impl_t, port_wait_link)
+ {
+ lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit);
+
+ lib->callbacks.request_handler(&req_impl->req);
+
+ } nxt_queue_loop;
+}
+
+
void
nxt_unit_done(nxt_unit_ctx_t *ctx)
{
@@ -4371,11 +4521,14 @@ nxt_unit_port_process(nxt_unit_port_t *port)
static nxt_unit_port_t *
nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
{
- int rc;
- nxt_unit_impl_t *lib;
- nxt_unit_port_t *old_port;
- nxt_unit_process_t *process;
- nxt_unit_port_impl_t *new_port;
+ int rc;
+ nxt_queue_t awaiting_req;
+ nxt_unit_impl_t *lib;
+ nxt_unit_port_t *old_port;
+ nxt_unit_process_t *process;
+ nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_port_impl_t *new_port, *old_port_impl;
+ nxt_unit_request_info_impl_t *req_impl;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
@@ -4415,6 +4568,17 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
*port = *old_port;
+ nxt_queue_init(&awaiting_req);
+
+ old_port_impl = nxt_container_of(old_port, nxt_unit_port_impl_t, port);
+
+ if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) {
+ nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req);
+ nxt_queue_init(&old_port_impl->awaiting_req);
+ }
+
+ old_port_impl->ready = (port->in_fd != -1 || port->out_fd != -1);
+
pthread_mutex_unlock(&lib->mutex);
if (lib->callbacks.add_port != NULL
@@ -4423,6 +4587,25 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
lib->callbacks.add_port(ctx, old_port);
}
+ nxt_queue_each(req_impl, &awaiting_req,
+ nxt_unit_request_info_impl_t, port_wait_link)
+ {
+ nxt_queue_remove(&req_impl->port_wait_link);
+
+ ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t,
+ ctx);
+
+ pthread_mutex_lock(&ctx_impl->mutex);
+
+ nxt_queue_insert_tail(&ctx_impl->ready_req,
+ &req_impl->port_wait_link);
+
+ pthread_mutex_unlock(&ctx_impl->mutex);
+
+ nxt_atomic_fetch_add(&ctx_impl->wait_items, -1);
+
+ } nxt_queue_loop;
+
return old_port;
}
@@ -4464,6 +4647,9 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
new_port->use_count = 2;
new_port->process = process;
+ new_port->ready = (port->in_fd != -1 || port->out_fd != -1);
+
+ nxt_queue_init(&new_port->awaiting_req);
process = NULL;
@@ -4608,6 +4794,42 @@ nxt_unit_quit(nxt_unit_ctx_t *ctx)
}
+static int
+nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
+{
+ ssize_t res;
+ nxt_unit_impl_t *lib;
+ nxt_unit_ctx_impl_t *ctx_impl;
+
+ struct {
+ nxt_port_msg_t msg;
+ nxt_port_msg_get_port_t get_port;
+ } m;
+
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+
+ memset(&m.msg, 0, sizeof(nxt_port_msg_t));
+
+ m.msg.pid = lib->pid;
+ m.msg.reply_port = ctx_impl->read_port->id.id;
+ m.msg.type = _NXT_PORT_MSG_GET_PORT;
+
+ m.get_port.id = port_id->id;
+ m.get_port.pid = port_id->pid;
+
+ nxt_unit_debug(ctx, "get_port: %d %d", (int) port_id->pid,
+ (int) port_id->id);
+
+ res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL, 0);
+ if (nxt_slow_path(res != sizeof(m))) {
+ return NXT_UNIT_ERROR;
+ }
+
+ return NXT_UNIT_OK;
+}
+
+
static ssize_t
nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
const void *buf, size_t buf_size, const void *oob, size_t oob_size)