diff options
Diffstat (limited to '')
-rw-r--r-- | src/nxt_unit.c | 471 |
1 files changed, 363 insertions, 108 deletions
diff --git a/src/nxt_unit.c b/src/nxt_unit.c index f75d61bc..097f50d6 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -54,11 +54,13 @@ static int nxt_unit_read_env(nxt_unit_port_t *ready_port, int *log_fd, uint32_t *stream, uint32_t *shm_limit); static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd); -static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf); +static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf, + nxt_unit_request_info_t **preq); static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); +static int nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx); static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, - nxt_unit_recv_msg_t *recv_msg); + nxt_unit_recv_msg_t *recv_msg, nxt_unit_request_info_t **preq); static int nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req, @@ -106,6 +108,8 @@ static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf); static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd); +static void nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx, + nxt_unit_ctx_impl_t *ctx_impl); static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps); nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process); nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process); @@ -144,6 +148,8 @@ nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port); nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port); static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue); +static void nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx, + nxt_queue_t *awaiting_req); static void nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id); static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, @@ -305,6 +311,9 @@ struct nxt_unit_ctx_impl_s { /* of nxt_unit_read_buf_t */ nxt_queue_t free_rbuf; + int online; + int ready; + nxt_unit_mmap_buf_t ctx_buf[2]; nxt_unit_read_buf_t ctx_read_buf; @@ -314,6 +323,7 @@ struct nxt_unit_ctx_impl_s { struct nxt_unit_mmap_s { nxt_port_mmap_header_t *hdr; + pthread_t src_thread; /* of nxt_unit_read_buf_t */ nxt_queue_t awaiting_rbuf; @@ -353,7 +363,6 @@ struct nxt_unit_impl_s { pid_t pid; int log_fd; - int online; nxt_unit_ctx_impl_t main_ctx; }; @@ -560,7 +569,6 @@ nxt_unit_create(nxt_unit_init_t *init) lib->ports.slot = NULL; lib->log_fd = STDERR_FILENO; - lib->online = 1; nxt_queue_init(&lib->contexts); @@ -614,10 +622,16 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, nxt_unit_lib_use(lib); + pthread_mutex_lock(&lib->mutex); + nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link); + pthread_mutex_unlock(&lib->mutex); + ctx_impl->use_count = 1; ctx_impl->wait_items = 0; + ctx_impl->online = 1; + ctx_impl->ready = 0; nxt_queue_init(&ctx_impl->free_req); nxt_queue_init(&ctx_impl->free_ws); @@ -769,7 +783,7 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port, uint32_t *shm_limit) { int rc; - int ready_fd, router_fd, read_fd; + int ready_fd, router_fd, read_in_fd, read_out_fd; char *unit_init, *version_end; long version_length; int64_t ready_pid, router_pid, read_pid; @@ -801,15 +815,15 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port, "%"PRIu32";" "%"PRId64",%"PRIu32",%d;" "%"PRId64",%"PRIu32",%d;" - "%"PRId64",%"PRIu32",%d;" + "%"PRId64",%"PRIu32",%d,%d;" "%d,%"PRIu32, &ready_stream, &ready_pid, &ready_id, &ready_fd, &router_pid, &router_id, &router_fd, - &read_pid, &read_id, &read_fd, + &read_pid, &read_id, &read_in_fd, &read_out_fd, log_fd, shm_limit); - if (nxt_slow_path(rc != 12)) { + if (nxt_slow_path(rc != 13)) { nxt_unit_alert(NULL, "failed to scan variables: %d", rc); return NXT_UNIT_ERROR; @@ -829,8 +843,8 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port, nxt_unit_port_id_init(&read_port->id, (pid_t) read_pid, read_id); - read_port->in_fd = read_fd; - read_port->out_fd = -1; + read_port->in_fd = read_in_fd; + read_port->out_fd = read_out_fd; read_port->data = NULL; *stream = ready_stream; @@ -891,7 +905,8 @@ nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd) static int -nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) +nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf, + nxt_unit_request_info_t **preq) { int rc; pid_t pid; @@ -979,6 +994,10 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) switch (port_msg->type) { + case _NXT_PORT_MSG_RPC_READY: + rc = NXT_UNIT_OK; + break; + case _NXT_PORT_MSG_QUIT: nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream); @@ -990,6 +1009,10 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) rc = nxt_unit_process_new_port(ctx, &recv_msg); break; + case _NXT_PORT_MSG_PORT_ACK: + rc = nxt_unit_ctx_ready(ctx); + break; + case _NXT_PORT_MSG_CHANGE_FILE: nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d", port_msg->stream, recv_msg.fd[0]); @@ -1019,7 +1042,7 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) break; case _NXT_PORT_MSG_REQ_HEADERS: - rc = nxt_unit_process_req_headers(ctx, &recv_msg); + rc = nxt_unit_process_req_headers(ctx, &recv_msg, preq); break; case _NXT_PORT_MSG_REQ_BODY: @@ -1055,7 +1078,7 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) break; default: - nxt_unit_debug(ctx, "#%"PRIu32": ignore message type: %d", + nxt_unit_alert(ctx, "#%"PRIu32": ignore message type: %d", port_msg->stream, (int) port_msg->type); rc = NXT_UNIT_ERROR; @@ -1118,7 +1141,7 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - if (new_port_msg->id == (nxt_port_id_t) -1) { + if (new_port_msg->id == NXT_UNIT_SHARED_PORT_ID) { nxt_unit_port_id_init(&new_port.id, lib->pid, new_port_msg->id); new_port.in_fd = recv_msg->fd[0]; @@ -1160,11 +1183,31 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) return NXT_UNIT_ERROR; } - if (new_port_msg->id == (nxt_port_id_t) -1) { + if (new_port_msg->id == NXT_UNIT_SHARED_PORT_ID) { lib->shared_port = port; - } else { - nxt_unit_port_release(port); + return nxt_unit_ctx_ready(ctx); + } + + nxt_unit_port_release(port); + + return NXT_UNIT_OK; +} + + +static int +nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx) +{ + nxt_unit_impl_t *lib; + nxt_unit_ctx_impl_t *ctx_impl; + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + ctx_impl->ready = 1; + + if (lib->callbacks.ready_handler) { + return lib->callbacks.ready_handler(ctx); } return NXT_UNIT_OK; @@ -1172,7 +1215,8 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) static int -nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) +nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg, + nxt_unit_request_info_t **preq) { int res; nxt_unit_impl_t *lib; @@ -1288,7 +1332,12 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) } } - lib->callbacks.request_handler(req); + if (preq == NULL) { + lib->callbacks.request_handler(req); + + } else { + *preq = req; + } } return NXT_UNIT_OK; @@ -1318,8 +1367,14 @@ nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) if (recv_msg->incoming_buf != NULL) { b = nxt_container_of(req->content_buf, nxt_unit_mmap_buf_t, buf); + while (b->next != NULL) { + b = b->next; + } + /* "Move" incoming buffer list to req_impl. */ - nxt_unit_mmap_buf_insert_tail(&b->next, recv_msg->incoming_buf); + b->next = recv_msg->incoming_buf; + b->next->prev = &b->next; + recv_msg->incoming_buf = NULL; } @@ -1588,8 +1643,6 @@ nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) } if (recv_msg->last) { - req_impl->websocket = 0; - if (cb->close_handler) { nxt_unit_req_debug(req, "close_handler"); @@ -1682,8 +1735,6 @@ nxt_unit_request_info_release(nxt_unit_request_info_t *req) nxt_unit_request_hash_find(req->ctx, req_impl->stream, 1); } - req_impl->websocket = 0; - while (req_impl->outgoing_buf != NULL) { nxt_unit_mmap_buf_free(req_impl->outgoing_buf); } @@ -1704,6 +1755,8 @@ nxt_unit_request_info_release(nxt_unit_request_info_t *req) req->response_port = NULL; } + req_impl->state = NXT_UNIT_RS_RELEASED; + pthread_mutex_lock(&ctx_impl->mutex); nxt_queue_remove(&req_impl->link); @@ -1711,8 +1764,6 @@ nxt_unit_request_info_release(nxt_unit_request_info_t *req) nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link); pthread_mutex_unlock(&ctx_impl->mutex); - - req_impl->state = NXT_UNIT_RS_RELEASED; } @@ -2132,7 +2183,8 @@ nxt_unit_response_add_field(nxt_unit_request_info_t *req, resp = req->response; if (nxt_slow_path(resp->fields_count >= req->response_max_fields)) { - nxt_unit_req_warn(req, "add_field: too many response fields"); + nxt_unit_req_warn(req, "add_field: too many response fields (%d)", + (int) resp->fields_count); return NXT_UNIT_ERROR; } @@ -2309,6 +2361,8 @@ nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size) if (nxt_slow_path(rc != NXT_UNIT_OK)) { nxt_unit_mmap_buf_release(mmap_buf); + nxt_unit_req_alert(req, "response_buf_alloc: failed to get out buf"); + return NULL; } @@ -2949,8 +3003,6 @@ nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size) buf_res = nxt_unit_buf_read(&req->content_buf, &req->content_length, dst, size); - nxt_unit_req_debug(req, "read: %d", (int) buf_res); - if (buf_res < (ssize_t) size && req->content_fd != -1) { res = read(req->content_fd, dst, size); if (nxt_slow_path(res < 0)) { @@ -3389,7 +3441,10 @@ retry: for (mm = lib->outgoing.elts; mm < mm_end; mm++) { hdr = mm->hdr; - if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port->id.id) { + if (hdr->sent_over != 0xFFFFu + && (hdr->sent_over != port->id.id + || mm->src_thread != pthread_self())) + { continue; } @@ -3657,6 +3712,7 @@ nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n) hdr->src_pid = lib->pid; hdr->dst_pid = port->id.pid; hdr->sent_over = port->id.id; + mm->src_thread = pthread_self(); /* Mark first n chunk(s) as busy */ for (i = 0; i < n; i++) { @@ -3975,6 +4031,8 @@ nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd) nxt_atomic_fetch_add(&ctx_impl->wait_items, -1); + nxt_unit_awake_ctx(ctx, ctx_impl); + } nxt_queue_loop; return rc; @@ -3982,6 +4040,32 @@ nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd) static void +nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx, nxt_unit_ctx_impl_t *ctx_impl) +{ + nxt_port_msg_t msg; + + if (nxt_fast_path(ctx == &ctx_impl->ctx)) { + return; + } + + if (nxt_slow_path(ctx_impl->read_port == NULL + || ctx_impl->read_port->out_fd == -1)) + { + nxt_unit_alert(ctx, "target context read_port is NULL or not writable"); + + return; + } + + memset(&msg, 0, sizeof(nxt_port_msg_t)); + + msg.type = _NXT_PORT_MSG_RPC_READY; + + (void) nxt_unit_port_send(ctx, ctx_impl->read_port, + &msg, sizeof(msg), NULL, 0); +} + + +static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps) { pthread_mutex_init(&mmaps->mutex, NULL); @@ -4403,18 +4487,20 @@ nxt_unit_process_pop_first(nxt_unit_impl_t *lib) int nxt_unit_run(nxt_unit_ctx_t *ctx) { - int rc; - nxt_unit_impl_t *lib; + int rc; + nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_ctx_use(ctx); - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + rc = NXT_UNIT_OK; - while (nxt_fast_path(lib->online)) { + while (nxt_fast_path(ctx_impl->online)) { rc = nxt_unit_run_once_impl(ctx); if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { + nxt_unit_quit(ctx); break; } } @@ -4458,7 +4544,7 @@ nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx) return rc; } - rc = nxt_unit_process_msg(ctx, rbuf); + rc = nxt_unit_process_msg(ctx, rbuf, NULL); if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { return NXT_UNIT_ERROR; } @@ -4483,17 +4569,17 @@ nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) nxt_unit_port_impl_t *port_impl; struct pollfd fds[2]; - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); - if (ctx_impl->wait_items > 0 || lib->shared_port == NULL) { - + if (ctx_impl->wait_items > 0 || ctx_impl->ready == 0) { return nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf); } port_impl = nxt_container_of(ctx_impl->read_port, nxt_unit_port_impl_t, port); + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + retry: if (port_impl->from_socket == 0) { @@ -4585,8 +4671,6 @@ nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx) nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_read_buf_t *rbuf; - nxt_queue_init(&pending_rbuf); - ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); pthread_mutex_lock(&ctx_impl->mutex); @@ -4597,6 +4681,8 @@ nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx) return NXT_UNIT_OK; } + nxt_queue_init(&pending_rbuf); + nxt_queue_add(&pending_rbuf, &ctx_impl->pending_rbuf); nxt_queue_init(&ctx_impl->pending_rbuf); @@ -4607,7 +4693,7 @@ nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx) nxt_queue_each(rbuf, &pending_rbuf, nxt_unit_read_buf_t, link) { if (nxt_fast_path(rc != NXT_UNIT_ERROR)) { - rc = nxt_unit_process_msg(&ctx_impl->ctx, rbuf); + rc = nxt_unit_process_msg(&ctx_impl->ctx, rbuf, NULL); } else { nxt_unit_read_buf_release(ctx, rbuf); @@ -4629,8 +4715,6 @@ nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx) nxt_unit_request_info_t *req; nxt_unit_request_info_impl_t *req_impl; - nxt_queue_init(&ready_req); - ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); pthread_mutex_lock(&ctx_impl->mutex); @@ -4641,6 +4725,8 @@ nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx) return; } + nxt_queue_init(&ready_req); + nxt_queue_add(&ready_req, &ctx_impl->ready_req); nxt_queue_init(&ctx_impl->ready_req); @@ -4691,18 +4777,16 @@ int nxt_unit_run_ctx(nxt_unit_ctx_t *ctx) { int rc; - nxt_unit_impl_t *lib; nxt_unit_read_buf_t *rbuf; nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_ctx_use(ctx); - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); rc = NXT_UNIT_OK; - while (nxt_fast_path(lib->online)) { + while (nxt_fast_path(ctx_impl->online)) { rbuf = nxt_unit_read_buf_get(ctx); if (nxt_slow_path(rbuf == NULL)) { rc = NXT_UNIT_ERROR; @@ -4716,7 +4800,7 @@ nxt_unit_run_ctx(nxt_unit_ctx_t *ctx) goto retry; } - rc = nxt_unit_process_msg(ctx, rbuf); + rc = nxt_unit_process_msg(ctx, rbuf, NULL); if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { break; } @@ -4797,13 +4881,16 @@ nxt_unit_run_shared(nxt_unit_ctx_t *ctx) int rc; nxt_unit_impl_t *lib; nxt_unit_read_buf_t *rbuf; + nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_ctx_use(ctx); lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + rc = NXT_UNIT_OK; - while (nxt_fast_path(lib->online)) { + while (nxt_fast_path(ctx_impl->online)) { rbuf = nxt_unit_read_buf_get(ctx); if (nxt_slow_path(rbuf == NULL)) { rc = NXT_UNIT_ERROR; @@ -4822,22 +4909,67 @@ nxt_unit_run_shared(nxt_unit_ctx_t *ctx) break; } - rc = nxt_unit_process_msg(ctx, rbuf); + rc = nxt_unit_process_msg(ctx, rbuf, NULL); if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { break; } + } - rc = nxt_unit_process_pending_rbuf(ctx); - if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { - break; - } + nxt_unit_ctx_release(ctx); - nxt_unit_process_ready_req(ctx); + return rc; +} + + +nxt_unit_request_info_t * +nxt_unit_dequeue_request(nxt_unit_ctx_t *ctx) +{ + int rc; + nxt_unit_impl_t *lib; + nxt_unit_read_buf_t *rbuf; + nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_request_info_t *req; + + nxt_unit_ctx_use(ctx); + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + req = NULL; + + if (nxt_slow_path(!ctx_impl->online)) { + goto done; } + rbuf = nxt_unit_read_buf_get(ctx); + if (nxt_slow_path(rbuf == NULL)) { + goto done; + } + + rc = nxt_unit_app_queue_recv(lib->shared_port, rbuf); + if (rc != NXT_UNIT_OK) { + nxt_unit_read_buf_release(ctx, rbuf); + goto done; + } + + (void) nxt_unit_process_msg(ctx, rbuf, &req); + +done: + nxt_unit_ctx_release(ctx); - return rc; + return req; +} + + +int +nxt_unit_is_main_ctx(nxt_unit_ctx_t *ctx) +{ + nxt_unit_impl_t *lib; + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + + return (ctx == &lib->main_ctx.ctx); } @@ -4862,6 +4994,7 @@ nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) int rc; nxt_unit_impl_t *lib; nxt_unit_read_buf_t *rbuf; + nxt_unit_ctx_impl_t *ctx_impl; rbuf = nxt_unit_read_buf_get(ctx); if (nxt_slow_path(rbuf == NULL)) { @@ -4869,6 +5002,7 @@ nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) } lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); retry: @@ -4884,7 +5018,7 @@ retry: return rc; } - rc = nxt_unit_process_msg(ctx, rbuf); + rc = nxt_unit_process_msg(ctx, rbuf, NULL); if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { return NXT_UNIT_ERROR; } @@ -4896,12 +5030,12 @@ retry: nxt_unit_process_ready_req(ctx); - rbuf = nxt_unit_read_buf_get(ctx); - if (nxt_slow_path(rbuf == NULL)) { - return NXT_UNIT_ERROR; - } + if (ctx_impl->online) { + rbuf = nxt_unit_read_buf_get(ctx); + if (nxt_slow_path(rbuf == NULL)) { + return NXT_UNIT_ERROR; + } - if (lib->online) { goto retry; } @@ -4945,14 +5079,14 @@ nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data) queue_fd = -1; - port = nxt_unit_create_port(ctx); + port = nxt_unit_create_port(&new_ctx->ctx); if (nxt_slow_path(port == NULL)) { goto fail; } new_ctx->read_port = port; - queue_fd = nxt_unit_shm_open(ctx, sizeof(nxt_port_queue_t)); + queue_fd = nxt_unit_shm_open(&new_ctx->ctx, sizeof(nxt_port_queue_t)); if (nxt_slow_path(queue_fd == -1)) { goto fail; } @@ -4971,7 +5105,7 @@ nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data) port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); port_impl->queue = mem; - rc = nxt_unit_send_port(ctx, lib->router_port, port, queue_fd); + rc = nxt_unit_send_port(&new_ctx->ctx, lib->router_port, port, queue_fd); if (nxt_slow_path(rc != NXT_UNIT_OK)) { goto fail; } @@ -4997,6 +5131,7 @@ nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl) { nxt_unit_impl_t *lib; nxt_unit_mmap_buf_t *mmap_buf; + nxt_unit_read_buf_t *rbuf; nxt_unit_request_info_impl_t *req_impl; nxt_unit_websocket_frame_impl_t *ws_impl; @@ -5034,10 +5169,21 @@ nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl) } nxt_queue_loop; + nxt_queue_each(rbuf, &ctx_impl->free_rbuf, nxt_unit_read_buf_t, link) + { + if (rbuf != &ctx_impl->ctx_read_buf) { + nxt_unit_free(&ctx_impl->ctx, rbuf); + } + } nxt_queue_loop; + pthread_mutex_destroy(&ctx_impl->mutex); + pthread_mutex_lock(&lib->mutex); + nxt_queue_remove(&ctx_impl->link); + pthread_mutex_unlock(&lib->mutex); + if (nxt_fast_path(ctx_impl->read_port != NULL)) { nxt_unit_remove_port(lib, &ctx_impl->read_port->id); nxt_unit_port_release(ctx_impl->read_port); @@ -5224,7 +5370,7 @@ nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port) } if (port_impl->queue != NULL) { - munmap(port_impl->queue, (port->id.id == (nxt_port_id_t) -1) + munmap(port_impl->queue, (port->id.id == NXT_UNIT_SHARED_PORT_ID) ? sizeof(nxt_app_queue_t) : sizeof(nxt_port_queue_t)); } @@ -5237,14 +5383,12 @@ nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port) static nxt_unit_port_t * nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue) { - int rc; - nxt_queue_t awaiting_req; - nxt_unit_impl_t *lib; - nxt_unit_port_t *old_port; - nxt_unit_process_t *process; - nxt_unit_ctx_impl_t *ctx_impl; - nxt_unit_port_impl_t *new_port, *old_port_impl; - nxt_unit_request_info_impl_t *req_impl; + int rc, ready; + nxt_queue_t awaiting_req; + nxt_unit_impl_t *lib; + nxt_unit_port_t *old_port; + nxt_unit_process_t *process; + nxt_unit_port_impl_t *new_port, *old_port_impl; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); @@ -5293,44 +5437,45 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue) old_port_impl->queue = queue; } - if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) { - nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req); - nxt_queue_init(&old_port_impl->awaiting_req); - } - - old_port_impl->ready = (port->in_fd != -1 || port->out_fd != -1); + ready = (port->in_fd != -1 || port->out_fd != -1); - pthread_mutex_unlock(&lib->mutex); + /* + * Port can be market as 'ready' only after callbacks.add_port() call. + * Otherwise, request may try to use the port before callback. + */ + if (lib->callbacks.add_port == NULL && ready) { + old_port_impl->ready = ready; - if (lib->callbacks.add_port != NULL - && (port->in_fd != -1 || port->out_fd != -1)) - { - lib->callbacks.add_port(ctx, old_port); + if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) { + nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req); + nxt_queue_init(&old_port_impl->awaiting_req); + } } - nxt_queue_each(req_impl, &awaiting_req, - nxt_unit_request_info_impl_t, port_wait_link) - { - nxt_queue_remove(&req_impl->port_wait_link); + pthread_mutex_unlock(&lib->mutex); - ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, - ctx); + if (lib->callbacks.add_port != NULL && ready) { + lib->callbacks.add_port(ctx, old_port); - pthread_mutex_lock(&ctx_impl->mutex); + pthread_mutex_lock(&lib->mutex); - nxt_queue_insert_tail(&ctx_impl->ready_req, - &req_impl->port_wait_link); + old_port_impl->ready = ready; - pthread_mutex_unlock(&ctx_impl->mutex); + if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) { + nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req); + nxt_queue_init(&old_port_impl->awaiting_req); + } - nxt_atomic_fetch_add(&ctx_impl->wait_items, -1); + pthread_mutex_unlock(&lib->mutex); + } - } nxt_queue_loop; + nxt_unit_process_awaiting_req(ctx, &awaiting_req); return old_port; } new_port = NULL; + ready = 0; nxt_unit_debug(ctx, "add_port: port{%d,%d} in_fd %d out_fd %d queue %p", port->id.pid, port->id.id, @@ -5341,7 +5486,9 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue) goto unlock; } - if (port->id.id >= process->next_port_id) { + if (port->id.id != NXT_UNIT_SHARED_PORT_ID + && port->id.id >= process->next_port_id) + { process->next_port_id = port->id.id + 1; } @@ -5371,13 +5518,21 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue) new_port->use_count = 2; new_port->process = process; - new_port->ready = (port->in_fd != -1 || port->out_fd != -1); new_port->queue = queue; new_port->from_socket = 0; new_port->socket_rbuf = NULL; nxt_queue_init(&new_port->awaiting_req); + ready = (port->in_fd != -1 || port->out_fd != -1); + + if (lib->callbacks.add_port == NULL) { + new_port->ready = ready; + + } else { + new_port->ready = 0; + } + process = NULL; unlock: @@ -5388,14 +5543,55 @@ unlock: nxt_unit_process_release(process); } - if (lib->callbacks.add_port != NULL - && new_port != NULL - && (port->in_fd != -1 || port->out_fd != -1)) - { + if (lib->callbacks.add_port != NULL && new_port != NULL && ready) { lib->callbacks.add_port(ctx, &new_port->port); + + nxt_queue_init(&awaiting_req); + + pthread_mutex_lock(&lib->mutex); + + new_port->ready = 1; + + if (!nxt_queue_is_empty(&new_port->awaiting_req)) { + nxt_queue_add(&awaiting_req, &new_port->awaiting_req); + nxt_queue_init(&new_port->awaiting_req); + } + + pthread_mutex_unlock(&lib->mutex); + + nxt_unit_process_awaiting_req(ctx, &awaiting_req); } - return &new_port->port; + return (new_port == NULL) ? NULL : &new_port->port; +} + + +static void +nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx, nxt_queue_t *awaiting_req) +{ + nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_request_info_impl_t *req_impl; + + nxt_queue_each(req_impl, awaiting_req, + nxt_unit_request_info_impl_t, port_wait_link) + { + nxt_queue_remove(&req_impl->port_wait_link); + + ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, + ctx); + + pthread_mutex_lock(&ctx_impl->mutex); + + nxt_queue_insert_tail(&ctx_impl->ready_req, + &req_impl->port_wait_link); + + pthread_mutex_unlock(&ctx_impl->mutex); + + nxt_atomic_fetch_add(&ctx_impl->wait_items, -1); + + nxt_unit_awake_ctx(ctx, ctx_impl); + + } nxt_queue_loop; } @@ -5509,17 +5705,72 @@ nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process) static void nxt_unit_quit(nxt_unit_ctx_t *ctx) { - nxt_unit_impl_t *lib; + nxt_port_msg_t msg; + nxt_unit_impl_t *lib; + nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_callbacks_t *cb; + nxt_unit_request_info_t *req; + nxt_unit_request_info_impl_t *req_impl; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + if (!ctx_impl->online) { + return; + } + + ctx_impl->online = 0; + + cb = &lib->callbacks; + + if (cb->quit != NULL) { + cb->quit(ctx); + } + + nxt_queue_each(req_impl, &ctx_impl->active_req, + nxt_unit_request_info_impl_t, link) + { + req = &req_impl->req; + + nxt_unit_req_warn(req, "active request on ctx quit"); - if (lib->online) { - lib->online = 0; + if (cb->close_handler) { + nxt_unit_req_debug(req, "close_handler"); + + cb->close_handler(req); - if (lib->callbacks.quit != NULL) { - lib->callbacks.quit(ctx); + } else { + nxt_unit_request_done(req, NXT_UNIT_ERROR); } + + } nxt_queue_loop; + + if (ctx != &lib->main_ctx.ctx) { + return; } + + memset(&msg, 0, sizeof(nxt_port_msg_t)); + + msg.pid = lib->pid; + msg.type = _NXT_PORT_MSG_QUIT; + + pthread_mutex_lock(&lib->mutex); + + nxt_queue_each(ctx_impl, &lib->contexts, nxt_unit_ctx_impl_t, link) { + + if (ctx == &ctx_impl->ctx + || ctx_impl->read_port == NULL + || ctx_impl->read_port->out_fd == -1) + { + continue; + } + + (void) nxt_unit_port_send(ctx, ctx_impl->read_port, + &msg, sizeof(msg), NULL, 0); + + } nxt_queue_loop; + + pthread_mutex_unlock(&lib->mutex); } @@ -6388,7 +6639,9 @@ nxt_unit_malloc(nxt_unit_ctx_t *ctx, size_t size) p = malloc(size); if (nxt_fast_path(p != NULL)) { +#if (NXT_DEBUG_ALLOC) nxt_unit_debug(ctx, "malloc(%d): %p", (int) size, p); +#endif } else { nxt_unit_alert(ctx, "malloc(%d) failed: %s (%d)", @@ -6402,7 +6655,9 @@ nxt_unit_malloc(nxt_unit_ctx_t *ctx, size_t size) void nxt_unit_free(nxt_unit_ctx_t *ctx, void *p) { +#if (NXT_DEBUG_ALLOC) nxt_unit_debug(ctx, "free(%p)", p); +#endif free(p); } |