diff options
-rw-r--r-- | go/nxt_cgo_lib.c | 12 | ||||
-rw-r--r-- | src/nxt_unit.c | 678 | ||||
-rw-r--r-- | src/nxt_unit.h | 13 |
3 files changed, 346 insertions, 357 deletions
diff --git a/go/nxt_cgo_lib.c b/go/nxt_cgo_lib.c index 1bb38f3c..937996b0 100644 --- a/go/nxt_cgo_lib.c +++ b/go/nxt_cgo_lib.c @@ -15,9 +15,9 @@ static nxt_cgo_str_t *nxt_cgo_str_init(nxt_cgo_str_t *dst, nxt_unit_sptr_t *sptr, uint32_t length); static int nxt_cgo_add_port(nxt_unit_ctx_t *, nxt_unit_port_t *port); static void nxt_cgo_remove_port(nxt_unit_t *, nxt_unit_port_t *port); -static ssize_t nxt_cgo_port_send(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id, +static ssize_t nxt_cgo_port_send(nxt_unit_ctx_t *, nxt_unit_port_t *port, const void *buf, size_t buf_size, const void *oob, size_t oob_size); -static ssize_t nxt_cgo_port_recv(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id, +static ssize_t nxt_cgo_port_recv(nxt_unit_ctx_t *, nxt_unit_port_t *port, void *buf, size_t buf_size, void *oob, size_t oob_size); static void nxt_cgo_shm_ack_handler(nxt_unit_ctx_t *ctx); @@ -123,19 +123,19 @@ nxt_cgo_remove_port(nxt_unit_t *unit, nxt_unit_port_t *port) static ssize_t -nxt_cgo_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, +nxt_cgo_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, const void *buf, size_t buf_size, const void *oob, size_t oob_size) { - return nxt_go_port_send(port_id->pid, port_id->id, + return nxt_go_port_send(port->id.pid, port->id.id, (void *) buf, buf_size, (void *) oob, oob_size); } static ssize_t -nxt_cgo_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, +nxt_cgo_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *buf, size_t buf_size, void *oob, size_t oob_size) { - return nxt_go_port_recv(port_id->pid, port_id->id, + return nxt_go_port_recv(port->id.pid, port->id.id, buf, buf_size, oob, oob_size); } diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 8c964c7a..ddfd9c80 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -70,7 +70,7 @@ static nxt_unit_process_t *nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx); static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf); -static int nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, +static int nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req, nxt_unit_mmap_buf_t *mmap_buf, int last); static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf); static void nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf); @@ -84,17 +84,16 @@ static nxt_unit_mmap_buf_t *nxt_unit_request_preread( static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, size_t size); static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, - nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, - nxt_chunk_id_t *c, int *n, int min_n); -static int nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id); + nxt_unit_port_t *port, nxt_chunk_id_t *c, int *n, int min_n); +static int nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx); static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i); static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, - nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, int n); -static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, + nxt_unit_port_t *port, int n); +static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int fd); static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, - nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, uint32_t size, + nxt_unit_port_t *port, uint32_t size, uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf); static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd); @@ -121,34 +120,36 @@ static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib); static void nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf); static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl); -static int nxt_unit_create_port(nxt_unit_ctx_t *ctx, - nxt_unit_port_id_t *port_id, int *fd); +static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx); -static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst, - nxt_unit_port_id_t *new_port, int fd); +static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst, + nxt_unit_port_t *port); -static int nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); -static int nxt_unit_remove_port(nxt_unit_impl_t *lib, +nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port); +nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port); +nxt_inline nxt_unit_process_t *nxt_unit_port_process(nxt_unit_port_t *port); +static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx, + nxt_unit_port_t *port); +static void nxt_unit_remove_port(nxt_unit_impl_t *lib, + nxt_unit_port_id_t *port_id); +static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id); -static int nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, - nxt_unit_port_id_t *port_id, nxt_unit_port_t **r_port, - nxt_unit_process_t **process); static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid); static void nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process); static void nxt_unit_quit(nxt_unit_ctx_t *ctx); static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx, - nxt_unit_port_id_t *port_id, const void *buf, size_t buf_size, + nxt_unit_port_t *port, const void *buf, size_t buf_size, const void *oob, size_t oob_size); static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd, const void *buf, size_t buf_size, const void *oob, size_t oob_size); -static ssize_t nxt_unit_port_recv_default(nxt_unit_ctx_t *ctx, - nxt_unit_port_id_t *port_id, void *buf, size_t buf_size, +static ssize_t nxt_unit_port_recv(nxt_unit_ctx_t *ctx, + nxt_unit_port_t *port, void *buf, size_t buf_size, void *oob, size_t oob_size); static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, nxt_unit_port_t *port); -static nxt_unit_port_impl_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, +static nxt_unit_port_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, nxt_unit_port_id_t *port_id, int remove); static int nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash, @@ -166,7 +167,6 @@ struct nxt_unit_mmap_buf_s { nxt_unit_mmap_buf_t **prev; nxt_port_mmap_header_t *hdr; - nxt_unit_port_id_t port_id; nxt_unit_request_info_t *req; nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_process_t *process; @@ -247,8 +247,7 @@ struct nxt_unit_ctx_impl_s { pthread_mutex_t mutex; - nxt_unit_port_id_t read_port_id; - int read_port_fd; + nxt_unit_port_t *read_port; nxt_queue_link_t link; @@ -291,7 +290,7 @@ struct nxt_unit_impl_s { nxt_lvlhsh_t processes; /* of nxt_unit_process_t */ nxt_lvlhsh_t ports; /* of nxt_unit_port_impl_t */ - nxt_unit_port_id_t router_port_id; + nxt_unit_port_t *router_port; nxt_queue_t contexts; /* of nxt_unit_ctx_impl_t */ @@ -306,6 +305,8 @@ struct nxt_unit_impl_s { struct nxt_unit_port_impl_s { nxt_unit_port_t port; + nxt_atomic_t use_count; + nxt_queue_link_t link; nxt_unit_process_t *process; }; @@ -395,26 +396,23 @@ nxt_unit_init(nxt_unit_init_t *init) } lib->pid = read_port.id.pid; + ctx = &lib->main_ctx.ctx; - rc = nxt_unit_add_port(ctx, &router_port); - if (nxt_slow_path(rc != NXT_UNIT_OK)) { + lib->router_port = nxt_unit_add_port(ctx, &router_port); + if (nxt_slow_path(lib->router_port == NULL)) { nxt_unit_alert(NULL, "failed to add router_port"); goto fail; } - lib->router_port_id = router_port.id; - - rc = nxt_unit_add_port(ctx, &read_port); - if (nxt_slow_path(rc != NXT_UNIT_OK)) { + lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port); + if (nxt_slow_path(lib->main_ctx.read_port == NULL)) { nxt_unit_alert(NULL, "failed to add read_port"); goto fail; } - lib->main_ctx.read_port_id = read_port.id; - rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream); if (nxt_slow_path(rc != NXT_UNIT_OK)) { nxt_unit_alert(NULL, "failed to send READY message"); @@ -428,7 +426,7 @@ nxt_unit_init(nxt_unit_init_t *init) fail: - free(lib); + nxt_unit_ctx_release(&lib->main_ctx); return NULL; } @@ -471,6 +469,7 @@ nxt_unit_create(nxt_unit_init_t *init) nxt_queue_init(&lib->contexts); lib->use_count = 0; + lib->router_port = NULL; rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data); if (nxt_slow_path(rc != NXT_UNIT_OK)) { @@ -485,10 +484,6 @@ nxt_unit_create(nxt_unit_init_t *init) goto fail; } - if (cb->port_recv == NULL) { - cb->port_recv = nxt_unit_port_recv_default; - } - return lib; fail: @@ -539,7 +534,7 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, ctx_impl->req.req.ctx = &ctx_impl->ctx; ctx_impl->req.req.unit = &lib->unit; - ctx_impl->read_port_fd = -1; + ctx_impl->read_port = NULL; ctx_impl->requests.slot = 0; return NXT_UNIT_OK; @@ -597,6 +592,10 @@ nxt_unit_lib_release(nxt_unit_impl_t *lib) pthread_mutex_destroy(&lib->mutex); + if (nxt_fast_path(lib->router_port != NULL)) { + nxt_unit_port_release(lib->router_port); + } + free(lib); } } @@ -751,7 +750,7 @@ nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream) int -nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, +nxt_unit_process_msg(nxt_unit_ctx_t *ctx, void *buf, size_t buf_size, void *oob, size_t oob_size) { int rc; @@ -917,7 +916,7 @@ static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) { int nb; - nxt_unit_port_t new_port; + nxt_unit_port_t new_port, *port; nxt_port_msg_new_port_t *new_port_msg; if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) { @@ -960,7 +959,14 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) recv_msg->fd = -1; - return nxt_unit_add_port(ctx, &new_port); + port = nxt_unit_add_port(ctx, &new_port); + if (nxt_slow_path(port == NULL)) { + return NXT_UNIT_ERROR; + } + + nxt_unit_port_release(port); + + return NXT_UNIT_OK; } @@ -968,6 +974,8 @@ static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) { nxt_unit_impl_t *lib; + nxt_unit_port_t *port; + nxt_unit_port_id_t port_id; nxt_unit_request_t *r; nxt_unit_mmap_buf_t *b; nxt_unit_request_info_t *req; @@ -996,10 +1004,27 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) return NXT_UNIT_ERROR; } + nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port); + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + + pthread_mutex_lock(&lib->mutex); + + port = nxt_unit_port_hash_find(&lib->ports, &port_id, 0); + + pthread_mutex_unlock(&lib->mutex); + + if (nxt_slow_path(port == NULL)) { + nxt_unit_alert(ctx, "#%"PRIu32": response port %d,%d not found", + recv_msg->stream, + (int) recv_msg->pid, (int) recv_msg->reply_port); + + return NXT_UNIT_ERROR; + } + req = &req_impl->req; - nxt_unit_port_id_init(&req->response_port, recv_msg->pid, - recv_msg->reply_port); + req->response_port = port; req->request = recv_msg->start; @@ -1051,8 +1076,6 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) (char *) nxt_unit_sptr_get(&r->target), (int) r->content_length); - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - lib->callbacks.request_handler(req); return NXT_UNIT_OK; @@ -1275,6 +1298,12 @@ nxt_unit_request_info_release(nxt_unit_request_info_t *req) req_impl->process = NULL; } + if (req->response_port != NULL) { + nxt_unit_port_release(req->response_port); + + req->response_port = NULL; + } + pthread_mutex_lock(&ctx_impl->mutex); nxt_queue_remove(&req_impl->link); @@ -1793,7 +1822,7 @@ nxt_unit_response_send(nxt_unit_request_info_t *req) mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf); - rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 0); + rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0); if (nxt_fast_path(rc == NXT_UNIT_OK)) { req->response = NULL; req->response_buf = NULL; @@ -1846,8 +1875,8 @@ nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size) nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf); - rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process, - &req->response_port, size, size, mmap_buf, + rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, + size, size, mmap_buf, NULL); if (nxt_slow_path(rc != NXT_UNIT_OK)) { nxt_unit_mmap_buf_release(mmap_buf); @@ -2035,7 +2064,7 @@ nxt_unit_buf_send(nxt_unit_buf_t *buf) } if (nxt_fast_path(buf->free > buf->start)) { - rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 0); + rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0); if (nxt_slow_path(rc != NXT_UNIT_OK)) { return rc; } @@ -2050,17 +2079,15 @@ nxt_unit_buf_send(nxt_unit_buf_t *buf) static void nxt_unit_buf_send_done(nxt_unit_buf_t *buf) { - int rc; - nxt_unit_mmap_buf_t *mmap_buf; - nxt_unit_request_info_t *req; - nxt_unit_request_info_impl_t *req_impl; + int rc; + nxt_unit_mmap_buf_t *mmap_buf; + nxt_unit_request_info_t *req; mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf); req = mmap_buf->req; - req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); - rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 1); + rc = nxt_unit_mmap_buf_send(req, mmap_buf, 1); if (nxt_slow_path(rc == NXT_UNIT_OK)) { nxt_unit_mmap_buf_free(mmap_buf); @@ -2073,7 +2100,7 @@ nxt_unit_buf_send_done(nxt_unit_buf_t *buf) static int -nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, +nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req, nxt_unit_mmap_buf_t *mmap_buf, int last) { struct { @@ -2081,22 +2108,24 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, nxt_port_mmap_msg_t mmap_msg; } m; - int rc; - u_char *last_used, *first_free; - ssize_t res; - nxt_chunk_id_t first_free_chunk; - nxt_unit_buf_t *buf; - nxt_unit_impl_t *lib; - nxt_port_mmap_header_t *hdr; + int rc; + u_char *last_used, *first_free; + ssize_t res; + nxt_chunk_id_t first_free_chunk; + nxt_unit_buf_t *buf; + nxt_unit_impl_t *lib; + nxt_port_mmap_header_t *hdr; + nxt_unit_request_info_impl_t *req_impl; - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit); + req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); buf = &mmap_buf->buf; hdr = mmap_buf->hdr; m.mmap_msg.size = buf->free - buf->start; - m.msg.stream = stream; + m.msg.stream = req_impl->stream; m.msg.pid = lib->pid; m.msg.reply_port = 0; m.msg.type = _NXT_PORT_MSG_DATA; @@ -2113,13 +2142,13 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr, (u_char *) buf->start); - nxt_unit_debug(ctx, "#%"PRIu32": send mmap: (%d,%d,%d)", - stream, + nxt_unit_debug(req->ctx, "#%"PRIu32": send mmap: (%d,%d,%d)", + req_impl->stream, (int) m.mmap_msg.mmap_id, (int) m.mmap_msg.chunk_id, (int) m.mmap_msg.size); - res = nxt_unit_port_send(ctx, &mmap_buf->port_id, &m, sizeof(m), + res = nxt_unit_port_send(req->ctx, req->response_port, &m, sizeof(m), NULL, 0); if (nxt_slow_path(res != sizeof(m))) { goto free_buf; @@ -2149,7 +2178,7 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, nxt_atomic_fetch_add(&mmap_buf->process->outgoing.allocated_chunks, (int) m.mmap_msg.chunk_id - (int) first_free_chunk); - nxt_unit_debug(ctx, "process %d allocated_chunks %d", + nxt_unit_debug(req->ctx, "process %d allocated_chunks %d", mmap_buf->process->pid, (int) mmap_buf->process->outgoing.allocated_chunks); @@ -2157,19 +2186,21 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, if (nxt_slow_path(mmap_buf->plain_ptr == NULL || mmap_buf->plain_ptr > buf->start - sizeof(m.msg))) { - nxt_unit_warn(ctx, "#%"PRIu32": failed to send plain memory buffer" - ": no space reserved for message header", stream); + nxt_unit_alert(req->ctx, + "#%"PRIu32": failed to send plain memory buffer" + ": no space reserved for message header", + req_impl->stream); goto free_buf; } memcpy(buf->start - sizeof(m.msg), &m.msg, sizeof(m.msg)); - nxt_unit_debug(ctx, "#%"PRIu32": send plain: %d", - stream, + nxt_unit_debug(req->ctx, "#%"PRIu32": send plain: %d", + req_impl->stream, (int) (sizeof(m.msg) + m.mmap_msg.size)); - res = nxt_unit_port_send(ctx, &mmap_buf->port_id, + res = nxt_unit_port_send(req->ctx, req->response_port, buf->start - sizeof(m.msg), m.mmap_msg.size + sizeof(m.msg), NULL, 0); @@ -2337,7 +2368,7 @@ nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start, sent = 0; if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { - nxt_unit_req_warn(req, "write: response not initialized yet"); + nxt_unit_req_alert(req, "write: response not initialized yet"); return -NXT_UNIT_ERROR; } @@ -2369,8 +2400,7 @@ nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start, min_part_size = nxt_min(min_size, part_size); min_part_size = nxt_min(min_part_size, PORT_MMAP_CHUNK_SIZE); - rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process, - &req->response_port, part_size, + rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, part_size, min_part_size, &mmap_buf, local_buf); if (nxt_slow_path(rc != NXT_UNIT_OK)) { return -rc; @@ -2385,7 +2415,7 @@ nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start, mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free, part_start, part_size); - rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, &mmap_buf, 0); + rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0); if (nxt_slow_path(rc != NXT_UNIT_OK)) { return -rc; } @@ -2415,8 +2445,14 @@ nxt_unit_response_write_cb(nxt_unit_request_info_t *req, req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); + if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { + nxt_unit_req_alert(req, "write: response not initialized yet"); + + return NXT_UNIT_ERROR; + } + /* Check if response is not send yet. */ - if (nxt_slow_path(req->response_buf)) { + if (nxt_slow_path(req->response_buf != NULL)) { /* Enable content in headers buf. */ rc = nxt_unit_response_add_content(req, "", 0); @@ -2463,8 +2499,7 @@ nxt_unit_response_write_cb(nxt_unit_request_info_t *req, buf_size = nxt_min(read_info->buf_size, PORT_MMAP_DATA_SIZE); - rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process, - &req->response_port, + rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, buf_size, buf_size, &mmap_buf, local_buf); if (nxt_slow_path(rc != NXT_UNIT_OK)) { @@ -2486,7 +2521,7 @@ nxt_unit_response_write_cb(nxt_unit_request_info_t *req, buf->free += n; } - rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, &mmap_buf, 0); + rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0); if (nxt_slow_path(rc != NXT_UNIT_OK)) { nxt_unit_req_error(req, "Failed to send content"); @@ -2744,7 +2779,7 @@ skip_response_send: msg.mf = 0; msg.tracking = 0; - (void) nxt_unit_port_send(req->ctx, &req->response_port, + (void) nxt_unit_port_send(req->ctx, req->response_port, &msg, sizeof(msg), NULL, 0); nxt_unit_request_info_release(req); @@ -2765,17 +2800,14 @@ int nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode, uint8_t last, const struct iovec *iov, int iovcnt) { - int i, rc; - size_t l, copy; - uint32_t payload_len, buf_size, alloc_size; - const uint8_t *b; - nxt_unit_buf_t *buf; - nxt_unit_mmap_buf_t mmap_buf; - nxt_websocket_header_t *wh; - nxt_unit_request_info_impl_t *req_impl; - char local_buf[NXT_UNIT_LOCAL_BUF_SIZE]; - - req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); + int i, rc; + size_t l, copy; + uint32_t payload_len, buf_size, alloc_size; + const uint8_t *b; + nxt_unit_buf_t *buf; + nxt_unit_mmap_buf_t mmap_buf; + nxt_websocket_header_t *wh; + char local_buf[NXT_UNIT_LOCAL_BUF_SIZE]; payload_len = 0; @@ -2786,8 +2818,7 @@ nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode, buf_size = 10 + payload_len; alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE); - rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process, - &req->response_port, + rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, alloc_size, alloc_size, &mmap_buf, local_buf); if (nxt_slow_path(rc != NXT_UNIT_OK)) { @@ -2821,8 +2852,7 @@ nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode, if (l > 0) { if (nxt_fast_path(buf->free > buf->start)) { - rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, - &mmap_buf, 0); + rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0); if (nxt_slow_path(rc != NXT_UNIT_OK)) { return rc; @@ -2831,8 +2861,7 @@ nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode, alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE); - rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process, - &req->response_port, + rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, alloc_size, alloc_size, &mmap_buf, local_buf); if (nxt_slow_path(rc != NXT_UNIT_OK)) { @@ -2845,8 +2874,7 @@ nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode, } if (buf->free > buf->start) { - rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, - &mmap_buf, 0); + rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0); } return rc; @@ -2919,17 +2947,26 @@ nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws) static nxt_port_mmap_header_t * -nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, - nxt_unit_port_id_t *port_id, nxt_chunk_id_t *c, int *n, int min_n) +nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, + nxt_chunk_id_t *c, int *n, int min_n) { int res, nchunks, i; uint32_t outgoing_size; nxt_unit_mmap_t *mm, *mm_end; nxt_unit_impl_t *lib; + nxt_unit_process_t *process; nxt_port_mmap_header_t *hdr; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + process = nxt_unit_port_process(port); + if (nxt_slow_path(process == NULL)) { + nxt_unit_alert(ctx, "mmap_get: port %d,%d already closed", + (int) port->id.pid, (int) port->id.id); + + return NULL; + } + pthread_mutex_lock(&process->outgoing.mutex); retry: @@ -2941,7 +2978,7 @@ retry: for (mm = process->outgoing.elts; mm < mm_end; mm++) { hdr = mm->hdr; - if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port_id->id) { + if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port->id.id) { continue; } @@ -3000,7 +3037,7 @@ retry: /* Notify router about OOSM condition. */ - res = nxt_unit_send_oosm(ctx, port_id); + res = nxt_unit_send_oosm(ctx, port); if (nxt_slow_path(res != NXT_UNIT_OK)) { return NULL; } @@ -3026,7 +3063,7 @@ retry: } *c = 0; - hdr = nxt_unit_new_mmap(ctx, process, port_id, *n); + hdr = nxt_unit_new_mmap(ctx, port, *n); unlock: @@ -3043,7 +3080,7 @@ unlock: static int -nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id) +nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) { ssize_t res; nxt_port_msg_t msg; @@ -3061,7 +3098,7 @@ nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id) msg.mf = 0; msg.tracking = 0; - res = nxt_unit_port_send(ctx, port_id, &msg, sizeof(msg), NULL, 0); + res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL, 0); if (nxt_slow_path(res != sizeof(msg))) { return NXT_UNIT_ERROR; } @@ -3163,21 +3200,29 @@ nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i) static nxt_port_mmap_header_t * -nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, - nxt_unit_port_id_t *port_id, int n) +nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n) { int i, fd, rc; void *mem; char name[64]; nxt_unit_mmap_t *mm; nxt_unit_impl_t *lib; + nxt_unit_process_t *process; nxt_port_mmap_header_t *hdr; - lib = process->lib; + process = nxt_unit_port_process(port); + if (nxt_slow_path(process == NULL)) { + nxt_unit_alert(ctx, "new_mmap: port %d,%d already closed", + (int) port->id.pid, (int) port->id.id); + + return NULL; + } + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); mm = nxt_unit_mmap_at(&process->outgoing, process->outgoing.size); if (nxt_slow_path(mm == NULL)) { - nxt_unit_warn(ctx, "failed to add mmap to outgoing array"); + nxt_unit_alert(ctx, "failed to add mmap to outgoing array"); return NULL; } @@ -3255,7 +3300,7 @@ nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, hdr->id = process->outgoing.size - 1; hdr->src_pid = lib->pid; hdr->dst_pid = process->pid; - hdr->sent_over = port_id->id; + hdr->sent_over = port->id.id; /* Mark first n chunk(s) as busy */ for (i = 0; i < n; i++) { @@ -3268,7 +3313,7 @@ nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, pthread_mutex_unlock(&process->outgoing.mutex); - rc = nxt_unit_send_mmap(ctx, port_id, fd); + rc = nxt_unit_send_mmap(ctx, port, fd); if (nxt_slow_path(rc != NXT_UNIT_OK)) { munmap(mem, PORT_MMAP_SIZE); hdr = NULL; @@ -3295,7 +3340,7 @@ remove_fail: static int -nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int fd) +nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int fd) { ssize_t res; nxt_port_msg_t msg; @@ -3339,7 +3384,7 @@ nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int fd) */ memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int)); - res = nxt_unit_port_send(ctx, port_id, &msg, sizeof(msg), + res = nxt_unit_port_send(ctx, port, &msg, sizeof(msg), &cmsg, sizeof(cmsg)); if (nxt_slow_path(res != sizeof(msg))) { return NXT_UNIT_ERROR; @@ -3350,8 +3395,8 @@ nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int fd) static int -nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, - nxt_unit_port_id_t *port_id, uint32_t size, uint32_t min_size, +nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, + uint32_t size, uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf) { int nchunks, min_nchunks; @@ -3376,8 +3421,7 @@ nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, mmap_buf->buf.start = mmap_buf->plain_ptr + sizeof(nxt_port_msg_t); mmap_buf->buf.free = mmap_buf->buf.start; mmap_buf->buf.end = mmap_buf->buf.start + size; - mmap_buf->port_id = *port_id; - mmap_buf->process = process; + mmap_buf->process = nxt_unit_port_process(port); nxt_unit_debug(ctx, "outgoing plain buffer allocation: (%p, %d)", mmap_buf->buf.start, (int) size); @@ -3388,7 +3432,7 @@ nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE; min_nchunks = (min_size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE; - hdr = nxt_unit_mmap_get(ctx, process, port_id, &c, &nchunks, min_nchunks); + hdr = nxt_unit_mmap_get(ctx, port, &c, &nchunks, min_nchunks); if (nxt_slow_path(hdr == NULL)) { if (nxt_fast_path(min_nchunks == 0 && nchunks == 0)) { mmap_buf->hdr = NULL; @@ -3407,8 +3451,7 @@ nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, mmap_buf->buf.start = (char *) nxt_port_mmap_chunk_start(hdr, c); mmap_buf->buf.free = mmap_buf->buf.start; mmap_buf->buf.end = mmap_buf->buf.start + nchunks * PORT_MMAP_CHUNK_SIZE; - mmap_buf->port_id = *port_id; - mmap_buf->process = process; + mmap_buf->process = nxt_unit_port_process(port); mmap_buf->free_ptr = NULL; mmap_buf->ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); @@ -3770,15 +3813,12 @@ nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid) { - ssize_t res; - nxt_port_msg_t msg; - nxt_unit_impl_t *lib; - nxt_unit_port_id_t port_id; + ssize_t res; + nxt_port_msg_t msg; + nxt_unit_impl_t *lib; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - nxt_unit_port_id_init(&port_id, pid, 0); - msg.stream = 0; msg.pid = lib->pid; msg.reply_port = 0; @@ -3789,7 +3829,7 @@ nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid) msg.mf = 0; msg.tracking = 0; - res = nxt_unit_port_send(ctx, &port_id, &msg, sizeof(msg), NULL, 0); + res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL, 0); if (nxt_slow_path(res != sizeof(msg))) { return NXT_UNIT_ERROR; } @@ -3893,7 +3933,6 @@ static nxt_unit_process_t * nxt_unit_process_find(nxt_unit_impl_t *lib, pid_t pid, int remove) { int rc; - nxt_unit_process_t *process; nxt_lvlhsh_query_t lhq; nxt_unit_process_lhq_pid(&lhq, &pid); @@ -3906,13 +3945,11 @@ nxt_unit_process_find(nxt_unit_impl_t *lib, pid_t pid, int remove) } if (rc == NXT_OK) { - process = lhq.value; - if (!remove) { - nxt_unit_process_use(process); + nxt_unit_process_use(lhq.value); } - return process; + return lhq.value; } return NULL; @@ -3990,7 +4027,7 @@ nxt_unit_run_once(nxt_unit_ctx_t *ctx) } if (nxt_fast_path(rbuf->size > 0)) { - rc = nxt_unit_process_msg(ctx, &ctx_impl->read_port_id, + rc = nxt_unit_process_msg(ctx, rbuf->buf, rbuf->size, rbuf->oob, sizeof(rbuf->oob)); @@ -4013,25 +4050,15 @@ nxt_unit_run_once(nxt_unit_ctx_t *ctx) static void nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) { - nxt_unit_impl_t *lib; nxt_unit_ctx_impl_t *ctx_impl; ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); memset(rbuf->oob, 0, sizeof(struct cmsghdr)); - if (ctx_impl->read_port_fd != -1) { - rbuf->size = nxt_unit_port_recv(ctx, ctx_impl->read_port_fd, - rbuf->buf, sizeof(rbuf->buf), - rbuf->oob, sizeof(rbuf->oob)); - - } else { - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - - rbuf->size = lib->callbacks.port_recv(ctx, &ctx_impl->read_port_id, - rbuf->buf, sizeof(rbuf->buf), - rbuf->oob, sizeof(rbuf->oob)); - } + rbuf->size = nxt_unit_port_recv(ctx, ctx_impl->read_port, + rbuf->buf, sizeof(rbuf->buf), + rbuf->oob, sizeof(rbuf->oob)); } @@ -4049,52 +4076,49 @@ nxt_unit_done(nxt_unit_ctx_t *ctx) nxt_unit_ctx_t * nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data) { - int rc, fd; + int rc; nxt_unit_impl_t *lib; - nxt_unit_port_id_t new_port_id; + nxt_unit_port_t *port; nxt_unit_ctx_impl_t *new_ctx; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); new_ctx = malloc(sizeof(nxt_unit_ctx_impl_t) + lib->request_data_size); if (nxt_slow_path(new_ctx == NULL)) { - nxt_unit_warn(ctx, "failed to allocate context"); + nxt_unit_alert(ctx, "failed to allocate context"); return NULL; } - rc = nxt_unit_create_port(ctx, &new_port_id, &fd); - if (nxt_slow_path(rc != NXT_UNIT_OK)) { + port = nxt_unit_create_port(ctx); + if (nxt_slow_path(port == NULL)) { free(new_ctx); return NULL; } - rc = nxt_unit_send_port(ctx, &lib->router_port_id, &new_port_id, fd); + rc = nxt_unit_send_port(ctx, lib->router_port, port); if (nxt_slow_path(rc != NXT_UNIT_OK)) { - nxt_unit_remove_port(lib, &new_port_id); - - close(fd); - - free(new_ctx); - - return NULL; + goto fail; } - close(fd); - rc = nxt_unit_ctx_init(lib, new_ctx, data); if (nxt_slow_path(rc != NXT_UNIT_OK)) { - nxt_unit_remove_port(lib, &new_port_id); - - free(new_ctx); - - return NULL; + goto fail; } - new_ctx->read_port_id = new_port_id; + new_ctx->read_port = port; return &new_ctx->ctx; + +fail: + + nxt_unit_remove_port(lib, &port->id); + nxt_unit_port_release(port); + + free(new_ctx); + + return NULL; } @@ -4144,6 +4168,10 @@ nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl) nxt_queue_remove(&ctx_impl->link); + if (nxt_fast_path(ctx_impl->read_port != NULL)) { + nxt_unit_port_release(ctx_impl->read_port); + } + if (ctx_impl != &lib->main_ctx) { free(ctx_impl); } @@ -4174,12 +4202,12 @@ nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id) } -static int -nxt_unit_create_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int *fd) +static nxt_unit_port_t * +nxt_unit_create_port(nxt_unit_ctx_t *ctx) { int rc, port_sockets[2]; nxt_unit_impl_t *lib; - nxt_unit_port_t new_port; + nxt_unit_port_t new_port, *port; nxt_unit_process_t *process; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); @@ -4189,7 +4217,7 @@ nxt_unit_create_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int *fd) nxt_unit_warn(ctx, "create_port: socketpair() failed: %s (%d)", strerror(errno), errno); - return NXT_UNIT_ERROR; + return NULL; } nxt_unit_debug(ctx, "create_port: new socketpair: %d->%d", @@ -4204,39 +4232,34 @@ nxt_unit_create_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int *fd) close(port_sockets[0]); close(port_sockets[1]); - return NXT_UNIT_ERROR; + return NULL; } nxt_unit_port_id_init(&new_port.id, lib->pid, process->next_port_id++); new_port.in_fd = port_sockets[0]; - new_port.out_fd = -1; + new_port.out_fd = port_sockets[1]; new_port.data = NULL; pthread_mutex_unlock(&lib->mutex); nxt_unit_process_release(process); - rc = nxt_unit_add_port(ctx, &new_port); - if (nxt_slow_path(rc != NXT_UNIT_OK)) { - nxt_unit_warn(ctx, "create_port: add_port() failed"); + port = nxt_unit_add_port(ctx, &new_port); + if (nxt_slow_path(port == NULL)) { + nxt_unit_alert(ctx, "create_port: add_port() failed"); close(port_sockets[0]); close(port_sockets[1]); - - return rc; } - *port_id = new_port.id; - *fd = port_sockets[1]; - - return rc; + return port; } static int -nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst, - nxt_unit_port_id_t *new_port, int fd) +nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst, + nxt_unit_port_t *port) { ssize_t res; nxt_unit_impl_t *lib; @@ -4263,8 +4286,8 @@ nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst, m.msg.mf = 0; m.msg.tracking = 0; - m.new_port.id = new_port->id; - m.new_port.pid = new_port->pid; + m.new_port.id = port->id.id; + m.new_port.pid = port->id.pid; m.new_port.type = NXT_PROCESS_APP; m.new_port.max_size = 16 * 1024; m.new_port.max_share = 64 * 1024; @@ -4284,7 +4307,7 @@ nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst, * Fortunately, GCC with -O1 compiles this nxt_memcpy() * in the same simple assignment as in the code above. */ - memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int)); + memcpy(CMSG_DATA(&cmsg.cm), &port->out_fd, sizeof(int)); res = nxt_unit_port_send(ctx, dst, &m, sizeof(m), &cmsg, sizeof(cmsg)); @@ -4292,13 +4315,67 @@ nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst, } -static int +nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port) +{ + nxt_unit_port_impl_t *port_impl; + + port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); + + nxt_atomic_fetch_add(&port_impl->use_count, 1); +} + + +nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port) +{ + long c; + nxt_unit_port_impl_t *port_impl; + + port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); + + c = nxt_atomic_fetch_add(&port_impl->use_count, -1); + + if (c == 1) { + nxt_unit_debug(NULL, "destroy port %d,%d", + (int) port->id.pid, (int) port->id.id); + + nxt_unit_process_release(port_impl->process); + + if (port->in_fd != -1) { + close(port->in_fd); + + port->in_fd = -1; + } + + if (port->out_fd != -1) { + close(port->out_fd); + + port->out_fd = -1; + } + + free(port_impl); + } +} + + +nxt_inline nxt_unit_process_t * +nxt_unit_port_process(nxt_unit_port_t *port) +{ + nxt_unit_port_impl_t *port_impl; + + port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); + + return port_impl->process; +} + + +static nxt_unit_port_t * nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) { int rc; nxt_unit_impl_t *lib; + nxt_unit_port_t *old_port; nxt_unit_process_t *process; - nxt_unit_port_impl_t *new_port, *old_port; + nxt_unit_port_impl_t *new_port; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); @@ -4311,13 +4388,13 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) port->id.pid, port->id.id, port->in_fd, port->out_fd); - if (old_port->port.data == NULL) { - old_port->port.data = port->data; + if (old_port->data == NULL) { + old_port->data = port->data; port->data = NULL; } - if (old_port->port.in_fd == -1) { - old_port->port.in_fd = port->in_fd; + if (old_port->in_fd == -1) { + old_port->in_fd = port->in_fd; port->in_fd = -1; } @@ -4326,8 +4403,8 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) port->in_fd = -1; } - if (old_port->port.out_fd == -1) { - old_port->port.out_fd = port->out_fd; + if (old_port->out_fd == -1) { + old_port->out_fd = port->out_fd; port->out_fd = -1; } @@ -4336,26 +4413,27 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) port->out_fd = -1; } - *port = old_port->port; + *port = *old_port; pthread_mutex_unlock(&lib->mutex); if (lib->callbacks.add_port != NULL && (port->in_fd != -1 || port->out_fd != -1)) { - lib->callbacks.add_port(ctx, &old_port->port); + lib->callbacks.add_port(ctx, old_port); } - return NXT_UNIT_OK; + return old_port; } + new_port = NULL; + nxt_unit_debug(ctx, "add_port: %d,%d in_fd %d out_fd %d", port->id.pid, port->id.id, port->in_fd, port->out_fd); process = nxt_unit_process_get(lib, port->id.pid); if (nxt_slow_path(process == NULL)) { - rc = NXT_UNIT_ERROR; goto unlock; } @@ -4365,7 +4443,6 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) new_port = malloc(sizeof(nxt_unit_port_impl_t)); if (nxt_slow_path(new_port == NULL)) { - rc = NXT_UNIT_ERROR; goto unlock; } @@ -4376,107 +4453,85 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) nxt_unit_alert(ctx, "add_port: %d,%d hash_add failed", port->id.pid, port->id.id); + free(new_port); + + new_port = NULL; + goto unlock; } nxt_queue_insert_tail(&process->ports, &new_port->link); - rc = NXT_UNIT_OK; - + new_port->use_count = 2; new_port->process = process; + process = NULL; + unlock: pthread_mutex_unlock(&lib->mutex); - if (nxt_slow_path(process != NULL && rc != NXT_UNIT_OK)) { + if (nxt_slow_path(process != NULL)) { nxt_unit_process_release(process); } if (lib->callbacks.add_port != NULL - && rc == NXT_UNIT_OK + && new_port != NULL && (port->in_fd != -1 || port->out_fd != -1)) { lib->callbacks.add_port(ctx, &new_port->port); } - return rc; + return &new_port->port; } -static int +static void nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id) { - int res; - nxt_unit_port_t *port; - nxt_unit_process_t *process; - - port = NULL; - process = NULL; + nxt_unit_port_t *port; + nxt_unit_port_impl_t *port_impl; pthread_mutex_lock(&lib->mutex); - res = nxt_unit_remove_port_unsafe(lib, port_id, &port, &process); - - pthread_mutex_unlock(&lib->mutex); - - if (lib->callbacks.remove_port != NULL && res == NXT_UNIT_OK) { - lib->callbacks.remove_port(&lib->unit, port); - } + port = nxt_unit_remove_port_unsafe(lib, port_id); if (nxt_fast_path(port != NULL)) { - if (port->in_fd != -1) { - close(port->in_fd); - } + port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); - if (port->out_fd != -1) { - close(port->out_fd); - } + nxt_queue_remove(&port_impl->link); } - if (nxt_slow_path(process != NULL)) { - nxt_unit_process_release(process); + pthread_mutex_unlock(&lib->mutex); + + if (lib->callbacks.remove_port != NULL && port != NULL) { + lib->callbacks.remove_port(&lib->unit, port); } if (nxt_fast_path(port != NULL)) { - free(port); + nxt_unit_port_release(port); } - - return res; } -static int -nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id, - nxt_unit_port_t **r_port, nxt_unit_process_t **process) +static nxt_unit_port_t * +nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id) { - nxt_unit_port_impl_t *port; + nxt_unit_port_t *port; port = nxt_unit_port_hash_find(&lib->ports, port_id, 1); if (nxt_slow_path(port == NULL)) { nxt_unit_debug(NULL, "remove_port: port %d,%d not found", (int) port_id->pid, (int) port_id->id); - return NXT_UNIT_ERROR; + return NULL; } nxt_unit_debug(NULL, "remove_port: port %d,%d, fds %d,%d, data %p", (int) port_id->pid, (int) port_id->id, - port->port.in_fd, port->port.out_fd, port->port.data); - - if (port->process != NULL) { - nxt_queue_remove(&port->link); - } + port->in_fd, port->out_fd, port->data); - if (process != NULL) { - *process = port->process; - } - - if (r_port != NULL) { - *r_port = &port->port; - } - - return NXT_UNIT_OK; + return port; } @@ -4516,12 +4571,7 @@ nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process) nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) { - nxt_unit_process_release(process); - - /* To avoid unlink port. */ - port->process = NULL; - - nxt_unit_remove_port_unsafe(lib, &port->port.id, NULL, NULL); + nxt_unit_remove_port_unsafe(lib, &port->port.id); } nxt_queue_loop; @@ -4535,15 +4585,7 @@ nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process) lib->callbacks.remove_port(&lib->unit, &port->port); } - if (port->port.in_fd != -1) { - close(port->port.in_fd); - } - - if (port->port.out_fd != -1) { - close(port->port.out_fd); - } - - free(port); + nxt_unit_port_release(&port->port); } nxt_queue_loop; @@ -4567,43 +4609,23 @@ nxt_unit_quit(nxt_unit_ctx_t *ctx) static ssize_t -nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, +nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, const void *buf, size_t buf_size, const void *oob, size_t oob_size) { - int fd; - nxt_unit_impl_t *lib; - nxt_unit_port_impl_t *port; - - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - - pthread_mutex_lock(&lib->mutex); - - port = nxt_unit_port_hash_find(&lib->ports, port_id, 0); - - if (nxt_fast_path(port != NULL && port->port.out_fd != -1)) { - fd = port->port.out_fd; - - pthread_mutex_unlock(&lib->mutex); - - } else { - pthread_mutex_unlock(&lib->mutex); - - nxt_unit_alert(ctx, "port_send: port %d,%d not found", - (int) port_id->pid, (int) port_id->id); - - return -NXT_UNIT_ERROR; - } + nxt_unit_impl_t *lib; - nxt_unit_debug(ctx, "port_send: found port %d,%d fd %d", - (int) port_id->pid, (int) port_id->id, fd); + nxt_unit_debug(ctx, "port_send: port %d,%d fd %d", + (int) port->id.pid, (int) port->id.id, port->out_fd); - if (lib->callbacks.port_send == NULL) { - return nxt_unit_sendmsg(ctx, fd, buf, buf_size, oob, oob_size); + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - } else { - return lib->callbacks.port_send(ctx, port_id, buf, buf_size, + if (lib->callbacks.port_send != NULL) { + return lib->callbacks.port_send(ctx, port, buf, buf_size, oob, oob_size); } + + return nxt_unit_sendmsg(ctx, port->out_fd, buf, buf_size, + oob, oob_size); } @@ -4652,56 +4674,22 @@ retry: static ssize_t -nxt_unit_port_recv_default(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, +nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *buf, size_t buf_size, void *oob, size_t oob_size) { - int fd; - nxt_unit_impl_t *lib; - nxt_unit_ctx_impl_t *ctx_impl; - nxt_unit_port_impl_t *port; + int fd; + ssize_t res; + struct iovec iov[1]; + struct msghdr msg; + nxt_unit_impl_t *lib; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - pthread_mutex_lock(&lib->mutex); - - port = nxt_unit_port_hash_find(&lib->ports, port_id, 0); - - if (nxt_fast_path(port != NULL)) { - fd = port->port.in_fd; - - } else { - nxt_unit_debug(ctx, "port_recv: port %d,%d not found", - (int) port_id->pid, (int) port_id->id); - fd = -1; + if (lib->callbacks.port_recv != NULL) { + return lib->callbacks.port_recv(ctx, port, + buf, buf_size, oob, oob_size); } - pthread_mutex_unlock(&lib->mutex); - - if (nxt_slow_path(fd == -1)) { - return -1; - } - - nxt_unit_debug(ctx, "port_recv: found port %d,%d, fd %d", - (int) port_id->pid, (int) port_id->id, fd); - - ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); - - if (nxt_fast_path(port_id == &ctx_impl->read_port_id)) { - ctx_impl->read_port_fd = fd; - } - - return nxt_unit_port_recv(ctx, fd, buf, buf_size, oob, oob_size); -} - - -ssize_t -nxt_unit_port_recv(nxt_unit_ctx_t *ctx, int fd, void *buf, size_t buf_size, - void *oob, size_t oob_size) -{ - ssize_t res; - struct iovec iov[1]; - struct msghdr msg; - iov[0].iov_base = buf; iov[0].iov_len = buf_size; @@ -4713,6 +4701,8 @@ nxt_unit_port_recv(nxt_unit_ctx_t *ctx, int fd, void *buf, size_t buf_size, msg.msg_control = oob; msg.msg_controllen = oob_size; + fd = port->in_fd; + retry: res = recvmsg(fd, &msg, 0); @@ -4813,7 +4803,7 @@ nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, nxt_unit_port_t *port) } -static nxt_unit_port_impl_t * +static nxt_unit_port_t * nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, nxt_unit_port_id_t *port_id, int remove) { @@ -4833,6 +4823,10 @@ nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, nxt_unit_port_id_t *port_id, switch (res) { case NXT_OK: + if (!remove) { + nxt_unit_port_use(lhq.value); + } + return lhq.value; default: diff --git a/src/nxt_unit.h b/src/nxt_unit.h index fa1fa843..6723026f 100644 --- a/src/nxt_unit.h +++ b/src/nxt_unit.h @@ -91,8 +91,7 @@ struct nxt_unit_request_info_s { nxt_unit_t *unit; nxt_unit_ctx_t *ctx; - nxt_unit_port_id_t request_port; - nxt_unit_port_id_t response_port; + nxt_unit_port_t *response_port; nxt_unit_request_t *request; nxt_unit_buf_t *request_buf; @@ -142,12 +141,12 @@ struct nxt_unit_callbacks_s { void (*shm_ack_handler)(nxt_unit_ctx_t *); /* Send data and control to process pid using port id. Optional. */ - ssize_t (*port_send)(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id, + ssize_t (*port_send)(nxt_unit_ctx_t *, nxt_unit_port_t *port, const void *buf, size_t buf_size, const void *oob, size_t oob_size); /* Receive data on port id. Optional. */ - ssize_t (*port_recv)(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id, + ssize_t (*port_recv)(nxt_unit_ctx_t *, nxt_unit_port_t *port, void *buf, size_t buf_size, void *oob, size_t oob_size); }; @@ -195,7 +194,7 @@ nxt_unit_ctx_t *nxt_unit_init(nxt_unit_init_t *); * from port socket should be initially processed by unit. This function * may invoke other application-defined callback for message processing. */ -int nxt_unit_process_msg(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id, +int nxt_unit_process_msg(nxt_unit_ctx_t *, void *buf, size_t buf_size, void *oob, size_t oob_size); /* @@ -225,10 +224,6 @@ nxt_unit_ctx_t *nxt_unit_ctx_alloc(nxt_unit_ctx_t *, void *); /* Initialize port_id, calculate hash. */ void nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id); -/* Default 'port_recv' implementation. */ -ssize_t nxt_unit_port_recv(nxt_unit_ctx_t *, int fd, void *buf, size_t buf_size, - void *oob, size_t oob_size); - /* Calculates hash for given field name. */ uint16_t nxt_unit_field_hash(const char* name, size_t name_length); |