summaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2020-08-11 19:19:55 +0300
committerMax Romanov <max.romanov@nginx.com>2020-08-11 19:19:55 +0300
commitec3389b63bd7a9159d2be4a2863140f75095c7d3 (patch)
treef88972c19c713748b56c339dab1bdd60892eaeec /src
parent3a721e1d96720505d4d6638e77d2c296d962519c (diff)
downloadunit-ec3389b63bd7a9159d2be4a2863140f75095c7d3.tar.gz
unit-ec3389b63bd7a9159d2be4a2863140f75095c7d3.tar.bz2
Libunit refactoring: port management.
- Changed the port management callbacks to notifications, which e. g. avoids the need to call the libunit function - Added context and library instance reference counts for a safer resource release - Added the router main port initialization
Diffstat (limited to 'src')
-rw-r--r--src/nodejs/unit-http/unit.cpp46
-rw-r--r--src/nodejs/unit-http/unit.h2
-rw-r--r--src/nxt_application.c14
-rw-r--r--src/nxt_external.c14
-rw-r--r--src/nxt_process.c2
-rw-r--r--src/nxt_unit.c580
-rw-r--r--src/nxt_unit.h44
7 files changed, 372 insertions, 330 deletions
diff --git a/src/nodejs/unit-http/unit.cpp b/src/nodejs/unit-http/unit.cpp
index 555b21fa..468acf96 100644
--- a/src/nodejs/unit-http/unit.cpp
+++ b/src/nodejs/unit-http/unit.cpp
@@ -18,7 +18,8 @@ static void delete_port_data(uv_handle_t* handle);
napi_ref Unit::constructor_;
-struct nxt_nodejs_ctx_t {
+struct port_data_t {
+ nxt_unit_ctx_t *ctx;
nxt_unit_port_id_t port_id;
uv_poll_t poll;
};
@@ -360,8 +361,8 @@ Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
int err;
Unit *obj;
uv_loop_t *loop;
+ port_data_t *data;
napi_status status;
- nxt_nodejs_ctx_t *node_ctx;
if (port->in_fd != -1) {
obj = reinterpret_cast<Unit *>(ctx->unit->data);
@@ -378,27 +379,28 @@ Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
return NXT_UNIT_ERROR;
}
- node_ctx = new nxt_nodejs_ctx_t;
+ data = new port_data_t;
- err = uv_poll_init(loop, &node_ctx->poll, port->in_fd);
+ err = uv_poll_init(loop, &data->poll, port->in_fd);
if (err < 0) {
nxt_unit_warn(ctx, "Failed to init uv.poll");
return NXT_UNIT_ERROR;
}
- err = uv_poll_start(&node_ctx->poll, UV_READABLE, nxt_uv_read_callback);
+ err = uv_poll_start(&data->poll, UV_READABLE, nxt_uv_read_callback);
if (err < 0) {
nxt_unit_warn(ctx, "Failed to start uv.poll");
return NXT_UNIT_ERROR;
}
- ctx->data = node_ctx;
+ port->data = data;
- node_ctx->port_id = port->id;
- node_ctx->poll.data = ctx;
+ data->ctx = ctx;
+ data->port_id = port->id;
+ data->poll.data = ctx;
}
- return nxt_unit_add_port(ctx, port);
+ return NXT_UNIT_OK;
}
@@ -410,35 +412,31 @@ operator == (const nxt_unit_port_id_t &p1, const nxt_unit_port_id_t &p2)
void
-Unit::remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
+Unit::remove_port(nxt_unit_t *unit, nxt_unit_port_t *port)
{
- nxt_nodejs_ctx_t *node_ctx;
+ port_data_t *data;
- if (ctx->data != NULL) {
- node_ctx = (nxt_nodejs_ctx_t *) ctx->data;
+ if (port->data != NULL) {
+ data = (port_data_t *) port->data;
- if (node_ctx->port_id == *port_id) {
- uv_poll_stop(&node_ctx->poll);
+ if (data->port_id == port->id) {
+ uv_poll_stop(&data->poll);
- node_ctx->poll.data = node_ctx;
- uv_close((uv_handle_t *) &node_ctx->poll, delete_port_data);
-
- ctx->data = NULL;
+ data->poll.data = data;
+ uv_close((uv_handle_t *) &data->poll, delete_port_data);
}
}
-
- nxt_unit_remove_port(ctx, port_id);
}
static void
delete_port_data(uv_handle_t* handle)
{
- nxt_nodejs_ctx_t *node_ctx;
+ port_data_t *data;
- node_ctx = (nxt_nodejs_ctx_t *) handle->data;
+ data = (port_data_t *) handle->data;
- delete node_ctx;
+ delete data;
}
diff --git a/src/nodejs/unit-http/unit.h b/src/nodejs/unit-http/unit.h
index 18359118..07823c26 100644
--- a/src/nodejs/unit-http/unit.h
+++ b/src/nodejs/unit-http/unit.h
@@ -40,7 +40,7 @@ private:
void shm_ack_handler(nxt_unit_ctx_t *ctx);
static int add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
- static void remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id);
+ static void remove_port(nxt_unit_t *unit, nxt_unit_port_t *port);
static void quit_cb(nxt_unit_ctx_t *ctx);
void quit(nxt_unit_ctx_t *ctx);
diff --git a/src/nxt_application.c b/src/nxt_application.c
index c331764f..372a88b4 100644
--- a/src/nxt_application.c
+++ b/src/nxt_application.c
@@ -1263,7 +1263,7 @@ nxt_app_parse_type(u_char *p, size_t length)
nxt_int_t
nxt_unit_default_init(nxt_task_t *task, nxt_unit_init_t *init)
{
- nxt_port_t *my_port, *main_port;
+ nxt_port_t *my_port, *main_port, *router_port;
nxt_runtime_t *rt;
nxt_memzero(init, sizeof(nxt_unit_init_t));
@@ -1275,6 +1275,11 @@ nxt_unit_default_init(nxt_task_t *task, nxt_unit_init_t *init)
return NXT_ERROR;
}
+ router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
+ if (nxt_slow_path(router_port == NULL)) {
+ return NXT_ERROR;
+ }
+
my_port = nxt_runtime_port_find(rt, nxt_pid, 0);
if (nxt_slow_path(my_port == NULL)) {
return NXT_ERROR;
@@ -1289,6 +1294,13 @@ nxt_unit_default_init(nxt_task_t *task, nxt_unit_init_t *init)
init->ready_stream = my_port->process->stream;
+ init->router_port.id.pid = router_port->pid;
+ init->router_port.id.id = router_port->id;
+ init->router_port.in_fd = -1;
+ init->router_port.out_fd = router_port->pair[1];
+
+ nxt_fd_blocking(task, router_port->pair[1]);
+
init->read_port.id.pid = my_port->pid;
init->read_port.id.id = my_port->id;
init->read_port.in_fd = my_port->pair[0];
diff --git a/src/nxt_external.c b/src/nxt_external.c
index 6370a9c4..2471c812 100644
--- a/src/nxt_external.c
+++ b/src/nxt_external.c
@@ -69,7 +69,7 @@ nxt_external_start(nxt_task_t *task, nxt_process_data_t *data)
nxt_str_t str;
nxt_int_t rc;
nxt_uint_t i, argc;
- nxt_port_t *my_port, *main_port;
+ nxt_port_t *my_port, *main_port, *router_port;
nxt_runtime_t *rt;
nxt_conf_value_t *value;
nxt_common_app_conf_t *conf;
@@ -79,9 +79,12 @@ nxt_external_start(nxt_task_t *task, nxt_process_data_t *data)
conf = data->app;
main_port = rt->port_by_type[NXT_PROCESS_MAIN];
+ router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
my_port = nxt_runtime_port_find(rt, nxt_pid, 0);
- if (nxt_slow_path(main_port == NULL || my_port == NULL)) {
+ if (nxt_slow_path(main_port == NULL || my_port == NULL
+ || router_port == NULL))
+ {
return NXT_ERROR;
}
@@ -90,6 +93,11 @@ nxt_external_start(nxt_task_t *task, nxt_process_data_t *data)
return NXT_ERROR;
}
+ rc = nxt_external_fd_no_cloexec(task, router_port->pair[1]);
+ if (nxt_slow_path(rc != NXT_OK)) {
+ return NXT_ERROR;
+ }
+
rc = nxt_external_fd_no_cloexec(task, my_port->pair[0]);
if (nxt_slow_path(rc != NXT_OK)) {
return NXT_ERROR;
@@ -101,9 +109,11 @@ nxt_external_start(nxt_task_t *task, nxt_process_data_t *data)
"%s;%uD;"
"%PI,%ud,%d;"
"%PI,%ud,%d;"
+ "%PI,%ud,%d;"
"%d,%z,%Z",
NXT_VERSION, my_port->process->stream,
main_port->pid, main_port->id, main_port->pair[1],
+ router_port->pid, router_port->id, router_port->pair[1],
my_port->pid, my_port->id, my_port->pair[0],
2, conf->shm_limit);
diff --git a/src/nxt_process.c b/src/nxt_process.c
index 215c529c..5a01c21e 100644
--- a/src/nxt_process.c
+++ b/src/nxt_process.c
@@ -61,7 +61,7 @@ nxt_bool_t nxt_proc_conn_matrix[NXT_PROCESS_MAX][NXT_PROCESS_MAX] = {
{ 1, 0, 0, 0, 0 },
{ 1, 0, 0, 1, 0 },
{ 1, 0, 1, 0, 1 },
- { 1, 0, 0, 0, 0 },
+ { 1, 0, 0, 1, 0 },
};
nxt_bool_t nxt_proc_remove_notify_matrix[NXT_PROCESS_MAX][NXT_PROCESS_MAX] = {
diff --git a/src/nxt_unit.c b/src/nxt_unit.c
index 89998e3f..8c964c7a 100644
--- a/src/nxt_unit.c
+++ b/src/nxt_unit.c
@@ -38,16 +38,19 @@ typedef struct nxt_unit_websocket_frame_impl_s nxt_unit_websocket_frame_impl_t;
static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init);
static int nxt_unit_ctx_init(nxt_unit_impl_t *lib,
nxt_unit_ctx_impl_t *ctx_impl, void *data);
+nxt_inline void nxt_unit_ctx_use(nxt_unit_ctx_impl_t *ctx_impl);
+nxt_inline void nxt_unit_ctx_release(nxt_unit_ctx_impl_t *ctx_impl);
+nxt_inline void nxt_unit_lib_use(nxt_unit_impl_t *lib);
+nxt_inline void nxt_unit_lib_release(nxt_unit_impl_t *lib);
nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
nxt_unit_mmap_buf_t *mmap_buf);
nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
nxt_unit_mmap_buf_t *mmap_buf);
nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf);
static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
- nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream,
- uint32_t *shm_limit);
-static int nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
- uint32_t stream);
+ nxt_unit_port_t *router_port, nxt_unit_port_t *read_port,
+ int *log_fd, uint32_t *stream, uint32_t *shm_limit);
+static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream);
static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
@@ -96,8 +99,8 @@ static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx,
static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd);
static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps);
-static void nxt_unit_process_use(nxt_unit_ctx_t *ctx,
- nxt_unit_process_t *process, int i);
+nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process);
+nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process);
static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps);
static nxt_port_mmap_header_t *nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx,
nxt_unit_process_t *process, uint32_t id);
@@ -110,28 +113,35 @@ static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
nxt_port_mmap_header_t *hdr, void *start, uint32_t size);
static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid);
-static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx,
+static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_impl_t *lib,
pid_t pid);
-static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_ctx_t *ctx,
+static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib,
pid_t pid, int remove);
static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
static void nxt_unit_read_buf(nxt_unit_ctx_t *ctx,
nxt_unit_read_buf_t *rbuf);
+static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl);
static int nxt_unit_create_port(nxt_unit_ctx_t *ctx,
nxt_unit_port_id_t *port_id, int *fd);
static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst,
nxt_unit_port_id_t *new_port, int fd);
-static void nxt_unit_remove_port_unsafe(nxt_unit_ctx_t *ctx,
- nxt_unit_port_id_t *port_id, nxt_unit_port_t *r_port,
+static int nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
+static int nxt_unit_remove_port(nxt_unit_impl_t *lib,
+ nxt_unit_port_id_t *port_id);
+static int nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib,
+ nxt_unit_port_id_t *port_id, nxt_unit_port_t **r_port,
nxt_unit_process_t **process);
-static void nxt_unit_remove_process(nxt_unit_ctx_t *ctx,
+static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid);
+static void nxt_unit_remove_process(nxt_unit_impl_t *lib,
nxt_unit_process_t *process);
-
-static ssize_t nxt_unit_port_send_default(nxt_unit_ctx_t *ctx,
+static void nxt_unit_quit(nxt_unit_ctx_t *ctx);
+static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx,
nxt_unit_port_id_t *port_id, const void *buf, size_t buf_size,
const void *oob, size_t oob_size);
+static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
+ const void *buf, size_t buf_size, const void *oob, size_t oob_size);
static ssize_t nxt_unit_port_recv_default(nxt_unit_ctx_t *ctx,
nxt_unit_port_id_t *port_id, void *buf, size_t buf_size,
void *oob, size_t oob_size);
@@ -233,6 +243,8 @@ struct nxt_unit_read_buf_s {
struct nxt_unit_ctx_impl_s {
nxt_unit_ctx_t ctx;
+ nxt_atomic_t use_count;
+
pthread_mutex_t mutex;
nxt_unit_port_id_t read_port_id;
@@ -269,6 +281,8 @@ struct nxt_unit_impl_s {
nxt_unit_t unit;
nxt_unit_callbacks_t callbacks;
+ nxt_atomic_t use_count;
+
uint32_t request_data_size;
uint32_t shm_mmap_limit;
@@ -277,7 +291,7 @@ struct nxt_unit_impl_s {
nxt_lvlhsh_t processes; /* of nxt_unit_process_t */
nxt_lvlhsh_t ports; /* of nxt_unit_port_impl_t */
- nxt_unit_port_id_t ready_port_id;
+ nxt_unit_port_id_t router_port_id;
nxt_queue_t contexts; /* of nxt_unit_ctx_impl_t */
@@ -341,7 +355,7 @@ nxt_unit_init(nxt_unit_init_t *init)
uint32_t ready_stream, shm_limit;
nxt_unit_ctx_t *ctx;
nxt_unit_impl_t *lib;
- nxt_unit_port_t ready_port, read_port;
+ nxt_unit_port_t ready_port, router_port, read_port;
lib = nxt_unit_create(init);
if (nxt_slow_path(lib == NULL)) {
@@ -354,17 +368,20 @@ nxt_unit_init(nxt_unit_init_t *init)
{
ready_port = init->ready_port;
ready_stream = init->ready_stream;
+ router_port = init->router_port;
read_port = init->read_port;
lib->log_fd = init->log_fd;
nxt_unit_port_id_init(&ready_port.id, ready_port.id.pid,
ready_port.id.id);
+ nxt_unit_port_id_init(&router_port.id, router_port.id.pid,
+ router_port.id.id);
nxt_unit_port_id_init(&read_port.id, read_port.id.pid,
read_port.id.id);
} else {
- rc = nxt_unit_read_env(&ready_port, &read_port, &lib->log_fd,
- &ready_stream, &shm_limit);
+ rc = nxt_unit_read_env(&ready_port, &router_port, &read_port,
+ &lib->log_fd, &ready_stream, &shm_limit);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
goto fail;
}
@@ -380,14 +397,16 @@ nxt_unit_init(nxt_unit_init_t *init)
lib->pid = read_port.id.pid;
ctx = &lib->main_ctx.ctx;
- rc = lib->callbacks.add_port(ctx, &ready_port);
- if (rc != NXT_UNIT_OK) {
- nxt_unit_alert(NULL, "failed to add ready_port");
+ rc = nxt_unit_add_port(ctx, &router_port);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ nxt_unit_alert(NULL, "failed to add router_port");
goto fail;
}
- rc = lib->callbacks.add_port(ctx, &read_port);
+ lib->router_port_id = router_port.id;
+
+ rc = nxt_unit_add_port(ctx, &read_port);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
nxt_unit_alert(NULL, "failed to add read_port");
@@ -395,15 +414,16 @@ nxt_unit_init(nxt_unit_init_t *init)
}
lib->main_ctx.read_port_id = read_port.id;
- lib->ready_port_id = ready_port.id;
- rc = nxt_unit_ready(ctx, &ready_port.id, ready_stream);
+ rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
nxt_unit_alert(NULL, "failed to send READY message");
goto fail;
}
+ close(ready_port.out_fd);
+
return ctx;
fail:
@@ -450,6 +470,8 @@ nxt_unit_create(nxt_unit_init_t *init)
nxt_queue_init(&lib->contexts);
+ lib->use_count = 0;
+
rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
goto fail;
@@ -463,26 +485,6 @@ nxt_unit_create(nxt_unit_init_t *init)
goto fail;
}
- if (cb->add_port == NULL) {
- cb->add_port = nxt_unit_add_port;
- }
-
- if (cb->remove_port == NULL) {
- cb->remove_port = nxt_unit_remove_port;
- }
-
- if (cb->remove_pid == NULL) {
- cb->remove_pid = nxt_unit_remove_pid;
- }
-
- if (cb->quit == NULL) {
- cb->quit = nxt_unit_quit;
- }
-
- if (cb->port_send == NULL) {
- cb->port_send = nxt_unit_port_send_default;
- }
-
if (cb->port_recv == NULL) {
cb->port_recv = nxt_unit_port_recv_default;
}
@@ -506,8 +508,6 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
ctx_impl->ctx.data = data;
ctx_impl->ctx.unit = &lib->unit;
- nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);
-
rc = pthread_mutex_init(&ctx_impl->mutex, NULL);
if (nxt_slow_path(rc != 0)) {
nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
@@ -515,6 +515,12 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
return NXT_UNIT_ERROR;
}
+ nxt_unit_lib_use(lib);
+
+ nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);
+
+ ctx_impl->use_count = 1;
+
nxt_queue_init(&ctx_impl->free_req);
nxt_queue_init(&ctx_impl->free_ws);
nxt_queue_init(&ctx_impl->active_req);
@@ -541,6 +547,62 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
nxt_inline void
+nxt_unit_ctx_use(nxt_unit_ctx_impl_t *ctx_impl)
+{
+ nxt_atomic_fetch_add(&ctx_impl->use_count, 1);
+}
+
+
+nxt_inline void
+nxt_unit_ctx_release(nxt_unit_ctx_impl_t *ctx_impl)
+{
+ long c;
+
+ c = nxt_atomic_fetch_add(&ctx_impl->use_count, -1);
+
+ if (c == 1) {
+ nxt_unit_ctx_free(ctx_impl);
+ }
+}
+
+
+nxt_inline void
+nxt_unit_lib_use(nxt_unit_impl_t *lib)
+{
+ nxt_atomic_fetch_add(&lib->use_count, 1);
+}
+
+
+nxt_inline void
+nxt_unit_lib_release(nxt_unit_impl_t *lib)
+{
+ long c;
+ nxt_unit_process_t *process;
+
+ c = nxt_atomic_fetch_add(&lib->use_count, -1);
+
+ if (c == 1) {
+ for ( ;; ) {
+ pthread_mutex_lock(&lib->mutex);
+
+ process = nxt_unit_process_pop_first(lib);
+ if (process == NULL) {
+ pthread_mutex_unlock(&lib->mutex);
+
+ break;
+ }
+
+ nxt_unit_remove_process(lib, process);
+ }
+
+ pthread_mutex_destroy(&lib->mutex);
+
+ free(lib);
+ }
+}
+
+
+nxt_inline void
nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
nxt_unit_mmap_buf_t *mmap_buf)
{
@@ -585,15 +647,16 @@ nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf)
static int
-nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *read_port,
- int *log_fd, uint32_t *stream, uint32_t *shm_limit)
+nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
+ nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream,
+ uint32_t *shm_limit)
{
int rc;
- int ready_fd, read_fd;
+ int ready_fd, router_fd, read_fd;
char *unit_init, *version_end;
long version_length;
- int64_t ready_pid, read_pid;
- uint32_t ready_stream, ready_id, read_id;
+ int64_t ready_pid, router_pid, read_pid;
+ uint32_t ready_stream, router_id, ready_id, read_id;
unit_init = getenv(NXT_UNIT_INIT_ENV);
if (nxt_slow_path(unit_init == NULL)) {
@@ -621,13 +684,15 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *read_port,
"%"PRIu32";"
"%"PRId64",%"PRIu32",%d;"
"%"PRId64",%"PRIu32",%d;"
+ "%"PRId64",%"PRIu32",%d;"
"%d,%"PRIu32,
&ready_stream,
&ready_pid, &ready_id, &ready_fd,
+ &router_pid, &router_id, &router_fd,
&read_pid, &read_id, &read_fd,
log_fd, shm_limit);
- if (nxt_slow_path(rc != 9)) {
+ if (nxt_slow_path(rc != 12)) {
nxt_unit_alert(NULL, "failed to scan variables: %d", rc);
return NXT_UNIT_ERROR;
@@ -639,6 +704,12 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *read_port,
ready_port->out_fd = ready_fd;
ready_port->data = NULL;
+ nxt_unit_port_id_init(&router_port->id, (pid_t) router_pid, router_id);
+
+ router_port->in_fd = -1;
+ router_port->out_fd = router_fd;
+ router_port->data = NULL;
+
nxt_unit_port_id_init(&read_port->id, (pid_t) read_pid, read_id);
read_port->in_fd = read_fd;
@@ -652,8 +723,7 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *read_port,
static int
-nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
- uint32_t stream)
+nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream)
{
ssize_t res;
nxt_port_msg_t msg;
@@ -671,7 +741,7 @@ nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
msg.mf = 0;
msg.tracking = 0;
- res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg), NULL, 0);
+ res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg), NULL, 0);
if (res != sizeof(msg)) {
return NXT_UNIT_ERROR;
}
@@ -684,13 +754,12 @@ int
nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
void *buf, size_t buf_size, void *oob, size_t oob_size)
{
- int rc;
- pid_t pid;
- struct cmsghdr *cm;
- nxt_port_msg_t *port_msg;
- nxt_unit_impl_t *lib;
- nxt_unit_recv_msg_t recv_msg;
- nxt_unit_callbacks_t *cb;
+ int rc;
+ pid_t pid;
+ struct cmsghdr *cm;
+ nxt_port_msg_t *port_msg;
+ nxt_unit_impl_t *lib;
+ nxt_unit_recv_msg_t recv_msg;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
@@ -749,14 +818,12 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
}
}
- cb = &lib->callbacks;
-
switch (port_msg->type) {
case _NXT_PORT_MSG_QUIT:
nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream);
- cb->quit(ctx);
+ nxt_unit_quit(ctx);
rc = NXT_UNIT_OK;
break;
@@ -812,7 +879,7 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d",
port_msg->stream, (int) pid);
- cb->remove_pid(ctx, pid);
+ nxt_unit_remove_pid(lib, pid);
rc = NXT_UNIT_OK;
break;
@@ -839,7 +906,7 @@ fail:
}
if (recv_msg.process != NULL) {
- nxt_unit_process_use(ctx, recv_msg.process, -1);
+ nxt_unit_process_release(recv_msg.process);
}
return rc;
@@ -850,7 +917,6 @@ static int
nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
int nb;
- nxt_unit_impl_t *lib;
nxt_unit_port_t new_port;
nxt_port_msg_new_port_t *new_port_msg;
@@ -894,9 +960,7 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
recv_msg->fd = -1;
- lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
-
- return lib->callbacks.add_port(ctx, &new_port);
+ return nxt_unit_add_port(ctx, &new_port);
}
@@ -1206,7 +1270,7 @@ nxt_unit_request_info_release(nxt_unit_request_info_t *req)
* existence.
*/
if (req_impl->process != NULL) {
- nxt_unit_process_use(req->ctx, req_impl->process, -1);
+ nxt_unit_process_release(req_impl->process);
req_impl->process = NULL;
}
@@ -1808,7 +1872,7 @@ nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
pthread_mutex_lock(&lib->mutex);
- recv_msg->process = nxt_unit_process_find(ctx, recv_msg->pid, 0);
+ recv_msg->process = nxt_unit_process_find(lib, recv_msg->pid, 0);
pthread_mutex_unlock(&lib->mutex);
@@ -1869,15 +1933,6 @@ nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf)
}
-typedef struct {
- size_t len;
- const char *str;
-} nxt_unit_str_t;
-
-
-#define nxt_unit_str(str) { nxt_length(str), str }
-
-
int
nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req)
{
@@ -2064,8 +2119,8 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
(int) m.mmap_msg.chunk_id,
(int) m.mmap_msg.size);
- res = lib->callbacks.port_send(ctx, &mmap_buf->port_id, &m, sizeof(m),
- NULL, 0);
+ res = nxt_unit_port_send(ctx, &mmap_buf->port_id, &m, sizeof(m),
+ NULL, 0);
if (nxt_slow_path(res != sizeof(m))) {
goto free_buf;
}
@@ -2114,10 +2169,10 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
stream,
(int) (sizeof(m.msg) + m.mmap_msg.size));
- res = lib->callbacks.port_send(ctx, &mmap_buf->port_id,
- buf->start - sizeof(m.msg),
- m.mmap_msg.size + sizeof(m.msg),
- NULL, 0);
+ res = nxt_unit_port_send(ctx, &mmap_buf->port_id,
+ buf->start - sizeof(m.msg),
+ m.mmap_msg.size + sizeof(m.msg),
+ NULL, 0);
if (nxt_slow_path(res != (ssize_t) (m.mmap_msg.size + sizeof(m.msg)))) {
goto free_buf;
}
@@ -2689,8 +2744,8 @@ skip_response_send:
msg.mf = 0;
msg.tracking = 0;
- (void) lib->callbacks.port_send(req->ctx, &req->response_port,
- &msg, sizeof(msg), NULL, 0);
+ (void) nxt_unit_port_send(req->ctx, &req->response_port,
+ &msg, sizeof(msg), NULL, 0);
nxt_unit_request_info_release(req);
}
@@ -3006,7 +3061,7 @@ nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
msg.mf = 0;
msg.tracking = 0;
- res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg), NULL, 0);
+ res = nxt_unit_port_send(ctx, port_id, &msg, sizeof(msg), NULL, 0);
if (nxt_slow_path(res != sizeof(msg))) {
return NXT_UNIT_ERROR;
}
@@ -3284,8 +3339,8 @@ nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int fd)
*/
memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int));
- res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg),
- &cmsg, sizeof(cmsg));
+ res = nxt_unit_port_send(ctx, port_id, &msg, sizeof(msg),
+ &cmsg, sizeof(cmsg));
if (nxt_slow_path(res != sizeof(msg))) {
return NXT_UNIT_ERROR;
}
@@ -3382,7 +3437,7 @@ nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd)
pthread_mutex_lock(&lib->mutex);
- process = nxt_unit_process_find(ctx, pid, 0);
+ process = nxt_unit_process_find(lib, pid, 0);
pthread_mutex_unlock(&lib->mutex);
@@ -3444,7 +3499,7 @@ nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd)
fail:
- nxt_unit_process_use(ctx, process, -1);
+ nxt_unit_process_release(process);
return rc;
}
@@ -3462,15 +3517,22 @@ nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps)
}
-static void
-nxt_unit_process_use(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, int i)
+nxt_inline void
+nxt_unit_process_use(nxt_unit_process_t *process)
+{
+ nxt_atomic_fetch_add(&process->use_count, 1);
+}
+
+
+nxt_inline void
+nxt_unit_process_release(nxt_unit_process_t *process)
{
long c;
- c = nxt_atomic_fetch_add(&process->use_count, i);
+ c = nxt_atomic_fetch_add(&process->use_count, -1);
- if (i < 0 && c == -i) {
- nxt_unit_debug(ctx, "destroy process #%d", (int) process->pid);
+ if (c == 1) {
+ nxt_unit_debug(NULL, "destroy process #%d", (int) process->pid);
nxt_unit_mmaps_destroy(&process->incoming);
nxt_unit_mmaps_destroy(&process->outgoing);
@@ -3727,7 +3789,7 @@ nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid)
msg.mf = 0;
msg.tracking = 0;
- res = lib->callbacks.port_send(ctx, &port_id, &msg, sizeof(msg), NULL, 0);
+ res = nxt_unit_port_send(ctx, &port_id, &msg, sizeof(msg), NULL, 0);
if (nxt_slow_path(res != sizeof(msg))) {
return NXT_UNIT_ERROR;
}
@@ -3772,26 +3834,23 @@ nxt_unit_process_lhq_pid(nxt_lvlhsh_query_t *lhq, pid_t *pid)
static nxt_unit_process_t *
-nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid)
+nxt_unit_process_get(nxt_unit_impl_t *lib, pid_t pid)
{
- nxt_unit_impl_t *lib;
nxt_unit_process_t *process;
nxt_lvlhsh_query_t lhq;
- lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
-
nxt_unit_process_lhq_pid(&lhq, &pid);
if (nxt_lvlhsh_find(&lib->processes, &lhq) == NXT_OK) {
process = lhq.value;
- nxt_unit_process_use(ctx, process, 1);
+ nxt_unit_process_use(process);
return process;
}
process = malloc(sizeof(nxt_unit_process_t));
if (nxt_slow_path(process == NULL)) {
- nxt_unit_warn(ctx, "failed to allocate process for #%d", (int) pid);
+ nxt_unit_alert(NULL, "failed to allocate process for #%d", (int) pid);
return NULL;
}
@@ -3815,7 +3874,7 @@ nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid)
break;
default:
- nxt_unit_warn(ctx, "process %d insert failed", (int) pid);
+ nxt_unit_alert(NULL, "process %d insert failed", (int) pid);
pthread_mutex_destroy(&process->outgoing.mutex);
pthread_mutex_destroy(&process->incoming.mutex);
@@ -3824,22 +3883,19 @@ nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid)
break;
}
- nxt_unit_process_use(ctx, process, 1);
+ nxt_unit_process_use(process);
return process;
}
static nxt_unit_process_t *
-nxt_unit_process_find(nxt_unit_ctx_t *ctx, pid_t pid, int remove)
+nxt_unit_process_find(nxt_unit_impl_t *lib, pid_t pid, int remove)
{
int rc;
- nxt_unit_impl_t *lib;
nxt_unit_process_t *process;
nxt_lvlhsh_query_t lhq;
- lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
-
nxt_unit_process_lhq_pid(&lhq, &pid);
if (remove) {
@@ -3853,7 +3909,7 @@ nxt_unit_process_find(nxt_unit_ctx_t *ctx, pid_t pid, int remove)
process = lhq.value;
if (!remove) {
- nxt_unit_process_use(ctx, process, 1);
+ nxt_unit_process_use(process);
}
return process;
@@ -3873,8 +3929,13 @@ nxt_unit_process_pop_first(nxt_unit_impl_t *lib)
int
nxt_unit_run(nxt_unit_ctx_t *ctx)
{
- int rc;
- nxt_unit_impl_t *lib;
+ int rc;
+ nxt_unit_impl_t *lib;
+ nxt_unit_ctx_impl_t *ctx_impl;
+
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+
+ nxt_unit_ctx_use(ctx_impl);
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
rc = NXT_UNIT_OK;
@@ -3887,6 +3948,8 @@ nxt_unit_run(nxt_unit_ctx_t *ctx)
}
}
+ nxt_unit_ctx_release(ctx_impl);
+
return rc;
}
@@ -3900,6 +3963,8 @@ nxt_unit_run_once(nxt_unit_ctx_t *ctx)
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+ nxt_unit_ctx_use(ctx_impl);
+
pthread_mutex_lock(&ctx_impl->mutex);
if (ctx_impl->pending_read_head != NULL) {
@@ -3915,6 +3980,9 @@ nxt_unit_run_once(nxt_unit_ctx_t *ctx)
} else {
rbuf = nxt_unit_read_buf_get_impl(ctx_impl);
if (nxt_slow_path(rbuf == NULL)) {
+
+ nxt_unit_ctx_release(ctx_impl);
+
return NXT_UNIT_ERROR;
}
@@ -3936,6 +4004,8 @@ nxt_unit_run_once(nxt_unit_ctx_t *ctx)
nxt_unit_read_buf_release(ctx, rbuf);
+ nxt_unit_ctx_release(ctx_impl);
+
return rc;
}
@@ -3968,34 +4038,11 @@ nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
void
nxt_unit_done(nxt_unit_ctx_t *ctx)
{
- nxt_unit_impl_t *lib;
- nxt_unit_process_t *process;
nxt_unit_ctx_impl_t *ctx_impl;
- lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
-
- nxt_queue_each(ctx_impl, &lib->contexts, nxt_unit_ctx_impl_t, link) {
-
- nxt_unit_ctx_free(&ctx_impl->ctx);
-
- } nxt_queue_loop;
-
- for ( ;; ) {
- pthread_mutex_lock(&lib->mutex);
-
- process = nxt_unit_process_pop_first(lib);
- if (process == NULL) {
- pthread_mutex_unlock(&lib->mutex);
-
- break;
- }
-
- nxt_unit_remove_process(ctx, process);
- }
-
- pthread_mutex_destroy(&lib->mutex);
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
- free(lib);
+ nxt_unit_ctx_release(ctx_impl);
}
@@ -4023,9 +4070,9 @@ nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data)
return NULL;
}
- rc = nxt_unit_send_port(ctx, &lib->ready_port_id, &new_port_id, fd);
+ rc = nxt_unit_send_port(ctx, &lib->router_port_id, &new_port_id, fd);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
- lib->callbacks.remove_port(ctx, &new_port_id);
+ nxt_unit_remove_port(lib, &new_port_id);
close(fd);
@@ -4038,7 +4085,7 @@ nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data)
rc = nxt_unit_ctx_init(lib, new_ctx, data);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
- lib->callbacks.remove_port(ctx, &new_port_id);
+ nxt_unit_remove_port(lib, &new_port_id);
free(new_ctx);
@@ -4051,17 +4098,15 @@ nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data)
}
-void
-nxt_unit_ctx_free(nxt_unit_ctx_t *ctx)
+static void
+nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl)
{
nxt_unit_impl_t *lib;
- nxt_unit_ctx_impl_t *ctx_impl;
nxt_unit_mmap_buf_t *mmap_buf;
nxt_unit_request_info_impl_t *req_impl;
nxt_unit_websocket_frame_impl_t *ws_impl;
- ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
- lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit);
nxt_queue_each(req_impl, &ctx_impl->active_req,
nxt_unit_request_info_impl_t, link)
@@ -4102,6 +4147,8 @@ nxt_unit_ctx_free(nxt_unit_ctx_t *ctx)
if (ctx_impl != &lib->main_ctx) {
free(ctx_impl);
}
+
+ nxt_unit_lib_release(lib);
}
@@ -4127,36 +4174,6 @@ nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id)
}
-int
-nxt_unit_create_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst,
- nxt_unit_port_id_t *port_id)
-{
- int rc, fd;
- nxt_unit_impl_t *lib;
- nxt_unit_port_id_t new_port_id;
-
- lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
-
- rc = nxt_unit_create_port(ctx, &new_port_id, &fd);
- if (nxt_slow_path(rc != NXT_UNIT_OK)) {
- return rc;
- }
-
- rc = nxt_unit_send_port(ctx, dst, &new_port_id, fd);
-
- if (nxt_fast_path(rc == NXT_UNIT_OK)) {
- *port_id = new_port_id;
-
- } else {
- lib->callbacks.remove_port(ctx, &new_port_id);
- }
-
- close(fd);
-
- return rc;
-}
-
-
static int
nxt_unit_create_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int *fd)
{
@@ -4180,7 +4197,7 @@ nxt_unit_create_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int *fd)
pthread_mutex_lock(&lib->mutex);
- process = nxt_unit_process_get(ctx, lib->pid);
+ process = nxt_unit_process_get(lib, lib->pid);
if (nxt_slow_path(process == NULL)) {
pthread_mutex_unlock(&lib->mutex);
@@ -4198,9 +4215,9 @@ nxt_unit_create_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int *fd)
pthread_mutex_unlock(&lib->mutex);
- nxt_unit_process_use(ctx, process, -1);
+ nxt_unit_process_release(process);
- rc = lib->callbacks.add_port(ctx, &new_port);
+ rc = nxt_unit_add_port(ctx, &new_port);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
nxt_unit_warn(ctx, "create_port: add_port() failed");
@@ -4269,14 +4286,13 @@ nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst,
*/
memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int));
- res = lib->callbacks.port_send(ctx, dst, &m, sizeof(m),
- &cmsg, sizeof(cmsg));
+ res = nxt_unit_port_send(ctx, dst, &m, sizeof(m), &cmsg, sizeof(cmsg));
- return res == sizeof(m) ? NXT_UNIT_OK : NXT_UNIT_ERROR;
+ return (res == sizeof(m)) ? NXT_UNIT_OK : NXT_UNIT_ERROR;
}
-int
+static int
nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
{
int rc;
@@ -4295,18 +4311,41 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
port->id.pid, port->id.id,
port->in_fd, port->out_fd);
+ if (old_port->port.data == NULL) {
+ old_port->port.data = port->data;
+ port->data = NULL;
+ }
+
+ if (old_port->port.in_fd == -1) {
+ old_port->port.in_fd = port->in_fd;
+ port->in_fd = -1;
+ }
+
if (port->in_fd != -1) {
close(port->in_fd);
port->in_fd = -1;
}
+ if (old_port->port.out_fd == -1) {
+ old_port->port.out_fd = port->out_fd;
+ port->out_fd = -1;
+ }
+
if (port->out_fd != -1) {
close(port->out_fd);
port->out_fd = -1;
}
+ *port = old_port->port;
+
pthread_mutex_unlock(&lib->mutex);
+ if (lib->callbacks.add_port != NULL
+ && (port->in_fd != -1 || port->out_fd != -1))
+ {
+ lib->callbacks.add_port(ctx, &old_port->port);
+ }
+
return NXT_UNIT_OK;
}
@@ -4314,7 +4353,7 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
port->id.pid, port->id.id,
port->in_fd, port->out_fd);
- process = nxt_unit_process_get(ctx, port->id.pid);
+ process = nxt_unit_process_get(lib, port->id.pid);
if (nxt_slow_path(process == NULL)) {
rc = NXT_UNIT_ERROR;
goto unlock;
@@ -4351,72 +4390,80 @@ unlock:
pthread_mutex_unlock(&lib->mutex);
if (nxt_slow_path(process != NULL && rc != NXT_UNIT_OK)) {
- nxt_unit_process_use(ctx, process, -1);
+ nxt_unit_process_release(process);
}
- return rc;
-}
-
+ if (lib->callbacks.add_port != NULL
+ && rc == NXT_UNIT_OK
+ && (port->in_fd != -1 || port->out_fd != -1))
+ {
+ lib->callbacks.add_port(ctx, &new_port->port);
+ }
-void
-nxt_unit_remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
-{
- nxt_unit_find_remove_port(ctx, port_id, NULL);
+ return rc;
}
-void
-nxt_unit_find_remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
- nxt_unit_port_t *r_port)
+static int
+nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id)
{
- nxt_unit_impl_t *lib;
+ int res;
+ nxt_unit_port_t *port;
nxt_unit_process_t *process;
- lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ port = NULL;
+ process = NULL;
pthread_mutex_lock(&lib->mutex);
- process = NULL;
-
- nxt_unit_remove_port_unsafe(ctx, port_id, r_port, &process);
+ res = nxt_unit_remove_port_unsafe(lib, port_id, &port, &process);
pthread_mutex_unlock(&lib->mutex);
+ if (lib->callbacks.remove_port != NULL && res == NXT_UNIT_OK) {
+ lib->callbacks.remove_port(&lib->unit, port);
+ }
+
+ if (nxt_fast_path(port != NULL)) {
+ if (port->in_fd != -1) {
+ close(port->in_fd);
+ }
+
+ if (port->out_fd != -1) {
+ close(port->out_fd);
+ }
+ }
+
if (nxt_slow_path(process != NULL)) {
- nxt_unit_process_use(ctx, process, -1);
+ nxt_unit_process_release(process);
+ }
+
+ if (nxt_fast_path(port != NULL)) {
+ free(port);
}
+
+ return res;
}
-static void
-nxt_unit_remove_port_unsafe(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
- nxt_unit_port_t *r_port, nxt_unit_process_t **process)
+static int
+nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id,
+ nxt_unit_port_t **r_port, nxt_unit_process_t **process)
{
- nxt_unit_impl_t *lib;
nxt_unit_port_impl_t *port;
- lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
-
port = nxt_unit_port_hash_find(&lib->ports, port_id, 1);
if (nxt_slow_path(port == NULL)) {
- nxt_unit_debug(ctx, "remove_port: port %d,%d not found",
+ nxt_unit_debug(NULL, "remove_port: port %d,%d not found",
(int) port_id->pid, (int) port_id->id);
- return;
+ return NXT_UNIT_ERROR;
}
- nxt_unit_debug(ctx, "remove_port: port %d,%d, fds %d,%d, data %p",
+ nxt_unit_debug(NULL, "remove_port: port %d,%d, fds %d,%d, data %p",
(int) port_id->pid, (int) port_id->id,
port->port.in_fd, port->port.out_fd, port->port.data);
- if (port->port.in_fd != -1) {
- close(port->port.in_fd);
- }
-
- if (port->port.out_fd != -1) {
- close(port->port.out_fd);
- }
-
if (port->process != NULL) {
nxt_queue_remove(&port->link);
}
@@ -4426,60 +4473,55 @@ nxt_unit_remove_port_unsafe(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
}
if (r_port != NULL) {
- *r_port = port->port;
+ *r_port = &port->port;
}
- free(port);
+ return NXT_UNIT_OK;
}
-void
-nxt_unit_remove_pid(nxt_unit_ctx_t *ctx, pid_t pid)
+static void
+nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid)
{
- nxt_unit_impl_t *lib;
nxt_unit_process_t *process;
- lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
-
pthread_mutex_lock(&lib->mutex);
- process = nxt_unit_process_find(ctx, pid, 1);
+ process = nxt_unit_process_find(lib, pid, 1);
if (nxt_slow_path(process == NULL)) {
- nxt_unit_debug(ctx, "remove_pid: process %d not found", (int) pid);
+ nxt_unit_debug(NULL, "remove_pid: process %d not found", (int) pid);
pthread_mutex_unlock(&lib->mutex);
return;
}
- nxt_unit_remove_process(ctx, process);
+ nxt_unit_remove_process(lib, process);
+
+ if (lib->callbacks.remove_pid != NULL) {
+ lib->callbacks.remove_pid(&lib->unit, pid);
+ }
}
static void
-nxt_unit_remove_process(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process)
+nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process)
{
nxt_queue_t ports;
- nxt_unit_impl_t *lib;
nxt_unit_port_impl_t *port;
- lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
-
nxt_queue_init(&ports);
nxt_queue_add(&ports, &process->ports);
nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) {
- nxt_unit_process_use(ctx, process, -1);
- port->process = NULL;
+ nxt_unit_process_release(process);
- /* Shortcut for default callback. */
- if (lib->callbacks.remove_port == nxt_unit_remove_port) {
- nxt_queue_remove(&port->link);
+ /* To avoid unlink port. */
+ port->process = NULL;
- nxt_unit_remove_port_unsafe(ctx, &port->port.id, NULL, NULL);
- }
+ nxt_unit_remove_port_unsafe(lib, &port->port.id, NULL, NULL);
} nxt_queue_loop;
@@ -4489,15 +4531,27 @@ nxt_unit_remove_process(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process)
nxt_queue_remove(&port->link);
- lib->callbacks.remove_port(ctx, &port->port.id);
+ if (lib->callbacks.remove_port != NULL) {
+ lib->callbacks.remove_port(&lib->unit, &port->port);
+ }
+
+ if (port->port.in_fd != -1) {
+ close(port->port.in_fd);
+ }
+
+ if (port->port.out_fd != -1) {
+ close(port->port.out_fd);
+ }
+
+ free(port);
} nxt_queue_loop;
- nxt_unit_process_use(ctx, process, -1);
+ nxt_unit_process_release(process);
}
-void
+static void
nxt_unit_quit(nxt_unit_ctx_t *ctx)
{
nxt_unit_impl_t *lib;
@@ -4505,11 +4559,15 @@ nxt_unit_quit(nxt_unit_ctx_t *ctx)
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
lib->online = 0;
+
+ if (lib->callbacks.quit != NULL) {
+ lib->callbacks.quit(ctx);
+ }
}
static ssize_t
-nxt_unit_port_send_default(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
+nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
const void *buf, size_t buf_size, const void *oob, size_t oob_size)
{
int fd;
@@ -4522,35 +4580,35 @@ nxt_unit_port_send_default(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
port = nxt_unit_port_hash_find(&lib->ports, port_id, 0);
- if (nxt_fast_path(port != NULL)) {
+ if (nxt_fast_path(port != NULL && port->port.out_fd != -1)) {
fd = port->port.out_fd;
- } else {
- nxt_unit_warn(ctx, "port_send: port %d,%d not found",
- (int) port_id->pid, (int) port_id->id);
- fd = -1;
- }
+ pthread_mutex_unlock(&lib->mutex);
- pthread_mutex_unlock(&lib->mutex);
+ } else {
+ pthread_mutex_unlock(&lib->mutex);
- if (nxt_slow_path(fd == -1)) {
- if (port != NULL) {
- nxt_unit_warn(ctx, "port_send: port %d,%d: fd == -1",
- (int) port_id->pid, (int) port_id->id);
- }
+ nxt_unit_alert(ctx, "port_send: port %d,%d not found",
+ (int) port_id->pid, (int) port_id->id);
- return -1;
+ return -NXT_UNIT_ERROR;
}
nxt_unit_debug(ctx, "port_send: found port %d,%d fd %d",
(int) port_id->pid, (int) port_id->id, fd);
- return nxt_unit_port_send(ctx, fd, buf, buf_size, oob, oob_size);
+ if (lib->callbacks.port_send == NULL) {
+ return nxt_unit_sendmsg(ctx, fd, buf, buf_size, oob, oob_size);
+
+ } else {
+ return lib->callbacks.port_send(ctx, port_id, buf, buf_size,
+ oob, oob_size);
+ }
}
-ssize_t
-nxt_unit_port_send(nxt_unit_ctx_t *ctx, int fd,
+static ssize_t
+nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
const void *buf, size_t buf_size, const void *oob, size_t oob_size)
{
ssize_t res;
diff --git a/src/nxt_unit.h b/src/nxt_unit.h
index 596dd8b6..fa1fa843 100644
--- a/src/nxt_unit.h
+++ b/src/nxt_unit.h
@@ -130,15 +130,15 @@ struct nxt_unit_callbacks_s {
int (*add_port)(nxt_unit_ctx_t *, nxt_unit_port_t *port);
/* Remove previously added port. Optional. */
- void (*remove_port)(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id);
+ void (*remove_port)(nxt_unit_t *, nxt_unit_port_t *port);
/* Remove all data associated with process pid including ports. Optional. */
- void (*remove_pid)(nxt_unit_ctx_t *, pid_t pid);
+ void (*remove_pid)(nxt_unit_t *, pid_t pid);
/* Gracefully quit the application. Optional. */
void (*quit)(nxt_unit_ctx_t *);
- /* Shared memory release acknowledgement. */
+ /* Shared memory release acknowledgement. Optional. */
void (*shm_ack_handler)(nxt_unit_ctx_t *);
/* Send data and control to process pid using port id. Optional. */
@@ -149,7 +149,6 @@ struct nxt_unit_callbacks_s {
/* Receive data on port id. Optional. */
ssize_t (*port_recv)(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id,
void *buf, size_t buf_size, void *oob, size_t oob_size);
-
};
@@ -165,6 +164,7 @@ struct nxt_unit_init_s {
nxt_unit_port_t ready_port;
uint32_t ready_stream;
+ nxt_unit_port_t router_port;
nxt_unit_port_t read_port;
int log_fd;
};
@@ -222,45 +222,9 @@ void nxt_unit_done(nxt_unit_ctx_t *);
*/
nxt_unit_ctx_t *nxt_unit_ctx_alloc(nxt_unit_ctx_t *, void *);
-/* Free unused context. It is not required to free main context. */
-void nxt_unit_ctx_free(nxt_unit_ctx_t *);
-
/* Initialize port_id, calculate hash. */
void nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id);
-/*
- * Create extra incoming port, perform all required actions to propogate
- * the port to Unit server. Fills structure referenced by port_id with
- * current pid and new port id.
- */
-int nxt_unit_create_send_port(nxt_unit_ctx_t *, nxt_unit_port_id_t *dst,
- nxt_unit_port_id_t *port_id);
-
-/* Default 'add_port' implementation. */
-int nxt_unit_add_port(nxt_unit_ctx_t *, nxt_unit_port_t *port);
-
-/* Find previously added port. */
-nxt_unit_port_t *nxt_unit_find_port(nxt_unit_ctx_t *,
- nxt_unit_port_id_t *port_id);
-
-/* Find, fill output 'port' and remove port from storage. */
-void nxt_unit_find_remove_port(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id,
- nxt_unit_port_t *port);
-
-/* Default 'remove_port' implementation. */
-void nxt_unit_remove_port(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id);
-
-/* Default 'remove_pid' implementation. */
-void nxt_unit_remove_pid(nxt_unit_ctx_t *, pid_t pid);
-
-/* Default 'quit' implementation. */
-void nxt_unit_quit(nxt_unit_ctx_t *);
-
-/* Default 'port_send' implementation. */
-ssize_t nxt_unit_port_send(nxt_unit_ctx_t *, int fd,
- const void *buf, size_t buf_size,
- const void *oob, size_t oob_size);
-
/* Default 'port_recv' implementation. */
ssize_t nxt_unit_port_recv(nxt_unit_ctx_t *, int fd, void *buf, size_t buf_size,
void *oob, size_t oob_size);