summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_unit.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/nxt_unit.c471
1 files changed, 363 insertions, 108 deletions
diff --git a/src/nxt_unit.c b/src/nxt_unit.c
index f75d61bc..097f50d6 100644
--- a/src/nxt_unit.c
+++ b/src/nxt_unit.c
@@ -54,11 +54,13 @@ static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
int *log_fd, uint32_t *stream, uint32_t *shm_limit);
static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream,
int queue_fd);
-static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf);
+static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
+ nxt_unit_request_info_t **preq);
static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
+static int nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx);
static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
- nxt_unit_recv_msg_t *recv_msg);
+ nxt_unit_recv_msg_t *recv_msg, nxt_unit_request_info_t **preq);
static int nxt_unit_process_req_body(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
@@ -106,6 +108,8 @@ static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx,
uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf);
static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd);
+static void nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx,
+ nxt_unit_ctx_impl_t *ctx_impl);
static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps);
nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process);
nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process);
@@ -144,6 +148,8 @@ nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port);
nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port);
static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx,
nxt_unit_port_t *port, void *queue);
+static void nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx,
+ nxt_queue_t *awaiting_req);
static void nxt_unit_remove_port(nxt_unit_impl_t *lib,
nxt_unit_port_id_t *port_id);
static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib,
@@ -305,6 +311,9 @@ struct nxt_unit_ctx_impl_s {
/* of nxt_unit_read_buf_t */
nxt_queue_t free_rbuf;
+ int online;
+ int ready;
+
nxt_unit_mmap_buf_t ctx_buf[2];
nxt_unit_read_buf_t ctx_read_buf;
@@ -314,6 +323,7 @@ struct nxt_unit_ctx_impl_s {
struct nxt_unit_mmap_s {
nxt_port_mmap_header_t *hdr;
+ pthread_t src_thread;
/* of nxt_unit_read_buf_t */
nxt_queue_t awaiting_rbuf;
@@ -353,7 +363,6 @@ struct nxt_unit_impl_s {
pid_t pid;
int log_fd;
- int online;
nxt_unit_ctx_impl_t main_ctx;
};
@@ -560,7 +569,6 @@ nxt_unit_create(nxt_unit_init_t *init)
lib->ports.slot = NULL;
lib->log_fd = STDERR_FILENO;
- lib->online = 1;
nxt_queue_init(&lib->contexts);
@@ -614,10 +622,16 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
nxt_unit_lib_use(lib);
+ pthread_mutex_lock(&lib->mutex);
+
nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);
+ pthread_mutex_unlock(&lib->mutex);
+
ctx_impl->use_count = 1;
ctx_impl->wait_items = 0;
+ ctx_impl->online = 1;
+ ctx_impl->ready = 0;
nxt_queue_init(&ctx_impl->free_req);
nxt_queue_init(&ctx_impl->free_ws);
@@ -769,7 +783,7 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
uint32_t *shm_limit)
{
int rc;
- int ready_fd, router_fd, read_fd;
+ int ready_fd, router_fd, read_in_fd, read_out_fd;
char *unit_init, *version_end;
long version_length;
int64_t ready_pid, router_pid, read_pid;
@@ -801,15 +815,15 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
"%"PRIu32";"
"%"PRId64",%"PRIu32",%d;"
"%"PRId64",%"PRIu32",%d;"
- "%"PRId64",%"PRIu32",%d;"
+ "%"PRId64",%"PRIu32",%d,%d;"
"%d,%"PRIu32,
&ready_stream,
&ready_pid, &ready_id, &ready_fd,
&router_pid, &router_id, &router_fd,
- &read_pid, &read_id, &read_fd,
+ &read_pid, &read_id, &read_in_fd, &read_out_fd,
log_fd, shm_limit);
- if (nxt_slow_path(rc != 12)) {
+ if (nxt_slow_path(rc != 13)) {
nxt_unit_alert(NULL, "failed to scan variables: %d", rc);
return NXT_UNIT_ERROR;
@@ -829,8 +843,8 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
nxt_unit_port_id_init(&read_port->id, (pid_t) read_pid, read_id);
- read_port->in_fd = read_fd;
- read_port->out_fd = -1;
+ read_port->in_fd = read_in_fd;
+ read_port->out_fd = read_out_fd;
read_port->data = NULL;
*stream = ready_stream;
@@ -891,7 +905,8 @@ nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd)
static int
-nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
+nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
+ nxt_unit_request_info_t **preq)
{
int rc;
pid_t pid;
@@ -979,6 +994,10 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
switch (port_msg->type) {
+ case _NXT_PORT_MSG_RPC_READY:
+ rc = NXT_UNIT_OK;
+ break;
+
case _NXT_PORT_MSG_QUIT:
nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream);
@@ -990,6 +1009,10 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
rc = nxt_unit_process_new_port(ctx, &recv_msg);
break;
+ case _NXT_PORT_MSG_PORT_ACK:
+ rc = nxt_unit_ctx_ready(ctx);
+ break;
+
case _NXT_PORT_MSG_CHANGE_FILE:
nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d",
port_msg->stream, recv_msg.fd[0]);
@@ -1019,7 +1042,7 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
break;
case _NXT_PORT_MSG_REQ_HEADERS:
- rc = nxt_unit_process_req_headers(ctx, &recv_msg);
+ rc = nxt_unit_process_req_headers(ctx, &recv_msg, preq);
break;
case _NXT_PORT_MSG_REQ_BODY:
@@ -1055,7 +1078,7 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
break;
default:
- nxt_unit_debug(ctx, "#%"PRIu32": ignore message type: %d",
+ nxt_unit_alert(ctx, "#%"PRIu32": ignore message type: %d",
port_msg->stream, (int) port_msg->type);
rc = NXT_UNIT_ERROR;
@@ -1118,7 +1141,7 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
- if (new_port_msg->id == (nxt_port_id_t) -1) {
+ if (new_port_msg->id == NXT_UNIT_SHARED_PORT_ID) {
nxt_unit_port_id_init(&new_port.id, lib->pid, new_port_msg->id);
new_port.in_fd = recv_msg->fd[0];
@@ -1160,11 +1183,31 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
return NXT_UNIT_ERROR;
}
- if (new_port_msg->id == (nxt_port_id_t) -1) {
+ if (new_port_msg->id == NXT_UNIT_SHARED_PORT_ID) {
lib->shared_port = port;
- } else {
- nxt_unit_port_release(port);
+ return nxt_unit_ctx_ready(ctx);
+ }
+
+ nxt_unit_port_release(port);
+
+ return NXT_UNIT_OK;
+}
+
+
+static int
+nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx)
+{
+ nxt_unit_impl_t *lib;
+ nxt_unit_ctx_impl_t *ctx_impl;
+
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+
+ ctx_impl->ready = 1;
+
+ if (lib->callbacks.ready_handler) {
+ return lib->callbacks.ready_handler(ctx);
}
return NXT_UNIT_OK;
@@ -1172,7 +1215,8 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
static int
-nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
+nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
+ nxt_unit_request_info_t **preq)
{
int res;
nxt_unit_impl_t *lib;
@@ -1288,7 +1332,12 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
}
}
- lib->callbacks.request_handler(req);
+ if (preq == NULL) {
+ lib->callbacks.request_handler(req);
+
+ } else {
+ *preq = req;
+ }
}
return NXT_UNIT_OK;
@@ -1318,8 +1367,14 @@ nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
if (recv_msg->incoming_buf != NULL) {
b = nxt_container_of(req->content_buf, nxt_unit_mmap_buf_t, buf);
+ while (b->next != NULL) {
+ b = b->next;
+ }
+
/* "Move" incoming buffer list to req_impl. */
- nxt_unit_mmap_buf_insert_tail(&b->next, recv_msg->incoming_buf);
+ b->next = recv_msg->incoming_buf;
+ b->next->prev = &b->next;
+
recv_msg->incoming_buf = NULL;
}
@@ -1588,8 +1643,6 @@ nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
}
if (recv_msg->last) {
- req_impl->websocket = 0;
-
if (cb->close_handler) {
nxt_unit_req_debug(req, "close_handler");
@@ -1682,8 +1735,6 @@ nxt_unit_request_info_release(nxt_unit_request_info_t *req)
nxt_unit_request_hash_find(req->ctx, req_impl->stream, 1);
}
- req_impl->websocket = 0;
-
while (req_impl->outgoing_buf != NULL) {
nxt_unit_mmap_buf_free(req_impl->outgoing_buf);
}
@@ -1704,6 +1755,8 @@ nxt_unit_request_info_release(nxt_unit_request_info_t *req)
req->response_port = NULL;
}
+ req_impl->state = NXT_UNIT_RS_RELEASED;
+
pthread_mutex_lock(&ctx_impl->mutex);
nxt_queue_remove(&req_impl->link);
@@ -1711,8 +1764,6 @@ nxt_unit_request_info_release(nxt_unit_request_info_t *req)
nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link);
pthread_mutex_unlock(&ctx_impl->mutex);
-
- req_impl->state = NXT_UNIT_RS_RELEASED;
}
@@ -2132,7 +2183,8 @@ nxt_unit_response_add_field(nxt_unit_request_info_t *req,
resp = req->response;
if (nxt_slow_path(resp->fields_count >= req->response_max_fields)) {
- nxt_unit_req_warn(req, "add_field: too many response fields");
+ nxt_unit_req_warn(req, "add_field: too many response fields (%d)",
+ (int) resp->fields_count);
return NXT_UNIT_ERROR;
}
@@ -2309,6 +2361,8 @@ nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
nxt_unit_mmap_buf_release(mmap_buf);
+ nxt_unit_req_alert(req, "response_buf_alloc: failed to get out buf");
+
return NULL;
}
@@ -2949,8 +3003,6 @@ nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
buf_res = nxt_unit_buf_read(&req->content_buf, &req->content_length,
dst, size);
- nxt_unit_req_debug(req, "read: %d", (int) buf_res);
-
if (buf_res < (ssize_t) size && req->content_fd != -1) {
res = read(req->content_fd, dst, size);
if (nxt_slow_path(res < 0)) {
@@ -3389,7 +3441,10 @@ retry:
for (mm = lib->outgoing.elts; mm < mm_end; mm++) {
hdr = mm->hdr;
- if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port->id.id) {
+ if (hdr->sent_over != 0xFFFFu
+ && (hdr->sent_over != port->id.id
+ || mm->src_thread != pthread_self()))
+ {
continue;
}
@@ -3657,6 +3712,7 @@ nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n)
hdr->src_pid = lib->pid;
hdr->dst_pid = port->id.pid;
hdr->sent_over = port->id.id;
+ mm->src_thread = pthread_self();
/* Mark first n chunk(s) as busy */
for (i = 0; i < n; i++) {
@@ -3975,6 +4031,8 @@ nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd)
nxt_atomic_fetch_add(&ctx_impl->wait_items, -1);
+ nxt_unit_awake_ctx(ctx, ctx_impl);
+
} nxt_queue_loop;
return rc;
@@ -3982,6 +4040,32 @@ nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd)
static void
+nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx, nxt_unit_ctx_impl_t *ctx_impl)
+{
+ nxt_port_msg_t msg;
+
+ if (nxt_fast_path(ctx == &ctx_impl->ctx)) {
+ return;
+ }
+
+ if (nxt_slow_path(ctx_impl->read_port == NULL
+ || ctx_impl->read_port->out_fd == -1))
+ {
+ nxt_unit_alert(ctx, "target context read_port is NULL or not writable");
+
+ return;
+ }
+
+ memset(&msg, 0, sizeof(nxt_port_msg_t));
+
+ msg.type = _NXT_PORT_MSG_RPC_READY;
+
+ (void) nxt_unit_port_send(ctx, ctx_impl->read_port,
+ &msg, sizeof(msg), NULL, 0);
+}
+
+
+static void
nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps)
{
pthread_mutex_init(&mmaps->mutex, NULL);
@@ -4403,18 +4487,20 @@ nxt_unit_process_pop_first(nxt_unit_impl_t *lib)
int
nxt_unit_run(nxt_unit_ctx_t *ctx)
{
- int rc;
- nxt_unit_impl_t *lib;
+ int rc;
+ nxt_unit_ctx_impl_t *ctx_impl;
nxt_unit_ctx_use(ctx);
- lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+
rc = NXT_UNIT_OK;
- while (nxt_fast_path(lib->online)) {
+ while (nxt_fast_path(ctx_impl->online)) {
rc = nxt_unit_run_once_impl(ctx);
if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
+ nxt_unit_quit(ctx);
break;
}
}
@@ -4458,7 +4544,7 @@ nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx)
return rc;
}
- rc = nxt_unit_process_msg(ctx, rbuf);
+ rc = nxt_unit_process_msg(ctx, rbuf, NULL);
if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
return NXT_UNIT_ERROR;
}
@@ -4483,17 +4569,17 @@ nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
nxt_unit_port_impl_t *port_impl;
struct pollfd fds[2];
- lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
- if (ctx_impl->wait_items > 0 || lib->shared_port == NULL) {
-
+ if (ctx_impl->wait_items > 0 || ctx_impl->ready == 0) {
return nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
}
port_impl = nxt_container_of(ctx_impl->read_port, nxt_unit_port_impl_t,
port);
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+
retry:
if (port_impl->from_socket == 0) {
@@ -4585,8 +4671,6 @@ nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx)
nxt_unit_ctx_impl_t *ctx_impl;
nxt_unit_read_buf_t *rbuf;
- nxt_queue_init(&pending_rbuf);
-
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
pthread_mutex_lock(&ctx_impl->mutex);
@@ -4597,6 +4681,8 @@ nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx)
return NXT_UNIT_OK;
}
+ nxt_queue_init(&pending_rbuf);
+
nxt_queue_add(&pending_rbuf, &ctx_impl->pending_rbuf);
nxt_queue_init(&ctx_impl->pending_rbuf);
@@ -4607,7 +4693,7 @@ nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx)
nxt_queue_each(rbuf, &pending_rbuf, nxt_unit_read_buf_t, link) {
if (nxt_fast_path(rc != NXT_UNIT_ERROR)) {
- rc = nxt_unit_process_msg(&ctx_impl->ctx, rbuf);
+ rc = nxt_unit_process_msg(&ctx_impl->ctx, rbuf, NULL);
} else {
nxt_unit_read_buf_release(ctx, rbuf);
@@ -4629,8 +4715,6 @@ nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx)
nxt_unit_request_info_t *req;
nxt_unit_request_info_impl_t *req_impl;
- nxt_queue_init(&ready_req);
-
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
pthread_mutex_lock(&ctx_impl->mutex);
@@ -4641,6 +4725,8 @@ nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx)
return;
}
+ nxt_queue_init(&ready_req);
+
nxt_queue_add(&ready_req, &ctx_impl->ready_req);
nxt_queue_init(&ctx_impl->ready_req);
@@ -4691,18 +4777,16 @@ int
nxt_unit_run_ctx(nxt_unit_ctx_t *ctx)
{
int rc;
- nxt_unit_impl_t *lib;
nxt_unit_read_buf_t *rbuf;
nxt_unit_ctx_impl_t *ctx_impl;
nxt_unit_ctx_use(ctx);
- lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
rc = NXT_UNIT_OK;
- while (nxt_fast_path(lib->online)) {
+ while (nxt_fast_path(ctx_impl->online)) {
rbuf = nxt_unit_read_buf_get(ctx);
if (nxt_slow_path(rbuf == NULL)) {
rc = NXT_UNIT_ERROR;
@@ -4716,7 +4800,7 @@ nxt_unit_run_ctx(nxt_unit_ctx_t *ctx)
goto retry;
}
- rc = nxt_unit_process_msg(ctx, rbuf);
+ rc = nxt_unit_process_msg(ctx, rbuf, NULL);
if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
break;
}
@@ -4797,13 +4881,16 @@ nxt_unit_run_shared(nxt_unit_ctx_t *ctx)
int rc;
nxt_unit_impl_t *lib;
nxt_unit_read_buf_t *rbuf;
+ nxt_unit_ctx_impl_t *ctx_impl;
nxt_unit_ctx_use(ctx);
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+
rc = NXT_UNIT_OK;
- while (nxt_fast_path(lib->online)) {
+ while (nxt_fast_path(ctx_impl->online)) {
rbuf = nxt_unit_read_buf_get(ctx);
if (nxt_slow_path(rbuf == NULL)) {
rc = NXT_UNIT_ERROR;
@@ -4822,22 +4909,67 @@ nxt_unit_run_shared(nxt_unit_ctx_t *ctx)
break;
}
- rc = nxt_unit_process_msg(ctx, rbuf);
+ rc = nxt_unit_process_msg(ctx, rbuf, NULL);
if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
break;
}
+ }
- rc = nxt_unit_process_pending_rbuf(ctx);
- if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
- break;
- }
+ nxt_unit_ctx_release(ctx);
- nxt_unit_process_ready_req(ctx);
+ return rc;
+}
+
+
+nxt_unit_request_info_t *
+nxt_unit_dequeue_request(nxt_unit_ctx_t *ctx)
+{
+ int rc;
+ nxt_unit_impl_t *lib;
+ nxt_unit_read_buf_t *rbuf;
+ nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_request_info_t *req;
+
+ nxt_unit_ctx_use(ctx);
+
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+
+ req = NULL;
+
+ if (nxt_slow_path(!ctx_impl->online)) {
+ goto done;
}
+ rbuf = nxt_unit_read_buf_get(ctx);
+ if (nxt_slow_path(rbuf == NULL)) {
+ goto done;
+ }
+
+ rc = nxt_unit_app_queue_recv(lib->shared_port, rbuf);
+ if (rc != NXT_UNIT_OK) {
+ nxt_unit_read_buf_release(ctx, rbuf);
+ goto done;
+ }
+
+ (void) nxt_unit_process_msg(ctx, rbuf, &req);
+
+done:
+
nxt_unit_ctx_release(ctx);
- return rc;
+ return req;
+}
+
+
+int
+nxt_unit_is_main_ctx(nxt_unit_ctx_t *ctx)
+{
+ nxt_unit_impl_t *lib;
+
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+
+ return (ctx == &lib->main_ctx.ctx);
}
@@ -4862,6 +4994,7 @@ nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
int rc;
nxt_unit_impl_t *lib;
nxt_unit_read_buf_t *rbuf;
+ nxt_unit_ctx_impl_t *ctx_impl;
rbuf = nxt_unit_read_buf_get(ctx);
if (nxt_slow_path(rbuf == NULL)) {
@@ -4869,6 +5002,7 @@ nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
}
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
retry:
@@ -4884,7 +5018,7 @@ retry:
return rc;
}
- rc = nxt_unit_process_msg(ctx, rbuf);
+ rc = nxt_unit_process_msg(ctx, rbuf, NULL);
if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
return NXT_UNIT_ERROR;
}
@@ -4896,12 +5030,12 @@ retry:
nxt_unit_process_ready_req(ctx);
- rbuf = nxt_unit_read_buf_get(ctx);
- if (nxt_slow_path(rbuf == NULL)) {
- return NXT_UNIT_ERROR;
- }
+ if (ctx_impl->online) {
+ rbuf = nxt_unit_read_buf_get(ctx);
+ if (nxt_slow_path(rbuf == NULL)) {
+ return NXT_UNIT_ERROR;
+ }
- if (lib->online) {
goto retry;
}
@@ -4945,14 +5079,14 @@ nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data)
queue_fd = -1;
- port = nxt_unit_create_port(ctx);
+ port = nxt_unit_create_port(&new_ctx->ctx);
if (nxt_slow_path(port == NULL)) {
goto fail;
}
new_ctx->read_port = port;
- queue_fd = nxt_unit_shm_open(ctx, sizeof(nxt_port_queue_t));
+ queue_fd = nxt_unit_shm_open(&new_ctx->ctx, sizeof(nxt_port_queue_t));
if (nxt_slow_path(queue_fd == -1)) {
goto fail;
}
@@ -4971,7 +5105,7 @@ nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data)
port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
port_impl->queue = mem;
- rc = nxt_unit_send_port(ctx, lib->router_port, port, queue_fd);
+ rc = nxt_unit_send_port(&new_ctx->ctx, lib->router_port, port, queue_fd);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
goto fail;
}
@@ -4997,6 +5131,7 @@ nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl)
{
nxt_unit_impl_t *lib;
nxt_unit_mmap_buf_t *mmap_buf;
+ nxt_unit_read_buf_t *rbuf;
nxt_unit_request_info_impl_t *req_impl;
nxt_unit_websocket_frame_impl_t *ws_impl;
@@ -5034,10 +5169,21 @@ nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl)
} nxt_queue_loop;
+ nxt_queue_each(rbuf, &ctx_impl->free_rbuf, nxt_unit_read_buf_t, link)
+ {
+ if (rbuf != &ctx_impl->ctx_read_buf) {
+ nxt_unit_free(&ctx_impl->ctx, rbuf);
+ }
+ } nxt_queue_loop;
+
pthread_mutex_destroy(&ctx_impl->mutex);
+ pthread_mutex_lock(&lib->mutex);
+
nxt_queue_remove(&ctx_impl->link);
+ pthread_mutex_unlock(&lib->mutex);
+
if (nxt_fast_path(ctx_impl->read_port != NULL)) {
nxt_unit_remove_port(lib, &ctx_impl->read_port->id);
nxt_unit_port_release(ctx_impl->read_port);
@@ -5224,7 +5370,7 @@ nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port)
}
if (port_impl->queue != NULL) {
- munmap(port_impl->queue, (port->id.id == (nxt_port_id_t) -1)
+ munmap(port_impl->queue, (port->id.id == NXT_UNIT_SHARED_PORT_ID)
? sizeof(nxt_app_queue_t)
: sizeof(nxt_port_queue_t));
}
@@ -5237,14 +5383,12 @@ nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port)
static nxt_unit_port_t *
nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue)
{
- int rc;
- nxt_queue_t awaiting_req;
- nxt_unit_impl_t *lib;
- nxt_unit_port_t *old_port;
- nxt_unit_process_t *process;
- nxt_unit_ctx_impl_t *ctx_impl;
- nxt_unit_port_impl_t *new_port, *old_port_impl;
- nxt_unit_request_info_impl_t *req_impl;
+ int rc, ready;
+ nxt_queue_t awaiting_req;
+ nxt_unit_impl_t *lib;
+ nxt_unit_port_t *old_port;
+ nxt_unit_process_t *process;
+ nxt_unit_port_impl_t *new_port, *old_port_impl;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
@@ -5293,44 +5437,45 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue)
old_port_impl->queue = queue;
}
- if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) {
- nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req);
- nxt_queue_init(&old_port_impl->awaiting_req);
- }
-
- old_port_impl->ready = (port->in_fd != -1 || port->out_fd != -1);
+ ready = (port->in_fd != -1 || port->out_fd != -1);
- pthread_mutex_unlock(&lib->mutex);
+ /*
+ * Port can be market as 'ready' only after callbacks.add_port() call.
+ * Otherwise, request may try to use the port before callback.
+ */
+ if (lib->callbacks.add_port == NULL && ready) {
+ old_port_impl->ready = ready;
- if (lib->callbacks.add_port != NULL
- && (port->in_fd != -1 || port->out_fd != -1))
- {
- lib->callbacks.add_port(ctx, old_port);
+ if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) {
+ nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req);
+ nxt_queue_init(&old_port_impl->awaiting_req);
+ }
}
- nxt_queue_each(req_impl, &awaiting_req,
- nxt_unit_request_info_impl_t, port_wait_link)
- {
- nxt_queue_remove(&req_impl->port_wait_link);
+ pthread_mutex_unlock(&lib->mutex);
- ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t,
- ctx);
+ if (lib->callbacks.add_port != NULL && ready) {
+ lib->callbacks.add_port(ctx, old_port);
- pthread_mutex_lock(&ctx_impl->mutex);
+ pthread_mutex_lock(&lib->mutex);
- nxt_queue_insert_tail(&ctx_impl->ready_req,
- &req_impl->port_wait_link);
+ old_port_impl->ready = ready;
- pthread_mutex_unlock(&ctx_impl->mutex);
+ if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) {
+ nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req);
+ nxt_queue_init(&old_port_impl->awaiting_req);
+ }
- nxt_atomic_fetch_add(&ctx_impl->wait_items, -1);
+ pthread_mutex_unlock(&lib->mutex);
+ }
- } nxt_queue_loop;
+ nxt_unit_process_awaiting_req(ctx, &awaiting_req);
return old_port;
}
new_port = NULL;
+ ready = 0;
nxt_unit_debug(ctx, "add_port: port{%d,%d} in_fd %d out_fd %d queue %p",
port->id.pid, port->id.id,
@@ -5341,7 +5486,9 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue)
goto unlock;
}
- if (port->id.id >= process->next_port_id) {
+ if (port->id.id != NXT_UNIT_SHARED_PORT_ID
+ && port->id.id >= process->next_port_id)
+ {
process->next_port_id = port->id.id + 1;
}
@@ -5371,13 +5518,21 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue)
new_port->use_count = 2;
new_port->process = process;
- new_port->ready = (port->in_fd != -1 || port->out_fd != -1);
new_port->queue = queue;
new_port->from_socket = 0;
new_port->socket_rbuf = NULL;
nxt_queue_init(&new_port->awaiting_req);
+ ready = (port->in_fd != -1 || port->out_fd != -1);
+
+ if (lib->callbacks.add_port == NULL) {
+ new_port->ready = ready;
+
+ } else {
+ new_port->ready = 0;
+ }
+
process = NULL;
unlock:
@@ -5388,14 +5543,55 @@ unlock:
nxt_unit_process_release(process);
}
- if (lib->callbacks.add_port != NULL
- && new_port != NULL
- && (port->in_fd != -1 || port->out_fd != -1))
- {
+ if (lib->callbacks.add_port != NULL && new_port != NULL && ready) {
lib->callbacks.add_port(ctx, &new_port->port);
+
+ nxt_queue_init(&awaiting_req);
+
+ pthread_mutex_lock(&lib->mutex);
+
+ new_port->ready = 1;
+
+ if (!nxt_queue_is_empty(&new_port->awaiting_req)) {
+ nxt_queue_add(&awaiting_req, &new_port->awaiting_req);
+ nxt_queue_init(&new_port->awaiting_req);
+ }
+
+ pthread_mutex_unlock(&lib->mutex);
+
+ nxt_unit_process_awaiting_req(ctx, &awaiting_req);
}
- return &new_port->port;
+ return (new_port == NULL) ? NULL : &new_port->port;
+}
+
+
+static void
+nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx, nxt_queue_t *awaiting_req)
+{
+ nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_request_info_impl_t *req_impl;
+
+ nxt_queue_each(req_impl, awaiting_req,
+ nxt_unit_request_info_impl_t, port_wait_link)
+ {
+ nxt_queue_remove(&req_impl->port_wait_link);
+
+ ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t,
+ ctx);
+
+ pthread_mutex_lock(&ctx_impl->mutex);
+
+ nxt_queue_insert_tail(&ctx_impl->ready_req,
+ &req_impl->port_wait_link);
+
+ pthread_mutex_unlock(&ctx_impl->mutex);
+
+ nxt_atomic_fetch_add(&ctx_impl->wait_items, -1);
+
+ nxt_unit_awake_ctx(ctx, ctx_impl);
+
+ } nxt_queue_loop;
}
@@ -5509,17 +5705,72 @@ nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process)
static void
nxt_unit_quit(nxt_unit_ctx_t *ctx)
{
- nxt_unit_impl_t *lib;
+ nxt_port_msg_t msg;
+ nxt_unit_impl_t *lib;
+ nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_callbacks_t *cb;
+ nxt_unit_request_info_t *req;
+ nxt_unit_request_info_impl_t *req_impl;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+
+ if (!ctx_impl->online) {
+ return;
+ }
+
+ ctx_impl->online = 0;
+
+ cb = &lib->callbacks;
+
+ if (cb->quit != NULL) {
+ cb->quit(ctx);
+ }
+
+ nxt_queue_each(req_impl, &ctx_impl->active_req,
+ nxt_unit_request_info_impl_t, link)
+ {
+ req = &req_impl->req;
+
+ nxt_unit_req_warn(req, "active request on ctx quit");
- if (lib->online) {
- lib->online = 0;
+ if (cb->close_handler) {
+ nxt_unit_req_debug(req, "close_handler");
+
+ cb->close_handler(req);
- if (lib->callbacks.quit != NULL) {
- lib->callbacks.quit(ctx);
+ } else {
+ nxt_unit_request_done(req, NXT_UNIT_ERROR);
}
+
+ } nxt_queue_loop;
+
+ if (ctx != &lib->main_ctx.ctx) {
+ return;
}
+
+ memset(&msg, 0, sizeof(nxt_port_msg_t));
+
+ msg.pid = lib->pid;
+ msg.type = _NXT_PORT_MSG_QUIT;
+
+ pthread_mutex_lock(&lib->mutex);
+
+ nxt_queue_each(ctx_impl, &lib->contexts, nxt_unit_ctx_impl_t, link) {
+
+ if (ctx == &ctx_impl->ctx
+ || ctx_impl->read_port == NULL
+ || ctx_impl->read_port->out_fd == -1)
+ {
+ continue;
+ }
+
+ (void) nxt_unit_port_send(ctx, ctx_impl->read_port,
+ &msg, sizeof(msg), NULL, 0);
+
+ } nxt_queue_loop;
+
+ pthread_mutex_unlock(&lib->mutex);
}
@@ -6388,7 +6639,9 @@ nxt_unit_malloc(nxt_unit_ctx_t *ctx, size_t size)
p = malloc(size);
if (nxt_fast_path(p != NULL)) {
+#if (NXT_DEBUG_ALLOC)
nxt_unit_debug(ctx, "malloc(%d): %p", (int) size, p);
+#endif
} else {
nxt_unit_alert(ctx, "malloc(%d) failed: %s (%d)",
@@ -6402,7 +6655,9 @@ nxt_unit_malloc(nxt_unit_ctx_t *ctx, size_t size)
void
nxt_unit_free(nxt_unit_ctx_t *ctx, void *p)
{
+#if (NXT_DEBUG_ALLOC)
nxt_unit_debug(ctx, "free(%p)", p);
+#endif
free(p);
}