diff options
Diffstat (limited to '')
-rw-r--r-- | src/nxt_unit.c | 276 |
1 files changed, 249 insertions, 27 deletions
diff --git a/src/nxt_unit.c b/src/nxt_unit.c index ddfd9c80..c1ef977f 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -55,6 +55,8 @@ static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); +static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req, + nxt_unit_port_id_t *port_id); static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx); @@ -119,6 +121,7 @@ static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib, static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib); static void nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf); +static void nxt_unit_process_ready_req(nxt_unit_ctx_impl_t *ctx_impl); static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl); static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx); @@ -138,6 +141,7 @@ static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid); static void nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process); static void nxt_unit_quit(nxt_unit_ctx_t *ctx); +static int nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id); static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, const void *buf, size_t buf_size, const void *oob, size_t oob_size); @@ -215,7 +219,10 @@ struct nxt_unit_request_info_impl_s { nxt_unit_req_state_t state; uint8_t websocket; + /* for nxt_unit_ctx_impl_t.free_req or active_req */ nxt_queue_link_t link; + /* for nxt_unit_port_impl_t.awaiting_req */ + nxt_queue_link_t port_wait_link; char extra_data[]; }; @@ -244,6 +251,7 @@ struct nxt_unit_ctx_impl_s { nxt_unit_ctx_t ctx; nxt_atomic_t use_count; + nxt_atomic_t wait_items; pthread_mutex_t mutex; @@ -265,6 +273,9 @@ struct nxt_unit_ctx_impl_s { /* of nxt_unit_request_info_impl_t */ nxt_lvlhsh_t requests; + /* of nxt_unit_request_info_impl_t */ + nxt_queue_t ready_req; + nxt_unit_read_buf_t *pending_read_head; nxt_unit_read_buf_t **pending_read_tail; nxt_unit_read_buf_t *free_read_buf; @@ -309,6 +320,11 @@ struct nxt_unit_port_impl_s { nxt_queue_link_t link; nxt_unit_process_t *process; + + /* of nxt_unit_request_info_impl_t */ + nxt_queue_t awaiting_req; + + int ready; }; @@ -515,10 +531,12 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link); ctx_impl->use_count = 1; + ctx_impl->wait_items = 0; nxt_queue_init(&ctx_impl->free_req); nxt_queue_init(&ctx_impl->free_ws); nxt_queue_init(&ctx_impl->active_req); + nxt_queue_init(&ctx_impl->ready_req); ctx_impl->free_buf = NULL; nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]); @@ -973,8 +991,8 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) { + int res; nxt_unit_impl_t *lib; - nxt_unit_port_t *port; nxt_unit_port_id_t port_id; nxt_unit_request_t *r; nxt_unit_mmap_buf_t *b; @@ -1004,28 +1022,8 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) return NXT_UNIT_ERROR; } - nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port); - - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - - pthread_mutex_lock(&lib->mutex); - - port = nxt_unit_port_hash_find(&lib->ports, &port_id, 0); - - pthread_mutex_unlock(&lib->mutex); - - if (nxt_slow_path(port == NULL)) { - nxt_unit_alert(ctx, "#%"PRIu32": response port %d,%d not found", - recv_msg->stream, - (int) recv_msg->pid, (int) recv_msg->reply_port); - - return NXT_UNIT_ERROR; - } - req = &req_impl->req; - req->response_port = port; - req->request = recv_msg->start; b = recv_msg->incoming_buf; @@ -1076,13 +1074,130 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) (char *) nxt_unit_sptr_get(&r->target), (int) r->content_length); - lib->callbacks.request_handler(req); + nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port); + + res = nxt_unit_request_check_response_port(req, &port_id); + + if (nxt_fast_path(res == NXT_UNIT_OK)) { + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + + lib->callbacks.request_handler(req); + } return NXT_UNIT_OK; } static int +nxt_unit_request_check_response_port(nxt_unit_request_info_t *req, + nxt_unit_port_id_t *port_id) +{ + int res; + nxt_unit_ctx_t *ctx; + nxt_unit_impl_t *lib; + nxt_unit_port_t *port; + nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_port_impl_t *port_impl; + nxt_unit_request_info_impl_t *req_impl; + + ctx = req->ctx; + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + pthread_mutex_lock(&lib->mutex); + + port = nxt_unit_port_hash_find(&lib->ports, port_id, 0); + port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); + + if (nxt_fast_path(port != NULL)) { + req->response_port = port; + + if (nxt_fast_path(port_impl->ready)) { + pthread_mutex_unlock(&lib->mutex); + + nxt_unit_debug(ctx, "check_response_port: found port{%d,%d}", + (int) port->id.pid, (int) port->id.id); + + return NXT_UNIT_OK; + } + + nxt_unit_debug(ctx, "check_response_port: " + "port{%d,%d} already requested", + (int) port->id.pid, (int) port->id.id); + + req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); + + nxt_queue_insert_tail(&port_impl->awaiting_req, + &req_impl->port_wait_link); + + pthread_mutex_unlock(&lib->mutex); + + nxt_atomic_fetch_add(&ctx_impl->wait_items, 1); + + return NXT_UNIT_AGAIN; + } + + port_impl = malloc(sizeof(nxt_unit_port_impl_t)); + if (nxt_slow_path(port_impl == NULL)) { + nxt_unit_alert(ctx, "check_response_port: malloc(%d) failed", + (int) sizeof(nxt_unit_port_impl_t)); + + pthread_mutex_unlock(&lib->mutex); + + return NXT_UNIT_ERROR; + } + + port = &port_impl->port; + + port->id = *port_id; + port->in_fd = -1; + port->out_fd = -1; + port->data = NULL; + + res = nxt_unit_port_hash_add(&lib->ports, port); + if (nxt_slow_path(res != NXT_UNIT_OK)) { + nxt_unit_alert(ctx, "check_response_port: %d,%d hash_add failed", + port->id.pid, port->id.id); + + pthread_mutex_unlock(&lib->mutex); + + free(port); + + return NXT_UNIT_ERROR; + } + + req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); + + nxt_queue_insert_tail(&req_impl->process->ports, &port_impl->link); + + port_impl->process = req_impl->process; + + + nxt_queue_init(&port_impl->awaiting_req); + + nxt_queue_insert_tail(&port_impl->awaiting_req, &req_impl->port_wait_link); + + port_impl->use_count = 2; + port_impl->ready = 0; + + req->response_port = port; + + pthread_mutex_unlock(&lib->mutex); + + nxt_unit_process_use(port_impl->process); + + res = nxt_unit_get_port(ctx, port_id); + if (nxt_slow_path(res == NXT_UNIT_ERROR)) { + return NXT_UNIT_ERROR; + } + + nxt_atomic_fetch_add(&ctx_impl->wait_items, 1); + + return NXT_UNIT_AGAIN; +} + + +static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) { size_t hsize; @@ -4041,6 +4156,8 @@ nxt_unit_run_once(nxt_unit_ctx_t *ctx) nxt_unit_read_buf_release(ctx, rbuf); + nxt_unit_process_ready_req(ctx_impl); + nxt_unit_ctx_release(ctx_impl); return rc; @@ -4062,6 +4179,39 @@ nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) } +static void +nxt_unit_process_ready_req(nxt_unit_ctx_impl_t *ctx_impl) +{ + nxt_queue_t ready_req; + nxt_unit_impl_t *lib; + nxt_unit_request_info_impl_t *req_impl; + + nxt_queue_init(&ready_req); + + pthread_mutex_lock(&ctx_impl->mutex); + + if (nxt_queue_is_empty(&ctx_impl->ready_req)) { + pthread_mutex_unlock(&ctx_impl->mutex); + + return; + } + + nxt_queue_add(&ready_req, &ctx_impl->ready_req); + nxt_queue_init(&ctx_impl->ready_req); + + pthread_mutex_unlock(&ctx_impl->mutex); + + nxt_queue_each(req_impl, &ready_req, + nxt_unit_request_info_impl_t, port_wait_link) + { + lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit); + + lib->callbacks.request_handler(&req_impl->req); + + } nxt_queue_loop; +} + + void nxt_unit_done(nxt_unit_ctx_t *ctx) { @@ -4371,11 +4521,14 @@ nxt_unit_port_process(nxt_unit_port_t *port) static nxt_unit_port_t * nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) { - int rc; - nxt_unit_impl_t *lib; - nxt_unit_port_t *old_port; - nxt_unit_process_t *process; - nxt_unit_port_impl_t *new_port; + int rc; + nxt_queue_t awaiting_req; + nxt_unit_impl_t *lib; + nxt_unit_port_t *old_port; + nxt_unit_process_t *process; + nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_port_impl_t *new_port, *old_port_impl; + nxt_unit_request_info_impl_t *req_impl; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); @@ -4415,6 +4568,17 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) *port = *old_port; + nxt_queue_init(&awaiting_req); + + old_port_impl = nxt_container_of(old_port, nxt_unit_port_impl_t, port); + + if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) { + nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req); + nxt_queue_init(&old_port_impl->awaiting_req); + } + + old_port_impl->ready = (port->in_fd != -1 || port->out_fd != -1); + pthread_mutex_unlock(&lib->mutex); if (lib->callbacks.add_port != NULL @@ -4423,6 +4587,25 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) lib->callbacks.add_port(ctx, old_port); } + nxt_queue_each(req_impl, &awaiting_req, + nxt_unit_request_info_impl_t, port_wait_link) + { + nxt_queue_remove(&req_impl->port_wait_link); + + ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, + ctx); + + pthread_mutex_lock(&ctx_impl->mutex); + + nxt_queue_insert_tail(&ctx_impl->ready_req, + &req_impl->port_wait_link); + + pthread_mutex_unlock(&ctx_impl->mutex); + + nxt_atomic_fetch_add(&ctx_impl->wait_items, -1); + + } nxt_queue_loop; + return old_port; } @@ -4464,6 +4647,9 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) new_port->use_count = 2; new_port->process = process; + new_port->ready = (port->in_fd != -1 || port->out_fd != -1); + + nxt_queue_init(&new_port->awaiting_req); process = NULL; @@ -4608,6 +4794,42 @@ nxt_unit_quit(nxt_unit_ctx_t *ctx) } +static int +nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id) +{ + ssize_t res; + nxt_unit_impl_t *lib; + nxt_unit_ctx_impl_t *ctx_impl; + + struct { + nxt_port_msg_t msg; + nxt_port_msg_get_port_t get_port; + } m; + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + memset(&m.msg, 0, sizeof(nxt_port_msg_t)); + + m.msg.pid = lib->pid; + m.msg.reply_port = ctx_impl->read_port->id.id; + m.msg.type = _NXT_PORT_MSG_GET_PORT; + + m.get_port.id = port_id->id; + m.get_port.pid = port_id->pid; + + nxt_unit_debug(ctx, "get_port: %d %d", (int) port_id->pid, + (int) port_id->id); + + res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL, 0); + if (nxt_slow_path(res != sizeof(m))) { + return NXT_UNIT_ERROR; + } + + return NXT_UNIT_OK; +} + + static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, const void *buf, size_t buf_size, const void *oob, size_t oob_size) |