summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_unit.c
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2020-08-11 19:20:10 +0300
committerMax Romanov <max.romanov@nginx.com>2020-08-11 19:20:10 +0300
commit3cbc22a6dc45abdeade4deb364601230ddca02c1 (patch)
tree3eff409de1a4405c646a2c11633d50711fa22746 /src/nxt_unit.c
parentbf647588ff781e606651f001b53a4e83bb34c000 (diff)
downloadunit-3cbc22a6dc45abdeade4deb364601230ddca02c1.tar.gz
unit-3cbc22a6dc45abdeade4deb364601230ddca02c1.tar.bz2
Changing router to application port exchange protocol.
The application process needs to request the port from the router instead of the latter pushing the port before sending a request to the application. This is required to simplify the communication between the router and the application and to prepare the router to use the application shared port and then the queue.
Diffstat (limited to '')
-rw-r--r--src/nxt_unit.c276
1 files changed, 249 insertions, 27 deletions
diff --git a/src/nxt_unit.c b/src/nxt_unit.c
index ddfd9c80..c1ef977f 100644
--- a/src/nxt_unit.c
+++ b/src/nxt_unit.c
@@ -55,6 +55,8 @@ static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
+static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
+ nxt_unit_port_id_t *port_id);
static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx);
@@ -119,6 +121,7 @@ static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib,
static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
static void nxt_unit_read_buf(nxt_unit_ctx_t *ctx,
nxt_unit_read_buf_t *rbuf);
+static void nxt_unit_process_ready_req(nxt_unit_ctx_impl_t *ctx_impl);
static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl);
static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx);
@@ -138,6 +141,7 @@ static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid);
static void nxt_unit_remove_process(nxt_unit_impl_t *lib,
nxt_unit_process_t *process);
static void nxt_unit_quit(nxt_unit_ctx_t *ctx);
+static int nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id);
static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx,
nxt_unit_port_t *port, const void *buf, size_t buf_size,
const void *oob, size_t oob_size);
@@ -215,7 +219,10 @@ struct nxt_unit_request_info_impl_s {
nxt_unit_req_state_t state;
uint8_t websocket;
+ /* for nxt_unit_ctx_impl_t.free_req or active_req */
nxt_queue_link_t link;
+ /* for nxt_unit_port_impl_t.awaiting_req */
+ nxt_queue_link_t port_wait_link;
char extra_data[];
};
@@ -244,6 +251,7 @@ struct nxt_unit_ctx_impl_s {
nxt_unit_ctx_t ctx;
nxt_atomic_t use_count;
+ nxt_atomic_t wait_items;
pthread_mutex_t mutex;
@@ -265,6 +273,9 @@ struct nxt_unit_ctx_impl_s {
/* of nxt_unit_request_info_impl_t */
nxt_lvlhsh_t requests;
+ /* of nxt_unit_request_info_impl_t */
+ nxt_queue_t ready_req;
+
nxt_unit_read_buf_t *pending_read_head;
nxt_unit_read_buf_t **pending_read_tail;
nxt_unit_read_buf_t *free_read_buf;
@@ -309,6 +320,11 @@ struct nxt_unit_port_impl_s {
nxt_queue_link_t link;
nxt_unit_process_t *process;
+
+ /* of nxt_unit_request_info_impl_t */
+ nxt_queue_t awaiting_req;
+
+ int ready;
};
@@ -515,10 +531,12 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);
ctx_impl->use_count = 1;
+ ctx_impl->wait_items = 0;
nxt_queue_init(&ctx_impl->free_req);
nxt_queue_init(&ctx_impl->free_ws);
nxt_queue_init(&ctx_impl->active_req);
+ nxt_queue_init(&ctx_impl->ready_req);
ctx_impl->free_buf = NULL;
nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]);
@@ -973,8 +991,8 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
static int
nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
+ int res;
nxt_unit_impl_t *lib;
- nxt_unit_port_t *port;
nxt_unit_port_id_t port_id;
nxt_unit_request_t *r;
nxt_unit_mmap_buf_t *b;
@@ -1004,28 +1022,8 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
return NXT_UNIT_ERROR;
}
- nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port);
-
- lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
-
- pthread_mutex_lock(&lib->mutex);
-
- port = nxt_unit_port_hash_find(&lib->ports, &port_id, 0);
-
- pthread_mutex_unlock(&lib->mutex);
-
- if (nxt_slow_path(port == NULL)) {
- nxt_unit_alert(ctx, "#%"PRIu32": response port %d,%d not found",
- recv_msg->stream,
- (int) recv_msg->pid, (int) recv_msg->reply_port);
-
- return NXT_UNIT_ERROR;
- }
-
req = &req_impl->req;
- req->response_port = port;
-
req->request = recv_msg->start;
b = recv_msg->incoming_buf;
@@ -1076,13 +1074,130 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
(char *) nxt_unit_sptr_get(&r->target),
(int) r->content_length);
- lib->callbacks.request_handler(req);
+ nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port);
+
+ res = nxt_unit_request_check_response_port(req, &port_id);
+
+ if (nxt_fast_path(res == NXT_UNIT_OK)) {
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+
+ lib->callbacks.request_handler(req);
+ }
return NXT_UNIT_OK;
}
static int
+nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
+ nxt_unit_port_id_t *port_id)
+{
+ int res;
+ nxt_unit_ctx_t *ctx;
+ nxt_unit_impl_t *lib;
+ nxt_unit_port_t *port;
+ nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_port_impl_t *port_impl;
+ nxt_unit_request_info_impl_t *req_impl;
+
+ ctx = req->ctx;
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+
+ pthread_mutex_lock(&lib->mutex);
+
+ port = nxt_unit_port_hash_find(&lib->ports, port_id, 0);
+ port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
+
+ if (nxt_fast_path(port != NULL)) {
+ req->response_port = port;
+
+ if (nxt_fast_path(port_impl->ready)) {
+ pthread_mutex_unlock(&lib->mutex);
+
+ nxt_unit_debug(ctx, "check_response_port: found port{%d,%d}",
+ (int) port->id.pid, (int) port->id.id);
+
+ return NXT_UNIT_OK;
+ }
+
+ nxt_unit_debug(ctx, "check_response_port: "
+ "port{%d,%d} already requested",
+ (int) port->id.pid, (int) port->id.id);
+
+ req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
+
+ nxt_queue_insert_tail(&port_impl->awaiting_req,
+ &req_impl->port_wait_link);
+
+ pthread_mutex_unlock(&lib->mutex);
+
+ nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
+
+ return NXT_UNIT_AGAIN;
+ }
+
+ port_impl = malloc(sizeof(nxt_unit_port_impl_t));
+ if (nxt_slow_path(port_impl == NULL)) {
+ nxt_unit_alert(ctx, "check_response_port: malloc(%d) failed",
+ (int) sizeof(nxt_unit_port_impl_t));
+
+ pthread_mutex_unlock(&lib->mutex);
+
+ return NXT_UNIT_ERROR;
+ }
+
+ port = &port_impl->port;
+
+ port->id = *port_id;
+ port->in_fd = -1;
+ port->out_fd = -1;
+ port->data = NULL;
+
+ res = nxt_unit_port_hash_add(&lib->ports, port);
+ if (nxt_slow_path(res != NXT_UNIT_OK)) {
+ nxt_unit_alert(ctx, "check_response_port: %d,%d hash_add failed",
+ port->id.pid, port->id.id);
+
+ pthread_mutex_unlock(&lib->mutex);
+
+ free(port);
+
+ return NXT_UNIT_ERROR;
+ }
+
+ req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
+
+ nxt_queue_insert_tail(&req_impl->process->ports, &port_impl->link);
+
+ port_impl->process = req_impl->process;
+
+
+ nxt_queue_init(&port_impl->awaiting_req);
+
+ nxt_queue_insert_tail(&port_impl->awaiting_req, &req_impl->port_wait_link);
+
+ port_impl->use_count = 2;
+ port_impl->ready = 0;
+
+ req->response_port = port;
+
+ pthread_mutex_unlock(&lib->mutex);
+
+ nxt_unit_process_use(port_impl->process);
+
+ res = nxt_unit_get_port(ctx, port_id);
+ if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
+ return NXT_UNIT_ERROR;
+ }
+
+ nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
+
+ return NXT_UNIT_AGAIN;
+}
+
+
+static int
nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
size_t hsize;
@@ -4041,6 +4156,8 @@ nxt_unit_run_once(nxt_unit_ctx_t *ctx)
nxt_unit_read_buf_release(ctx, rbuf);
+ nxt_unit_process_ready_req(ctx_impl);
+
nxt_unit_ctx_release(ctx_impl);
return rc;
@@ -4062,6 +4179,39 @@ nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
}
+static void
+nxt_unit_process_ready_req(nxt_unit_ctx_impl_t *ctx_impl)
+{
+ nxt_queue_t ready_req;
+ nxt_unit_impl_t *lib;
+ nxt_unit_request_info_impl_t *req_impl;
+
+ nxt_queue_init(&ready_req);
+
+ pthread_mutex_lock(&ctx_impl->mutex);
+
+ if (nxt_queue_is_empty(&ctx_impl->ready_req)) {
+ pthread_mutex_unlock(&ctx_impl->mutex);
+
+ return;
+ }
+
+ nxt_queue_add(&ready_req, &ctx_impl->ready_req);
+ nxt_queue_init(&ctx_impl->ready_req);
+
+ pthread_mutex_unlock(&ctx_impl->mutex);
+
+ nxt_queue_each(req_impl, &ready_req,
+ nxt_unit_request_info_impl_t, port_wait_link)
+ {
+ lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit);
+
+ lib->callbacks.request_handler(&req_impl->req);
+
+ } nxt_queue_loop;
+}
+
+
void
nxt_unit_done(nxt_unit_ctx_t *ctx)
{
@@ -4371,11 +4521,14 @@ nxt_unit_port_process(nxt_unit_port_t *port)
static nxt_unit_port_t *
nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
{
- int rc;
- nxt_unit_impl_t *lib;
- nxt_unit_port_t *old_port;
- nxt_unit_process_t *process;
- nxt_unit_port_impl_t *new_port;
+ int rc;
+ nxt_queue_t awaiting_req;
+ nxt_unit_impl_t *lib;
+ nxt_unit_port_t *old_port;
+ nxt_unit_process_t *process;
+ nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_port_impl_t *new_port, *old_port_impl;
+ nxt_unit_request_info_impl_t *req_impl;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
@@ -4415,6 +4568,17 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
*port = *old_port;
+ nxt_queue_init(&awaiting_req);
+
+ old_port_impl = nxt_container_of(old_port, nxt_unit_port_impl_t, port);
+
+ if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) {
+ nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req);
+ nxt_queue_init(&old_port_impl->awaiting_req);
+ }
+
+ old_port_impl->ready = (port->in_fd != -1 || port->out_fd != -1);
+
pthread_mutex_unlock(&lib->mutex);
if (lib->callbacks.add_port != NULL
@@ -4423,6 +4587,25 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
lib->callbacks.add_port(ctx, old_port);
}
+ nxt_queue_each(req_impl, &awaiting_req,
+ nxt_unit_request_info_impl_t, port_wait_link)
+ {
+ nxt_queue_remove(&req_impl->port_wait_link);
+
+ ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t,
+ ctx);
+
+ pthread_mutex_lock(&ctx_impl->mutex);
+
+ nxt_queue_insert_tail(&ctx_impl->ready_req,
+ &req_impl->port_wait_link);
+
+ pthread_mutex_unlock(&ctx_impl->mutex);
+
+ nxt_atomic_fetch_add(&ctx_impl->wait_items, -1);
+
+ } nxt_queue_loop;
+
return old_port;
}
@@ -4464,6 +4647,9 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
new_port->use_count = 2;
new_port->process = process;
+ new_port->ready = (port->in_fd != -1 || port->out_fd != -1);
+
+ nxt_queue_init(&new_port->awaiting_req);
process = NULL;
@@ -4608,6 +4794,42 @@ nxt_unit_quit(nxt_unit_ctx_t *ctx)
}
+static int
+nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
+{
+ ssize_t res;
+ nxt_unit_impl_t *lib;
+ nxt_unit_ctx_impl_t *ctx_impl;
+
+ struct {
+ nxt_port_msg_t msg;
+ nxt_port_msg_get_port_t get_port;
+ } m;
+
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+
+ memset(&m.msg, 0, sizeof(nxt_port_msg_t));
+
+ m.msg.pid = lib->pid;
+ m.msg.reply_port = ctx_impl->read_port->id.id;
+ m.msg.type = _NXT_PORT_MSG_GET_PORT;
+
+ m.get_port.id = port_id->id;
+ m.get_port.pid = port_id->pid;
+
+ nxt_unit_debug(ctx, "get_port: %d %d", (int) port_id->pid,
+ (int) port_id->id);
+
+ res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL, 0);
+ if (nxt_slow_path(res != sizeof(m))) {
+ return NXT_UNIT_ERROR;
+ }
+
+ return NXT_UNIT_OK;
+}
+
+
static ssize_t
nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
const void *buf, size_t buf_size, const void *oob, size_t oob_size)