summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/nxt_unit.c96
-rw-r--r--src/nxt_unit.h2
2 files changed, 74 insertions, 24 deletions
diff --git a/src/nxt_unit.c b/src/nxt_unit.c
index 65a76bee..4b0f5230 100644
--- a/src/nxt_unit.c
+++ b/src/nxt_unit.c
@@ -305,6 +305,8 @@ struct nxt_unit_ctx_impl_s {
/* of nxt_unit_read_buf_t */
nxt_queue_t free_rbuf;
+ int online;
+
nxt_unit_mmap_buf_t ctx_buf[2];
nxt_unit_read_buf_t ctx_read_buf;
@@ -354,7 +356,6 @@ struct nxt_unit_impl_s {
pid_t pid;
int log_fd;
- int online;
nxt_unit_ctx_impl_t main_ctx;
};
@@ -561,7 +562,6 @@ nxt_unit_create(nxt_unit_init_t *init)
lib->ports.slot = NULL;
lib->log_fd = STDERR_FILENO;
- lib->online = 1;
nxt_queue_init(&lib->contexts);
@@ -615,10 +615,15 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
nxt_unit_lib_use(lib);
+ pthread_mutex_lock(&lib->mutex);
+
nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);
+ pthread_mutex_unlock(&lib->mutex);
+
ctx_impl->use_count = 1;
ctx_impl->wait_items = 0;
+ ctx_impl->online = 1;
nxt_queue_init(&ctx_impl->free_req);
nxt_queue_init(&ctx_impl->free_ws);
@@ -1119,7 +1124,7 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
- if (new_port_msg->id == (nxt_port_id_t) -1) {
+ if (new_port_msg->id == NXT_UNIT_SHARED_PORT_ID) {
nxt_unit_port_id_init(&new_port.id, lib->pid, new_port_msg->id);
new_port.in_fd = recv_msg->fd[0];
@@ -1161,7 +1166,7 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
return NXT_UNIT_ERROR;
}
- if (new_port_msg->id == (nxt_port_id_t) -1) {
+ if (new_port_msg->id == NXT_UNIT_SHARED_PORT_ID) {
lib->shared_port = port;
} else {
@@ -4408,18 +4413,20 @@ nxt_unit_process_pop_first(nxt_unit_impl_t *lib)
int
nxt_unit_run(nxt_unit_ctx_t *ctx)
{
- int rc;
- nxt_unit_impl_t *lib;
+ int rc;
+ nxt_unit_ctx_impl_t *ctx_impl;
nxt_unit_ctx_use(ctx);
- lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+
rc = NXT_UNIT_OK;
- while (nxt_fast_path(lib->online)) {
+ while (nxt_fast_path(ctx_impl->online)) {
rc = nxt_unit_run_once_impl(ctx);
if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
+ nxt_unit_quit(ctx);
break;
}
}
@@ -4696,18 +4703,16 @@ int
nxt_unit_run_ctx(nxt_unit_ctx_t *ctx)
{
int rc;
- nxt_unit_impl_t *lib;
nxt_unit_read_buf_t *rbuf;
nxt_unit_ctx_impl_t *ctx_impl;
nxt_unit_ctx_use(ctx);
- lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
rc = NXT_UNIT_OK;
- while (nxt_fast_path(lib->online)) {
+ while (nxt_fast_path(ctx_impl->online)) {
rbuf = nxt_unit_read_buf_get(ctx);
if (nxt_slow_path(rbuf == NULL)) {
rc = NXT_UNIT_ERROR;
@@ -4802,13 +4807,16 @@ nxt_unit_run_shared(nxt_unit_ctx_t *ctx)
int rc;
nxt_unit_impl_t *lib;
nxt_unit_read_buf_t *rbuf;
+ nxt_unit_ctx_impl_t *ctx_impl;
nxt_unit_ctx_use(ctx);
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+
rc = NXT_UNIT_OK;
- while (nxt_fast_path(lib->online)) {
+ while (nxt_fast_path(ctx_impl->online)) {
rbuf = nxt_unit_read_buf_get(ctx);
if (nxt_slow_path(rbuf == NULL)) {
rc = NXT_UNIT_ERROR;
@@ -4867,6 +4875,7 @@ nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
int rc;
nxt_unit_impl_t *lib;
nxt_unit_read_buf_t *rbuf;
+ nxt_unit_ctx_impl_t *ctx_impl;
rbuf = nxt_unit_read_buf_get(ctx);
if (nxt_slow_path(rbuf == NULL)) {
@@ -4874,6 +4883,7 @@ nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
}
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
retry:
@@ -4906,7 +4916,7 @@ retry:
return NXT_UNIT_ERROR;
}
- if (lib->online) {
+ if (ctx_impl->online) {
goto retry;
}
@@ -4950,14 +4960,14 @@ nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data)
queue_fd = -1;
- port = nxt_unit_create_port(ctx);
+ port = nxt_unit_create_port(&new_ctx->ctx);
if (nxt_slow_path(port == NULL)) {
goto fail;
}
new_ctx->read_port = port;
- queue_fd = nxt_unit_shm_open(ctx, sizeof(nxt_port_queue_t));
+ queue_fd = nxt_unit_shm_open(&new_ctx->ctx, sizeof(nxt_port_queue_t));
if (nxt_slow_path(queue_fd == -1)) {
goto fail;
}
@@ -4976,7 +4986,7 @@ nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data)
port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
port_impl->queue = mem;
- rc = nxt_unit_send_port(ctx, lib->router_port, port, queue_fd);
+ rc = nxt_unit_send_port(&new_ctx->ctx, lib->router_port, port, queue_fd);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
goto fail;
}
@@ -5041,8 +5051,12 @@ nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl)
pthread_mutex_destroy(&ctx_impl->mutex);
+ pthread_mutex_lock(&lib->mutex);
+
nxt_queue_remove(&ctx_impl->link);
+ pthread_mutex_unlock(&lib->mutex);
+
if (nxt_fast_path(ctx_impl->read_port != NULL)) {
nxt_unit_remove_port(lib, &ctx_impl->read_port->id);
nxt_unit_port_release(ctx_impl->read_port);
@@ -5229,7 +5243,7 @@ nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port)
}
if (port_impl->queue != NULL) {
- munmap(port_impl->queue, (port->id.id == (nxt_port_id_t) -1)
+ munmap(port_impl->queue, (port->id.id == NXT_UNIT_SHARED_PORT_ID)
? sizeof(nxt_app_queue_t)
: sizeof(nxt_port_queue_t));
}
@@ -5346,7 +5360,9 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue)
goto unlock;
}
- if (port->id.id >= process->next_port_id) {
+ if (port->id.id != NXT_UNIT_SHARED_PORT_ID
+ && port->id.id >= process->next_port_id)
+ {
process->next_port_id = port->id.id + 1;
}
@@ -5514,17 +5530,49 @@ nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process)
static void
nxt_unit_quit(nxt_unit_ctx_t *ctx)
{
- nxt_unit_impl_t *lib;
+ nxt_port_msg_t msg;
+ nxt_unit_impl_t *lib;
+ nxt_unit_ctx_impl_t *ctx_impl;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
- if (lib->online) {
- lib->online = 0;
+ if (!ctx_impl->online) {
+ return;
+ }
- if (lib->callbacks.quit != NULL) {
- lib->callbacks.quit(ctx);
- }
+ ctx_impl->online = 0;
+
+ if (lib->callbacks.quit != NULL) {
+ lib->callbacks.quit(ctx);
}
+
+ if (ctx != &lib->main_ctx.ctx) {
+ return;
+ }
+
+ memset(&msg, 0, sizeof(nxt_port_msg_t));
+
+ msg.pid = lib->pid;
+ msg.type = _NXT_PORT_MSG_QUIT;
+
+ pthread_mutex_lock(&lib->mutex);
+
+ nxt_queue_each(ctx_impl, &lib->contexts, nxt_unit_ctx_impl_t, link) {
+
+ if (ctx == &ctx_impl->ctx
+ || ctx_impl->read_port == NULL
+ || ctx_impl->read_port->out_fd == -1)
+ {
+ continue;
+ }
+
+ (void) nxt_unit_port_send(ctx, ctx_impl->read_port,
+ &msg, sizeof(msg), NULL, 0);
+
+ } nxt_queue_loop;
+
+ pthread_mutex_unlock(&lib->mutex);
}
diff --git a/src/nxt_unit.h b/src/nxt_unit.h
index e90f0781..52f9bc27 100644
--- a/src/nxt_unit.h
+++ b/src/nxt_unit.h
@@ -34,6 +34,8 @@ enum {
#define NXT_UNIT_INIT_ENV "NXT_UNIT_INIT"
+#define NXT_UNIT_SHARED_PORT_ID ((uint16_t) 0xFFFFu)
+
/*
* Mostly opaque structure with library state.
*