diff options
author | Max Romanov <max.romanov@nginx.com> | 2020-08-11 19:19:55 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2020-08-11 19:19:55 +0300 |
commit | ec3389b63bd7a9159d2be4a2863140f75095c7d3 (patch) | |
tree | f88972c19c713748b56c339dab1bdd60892eaeec | |
parent | 3a721e1d96720505d4d6638e77d2c296d962519c (diff) | |
download | unit-ec3389b63bd7a9159d2be4a2863140f75095c7d3.tar.gz unit-ec3389b63bd7a9159d2be4a2863140f75095c7d3.tar.bz2 |
Libunit refactoring: port management.
- Changed the port management callbacks to notifications, which e. g. avoids
the need to call the libunit function
- Added context and library instance reference counts for a safer resource
release
- Added the router main port initialization
-rw-r--r-- | go/nxt_cgo_lib.c | 13 | ||||
-rw-r--r-- | src/nodejs/unit-http/unit.cpp | 46 | ||||
-rw-r--r-- | src/nodejs/unit-http/unit.h | 2 | ||||
-rw-r--r-- | src/nxt_application.c | 14 | ||||
-rw-r--r-- | src/nxt_external.c | 14 | ||||
-rw-r--r-- | src/nxt_process.c | 2 | ||||
-rw-r--r-- | src/nxt_unit.c | 580 | ||||
-rw-r--r-- | src/nxt_unit.h | 44 |
8 files changed, 379 insertions, 336 deletions
diff --git a/go/nxt_cgo_lib.c b/go/nxt_cgo_lib.c index a4fef9ea..1bb38f3c 100644 --- a/go/nxt_cgo_lib.c +++ b/go/nxt_cgo_lib.c @@ -14,7 +14,7 @@ static void nxt_cgo_request_handler(nxt_unit_request_info_t *req); static nxt_cgo_str_t *nxt_cgo_str_init(nxt_cgo_str_t *dst, nxt_unit_sptr_t *sptr, uint32_t length); static int nxt_cgo_add_port(nxt_unit_ctx_t *, nxt_unit_port_t *port); -static void nxt_cgo_remove_port(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id); +static void nxt_cgo_remove_port(nxt_unit_t *, nxt_unit_port_t *port); static ssize_t nxt_cgo_port_send(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id, const void *buf, size_t buf_size, const void *oob, size_t oob_size); static ssize_t nxt_cgo_port_recv(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id, @@ -108,16 +108,17 @@ nxt_cgo_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) nxt_go_add_port(port->id.pid, port->id.id, port->in_fd, port->out_fd); - return nxt_unit_add_port(ctx, port); + port->in_fd = -1; + port->out_fd = -1; + + return NXT_UNIT_OK; } static void -nxt_cgo_remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id) +nxt_cgo_remove_port(nxt_unit_t *unit, nxt_unit_port_t *port) { - nxt_go_remove_port(port_id->pid, port_id->id); - - nxt_unit_remove_port(ctx, port_id); + nxt_go_remove_port(port->id.pid, port->id.id); } diff --git a/src/nodejs/unit-http/unit.cpp b/src/nodejs/unit-http/unit.cpp index 555b21fa..468acf96 100644 --- a/src/nodejs/unit-http/unit.cpp +++ b/src/nodejs/unit-http/unit.cpp @@ -18,7 +18,8 @@ static void delete_port_data(uv_handle_t* handle); napi_ref Unit::constructor_; -struct nxt_nodejs_ctx_t { +struct port_data_t { + nxt_unit_ctx_t *ctx; nxt_unit_port_id_t port_id; uv_poll_t poll; }; @@ -360,8 +361,8 @@ Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) int err; Unit *obj; uv_loop_t *loop; + port_data_t *data; napi_status status; - nxt_nodejs_ctx_t *node_ctx; if (port->in_fd != -1) { obj = reinterpret_cast<Unit *>(ctx->unit->data); @@ -378,27 +379,28 @@ Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) return NXT_UNIT_ERROR; } - node_ctx = new nxt_nodejs_ctx_t; + data = new port_data_t; - err = uv_poll_init(loop, &node_ctx->poll, port->in_fd); + err = uv_poll_init(loop, &data->poll, port->in_fd); if (err < 0) { nxt_unit_warn(ctx, "Failed to init uv.poll"); return NXT_UNIT_ERROR; } - err = uv_poll_start(&node_ctx->poll, UV_READABLE, nxt_uv_read_callback); + err = uv_poll_start(&data->poll, UV_READABLE, nxt_uv_read_callback); if (err < 0) { nxt_unit_warn(ctx, "Failed to start uv.poll"); return NXT_UNIT_ERROR; } - ctx->data = node_ctx; + port->data = data; - node_ctx->port_id = port->id; - node_ctx->poll.data = ctx; + data->ctx = ctx; + data->port_id = port->id; + data->poll.data = ctx; } - return nxt_unit_add_port(ctx, port); + return NXT_UNIT_OK; } @@ -410,35 +412,31 @@ operator == (const nxt_unit_port_id_t &p1, const nxt_unit_port_id_t &p2) void -Unit::remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id) +Unit::remove_port(nxt_unit_t *unit, nxt_unit_port_t *port) { - nxt_nodejs_ctx_t *node_ctx; + port_data_t *data; - if (ctx->data != NULL) { - node_ctx = (nxt_nodejs_ctx_t *) ctx->data; + if (port->data != NULL) { + data = (port_data_t *) port->data; - if (node_ctx->port_id == *port_id) { - uv_poll_stop(&node_ctx->poll); + if (data->port_id == port->id) { + uv_poll_stop(&data->poll); - node_ctx->poll.data = node_ctx; - uv_close((uv_handle_t *) &node_ctx->poll, delete_port_data); - - ctx->data = NULL; + data->poll.data = data; + uv_close((uv_handle_t *) &data->poll, delete_port_data); } } - - nxt_unit_remove_port(ctx, port_id); } static void delete_port_data(uv_handle_t* handle) { - nxt_nodejs_ctx_t *node_ctx; + port_data_t *data; - node_ctx = (nxt_nodejs_ctx_t *) handle->data; + data = (port_data_t *) handle->data; - delete node_ctx; + delete data; } diff --git a/src/nodejs/unit-http/unit.h b/src/nodejs/unit-http/unit.h index 18359118..07823c26 100644 --- a/src/nodejs/unit-http/unit.h +++ b/src/nodejs/unit-http/unit.h @@ -40,7 +40,7 @@ private: void shm_ack_handler(nxt_unit_ctx_t *ctx); static int add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); - static void remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id); + static void remove_port(nxt_unit_t *unit, nxt_unit_port_t *port); static void quit_cb(nxt_unit_ctx_t *ctx); void quit(nxt_unit_ctx_t *ctx); diff --git a/src/nxt_application.c b/src/nxt_application.c index c331764f..372a88b4 100644 --- a/src/nxt_application.c +++ b/src/nxt_application.c @@ -1263,7 +1263,7 @@ nxt_app_parse_type(u_char *p, size_t length) nxt_int_t nxt_unit_default_init(nxt_task_t *task, nxt_unit_init_t *init) { - nxt_port_t *my_port, *main_port; + nxt_port_t *my_port, *main_port, *router_port; nxt_runtime_t *rt; nxt_memzero(init, sizeof(nxt_unit_init_t)); @@ -1275,6 +1275,11 @@ nxt_unit_default_init(nxt_task_t *task, nxt_unit_init_t *init) return NXT_ERROR; } + router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; + if (nxt_slow_path(router_port == NULL)) { + return NXT_ERROR; + } + my_port = nxt_runtime_port_find(rt, nxt_pid, 0); if (nxt_slow_path(my_port == NULL)) { return NXT_ERROR; @@ -1289,6 +1294,13 @@ nxt_unit_default_init(nxt_task_t *task, nxt_unit_init_t *init) init->ready_stream = my_port->process->stream; + init->router_port.id.pid = router_port->pid; + init->router_port.id.id = router_port->id; + init->router_port.in_fd = -1; + init->router_port.out_fd = router_port->pair[1]; + + nxt_fd_blocking(task, router_port->pair[1]); + init->read_port.id.pid = my_port->pid; init->read_port.id.id = my_port->id; init->read_port.in_fd = my_port->pair[0]; diff --git a/src/nxt_external.c b/src/nxt_external.c index 6370a9c4..2471c812 100644 --- a/src/nxt_external.c +++ b/src/nxt_external.c @@ -69,7 +69,7 @@ nxt_external_start(nxt_task_t *task, nxt_process_data_t *data) nxt_str_t str; nxt_int_t rc; nxt_uint_t i, argc; - nxt_port_t *my_port, *main_port; + nxt_port_t *my_port, *main_port, *router_port; nxt_runtime_t *rt; nxt_conf_value_t *value; nxt_common_app_conf_t *conf; @@ -79,9 +79,12 @@ nxt_external_start(nxt_task_t *task, nxt_process_data_t *data) conf = data->app; main_port = rt->port_by_type[NXT_PROCESS_MAIN]; + router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; my_port = nxt_runtime_port_find(rt, nxt_pid, 0); - if (nxt_slow_path(main_port == NULL || my_port == NULL)) { + if (nxt_slow_path(main_port == NULL || my_port == NULL + || router_port == NULL)) + { return NXT_ERROR; } @@ -90,6 +93,11 @@ nxt_external_start(nxt_task_t *task, nxt_process_data_t *data) return NXT_ERROR; } + rc = nxt_external_fd_no_cloexec(task, router_port->pair[1]); + if (nxt_slow_path(rc != NXT_OK)) { + return NXT_ERROR; + } + rc = nxt_external_fd_no_cloexec(task, my_port->pair[0]); if (nxt_slow_path(rc != NXT_OK)) { return NXT_ERROR; @@ -101,9 +109,11 @@ nxt_external_start(nxt_task_t *task, nxt_process_data_t *data) "%s;%uD;" "%PI,%ud,%d;" "%PI,%ud,%d;" + "%PI,%ud,%d;" "%d,%z,%Z", NXT_VERSION, my_port->process->stream, main_port->pid, main_port->id, main_port->pair[1], + router_port->pid, router_port->id, router_port->pair[1], my_port->pid, my_port->id, my_port->pair[0], 2, conf->shm_limit); diff --git a/src/nxt_process.c b/src/nxt_process.c index 215c529c..5a01c21e 100644 --- a/src/nxt_process.c +++ b/src/nxt_process.c @@ -61,7 +61,7 @@ nxt_bool_t nxt_proc_conn_matrix[NXT_PROCESS_MAX][NXT_PROCESS_MAX] = { { 1, 0, 0, 0, 0 }, { 1, 0, 0, 1, 0 }, { 1, 0, 1, 0, 1 }, - { 1, 0, 0, 0, 0 }, + { 1, 0, 0, 1, 0 }, }; nxt_bool_t nxt_proc_remove_notify_matrix[NXT_PROCESS_MAX][NXT_PROCESS_MAX] = { diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 89998e3f..8c964c7a 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -38,16 +38,19 @@ typedef struct nxt_unit_websocket_frame_impl_s nxt_unit_websocket_frame_impl_t; static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init); static int nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, void *data); +nxt_inline void nxt_unit_ctx_use(nxt_unit_ctx_impl_t *ctx_impl); +nxt_inline void nxt_unit_ctx_release(nxt_unit_ctx_impl_t *ctx_impl); +nxt_inline void nxt_unit_lib_use(nxt_unit_impl_t *lib); +nxt_inline void nxt_unit_lib_release(nxt_unit_impl_t *lib); nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head, nxt_unit_mmap_buf_t *mmap_buf); nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev, nxt_unit_mmap_buf_t *mmap_buf); nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf); static int nxt_unit_read_env(nxt_unit_port_t *ready_port, - nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream, - uint32_t *shm_limit); -static int nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, - uint32_t stream); + nxt_unit_port_t *router_port, nxt_unit_port_t *read_port, + int *log_fd, uint32_t *stream, uint32_t *shm_limit); +static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream); static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, @@ -96,8 +99,8 @@ static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd); static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps); -static void nxt_unit_process_use(nxt_unit_ctx_t *ctx, - nxt_unit_process_t *process, int i); +nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process); +nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process); static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps); static nxt_port_mmap_header_t *nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, uint32_t id); @@ -110,28 +113,35 @@ static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, nxt_port_mmap_header_t *hdr, void *start, uint32_t size); static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid); -static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx, +static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_impl_t *lib, pid_t pid); -static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_ctx_t *ctx, +static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib, pid_t pid, int remove); static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib); static void nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf); +static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl); static int nxt_unit_create_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int *fd); static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst, nxt_unit_port_id_t *new_port, int fd); -static void nxt_unit_remove_port_unsafe(nxt_unit_ctx_t *ctx, - nxt_unit_port_id_t *port_id, nxt_unit_port_t *r_port, +static int nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); +static int nxt_unit_remove_port(nxt_unit_impl_t *lib, + nxt_unit_port_id_t *port_id); +static int nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, + nxt_unit_port_id_t *port_id, nxt_unit_port_t **r_port, nxt_unit_process_t **process); -static void nxt_unit_remove_process(nxt_unit_ctx_t *ctx, +static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid); +static void nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process); - -static ssize_t nxt_unit_port_send_default(nxt_unit_ctx_t *ctx, +static void nxt_unit_quit(nxt_unit_ctx_t *ctx); +static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, const void *buf, size_t buf_size, const void *oob, size_t oob_size); +static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd, + const void *buf, size_t buf_size, const void *oob, size_t oob_size); static ssize_t nxt_unit_port_recv_default(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, void *buf, size_t buf_size, void *oob, size_t oob_size); @@ -233,6 +243,8 @@ struct nxt_unit_read_buf_s { struct nxt_unit_ctx_impl_s { nxt_unit_ctx_t ctx; + nxt_atomic_t use_count; + pthread_mutex_t mutex; nxt_unit_port_id_t read_port_id; @@ -269,6 +281,8 @@ struct nxt_unit_impl_s { nxt_unit_t unit; nxt_unit_callbacks_t callbacks; + nxt_atomic_t use_count; + uint32_t request_data_size; uint32_t shm_mmap_limit; @@ -277,7 +291,7 @@ struct nxt_unit_impl_s { nxt_lvlhsh_t processes; /* of nxt_unit_process_t */ nxt_lvlhsh_t ports; /* of nxt_unit_port_impl_t */ - nxt_unit_port_id_t ready_port_id; + nxt_unit_port_id_t router_port_id; nxt_queue_t contexts; /* of nxt_unit_ctx_impl_t */ @@ -341,7 +355,7 @@ nxt_unit_init(nxt_unit_init_t *init) uint32_t ready_stream, shm_limit; nxt_unit_ctx_t *ctx; nxt_unit_impl_t *lib; - nxt_unit_port_t ready_port, read_port; + nxt_unit_port_t ready_port, router_port, read_port; lib = nxt_unit_create(init); if (nxt_slow_path(lib == NULL)) { @@ -354,17 +368,20 @@ nxt_unit_init(nxt_unit_init_t *init) { ready_port = init->ready_port; ready_stream = init->ready_stream; + router_port = init->router_port; read_port = init->read_port; lib->log_fd = init->log_fd; nxt_unit_port_id_init(&ready_port.id, ready_port.id.pid, ready_port.id.id); + nxt_unit_port_id_init(&router_port.id, router_port.id.pid, + router_port.id.id); nxt_unit_port_id_init(&read_port.id, read_port.id.pid, read_port.id.id); } else { - rc = nxt_unit_read_env(&ready_port, &read_port, &lib->log_fd, - &ready_stream, &shm_limit); + rc = nxt_unit_read_env(&ready_port, &router_port, &read_port, + &lib->log_fd, &ready_stream, &shm_limit); if (nxt_slow_path(rc != NXT_UNIT_OK)) { goto fail; } @@ -380,14 +397,16 @@ nxt_unit_init(nxt_unit_init_t *init) lib->pid = read_port.id.pid; ctx = &lib->main_ctx.ctx; - rc = lib->callbacks.add_port(ctx, &ready_port); - if (rc != NXT_UNIT_OK) { - nxt_unit_alert(NULL, "failed to add ready_port"); + rc = nxt_unit_add_port(ctx, &router_port); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + nxt_unit_alert(NULL, "failed to add router_port"); goto fail; } - rc = lib->callbacks.add_port(ctx, &read_port); + lib->router_port_id = router_port.id; + + rc = nxt_unit_add_port(ctx, &read_port); if (nxt_slow_path(rc != NXT_UNIT_OK)) { nxt_unit_alert(NULL, "failed to add read_port"); @@ -395,15 +414,16 @@ nxt_unit_init(nxt_unit_init_t *init) } lib->main_ctx.read_port_id = read_port.id; - lib->ready_port_id = ready_port.id; - rc = nxt_unit_ready(ctx, &ready_port.id, ready_stream); + rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream); if (nxt_slow_path(rc != NXT_UNIT_OK)) { nxt_unit_alert(NULL, "failed to send READY message"); goto fail; } + close(ready_port.out_fd); + return ctx; fail: @@ -450,6 +470,8 @@ nxt_unit_create(nxt_unit_init_t *init) nxt_queue_init(&lib->contexts); + lib->use_count = 0; + rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data); if (nxt_slow_path(rc != NXT_UNIT_OK)) { goto fail; @@ -463,26 +485,6 @@ nxt_unit_create(nxt_unit_init_t *init) goto fail; } - if (cb->add_port == NULL) { - cb->add_port = nxt_unit_add_port; - } - - if (cb->remove_port == NULL) { - cb->remove_port = nxt_unit_remove_port; - } - - if (cb->remove_pid == NULL) { - cb->remove_pid = nxt_unit_remove_pid; - } - - if (cb->quit == NULL) { - cb->quit = nxt_unit_quit; - } - - if (cb->port_send == NULL) { - cb->port_send = nxt_unit_port_send_default; - } - if (cb->port_recv == NULL) { cb->port_recv = nxt_unit_port_recv_default; } @@ -506,8 +508,6 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, ctx_impl->ctx.data = data; ctx_impl->ctx.unit = &lib->unit; - nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link); - rc = pthread_mutex_init(&ctx_impl->mutex, NULL); if (nxt_slow_path(rc != 0)) { nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc); @@ -515,6 +515,12 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, return NXT_UNIT_ERROR; } + nxt_unit_lib_use(lib); + + nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link); + + ctx_impl->use_count = 1; + nxt_queue_init(&ctx_impl->free_req); nxt_queue_init(&ctx_impl->free_ws); nxt_queue_init(&ctx_impl->active_req); @@ -541,6 +547,62 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, nxt_inline void +nxt_unit_ctx_use(nxt_unit_ctx_impl_t *ctx_impl) +{ + nxt_atomic_fetch_add(&ctx_impl->use_count, 1); +} + + +nxt_inline void +nxt_unit_ctx_release(nxt_unit_ctx_impl_t *ctx_impl) +{ + long c; + + c = nxt_atomic_fetch_add(&ctx_impl->use_count, -1); + + if (c == 1) { + nxt_unit_ctx_free(ctx_impl); + } +} + + +nxt_inline void +nxt_unit_lib_use(nxt_unit_impl_t *lib) +{ + nxt_atomic_fetch_add(&lib->use_count, 1); +} + + +nxt_inline void +nxt_unit_lib_release(nxt_unit_impl_t *lib) +{ + long c; + nxt_unit_process_t *process; + + c = nxt_atomic_fetch_add(&lib->use_count, -1); + + if (c == 1) { + for ( ;; ) { + pthread_mutex_lock(&lib->mutex); + + process = nxt_unit_process_pop_first(lib); + if (process == NULL) { + pthread_mutex_unlock(&lib->mutex); + + break; + } + + nxt_unit_remove_process(lib, process); + } + + pthread_mutex_destroy(&lib->mutex); + + free(lib); + } +} + + +nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head, nxt_unit_mmap_buf_t *mmap_buf) { @@ -585,15 +647,16 @@ nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf) static int -nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *read_port, - int *log_fd, uint32_t *stream, uint32_t *shm_limit) +nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port, + nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream, + uint32_t *shm_limit) { int rc; - int ready_fd, read_fd; + int ready_fd, router_fd, read_fd; char *unit_init, *version_end; long version_length; - int64_t ready_pid, read_pid; - uint32_t ready_stream, ready_id, read_id; + int64_t ready_pid, router_pid, read_pid; + uint32_t ready_stream, router_id, ready_id, read_id; unit_init = getenv(NXT_UNIT_INIT_ENV); if (nxt_slow_path(unit_init == NULL)) { @@ -621,13 +684,15 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *read_port, "%"PRIu32";" "%"PRId64",%"PRIu32",%d;" "%"PRId64",%"PRIu32",%d;" + "%"PRId64",%"PRIu32",%d;" "%d,%"PRIu32, &ready_stream, &ready_pid, &ready_id, &ready_fd, + &router_pid, &router_id, &router_fd, &read_pid, &read_id, &read_fd, log_fd, shm_limit); - if (nxt_slow_path(rc != 9)) { + if (nxt_slow_path(rc != 12)) { nxt_unit_alert(NULL, "failed to scan variables: %d", rc); return NXT_UNIT_ERROR; @@ -639,6 +704,12 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *read_port, ready_port->out_fd = ready_fd; ready_port->data = NULL; + nxt_unit_port_id_init(&router_port->id, (pid_t) router_pid, router_id); + + router_port->in_fd = -1; + router_port->out_fd = router_fd; + router_port->data = NULL; + nxt_unit_port_id_init(&read_port->id, (pid_t) read_pid, read_id); read_port->in_fd = read_fd; @@ -652,8 +723,7 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *read_port, static int -nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, - uint32_t stream) +nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream) { ssize_t res; nxt_port_msg_t msg; @@ -671,7 +741,7 @@ nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, msg.mf = 0; msg.tracking = 0; - res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg), NULL, 0); + res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg), NULL, 0); if (res != sizeof(msg)) { return NXT_UNIT_ERROR; } @@ -684,13 +754,12 @@ int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, void *buf, size_t buf_size, void *oob, size_t oob_size) { - int rc; - pid_t pid; - struct cmsghdr *cm; - nxt_port_msg_t *port_msg; - nxt_unit_impl_t *lib; - nxt_unit_recv_msg_t recv_msg; - nxt_unit_callbacks_t *cb; + int rc; + pid_t pid; + struct cmsghdr *cm; + nxt_port_msg_t *port_msg; + nxt_unit_impl_t *lib; + nxt_unit_recv_msg_t recv_msg; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); @@ -749,14 +818,12 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, } } - cb = &lib->callbacks; - switch (port_msg->type) { case _NXT_PORT_MSG_QUIT: nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream); - cb->quit(ctx); + nxt_unit_quit(ctx); rc = NXT_UNIT_OK; break; @@ -812,7 +879,7 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d", port_msg->stream, (int) pid); - cb->remove_pid(ctx, pid); + nxt_unit_remove_pid(lib, pid); rc = NXT_UNIT_OK; break; @@ -839,7 +906,7 @@ fail: } if (recv_msg.process != NULL) { - nxt_unit_process_use(ctx, recv_msg.process, -1); + nxt_unit_process_release(recv_msg.process); } return rc; @@ -850,7 +917,6 @@ static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) { int nb; - nxt_unit_impl_t *lib; nxt_unit_port_t new_port; nxt_port_msg_new_port_t *new_port_msg; @@ -894,9 +960,7 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) recv_msg->fd = -1; - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - - return lib->callbacks.add_port(ctx, &new_port); + return nxt_unit_add_port(ctx, &new_port); } @@ -1206,7 +1270,7 @@ nxt_unit_request_info_release(nxt_unit_request_info_t *req) * existence. */ if (req_impl->process != NULL) { - nxt_unit_process_use(req->ctx, req_impl->process, -1); + nxt_unit_process_release(req_impl->process); req_impl->process = NULL; } @@ -1808,7 +1872,7 @@ nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) pthread_mutex_lock(&lib->mutex); - recv_msg->process = nxt_unit_process_find(ctx, recv_msg->pid, 0); + recv_msg->process = nxt_unit_process_find(lib, recv_msg->pid, 0); pthread_mutex_unlock(&lib->mutex); @@ -1869,15 +1933,6 @@ nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf) } -typedef struct { - size_t len; - const char *str; -} nxt_unit_str_t; - - -#define nxt_unit_str(str) { nxt_length(str), str } - - int nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req) { @@ -2064,8 +2119,8 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, (int) m.mmap_msg.chunk_id, (int) m.mmap_msg.size); - res = lib->callbacks.port_send(ctx, &mmap_buf->port_id, &m, sizeof(m), - NULL, 0); + res = nxt_unit_port_send(ctx, &mmap_buf->port_id, &m, sizeof(m), + NULL, 0); if (nxt_slow_path(res != sizeof(m))) { goto free_buf; } @@ -2114,10 +2169,10 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, stream, (int) (sizeof(m.msg) + m.mmap_msg.size)); - res = lib->callbacks.port_send(ctx, &mmap_buf->port_id, - buf->start - sizeof(m.msg), - m.mmap_msg.size + sizeof(m.msg), - NULL, 0); + res = nxt_unit_port_send(ctx, &mmap_buf->port_id, + buf->start - sizeof(m.msg), + m.mmap_msg.size + sizeof(m.msg), + NULL, 0); if (nxt_slow_path(res != (ssize_t) (m.mmap_msg.size + sizeof(m.msg)))) { goto free_buf; } @@ -2689,8 +2744,8 @@ skip_response_send: msg.mf = 0; msg.tracking = 0; - (void) lib->callbacks.port_send(req->ctx, &req->response_port, - &msg, sizeof(msg), NULL, 0); + (void) nxt_unit_port_send(req->ctx, &req->response_port, + &msg, sizeof(msg), NULL, 0); nxt_unit_request_info_release(req); } @@ -3006,7 +3061,7 @@ nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id) msg.mf = 0; msg.tracking = 0; - res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg), NULL, 0); + res = nxt_unit_port_send(ctx, port_id, &msg, sizeof(msg), NULL, 0); if (nxt_slow_path(res != sizeof(msg))) { return NXT_UNIT_ERROR; } @@ -3284,8 +3339,8 @@ nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int fd) */ memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int)); - res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg), - &cmsg, sizeof(cmsg)); + res = nxt_unit_port_send(ctx, port_id, &msg, sizeof(msg), + &cmsg, sizeof(cmsg)); if (nxt_slow_path(res != sizeof(msg))) { return NXT_UNIT_ERROR; } @@ -3382,7 +3437,7 @@ nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd) pthread_mutex_lock(&lib->mutex); - process = nxt_unit_process_find(ctx, pid, 0); + process = nxt_unit_process_find(lib, pid, 0); pthread_mutex_unlock(&lib->mutex); @@ -3444,7 +3499,7 @@ nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd) fail: - nxt_unit_process_use(ctx, process, -1); + nxt_unit_process_release(process); return rc; } @@ -3462,15 +3517,22 @@ nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps) } -static void -nxt_unit_process_use(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, int i) +nxt_inline void +nxt_unit_process_use(nxt_unit_process_t *process) +{ + nxt_atomic_fetch_add(&process->use_count, 1); +} + + +nxt_inline void +nxt_unit_process_release(nxt_unit_process_t *process) { long c; - c = nxt_atomic_fetch_add(&process->use_count, i); + c = nxt_atomic_fetch_add(&process->use_count, -1); - if (i < 0 && c == -i) { - nxt_unit_debug(ctx, "destroy process #%d", (int) process->pid); + if (c == 1) { + nxt_unit_debug(NULL, "destroy process #%d", (int) process->pid); nxt_unit_mmaps_destroy(&process->incoming); nxt_unit_mmaps_destroy(&process->outgoing); @@ -3727,7 +3789,7 @@ nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid) msg.mf = 0; msg.tracking = 0; - res = lib->callbacks.port_send(ctx, &port_id, &msg, sizeof(msg), NULL, 0); + res = nxt_unit_port_send(ctx, &port_id, &msg, sizeof(msg), NULL, 0); if (nxt_slow_path(res != sizeof(msg))) { return NXT_UNIT_ERROR; } @@ -3772,26 +3834,23 @@ nxt_unit_process_lhq_pid(nxt_lvlhsh_query_t *lhq, pid_t *pid) static nxt_unit_process_t * -nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid) +nxt_unit_process_get(nxt_unit_impl_t *lib, pid_t pid) { - nxt_unit_impl_t *lib; nxt_unit_process_t *process; nxt_lvlhsh_query_t lhq; - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - nxt_unit_process_lhq_pid(&lhq, &pid); if (nxt_lvlhsh_find(&lib->processes, &lhq) == NXT_OK) { process = lhq.value; - nxt_unit_process_use(ctx, process, 1); + nxt_unit_process_use(process); return process; } process = malloc(sizeof(nxt_unit_process_t)); if (nxt_slow_path(process == NULL)) { - nxt_unit_warn(ctx, "failed to allocate process for #%d", (int) pid); + nxt_unit_alert(NULL, "failed to allocate process for #%d", (int) pid); return NULL; } @@ -3815,7 +3874,7 @@ nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid) break; default: - nxt_unit_warn(ctx, "process %d insert failed", (int) pid); + nxt_unit_alert(NULL, "process %d insert failed", (int) pid); pthread_mutex_destroy(&process->outgoing.mutex); pthread_mutex_destroy(&process->incoming.mutex); @@ -3824,22 +3883,19 @@ nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid) break; } - nxt_unit_process_use(ctx, process, 1); + nxt_unit_process_use(process); return process; } static nxt_unit_process_t * -nxt_unit_process_find(nxt_unit_ctx_t *ctx, pid_t pid, int remove) +nxt_unit_process_find(nxt_unit_impl_t *lib, pid_t pid, int remove) { int rc; - nxt_unit_impl_t *lib; nxt_unit_process_t *process; nxt_lvlhsh_query_t lhq; - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - nxt_unit_process_lhq_pid(&lhq, &pid); if (remove) { @@ -3853,7 +3909,7 @@ nxt_unit_process_find(nxt_unit_ctx_t *ctx, pid_t pid, int remove) process = lhq.value; if (!remove) { - nxt_unit_process_use(ctx, process, 1); + nxt_unit_process_use(process); } return process; @@ -3873,8 +3929,13 @@ nxt_unit_process_pop_first(nxt_unit_impl_t *lib) int nxt_unit_run(nxt_unit_ctx_t *ctx) { - int rc; - nxt_unit_impl_t *lib; + int rc; + nxt_unit_impl_t *lib; + nxt_unit_ctx_impl_t *ctx_impl; + + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + nxt_unit_ctx_use(ctx_impl); lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); rc = NXT_UNIT_OK; @@ -3887,6 +3948,8 @@ nxt_unit_run(nxt_unit_ctx_t *ctx) } } + nxt_unit_ctx_release(ctx_impl); + return rc; } @@ -3900,6 +3963,8 @@ nxt_unit_run_once(nxt_unit_ctx_t *ctx) ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + nxt_unit_ctx_use(ctx_impl); + pthread_mutex_lock(&ctx_impl->mutex); if (ctx_impl->pending_read_head != NULL) { @@ -3915,6 +3980,9 @@ nxt_unit_run_once(nxt_unit_ctx_t *ctx) } else { rbuf = nxt_unit_read_buf_get_impl(ctx_impl); if (nxt_slow_path(rbuf == NULL)) { + + nxt_unit_ctx_release(ctx_impl); + return NXT_UNIT_ERROR; } @@ -3936,6 +4004,8 @@ nxt_unit_run_once(nxt_unit_ctx_t *ctx) nxt_unit_read_buf_release(ctx, rbuf); + nxt_unit_ctx_release(ctx_impl); + return rc; } @@ -3968,34 +4038,11 @@ nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) void nxt_unit_done(nxt_unit_ctx_t *ctx) { - nxt_unit_impl_t *lib; - nxt_unit_process_t *process; nxt_unit_ctx_impl_t *ctx_impl; - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - - nxt_queue_each(ctx_impl, &lib->contexts, nxt_unit_ctx_impl_t, link) { - - nxt_unit_ctx_free(&ctx_impl->ctx); - - } nxt_queue_loop; - - for ( ;; ) { - pthread_mutex_lock(&lib->mutex); - - process = nxt_unit_process_pop_first(lib); - if (process == NULL) { - pthread_mutex_unlock(&lib->mutex); - - break; - } - - nxt_unit_remove_process(ctx, process); - } - - pthread_mutex_destroy(&lib->mutex); + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); - free(lib); + nxt_unit_ctx_release(ctx_impl); } @@ -4023,9 +4070,9 @@ nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data) return NULL; } - rc = nxt_unit_send_port(ctx, &lib->ready_port_id, &new_port_id, fd); + rc = nxt_unit_send_port(ctx, &lib->router_port_id, &new_port_id, fd); if (nxt_slow_path(rc != NXT_UNIT_OK)) { - lib->callbacks.remove_port(ctx, &new_port_id); + nxt_unit_remove_port(lib, &new_port_id); close(fd); @@ -4038,7 +4085,7 @@ nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data) rc = nxt_unit_ctx_init(lib, new_ctx, data); if (nxt_slow_path(rc != NXT_UNIT_OK)) { - lib->callbacks.remove_port(ctx, &new_port_id); + nxt_unit_remove_port(lib, &new_port_id); free(new_ctx); @@ -4051,17 +4098,15 @@ nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data) } -void -nxt_unit_ctx_free(nxt_unit_ctx_t *ctx) +static void +nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl) { nxt_unit_impl_t *lib; - nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_mmap_buf_t *mmap_buf; nxt_unit_request_info_impl_t *req_impl; nxt_unit_websocket_frame_impl_t *ws_impl; - ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit); nxt_queue_each(req_impl, &ctx_impl->active_req, nxt_unit_request_info_impl_t, link) @@ -4102,6 +4147,8 @@ nxt_unit_ctx_free(nxt_unit_ctx_t *ctx) if (ctx_impl != &lib->main_ctx) { free(ctx_impl); } + + nxt_unit_lib_release(lib); } @@ -4127,36 +4174,6 @@ nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id) } -int -nxt_unit_create_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst, - nxt_unit_port_id_t *port_id) -{ - int rc, fd; - nxt_unit_impl_t *lib; - nxt_unit_port_id_t new_port_id; - - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - - rc = nxt_unit_create_port(ctx, &new_port_id, &fd); - if (nxt_slow_path(rc != NXT_UNIT_OK)) { - return rc; - } - - rc = nxt_unit_send_port(ctx, dst, &new_port_id, fd); - - if (nxt_fast_path(rc == NXT_UNIT_OK)) { - *port_id = new_port_id; - - } else { - lib->callbacks.remove_port(ctx, &new_port_id); - } - - close(fd); - - return rc; -} - - static int nxt_unit_create_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int *fd) { @@ -4180,7 +4197,7 @@ nxt_unit_create_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int *fd) pthread_mutex_lock(&lib->mutex); - process = nxt_unit_process_get(ctx, lib->pid); + process = nxt_unit_process_get(lib, lib->pid); if (nxt_slow_path(process == NULL)) { pthread_mutex_unlock(&lib->mutex); @@ -4198,9 +4215,9 @@ nxt_unit_create_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int *fd) pthread_mutex_unlock(&lib->mutex); - nxt_unit_process_use(ctx, process, -1); + nxt_unit_process_release(process); - rc = lib->callbacks.add_port(ctx, &new_port); + rc = nxt_unit_add_port(ctx, &new_port); if (nxt_slow_path(rc != NXT_UNIT_OK)) { nxt_unit_warn(ctx, "create_port: add_port() failed"); @@ -4269,14 +4286,13 @@ nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst, */ memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int)); - res = lib->callbacks.port_send(ctx, dst, &m, sizeof(m), - &cmsg, sizeof(cmsg)); + res = nxt_unit_port_send(ctx, dst, &m, sizeof(m), &cmsg, sizeof(cmsg)); - return res == sizeof(m) ? NXT_UNIT_OK : NXT_UNIT_ERROR; + return (res == sizeof(m)) ? NXT_UNIT_OK : NXT_UNIT_ERROR; } -int +static int nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) { int rc; @@ -4295,18 +4311,41 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) port->id.pid, port->id.id, port->in_fd, port->out_fd); + if (old_port->port.data == NULL) { + old_port->port.data = port->data; + port->data = NULL; + } + + if (old_port->port.in_fd == -1) { + old_port->port.in_fd = port->in_fd; + port->in_fd = -1; + } + if (port->in_fd != -1) { close(port->in_fd); port->in_fd = -1; } + if (old_port->port.out_fd == -1) { + old_port->port.out_fd = port->out_fd; + port->out_fd = -1; + } + if (port->out_fd != -1) { close(port->out_fd); port->out_fd = -1; } + *port = old_port->port; + pthread_mutex_unlock(&lib->mutex); + if (lib->callbacks.add_port != NULL + && (port->in_fd != -1 || port->out_fd != -1)) + { + lib->callbacks.add_port(ctx, &old_port->port); + } + return NXT_UNIT_OK; } @@ -4314,7 +4353,7 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) port->id.pid, port->id.id, port->in_fd, port->out_fd); - process = nxt_unit_process_get(ctx, port->id.pid); + process = nxt_unit_process_get(lib, port->id.pid); if (nxt_slow_path(process == NULL)) { rc = NXT_UNIT_ERROR; goto unlock; @@ -4351,72 +4390,80 @@ unlock: pthread_mutex_unlock(&lib->mutex); if (nxt_slow_path(process != NULL && rc != NXT_UNIT_OK)) { - nxt_unit_process_use(ctx, process, -1); + nxt_unit_process_release(process); } - return rc; -} - + if (lib->callbacks.add_port != NULL + && rc == NXT_UNIT_OK + && (port->in_fd != -1 || port->out_fd != -1)) + { + lib->callbacks.add_port(ctx, &new_port->port); + } -void -nxt_unit_remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id) -{ - nxt_unit_find_remove_port(ctx, port_id, NULL); + return rc; } -void -nxt_unit_find_remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, - nxt_unit_port_t *r_port) +static int +nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id) { - nxt_unit_impl_t *lib; + int res; + nxt_unit_port_t *port; nxt_unit_process_t *process; - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + port = NULL; + process = NULL; pthread_mutex_lock(&lib->mutex); - process = NULL; - - nxt_unit_remove_port_unsafe(ctx, port_id, r_port, &process); + res = nxt_unit_remove_port_unsafe(lib, port_id, &port, &process); pthread_mutex_unlock(&lib->mutex); + if (lib->callbacks.remove_port != NULL && res == NXT_UNIT_OK) { + lib->callbacks.remove_port(&lib->unit, port); + } + + if (nxt_fast_path(port != NULL)) { + if (port->in_fd != -1) { + close(port->in_fd); + } + + if (port->out_fd != -1) { + close(port->out_fd); + } + } + if (nxt_slow_path(process != NULL)) { - nxt_unit_process_use(ctx, process, -1); + nxt_unit_process_release(process); + } + + if (nxt_fast_path(port != NULL)) { + free(port); } + + return res; } -static void -nxt_unit_remove_port_unsafe(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, - nxt_unit_port_t *r_port, nxt_unit_process_t **process) +static int +nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id, + nxt_unit_port_t **r_port, nxt_unit_process_t **process) { - nxt_unit_impl_t *lib; nxt_unit_port_impl_t *port; - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - port = nxt_unit_port_hash_find(&lib->ports, port_id, 1); if (nxt_slow_path(port == NULL)) { - nxt_unit_debug(ctx, "remove_port: port %d,%d not found", + nxt_unit_debug(NULL, "remove_port: port %d,%d not found", (int) port_id->pid, (int) port_id->id); - return; + return NXT_UNIT_ERROR; } - nxt_unit_debug(ctx, "remove_port: port %d,%d, fds %d,%d, data %p", + nxt_unit_debug(NULL, "remove_port: port %d,%d, fds %d,%d, data %p", (int) port_id->pid, (int) port_id->id, port->port.in_fd, port->port.out_fd, port->port.data); - if (port->port.in_fd != -1) { - close(port->port.in_fd); - } - - if (port->port.out_fd != -1) { - close(port->port.out_fd); - } - if (port->process != NULL) { nxt_queue_remove(&port->link); } @@ -4426,60 +4473,55 @@ nxt_unit_remove_port_unsafe(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, } if (r_port != NULL) { - *r_port = port->port; + *r_port = &port->port; } - free(port); + return NXT_UNIT_OK; } -void -nxt_unit_remove_pid(nxt_unit_ctx_t *ctx, pid_t pid) +static void +nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid) { - nxt_unit_impl_t *lib; nxt_unit_process_t *process; - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - pthread_mutex_lock(&lib->mutex); - process = nxt_unit_process_find(ctx, pid, 1); + process = nxt_unit_process_find(lib, pid, 1); if (nxt_slow_path(process == NULL)) { - nxt_unit_debug(ctx, "remove_pid: process %d not found", (int) pid); + nxt_unit_debug(NULL, "remove_pid: process %d not found", (int) pid); pthread_mutex_unlock(&lib->mutex); return; } - nxt_unit_remove_process(ctx, process); + nxt_unit_remove_process(lib, process); + + if (lib->callbacks.remove_pid != NULL) { + lib->callbacks.remove_pid(&lib->unit, pid); + } } static void -nxt_unit_remove_process(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process) +nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process) { nxt_queue_t ports; - nxt_unit_impl_t *lib; nxt_unit_port_impl_t *port; - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - nxt_queue_init(&ports); nxt_queue_add(&ports, &process->ports); nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) { - nxt_unit_process_use(ctx, process, -1); - port->process = NULL; + nxt_unit_process_release(process); - /* Shortcut for default callback. */ - if (lib->callbacks.remove_port == nxt_unit_remove_port) { - nxt_queue_remove(&port->link); + /* To avoid unlink port. */ + port->process = NULL; - nxt_unit_remove_port_unsafe(ctx, &port->port.id, NULL, NULL); - } + nxt_unit_remove_port_unsafe(lib, &port->port.id, NULL, NULL); } nxt_queue_loop; @@ -4489,15 +4531,27 @@ nxt_unit_remove_process(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process) nxt_queue_remove(&port->link); - lib->callbacks.remove_port(ctx, &port->port.id); + if (lib->callbacks.remove_port != NULL) { + lib->callbacks.remove_port(&lib->unit, &port->port); + } + + if (port->port.in_fd != -1) { + close(port->port.in_fd); + } + + if (port->port.out_fd != -1) { + close(port->port.out_fd); + } + + free(port); } nxt_queue_loop; - nxt_unit_process_use(ctx, process, -1); + nxt_unit_process_release(process); } -void +static void nxt_unit_quit(nxt_unit_ctx_t *ctx) { nxt_unit_impl_t *lib; @@ -4505,11 +4559,15 @@ nxt_unit_quit(nxt_unit_ctx_t *ctx) lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); lib->online = 0; + + if (lib->callbacks.quit != NULL) { + lib->callbacks.quit(ctx); + } } static ssize_t -nxt_unit_port_send_default(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, +nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, const void *buf, size_t buf_size, const void *oob, size_t oob_size) { int fd; @@ -4522,35 +4580,35 @@ nxt_unit_port_send_default(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, port = nxt_unit_port_hash_find(&lib->ports, port_id, 0); - if (nxt_fast_path(port != NULL)) { + if (nxt_fast_path(port != NULL && port->port.out_fd != -1)) { fd = port->port.out_fd; - } else { - nxt_unit_warn(ctx, "port_send: port %d,%d not found", - (int) port_id->pid, (int) port_id->id); - fd = -1; - } + pthread_mutex_unlock(&lib->mutex); - pthread_mutex_unlock(&lib->mutex); + } else { + pthread_mutex_unlock(&lib->mutex); - if (nxt_slow_path(fd == -1)) { - if (port != NULL) { - nxt_unit_warn(ctx, "port_send: port %d,%d: fd == -1", - (int) port_id->pid, (int) port_id->id); - } + nxt_unit_alert(ctx, "port_send: port %d,%d not found", + (int) port_id->pid, (int) port_id->id); - return -1; + return -NXT_UNIT_ERROR; } nxt_unit_debug(ctx, "port_send: found port %d,%d fd %d", (int) port_id->pid, (int) port_id->id, fd); - return nxt_unit_port_send(ctx, fd, buf, buf_size, oob, oob_size); + if (lib->callbacks.port_send == NULL) { + return nxt_unit_sendmsg(ctx, fd, buf, buf_size, oob, oob_size); + + } else { + return lib->callbacks.port_send(ctx, port_id, buf, buf_size, + oob, oob_size); + } } -ssize_t -nxt_unit_port_send(nxt_unit_ctx_t *ctx, int fd, +static ssize_t +nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd, const void *buf, size_t buf_size, const void *oob, size_t oob_size) { ssize_t res; diff --git a/src/nxt_unit.h b/src/nxt_unit.h index 596dd8b6..fa1fa843 100644 --- a/src/nxt_unit.h +++ b/src/nxt_unit.h @@ -130,15 +130,15 @@ struct nxt_unit_callbacks_s { int (*add_port)(nxt_unit_ctx_t *, nxt_unit_port_t *port); /* Remove previously added port. Optional. */ - void (*remove_port)(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id); + void (*remove_port)(nxt_unit_t *, nxt_unit_port_t *port); /* Remove all data associated with process pid including ports. Optional. */ - void (*remove_pid)(nxt_unit_ctx_t *, pid_t pid); + void (*remove_pid)(nxt_unit_t *, pid_t pid); /* Gracefully quit the application. Optional. */ void (*quit)(nxt_unit_ctx_t *); - /* Shared memory release acknowledgement. */ + /* Shared memory release acknowledgement. Optional. */ void (*shm_ack_handler)(nxt_unit_ctx_t *); /* Send data and control to process pid using port id. Optional. */ @@ -149,7 +149,6 @@ struct nxt_unit_callbacks_s { /* Receive data on port id. Optional. */ ssize_t (*port_recv)(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id, void *buf, size_t buf_size, void *oob, size_t oob_size); - }; @@ -165,6 +164,7 @@ struct nxt_unit_init_s { nxt_unit_port_t ready_port; uint32_t ready_stream; + nxt_unit_port_t router_port; nxt_unit_port_t read_port; int log_fd; }; @@ -222,45 +222,9 @@ void nxt_unit_done(nxt_unit_ctx_t *); */ nxt_unit_ctx_t *nxt_unit_ctx_alloc(nxt_unit_ctx_t *, void *); -/* Free unused context. It is not required to free main context. */ -void nxt_unit_ctx_free(nxt_unit_ctx_t *); - /* Initialize port_id, calculate hash. */ void nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id); -/* - * Create extra incoming port, perform all required actions to propogate - * the port to Unit server. Fills structure referenced by port_id with - * current pid and new port id. - */ -int nxt_unit_create_send_port(nxt_unit_ctx_t *, nxt_unit_port_id_t *dst, - nxt_unit_port_id_t *port_id); - -/* Default 'add_port' implementation. */ -int nxt_unit_add_port(nxt_unit_ctx_t *, nxt_unit_port_t *port); - -/* Find previously added port. */ -nxt_unit_port_t *nxt_unit_find_port(nxt_unit_ctx_t *, - nxt_unit_port_id_t *port_id); - -/* Find, fill output 'port' and remove port from storage. */ -void nxt_unit_find_remove_port(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id, - nxt_unit_port_t *port); - -/* Default 'remove_port' implementation. */ -void nxt_unit_remove_port(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id); - -/* Default 'remove_pid' implementation. */ -void nxt_unit_remove_pid(nxt_unit_ctx_t *, pid_t pid); - -/* Default 'quit' implementation. */ -void nxt_unit_quit(nxt_unit_ctx_t *); - -/* Default 'port_send' implementation. */ -ssize_t nxt_unit_port_send(nxt_unit_ctx_t *, int fd, - const void *buf, size_t buf_size, - const void *oob, size_t oob_size); - /* Default 'port_recv' implementation. */ ssize_t nxt_unit_port_recv(nxt_unit_ctx_t *, int fd, void *buf, size_t buf_size, void *oob, size_t oob_size); |