diff options
Diffstat (limited to '')
-rw-r--r-- | src/nxt_unit.c | 580 |
1 files changed, 319 insertions, 261 deletions
diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 89998e3f..8c964c7a 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -38,16 +38,19 @@ typedef struct nxt_unit_websocket_frame_impl_s nxt_unit_websocket_frame_impl_t; static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init); static int nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, void *data); +nxt_inline void nxt_unit_ctx_use(nxt_unit_ctx_impl_t *ctx_impl); +nxt_inline void nxt_unit_ctx_release(nxt_unit_ctx_impl_t *ctx_impl); +nxt_inline void nxt_unit_lib_use(nxt_unit_impl_t *lib); +nxt_inline void nxt_unit_lib_release(nxt_unit_impl_t *lib); nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head, nxt_unit_mmap_buf_t *mmap_buf); nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev, nxt_unit_mmap_buf_t *mmap_buf); nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf); static int nxt_unit_read_env(nxt_unit_port_t *ready_port, - nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream, - uint32_t *shm_limit); -static int nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, - uint32_t stream); + nxt_unit_port_t *router_port, nxt_unit_port_t *read_port, + int *log_fd, uint32_t *stream, uint32_t *shm_limit); +static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream); static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, @@ -96,8 +99,8 @@ static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd); static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps); -static void nxt_unit_process_use(nxt_unit_ctx_t *ctx, - nxt_unit_process_t *process, int i); +nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process); +nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process); static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps); static nxt_port_mmap_header_t *nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, uint32_t id); @@ -110,28 +113,35 @@ static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, nxt_port_mmap_header_t *hdr, void *start, uint32_t size); static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid); -static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx, +static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_impl_t *lib, pid_t pid); -static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_ctx_t *ctx, +static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib, pid_t pid, int remove); static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib); static void nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf); +static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl); static int nxt_unit_create_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int *fd); static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst, nxt_unit_port_id_t *new_port, int fd); -static void nxt_unit_remove_port_unsafe(nxt_unit_ctx_t *ctx, - nxt_unit_port_id_t *port_id, nxt_unit_port_t *r_port, +static int nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); +static int nxt_unit_remove_port(nxt_unit_impl_t *lib, + nxt_unit_port_id_t *port_id); +static int nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, + nxt_unit_port_id_t *port_id, nxt_unit_port_t **r_port, nxt_unit_process_t **process); -static void nxt_unit_remove_process(nxt_unit_ctx_t *ctx, +static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid); +static void nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process); - -static ssize_t nxt_unit_port_send_default(nxt_unit_ctx_t *ctx, +static void nxt_unit_quit(nxt_unit_ctx_t *ctx); +static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, const void *buf, size_t buf_size, const void *oob, size_t oob_size); +static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd, + const void *buf, size_t buf_size, const void *oob, size_t oob_size); static ssize_t nxt_unit_port_recv_default(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, void *buf, size_t buf_size, void *oob, size_t oob_size); @@ -233,6 +243,8 @@ struct nxt_unit_read_buf_s { struct nxt_unit_ctx_impl_s { nxt_unit_ctx_t ctx; + nxt_atomic_t use_count; + pthread_mutex_t mutex; nxt_unit_port_id_t read_port_id; @@ -269,6 +281,8 @@ struct nxt_unit_impl_s { nxt_unit_t unit; nxt_unit_callbacks_t callbacks; + nxt_atomic_t use_count; + uint32_t request_data_size; uint32_t shm_mmap_limit; @@ -277,7 +291,7 @@ struct nxt_unit_impl_s { nxt_lvlhsh_t processes; /* of nxt_unit_process_t */ nxt_lvlhsh_t ports; /* of nxt_unit_port_impl_t */ - nxt_unit_port_id_t ready_port_id; + nxt_unit_port_id_t router_port_id; nxt_queue_t contexts; /* of nxt_unit_ctx_impl_t */ @@ -341,7 +355,7 @@ nxt_unit_init(nxt_unit_init_t *init) uint32_t ready_stream, shm_limit; nxt_unit_ctx_t *ctx; nxt_unit_impl_t *lib; - nxt_unit_port_t ready_port, read_port; + nxt_unit_port_t ready_port, router_port, read_port; lib = nxt_unit_create(init); if (nxt_slow_path(lib == NULL)) { @@ -354,17 +368,20 @@ nxt_unit_init(nxt_unit_init_t *init) { ready_port = init->ready_port; ready_stream = init->ready_stream; + router_port = init->router_port; read_port = init->read_port; lib->log_fd = init->log_fd; nxt_unit_port_id_init(&ready_port.id, ready_port.id.pid, ready_port.id.id); + nxt_unit_port_id_init(&router_port.id, router_port.id.pid, + router_port.id.id); nxt_unit_port_id_init(&read_port.id, read_port.id.pid, read_port.id.id); } else { - rc = nxt_unit_read_env(&ready_port, &read_port, &lib->log_fd, - &ready_stream, &shm_limit); + rc = nxt_unit_read_env(&ready_port, &router_port, &read_port, + &lib->log_fd, &ready_stream, &shm_limit); if (nxt_slow_path(rc != NXT_UNIT_OK)) { goto fail; } @@ -380,14 +397,16 @@ nxt_unit_init(nxt_unit_init_t *init) lib->pid = read_port.id.pid; ctx = &lib->main_ctx.ctx; - rc = lib->callbacks.add_port(ctx, &ready_port); - if (rc != NXT_UNIT_OK) { - nxt_unit_alert(NULL, "failed to add ready_port"); + rc = nxt_unit_add_port(ctx, &router_port); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + nxt_unit_alert(NULL, "failed to add router_port"); goto fail; } - rc = lib->callbacks.add_port(ctx, &read_port); + lib->router_port_id = router_port.id; + + rc = nxt_unit_add_port(ctx, &read_port); if (nxt_slow_path(rc != NXT_UNIT_OK)) { nxt_unit_alert(NULL, "failed to add read_port"); @@ -395,15 +414,16 @@ nxt_unit_init(nxt_unit_init_t *init) } lib->main_ctx.read_port_id = read_port.id; - lib->ready_port_id = ready_port.id; - rc = nxt_unit_ready(ctx, &ready_port.id, ready_stream); + rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream); if (nxt_slow_path(rc != NXT_UNIT_OK)) { nxt_unit_alert(NULL, "failed to send READY message"); goto fail; } + close(ready_port.out_fd); + return ctx; fail: @@ -450,6 +470,8 @@ nxt_unit_create(nxt_unit_init_t *init) nxt_queue_init(&lib->contexts); + lib->use_count = 0; + rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data); if (nxt_slow_path(rc != NXT_UNIT_OK)) { goto fail; @@ -463,26 +485,6 @@ nxt_unit_create(nxt_unit_init_t *init) goto fail; } - if (cb->add_port == NULL) { - cb->add_port = nxt_unit_add_port; - } - - if (cb->remove_port == NULL) { - cb->remove_port = nxt_unit_remove_port; - } - - if (cb->remove_pid == NULL) { - cb->remove_pid = nxt_unit_remove_pid; - } - - if (cb->quit == NULL) { - cb->quit = nxt_unit_quit; - } - - if (cb->port_send == NULL) { - cb->port_send = nxt_unit_port_send_default; - } - if (cb->port_recv == NULL) { cb->port_recv = nxt_unit_port_recv_default; } @@ -506,8 +508,6 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, ctx_impl->ctx.data = data; ctx_impl->ctx.unit = &lib->unit; - nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link); - rc = pthread_mutex_init(&ctx_impl->mutex, NULL); if (nxt_slow_path(rc != 0)) { nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc); @@ -515,6 +515,12 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, return NXT_UNIT_ERROR; } + nxt_unit_lib_use(lib); + + nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link); + + ctx_impl->use_count = 1; + nxt_queue_init(&ctx_impl->free_req); nxt_queue_init(&ctx_impl->free_ws); nxt_queue_init(&ctx_impl->active_req); @@ -541,6 +547,62 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, nxt_inline void +nxt_unit_ctx_use(nxt_unit_ctx_impl_t *ctx_impl) +{ + nxt_atomic_fetch_add(&ctx_impl->use_count, 1); +} + + +nxt_inline void +nxt_unit_ctx_release(nxt_unit_ctx_impl_t *ctx_impl) +{ + long c; + + c = nxt_atomic_fetch_add(&ctx_impl->use_count, -1); + + if (c == 1) { + nxt_unit_ctx_free(ctx_impl); + } +} + + +nxt_inline void +nxt_unit_lib_use(nxt_unit_impl_t *lib) +{ + nxt_atomic_fetch_add(&lib->use_count, 1); +} + + +nxt_inline void +nxt_unit_lib_release(nxt_unit_impl_t *lib) +{ + long c; + nxt_unit_process_t *process; + + c = nxt_atomic_fetch_add(&lib->use_count, -1); + + if (c == 1) { + for ( ;; ) { + pthread_mutex_lock(&lib->mutex); + + process = nxt_unit_process_pop_first(lib); + if (process == NULL) { + pthread_mutex_unlock(&lib->mutex); + + break; + } + + nxt_unit_remove_process(lib, process); + } + + pthread_mutex_destroy(&lib->mutex); + + free(lib); + } +} + + +nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head, nxt_unit_mmap_buf_t *mmap_buf) { @@ -585,15 +647,16 @@ nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf) static int -nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *read_port, - int *log_fd, uint32_t *stream, uint32_t *shm_limit) +nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port, + nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream, + uint32_t *shm_limit) { int rc; - int ready_fd, read_fd; + int ready_fd, router_fd, read_fd; char *unit_init, *version_end; long version_length; - int64_t ready_pid, read_pid; - uint32_t ready_stream, ready_id, read_id; + int64_t ready_pid, router_pid, read_pid; + uint32_t ready_stream, router_id, ready_id, read_id; unit_init = getenv(NXT_UNIT_INIT_ENV); if (nxt_slow_path(unit_init == NULL)) { @@ -621,13 +684,15 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *read_port, "%"PRIu32";" "%"PRId64",%"PRIu32",%d;" "%"PRId64",%"PRIu32",%d;" + "%"PRId64",%"PRIu32",%d;" "%d,%"PRIu32, &ready_stream, &ready_pid, &ready_id, &ready_fd, + &router_pid, &router_id, &router_fd, &read_pid, &read_id, &read_fd, log_fd, shm_limit); - if (nxt_slow_path(rc != 9)) { + if (nxt_slow_path(rc != 12)) { nxt_unit_alert(NULL, "failed to scan variables: %d", rc); return NXT_UNIT_ERROR; @@ -639,6 +704,12 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *read_port, ready_port->out_fd = ready_fd; ready_port->data = NULL; + nxt_unit_port_id_init(&router_port->id, (pid_t) router_pid, router_id); + + router_port->in_fd = -1; + router_port->out_fd = router_fd; + router_port->data = NULL; + nxt_unit_port_id_init(&read_port->id, (pid_t) read_pid, read_id); read_port->in_fd = read_fd; @@ -652,8 +723,7 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *read_port, static int -nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, - uint32_t stream) +nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream) { ssize_t res; nxt_port_msg_t msg; @@ -671,7 +741,7 @@ nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, msg.mf = 0; msg.tracking = 0; - res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg), NULL, 0); + res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg), NULL, 0); if (res != sizeof(msg)) { return NXT_UNIT_ERROR; } @@ -684,13 +754,12 @@ int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, void *buf, size_t buf_size, void *oob, size_t oob_size) { - int rc; - pid_t pid; - struct cmsghdr *cm; - nxt_port_msg_t *port_msg; - nxt_unit_impl_t *lib; - nxt_unit_recv_msg_t recv_msg; - nxt_unit_callbacks_t *cb; + int rc; + pid_t pid; + struct cmsghdr *cm; + nxt_port_msg_t *port_msg; + nxt_unit_impl_t *lib; + nxt_unit_recv_msg_t recv_msg; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); @@ -749,14 +818,12 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, } } - cb = &lib->callbacks; - switch (port_msg->type) { case _NXT_PORT_MSG_QUIT: nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream); - cb->quit(ctx); + nxt_unit_quit(ctx); rc = NXT_UNIT_OK; break; @@ -812,7 +879,7 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d", port_msg->stream, (int) pid); - cb->remove_pid(ctx, pid); + nxt_unit_remove_pid(lib, pid); rc = NXT_UNIT_OK; break; @@ -839,7 +906,7 @@ fail: } if (recv_msg.process != NULL) { - nxt_unit_process_use(ctx, recv_msg.process, -1); + nxt_unit_process_release(recv_msg.process); } return rc; @@ -850,7 +917,6 @@ static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) { int nb; - nxt_unit_impl_t *lib; nxt_unit_port_t new_port; nxt_port_msg_new_port_t *new_port_msg; @@ -894,9 +960,7 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) recv_msg->fd = -1; - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - - return lib->callbacks.add_port(ctx, &new_port); + return nxt_unit_add_port(ctx, &new_port); } @@ -1206,7 +1270,7 @@ nxt_unit_request_info_release(nxt_unit_request_info_t *req) * existence. */ if (req_impl->process != NULL) { - nxt_unit_process_use(req->ctx, req_impl->process, -1); + nxt_unit_process_release(req_impl->process); req_impl->process = NULL; } @@ -1808,7 +1872,7 @@ nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) pthread_mutex_lock(&lib->mutex); - recv_msg->process = nxt_unit_process_find(ctx, recv_msg->pid, 0); + recv_msg->process = nxt_unit_process_find(lib, recv_msg->pid, 0); pthread_mutex_unlock(&lib->mutex); @@ -1869,15 +1933,6 @@ nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf) } -typedef struct { - size_t len; - const char *str; -} nxt_unit_str_t; - - -#define nxt_unit_str(str) { nxt_length(str), str } - - int nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req) { @@ -2064,8 +2119,8 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, (int) m.mmap_msg.chunk_id, (int) m.mmap_msg.size); - res = lib->callbacks.port_send(ctx, &mmap_buf->port_id, &m, sizeof(m), - NULL, 0); + res = nxt_unit_port_send(ctx, &mmap_buf->port_id, &m, sizeof(m), + NULL, 0); if (nxt_slow_path(res != sizeof(m))) { goto free_buf; } @@ -2114,10 +2169,10 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, stream, (int) (sizeof(m.msg) + m.mmap_msg.size)); - res = lib->callbacks.port_send(ctx, &mmap_buf->port_id, - buf->start - sizeof(m.msg), - m.mmap_msg.size + sizeof(m.msg), - NULL, 0); + res = nxt_unit_port_send(ctx, &mmap_buf->port_id, + buf->start - sizeof(m.msg), + m.mmap_msg.size + sizeof(m.msg), + NULL, 0); if (nxt_slow_path(res != (ssize_t) (m.mmap_msg.size + sizeof(m.msg)))) { goto free_buf; } @@ -2689,8 +2744,8 @@ skip_response_send: msg.mf = 0; msg.tracking = 0; - (void) lib->callbacks.port_send(req->ctx, &req->response_port, - &msg, sizeof(msg), NULL, 0); + (void) nxt_unit_port_send(req->ctx, &req->response_port, + &msg, sizeof(msg), NULL, 0); nxt_unit_request_info_release(req); } @@ -3006,7 +3061,7 @@ nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id) msg.mf = 0; msg.tracking = 0; - res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg), NULL, 0); + res = nxt_unit_port_send(ctx, port_id, &msg, sizeof(msg), NULL, 0); if (nxt_slow_path(res != sizeof(msg))) { return NXT_UNIT_ERROR; } @@ -3284,8 +3339,8 @@ nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int fd) */ memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int)); - res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg), - &cmsg, sizeof(cmsg)); + res = nxt_unit_port_send(ctx, port_id, &msg, sizeof(msg), + &cmsg, sizeof(cmsg)); if (nxt_slow_path(res != sizeof(msg))) { return NXT_UNIT_ERROR; } @@ -3382,7 +3437,7 @@ nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd) pthread_mutex_lock(&lib->mutex); - process = nxt_unit_process_find(ctx, pid, 0); + process = nxt_unit_process_find(lib, pid, 0); pthread_mutex_unlock(&lib->mutex); @@ -3444,7 +3499,7 @@ nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd) fail: - nxt_unit_process_use(ctx, process, -1); + nxt_unit_process_release(process); return rc; } @@ -3462,15 +3517,22 @@ nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps) } -static void -nxt_unit_process_use(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, int i) +nxt_inline void +nxt_unit_process_use(nxt_unit_process_t *process) +{ + nxt_atomic_fetch_add(&process->use_count, 1); +} + + +nxt_inline void +nxt_unit_process_release(nxt_unit_process_t *process) { long c; - c = nxt_atomic_fetch_add(&process->use_count, i); + c = nxt_atomic_fetch_add(&process->use_count, -1); - if (i < 0 && c == -i) { - nxt_unit_debug(ctx, "destroy process #%d", (int) process->pid); + if (c == 1) { + nxt_unit_debug(NULL, "destroy process #%d", (int) process->pid); nxt_unit_mmaps_destroy(&process->incoming); nxt_unit_mmaps_destroy(&process->outgoing); @@ -3727,7 +3789,7 @@ nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid) msg.mf = 0; msg.tracking = 0; - res = lib->callbacks.port_send(ctx, &port_id, &msg, sizeof(msg), NULL, 0); + res = nxt_unit_port_send(ctx, &port_id, &msg, sizeof(msg), NULL, 0); if (nxt_slow_path(res != sizeof(msg))) { return NXT_UNIT_ERROR; } @@ -3772,26 +3834,23 @@ nxt_unit_process_lhq_pid(nxt_lvlhsh_query_t *lhq, pid_t *pid) static nxt_unit_process_t * -nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid) +nxt_unit_process_get(nxt_unit_impl_t *lib, pid_t pid) { - nxt_unit_impl_t *lib; nxt_unit_process_t *process; nxt_lvlhsh_query_t lhq; - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - nxt_unit_process_lhq_pid(&lhq, &pid); if (nxt_lvlhsh_find(&lib->processes, &lhq) == NXT_OK) { process = lhq.value; - nxt_unit_process_use(ctx, process, 1); + nxt_unit_process_use(process); return process; } process = malloc(sizeof(nxt_unit_process_t)); if (nxt_slow_path(process == NULL)) { - nxt_unit_warn(ctx, "failed to allocate process for #%d", (int) pid); + nxt_unit_alert(NULL, "failed to allocate process for #%d", (int) pid); return NULL; } @@ -3815,7 +3874,7 @@ nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid) break; default: - nxt_unit_warn(ctx, "process %d insert failed", (int) pid); + nxt_unit_alert(NULL, "process %d insert failed", (int) pid); pthread_mutex_destroy(&process->outgoing.mutex); pthread_mutex_destroy(&process->incoming.mutex); @@ -3824,22 +3883,19 @@ nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid) break; } - nxt_unit_process_use(ctx, process, 1); + nxt_unit_process_use(process); return process; } static nxt_unit_process_t * -nxt_unit_process_find(nxt_unit_ctx_t *ctx, pid_t pid, int remove) +nxt_unit_process_find(nxt_unit_impl_t *lib, pid_t pid, int remove) { int rc; - nxt_unit_impl_t *lib; nxt_unit_process_t *process; nxt_lvlhsh_query_t lhq; - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - nxt_unit_process_lhq_pid(&lhq, &pid); if (remove) { @@ -3853,7 +3909,7 @@ nxt_unit_process_find(nxt_unit_ctx_t *ctx, pid_t pid, int remove) process = lhq.value; if (!remove) { - nxt_unit_process_use(ctx, process, 1); + nxt_unit_process_use(process); } return process; @@ -3873,8 +3929,13 @@ nxt_unit_process_pop_first(nxt_unit_impl_t *lib) int nxt_unit_run(nxt_unit_ctx_t *ctx) { - int rc; - nxt_unit_impl_t *lib; + int rc; + nxt_unit_impl_t *lib; + nxt_unit_ctx_impl_t *ctx_impl; + + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + nxt_unit_ctx_use(ctx_impl); lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); rc = NXT_UNIT_OK; @@ -3887,6 +3948,8 @@ nxt_unit_run(nxt_unit_ctx_t *ctx) } } + nxt_unit_ctx_release(ctx_impl); + return rc; } @@ -3900,6 +3963,8 @@ nxt_unit_run_once(nxt_unit_ctx_t *ctx) ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + nxt_unit_ctx_use(ctx_impl); + pthread_mutex_lock(&ctx_impl->mutex); if (ctx_impl->pending_read_head != NULL) { @@ -3915,6 +3980,9 @@ nxt_unit_run_once(nxt_unit_ctx_t *ctx) } else { rbuf = nxt_unit_read_buf_get_impl(ctx_impl); if (nxt_slow_path(rbuf == NULL)) { + + nxt_unit_ctx_release(ctx_impl); + return NXT_UNIT_ERROR; } @@ -3936,6 +4004,8 @@ nxt_unit_run_once(nxt_unit_ctx_t *ctx) nxt_unit_read_buf_release(ctx, rbuf); + nxt_unit_ctx_release(ctx_impl); + return rc; } @@ -3968,34 +4038,11 @@ nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) void nxt_unit_done(nxt_unit_ctx_t *ctx) { - nxt_unit_impl_t *lib; - nxt_unit_process_t *process; nxt_unit_ctx_impl_t *ctx_impl; - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - - nxt_queue_each(ctx_impl, &lib->contexts, nxt_unit_ctx_impl_t, link) { - - nxt_unit_ctx_free(&ctx_impl->ctx); - - } nxt_queue_loop; - - for ( ;; ) { - pthread_mutex_lock(&lib->mutex); - - process = nxt_unit_process_pop_first(lib); - if (process == NULL) { - pthread_mutex_unlock(&lib->mutex); - - break; - } - - nxt_unit_remove_process(ctx, process); - } - - pthread_mutex_destroy(&lib->mutex); + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); - free(lib); + nxt_unit_ctx_release(ctx_impl); } @@ -4023,9 +4070,9 @@ nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data) return NULL; } - rc = nxt_unit_send_port(ctx, &lib->ready_port_id, &new_port_id, fd); + rc = nxt_unit_send_port(ctx, &lib->router_port_id, &new_port_id, fd); if (nxt_slow_path(rc != NXT_UNIT_OK)) { - lib->callbacks.remove_port(ctx, &new_port_id); + nxt_unit_remove_port(lib, &new_port_id); close(fd); @@ -4038,7 +4085,7 @@ nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data) rc = nxt_unit_ctx_init(lib, new_ctx, data); if (nxt_slow_path(rc != NXT_UNIT_OK)) { - lib->callbacks.remove_port(ctx, &new_port_id); + nxt_unit_remove_port(lib, &new_port_id); free(new_ctx); @@ -4051,17 +4098,15 @@ nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data) } -void -nxt_unit_ctx_free(nxt_unit_ctx_t *ctx) +static void +nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl) { nxt_unit_impl_t *lib; - nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_mmap_buf_t *mmap_buf; nxt_unit_request_info_impl_t *req_impl; nxt_unit_websocket_frame_impl_t *ws_impl; - ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit); nxt_queue_each(req_impl, &ctx_impl->active_req, nxt_unit_request_info_impl_t, link) @@ -4102,6 +4147,8 @@ nxt_unit_ctx_free(nxt_unit_ctx_t *ctx) if (ctx_impl != &lib->main_ctx) { free(ctx_impl); } + + nxt_unit_lib_release(lib); } @@ -4127,36 +4174,6 @@ nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id) } -int -nxt_unit_create_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst, - nxt_unit_port_id_t *port_id) -{ - int rc, fd; - nxt_unit_impl_t *lib; - nxt_unit_port_id_t new_port_id; - - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - - rc = nxt_unit_create_port(ctx, &new_port_id, &fd); - if (nxt_slow_path(rc != NXT_UNIT_OK)) { - return rc; - } - - rc = nxt_unit_send_port(ctx, dst, &new_port_id, fd); - - if (nxt_fast_path(rc == NXT_UNIT_OK)) { - *port_id = new_port_id; - - } else { - lib->callbacks.remove_port(ctx, &new_port_id); - } - - close(fd); - - return rc; -} - - static int nxt_unit_create_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int *fd) { @@ -4180,7 +4197,7 @@ nxt_unit_create_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int *fd) pthread_mutex_lock(&lib->mutex); - process = nxt_unit_process_get(ctx, lib->pid); + process = nxt_unit_process_get(lib, lib->pid); if (nxt_slow_path(process == NULL)) { pthread_mutex_unlock(&lib->mutex); @@ -4198,9 +4215,9 @@ nxt_unit_create_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int *fd) pthread_mutex_unlock(&lib->mutex); - nxt_unit_process_use(ctx, process, -1); + nxt_unit_process_release(process); - rc = lib->callbacks.add_port(ctx, &new_port); + rc = nxt_unit_add_port(ctx, &new_port); if (nxt_slow_path(rc != NXT_UNIT_OK)) { nxt_unit_warn(ctx, "create_port: add_port() failed"); @@ -4269,14 +4286,13 @@ nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst, */ memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int)); - res = lib->callbacks.port_send(ctx, dst, &m, sizeof(m), - &cmsg, sizeof(cmsg)); + res = nxt_unit_port_send(ctx, dst, &m, sizeof(m), &cmsg, sizeof(cmsg)); - return res == sizeof(m) ? NXT_UNIT_OK : NXT_UNIT_ERROR; + return (res == sizeof(m)) ? NXT_UNIT_OK : NXT_UNIT_ERROR; } -int +static int nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) { int rc; @@ -4295,18 +4311,41 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) port->id.pid, port->id.id, port->in_fd, port->out_fd); + if (old_port->port.data == NULL) { + old_port->port.data = port->data; + port->data = NULL; + } + + if (old_port->port.in_fd == -1) { + old_port->port.in_fd = port->in_fd; + port->in_fd = -1; + } + if (port->in_fd != -1) { close(port->in_fd); port->in_fd = -1; } + if (old_port->port.out_fd == -1) { + old_port->port.out_fd = port->out_fd; + port->out_fd = -1; + } + if (port->out_fd != -1) { close(port->out_fd); port->out_fd = -1; } + *port = old_port->port; + pthread_mutex_unlock(&lib->mutex); + if (lib->callbacks.add_port != NULL + && (port->in_fd != -1 || port->out_fd != -1)) + { + lib->callbacks.add_port(ctx, &old_port->port); + } + return NXT_UNIT_OK; } @@ -4314,7 +4353,7 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) port->id.pid, port->id.id, port->in_fd, port->out_fd); - process = nxt_unit_process_get(ctx, port->id.pid); + process = nxt_unit_process_get(lib, port->id.pid); if (nxt_slow_path(process == NULL)) { rc = NXT_UNIT_ERROR; goto unlock; @@ -4351,72 +4390,80 @@ unlock: pthread_mutex_unlock(&lib->mutex); if (nxt_slow_path(process != NULL && rc != NXT_UNIT_OK)) { - nxt_unit_process_use(ctx, process, -1); + nxt_unit_process_release(process); } - return rc; -} - + if (lib->callbacks.add_port != NULL + && rc == NXT_UNIT_OK + && (port->in_fd != -1 || port->out_fd != -1)) + { + lib->callbacks.add_port(ctx, &new_port->port); + } -void -nxt_unit_remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id) -{ - nxt_unit_find_remove_port(ctx, port_id, NULL); + return rc; } -void -nxt_unit_find_remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, - nxt_unit_port_t *r_port) +static int +nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id) { - nxt_unit_impl_t *lib; + int res; + nxt_unit_port_t *port; nxt_unit_process_t *process; - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + port = NULL; + process = NULL; pthread_mutex_lock(&lib->mutex); - process = NULL; - - nxt_unit_remove_port_unsafe(ctx, port_id, r_port, &process); + res = nxt_unit_remove_port_unsafe(lib, port_id, &port, &process); pthread_mutex_unlock(&lib->mutex); + if (lib->callbacks.remove_port != NULL && res == NXT_UNIT_OK) { + lib->callbacks.remove_port(&lib->unit, port); + } + + if (nxt_fast_path(port != NULL)) { + if (port->in_fd != -1) { + close(port->in_fd); + } + + if (port->out_fd != -1) { + close(port->out_fd); + } + } + if (nxt_slow_path(process != NULL)) { - nxt_unit_process_use(ctx, process, -1); + nxt_unit_process_release(process); + } + + if (nxt_fast_path(port != NULL)) { + free(port); } + + return res; } -static void -nxt_unit_remove_port_unsafe(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, - nxt_unit_port_t *r_port, nxt_unit_process_t **process) +static int +nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id, + nxt_unit_port_t **r_port, nxt_unit_process_t **process) { - nxt_unit_impl_t *lib; nxt_unit_port_impl_t *port; - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - port = nxt_unit_port_hash_find(&lib->ports, port_id, 1); if (nxt_slow_path(port == NULL)) { - nxt_unit_debug(ctx, "remove_port: port %d,%d not found", + nxt_unit_debug(NULL, "remove_port: port %d,%d not found", (int) port_id->pid, (int) port_id->id); - return; + return NXT_UNIT_ERROR; } - nxt_unit_debug(ctx, "remove_port: port %d,%d, fds %d,%d, data %p", + nxt_unit_debug(NULL, "remove_port: port %d,%d, fds %d,%d, data %p", (int) port_id->pid, (int) port_id->id, port->port.in_fd, port->port.out_fd, port->port.data); - if (port->port.in_fd != -1) { - close(port->port.in_fd); - } - - if (port->port.out_fd != -1) { - close(port->port.out_fd); - } - if (port->process != NULL) { nxt_queue_remove(&port->link); } @@ -4426,60 +4473,55 @@ nxt_unit_remove_port_unsafe(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, } if (r_port != NULL) { - *r_port = port->port; + *r_port = &port->port; } - free(port); + return NXT_UNIT_OK; } -void -nxt_unit_remove_pid(nxt_unit_ctx_t *ctx, pid_t pid) +static void +nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid) { - nxt_unit_impl_t *lib; nxt_unit_process_t *process; - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - pthread_mutex_lock(&lib->mutex); - process = nxt_unit_process_find(ctx, pid, 1); + process = nxt_unit_process_find(lib, pid, 1); if (nxt_slow_path(process == NULL)) { - nxt_unit_debug(ctx, "remove_pid: process %d not found", (int) pid); + nxt_unit_debug(NULL, "remove_pid: process %d not found", (int) pid); pthread_mutex_unlock(&lib->mutex); return; } - nxt_unit_remove_process(ctx, process); + nxt_unit_remove_process(lib, process); + + if (lib->callbacks.remove_pid != NULL) { + lib->callbacks.remove_pid(&lib->unit, pid); + } } static void -nxt_unit_remove_process(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process) +nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process) { nxt_queue_t ports; - nxt_unit_impl_t *lib; nxt_unit_port_impl_t *port; - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - nxt_queue_init(&ports); nxt_queue_add(&ports, &process->ports); nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) { - nxt_unit_process_use(ctx, process, -1); - port->process = NULL; + nxt_unit_process_release(process); - /* Shortcut for default callback. */ - if (lib->callbacks.remove_port == nxt_unit_remove_port) { - nxt_queue_remove(&port->link); + /* To avoid unlink port. */ + port->process = NULL; - nxt_unit_remove_port_unsafe(ctx, &port->port.id, NULL, NULL); - } + nxt_unit_remove_port_unsafe(lib, &port->port.id, NULL, NULL); } nxt_queue_loop; @@ -4489,15 +4531,27 @@ nxt_unit_remove_process(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process) nxt_queue_remove(&port->link); - lib->callbacks.remove_port(ctx, &port->port.id); + if (lib->callbacks.remove_port != NULL) { + lib->callbacks.remove_port(&lib->unit, &port->port); + } + + if (port->port.in_fd != -1) { + close(port->port.in_fd); + } + + if (port->port.out_fd != -1) { + close(port->port.out_fd); + } + + free(port); } nxt_queue_loop; - nxt_unit_process_use(ctx, process, -1); + nxt_unit_process_release(process); } -void +static void nxt_unit_quit(nxt_unit_ctx_t *ctx) { nxt_unit_impl_t *lib; @@ -4505,11 +4559,15 @@ nxt_unit_quit(nxt_unit_ctx_t *ctx) lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); lib->online = 0; + + if (lib->callbacks.quit != NULL) { + lib->callbacks.quit(ctx); + } } static ssize_t -nxt_unit_port_send_default(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, +nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, const void *buf, size_t buf_size, const void *oob, size_t oob_size) { int fd; @@ -4522,35 +4580,35 @@ nxt_unit_port_send_default(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, port = nxt_unit_port_hash_find(&lib->ports, port_id, 0); - if (nxt_fast_path(port != NULL)) { + if (nxt_fast_path(port != NULL && port->port.out_fd != -1)) { fd = port->port.out_fd; - } else { - nxt_unit_warn(ctx, "port_send: port %d,%d not found", - (int) port_id->pid, (int) port_id->id); - fd = -1; - } + pthread_mutex_unlock(&lib->mutex); - pthread_mutex_unlock(&lib->mutex); + } else { + pthread_mutex_unlock(&lib->mutex); - if (nxt_slow_path(fd == -1)) { - if (port != NULL) { - nxt_unit_warn(ctx, "port_send: port %d,%d: fd == -1", - (int) port_id->pid, (int) port_id->id); - } + nxt_unit_alert(ctx, "port_send: port %d,%d not found", + (int) port_id->pid, (int) port_id->id); - return -1; + return -NXT_UNIT_ERROR; } nxt_unit_debug(ctx, "port_send: found port %d,%d fd %d", (int) port_id->pid, (int) port_id->id, fd); - return nxt_unit_port_send(ctx, fd, buf, buf_size, oob, oob_size); + if (lib->callbacks.port_send == NULL) { + return nxt_unit_sendmsg(ctx, fd, buf, buf_size, oob, oob_size); + + } else { + return lib->callbacks.port_send(ctx, port_id, buf, buf_size, + oob, oob_size); + } } -ssize_t -nxt_unit_port_send(nxt_unit_ctx_t *ctx, int fd, +static ssize_t +nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd, const void *buf, size_t buf_size, const void *oob, size_t oob_size) { ssize_t res; |